Kemux Manager

Manager class for Kemux.

The main class that is used to initialize and start the Kemux receiver.

Manager dataclass

The main class used to initialize and start the Kemux receiver, and to operate on incoming and outgoing messages according to the specifications defined by the user in the Stream subclasses.

Attributes:
  • name (str) –

    The name of the Kemux application.

  • kafka_address (str) –

    The address of the Kafka broker.

  • streams_dir (str) –

    The path to the directory containing the Stream subclasses.

  • persistent_data_directory (str) –

    The path to the directory where persistent Faust data will be stored.

  • logger (Logger) –

    The logger that will be used to log messages.

  • agents (dict[str, AgentT]) –

    The Faust agents that will be used to process incoming messages.

  • streams (dict[str, Stream]) –

    The streams that will be used to process incoming messages.

add_stream(name, stream_input_class, stream_outputs_class)

Add a stream to the Kemux manager.

Parameters:
  • name (str) –

    The name of the stream.

  • stream_input_class (type) –

    The class of the stream input, containing the Schema and Processor.

  • stream_outputs_class (type) –

    The class of the stream outputs, containing the Schema and Processor.
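
As a rough illustration of the call above, the following minimal sketch registers an input/outputs pair under a stream name. It does not import Kemux: `ManagerSketch`, `StreamSketch`, `AnimalsInput` and `AnimalsOutputs` are hypothetical stand-ins, and the nested `Schema`/`Processor` layout is an assumption drawn only from the parameter descriptions.

```python
from dataclasses import dataclass, field
from typing import Dict


# Hypothetical stand-ins: the nested Schema/Processor layout is an assumption
# based only on the parameter descriptions, not on the Kemux source.
class AnimalsInput:
    class Schema:
        pass

    class Processor:
        pass


class AnimalsOutputs:
    class Schema:
        pass

    class Processor:
        pass


@dataclass
class StreamSketch:
    """Stand-in for a Stream: pairs one input class with one outputs class."""
    input_class: type
    outputs_class: type


@dataclass
class ManagerSketch:
    """Stand-in for Manager that only models the `streams` mapping."""
    streams: Dict[str, StreamSketch] = field(default_factory=dict)

    def add_stream(self, name: str, stream_input_class: type,
                   stream_outputs_class: type) -> None:
        # Register the input/outputs pair under the given stream name.
        self.streams[name] = StreamSketch(stream_input_class, stream_outputs_class)


manager = ManagerSketch()
manager.add_stream("animals", AnimalsInput, AnimalsOutputs)
```

After the call, the stream can be looked up by name in `manager.streams`, which mirrors the `streams (dict[str, Stream])` attribute listed for the Manager.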

create_processing_function(stream)

Create a processing function for a stream, specific to the name of its input topic.

Parameters:
  • stream (Stream) –

    The stream to create the processing function for.

Returns:
  • Callable[[StreamT[InputSchema]], Awaitable[None]]

    The record processing function. Fully qualified type: typing.Callable[[faust.StreamT[kemux.data.schema.input.InputSchema]], typing.Awaitable[None]].
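
The return type describes an async function that consumes a stream of records. A minimal sketch of such a factory, assuming a plain async generator in place of `faust.StreamT` and a list in place of real output handling, might look like this. Renaming the returned function after the topic is only one guess at what "specific to the name of its input topic" means; all names here are hypothetical.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, List, Tuple


def create_processing_function(
    topic: str, handled: List[Tuple[str, Any]]
) -> Callable[[AsyncIterator[Any]], Awaitable[None]]:
    """Build an async record handler bound to one input topic (sketch only)."""

    async def process_stream(stream: AsyncIterator[Any]) -> None:
        # Consume records one by one, as a stream agent would.
        async for record in stream:
            handled.append((topic, record))

    # Assumption: give each handler a topic-specific name so that handlers
    # created for different topics can be told apart.
    process_stream.__name__ = f"process_{topic}"
    return process_stream


async def fake_stream(records: List[Any]) -> AsyncIterator[Any]:
    # Stand-in for a Faust stream: yields the given records in order.
    for record in records:
        yield record


handled: List[Tuple[str, Any]] = []
process_animals = create_processing_function("animals", handled)
asyncio.run(process_animals(fake_stream(["cat", "dog"])))
```

The closure keeps the topic name and the `handled` list bound to each generated function, so one factory can serve any number of streams.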

init(name, kafka_address, data_dir, streams_dir=None) classmethod

Initialize the Kemux receiver.

Parameters:
  • name (str) –

    The name of the Kemux application.

  • kafka_address (str) –

    The address of the Kafka broker.

  • data_dir (str) –

    The path to the directory where persistent Faust data will be stored.

  • streams_dir (str, default: None) –

    The path to the directory containing the Stream subclasses. Defaults to None.

Returns:
  • Manager –

    The initialized Manager instance.
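
The signature above follows the alternate-constructor pattern: a classmethod that builds and returns the instance. The sketch below reproduces that shape with a hypothetical `ManagerSketch` dataclass whose fields are taken from the attribute list. The mapping of `data_dir` onto `persistent_data_directory` is inferred from the matching descriptions, and the broker address and paths are placeholders.

```python
import logging
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class ManagerSketch:
    """Stand-in that mirrors the documented Manager attributes."""
    name: str
    kafka_address: str
    persistent_data_directory: str
    streams_dir: Optional[str] = None
    logger: logging.Logger = field(
        default_factory=lambda: logging.getLogger("kemux-sketch")
    )
    agents: Dict[str, Any] = field(default_factory=dict)
    streams: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def init(cls, name: str, kafka_address: str, data_dir: str,
             streams_dir: Optional[str] = None) -> "ManagerSketch":
        # Assumption: data_dir is stored as persistent_data_directory,
        # since both are described as the persistent Faust data location.
        return cls(
            name=name,
            kafka_address=kafka_address,
            persistent_data_directory=data_dir,
            streams_dir=streams_dir,
        )


# Placeholder values, chosen only for illustration.
manager = ManagerSketch.init("demo", "localhost:9092", "/tmp/kemux-data")
```

Leaving `streams_dir` as None matches the documented default; in that case streams would have to be registered manually, for example through add_stream.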

initialize_streams() async

Initialize the streams, i.e. initialize the input and output topic handlers and the stream agents.

Raises:
  • ValueError

    If a stream input is not defined or is invalid.
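
The validation step can be pictured with the following minimal sketch. It is not the Kemux implementation: `StreamSketch` is a hypothetical stand-in, treating an input class without a `topic` attribute as "invalid" is an assumption, and the returned strings merely stand in for real agents.

```python
import asyncio
from typing import Dict, Optional


class StreamSketch:
    """Stand-in for a Stream holding optional input and outputs classes."""

    def __init__(self, input_class: Optional[type],
                 outputs_class: Optional[type]) -> None:
        self.input_class = input_class
        self.outputs_class = outputs_class


async def initialize_streams(streams: Dict[str, StreamSketch]) -> Dict[str, str]:
    agents: Dict[str, str] = {}
    for name, stream in streams.items():
        # An undefined input is rejected before any agent is created.
        if stream.input_class is None:
            raise ValueError(f"Stream {name!r} has no input defined")
        # Assumption: a valid input class exposes a `topic` attribute.
        if not hasattr(stream.input_class, "topic"):
            raise ValueError(f"Stream {name!r} has an invalid input")
        # Placeholder for the real agent object.
        agents[name] = f"agent:{stream.input_class.topic}"
    return agents


class AnimalsInput:
    # Hypothetical input class with the assumed `topic` attribute.
    topic = "animals"


agents = asyncio.run(initialize_streams({"animals": StreamSketch(AnimalsInput, None)}))
```

Checking every stream before creating its agent means a misconfigured stream fails with a clear ValueError instead of producing a half-initialized agent.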

remove_stream(name)

Remove a stream from the Kemux manager.

Parameters:
  • name (str) –

    The name of the stream.

start()

Start the Kemux receiver, i.e. the underlying Faust application and the stream agents.

Raises:
  • ValueError

    If no streams have been loaded (either manually or from a directory).
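
From the caller's side, the documented ValueError means that start should only be called once at least one stream is registered. The sketch below shows that guard with a hypothetical `start` function; `run_app` is a placeholder callable standing in for launching the Faust application, not a Kemux or Faust API.

```python
from typing import Any, Callable, Dict, List


def start(streams: Dict[str, Any], run_app: Callable[[], None]) -> None:
    """Sketch of the documented guard: refuse to start without streams."""
    if not streams:
        raise ValueError("No streams have been loaded")
    # Placeholder for starting the underlying application and agents.
    run_app()


events: List[str] = []

# Starting with no streams fails fast, before anything is launched.
try:
    start({}, lambda: events.append("started"))
except ValueError as error:
    events.append(f"refused: {error}")

# With at least one registered stream, the application is launched.
start({"animals": object()}, lambda: events.append("started"))
```

Catching the error at the call site lets an application report a missing streams directory or a forgotten add_stream call before any connection to the broker is attempted.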