Iceberg Connection

Bases: DBConnection

Iceberg connection. Supports hooks.

See also

Before using this connector, please take into account the Iceberg prerequisites.

Added in 0.14.0

Parameters:

  • catalog_name (str) –

    Catalog name. An arbitrary string used by Spark to identify the catalog and its tables (for example, mycatalog.myschema.mytable).

  • catalog (IcebergCatalog) –

    Iceberg catalog configuration.

  • warehouse (IcebergWarehouse, default: None) –

    Iceberg warehouse configuration.

  • extra (dict | None, default: None) –

    A dictionary of additional properties used when configuring the Iceberg catalog.

    These are Iceberg-specific properties that control the behavior of the catalog. See the Iceberg Spark configuration documentation.

    Pass properties without the catalog prefix. For example:

    extra = {
        "cache-enabled": "true",
        "cache.expiration-interval-ms": "40000",
    }
    
    This will be translated to:

    spark.sql.catalog.my_catalog.cache-enabled = 'true'
    spark.sql.catalog.my_catalog.cache.expiration-interval-ms = '40000'
    
  • spark (SparkSession) –

    Spark session
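The prefixing rule described for extra can be sketched in plain Python. This is a minimal illustration of the documented translation, not onETL's actual implementation; the helper name build_catalog_conf is hypothetical.

```python
from __future__ import annotations


def build_catalog_conf(catalog_name: str, extra: dict[str, str]) -> dict[str, str]:
    """Hypothetical helper: prefix each Iceberg catalog property with
    spark.sql.catalog.<catalog_name>, mirroring the translation described above.
    This is NOT onETL's internal code, only an illustration of the rule."""
    prefix = f"spark.sql.catalog.{catalog_name}"
    return {f"{prefix}.{key}": str(value) for key, value in extra.items()}


conf = build_catalog_conf(
    "my_catalog",
    {"cache-enabled": "true", "cache.expiration-interval-ms": "40000"},
)

# Print in the same "key = 'value'" form used in the parameter description
for key, value in conf.items():
    print(f"{key} = {value!r}")
```

Because keys are passed without the prefix, the same extra dictionary can be reused with a different catalog_name.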

Examples:

REST catalog with OAuth2 bearer token and explicit S3 warehouse:

from onetl.connection import Iceberg
from pyspark.sql import SparkSession

maven_packages = [
    *Iceberg.get_packages(package_version="1.10.0", spark_version="3.5"),
    *Iceberg.S3Warehouse.get_packages(package_version="1.10.0"),
]
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

iceberg = Iceberg(
    catalog_name="my_catalog",
    spark=spark,
    catalog=Iceberg.RESTCatalog(
        url="http://my.rest.catalog/iceberg",
        auth=Iceberg.RESTCatalog.OAuth2BearerToken(
            token="my_token",
        ),
    ),
    # explicit S3 warehouse params
    warehouse=Iceberg.S3Warehouse(
        path="/warehouse",
        host="s3.domain.com",
        protocol="http",
        bucket="my-bucket",
        path_style_access=True,
        region="us-east-1",
        access_key="access_key",
        secret_key="secret_key"
    ),
)

REST catalog with OAuth2 client credentials and delegated warehouse:

from onetl.connection import Iceberg
from pyspark.sql import SparkSession

maven_packages = [
    *Iceberg.get_packages(package_version="1.10.0", spark_version="3.5"),
    # required to use S3 warehouse
    *Iceberg.S3Warehouse.get_packages(package_version="1.10.0"),
]
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

iceberg = Iceberg(
    catalog_name="my_catalog",
    spark=spark,
    catalog=Iceberg.RESTCatalog(
        url="http://my.rest.catalog/iceberg",
        auth=Iceberg.RESTCatalog.OAuth2ClientCredentials(
            client_id="my_client",
            client_secret="my_secret",
            oauth2_token_endpoint="http://keycloak.domain.com/realms/my-realm/protocol/openid-connect/token",
        ),
    ),
    # S3 warehouse params and credentials are provided by REST Catalog
    warehouse=Iceberg.DelegatedWarehouse(
        name="my-warehouse",
        access_delegation="vended-credentials",
    ),
)

Filesystem catalog with HDFS warehouse:

from onetl.connection import Iceberg, SparkHDFS
from pyspark.sql import SparkSession

maven_packages = Iceberg.get_packages(package_version="1.10.0", spark_version="3.5.8")
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

hdfs_connection = SparkHDFS(
    host="namenode",
    cluster="my-cluster",
    spark=spark
)

iceberg = Iceberg(
    catalog_name="my_catalog",
    spark=spark,
    catalog=Iceberg.FilesystemCatalog(),
    warehouse=Iceberg.FilesystemWarehouse(
        connection=hdfs_connection,
        path="/warehouse/path",
    ),
)
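For orientation, the REST catalog example with OAuth2 client credentials and vended credentials corresponds roughly to the following standard Iceberg Spark catalog properties. This is a sketch under assumptions: the property names come from Iceberg's own Spark and REST catalog configuration, but whether onETL sets exactly these keys is not confirmed here, and the helper rest_catalog_conf is hypothetical.

```python
from __future__ import annotations


def rest_catalog_conf(
    catalog_name: str,
    uri: str,
    client_id: str,
    client_secret: str,
    token_endpoint: str,
    warehouse_name: str,
) -> dict[str, str]:
    """Hypothetical helper returning standard Iceberg Spark properties for a
    REST catalog with OAuth2 client credentials and vended credentials.
    onETL may generate a different set of keys (assumption)."""
    prefix = f"spark.sql.catalog.{catalog_name}"
    return {
        # Register the catalog in Spark using Iceberg's SparkCatalog
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "rest",
        f"{prefix}.uri": uri,
        # Iceberg expects client credentials as "client_id:client_secret"
        f"{prefix}.credential": f"{client_id}:{client_secret}",
        f"{prefix}.oauth2-server-uri": token_endpoint,
        # Warehouse name known to the REST catalog
        f"{prefix}.warehouse": warehouse_name,
        # Ask the REST catalog to return short-lived storage credentials
        f"{prefix}.header.X-Iceberg-Access-Delegation": "vended-credentials",
    }


conf = rest_catalog_conf(
    catalog_name="my_catalog",
    uri="http://my.rest.catalog/iceberg",
    client_id="my_client",
    client_secret="my_secret",
    token_endpoint="http://keycloak.domain.com/realms/my-realm/protocol/openid-connect/token",
    warehouse_name="my-warehouse",
)
for key, value in sorted(conf.items()):
    print(f"{key} = {value!r}")
```

For the bearer-token variant, Iceberg's standard property is token rather than credential.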

get_packages(package_version, spark_version, scala_version=None) classmethod

Get package names to be downloaded by Spark. Supports hooks.

See the Maven package index for all available packages.

Parameters:

  • package_version (str) –

    Iceberg package version in the format major.minor.patch.

  • spark_version (str) –

    Spark version in the format major.minor.

  • scala_version (str, default: None) –

    Scala version in the format major.minor.

    If None, spark_version is used to determine the Scala version.

Returns:

  • list[str]

    List of Maven coordinates.

Examples:

from onetl.connection import Iceberg

# Note: Iceberg 1.10.0 requires Java 11+
Iceberg.get_packages(package_version="1.10.0", spark_version="3.5.8")
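The Iceberg Spark runtime is published on Maven as org.apache.iceberg:iceberg-spark-runtime-<spark>_<scala>:<version>. The sketch below shows how such a coordinate can be assembled from the three parameters. The function name is hypothetical, and both the truncation of spark_version to major.minor and the default Scala mapping (2.12 for Spark 3.x, 2.13 for Spark 4.x) are assumptions, not onETL's confirmed logic.

```python
from __future__ import annotations


def iceberg_runtime_package(
    package_version: str,
    spark_version: str,
    scala_version: str | None = None,
) -> str:
    """Hypothetical sketch: build the iceberg-spark-runtime Maven coordinate."""
    # Keep only major.minor of Spark, e.g. "3.5.8" -> "3.5" (assumption)
    spark_major_minor = ".".join(spark_version.split(".")[:2])
    if scala_version is None:
        # Assumption: Spark 3.x defaults to Scala 2.12, Spark 4.x to Scala 2.13
        spark_major = int(spark_major_minor.split(".")[0])
        scala_version = "2.13" if spark_major >= 4 else "2.12"
    return (
        "org.apache.iceberg:iceberg-spark-runtime-"
        f"{spark_major_minor}_{scala_version}:{package_version}"
    )


print(iceberg_runtime_package("1.10.0", "3.5.8"))
print(iceberg_runtime_package("1.10.0", "3.5", scala_version="2.13"))
```

The real get_packages returns a list and may include more than this single runtime coordinate.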

check()

Check source availability. Supports hooks.

If the source is not available, an exception is raised.

Returns:

  • Self

    Connection itself

Raises:

  • RuntimeError

    If the connection is not available

Examples:

connection.check()
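The contract documented above (return the connection itself on success, raise RuntimeError otherwise) can be illustrated with a small stand-in class. ConnectionSketch and its probe argument are hypothetical and only model the documented behavior; they are not part of onETL.

```python
from __future__ import annotations

from typing import Callable


class ConnectionSketch:
    """Hypothetical stand-in modelling the check() contract:
    return self on success, raise RuntimeError on failure."""

    def __init__(self, probe: Callable[[], None]) -> None:
        # probe is any callable that raises if the source is unreachable
        self._probe = probe

    def check(self) -> ConnectionSketch:
        try:
            self._probe()
        except Exception as e:
            # Wrap the underlying error, keeping it as __cause__
            raise RuntimeError("Connection is unavailable") from e
        return self


def ok() -> None:
    pass


def broken() -> None:
    raise ConnectionError("catalog did not respond")


# Returning self allows constructing and checking in one expression
conn = ConnectionSketch(ok).check()

try:
    ConnectionSketch(broken).check()
except RuntimeError as err:
    print(f"check failed: {err} (caused by {err.__cause__!r})")
```

Because check() returns the connection, a pattern like iceberg = Iceberg(...).check() fits the documented return type.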