# Source code for zserio.bitwriter

"""
The module implements abstraction for writing data to the bit stream.
"""

from zserio.bitbuffer import BitBuffer
from zserio.bitsizeof import (
    bitsizeof_varint16,
    bitsizeof_varint32,
    bitsizeof_varint64,
    bitsizeof_varint,
    bitsizeof_varuint16,
    bitsizeof_varuint32,
    bitsizeof_varuint64,
    bitsizeof_varuint,
    bitsizeof_varsize,
)
from zserio.exception import PythonRuntimeException
from zserio.float import float_to_uint16, float_to_uint32, float_to_uint64
from zserio.limits import INT64_MIN
from zserio.cppbind import import_cpp_class


class BitStreamWriter:
    """
    Bit stream writer using bytearray.

    Values are appended to an internal bytearray most significant bit first. The writer keeps
    the number of bits written so far, so consecutive writes do not have to be byte aligned.
    """

    def __init__(self) -> None:
        """
        Constructor.
        """

        self._byte_array: bytearray = bytearray()
        self._bitposition: int = 0

    def write_bits(self, value: int, numbits: int) -> None:
        """
        Writes the given value with the given number of bits to the underlying storage.

        :param value: Value to write.
        :param numbits: Number of bits to write.
        :raises PythonRuntimeException: If the value is out of the range or if the number of bits is invalid.
        """

        if numbits < 1 or numbits > 64:
            raise PythonRuntimeException(f"BitStreamWriter: numbits '{numbits}' not in range [1,64]!")

        self.write_bits_unchecked(value, numbits)

    def write_signed_bits(self, value: int, numbits: int) -> None:
        """
        Writes the given signed value with the given number of bits to the underlying storage.
        Provided for convenience.

        :param value: Signed value to write.
        :param numbits: Number of bits to write.
        :raises PythonRuntimeException: If the value is out of the range or if the number of bits is invalid.
        """

        if numbits < 1 or numbits > 64:
            raise PythonRuntimeException(f"BitStreamWriter: numbits '{numbits}' not in range [1,64]!")

        self.write_signed_bits_unchecked(value, numbits)

    def write_bits_unchecked(self, value: int, numbits: int) -> None:
        """
        Writes the given value with the given number of bits to the underlying storage.

        This method does not check that numbits > 0 and assumes that it's ensured by the caller.
        Only the value range is validated here.

        :param value: Value to write.
        :param numbits: Number of bits to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        min_value = 0
        max_value = (1 << numbits) - 1
        if value < min_value or value > max_value:
            raise PythonRuntimeException(
                f"BitStreamWriter: Value '{value}' is out of the range " f"[{min_value},{max_value}]!"
            )

        self._write_bits(value, numbits, signed=False)

    def write_signed_bits_unchecked(self, value: int, numbits: int) -> None:
        """
        Writes the given signed value with the given number of bits to the underlying storage.
        Provided for convenience.

        This method does not check that numbits > 0 and assumes that it's ensured by the caller.
        Only the value range is validated here.

        :param value: Signed value to write.
        :param numbits: Number of bits to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        min_value = -(1 << (numbits - 1))
        max_value = (1 << (numbits - 1)) - 1
        if value < min_value or value > max_value:
            raise PythonRuntimeException(
                f"BitStreamWriter: Value '{value}' is out of the range " f"[{min_value},{max_value}]!"
            )

        self._write_bits(value, numbits, signed=True)

    def write_varint16(self, value: int) -> None:
        """
        Writes a variable 16-bit signed integer value to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 2, bitsizeof_varint16(value) // 8, is_signed=True)

    def write_varint32(self, value: int) -> None:
        """
        Writes a variable 32-bit signed integer value to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 4, bitsizeof_varint32(value) // 8, is_signed=True)

    def write_varint64(self, value: int) -> None:
        """
        Writes a variable 64-bit signed integer value to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 8, bitsizeof_varint64(value) // 8, is_signed=True)

    def write_varint(self, value: int) -> None:
        """
        Writes a variable signed integer value (up to 9 bytes) to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        if value == INT64_MIN:
            self._write_bits(0x80, 8)  # INT64_MIN is stored as -0
        else:
            self._write_varnum(value, 9, bitsizeof_varint(value) // 8, is_signed=True)

    def write_varuint16(self, value: int) -> None:
        """
        Writes a variable 16-bit unsigned integer value to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 2, bitsizeof_varuint16(value) // 8, is_signed=False)

    def write_varuint32(self, value: int) -> None:
        """
        Writes a variable 32-bit unsigned integer value to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 4, bitsizeof_varuint32(value) // 8, is_signed=False)

    def write_varuint64(self, value: int) -> None:
        """
        Writes a variable 64-bit unsigned integer value to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 8, bitsizeof_varuint64(value) // 8, is_signed=False)

    def write_varuint(self, value: int) -> None:
        """
        Writes a variable unsigned integer value (up to 9 bytes) to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 9, bitsizeof_varuint(value) // 8, is_signed=False)

    def write_varsize(self, value: int) -> None:
        """
        Writes a variable size integer value to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 5, bitsizeof_varsize(value) // 8, is_signed=False)

    def write_float16(self, value: float) -> None:
        """
        Writes a 16-bit float value to the underlying storage according to IEEE 754 binary16.

        :param value: Float value to write.
        """

        self.write_bits_unchecked(float_to_uint16(value), 16)

    def write_float32(self, value: float) -> None:
        """
        Writes a 32-bit float value to the underlying storage according to IEEE 754 binary32.

        :param value: Float value to write.
        """

        self.write_bits_unchecked(float_to_uint32(value), 32)

    def write_float64(self, value: float) -> None:
        """
        Writes a 64-bit float value to the underlying storage according to IEEE 754 binary64.

        :param value: Float value to write.
        """

        self.write_bits_unchecked(float_to_uint64(value), 64)

    def write_bytes(self, value: bytearray) -> None:
        """
        Writes the given bytes to the underlying storage. Length of the bytes is written
        as varsize at the beginning.

        :param value: Bytes to write.
        """

        self.write_varsize(len(value))
        self._write_raw_bytes(value)

    def write_string(self, string: str) -> None:
        """
        Writes the given string to the underlying storage in UTF-8 encoding. Length of the string
        (in encoded bytes) is written as varsize at the beginning.

        :param string: String to write.
        """

        string_bytes = string.encode("utf-8")
        self.write_varsize(len(string_bytes))
        self._write_raw_bytes(string_bytes)

    def write_bool(self, value: bool) -> None:
        """
        Writes bool in a single bit.

        :param value: Bool value to write.
        """

        self._write_bits(1 if value else 0, 1)

    def write_bitbuffer(self, bitbuffer: BitBuffer) -> None:
        """
        Writes a bit buffer to the underlying storage. Length of the bit buffer is written
        as varsize at the beginning.

        :param bitbuffer: Bit buffer to write.
        """

        bitsize = bitbuffer.bitsize
        self.write_varsize(bitsize)

        write_buffer = bitbuffer.buffer
        num_bytes_to_write = bitsize // 8
        num_rest_bits = bitsize - num_bytes_to_write * 8
        begin_bitposition = self._bitposition
        if (begin_bitposition & 0x07) != 0:
            # we are not aligned to byte
            for i in range(num_bytes_to_write):
                self.write_bits_unchecked(write_buffer[i], 8)
        else:
            # we are aligned to byte
            self._bitposition += num_bytes_to_write * 8
            self._byte_array += write_buffer[0:num_bytes_to_write]

        if num_rest_bits > 0:
            # the remaining bits are taken from the most significant end of the last byte
            self.write_bits_unchecked(write_buffer[num_bytes_to_write] >> (8 - num_rest_bits), num_rest_bits)

    @property
    def byte_array(self) -> bytes:
        """
        Gets internal bytearray.

        The internal object itself is returned, not a copy.

        :returns: Underlying bytearray object.
        """

        return self._byte_array

    def to_file(self, filename: str) -> None:
        """
        Writes underlying bytearray to binary file.

        :param filename: File to write.
        """

        with open(filename, "wb") as file:
            file.write(self._byte_array)

    @property
    def bitposition(self) -> int:
        """
        Gets current bit position.

        :returns: Current bit position.
        """

        return self._bitposition

    def alignto(self, alignment: int) -> None:
        """
        Aligns the bit position according to the aligning value by writing zero bits.

        The alignment is not validated here: zero makes the modulo below raise ZeroDivisionError,
        so callers are expected to pass a positive value.

        :param alignment: An aligning value to use.
        """

        offset = self._bitposition % alignment
        if offset != 0:
            self._write_bits(0, alignment - offset)

    def _write_raw_bytes(self, data: bytes) -> None:
        # Appends all bytes of 'data' at the current bit position (no length prefix).
        if (self._bitposition & 0x07) != 0:
            # we are not aligned to byte, each byte must be merged bit-wise
            for byte in data:
                self.write_bits_unchecked(byte, 8)
        else:
            # we are aligned to byte, append first so that the position stays
            # consistent with the storage if the append fails
            self._byte_array += data
            self._bitposition += len(data) * 8

    def _write_bits(self, value: int, numbits: int, *, signed: bool = False) -> None:
        # Appends the lowest 'numbits' bits of 'value' without any range checking.

        # number of bits already used / still free in the last stored byte
        buffer_last_byte_bits = self._bitposition % 8
        buffer_free_bits = (8 - buffer_last_byte_bits) if buffer_last_byte_bits != 0 else 0

        # number of value bits which would occupy its first (most significant) byte
        value_first_byte_bits = numbits % 8 or 8

        # shift the value so that its bits end exactly where the free bits of the last byte start
        if value_first_byte_bits <= buffer_free_bits:
            left_shift = buffer_free_bits - value_first_byte_bits
        else:
            left_shift = buffer_free_bits + 8 - value_first_byte_bits
        value <<= left_shift

        num_bytes = (numbits + left_shift + 7) // 8
        value_bytes = value.to_bytes(num_bytes, byteorder="big", signed=signed)

        if buffer_free_bits == 0:
            self._byte_array.extend(value_bytes)
        else:
            # merge the first byte into the partially filled last byte, masking out
            # anything (e.g. sign extension) above the free bits
            value_first_byte = value_bytes[0] & ((1 << buffer_free_bits) - 1)
            self._byte_array[-1] |= value_first_byte
            self._byte_array.extend(value_bytes[1:])

        self._bitposition += numbits

    def _write_varnum(self, value: int, max_var_bytes: int, num_var_bytes: int, *, is_signed: bool) -> None:
        # Writes 'value' as a variable length number using 'num_var_bytes' bytes.
        # 'max_var_bytes' is the maximum byte length of the particular variable type.
        abs_value = abs(value)
        has_max_byte_range = num_var_bytes == max_var_bytes
        for i in range(num_var_bytes):
            byte = 0x00
            numbits = 8  # number of payload bits available in this byte
            has_next_byte = i < num_var_bytes - 1
            has_sign_bit = is_signed and i == 0
            if has_sign_bit:
                if value < 0:
                    byte |= 0x80
                numbits -= 1
            if has_next_byte:
                numbits -= 1
                byte |= 1 << numbits  # use bit 6 if signed bit is present, use bit 7 otherwise
            else:  # this is the last byte
                if not has_max_byte_range:  # next byte flag isn't used in last byte in case of max byte range
                    numbits -= 1

            shift_bits = (num_var_bytes - (i + 1)) * 7 + (1 if has_max_byte_range and has_next_byte else 0)
            byte |= (abs_value >> shift_bits) & VAR_NUM_BIT_MASKS[numbits - 1]
            self.write_bits_unchecked(byte, 8)
# Bit masks selecting the lowest 1 to 8 bits of a byte; the entry at index i covers i + 1 bits.
VAR_NUM_BIT_MASKS = [0x01, 0x03, 0x07, 0x0F, 0x1F, 0x3F, 0x7F, 0xFF]

# Rebind the public name to whatever import_cpp_class returns when that is truthy, otherwise keep
# the pure Python class defined in this module.
# NOTE(review): presumably this yields the native C++ implementation when it is available - confirm.
BitStreamWriter = import_cpp_class("BitStreamWriter") or BitStreamWriter  # type: ignore