Kemux Stream object

Stream class definition.

A stream connects an input processor to its output processors so that messages are ingested, transformed, and sent to the correct output topics, while the data schema stays consistent with the user's specifications.

Stream dataclass

Stream Class

A class that connects the input processor with any number of output processors.

Attributes:
  • input (InputProcessor) –

    The input processor that will be used to ingest messages.

  • outputs (dict[str, OutputProcessor]) –

    The output processors that will be used to send messages.

  • logger (Logger) –

    The logger that will be used to log messages.

add_output(stream_output, output_schema=None)

Add an output processor to the stream.

Parameters:
  • stream_output (OutputProcessor) –

    The output processor to be added.

  • output_schema (OutputSchema, default: None ) –

    The output schema to be used. Defaults to None.

process(event) async

Process each incoming message from the input Kafka topic as a separate event.

Parameters:
  • event (EventT) –

    The event to be processed.

remove_output(output_topic_name)

Remove an output processor from the stream.

Parameters:
  • output_topic_name (str) –

    The name of the output processor to be removed.
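
As an illustration of how add_output and remove_output could maintain the outputs mapping, here is a minimal sketch. It does not use Kemux itself: `SketchOutput` and `SketchStream` are hypothetical stand-ins, and both the keying of `outputs` by topic name and the silent handling of unknown topics are assumptions, not confirmed library behavior. The `output_schema` argument is left out for brevity.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class SketchOutput:
    """Hypothetical stand-in for an output processor."""
    topic: str


@dataclass
class SketchStream:
    """Hypothetical stand-in that models only the outputs mapping."""
    outputs: dict[str, SketchOutput] = field(default_factory=dict)

    def add_output(self, stream_output: SketchOutput) -> None:
        # Assumption: outputs are keyed by the output topic name.
        self.outputs[stream_output.topic] = stream_output

    def remove_output(self, output_topic_name: str) -> None:
        # Assumption: removing an unknown topic is a no-op in this sketch.
        self.outputs.pop(output_topic_name, None)
```

Keying by topic name keeps removal a single dictionary operation and makes it impossible to register two outputs for the same topic in this sketch.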

set_input(stream_input, input_schema=None)

Set the input processor of the stream.

Parameters:
  • stream_input (InputProcessor) –

    The input processor to be used.

  • input_schema (InputSchema, default: None ) –

    The input schema to be used. Defaults to None.

topics()

Get the input and output topics of the stream.

Returns:
  • tuple[str, list[str]]

    The input topic and the list of output topics of the stream.

Raises:
  • ValueError

    If the stream is not initialized and the input stream is not defined.
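
The documented contract of topics() (a `tuple[str, list[str]]` on success, a ValueError when no input is defined) can be sketched as follows. `SketchProcessor` and `SketchStream` are hypothetical stand-ins written for this example, and the `topic` attribute on a processor is an assumption rather than part of the documented API.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SketchProcessor:
    """Hypothetical stand-in for an input or output processor."""
    topic: str  # assumed attribute holding the Kafka topic name


@dataclass
class SketchStream:
    """Hypothetical stand-in mirroring the documented input/outputs attributes."""
    input: Optional[SketchProcessor] = None
    outputs: dict[str, SketchProcessor] = field(default_factory=dict)

    def topics(self) -> tuple[str, list[str]]:
        # Fail early when no input processor has been set.
        if self.input is None:
            raise ValueError("Stream input is not defined")
        # Return the input topic followed by every output topic.
        return self.input.topic, [out.topic for out in self.outputs.values()]
```

A caller can then unpack the result directly, for example `input_topic, output_topics = stream.topics()`.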

find_streams_order(info)

Find the order of the streams based on their dependency on each other.

Parameters:
  • info (list[tuple]) –

    The input and output topics of the streams.

Returns:
  • list[tuple]

    The ordered input and output topics of the streams.
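
One way to picture dependency ordering is a topological sort in which a stream comes after any stream that produces its input topic. The sketch below uses the standard-library `graphlib` module and is not the Kemux implementation. The name `sketch_streams_order` is hypothetical, and it assumes each entry of `info` has the `(input_topic, output_topics)` shape returned by topics().

```python
from graphlib import TopologicalSorter


def sketch_streams_order(info):
    """Order (input_topic, output_topics) tuples so producers come first."""
    # Map every output topic to the indices of the streams that write to it.
    producers = {}
    for idx, (_, output_topics) in enumerate(info):
        for topic in output_topics:
            producers.setdefault(topic, []).append(idx)

    sorter = TopologicalSorter()
    for idx, (input_topic, _) in enumerate(info):
        # A stream depends on every other stream that produces its input topic.
        upstream = [p for p in producers.get(input_topic, []) if p != idx]
        sorter.add(idx, *upstream)

    # static_order() raises graphlib.CycleError if the streams form a cycle.
    return [info[idx] for idx in sorter.static_order()]
```

For example, `sketch_streams_order([("clean", ["report"]), ("raw", ["clean"])])` places the `"raw"` stream first, because it produces the `"clean"` topic that the other stream consumes.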

order_streams(streams)

Order the streams based on their dependency on each other.

Parameters:
  • streams (dict[str, Stream]) –

    The streams to be ordered.

Returns:
  • dict[str, Stream]

    The ordered streams.

Raises:
  • ValueError

    If a stream input is not defined.
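
To show how a name-to-stream mapping might be reordered by dependency, here is a hedged sketch that relies on each stream's topics() result. `SketchStream` and `sketch_order_streams` are hypothetical names, and the approach (a standard-library topological sort keyed on stream names) is an assumption about one reasonable strategy, not a description of Kemux internals.

```python
from graphlib import TopologicalSorter


class SketchStream:
    """Hypothetical stand-in exposing only a topics() method."""

    def __init__(self, input_topic, output_topics):
        self.input_topic = input_topic
        self.output_topics = list(output_topics)

    def topics(self):
        if self.input_topic is None:
            raise ValueError("Stream input is not defined")
        return self.input_topic, self.output_topics


def sketch_order_streams(streams):
    """Return a new dict whose insertion order follows stream dependencies."""
    # topics() raises ValueError for any stream without an input.
    topics = {name: stream.topics() for name, stream in streams.items()}

    sorter = TopologicalSorter()
    for name, (input_topic, _) in topics.items():
        # Upstream streams are those that write to this stream's input topic.
        upstream = [
            other
            for other, (_, outputs) in topics.items()
            if other != name and input_topic in outputs
        ]
        sorter.add(name, *upstream)

    return {name: streams[name] for name in sorter.static_order()}
```

Because Python dictionaries preserve insertion order, iterating over the returned mapping visits upstream streams before the streams that consume their output.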