Kafka Connection#
- class onetl.connection.db_connection.kafka.connection.Kafka(*, spark: SparkSession, cluster: Cluster, addresses: List[str], auth: KafkaAuth | None = None, protocol: KafkaProtocol = KafkaPlaintextProtocol(), extra: KafkaExtra = KafkaExtra())#
This connector is designed to read from and write to Kafka in batch mode.
Based on the official Kafka Source for Spark.
Note
This connector performs batch reads and writes to Kafka, not streaming.
Version compatibility
Apache Kafka versions: 0.10 or higher
Spark versions: 2.4.x - 3.5.x
Scala versions: 2.11 - 2.13
- Parameters:
  - addresses : list[str]
    A list of broker addresses, for example ["192.168.1.10:9092", "192.168.1.11:9092"].
  - cluster : str
    Cluster name. Used for HWM and lineage.
  - auth : KafkaAuth, default: None
    Kafka authentication mechanism. None means anonymous auth.
  - protocol : KafkaProtocol, default: PlaintextProtocol
    Kafka security protocol.
  - extra : dict, default: None
    A dictionary of additional properties to be used when connecting to Kafka.
    These are Kafka-specific properties that control the behavior of the producer or consumer.
    Options are passed without the kafka. prefix, for example:

    extra = {
        "group.id": "myGroup",
        "request.timeout.ms": 120000,
    }
Warning
Options that are populated from connection attributes (such as bootstrap.servers, sasl.*, ssl.*) cannot be overridden.
Note
The current version of the Kafka connection does not support batch strategies.
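To illustrate how the extra options described above might be handled — protected options rejected, and the kafka. prefix that Spark's Kafka source expects added automatically — here is a rough sketch. The helper below is hypothetical and is not onetl's actual implementation:

```python
# Hypothetical sketch of how "extra" options might be validated and
# prefixed before being handed to Spark's Kafka reader/writer.
# This is NOT onetl's actual implementation.

PROTECTED_PREFIXES = ("bootstrap.servers", "sasl.", "ssl.", "security.protocol")


def prepare_kafka_options(extra: dict) -> dict:
    """Reject protected options, then add the ``kafka.`` prefix Spark expects."""
    for key in extra:
        if key.startswith(PROTECTED_PREFIXES):
            raise ValueError(
                f"Option {key!r} is populated from connection attributes "
                "and cannot be overridden",
            )
    # Spark's Kafka source reads Kafka client options like "kafka.group.id"
    return {f"kafka.{key}": value for key, value in extra.items()}


print(prepare_kafka_options({"group.id": "myGroup", "request.timeout.ms": 120000}))
# {'kafka.group.id': 'myGroup', 'kafka.request.timeout.ms': 120000}
```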
Examples
Connect to Kafka using the PLAINTEXT protocol without any auth mechanism (anonymous user, default):

from onetl.connection import Kafka
from pyspark.sql import SparkSession

# Create Spark session with Kafka connector loaded
maven_packages = Kafka.get_packages(spark_version="3.2.4")
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

# Create connection
kafka = Kafka(
    addresses=["mybroker:9092", "anotherbroker:9092"],
    cluster="my-cluster",
    spark=spark,
)
Connect to Kafka using the PLAINTEXT protocol and basic (PLAIN) auth mechanism:

# Create Spark session with Kafka connector loaded
...

# Create connection
kafka = Kafka(
    addresses=["mybroker:9092", "anotherbroker:9092"],
    cluster="my-cluster",
    auth=Kafka.BasicAuth(
        user="me",
        password="password",
    ),
    spark=spark,
)
Connect to Kafka using the PLAINTEXT protocol and Kerberos (GSSAPI) auth mechanism:

# Create Spark session with Kafka connector loaded
...

# Create connection
kafka = Kafka(
    addresses=["mybroker:9092", "anotherbroker:9092"],
    cluster="my-cluster",
    auth=Kafka.KerberosAuth(
        principal="me@example.com",
        keytab="/path/to/keytab",
        deploy_keytab=True,
    ),
    spark=spark,
)
Connect to Kafka using the SASL_SSL protocol and SCRAM-SHA-512 auth mechanism:

from pathlib import Path

# Create Spark session with Kafka connector loaded
...

# Create connection
kafka = Kafka(
    addresses=["mybroker:9092", "anotherbroker:9092"],
    cluster="my-cluster",
    protocol=Kafka.SSLProtocol(
        keystore_type="PEM",
        keystore_certificate_chain=Path("path/to/user.crt").read_text(),
        keystore_key=Path("path/to/user.key").read_text(),
        truststore_type="PEM",
        truststore_certificates=Path("/path/to/server.crt").read_text(),
    ),
    auth=Kafka.ScramAuth(
        user="me",
        password="abc",
        digest="SHA-512",
    ),
    spark=spark,
)
Connect to Kafka with extra options:
# Create Spark session with Kafka connector loaded
...

# Create connection
kafka = Kafka(
    addresses=["mybroker:9092", "anotherbroker:9092"],
    cluster="my-cluster",
    protocol=...,
    auth=...,
    extra={"max.request.size": 1000000},
    spark=spark,
)
- check()#
Check source availability.
If not available, an exception will be raised.
- Returns:
- Connection itself
- Raises:
- RuntimeError
If the connection is not available
Examples
connection.check()
- close()#
Close all connections created to Kafka.
Note
The connection can be used again after it has been closed.
- Returns:
- Connection itself
Examples
Close connection automatically:
with connection:
    ...
Close connection manually:
connection.close()
- classmethod get_packages(spark_version: str | Version, scala_version: str | Version | None = None) list[str] #
Get package names to be downloaded by Spark.
See Maven package index for all available packages.
- Parameters:
  - spark_version : str
    Spark version in format major.minor.patch.
  - scala_version : str, optional
    Scala version in format major.minor.
    If None, spark_version is used to determine the Scala version.
Examples
from onetl.connection import Kafka

Kafka.get_packages(spark_version="3.2.4")
Kafka.get_packages(spark_version="3.2.4", scala_version="2.13")