Coverage for /home/runner/work/zserio/zserio/compiler/extensions/python/runtime/src/zserio/bitreader.py: 100%

330 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-12-05 10:43 +0000

1""" 

2The module implements abstraction for reading data from the bit stream. 

3""" 

4 

5import typing 

6 

7from zserio.bitbuffer import BitBuffer 

8from zserio.limits import INT64_MIN 

9from zserio.exception import PythonRuntimeException 

10from zserio.float import uint16_to_float, uint32_to_float, uint64_to_float 

11from zserio.cppbind import import_cpp_class 

12 

13 

class BitStreamReader:
    """
    Bit stream reader.

    Reads values from a bytes buffer with bit granularity (most significant bits first) and keeps track of
    the current bit position.
    """

    def __init__(self, buffer: bytes, bitsize: typing.Optional[int] = None) -> None:
        """
        Constructs bit stream reader from bytes buffer.

        Because bit buffer size does not have to be byte aligned (divisible by 8), it's possible that not all
        bits of the last byte are used. In this case, only most significant bits of the corresponded size are
        used.

        :param buffer: Bytes-like buffer to read as a bit stream.
        :param bitsize: Number of bits stored in buffer to use. When None, the whole buffer is used.
        :raises PythonRuntimeException: If bitsize is bigger than the number of bits available in the buffer.
        """

        if bitsize is None:
            bitsize = len(buffer) * 8
        elif len(buffer) * 8 < bitsize:
            raise PythonRuntimeException(
                f"BitStreamReader: Bit size '{bitsize}' out of range "
                f"for the given buffer byte size '{len(buffer)}'!"
            )

        self._buffer: bytes = buffer
        self._bitsize: int = bitsize
        self._bitposition: int = 0

    @classmethod
    def from_bitbuffer(cls: typing.Type["BitStreamReader"], bitbuffer: "BitBuffer") -> "BitStreamReader":
        """
        Constructs bit stream reader from bit buffer.

        :param bitbuffer: Bit buffer to read as a bit stream.
        :returns: Reader over the bit buffer's bytes limited to the bit buffer's bit size.
        """

        return cls(bitbuffer.buffer, bitbuffer.bitsize)

    @classmethod
    def from_file(cls: typing.Type["BitStreamReader"], filename: str) -> "BitStreamReader":
        """
        Constructs bit stream reader from file.

        :param filename: Filename to read as a bit stream.
        :returns: Reader over the whole content of the file.
        """

        with open(filename, "rb") as file:
            return cls(file.read())

    def read_bits(self, numbits: int) -> int:
        """
        Reads given number of bits from the bit stream as an unsigned integer.

        :param numbits: Number of bits to read, must be in range [1,64].
        :returns: Read bits as an unsigned integer.
        :raises PythonRuntimeException: If the numbits is invalid number or the reading goes behind the stream.
        """

        if numbits < 1 or numbits > 64:
            # the message must be an f-string, otherwise the placeholder is printed literally
            raise PythonRuntimeException(f"BitStreamReader: numbits '{numbits}' not in range [1,64]!")

        return self.read_bits_unchecked(numbits)

    def read_signed_bits(self, numbits: int) -> int:
        """
        Reads given number of bits from the bit stream as a signed integer.

        :param numbits: Number of bits to read, must be in range [1,64].
        :returns: Read bits as a signed integer.
        :raises PythonRuntimeException: If the numbits is invalid number or the reading goes behind the stream.
        """

        if numbits < 1 or numbits > 64:
            raise PythonRuntimeException(f"BitStreamReader: numbits '{numbits}' not in range [1,64]!")

        return self.read_signed_bits_unchecked(numbits)

    def read_bits_unchecked(self, numbits: int) -> int:
        """
        Reads given number of bits from the bit stream as an unsigned integer.

        This method does not validate numbits (see read_bits for the checked variant) and assumes that
        a valid value is ensured by the caller.

        :param numbits: Number of bits to read.
        :returns: Read bits as an unsigned integer.
        :raises PythonRuntimeException: If the reading goes behind the stream.
        """

        end_bitposition = self._bitposition + numbits

        if end_bitposition > self._bitsize:
            raise PythonRuntimeException("BitStreamReader: Reading behind the stream!")

        # take all bytes which are touched by the requested bit range
        start_byte = self._bitposition // 8
        end_byte = (end_bitposition - 1) // 8

        value = int.from_bytes(self._buffer[start_byte : end_byte + 1], byteorder="big", signed=False)

        # drop the bits of the last byte which follow the requested range
        last_bits = end_bitposition % 8
        if last_bits != 0:
            value >>= 8 - last_bits
        # drop the bits of the first byte which precede the requested range
        value &= (1 << numbits) - 1

        self._bitposition = end_bitposition

        return value

    def read_signed_bits_unchecked(self, numbits: int) -> int:
        """
        Reads given number of bits from the bit stream as a signed integer.

        This method does not validate numbits (see read_signed_bits for the checked variant) and assumes that
        a valid value is ensured by the caller.

        :param numbits: Number of bits to read.
        :returns: Read bits as a signed integer (two's complement on numbits bits).
        :raises PythonRuntimeException: If the reading goes behind the stream.
        """

        value = self.read_bits_unchecked(numbits)

        if numbits != 0 and (value >> (numbits - 1)) != 0:
            # the most significant bit is set -> negative value
            return value - (1 << numbits)

        return value

    def read_varint16(self) -> int:
        """
        Reads variable 16-bit signed integer value from the bit stream.

        The first byte holds the sign bit, the "has next byte" flag and 6 value bits, the second (last)
        byte holds 8 value bits.

        :returns: Variable 16-bit signed integer value.
        :raises PythonRuntimeException: If the reading goes behind the stream.
        """

        byte = self.read_bits_unchecked(8)  # byte 1
        sign = byte & VARINT_SIGN_1
        result = byte & VARINT_BYTE_1
        if byte & VARINT_HAS_NEXT_1 == 0:
            return -result if sign else result

        result = result << 8 | self.read_bits_unchecked(8)  # byte 2
        return -result if sign else result

    def read_varint32(self) -> int:
        """
        Reads variable 32-bit signed integer value from the bit stream.

        The first byte holds the sign bit, the "has next byte" flag and 6 value bits, the middle bytes hold
        the "has next byte" flag and 7 value bits, the fourth (last) byte holds 8 value bits.

        :returns: Variable 32-bit signed integer value.
        :raises PythonRuntimeException: If the reading goes behind the stream.
        """

        byte = self.read_bits_unchecked(8)  # byte 1
        sign = byte & VARINT_SIGN_1
        result = byte & VARINT_BYTE_1
        if byte & VARINT_HAS_NEXT_1 == 0:
            return -result if sign else result

        byte = self.read_bits_unchecked(8)  # byte 2
        result = result << 7 | (byte & VARINT_BYTE_N)
        if byte & VARINT_HAS_NEXT_N == 0:
            return -result if sign else result

        byte = self.read_bits_unchecked(8)  # byte 3
        result = result << 7 | (byte & VARINT_BYTE_N)
        if byte & VARINT_HAS_NEXT_N == 0:
            return -result if sign else result

        result = result << 8 | self.read_bits_unchecked(8)  # byte 4
        return -result if sign else result

    def read_varint64(self) -> int:
        """
        Reads variable 64-bit signed integer value from the bit stream.

        The first byte holds the sign bit, the "has next byte" flag and 6 value bits, the middle bytes hold
        the "has next byte" flag and 7 value bits, the eighth (last) byte holds 8 value bits.

        :returns: Variable 64-bit signed integer value.
        :raises PythonRuntimeException: If the reading goes behind the stream.
        """

        byte = self.read_bits_unchecked(8)  # byte 1
        sign = byte & VARINT_SIGN_1
        result = byte & VARINT_BYTE_1
        if byte & VARINT_HAS_NEXT_1 == 0:
            return -result if sign else result

        byte = self.read_bits_unchecked(8)  # byte 2
        result = result << 7 | (byte & VARINT_BYTE_N)
        if byte & VARINT_HAS_NEXT_N == 0:
            return -result if sign else result

        byte = self.read_bits_unchecked(8)  # byte 3
        result = result << 7 | (byte & VARINT_BYTE_N)
        if byte & VARINT_HAS_NEXT_N == 0:
            return -result if sign else result

        byte = self.read_bits_unchecked(8)  # byte 4
        result = result << 7 | (byte & VARINT_BYTE_N)
        if byte & VARINT_HAS_NEXT_N == 0:
            return -result if sign else result

        byte = self.read_bits_unchecked(8)  # byte 5
        result = result << 7 | (byte & VARINT_BYTE_N)
        if byte & VARINT_HAS_NEXT_N == 0:
            return -result if sign else result

        byte = self.read_bits_unchecked(8)  # byte 6
        result = result << 7 | (byte & VARINT_BYTE_N)
        if byte & VARINT_HAS_NEXT_N == 0:
            return -result if sign else result

        byte = self.read_bits_unchecked(8)  # byte 7
        result = result << 7 | (byte & VARINT_BYTE_N)
        if byte & VARINT_HAS_NEXT_N == 0:
            return -result if sign else result

        result = result << 8 | self.read_bits_unchecked(8)  # byte 8
        return -result if sign else result

    def read_varint(self) -> int:
        """
        Reads variable signed integer value (up to 9 bytes) from the bit stream.

        A single byte with the sign bit set and zero value bits (i.e. "negative zero") is decoded as
        INT64_MIN.

        :returns: Variable signed integer value (up to 9 bytes).
        :raises PythonRuntimeException: If the reading goes behind the stream.
        """

        byte = self.read_bits_unchecked(8)  # byte 1
        sign = byte & VARINT_SIGN_1
        result = byte & VARINT_BYTE_1
        if byte & VARINT_HAS_NEXT_1 == 0:
            return (INT64_MIN if result == 0 else -result) if sign else result

        byte = self.read_bits_unchecked(8)  # byte 2
        result = result << 7 | (byte & VARINT_BYTE_N)
        if byte & VARINT_HAS_NEXT_N == 0:
            return -result if sign else result

        byte = self.read_bits_unchecked(8)  # byte 3
        result = result << 7 | (byte & VARINT_BYTE_N)
        if byte & VARINT_HAS_NEXT_N == 0:
            return -result if sign else result

        byte = self.read_bits_unchecked(8)  # byte 4
        result = result << 7 | (byte & VARINT_BYTE_N)
        if byte & VARINT_HAS_NEXT_N == 0:
            return -result if sign else result

        byte = self.read_bits_unchecked(8)  # byte 5
        result = result << 7 | (byte & VARINT_BYTE_N)
        if byte & VARINT_HAS_NEXT_N == 0:
            return -result if sign else result

        byte = self.read_bits_unchecked(8)  # byte 6
        result = result << 7 | (byte & VARINT_BYTE_N)
        if byte & VARINT_HAS_NEXT_N == 0:
            return -result if sign else result

        byte = self.read_bits_unchecked(8)  # byte 7
        result = result << 7 | (byte & VARINT_BYTE_N)
        if byte & VARINT_HAS_NEXT_N == 0:
            return -result if sign else result

        byte = self.read_bits_unchecked(8)  # byte 8
        result = result << 7 | (byte & VARINT_BYTE_N)
        if byte & VARINT_HAS_NEXT_N == 0:
            return -result if sign else result

        result = result << 8 | self.read_bits_unchecked(8)  # byte 9
        return -result if sign else result

    def read_varuint16(self) -> int:
        """
        Reads variable 16-bit unsigned integer value from the bit stream.

        The first byte holds the "has next byte" flag and 7 value bits, the second (last) byte holds
        8 value bits.

        :returns: Variable 16-bit unsigned integer value.
        :raises PythonRuntimeException: If the reading goes behind the stream.
        """

        byte = self.read_bits_unchecked(8)  # byte 1
        result = byte & VARUINT_BYTE
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        result = result << 8 | self.read_bits_unchecked(8)  # byte 2
        return result

    def read_varuint32(self) -> int:
        """
        Reads variable 32-bit unsigned integer value from the bit stream.

        Each byte except the last one holds the "has next byte" flag and 7 value bits, the fourth (last)
        byte holds 8 value bits.

        :returns: Variable 32-bit unsigned integer value.
        :raises PythonRuntimeException: If the reading goes behind the stream.
        """

        byte = self.read_bits_unchecked(8)  # byte 1
        result = byte & VARUINT_BYTE
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        byte = self.read_bits_unchecked(8)  # byte 2
        result = result << 7 | (byte & VARUINT_BYTE)
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        byte = self.read_bits_unchecked(8)  # byte 3
        result = result << 7 | (byte & VARUINT_BYTE)
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        result = result << 8 | self.read_bits_unchecked(8)  # byte 4
        return result

    def read_varuint64(self) -> int:
        """
        Reads variable 64-bit unsigned integer value from the bit stream.

        Each byte except the last one holds the "has next byte" flag and 7 value bits, the eighth (last)
        byte holds 8 value bits.

        :returns: Variable 64-bit unsigned integer value.
        :raises PythonRuntimeException: If the reading goes behind the stream.
        """

        byte = self.read_bits_unchecked(8)  # byte 1
        result = byte & VARUINT_BYTE
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        byte = self.read_bits_unchecked(8)  # byte 2
        result = result << 7 | (byte & VARUINT_BYTE)
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        byte = self.read_bits_unchecked(8)  # byte 3
        result = result << 7 | (byte & VARUINT_BYTE)
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        byte = self.read_bits_unchecked(8)  # byte 4
        result = result << 7 | (byte & VARUINT_BYTE)
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        byte = self.read_bits_unchecked(8)  # byte 5
        result = result << 7 | (byte & VARUINT_BYTE)
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        byte = self.read_bits_unchecked(8)  # byte 6
        result = result << 7 | (byte & VARUINT_BYTE)
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        byte = self.read_bits_unchecked(8)  # byte 7
        result = result << 7 | (byte & VARUINT_BYTE)
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        result = result << 8 | self.read_bits_unchecked(8)  # byte 8
        return result

    def read_varuint(self) -> int:
        """
        Reads variable unsigned integer value (up to 9 bytes) from the bit stream.

        Each byte except the last one holds the "has next byte" flag and 7 value bits, the ninth (last)
        byte holds 8 value bits.

        :returns: Variable unsigned integer value (up to 9 bytes).
        :raises PythonRuntimeException: If the reading goes behind the stream.
        """

        byte = self.read_bits_unchecked(8)  # byte 1
        result = byte & VARUINT_BYTE
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        byte = self.read_bits_unchecked(8)  # byte 2
        result = result << 7 | (byte & VARUINT_BYTE)
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        byte = self.read_bits_unchecked(8)  # byte 3
        result = result << 7 | (byte & VARUINT_BYTE)
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        byte = self.read_bits_unchecked(8)  # byte 4
        result = result << 7 | (byte & VARUINT_BYTE)
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        byte = self.read_bits_unchecked(8)  # byte 5
        result = result << 7 | (byte & VARUINT_BYTE)
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        byte = self.read_bits_unchecked(8)  # byte 6
        result = result << 7 | (byte & VARUINT_BYTE)
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        byte = self.read_bits_unchecked(8)  # byte 7
        result = result << 7 | (byte & VARUINT_BYTE)
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        byte = self.read_bits_unchecked(8)  # byte 8
        result = result << 7 | (byte & VARUINT_BYTE)
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        result = result << 8 | self.read_bits_unchecked(8)  # byte 9
        return result

    def read_varsize(self) -> int:
        """
        Reads variable size integer value from the bit stream.

        The value is encoded in up to 5 bytes. Only a 5-byte encoding can exceed VARSIZE_MAX_VALUE, so
        the range is checked only after the last byte.

        :returns: Variable size integer value.
        :raises PythonRuntimeException: If read variable size integer is out of range or the reading goes
                                        behind the stream.
        """

        byte = self.read_bits_unchecked(8)  # byte 1
        result = byte & VARUINT_BYTE
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        byte = self.read_bits_unchecked(8)  # byte 2
        result = result << 7 | (byte & VARUINT_BYTE)
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        byte = self.read_bits_unchecked(8)  # byte 3
        result = result << 7 | (byte & VARUINT_BYTE)
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        byte = self.read_bits_unchecked(8)  # byte 4
        result = result << 7 | (byte & VARUINT_BYTE)
        if byte & VARUINT_HAS_NEXT == 0:
            return result

        result = result << 8 | self.read_bits_unchecked(8)  # byte 5
        if result > VARSIZE_MAX_VALUE:
            raise PythonRuntimeException(
                f"BitStreamReader: Read value '{result}' is out of range for varsize type!"
            )

        return result

    def read_float16(self) -> float:
        """
        Read 16-bits from the stream as a float value encoded according to IEEE 754 binary16.

        :returns: Read float value.
        :raises PythonRuntimeException: If the reading goes behind the stream.
        """

        return uint16_to_float(self.read_bits_unchecked(16))

    def read_float32(self) -> float:
        """
        Read 32-bits from the stream as a float value encoded according to IEEE 754 binary32.

        :returns: Read float value.
        :raises PythonRuntimeException: If the reading goes behind the stream.
        """

        return uint32_to_float(self.read_bits_unchecked(32))

    def read_float64(self) -> float:
        """
        Read 64-bits from the stream as a float value encoded according to IEEE 754 binary64.

        :returns: Read float value.
        :raises PythonRuntimeException: If the reading goes behind the stream.
        """

        return uint64_to_float(self.read_bits_unchecked(64))

    def read_bytes(self) -> bytearray:
        """
        Reads bytes from the stream.

        The bytes are stored as a varsize length followed by that many bytes.

        :returns: Read bytes.
        :raises PythonRuntimeException: If the reading goes behind the stream.
        """

        length = self.read_varsize()
        begin_bitposition = self._bitposition

        if (begin_bitposition & 0x07) != 0:
            # we are not aligned to byte, each byte must be assembled from two buffer bytes
            value = bytearray()
            for _ in range(length):
                value.append(self.read_bits_unchecked(8))
        else:
            # we are aligned to byte, the property setter checks the stream end before the slice is copied
            self.bitposition = begin_bitposition + length * 8
            value = bytearray(length)
            begin_byte_position = begin_bitposition // 8
            value[0:length] = self._buffer[begin_byte_position : begin_byte_position + length]

        return value

    def read_string(self) -> str:
        """
        Reads string from the stream.

        The string is stored in the same way as bytes (varsize length followed by the bytes) and is
        decoded as UTF-8.

        :returns: Read string.
        :raises PythonRuntimeException: If the reading goes behind the stream.
        """

        return self.read_bytes().decode("utf-8")

    def read_bool(self) -> bool:
        """
        Reads single bit as a bool value.

        :returns: Read bool values.
        :raises PythonRuntimeException: If the reading goes behind the stream.
        """

        return self.read_bits_unchecked(1) != 0

    def read_bitbuffer(self) -> "BitBuffer":
        """
        Reads a bit buffer from the stream.

        The bit buffer is stored as a varsize bit size followed by that many bits.

        :returns: Read bit buffer.
        :raises PythonRuntimeException: If the reading goes behind the stream.
        """

        bitsize = self.read_varsize()
        num_bytes_to_read = bitsize // 8
        num_rest_bits = bitsize - num_bytes_to_read * 8
        bytesize = (bitsize + 7) // 8
        read_buffer = bytearray(bytesize)
        begin_bitposition = self._bitposition
        if (begin_bitposition & 0x07) != 0:
            # we are not aligned to byte
            for i in range(num_bytes_to_read):
                read_buffer[i] = self.read_bits_unchecked(8)
        else:
            # we are aligned to byte
            self.bitposition = begin_bitposition + num_bytes_to_read * 8
            begin_byte_position = begin_bitposition // 8
            read_buffer[0:num_bytes_to_read] = self._buffer[
                begin_byte_position : begin_byte_position + num_bytes_to_read
            ]

        if num_rest_bits != 0:
            # the remaining bits are stored in the most significant bits of the last byte
            read_buffer[num_bytes_to_read] = self.read_bits(num_rest_bits) << (8 - num_rest_bits)

        return BitBuffer(read_buffer, bitsize)

    @property
    def bitposition(self) -> int:
        """
        Gets current bit position.

        :returns: Current bit position.
        """

        return self._bitposition

    @bitposition.setter
    def bitposition(self, bitposition: int) -> None:
        """
        Sets bit position.

        :param bitposition: New bit position.
        :raises PythonRuntimeException: If the position is not within the stream.
        """

        if bitposition < 0:
            raise PythonRuntimeException("BitStreamReader: Cannot set negative bit position!")
        if bitposition > self._bitsize:
            raise PythonRuntimeException("BitStreamReader: Setting bit position behind the stream!")

        self._bitposition = bitposition

    def alignto(self, alignment: int) -> None:
        """
        Aligns the bit position according to the aligning value.

        The bit position is moved forward to the nearest multiple of the alignment, it is left untouched
        when it is already aligned.

        :param alignment: An aligning value to use.
        :raises PythonRuntimeException: If the aligning moves behind the stream.
        """

        offset = self._bitposition % alignment
        if offset != 0:
            # go through the property setter to get the stream end check
            self.bitposition = self._bitposition + alignment - offset

    @property
    def buffer_bitsize(self) -> int:
        """
        Gets size of the underlying buffer in bits.

        :returns: Buffer bit size.
        """

        return self._bitsize


# Signed variable integers: the first byte holds sign bit, "has next byte" flag and 6 value bits,
# the following bytes hold "has next byte" flag and 7 value bits.
VARINT_SIGN_1 = 0x80
VARINT_BYTE_1 = 0x3F
VARINT_BYTE_N = 0x7F
VARINT_HAS_NEXT_1 = 0x40
VARINT_HAS_NEXT_N = 0x80
# Unsigned variable integers: each byte holds "has next byte" flag and 7 value bits.
VARUINT_BYTE = 0x7F
VARUINT_HAS_NEXT = 0x80
# Maximum value accepted by read_varsize.
VARSIZE_MAX_VALUE = (1 << 31) - 1

632 

# Use the implementation provided by import_cpp_class instead of the pure Python one when it is
# available (presumably a C++ binding - confirm against zserio.cppbind).
_BitStreamReaderCpp = import_cpp_class("BitStreamReader")
if _BitStreamReaderCpp is not None:
    BitStreamReader = _BitStreamReaderCpp  # type: ignore

    def _bitstreamreader_fromfile(filename: str) -> "BitStreamReader":
        """
        Constructs the replaced bit stream reader from file.

        :param filename: Filename to read as a bit stream.
        :returns: Reader over the whole content of the file.
        """

        with open(filename, "rb") as stream:
            content = stream.read()

        return BitStreamReader(content)

    # attach the file based constructor to the replacement class
    BitStreamReader.from_file = _bitstreamreader_fromfile