Coverage for /home/runner/work/zserio/zserio/compiler/extensions/python/runtime/src/zserio/json.py: 100%

727 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-12-05 10:43 +0000

1""" 

2The module implements WalkObserver for writing of zserio objects to JSON format. 

3""" 

4 

5import enum 

6import io 

7import json 

8import typing 

9 

10from zserio.bitbuffer import BitBuffer 

11from zserio.creator import ZserioTreeCreator 

12from zserio.exception import PythonRuntimeException 

13from zserio.typeinfo import TypeInfo, RecursiveTypeInfo, TypeAttribute, MemberInfo 

14from zserio.walker import WalkObserver 

15 

16 

17class JsonEnumerableFormat(enum.Enum): 

18 """ 

19 Configuration for writing of enumerable types. 

20 """ 

21 

22 #: Print as JSON integral value. 

23 NUMBER = enum.auto() 

24 #: Print as JSON string according to the following rules: 

25 #: 

26 #: #. Enums 

27 #: 

28 #: * when an exact match with an enumerable item is found, the item name is used - e.g. "FIRST", 

29 #: * when no exact match is found, it's an invalid value, the integral value is converted to string 

30 #: and an appropriate comment is included - e.g. "10 /\* no match \*/". 

31 #: 

32 #: #. Bitmasks 

33 #: 

34 #: * when an exact mach with or-ed bitmask values is found, it's used - e.g. "READ | WRITE", 

35 #: * when no exact match is found, but some or-ed values match, the integral value is converted 

36 #: to string and the or-ed values are included in a comment - e.g. "127 /\* READ | CREATE \*/", 

37 #: * when no match is found at all, the integral value is converted to string and an appropriate 

38 #: comment is included - e.g. "13 /\* no match \*/". 

39 STRING = enum.auto() 

40 

41 

42class JsonWriter(WalkObserver): 

43 """ 

44 Walker observer which dumps zserio objects to JSON format. 

45 """ 

46 

47 def __init__( 

48 self, 

49 *, 

50 text_io: typing.Optional[typing.TextIO] = None, 

51 enumerable_format: JsonEnumerableFormat = JsonEnumerableFormat.STRING, 

52 item_separator: typing.Optional[str] = None, 

53 key_separator: typing.Optional[str] = None, 

54 indent: typing.Union[str, int] = None, 

55 ) -> None: 

56 """ 

57 Constructor. 

58 

59 :param text_io: Optional text stream for JSON output, io.StringIO is used by default. 

60 :param item_separator: Optional item separator, default is ', ' if indent is None, ',' otherwise. 

61 :param key_separator: Optional key separator, default is ': '. 

62 :param enumerable_format: Optional enumerable format to use, default is JsonEnumerableFormat.STRING. 

63 :param indent: String or (non-negative) integer defining the indent. If not None, newlines are inserted. 

64 """ 

65 

66 self._io: typing.TextIO = text_io if text_io else io.StringIO() 

67 self._item_separator: str = item_separator if item_separator else ("," if indent is not None else ", ") 

68 self._key_separator: str = key_separator if key_separator else ": " 

69 self._enumerable_format = enumerable_format 

70 

71 self._indent: typing.Optional[str] = ( 

72 (indent if isinstance(indent, str) else " " * indent) if indent is not None else None 

73 ) 

74 

75 self._is_first = True 

76 self._level = 0 

77 self._json_encoder = JsonEncoder() 

78 

79 def get_io(self) -> typing.TextIO: 

80 """ 

81 Gets the underlying text stream. 

82 

83 :returns: Underlying text steam. 

84 """ 

85 

86 return self._io 

87 

88 def begin_root(self, _compound: typing.Any) -> None: 

89 self._begin_object() 

90 

91 def end_root(self, _compound: typing.Any) -> None: 

92 self._end_object() 

93 

94 def begin_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> None: 

95 self._begin_item() 

96 

97 self._write_key(member_info.schema_name) 

98 

99 self._begin_array() 

100 

101 def end_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> None: 

102 self._end_array() 

103 

104 self._end_item() 

105 

106 def begin_compound( 

107 self, 

108 compound: typing.Any, 

109 member_info: MemberInfo, 

110 element_index: typing.Optional[int] = None, 

111 ) -> None: 

112 self._begin_item() 

113 

114 if element_index is None: 

115 self._write_key(member_info.schema_name) 

116 

117 self._begin_object() 

118 

119 def end_compound( 

120 self, 

121 compound: typing.Any, 

122 member_info: MemberInfo, 

123 _element_index: typing.Optional[int] = None, 

124 ) -> None: 

125 self._end_object() 

126 

127 self._end_item() 

128 

129 def visit_value( 

130 self, 

131 value: typing.Any, 

132 member_info: MemberInfo, 

133 element_index: typing.Optional[int] = None, 

134 ) -> None: 

135 self._begin_item() 

136 

137 if element_index is None: 

138 self._write_key(member_info.schema_name) 

139 

140 self._write_value(value, member_info) 

141 

142 self._end_item() 

143 

144 def _begin_item(self): 

145 if not self._is_first: 

146 self._io.write(self._item_separator) 

147 

148 if self._indent is not None: 

149 self._io.write("\n") 

150 if self._indent: 

151 self._io.write(self._indent * self._level) 

152 

153 def _end_item(self): 

154 self._is_first = False 

155 

156 def _begin_object(self): 

157 self._io.write("{") 

158 

159 self._is_first = True 

160 self._level += 1 

161 

162 def _end_object(self): 

163 if self._indent is not None: 

164 self._io.write("\n") 

165 self._level -= 1 

166 if self._indent: 

167 self._io.write(self._indent * self._level) 

168 

169 self._io.write("}") 

170 

171 def _begin_array(self): 

172 self._io.write("[") 

173 

174 self._is_first = True 

175 self._level += 1 

176 

177 def _end_array(self): 

178 if self._indent is not None: 

179 self._io.write("\n") 

180 self._level -= 1 

181 if self._indent: 

182 self._io.write(self._indent * self._level) 

183 

184 self._io.write("]") 

185 

186 def _write_key(self, key: str) -> None: 

187 self._io.write(f"{self._json_encoder.encode_value(key)}{self._key_separator}") 

188 

189 def _write_value(self, value: typing.Any, member_info: MemberInfo) -> None: 

190 if value is None: 

191 self._io.write(self._json_encoder.encode_value(None)) 

192 return 

193 

194 type_info = member_info.type_info 

195 if type_info.schema_name == "extern": 

196 self._write_bitbuffer(value) 

197 elif type_info.schema_name == "bytes": 

198 self._write_bytes(value) 

199 else: 

200 if TypeAttribute.ENUM_ITEMS in type_info.attributes: 

201 if self._enumerable_format == JsonEnumerableFormat.STRING: 

202 self._write_stringified_enum(value, type_info) 

203 else: 

204 self._io.write(self._json_encoder.encode_value(value.value)) 

205 elif TypeAttribute.BITMASK_VALUES in type_info.attributes: 

206 if self._enumerable_format == JsonEnumerableFormat.STRING: 

207 self._write_stringified_bitmask(value, type_info) 

208 else: 

209 self._io.write(self._json_encoder.encode_value(value.value)) 

210 else: 

211 self._io.write(self._json_encoder.encode_value(value)) 

212 

213 def _write_bitbuffer(self, value: BitBuffer) -> None: 

214 self._begin_object() 

215 self._begin_item() 

216 self._write_key("buffer") 

217 self._begin_array() 

218 for byte in value.buffer: 

219 self._begin_item() 

220 self._io.write(self._json_encoder.encode_value(byte)) 

221 self._end_item() 

222 self._end_array() 

223 self._end_item() 

224 self._begin_item() 

225 self._write_key("bitSize") 

226 self._io.write(self._json_encoder.encode_value(value.bitsize)) 

227 self._end_item() 

228 self._end_object() 

229 

230 def _write_bytes(self, value: bytes) -> None: 

231 self._begin_object() 

232 self._begin_item() 

233 self._write_key("buffer") 

234 self._begin_array() 

235 for byte in value: 

236 self._begin_item() 

237 self._io.write(self._json_encoder.encode_value(byte)) 

238 self._end_item() 

239 self._end_array() 

240 self._end_item() 

241 self._end_object() 

242 

243 def _write_stringified_enum( 

244 self, value: typing.Any, type_info: typing.Union[TypeInfo, RecursiveTypeInfo] 

245 ) -> typing.Any: 

246 for item in type_info.attributes[TypeAttribute.ENUM_ITEMS]: 

247 if item.py_item == value: 

248 # exact match 

249 self._io.write(self._json_encoder.encode_value(item.schema_name)) 

250 return 

251 

252 # no match 

253 self._io.write(self._json_encoder.encode_value(str(value.value) + " /* no match */")) 

254 

255 def _write_stringified_bitmask( 

256 self, value: typing.Any, type_info: typing.Union[TypeInfo, RecursiveTypeInfo] 

257 ) -> typing.Any: 

258 string_value = "" 

259 bitmask_value = value.value 

260 value_check = 0 

261 

262 for item_info in type_info.attributes[TypeAttribute.BITMASK_VALUES]: 

263 is_zero = item_info.py_item.value == 0 

264 if (not is_zero and (bitmask_value & item_info.py_item.value == item_info.py_item.value)) or ( 

265 is_zero and bitmask_value == 0 

266 ): 

267 value_check |= item_info.py_item.value 

268 if string_value: 

269 string_value += " | " 

270 string_value += item_info.schema_name 

271 

272 if not string_value: 

273 # no match 

274 string_value += str(bitmask_value) + " /* no match */" 

275 elif bitmask_value != value_check: 

276 # partial match 

277 string_value = str(bitmask_value) + " /* partial match: " + string_value + " */" 

278 # else exact match 

279 

280 self._io.write(self._json_encoder.encode_value(string_value)) 

281 

282 

283class JsonEncoder: 

284 """ 

285 Converts zserio values to Json string representation. 

286 """ 

287 

288 def __init__(self) -> None: 

289 """ 

290 Constructor. 

291 """ 

292 

293 self._encoder = json.JSONEncoder(ensure_ascii=False) 

294 

295 def encode_value(self, value: typing.Any) -> str: 

296 """ 

297 Encodes value to JSON string representation. 

298 

299 :param value: Value to encode. 

300 

301 :returns: Value encoded to string as a valid JSON value. 

302 """ 

303 

304 return self._encoder.encode(value) 

305 

306 

307class JsonToken(enum.Enum): 

308 """ 

309 Tokens used by Json Tokenizer. 

310 """ 

311 

312 BEGIN_OF_FILE = enum.auto() 

313 END_OF_FILE = enum.auto() 

314 BEGIN_OBJECT = enum.auto() 

315 END_OBJECT = enum.auto() 

316 BEGIN_ARRAY = enum.auto() 

317 END_ARRAY = enum.auto() 

318 KEY_SEPARATOR = enum.auto() 

319 ITEM_SEPARATOR = enum.auto() 

320 VALUE = enum.auto() 

321 

322 

323class JsonParserException(PythonRuntimeException): 

324 """ 

325 Exception used to distinguish exceptions from the JsonParser. 

326 """ 

327 

328 

329class JsonParser: 

330 """ 

331 Json Parser. 

332 

333 Parses the JSON on the fly and calls an observer. 

334 """ 

335 

336 class Observer: 

337 """ 

338 Json parser observer. 

339 """ 

340 

341 def begin_object(self) -> None: 

342 """ 

343 Called when a JSON object begins - i.e. on '{'. 

344 """ 

345 

346 raise NotImplementedError() 

347 

348 def end_object(self) -> None: 

349 """ 

350 Called when a JSON object ends - i.e. on '}'. 

351 """ 

352 

353 raise NotImplementedError() 

354 

355 def begin_array(self) -> None: 

356 """ 

357 Called when a JSON array begins - i.e. on '['. 

358 """ 

359 

360 raise NotImplementedError() 

361 

362 def end_array(self) -> None: 

363 """ 

364 Called when a JSON array ends - i.e. on ']'. 

365 """ 

366 

367 raise NotImplementedError() 

368 

369 def visit_key(self, key: str) -> None: 

370 """ 

371 Called on a JSON key. 

372 

373 :param key: Key value. 

374 """ 

375 

376 raise NotImplementedError() 

377 

378 def visit_value(self, value: typing.Any) -> None: 

379 """ 

380 Called on a JSON value. 

381 

382 :param value: JSON value. 

383 """ 

384 

385 raise NotImplementedError() 

386 

387 def __init__(self, text_io: typing.TextIO, observer: Observer) -> None: 

388 """ 

389 Constructor. 

390 

391 :param text_io: Text stream to parse. 

392 :param observer: Observer to use. 

393 """ 

394 

395 self._tokenizer = JsonTokenizer(text_io) 

396 self._observer = observer 

397 

398 def parse(self) -> bool: 

399 """ 

400 Parses single JSON element from the text stream. 

401 

402 :returns: True when end-of-file is reached, False otherwise (i.e. another JSON element is present). 

403 :raises JsonParserException: When parsing fails. 

404 """ 

405 

406 if self._tokenizer.get_token() == JsonToken.BEGIN_OF_FILE: 

407 self._tokenizer.next() 

408 

409 if self._tokenizer.get_token() == JsonToken.END_OF_FILE: 

410 return True 

411 

412 self._parse_element() 

413 

414 return self._tokenizer.get_token() == JsonToken.END_OF_FILE 

415 

416 def get_line(self) -> int: 

417 """ 

418 Gets current line number. 

419 

420 :returns: Line number. 

421 """ 

422 

423 return self._tokenizer.get_line() 

424 

425 def get_column(self) -> int: 

426 """ 

427 Gets current column number. 

428 

429 :returns: Column number. 

430 """ 

431 

432 return self._tokenizer.get_column() 

433 

434 def _parse_element(self) -> None: 

435 token = self._tokenizer.get_token() 

436 if token == JsonToken.BEGIN_ARRAY: 

437 self._parse_array() 

438 elif token == JsonToken.BEGIN_OBJECT: 

439 self._parse_object() 

440 elif token == JsonToken.VALUE: 

441 self._parse_value() 

442 else: 

443 self._raise_unexpected_token(JsonParser.ELEMENT_TOKENS) 

444 

445 def _parse_array(self) -> None: 

446 self._observer.begin_array() 

447 token = self._tokenizer.next() 

448 

449 if token in JsonParser.ELEMENT_TOKENS: 

450 self._parse_elements() 

451 

452 self._consume_token(JsonToken.END_ARRAY) 

453 self._observer.end_array() 

454 

455 def _parse_elements(self) -> None: 

456 self._parse_element() 

457 while self._tokenizer.get_token() == JsonToken.ITEM_SEPARATOR: 

458 self._tokenizer.next() 

459 self._parse_element() 

460 

461 def _parse_object(self) -> None: 

462 self._observer.begin_object() 

463 token = self._tokenizer.next() 

464 if token == JsonToken.VALUE: 

465 self._parse_members() 

466 

467 self._consume_token(JsonToken.END_OBJECT) 

468 self._observer.end_object() 

469 

470 def _parse_members(self) -> None: 

471 self._parse_member() 

472 while self._tokenizer.get_token() == JsonToken.ITEM_SEPARATOR: 

473 self._tokenizer.next() 

474 self._parse_member() 

475 

476 def _parse_member(self) -> None: 

477 self._check_token(JsonToken.VALUE) 

478 key = self._tokenizer.get_value() 

479 if not isinstance(key, str): 

480 raise JsonParserException( 

481 f"JsonParser:{self.get_line()}:{self.get_column()}: " f"Key must be a string value!" 

482 ) 

483 

484 self._observer.visit_key(key) 

485 self._tokenizer.next() 

486 

487 self._consume_token(JsonToken.KEY_SEPARATOR) 

488 

489 self._parse_element() 

490 

491 def _parse_value(self) -> None: 

492 self._observer.visit_value(self._tokenizer.get_value()) 

493 self._tokenizer.next() 

494 

495 def _consume_token(self, token: JsonToken) -> None: 

496 self._check_token(token) 

497 self._tokenizer.next() 

498 

499 def _check_token(self, token: JsonToken) -> None: 

500 if self._tokenizer.get_token() != token: 

501 self._raise_unexpected_token([token]) 

502 

503 def _raise_unexpected_token(self, expecting: typing.List[JsonToken]) -> None: 

504 msg = ( 

505 f"JsonParser:{self.get_line()}:{self.get_column()}: " 

506 f"Unexpected token: {self._tokenizer.get_token()}" 

507 ) 

508 if self._tokenizer.get_value() is not None: 

509 msg += f" ('{self._tokenizer.get_value()}')" 

510 if len(expecting) == 1: 

511 msg += f", expecting {expecting[0]}!" 

512 else: 

513 msg += ", expecting one of [" + ", ".join([str(token) for token in expecting]) + "]!" 

514 

515 raise JsonParserException(msg) 

516 

517 ELEMENT_TOKENS = [JsonToken.BEGIN_OBJECT, JsonToken.BEGIN_ARRAY, JsonToken.VALUE] 

518 

519 

520class JsonDecoder: 

521 """ 

522 JSON value decoder. 

523 """ 

524 

525 @staticmethod 

526 def decode_value(content: str, pos: int) -> "JsonDecoder.Result": 

527 """ 

528 Decodes the JSON value from the string. 

529 

530 :param content: String which contains encoded JSON value. 

531 :param pos: Position from zero in content where the encoded JSON value begins. 

532 

533 :returns: Decoder result object. 

534 """ 

535 

536 if pos >= len(content): 

537 return JsonDecoder.Result.from_failure() 

538 

539 first_char = content[pos] 

540 if first_char == "n": 

541 return JsonDecoder._decode_literal(content, pos, "null", None) 

542 

543 if first_char == "t": 

544 return JsonDecoder._decode_literal(content, pos, "true", True) 

545 

546 if first_char == "f": 

547 return JsonDecoder._decode_literal(content, pos, "false", False) 

548 

549 if first_char == "N": 

550 return JsonDecoder._decode_literal(content, pos, "NaN", float("nan")) 

551 

552 if first_char == "I": 

553 return JsonDecoder._decode_literal(content, pos, "Infinity", float("inf")) 

554 

555 if first_char == '"': 

556 return JsonDecoder._decode_string(content, pos) 

557 

558 if first_char == "-": 

559 if pos + 1 >= len(content): 

560 return JsonDecoder.Result.from_failure(1) 

561 

562 second_char = content[pos + 1] 

563 if second_char == "I": 

564 return JsonDecoder._decode_literal(content, pos, "-Infinity", float("-inf")) 

565 

566 return JsonDecoder._decode_number(content, pos) 

567 

568 return JsonDecoder._decode_number(content, pos) 

569 

570 class Result: 

571 """ 

572 Decoder result value. 

573 """ 

574 

575 def __init__(self, success: bool, value: typing.Any, num_read_chars: int): 

576 """ 

577 Constructor. 

578 """ 

579 

580 self._success = success 

581 self._value = value 

582 self._num_read_chars = num_read_chars 

583 

584 @classmethod 

585 def from_failure(cls: typing.Type["JsonDecoder.Result"], num_read_chars: int = 0): 

586 """ 

587 Creates decoder result value in case of failure. 

588 

589 :param num_read_chars: Number of processed characters. 

590 """ 

591 instance = cls(False, None, num_read_chars) 

592 

593 return instance 

594 

595 @classmethod 

596 def from_success( 

597 cls: typing.Type["JsonDecoder.Result"], 

598 value: typing.Any, 

599 num_read_chars: int = 0, 

600 ): 

601 """ 

602 Creates decoder result value in case of success. 

603 

604 :param value: Decoded value. 

605 :param num_read_chars: Number of read characters. 

606 """ 

607 instance = cls(True, value, num_read_chars) 

608 

609 return instance 

610 

611 @property 

612 def success(self) -> bool: 

613 """ 

614 Gets the decoder result. 

615 

616 :returns: True in case of success, otherwise false. 

617 """ 

618 

619 return self._success 

620 

621 @property 

622 def value(self) -> typing.Any: 

623 """ 

624 Gets the decoded JSON value. 

625 

626 :returns: Decoded JSON value or None in case of failure. 

627 """ 

628 

629 return self._value 

630 

631 @property 

632 def num_read_chars(self) -> int: 

633 """ 

634 Gets the number of read characters from the string which contains encoded JSON value. 

635 

636 In case of failure, it returns the number of processed (read) characters. 

637 

638 :returns: Number of read characters. 

639 """ 

640 

641 return self._num_read_chars 

642 

643 @staticmethod 

644 def _decode_literal(content: str, pos: int, text: str, decoded_object) -> "JsonDecoder.Result": 

645 text_length = len(text) 

646 if pos + text_length > len(content): 

647 return JsonDecoder.Result.from_failure(len(content) - pos) 

648 

649 sub_content = content[pos : pos + text_length] 

650 if sub_content == text: 

651 return JsonDecoder.Result.from_success(decoded_object, text_length) 

652 

653 return JsonDecoder.Result.from_failure(text_length) 

654 

655 @staticmethod 

656 def _decode_string(content: str, pos: int) -> "JsonDecoder.Result": 

657 decoded_string = "" 

658 end_of_string_pos = pos + 1 # we know that at the beginning is '"' 

659 while True: 

660 if end_of_string_pos >= len(content): 

661 return JsonDecoder.Result.from_failure(end_of_string_pos - pos) 

662 

663 next_char = content[end_of_string_pos] 

664 end_of_string_pos += 1 

665 if next_char == "\\": 

666 if end_of_string_pos >= len(content): 

667 return JsonDecoder.Result.from_failure(end_of_string_pos - pos) 

668 

669 next_next_char = content[end_of_string_pos] 

670 end_of_string_pos += 1 

671 if next_next_char in ("\\", '"'): 

672 decoded_string += next_next_char 

673 elif next_next_char == "b": 

674 decoded_string += "\b" 

675 elif next_next_char == "f": 

676 decoded_string += "\f" 

677 elif next_next_char == "n": 

678 decoded_string += "\n" 

679 elif next_next_char == "r": 

680 decoded_string += "\r" 

681 elif next_next_char == "t": 

682 decoded_string += "\t" 

683 elif next_next_char == "u": # unicode escape 

684 unicode_escape_len = 4 

685 end_of_string_pos += unicode_escape_len 

686 if end_of_string_pos >= len(content): 

687 return JsonDecoder.Result.from_failure(len(content) - pos) 

688 sub_content = content[end_of_string_pos - unicode_escape_len - 2 : end_of_string_pos] 

689 decoded_unicode = JsonDecoder._decode_unicode_escape(sub_content) 

690 if decoded_unicode is not None: 

691 decoded_string += decoded_unicode 

692 else: 

693 return JsonDecoder.Result.from_failure(end_of_string_pos - pos) 

694 else: 

695 # unknown escape character, not decoded... 

696 return JsonDecoder.Result.from_failure(end_of_string_pos - pos) 

697 elif next_char == '"': 

698 break 

699 else: 

700 decoded_string += next_char 

701 

702 return JsonDecoder.Result.from_success(decoded_string, end_of_string_pos - pos) 

703 

704 @staticmethod 

705 def _decode_unicode_escape(content: str) -> typing.Optional[str]: 

706 try: 

707 return bytes(content, "ascii").decode("unicode-escape") 

708 except ValueError: 

709 return None 

710 

711 @staticmethod 

712 def _decode_number(content: str, pos: int) -> "JsonDecoder.Result": 

713 number_content, is_float = JsonDecoder._extract_number(content, pos) 

714 number_length = len(number_content) 

715 if number_length == 0: 

716 return JsonDecoder.Result.from_failure(1) 

717 

718 try: 

719 if is_float: 

720 float_number = float(number_content) 

721 return JsonDecoder.Result.from_success(float_number, number_length) 

722 else: 

723 int_number = int(number_content) 

724 return JsonDecoder.Result.from_success(int_number, number_length) 

725 except ValueError: 

726 return JsonDecoder.Result.from_failure(number_length) 

727 

728 @staticmethod 

729 def _extract_number(content: str, pos: int) -> typing.Tuple[str, bool]: 

730 end_of_number_pos = pos 

731 if content[end_of_number_pos] == "-": # we already know that there is something after '-' 

732 end_of_number_pos += 1 

733 accept_exp_sign = False 

734 is_scientific_float = False 

735 is_float = False 

736 while end_of_number_pos < len(content): 

737 next_char = content[end_of_number_pos] 

738 

739 if accept_exp_sign: 

740 accept_exp_sign = False 

741 if next_char in ("+", "-"): 

742 end_of_number_pos += 1 

743 continue 

744 

745 if next_char.isdigit(): 

746 end_of_number_pos += 1 

747 continue 

748 

749 if (next_char in ("e", "E")) and not is_scientific_float: 

750 end_of_number_pos += 1 

751 is_float = True 

752 is_scientific_float = True 

753 accept_exp_sign = True 

754 continue 

755 

756 if next_char == "." and not is_float: 

757 end_of_number_pos += 1 

758 is_float = True 

759 continue 

760 

761 break # pragma: no cover (to satisfy test coverage) 

762 

763 return content[pos:end_of_number_pos], is_float 

764 

765 

766class JsonTokenizer: 

767 """ 

768 Tokenizer used by JsonParser. 

769 """ 

770 

771 def __init__(self, text_io: typing.TextIO) -> None: 

772 """ 

773 Constructor. 

774 

775 :param text_io: Text stream to tokenize. 

776 """ 

777 self._io = text_io 

778 

779 self._content = self._io.read(JsonTokenizer.MAX_LINE_LEN) 

780 self._line_number = 1 

781 self._column_number = 1 

782 self._token_column_number = 1 

783 self._pos = 0 

784 self._set_token(JsonToken.BEGIN_OF_FILE if self._content else JsonToken.END_OF_FILE, None) 

785 self._decoder_result = JsonDecoder.Result.from_failure() 

786 

787 def next(self) -> JsonToken: 

788 """ 

789 Moves to next token. 

790 

791 :returns: Token. 

792 :raises JsonParserException: When unknown token is reached. 

793 """ 

794 

795 while not self._decode_next(): 

796 new_content = self._io.read(JsonTokenizer.MAX_LINE_LEN) 

797 if not new_content: 

798 if self._token == JsonToken.END_OF_FILE: 

799 self._token_column_number = self._column_number 

800 else: 

801 # stream is finished but last token is not EOF => value must be at the end 

802 self._set_token_value() 

803 

804 return self._token 

805 

806 self._content = self._content[self._pos :] 

807 self._content += new_content 

808 self._pos = 0 

809 

810 return self._token 

811 

812 def get_token(self) -> JsonToken: 

813 """ 

814 Gets current token. 

815 

816 :returns: Current token. 

817 """ 

818 return self._token 

819 

820 def get_value(self) -> typing.Any: 

821 """ 

822 Gets current value. 

823 

824 :returns: Current value. 

825 """ 

826 return self._value 

827 

828 def get_line(self) -> int: 

829 """ 

830 Gets line number of the current token. 

831 

832 :returns: Line number. 

833 """ 

834 return self._line_number 

835 

836 def get_column(self) -> int: 

837 """ 

838 Gets column number of the current token. 

839 

840 :returns: Column number. 

841 """ 

842 return self._token_column_number 

843 

844 def _decode_next(self) -> bool: 

845 if not self._skip_whitespaces(): 

846 return False 

847 

848 self._token_column_number = self._column_number 

849 

850 next_char = self._content[self._pos] 

851 if next_char == "{": 

852 self._set_token(JsonToken.BEGIN_OBJECT, next_char) 

853 self._set_position(self._pos + 1, self._column_number + 1) 

854 elif next_char == "}": 

855 self._set_token(JsonToken.END_OBJECT, next_char) 

856 self._set_position(self._pos + 1, self._column_number + 1) 

857 elif next_char == "[": 

858 self._set_token(JsonToken.BEGIN_ARRAY, next_char) 

859 self._set_position(self._pos + 1, self._column_number + 1) 

860 elif next_char == "]": 

861 self._set_token(JsonToken.END_ARRAY, next_char) 

862 self._set_position(self._pos + 1, self._column_number + 1) 

863 elif next_char == ":": 

864 self._set_token(JsonToken.KEY_SEPARATOR, next_char) 

865 self._set_position(self._pos + 1, self._column_number + 1) 

866 elif next_char == ",": 

867 self._set_token(JsonToken.ITEM_SEPARATOR, next_char) 

868 self._set_position(self._pos + 1, self._column_number + 1) 

869 else: 

870 self._decoder_result = JsonDecoder.decode_value(self._content, self._pos) 

871 if self._pos + self._decoder_result.num_read_chars >= len(self._content): 

872 return False # we are at the end of chunk => try to read more 

873 

874 self._set_token_value() 

875 

876 return True 

877 

878 def _skip_whitespaces(self) -> bool: 

879 while True: 

880 if self._pos >= len(self._content): 

881 self._set_token(JsonToken.END_OF_FILE, None) 

882 return False 

883 

884 next_char = self._content[self._pos] 

885 if next_char in (" ", "\t"): 

886 self._set_position(self._pos + 1, self._column_number + 1) 

887 elif next_char == "\n": 

888 self._line_number += 1 

889 self._set_position(self._pos + 1, 1) 

890 elif next_char == "\r": 

891 if self._pos + 1 >= len(self._content): 

892 self._set_token(JsonToken.END_OF_FILE, None) 

893 return False 

894 

895 next_next_char = self._content[self._pos + 1] 

896 self._line_number += 1 

897 self._set_position(self._pos + (2 if (next_next_char == "\n") else 1), 1) 

898 else: 

899 return True 

900 

901 def _set_token(self, new_token: JsonToken, new_value: typing.Any) -> None: 

902 self._token = new_token 

903 self._value = new_value 

904 

905 def _set_position(self, new_pos: int, new_column_number: int) -> None: 

906 self._pos = new_pos 

907 self._column_number = new_column_number 

908 

909 def _set_token_value(self) -> None: 

910 if not self._decoder_result.success: 

911 raise JsonParserException( 

912 f"JsonTokenizer:{self._line_number}:{self._token_column_number}: " f"Unknown token!" 

913 ) 

914 

915 self._set_token(JsonToken.VALUE, self._decoder_result.value) 

916 num_read_chars = self._decoder_result.num_read_chars 

917 self._set_position(self._pos + num_read_chars, self._column_number + num_read_chars) 

918 

919 MAX_LINE_LEN = 64 * 1024 

920 

921 

922class JsonReader: 

923 """ 

924 Reads zserio object tree defined by a type info from a text stream. 

925 """ 

926 

927 def __init__(self, text_io: typing.TextIO) -> None: 

928 """ 

929 Constructor. 

930 

931 :param text_io: Text stream to read. 

932 """ 

933 

934 self._creator_adapter = JsonReader._CreatorAdapter() 

935 self._parser = JsonParser(text_io, self._creator_adapter) 

936 

937 def read(self, type_info: TypeInfo, *arguments: typing.List[typing.Any]) -> typing.Any: 

938 """ 

939 Reads a zserio object tree defined by the given type info from the text steam. 

940 

941 :param type_info: Type info defining the expected zserio object tree. 

942 :param arguments: Arguments of type defining the expected zserio object tree. 

943 :returns: Zserio object tree initialized using the JSON data. 

944 :raises PythonRuntimeException: When the JSON doesn't contain expected zserio object tree. 

945 """ 

946 

947 self._creator_adapter.set_type(type_info, *arguments) 

948 

949 try: 

950 self._parser.parse() 

951 except JsonParserException: 

952 raise 

953 except PythonRuntimeException as err: 

954 raise PythonRuntimeException( 

955 f"{err} (JsonParser:" f"{self._parser.get_line()}:{self._parser.get_column()})" 

956 ) from err 

957 

958 return self._creator_adapter.get() 

959 

960 class _ObjectValueAdapter(JsonParser.Observer): 

961 """ 

962 Adapter for values which are encoded as Json objects. 

963 """ 

964 

965 def get(self) -> typing.Any: 

966 """ 

967 Gets the parsed value. 

968 """ 

969 

970 raise NotImplementedError() 

971 

972 class _BitBufferAdapter(_ObjectValueAdapter): 

973 """ 

974 The adapter which allows to parse Bit Buffer object from JSON. 

975 """ 

976 

977 def __init__(self) -> None: 

978 """ 

979 Constructor. 

980 """ 

981 

982 self._state = JsonReader._BitBufferAdapter._State.VISIT_KEY 

983 self._buffer: typing.Optional[typing.List[int]] = None 

984 self._bit_size: typing.Optional[int] = None 

985 

986 def get(self) -> BitBuffer: 

987 """ 

988 Gets the created Bit Buffer object. 

989 

990 :returns: Parsed Bit Buffer object. 

991 :raises PythonRuntimeException: In case of invalid use. 

992 """ 

993 

994 if self._buffer is None or self._bit_size is None: 

995 raise PythonRuntimeException("JsonReader: Unexpected end in Bit Buffer!") 

996 return BitBuffer(bytes(self._buffer), self._bit_size) 

997 

998 def begin_object(self) -> None: 

999 raise PythonRuntimeException("JsonReader: Unexpected begin object in Bit Buffer!") 

1000 

1001 def end_object(self) -> None: 

1002 raise PythonRuntimeException("JsonReader: Unexpected end object in Bit Buffer!") 

1003 

1004 def begin_array(self) -> None: 

1005 if self._state == JsonReader._BitBufferAdapter._State.BEGIN_ARRAY_BUFFER: 

1006 self._state = JsonReader._BitBufferAdapter._State.VISIT_VALUE_BUFFER 

1007 self._buffer = [] 

1008 else: 

1009 raise PythonRuntimeException("JsonReader: Unexpected begin array in Bit Buffer!") 

1010 

1011 def end_array(self) -> None: 

1012 if self._state == JsonReader._BitBufferAdapter._State.VISIT_VALUE_BUFFER: 

1013 self._state = JsonReader._BitBufferAdapter._State.VISIT_KEY 

1014 else: 

1015 raise PythonRuntimeException("JsonReader: Unexpected end array in Bit Buffer!") 

1016 

1017 def visit_key(self, key: str) -> None: 

1018 if self._state == JsonReader._BitBufferAdapter._State.VISIT_KEY: 

1019 if key == "buffer": 

1020 self._state = JsonReader._BitBufferAdapter._State.BEGIN_ARRAY_BUFFER 

1021 elif key == "bitSize": 

1022 self._state = JsonReader._BitBufferAdapter._State.VISIT_VALUE_BITSIZE 

1023 else: 

1024 raise PythonRuntimeException(f"JsonReader: Unknown key '{key}' in Bit Buffer!") 

1025 else: 

1026 raise PythonRuntimeException(f"JsonReader: Unexpected key '{key}' in Bit Buffer!") 

1027 

1028 def visit_value(self, value: typing.Any) -> None: 

1029 if ( 

1030 self._state == JsonReader._BitBufferAdapter._State.VISIT_VALUE_BUFFER 

1031 and self._buffer is not None 

1032 and isinstance(value, int) 

1033 ): 

1034 self._buffer.append(value) 

1035 elif self._state == JsonReader._BitBufferAdapter._State.VISIT_VALUE_BITSIZE and isinstance( 

1036 value, int 

1037 ): 

1038 self._bit_size = value 

1039 self._state = JsonReader._BitBufferAdapter._State.VISIT_KEY 

1040 else: 

1041 raise PythonRuntimeException(f"JsonReader: Unexpected value '{value}' in Bit Buffer!") 

1042 

1043 class _State(enum.Enum): 

1044 VISIT_KEY = enum.auto() 

1045 BEGIN_ARRAY_BUFFER = enum.auto() 

1046 VISIT_VALUE_BUFFER = enum.auto() 

1047 VISIT_VALUE_BITSIZE = enum.auto() 

1048 

1049 class _BytesAdapter(_ObjectValueAdapter): 

1050 """ 

1051 The adapter which allows to parse bytes object from JSON. 

1052 """ 

1053 

1054 def __init__(self) -> None: 

1055 """ 

1056 Constructor. 

1057 """ 

1058 

1059 self._state = JsonReader._BytesAdapter._State.VISIT_KEY 

1060 self._buffer: typing.Optional[bytearray] = None 

1061 

1062 def get(self) -> bytearray: 

1063 """ 

1064 Gets the created bytes object. 

1065 

1066 :returns: Parsed bytes object. 

1067 :raises PythonRuntimeException: In case of invalid use. 

1068 """ 

1069 

1070 if self._buffer is None: 

1071 raise PythonRuntimeException("JsonReader: Unexpected end in bytes!") 

1072 return self._buffer 

1073 

1074 def begin_object(self) -> None: 

1075 raise PythonRuntimeException("JsonReader: Unexpected begin object in bytes!") 

1076 

1077 def end_object(self) -> None: 

1078 raise PythonRuntimeException("JsonReader: Unexpected end object in bytes!") 

1079 

1080 def begin_array(self) -> None: 

1081 if self._state == JsonReader._BytesAdapter._State.BEGIN_ARRAY_BUFFER: 

1082 self._state = JsonReader._BytesAdapter._State.VISIT_VALUE_BUFFER 

1083 self._buffer = bytearray() 

1084 else: 

1085 raise PythonRuntimeException("JsonReader: Unexpected begin array in bytes!") 

1086 

1087 def end_array(self) -> None: 

1088 if self._state == JsonReader._BytesAdapter._State.VISIT_VALUE_BUFFER: 

1089 self._state = JsonReader._BytesAdapter._State.VISIT_KEY 

1090 else: 

1091 raise PythonRuntimeException("JsonReader: Unexpected end array in bytes!") 

1092 

1093 def visit_key(self, key: str) -> None: 

1094 if self._state == JsonReader._BytesAdapter._State.VISIT_KEY: 

1095 if key == "buffer": 

1096 self._state = JsonReader._BytesAdapter._State.BEGIN_ARRAY_BUFFER 

1097 else: 

1098 raise PythonRuntimeException(f"JsonReader: Unknown key '{key}' in bytes!") 

1099 else: 

1100 raise PythonRuntimeException(f"JsonReader: Unexpected key '{key}' in bytes!") 

1101 

1102 def visit_value(self, value: typing.Any) -> None: 

1103 if ( 

1104 self._state == JsonReader._BytesAdapter._State.VISIT_VALUE_BUFFER 

1105 and self._buffer is not None 

1106 and isinstance(value, int) 

1107 ): 

1108 self._buffer.append(value) 

1109 else: 

1110 raise PythonRuntimeException(f"JsonReader: Unexpected value '{value}' in bytes!") 

1111 

1112 class _State(enum.Enum): 

1113 VISIT_KEY = enum.auto() 

1114 BEGIN_ARRAY_BUFFER = enum.auto() 

1115 VISIT_VALUE_BUFFER = enum.auto() 

1116 

1117 class _CreatorAdapter(JsonParser.Observer): 

1118 """ 

1119 The adapter which allows to use ZserioTreeCreator as an JsonReader observer. 

1120 """ 

1121 

1122 def __init__(self) -> None: 

1123 """ 

1124 Constructor. 

1125 """ 

1126 

1127 self._creator: typing.Optional[ZserioTreeCreator] = None 

1128 self._key_stack: typing.List[str] = [] 

1129 self._object: typing.Any = None 

1130 self._object_value_adapter: typing.Optional[JsonReader._ObjectValueAdapter] = None 

1131 

1132 def set_type(self, type_info: TypeInfo, *arguments: typing.List[typing.Any]) -> None: 

1133 """ 

1134 Sets type which shall be created next. Resets the current object. 

1135 

1136 :param type_info: Type info of the type which is to be created. 

1137 :param arguments: Arguments of type defining the expected zserio object tree. 

1138 """ 

1139 

1140 self._creator = ZserioTreeCreator(type_info, *arguments) 

1141 self._object = None 

1142 

1143 def get(self) -> typing.Any: 

1144 """ 

1145 Gets the created zserio object tree. 

1146 

1147 :returns: Zserio object tree. 

1148 :raises PythonRuntimeException: In case of invalid use. 

1149 """ 

1150 

1151 if not self._object: 

1152 raise PythonRuntimeException("JsonReader: Zserio tree not created!") 

1153 

1154 return self._object 

1155 

1156 def begin_object(self) -> None: 

1157 if self._object_value_adapter: 

1158 self._object_value_adapter.begin_object() 

1159 else: 

1160 if not self._creator: 

1161 raise PythonRuntimeException("JsonReader: Adapter not initialized!") 

1162 

1163 if not self._key_stack: 

1164 self._creator.begin_root() 

1165 else: 

1166 if self._key_stack[-1]: 

1167 schema_type = self._creator.get_field_type(self._key_stack[-1]).schema_name 

1168 if schema_type == "extern": 

1169 self._object_value_adapter = JsonReader._BitBufferAdapter() 

1170 elif schema_type == "bytes": 

1171 self._object_value_adapter = JsonReader._BytesAdapter() 

1172 else: 

1173 self._creator.begin_compound(self._key_stack[-1]) 

1174 else: 

1175 schema_type = self._creator.get_element_type().schema_name 

1176 if schema_type == "extern": 

1177 self._object_value_adapter = JsonReader._BitBufferAdapter() 

1178 elif schema_type == "bytes": 

1179 self._object_value_adapter = JsonReader._BytesAdapter() 

1180 else: 

1181 self._creator.begin_compound_element() 

1182 

1183 def end_object(self) -> None: 

1184 if self._object_value_adapter: 

1185 value = self._object_value_adapter.get() 

1186 self._object_value_adapter = None 

1187 self.visit_value(value) 

1188 else: 

1189 if not self._creator: 

1190 raise PythonRuntimeException("JsonReader: Adapter not initialized!") 

1191 

1192 if not self._key_stack: 

1193 self._object = self._creator.end_root() 

1194 self._creator = None 

1195 else: 

1196 if self._key_stack[-1]: 

1197 self._creator.end_compound() 

1198 self._key_stack.pop() # finish member 

1199 else: 

1200 self._creator.end_compound_element() 

1201 

1202 def begin_array(self) -> None: 

1203 if self._object_value_adapter: 

1204 self._object_value_adapter.begin_array() 

1205 else: 

1206 if not self._creator: 

1207 raise PythonRuntimeException("JsonReader: Adapter not initialized!") 

1208 

1209 if not self._key_stack: 

1210 raise PythonRuntimeException("JsonReader: ZserioTreeCreator expects json object!") 

1211 

1212 self._creator.begin_array(self._key_stack[-1]) 

1213 

1214 self._key_stack.append("") 

1215 

1216 def end_array(self) -> None: 

1217 if self._object_value_adapter: 

1218 self._object_value_adapter.end_array() 

1219 else: 

1220 if not self._creator: 

1221 raise PythonRuntimeException("JsonReader: Adapter not initialized!") 

1222 

1223 self._creator.end_array() 

1224 

1225 self._key_stack.pop() # finish array 

1226 self._key_stack.pop() # finish member 

1227 

1228 def visit_key(self, key: str) -> None: 

1229 if self._object_value_adapter: 

1230 self._object_value_adapter.visit_key(key) 

1231 else: 

1232 if not self._creator: 

1233 raise PythonRuntimeException("JsonReader: Adapter not initialized!") 

1234 

1235 self._key_stack.append(key) 

1236 

1237 def visit_value(self, value: typing.Any) -> None: 

1238 if self._object_value_adapter: 

1239 self._object_value_adapter.visit_value(value) 

1240 else: 

1241 if not self._creator: 

1242 raise PythonRuntimeException("JsonReader: Adapter not initialized!") 

1243 

1244 if not self._key_stack: 

1245 raise PythonRuntimeException("JsonReader: ZserioTreeCreator expects json object!") 

1246 

1247 if self._key_stack[-1]: 

1248 expected_type_info = self._creator.get_field_type(self._key_stack[-1]) 

1249 self._creator.set_value( 

1250 self._key_stack[-1], 

1251 self._convert_value(value, expected_type_info), 

1252 ) 

1253 self._key_stack.pop() # finish member 

1254 else: 

1255 expected_type_info = self._creator.get_element_type() 

1256 self._creator.add_value_element(self._convert_value(value, expected_type_info)) 

1257 

1258 @staticmethod 

1259 def _convert_value( 

1260 value: typing.Any, type_info: typing.Union[TypeInfo, RecursiveTypeInfo] 

1261 ) -> typing.Any: 

1262 if value is None: 

1263 return None 

1264 

1265 if TypeAttribute.ENUM_ITEMS in type_info.attributes: 

1266 if isinstance(value, str): 

1267 return JsonReader._CreatorAdapter._enum_from_string(value, type_info) 

1268 else: 

1269 return type_info.py_type(value) 

1270 elif TypeAttribute.BITMASK_VALUES in type_info.attributes: 

1271 if isinstance(value, str): 

1272 return JsonReader._CreatorAdapter._bitmask_from_string(value, type_info) 

1273 else: 

1274 return type_info.py_type.from_value(value) 

1275 else: 

1276 return value 

1277 

1278 @staticmethod 

1279 def _enum_from_string( 

1280 string_value: str, type_info: typing.Union[TypeInfo, RecursiveTypeInfo] 

1281 ) -> typing.Any: 

1282 if string_value: 

1283 first_char = string_value[0] 

1284 if ("A" <= first_char <= "Z") or ("a" <= first_char <= "z") or first_char == "_": 

1285 py_item = JsonReader._CreatorAdapter._parse_enum_string_value(string_value, type_info) 

1286 if py_item is not None: 

1287 return py_item 

1288 # else it's a no match 

1289 

1290 raise PythonRuntimeException( 

1291 f"JsonReader: Cannot create enum '{type_info.schema_name}' " 

1292 f"from string value '{string_value}'!" 

1293 ) 

1294 

1295 @staticmethod 

1296 def _bitmask_from_string( 

1297 string_value: str, type_info: typing.Union[TypeInfo, RecursiveTypeInfo] 

1298 ) -> typing.Any: 

1299 if string_value: 

1300 first_char = string_value[0] 

1301 if ("A" <= first_char <= "Z") or ("a" <= first_char <= "z") or first_char == "_": 

1302 value = JsonReader._CreatorAdapter._parse_bitmask_string_value(string_value, type_info) 

1303 if value is not None: 

1304 return type_info.py_type.from_value(value) 

1305 elif "0" <= first_char <= "9": # bitmask can be only unsigned 

1306 value = JsonReader._CreatorAdapter._parse_bitmask_numeric_string_value(string_value) 

1307 if value is not None: 

1308 return type_info.py_type.from_value(value) 

1309 

1310 raise PythonRuntimeException( 

1311 f"JsonReader: Cannot create bitmask '{type_info.schema_name}' " 

1312 f"from string value '{string_value}'!" 

1313 ) 

1314 

1315 @staticmethod 

1316 def _parse_enum_string_value( 

1317 string_value: str, type_info: typing.Union[TypeInfo, RecursiveTypeInfo] 

1318 ) -> typing.Any: 

1319 for item_info in type_info.attributes[TypeAttribute.ENUM_ITEMS]: 

1320 if string_value == item_info.schema_name: 

1321 return item_info.py_item 

1322 

1323 return None 

1324 

1325 @staticmethod 

1326 def _parse_bitmask_string_value( 

1327 string_value: str, type_info: typing.Union[TypeInfo, RecursiveTypeInfo] 

1328 ) -> typing.Any: 

1329 value = 0 

1330 identifiers = string_value.split("|") 

1331 for identifier_with_spaces in identifiers: 

1332 identifier = identifier_with_spaces.strip() 

1333 match = False 

1334 for item_info in type_info.attributes[TypeAttribute.BITMASK_VALUES]: 

1335 if identifier == item_info.schema_name: 

1336 match = True 

1337 value |= item_info.py_item.value 

1338 break 

1339 

1340 if not match: 

1341 return None 

1342 

1343 return value 

1344 

1345 @staticmethod 

1346 def _parse_bitmask_numeric_string_value(string_value: str): 

1347 number_len = 1 

1348 while string_value[number_len] >= "0" and string_value[number_len] <= "9": 

1349 number_len += 1 

1350 

1351 return int(string_value[0:number_len])