Spark HDFS Connection#

class onetl.connection.file_df_connection.spark_hdfs.connection.SparkHDFS(*, spark: SparkSession, cluster: Cluster, host: Host | None = None, port: int = 8020)#

Spark connection to HDFS. support_hooks

Based on Spark Generic File Data Source.

Warning

To use Hive connector you should have PySpark installed (or injected to sys.path) BEFORE creating the connector instance.

You can install PySpark as follows:

pip install onetl[spark]  # latest PySpark version

# or
pip install onetl pyspark=3.5.0  # pass specific PySpark version

See Spark installation instruction for more details.

Note

Most of Hadoop instances use Kerberos authentication. In this case, you should call kinit BEFORE starting Spark session to generate Kerberos ticket. See Kerberos support.

In case of creating session with "spark.master": "yarn", you should also pass some additional options to Spark session, allowing executors to generate their own Kerberos tickets to access HDFS. See Spark security documentation for more details.

Note

Supports only reading files as Spark DataFrame and writing DataFrame to files.

Does NOT support file operations, like create, delete, rename, etc. For these operations, use HDFS connection.

Parameters:
clusterstr

Cluster name.

Used for:
  • HWM and lineage (as instance name for file paths)

  • Validation of host value,

    if latter is passed and if some hooks are bound to Slots.get_cluster_namenodes.

hoststr, optional

Hadoop namenode host. For example: namenode1.domain.com.

Should be an active namenode (NOT standby).

If value is not set, but there are some hooks bound to Slots.get_cluster_namenodes and Slots.is_namenode_active, onETL will iterate over cluster namenodes to detect which one is active.

ipc_portint, default: 8020

Port of Hadoop namenode (IPC protocol).

If omitted, but there are some hooks bound to Slots.get_ipc_port, onETL will try to detect port number for a specific cluster.

sparkpyspark.sql.SparkSession

Spark session

Examples

SparkHDFS connection initialization

from onetl.connection import SparkHDFS
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.master("local").appName("spark-app-name").getOrCreate()

# Create connection
hdfs = SparkHDFS(
    host="namenode1.domain.com",
    cluster="rnd-dwh",
    spark=spark,
).check()

SparkHDFS connection initialization with Kerberos support

from onetl.connection import Hive
from pyspark.sql import SparkSession

# Create Spark session.
# Use names "spark.yarn.access.hadoopFileSystems", "spark.yarn.principal"
# and "spark.yarn.keytab" for Spark 2

spark = (
    SparkSession.builder.appName("spark-app-name")
    .option(
        "spark.kerberos.access.hadoopFileSystems",
        "hdfs://namenode1.domain.com:8020",
    )
    .option("spark.kerberos.principal", "user")
    .option("spark.kerberos.keytab", "/path/to/keytab")
    .enableHiveSupport()
    .getOrCreate()
)

# Create connection
hdfs = SparkHDFS(
    host="namenode1.domain.com",
    cluster="rnd-dwh",
    spark=spark,
).check()

Automatically detect hostname for specific cluster (if some third-party plugin provides Spark HDFS Slots implementation):

# Create Spark session
...

# Create connection
hdfs = SparkHDFS(cluster="rnd-dwh", spark=spark).check()
check()#

Check source availability. support_hooks

If not, an exception will be raised.

Returns:
Connection itself
Raises:
RuntimeError

If the connection is not available

Examples

connection.check()
classmethod get_current(spark: SparkSession)#

Create connection for current cluster. support_hooks

Automatically sets up current cluster name as cluster.

Note

Can be used only if there are a some hooks bound to Slots.get_current_cluster.

Parameters:
sparkSparkSession

See SparkHDFS constructor documentation.

Examples

from onetl.connection import SparkHDFS

# injecting current cluster name via hooks mechanism
hdfs = SparkHDFS.get_current(spark=spark)