Coverage for /home/runner/work/zserio/zserio/compiler/extensions/python/runtime/src/zserio/bitwriter.py: 100%

142 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-12-05 10:43 +0000

1""" 

2The module implements abstraction for writing data to the bit stream. 

3""" 

4 

5from zserio.bitbuffer import BitBuffer 

6from zserio.bitsizeof import ( 

7 bitsizeof_varint16, 

8 bitsizeof_varint32, 

9 bitsizeof_varint64, 

10 bitsizeof_varint, 

11 bitsizeof_varuint16, 

12 bitsizeof_varuint32, 

13 bitsizeof_varuint64, 

14 bitsizeof_varuint, 

15 bitsizeof_varsize, 

16) 

17from zserio.exception import PythonRuntimeException 

18from zserio.float import float_to_uint16, float_to_uint32, float_to_uint64 

19from zserio.limits import INT64_MIN 

20from zserio.cppbind import import_cpp_class 

21 

22 

class BitStreamWriter:
    """
    Bit stream writer using bytearray.

    Bits are appended most significant bit first. The writer keeps the bytes written so far and the
    number of bits that are actually occupied in them.
    """

    def __init__(self) -> None:
        """
        Constructor. Creates an empty writer positioned at bit 0.
        """

        self._byte_array: bytearray = bytearray()
        self._bitposition: int = 0

    def write_bits(self, value: int, numbits: int) -> None:
        """
        Writes the given value with the given number of bits to the underlying storage.

        :param value: Value to write.
        :param numbits: Number of bits to write.
        :raises PythonRuntimeException: If the value is out of the range or if the number of bits is invalid.
        """

        if numbits < 1 or numbits > 64:
            raise PythonRuntimeException(f"BitStreamWriter: numbits '{numbits}' not in range [1,64]!")

        self.write_bits_unchecked(value, numbits)

    def write_signed_bits(self, value: int, numbits: int) -> None:
        """
        Writes the given signed value with the given number of bits to the underlying storage.
        Provided for convenience.

        :param value: Signed value to write.
        :param numbits: Number of bits to write.
        :raises PythonRuntimeException: If the value is out of the range or if the number of bits is invalid.
        """

        if numbits < 1 or numbits > 64:
            raise PythonRuntimeException(f"BitStreamWriter: numbits '{numbits}' not in range [1,64]!")

        self.write_signed_bits_unchecked(value, numbits)

    def write_bits_unchecked(self, value: int, numbits: int) -> None:
        """
        Writes the given value with the given number of bits to the underlying storage.

        This method does not check that numbits > 0 and assumes that it's ensured by the caller.
        Only the value itself is validated against the range implied by numbits.

        :param value: Value to write.
        :param numbits: Number of bits to write.
        :raises PythonRuntimeException: If the value is out of the range [0, 2^numbits - 1].
        """

        min_value = 0
        max_value = (1 << numbits) - 1
        if value < min_value or value > max_value:
            raise PythonRuntimeException(
                f"BitStreamWriter: Value '{value}' is out of the range " f"[{min_value},{max_value}]!"
            )

        self._write_bits(value, numbits, signed=False)

    def write_signed_bits_unchecked(self, value: int, numbits: int) -> None:
        """
        Writes the given signed value with the given number of bits to the underlying storage.
        Provided for convenience.

        This method does not check that numbits > 0 and assumes that it's ensured by the caller.
        Only the value itself is validated against the range implied by numbits.

        :param value: Signed value to write.
        :param numbits: Number of bits to write.
        :raises PythonRuntimeException: If the value is out of the range
            [-2^(numbits - 1), 2^(numbits - 1) - 1].
        """

        min_value = -(1 << (numbits - 1))
        max_value = (1 << (numbits - 1)) - 1
        if value < min_value or value > max_value:
            raise PythonRuntimeException(
                f"BitStreamWriter: Value '{value}' is out of the range " f"[{min_value},{max_value}]!"
            )

        self._write_bits(value, numbits, signed=True)

    def write_varint16(self, value: int) -> None:
        """
        Writes a variable 16-bit signed integer value (up to 2 bytes) to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 2, bitsizeof_varint16(value) // 8, is_signed=True)

    def write_varint32(self, value: int) -> None:
        """
        Writes a variable 32-bit signed integer value (up to 4 bytes) to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 4, bitsizeof_varint32(value) // 8, is_signed=True)

    def write_varint64(self, value: int) -> None:
        """
        Writes a variable 64-bit signed integer value (up to 8 bytes) to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 8, bitsizeof_varint64(value) // 8, is_signed=True)

    def write_varint(self, value: int) -> None:
        """
        Writes a variable signed integer value (up to 9 bytes) to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        if value == INT64_MIN:
            self._write_bits(0x80, 8)  # INT64_MIN is stored as -0
        else:
            self._write_varnum(value, 9, bitsizeof_varint(value) // 8, is_signed=True)

    def write_varuint16(self, value: int) -> None:
        """
        Writes a variable 16-bit unsigned integer value (up to 2 bytes) to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 2, bitsizeof_varuint16(value) // 8, is_signed=False)

    def write_varuint32(self, value: int) -> None:
        """
        Writes a variable 32-bit unsigned integer value (up to 4 bytes) to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 4, bitsizeof_varuint32(value) // 8, is_signed=False)

    def write_varuint64(self, value: int) -> None:
        """
        Writes a variable 64-bit unsigned integer value (up to 8 bytes) to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 8, bitsizeof_varuint64(value) // 8, is_signed=False)

    def write_varuint(self, value: int) -> None:
        """
        Writes a variable unsigned integer value (up to 9 bytes) to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 9, bitsizeof_varuint(value) // 8, is_signed=False)

    def write_varsize(self, value: int) -> None:
        """
        Writes a variable size integer value (up to 5 bytes) to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 5, bitsizeof_varsize(value) // 8, is_signed=False)

    def write_float16(self, value: float) -> None:
        """
        Writes a 16-bit float value to the underlying storage according to IEEE 754 binary16.

        :param value: Float value to write.
        """

        self.write_bits_unchecked(float_to_uint16(value), 16)

    def write_float32(self, value: float) -> None:
        """
        Writes a 32-bit float value to the underlying storage according to IEEE 754 binary32.

        :param value: Float value to write.
        """

        self.write_bits_unchecked(float_to_uint32(value), 32)

    def write_float64(self, value: float) -> None:
        """
        Writes a 64-bit float value to the underlying storage according to IEEE 754 binary64.

        :param value: Float value to write.
        """

        self.write_bits_unchecked(float_to_uint64(value), 64)

    def write_bytes(self, value: bytearray) -> None:
        """
        Writes the given bytes to the underlying storage. Length of the bytes is written
        as varsize at the beginning.

        :param value: Bytes to write.
        :raises PythonRuntimeException: If the length cannot be written as varsize.
        """

        length = len(value)
        self.write_varsize(length)
        self._write_raw_bytes(value, length)

    def write_string(self, string: str) -> None:
        """
        Writes the given string to the underlying storage in UTF-8 encoding. Length of the encoded string
        in bytes is written as varsize at the beginning.

        :param string: String to write.
        :raises PythonRuntimeException: If the length cannot be written as varsize.
        """

        string_bytes = string.encode("utf-8")
        length = len(string_bytes)
        self.write_varsize(length)
        self._write_raw_bytes(string_bytes, length)

    def write_bool(self, value: bool) -> None:
        """
        Writes bool in a single bit.

        :param value: Bool value to write.
        """

        self._write_bits(1 if value else 0, 1)

    def write_bitbuffer(self, bitbuffer: BitBuffer) -> None:
        """
        Writes a bit buffer to the underlying storage. Length of the bit buffer in bits is written
        as varsize at the beginning.

        :param bitbuffer: Bit buffer to write.
        :raises PythonRuntimeException: If the bit size cannot be written as varsize.
        """

        bitsize = bitbuffer.bitsize
        self.write_varsize(bitsize)

        write_buffer = bitbuffer.buffer
        num_bytes_to_write, num_rest_bits = divmod(bitsize, 8)
        self._write_raw_bytes(write_buffer, num_bytes_to_write)

        if num_rest_bits > 0:
            # the remaining bits are taken from the most significant end of the trailing byte
            self.write_bits_unchecked(write_buffer[num_bytes_to_write] >> (8 - num_rest_bits), num_rest_bits)

    @property
    def byte_array(self) -> bytes:
        """
        Gets internal bytearray.

        The internal object itself is returned (not a copy), so it reflects later writes.

        :returns: Underlying bytearray object.
        """

        return self._byte_array

    def to_file(self, filename: str) -> None:
        """
        Writes underlying bytearray to binary file.

        :param filename: File to write.
        """

        with open(filename, "wb") as file:
            file.write(self._byte_array)

    @property
    def bitposition(self) -> int:
        """
        Gets current bit position.

        :returns: Current bit position.
        """

        return self._bitposition

    def alignto(self, alignment: int) -> None:
        """
        Aligns the bit position according to the aligning value. The skipped bits are written as zeros.

        :param alignment: An aligning value to use. Expected to be positive; zero makes the modulo
            operation raise ZeroDivisionError.
        """

        offset = self._bitposition % alignment
        if offset != 0:
            self._write_bits(0, alignment - offset)

    def _write_raw_bytes(self, data: bytes, num_bytes: int) -> None:
        """
        Writes the first num_bytes bytes of data at the current bit position (no length prefix).

        :param data: Indexable and sliceable bytes-like source.
        :param num_bytes: Number of leading bytes of data to write.
        """

        if (self._bitposition & 0x07) != 0:
            # we are not aligned to byte, every byte has to be shifted into place
            for i in range(num_bytes):
                self.write_bits_unchecked(data[i], 8)
        else:
            # we are aligned to byte, the bytes can be appended as they are
            self._bitposition += num_bytes * 8
            self._byte_array += data[0:num_bytes]

    def _write_bits(self, value: int, numbits: int, *, signed: bool = False) -> None:
        """
        Appends numbits bits of value at the current bit position without any range checking.

        :param value: Value to write.
        :param numbits: Number of bits to write.
        :param signed: Whether the value is serialized as a signed (two's complement) number.
        """

        # bits already occupied in the last buffer byte and bits still free in it (0 when byte aligned)
        buffer_last_byte_bits = self._bitposition % 8
        buffer_free_bits = (8 - buffer_last_byte_bits) if buffer_last_byte_bits != 0 else 0
        # number of value bits which fall into the most significant byte of the value (1..8)
        value_first_byte_bits = numbits % 8 or 8
        # shift the value left so that, once serialized as big endian bytes, its first bit lines up with
        # the first free bit of the last buffer byte (or with the top bit of a new byte when aligned)
        if value_first_byte_bits <= buffer_free_bits:
            left_shift = buffer_free_bits - value_first_byte_bits
        else:
            left_shift = buffer_free_bits + 8 - value_first_byte_bits
        value <<= left_shift
        num_bytes = (numbits + left_shift + 7) // 8
        value_bytes = value.to_bytes(num_bytes, byteorder="big", signed=signed)
        if buffer_free_bits == 0:
            self._byte_array.extend(value_bytes)
        else:
            # keep only the bits which fit into the free part of the last byte, this also strips
            # the sign extension of negative values
            value_first_byte = value_bytes[0] & ((1 << buffer_free_bits) - 1)
            self._byte_array[-1] |= value_first_byte
            self._byte_array.extend(value_bytes[1:])

        self._bitposition += numbits

    def _write_varnum(self, value: int, max_var_bytes: int, num_var_bytes: int, *, is_signed: bool) -> None:
        """
        Writes a variable length integer as num_var_bytes bytes.

        :param value: Value to write.
        :param max_var_bytes: Maximum number of bytes of the particular variable integer type.
        :param num_var_bytes: Number of bytes to use for the given value.
        :param is_signed: Whether the first byte carries a sign bit.
        """

        abs_value = abs(value)
        has_max_byte_range = num_var_bytes == max_var_bytes
        for i in range(num_var_bytes):
            byte = 0x00
            numbits = 8  # number of value bits carried by this byte, reduced by the flag bits below
            has_next_byte = i < num_var_bytes - 1
            has_sign_bit = is_signed and i == 0
            if has_sign_bit:
                if value < 0:
                    byte |= 0x80
                numbits -= 1
            if has_next_byte:
                numbits -= 1
                byte |= 1 << numbits  # use bit 6 if signed bit is present, use bit 7 otherwise
            else:  # this is the last byte
                if not has_max_byte_range:  # next byte flag isn't used in last byte in case of max byte range
                    numbits -= 1

            # every following byte carries 7 bits, except for the last byte of the maximum length
            # encoding which carries 8 bits (hence the one extra bit of shift)
            shift_bits = (num_var_bytes - (i + 1)) * 7 + (1 if has_max_byte_range and has_next_byte else 0)
            byte |= (abs_value >> shift_bits) & VAR_NUM_BIT_MASKS[numbits - 1]
            self.write_bits_unchecked(byte, 8)

388 

389 

# Masks selecting the n lowest bits of a byte, stored at index n - 1 (n = 1..8).
# BitStreamWriter._write_varnum indexes this table by the number of value bits carried by a byte.
VAR_NUM_BIT_MASKS = [0x01, 0x03, 0x07, 0x0F, 0x1F, 0x3F, 0x7F, 0xFF]

391 

# Rebind the public name: use whatever import_cpp_class("BitStreamWriter") returns when it is truthy,
# otherwise keep the pure Python class defined above.
# NOTE(review): presumably this swaps in an optional C++ implementation - confirm in zserio.cppbind.
BitStreamWriter = import_cpp_class("BitStreamWriter") or BitStreamWriter  # type: ignore