Source code for pyflocker.ciphers.backends.cryptography_.DH

from __future__ import annotations

import typing

from cryptography.hazmat.primitives import serialization as serial
from cryptography.hazmat.primitives.asymmetric import dh
from cryptography.hazmat.primitives.serialization import (
    Encoding,
    ParameterFormat,
    PrivateFormat,
    PublicFormat,
)

from pyflocker.ciphers import base


[docs] class DHParameters(base.BaseDHParameters): _ENCODINGS = { "PEM": Encoding.PEM, "DER": Encoding.DER, } _FORMATS = { "PKCS3": ParameterFormat.PKCS3, } _LOADERS = { b"-----BEGIN DH PARAMETERS": serial.load_pem_parameters, b"0": serial.load_der_parameters, } def __init__( self, key_size: int | None, generator: int = 2, _params: dh.DHParameters | None = None, ) -> None: if _params is not None: self._params = _params else: if not isinstance(key_size, int): # pragma: no cover msg = "key_size must be an integer" raise TypeError(msg) self._params = dh.generate_parameters(generator, key_size) numbers = self._params.parameter_numbers() self._g = numbers.g self._p = numbers.p self._q = numbers.q @property def g(self) -> int: return self._g @property def p(self) -> int: return self._p @property def q(self) -> int | None: return self._q
[docs] def private_key(self) -> DHPrivateKey: return DHPrivateKey(self._params.generate_private_key())
[docs] def serialize(self, encoding: str = "PEM", format: str = "PKCS3") -> bytes: """Serialize the DH parameters. Args: encoding: The encoding can be ``PEM`` or ``DER``. Defaults to ``PEM``. format: The format. Defaults to ``PKCS3``. Returns: The parameters encoded as bytes object. Raises: ValueError: if the encoding of format is invalid. """ try: encd = self._ENCODINGS[encoding] fmt = self._FORMATS[format] except KeyError as e: msg = f"The encoding or format is invalid: {e.args[0]!r}" raise ValueError(msg) from e try: return self._params.parameter_bytes(encd, fmt) except ValueError as e: msg = f"Failed to serialize key: {e!s}" raise ValueError(msg) from e
[docs] @classmethod def load(cls, data: bytes) -> DHParameters: data = memoryview(data).tobytes() loader = cls._get_loader(data) try: params = loader(data) if not isinstance(params, dh.DHParameters): msg = "Data is not a DH parameter." raise ValueError(msg) except ValueError as e: msg = f"Failed to load key: {e!s}" raise ValueError(msg) from e return cls(None, _params=params)
@classmethod def _get_loader(cls, data: bytes) -> typing.Callable: """ Returns a loader function depending on the initial bytes of the parameter. """ try: return cls._LOADERS[next(filter(data.startswith, cls._LOADERS))] except StopIteration: msg = "Invalid format." raise ValueError(msg) from None
[docs] @classmethod def load_from_parameters( cls, p: int, g: int = 2, q: int | None = None, ) -> DHParameters: param_nos = dh.DHParameterNumbers(p, g, q) return cls(None, _params=param_nos.parameters())
[docs] class DHPrivateKey(base.BaseDHPrivateKey): _ENCODINGS = { "PEM": Encoding.PEM, "DER": Encoding.DER, } _FORMATS = { "PKCS8": PrivateFormat.PKCS8, } _LOADERS = { b"-----": serial.load_pem_private_key, b"0": serial.load_der_private_key, } def __init__(self, key: dh.DHPrivateKey) -> None: if not isinstance(key, dh.DHPrivateKey): # pragma: no cover msg = "The key is not a DH private key." raise ValueError(msg) self._key = key numbers = key.private_numbers() self._x = numbers.x
[docs] def parameters(self) -> DHParameters: return DHParameters(None, _params=self._key.parameters())
@property def key_size(self) -> int: return self._key.key_size
[docs] def public_key(self) -> DHPublicKey: return DHPublicKey(self._key.public_key())
[docs] def exchange( self, peer_public_key: bytes | DHPublicKey | base.BaseDHPublicKey, ) -> bytes: if isinstance(peer_public_key, bytes): return self._key.exchange(DHPublicKey.load(peer_public_key)._key) # optimizing case: key is made from this Backend if isinstance(peer_public_key, DHPublicKey): return self._key.exchange(peer_public_key._key) return self._key.exchange( # pragma: no cover DHPublicKey.load( peer_public_key.serialize("PEM", "SubjectPublicKeyInfo"), )._key )
[docs] def serialize( self, encoding: str = "PEM", format: str = "PKCS8", passphrase: bytes | None = None, ) -> bytes: try: encd = self._ENCODINGS[encoding] fmt = self._FORMATS[format] except KeyError as e: msg = f"The encoding or format is invalid: {e.args[0]!r}" raise ValueError(msg) from e protection: serial.KeySerializationEncryption if passphrase is None: protection = serial.NoEncryption() else: protection = serial.BestAvailableEncryption( memoryview(passphrase).tobytes() ) try: return self._key.private_bytes(encd, fmt, protection) except ValueError as e: msg = f"Failed to serialize key: {e!s}" raise ValueError(msg) from e
@property def x(self) -> int: return self._x
[docs] @classmethod def load( cls, data: bytes, passphrase: bytes | None = None, ) -> DHPrivateKey: data = memoryview(data).tobytes() loader = cls._get_loader(data) if passphrase is not None: passphrase = memoryview(passphrase).tobytes() try: key = loader(data, passphrase) if not isinstance(key, dh.DHPrivateKey): msg = "Key is not a DH private key." raise ValueError(msg) except (ValueError, TypeError) as e: msg = f"Failed to load key: {e!s}" raise ValueError(msg) from e return cls(key)
@classmethod def _get_loader(cls, data: bytes) -> typing.Callable: """ Returns a loader function depending on the initial bytes of the key. """ try: return cls._LOADERS[next(filter(data.startswith, cls._LOADERS))] except StopIteration: msg = "Invalid format" raise ValueError(msg) from None
[docs] class DHPublicKey(base.BaseDHPublicKey): _ENCODINGS = { "PEM": Encoding.PEM, "DER": Encoding.DER, } _FORMATS = { "SubjectPublicKeyInfo": PublicFormat.SubjectPublicKeyInfo, } _LOADERS = { b"-----": serial.load_pem_public_key, b"0": serial.load_der_public_key, } def __init__(self, key: dh.DHPublicKey) -> None: if not isinstance(key, dh.DHPublicKey): # pragma: no cover msg = "The key is not a DH public key." raise ValueError(msg) self._key = key self._y = key.public_numbers().y
[docs] def parameters(self) -> DHParameters: return DHParameters(None, _params=self._key.parameters())
@property def key_size(self) -> int: return self._key.key_size
[docs] def serialize( self, encoding: str = "PEM", format: str = "SubjectPublicKeyInfo", ) -> bytes: """Serialize the public key. Args: encoding: The encoding to use. It can be ``PEM`` or ``DER``. format: The format can be ``SubjectPublicKeyInfo`` only. Returns: The public key as bytes object. Raises: ValueError: if the encoding or format is invalid. """ try: encd = self._ENCODINGS[encoding] fmt = self._FORMATS[format] except KeyError as e: msg = f"Invalid encoding or format: {e.args[0]!r}" raise ValueError(msg) from e try: return self._key.public_bytes(encd, fmt) except ValueError as e: msg = f"Failed to serialize key: {e!s}" raise ValueError(msg) from e
@property def y(self) -> int: return self._y
[docs] @classmethod def load(cls, data: bytes) -> DHPublicKey: data = memoryview(data).tobytes() loader = cls._get_loader(data) try: key = loader(data) if not isinstance(key, dh.DHPublicKey): msg = "Key is not a DH public key." raise ValueError(msg) except ValueError as e: msg = f"Failed to load key: {e!s}" raise ValueError(msg) from e return cls(key)
@classmethod def _get_loader(cls, data: bytes) -> typing.Callable: """ Returns a loader function depending on the initial bytes of the key. """ try: return cls._LOADERS[next(filter(data.startswith, cls._LOADERS))] except StopIteration: msg = "Invalid format." raise ValueError(msg) from None
[docs] def generate(key_size: int, g: int = 2) -> DHParameters: """ Generate DHE parameter with prime number's bit size ``bits`` and generator ``g`` (default 2). Recommended size of ``bits`` > 1024. Args: key_size: The bit length of the prime modulus. g: The value to use as a generator value. Default is 2. Returns: A DH key exchange paramenter object. """ return DHParameters(key_size, g)
[docs] def load_from_parameters( p: int, g: int = 2, q: int | None = None, ) -> DHParameters: """Create a DH Parameter object from the given parameters. Args: p: The prime modulus `p` as ``int``. g: The generator. q: `p` subgroup order value. Returns: A DH key exchange paramenter object. """ return DHParameters.load_from_parameters(p, g, q)
[docs] def load_parameters(data: bytes) -> DHParameters: """Deserialize the DH parameters and load a parameter object. Args: data: Serialized DH Parameter. Returns: A parameter object. """ return DHParameters.load(data)
[docs] def load_public_key(data: bytes) -> DHPublicKey: """Loads the public key and returns a Key interface. Args: data: The public key (a bytes-like object) to deserialize. Returns: A public key object. """ return DHPublicKey.load(data)
[docs] def load_private_key( data: bytes, passphrase: bytes | None = None, ) -> DHPrivateKey: """Loads the private key and returns a private key object. If the private key was not encrypted duting the serialization, ``passphrase`` must be ``None``, otherwise it must be a ``bytes-like`` object. Args: data: The private key (a bytes-like object) to deserialize. passphrase: The passphrase (in bytes) that was used to encrypt the private key. ``None`` if the key was not encrypted. Returns: A private key object. """ return DHPrivateKey.load(data, passphrase)