Coverage for /home/runner/work/zserio/zserio/compiler/extensions/python/runtime/src/zserio/walker.py: 100%
253 statements
« prev ^ index » next coverage.py v6.5.0, created at 2024-12-05 10:43 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2024-12-05 10:43 +0000
1"""
2The module implements generic walker through given zserio object tree.
3"""
5import functools
6import re
7import typing
9from zserio.exception import PythonRuntimeException
10from zserio.typeinfo import TypeAttribute, MemberInfo, MemberAttribute
class WalkObserver:
    """
    Callback interface through which the walker reports what it encounters.

    Every method of this base class raises NotImplementedError; observers are expected to override them.
    """

    def begin_root(self, compound: typing.Any) -> None:
        """
        Invoked for the root compound zserio object before its fields are walked.

        :param compound: Root compound zserio object.
        """

        raise NotImplementedError

    def end_root(self, compound: typing.Any) -> None:
        """
        Invoked for the root compound zserio object once its fields have been walked.

        :param compound: Root compound zserio object.
        """

        raise NotImplementedError

    def begin_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> None:
        """
        Invoked when the walker enters an array.

        None arrays (i.e. non-present optionals) are not reported here; visit_value is called
        with None for them instead.

        :param array: Zserio array.
        :param member_info: Array member info.
        """

        raise NotImplementedError

    def end_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> None:
        """
        Invoked when the walker leaves an array.

        :param array: Zserio array.
        :param member_info: Array member info.
        """

        raise NotImplementedError

    def begin_compound(
        self,
        compound: typing.Any,
        member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> None:
        """
        Invoked when the walker enters a compound field object.

        None compounds (i.e. uninitialized or optionals) are not reported here; visit_value is called
        for them instead.

        :param compound: Compound zserio object.
        :param member_info: Compound member info.
        :param element_index: Index of the compound within its array, None when it is not an array element.
        """

        raise NotImplementedError

    def end_compound(
        self,
        compound: typing.Any,
        member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> None:
        """
        Invoked when the walker leaves a compound field object it has just walked.

        :param compound: Compound zserio object.
        :param member_info: Compound member info.
        :param element_index: Index of the compound within its array, None when it is not an array element.
        """

        raise NotImplementedError

    def visit_value(
        self,
        value: typing.Any,
        member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> None:
        """
        Invoked when the walker reaches a simple value, or a compound or array which is None.

        :param value: Simple value (or None for an unset compound or array).
        :param member_info: Member info.
        :param element_index: Index of the value within its array, None when it is not an array element.
        """

        raise NotImplementedError
class WalkFilter:
    """
    Interface of filters which steer the walker.

    The before_* methods decide whether the walker descends into an item, the after_* methods decide
    whether it carries on with the next sibling. Every method of this base class raises
    NotImplementedError; filters are expected to override them.
    """

    def before_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> bool:
        """
        Invoked before the walker enters an array.

        None arrays (i.e. non-present optionals) are not reported here; before_value is called
        with None for them instead.

        :param array: Zserio array.
        :param member_info: Array member info.

        :returns: True when the walking should continue to the array.
        """

        raise NotImplementedError

    def after_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> bool:
        """
        Invoked after an array.

        :param array: Zserio array.
        :param member_info: Array member info.

        :returns: True when the walking should continue to a next sibling, False to return to the parent.
        """

        raise NotImplementedError

    def before_compound(
        self,
        compound: typing.Any,
        member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> bool:
        """
        Invoked before the walker enters a compound object.

        Uninitialized compounds (i.e. None) are not reported here; before_value is called
        for them instead.

        :param compound: Compound zserio object.
        :param member_info: Compound member info.
        :param element_index: Index of the compound within its array, None when it is not an array element.

        :returns: True when the walking should continue into the compound object, False otherwise.
        """

        raise NotImplementedError

    def after_compound(
        self,
        compound: typing.Any,
        member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> bool:
        """
        Invoked after a compound object.

        :param compound: Compound zserio object.
        :param member_info: Compound member info.
        :param element_index: Index of the compound within its array, None when it is not an array element.

        :returns: True when the walking should continue to a next sibling, False to return to the parent.
        """

        raise NotImplementedError

    def before_value(
        self,
        value: typing.Any,
        member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> bool:
        """
        Invoked before a simple value, or before a compound or array which is None.

        :param value: Simple value (or None for an unset compound or array).
        :param member_info: Member info.
        :param element_index: Index of the value within its array, None when it is not an array element.

        :returns: True when the walking should continue to the simple value, False otherwise.
        """

        raise NotImplementedError

    def after_value(
        self,
        value: typing.Any,
        member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> bool:
        """
        Invoked after a simple value, or after a compound or array which is None.

        :param value: Simple value (or None for an unset compound or array).
        :param member_info: Member info.
        :param element_index: Index of the value within its array, None when it is not an array element.

        :returns: True when the walking should continue to a next sibling, False to return to the parent.
        """

        raise NotImplementedError
class Walker:
    """
    Traverses zserio objects using their generated type info (see -withTypeInfoCode).

    Each visited item is first offered to the walk filter and, when accepted, reported to the walk observer.
    """

    def __init__(
        self,
        walk_observer: WalkObserver,
        walk_filter: typing.Optional[WalkFilter] = None,
    ) -> None:
        """
        Constructor.

        :param walk_observer: Observer to use during walking.
        :param walk_filter: Walk filter to use. A DefaultWalkFilter is created when None is given.
        """

        if walk_filter is None:
            walk_filter = DefaultWalkFilter()

        self._walk_observer = walk_observer
        self._walk_filter = walk_filter

    def walk(self, zserio_object: typing.Any) -> None:
        """
        Walks the given zserio compound object, which must have been generated with type_info
        (see -withTypeInfoCode options).

        :param zserio_object: Zserio object to walk.

        :raises PythonRuntimeException: When the object has no type_info or is not a compound type.
        """

        if not hasattr(zserio_object, "type_info"):
            raise PythonRuntimeException(
                "Walker: Type info must be enabled (see zserio option -withTypeInfoCode)!"
            )

        root_type_info = zserio_object.type_info()
        if TypeAttribute.FIELDS not in root_type_info.attributes:
            raise PythonRuntimeException(
                f"Walker: Root object '{root_type_info.schema_name}' is not a compound type!"
            )

        observer = self._walk_observer
        observer.begin_root(zserio_object)
        self._walk_fields(zserio_object, root_type_info)
        observer.end_root(zserio_object)

    def _walk_fields(self, zserio_object, type_info) -> None:
        # Walks the fields of a compound; for unions and choices only the selected field is walked.
        fields = type_info.attributes[TypeAttribute.FIELDS]

        if TypeAttribute.SELECTOR not in type_info.attributes:
            # structure - walk the fields in order until the filter asks to return to the parent
            for field in fields:
                field_value = getattr(zserio_object, field.attributes[MemberAttribute.PROPERTY_NAME])
                if not self._walk_field(field_value, field):
                    break
            return

        # union or choice
        choice_tag = zserio_object.choice_tag
        if choice_tag != zserio_object.UNDEFINED_CHOICE:
            selected_field = fields[choice_tag]
            selected_value = getattr(
                zserio_object, selected_field.attributes[MemberAttribute.PROPERTY_NAME]
            )
            self._walk_field(selected_value, selected_field)
        # else: uninitialized or empty branch

    def _walk_field(self, zserio_object: typing.Any, member_info: MemberInfo) -> bool:
        # Walks a single field; the result tells whether the walking shall continue with the next sibling.
        is_array = zserio_object is not None and MemberAttribute.ARRAY_LENGTH in member_info.attributes
        if not is_array:
            return self._walk_field_value(zserio_object, member_info)

        if self._walk_filter.before_array(zserio_object, member_info):
            self._walk_observer.begin_array(zserio_object, member_info)
            for position, item in enumerate(zserio_object):
                if not self._walk_field_value(item, member_info, position):
                    break
            self._walk_observer.end_array(zserio_object, member_info)

        # after_array is called even when the filter refused to enter the array
        return self._walk_filter.after_array(zserio_object, member_info)

    def _walk_field_value(
        self,
        zserio_object: typing.Any,
        member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> bool:
        # Walks a single value (a not None compound is walked recursively, anything else is visited).
        type_info = member_info.type_info
        walk_filter = self._walk_filter
        observer = self._walk_observer

        if zserio_object is None or TypeAttribute.FIELDS not in type_info.attributes:
            # simple value or None compound / array
            if walk_filter.before_value(zserio_object, member_info, element_index):
                observer.visit_value(zserio_object, member_info, element_index)
            return walk_filter.after_value(zserio_object, member_info, element_index)

        if walk_filter.before_compound(zserio_object, member_info, element_index):
            observer.begin_compound(zserio_object, member_info, element_index)
            self._walk_fields(zserio_object, type_info)
            observer.end_compound(zserio_object, member_info, element_index)
        return walk_filter.after_compound(zserio_object, member_info, element_index)
class DefaultWalkObserver(WalkObserver):
    """
    No-op walk observer: every callback is accepted and ignored.
    """

    def begin_root(self, _root: typing.Any) -> None:
        """Ignores the beginning of the root compound."""

    def end_root(self, _root: typing.Any) -> None:
        """Ignores the end of the root compound."""

    def begin_array(self, _array: typing.List[typing.Any], member_info: MemberInfo) -> None:
        """Ignores the beginning of an array."""

    def end_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> None:
        """Ignores the end of an array."""

    def begin_compound(
        self,
        _compound: typing.Any,
        member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> None:
        """Ignores the beginning of a compound."""

    def end_compound(
        self,
        _compound: typing.Any,
        _member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> None:
        """Ignores the end of a compound."""

    def visit_value(
        self,
        _value: typing.Any,
        member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> None:
        """Ignores a visited value."""
class DefaultWalkFilter(WalkFilter):
    """
    Pass-through walk filter: every callback returns True, so nothing is filtered out.
    """

    def before_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
        """Always allows entering the array."""
        return True

    def after_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
        """Always continues to the next sibling."""
        return True

    def before_compound(
        self,
        _compound: typing.Any,
        _member_info: MemberInfo,
        _element_index: typing.Optional[int] = None,
    ) -> bool:
        """Always allows entering the compound."""
        return True

    def after_compound(
        self,
        _compound: typing.Any,
        _member_info: MemberInfo,
        _element_index: typing.Optional[int] = None,
    ) -> bool:
        """Always continues to the next sibling."""
        return True

    def before_value(
        self,
        _value: typing.Any,
        _member_info: MemberInfo,
        _element_index: typing.Optional[int] = None,
    ) -> bool:
        """Always allows visiting the value."""
        return True

    def after_value(
        self,
        _value: typing.Any,
        _member_info: MemberInfo,
        _element_index: typing.Optional[int] = None,
    ) -> bool:
        """Always continues to the next sibling."""
        return True
class DepthWalkFilter(WalkFilter):
    """
    Walk filter which limits the walking to the given maximum depth.

    The filter keeps a depth counter which starts at 1 and is incremented before each array or compound
    and decremented after it.
    """

    def __init__(self, max_depth: int):
        """
        Constructor.

        :param max_depth: Maximum depth to walk to.
        """

        self._depth = 1
        self._max_depth = max_depth

    def before_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
        """Enters one level deeper; returns True when the array still fits within the maximum depth."""
        return self._enter_depth_level()

    def after_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
        """Leaves the array level; always continues to the next sibling."""
        return self._leave_depth_level()

    def before_compound(
        self,
        _compound: typing.Any,
        _member_info: MemberInfo,
        _element_index: typing.Optional[int] = None,
    ) -> bool:
        """Enters one level deeper; returns True when the compound still fits within the maximum depth."""
        return self._enter_depth_level()

    def after_compound(
        self,
        _compound: typing.Any,
        _member_info: MemberInfo,
        _element_index: typing.Optional[int] = None,
    ) -> bool:
        """Leaves the compound level; always continues to the next sibling."""
        return self._leave_depth_level()

    def before_value(
        self,
        _value: typing.Any,
        _member_info: MemberInfo,
        _element_index: typing.Optional[int] = None,
    ) -> bool:
        """Returns True when the current depth does not exceed the maximum (values do not change the depth)."""
        return not self._depth > self._max_depth

    def after_value(
        self,
        _value: typing.Any,
        _member_info: MemberInfo,
        _element_index: typing.Optional[int] = None,
    ) -> bool:
        """Always continues to the next sibling."""
        return True

    def _enter_depth_level(self) -> bool:
        # The decision is based on the depth before the increment; the counter grows even when
        # entering is refused so that the matching _leave_depth_level restores it.
        depth_before = self._depth
        self._depth = depth_before + 1
        return depth_before <= self._max_depth

    def _leave_depth_level(self) -> bool:
        self._depth = self._depth - 1
        return True
class RegexWalkFilter(WalkFilter):
    """
    Walk filter which lets the walker follow only the paths matching the given regex.

    A path is built from the field names below the root object and joined by dots, so the root object
    itself is never a part of the path.

    Array elements carry their index in the path, e.g. "compound.arrayField[0]" matches only the first
    element of the array "arrayField".
    """

    def __init__(self, path_regex: str) -> None:
        """
        Constructor.

        :param path_regex: Path regex to use for filtering.
        """

        self._path_regex = re.compile(path_regex)
        self._current_path: typing.List[str] = []

    def before_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> bool:
        """
        Pushes the array name to the current path and checks whether the array or any of its elements match.

        :param array: Zserio array.
        :param member_info: Array member info.

        :returns: True when the array itself or the subtree of any of its elements matches the regex.
        """

        array_name = member_info.schema_name
        self._current_path.append(array_name)
        if self._path_regex.match(self._get_current_path()):
            return True  # the array itself matches

        # look for a match in each element and continue into the array only if some is found
        # (note that array is never None)
        for position, item in enumerate(array):
            self._current_path[-1] = f"{array_name}[{position}]"
            if self._match_subtree(item, member_info):
                return True

        # nothing matched - drop the index so that the last path segment is the bare array name again
        self._current_path[-1] = array_name
        return False

    def after_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
        """Removes the array from the current path; always continues to the next sibling."""
        self._current_path.pop()
        return True

    def before_compound(
        self,
        compound: typing.Any,
        member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> bool:
        """
        Adds the compound to the current path and checks whether it or anything in its subtree matches.

        :param compound: Compound zserio object.
        :param member_info: Compound member info.
        :param element_index: Index of the compound within its array, None when it is not an array element.

        :returns: True when the compound itself or any path within its subtree matches the regex.
        """

        self._append_path(member_info, element_index)
        compound_matches = self._path_regex.match(self._get_current_path())
        if compound_matches:
            return True  # the compound itself matches

        return self._match_subtree(compound, member_info)

    def after_compound(
        self,
        _compound: typing.Any,
        member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> bool:
        """Removes the compound from the current path; always continues to the next sibling."""
        self._pop_path(member_info, element_index)
        return True

    def before_value(
        self,
        value: typing.Any,
        member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> bool:
        """
        Adds the value to the current path and checks whether it matches.

        :param value: Simple value (or None for an unset compound or array).
        :param member_info: Member info.
        :param element_index: Index of the value within its array, None when it is not an array element.

        :returns: True when the value (or its subtree in case of a not None compound) matches the regex.
        """

        self._append_path(member_info, element_index)
        return self._match_subtree(value, member_info)

    def after_value(
        self,
        _value: typing.Any,
        member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> bool:
        """Removes the value from the current path; always continues to the next sibling."""
        self._pop_path(member_info, element_index)
        return True

    def _match_subtree(self, member: typing.Any, member_info: MemberInfo) -> bool:
        # Checks whether the current path, or any path below it, matches the regex.
        is_compound = member is not None and TypeAttribute.FIELDS in member_info.type_info.attributes
        if not is_compound:
            # a simple value or None compound - just match the current path
            return self._path_regex.match(self._get_current_path()) is not None

        # a not None compound - walk its subtree with a nested walker working on a copy of the path
        subtree_filter = self._SubtreeRegexFilter(self._current_path.copy(), self._path_regex)
        Walker(DefaultWalkObserver(), subtree_filter).walk(member)
        return subtree_filter.matches()

    def _get_current_path(self) -> str:
        return self._get_current_path_impl(self._current_path)

    def _append_path(self, member_info: MemberInfo, element_index: typing.Optional[int]) -> None:
        self._append_path_impl(self._current_path, member_info, element_index)

    def _pop_path(self, member_info: MemberInfo, element_index: typing.Optional[int]) -> None:
        self._pop_path_impl(self._current_path, member_info, element_index)

    @staticmethod
    def _get_current_path_impl(current_path: typing.List[str]) -> str:
        # path segments are joined by dots
        return ".".join(current_path)

    @staticmethod
    def _append_path_impl(
        current_path: typing.List[str],
        member_info: MemberInfo,
        element_index: typing.Optional[int],
    ) -> None:
        if element_index is not None:
            # array element - the array name is already the last segment, so just add the index to it
            current_path[-1] = f"{member_info.schema_name}[{element_index}]"
            return

        current_path.append(member_info.schema_name)

    @staticmethod
    def _pop_path_impl(
        current_path: typing.List[str],
        member_info: MemberInfo,
        element_index: typing.Optional[int],
    ) -> None:
        if element_index is not None:
            # array element - keep the segment and just remove the index
            current_path[-1] = member_info.schema_name
            return

        current_path.pop()

    class _SubtreeRegexFilter(WalkFilter):
        """
        Filter used to find out whether any path within a subtree matches the given regex.

        The whole subtree is walked until the first match is found, then the walking is stopped.
        """

        def __init__(self, current_path: typing.List[str], path_regex: typing.Pattern) -> None:
            self._matches = False
            self._path_regex = path_regex
            self._current_path = current_path

        def matches(self) -> bool:
            """
            Returns whether the subtree contains any matching value.

            :returns: True when the subtree contains a matching value, False otherwise.
            """

            return self._matches

        def before_array(self, _array: typing.List[typing.Any], member_info: MemberInfo) -> bool:
            self._current_path.append(member_info.schema_name)

            # terminate when the match is already found (note that array is never None here)
            return self._keep_searching()

        def after_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
            self._current_path.pop()
            return not self._matches  # terminate when the match is already found

        def before_compound(
            self,
            _compound: typing.Any,
            member_info: MemberInfo,
            element_index: typing.Optional[int] = None,
        ) -> bool:
            self._append_path(member_info, element_index)

            # terminate when the match is already found (note that compound is never None here)
            return self._keep_searching()

        def after_compound(
            self,
            _compound: typing.Any,
            member_info: MemberInfo,
            element_index: typing.Optional[int] = None,
        ) -> bool:
            self._pop_path(member_info, element_index)
            return not self._matches  # terminate when the match is already found

        def before_value(
            self,
            _value: typing.Any,
            member_info: MemberInfo,
            element_index: typing.Optional[int] = None,
        ) -> bool:
            self._append_path(member_info, element_index)

            return self._keep_searching()  # terminate when the match is already found

        def after_value(
            self,
            _value: typing.Any,
            member_info: MemberInfo,
            element_index: typing.Optional[int] = None,
        ) -> bool:
            self._pop_path(member_info, element_index)
            return not self._matches  # terminate when the match is already found

        def _keep_searching(self) -> bool:
            # Matches the current path, remembers the result and tells whether the search shall go on.
            self._matches = self._path_regex.match(self._get_current_path()) is not None
            return not self._matches

        def _get_current_path(self) -> str:
            return RegexWalkFilter._get_current_path_impl(self._current_path)

        def _append_path(self, member_info: MemberInfo, element_index: typing.Optional[int]) -> None:
            RegexWalkFilter._append_path_impl(self._current_path, member_info, element_index)

        def _pop_path(self, member_info: MemberInfo, element_index: typing.Optional[int]) -> None:
            RegexWalkFilter._pop_path_impl(self._current_path, member_info, element_index)
class ArrayLengthWalkFilter(WalkFilter):
    """
    Walk filter which limits the walking within arrays to the given maximum array length.

    Only array elements are filtered: items which are not array elements (element_index is None)
    always pass.
    """

    def __init__(self, max_array_length: int):
        """
        Constructor.

        :param max_array_length: Maximum array length to walk to.
        """

        self._max_array_length = max_array_length

    def before_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
        """Always allows entering the array; the limit is applied to its elements."""
        return True

    def after_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
        """Always continues to the next sibling."""
        return True

    def before_compound(
        self,
        _compound: typing.Any,
        _member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> bool:
        """Returns True unless the compound is an array element at or beyond the maximum length."""
        return self._filter_array_element(element_index)

    def after_compound(
        self,
        _compound: typing.Any,
        _member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> bool:
        """Returns True unless the compound is an array element at or beyond the maximum length."""
        return self._filter_array_element(element_index)

    def before_value(
        self,
        _value: typing.Any,
        _member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> bool:
        """Returns True unless the value is an array element at or beyond the maximum length."""
        return self._filter_array_element(element_index)

    def after_value(
        self,
        _value: typing.Any,
        _member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> bool:
        """Returns True unless the value is an array element at or beyond the maximum length."""
        return self._filter_array_element(element_index)

    def _filter_array_element(self, element_index: typing.Optional[int]) -> bool:
        # items outside of arrays are never filtered, array elements only up to the maximum length
        return element_index is None or element_index < self._max_array_length
class AndWalkFilter(WalkFilter):
    """
    Walk filter which composes several filters together.

    The filters are called one after another and their results are combined by logical and.
    Note that all filters are always called, even when an earlier one has already returned False.
    """

    def __init__(self, walk_filters: typing.List[WalkFilter]) -> None:
        """
        Constructor.

        :param walk_filters: List of filters to use in composition.
        """

        self._walk_filters = walk_filters

    def before_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> bool:
        """Calls before_array on all the filters and combines their results by logical and."""

        def call(walk_filter):
            return walk_filter.before_array(array, member_info)

        return self._apply_filters(call)

    def after_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> bool:
        """Calls after_array on all the filters and combines their results by logical and."""

        def call(walk_filter):
            return walk_filter.after_array(array, member_info)

        return self._apply_filters(call)

    def before_compound(
        self,
        compound: typing.Any,
        member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> bool:
        """Calls before_compound on all the filters and combines their results by logical and."""

        def call(walk_filter):
            return walk_filter.before_compound(compound, member_info, element_index)

        return self._apply_filters(call)

    def after_compound(
        self,
        compound: typing.Any,
        member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> bool:
        """Calls after_compound on all the filters and combines their results by logical and."""

        def call(walk_filter):
            return walk_filter.after_compound(compound, member_info, element_index)

        return self._apply_filters(call)

    def before_value(
        self,
        value: typing.Any,
        member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> bool:
        """Calls before_value on all the filters and combines their results by logical and."""

        def call(walk_filter):
            return walk_filter.before_value(value, member_info, element_index)

        return self._apply_filters(call)

    def after_value(
        self,
        value: typing.Any,
        member_info: MemberInfo,
        element_index: typing.Optional[int] = None,
    ) -> bool:
        """Calls after_value on all the filters and combines their results by logical and."""

        def call(walk_filter):
            return walk_filter.after_value(value, member_info, element_index)

        return self._apply_filters(call)

    def _apply_filters(self, method):
        # Every filter is called first (filters may keep their own state), only then the results are folded.
        verdicts = [method(walk_filter) for walk_filter in self._walk_filters]
        return functools.reduce(lambda accepted, verdict: accepted and verdict, verdicts, True)