"""Tools for Symmetric ciphers common to all the backends."""
from __future__ import annotations
import hmac
import typing
from functools import partial
from pyflocker.ciphers import base, exc
if typing.TYPE_CHECKING:
import io
[docs]
class FileCipherWrapper:
    """
    Wraps AEAD ciphers and provides file encryption and decryption facility.

    Data is read from the wrapped ``file`` and passed through the cipher,
    either one chunk per call (:py:meth:`update`) or all at once into another
    file (:py:meth:`update_into`).
    """

    def __init__(
        self,
        cipher: base.BaseAEADCipher,
        file: io.BufferedIOBase,
        offset: int = 0,
    ) -> None:
        """Initialize a file cipher wrapper.

        Args:
            cipher:
                A cipher that supports :py:class:`BaseAEADCipher` interface.
            file: A file or file-like object to read the input data from.
            offset:
                The difference between the length of ``in`` buffer and ``out``
                buffer in ``update_into`` method of a BaseAEADCipher. This is
                required because backend like ``pyca/cryptography`` needs an
                output buffer that is bigger than the input buffer.

        Raises:
            TypeError: if ``cipher`` is not a ``BaseAEADCipher`` instance.
        """
        if not isinstance(cipher, base.BaseAEADCipher):
            msg = "cipher must implement BaseAEADCipher interface."
            raise TypeError(msg)

        # the cipher already has an internal context
        self._ctx = cipher
        self._file = file
        # populated by ``finalize``; read back through ``calculate_tag``
        self._tag: bytes | None = None
        # cached so the mode can still be queried once ``_ctx`` is dropped
        self._encrypting = self._ctx.is_encrypting()
        self._offset = offset

    def authenticate(self, data: bytes) -> None:
        """Forward additional authenticated data to the wrapped cipher.

        Args:
            data: The bytes to pass to the cipher's ``authenticate`` method.

        Raises:
            AlreadyFinalized: if the cipher has been finalized.
        """
        if self._ctx is None:
            raise exc.AlreadyFinalized
        return self._ctx.authenticate(data)

    def is_encrypting(self) -> bool:
        """Return the mode reported by the cipher at construction time."""
        return self._encrypting

    def update(self, blocksize: int = 16384) -> bytes | None:
        """
        Reads at most ``blocksize`` bytes from ``file``, passes through the
        cipher and returns the cipher's output.

        Args:
            blocksize: Maximum amount of data to read in a single call.

        Returns:
            Encrypted or decrypted data, or ``None`` when the read from
            ``file`` returned no data.

        Raises:
            AlreadyFinalized: if the cipher has been finalized.
        """
        if self._ctx is None:
            raise exc.AlreadyFinalized
        if data := self._file.read(blocksize):
            return self._ctx.update(data)
        return None

    def update_into(
        self,
        file: typing.IO[bytes],
        tag: bytes | None = None,
        blocksize: int = 16384,
    ) -> None:
        """
        Read everything from the wrapped input file, pass it through the
        cipher and write the cipher's output to ``file``, then finalize.

        A single buffer is reused for reading and for the cipher's output,
        so no per-chunk ``bytes`` objects are created (unlike
        :py:meth:`FileCipherWrapper.update`).

        Args:
            file: File to write the output of the cipher into.
            tag:
                The tag to verify decryption. If the file is being decrypted,
                this must be passed.
            blocksize: Maximum amount of data to read in a single call.

        Raises:
            AlreadyFinalized: if the cipher has been finalized.
            ValueError: if the file is being decrypted and tag is not supplied.
        """
        if self._ctx is None:
            raise exc.AlreadyFinalized
        if not self._encrypting and tag is None:
            msg = "tag is required for decryption"
            raise ValueError(msg)

        # ``rbuf`` is a view over the first ``blocksize`` bytes of ``buf``:
        # the file is read into it and the cipher writes its output back
        # into the same memory.
        buf = memoryview(bytearray(blocksize + self._offset))
        rbuf = buf[:blocksize]

        # localize variables for better performance
        offset = self._offset
        write = file.write
        # call ``readinto(rbuf)`` repeatedly until it reports 0 bytes
        reads = iter(partial(self._file.readinto, rbuf), 0)
        update_into = self._ctx.update_into

        for nread in reads:
            # ``readinto`` always fills the full-size ``rbuf``, so the views
            # for a short read must be per-iteration; rebinding ``rbuf`` /
            # ``buf`` would truncate every later, larger read.
            if nread == blocksize:
                src, dst = rbuf, buf
            else:
                src, dst = rbuf[:nread], buf[: nread + offset]
            update_into(src, dst)
            # the output lives in the first ``nread`` bytes of the buffer
            write(src)

        self.finalize(tag)

    def finalize(self, tag: bytes | None = None) -> None:
        """Finalize the wrapped cipher and remember its tag.

        Args:
            tag: Passed through to the cipher's ``finalize`` method.

        Raises:
            AlreadyFinalized: if the cipher has been finalized.
        """
        if self._ctx is None:
            raise exc.AlreadyFinalized

        try:
            self._ctx.finalize(tag)
        finally:
            # Attempted even when ``finalize`` raised: the tag is fetched
            # first, then the context is dropped so that later calls see
            # the wrapper as finalized.
            # NOTE(review): if ``calculate_tag`` itself raises here, that
            # error replaces the one from ``finalize`` and ``_ctx`` is kept.
            self._tag, self._ctx = (
                self._ctx.calculate_tag(),
                None,  # type: ignore
            )

    def calculate_tag(self) -> bytes | None:
        """Return the tag stored by :py:meth:`finalize`.

        Raises:
            NotFinalized: if the cipher has not been finalized yet.
        """
        if self._ctx is not None:
            msg = "Cipher has not been finalized yet."
            raise exc.NotFinalized(msg)
        return self._tag


StreamCipherWrapper = FileCipherWrapper
[docs]
class HMACWrapper(base.BaseAEADCipher):
    """
    Wraps a cipher that supports BaseNonAEADCipher cipher interface and
    provides authentication capability using HMAC.

    The MAC covers, in order: ``hmac_random``, any data passed to
    :py:meth:`authenticate`, the ciphertext, and finally the lengths of the
    authenticated data and of the data passed through the cipher.
    """

    def __init__(
        self,
        cipher: base.BaseNonAEADCipher,
        hmac_key: bytes,
        hmac_random: bytes,
        hashfunc: str | base.BaseHash = "sha256",
        offset: int = 0,
        tag_length: int | None = 16,
    ) -> None:
        """Initialize the HMAC wrapper.

        Args:
            cipher: The non-AEAD cipher to wrap.
            hmac_key: Key for the HMAC object.
            hmac_random: Bytes fed into the HMAC before any other data.
            hashfunc:
                Digest to use for the HMAC; either a name or a ``BaseHash``
                object (a fresh copy of which is used).
            offset:
                Forwarded to the encryption context, which strips this many
                trailing bytes from the output buffer before authenticating
                it in ``update_into``.
            tag_length:
                Number of leading digest bytes used as the tag. ``None``
                selects the full digest size.

        Raises:
            TypeError: if ``cipher`` is not a ``BaseNonAEADCipher`` instance.
        """
        if not isinstance(cipher, base.BaseNonAEADCipher):
            msg = "Only NonAEAD ciphers can be wrapped."
            raise TypeError(msg)

        if isinstance(hashfunc, base.BaseHash):
            # always use a fresh hash object.
            hashfunc = hashfunc.new()

        self._auth = hmac.new(hmac_key, digestmod=hashfunc)  # type: ignore
        self._auth.update(hmac_random)

        # ``None`` once the wrapper has been finalized
        self._ctx: typing.Any | None
        self._ctx = self._get_mac_ctx(cipher, self._auth, offset)

        self._encrypting = cipher.is_encrypting()
        # byte counters mixed into the MAC by ``finalize``
        self._len_aad, self._len_ct = 0, 0
        # set on the first update/update_into; blocks further authenticate()
        self._updated = False
        self._tag = None
        self._tag_length = (
            self._auth.digest_size if tag_length is None else tag_length
        )

    def is_encrypting(self) -> bool:
        """Return the mode reported by the wrapped cipher at construction."""
        return self._encrypting

    def authenticate(self, data: bytes) -> None:
        """Feed additional authenticated data into the HMAC.

        Args:
            data: The bytes to authenticate (they are not encrypted).

        Raises:
            AlreadyFinalized: if the cipher has been finalized.
            TypeError: if ``update`` or ``update_into`` was already called.
        """
        if self._ctx is None:
            raise exc.AlreadyFinalized
        if self._updated:
            # Parenthesised so both literals form a single message; as two
            # separate statements the trailing "called" was silently dropped.
            msg = (
                "Cannot call authenticate after update/update_into has been "
                "called"
            )
            raise TypeError(msg)
        self._auth.update(data)
        self._len_aad += len(data)

    def update(self, data: bytes) -> bytes:
        """Pass ``data`` through the cipher, updating the HMAC as well.

        Args:
            data: The bytes to encrypt or decrypt.

        Returns:
            The output of the wrapped cipher.

        Raises:
            AlreadyFinalized: if the cipher has been finalized.
        """
        if self._ctx is None:
            raise exc.AlreadyFinalized
        self._updated = True
        self._len_ct += len(data)
        return self._ctx.update(data)

    def update_into(
        self,
        data: bytes,
        out: bytearray | memoryview,
    ) -> None:
        """Pass ``data`` through the cipher into ``out``, updating the HMAC.

        Args:
            data: The bytes to encrypt or decrypt.
            out: Writable buffer that receives the cipher's output.

        Raises:
            AlreadyFinalized: if the cipher has been finalized.
        """
        if self._ctx is None:
            raise exc.AlreadyFinalized
        self._updated = True
        self._ctx.update_into(data, out)
        self._len_ct += len(data)

    def finalize(self, tag: bytes | None = None) -> None:
        """Finish the HMAC computation and, when decrypting, verify ``tag``.

        Args:
            tag: The expected tag. Required (and checked) only when
                decrypting.

        Raises:
            AlreadyFinalized: if the cipher has been finalized.
            ValueError:
                if decrypting and ``tag`` is missing or does not have the
                configured tag length.
            DecryptionError: if decrypting and the tag does not match.
        """
        if self._ctx is None:
            raise exc.AlreadyFinalized

        if not self.is_encrypting():
            if tag is None:
                msg = "tag is required for decryption"
                raise ValueError(msg)
            if len(tag) != self._tag_length:
                msg = f"Invalid tag length: (required {self._tag_length})"
                raise ValueError(msg)

        # Both lengths go into the MAC as 8-byte little-endian integers.
        self._auth.update(self._len_aad.to_bytes(8, "little"))
        self._auth.update(self._len_ct.to_bytes(8, "little"))
        # Mark as finalized before verifying, so a failed verification
        # still leaves the wrapper finalized.
        self._ctx = None

        if not self._encrypting and not hmac.compare_digest(
            self._auth.digest()[: self._tag_length],
            tag,  # type: ignore
        ):
            raise exc.DecryptionError

    def calculate_tag(self) -> bytes | None:
        """Return the truncated HMAC digest when encrypting.

        Returns:
            The first ``tag_length`` bytes of the digest in encryption mode,
            ``None`` in decryption mode.

        Raises:
            NotFinalized: if the cipher has not been finalized yet.
        """
        if self._ctx is not None:
            raise exc.NotFinalized

        if self.is_encrypting():
            return self._auth.digest()[: self._tag_length]
        return None

    @staticmethod
    def _get_mac_ctx(
        cipher: base.BaseNonAEADCipher,
        auth: typing.Any,
        offset: int,
    ) -> _EncryptionCtx | _DecryptionCtx:
        """Pick the helper context matching the cipher's mode."""
        if cipher.is_encrypting():
            return _EncryptionCtx(cipher, auth, offset)
        return _DecryptionCtx(cipher, auth)
class _EncryptionCtx:
    """Encrypting helper: runs the cipher, then authenticates its output."""

    def __init__(
        self,
        cipher: base.BaseNonAEADCipher,
        auth: typing.Any,
        offset: int,
    ) -> None:
        """Store the cipher, the MAC object and the output-buffer offset.

        Args:
            cipher: Object providing ``update`` and ``update_into``.
            auth: Object whose ``update`` receives the cipher's output.
            offset:
                Number of trailing bytes of the output buffer that are left
                out of the MAC in ``update_into``.
        """
        self._ctx = cipher
        self._auth = auth
        # Used as a slice end: ``None`` keeps the whole buffer when the
        # negated offset is zero, otherwise the tail is cut off.
        negated = -offset
        self._offset = negated if negated else None

    def update(self, data: bytes) -> bytes:
        """Encrypt ``data``, authenticate the result and return it."""
        encrypted = self._ctx.update(data)
        self._auth.update(encrypted)
        return encrypted

    def update_into(
        self,
        data: bytes,
        out: bytearray | memoryview,
    ) -> None:
        """Encrypt ``data`` into ``out`` and authenticate what was written."""
        self._ctx.update_into(data, out)
        stop = self._offset
        authenticated = out[:stop]
        self._auth.update(authenticated)
class _DecryptionCtx:
    """Decrypting helper: authenticates the input, then runs the cipher."""

    def __init__(
        self, cipher: base.BaseNonAEADCipher, auth: typing.Any
    ) -> None:
        """Store the cipher and the MAC object.

        Args:
            cipher: Object providing ``update`` and ``update_into``.
            auth: Object whose ``update`` receives the incoming data.
        """
        self._ctx = cipher
        self._auth = auth

    def update(self, data: bytes) -> bytes:
        """Authenticate ``data``, then return the cipher's output for it."""
        mac, cipher = self._auth, self._ctx
        # the MAC sees the data before the cipher processes it
        mac.update(data)
        decrypted = cipher.update(data)
        return decrypted

    def update_into(
        self,
        data: bytes,
        out: bytearray | memoryview,
    ) -> None:
        """Authenticate ``data``, then run the cipher on it into ``out``."""
        mac, cipher = self._auth, self._ctx
        mac.update(data)
        cipher.update_into(data, out)