Kafka Connection#

class onetl.connection.db_connection.kafka.connection.Kafka(*, spark: SparkSession, cluster: Cluster, addresses: List[str], auth: KafkaAuth | None = None, protocol: KafkaProtocol = KafkaPlaintextProtocol(), extra: KafkaExtra = KafkaExtra())#

This connector is designed to read and write from Kafka in batch mode.

Based on official Kafka Source For Spark.

Note

This connector is for batch download from kafka and not streaming.

Version compatibility
  • Apache Kafka versions: 0.10 or higher

  • Spark versions: 2.4.x - 3.5.x

  • Scala versions: 2.11 - 2.13

Parameters:
addresseslist[str]

A list of broker addresses, for example ["192.168.1.10:9092", "192.168.1.11:9092"].

clusterstr

Cluster name. Used for HWM and lineage.

authKafkaAuth, default: None

Kafka authentication mechanism. None means anonymous auth.

protocolKafkaProtocol, default: PlaintextProtocol

Kafka security protocol.

extradict, default: None

A dictionary of additional properties to be used when connecting to Kafka.

These are Kafka-specific properties that control behavior of the producer or consumer. See:

Options are passed without kafka. prefix, for example:

For example:

extra = {
    "group.id": "myGroup",
    "request.timeout.ms": 120000,
}

Warning

Options that populated from connection attributes (like bootstrap.servers, sasl.*, ssl.*) are not allowed to override.

Note

At current version Kafka connection doesn’t support batch strategies.

Examples

Connect to Kafka using PLAINTEXT protocol and without any auth mechanism (anonymous user, default):

from onetl.connection import Kafka
from pyspark.sql import SparkSession

# Create Spark session with Kafka connector loaded
maven_packages = Kafka.get_packages(spark_version="3.2.4")
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

# Create connection
kafka = Kafka(
    addresses=["mybroker:9092", "anotherbroker:9092"],
    cluster="my-cluster",
    spark=spark,
)

Connect to Kafka using PLAINTEXT protocol and basic (PLAIN) auth mechanism:

# Create Spark session with Kafka connector loaded
...

# Create connection
kafka = Kafka(
    addresses=["mybroker:9092", "anotherbroker:9092"],
    cluster="my-cluster",
    auth=Kafka.BasicAuth(
        user="me",
        password="password",
    ),
    spark=spark,
)

Connect to Kafka using PLAINTEXT protocol and Kerberos (GSSAPI) auth mechanism:

# Create Spark session with Kafka connector loaded
...

# Create connection
kafka = Kafka(
    addresses=["mybroker:9092", "anotherbroker:9092"],
    cluster="my-cluster",
    auth=Kafka.KerberosAuth(
        principal="me@example.com",
        keytab="/path/to/keytab",
        deploy_keytab=True,
    ),
    spark=spark,
)

Connect to Kafka using SASL_SSL protocol and SCRAM-SHA-512 auth mechanism:

from pathlib import Path

# Create Spark session with Kafka connector loaded
...

# Create connection
kafka = Kafka(
    addresses=["mybroker:9092", "anotherbroker:9092"],
    cluster="my-cluster",
    protocol=Kafka.SSLProtocol(
        keystore_type="PEM",
        keystore_certificate_chain=Path("path/to/user.crt").read_text(),
        keystore_key=Path("path/to/user.key").read_text(),
        truststore_type="PEM",
        truststore_certificates=Path("/path/to/server.crt").read_text(),
    ),
    auth=Kafka.ScramAuth(
        user="me",
        password="abc",
        digest="SHA-512",
    ),
    spark=spark,
)

Connect to Kafka with extra options:

# Create Spark session with Kafka connector loaded
...

# Create connection
kafka = Kafka(
    addresses=["mybroker:9092", "anotherbroker:9092"],
    cluster="my-cluster",
    protocol=...,
    auth=...,
    extra={"max.request.size": 1000000},
    spark=spark,
)
check()#

Check source availability. support_hooks

If not, an exception will be raised.

Returns:
Connection itself
Raises:
RuntimeError

If the connection is not available

Examples

connection.check()
close()#

Close all connections created to Kafka. support_hooks

Note

Connection can be used again after it was closed.

Returns:
Connection itself

Examples

Close connection automatically:

with connection:
    ...

Close connection manually:

connection.close()
classmethod get_packages(spark_version: str | Version, scala_version: str | Version | None = None) list[str]#

Get package names to be downloaded by Spark. support_hooks

See Maven package index for all available packages.

Parameters:
spark_versionstr

Spark version in format major.minor.patch.

scala_versionstr, optional

Scala version in format major.minor.

If None, spark_version is used to determine Scala version.

Examples

from onetl.connection import Kafka

Kafka.get_packages(spark_version="3.2.4")
Kafka.get_packages(spark_version="3.2.4", scala_version="2.13")