from __future__ import annotations
import typing
from types import MappingProxyType
from cryptography.hazmat.primitives import hashes
from pyflocker.ciphers import base, exc
# Read-only table: public hash name -> pyca/cryptography algorithm class.
HASHES = MappingProxyType(
    dict(
        sha1=hashes.SHA1,
        sha224=hashes.SHA224,
        sha256=hashes.SHA256,
        sha384=hashes.SHA384,
        sha512=hashes.SHA512,
        sha3_224=hashes.SHA3_224,
        sha3_256=hashes.SHA3_256,
        sha3_384=hashes.SHA3_384,
        sha3_512=hashes.SHA3_512,
        sha512_224=hashes.SHA512_224,
        sha512_256=hashes.SHA512_256,
        blake2b=hashes.BLAKE2b,
        blake2s=hashes.BLAKE2s,
        # extendable-output functions (XOFs)
        shake128=hashes.SHAKE128,
        shake256=hashes.SHAKE256,
    )
)
# Hash names whose algorithm constructors take an extra `digest_size`
# argument. Membership here decides whether that argument must be supplied
# when the backend context is created.
VAR_DIGEST_SIZE = frozenset(
    (
        "shake128",
        "shake256",
        "blake2b",
        "blake2s",
    )
)
# Read-only table: hash name -> ASN.1 Object ID (dotted-decimal string).
# Names without an entry (e.g. "sha1") have no OID available here.
OIDS = MappingProxyType(
    dict(
        sha224="2.16.840.1.101.3.4.2.4",
        sha256="2.16.840.1.101.3.4.2.1",
        sha384="2.16.840.1.101.3.4.2.2",
        sha512="2.16.840.1.101.3.4.2.3",
        sha512_224="2.16.840.1.101.3.4.2.5",
        sha512_256="2.16.840.1.101.3.4.2.6",
        sha3_224="2.16.840.1.101.3.4.2.7",
        sha3_256="2.16.840.1.101.3.4.2.8",
        sha3_384="2.16.840.1.101.3.4.2.9",
        sha3_512="2.16.840.1.101.3.4.2.10",
        shake128="2.16.840.1.101.3.4.2.11",
        shake256="2.16.840.1.101.3.4.2.12",
        blake2b="1.3.6.1.4.1.1722.12.2.1.64",
        blake2s="1.3.6.1.4.1.1722.12.2.2.32",
    )
)
class Hash(base.BaseHash):
    """Hash object backed by a pyca/cryptography ``hashes.Hash`` context.

    The backend context is consumed by the first call to :meth:`digest`.
    After that the cached digest is returned on every further call, while
    :meth:`update` and :meth:`copy` raise ``exc.AlreadyFinalized``.
    """

    __slots__ = (
        "_name",
        "_digest",
        "_ctx",
        "_digest_size",
        "_block_size",
        "_oid",
    )

    def __init__(
        self,
        name: str,
        data: bytes | None = None,
        *,
        digest_size: int | None = None,
        _copy: hashes.Hash | None = None,
    ) -> None:
        """Create the hash object.

        Args:
            name: Key into ``HASHES`` selecting the algorithm.
            data: Optional first chunk of the message to hash.

        Keyword Arguments:
            digest_size:
                Digest length; required when ``name`` is listed in
                ``VAR_DIGEST_SIZE``.
            _copy:
                Existing backend context to adopt instead of creating a
                new one (used by :meth:`copy`). When given, ``data`` is
                not fed to the hash.

        Raises:
            KeyError: If ``name`` is not a key of ``HASHES``.
            ValueError: If ``digest_size`` is required but not provided.
        """
        self._ctx: hashes.Hash | None
        self._digest: bytes

        if _copy is not None:
            self._ctx = _copy
        else:
            self._ctx = self._create_ctx(name, data, digest_size=digest_size)

        algorithm = self._ctx.algorithm
        self._name = name
        # Fall back to the caller-supplied size if the algorithm object does
        # not expose ``digest_size``.
        self._digest_size = getattr(algorithm, "digest_size", digest_size)
        # A missing or falsy ``block_size`` is reported as ``NotImplemented``.
        # ``None`` is used as the getattr default so that ``NotImplemented``
        # itself is never truth-tested (deprecated since Python 3.9).
        self._block_size = (
            getattr(algorithm, "block_size", None) or NotImplemented
        )
        self._oid = OIDS.get(name, NotImplemented)

    @property
    def digest_size(self) -> int:
        """The digest size reported by the algorithm (or passed in)."""
        return self._digest_size  # type: ignore

    @property
    def block_size(self) -> int:
        """The algorithm's block size, or ``NotImplemented`` if unknown."""
        return self._block_size

    @property
    def name(self) -> str:
        """The name this hash object was created with."""
        return self._name

    @property
    def oid(self) -> str:  # pragma: no cover
        """The ASN.1 Object ID.

        Raises:
            AttributeError:
                If ``OIDS`` has no entry for this hash name, or the hash is
                BLAKE2 with a digest size other than 32 or 64.
        """
        if self._oid is NotImplemented:
            msg = f"OID not available for {self.name!r}"
            raise AttributeError(msg)

        # NOTE(review): OIDS holds a single OID per BLAKE2 variant; confirm
        # that both 32 and 64 are meant to be accepted for either variant.
        is_blake2 = self.name in ("blake2b", "blake2s")
        if is_blake2 and self.digest_size not in (32, 64):
            # Parenthesised so both fragments are joined into one message;
            # as separate statements the digest size was silently dropped.
            msg = (
                f"OID not available for {self.name!r} with digest size "
                f"{self.digest_size}"
            )
            raise AttributeError(msg)
        return self._oid

    def update(self, data: bytes) -> None:
        """Feed ``data`` into the hash.

        Raises:
            exc.AlreadyFinalized: If :meth:`digest` was already called.
        """
        if self._ctx is None:
            raise exc.AlreadyFinalized
        self._ctx.update(data)

    def digest(self) -> bytes:
        """Finalize the hash (on first call) and return the digest bytes.

        Subsequent calls return the cached digest.
        """
        if self._ctx is None:
            return self._digest

        # Drop the context before finalizing so the object is marked as
        # finalized from here on.
        ctx, self._ctx = self._ctx, None
        self._digest = ctx.finalize()
        return self._digest

    def copy(self) -> Hash:
        """Return a new hash object wrapping a copy of the current context.

        Raises:
            exc.AlreadyFinalized: If :meth:`digest` was already called.
        """
        if self._ctx is None:
            raise exc.AlreadyFinalized
        return type(self)(
            self.name,
            digest_size=self.digest_size,
            _copy=self._ctx.copy(),
        )

    def new(
        self,
        data: bytes | None = None,
        *,
        digest_size: int | None = None,
    ) -> Hash:
        """Return a fresh hash object of the same type and name.

        Args:
            data: Optional first chunk of the message to hash.

        Keyword Arguments:
            digest_size:
                Digest length for the new object; defaults to this
                object's ``digest_size``.
        """
        if digest_size is None:
            digest_size = self.digest_size
        return type(self)(self.name, data, digest_size=digest_size)

    @staticmethod
    def _create_ctx(
        name: str,
        data: bytes | None = None,
        *,
        digest_size: int | None = None,
    ) -> hashes.Hash:
        """
        Creates a pyca/cryptography based hash function object.

        Raises:
            KeyError: If ``name`` is not a key of ``HASHES``.
            ValueError:
                If ``name`` is in ``VAR_DIGEST_SIZE`` and ``digest_size``
                is ``None``.
        """
        hashfunc = HASHES[name]

        # Only variable-size algorithms take ``digest_size`` in their
        # constructor; everything else is built without arguments.
        digest_size_kwargs = {}
        if name in VAR_DIGEST_SIZE:
            if digest_size is None:
                msg = "digest_size is required"
                raise ValueError(msg)
            digest_size_kwargs = {"digest_size": digest_size}

        hashobj = hashes.Hash(hashfunc(**digest_size_kwargs))
        if data is not None:
            hashobj.update(data)
        return hashobj
def algorithms_available() -> set[str]:
    """Return a new set holding every hash name defined in ``HASHES``."""
    return {*HASHES}
def new(
    name: str,
    data: bytes = b"",
    *,
    digest_size: int | None = None,
    **kwargs: typing.Any,  # only for compatibility with Cryptodome
) -> Hash:
    """
    Create a hash object for the algorithm called ``name``.

    Args:
        name: The name of the hash function.
        data: The first chunk of the message to feed to the hash.

    Keyword Arguments:
        digest_size:
            The digest length. Must be supplied for hash functions that
            take a variable digest size.
        **kwargs:
            ``custom`` and ``key`` are tolerated only when they are
            ``None``; they are dropped before the hash is built.

    Raises:
        KeyError: If ``name`` is not a hash function name.
        ValueError: If ``digest_size`` is required but not provided.
        TypeError: If any other keyword argument is left over.
    """
    # Discard the Cryptodome-only parameters when they carry no value.
    leftover = {
        key: value
        for key, value in kwargs.items()
        if value is not None or key not in ("custom", "key")
    }
    # at this point, `leftover` should be empty, otherwise we get `TypeError`
    return Hash(name, data, digest_size=digest_size, **leftover)
def _get_hash_algorithm(hashfunc: base.BaseHash) -> hashes.HashAlgorithm:
    """
    Return the cryptography backend ``hash algorithm`` object matching the
    given hash ``hashfunc``.

    A fresh hash object with the same name and digest size is built through
    ``new`` and the algorithm is read from its backend context.
    """
    fresh = new(  # pragma: no cover
        hashfunc.name,
        digest_size=hashfunc.digest_size,
    )
    return fresh._ctx.algorithm  # type: ignore