Writing to Hive#

To write data to Hive, use DBWriter with the options described below.
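
A minimal usage sketch, assuming an existing SparkSession spark and DataFrame df; the cluster and table names are placeholders, and the import paths correspond to recent onetl versions:

from onetl.connection import Hive
from onetl.db import DBWriter

# Connect to a Hive cluster using the current Spark session (placeholder cluster name)
hive = Hive(cluster="rnd-dwh", spark=spark)

# Write df to the target table using the options documented below
writer = DBWriter(
    connection=hive,
    target="myschema.mytable",
    options=Hive.WriteOptions(if_exists="append"),
)
writer.run(df)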

pydantic model onetl.connection.db_connection.hive.options.HiveWriteOptions#

Hive writing options.

Key-value items passed here are converted to calls of pyspark.sql.readwriter.DataFrameWriter methods.

For example, Hive.WriteOptions(if_exists="append", partitionBy="reg_id") is converted to the df.write.mode("append").partitionBy("reg_id") call, and so on.

Note

You can pass any method and value supported by Spark, even if it is not mentioned in this documentation. Option names should be in camelCase!

The set of supported options depends on the Spark version used; see the pyspark.sql.readwriter.DataFrameWriter documentation mentioned above.

Examples

Writing options initialization

options = Hive.WriteOptions(
    if_exists="append",
    partition_by="reg_id",
    customOption="value",
)
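
For illustration, these options are applied to the Spark writer roughly as follows (a sketch; df is an assumed DataFrame, custom options are omitted, and the terminal call, insertInto or saveAsTable, is chosen by onetl depending on if_exists and whether the table exists):

# Rough Spark-side equivalent of the options above (illustrative sketch only)
df.write.mode("append").partitionBy("reg_id")
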
field if_exists: HiveTableExistBehavior = HiveTableExistBehavior.APPEND (alias 'mode')#

Behavior of writing data into an existing table (a usage sketch follows the mode descriptions below).

Possible values:
  • append (default)

    Appends data into an existing partition/table, or creates the partition/table if it does not exist.

    Same as Spark’s df.write.insertInto(table, overwrite=False).

    Behavior in details
    • Table does not exist

      Table is created using options provided by user (format, compression, etc).

    • Table exists but is not partitioned, while partition_by is set

      Data is appended to the table. The table remains non-partitioned (DDL is unchanged).

    • Table exists and is partitioned, but with a different partitioning schema than partition_by

      Partition is created based on the table’s PARTITIONED BY (...) options. The explicit partition_by value is ignored.

    • Table exists and is partitioned according to partition_by, but the partition is present only in the dataframe

      Partition is created.

    • Table exists and is partitioned according to partition_by, and the partition is present in both the dataframe and the table

      Data is appended to the existing partition.

      Warning

      This mode does not check whether the table already contains rows from the dataframe, so duplicate rows can be created.

      To implement deduplication, write data to a staging table first, and then apply deduplication logic using SQL.

    • Table exists and is partitioned according to partition_by, but the partition is present only in the table, not in the dataframe

      Existing partition is left intact.

  • replace_overlapping_partitions

    Overwrites data in the existing partition, or creates the partition/table if it does not exist.

    Same as Spark’s df.write.insertInto(table, overwrite=True) + spark.sql.sources.partitionOverwriteMode=dynamic.

    Behavior in details
    • Table does not exist

      Table is created using options provided by user (format, compression, etc).

    • Table exists but is not partitioned, while partition_by is set

      Data in the entire table is overwritten. The table remains non-partitioned (DDL is unchanged).

    • Table exists and is partitioned, but with a different partitioning schema than partition_by

      Partition is created based on the table’s PARTITIONED BY (...) options. The explicit partition_by value is ignored.

    • Table exists and is partitioned according to partition_by, but the partition is present only in the dataframe

      Partition is created.

    • Table exists and is partitioned according to partition_by, and the partition is present in both the dataframe and the table

      The existing partition is replaced with data from the dataframe.

    • Table exists and is partitioned according to partition_by, but the partition is present only in the table, not in the dataframe

      Existing partition is left intact.

  • replace_entire_table

    Recreates table (via DROP + CREATE), deleting all existing data. All existing partitions are dropped.

    Same as Spark’s df.write.saveAsTable(table, mode="overwrite") (NOT insertInto)!

    Warning

    Table is recreated using options provided by the user (format, compression, etc.) instead of the original table options. Be careful!

  • ignore

    Ignores the write operation if the table/partition already exists.

    Behavior in details
    • Table does not exist

      Table is created using options provided by user (format, compression, etc).

    • Table exists

      If the table exists, no further action is taken. This is true regardless of whether new partition values are present or whether the partitioning scheme differs.

  • error

    Raises an error if the table/partition already exists.

    Behavior in details
    • Table does not exist

      Table is created using options provided by user (format, compression, etc).

    • Table exists

      If the table exists, an error is raised. This is true regardless of whether new partition values are present or whether the partitioning scheme differs.

Note

Unlike in pure Spark, the config option spark.sql.sources.partitionOverwriteMode does not affect this behavior.
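
For example, a sketch of replacing only the partitions present in the dataframe while keeping all other partitions intact (the column name is a placeholder):

# Overwrite only overlapping partitions, leave the rest of the table untouched
options = Hive.WriteOptions(
    if_exists="replace_overlapping_partitions",
    partition_by="business_dt",
)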

field format: str = 'orc'#

Format of files which should be used for storing table data.

Examples: orc (default), parquet, csv (NOT recommended)

Note

It’s better to use column-based formats like orc or parquet rather than row-based ones (csv, json).

Warning

Used only while creating a new table, or in the case of if_exists=replace_entire_table.
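
A short sketch, assuming the table should be recreated using the parquet format (format takes effect only when the table is created):

# format is applied only when the table is (re)created
options = Hive.WriteOptions(
    if_exists="replace_entire_table",
    format="parquet",
)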

field partition_by: List[str] | str | None = None (alias 'partitionBy')#

List of columns which should be used for data partitioning. None means partitioning is disabled.

Each partition is a folder which contains only files with the specific column value, like myschema.db/mytable/col1=value1, myschema.db/mytable/col1=value2, and so on.

Multiple partitions columns means nested folder structure, like myschema.db/mytable/col1=val1/col2=val2.

If the WHERE clause of a query contains an expression like partition = value, Spark will scan only the files in that specific partition.

Examples: reg_id or ["reg_id", "business_dt"]

Note

Values should be scalars (integers, strings), and either static (countryId) or incrementing (dates, years), with a low number of distinct values.

Columns like userId or datetime/timestamp should NOT be used for partitioning.

Warning

Used only while creating a new table, or in the case of if_exists=replace_entire_table.
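
A short sketch of nested partitioning, using the placeholder column names from the examples above:

# Creates folders like .../reg_id=.../business_dt=...
options = Hive.WriteOptions(
    if_exists="append",
    partition_by=["reg_id", "business_dt"],
)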

field bucket_by: Tuple[int, List[str] | str] | None = None (alias 'bucketBy')#

Number of buckets plus bucketing columns. None means bucketing is disabled.

Each bucket is created as a set of files whose names contain the result of hash(columns) mod num_buckets.

This allows removing shuffle from queries containing GROUP BY or JOIN, or using = / IN predicates on the bucketing columns.

Examples: (10, "user_id"), (10, ["user_id", "user_phone"])

Note

Bucketing should be used on columns containing a lot of unique values, like userId.

Columns like date should NOT be used for bucketing because they have too few unique values.

Warning

It is recommended to use this option ONLY if you have a large table (hundreds of GB or more) which is used mostly for JOINs with other tables, and you’re inserting data using if_exists=replace_overlapping_partitions or if_exists=replace_entire_table.

Otherwise Spark will create a lot of small files (one file for each bucket and each executor), drastically decreasing HDFS performance.

Warning

Used only while creating a new table, or in the case of if_exists=replace_entire_table.
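
A short sketch of bucketing a table while recreating it (the column name is a placeholder from the examples above):

# 10 buckets, distributed by hash(user_id) mod 10
options = Hive.WriteOptions(
    if_exists="replace_entire_table",
    bucket_by=(10, "user_id"),
)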

field sort_by: List[str] | str | None = None (alias 'sortBy')#

Each file in a bucket will be sorted by the values of these columns. None means sorting is disabled.

Examples: user_id or ["user_id", "user_phone"]

Note

Sorting columns should contain values which are used in ORDER BY clauses.

Warning

Can be used only together with the bucket_by option.

Warning

Used only while creating a new table, or in the case of if_exists=replace_entire_table.
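
A short sketch combining sort_by with the required bucket_by option (column names are placeholders from the examples above):

# Files within each bucket are sorted by user_id
options = Hive.WriteOptions(
    if_exists="replace_entire_table",
    bucket_by=(10, "user_id"),
    sort_by="user_id",
)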

field compression: str | None = None#

Compression algorithm which should be used for the files created in HDFS. None means compression is disabled.

Examples: snappy, zlib

Warning

Used only while creating a new table, or in the case of if_exists=replace_entire_table.
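
A short sketch enabling compression for a newly created table (format and algorithm taken from the examples above):

# Compression is applied to the files of the newly created table
options = Hive.WriteOptions(
    if_exists="replace_entire_table",
    format="orc",
    compression="snappy",
)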