Coverage for /home/runner/work/zserio/zserio/compiler/extensions/python/runtime/src/zserio/bitwriter.py: 100%
142 statements
« prev ^ index » next coverage.py v6.5.0, created at 2024-12-05 10:43 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2024-12-05 10:43 +0000
1"""
2The module implements abstraction for writing data to the bit stream.
3"""
5from zserio.bitbuffer import BitBuffer
6from zserio.bitsizeof import (
7 bitsizeof_varint16,
8 bitsizeof_varint32,
9 bitsizeof_varint64,
10 bitsizeof_varint,
11 bitsizeof_varuint16,
12 bitsizeof_varuint32,
13 bitsizeof_varuint64,
14 bitsizeof_varuint,
15 bitsizeof_varsize,
16)
17from zserio.exception import PythonRuntimeException
18from zserio.float import float_to_uint16, float_to_uint32, float_to_uint64
19from zserio.limits import INT64_MIN
20from zserio.cppbind import import_cpp_class
23class BitStreamWriter:
24 """
25 Bit stream writer using bytearray.
26 """
28 def __init__(self) -> None:
29 """
30 Constructor.
31 """
33 self._byte_array: bytearray = bytearray()
34 self._bitposition: int = 0
36 def write_bits(self, value: int, numbits: int) -> None:
37 """
38 Writes the given value with the given number of bits to the underlying storage.
40 :param value: Value to write.
41 :param numbits: Number of bits to write.
42 :raises PythonRuntimeException: If the value is out of the range or if the number of bits is invalid.
43 """
45 if numbits < 1 or numbits > 64:
46 raise PythonRuntimeException(f"BitStreamWriter: numbits '{numbits}' not in range [1,64]!")
48 self.write_bits_unchecked(value, numbits)
50 def write_signed_bits(self, value: int, numbits: int) -> None:
51 """
52 Writes the given signed value with the given number of bits to the underlying storage.
53 Provided for convenience.
55 :param value: Signed value to write.
56 :param numbits: Number of bits to write.
57 :raises PythonRuntimeException: If the value is out of the range or if the number of bits is invalid.
58 """
60 if numbits < 1 or numbits > 64:
61 raise PythonRuntimeException(f"BitStreamWriter: numbits '{numbits}' not in range [1,64]!")
63 self.write_signed_bits_unchecked(value, numbits)
65 def write_bits_unchecked(self, value: int, numbits: int) -> None:
66 """
67 Writes the given value with the given number of bits to the underlying storage.
69 This method does not check that numbits > 0 and assumes that it's ensured by the caller.
71 :param value: Value to write.
72 :param numbits: Number of bits to write.
73 :raises PythonRuntimeException: If the value is out of the range or if the number of bits is invalid.
74 """
76 min_value = 0
77 max_value = (1 << numbits) - 1
78 if value < min_value or value > max_value:
79 raise PythonRuntimeException(
80 f"BitStreamWriter: Value '{value}' is out of the range " f"[{min_value},{max_value}]!"
81 )
83 self._write_bits(value, numbits, signed=False)
85 def write_signed_bits_unchecked(self, value: int, numbits: int) -> None:
86 """
87 Writes the given signed value with the given number of bits to the underlying storage.
88 Provided for convenience.
90 This method does not check that numbits > 0 and assumes that it's ensured by the caller.
92 :param value: Signed value to write.
93 :param numbits: Number of bits to write.
94 :raises PythonRuntimeException: If the value is out of the range or if the number of bits is invalid.
95 """
97 min_value = -(1 << (numbits - 1))
98 max_value = (1 << (numbits - 1)) - 1
99 if value < min_value or value > max_value:
100 raise PythonRuntimeException(
101 f"BitStreamWriter: Value '{value}' is out of the range " f"[{min_value},{max_value}]!"
102 )
104 self._write_bits(value, numbits, signed=True)
106 def write_varint16(self, value: int) -> None:
107 """
108 Writes a variable 16-bit signed integer value to the underlying storage.
110 :param value: Value to write.
111 :raises PythonRuntimeException: If the value is out of the range.
112 """
114 self._write_varnum(value, 2, bitsizeof_varint16(value) // 8, is_signed=True)
116 def write_varint32(self, value: int) -> None:
117 """
118 Writes a variable 32-bit signed integer value to the underlying storage.
120 :param value: Value to write.
121 :raises PythonRuntimeException: If the value is out of the range.
122 """
124 self._write_varnum(value, 4, bitsizeof_varint32(value) // 8, is_signed=True)
126 def write_varint64(self, value: int) -> None:
127 """
128 Writes a variable 16-bit signed integer value to the underlying storage.
130 :param value: Value to write.
131 :raises PythonRuntimeException: If the value is out of the range.
132 """
134 self._write_varnum(value, 8, bitsizeof_varint64(value) // 8, is_signed=True)
136 def write_varint(self, value: int) -> None:
137 """
138 Writes a variable signed integer value (up to 9 bytes) to the underlying storage.
140 :param value: Value to write.
141 :raises PythonRuntimeException: If the value is out of the range.
142 """
144 if value == INT64_MIN:
145 self._write_bits(0x80, 8) # INT64_MIN is stored as -0
146 else:
147 self._write_varnum(value, 9, bitsizeof_varint(value) // 8, is_signed=True)
149 def write_varuint16(self, value: int) -> None:
150 """
151 Writes a variable 16-bit unsigned integer value to the underlying storage.
153 :param value: Value to write.
154 :raises PythonRuntimeException: If the value is out of the range.
155 """
157 self._write_varnum(value, 2, bitsizeof_varuint16(value) // 8, is_signed=False)
159 def write_varuint32(self, value: int) -> None:
160 """
161 Writes a variable 32-bit unsigned integer value to the underlying storage.
163 :param value: Value to write.
164 :raises PythonRuntimeException: If the value is out of the range.
165 """
167 self._write_varnum(value, 4, bitsizeof_varuint32(value) // 8, is_signed=False)
169 def write_varuint64(self, value: int) -> None:
170 """
171 Writes a variable 16-bit unsigned integer value to the underlying storage.
173 :param value: Value to write.
174 :raises PythonRuntimeException: If the value is out of the range.
175 """
177 self._write_varnum(value, 8, bitsizeof_varuint64(value) // 8, is_signed=False)
179 def write_varuint(self, value: int) -> None:
180 """
181 Writes a variable unsigned integer value (up to 9 bytes) to the underlying storage.
183 :param value: Value to write.
184 :raises PythonRuntimeException: If the value is out of the range.
185 """
187 self._write_varnum(value, 9, bitsizeof_varuint(value) // 8, is_signed=False)
189 def write_varsize(self, value: int) -> None:
190 """
191 Writes a variable size integer value to the underlying storage.
193 :param value: Value to write.
194 :raises PythonRuntimeException: If the value is out of the range.
195 """
197 self._write_varnum(value, 5, bitsizeof_varsize(value) // 8, is_signed=False)
199 def write_float16(self, value: float) -> None:
200 """
201 Writes a 16-bit float value to the underlying storage according to IEEE 754 binary16.
203 :param value: Float value to write.
204 """
206 self.write_bits_unchecked(float_to_uint16(value), 16)
208 def write_float32(self, value: float) -> None:
209 """
210 Writes a 32-bit float value to the underlying storage according to IEEE 754 binary32.
212 :param value: Float value to write.
213 """
215 self.write_bits_unchecked(float_to_uint32(value), 32)
217 def write_float64(self, value: float) -> None:
218 """
219 Writes a 64-bit float value to the underlying storage according to IEEE 754 binary64.
221 :param value: Float value to write.
222 """
224 self.write_bits_unchecked(float_to_uint64(value), 64)
226 def write_bytes(self, value: bytearray):
227 """
228 Writes the given bytes to the underlying storage. Length of the bytes is written
229 as varsize at the beginning.
231 :param value: Bytes to write.
232 """
234 length = len(value)
235 self.write_varsize(length)
237 begin_bitposition = self._bitposition
238 if (begin_bitposition & 0x07) != 0:
239 # we are not aligned to byte
240 for byte in value:
241 self.write_bits_unchecked(byte, 8)
242 else:
243 # we are aligned to byte
244 self._bitposition += length * 8
245 self._byte_array += value[0:length]
247 def write_string(self, string: str) -> None:
248 """
249 Writes the given string to the underlying storage in UTF-8 encoding. Length of the string is written
250 as varsize at the beginning.
252 :param string: String to write.
253 """
255 string_bytes = string.encode("utf-8")
256 length = len(string_bytes)
257 self.write_varsize(length)
259 begin_bitposition = self._bitposition
260 if (begin_bitposition & 0x07) != 0:
261 # we are not aligned to byte
262 for string_byte in string_bytes:
263 self.write_bits_unchecked(string_byte, 8)
264 else:
265 # we are aligned to byte
266 self._bitposition += length * 8
267 self._byte_array += string_bytes[0:length]
269 def write_bool(self, value: bool) -> None:
270 """
271 Writes bool in a single bit.
273 :param value: Bool value to write.
274 """
276 self._write_bits(1 if value else 0, 1)
278 def write_bitbuffer(self, bitbuffer: BitBuffer) -> None:
279 """
280 Writes a bit buffer to the underlying storage. Length of the bit buffer is written as varsize
281 at the beginning.
283 :param bitbuffer: Bit buffer to write.
284 """
286 bitsize = bitbuffer.bitsize
287 self.write_varsize(bitsize)
289 write_buffer = bitbuffer.buffer
290 num_bytes_to_write = bitsize // 8
291 num_rest_bits = bitsize - num_bytes_to_write * 8
292 begin_bitposition = self._bitposition
293 if (begin_bitposition & 0x07) != 0:
294 # we are not aligned to byte
295 for i in range(num_bytes_to_write):
296 self.write_bits_unchecked(write_buffer[i], 8)
297 else:
298 # we are aligned to byte
299 self._bitposition += num_bytes_to_write * 8
300 self._byte_array += write_buffer[0:num_bytes_to_write]
302 if num_rest_bits > 0:
303 self.write_bits_unchecked(write_buffer[num_bytes_to_write] >> (8 - num_rest_bits), num_rest_bits)
305 @property
306 def byte_array(self) -> bytes:
307 """
308 Gets internal bytearray.
310 :returns: Underlying bytearray object.
311 """
313 return self._byte_array
315 def to_file(self, filename: str) -> None:
316 """
317 Writes underlying bytearray to binary file.
319 :param filename: File to write.
320 """
322 with open(filename, "wb") as file:
323 file.write(self._byte_array)
325 @property
326 def bitposition(self) -> int:
327 """
328 Gets current bit position.
330 :returns: Current bit position.
331 """
333 return self._bitposition
335 def alignto(self, alignment: int) -> None:
336 """
337 Aligns the bit position according to the aligning value.
339 :param alignment: An aligning value to use.
340 """
342 offset = self._bitposition % alignment
343 if offset != 0:
344 self._write_bits(0, alignment - offset)
346 def _write_bits(self, value: int, numbits: int, *, signed: bool = False) -> None:
347 buffer_last_byte_bits = self._bitposition % 8
348 buffer_free_bits = (8 - buffer_last_byte_bits) if buffer_last_byte_bits != 0 else 0
349 value_first_byte_bits = numbits % 8 or 8
350 if value_first_byte_bits <= buffer_free_bits:
351 left_shift = buffer_free_bits - value_first_byte_bits
352 else:
353 left_shift = buffer_free_bits + 8 - value_first_byte_bits
354 value <<= left_shift
355 num_bytes = (numbits + left_shift + 7) // 8
356 value_bytes = value.to_bytes(num_bytes, byteorder="big", signed=signed)
357 if buffer_free_bits == 0:
358 self._byte_array.extend(value_bytes)
359 else:
360 value_first_byte = value_bytes[0] & ((1 << buffer_free_bits) - 1)
361 self._byte_array[-1] |= value_first_byte
362 self._byte_array.extend(value_bytes[1:])
364 self._bitposition += numbits
366 def _write_varnum(self, value: int, max_var_bytes: int, num_var_bytes: int, *, is_signed: bool) -> None:
367 abs_value = abs(value)
368 has_max_byte_range = num_var_bytes == max_var_bytes
369 for i in range(num_var_bytes):
370 byte = 0x00
371 numbits = 8
372 has_next_byte = i < num_var_bytes - 1
373 has_sign_bit = is_signed and i == 0
374 if has_sign_bit:
375 if value < 0:
376 byte |= 0x80
377 numbits -= 1
378 if has_next_byte:
379 numbits -= 1
380 byte |= 1 << numbits # use bit 6 if signed bit is present, use bit 7 otherwise
381 else: # this is the last byte
382 if not has_max_byte_range: # next byte flag isn't used in last byte in case of max byte range
383 numbits -= 1
385 shift_bits = (num_var_bytes - (i + 1)) * 7 + (1 if has_max_byte_range and has_next_byte else 0)
386 byte |= (abs_value >> shift_bits) & VAR_NUM_BIT_MASKS[numbits - 1]
387 self.write_bits_unchecked(byte, 8)
390VAR_NUM_BIT_MASKS = [0x01, 0x03, 0x07, 0x0F, 0x1F, 0x3F, 0x7F, 0xFF]
392BitStreamWriter = import_cpp_class("BitStreamWriter") or BitStreamWriter # type: ignore