Hive Connection#

class onetl.connection.db_connection.hive.connection.Hive(*, spark: SparkSession, cluster: Cluster)#

Spark connection with Hive MetaStore support.

You don’t need a Hive server to use this connector.

Version compatibility
  • Hive metastore version: 0.12 - 3.1.2 (may require adding a proper .jar file explicitly)

  • Spark versions: 2.3.x - 3.5.x

  • Java versions: 8 - 20

Warning

To use the Hive connector, you should have PySpark installed (or injected into sys.path) BEFORE creating the connector instance.

You can install PySpark as follows:

pip install onetl[spark]  # latest PySpark version

# or
pip install onetl pyspark==3.5.0  # install a specific PySpark version

See the Spark installation instructions for more details.

Warning

This connector requires additional configuration files (hive-site.xml and so on) to be present, as well as .jar files with the Hive MetaStore client.

See Spark Hive Tables documentation and this guide for more details.
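If the cluster's Hive MetaStore version differs from the one bundled with Spark, the client .jar files can be selected through Spark's built-in spark.sql.hive.metastore.* options. A minimal sketch (the version and jar location below are illustrative, not defaults):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("spark-app-name")
    # match the version of your cluster's Hive MetaStore
    .config("spark.sql.hive.metastore.version", "2.3.9")
    # "path" loads jars from spark.sql.hive.metastore.jars.path;
    # other supported values are "builtin" and "maven"
    .config("spark.sql.hive.metastore.jars", "path")
    .config("spark.sql.hive.metastore.jars.path", "/path/to/hive/jars/*")
    .enableHiveSupport()
    .getOrCreate()
)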

Note

Most Hadoop instances use Kerberos authentication. In this case, you should call kinit BEFORE starting the Spark session to generate a Kerberos ticket. See Kerberos support.

When creating a session with "spark.master": "yarn", you should also pass additional options to the Spark session, allowing executors to generate their own Kerberos tickets to access HDFS. See the Spark security documentation for more details.
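For example, the ticket can be generated from a keytab right before the session is created. A minimal sketch (the principal and keytab path are placeholders):

import subprocess

# Illustrative principal and keytab path - replace with your own
subprocess.run(
    ["kinit", "-kt", "/path/to/keytab", "user@EXAMPLE.COM"],
    check=True,  # raise if the ticket cannot be generated
)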

Parameters:
cluster : str

Cluster name. Used for HWM and lineage.

spark : pyspark.sql.SparkSession

Spark session with Hive metastore support enabled.

Examples

Hive connection initialization

from onetl.connection import Hive
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.appName("spark-app-name").enableHiveSupport().getOrCreate()

# Create connection
hive = Hive(cluster="rnd-dwh", spark=spark).check()

Hive connection initialization with Kerberos support

from onetl.connection import Hive
from pyspark.sql import SparkSession

# Create Spark session
# Use names "spark.yarn.access.hadoopFileSystems", "spark.yarn.principal"
# and "spark.yarn.keytab" for Spark 2

spark = (
    SparkSession.builder.appName("spark-app-name")
    .option("spark.kerberos.access.hadoopFileSystems", "hdfs://cluster.name.node:8020")
    .option("spark.kerberos.principal", "user")
    .option("spark.kerberos.keytab", "/path/to/keytab")
    .enableHiveSupport()
    .getOrCreate()
)

# Create connection
hive = Hive(cluster="rnd-dwh", spark=spark).check()

classmethod get_current(spark: SparkSession)#

Create connection for current cluster.

Note

Can be used only if there are hooks bound to the Slots.get_current_cluster slot.

Parameters:
spark : pyspark.sql.SparkSession

Spark session

Examples

from onetl.connection import Hive
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-app-name").enableHiveSupport().getOrCreate()

# injecting current cluster name via hooks mechanism
hive = Hive.get_current(spark=spark)
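For the call above to succeed, a hook returning the current cluster name must be bound beforehand. A minimal sketch, assuming onetl's hook decorators (the returned cluster name is a placeholder; a real hook would detect it from the environment):

from onetl.connection import Hive
from onetl.hooks import hook


@Hive.Slots.get_current_cluster.bind
@hook
def get_current_cluster() -> str:
    # Placeholder: detect the cluster name from your environment instead
    return "rnd-dwh"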

check()#

Check source availability.

If the source is not available, an exception is raised.

Returns:
Connection itself
Raises:
RuntimeError

If the connection is not available

Examples

connection.check()
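Since a failed check raises RuntimeError, the call can be wrapped to handle unavailability explicitly. A minimal sketch (the logging is illustrative):

import logging

log = logging.getLogger(__name__)

try:
    connection.check()
except RuntimeError:
    # Illustrative handling: the source is not available
    log.exception("Hive connection is not available")
    raise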