Spark HDFS Connection
class onetl.connection.file_df_connection.spark_hdfs.connection.SparkHDFS(*, spark: SparkSession, cluster: Cluster, host: Host | None = None, port: int = 8020)
Based on Spark Generic File Data Source.
Warning

To use the SparkHDFS connector you should have PySpark installed (or injected into sys.path) BEFORE creating the connector instance. You can install PySpark as follows:

    pip install onetl[spark]  # latest PySpark version
    # or
    pip install onetl pyspark==3.5.0  # install specific PySpark version

See Spark installation instruction for more details.
Note

Most Hadoop instances use Kerberos authentication. In this case, you should call kinit BEFORE starting the Spark session to generate a Kerberos ticket. See Kerberos support.

When creating a session with "spark.master": "yarn", you should also pass some additional options to the Spark session, allowing executors to generate their own Kerberos tickets to access HDFS. See the Spark security documentation for more details.

Note
Supports only reading files as a Spark DataFrame and writing a DataFrame to files.

Does NOT support file operations, like create, delete, rename, etc. For these operations, use the HDFS connection.

- Parameters:
- cluster : str

  Cluster name.

  Used for:
    - HWM and lineage (as instance name for file paths)
    - Validation of the host value, if the latter is passed and some hooks are bound to Slots.get_cluster_namenodes.

- host : str, optional

  Hadoop namenode host, for example namenode1.domain.com.

  Should be an active namenode (NOT standby).

  If the value is not set, but there are some hooks bound to Slots.get_cluster_namenodes and Slots.is_namenode_active, onETL will iterate over cluster namenodes to detect which one is active.

- ipc_port : int, default: 8020

  Port of Hadoop namenode (IPC protocol).

  If omitted, but there are some hooks bound to Slots.get_ipc_port, onETL will try to detect the port number for the specific cluster.

- spark : pyspark.sql.SparkSession

  Spark session.
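The active-namenode detection described for host above can be pictured as a simple fallback loop: iterate over the hosts returned by a get_cluster_namenodes-style hook and return the first one an is_namenode_active-style hook confirms. The sketch below is illustrative only; the function and variable names are hypothetical, not the actual onETL internals.

```python
def detect_active_namenode(cluster, get_cluster_namenodes, is_namenode_active):
    """Return the first active namenode of `cluster`, or raise RuntimeError.

    Hypothetical sketch -- not the real onETL implementation.
    """
    for host in get_cluster_namenodes(cluster):
        if is_namenode_active(host, cluster):
            return host
    raise RuntimeError(f"No active namenode found for cluster {cluster!r}")


# Stub "hooks" standing in for real cluster introspection:
namenodes = {"rnd-dwh": ["namenode1.domain.com", "namenode2.domain.com"]}
active_hosts = {"namenode2.domain.com"}

host = detect_active_namenode(
    "rnd-dwh",
    lambda cluster: namenodes[cluster],
    lambda host, cluster: host in active_hosts,
)
print(host)  # namenode2.domain.com
```

This mirrors why host may be omitted entirely when both hooks are bound: the connection can resolve the active namenode on its own.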
Examples
SparkHDFS connection initialization
from onetl.connection import SparkHDFS
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.master("local").appName("spark-app-name").getOrCreate()

# Create connection
hdfs = SparkHDFS(
    host="namenode1.domain.com",
    cluster="rnd-dwh",
    spark=spark,
).check()
SparkHDFS connection initialization with Kerberos support
from onetl.connection import SparkHDFS
from pyspark.sql import SparkSession

# Create Spark session.
# Use names "spark.yarn.access.hadoopFileSystems", "spark.yarn.principal"
# and "spark.yarn.keytab" for Spark 2
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config(
        "spark.kerberos.access.hadoopFileSystems",
        "hdfs://namenode1.domain.com:8020",
    )
    .config("spark.kerberos.principal", "user")
    .config("spark.kerberos.keytab", "/path/to/keytab")
    .enableHiveSupport()
    .getOrCreate()
)

# Create connection
hdfs = SparkHDFS(
    host="namenode1.domain.com",
    cluster="rnd-dwh",
    spark=spark,
).check()
Automatically detect hostname for specific cluster (if some third-party plugin provides Spark HDFS Slots implementation):
# Create Spark session
...

# Create connection
hdfs = SparkHDFS(cluster="rnd-dwh", spark=spark).check()
check()

Check source availability; if the check fails, an exception will be raised.
- Returns:
- Connection itself
- Raises:
- RuntimeError
If the connection is not available
Examples
connection.check()
classmethod get_current(spark: SparkSession)

Create connection for current cluster.

Automatically sets up the current cluster name as cluster.

Note

Can be used only if there are some hooks bound to Slots.get_current_cluster.

- Parameters:
- spark : SparkSession

  See the SparkHDFS constructor documentation.
Examples
from onetl.connection import SparkHDFS

# injecting current cluster name via hooks mechanism
hdfs = SparkHDFS.get_current(spark=spark)
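Conceptually, get_current relies on the hooks mechanism: some third-party code binds a callable to the get_current_cluster slot, and onETL calls it to resolve the cluster name. The toy model below illustrates that pattern only; it is NOT the real onetl hooks API, and all names in it are invented for the sketch.

```python
class Slot:
    """Toy model of a hook slot -- NOT the real onetl API."""

    def __init__(self, name: str):
        self.name = name
        self._hook = None

    def bind(self, func):
        # register a hook implementation for this slot
        self._hook = func
        return func

    def __call__(self, *args):
        if self._hook is None:
            raise RuntimeError(f"No hook bound to slot {self.name!r}")
        return self._hook(*args)


get_current_cluster = Slot("get_current_cluster")


@get_current_cluster.bind
def _current_cluster() -> str:
    # in real code this could read cluster config or an environment variable
    return "rnd-dwh"


print(get_current_cluster())  # rnd-dwh
```

This is why get_current raises if no hook is bound: without a registered callable, the slot has no way to resolve the current cluster name.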