Reading from Kafka#
To read data from Kafka, use DBReader with the options described below.
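A minimal reading sketch, assuming a Spark session and a Kafka connection object named kafka already exist, and using a hypothetical topic name:

from onetl.db import DBReader

reader = DBReader(
    connection=kafka,  # existing Kafka connection object (assumed)
    source="topic_name",  # hypothetical topic name
    options=Kafka.ReadOptions(include_headers=False),
)
df = reader.run()  # DataFrame with the fixed schema shown below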
Note
Unlike other connection classes, Kafka always returns a DataFrame with a fixed schema (see the documentation):
DataFrame Schema
from pyspark.sql.types import (
    ArrayType,
    BinaryType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

schema = StructType(
    [
        StructField("value", BinaryType(), nullable=True),
        StructField("key", BinaryType(), nullable=True),
        StructField("topic", StringType(), nullable=False),
        StructField("partition", IntegerType(), nullable=False),
        StructField("offset", LongType(), nullable=False),
        StructField("timestamp", TimestampType(), nullable=False),
        StructField("timestampType", IntegerType(), nullable=False),
        # this field is returned only with ``include_headers=True``
        StructField(
            "headers",
            ArrayType(
                StructType(
                    [
                        StructField("key", StringType(), nullable=False),
                        StructField("value", BinaryType(), nullable=True),
                    ],
                ),
            ),
            nullable=True,
        ),
    ],
)
Warning
Columns:
value
key
headers[*].value
are always returned as raw bytes. If they contain values of a custom type, these values must be deserialized manually, as sketched below.
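For example, a minimal deserialization sketch, assuming df was produced by DBReader.run() and the value column contains UTF-8 encoded JSON with hypothetical fields id and name:

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# assumed payload schema, for illustration only
payload_schema = StructType(
    [
        StructField("id", IntegerType()),
        StructField("name", StringType()),
    ],
)

parsed_df = df.select(
    # cast raw bytes to string, then parse the JSON document
    F.from_json(F.col("value").cast("string"), payload_schema).alias("value"),
)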
- pydantic model onetl.connection.db_connection.kafka.options.KafkaReadOptions#
Reading options for the Kafka connector.
Note
You can pass any value supported by the connector, even if it is not mentioned in this documentation.
The set of supported options depends on the connector version; see the link above.
Warning
- Options:
assign
endingOffsets
endingOffsetsByTimestamp
kafka.*
startingOffsets
startingOffsetsByTimestamp
startingTimestamp
subscribe
subscribePattern
are populated from connection attributes and cannot be overridden by the user in ReadOptions, to avoid issues.

Examples
Read options initialization
options = Kafka.ReadOptions(
    include_headers=False,
    minPartitions=50,
)
- field include_headers: bool = False (alias 'includeHeaders')#
If True, adds the headers column to the output DataFrame. If False, the column will not be added.
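For example, a short sketch of unpacking headers with standard Spark functions, assuming df was read with include_headers=True and the topic actually carries headers:

from pyspark.sql import functions as F

headers_df = df.select(
    "key",
    F.explode("headers").alias("header"),  # one row per header entry
).select(
    "key",
    F.col("header.key").alias("header_key"),
    # header values are raw bytes, cast or deserialize them as needed
    F.col("header.value").cast("string").alias("header_value"),
)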