Coverage for /home/runner/work/zserio/zserio/compiler/extensions/python/runtime/tests/test_walker.py: 100%
319 statements
« prev ^ index » next coverage.py v6.5.0, created at 2024-10-29 13:10 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2024-10-29 13:10 +0000
1import unittest
3from test_object.api import WalkerBitmask, WalkerNested, WalkerObject, WalkerUnion
5from zserio import (
6 WalkObserver,
7 WalkFilter,
8 Walker,
9 DefaultWalkObserver,
10 PythonRuntimeException,
11 ArrayLengthWalkFilter,
12 DepthWalkFilter,
13 RegexWalkFilter,
14 AndWalkFilter,
15)
16from zserio.typeinfo import TypeInfo, TypeAttribute, MemberInfo, MemberAttribute
class TestWalkObserver(WalkObserver):
    """Walk observer which records every callback it receives.

    The root callbacks remember the compound they were called with, all other callbacks append
    their first argument to a per-callback list, so tests can check both content and order.
    """

    def __init__(self):
        recorded = {"begin_root": None, "end_root": None}
        for event in ("begin_array", "end_array", "begin_compound", "end_compound", "visit_value"):
            recorded[event] = []
        self._captures = recorded

    @property
    def captures(self):
        """Return the dictionary of recorded events keyed by callback name."""
        return self._captures

    def _record(self, event, item):
        # append-only log for the callbacks which may fire more than once
        self._captures[event].append(item)

    def begin_root(self, compound):
        self._captures["begin_root"] = compound

    def end_root(self, compound):
        self._captures["end_root"] = compound

    def begin_array(self, array, member_info):
        self._record("begin_array", array)

    def end_array(self, array, member_info):
        self._record("end_array", array)

    def begin_compound(self, compound, member_info, element_index=None):
        self._record("begin_compound", compound)

    def end_compound(self, compound, member_info, element_index=None):
        self._record("end_compound", compound)

    def visit_value(self, value, member_info, element_index=None):
        self._record("visit_value", value)
class TestWalkFilter(WalkFilter):
    """Walk filter whose answers are fixed by keyword arguments given to the constructor.

    Each hook returns the flag of the same name. When only_first_element is set, after_compound
    additionally returns False for the first compound finished after before_array was called.
    """

    def __init__(
        self,
        *,
        before_array=True,
        after_array=True,
        only_first_element=False,
        before_compound=True,
        after_compound=True,
        before_value=True,
        after_value=True
    ):
        self._config = dict(
            before_array=before_array,
            after_array=after_array,
            only_first_element=only_first_element,
            before_compound=before_compound,
            after_compound=after_compound,
            before_value=before_value,
            after_value=after_value,
        )
        self._is_first_element = False

    def before_array(self, array, member_info):
        # entering an array arms the "first element" marker
        self._is_first_element = True
        return self._config["before_array"]

    def after_array(self, array, member_info):
        self._is_first_element = False
        return self._config["after_array"]

    def before_compound(self, compound, member_info, element_index=None):
        return self._config["before_compound"]

    def after_compound(self, compound, member_info, element_index=None):
        was_first_element = self._is_first_element
        self._is_first_element = False
        if self._config["only_first_element"] and was_first_element:
            # stop right after the first element when configured to do so
            return False
        return self._config["after_compound"]

    def before_value(self, value, member_info, element_index=None):
        return self._config["before_value"]

    def after_value(self, value, member_info, element_index=None):
        return self._config["after_value"]
class WalkerTest(unittest.TestCase):
    """Tests of Walker and of the WalkObserver / WalkFilter base classes."""

    @staticmethod
    def _walk_filtered(**filter_options):
        """Walk a fresh walker object using TestWalkFilter(**filter_options) inside an AndWalkFilter.

        Returns a tuple of the walked object and the captures recorded by the observer.
        """
        observer = TestWalkObserver()
        configured_filter = TestWalkFilter(**filter_options)
        walker = Walker(observer, AndWalkFilter([configured_filter]))
        walked = _create_walker_object()
        walker.walk(walked)
        return walked, observer.captures

    def test_observer(self):
        """Every callback of the base WalkObserver raises NotImplementedError."""
        observer = WalkObserver()
        dummy = object()
        member_info = MemberInfo("walker", TypeInfo("Walker", None))

        abstract_calls = (
            (observer.begin_root, (dummy,)),
            (observer.end_root, (dummy,)),
            (observer.begin_array, ([], member_info)),
            (observer.end_array, ([], member_info)),
            (observer.begin_compound, (dummy, member_info)),
            (observer.end_compound, (dummy, member_info)),
            (observer.visit_value, ("", member_info)),
        )
        for callback, arguments in abstract_calls:
            with self.assertRaises(NotImplementedError):
                callback(*arguments)

    def test_filter(self):
        """Every hook of the base WalkFilter raises NotImplementedError."""
        walk_filter = WalkFilter()
        dummy = object()
        member_info = MemberInfo("walker", TypeInfo("Walker", None))

        abstract_calls = (
            (walk_filter.before_array, ([], member_info)),
            (walk_filter.after_array, ([], member_info)),
            (walk_filter.before_compound, (dummy, member_info)),
            (walk_filter.after_compound, (dummy, member_info)),
            (walk_filter.before_value, ("", member_info)),
            (walk_filter.after_value, ("", member_info)),
        )
        for hook, arguments in abstract_calls:
            with self.assertRaises(NotImplementedError):
                hook(*arguments)

    def test_walk_without_type_info(self):
        """Walking a plain object raises PythonRuntimeException."""
        walker = Walker(TestWalkObserver())
        self.assertRaises(PythonRuntimeException, walker.walk, object())

    def test_walk_non_compound(self):
        """Walking a WalkerBitmask raises PythonRuntimeException."""
        walker = Walker(TestWalkObserver())
        self.assertRaises(PythonRuntimeException, walker.walk, WalkerBitmask())

    def test_walk(self):
        """An unfiltered walk reports all arrays, compounds and values in the expected order."""
        observer = TestWalkObserver()
        walked = _create_walker_object()
        Walker(observer).walk(walked)

        captures = observer.captures
        unions = walked.union_array
        nested_array = unions[2].nested_array

        self.assertEqual(walked, captures["begin_root"])
        self.assertEqual(walked, captures["end_root"])
        self.assertEqual([unions, nested_array], captures["begin_array"])
        self.assertEqual([nested_array, unions], captures["end_array"])
        self.assertEqual(
            [walked.nested, unions[0], unions[1], unions[2], nested_array[0]],
            captures["begin_compound"],
        )
        self.assertEqual(
            [walked.nested, unions[0], unions[1], nested_array[0], unions[2]],
            captures["end_compound"],
        )
        self.assertEqual(
            [13, "nested", "test", "1", 2, "nestedArray", None],
            captures["visit_value"],
        )

    def test_walk_skip_compound(self):
        """With before_compound rejecting, no compound is reported to the observer."""
        walked, captures = self._walk_filtered(
            before_array=True,
            after_array=True,
            before_compound=False,
            after_compound=True,
            before_value=True,
            after_value=True,
        )

        self.assertEqual(walked, captures["begin_root"])
        self.assertEqual(walked, captures["end_root"])
        self.assertEqual([walked.union_array], captures["begin_array"])
        self.assertEqual([walked.union_array], captures["end_array"])
        self.assertEqual([], captures["begin_compound"])
        self.assertEqual([], captures["end_compound"])
        self.assertEqual([13, "test", None], captures["visit_value"])

    def test_walk_skip_siblings(self):
        """With after_value rejecting, only the first value is visited."""
        walked, captures = self._walk_filtered(
            before_array=True,
            after_array=True,
            before_compound=True,
            after_compound=True,
            before_value=True,
            after_value=False,
        )

        self.assertEqual(walked, captures["begin_root"])
        self.assertEqual(walked, captures["end_root"])
        self.assertEqual([], captures["begin_array"])
        self.assertEqual([], captures["end_array"])
        self.assertEqual([], captures["begin_compound"])
        self.assertEqual([], captures["end_compound"])
        self.assertEqual([13], captures["visit_value"])

    def test_walk_skip_after_nested(self):
        """With after_compound rejecting, nothing after the nested compound is reported."""
        walked, captures = self._walk_filtered(
            before_array=True,
            after_array=True,
            before_compound=True,
            after_compound=False,
            before_value=True,
            after_value=True,
        )

        self.assertEqual(walked, captures["begin_root"])
        self.assertEqual(walked, captures["end_root"])
        self.assertEqual([], captures["begin_array"])
        self.assertEqual([], captures["end_array"])
        self.assertEqual([walked.nested], captures["begin_compound"])
        self.assertEqual([walked.nested], captures["end_compound"])
        self.assertEqual([13, "nested"], captures["visit_value"])

    def test_walk_only_first_element(self):
        """With only_first_element set, just the first element of the union array is reported."""
        walked, captures = self._walk_filtered(
            before_array=True,
            after_array=True,
            only_first_element=True,
            before_compound=True,
            after_compound=True,
            before_value=True,
            after_value=True,
        )
        first_compounds = [walked.nested, walked.union_array[0]]

        self.assertEqual(walked, captures["begin_root"])
        self.assertEqual(walked, captures["end_root"])
        self.assertEqual([walked.union_array], captures["begin_array"])
        self.assertEqual([walked.union_array], captures["end_array"])
        self.assertEqual(first_compounds, captures["begin_compound"])
        self.assertEqual(first_compounds, captures["end_compound"])
        self.assertEqual([13, "nested", "test", "1", None], captures["visit_value"])
class DefaultObserverTest(unittest.TestCase):
    """Smoke test of DefaultWalkObserver: every callback can be called without raising."""

    @staticmethod
    def test_default():
        """Call each callback once; the test passes when none of them raises."""
        observer = DefaultWalkObserver()
        member_info = MemberInfo("walker", TypeInfo("Walker", None))

        # root callbacks take just the compound
        for root_callback in (observer.begin_root, observer.end_root):
            root_callback(object())

        # the remaining callbacks take the walked item followed by its member info
        member_calls = (
            (observer.begin_array, []),
            (observer.end_array, []),
            (observer.begin_compound, object()),
            (observer.end_compound, object()),
            (observer.visit_value, None),
        )
        for callback, item in member_calls:
            callback(item, member_info)
class DepthWalkFilterTest(unittest.TestCase):
    """Tests of the decisions returned by DepthWalkFilter for a fixed sequence of hook calls."""

    def _check_steps(self, depth_filter, steps):
        """Call the filter hooks in the listed order and check every returned decision.

        Each step is a tuple (expected, hook name, item[, element index]). The hook is called with
        the item, a shared member info and the optional element index as positional arguments.
        """
        member_info = MemberInfo("walker", TypeInfo("Walker", None))
        for expected, hook_name, item, *element_index in steps:
            decision = getattr(depth_filter, hook_name)(item, member_info, *element_index)
            if expected:
                self.assertTrue(decision)
            else:
                self.assertFalse(decision)

    def test_depth_0(self):
        """With depth 0 every before_* hook is rejected while every after_* hook is accepted."""
        self._check_steps(
            DepthWalkFilter(0),
            [
                (False, "before_array", []),
                (True, "after_array", []),
                (False, "before_compound", object()),
                (True, "after_compound", object()),
                (False, "before_value", None),
                (True, "after_value", None),
            ],
        )

    def test_depth_1(self):
        """With depth 1 only the outermost before_* call of each group is accepted."""
        # trailing numbers are the per-step annotations of the scenario (presumably the current depth)
        self._check_steps(
            DepthWalkFilter(1),
            [
                (True, "before_array", []),  # 0
                (False, "before_array", []),  # 1
                (True, "after_array", []),  # 1
                (False, "before_compound", object(), 0),  # 1
                (True, "after_compound", object(), 0),  # 1
                (False, "before_value", None, 1),  # 1
                (True, "after_value", None, 1),  # 1
                (True, "after_array", []),  # 0
                (True, "before_compound", object()),  # 0
                (False, "before_array", []),  # 1
                (True, "after_array", []),  # 1
                (False, "before_compound", object()),  # 1
                (True, "after_compound", object()),  # 1
                (False, "before_value", None),  # 1
                (True, "after_value", None),  # 1
                (True, "after_compound", object()),  # 0
                (True, "before_value", None),  # 0
                (True, "after_value", None),  # 0
            ],
        )
class RegexWalkFilterTest(unittest.TestCase):
    """Tests of the decisions returned by RegexWalkFilter for various path patterns."""

    def test_regex_all_match(self):
        """A catch-all pattern accepts every hook."""
        walker_member_info = MemberInfo("walker", TypeInfo("Walker", None))
        walker_array_member_info = MemberInfo(
            "walker",
            TypeInfo("Walker", None),
            attributes={MemberAttribute.ARRAY_LENGTH: None},
        )
        regex_filter = RegexWalkFilter(".*")

        self.assertTrue(regex_filter.before_array([], walker_array_member_info))
        self.assertTrue(regex_filter.after_array([], walker_array_member_info))
        self.assertTrue(regex_filter.before_compound(object(), walker_member_info))
        # pair the before_compound above with after_compound so that both hooks are exercised
        self.assertTrue(regex_filter.after_compound(object(), walker_member_info))
        self.assertTrue(regex_filter.before_value(None, walker_member_info))
        self.assertTrue(regex_filter.after_value(None, walker_member_info))

    def test_regex_prefix_match(self):
        """A pattern for members of 'nested' accepts the nested compound and rejects the others."""
        walker_object = _create_walker_object()
        regex_filter = RegexWalkFilter(r"nested\..*")

        identifier_member_info = walker_object.type_info().attributes[TypeAttribute.FIELDS][0]
        self.assertFalse(regex_filter.before_value(walker_object.identifier, identifier_member_info))
        self.assertTrue(regex_filter.after_value(walker_object.identifier, identifier_member_info))

        nested_member_info = walker_object.type_info().attributes[TypeAttribute.FIELDS][1]
        self.assertTrue(regex_filter.before_compound(walker_object.nested, nested_member_info))
        text_member_info = nested_member_info.type_info.attributes[TypeAttribute.FIELDS][0]
        self.assertTrue(regex_filter.before_value(walker_object.nested.text, text_member_info))
        self.assertTrue(regex_filter.after_value(walker_object.nested.text, text_member_info))
        self.assertTrue(regex_filter.after_compound(walker_object.nested, nested_member_info))

        # ignore text

        union_array_member_info = walker_object.type_info().attributes[TypeAttribute.FIELDS][3]
        self.assertFalse(regex_filter.before_array(walker_object.union_array, union_array_member_info))
        self.assertTrue(regex_filter.after_array(walker_object.union_array, union_array_member_info))

    def test_regex_array_match(self):
        """A pattern with an element index accepts only the array element which has a match."""
        walker_object = _create_walker_object()
        regex_filter = RegexWalkFilter(r"unionArray\[\d+\]\.nes.*")

        union_array = walker_object.union_array
        union_array_member_info = walker_object.type_info().attributes[TypeAttribute.FIELDS][3]
        self.assertTrue(regex_filter.before_array(union_array, union_array_member_info))

        self.assertFalse(regex_filter.before_compound(union_array[0], union_array_member_info, 0))
        self.assertTrue(regex_filter.after_compound(union_array[0], union_array_member_info, 0))

        self.assertFalse(regex_filter.before_compound(union_array[1], union_array_member_info, 1))
        self.assertTrue(regex_filter.after_compound(union_array[1], union_array_member_info, 1))

        self.assertTrue(regex_filter.before_compound(union_array[2], union_array_member_info, 2))
        self.assertTrue(regex_filter.after_compound(union_array[2], union_array_member_info, 2))

        self.assertTrue(regex_filter.after_array(union_array, union_array_member_info))

    def test_regex_array_no_match(self):
        """An array is rejected when the pattern matches none of its elements."""
        walker_object = WalkerObject(
            13,
            WalkerNested("nested"),
            "test",
            [WalkerUnion(nested_array_=[WalkerNested("nestedArray")])],
        )

        regex_filter = RegexWalkFilter(r"^unionArray\[\d*\]\.te.*")

        union_array_member_info = walker_object.type_info().attributes[TypeAttribute.FIELDS][3]
        self.assertFalse(regex_filter.before_array(walker_object.union_array, union_array_member_info))
        self.assertTrue(regex_filter.after_array(walker_object.union_array, union_array_member_info))

    def test_regex_none_compound_match(self):
        """A None compound is accepted when the pattern matches its own path."""
        walker_object = WalkerObject(13, None, "test", [WalkerUnion(text_="1"), WalkerUnion(value_=2)])

        regex_filter = RegexWalkFilter("nested")

        nested_member_info = walker_object.type_info().attributes[TypeAttribute.FIELDS][1]
        # note that the None compounds are processed as values!
        self.assertTrue(regex_filter.before_value(walker_object.nested, nested_member_info))
        self.assertTrue(regex_filter.after_value(walker_object.nested, nested_member_info))

    def test_regex_none_compound_no_match(self):
        """A None compound is rejected when the pattern matches only one of its members."""
        walker_object = WalkerObject(13, None, "test", [WalkerUnion(text_="1"), WalkerUnion(value_=2)])

        regex_filter = RegexWalkFilter(r"^nested\.text$")

        nested_member_info = walker_object.type_info().attributes[TypeAttribute.FIELDS][1]
        # note that the None compounds are processed as values!
        self.assertFalse(regex_filter.before_value(walker_object.nested, nested_member_info))
        self.assertTrue(regex_filter.after_value(walker_object.nested, nested_member_info))

    def test_regex_none_array_match(self):
        """A None array is accepted when the pattern matches its own path."""
        regex_filter = RegexWalkFilter("optionalUnionArray")

        optional_union_array_member_info = WalkerObject.type_info().attributes[TypeAttribute.FIELDS][4]
        # note that the None arrays are processed as values!
        self.assertTrue(regex_filter.before_value(None, optional_union_array_member_info))
        self.assertTrue(regex_filter.after_value(None, optional_union_array_member_info))

    def test_regex_none_array_no_match(self):
        """A None array is rejected when the pattern matches only something inside its elements."""
        regex_filter = RegexWalkFilter(r"^optionalUnionArray\[\d+\]\.nestedArray.*")

        # use the same field as test_regex_none_array_match (index 4) so that the pattern is checked
        # against the optional array's own path instead of failing on a different field name
        optional_union_array_member_info = WalkerObject.type_info().attributes[TypeAttribute.FIELDS][4]
        # note that the None arrays are processed as values!
        self.assertFalse(regex_filter.before_value(None, optional_union_array_member_info))
        self.assertTrue(regex_filter.after_value(None, optional_union_array_member_info))
class ArrayLengthWalkFilterTest(unittest.TestCase):
    """Tests of the decisions returned by ArrayLengthWalkFilter."""

    def test_array_length_0(self):
        """With length 0 all hooks called with an element index are rejected, the rest is accepted."""
        plain_info = MemberInfo("walker", TypeInfo("Walker", None))
        array_info = MemberInfo(
            "walker_array",
            TypeInfo("Walker", None),
            attributes={MemberAttribute.ARRAY_LENGTH: None},
        )
        length_filter = ArrayLengthWalkFilter(0)
        accepted, rejected = self.assertTrue, self.assertFalse

        # first array: its elements are rejected both before and after
        accepted(length_filter.before_array([], array_info))
        rejected(length_filter.before_compound(object(), array_info, 0))
        rejected(length_filter.after_compound(object(), array_info, 0))
        rejected(length_filter.before_value(None, array_info, 1))
        rejected(length_filter.after_value(None, array_info, 1))
        accepted(length_filter.after_array([], array_info))

        # compound outside of an array which itself contains a value and an array
        accepted(length_filter.before_compound(object(), plain_info))
        accepted(length_filter.before_value(None, plain_info))
        accepted(length_filter.after_value(None, plain_info))
        accepted(length_filter.before_array([], array_info))
        rejected(length_filter.before_value(None, array_info, 0))
        rejected(length_filter.after_value(None, array_info, 0))
        accepted(length_filter.after_array([], array_info))
        accepted(length_filter.after_compound(object(), plain_info))
class AndWalkFilterTest(unittest.TestCase):
    """Tests of AndWalkFilter combining zero or two TestWalkFilter instances."""

    @staticmethod
    def _rejecting_filter():
        """Create a TestWalkFilter which returns False from every hook."""
        return TestWalkFilter(
            before_array=False,
            after_array=False,
            before_compound=False,
            after_compound=False,
            before_value=False,
            after_value=False,
        )

    def _assert_all_hooks(self, walk_filter, expected):
        """Check that all six hooks of the given filter return a truthy / falsy result as expected."""
        member_info = MemberInfo("walker", TypeInfo("Walker", None))
        check = self.assertTrue if expected else self.assertFalse

        check(walk_filter.before_array([], member_info))
        check(walk_filter.after_array([], member_info))
        check(walk_filter.before_compound(object(), member_info))
        check(walk_filter.after_compound(object(), member_info))
        check(walk_filter.before_value(None, member_info))
        check(walk_filter.after_value(None, member_info))

    def test_empty(self):
        """Without any nested filter every hook is accepted."""
        self._assert_all_hooks(AndWalkFilter([]), True)

    def test_true_true(self):
        """Two filters returning True give True."""
        self._assert_all_hooks(AndWalkFilter([TestWalkFilter(), TestWalkFilter()]), True)

    def test_false_false(self):
        """Two filters returning False give False."""
        both_rejecting = AndWalkFilter([self._rejecting_filter(), self._rejecting_filter()])
        self._assert_all_hooks(both_rejecting, False)

    def test_true_false(self):
        """One filter returning True combined with one returning False gives False."""
        mixed = AndWalkFilter(
            [
                TestWalkFilter(),  # returning true
                self._rejecting_filter(),  # returning false
            ]
        )
        self._assert_all_hooks(mixed, False)
def _create_walker_object():
    """Build the WalkerObject fixture shared by the tests.

    The object holds identifier 13, a nested compound with text "nested", the text "test" and
    a union array with three elements: a text choice, a value choice and a nested array choice.
    """
    nested = WalkerNested("nested")
    unions = [
        WalkerUnion(text_="1"),
        WalkerUnion(value_=2),
        WalkerUnion(nested_array_=[WalkerNested("nestedArray")]),
    ]
    return WalkerObject(13, nested, "test", unions)