Writing to Kafka

For writing data to Kafka, use DBWriter with specific options (see below).
Note

Unlike other connection classes, Kafka accepts only dataframes with a fixed schema (see documentation):

DataFrame Schema
```python
from pyspark.sql.types import (
    ArrayType,
    BinaryType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

schema = StructType(
    [
        # mandatory fields:
        StructField("value", BinaryType(), nullable=True),
        # optional fields, can be omitted:
        StructField("key", BinaryType(), nullable=True),
        StructField("partition", IntegerType(), nullable=True),
        # this field can be passed only with ``include_headers=True``
        StructField(
            "headers",
            ArrayType(
                StructType(
                    [
                        StructField("key", StringType(), nullable=False),
                        StructField("value", BinaryType(), nullable=True),
                    ],
                ),
            ),
            nullable=True,
        ),
    ],
)
```
You cannot pass a dataframe with other column names or types.
Warning

Columns:

* value
* key
* headers[*].value

can only contain strings or raw bytes. If they contain values of a custom type, these values must be serialized manually.
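For instance, a custom object can be turned into raw bytes before it is placed into the value column. This is a minimal sketch using only the standard library; the `to_kafka_value` helper is a hypothetical name for illustration, not part of onetl:

```python
import json


def to_kafka_value(obj) -> bytes:
    """Manually serialize a custom object (here, a dict) into raw bytes
    suitable for the ``value`` column."""
    # JSON-encode the object, then encode the resulting string as UTF-8 bytes
    return json.dumps(obj).encode("utf-8")


event = {"user_id": 123, "action": "login"}
payload = to_kafka_value(event)
# payload is raw bytes and can be assigned to the ``value`` column
```

Any serialization format works (JSON, Avro, Protobuf, etc.), as long as the resulting column holds strings or bytes.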
pydantic model onetl.connection.db_connection.kafka.options.KafkaWriteOptions

Writing options for the Kafka connector.
Note

You can pass any value supported by the connector, even if it is not mentioned in this documentation. The set of supported options depends on the connector version. See the link above.
Warning

Options:

* kafka.*
* topic

are populated from connection attributes and cannot be overridden by the user in WriteOptions, to avoid issues.

Examples
Write options initialization:

```python
options = Kafka.WriteOptions(
    if_exists="append",
    include_headers=True,
)
```
field if_exists: KafkaTopicExistBehaviorKafka = KafkaTopicExistBehaviorKafka.APPEND

Behavior of writing data into an existing topic.

Same as df.write.mode(...).

Possible values:

* append (default) - adds new objects into the existing topic.
* error - raises an error if the topic already exists.
field include_headers: bool = False (alias 'includeHeaders')

If True, the headers column from the dataframe can be written to Kafka (requires Kafka 2.0+).

If False and the dataframe contains a headers column, an exception will be raised.
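Putting the pieces together, a minimal write flow might look like the sketch below. It assumes an existing SparkSession (`spark`), a dataframe `df` matching the fixed schema above, and a reachable Kafka cluster; the broker address, cluster name, and topic name are placeholders:

```python
from onetl.connection import Kafka
from onetl.db import DBWriter

# Connection attributes; kafka.* options and the topic are taken from here,
# not from WriteOptions
kafka = Kafka(
    addresses=["broker1:9092"],  # placeholder broker address
    cluster="my-cluster",        # placeholder cluster name
    spark=spark,                 # an existing SparkSession
)

writer = DBWriter(
    connection=kafka,
    target="my_topic",  # placeholder topic name
    options=Kafka.WriteOptions(
        if_exists="append",
        include_headers=True,
    ),
)

# df must match the fixed schema described above:
# binary ``value``, optional ``key``/``partition``/``headers``
writer.run(df)
```

Since `if_exists="append"` is the default, the options argument could be omitted entirely when headers are not needed.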