Coverage for /home/runner/work/zserio/zserio/compiler/extensions/python/runtime/src/zserio/json.py: 100%
727 statements
« prev ^ index » next coverage.py v6.5.0, created at 2024-12-05 10:43 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2024-12-05 10:43 +0000
1"""
2The module implements WalkObserver for writing of zserio objects to JSON format.
3"""
5import enum
6import io
7import json
8import typing
10from zserio.bitbuffer import BitBuffer
11from zserio.creator import ZserioTreeCreator
12from zserio.exception import PythonRuntimeException
13from zserio.typeinfo import TypeInfo, RecursiveTypeInfo, TypeAttribute, MemberInfo
14from zserio.walker import WalkObserver
class JsonEnumerableFormat(enum.Enum):
    """
    Configuration for writing of enumerable types.
    """

    #: Print as JSON integral value.
    NUMBER = enum.auto()
    #: Print as JSON string according to the following rules:
    #:
    #: #. Enums
    #:
    #:    * when an exact match with an enumerable item is found, the item name is used - e.g. "FIRST",
    #:    * when no exact match is found, it's an invalid value, the integral value is converted to string
    #:      and an appropriate comment is included - e.g. "10 /\* no match \*/".
    #:
    #: #. Bitmasks
    #:
    #:    * when an exact match with or-ed bitmask values is found, it's used - e.g. "READ | WRITE",
    #:    * when no exact match is found, but some or-ed values match, the integral value is converted
    #:      to string and the or-ed values are included in a comment -
    #:      e.g. "127 /\* partial match: READ | CREATE \*/",
    #:    * when no match is found at all, the integral value is converted to string and an appropriate
    #:      comment is included - e.g. "13 /\* no match \*/".
    STRING = enum.auto()
class JsonWriter(WalkObserver):
    """
    Walker observer which dumps zserio objects to JSON format.
    """

    def __init__(
        self,
        *,
        text_io: typing.Optional[typing.TextIO] = None,
        enumerable_format: JsonEnumerableFormat = JsonEnumerableFormat.STRING,
        item_separator: typing.Optional[str] = None,
        key_separator: typing.Optional[str] = None,
        indent: typing.Union[None, str, int] = None,
    ) -> None:
        """
        Constructor.

        :param text_io: Optional text stream for JSON output, io.StringIO is used by default.
        :param enumerable_format: Optional enumerable format to use, default is JsonEnumerableFormat.STRING.
        :param item_separator: Optional item separator, default is ', ' if indent is None, ',' otherwise.
                               An empty string selects the default as well.
        :param key_separator: Optional key separator, default is ': '. An empty string selects the default
                              as well.
        :param indent: String or (non-negative) integer defining the indent. If not None, newlines are inserted.
        """

        # Compare with None explicitly: a stream object may be falsy (e.g. when it defines __len__ and is
        # still empty) and must not be silently replaced by a private io.StringIO.
        self._io: typing.TextIO = text_io if text_io is not None else io.StringIO()
        self._item_separator: str = item_separator if item_separator else ("," if indent is not None else ", ")
        self._key_separator: str = key_separator if key_separator else ": "
        self._enumerable_format = enumerable_format

        # An integer indent is expanded to that many spaces, a string indent is used verbatim.
        self._indent: typing.Optional[str]
        if indent is None:
            self._indent = None
        elif isinstance(indent, str):
            self._indent = indent
        else:
            self._indent = " " * indent

        self._is_first = True
        self._level = 0
        self._json_encoder = JsonEncoder()

    def get_io(self) -> typing.TextIO:
        """
        Gets the underlying text stream.

        :returns: Underlying text stream.
        """

        return self._io

    def begin_root(self, _compound: typing.Any) -> None:
        self._begin_object()

    def end_root(self, _compound: typing.Any) -> None:
        self._end_object()

    def begin_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> None:
        self._begin_item()
        self._write_key(member_info.schema_name)
        self._begin_array()

    def end_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> None:
        self._end_array()
        self._end_item()

    def begin_compound(
        self,
        compound: typing.Any,
        member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> None:
        self._begin_item()

        # array elements have no key of their own, only the array itself is named
        if element_index is None:
            self._write_key(member_info.schema_name)

        self._begin_object()

    def end_compound(
        self,
        compound: typing.Any,
        member_info: MemberInfo,
        _element_index: typing.Optional[int] = None,
    ) -> None:
        self._end_object()
        self._end_item()

    def visit_value(
        self,
        value: typing.Any,
        member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> None:
        self._begin_item()

        # array elements have no key of their own, only the array itself is named
        if element_index is None:
            self._write_key(member_info.schema_name)

        self._write_value(value, member_info)
        self._end_item()

    def _write_line_break(self) -> None:
        # Starts a new line indented to the current nesting level; does nothing when indent is None.
        if self._indent is not None:
            self._io.write("\n")
            if self._indent:
                self._io.write(self._indent * self._level)

    def _begin_container(self, opening: str) -> None:
        # Writes the opening bracket and enters one nesting level.
        self._io.write(opening)
        self._is_first = True
        self._level += 1

    def _end_container(self, closing: str) -> None:
        # Leaves one nesting level and writes the closing bracket on its own (indented) line.
        self._level -= 1
        self._write_line_break()
        self._io.write(closing)

    def _begin_item(self) -> None:
        if not self._is_first:
            self._io.write(self._item_separator)
        self._write_line_break()

    def _end_item(self) -> None:
        self._is_first = False

    def _begin_object(self) -> None:
        self._begin_container("{")

    def _end_object(self) -> None:
        self._end_container("}")

    def _begin_array(self) -> None:
        self._begin_container("[")

    def _end_array(self) -> None:
        self._end_container("]")

    def _write_key(self, key: str) -> None:
        self._io.write(f"{self._json_encoder.encode_value(key)}{self._key_separator}")

    def _write_value(self, value: typing.Any, member_info: MemberInfo) -> None:
        if value is None:
            self._io.write(self._json_encoder.encode_value(None))
            return

        type_info = member_info.type_info
        schema_name = type_info.schema_name
        if schema_name == "extern":
            self._write_bitbuffer(value)
        elif schema_name == "bytes":
            self._write_bytes(value)
        elif TypeAttribute.ENUM_ITEMS in type_info.attributes:
            if self._enumerable_format == JsonEnumerableFormat.STRING:
                self._write_stringified_enum(value, type_info)
            else:
                self._io.write(self._json_encoder.encode_value(value.value))
        elif TypeAttribute.BITMASK_VALUES in type_info.attributes:
            if self._enumerable_format == JsonEnumerableFormat.STRING:
                self._write_stringified_bitmask(value, type_info)
            else:
                self._io.write(self._json_encoder.encode_value(value.value))
        else:
            self._io.write(self._json_encoder.encode_value(value))

    def _write_buffer_member(self, data: typing.Iterable[int]) -> None:
        # Writes the '"buffer": [...]' member shared by the extern and bytes representations.
        self._begin_item()
        self._write_key("buffer")
        self._begin_array()
        for byte in data:
            self._begin_item()
            self._io.write(self._json_encoder.encode_value(byte))
            self._end_item()
        self._end_array()
        self._end_item()

    def _write_bitbuffer(self, value: BitBuffer) -> None:
        self._begin_object()
        self._write_buffer_member(value.buffer)
        self._begin_item()
        self._write_key("bitSize")
        self._io.write(self._json_encoder.encode_value(value.bitsize))
        self._end_item()
        self._end_object()

    def _write_bytes(self, value: bytes) -> None:
        self._begin_object()
        self._write_buffer_member(value)
        self._end_object()

    def _write_stringified_enum(
        self, value: typing.Any, type_info: typing.Union[TypeInfo, RecursiveTypeInfo]
    ) -> None:
        for item in type_info.attributes[TypeAttribute.ENUM_ITEMS]:
            if item.py_item == value:
                # exact match
                self._io.write(self._json_encoder.encode_value(item.schema_name))
                return

        # no match
        self._io.write(self._json_encoder.encode_value(str(value.value) + " /* no match */"))

    def _write_stringified_bitmask(
        self, value: typing.Any, type_info: typing.Union[TypeInfo, RecursiveTypeInfo]
    ) -> None:
        bitmask_value = value.value
        matched_names: typing.List[str] = []
        matched_bits = 0

        for item_info in type_info.attributes[TypeAttribute.BITMASK_VALUES]:
            item_value = item_info.py_item.value
            if item_value == 0:
                # a zero-valued item can only describe the zero bitmask
                is_match = bitmask_value == 0
            else:
                is_match = (bitmask_value & item_value) == item_value
            if is_match:
                matched_bits |= item_value
                matched_names.append(item_info.schema_name)

        string_value = " | ".join(matched_names)
        if not string_value:
            # no match
            string_value = str(bitmask_value) + " /* no match */"
        elif bitmask_value != matched_bits:
            # partial match
            string_value = str(bitmask_value) + " /* partial match: " + string_value + " */"
        # else exact match

        self._io.write(self._json_encoder.encode_value(string_value))
class JsonEncoder:
    """
    Encoder which turns zserio values into their JSON text representation.
    """

    def __init__(self) -> None:
        """
        Constructor.

        Prepares the standard library encoder configured to keep non-ASCII characters unescaped.
        """

        self._encoder = json.JSONEncoder(ensure_ascii=False)

    def encode_value(self, value: typing.Any) -> str:
        """
        Converts the given value to JSON text.

        :param value: Value to encode.

        :returns: String holding the value as a valid JSON value.
        """

        encoded: str = self._encoder.encode(value)
        return encoded
class JsonToken(enum.Enum):
    """
    Kinds of tokens produced by the Json Tokenizer.
    """

    # artificial tokens marking the boundaries of the input
    BEGIN_OF_FILE = enum.auto()
    END_OF_FILE = enum.auto()
    # '{' and '}'
    BEGIN_OBJECT = enum.auto()
    END_OBJECT = enum.auto()
    # '[' and ']'
    BEGIN_ARRAY = enum.auto()
    END_ARRAY = enum.auto()
    # ':' and ','
    KEY_SEPARATOR = enum.auto()
    ITEM_SEPARATOR = enum.auto()
    # any decoded JSON value (literal, number or string)
    VALUE = enum.auto()
class JsonParserException(PythonRuntimeException):
    """
    Exception type which makes errors raised by the JsonParser distinguishable from other runtime errors.
    """
class JsonParser:
    """
    Streaming Json Parser.

    The input is tokenized on the fly and every recognized JSON construct is reported to an observer.
    """

    class Observer:
        """
        Interface of the callbacks invoked by the Json parser.
        """

        def begin_object(self) -> None:
            """
            Reports that a JSON object begins - i.e. '{' has been read.
            """

            raise NotImplementedError()

        def end_object(self) -> None:
            """
            Reports that a JSON object ends - i.e. '}' has been read.
            """

            raise NotImplementedError()

        def begin_array(self) -> None:
            """
            Reports that a JSON array begins - i.e. '[' has been read.
            """

            raise NotImplementedError()

        def end_array(self) -> None:
            """
            Reports that a JSON array ends - i.e. ']' has been read.
            """

            raise NotImplementedError()

        def visit_key(self, key: str) -> None:
            """
            Reports a key of a JSON object member.

            :param key: Key value.
            """

            raise NotImplementedError()

        def visit_value(self, value: typing.Any) -> None:
            """
            Reports a JSON value.

            :param value: JSON value.
            """

            raise NotImplementedError()

    def __init__(self, text_io: typing.TextIO, observer: Observer) -> None:
        """
        Constructor.

        :param text_io: Text stream to parse.
        :param observer: Observer to use.
        """

        self._tokenizer = JsonTokenizer(text_io)
        self._observer = observer

    def parse(self) -> bool:
        """
        Parses a single JSON element from the text stream.

        :returns: True when end-of-file is reached, False otherwise (i.e. another JSON element is present).
        :raises JsonParserException: When parsing fails.
        """

        tokenizer = self._tokenizer

        # the very first call has to step over the artificial begin-of-file token
        if tokenizer.get_token() == JsonToken.BEGIN_OF_FILE:
            tokenizer.next()

        if tokenizer.get_token() == JsonToken.END_OF_FILE:
            return True

        self._parse_element()

        return tokenizer.get_token() == JsonToken.END_OF_FILE

    def get_line(self) -> int:
        """
        Gets the line number reported by the tokenizer.

        :returns: Line number.
        """

        return self._tokenizer.get_line()

    def get_column(self) -> int:
        """
        Gets the column number reported by the tokenizer.

        :returns: Column number.
        """

        return self._tokenizer.get_column()

    def _parse_element(self) -> None:
        # dispatch on the token which starts the element
        handlers = {
            JsonToken.BEGIN_ARRAY: self._parse_array,
            JsonToken.BEGIN_OBJECT: self._parse_object,
            JsonToken.VALUE: self._parse_value,
        }
        handler = handlers.get(self._tokenizer.get_token())
        if handler is None:
            self._raise_unexpected_token(JsonParser.ELEMENT_TOKENS)
        else:
            handler()

    def _parse_array(self) -> None:
        self._observer.begin_array()

        # an empty array is directly followed by ']'
        if self._tokenizer.next() in JsonParser.ELEMENT_TOKENS:
            self._parse_elements()

        self._consume_token(JsonToken.END_ARRAY)
        self._observer.end_array()

    def _parse_elements(self) -> None:
        while True:
            self._parse_element()
            if self._tokenizer.get_token() != JsonToken.ITEM_SEPARATOR:
                break
            self._tokenizer.next()

    def _parse_object(self) -> None:
        self._observer.begin_object()

        # an empty object is directly followed by '}'
        if self._tokenizer.next() == JsonToken.VALUE:
            self._parse_members()

        self._consume_token(JsonToken.END_OBJECT)
        self._observer.end_object()

    def _parse_members(self) -> None:
        while True:
            self._parse_member()
            if self._tokenizer.get_token() != JsonToken.ITEM_SEPARATOR:
                break
            self._tokenizer.next()

    def _parse_member(self) -> None:
        self._check_token(JsonToken.VALUE)

        member_key = self._tokenizer.get_value()
        if not isinstance(member_key, str):
            location = f"JsonParser:{self.get_line()}:{self.get_column()}: "
            raise JsonParserException(location + "Key must be a string value!")

        self._observer.visit_key(member_key)
        self._tokenizer.next()

        self._consume_token(JsonToken.KEY_SEPARATOR)

        self._parse_element()

    def _parse_value(self) -> None:
        self._observer.visit_value(self._tokenizer.get_value())
        self._tokenizer.next()

    def _consume_token(self, token: JsonToken) -> None:
        self._check_token(token)
        self._tokenizer.next()

    def _check_token(self, token: JsonToken) -> None:
        if self._tokenizer.get_token() != token:
            self._raise_unexpected_token([token])

    def _raise_unexpected_token(self, expecting: typing.List[JsonToken]) -> None:
        tokenizer = self._tokenizer
        parts = [
            f"JsonParser:{self.get_line()}:{self.get_column()}: ",
            f"Unexpected token: {tokenizer.get_token()}",
        ]

        current_value = tokenizer.get_value()
        if current_value is not None:
            parts.append(f" ('{current_value}')")

        if len(expecting) == 1:
            parts.append(f", expecting {expecting[0]}!")
        else:
            parts.append(", expecting one of [" + ", ".join([str(token) for token in expecting]) + "]!")

        raise JsonParserException("".join(parts))

    # tokens which can start a JSON element
    ELEMENT_TOKENS = [JsonToken.BEGIN_OBJECT, JsonToken.BEGIN_ARRAY, JsonToken.VALUE]
class JsonDecoder:
    """
    JSON value decoder.

    Decodes a single JSON scalar (null, true, false, NaN, Infinity, -Infinity, a string or a number)
    located at a given position of a string.
    """

    @staticmethod
    def decode_value(content: str, pos: int) -> "JsonDecoder.Result":
        """
        Decodes the JSON value from the string.

        :param content: String which contains encoded JSON value.
        :param pos: Position from zero in content where the encoded JSON value begins.

        :returns: Decoder result object.
        """

        if pos >= len(content):
            return JsonDecoder.Result.from_failure()

        first_char = content[pos]
        if first_char == "n":
            return JsonDecoder._decode_literal(content, pos, "null", None)

        if first_char == "t":
            return JsonDecoder._decode_literal(content, pos, "true", True)

        if first_char == "f":
            return JsonDecoder._decode_literal(content, pos, "false", False)

        if first_char == "N":
            return JsonDecoder._decode_literal(content, pos, "NaN", float("nan"))

        if first_char == "I":
            return JsonDecoder._decode_literal(content, pos, "Infinity", float("inf"))

        if first_char == '"':
            return JsonDecoder._decode_string(content, pos)

        if first_char == "-":
            if pos + 1 >= len(content):
                return JsonDecoder.Result.from_failure(1)

            if content[pos + 1] == "I":
                return JsonDecoder._decode_literal(content, pos, "-Infinity", float("-inf"))

        return JsonDecoder._decode_number(content, pos)

    class Result:
        """
        Decoder result value.
        """

        def __init__(self, success: bool, value: typing.Any, num_read_chars: int):
            """
            Constructor.

            :param success: True when the value has been decoded.
            :param value: Decoded value.
            :param num_read_chars: Number of read (processed) characters.
            """

            self._success = success
            self._value = value
            self._num_read_chars = num_read_chars

        @classmethod
        def from_failure(cls: typing.Type["JsonDecoder.Result"], num_read_chars: int = 0):
            """
            Creates decoder result value in case of failure.

            :param num_read_chars: Number of processed characters.
            """

            return cls(False, None, num_read_chars)

        @classmethod
        def from_success(
            cls: typing.Type["JsonDecoder.Result"],
            value: typing.Any,
            num_read_chars: int = 0,
        ):
            """
            Creates decoder result value in case of success.

            :param value: Decoded value.
            :param num_read_chars: Number of read characters.
            """

            return cls(True, value, num_read_chars)

        @property
        def success(self) -> bool:
            """
            Gets the decoder result.

            :returns: True in case of success, otherwise false.
            """

            return self._success

        @property
        def value(self) -> typing.Any:
            """
            Gets the decoded JSON value.

            :returns: Decoded JSON value or None in case of failure.
            """

            return self._value

        @property
        def num_read_chars(self) -> int:
            """
            Gets the number of read characters from the string which contains encoded JSON value.

            In case of failure, it returns the number of processed (read) characters.

            :returns: Number of read characters.
            """

            return self._num_read_chars

    @staticmethod
    def _decode_literal(content: str, pos: int, text: str, decoded_object) -> "JsonDecoder.Result":
        text_length = len(text)
        if pos + text_length > len(content):
            # the content is too short, everything up to its end has been processed
            return JsonDecoder.Result.from_failure(len(content) - pos)

        if content[pos : pos + text_length] == text:
            return JsonDecoder.Result.from_success(decoded_object, text_length)

        return JsonDecoder.Result.from_failure(text_length)

    @staticmethod
    def _decode_string(content: str, pos: int) -> "JsonDecoder.Result":
        simple_escapes = {
            "\\": "\\",
            '"': '"',
            "/": "/",  # '\/' is a valid JSON escape of the solidus
            "b": "\b",
            "f": "\f",
            "n": "\n",
            "r": "\r",
            "t": "\t",
        }
        unicode_escape_len = 4

        content_len = len(content)
        decoded_parts: typing.List[str] = []
        has_unicode_escape = False
        end_of_string_pos = pos + 1  # we know that at the beginning is '"'
        while True:
            if end_of_string_pos >= content_len:
                return JsonDecoder.Result.from_failure(end_of_string_pos - pos)

            next_char = content[end_of_string_pos]
            end_of_string_pos += 1
            if next_char == '"':
                break

            if next_char != "\\":
                decoded_parts.append(next_char)
                continue

            if end_of_string_pos >= content_len:
                return JsonDecoder.Result.from_failure(end_of_string_pos - pos)

            escaped_char = content[end_of_string_pos]
            end_of_string_pos += 1
            if escaped_char in simple_escapes:
                decoded_parts.append(simple_escapes[escaped_char])
            elif escaped_char == "u":  # unicode escape
                end_of_string_pos += unicode_escape_len
                # at least the closing '"' must follow the escape, otherwise the string is not complete
                if end_of_string_pos >= content_len:
                    return JsonDecoder.Result.from_failure(content_len - pos)

                # the slice covers the whole escape including the leading '\u'
                sub_content = content[end_of_string_pos - unicode_escape_len - 2 : end_of_string_pos]
                decoded_unicode = JsonDecoder._decode_unicode_escape(sub_content)
                if decoded_unicode is None:
                    return JsonDecoder.Result.from_failure(end_of_string_pos - pos)

                decoded_parts.append(decoded_unicode)
                has_unicode_escape = True
            else:
                # unknown escape character, not decoded...
                return JsonDecoder.Result.from_failure(end_of_string_pos - pos)

        decoded_string = "".join(decoded_parts)
        if has_unicode_escape:
            decoded_string = JsonDecoder._merge_surrogate_pairs(decoded_string)

        return JsonDecoder.Result.from_success(decoded_string, end_of_string_pos - pos)

    @staticmethod
    def _decode_unicode_escape(content: str) -> typing.Optional[str]:
        try:
            return bytes(content, "ascii").decode("unicode-escape")
        except ValueError:
            return None

    @staticmethod
    def _merge_surrogate_pairs(text: str) -> str:
        # JSON encodes characters outside of the Basic Multilingual Plane as two consecutive '\u' escapes
        # (a UTF-16 surrogate pair). Each escape is decoded separately, so the pair must be merged into
        # a single code point here. Unpaired surrogates are kept as they are.
        return text.encode("utf-16-le", "surrogatepass").decode("utf-16-le", "surrogatepass")

    @staticmethod
    def _decode_number(content: str, pos: int) -> "JsonDecoder.Result":
        number_content, is_float = JsonDecoder._extract_number(content, pos)
        number_length = len(number_content)
        if number_length == 0:
            return JsonDecoder.Result.from_failure(1)

        try:
            number = float(number_content) if is_float else int(number_content)
        except ValueError:
            return JsonDecoder.Result.from_failure(number_length)

        return JsonDecoder.Result.from_success(number, number_length)

    @staticmethod
    def _extract_number(content: str, pos: int) -> typing.Tuple[str, bool]:
        end_of_number_pos = pos
        if content[end_of_number_pos] == "-":  # we already know that there is something after '-'
            end_of_number_pos += 1
        accept_exp_sign = False
        is_scientific_float = False
        is_float = False
        while end_of_number_pos < len(content):
            next_char = content[end_of_number_pos]

            # a sign is accepted only directly after the exponent character
            if accept_exp_sign:
                accept_exp_sign = False
                if next_char in ("+", "-"):
                    end_of_number_pos += 1
                    continue

            # Only ASCII digits are valid in JSON. str.isdigit() would also accept other Unicode digits,
            # which int() happily converts, so such text would be silently read as a number.
            if "0" <= next_char <= "9":
                end_of_number_pos += 1
                continue

            if (next_char in ("e", "E")) and not is_scientific_float:
                end_of_number_pos += 1
                is_float = True
                is_scientific_float = True
                accept_exp_sign = True
                continue

            if next_char == "." and not is_float:
                end_of_number_pos += 1
                is_float = True
                continue

            break  # pragma: no cover (to satisfy test coverage)

        return content[pos:end_of_number_pos], is_float
class JsonTokenizer:
    """
    Tokenizer used by JsonParser.

    The text stream is read in chunks of at most MAX_LINE_LEN characters. A value which reaches the end of
    the buffered content is decoded again after more content has been read.
    """

    def __init__(self, text_io: typing.TextIO) -> None:
        """
        Constructor.

        :param text_io: Text stream to tokenize.
        """
        self._io = text_io

        self._content = self._io.read(JsonTokenizer.MAX_LINE_LEN)
        self._line_number = 1
        self._column_number = 1
        self._token_column_number = 1
        self._pos = 0
        # True when the last decoding attempt stopped on a value which reaches the end of the buffer
        self._value_pending = False
        self._set_token(JsonToken.BEGIN_OF_FILE if self._content else JsonToken.END_OF_FILE, None)
        self._decoder_result = JsonDecoder.Result.from_failure()

    def next(self) -> JsonToken:
        """
        Moves to next token.

        :returns: Token.
        :raises JsonParserException: When unknown token is reached.
        """

        while not self._decode_next():
            new_content = self._io.read(JsonTokenizer.MAX_LINE_LEN)
            if not new_content:
                # The current token must not be used to tell the two cases apart: it is set to
                # END_OF_FILE whenever a chunk is exhausted, so it can be stale when a value which
                # follows a chunk boundary is the last thing in the stream.
                if self._value_pending:
                    # stream is finished and the value is at its end
                    self._set_token_value()
                else:
                    self._set_token(JsonToken.END_OF_FILE, None)
                    self._token_column_number = self._column_number

                return self._token

            # keep only the not yet consumed part of the buffer
            self._content = self._content[self._pos :] + new_content
            self._pos = 0

        return self._token

    def get_token(self) -> JsonToken:
        """
        Gets current token.

        :returns: Current token.
        """
        return self._token

    def get_value(self) -> typing.Any:
        """
        Gets current value.

        :returns: Current value.
        """
        return self._value

    def get_line(self) -> int:
        """
        Gets line number of the current token.

        :returns: Line number.
        """
        return self._line_number

    def get_column(self) -> int:
        """
        Gets column number of the current token.

        :returns: Column number.
        """
        return self._token_column_number

    def _decode_next(self) -> bool:
        # Returns False when more content is needed to decide what the next token is.
        self._value_pending = False
        if not self._skip_whitespaces():
            return False

        self._token_column_number = self._column_number

        next_char = self._content[self._pos]
        structural_token = JsonTokenizer._STRUCTURAL_TOKENS.get(next_char)
        if structural_token is not None:
            self._set_token(structural_token, next_char)
            self._set_position(self._pos + 1, self._column_number + 1)
        else:
            self._decoder_result = JsonDecoder.decode_value(self._content, self._pos)
            if self._pos + self._decoder_result.num_read_chars >= len(self._content):
                self._value_pending = True
                return False  # we are at the end of chunk => try to read more

            self._set_token_value()

        return True

    def _skip_whitespaces(self) -> bool:
        # Returns False when the buffered content is exhausted before a token starts.
        while True:
            if self._pos >= len(self._content):
                self._set_token(JsonToken.END_OF_FILE, None)
                return False

            next_char = self._content[self._pos]
            if next_char in (" ", "\t"):
                self._set_position(self._pos + 1, self._column_number + 1)
            elif next_char == "\n":
                self._line_number += 1
                self._set_position(self._pos + 1, 1)
            elif next_char == "\r":
                # the following character is needed to recognize the '\r\n' line ending
                if self._pos + 1 >= len(self._content):
                    self._set_token(JsonToken.END_OF_FILE, None)
                    return False

                next_next_char = self._content[self._pos + 1]
                self._line_number += 1
                self._set_position(self._pos + (2 if (next_next_char == "\n") else 1), 1)
            else:
                return True

    def _set_token(self, new_token: JsonToken, new_value: typing.Any) -> None:
        self._token = new_token
        self._value = new_value

    def _set_position(self, new_pos: int, new_column_number: int) -> None:
        self._pos = new_pos
        self._column_number = new_column_number

    def _set_token_value(self) -> None:
        if not self._decoder_result.success:
            raise JsonParserException(
                f"JsonTokenizer:{self._line_number}:{self._token_column_number}: " f"Unknown token!"
            )

        self._set_token(JsonToken.VALUE, self._decoder_result.value)
        num_read_chars = self._decoder_result.num_read_chars
        self._set_position(self._pos + num_read_chars, self._column_number + num_read_chars)

    # single character tokens
    _STRUCTURAL_TOKENS = {
        "{": JsonToken.BEGIN_OBJECT,
        "}": JsonToken.END_OBJECT,
        "[": JsonToken.BEGIN_ARRAY,
        "]": JsonToken.END_ARRAY,
        ":": JsonToken.KEY_SEPARATOR,
        ",": JsonToken.ITEM_SEPARATOR,
    }

    MAX_LINE_LEN = 64 * 1024
922class JsonReader:
923 """
924 Reads zserio object tree defined by a type info from a text stream.
925 """
def __init__(self, text_io: typing.TextIO) -> None:
    """
    Constructor.

    Creates the adapter which builds the zserio object tree and the parser which feeds it.

    :param text_io: Text stream to read.
    """

    creator_adapter = JsonReader._CreatorAdapter()
    self._creator_adapter = creator_adapter
    self._parser = JsonParser(text_io, creator_adapter)
def read(self, type_info: TypeInfo, *arguments: typing.List[typing.Any]) -> typing.Any:
    """
    Reads a zserio object tree defined by the given type info from the text stream.

    :param type_info: Type info defining the expected zserio object tree.
    :param arguments: Arguments of type defining the expected zserio object tree.
    :returns: Zserio object tree initialized using the JSON data.
    :raises PythonRuntimeException: When the JSON doesn't contain expected zserio object tree.
    """

    self._creator_adapter.set_type(type_info, *arguments)

    parser = self._parser
    try:
        parser.parse()
    except PythonRuntimeException as err:
        # errors of the parser itself already carry the position, pass them through untouched
        if isinstance(err, JsonParserException):
            raise
        position = f"{parser.get_line()}:{parser.get_column()}"
        raise PythonRuntimeException(f"{err} (JsonParser:{position})") from err

    return self._creator_adapter.get()
class _ObjectValueAdapter(JsonParser.Observer):
    """
    Base of the adapters for values which are represented by a Json object.
    """

    def get(self) -> typing.Any:
        """
        Gets the value parsed by the adapter.
        """

        raise NotImplementedError()
class _BitBufferAdapter(_ObjectValueAdapter):
    """
    The adapter which allows to parse Bit Buffer object from JSON.

    The expected JSON object has the key "buffer" with an array of byte values and the key "bitSize"
    with an integer.
    """

    def __init__(self) -> None:
        """
        Constructor.
        """

        self._state = JsonReader._BitBufferAdapter._State.VISIT_KEY
        self._buffer: typing.Optional[typing.List[int]] = None
        self._bit_size: typing.Optional[int] = None

    def get(self) -> BitBuffer:
        """
        Gets the created Bit Buffer object.

        :returns: Parsed Bit Buffer object.
        :raises PythonRuntimeException: In case of invalid use.
        """

        if self._buffer is None or self._bit_size is None:
            raise PythonRuntimeException("JsonReader: Unexpected end in Bit Buffer!")
        return BitBuffer(bytes(self._buffer), self._bit_size)

    def begin_object(self) -> None:
        raise PythonRuntimeException("JsonReader: Unexpected begin object in Bit Buffer!")

    def end_object(self) -> None:
        raise PythonRuntimeException("JsonReader: Unexpected end object in Bit Buffer!")

    def begin_array(self) -> None:
        if self._state != JsonReader._BitBufferAdapter._State.BEGIN_ARRAY_BUFFER:
            raise PythonRuntimeException("JsonReader: Unexpected begin array in Bit Buffer!")

        self._state = JsonReader._BitBufferAdapter._State.VISIT_VALUE_BUFFER
        self._buffer = []

    def end_array(self) -> None:
        if self._state != JsonReader._BitBufferAdapter._State.VISIT_VALUE_BUFFER:
            raise PythonRuntimeException("JsonReader: Unexpected end array in Bit Buffer!")

        self._state = JsonReader._BitBufferAdapter._State.VISIT_KEY

    def visit_key(self, key: str) -> None:
        if self._state != JsonReader._BitBufferAdapter._State.VISIT_KEY:
            raise PythonRuntimeException(f"JsonReader: Unexpected key '{key}' in Bit Buffer!")

        if key == "buffer":
            self._state = JsonReader._BitBufferAdapter._State.BEGIN_ARRAY_BUFFER
        elif key == "bitSize":
            self._state = JsonReader._BitBufferAdapter._State.VISIT_VALUE_BITSIZE
        else:
            raise PythonRuntimeException(f"JsonReader: Unknown key '{key}' in Bit Buffer!")

    def visit_value(self, value: typing.Any) -> None:
        state = self._state
        if (
            state == JsonReader._BitBufferAdapter._State.VISIT_VALUE_BUFFER
            and self._buffer is not None
            and isinstance(value, int)
            # Reject values which do not fit into a byte right here. Otherwise bytes() in get() fails
            # with a bare ValueError instead of the PythonRuntimeException raised for invalid values.
            and 0 <= value <= 255
        ):
            self._buffer.append(value)
        elif state == JsonReader._BitBufferAdapter._State.VISIT_VALUE_BITSIZE and isinstance(value, int):
            self._bit_size = value
            self._state = JsonReader._BitBufferAdapter._State.VISIT_KEY
        else:
            raise PythonRuntimeException(f"JsonReader: Unexpected value '{value}' in Bit Buffer!")

    class _State(enum.Enum):
        # a key ("buffer" or "bitSize") is expected
        VISIT_KEY = enum.auto()
        # the key "buffer" has been read, its array must begin
        BEGIN_ARRAY_BUFFER = enum.auto()
        # inside of the "buffer" array
        VISIT_VALUE_BUFFER = enum.auto()
        # the key "bitSize" has been read, its value must follow
        VISIT_VALUE_BITSIZE = enum.auto()
class _BytesAdapter(_ObjectValueAdapter):
    """
    The adapter which allows to parse bytes object from JSON.

    The expected JSON object has the single key "buffer" with an array of byte values.
    """

    def __init__(self) -> None:
        """
        Constructor.
        """

        self._state = JsonReader._BytesAdapter._State.VISIT_KEY
        self._buffer: typing.Optional[bytearray] = None

    def get(self) -> bytearray:
        """
        Gets the created bytes object.

        :returns: Parsed bytes object.
        :raises PythonRuntimeException: In case of invalid use.
        """

        if self._buffer is None:
            raise PythonRuntimeException("JsonReader: Unexpected end in bytes!")
        return self._buffer

    def begin_object(self) -> None:
        raise PythonRuntimeException("JsonReader: Unexpected begin object in bytes!")

    def end_object(self) -> None:
        raise PythonRuntimeException("JsonReader: Unexpected end object in bytes!")

    def begin_array(self) -> None:
        if self._state != JsonReader._BytesAdapter._State.BEGIN_ARRAY_BUFFER:
            raise PythonRuntimeException("JsonReader: Unexpected begin array in bytes!")

        self._state = JsonReader._BytesAdapter._State.VISIT_VALUE_BUFFER
        self._buffer = bytearray()

    def end_array(self) -> None:
        if self._state != JsonReader._BytesAdapter._State.VISIT_VALUE_BUFFER:
            raise PythonRuntimeException("JsonReader: Unexpected end array in bytes!")

        self._state = JsonReader._BytesAdapter._State.VISIT_KEY

    def visit_key(self, key: str) -> None:
        if self._state != JsonReader._BytesAdapter._State.VISIT_KEY:
            raise PythonRuntimeException(f"JsonReader: Unexpected key '{key}' in bytes!")

        if key != "buffer":
            raise PythonRuntimeException(f"JsonReader: Unknown key '{key}' in bytes!")

        self._state = JsonReader._BytesAdapter._State.BEGIN_ARRAY_BUFFER

    def visit_value(self, value: typing.Any) -> None:
        if (
            self._state == JsonReader._BytesAdapter._State.VISIT_VALUE_BUFFER
            and self._buffer is not None
            and isinstance(value, int)
            # Reject values which do not fit into a byte right here. Otherwise bytearray.append() fails
            # with a bare ValueError instead of the PythonRuntimeException raised for invalid values.
            and 0 <= value <= 255
        ):
            self._buffer.append(value)
        else:
            raise PythonRuntimeException(f"JsonReader: Unexpected value '{value}' in bytes!")

    class _State(enum.Enum):
        # the key "buffer" is expected
        VISIT_KEY = enum.auto()
        # the key "buffer" has been read, its array must begin
        BEGIN_ARRAY_BUFFER = enum.auto()
        # inside of the "buffer" array
        VISIT_VALUE_BUFFER = enum.auto()
1117 class _CreatorAdapter(JsonParser.Observer):
1118 """
1119 The adapter which allows to use ZserioTreeCreator as an JsonReader observer.
1120 """
1122 def __init__(self) -> None:
1123 """
1124 Constructor.
1125 """
1127 self._creator: typing.Optional[ZserioTreeCreator] = None
1128 self._key_stack: typing.List[str] = []
1129 self._object: typing.Any = None
1130 self._object_value_adapter: typing.Optional[JsonReader._ObjectValueAdapter] = None
def set_type(self, type_info: TypeInfo, *arguments: typing.List[typing.Any]) -> None:
    """
    Prepares the adapter for creation of a new zserio object and forgets the current one.

    :param type_info: Type info of the type which is to be created.
    :param arguments: Arguments forwarded to the ZserioTreeCreator together with the type info.
    """

    creator = ZserioTreeCreator(type_info, *arguments)
    self._creator = creator
    self._object = None
def get(self) -> typing.Any:
    """
    Gets the zserio object tree created from the parsed JSON.

    :returns: Zserio object tree.
    :raises PythonRuntimeException: When no object tree is available.
    """

    created = self._object
    if created:
        return created

    raise PythonRuntimeException("JsonReader: Zserio tree not created!")
def begin_object(self) -> None:
    """
    Handles begin of a JSON object.

    :raises PythonRuntimeException: When the adapter has no creator.
    """

    adapter = self._object_value_adapter
    if adapter:
        # a dedicated value adapter is active, it consumes the event
        adapter.begin_object()
        return

    creator = self._creator
    if not creator:
        raise PythonRuntimeException("JsonReader: Adapter not initialized!")

    if not self._key_stack:
        creator.begin_root()
        return

    # non-empty key denotes a member, empty key denotes an element of an open array
    key = self._key_stack[-1]
    type_info = creator.get_field_type(key) if key else creator.get_element_type()
    schema_type = type_info.schema_name
    if schema_type == "extern":
        self._object_value_adapter = JsonReader._BitBufferAdapter()
    elif schema_type == "bytes":
        self._object_value_adapter = JsonReader._BytesAdapter()
    elif key:
        creator.begin_compound(key)
    else:
        creator.begin_compound_element()
def end_object(self) -> None:
    """
    Handles end of a JSON object.

    :raises PythonRuntimeException: When the adapter has no creator.
    """

    adapter = self._object_value_adapter
    if adapter:
        # the object parsed by the value adapter is complete, store it as an ordinary value
        value = adapter.get()
        self._object_value_adapter = None
        self.visit_value(value)
        return

    creator = self._creator
    if not creator:
        raise PythonRuntimeException("JsonReader: Adapter not initialized!")

    if not self._key_stack:
        # the root object has ended, keep the result and drop the creator
        self._object = creator.end_root()
        self._creator = None
    elif self._key_stack[-1]:
        creator.end_compound()
        self._key_stack.pop()  # finish member
    else:
        creator.end_compound_element()
def begin_array(self) -> None:
    """
    Handles begin of a JSON array.

    :raises PythonRuntimeException: When the adapter has no creator or when no key is open.
    """

    adapter = self._object_value_adapter
    if adapter:
        adapter.begin_array()
        return

    creator = self._creator
    if not creator:
        raise PythonRuntimeException("JsonReader: Adapter not initialized!")

    key_stack = self._key_stack
    if not key_stack:
        raise PythonRuntimeException("JsonReader: ZserioTreeCreator expects json object!")

    creator.begin_array(key_stack[-1])

    # empty key marks that the following events belong to array elements
    key_stack.append("")
def end_array(self) -> None:
    """
    Handles end of a JSON array.

    :raises PythonRuntimeException: When the adapter has no creator.
    """

    adapter = self._object_value_adapter
    if adapter:
        adapter.end_array()
        return

    creator = self._creator
    if not creator:
        raise PythonRuntimeException("JsonReader: Adapter not initialized!")

    creator.end_array()

    key_stack = self._key_stack
    key_stack.pop()  # finish array
    key_stack.pop()  # finish member
def visit_key(self, key: str) -> None:
    """
    Handles a key of a JSON object.

    :param key: Visited key.
    :raises PythonRuntimeException: When the adapter has no creator.
    """

    adapter = self._object_value_adapter
    if adapter:
        adapter.visit_key(key)
        return

    if not self._creator:
        raise PythonRuntimeException("JsonReader: Adapter not initialized!")

    # remember the member until its value is processed
    self._key_stack.append(key)
def visit_value(self, value: typing.Any) -> None:
    """
    Handles a JSON value.

    :param value: Visited value.
    :raises PythonRuntimeException: When the adapter has no creator or when no key is open.
    """

    adapter = self._object_value_adapter
    if adapter:
        adapter.visit_value(value)
        return

    creator = self._creator
    if not creator:
        raise PythonRuntimeException("JsonReader: Adapter not initialized!")

    if not self._key_stack:
        raise PythonRuntimeException("JsonReader: ZserioTreeCreator expects json object!")

    key = self._key_stack[-1]
    if not key:
        # empty key denotes an element of an open array
        element_type_info = creator.get_element_type()
        creator.add_value_element(self._convert_value(value, element_type_info))
        return

    field_type_info = creator.get_field_type(key)
    creator.set_value(key, self._convert_value(value, field_type_info))
    self._key_stack.pop()  # finish member
@staticmethod
def _convert_value(
    value: typing.Any, type_info: typing.Union[TypeInfo, RecursiveTypeInfo]
) -> typing.Any:
    """
    Converts a parsed JSON value to the value expected for the given type.

    :param value: Value parsed from JSON.
    :param type_info: Type info of the expected type.
    :returns: Enum item or bitmask object for enum or bitmask types, otherwise the unchanged value.
    """

    if value is None:
        return None

    attributes = type_info.attributes
    is_string = isinstance(value, str)

    if TypeAttribute.ENUM_ITEMS in attributes:
        if is_string:
            return JsonReader._CreatorAdapter._enum_from_string(value, type_info)
        return type_info.py_type(value)

    if TypeAttribute.BITMASK_VALUES in attributes:
        if is_string:
            return JsonReader._CreatorAdapter._bitmask_from_string(value, type_info)
        return type_info.py_type.from_value(value)

    return value
@staticmethod
def _enum_from_string(
    string_value: str, type_info: typing.Union[TypeInfo, RecursiveTypeInfo]
) -> typing.Any:
    """
    Creates an enum item from its string representation.

    :param string_value: String to convert.
    :param type_info: Type info of the enum.
    :returns: Enum item whose schema name equals the string.
    :raises PythonRuntimeException: When the string does not start with an ASCII letter or underscore
                                    or when no enum item matches.
    """

    # empty slice for an empty string fails all checks below
    first_char = string_value[:1]
    if first_char == "_" or "A" <= first_char <= "Z" or "a" <= first_char <= "z":
        py_item = JsonReader._CreatorAdapter._parse_enum_string_value(string_value, type_info)
        if py_item is not None:
            return py_item
    # everything else is a no match

    raise PythonRuntimeException(
        f"JsonReader: Cannot create enum '{type_info.schema_name}' "
        f"from string value '{string_value}'!"
    )
@staticmethod
def _bitmask_from_string(
    string_value: str, type_info: typing.Union[TypeInfo, RecursiveTypeInfo]
) -> typing.Any:
    """
    Creates a bitmask object from its string representation.

    A string starting with an ASCII letter or underscore is parsed as or-ed bitmask value names,
    a string starting with an ASCII digit is parsed as a number.

    :param string_value: String to convert.
    :param type_info: Type info of the bitmask.
    :returns: Bitmask object created by from_value() of the bitmask python type.
    :raises PythonRuntimeException: When the string cannot be converted.
    """

    value = None
    # empty slice for an empty string fails all checks below
    first_char = string_value[:1]
    if first_char == "_" or "A" <= first_char <= "Z" or "a" <= first_char <= "z":
        value = JsonReader._CreatorAdapter._parse_bitmask_string_value(string_value, type_info)
    elif "0" <= first_char <= "9":  # bitmask can be only unsigned
        value = JsonReader._CreatorAdapter._parse_bitmask_numeric_string_value(string_value)

    if value is None:
        raise PythonRuntimeException(
            f"JsonReader: Cannot create bitmask '{type_info.schema_name}' "
            f"from string value '{string_value}'!"
        )

    return type_info.py_type.from_value(value)
@staticmethod
def _parse_enum_string_value(
    string_value: str, type_info: typing.Union[TypeInfo, RecursiveTypeInfo]
) -> typing.Any:
    """
    Looks up the enum item whose schema name equals the given string.

    :param string_value: Name of the enum item.
    :param type_info: Type info of the enum.
    :returns: Python item of the first matching enum item or None when nothing matches.
    """

    enum_items = type_info.attributes[TypeAttribute.ENUM_ITEMS]
    matching_items = (
        item_info.py_item for item_info in enum_items if string_value == item_info.schema_name
    )

    return next(matching_items, None)
@staticmethod
def _parse_bitmask_string_value(
    string_value: str, type_info: typing.Union[TypeInfo, RecursiveTypeInfo]
) -> typing.Any:
    """
    Parses bitmask value names separated by "|" and combines their values.

    :param string_value: String with the or-ed bitmask value names.
    :param type_info: Type info of the bitmask.
    :returns: Or-ed integral value or None when any of the names does not match.
    """

    bitmask_values = type_info.attributes[TypeAttribute.BITMASK_VALUES]
    result = 0
    for raw_identifier in string_value.split("|"):
        name = raw_identifier.strip()
        for item_info in bitmask_values:
            if name == item_info.schema_name:
                result |= item_info.py_item.value
                break
        else:
            # the loop has finished without a match, the whole string is invalid
            return None

    return result
1345 @staticmethod
1346 def _parse_bitmask_numeric_string_value(string_value: str):
1347 number_len = 1
1348 while string_value[number_len] >= "0" and string_value[number_len] <= "9":
1349 number_len += 1
1351 return int(string_value[0:number_len])