Spark S3 Connection#
- class onetl.connection.file_df_connection.spark_s3.connection.SparkS3(*, spark: SparkSession, host: Host, port: int | None = None, bucket: str, protocol: Literal['http', 'https'] = 'https', access_key: str | None = None, secret_key: SecretStr | None = None, session_token: SecretStr | None = None, region: str | None = None, extra: SparkS3Extra = SparkS3Extra())#
Spark connection to S3 filesystem.
Based on Hadoop-AWS module and Spark integration with Cloud Infrastructures.
Version compatibility
- Spark versions: 3.2.x - 3.5.x (only with Hadoop 3.x libraries)
- Scala versions: 2.12 - 2.13
- Java versions: 8 - 20
Warning
See Spark S3 Troubleshooting guide.
Warning
To use the SparkS3 connector you should have PySpark installed (or injected into sys.path) BEFORE creating the connector instance. You can install PySpark as follows:

```
pip install onetl[spark]  # latest PySpark version

# or
pip install onetl pyspark==3.5.0  # install a specific PySpark version
```

See the Spark installation instruction for more details.
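If you are not sure whether PySpark is importable in the current environment (for example, when it is expected to be injected into sys.path by a launcher), a quick pre-flight check might look like the following sketch (not part of onetl itself):

```python
import importlib.util

# Fail fast: SparkS3 needs PySpark to be importable at instance creation time
if importlib.util.find_spec("pyspark") is None:
    raise RuntimeError("PySpark is not importable; install it, e.g. `pip install onetl[spark]`")
```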
Note
Supports only reading files as Spark DataFrame and writing DataFrame to files.
Does NOT support file operations, like create, delete, rename, etc. For these operations, use the S3 connection instead. A read/write sketch is shown at the end of the Examples section below.
- Parameters:
- host : str
  Host of S3 source. For example: domain.com
- port : int, optional
  Port of S3 source
- bucket : str
  Bucket name in the S3 file source
- protocol : str, default https
  Connection protocol. Allowed values: https or http
- access_key : str, optional
  Access key (aka user ID) of an account in the S3 service
- secret_key : str, optional
  Secret key (aka password) of an account in the S3 service
- session_token : str, optional
  Session token of your account in S3 service
- region : str, optional
  Region name of bucket in S3 service
- extra : dict, optional
  A dictionary of additional properties to be used when connecting to S3.
  These are Hadoop AWS specific properties (see the Hadoop-AWS documentation).
  Options are passed without the spark.hadoop., fs.s3a. and fs.s3a.bucket.$BUCKET. prefixes, for example:

```python
extra = {
    "path.style.access": True,
    "committer.magic.enabled": True,
    "committer.name": "magic",
    "connection.timeout": 300000,
}
```
Warning

Options that are populated from connection attributes (like endpoint, access.key) cannot be overridden. But you may override aws.credentials.provider and pass custom credential options (see the sketch after this parameter list).

- spark : pyspark.sql.SparkSession
  Spark session
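For illustration, here is a minimal sketch (not from the original documentation) of overriding aws.credentials.provider via extra, e.g. for anonymous access to a public bucket. The provider class name ships with hadoop-aws; the host, bucket, and the assumption that the bucket allows anonymous reads are purely illustrative, and spark is an existing session built as in the Examples below.

```python
from onetl.connection import SparkS3

# Sketch: read-only access to a public bucket without credentials,
# overriding the default credentials provider through `extra`.
s3 = SparkS3(
    host="domain.com",       # illustrative endpoint
    bucket="public-bucket",  # illustrative bucket name
    extra={
        "aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
    },
    spark=spark,  # existing SparkSession with hadoop-aws packages loaded
)
```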
Examples
Create connection to S3 to work with dataframes.
Connect to bucket as subdomain (my-bucket.domain.com):

```python
from onetl.connection import SparkS3
from pyspark.sql import SparkSession

# Create Spark session with Hadoop AWS libraries loaded
maven_packages = SparkS3.get_packages(spark_version="3.5.0")

# Some dependencies are not used, but downloading takes a lot of time. Skipping them.
excluded_packages = [
    "com.google.cloud.bigdataoss:gcs-connector",
    "org.apache.hadoop:hadoop-aliyun",
    "org.apache.hadoop:hadoop-azure-datalake",
    "org.apache.hadoop:hadoop-azure",
]

spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .config("spark.jars.excludes", ",".join(excluded_packages))
    .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
    .config("spark.hadoop.fs.s3a.committer.name", "magic")
    .config(
        "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
    )
    .config(
        "spark.sql.parquet.output.committer.class",
        "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
    )
    .config(
        "spark.sql.sources.commitProtocolClass",
        "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
    )
    .getOrCreate()
)

# Create connection
s3 = SparkS3(
    host="domain.com",
    protocol="http",
    bucket="my-bucket",
    access_key="ACCESS_KEY",
    secret_key="SECRET_KEY",
    spark=spark,
).check()
```
Connect to bucket as subpath (domain.com/my-bucket):

```python
# Create Spark session with Hadoop AWS libraries loaded
...

# Create connection
s3 = SparkS3(
    host="domain.com",
    protocol="http",
    bucket="my-bucket",
    access_key="ACCESS_KEY",
    secret_key="SECRET_KEY",
    extra={
        "path.style.access": True,
    },
    spark=spark,
).check()
```
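Once the connection is created, DataFrames are read and written through onetl's file DataFrame classes. The following is a minimal sketch assuming onetl's FileDFReader/FileDFWriter API with the CSV format class; the paths and the header option are illustrative:

```python
from onetl.file import FileDFReader, FileDFWriter
from onetl.file.format import CSV

# Read CSV files from the bucket into a Spark DataFrame
reader = FileDFReader(
    connection=s3,
    source_path="/source/path",  # illustrative path inside the bucket
    format=CSV(header=True),
)
df = reader.run()

# Write the DataFrame back to the bucket as CSV files
writer = FileDFWriter(
    connection=s3,
    target_path="/target/path",  # illustrative path inside the bucket
    format=CSV(header=True),
)
writer.run(df)
```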
- check()#
Check source availability.
If the connection is not available, an exception will be raised.
- Returns:
- Connection itself
- Raises:
- RuntimeError
If the connection is not available
Examples
```python
connection.check()
```
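Since a failed check raises RuntimeError, it can be handled explicitly when the connection might be misconfigured; a small sketch (the s3 name refers to a connection created as in the Examples above):

```python
try:
    s3.check()
except RuntimeError as exc:
    # e.g. wrong credentials, wrong bucket name, or unreachable endpoint
    print(f"S3 connection is not available: {exc}")
```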
- close()#
Close all connections created to S3.
Also resets all fs.s3a.bucket.$BUCKET.* properties of the Hadoop configuration.

Note
Connection can be used again after it was closed.
- Returns:
- Connection itself
Examples
Close connection automatically:

```python
with connection:
    ...
```

Close connection manually:

```python
connection.close()
```
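Because the connection can be used again after it was closed (see the note above), close() may be called between unrelated jobs without recreating the object; a short sketch (s3 as in the Examples above):

```python
# Drop the bucket-scoped fs.s3a.bucket.$BUCKET.* Hadoop properties
s3.close()

# The same connection object can be used again afterwards
s3.check()
```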
- classmethod get_packages(spark_version: str | Version, scala_version: str | Version | None = None) → list[str]#
Get package names to be downloaded by Spark.
- Parameters:
- spark_version : str
  Spark version in format major.minor.patch.
- scala_version : str, optional
  Scala version in format major.minor.
  If None, spark_version is used to determine Scala version.
Examples
```python
from onetl.connection import SparkS3

SparkS3.get_packages(spark_version="3.5.0")
SparkS3.get_packages(spark_version="3.5.0", scala_version="2.12")
```
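The returned Maven coordinates are meant to be passed to spark.jars.packages when building the session, as in the connection example above; a minimal sketch:

```python
from pyspark.sql import SparkSession
from onetl.connection import SparkS3

# Spark downloads hadoop-aws and its transitive dependencies at session startup
maven_packages = SparkS3.get_packages(spark_version="3.5.0")

spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)
```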