Coverage for /home/runner/work/zserio/zserio/compiler/extensions/python/runtime/tests/test_bitstream.py: 100%
156 statements
« prev ^ index » next coverage.py v6.5.0, created at 2024-12-05 10:43 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2024-12-05 10:43 +0000
1import unittest
3from zserio.bitbuffer import BitBuffer
4from zserio.bitreader import BitStreamReader
5from zserio.bitsizeof import INT64_MIN
6from zserio.bitwriter import BitStreamWriter
9class BitStreamTest(unittest.TestCase):
11 def test_bits(self):
12 for numbits in range(1, 65):
13 max_value = (1 << numbits) - 1
14 values = [
15 max_value,
16 max_value >> 1,
17 max_value >> 2,
18 1,
19 0,
20 1,
21 max_value >> 2,
22 max_value >> 1,
23 max_value,
24 ]
25 self._test_bits_impl(BitStreamWriter.write_bits, BitStreamReader.read_bits, values, numbits)
27 def test_signed_bits(self):
28 for numbits in range(1, 65):
29 min_value = -1 << (numbits - 1)
30 max_value = (1 << (numbits - 1)) - 1
31 values = [
32 min_value,
33 max_value,
34 min_value >> 1,
35 max_value >> 1,
36 min_value >> 2,
37 max_value >> 2,
38 -1,
39 (1 if numbits != 1 else -1),
40 0,
41 (1 if numbits != 1 else -1),
42 -1,
43 max_value >> 2,
44 min_value >> 2,
45 max_value >> 1,
46 min_value >> 1,
47 max_value,
48 min_value,
49 ]
50 self._test_bits_impl(
51 BitStreamWriter.write_signed_bits,
52 BitStreamReader.read_signed_bits,
53 values,
54 numbits,
55 )
57 def test_varint16(self):
58 values = [
59 # 1 byte
60 0,
61 -1,
62 +1,
63 -((1 << (6)) - 1),
64 +((1 << (6)) - 1),
65 # 2 bytes
66 -((1 << (6))),
67 +((1 << (6))),
68 -((1 << (6 + 8)) - 1),
69 +((1 << (6 + 8)) - 1),
70 ]
72 self._test_impl(BitStreamWriter.write_varint16, BitStreamReader.read_varint16, values, 15)
74 def test_varint32(self):
75 values = [
76 # 1 byte
77 0,
78 -((1)),
79 +((1)),
80 -((1 << (6)) - 1),
81 +((1 << (6)) - 1),
82 # 2 bytes
83 -((1 << (6))),
84 +((1 << (6))),
85 -((1 << (6 + 7)) - 1),
86 +((1 << (6 + 7)) - 1),
87 # 3 bytes
88 -((1 << (6 + 7))),
89 +((1 << (6 + 7))),
90 -((1 << (6 + 7 + 7)) - 1),
91 +((1 << (6 + 7 + 7)) - 1),
92 # 4 bytes
93 -((1 << (6 + 7 + 7))),
94 +((1 << (6 + 7 + 7))),
95 -((1 << (6 + 7 + 7 + 8)) - 1),
96 +((1 << (6 + 7 + 7 + 8)) - 1),
97 ]
99 self._test_impl(BitStreamWriter.write_varint32, BitStreamReader.read_varint32, values, 31)
101 def test_varint64(self):
102 values = [
103 # 1 byte
104 0,
105 -((1)),
106 +((1)),
107 -((1 << (6)) - 1),
108 +((1 << (6)) - 1),
109 # 2 bytes
110 -((1 << (6))),
111 +((1 << (6))),
112 -((1 << (6 + 7)) - 1),
113 +((1 << (6 + 7)) - 1),
114 # 3 bytes
115 -((1 << (6 + 7))),
116 +((1 << (6 + 7))),
117 -((1 << (6 + 7 + 7)) - 1),
118 +((1 << (6 + 7 + 7)) - 1),
119 # 4 bytes
120 -((1 << (6 + 7 + 7))),
121 +((1 << (6 + 7 + 7))),
122 -((1 << (6 + 7 + 7 + 8)) - 1),
123 +((1 << (6 + 7 + 7 + 8)) - 1)
124 # 5 bytes
125 - ((1 << (6 + 7 + 7 + 7))),
126 +((1 << (6 + 7 + 7 + 7))),
127 -((1 << (6 + 7 + 7 + 7 + 7)) - 1),
128 +((1 << (6 + 7 + 7 + 7 + 7)) - 1),
129 # 6 bytes
130 -((1 << (6 + 7 + 7 + 7 + 7))),
131 +((1 << (6 + 7 + 7 + 7 + 7))),
132 -((1 << (6 + 7 + 7 + 7 + 7 + 7)) - 1),
133 +((1 << (6 + 7 + 7 + 7 + 7 + 7)) - 1),
134 # 7 bytes
135 -((1 << (6 + 7 + 7 + 7 + 7 + 7))),
136 +((1 << (6 + 7 + 7 + 7 + 7 + 7))),
137 -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7)) - 1),
138 +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7)) - 1),
139 # 8 bytes
140 -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7))),
141 +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7))),
142 -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 8)) - 1),
143 +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 8)) - 1),
144 ]
146 self._test_impl(BitStreamWriter.write_varint64, BitStreamReader.read_varint64, values, 63)
148 def test_varint(self):
149 values = [
150 # 1 byte
151 0,
152 -((1)),
153 +((1)),
154 -((1 << (6)) - 1),
155 +((1 << (6)) - 1),
156 # 2 bytes
157 -((1 << (6))),
158 +((1 << (6))),
159 -((1 << (6 + 7)) - 1),
160 +((1 << (6 + 7)) - 1),
161 # 3 bytes
162 -((1 << (6 + 7))),
163 +((1 << (6 + 7))),
164 -((1 << (6 + 7 + 7)) - 1),
165 +((1 << (6 + 7 + 7)) - 1),
166 # 4 bytes
167 -((1 << (6 + 7 + 7))),
168 +((1 << (6 + 7 + 7))),
169 -((1 << (6 + 7 + 7 + 8)) - 1),
170 +((1 << (6 + 7 + 7 + 8)) - 1)
171 # 5 bytes
172 - ((1 << (6 + 7 + 7 + 7))),
173 +((1 << (6 + 7 + 7 + 7))),
174 -((1 << (6 + 7 + 7 + 7 + 7)) - 1),
175 +((1 << (6 + 7 + 7 + 7 + 7)) - 1),
176 # 6 bytes
177 -((1 << (6 + 7 + 7 + 7 + 7))),
178 +((1 << (6 + 7 + 7 + 7 + 7))),
179 -((1 << (6 + 7 + 7 + 7 + 7 + 7)) - 1),
180 +((1 << (6 + 7 + 7 + 7 + 7 + 7)) - 1),
181 # 7 bytes
182 -((1 << (6 + 7 + 7 + 7 + 7 + 7))),
183 +((1 << (6 + 7 + 7 + 7 + 7 + 7))),
184 -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7)) - 1),
185 +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7)) - 1),
186 # 8 bytes
187 -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7))),
188 +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7))),
189 -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 7)) - 1),
190 +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 7)) - 1),
191 # 9 bytes
192 -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 7))),
193 +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 7))),
194 -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 7 + 8)) - 1),
195 +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 7 + 8)) - 1),
196 # 1 byte
197 INT64_MIN, # special case, stored as -0
198 ]
200 self._test_impl(BitStreamWriter.write_varint, BitStreamReader.read_varint, values, 71)
202 def test_varuint16(self):
203 values = [
204 # 1 byte
205 0,
206 1,
207 ((1 << (7)) - 1),
208 # 2 bytes
209 ((1 << (7))),
210 ((1 << (7 + 8)) - 1),
211 ]
213 self._test_impl(BitStreamWriter.write_varuint16, BitStreamReader.read_varuint16, values, 15)
215 def test_varuint32(self):
216 values = [
217 # 1 byte
218 ((0)),
219 ((1)),
220 ((1 << (7)) - 1),
221 # 2 bytes
222 ((1 << (7))),
223 ((1 << (7 + 7)) - 1),
224 # 3 bytes
225 ((1 << (7 + 7))),
226 ((1 << (7 + 7 + 7)) - 1),
227 # 4 bytes
228 ((1 << (7 + 7 + 7))),
229 ((1 << (7 + 7 + 7 + 8)) - 1),
230 ]
232 self._test_impl(BitStreamWriter.write_varuint32, BitStreamReader.read_varuint32, values, 31)
234 def test_varuint64(self):
235 values = [
236 # 1 byte
237 ((0)),
238 ((1)),
239 ((1 << (7)) - 1),
240 # 2 bytes
241 ((1 << (7))),
242 ((1 << (7 + 7)) - 1),
243 # 3 bytes
244 ((1 << (7 + 7))),
245 ((1 << (7 + 7 + 7)) - 1),
246 # 4 bytes
247 ((1 << (7 + 7 + 7))),
248 ((1 << (7 + 7 + 7 + 8)) - 1),
249 # 5 bytes
250 ((1 << (7 + 7 + 7 + 7))),
251 ((1 << (7 + 7 + 7 + 7 + 7)) - 1),
252 # 6 bytes
253 ((1 << (7 + 7 + 7 + 7 + 7))),
254 ((1 << (7 + 7 + 7 + 7 + 7 + 7)) - 1),
255 # 7 bytes
256 ((1 << (7 + 7 + 7 + 7 + 7 + 7))),
257 ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7)) - 1),
258 # 8 bytes
259 ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7))),
260 ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7 + 8)) - 1),
261 ]
263 self._test_impl(BitStreamWriter.write_varuint64, BitStreamReader.read_varuint64, values, 63)
265 def test_varuint(self):
266 values = [
267 # 1 byte
268 ((0)),
269 ((1)),
270 ((1 << (7)) - 1),
271 # 2 bytes
272 ((1 << (7))),
273 ((1 << (7 + 7)) - 1),
274 # 3 bytes
275 ((1 << (7 + 7))),
276 ((1 << (7 + 7 + 7)) - 1),
277 # 4 bytes
278 ((1 << (7 + 7 + 7))),
279 ((1 << (7 + 7 + 7 + 8)) - 1),
280 # 5 bytes
281 ((1 << (7 + 7 + 7 + 7))),
282 ((1 << (7 + 7 + 7 + 7 + 7)) - 1),
283 # 6 bytes
284 ((1 << (7 + 7 + 7 + 7 + 7))),
285 ((1 << (7 + 7 + 7 + 7 + 7 + 7)) - 1),
286 # 7 bytes
287 ((1 << (7 + 7 + 7 + 7 + 7 + 7))),
288 ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7)) - 1),
289 # 8 bytes
290 ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7))),
291 ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7 + 8)) - 1),
292 # 9 bytes
293 ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7 + 7))),
294 ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7 + 7 + 8)) - 1),
295 ]
297 self._test_impl(BitStreamWriter.write_varuint, BitStreamReader.read_varuint, values, 71)
299 def test_varsize(self):
300 values = [
301 # 1 byte
302 ((0)),
303 ((1)),
304 ((1 << (7)) - 1),
305 # 2 bytes
306 ((1 << (7))),
307 ((1 << (7 + 7)) - 1),
308 # 3 bytes
309 ((1 << (7 + 7))),
310 ((1 << (7 + 7 + 7)) - 1),
311 # 4 bytes
312 ((1 << (7 + 7 + 7))),
313 ((1 << (7 + 7 + 7 + 7)) - 1),
314 # 5 bytes
315 ((1 << (7 + 7 + 7 + 7))),
316 ((1 << (2 + 7 + 7 + 7 + 8)) - 1),
317 ]
319 self._test_impl(BitStreamWriter.write_varsize, BitStreamReader.read_varsize, values, 39)
321 def test_float16(self):
322 values = [-42.5, -2.0, 0.0, 0.6171875, 0.875, 2.0, 9.875, 42.5]
324 self._test_impl(BitStreamWriter.write_float16, BitStreamReader.read_float16, values, 15)
326 def test_float32(self):
327 values = [-42.5, -2.0, 0.0, 0.6171875, 0.875, 2.0, 9.875, 42.5]
329 self._test_impl(BitStreamWriter.write_float32, BitStreamReader.read_float32, values, 31)
331 def test_float64(self):
332 values = [-42.5, -2.0, 0.0, 0.6171875, 0.875, 2.0, 9.875, 42.5]
334 self._test_impl(BitStreamWriter.write_float64, BitStreamReader.read_float64, values, 63)
336 def test_string(self):
337 values = [
338 "Hello World",
339 "\n\t%^@(*aAzZ01234569$%^!?<>[]](){}-=+~:;/|\\\"'Hello World2\0nonWrittenPart",
340 b"Price: \xE2\x82\xAC 3 what's this? -> \xC2\xA2".decode("utf-8"),
341 ]
343 self._test_impl(BitStreamWriter.write_string, BitStreamReader.read_string, values, 7)
345 def test_bool(self):
346 values = [
347 False,
348 True,
349 True,
350 False,
351 False,
352 True,
353 False,
354 True,
355 False,
356 False,
357 True,
358 True,
359 False,
360 ]
362 self._test_impl(BitStreamWriter.write_bool, BitStreamReader.read_bool, values, 1)
364 def test_bitbuffer(self):
365 values = [
366 BitBuffer(bytes([0xAB, 0x07]), 11),
367 BitBuffer(bytes([0xAB, 0xCD, 0x7F]), 23),
368 ]
370 self._test_impl(BitStreamWriter.write_bitbuffer, BitStreamReader.read_bitbuffer, values, 7)
372 def test_bytes(self):
373 values = [bytearray([0, 255]), bytearray([1, 127, 128, 254])]
375 self._test_impl(BitStreamWriter.write_bytes, BitStreamReader.read_bytes, values, 7)
377 def test_bitposition(self):
378 writer = BitStreamWriter()
379 writer.write_bits(0xAAAA, 16)
380 self.assertEqual(16, writer.bitposition)
381 writer.write_bits(0xFF, 8)
382 self.assertEqual(24, writer.bitposition)
384 reader = BitStreamReader(buffer=writer.byte_array)
385 self.assertEqual(0xAAAA, reader.read_bits(16))
386 self.assertEqual(16, reader.bitposition)
387 reader.bitposition = 8
388 self.assertEqual(8, reader.bitposition)
389 self.assertEqual(0xAAFF, reader.read_bits(16))
390 reader.bitposition = 13
391 self.assertEqual(13, reader.bitposition)
392 self.assertEqual(0x02, reader.read_bits(3))
393 self.assertEqual(16, reader.bitposition)
394 self.assertEqual(0xFF, reader.read_bits(8))
395 self.assertEqual(24, reader.bitposition)
396 reader.bitposition = 0
397 self.assertEqual(0, reader.bitposition)
398 self.assertEqual(0xAAAAFF, reader.read_bits(24))
400 def test_alignto(self):
401 writer = BitStreamWriter()
402 writer.write_bits(5, 3)
403 writer.alignto(8)
404 self.assertEqual(8, writer.bitposition)
405 writer.write_bits(0, 1)
406 writer.alignto(16)
407 self.assertEqual(16, writer.bitposition)
408 writer.write_bits(0xAA, 9)
409 writer.alignto(32)
410 self.assertEqual(32, writer.bitposition)
411 writer.write_bits(0xACA, 13)
412 writer.alignto(64)
413 self.assertEqual(64, writer.bitposition)
414 writer.write_bits(0xCAFE, 16)
416 reader = BitStreamReader(buffer=writer.byte_array)
417 self.assertEqual(5, reader.read_bits(3))
418 reader.alignto(8)
419 self.assertEqual(8, reader.bitposition)
420 self.assertEqual(0, reader.read_bits(1))
421 reader.alignto(16)
422 self.assertEqual(16, reader.bitposition)
423 self.assertEqual(0xAA, reader.read_bits(9))
424 reader.alignto(32)
425 self.assertEqual(32, reader.bitposition)
426 self.assertEqual(0xACA, reader.read_bits(13))
427 reader.alignto(64)
428 self.assertEqual(64, reader.bitposition)
429 self.assertEqual(0xCAFE, reader.read_bits(16))
431 def test_file(self):
432 test_filename = "BitStreamTest.bin"
433 writer = BitStreamWriter()
434 writer.write_bits(13, 7)
435 writer.write_string(test_filename)
436 writer.write_varint(-123456)
437 writer.to_file(test_filename)
439 reader = BitStreamReader.from_file(test_filename)
440 self.assertEqual(13, reader.read_bits(7))
441 self.assertEqual(test_filename, reader.read_string())
442 self.assertEqual(-123456, reader.read_varint())
444 def _test_bits_impl(self, write_method, read_method, values, numbits):
445 for bit_pos in range(numbits):
446 writer = BitStreamWriter()
447 if bit_pos > 0:
448 writer.write_bits(0, bit_pos)
449 for value in values:
450 write_method(writer, value, numbits)
452 reader = BitStreamReader(buffer=writer.byte_array)
453 if bit_pos > 0:
454 reader.read_bits(bit_pos)
455 for value in values:
456 self.assertEqual(
457 value,
458 read_method(reader, numbits),
459 f"[numbits={numbits}, bit_pos={bit_pos}]",
460 )
462 def _test_impl(self, write_method, read_method, values, max_start_bit_pos):
463 for bit_pos in range(max_start_bit_pos):
464 writer = BitStreamWriter()
465 if bit_pos > 64:
466 writer.write_bits(0, 64)
467 writer.write_bits(0, bit_pos - 64)
468 elif bit_pos > 0:
469 writer.write_bits(0, bit_pos)
470 for value in values:
471 write_method(writer, value)
473 reader = BitStreamReader(buffer=writer.byte_array)
474 if bit_pos > 64:
475 reader.read_bits(64)
476 reader.read_bits(bit_pos - 64)
477 elif bit_pos > 0:
478 reader.read_bits(bit_pos)
479 for value in values:
480 self.assertEqual(value, read_method(reader), f"[bit_pos={bit_pos}]")