Coverage for /home/runner/work/zserio/zserio/compiler/extensions/python/runtime/src/zserio/bitreader.py: 100%
330 statements
« prev ^ index » next coverage.py v6.5.0, created at 2024-12-05 10:43 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2024-12-05 10:43 +0000
1"""
2The module implements abstraction for reading data from the bit stream.
3"""
5import typing
7from zserio.bitbuffer import BitBuffer
8from zserio.limits import INT64_MIN
9from zserio.exception import PythonRuntimeException
10from zserio.float import uint16_to_float, uint32_to_float, uint64_to_float
11from zserio.cppbind import import_cpp_class
14class BitStreamReader:
15 """
16 Bit stream reader.
17 """
19 def __init__(self, buffer: bytes, bitsize: typing.Optional[int] = None) -> None:
20 """
21 Constructs bit stream reader from bytes buffer.
23 Because bit buffer size does not have to be byte aligned (divisible by 8), it's possible that not all
24 bits of the last byte are used. In this case, only most significant bits of the corresponded size are
25 used.
27 :param buffer: Bytes-like buffer to read as a bit stream.
28 :param bitsize: Number of bits stored in buffer to use.
29 :raises PythonRuntimeException: If bitsize is out of range.
30 """
32 if bitsize is None:
33 bitsize = len(buffer) * 8
34 elif len(buffer) * 8 < bitsize:
35 raise PythonRuntimeException(
36 f"BitStreamReader: Bit size '{bitsize}' out of range "
37 f"for the given buffer byte size '{len(buffer)}'!"
38 )
40 self._buffer: bytes = buffer
41 self._bitsize: int = bitsize
42 self._bitposition: int = 0
@classmethod
def from_bitbuffer(cls: typing.Type["BitStreamReader"], bitbuffer: BitBuffer) -> "BitStreamReader":
    """
    Creates a bit stream reader which reads the content of a bit buffer.

    :param bitbuffer: Bit buffer to read as a bit stream.
    :returns: Reader constructed from the buffer and the bit size of the given bit buffer.
    """
    return cls(bitbuffer.buffer, bitbuffer.bitsize)
@classmethod
def from_file(cls: typing.Type["BitStreamReader"], filename: str) -> "BitStreamReader":
    """
    Creates a bit stream reader which reads the whole content of a file.

    :param filename: Filename to read as a bit stream.
    :returns: Reader constructed from all bytes of the file.
    """
    with open(filename, "rb") as stream:
        content = stream.read()
    return cls(content)
def read_bits(self, numbits: int) -> int:
    """
    Reads given number of bits from the bit stream as an unsigned integer.

    The number of bits is validated here, the reading itself is delegated to read_bits_unchecked.

    :param numbits: Number of bits to read, must be in range [1,64].
    :returns: Read bits as an unsigned integer.
    :raises PythonRuntimeException: If the numbits is invalid or the reading goes behind the stream.
    """
    if numbits < 1 or numbits > 64:
        # the message must be an f-string, otherwise the placeholder is printed literally
        raise PythonRuntimeException(f"BitStreamReader: numbits '{numbits}' not in range [1,64]!")

    return self.read_bits_unchecked(numbits)
def read_signed_bits(self, numbits: int) -> int:
    """
    Reads given number of bits from the bit stream as a signed integer.

    The number of bits is validated here, the reading itself is delegated to
    read_signed_bits_unchecked.

    :param numbits: Number of bits to read, must be in range [1,64].
    :returns: Read bits as a signed integer.
    :raises PythonRuntimeException: If the numbits is invalid or the reading goes behind the stream.
    """
    if numbits > 64 or numbits < 1:
        raise PythonRuntimeException(f"BitStreamReader: numbits '{numbits}' not in range [1,64]!")

    signed_value = self.read_signed_bits_unchecked(numbits)
    return signed_value
def read_bits_unchecked(self, numbits: int) -> int:
    """
    Reads given number of bits from the bit stream as an unsigned integer.

    The numbits argument is not validated here, the caller must ensure that it is not negative.

    :param numbits: Number of bits to read.
    :returns: Read bits as an unsigned integer.
    :raises PythonRuntimeException: If the reading goes behind the stream.
    """
    stop = self._bitposition + numbits
    if stop > self._bitsize:
        raise PythonRuntimeException("BitStreamReader: Reading behind the stream!")

    # take all bytes which are touched by the requested bit range
    first_byte = self._bitposition // 8
    last_byte = (stop - 1) // 8
    touched = self._buffer[first_byte : last_byte + 1]
    raw = int.from_bytes(touched, byteorder="big", signed=False)

    # drop the bits of the last byte which lie behind the requested range
    raw >>= (-stop) % 8
    # drop the bits of the first byte which lie before the requested range
    raw &= (1 << numbits) - 1

    self._bitposition = stop

    return raw
def read_signed_bits_unchecked(self, numbits: int) -> int:
    """
    Reads given number of bits from the bit stream as a signed integer.

    The numbits argument is not validated here, the caller must ensure that it is not negative.
    The bits are read as an unsigned value and interpreted as two's complement of numbits width.

    :param numbits: Number of bits to read.
    :returns: Read bits as a signed integer.
    :raises PythonRuntimeException: If the reading goes behind the stream.
    """
    raw = self.read_bits_unchecked(numbits)
    if numbits == 0:
        # nothing was read, there is no sign bit to inspect
        return raw

    sign_bit = raw >> (numbits - 1)
    return raw - (1 << numbits) if sign_bit else raw
def read_varint16(self) -> int:
    """
    Reads variable 16-bit signed integer value from the bit stream.

    The first byte holds the sign flag, the "has next byte" flag and the most significant bits of the
    magnitude. The optional second byte contributes all of its 8 bits.

    :returns: Variable 16-bit signed integer value.
    """
    head = self.read_bits_unchecked(8)  # byte 1
    negative = (head & VARINT_SIGN_1) != 0
    magnitude = head & VARINT_BYTE_1
    if (head & VARINT_HAS_NEXT_1) != 0:
        magnitude = (magnitude << 8) | self.read_bits_unchecked(8)  # byte 2

    return -magnitude if negative else magnitude
def read_varint32(self) -> int:
    """
    Reads variable 32-bit signed integer value from the bit stream.

    The first byte holds the sign flag, the "has next byte" flag and the most significant bits of the
    magnitude. Bytes 2 and 3 hold the "has next byte" flag and 7 bits each, the last possible byte
    (byte 4) contributes all of its 8 bits.

    :returns: Variable 32-bit signed integer value.
    """
    head = self.read_bits_unchecked(8)  # byte 1
    negative = (head & VARINT_SIGN_1) != 0
    magnitude = head & VARINT_BYTE_1
    if (head & VARINT_HAS_NEXT_1) == 0:
        return -magnitude if negative else magnitude

    for _ in range(2):  # bytes 2 and 3
        octet = self.read_bits_unchecked(8)
        magnitude = (magnitude << 7) | (octet & VARINT_BYTE_N)
        if (octet & VARINT_HAS_NEXT_N) == 0:
            return -magnitude if negative else magnitude

    magnitude = (magnitude << 8) | self.read_bits_unchecked(8)  # byte 4
    return -magnitude if negative else magnitude
def read_varint64(self) -> int:
    """
    Reads variable 64-bit signed integer value from the bit stream.

    The first byte holds the sign flag, the "has next byte" flag and the most significant bits of the
    magnitude. Bytes 2 to 7 hold the "has next byte" flag and 7 bits each, the last possible byte
    (byte 8) contributes all of its 8 bits.

    :returns: Variable 64-bit signed integer value.
    """
    head = self.read_bits_unchecked(8)  # byte 1
    negative = (head & VARINT_SIGN_1) != 0
    magnitude = head & VARINT_BYTE_1
    if (head & VARINT_HAS_NEXT_1) == 0:
        return -magnitude if negative else magnitude

    for _ in range(6):  # bytes 2 to 7
        octet = self.read_bits_unchecked(8)
        magnitude = (magnitude << 7) | (octet & VARINT_BYTE_N)
        if (octet & VARINT_HAS_NEXT_N) == 0:
            return -magnitude if negative else magnitude

    magnitude = (magnitude << 8) | self.read_bits_unchecked(8)  # byte 8
    return -magnitude if negative else magnitude
def read_varint(self) -> int:
    """
    Reads variable signed integer value (up to 9 bytes) from the bit stream.

    The first byte holds the sign flag, the "has next byte" flag and the most significant bits of the
    magnitude. Bytes 2 to 8 hold the "has next byte" flag and 7 bits each, the last possible byte
    (byte 9) contributes all of its 8 bits. A single byte with the sign flag set and zero magnitude
    is decoded as INT64_MIN.

    :returns: Variable signed integer value (up to 9 bytes).
    """
    head = self.read_bits_unchecked(8)  # byte 1
    negative = (head & VARINT_SIGN_1) != 0
    magnitude = head & VARINT_BYTE_1
    if (head & VARINT_HAS_NEXT_1) == 0:
        if not negative:
            return magnitude
        # "negative zero" in a single byte is the encoding of INT64_MIN
        return INT64_MIN if magnitude == 0 else -magnitude

    for _ in range(7):  # bytes 2 to 8
        octet = self.read_bits_unchecked(8)
        magnitude = (magnitude << 7) | (octet & VARINT_BYTE_N)
        if (octet & VARINT_HAS_NEXT_N) == 0:
            return -magnitude if negative else magnitude

    magnitude = (magnitude << 8) | self.read_bits_unchecked(8)  # byte 9
    return -magnitude if negative else magnitude
def read_varuint16(self) -> int:
    """
    Reads variable 16-bit unsigned integer value from the bit stream.

    The first byte holds the "has next byte" flag and the most significant bits of the value. The
    optional second byte contributes all of its 8 bits.

    :returns: Variable 16-bit unsigned integer value.
    """
    head = self.read_bits_unchecked(8)  # byte 1
    value = head & VARUINT_BYTE
    if (head & VARUINT_HAS_NEXT) != 0:
        value = (value << 8) | self.read_bits_unchecked(8)  # byte 2

    return value
def read_varuint32(self) -> int:
    """
    Reads variable 32-bit unsigned integer value from the bit stream.

    Bytes 1 to 3 hold the "has next byte" flag and 7 bits of the value each, the last possible byte
    (byte 4) contributes all of its 8 bits.

    :returns: Variable 32-bit unsigned integer value.
    """
    value = 0
    for _ in range(3):  # bytes 1 to 3
        octet = self.read_bits_unchecked(8)
        value = (value << 7) | (octet & VARUINT_BYTE)
        if (octet & VARUINT_HAS_NEXT) == 0:
            return value

    return (value << 8) | self.read_bits_unchecked(8)  # byte 4
def read_varuint64(self) -> int:
    """
    Reads variable 64-bit unsigned integer value from the bit stream.

    Bytes 1 to 7 hold the "has next byte" flag and 7 bits of the value each, the last possible byte
    (byte 8) contributes all of its 8 bits.

    :returns: Variable 64-bit unsigned integer value.
    """
    value = 0
    for _ in range(7):  # bytes 1 to 7
        octet = self.read_bits_unchecked(8)
        value = (value << 7) | (octet & VARUINT_BYTE)
        if (octet & VARUINT_HAS_NEXT) == 0:
            return value

    return (value << 8) | self.read_bits_unchecked(8)  # byte 8
def read_varuint(self) -> int:
    """
    Reads variable unsigned integer value (up to 9 bytes) from the bit stream.

    Bytes 1 to 8 hold the "has next byte" flag and 7 bits of the value each, the last possible byte
    (byte 9) contributes all of its 8 bits.

    :returns: Variable unsigned integer value (up to 9 bytes).
    """
    value = 0
    for _ in range(8):  # bytes 1 to 8
        octet = self.read_bits_unchecked(8)
        value = (value << 7) | (octet & VARUINT_BYTE)
        if (octet & VARUINT_HAS_NEXT) == 0:
            return value

    return (value << 8) | self.read_bits_unchecked(8)  # byte 9
def read_varsize(self) -> int:
    """
    Reads variable size integer value from the bit stream.

    Bytes 1 to 4 hold the "has next byte" flag and 7 bits of the value each, the last possible byte
    (byte 5) contributes all of its 8 bits. Only a value which needed all 5 bytes is checked against
    VARSIZE_MAX_VALUE.

    :returns: Variable size integer value.
    :raises PythonRuntimeException: If read variable size integer is out of range.
    """
    result = 0
    for _ in range(4):  # bytes 1 to 4
        octet = self.read_bits_unchecked(8)
        result = (result << 7) | (octet & VARUINT_BYTE)
        if (octet & VARUINT_HAS_NEXT) == 0:
            return result

    result = (result << 8) | self.read_bits_unchecked(8)  # byte 5
    if result > VARSIZE_MAX_VALUE:
        raise PythonRuntimeException(
            f"BitStreamReader: Read value '{result}' is out of range " "for varsize type!"
        )

    return result
def read_float16(self) -> float:
    """
    Reads 16 bits from the stream and converts them to a float using uint16_to_float.

    :returns: Read float value.
    :raises PythonRuntimeException: If the reading goes behind the stream.
    """
    raw = self.read_bits_unchecked(16)
    return uint16_to_float(raw)
def read_float32(self) -> float:
    """
    Reads 32 bits from the stream and converts them to a float using uint32_to_float.

    :returns: Read float value.
    :raises PythonRuntimeException: If the reading goes behind the stream.
    """
    raw = self.read_bits_unchecked(32)
    return uint32_to_float(raw)
def read_float64(self) -> float:
    """
    Reads 64 bits from the stream and converts them to a float using uint64_to_float.

    :returns: Read float value.
    :raises PythonRuntimeException: If the reading goes behind the stream.
    """
    raw = self.read_bits_unchecked(64)
    return uint64_to_float(raw)
def read_bytes(self) -> bytearray:
    """
    Reads bytes from the stream.

    The number of bytes is read first as a varsize value, the bytes themselves follow.

    :returns: Read bytes.
    :raises PythonRuntimeException: If the reading goes behind the stream.
    """
    count = self.read_varsize()
    start = self._bitposition

    if (start & 0x07) == 0:
        # byte aligned: move the position first (the setter validates it), then copy in one go
        self.bitposition = start + count * 8
        first_byte = start // 8
        return bytearray(self._buffer[first_byte : first_byte + count])

    # not byte aligned: each byte has to be assembled from the bits
    return bytearray(self.read_bits_unchecked(8) for _ in range(count))
def read_string(self) -> str:
    """
    Reads string from the stream.

    The number of bytes is read first as a varsize value, the bytes themselves follow and are
    decoded as UTF-8.

    :returns: Read string.
    :raises PythonRuntimeException: If the reading goes behind the stream.
    """
    count = self.read_varsize()
    start = self._bitposition

    if (start & 0x07) == 0:
        # byte aligned: move the position first (the setter validates it), then copy in one go
        self.bitposition = start + count * 8
        first_byte = start // 8
        encoded = bytearray(self._buffer[first_byte : first_byte + count])
    else:
        # not byte aligned: each byte has to be assembled from the bits
        encoded = bytearray(self.read_bits_unchecked(8) for _ in range(count))

    return encoded.decode("utf-8")
def read_bool(self) -> bool:
    """
    Reads single bit as a bool value.

    :returns: True when the read bit is set, False otherwise.
    :raises PythonRuntimeException: If the reading goes behind the stream.
    """
    bit = self.read_bits_unchecked(1)
    return bit != 0
def read_bitbuffer(self) -> BitBuffer:
    """
    Reads a bit buffer from the stream.

    The bit size is read first as a varsize value. All whole bytes follow and the remaining bits
    (if any) are stored in the most significant bits of the last byte of the result.

    :returns: Read bit buffer.
    :raises PythonRuntimeException: If the reading goes behind the stream.
    """
    bitsize = self.read_varsize()
    whole_bytes, tail_bits = divmod(bitsize, 8)
    data = bytearray((bitsize + 7) // 8)

    start = self._bitposition
    if (start & 0x07) == 0:
        # byte aligned: move the position first (the setter validates it), then copy in one go
        self.bitposition = start + whole_bytes * 8
        first_byte = start // 8
        data[0:whole_bytes] = self._buffer[first_byte : first_byte + whole_bytes]
    else:
        # not byte aligned: each byte has to be assembled from the bits
        for index in range(whole_bytes):
            data[index] = self.read_bits_unchecked(8)

    if tail_bits != 0:
        # left-align the remaining bits within the last byte
        data[whole_bytes] = self.read_bits(tail_bits) << (8 - tail_bits)

    return BitBuffer(data, bitsize)
@property
def bitposition(self) -> int:
    """
    Gets current bit position.

    :returns: Current bit position.
    """
    current = self._bitposition
    return current

@bitposition.setter
def bitposition(self, bitposition: int) -> None:
    """
    Sets bit position.

    A position equal to the stream bit size (just behind the last bit) is accepted.

    :param bitposition: New bit position.
    :raises PythonRuntimeException: If the position is not within the stream.
    """
    if bitposition < 0:
        raise PythonRuntimeException("BitStreamReader: Cannot set negative bit position!")
    if self._bitsize < bitposition:
        raise PythonRuntimeException("BitStreamReader: Setting bit position behind the stream!")

    self._bitposition = bitposition
def alignto(self, alignment: int) -> None:
    """
    Moves the bit position forward to the nearest multiple of the aligning value.

    The position is left untouched when it already is a multiple of the aligning value.

    :param alignment: An aligning value to use.
    :raises PythonRuntimeException: If the aligning moves behind the stream.
    """
    remainder = self._bitposition % alignment
    if remainder == 0:
        return

    # assigned via the property so that the new position is validated by its setter
    self.bitposition = self._bitposition + alignment - remainder
@property
def buffer_bitsize(self) -> int:
    """
    Gets the number of bits of the underlying buffer which are used by this reader.

    :returns: Buffer bit size.
    """
    size_in_bits = self._bitsize
    return size_in_bits
# Bit masks used when decoding the first byte of the signed variable integers:
# sign flag, payload bits and the "has next byte" flag.
VARINT_SIGN_1 = 0b1000_0000
VARINT_BYTE_1 = 0b0011_1111
# Payload bits of the following bytes of the signed variable integers.
VARINT_BYTE_N = 0b0111_1111
VARINT_HAS_NEXT_1 = 0b0100_0000
# The "has next byte" flag of the following bytes of the signed variable integers.
VARINT_HAS_NEXT_N = 0b1000_0000
# Payload bits and the "has next byte" flag of the unsigned variable integers.
VARUINT_BYTE = 0b0111_1111
VARUINT_HAS_NEXT = 0b1000_0000
# Biggest value accepted by read_varsize.
VARSIZE_MAX_VALUE = 2**31 - 1
# When import_cpp_class() provides a class, it replaces the pure Python BitStreamReader.
# NOTE(review): presumably None is returned when the C++ extension is not available - confirm.
# NOTE(review): nesting under the "if" is reconstructed, indentation was lost in this listing.
_BitStreamReaderCpp = import_cpp_class("BitStreamReader")
if _BitStreamReaderCpp is not None:
    BitStreamReader = _BitStreamReaderCpp  # type: ignore

    def _bitstreamreader_fromfile(filename: str) -> "BitStreamReader":
        # counterpart of the pure Python from_file, attached to the replacing class below
        with open(filename, "rb") as stream:
            content = stream.read()
        return BitStreamReader(content)

    BitStreamReader.from_file = _bitstreamreader_fromfile