Writing to Kafka#

To write data to Kafka, use DBWriter with the options described below.

Note

Unlike other connection classes, Kafka accepts only dataframes with a fixed schema (see the documentation):

DataFrame Schema
from pyspark.sql.types import (
    ArrayType,
    BinaryType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

schema = StructType(
    [
        # mandatory field:
        StructField("value", BinaryType(), nullable=True),
        # optional fields, can be omitted:
        StructField("key", BinaryType(), nullable=True),
        StructField("partition", IntegerType(), nullable=True),
        # this field can be passed only with ``include_headers=True``
        StructField(
            "headers",
            ArrayType(
                StructType(
                    [
                        StructField("key", StringType(), nullable=False),
                        StructField("value", BinaryType(), nullable=True),
                    ],
                ),
            ),
            nullable=True,
        ),
    ],
)

You cannot pass a dataframe with other column names or types.
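For example, a dataframe matching this schema can be built and written as follows. This is a minimal sketch: the SparkSession (spark), broker address, cluster name and topic name are placeholders, not values from this documentation.

Writing a conforming dataframe

from pyspark.sql.types import BinaryType, StructField, StructType

from onetl.connection import Kafka
from onetl.db import DBWriter

# placeholder connection; assumes an existing SparkSession ``spark``
kafka = Kafka(
    addresses=["kafka-broker:9092"],
    cluster="my-cluster",
    spark=spark,
)

# dataframe with the mandatory ``value`` column and the optional ``key`` column
df = spark.createDataFrame(
    [
        (bytearray(b"user-1"), bytearray(b"first message")),
        (bytearray(b"user-2"), bytearray(b"second message")),
    ],
    StructType(
        [
            StructField("key", BinaryType(), nullable=True),
            StructField("value", BinaryType(), nullable=True),
        ],
    ),
)

writer = DBWriter(
    connection=kafka,
    target="topic_name",  # the topic is set here, not in WriteOptions
)
writer.run(df)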

Warning

Columns:

  • value

  • key

  • headers[*].value

can contain only strings or raw bytes. If they hold values of a custom type, serialize these values manually, as shown below.
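For example, a struct column can be JSON-encoded and cast to raw bytes using plain Spark functions. This is a sketch: the event and user_id column names are illustrative, not part of the Kafka schema.

Manual serialization of custom-typed columns

from pyspark.sql import functions as F

# assumes ``df`` has an integer ``user_id`` and a struct ``event`` column
serialized_df = df.select(
    # cast the integer key to string first, then to raw bytes
    F.col("user_id").cast("string").cast("binary").alias("key"),
    # JSON-encode the struct value, then cast the JSON string to raw bytes
    F.to_json(F.col("event")).cast("binary").alias("value"),
)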

pydantic model onetl.connection.db_connection.kafka.options.KafkaWriteOptions#

Writing options for the Kafka connector.

Note

You can pass any value supported by the connector, even if it is not mentioned in this documentation.

The set of supported options depends on the connector version. See the connector documentation for details.

Warning

Options:
  • kafka.*

  • topic

are populated from connection attributes and cannot be overridden by the user in WriteOptions, to avoid conflicts.

Examples

Write options initialization

from onetl.connection import Kafka

options = Kafka.WriteOptions(
    if_exists="append",
    include_headers=True,
)
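The resulting object is then passed to DBWriter. A minimal sketch, assuming an existing Kafka connection kafka, a dataframe df with the schema above, and a placeholder topic name:

Passing options to DBWriter

from onetl.db import DBWriter

writer = DBWriter(
    connection=kafka,  # an existing Kafka connection
    target="topic_name",  # placeholder topic name
    options=options,
)
writer.run(df)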
field if_exists: KafkaTopicExistBehaviorKafka = KafkaTopicExistBehaviorKafka.APPEND#

Behavior of writing data into an existing topic.

Same as df.write.mode(...).

Possible values:
  • append (default) - Adds new rows to the existing topic.

  • error - Raises an error if the topic already exists.
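For example, to fail instead of appending when the topic already exists (a minimal sketch):

Failing on an existing topic

from onetl.connection import Kafka

# raise an error on write if the target topic already exists
options = Kafka.WriteOptions(if_exists="error")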

field include_headers: bool = False (alias 'includeHeaders')#

If True, the headers column of the dataframe can be written to Kafka (requires Kafka 2.0+).

If False and the dataframe contains a headers column, an exception will be raised.
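A minimal sketch of writing headers, assuming an existing SparkSession spark and brokers running Kafka 2.0+; the header key and values are placeholders:

Writing a dataframe with headers

from pyspark.sql.types import (
    ArrayType,
    BinaryType,
    StringType,
    StructField,
    StructType,
)

from onetl.connection import Kafka

schema = StructType(
    [
        StructField("value", BinaryType(), nullable=True),
        StructField(
            "headers",
            ArrayType(
                StructType(
                    [
                        StructField("key", StringType(), nullable=False),
                        StructField("value", BinaryType(), nullable=True),
                    ],
                ),
            ),
            nullable=True,
        ),
    ],
)

df = spark.createDataFrame(
    [(bytearray(b"payload"), [("trace-id", bytearray(b"abc123"))])],
    schema,
)

options = Kafka.WriteOptions(include_headers=True)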