Source code for bobocep.dist.crypto.aes

# Copyright (c) 2019-2024 r3w0p
# The following code can be redistributed and/or
# modified under the terms of the MIT License.

"""
AES encryption for distributed traffic.
"""
from threading import RLock

from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes

from bobocep.dist.crypto.crypto import BoboDistributedCrypto, \
    BoboDistributedCryptoError

_UTF_8: str = "UTF-8"
_PAD_MODULO: int = 16
_END_BYTES: bytes = "BOBO".encode(_UTF_8)
_LEN_END_BYTES: int = len(_END_BYTES)
_PAD_CHAR: str = '\0'

_BYTES_AES_128: int = 16
_BYTES_AES_192: int = 24
_BYTES_AES_256: int = 32


[docs] class BoboDistributedCryptoAES(BoboDistributedCrypto): """ AES encryption in GCM mode. Data are encrypted using either AES-128, AES-192, or AES-256 encryption. """
[docs] def __init__(self, aes_key: str, nonce_length: int = 16, mac_length: int = 16): """ :param aes_key: The AES key to use for encryption. :param nonce_length: The nonce length. :param mac_length: The MAC length. """ super().__init__() self._lock: RLock = RLock() # Check AES key num_bytes_aes_key = len(aes_key) if (num_bytes_aes_key != _BYTES_AES_128 and num_bytes_aes_key != _BYTES_AES_192 and num_bytes_aes_key != _BYTES_AES_256): raise BoboDistributedCryptoError( "AES key must be 16, 24, or 32 bytes long " "(respectively for AES-128, AES-192, or AES-256): " "'{}' has {} bytes.".format(aes_key, len(aes_key))) self._aes_key: bytes = aes_key.encode(_UTF_8) self._nonce_length: int = nonce_length self._mac_length: int = mac_length self._msg_min_length: int = ( _PAD_MODULO + nonce_length + mac_length + _LEN_END_BYTES )
[docs] def end_bytes(self) -> bytes: """ :return: Bytes used to signify the end of every encrypted payload. """ return _END_BYTES
[docs] def min_length(self) -> int: """ :return: The minimum possible length of an encrypted payload, in number of bytes. """ return self._msg_min_length
[docs] def encrypt(self, msg_str: str) -> bytes: """ :param msg_str: Wraps JSON string in other data for transit. :return: The bytes to transmit. """ nonce = get_random_bytes(self._nonce_length) cipher = AES.new(self._aes_key, AES.MODE_GCM, nonce=nonce, mac_len=self._mac_length) # Pad message, if necessary len_msg = len(msg_str) if len_msg % _PAD_MODULO != 0: msg_str = msg_str + \ (_PAD_CHAR * (_PAD_MODULO - len_msg % _PAD_MODULO)) ciphertext, mac = cipher.encrypt_and_digest( # type: ignore msg_str.encode(_UTF_8)) # Append nonce, mac, and end bytes to ciphertext ciphertext_bytes = bytearray(ciphertext) ciphertext_bytes.extend(nonce) ciphertext_bytes.extend(mac) ciphertext_bytes.extend(_END_BYTES) return ciphertext_bytes
[docs] def decrypt(self, msg_bytes: bytes) -> str: """ :param msg_bytes: Incoming bytes. :return: Incoming JSON string with other data from transit. """ # [CIPHERTEXT][NONCE][MAC][END_BYTES] ciphertext = msg_bytes[:-( self._nonce_length + self._mac_length + _LEN_END_BYTES)] nonce = msg_bytes[ -(self._nonce_length + self._mac_length + _LEN_END_BYTES): -(self._mac_length + _LEN_END_BYTES)] mac = msg_bytes[ -(self._mac_length + _LEN_END_BYTES): -_LEN_END_BYTES] cipher = AES.new(self._aes_key, AES.MODE_GCM, nonce=nonce) plaintext = cipher.decrypt_and_verify( # type: ignore ciphertext, mac).decode(_UTF_8).rstrip(_PAD_CHAR) return plaintext