Source code for zserio.bitreader
"""
The module implements abstraction for reading data to the bit stream.
"""
import typing
from zserio.bitbuffer import BitBuffer
from zserio.limits import INT64_MIN
from zserio.exception import PythonRuntimeException
from zserio.float import uint16_to_float, uint32_to_float, uint64_to_float
from zserio.cppbind import import_cpp_class
[docs]class BitStreamReader:
"""
Bit stream reader.
"""
def __init__(self, buffer: bytes, bitsize: typing.Optional[int] = None) -> None:
"""
Constructs bit stream reader from bytes buffer.
Because bit buffer size does not have to be byte aligned (divisible by 8), it's possible that not all
bits of the last byte are used. In this case, only most significant bits of the corresponded size are
used.
:param buffer: Bytes-like buffer to read as a bit stream.
:param bitsize: Number of bits stored in buffer to use.
:raises PythonRuntimeException: If bitsize is out of range.
"""
if bitsize is None:
bitsize = len(buffer) * 8
elif len(buffer) * 8 < bitsize:
raise PythonRuntimeException(
f"BitStreamReader: Bit size '{bitsize}' out of range "
f"for the given buffer byte size '{len(buffer)}'!"
)
self._buffer: bytes = buffer
self._bitsize: int = bitsize
self._bitposition: int = 0
[docs] @classmethod
def from_bitbuffer(cls: typing.Type["BitStreamReader"], bitbuffer: BitBuffer) -> "BitStreamReader":
"""
Constructs bit stream reader from bit buffer.
:param bitbuffer: Bit buffer to read as a bit stream.
"""
instance = cls(bitbuffer.buffer, bitbuffer.bitsize)
return instance
[docs] @classmethod
def from_file(cls: typing.Type["BitStreamReader"], filename: str) -> "BitStreamReader":
"""
Constructs bit stream reader from file.
:param filename: Filename to read as a bit stream.
"""
with open(filename, "rb") as file:
return cls(file.read())
[docs] def read_bits(self, numbits: int) -> int:
"""
Reads given number of bits from the bit stream as an unsigned integer.
:param numbits: Number of bits to read.
:returns: Read bits as an unsigned integer.
:raises PythonRuntimeException: If the numbits is invalid number of the reading goes behind the stream.
"""
if numbits < 1 or numbits > 64:
raise PythonRuntimeException("BitStreamReader: numbits '{numbits}' not in range [1,64]!")
return self.read_bits_unchecked(numbits)
[docs] def read_signed_bits(self, numbits: int) -> int:
"""
Reads given number of bits from the bit stream as a signed integer.
:param numbits: Number of bits to read
:returns: Read bits as a signed integer.
:raises PythonRuntimeException: If the numbits is invalid number of the reading goes behind the stream.
"""
if numbits < 1 or numbits > 64:
raise PythonRuntimeException(f"BitStreamReader: numbits '{numbits}' not in range [1,64]!")
return self.read_signed_bits_unchecked(numbits)
[docs] def read_bits_unchecked(self, numbits: int) -> int:
"""
Reads given number of bits from the bit stream as an unsigned integer.
This method does not check that numbits >= 0 and assumes that it's ensured by the caller.
:param numbits: Number of bits to read.
:returns: Read bits as an unsigned integer.
:raises PythonRuntimeException: If the numbits is invalid number of the reading goes behind the stream.
"""
end_bitposition = self._bitposition + numbits
if end_bitposition > self._bitsize:
raise PythonRuntimeException("BitStreamReader: Reading behind the stream!")
start_byte = self._bitposition // 8
end_byte = (end_bitposition - 1) // 8
value = int.from_bytes(self._buffer[start_byte : end_byte + 1], byteorder="big", signed=False)
last_bits = end_bitposition % 8
if last_bits != 0:
value >>= 8 - last_bits
value &= (1 << numbits) - 1
self._bitposition = end_bitposition
return value
[docs] def read_signed_bits_unchecked(self, numbits: int) -> int:
"""
Reads given number of bits from the bit stream as a signed integer.
This method does not check that numbits >= 0 and assumes that it's ensured by the caller.
:param numbits: Number of bits to read
:returns: Read bits as a signed integer.
:raises PythonRuntimeException: If the numbits is invalid number of the reading goes behind the stream.
"""
value = self.read_bits_unchecked(numbits)
if numbits != 0 and (value >> (numbits - 1)) != 0:
# signed
return value - (1 << numbits)
else:
# unsigned
return value
[docs] def read_varint16(self) -> int:
"""
Reads variable 16-bit signed integer value from the bit stream.
:returns: Variable 16-bit signed integer value.
"""
byte = self.read_bits_unchecked(8) # byte 1
sign = byte & VARINT_SIGN_1
result = byte & VARINT_BYTE_1
if byte & VARINT_HAS_NEXT_1 == 0:
return -result if sign != 0 else result
result = (result << 8) | self.read_bits_unchecked(8) # byte 2
return -result if sign else result
[docs] def read_varint32(self) -> int:
"""
Reads variable 32-bit signed integer value from the bit stream.
:returns: Variable 32-bit signed integer value.
"""
byte = self.read_bits_unchecked(8) # byte 1
sign = byte & VARINT_SIGN_1
result = byte & VARINT_BYTE_1
if byte & VARINT_HAS_NEXT_1 == 0:
return -result if sign else result
byte = self.read_bits_unchecked(8) # byte 2
result = result << 7 | (byte & VARINT_BYTE_N)
if byte & VARINT_HAS_NEXT_N == 0:
return -result if sign else result
byte = self.read_bits_unchecked(8) # byte 3
result = result << 7 | (byte & VARINT_BYTE_N)
if byte & VARINT_HAS_NEXT_N == 0:
return -result if sign else result
result = result << 8 | self.read_bits_unchecked(8) # byte 4
return -result if sign else result
[docs] def read_varint64(self) -> int:
"""
Reads variable 64-bit signed integer value from the bit stream.
:returns: Variable 64-bit signed integer value.
"""
byte = self.read_bits_unchecked(8) # byte 1
sign = byte & VARINT_SIGN_1
result = byte & VARINT_BYTE_1
if byte & VARINT_HAS_NEXT_1 == 0:
return -result if sign else result
byte = self.read_bits_unchecked(8) # byte 2
result = result << 7 | (byte & VARINT_BYTE_N)
if byte & VARINT_HAS_NEXT_N == 0:
return -result if sign else result
byte = self.read_bits_unchecked(8) # byte 3
result = result << 7 | (byte & VARINT_BYTE_N)
if byte & VARINT_HAS_NEXT_N == 0:
return -result if sign else result
byte = self.read_bits_unchecked(8) # byte 4
result = result << 7 | (byte & VARINT_BYTE_N)
if byte & VARINT_HAS_NEXT_N == 0:
return -result if sign else result
byte = self.read_bits_unchecked(8) # byte 5
result = result << 7 | (byte & VARINT_BYTE_N)
if byte & VARINT_HAS_NEXT_N == 0:
return -result if sign else result
byte = self.read_bits_unchecked(8) # byte 6
result = result << 7 | (byte & VARINT_BYTE_N)
if byte & VARINT_HAS_NEXT_N == 0:
return -result if sign else result
byte = self.read_bits_unchecked(8) # byte 7
result = result << 7 | (byte & VARINT_BYTE_N)
if byte & VARINT_HAS_NEXT_N == 0:
return -result if sign else result
result = result << 8 | self.read_bits_unchecked(8) # byte 8
return -result if sign else result
[docs] def read_varint(self) -> int:
"""
Reads variable signed integer value (up to 9 bytes) from the bit stream.
:returns: Variable signed integer value (up to 9 bytes).
"""
byte = self.read_bits_unchecked(8) # byte 1
sign = byte & VARINT_SIGN_1
result = byte & VARINT_BYTE_1
if byte & VARINT_HAS_NEXT_1 == 0:
return (INT64_MIN if result == 0 else -result) if sign else result
byte = self.read_bits_unchecked(8) # byte 2
result = result << 7 | (byte & VARINT_BYTE_N)
if byte & VARINT_HAS_NEXT_N == 0:
return -result if sign else result
byte = self.read_bits_unchecked(8) # byte 3
result = result << 7 | (byte & VARINT_BYTE_N)
if byte & VARINT_HAS_NEXT_N == 0:
return -result if sign else result
byte = self.read_bits_unchecked(8) # byte 4
result = result << 7 | (byte & VARINT_BYTE_N)
if byte & VARINT_HAS_NEXT_N == 0:
return -result if sign else result
byte = self.read_bits_unchecked(8) # byte 5
result = result << 7 | (byte & VARINT_BYTE_N)
if byte & VARINT_HAS_NEXT_N == 0:
return -result if sign else result
byte = self.read_bits_unchecked(8) # byte 6
result = result << 7 | (byte & VARINT_BYTE_N)
if byte & VARINT_HAS_NEXT_N == 0:
return -result if sign else result
byte = self.read_bits_unchecked(8) # byte 7
result = result << 7 | (byte & VARINT_BYTE_N)
if byte & VARINT_HAS_NEXT_N == 0:
return -result if sign else result
byte = self.read_bits_unchecked(8) # byte 8
result = result << 7 | (byte & VARINT_BYTE_N)
if byte & VARINT_HAS_NEXT_N == 0:
return -result if sign else result
result = result << 8 | self.read_bits_unchecked(8) # byte 9
return -result if sign else result
[docs] def read_varuint16(self) -> int:
"""
Reads variable 16-bit unsigned integer value from the bit stream.
:returns: Variable 16-bit unsigned integer value.
"""
byte = self.read_bits_unchecked(8) # byte 1
result = byte & VARUINT_BYTE
if byte & VARUINT_HAS_NEXT == 0:
return result
result = result << 8 | self.read_bits_unchecked(8) # byte 2
return result
[docs] def read_varuint32(self) -> int:
"""
Reads variable 32-bit unsigned integer value from the bit stream.
:returns: Variable 32-bit unsigned integer value.
"""
byte = self.read_bits_unchecked(8) # byte 1
result = byte & VARUINT_BYTE
if byte & VARUINT_HAS_NEXT == 0:
return result
byte = self.read_bits_unchecked(8) # byte 2
result = result << 7 | (byte & VARUINT_BYTE)
if byte & VARUINT_HAS_NEXT == 0:
return result
byte = self.read_bits_unchecked(8) # byte 3
result = result << 7 | (byte & VARUINT_BYTE)
if byte & VARUINT_HAS_NEXT == 0:
return result
result = result << 8 | self.read_bits_unchecked(8) # byte 4
return result
[docs] def read_varuint64(self) -> int:
"""
Reads variable 64-bit unsigned integer value from the bit stream.
:returns: Variable 64-bit unsigned integer value.
"""
byte = self.read_bits_unchecked(8) # byte 1
result = byte & VARUINT_BYTE
if byte & VARUINT_HAS_NEXT == 0:
return result
byte = self.read_bits_unchecked(8) # byte 2
result = result << 7 | (byte & VARUINT_BYTE)
if byte & VARUINT_HAS_NEXT == 0:
return result
byte = self.read_bits_unchecked(8) # byte 3
result = result << 7 | (byte & VARUINT_BYTE)
if byte & VARUINT_HAS_NEXT == 0:
return result
byte = self.read_bits_unchecked(8) # byte 4
result = result << 7 | (byte & VARUINT_BYTE)
if byte & VARUINT_HAS_NEXT == 0:
return result
byte = self.read_bits_unchecked(8) # byte 5
result = result << 7 | (byte & VARUINT_BYTE)
if byte & VARUINT_HAS_NEXT == 0:
return result
byte = self.read_bits_unchecked(8) # byte 6
result = result << 7 | (byte & VARUINT_BYTE)
if byte & VARUINT_HAS_NEXT == 0:
return result
byte = self.read_bits_unchecked(8) # byte 7
result = result << 7 | (byte & VARUINT_BYTE)
if byte & VARUINT_HAS_NEXT == 0:
return result
result = result << 8 | self.read_bits_unchecked(8) # byte 8
return result
[docs] def read_varuint(self) -> int:
"""
Reads variable unsigned integer value (up to 9 bytes) from the bit stream.
:returns: Variable unsigned integer value (up to 9 bytes).
"""
byte = self.read_bits_unchecked(8) # byte 1
result = byte & VARUINT_BYTE
if byte & VARUINT_HAS_NEXT == 0:
return result
byte = self.read_bits_unchecked(8) # byte 2
result = result << 7 | (byte & VARUINT_BYTE)
if byte & VARUINT_HAS_NEXT == 0:
return result
byte = self.read_bits_unchecked(8) # byte 3
result = result << 7 | (byte & VARUINT_BYTE)
if byte & VARUINT_HAS_NEXT == 0:
return result
byte = self.read_bits_unchecked(8) # byte 4
result = result << 7 | (byte & VARUINT_BYTE)
if byte & VARUINT_HAS_NEXT == 0:
return result
byte = self.read_bits_unchecked(8) # byte 5
result = result << 7 | (byte & VARUINT_BYTE)
if byte & VARUINT_HAS_NEXT == 0:
return result
byte = self.read_bits_unchecked(8) # byte 6
result = result << 7 | (byte & VARUINT_BYTE)
if byte & VARUINT_HAS_NEXT == 0:
return result
byte = self.read_bits_unchecked(8) # byte 7
result = result << 7 | (byte & VARUINT_BYTE)
if byte & VARUINT_HAS_NEXT == 0:
return result
byte = self.read_bits_unchecked(8) # byte 8
result = result << 7 | (byte & VARUINT_BYTE)
if byte & VARUINT_HAS_NEXT == 0:
return result
result = result << 8 | self.read_bits_unchecked(8) # byte 9
return result
[docs] def read_varsize(self) -> int:
"""
Reads variable size integer value from the bit stream.
:returns: Variable size integer value.
:raises PythonRuntimeException: If read variable size integer is out of range.
"""
byte = self.read_bits_unchecked(8) # byte 1
result = byte & VARUINT_BYTE
if byte & VARUINT_HAS_NEXT == 0:
return result
byte = self.read_bits_unchecked(8) # byte 2
result = result << 7 | (byte & VARUINT_BYTE)
if byte & VARUINT_HAS_NEXT == 0:
return result
byte = self.read_bits_unchecked(8) # byte 3
result = result << 7 | (byte & VARUINT_BYTE)
if byte & VARUINT_HAS_NEXT == 0:
return result
byte = self.read_bits_unchecked(8) # byte 4
result = result << 7 | (byte & VARUINT_BYTE)
if byte & VARUINT_HAS_NEXT == 0:
return result
result = result << 8 | self.read_bits_unchecked(8) # byte 5
if result > VARSIZE_MAX_VALUE:
raise PythonRuntimeException(
f"BitStreamReader: Read value '{result}' is out of range " "for varsize type!"
)
return result
[docs] def read_float16(self) -> float:
"""
Read 16-bits from the stream as a float value encoded according to IEEE 754 binary16.
:returns: Read float value.
:raises PythonRuntimeException: If the reading goes behind the stream.
"""
return uint16_to_float(self.read_bits_unchecked(16))
[docs] def read_float32(self) -> float:
"""
Read 32-bits from the stream as a float value encoded according to IEEE 754 binary32.
:returns: Read float value.
:raises PythonRuntimeException: If the reading goes behind the stream.
"""
return uint32_to_float(self.read_bits_unchecked(32))
[docs] def read_float64(self) -> float:
"""
Read 64-bits from the stream as a float value encoded according to IEEE 754 binary64.
:returns: Read float value.
:raises PythonRuntimeException: If the reading goes behind the stream.
"""
return uint64_to_float(self.read_bits_unchecked(64))
[docs] def read_bytes(self) -> bytearray:
"""
Reads bytes from the stream.
:returns: Read bytes.
:raises PythonRuntimeException: If the reading goes behind the stream.
"""
length = self.read_varsize()
begin_bitposition = self._bitposition
if (begin_bitposition & 0x07) != 0:
# we are not aligned to byte
value = bytearray()
for _ in range(length):
value.append(self.read_bits_unchecked(8))
else:
# we are aligned to byte
self.bitposition = begin_bitposition + length * 8
value = bytearray(length)
begin_byte_position = begin_bitposition // 8
value[0:length] = self._buffer[begin_byte_position : begin_byte_position + length]
return value
[docs] def read_string(self) -> str:
"""
Reads string from the stream.
:returns: Read string.
:raises PythonRuntimeException: If the reading goes behind the stream.
"""
length = self.read_varsize()
begin_bitposition = self._bitposition
if (begin_bitposition & 0x07) != 0:
# we are not aligned to byte
value = bytearray()
for _ in range(length):
value.append(self.read_bits_unchecked(8))
else:
# we are aligned to byte
self.bitposition = begin_bitposition + length * 8
value = bytearray(length)
begin_byte_position = begin_bitposition // 8
value[0:length] = self._buffer[begin_byte_position : begin_byte_position + length]
return value.decode("utf-8")
[docs] def read_bool(self) -> bool:
"""
Reads single bit as a bool value.
:returns: Read bool values.
:raises PythonRuntimeException: If the reading goes behind the stream.
"""
return self.read_bits_unchecked(1) != 0
[docs] def read_bitbuffer(self) -> BitBuffer:
"""
Reads a bit buffer from the stream.
:returns: Read bit buffer.
:raises PythonRuntimeException: If the reading goes behind the stream.
"""
bitsize = self.read_varsize()
num_bytes_to_read = bitsize // 8
num_rest_bits = bitsize - num_bytes_to_read * 8
bytesize = (bitsize + 7) // 8
read_buffer = bytearray(bytesize)
begin_bitposition = self._bitposition
if (begin_bitposition & 0x07) != 0:
# we are not aligned to byte
for i in range(num_bytes_to_read):
read_buffer[i] = self.read_bits_unchecked(8)
else:
# we are aligned to byte
self.bitposition = begin_bitposition + num_bytes_to_read * 8
begin_byte_position = begin_bitposition // 8
read_buffer[0:num_bytes_to_read] = self._buffer[
begin_byte_position : begin_byte_position + num_bytes_to_read
]
if num_rest_bits != 0:
read_buffer[num_bytes_to_read] = self.read_bits(num_rest_bits) << (8 - num_rest_bits)
return BitBuffer(read_buffer, bitsize)
@property
def bitposition(self) -> int:
"""
Gets current bit position.
:returns: Current bit position.
"""
return self._bitposition
@bitposition.setter
def bitposition(self, bitposition: int) -> None:
"""
Sets bit position.
:param bitposition: New bit position.
:raises PythonRuntimeException: If the position is not within the stream.
"""
if bitposition < 0:
raise PythonRuntimeException("BitStreamReader: Cannot set negative bit position!")
if bitposition > self._bitsize:
raise PythonRuntimeException("BitStreamReader: Setting bit position behind the stream!")
self._bitposition = bitposition
[docs] def alignto(self, alignment: int) -> None:
"""
Aligns the bit position according to the aligning value.
:param alignment: An aligning value to use.
:raises PythonRuntimeException: If the aligning moves behind the stream."
"""
offset = self._bitposition % alignment
if offset != 0:
self.bitposition = self._bitposition + alignment - offset
@property
def buffer_bitsize(self) -> int:
"""
Gets size of the underlying buffer in bits.
:returns: Buffer bit size.
"""
return self._bitsize
VARINT_SIGN_1 = 0x80
VARINT_BYTE_1 = 0x3F
VARINT_BYTE_N = 0x7F
VARINT_HAS_NEXT_1 = 0x40
VARINT_HAS_NEXT_N = 0x80
VARUINT_BYTE = 0x7F
VARUINT_HAS_NEXT = 0x80
VARSIZE_MAX_VALUE = (1 << 31) - 1
_BitStreamReaderCpp = import_cpp_class("BitStreamReader")
if _BitStreamReaderCpp is not None:
BitStreamReader = _BitStreamReaderCpp # type: ignore
def _bitstreamreader_fromfile(filename: str) -> "BitStreamReader":
with open(filename, "rb") as file:
return BitStreamReader(file.read())
BitStreamReader.from_file = _bitstreamreader_fromfile