Spark S3 Connection#

class onetl.connection.file_df_connection.spark_s3.connection.SparkS3(*, spark: SparkSession, host: Host, port: int | None = None, bucket: str, protocol: Literal['http', 'https'] = 'https', access_key: str | None = None, secret_key: SecretStr | None = None, session_token: SecretStr | None = None, region: str | None = None, extra: SparkS3Extra = SparkS3Extra())#

Spark connection to S3 filesystem.

Based on the Hadoop-AWS module and Spark integration with Cloud Infrastructures.

Version compatibility
  • Spark versions: 3.2.x - 3.5.x (only with Hadoop 3.x libraries)

  • Scala versions: 2.12 - 2.13

  • Java versions: 8 - 20

Warning

See Spark S3 Troubleshooting guide.

Warning

To use the SparkS3 connector, you should have PySpark installed (or injected into sys.path) BEFORE creating the connector instance.

You can install PySpark as follows:

pip install onetl[spark]  # latest PySpark version

# or
pip install onetl pyspark==3.5.0  # install specific PySpark version

See Spark installation instruction for more details.

Note

Supports only reading files into a Spark DataFrame and writing a DataFrame to files.

Does NOT support file operations like create, delete, rename, etc. For these operations, use the S3 connection.
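
For example, reading files into a DataFrame is done through onetl's file DataFrame reader classes. The sketch below is only an illustration: it assumes the FileDFReader and CSV classes from onetl, an s3 connection object created as shown in the Examples section below, and a hypothetical source path /data/csv.

from onetl.file import FileDFReader
from onetl.file.format import CSV

# Read CSV files from the bucket into a Spark DataFrame.
# "s3" is a SparkS3 connection created as in the Examples section;
# the source path here is hypothetical.
reader = FileDFReader(
    connection=s3,
    format=CSV(header=True),
    source_path="/data/csv",
)
df = reader.run()
df.show()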

Parameters:
host : str

Host of the S3 source. For example: domain.com

port : int, optional

Port of the S3 source.

bucket : str

Bucket name in the S3 file source.

protocol : str, default https

Connection protocol. Allowed values: https or http.

access_key : str, optional

Access key (aka user ID) of an account in the S3 service.

secret_key : str, optional

Secret key (aka password) of an account in the S3 service.

session_token : str, optional

Session token of your account in the S3 service.

region : str, optional

Region name of the bucket in the S3 service.

extra : dict, optional

A dictionary of additional properties to be used when connecting to S3.

These are Hadoop AWS specific properties; see the Hadoop-AWS module documentation for the full list.

Options are passed without the prefixes spark.hadoop., fs.s3a. and fs.s3a.bucket.$BUCKET., for example:

extra = {
    "path.style.access": True,
    "committer.magic.enabled": True,
    "committer.name": "magic",
    "connection.timeout": 300000,
}

Warning

Options that are populated from connection attributes (like endpoint, access.key) cannot be overridden.

However, you may override aws.credentials.provider and pass custom credential options (see the sketch at the end of the Examples section below).

spark : pyspark.sql.SparkSession

Spark session.

Examples

Create a connection to S3 to work with DataFrames.

Connect to the bucket as a subdomain (my-bucket.domain.com):

from onetl.connection import SparkS3
from pyspark.sql import SparkSession

# Create Spark session with Hadoop AWS libraries loaded
maven_packages = SparkS3.get_packages(spark_version="3.5.0")
# Some dependencies are not used, but downloading them takes a lot of time, so skip them.
excluded_packages = [
    "com.google.cloud.bigdataoss:gcs-connector",
    "org.apache.hadoop:hadoop-aliyun",
    "org.apache.hadoop:hadoop-azure-datalake",
    "org.apache.hadoop:hadoop-azure",
]
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .config("spark.jars.excludes", ",".join(excluded_packages))
    .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
    .config("spark.hadoop.fs.s3a.committer.name", "magic")
    .config(
        "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
    )
    .config(
        "spark.sql.parquet.output.committer.class",
        "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
    )
    .config(
        "spark.sql.sources.commitProtocolClass",
        "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
    )
    .getOrCreate()
)

# Create connection
s3 = SparkS3(
    host="domain.com",
    protocol="http",
    bucket="my-bucket",
    access_key="ACCESS_KEY",
    secret_key="SECRET_KEY",
    spark=spark,
).check()

Connect to the bucket as a subpath (domain.com/my-bucket):

# Create Spark session with Hadoop AWS libraries loaded
...

# Create connection
s3 = SparkS3(
    host="domain.com",
    protocol="http",
    bucket="my-bucket",
    access_key="ACCESS_KEY",
    secret_key="SECRET_KEY",
    extra={
        "path.style.access": True,
    },
    spark=spark,
).check()
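
Override the credentials provider via extra (a sketch only: anonymous access is just an illustrative assumption, and AnonymousAWSCredentialsProvider is a class shipped with the Hadoop-AWS module, not part of onetl):

# Create connection with a custom credentials provider
# (anonymous access here is only an illustrative assumption)
s3 = SparkS3(
    host="domain.com",
    bucket="my-bucket",
    extra={
        "aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
    },
    spark=spark,
).check()
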
check()#

Check source availability.

If the source is not available, an exception is raised.

Returns:
Connection itself
Raises:
RuntimeError

If the connection is not available

Examples

connection.check()
close()#

Close all connections created to S3.

Also resets all fs.s3a.bucket.$BUCKET.* properties of the Hadoop configuration.

Note

The connection can be used again after it has been closed.

Returns:
Connection itself

Examples

Close connection automatically:

with connection:
    ...

Close connection manually:

connection.close()
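
Reuse the connection after closing it (allowed, as the note above says; the extra check() call here is only for illustration):

connection.close()

# the same connection object can be used again
connection.check()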
classmethod get_packages(spark_version: str | Version, scala_version: str | Version | None = None) → list[str]#

Get package names to be downloaded by Spark.

Parameters:
spark_version : str

Spark version in format major.minor.patch.

scala_version : str, optional

Scala version in format major.minor.

If None, spark_version is used to determine the Scala version.

Examples

from onetl.connection import SparkS3

SparkS3.get_packages(spark_version="3.5.0")
SparkS3.get_packages(spark_version="3.5.0", scala_version="2.12")