Spark HDFS Connection
Bases: SparkFileDFConnection
Based on Spark Generic File Data Source.
See also
Before using this connector, please take into account the Spark HDFS prerequisites.
Note
Supports only reading files as a Spark DataFrame and writing a DataFrame to files.
Does NOT support file operations such as create, delete, rename, etc. For these operations, use the HDFS connection.
Added in 0.9.0
Parameters:
- cluster (str) – Cluster name.
  Used for:
  - HWM and lineage (as instance name for file paths)
  - Validation of the host value, if the latter is passed and if some hooks are bound to Slots.get_cluster_namenodes.
- host (str) – Hadoop namenode host, for example: namenode1.domain.com.
  Should be an active namenode (NOT standby).
  If the value is not set, but there are some hooks bound to Slots.get_cluster_namenodes and Slots.is_namenode_active, onETL will iterate over the cluster namenodes to detect which one is active.
- ipc_port (int, default: 8020) – Port of the Hadoop namenode (IPC protocol).
  If omitted, but there are some hooks bound to Slots.get_ipc_port, onETL will try to detect the port number for a specific cluster.
- spark (SparkSession) – Spark session.
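The way host and ipc_port are resolved when hooks are bound can be sketched in plain Python. This is a minimal illustration under stated assumptions, not onETL's implementation: resolve_namenode is a hypothetical helper, and the three callables passed to it are hypothetical stand-ins for whatever a plugin binds to Slots.get_cluster_namenodes, Slots.is_namenode_active and Slots.get_ipc_port (their exact signatures are assumed here).

```python
from typing import Callable, Optional, Set

DEFAULT_IPC_PORT = 8020  # documented default for ipc_port


def resolve_namenode(
    cluster: str,
    host: Optional[str],
    ipc_port: Optional[int],
    get_cluster_namenodes: Callable[[str], Optional[Set[str]]],
    is_namenode_active: Callable[[str, str], bool],
    get_ipc_port: Callable[[str], Optional[int]],
) -> str:
    """Return an hdfs://host:port URL (hypothetical helper, not the onETL API)."""
    # Hypothetical hook: returns the set of namenodes for a cluster, or None
    namenodes = get_cluster_namenodes(cluster)

    if host:
        # Validate an explicitly passed host against the cluster's namenodes
        if namenodes and host not in namenodes:
            raise ValueError(f"Namenode {host!r} does not belong to cluster {cluster!r}")
    else:
        if not namenodes:
            raise ValueError(f"Cannot detect namenodes of cluster {cluster!r}, pass host explicitly")
        # Iterate over the cluster namenodes (sorted for a stable order)
        # and pick the first one reported as active
        for namenode in sorted(namenodes):
            if is_namenode_active(namenode, cluster):
                host = namenode
                break
        else:
            raise RuntimeError(f"No active namenode found for cluster {cluster!r}")

    if ipc_port is None:
        # Ask the (hypothetical) port hook first, then fall back to the default
        ipc_port = get_ipc_port(cluster) or DEFAULT_IPC_PORT

    return f"hdfs://{host}:{ipc_port}"


url = resolve_namenode(
    cluster="rnd-dwh",
    host=None,
    ipc_port=None,
    get_cluster_namenodes=lambda cluster: {"namenode1.domain.com", "namenode2.domain.com"},
    is_namenode_active=lambda host, cluster: host == "namenode2.domain.com",
    get_ipc_port=lambda cluster: None,
)
print(url)  # hdfs://namenode2.domain.com:8020
```

Sorting the namenodes only makes the sketch deterministic; a real plugin may check them in any order.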
Examples:
Using Kerberos authentication. Run the kinit console command before creating the Spark session:
$ kinit -kt /path/to/keytab user
from onetl.connection import SparkHDFS
from pyspark.sql import SparkSession
# Create Spark session
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config(
        "spark.kerberos.access.hadoopFileSystems",
        "hdfs://namenode1.domain.com:8020",
    )
    .config("spark.kerberos.principal", "user")
    .config("spark.kerberos.keytab", "/path/to/keytab")
    .enableHiveSupport()
    .getOrCreate()
)
# Create connection
hdfs = SparkHDFS(
    host="namenode1.domain.com",
    cluster="rnd-dwh",
    spark=spark,
).check()
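The three Kerberos options in the example above are ordinary Spark settings, so they can be collected in one place and then applied one by one with SparkSession.builder.config(key, value). The function kerberos_spark_conf below is a hypothetical helper, not part of onETL; it only builds the dictionary and forms the hdfs:// URL from the host and the default IPC port 8020.

```python
from typing import Dict


def kerberos_spark_conf(host: str, principal: str, keytab: str, ipc_port: int = 8020) -> Dict[str, str]:
    """Build Spark options for Kerberos access to one HDFS namenode (hypothetical helper)."""
    return {
        # Secure Hadoop filesystems the application will access
        # (Spark obtains delegation tokens for them)
        "spark.kerberos.access.hadoopFileSystems": f"hdfs://{host}:{ipc_port}",
        # Principal and keytab Spark uses to log in to Kerberos
        "spark.kerberos.principal": principal,
        "spark.kerberos.keytab": keytab,
    }


conf = kerberos_spark_conf("namenode1.domain.com", principal="user", keytab="/path/to/keytab")
for key, value in sorted(conf.items()):
    print(f"{key}={value}")
```

With PySpark installed, each pair can be passed to the builder in a loop (builder = builder.config(key, value)) before calling getOrCreate().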
Without Kerberos authentication, using a local Spark session:
from onetl.connection import SparkHDFS
from pyspark.sql import SparkSession
# Create Spark session
spark = SparkSession.builder.master("local").appName("spark-app-name").getOrCreate()
# Create connection
hdfs = SparkHDFS(
    host="namenode1.domain.com",
    cluster="rnd-dwh",
    spark=spark,
).check()
Using cluster hooks, without passing host. This can be used only if some third-party plugin provides an implementation of the Spark HDFS Slots:
from onetl.connection import SparkHDFS
# Create Spark session
...
# Create connection
hdfs = SparkHDFS(cluster="rnd-dwh", spark=spark).check()
check()
classmethod get_current(spark)
Create a connection for the current cluster.
Automatically sets the current cluster name as cluster.
Note
Can be used only if there are some hooks bound to Slots.get_current_cluster.
Added in 0.9.0
Parameters:
- spark (SparkSession) – See the SparkHDFS constructor documentation.
Examples:
from onetl.connection import SparkHDFS
# injecting current cluster name via hooks mechanism
hdfs = SparkHDFS.get_current(spark=spark)
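Conceptually, get_current asks the hook bound to Slots.get_current_cluster for a cluster name and passes it to the constructor as cluster. A rough sketch under that assumption, using a hypothetical FakeSparkHDFS dataclass instead of the real class and a hypothetical current_cluster_hook callable passed in explicitly (the real method takes only spark):

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class FakeSparkHDFS:
    """Hypothetical stand-in for SparkHDFS, keeping only the fields this sketch needs."""

    cluster: str
    spark: Any = None
    host: Optional[str] = None

    @classmethod
    def get_current(
        cls,
        spark: Any,
        current_cluster_hook: Callable[[], Optional[str]],
    ) -> "FakeSparkHDFS":
        # Per the docs, the real method relies on a hook bound to Slots.get_current_cluster;
        # here the hook is an explicit argument to keep the example self-contained
        cluster = current_cluster_hook()
        if not cluster:
            raise RuntimeError("Cannot detect current cluster: no hook returned a cluster name")
        return cls(cluster=cluster, spark=spark)


hdfs = FakeSparkHDFS.get_current(spark=None, current_cluster_hook=lambda: "rnd-dwh")
print(hdfs.cluster)  # rnd-dwh
```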