Coverage for /home/runner/work/zserio/zserio/compiler/extensions/python/runtime/tests/test_bitstream.py: 100%

156 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-12-05 10:43 +0000

1import unittest 

2 

3from zserio.bitbuffer import BitBuffer 

4from zserio.bitreader import BitStreamReader 

5from zserio.bitsizeof import INT64_MIN 

6from zserio.bitwriter import BitStreamWriter 

7 

8 

9class BitStreamTest(unittest.TestCase): 

10 

11 def test_bits(self): 

12 for numbits in range(1, 65): 

13 max_value = (1 << numbits) - 1 

14 values = [ 

15 max_value, 

16 max_value >> 1, 

17 max_value >> 2, 

18 1, 

19 0, 

20 1, 

21 max_value >> 2, 

22 max_value >> 1, 

23 max_value, 

24 ] 

25 self._test_bits_impl(BitStreamWriter.write_bits, BitStreamReader.read_bits, values, numbits) 

26 

27 def test_signed_bits(self): 

28 for numbits in range(1, 65): 

29 min_value = -1 << (numbits - 1) 

30 max_value = (1 << (numbits - 1)) - 1 

31 values = [ 

32 min_value, 

33 max_value, 

34 min_value >> 1, 

35 max_value >> 1, 

36 min_value >> 2, 

37 max_value >> 2, 

38 -1, 

39 (1 if numbits != 1 else -1), 

40 0, 

41 (1 if numbits != 1 else -1), 

42 -1, 

43 max_value >> 2, 

44 min_value >> 2, 

45 max_value >> 1, 

46 min_value >> 1, 

47 max_value, 

48 min_value, 

49 ] 

50 self._test_bits_impl( 

51 BitStreamWriter.write_signed_bits, 

52 BitStreamReader.read_signed_bits, 

53 values, 

54 numbits, 

55 ) 

56 

57 def test_varint16(self): 

58 values = [ 

59 # 1 byte 

60 0, 

61 -1, 

62 +1, 

63 -((1 << (6)) - 1), 

64 +((1 << (6)) - 1), 

65 # 2 bytes 

66 -((1 << (6))), 

67 +((1 << (6))), 

68 -((1 << (6 + 8)) - 1), 

69 +((1 << (6 + 8)) - 1), 

70 ] 

71 

72 self._test_impl(BitStreamWriter.write_varint16, BitStreamReader.read_varint16, values, 15) 

73 

74 def test_varint32(self): 

75 values = [ 

76 # 1 byte 

77 0, 

78 -((1)), 

79 +((1)), 

80 -((1 << (6)) - 1), 

81 +((1 << (6)) - 1), 

82 # 2 bytes 

83 -((1 << (6))), 

84 +((1 << (6))), 

85 -((1 << (6 + 7)) - 1), 

86 +((1 << (6 + 7)) - 1), 

87 # 3 bytes 

88 -((1 << (6 + 7))), 

89 +((1 << (6 + 7))), 

90 -((1 << (6 + 7 + 7)) - 1), 

91 +((1 << (6 + 7 + 7)) - 1), 

92 # 4 bytes 

93 -((1 << (6 + 7 + 7))), 

94 +((1 << (6 + 7 + 7))), 

95 -((1 << (6 + 7 + 7 + 8)) - 1), 

96 +((1 << (6 + 7 + 7 + 8)) - 1), 

97 ] 

98 

99 self._test_impl(BitStreamWriter.write_varint32, BitStreamReader.read_varint32, values, 31) 

100 

101 def test_varint64(self): 

102 values = [ 

103 # 1 byte 

104 0, 

105 -((1)), 

106 +((1)), 

107 -((1 << (6)) - 1), 

108 +((1 << (6)) - 1), 

109 # 2 bytes 

110 -((1 << (6))), 

111 +((1 << (6))), 

112 -((1 << (6 + 7)) - 1), 

113 +((1 << (6 + 7)) - 1), 

114 # 3 bytes 

115 -((1 << (6 + 7))), 

116 +((1 << (6 + 7))), 

117 -((1 << (6 + 7 + 7)) - 1), 

118 +((1 << (6 + 7 + 7)) - 1), 

119 # 4 bytes 

120 -((1 << (6 + 7 + 7))), 

121 +((1 << (6 + 7 + 7))), 

122 -((1 << (6 + 7 + 7 + 8)) - 1), 

123 +((1 << (6 + 7 + 7 + 8)) - 1) 

124 # 5 bytes 

125 - ((1 << (6 + 7 + 7 + 7))), 

126 +((1 << (6 + 7 + 7 + 7))), 

127 -((1 << (6 + 7 + 7 + 7 + 7)) - 1), 

128 +((1 << (6 + 7 + 7 + 7 + 7)) - 1), 

129 # 6 bytes 

130 -((1 << (6 + 7 + 7 + 7 + 7))), 

131 +((1 << (6 + 7 + 7 + 7 + 7))), 

132 -((1 << (6 + 7 + 7 + 7 + 7 + 7)) - 1), 

133 +((1 << (6 + 7 + 7 + 7 + 7 + 7)) - 1), 

134 # 7 bytes 

135 -((1 << (6 + 7 + 7 + 7 + 7 + 7))), 

136 +((1 << (6 + 7 + 7 + 7 + 7 + 7))), 

137 -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7)) - 1), 

138 +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7)) - 1), 

139 # 8 bytes 

140 -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7))), 

141 +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7))), 

142 -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 8)) - 1), 

143 +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 8)) - 1), 

144 ] 

145 

146 self._test_impl(BitStreamWriter.write_varint64, BitStreamReader.read_varint64, values, 63) 

147 

148 def test_varint(self): 

149 values = [ 

150 # 1 byte 

151 0, 

152 -((1)), 

153 +((1)), 

154 -((1 << (6)) - 1), 

155 +((1 << (6)) - 1), 

156 # 2 bytes 

157 -((1 << (6))), 

158 +((1 << (6))), 

159 -((1 << (6 + 7)) - 1), 

160 +((1 << (6 + 7)) - 1), 

161 # 3 bytes 

162 -((1 << (6 + 7))), 

163 +((1 << (6 + 7))), 

164 -((1 << (6 + 7 + 7)) - 1), 

165 +((1 << (6 + 7 + 7)) - 1), 

166 # 4 bytes 

167 -((1 << (6 + 7 + 7))), 

168 +((1 << (6 + 7 + 7))), 

169 -((1 << (6 + 7 + 7 + 8)) - 1), 

170 +((1 << (6 + 7 + 7 + 8)) - 1) 

171 # 5 bytes 

172 - ((1 << (6 + 7 + 7 + 7))), 

173 +((1 << (6 + 7 + 7 + 7))), 

174 -((1 << (6 + 7 + 7 + 7 + 7)) - 1), 

175 +((1 << (6 + 7 + 7 + 7 + 7)) - 1), 

176 # 6 bytes 

177 -((1 << (6 + 7 + 7 + 7 + 7))), 

178 +((1 << (6 + 7 + 7 + 7 + 7))), 

179 -((1 << (6 + 7 + 7 + 7 + 7 + 7)) - 1), 

180 +((1 << (6 + 7 + 7 + 7 + 7 + 7)) - 1), 

181 # 7 bytes 

182 -((1 << (6 + 7 + 7 + 7 + 7 + 7))), 

183 +((1 << (6 + 7 + 7 + 7 + 7 + 7))), 

184 -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7)) - 1), 

185 +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7)) - 1), 

186 # 8 bytes 

187 -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7))), 

188 +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7))), 

189 -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 7)) - 1), 

190 +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 7)) - 1), 

191 # 9 bytes 

192 -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 7))), 

193 +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 7))), 

194 -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 7 + 8)) - 1), 

195 +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 7 + 8)) - 1), 

196 # 1 byte 

197 INT64_MIN, # special case, stored as -0 

198 ] 

199 

200 self._test_impl(BitStreamWriter.write_varint, BitStreamReader.read_varint, values, 71) 

201 

202 def test_varuint16(self): 

203 values = [ 

204 # 1 byte 

205 0, 

206 1, 

207 ((1 << (7)) - 1), 

208 # 2 bytes 

209 ((1 << (7))), 

210 ((1 << (7 + 8)) - 1), 

211 ] 

212 

213 self._test_impl(BitStreamWriter.write_varuint16, BitStreamReader.read_varuint16, values, 15) 

214 

215 def test_varuint32(self): 

216 values = [ 

217 # 1 byte 

218 ((0)), 

219 ((1)), 

220 ((1 << (7)) - 1), 

221 # 2 bytes 

222 ((1 << (7))), 

223 ((1 << (7 + 7)) - 1), 

224 # 3 bytes 

225 ((1 << (7 + 7))), 

226 ((1 << (7 + 7 + 7)) - 1), 

227 # 4 bytes 

228 ((1 << (7 + 7 + 7))), 

229 ((1 << (7 + 7 + 7 + 8)) - 1), 

230 ] 

231 

232 self._test_impl(BitStreamWriter.write_varuint32, BitStreamReader.read_varuint32, values, 31) 

233 

234 def test_varuint64(self): 

235 values = [ 

236 # 1 byte 

237 ((0)), 

238 ((1)), 

239 ((1 << (7)) - 1), 

240 # 2 bytes 

241 ((1 << (7))), 

242 ((1 << (7 + 7)) - 1), 

243 # 3 bytes 

244 ((1 << (7 + 7))), 

245 ((1 << (7 + 7 + 7)) - 1), 

246 # 4 bytes 

247 ((1 << (7 + 7 + 7))), 

248 ((1 << (7 + 7 + 7 + 8)) - 1), 

249 # 5 bytes 

250 ((1 << (7 + 7 + 7 + 7))), 

251 ((1 << (7 + 7 + 7 + 7 + 7)) - 1), 

252 # 6 bytes 

253 ((1 << (7 + 7 + 7 + 7 + 7))), 

254 ((1 << (7 + 7 + 7 + 7 + 7 + 7)) - 1), 

255 # 7 bytes 

256 ((1 << (7 + 7 + 7 + 7 + 7 + 7))), 

257 ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7)) - 1), 

258 # 8 bytes 

259 ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7))), 

260 ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7 + 8)) - 1), 

261 ] 

262 

263 self._test_impl(BitStreamWriter.write_varuint64, BitStreamReader.read_varuint64, values, 63) 

264 

265 def test_varuint(self): 

266 values = [ 

267 # 1 byte 

268 ((0)), 

269 ((1)), 

270 ((1 << (7)) - 1), 

271 # 2 bytes 

272 ((1 << (7))), 

273 ((1 << (7 + 7)) - 1), 

274 # 3 bytes 

275 ((1 << (7 + 7))), 

276 ((1 << (7 + 7 + 7)) - 1), 

277 # 4 bytes 

278 ((1 << (7 + 7 + 7))), 

279 ((1 << (7 + 7 + 7 + 8)) - 1), 

280 # 5 bytes 

281 ((1 << (7 + 7 + 7 + 7))), 

282 ((1 << (7 + 7 + 7 + 7 + 7)) - 1), 

283 # 6 bytes 

284 ((1 << (7 + 7 + 7 + 7 + 7))), 

285 ((1 << (7 + 7 + 7 + 7 + 7 + 7)) - 1), 

286 # 7 bytes 

287 ((1 << (7 + 7 + 7 + 7 + 7 + 7))), 

288 ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7)) - 1), 

289 # 8 bytes 

290 ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7))), 

291 ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7 + 8)) - 1), 

292 # 9 bytes 

293 ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7 + 7))), 

294 ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7 + 7 + 8)) - 1), 

295 ] 

296 

297 self._test_impl(BitStreamWriter.write_varuint, BitStreamReader.read_varuint, values, 71) 

298 

299 def test_varsize(self): 

300 values = [ 

301 # 1 byte 

302 ((0)), 

303 ((1)), 

304 ((1 << (7)) - 1), 

305 # 2 bytes 

306 ((1 << (7))), 

307 ((1 << (7 + 7)) - 1), 

308 # 3 bytes 

309 ((1 << (7 + 7))), 

310 ((1 << (7 + 7 + 7)) - 1), 

311 # 4 bytes 

312 ((1 << (7 + 7 + 7))), 

313 ((1 << (7 + 7 + 7 + 7)) - 1), 

314 # 5 bytes 

315 ((1 << (7 + 7 + 7 + 7))), 

316 ((1 << (2 + 7 + 7 + 7 + 8)) - 1), 

317 ] 

318 

319 self._test_impl(BitStreamWriter.write_varsize, BitStreamReader.read_varsize, values, 39) 

320 

321 def test_float16(self): 

322 values = [-42.5, -2.0, 0.0, 0.6171875, 0.875, 2.0, 9.875, 42.5] 

323 

324 self._test_impl(BitStreamWriter.write_float16, BitStreamReader.read_float16, values, 15) 

325 

326 def test_float32(self): 

327 values = [-42.5, -2.0, 0.0, 0.6171875, 0.875, 2.0, 9.875, 42.5] 

328 

329 self._test_impl(BitStreamWriter.write_float32, BitStreamReader.read_float32, values, 31) 

330 

331 def test_float64(self): 

332 values = [-42.5, -2.0, 0.0, 0.6171875, 0.875, 2.0, 9.875, 42.5] 

333 

334 self._test_impl(BitStreamWriter.write_float64, BitStreamReader.read_float64, values, 63) 

335 

336 def test_string(self): 

337 values = [ 

338 "Hello World", 

339 "\n\t%^@(*aAzZ01234569$%^!?<>[]](){}-=+~:;/|\\\"'Hello World2\0nonWrittenPart", 

340 b"Price: \xE2\x82\xAC 3 what's this? -> \xC2\xA2".decode("utf-8"), 

341 ] 

342 

343 self._test_impl(BitStreamWriter.write_string, BitStreamReader.read_string, values, 7) 

344 

345 def test_bool(self): 

346 values = [ 

347 False, 

348 True, 

349 True, 

350 False, 

351 False, 

352 True, 

353 False, 

354 True, 

355 False, 

356 False, 

357 True, 

358 True, 

359 False, 

360 ] 

361 

362 self._test_impl(BitStreamWriter.write_bool, BitStreamReader.read_bool, values, 1) 

363 

364 def test_bitbuffer(self): 

365 values = [ 

366 BitBuffer(bytes([0xAB, 0x07]), 11), 

367 BitBuffer(bytes([0xAB, 0xCD, 0x7F]), 23), 

368 ] 

369 

370 self._test_impl(BitStreamWriter.write_bitbuffer, BitStreamReader.read_bitbuffer, values, 7) 

371 

372 def test_bytes(self): 

373 values = [bytearray([0, 255]), bytearray([1, 127, 128, 254])] 

374 

375 self._test_impl(BitStreamWriter.write_bytes, BitStreamReader.read_bytes, values, 7) 

376 

377 def test_bitposition(self): 

378 writer = BitStreamWriter() 

379 writer.write_bits(0xAAAA, 16) 

380 self.assertEqual(16, writer.bitposition) 

381 writer.write_bits(0xFF, 8) 

382 self.assertEqual(24, writer.bitposition) 

383 

384 reader = BitStreamReader(buffer=writer.byte_array) 

385 self.assertEqual(0xAAAA, reader.read_bits(16)) 

386 self.assertEqual(16, reader.bitposition) 

387 reader.bitposition = 8 

388 self.assertEqual(8, reader.bitposition) 

389 self.assertEqual(0xAAFF, reader.read_bits(16)) 

390 reader.bitposition = 13 

391 self.assertEqual(13, reader.bitposition) 

392 self.assertEqual(0x02, reader.read_bits(3)) 

393 self.assertEqual(16, reader.bitposition) 

394 self.assertEqual(0xFF, reader.read_bits(8)) 

395 self.assertEqual(24, reader.bitposition) 

396 reader.bitposition = 0 

397 self.assertEqual(0, reader.bitposition) 

398 self.assertEqual(0xAAAAFF, reader.read_bits(24)) 

399 

400 def test_alignto(self): 

401 writer = BitStreamWriter() 

402 writer.write_bits(5, 3) 

403 writer.alignto(8) 

404 self.assertEqual(8, writer.bitposition) 

405 writer.write_bits(0, 1) 

406 writer.alignto(16) 

407 self.assertEqual(16, writer.bitposition) 

408 writer.write_bits(0xAA, 9) 

409 writer.alignto(32) 

410 self.assertEqual(32, writer.bitposition) 

411 writer.write_bits(0xACA, 13) 

412 writer.alignto(64) 

413 self.assertEqual(64, writer.bitposition) 

414 writer.write_bits(0xCAFE, 16) 

415 

416 reader = BitStreamReader(buffer=writer.byte_array) 

417 self.assertEqual(5, reader.read_bits(3)) 

418 reader.alignto(8) 

419 self.assertEqual(8, reader.bitposition) 

420 self.assertEqual(0, reader.read_bits(1)) 

421 reader.alignto(16) 

422 self.assertEqual(16, reader.bitposition) 

423 self.assertEqual(0xAA, reader.read_bits(9)) 

424 reader.alignto(32) 

425 self.assertEqual(32, reader.bitposition) 

426 self.assertEqual(0xACA, reader.read_bits(13)) 

427 reader.alignto(64) 

428 self.assertEqual(64, reader.bitposition) 

429 self.assertEqual(0xCAFE, reader.read_bits(16)) 

430 

431 def test_file(self): 

432 test_filename = "BitStreamTest.bin" 

433 writer = BitStreamWriter() 

434 writer.write_bits(13, 7) 

435 writer.write_string(test_filename) 

436 writer.write_varint(-123456) 

437 writer.to_file(test_filename) 

438 

439 reader = BitStreamReader.from_file(test_filename) 

440 self.assertEqual(13, reader.read_bits(7)) 

441 self.assertEqual(test_filename, reader.read_string()) 

442 self.assertEqual(-123456, reader.read_varint()) 

443 

444 def _test_bits_impl(self, write_method, read_method, values, numbits): 

445 for bit_pos in range(numbits): 

446 writer = BitStreamWriter() 

447 if bit_pos > 0: 

448 writer.write_bits(0, bit_pos) 

449 for value in values: 

450 write_method(writer, value, numbits) 

451 

452 reader = BitStreamReader(buffer=writer.byte_array) 

453 if bit_pos > 0: 

454 reader.read_bits(bit_pos) 

455 for value in values: 

456 self.assertEqual( 

457 value, 

458 read_method(reader, numbits), 

459 f"[numbits={numbits}, bit_pos={bit_pos}]", 

460 ) 

461 

462 def _test_impl(self, write_method, read_method, values, max_start_bit_pos): 

463 for bit_pos in range(max_start_bit_pos): 

464 writer = BitStreamWriter() 

465 if bit_pos > 64: 

466 writer.write_bits(0, 64) 

467 writer.write_bits(0, bit_pos - 64) 

468 elif bit_pos > 0: 

469 writer.write_bits(0, bit_pos) 

470 for value in values: 

471 write_method(writer, value) 

472 

473 reader = BitStreamReader(buffer=writer.byte_array) 

474 if bit_pos > 64: 

475 reader.read_bits(64) 

476 reader.read_bits(bit_pos - 64) 

477 elif bit_pos > 0: 

478 reader.read_bits(bit_pos) 

479 for value in values: 

480 self.assertEqual(value, read_method(reader), f"[bit_pos={bit_pos}]")