
Spark HDFS Connection

Bases: SparkFileDFConnection

Spark connection to HDFS. Supports hooks.

Based on Spark Generic File Data Source.

See also

Before using this connector, please take into account the Spark HDFS prerequisites.

Note

Supports only reading files as Spark DataFrame and writing DataFrame to files.

Does NOT support file operations like create, delete, rename, etc. For these operations, use the HDFS connection.

Added in 0.9.0

Parameters:

  • cluster (str) –

    Cluster name.

    Used for:

    • HWM and lineage (as instance name for file paths)
    • Validation of the host value, if the latter is passed and some hooks are bound to Slots.get_cluster_namenodes.
  • host (str) –

    Hadoop namenode host. For example: namenode1.domain.com.

    Should be an active namenode (NOT standby).

    If the value is not set but some hooks are bound to Slots.get_cluster_namenodes and Slots.is_namenode_active, onETL will iterate over the cluster namenodes to detect which one is active.

  • ipc_port (int, default: 8020) –

    Port of Hadoop namenode (IPC protocol).

    If omitted but some hooks are bound to Slots.get_ipc_port, onETL will try to detect the port number for the specific cluster.

  • spark (SparkSession) –

    Spark session
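The host and ipc_port resolution described above can be sketched in plain Python. This is a simplified model under stated assumptions, not onETL's implementation: the hooks bound to Slots.get_cluster_namenodes, Slots.is_namenode_active and Slots.get_ipc_port are replaced by ordinary callables (None meaning no hook is bound), namenodes are probed in sorted order, and the helpers resolve_host and resolve_ipc_port are hypothetical names.

```python
from typing import Callable, Iterable, Optional

DEFAULT_IPC_PORT = 8020  # documented default of ipc_port

# Hypothetical stand-ins for hooks bound to Slots (None means "no hook bound")
NamenodesHook = Optional[Callable[[str], Optional[Iterable[str]]]]
ActiveHook = Optional[Callable[[str, str], Optional[bool]]]
PortHook = Optional[Callable[[str], Optional[int]]]


def resolve_host(
    cluster: str,
    host: Optional[str] = None,
    get_cluster_namenodes: NamenodesHook = None,
    is_namenode_active: ActiveHook = None,
) -> str:
    """Return the namenode host to use for the given cluster (simplified model)."""
    namenodes = get_cluster_namenodes(cluster) if get_cluster_namenodes else None
    known = set(namenodes) if namenodes is not None else None

    if host is not None:
        # An explicit host is validated only if the cluster's namenodes are known
        if known is not None and host not in known:
            raise ValueError(f"{host!r} is not a namenode of cluster {cluster!r}")
        return host

    if not known or is_namenode_active is None:
        raise ValueError("host is not set and cannot be detected via hooks")

    # Iterate over the cluster namenodes and return the first active one
    for namenode in sorted(known):
        if is_namenode_active(namenode, cluster):
            return namenode
    raise RuntimeError(f"No active namenode found for cluster {cluster!r}")


def resolve_ipc_port(
    cluster: str,
    ipc_port: Optional[int] = None,
    get_ipc_port: PortHook = None,
) -> int:
    """Explicit port wins, then the hook result, then the documented default."""
    if ipc_port is not None:
        return ipc_port
    detected = get_ipc_port(cluster) if get_ipc_port else None
    return detected if detected is not None else DEFAULT_IPC_PORT


# Usage with hypothetical cluster metadata
namenodes = {"rnd-dwh": {"namenode1.domain.com", "namenode2.domain.com"}}
active = {"namenode2.domain.com"}

host = resolve_host(
    "rnd-dwh",
    get_cluster_namenodes=namenodes.get,
    is_namenode_active=lambda namenode, cluster: namenode in active,
)
print(host, resolve_ipc_port("rnd-dwh"))  # namenode2.domain.com 8020
```

Keeping validation and detection in one function mirrors the parameter description: an explicit host skips detection but is still checked against the namenode list when a hook provides one.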

Examples:

Connect to HDFS using Kerberos. Execute kinit (or a similar command) before creating the Spark session:

$ kinit -kt /path/to/keytab user

Then create the Spark session and the connection:
from onetl.connection import SparkHDFS
from pyspark.sql import SparkSession

# Create Spark session.
# On Spark 2, use the names "spark.yarn.access.hadoopFileSystems",
# "spark.yarn.principal" and "spark.yarn.keytab" instead.
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config(
        "spark.kerberos.access.hadoopFileSystems",
        "hdfs://namenode1.domain.com:8020",
    )
    .config("spark.kerberos.principal", "user")
    .config("spark.kerberos.keytab", "/path/to/keytab")
    .enableHiveSupport()
    .getOrCreate()
)

# Create connection
hdfs = SparkHDFS(
    host="namenode1.domain.com",
    cluster="rnd-dwh",
    spark=spark,
).check()
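The value of spark.kerberos.access.hadoopFileSystems in the example above is the namenode URI built from host and ipc_port. Spark accepts a comma-separated list of filesystems for this option. A tiny helper for building it; the function name hdfs_uri is hypothetical and not part of onETL:

```python
def hdfs_uri(host: str, ipc_port: int = 8020) -> str:
    """Build a namenode URI such as hdfs://namenode1.domain.com:8020."""
    return f"hdfs://{host}:{ipc_port}"


# Several filesystems can be listed in the same option, separated by commas
access_value = ",".join(
    hdfs_uri(host) for host in ["namenode1.domain.com", "namenode2.domain.com"]
)
print(access_value)
# hdfs://namenode1.domain.com:8020,hdfs://namenode2.domain.com:8020
```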

Connect to HDFS without Kerberos, using a local Spark session:

from onetl.connection import SparkHDFS
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.master("local").appName("spark-app-name").getOrCreate()

# Create connection
hdfs = SparkHDFS(
    host="namenode1.domain.com",
    cluster="rnd-dwh",
    spark=spark,
).check()

Connect using hooks. This can be used only if some third-party plugin provides a Spark HDFS Slots implementation:

# Create Spark session
...

# Create connection
hdfs = SparkHDFS(cluster="rnd-dwh", spark=spark).check()
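The hooks mechanism can be pictured as named slots: a plugin binds implementations to a slot, and the connection calls the slot when it needs cluster metadata such as the namenode list. The toy model below only illustrates that idea. It is not onETL's actual hooks API; the Slot class, its "first non-None result wins" policy and the plugin function are assumptions made for illustration.

```python
from typing import Callable, List, Optional


class Slot:
    """Toy hook slot: plugins bind callbacks, callers get the first non-None result."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._hooks: List[Callable[..., object]] = []

    def bind(self, func: Callable[..., object]) -> Callable[..., object]:
        # Usable as a decorator, so a plugin can register itself on import
        self._hooks.append(func)
        return func

    def __call__(self, *args: object, **kwargs: object) -> Optional[object]:
        for func in self._hooks:
            result = func(*args, **kwargs)
            if result is not None:
                return result
        return None  # no hook bound, or no hook knows the answer


# Hypothetical plugin code
get_cluster_namenodes = Slot("get_cluster_namenodes")


@get_cluster_namenodes.bind
def rnd_dwh_namenodes(cluster: str):
    if cluster == "rnd-dwh":
        return {"namenode1.domain.com", "namenode2.domain.com"}
    return None


print(sorted(get_cluster_namenodes("rnd-dwh")))
# ['namenode1.domain.com', 'namenode2.domain.com']
print(get_cluster_namenodes("unknown"))  # None
```

Returning None for unknown clusters lets several plugins coexist in this model, each answering only for the clusters it knows about.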

check()

Check source availability. Supports hooks.

If the source is not available, an exception is raised.

Returns:

  • Self

    Connection itself

Raises:

  • RuntimeError

    If the connection is not available

Examples:

connection.check()
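Because check() returns the connection itself, it can be chained right after the constructor, as in the examples above, while a failed check surfaces as RuntimeError. A toy model of that contract; ToyConnection and its probe callable are hypothetical and do not perform any real HDFS request:

```python
from typing import Callable


class ToyConnection:
    """Toy stand-in for a connection object; not onETL's SparkHDFS."""

    def __init__(self, host: str, probe: Callable[[str], bool]) -> None:
        self.host = host
        self._probe = probe  # hypothetical availability test, e.g. a namenode ping

    def check(self) -> "ToyConnection":
        if not self._probe(self.host):
            raise RuntimeError(f"Connection to {self.host!r} is unavailable")
        return self  # returning self allows Connection(...).check() chaining


hdfs = ToyConnection("namenode1.domain.com", probe=lambda host: True).check()

try:
    ToyConnection("namenode2.domain.com", probe=lambda host: False).check()
except RuntimeError as error:
    print(error)  # Connection to 'namenode2.domain.com' is unavailable
```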

get_current(spark) classmethod

Create a connection for the current cluster. Supports hooks.

Automatically sets the current cluster name as the cluster value.

Note

Can be used only if there are some hooks bound to Slots.get_current_cluster.

Added in 0.9.0

Parameters:

  • spark (SparkSession) –

    See SparkHDFS constructor documentation.

Examples:

from onetl.connection import SparkHDFS

# injecting current cluster name via hooks mechanism
hdfs = SparkHDFS.get_current(spark=spark)
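A toy model of the get_current() contract, assuming a single hook bound to Slots.get_current_cluster that returns the cluster name or None. ToySparkHDFS and its current_cluster_hook attribute are hypothetical, and raising RuntimeError when no cluster is detected is an assumption of this sketch, not documented onETL behavior:

```python
from typing import Callable, Optional


class ToySparkHDFS:
    """Toy model of get_current(); not onETL's implementation."""

    # Hypothetical stand-in for a hook bound to Slots.get_current_cluster
    current_cluster_hook: Optional[Callable[[], Optional[str]]] = None

    def __init__(self, cluster: str, spark: object) -> None:
        self.cluster = cluster
        self.spark = spark

    @classmethod
    def get_current(cls, spark: object) -> "ToySparkHDFS":
        hook = cls.current_cluster_hook
        cluster = hook() if hook else None
        if cluster is None:
            raise RuntimeError("Cannot detect current cluster: no hook returned a value")
        # The detected name is used as the cluster argument of the constructor
        return cls(cluster=cluster, spark=spark)


# A plugin would register the hook; here a lambda plays that role
ToySparkHDFS.current_cluster_hook = lambda: "rnd-dwh"
hdfs = ToySparkHDFS.get_current(spark=None)
print(hdfs.cluster)  # rnd-dwh
```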