Iceberg Connection
Bases: DBConnection
See also
Before using this connector, please take into account the Iceberg prerequisites.
Added in 0.14.0
Parameters:
- catalog_name (str) – Catalog name. Arbitrary string used by Spark to identify the catalog and its tables (mycatalog.myschema.mytable).
- catalog (IcebergCatalog) – Iceberg catalog configuration.
- warehouse (IcebergWarehouse, default: None) – Iceberg warehouse configuration.
- extra (dict | None, default: None) – A dictionary of additional properties used when configuring the Iceberg catalog.
  These are Iceberg-specific properties that control the behavior of the catalog. See the Iceberg Spark configuration documentation.
  Pass properties without the catalog prefix. For example:

      extra = {
          "cache-enabled": "true",
          "cache.expiration-interval-ms": "40000",
      }

  This will be translated to:

      spark.sql.catalog.my_catalog.cache-enabled = 'true'
      spark.sql.catalog.my_catalog.cache.expiration-interval-ms = '40000'

- spark (SparkSession) – Spark session.
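The prefixing rule described for the extra parameter can be sketched in plain Python. This is only an illustration of the documented mapping, not the connector's actual implementation; the helper name `to_spark_catalog_conf` is hypothetical and is not part of onetl.

```python
def to_spark_catalog_conf(catalog_name: str, extra: dict) -> dict:
    """Hypothetical helper: prefix each key with spark.sql.catalog.<catalog_name>."""
    prefix = f"spark.sql.catalog.{catalog_name}."
    # Values are converted to strings, since Spark options are string-valued
    return {prefix + key: str(value) for key, value in extra.items()}


extra = {
    "cache-enabled": "true",
    "cache.expiration-interval-ms": "40000",
}

conf = to_spark_catalog_conf("my_catalog", extra)
for key, value in conf.items():
    print(f"{key} = {value}")
```

Because the prefix is derived from catalog_name, the same extra dictionary can be reused for catalogs registered under different names.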
Examples:
# Example: REST catalog with an OAuth2 bearer token and an explicitly configured S3 warehouse
from onetl.connection import Iceberg
from pyspark.sql import SparkSession

maven_packages = [
    *Iceberg.get_packages(package_version="1.10.0", spark_version="3.5"),
    *Iceberg.S3Warehouse.get_packages(package_version="1.10.0"),
]

spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

iceberg = Iceberg(
    catalog_name="my_catalog",
    spark=spark,
    catalog=Iceberg.RESTCatalog(
        url="http://my.rest.catalog/iceberg",
        auth=Iceberg.RESTCatalog.OAuth2BearerToken(
            token="my_token",
        ),
    ),
    # explicit S3 warehouse params
    warehouse=Iceberg.S3Warehouse(
        path="/warehouse",
        host="s3.domain.com",
        protocol="http",
        bucket="my-bucket",
        path_style_access=True,
        region="us-east-1",
        access_key="access_key",
        secret_key="secret_key",
    ),
)
# Example: REST catalog with OAuth2 client credentials and a warehouse delegated to the catalog
from onetl.connection import Iceberg
from pyspark.sql import SparkSession

maven_packages = [
    *Iceberg.get_packages(package_version="1.10.0", spark_version="3.5"),
    # required to use S3 warehouse
    *Iceberg.S3Warehouse.get_packages(package_version="1.10.0"),
]

spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

iceberg = Iceberg(
    catalog_name="my_catalog",
    spark=spark,
    catalog=Iceberg.RESTCatalog(
        url="http://my.rest.catalog/iceberg",
        auth=Iceberg.RESTCatalog.OAuth2ClientCredentials(
            client_id="my_client",
            client_secret="my_secret",
            oauth2_token_endpoint="http://keycloak.domain.com/realms/my-realm/protocol/openid-connect/token",
        ),
    ),
    # S3 warehouse params and credentials are provided by REST Catalog
    warehouse=Iceberg.DelegatedWarehouse(
        name="my-warehouse",
        access_delegation="vended-credentials",
    ),
)
# Example: filesystem catalog with a warehouse stored on HDFS
from onetl.connection import Iceberg, SparkHDFS
from pyspark.sql import SparkSession

maven_packages = Iceberg.get_packages(package_version="1.10.0", spark_version="3.5.8")

spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

hdfs_connection = SparkHDFS(
    host="namenode",
    cluster="my-cluster",
    spark=spark,
)

iceberg = Iceberg(
    catalog_name="my_catalog",
    spark=spark,
    catalog=Iceberg.FilesystemCatalog(),
    warehouse=Iceberg.FilesystemWarehouse(
        connection=hdfs_connection,
        path="/warehouse/path",
    ),
)
classmethod get_packages(package_version, spark_version, scala_version=None)
Get package names to be downloaded by Spark.
See Maven package index for all available packages.
Parameters:
- package_version (str) – Iceberg package version in format major.minor.patch.
- spark_version (str) – Spark version in format major.minor.
- scala_version (str, default: None) – Scala version in format major.minor. If None, spark_version is used to determine the Scala version.
Returns:
- list[str] – List of Maven coordinates.
Examples:
from onetl.connection import Iceberg
# Note: Iceberg 1.10.0 requires Java 11+
Iceberg.get_packages(package_version="1.10.0", spark_version="3.5.8")
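To make the role of the three arguments concrete, here is a minimal sketch of how a single Maven coordinate for the Iceberg Spark runtime could be assembled. It is not the implementation of get_packages, which may return more packages or apply different rules. The function name `iceberg_spark_runtime_coordinate` is hypothetical, and two behaviors are assumptions made for illustration: trimming the Spark version to major.minor, and defaulting to Scala 2.12 for Spark 3.x and Scala 2.13 for Spark 4 and later.

```python
from typing import Optional


def iceberg_spark_runtime_coordinate(
    package_version: str,
    spark_version: str,
    scala_version: Optional[str] = None,
) -> str:
    """Hypothetical helper: build one Maven coordinate as group:artifact:version."""
    # Assumption: only major.minor of the Spark version is used, e.g. "3.5.8" -> "3.5"
    spark_major_minor = ".".join(spark_version.split(".")[:2])

    if scala_version is None:
        # Assumption: Spark 3.x defaults to Scala 2.12, Spark 4+ to Scala 2.13
        spark_major = int(spark_major_minor.split(".")[0])
        scala_version = "2.12" if spark_major < 4 else "2.13"

    artifact = f"iceberg-spark-runtime-{spark_major_minor}_{scala_version}"
    return f"org.apache.iceberg:{artifact}:{package_version}"


coordinate = iceberg_spark_runtime_coordinate("1.10.0", "3.5.8")
print(coordinate)
```

A list of such coordinates is what gets joined with commas and passed to spark.jars.packages in the connection examples.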