Reading from Kafka#

To read data from Kafka, use DBReader with the options described below.
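For example, a minimal batch read might look like the sketch below. The broker address, cluster name and topic are hypothetical, and the exact connector package coordinates (here resolved via ``Kafka.get_packages``) depend on your Spark version:

Reading data with DBReader

from onetl.connection import Kafka
from onetl.db import DBReader
from pyspark.sql import SparkSession

# the Kafka connector package must be on the Spark classpath
spark = (
    SparkSession.builder.appName("kafka-read")
    .config("spark.jars.packages", ",".join(Kafka.get_packages(spark_version="3.5.0")))
    .getOrCreate()
)

kafka = Kafka(
    addresses=["kafka1:9092"],  # hypothetical broker address
    cluster="my-cluster",  # hypothetical cluster name
    spark=spark,
)

reader = DBReader(
    connection=kafka,
    source="topic_name",  # hypothetical topic
    options=Kafka.ReadOptions(include_headers=True),
)
df = reader.run()  # DataFrame with the fixed schema described below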

Note

Unlike other connection classes, Kafka always returns a DataFrame with a fixed schema (see the documentation):

DataFrame Schema
from pyspark.sql.types import (
    ArrayType,
    BinaryType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

schema = StructType(
    [
        StructField("value", BinaryType(), nullable=True),
        StructField("key", BinaryType(), nullable=True),
        StructField("topic", StringType(), nullable=False),
        StructField("partition", IntegerType(), nullable=False),
        StructField("offset", LongType(), nullable=False),
        StructField("timestamp", TimestampType(), nullable=False),
        StructField("timestampType", IntegerType(), nullable=False),
        # this field is returned only with ``include_headers=True``
        StructField(
            "headers",
            ArrayType(
                StructType(
                    [
                        StructField("key", StringType(), nullable=False),
                        StructField("value", BinaryType(), nullable=True),
                    ],
                ),
            ),
            nullable=True,
        ),
    ],
)

Warning

Columns:

  • value

  • key

  • headers[*].value

are always returned as raw bytes. If they contain values of a custom type, these values must be deserialized manually, as shown in the sketch below.
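For example, if message values are UTF-8 encoded JSON documents, they can be parsed with plain Spark functions. This is only a sketch: the payload schema below is hypothetical and must match the real structure of your messages, and ``df`` is assumed to be a DataFrame produced by DBReader:

Manual deserialization of raw bytes

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import LongType, StringType, StructField, StructType

# hypothetical payload schema -- replace with the actual structure of your messages
payload_schema = StructType(
    [
        StructField("id", LongType()),
        StructField("name", StringType()),
    ],
)

parsed_df = df.select(
    # ``value`` is raw bytes: cast to string first, then parse the JSON
    from_json(col("value").cast("string"), payload_schema).alias("payload"),
)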

pydantic model onetl.connection.db_connection.kafka.options.KafkaReadOptions#

Reading options for the Kafka connector.

Note

You can pass any value supported by the connector, even if it is not mentioned in this documentation.

The set of supported options depends on the connector version. See the link above.

Warning

Options:
  • assign

  • endingOffsets

  • endingOffsetsByTimestamp

  • kafka.*

  • startingOffsets

  • startingOffsetsByTimestamp

  • startingTimestamp

  • subscribe

  • subscribePattern

are populated from connection attributes and cannot be overridden by the user in ReadOptions, to avoid conflicts. See the sketch after this list for where these values come from.
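To illustrate, here is a sketch reusing the hypothetical names from the example above: broker addresses set on the connection become ``kafka.*`` options, and the topic passed as DBReader's ``source`` becomes ``subscribe``:

Options populated from connection attributes

# ``Kafka(addresses=["kafka1:9092"], ...)`` -> ``kafka.bootstrap.servers``
# ``DBReader(source="topic_name", ...)``    -> ``subscribe``
reader = DBReader(
    connection=kafka,  # supplies ``kafka.*`` options
    source="topic_name",  # supplies ``subscribe``
    options=Kafka.ReadOptions(minPartitions=50),  # other connector options are still allowed
)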

Examples

Read options initialization

from onetl.connection import Kafka

options = Kafka.ReadOptions(
    include_headers=False,
    minPartitions=50,
)
field include_headers: bool = False (alias 'includeHeaders')#

If True, the ``headers`` column is added to the output DataFrame.

If False, the column is not added.
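A short sketch of working with this column, assuming ``df`` was read with ``include_headers=True`` (field names follow the fixed schema above):

Accessing message headers

from pyspark.sql.functions import col, explode

headers_df = df.select(
    "topic",
    "offset",
    explode(col("headers")).alias("header"),
).select(
    "topic",
    "offset",
    col("header.key").alias("header_key"),
    col("header.value").alias("header_value"),  # raw bytes, see the warning above
)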