Reference

Base classes

class heisskleber.Sender

Abstract interface for asynchronous data sinks.

This class defines a protocol for sending data to various output streams asynchronously. It supports context manager usage and ensures proper resource management.

packer

Component responsible for serializing type T data before sending.

abstractmethod async send(data, **kwargs)

Send data through the implemented output stream.

Parameters:
  • data (TypeVar(T)) – The data to be sent, of type T.

  • **kwargs (Any) – Additional implementation-specific arguments.

Return type:

None

abstractmethod async start()

Initialize and start the sink’s background processes and tasks.

Return type:

None

abstractmethod async stop()

Stop and cleanup the sink’s background processes and tasks.

This method should be called when the sink is no longer needed. It should handle cleanup of any resources initialized in start().

Return type:

None
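
The interface above can be sketched with a local stand-in. SketchSender and ListSender are hypothetical names and are not part of heisskleber; the context manager wiring (start on enter, stop on exit) is an assumption about how the documented context manager support behaves, not a confirmed detail of the library.

```python
import asyncio
import json
from abc import ABC, abstractmethod
from typing import Any, Callable, Generic, TypeVar

T = TypeVar("T")


class SketchSender(ABC, Generic[T]):
    """Hypothetical stand-in mirroring the documented Sender interface."""

    packer: Callable[[T], bytes]

    @abstractmethod
    async def send(self, data: T, **kwargs: Any) -> None: ...

    @abstractmethod
    async def start(self) -> None: ...

    @abstractmethod
    async def stop(self) -> None: ...

    # Assumption: entering the context starts the sink, leaving it stops it.
    async def __aenter__(self) -> "SketchSender[T]":
        await self.start()
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        await self.stop()


class ListSender(SketchSender[dict]):
    """Collects packed payloads in memory instead of writing to a stream."""

    def __init__(self, packer: Callable[[dict], bytes]) -> None:
        self.packer = packer
        self.sent: list[bytes] = []
        self.running = False

    async def start(self) -> None:
        self.running = True

    async def stop(self) -> None:
        self.running = False

    async def send(self, data: dict, **kwargs: Any) -> None:
        self.sent.append(self.packer(data))


async def demo_sender() -> list[bytes]:
    async with ListSender(lambda d: json.dumps(d).encode()) as sink:
        await sink.send({"temperature": 21.5})
    return sink.sent
```

Running asyncio.run(demo_sender()) returns the list of packed payloads. A real sink would replace the in-memory list with a network or file handle that is opened in start() and released in stop().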

class heisskleber.Receiver

Abstract interface for asynchronous data sources.

This class defines a protocol for receiving data from various input streams asynchronously. It supports both async iteration and context manager patterns, and ensures proper resource management.

The source is covariant in its type parameter, allowing for type-safe subtyping relationships.

unpacker

Component responsible for deserializing incoming data into type T_co.

Example

>>> async with CustomSource(unpacker) as source:
...     async for data, metadata in source:
...         print(f"Received: {data}, metadata: {metadata}")

abstractmethod async receive(**kwargs)

Receive data from the implemented input stream.

Returns:

A tuple containing:
  • The received and unpacked data of type T_co

  • A dictionary of metadata associated with the received data

Return type:

tuple[T_co, dict[str, Any]]

Raises:

Any implementation-specific exceptions that might occur during receiving.

Parameters:

kwargs (Any)

abstractmethod async start()

Initialize and start any background processes and tasks of the source.

Return type:

None

abstractmethod async stop()

Stop any background processes and tasks.

Return type:

None
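
A minimal sketch of a source that supports both patterns shown in the example above. QueueReceiver is a hypothetical in-memory class, not part of heisskleber, and the assumption that async iteration simply awaits receive() repeatedly is not confirmed by this reference.

```python
import asyncio
from typing import Any, AsyncIterator


class QueueReceiver:
    """Hypothetical in-memory source mirroring the documented Receiver methods."""

    def __init__(self, payloads: list[bytes]) -> None:
        self._payloads = list(payloads)
        self.unpacker = lambda payload: payload.decode()

    async def start(self) -> None:
        pass  # nothing to set up for an in-memory source

    async def stop(self) -> None:
        self._payloads.clear()

    async def receive(self, **kwargs: Any) -> tuple[str, dict[str, Any]]:
        payload = self._payloads.pop(0)
        return self.unpacker(payload), {"length": len(payload)}

    # Assumption: async iteration repeatedly awaits receive().
    async def __aiter__(self) -> AsyncIterator[tuple[str, dict[str, Any]]]:
        while self._payloads:
            yield await self.receive()

    async def __aenter__(self) -> "QueueReceiver":
        await self.start()
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        await self.stop()


async def demo_receiver() -> list[tuple[str, dict[str, Any]]]:
    async with QueueReceiver([b"a", b"bc"]) as source:
        return [item async for item in source]
```

Each item yielded by the iteration is a (data, metadata) tuple, matching the documented return type of receive().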

Serialization

See Serialization for a tutorial on how to implement a custom packer and unpacker for (de-)serialization.

class heisskleber.core.Packer(*args, **kwargs)

Packer Interface.

This class defines a protocol for packing data. It takes data and converts it into a bytes payload.

class heisskleber.core.Unpacker(*args, **kwargs)

Unpacker Interface.

This abstract base class defines an interface for unpacking payloads. It takes a payload of bytes, creates a data dictionary and an optional topic, and returns a tuple containing the topic and data.
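
As a rough illustration of the two roles, the following sketch uses plain functions. The names csv_packer and json_unpacker are hypothetical. The unpacker here returns (data, extra) to mirror the receive() return type documented for Receiver and places an optional topic in the extra dictionary; that ordering is an assumption, since the description above mentions a topic and data tuple.

```python
import json
from typing import Any


def csv_packer(data: dict[str, Any]) -> bytes:
    """Pack the values of a flat dict into one comma-separated line."""
    return ",".join(str(value) for value in data.values()).encode("utf-8")


def json_unpacker(payload: bytes) -> tuple[dict[str, Any], dict[str, Any]]:
    """Unpack a JSON payload; a "topic" key, if present, moves to the extras."""
    data = json.loads(payload.decode("utf-8"))
    extra = {"topic": data.pop("topic")} if "topic" in data else {}
    return data, extra
```

Any callable with a matching signature could be passed as the packer or unpacker argument of the adapters below, assuming the protocols only require a call signature.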

Errors

class heisskleber.core.UnpackerError(payload)

Raised when unpacking operations fail.

This exception wraps underlying errors that may occur during unpacking, providing a consistent interface for error handling.

Parameters:

payload (str | bytes | bytearray) – The bytes payload that failed to unpack.

class heisskleber.core.PackerError(data)

Raised when packing operations fail.

This exception wraps underlying errors that may occur during packing, providing a consistent interface for error handling.

Parameters:

data (Any) – The data object that caused the PackerError
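
The wrapping behaviour described for these exceptions can be sketched as follows. SketchUnpackerError is a local, hypothetical stand-in (the real class lives in heisskleber.core), and the message format is an assumption.

```python
import json
from typing import Any


class SketchUnpackerError(Exception):
    """Hypothetical stand-in for the documented UnpackerError."""

    def __init__(self, payload: bytes) -> None:
        self.payload = payload
        super().__init__(f"Could not unpack payload: {payload[:32]!r}")


def strict_json_unpacker(payload: bytes) -> dict[str, Any]:
    try:
        return json.loads(payload)
    except (json.JSONDecodeError, UnicodeDecodeError) as err:
        # Chain the original error so the root cause stays visible.
        raise SketchUnpackerError(payload) from err
```

Callers can then catch a single exception type regardless of which serialization format the unpacker uses, while the original error remains available through __cause__.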

Implementations (Adapters)

MQTT

Async wrappers for MQTT functionality.

The MQTT implementation uses the aiomqtt package, an async wrapper around the paho-mqtt package.

class heisskleber.mqtt.MqttSender(config, packer=<heisskleber.core.packer.JSONPacker object>)

MQTT publisher with queued message handling.

This sink implementation provides asynchronous MQTT publishing capabilities with automatic connection management and message queueing. Network operations are handled in a separate task.

config

MQTT configuration in a dataclass.

packer

Callable to pack data from type T to bytes for transport.

async send(data, topic='mqtt', qos=0, retain=False, **kwargs)

Queue data for asynchronous publication to the mqtt broker.

Parameters:
  • data (TypeVar(T)) – The data to be published.

  • topic (str) – The mqtt topic to publish to.

  • qos (int) – MQTT QOS level (0, 1, or 2). Defaults to 0.

  • retain (bool) – Whether to set the MQTT retain flag. Defaults to False.

  • **kwargs (Any) – Not implemented.

Raises:

PackerError – The data could not be serialized with the provided Packer.

Return type:

None
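
The queue-plus-background-task pattern described for MqttSender can be illustrated without a broker. QueuedPublisher is a hypothetical class; the published list stands in for the network client, and the draining behaviour in stop() is an assumption rather than the documented behaviour of MqttSender.

```python
import asyncio
import json
from typing import Any


class QueuedPublisher:
    """Hypothetical sketch of a sender that publishes from a background task."""

    def __init__(self, max_saved_messages: int = 1000) -> None:
        self._queue = asyncio.Queue(maxsize=max_saved_messages)
        self._task = None
        self.published = []  # stands in for the broker connection

    async def start(self) -> None:
        self._task = asyncio.create_task(self._publish_loop())

    async def stop(self) -> None:
        await self._queue.join()  # wait until queued messages are published
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None

    async def send(self, data: dict, topic: str = "mqtt", **kwargs: Any) -> None:
        # Packing happens in the caller's context, so packing errors surface here.
        await self._queue.put((topic, json.dumps(data).encode()))

    async def _publish_loop(self) -> None:
        while True:
            topic, payload = await self._queue.get()
            self.published.append((topic, payload))  # a real client would publish here
            self._queue.task_done()


async def demo_queue() -> list:
    sink = QueuedPublisher()
    await sink.start()
    await sink.send({"v": 1}, topic="sensors/a")
    await sink.send({"v": 2}, topic="sensors/b")
    await sink.stop()
    return sink.published
```

Because send() only enqueues, it returns quickly even when the network is slow; the bounded queue gives back-pressure once max_saved_messages items are waiting.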

class heisskleber.mqtt.MqttReceiver(config, topic, unpacker=<heisskleber.core.unpacker.JSONUnpacker object>)

Asynchronous MQTT subscriber based on aiomqtt.

This class implements an asynchronous MQTT subscriber that handles connection, subscription, and message reception from an MQTT broker. It uses aiomqtt as the underlying MQTT client implementation.

The subscriber maintains a queue of received messages which can be accessed through the receive method.

config

Stored configuration for MQTT connection.

Type:

MqttConf

topics

Topics to subscribe to.

Type:

Union[str, List[str]]

async receive(**kwargs)

Receive and process the next message from the queue.

Return type:

tuple[TypeVar(T), dict[str, Any]]

Returns:

tuple[T, dict[str, Any]]
  • The unpacked message data

  • A dictionary with metadata including the message topic

Raises:
  • TypeError – If the message payload is not of type bytes.

  • UnpackerError – If the message could not be unpacked with the unpacker protocol.

Parameters:

kwargs (Any)

async subscribe(topic, qos=None)

Subscribe to an additional MQTT topic.

Parameters:
  • topic (str) – The topic to subscribe to

  • qos (int | None) – Quality of Service level, uses config.qos if None

Return type:

None

class heisskleber.mqtt.MqttConf(host='localhost', port=1883, ssl=False, user='', password='', qos=0, retain=False, max_saved_messages=1000, timeout=60, keep_alive=60, will=None)

MQTT configuration class.

classmethod from_dict(config_dict)

Create a MqttConf object from a dictionary.

Parameters:

config_dict (dict[str, Any])

Return type:

MqttConf
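
A dictionary-based constructor of this kind might look like the sketch below. SketchMqttConf is a hypothetical dataclass that copies only a few of the documented fields and defaults; ignoring unknown keys is an assumption and may not match the behaviour of MqttConf.from_dict.

```python
from dataclasses import dataclass, fields
from typing import Any


@dataclass
class SketchMqttConf:
    """Hypothetical subset of the documented MqttConf fields and defaults."""

    host: str = "localhost"
    port: int = 1883
    ssl: bool = False
    qos: int = 0

    @classmethod
    def from_dict(cls, config_dict: dict[str, Any]) -> "SketchMqttConf":
        # Assumption: unknown keys are ignored rather than raising TypeError.
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in config_dict.items() if k in known})
```

This makes it straightforward to load a configuration from a parsed YAML or JSON file and fall back to the defaults for any key that is missing.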

ZMQ

class heisskleber.zmq.ZmqConf(protocol='tcp', host='127.0.0.1', publisher_port=5555, subscriber_port=5556, packstyle='json')

ZMQ configuration class.

Parameters:
  • protocol (str)

  • host (str)

  • publisher_port (int)

  • subscriber_port (int)

  • packstyle (str)

class heisskleber.zmq.ZmqSender(config, packer=<heisskleber.core.packer.JSONPacker object>)

Async publisher that sends messages to a ZMQ PUB socket.

config

The ZmqConf configuration object for the connection.

packer

The packer strategy to use for serializing the data. Defaults to a JSON packer with UTF-8 encoding.

async send(data, topic='zmq', **kwargs)

Take the data as a dict, serialize it with the given packer and send it to the zmq socket.

Return type:

None

class heisskleber.zmq.ZmqReceiver(config, topic, unpacker=<heisskleber.core.unpacker.JSONUnpacker object>)

Async source that subscribes to one or many topics from a zmq broker and receives messages via the receive() function.

config

The ZmqConf configuration object for the connection.

unpacker

The unpacker function to use for deserializing the data.

async receive(**kwargs)

Read a message from the zmq bus and return it.

Returns:

The message received.

Return type:

tuple(topic: str, message: dict)

Raises:

UnpackerError – If payload could not be unpacked with provided unpacker.

Parameters:

kwargs (Any)

Serial

class heisskleber.serial.SerialConf(port='/dev/serial0', baudrate=9600, bytesize=8, encoding='ascii', parity='N', stopbits=1, termination_char=b'\n')

Serial Config class.

port

The port to connect to. Defaults to /dev/serial0.

baudrate

The baudrate of the serial connection. Defaults to 9600.

bytesize

The bytesize of the messages. Defaults to 8.

encoding

The string encoding of the messages. Defaults to ascii.

parity

The parity checking value. One of “N” for none, “E” for even, “O” for odd. Defaults to “N”.

stopbits

Stopbits. One of 1, 2 or 1.5. Defaults to 1.

Note

stopbits 1.5 is not yet implemented.

class heisskleber.serial.SerialSender(config, packer)

An asynchronous sink for writing data to a serial port.

This class implements the Sender interface for writing data to a serial port. It uses a thread pool executor to perform blocking I/O operations asynchronously.

config

Configuration for the serial port.

packer

Function to pack data for sending.

async send(data, **kwargs)

Send data to the serial port.

This method packs the data, writes it to the serial port, and then flushes the port.

Parameters:
  • data (TypeVar(T)) – The data to be sent.

  • **kwargs (dict[str, Any]) – Not implemented.

Raises:

PackerError – If data could not be packed to bytes with the provided packer.

Return type:

None

Note

If the serial port is not connected, it will implicitly attempt to connect first.
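
The thread pool approach mentioned above can be sketched with the standard library alone. ExecutorWriter is a hypothetical class, and an io.BytesIO object stands in for the blocking serial port; how SerialSender actually manages its executor is not specified in this reference.

```python
import asyncio
import io
from concurrent.futures import ThreadPoolExecutor


class ExecutorWriter:
    """Hypothetical sketch: run blocking writes in a thread pool."""

    def __init__(self, port: io.BytesIO) -> None:
        self._port = port  # stands in for a blocking serial port object
        self._executor = ThreadPoolExecutor(max_workers=1)

    def _write_and_flush(self, payload: bytes) -> None:
        self._port.write(payload)
        self._port.flush()

    async def send(self, data: str) -> None:
        payload = data.encode("ascii")
        loop = asyncio.get_running_loop()
        # The event loop stays responsive while the worker thread blocks.
        await loop.run_in_executor(self._executor, self._write_and_flush, payload)

    async def stop(self) -> None:
        self._executor.shutdown(wait=True)


async def demo_executor() -> bytes:
    port = io.BytesIO()
    writer = ExecutorWriter(port)
    await writer.send("hello\n")
    await writer.stop()
    return port.getvalue()
```

A single worker thread keeps writes in order, which matters for a byte stream such as a serial line.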

class heisskleber.serial.SerialReceiver(config, unpacker)

An asynchronous source for reading data from a serial port.

This class implements the Receiver interface for reading data from a serial port. It uses a thread pool executor to perform blocking I/O operations asynchronously.

config

Configuration for the serial port.

unpacker

Function to unpack received data.

async receive(*, termination_char=None, read_bytes=-1, **kwargs)

Receive data from the serial port.

This method reads a line from the serial port, unpacks it, and returns the data. If the serial port is not connected, it will attempt to connect first.

Parameters:
  • termination_char (bytes | None) – Line termination character that signals the message end.

  • read_bytes (int) – Number of bytes to read. Defaults to -1, i.e. infinite.

  • kwargs (Any)

Returns:

A tuple containing the unpacked data and any extra information.

Return type:

tuple[T, dict[str, Any]]

Raises:

UnpackerError – If the data could not be unpacked with the provided unpacker.

TCP

class heisskleber.tcp.TcpConf(host='localhost', port=6000, timeout=60, retry_delay=0.5, restart_behavior=RestartBehavior.ALWAYS)

Configuration dataclass for TCP connections.

Parameters:
  • host (str)

  • port (int)

  • timeout (int)

  • retry_delay (float)

  • restart_behavior (RestartBehavior)

class heisskleber.tcp.TcpSender(config, packer)

Async TCP Sink.

config

The TcpConf configuration object.

packer

The packer protocol to serialize data before sending.

async send(data, **kwargs)

Send data via tcp connection.

Parameters:
  • data (TypeVar(T)) – The data to be sent.

  • kwargs (dict[str, Any]) – Not implemented.

Return type:

None

class heisskleber.tcp.TcpReceiver(config, unpacker=<heisskleber.core.unpacker.JSONUnpacker object>)

Async TCP connection, connects to host:port and reads byte encoded strings.

async receive(**kwargs)

Receive data from a connection.

Attempt to read data from the connection and handle the process of re-establishing the connection if necessary.

Return type:

tuple[TypeVar(T), dict[str, Any]]

Returns:

tuple[T, dict[str, Any]]
  • The unpacked message data

  • A dictionary with metadata including the message topic

Raises:
  • TypeError – If the message payload is not of type bytes.

  • UnpackerError – If the message could not be unpacked with the unpacker protocol.

Parameters:

kwargs (Any)
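
Re-establishing a connection with a fixed delay between attempts, as suggested by the retry_delay setting of TcpConf, could be sketched like this. The function connect_with_retry, its max_attempts parameter and the injected connect coroutine are hypothetical; the reference does not describe how TcpReceiver implements its retry logic.

```python
import asyncio
from typing import Awaitable, Callable


async def connect_with_retry(
    connect: Callable[[], Awaitable[str]],
    retry_delay: float = 0.5,
    max_attempts: int = 5,
) -> str:
    """Call connect() until it succeeds, sleeping retry_delay between attempts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await connect()
        except ConnectionError:
            if attempt == max_attempts:
                raise
            await asyncio.sleep(retry_delay)
    raise ValueError("max_attempts must be at least 1")


async def demo_retry() -> tuple[str, int]:
    calls = 0

    async def flaky_connect() -> str:
        nonlocal calls
        calls += 1
        if calls < 3:
            raise ConnectionRefusedError("server not up yet")
        return "connected"

    result = await connect_with_retry(flaky_connect, retry_delay=0.01)
    return result, calls
```

In a real receiver the connect argument would wrap something like asyncio.open_connection(host, port) and return the resulting stream objects instead of a string.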

UDP

class heisskleber.udp.UdpConf(port=1234, host='127.0.0.1', max_queue_size=1000, encoding='utf-8', delimiter='\r\n')

UDP configuration.

Parameters:
  • port (int)

  • host (str)

  • max_queue_size (int)

  • encoding (str)

  • delimiter (str)

class heisskleber.udp.UdpSender(config, packer=<heisskleber.core.packer.JSONPacker object>)

UDP sink for sending data via UDP protocol.

Parameters:
  • config (UdpConf) – UDP configuration parameters

  • packer (Packer[TypeVar(T)]) – Function to serialize data, defaults to JSON packing

async send(data, **kwargs)

Send data over UDP connection.

Parameters:
  • data (TypeVar(T)) – Data to send

  • **kwargs (dict[str, Any]) – Additional arguments passed to send

Return type:

None

class heisskleber.udp.UdpReceiver(config, unpacker=<heisskleber.core.unpacker.JSONUnpacker object>)

An asynchronous UDP subscriber based on asyncio.protocols.DatagramProtocol.

async receive(**kwargs)

Get the next message from the udp connection.

Return type:

tuple[TypeVar(T), dict[str, Any]]

Returns:

tuple[T, dict[str, Any]]
  • The data as returned by the unpacker.

  • A dictionary containing extra information.

Raises:

UnpackerError – If the received message could not be unpacked.

Parameters:

kwargs (Any)
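
A subscriber built on asyncio.DatagramProtocol typically hands datagrams from the protocol callback to a queue that receive() awaits. The sketch below shows that hand-off without opening a socket. QueueingProtocol and the module-level receive function are hypothetical, and dropping datagrams when the queue is full is an assumption about overflow handling.

```python
import asyncio
import json
from typing import Any


class QueueingProtocol(asyncio.DatagramProtocol):
    """Hypothetical protocol that buffers datagrams in a bounded queue."""

    def __init__(self, max_queue_size: int = 1000) -> None:
        self.queue = asyncio.Queue(maxsize=max_queue_size)

    def datagram_received(self, data: bytes, addr: tuple) -> None:
        try:
            self.queue.put_nowait((data, addr))
        except asyncio.QueueFull:
            pass  # assumption: drop new datagrams when the queue is full


async def receive(protocol: QueueingProtocol) -> tuple[Any, dict[str, Any]]:
    payload, addr = await protocol.queue.get()
    return json.loads(payload), {"source": addr}


async def demo_udp() -> tuple:
    protocol = QueueingProtocol(max_queue_size=1)
    # Simulate the event loop delivering two datagrams; the second is dropped.
    protocol.datagram_received(b'{"x": 1}', ("127.0.0.1", 1234))
    protocol.datagram_received(b'{"x": 2}', ("127.0.0.1", 1234))
    return await receive(protocol), protocol.queue.qsize()
```

With a real socket, the protocol instance would be registered through loop.create_datagram_endpoint(lambda: protocol, local_addr=(host, port)), after which the event loop calls datagram_received for every incoming packet.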

File

class heisskleber.file.FileConf(rollover=3600, name_fmt='%Y%m%d_%h%M%s.txt', batch_interval=5, directory='./', watchfile='', format='json', tz=datetime.timezone.utc)

Config class for file operations.

class heisskleber.file.FileWriter(config, packer=None, header_func=None)

Asynchronous file writer implementation of the Sender interface.

Writes data to files with automatic rollover based on time intervals. Files are named according to the configured datetime format.

async send(data, **kwargs)

Write data to the current file.

Parameters:
  • data (TypeVar(T)) – Data to write

  • **kwargs (Any) – Additional arguments (unused)

Raises:

RuntimeError – If writer hasn’t been started

Return type:

None
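
Time-based rollover with datetime-formatted file names can be sketched as a pure function. The helper rollover_filename is hypothetical, aligning rollover windows to the Unix epoch is an assumption (the writer may instead count from its start time), and the format string used in the test is an illustrative strftime pattern rather than the documented default.

```python
from datetime import datetime, timezone


def rollover_filename(now: datetime, rollover: int, name_fmt: str) -> str:
    """Name the file after the start of the rollover window containing `now`.

    `now` is expected to be timezone-aware; `rollover` is given in seconds.
    """
    timestamp = int(now.timestamp())
    window_start = timestamp - timestamp % rollover
    return datetime.fromtimestamp(window_start, tz=now.tzinfo).strftime(name_fmt)
```

All timestamps inside the same window map to the same name, so the writer only needs to open a new file when the computed name changes.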

class heisskleber.file.FileReader(config, unpacker)

Asynchronous File Reader.

Currently only reads bytes.

async receive(**kwargs)

Get the next data and extra tuple from the watched file.

Parameters:

kwargs (Any)

Return type:

tuple[TypeVar(T), dict[str, Any]]