Source code for zserio.pubsub

"""
The module provides classes for Zserio Pub/Sub.
"""

import typing

from zserio.exception import PythonRuntimeException


[docs]class PubsubInterface: """Interface for Pub/Sub client backends."""
[docs] def publish(self, topic: str, data: bytes, context: typing.Any = None) -> None: """ Publishes given data as a specified topic. :param topic: Topic definition. :param data: Data to publish. :param context: Context specific for a particular Pub/Sub implementation. :raises PubsubException: If the publishing fails. """ raise NotImplementedError()
[docs] def subscribe( self, topic: str, callback: typing.Callable[[str, bytes], None], context: typing.Any = None, ) -> int: """ Subscribes a topic. :param topic: Topic definition to subscribe. Note that the definition format depends on the particular Pub/Sub backend implementation and therefore e.g. wildcards can be used only if they are supported by Pub/Sub backend. :param callback: Callback to be called when a message with the specified topic arrives. :param context: Context specific for a particular Pub/Sub implementation. :returns: Subscription ID. :raises PubsubException: If subscribing fails. """ raise NotImplementedError()
[docs] def unsubscribe(self, subscription_id: int) -> None: """ Unsubscribes the subscription with the given ID. :param id: ID of the subscription to be unsubscribed. :raises PubsubException: If unsubscribing fails. """ raise NotImplementedError()
[docs]class PubsubException(PythonRuntimeException): """ Exception thrown in case of an error in Zserio Pub/Sub. """