Source code for pyflocker.ciphers.backends.cryptodome_.Hash

from __future__ import annotations

import typing

from Cryptodome.Hash import (
    SHA3_224,
    SHA3_256,
    SHA3_384,
    SHA3_512,
    SHA224,
    SHA256,
    SHA384,
    SHA512,
    SHAKE128,
    SHAKE256,
    BLAKE2b,
    BLAKE2s,
    KangarooTwelve,
    TupleHash128,
    TupleHash256,
    cSHAKE128,
    cSHAKE256,
)

from pyflocker.ciphers import base, exc

HASHES = {
    "sha224": SHA224.new,
    "sha256": SHA256.new,
    "sha384": SHA384.new,
    "sha512": SHA512.new,
    "sha512_224": lambda data=b"": SHA512.new(data, "224"),
    "sha512_256": lambda data=b"": SHA512.new(data, "256"),
    "sha3_224": SHA3_224.new,
    "sha3_256": SHA3_256.new,
    "sha3_384": SHA3_384.new,
    "sha3_512": SHA3_512.new,
    # Blakes
    "blake2b": BLAKE2b.new,
    "blake2s": BLAKE2s.new,
    # TupleHashes: similar to Blakes' API
    "tuplehash128": TupleHash128.new,
    "tuplehash256": TupleHash256.new,
    # XOFS
    "shake128": SHAKE128.new,
    "shake256": SHAKE256.new,
    "cshake128": cSHAKE128.new,
    "cshake256": cSHAKE256.new,
    "kangarootwelve": KangarooTwelve.new,
}


# Names of hash functions that support variable digest sizes.
VAR_DIGEST_SIZE = frozenset(
    {
        "blake2b",
        "blake2s",
        "shake128",
        "shake256",
        "cshake128",
        "cshake256",
        "kangarootwelve",
        "tuplehash128",
        "tuplehash256",
    }
)

# Names of extendable-output functions. This is only for separation of APIs.
# Cryptodome uses `read(int)` for XOFs and the usual `digest()` for other
# hashes.
XOFS = frozenset(
    {
        "shake128",
        "shake256",
        "cshake128",
        "cshake256",
        "kangarootwelve",
    }
)

# Names of hash functions that support customization string. Basically it means
# we get the `custom=...` param.
SUPPORTS_CUSTOM = frozenset(
    {
        "cshake128",
        "cshake256",
        "kangarootwelve",
        "tuplehash128",
        "tuplehash256",
    }
)


# Names of hash functions that support key. This essentially turns them into a
# MAC. Here it is used as a way to separate the API.
SUPPORTS_KEY = frozenset(
    {
        "blake2b",
        "blake2s",
    }
)


[docs] class Hash(base.BaseHash): __slots__ = ( "_name", "_digest", "_ctx", "_digest_size", "_block_size", "_oid", ) def __init__( self, name: str, data: bytes | None = None, digest_size: int | None = None, *, custom: bytes | None = None, # cshakes, kangarootwelve key: bytes | None = None, # for blakes _copy: typing.Any = None, ) -> None: if _copy is not None: self._ctx = _copy else: self._ctx = self._create_ctx( name, data, digest_size, custom=custom, key=key, ) self._name = name self._digest_size = getattr(self._ctx, "digest_size", digest_size) self._block_size = getattr(self._ctx, "block_size", NotImplemented) self._oid = getattr(self._ctx, "oid", NotImplemented) self._digest = b"" @property def digest_size(self) -> int: return self._digest_size # type: ignore @property def block_size(self) -> int: return self._block_size @property def name(self) -> str: return self._name @property def oid(self) -> str: # pragma: no cover """The ASN.1 Object ID.""" if self._oid is NotImplemented: msg = f"OID not available for {self.name!r}" raise AttributeError(msg) return self._oid # type: ignore
[docs] def update(self, data: bytes) -> None: if self._ctx is None: raise exc.AlreadyFinalized self._ctx.update(data)
[docs] def digest(self) -> bytes: if self._ctx is None: return self._digest ctx, self._ctx = self._ctx, None if self.name in XOFS: digest = ctx.read(self.digest_size) else: digest = ctx.digest() self._digest = digest return digest
[docs] def copy(self) -> Hash: if self._ctx is None: raise exc.AlreadyFinalized try: hashobj = self._ctx.copy() except AttributeError as e: msg = f"copying not supported by {self.name!r}" raise ValueError(msg) from e return type(self)( self.name, digest_size=self.digest_size, _copy=hashobj, )
[docs] def new( self, data: bytes | None = None, *, digest_size: int | None = None, custom: bytes | None = None, key: bytes | None = None, ) -> Hash: return type(self)( self.name, data, digest_size=self.digest_size if digest_size is None else digest_size, custom=custom, key=key, )
@staticmethod def _create_ctx( name: str, data: bytes | None = None, digest_size: int | None = None, *, custom: bytes | None = None, # cshakes, kangarootwelve key: bytes | None = None, # for blakes ) -> typing.Any: """ Creates a Cryptodome based hash function object. """ hashfunc = HASHES[name] digest_size_kwargs = {} if name in VAR_DIGEST_SIZE: if digest_size is None: msg = "digest_size is required" raise ValueError(msg) # XOFs have the `read()` API, which is frustrating! if name not in XOFS: digest_size_kwargs = {"digest_bytes": digest_size} custom_kwargs = {} if name in SUPPORTS_CUSTOM and custom is not None: custom_kwargs = {"custom": custom} key_kwargs = {} if name in SUPPORTS_KEY and key is not None: key_kwargs = {"key": key} hashobj = hashfunc( # type: ignore **digest_size_kwargs, **custom_kwargs, **key_kwargs, ) # `tuplehash*`'s internal hash state can change even if empty byte # string is fed. `None` is a protection against that. if data is not None: hashobj.update(data) return hashobj
[docs] def algorithms_available() -> set[str]: """Return the names of the available hash algorithms. Returns: Names of hash algorithms. """ return set(HASHES)
[docs] def new( name: str, data: bytes | None = b"", digest_size: int | None = None, *, custom: bytes | None = None, # cshakes, kangarootwelve key: bytes | None = None, # for blakes ) -> Hash: """ Instantiate a hash object. Args: name: The name of the hash function. data: The initial chunk of message to feed to hash. Note that for ``TupleHash`` variants, even an empty byte string changes its internal state. digest_size: The length of the digest size. Must be supplied if the hash function supports it. Keyword Arguments: custom: A customization string. Can be supplied for hash functions that support domain separation. key: A key that is used to compute the MAC. Can be supplied for hash functions that support working as cryptographic MAC. Raises: KeyError: If ``name`` is not a hash function name. ValueError: If ``digest_size`` is required but not provided. """ return Hash(name, data, digest_size=digest_size, custom=custom, key=key)