Reference¶
Baseclasses¶
- class heisskleber.Sender¶
Abstract interface for asynchronous data sinks.
This class defines a protocol for sending data to various output streams asynchronously. It supports context manager usage and ensures proper resource management.
- packer¶
Component responsible for serializing type T data before sending.
- abstractmethod async send(data, **kwargs)¶
Send data through the implemented output stream.
- abstractmethod async start()¶
Initialize and start the sink’s background processes and tasks.
- Return type:
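The interface described above can be illustrated with a minimal in-memory sketch. `ListSender`, `json_packer` and the `stop()` method are hypothetical names introduced only for this example; the class is a stand-in that mimics the documented `packer`, `send()`, `start()` and context manager behaviour and does not subclass `heisskleber.Sender`.

```python
import asyncio
import json
from typing import Any, Callable


class ListSender:
    """Hypothetical in-memory sink; a stand-in, not a heisskleber class."""

    def __init__(self, packer: Callable[[dict[str, Any]], bytes]) -> None:
        self.packer = packer
        self.sent: list[bytes] = []
        self._started = False

    async def start(self) -> None:
        # A real sink would open connections or spawn background tasks here.
        self._started = True

    async def stop(self) -> None:
        # Assumed counterpart to start(); not taken from the reference above.
        self._started = False

    async def send(self, data: dict[str, Any], **kwargs: Any) -> None:
        if not self._started:
            await self.start()
        # Serialize with the packer, then hand the bytes to the "transport".
        self.sent.append(self.packer(data))

    async def __aenter__(self) -> "ListSender":
        await self.start()
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        await self.stop()


def json_packer(data: dict[str, Any]) -> bytes:
    return json.dumps(data).encode("utf-8")


async def demo_sender() -> list[bytes]:
    async with ListSender(json_packer) as sink:
        await sink.send({"temperature": 21.5})
    return sink.sent


if __name__ == "__main__":
    print(asyncio.run(demo_sender()))
```

Keeping serialization in the packer and transport in `send()` lets the same sink be reused with different wire formats.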
- class heisskleber.Receiver¶
Abstract interface for asynchronous data sources.
This class defines a protocol for receiving data from various input streams asynchronously. It supports both async iteration and context manager patterns, and ensures proper resource management.
The source is covariant in its type parameter, allowing for type-safe subtyping relationships.
- unpacker¶
Component responsible for deserializing incoming data into type T_co.
Example
>>> async with CustomSource(unpacker) as source:
...     async for data, metadata in source:
...         print(f"Received: {data}, metadata: {metadata}")
- abstractmethod async receive(**kwargs)¶
Receive data from the implemented input stream.
- abstractmethod async start()¶
Initialize and start any background processes and tasks of the source.
- Return type:
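As a rough sketch of how async iteration and the context manager pattern can fit together, the following stand-in serves payloads from a list. `ListReceiver` and the `{"source": "list"}` metadata are hypothetical and exist only for illustration; the class does not subclass `heisskleber.Receiver`.

```python
import asyncio
import json
from typing import Any, Callable


class ListReceiver:
    """Hypothetical in-memory source; a stand-in, not a heisskleber class."""

    def __init__(
        self,
        payloads: list[bytes],
        unpacker: Callable[[bytes], dict[str, Any]],
    ) -> None:
        self.unpacker = unpacker
        self._payloads = list(payloads)

    async def start(self) -> None:
        # A real source would connect or start background tasks here.
        pass

    async def receive(self, **kwargs: Any) -> tuple[dict[str, Any], dict[str, Any]]:
        payload = self._payloads.pop(0)
        # Return the unpacked data together with a metadata dictionary.
        return self.unpacker(payload), {"source": "list"}

    def __aiter__(self) -> "ListReceiver":
        return self

    async def __anext__(self) -> tuple[dict[str, Any], dict[str, Any]]:
        if not self._payloads:
            raise StopAsyncIteration
        return await self.receive()

    async def __aenter__(self) -> "ListReceiver":
        await self.start()
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        return None


async def demo_receiver() -> list[tuple[dict[str, Any], dict[str, Any]]]:
    received = []
    async with ListReceiver([b'{"a": 1}', b'{"a": 2}'], json.loads) as source:
        async for data, metadata in source:
            received.append((data, metadata))
    return received


if __name__ == "__main__":
    print(asyncio.run(demo_receiver()))
```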
Serialization¶
See Serialization for a tutorial on how to implement custom packers and unpackers for (de-)serialization.
- class heisskleber.core.Packer(*args, **kwargs)¶
Packer Interface.
This class defines a protocol for packing data. It takes data and converts it into a bytes payload.
- None¶
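A packer in the sense described above can be as small as a function from data to bytes. The sketch below assumes that a plain callable taking a flat dictionary is acceptable; `csv_packer` is a hypothetical name and is not part of heisskleber.

```python
import csv
import io
from typing import Any


def csv_packer(data: dict[str, Any]) -> bytes:
    """Pack the values of a flat dict into one CSV line, encoded as UTF-8."""
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator="\n").writerow(data.values())
    return buffer.getvalue().encode("utf-8")


if __name__ == "__main__":
    print(csv_packer({"sensor": "imu", "x": 0.5, "y": -1}))
```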
- class heisskleber.core.Unpacker(*args, **kwargs)¶
Unpacker Interface.
This abstract base class defines an interface for unpacking payloads. It takes a payload of bytes, creates a data dictionary and an optional topic, and returns a tuple containing the topic and data.
Errors¶
- class heisskleber.core.UnpackerError(payload)¶
Raised when unpacking operations fail.
This exception wraps underlying errors that may occur during unpacking, providing a consistent interface for error handling.
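The wrapping behaviour can be sketched as follows. The `UnpackerError` class here is a local stand-in that only mirrors the documented `UnpackerError(payload)` constructor, and `json_unpacker` is a hypothetical helper whose return value is simplified to the data dictionary alone.

```python
import json
from typing import Any


class UnpackerError(Exception):
    """Local stand-in mirroring the documented UnpackerError(payload) signature."""

    def __init__(self, payload: bytes) -> None:
        self.payload = payload
        super().__init__(f"Failed to unpack payload: {payload[:40]!r}")


def json_unpacker(payload: bytes) -> dict[str, Any]:
    """Decode a JSON payload, wrapping decoding failures in UnpackerError."""
    try:
        return json.loads(payload)
    except (json.JSONDecodeError, UnicodeDecodeError) as err:
        # Chain the original exception so the root cause stays visible.
        raise UnpackerError(payload) from err


if __name__ == "__main__":
    try:
        json_unpacker(b"not json")
    except UnpackerError as err:
        print(f"{err} (caused by {type(err.__cause__).__name__})")
```

Callers then only need to catch one exception type, whatever the underlying format is.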
Implementations (Adapters)¶
MQTT¶
Async wrappers for MQTT functionality.
The MQTT implementation is built on the aiomqtt package, an async wrapper around the paho-mqtt package.
- class heisskleber.mqtt.MqttSender(config, packer=<heisskleber.core.packer.JSONPacker object>)¶
MQTT publisher with queued message handling.
This sink implementation provides asynchronous MQTT publishing capabilities with automatic connection management and message queueing. Network operations are handled in a separate task.
- config¶
MQTT configuration in a dataclass.
- packer¶
Callable to pack data from type T to bytes for transport.
- async send(data, topic='mqtt', qos=0, retain=False, **kwargs)¶
Queue data for asynchronous publication to the MQTT broker.
- Parameters:
- Raises:
PackerError – The data could not be serialized with the provided Packer.
- Return type:
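The general pattern of queueing messages and handling network operations in a separate task can be sketched with `asyncio.Queue`. This is an illustration of the pattern only, not heisskleber's implementation: `QueuedPublisher`, its `published` list and its `stop()` method are hypothetical, and the broker is replaced by an in-memory list.

```python
import asyncio
from typing import Optional


class QueuedPublisher:
    """Hypothetical queue-backed sender; the broker is faked with a list."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._task: Optional[asyncio.Task] = None
        self.published: list[tuple[str, bytes]] = []

    async def start(self) -> None:
        # Run the publishing loop in its own task.
        self._task = asyncio.create_task(self._worker())

    async def _worker(self) -> None:
        while True:
            topic, payload = await self._queue.get()
            # A real sender would publish to the broker here.
            self.published.append((topic, payload))
            self._queue.task_done()

    async def send(self, payload: bytes, topic: str = "mqtt") -> None:
        # Returns as soon as the message is queued, before it is published.
        await self._queue.put((topic, payload))

    async def stop(self) -> None:
        # Wait until every queued message was handled, then end the worker.
        await self._queue.join()
        if self._task is not None:
            self._task.cancel()
            await asyncio.gather(self._task, return_exceptions=True)


async def demo_queue() -> list[tuple[str, bytes]]:
    publisher = QueuedPublisher()
    await publisher.start()
    await publisher.send(b"1", topic="a")
    await publisher.send(b"2", topic="b")
    await publisher.stop()
    return publisher.published


if __name__ == "__main__":
    print(asyncio.run(demo_queue()))
```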
- class heisskleber.mqtt.MqttReceiver(config, topic, unpacker=<heisskleber.core.unpacker.JSONUnpacker object>)¶
Asynchronous MQTT subscriber based on aiomqtt.
This class implements an asynchronous MQTT subscriber that handles connection, subscription, and message reception from an MQTT broker. It uses aiomqtt as the underlying MQTT client implementation.
The subscriber maintains a queue of received messages which can be accessed through the receive method.
- async receive(**kwargs)¶
Receive and process the next message from the queue.
- Return type:
- Returns:
tuple[T, dict[str, Any]], consisting of:
- The unpacked message data
- A dictionary with metadata including the message topic
- Raises:
TypeError – If the message payload is not of type bytes.
UnpackerError – If the message could not be unpacked with the unpacker protocol.
- Parameters:
kwargs (Any)
- class heisskleber.mqtt.MqttConf(host='localhost', port=1883, ssl=False, user='', password='', qos=0, retain=False, max_saved_messages=1000, timeout=60, keep_alive=60, will=None)¶
MQTT configuration class.
- Parameters:
ZMQ¶
- class heisskleber.zmq.ZmqConf(protocol='tcp', host='127.0.0.1', publisher_port=5555, subscriber_port=5556, packstyle='json')¶
ZMQ configuration class.
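ZeroMQ endpoints are written as `protocol://host:port`, so the defaults above correspond to `tcp://127.0.0.1:5555` and `tcp://127.0.0.1:5556`. The sketch below shows that composition with a hypothetical stand-in dataclass; `ZmqEndpoints` and its two methods are assumptions for illustration and are not claimed to be part of `ZmqConf`.

```python
from dataclasses import dataclass


@dataclass
class ZmqEndpoints:
    """Hypothetical stand-in that mirrors the documented ZmqConf defaults."""

    protocol: str = "tcp"
    host: str = "127.0.0.1"
    publisher_port: int = 5555
    subscriber_port: int = 5556

    def publisher_address(self) -> str:
        # Standard ZeroMQ endpoint format: protocol://host:port
        return f"{self.protocol}://{self.host}:{self.publisher_port}"

    def subscriber_address(self) -> str:
        return f"{self.protocol}://{self.host}:{self.subscriber_port}"


if __name__ == "__main__":
    endpoints = ZmqEndpoints()
    print(endpoints.publisher_address(), endpoints.subscriber_address())
```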
- class heisskleber.zmq.ZmqSender(config, packer=<heisskleber.core.packer.JSONPacker object>)¶
Async publisher that sends messages to a ZMQ PUB socket.
- config¶
The ZmqConf configuration object for the connection.
- packer¶
The packer strategy to use for serializing the data. Defaults to a JSON packer with UTF-8 encoding.
- class heisskleber.zmq.ZmqReceiver(config, topic, unpacker=<heisskleber.core.unpacker.JSONUnpacker object>)¶
Async source that subscribes to one or more topics on a ZMQ broker and receives messages via the receive() method.
- config¶
The ZmqConf configuration object for the connection.
- unpacker¶
The unpacker function to use for deserializing the data.
- async receive(**kwargs)¶
Read a message from the zmq bus and return it.
- Returns:
tuple(topic: str, message: dict): the message received
- Raises:
UnpackerError – If payload could not be unpacked with provided unpacker.
- Parameters:
kwargs (Any)
Serial¶
- class heisskleber.serial.SerialConf(port='/dev/serial0', baudrate=9600, bytesize=8, encoding='ascii', parity='N', stopbits=1, termination_char=b'\\n')¶
Serial Config class.
- port¶
The port to connect to. Defaults to /dev/serial0.
- baudrate¶
The baudrate of the serial connection. Defaults to 9600.
- bytesize¶
The bytesize of the messages. Defaults to 8.
- encoding¶
The string encoding of the messages. Defaults to ascii.
- parity¶
The parity checking value. One of “N” for none, “E” for even, “O” for odd. Defaults to “N”.
- stopbits¶
Stopbits. One of 1, 2 or 1.5. Defaults to 1.
Note
stopbits 1.5 is not yet implemented.
- class heisskleber.serial.SerialSender(config, packer)¶
An asynchronous sink for writing data to a serial port.
This class implements the Sender interface for writing data to a serial port. It uses a thread pool executor to perform blocking I/O operations asynchronously.
- config¶
Configuration for the serial port.
- packer¶
Function to pack data for sending.
- Parameters:
config (SerialConf)
- async send(data, **kwargs)¶
Send data to the serial port.
This method packs the data, writes it to the serial port, and then flushes the port.
- Parameters:
- Raises:
PackerError – If data could not be packed to bytes with the provided packer.
- Return type:
Note
If the serial port is not connected, it will implicitly attempt to connect first.
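Running blocking I/O in a thread pool from async code typically relies on `loop.run_in_executor`. The sketch below shows that technique with an `io.BytesIO` object standing in for the serial port; `blocking_write` and `send_async` are hypothetical helpers, not heisskleber functions.

```python
import asyncio
import io
from concurrent.futures import ThreadPoolExecutor


def blocking_write(port: io.BytesIO, payload: bytes) -> int:
    """Stand-in for a blocking serial write followed by a flush."""
    written = port.write(payload)
    port.flush()
    return written


async def send_async(port: io.BytesIO, payload: bytes, executor: ThreadPoolExecutor) -> int:
    loop = asyncio.get_running_loop()
    # The blocking call runs in a worker thread; the event loop stays responsive.
    return await loop.run_in_executor(executor, blocking_write, port, payload)


async def demo_serial_write() -> bytes:
    port = io.BytesIO()
    with ThreadPoolExecutor(max_workers=1) as executor:
        await send_async(port, b"hello\n", executor)
    return port.getvalue()


if __name__ == "__main__":
    print(asyncio.run(demo_serial_write()))
```

A single worker thread keeps writes to the port in order.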
- class heisskleber.serial.SerialReceiver(config, unpacker)¶
An asynchronous source for reading data from a serial port.
This class implements the Receiver interface for reading data from a serial port. It uses a thread pool executor to perform blocking I/O operations asynchronously.
- config¶
Configuration for the serial port.
- unpacker¶
Function to unpack received data.
- Parameters:
config (SerialConf)
- async receive(*, termination_char=None, read_bytes=-1, **kwargs)¶
Receive data from the serial port.
This method reads a line from the serial port, unpacks it, and returns the data. If the serial port is not connected, it will attempt to connect first.
- Parameters:
- Returns:
A tuple containing the unpacked data and any extra information.
- Return type:
- Raises:
UnpackerError – If the data could not be unpacked with the provided unpacker.
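Reading "a line" up to a termination character can be sketched as below. This is an assumption about the general technique, not the library's code: `read_until` is a hypothetical helper, an `io.BytesIO` stream stands in for the serial port, and the terminator is kept in the returned bytes.

```python
import io


def read_until(stream: io.BufferedIOBase, termination_char: bytes = b"\n") -> bytes:
    """Read byte by byte until the terminator is seen or the stream ends."""
    buffer = bytearray()
    while True:
        chunk = stream.read(1)
        if not chunk:
            # End of stream: return whatever was collected so far.
            break
        buffer += chunk
        if buffer.endswith(termination_char):
            break
    return bytes(buffer)


if __name__ == "__main__":
    port = io.BytesIO(b"a,1\r\nb,2\r\n")
    print(read_until(port, b"\r\n"))
    print(read_until(port, b"\r\n"))
```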
TCP¶
- class heisskleber.tcp.TcpConf(host='localhost', port=6000, timeout=60, retry_delay=0.5, restart_behavior=RestartBehavior.ALWAYS)¶
Configuration dataclass for TCP connections.
- class heisskleber.tcp.TcpSender(config, packer)¶
Async TCP Sink.
- config¶
The TcpConf configuration object.
- packer¶
The packer protocol to serialize data before sending.
- class heisskleber.tcp.TcpReceiver(config, unpacker=<heisskleber.core.unpacker.JSONUnpacker object>)¶
Async TCP source that connects to host:port and reads byte-encoded strings.
- async receive(**kwargs)¶
Receive data from a connection.
Attempt to read data from the connection and handle the process of re-establishing the connection if necessary.
- Return type:
- Returns:
tuple[T, dict[str, Any]], consisting of:
- The unpacked message data
- A dictionary with metadata including the message topic
- Raises:
TypeError – If the message payload is not of type bytes.
UnpackerError – If the message could not be unpacked with the unpacker protocol.
- Parameters:
kwargs (Any)
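Re-establishing a connection with a fixed delay between attempts can be sketched as a small retry loop. The helper below is hypothetical: `connect_with_retry` and `max_attempts` are assumptions for illustration, only `retry_delay` echoes a `TcpConf` field, and the connection itself is faked so the example runs without a server.

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def connect_with_retry(
    connect: Callable[[], Awaitable[T]],
    retry_delay: float = 0.5,
    max_attempts: int = 5,  # assumption: not a documented TcpConf field
) -> T:
    """Call connect() until it succeeds, sleeping retry_delay between attempts."""
    last_error: Optional[Exception] = None
    for attempt in range(1, max_attempts + 1):
        try:
            return await connect()
        except ConnectionError as err:
            last_error = err
            if attempt < max_attempts:
                await asyncio.sleep(retry_delay)
    raise ConnectionError(f"gave up after {max_attempts} attempts") from last_error


async def demo_retry() -> tuple[str, int]:
    attempts = 0

    async def flaky_connect() -> str:
        # Fails twice before succeeding, like a server that is still starting.
        nonlocal attempts
        attempts += 1
        if attempts < 3:
            raise ConnectionRefusedError("not yet listening")
        return "connected"

    result = await connect_with_retry(flaky_connect, retry_delay=0.01)
    return result, attempts


if __name__ == "__main__":
    print(asyncio.run(demo_retry()))
```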
UDP¶
- class heisskleber.udp.UdpConf(port=1234, host='127.0.0.1', max_queue_size=1000, encoding='utf-8', delimiter='\\r\\n')¶
UDP configuration.
- class heisskleber.udp.UdpSender(config, packer=<heisskleber.core.packer.JSONPacker object>)¶
UDP sink for sending data via UDP protocol.
- Parameters:
File¶
- class heisskleber.file.FileConf(rollover=3600, name_fmt='%Y%m%d_%h%M%s.txt', batch_interval=5, directory='./', watchfile='', format='json', tz=datetime.timezone.utc)¶
Config class for file operations.
- class heisskleber.file.FileWriter(config, packer=None, header_func=None)¶
Asynchronous file writer implementation of the Sender interface.
Writes data to files with automatic rollover based on time intervals. Files are named according to the configured datetime format.
- Parameters:
- async send(data, **kwargs)¶
Write data to the current file.
- Parameters:
- Raises:
RuntimeError – If the writer has not been started.
- Return type:
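Time-based rollover and datetime-formatted file names can be sketched with the standard library. The helpers `file_name` and `needs_rollover` are hypothetical, the sketch assumes `rollover` is given in seconds, and the format string `%Y%m%d_%H%M%S.txt` is chosen for the example rather than taken from the documented default.

```python
from datetime import datetime, timedelta, timezone


def file_name(now: datetime, name_fmt: str = "%Y%m%d_%H%M%S.txt") -> str:
    """Build a file name from a timestamp using a strftime format string."""
    return now.strftime(name_fmt)


def needs_rollover(opened_at: datetime, now: datetime, rollover: int = 3600) -> bool:
    """Return True once the current file has been open for `rollover` seconds."""
    return (now - opened_at).total_seconds() >= rollover


if __name__ == "__main__":
    opened = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
    later = opened + timedelta(hours=1)
    print(file_name(opened), needs_rollover(opened, later))
```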
- class heisskleber.file.FileReader(config, unpacker)¶
Asynchronous File Reader.
Currently only reads bytes.