Source code for pyflocker.ciphers.backends.symmetric

"""Tools for Symmetric ciphers common to all the backends."""

from __future__ import annotations

import hmac
import typing
from functools import partial

from pyflocker.ciphers import base, exc

if typing.TYPE_CHECKING:
    import io


class FileCipherWrapper:
    """
    Wraps AEAD ciphers and provides file encryption and decryption facility.
    """

    def __init__(
        self,
        cipher: base.BaseAEADCipher,
        file: io.BufferedIOBase,
        offset: int = 0,
    ) -> None:
        """Initialize a file cipher wrapper.

        Args:
            cipher:
                A cipher that supports :py:class:`BaseAEADCipher` interface.
            file: A file or file-like object to read the input data from.
            offset:
                The difference between the length of ``in`` buffer and
                ``out`` buffer in ``update_into`` method of a BaseAEADCipher.
                This is required because backend like ``pyca/cryptography``
                needs an output buffer that is bigger than the input buffer.

        Raises:
            TypeError: if ``cipher`` is not a ``BaseAEADCipher`` instance.
        """
        if not isinstance(cipher, base.BaseAEADCipher):
            msg = "cipher must implement BaseAEADCipher interface."
            raise TypeError(msg)

        # the cipher already has an internal context
        self._ctx = cipher
        self._file = file
        self._tag: bytes | None = None
        self._encrypting = self._ctx.is_encrypting()
        self._offset = offset

    def authenticate(self, data: bytes) -> None:
        """Forward ``data`` to the wrapped cipher's ``authenticate`` method.

        Raises:
            AlreadyFinalized: if the cipher has been finalized.
        """
        if self._ctx is None:
            raise exc.AlreadyFinalized
        return self._ctx.authenticate(data)

    def is_encrypting(self) -> bool:
        """Return the value of the cipher's ``is_encrypting()`` as recorded
        when the wrapper was created."""
        return self._encrypting

    def update(self, blocksize: int = 16384) -> bytes | None:
        """
        Reads at most ``blocksize`` bytes from ``file``, passes through the
        cipher and returns the cipher's output.

        Args:
            blocksize: Maximum amount of data to read in a single call.

        Returns:
            bytes:
                Encrypted or decrypted data, or ``None`` when the read
                returned no data.

        Raises:
            AlreadyFinalized: if the cipher has been finalized.
        """
        if self._ctx is None:
            raise exc.AlreadyFinalized
        if data := self._file.read(blocksize):
            return self._ctx.update(data)
        return None

    def update_into(
        self,
        file: typing.IO[bytes],
        tag: bytes | None = None,
        blocksize: int = 16384,
    ) -> None:
        """
        Read from the wrapped file, pass the data through the cipher and
        write the output of the cipher to ``file``, then finalize.

        A single buffer is reused for reading, for the cipher's
        ``update_into`` call and for writing, so no per-chunk buffers are
        allocated (in contrast to :py:meth:`FileCipherWrapper.update`).

        Args:
            file: File to write the output of the cipher into.
            tag:
                The tag to verify decryption. If the file is being
                decrypted, this must be passed.
            blocksize: Maximum amount of data to read in a single call.

        Raises:
            AlreadyFinalized: if the cipher has been finalized.
            ValueError:
                if the file is being decrypted and tag is not supplied.
        """
        if self._ctx is None:
            raise exc.AlreadyFinalized
        if not self._encrypting and tag is None:
            msg = "tag is required for decryption"
            raise ValueError(msg)

        offset = self._offset
        # ``rbuf`` aliases the first ``blocksize`` bytes of ``buf``: the
        # cipher writes its output into ``buf``, so after ``update_into``
        # the processed bytes can be written out straight from ``rbuf``.
        buf = memoryview(bytearray(blocksize + offset))
        rbuf = buf[:blocksize]

        # localize variables for better performance
        write = file.write
        update_into = self._ctx.update_into
        reads = iter(partial(self._file.readinto, rbuf), 0)

        for i in reads:
            if i < blocksize:
                # Slice per iteration instead of rebinding ``rbuf``/``buf``:
                # ``readinto`` keeps filling the full-size ``rbuf``, so a
                # short read that is not the last one must not shrink the
                # views used for the chunks that follow.
                chunk = rbuf[:i]
                update_into(chunk, buf[: i + offset])
                write(chunk)
            else:
                update_into(rbuf, buf)
                write(rbuf)
        self.finalize(tag)

    def finalize(self, tag: bytes | None = None) -> None:
        """Finalize the wrapped cipher and remember its tag.

        The ``finally`` clause runs even if the cipher's ``finalize``
        raises: it asks the cipher for its tag and, once that succeeds,
        drops the cipher so that later calls raise ``AlreadyFinalized``.

        Args:
            tag: Passed through unchanged to the cipher's ``finalize``.

        Raises:
            AlreadyFinalized: if the cipher has been finalized.
        """
        if self._ctx is None:
            raise exc.AlreadyFinalized

        try:
            self._ctx.finalize(tag)
        finally:
            self._tag, self._ctx = (
                self._ctx.calculate_tag(),
                None,  # type: ignore
            )

    def calculate_tag(self) -> bytes | None:
        """Return the tag stored by :py:meth:`finalize`.

        Raises:
            NotFinalized: if the cipher has not been finalized yet.
        """
        if self._ctx is not None:
            msg = "Cipher has not been finalized yet."
            raise exc.NotFinalized(msg)
        return self._tag
# ``StreamCipherWrapper`` is a second module-level name bound to the very
# same class object as ``FileCipherWrapper``.
StreamCipherWrapper = FileCipherWrapper
class HMACWrapper(base.BaseAEADCipher):
    """
    Adds HMAC based authentication to a cipher that implements the
    BaseNonAEADCipher interface, exposing the BaseAEADCipher interface.
    """

    def __init__(
        self,
        cipher: base.BaseNonAEADCipher,
        hmac_key: bytes,
        hmac_random: bytes,
        hashfunc: str | base.BaseHash = "sha256",
        offset: int = 0,
        tag_length: int | None = 16,
    ) -> None:
        """Set up the HMAC state around ``cipher``.

        Args:
            cipher: The cipher to wrap; must be a ``BaseNonAEADCipher``.
            hmac_key: Key handed to ``hmac.new``.
            hmac_random:
                Bytes fed into the HMAC right after it is created, before
                any other data.
            hashfunc:
                Passed to ``hmac.new`` as ``digestmod``. A ``BaseHash``
                instance is replaced by the result of its ``new()`` first.
            offset:
                Forwarded to the encryption-side helper context; not used
                when the cipher is decrypting.
            tag_length:
                Number of leading digest bytes used as the tag. ``None``
                selects the full digest size.

        Raises:
            TypeError: if ``cipher`` is not a ``BaseNonAEADCipher``.
        """
        if not isinstance(cipher, base.BaseNonAEADCipher):
            msg = "Only NonAEAD ciphers can be wrapped."
            raise TypeError(msg)

        if isinstance(hashfunc, base.BaseHash):
            # always use a fresh hash object.
            hashfunc = hashfunc.new()

        mac = hmac.new(hmac_key, digestmod=hashfunc)  # type: ignore
        mac.update(hmac_random)
        self._auth = mac

        self._ctx: typing.Any | None
        self._ctx = self._get_mac_ctx(cipher, mac, offset)
        self._encrypting = cipher.is_encrypting()

        # running byte counts of associated data and of cipher input
        self._len_aad = 0
        self._len_ct = 0
        self._updated = False
        self._tag = None

        if tag_length is None:
            self._tag_length = mac.digest_size
        else:
            self._tag_length = tag_length

    def is_encrypting(self) -> bool:
        """Return the ``is_encrypting()`` value recorded at construction."""
        return self._encrypting

    def authenticate(self, data: bytes) -> None:
        """Feed ``data`` into the HMAC and add its length to the AAD count.

        Raises:
            AlreadyFinalized: if the wrapper has been finalized.
            TypeError: if ``update`` or ``update_into`` was already called.
        """
        if self._ctx is None:
            raise exc.AlreadyFinalized
        if self._updated:
            msg = (
                "Cannot call authenticate after "
                "update/update_into has been called"
            )
            raise TypeError(msg)
        self._len_aad += len(data)
        self._auth.update(data)

    def update(self, data: bytes) -> bytes:
        """Run ``data`` through the helper context and return its output.

        Raises:
            AlreadyFinalized: if the wrapper has been finalized.
        """
        ctx = self._ctx
        if ctx is None:
            raise exc.AlreadyFinalized
        self._updated = True
        self._len_ct += len(data)
        return ctx.update(data)

    def update_into(
        self,
        data: bytes,
        out: bytearray | memoryview,
    ) -> None:
        """Run ``data`` through the helper context, writing into ``out``.

        Raises:
            AlreadyFinalized: if the wrapper has been finalized.
        """
        ctx = self._ctx
        if ctx is None:
            raise exc.AlreadyFinalized
        self._updated = True
        ctx.update_into(data, out)
        self._len_ct += len(data)

    def finalize(self, tag: bytes | None = None) -> None:
        """Finish the HMAC computation and, when decrypting, verify ``tag``.

        The AAD length and then the data length, each encoded as 8
        little-endian bytes, are fed into the HMAC before the wrapper is
        marked as finalized.

        Raises:
            AlreadyFinalized: if the wrapper has been finalized.
            ValueError:
                when decrypting and ``tag`` is missing or its length
                differs from the configured tag length.
            DecryptionError:
                when decrypting and the truncated digest differs from
                ``tag``.
        """
        if self._ctx is None:
            raise exc.AlreadyFinalized

        if not self.is_encrypting():
            if tag is None:
                msg = "tag is required for decryption"
                raise ValueError(msg)
            if len(tag) != self._tag_length:
                msg = f"Invalid tag length: (required {self._tag_length})"
                raise ValueError(msg)

        for length in (self._len_aad, self._len_ct):
            self._auth.update(length.to_bytes(8, "little"))
        self._ctx = None

        if self._encrypting:
            return

        expected = self._auth.digest()[: self._tag_length]
        if not hmac.compare_digest(expected, tag):  # type: ignore
            raise exc.DecryptionError

    def calculate_tag(self) -> bytes | None:
        """Return the truncated HMAC digest when encrypting, else ``None``.

        Raises:
            NotFinalized: if the wrapper has not been finalized yet.
        """
        if self._ctx is not None:
            raise exc.NotFinalized
        if not self.is_encrypting():
            return None
        return self._auth.digest()[: self._tag_length]

    @staticmethod
    def _get_mac_ctx(
        cipher: base.BaseNonAEADCipher,
        auth: typing.Any,
        offset: int,
    ) -> _EncryptionCtx | _DecryptionCtx:
        """Pick the helper context matching the cipher's direction."""
        if not cipher.is_encrypting():
            return _DecryptionCtx(cipher, auth)
        return _EncryptionCtx(cipher, auth, offset)
class _EncryptionCtx:
    """Encryption-side helper: feeds the cipher's output into the MAC."""

    def __init__(
        self,
        cipher: base.BaseNonAEADCipher,
        auth: typing.Any,
        offset: int,
    ) -> None:
        self._ctx = cipher
        self._auth = auth
        # Stored as a negative slice end so that ``out[:self._offset]``
        # drops the trailing ``offset`` bytes; ``None`` (for a zero offset)
        # makes the slice span all of ``out``.
        negated = -offset
        self._offset = negated if negated else None

    def update(self, data: bytes) -> bytes:
        """Process ``data`` with the cipher, MAC the result and return it."""
        produced = self._ctx.update(data)
        self._auth.update(produced)
        return produced

    def update_into(
        self,
        data: bytes,
        out: bytearray | memoryview,
    ) -> None:
        """Process ``data`` into ``out`` and MAC ``out`` minus the offset."""
        self._ctx.update_into(data, out)
        produced = out[: self._offset]
        self._auth.update(produced)


class _DecryptionCtx:
    """Decryption-side helper: feeds the input into the MAC, then the cipher."""

    def __init__(
        self, cipher: base.BaseNonAEADCipher, auth: typing.Any
    ) -> None:
        self._ctx = cipher
        self._auth = auth

    def update(self, data: bytes) -> bytes:
        """MAC ``data`` first, then return the cipher's output for it."""
        self._auth.update(data)
        result = self._ctx.update(data)
        return result

    def update_into(
        self,
        data: bytes,
        out: bytearray | memoryview,
    ) -> None:
        """MAC ``data`` first, then let the cipher write into ``out``."""
        self._auth.update(data)
        self._ctx.update_into(data, out)