Kafka Slots#

class onetl.connection.db_connection.kafka.slots.KafkaSlots#

Kafka slots that could be implemented by third-party plugins

static normalize_cluster_name(cluster: str) str | None#

Normalize the given Kafka cluster name.

This can be used to ensure that the Kafka cluster name conforms to specific naming conventions.

Parameters:
clusterstr

The original Kafka cluster name.

Returns:
str | None

The normalized Kafka cluster name. If the hook cannot be applied, return None.

Examples

from onetl.connection import Kafka
from onetl.hooks import hook

@Kafka.Slots.normalize_cluster_name.bind
@hook
def normalize_cluster_name(cluster: str) -> str | None:
    return cluster.lower()
static get_known_clusters() set[str] | None#

Retrieve the collection of known Kafka clusters.

This can be used to validate if the provided Kafka cluster name is recognized in the system.

Returns:
set[str] | None

A collection of known Kafka cluster names. If the hook cannot be applied, return None.

Examples

from onetl.connection import Kafka
from onetl.hooks import hook

@Kafka.Slots.get_known_clusters.bind
@hook
def get_known_clusters() -> set[str] | None:
    return {"kafka-cluster", "local"}
static normalize_address(address: str, cluster: str) str | None#

Normalize the given broker address for a specific Kafka cluster.

This can be used to format the broker address according to specific rules, such as adding default ports.

Parameters:
addressstr

The original broker address.

clusterstr

The Kafka cluster name for which the address should be normalized.

Returns:
str | None

The normalized broker address. If the hook cannot be applied to the specific address, return None.

Examples

from onetl.connection import Kafka
from onetl.hooks import hook

@Kafka.Slots.normalize_address.bind
@hook
def normalize_address(address: str, cluster: str) -> str | None:
    if cluster == "kafka-cluster" and ":" not in address:
        return f"{address}:9092"
    return None
static get_cluster_addresses(cluster: str) list[str] | None#

Retrieve a collection of known broker addresses for the specified Kafka cluster.

This can be used to obtain the broker addresses dynamically.

Parameters:
clusterstr

The Kafka cluster name.

Returns:
list[str] | None

A collection of broker addresses for the specified Kafka cluster. If the hook cannot be applied, return None.

Examples

from onetl.connection import Kafka
from onetl.hooks import hook

@Kafka.Slots.get_cluster_addresses.bind
@hook
def get_cluster_addresses(cluster: str) -> list[str] | None:
    if cluster == "kafka_cluster":
        return ["192.168.1.1:9092", "192.168.1.2:9092", "192.168.1.3:9092"]
    return None