Writing to Hive#
For writing data to Hive, use DBWriter with the options described below.
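A minimal end-to-end sketch (assuming an existing SparkSession spark, a prepared Spark DataFrame df, and placeholder cluster/table names; the DBWriter import path may differ between onetl versions):

    from onetl.connection import Hive
    from onetl.db import DBWriter

    # "rnd-dwh" and "schema.table" are placeholders for illustration;
    # "spark" is an existing SparkSession with Hive support enabled
    hive = Hive(cluster="rnd-dwh", spark=spark)

    writer = DBWriter(
        connection=hive,
        target="schema.table",
        options=Hive.WriteOptions(if_exists="append", partition_by="reg_id"),
    )
    writer.run(df)  # "df" is a Spark DataFrame prepared elsewhere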
- pydantic model onetl.connection.db_connection.hive.options.HiveWriteOptions#
Hive writing options.
You can pass key-value items here; they will be converted into calls of pyspark.sql.readwriter.DataFrameWriter methods.

For example, Hive.WriteOptions(if_exists="append", partitionBy="reg_id") will be converted into a df.write.mode("append").partitionBy("reg_id") call, and so on.

Note

You can pass any method and value supported by Spark, even if it is not mentioned in this documentation. Option names should be in camelCase!

The set of supported options depends on the Spark version used. See the link above.
Examples

Writing options initialization:

    options = Hive.WriteOptions(
        if_exists="append",
        partition_by="reg_id",
        customOption="value",
    )
- field if_exists: HiveTableExistBehavior = HiveTableExistBehavior.APPEND (alias 'mode')#
Behavior of writing data into an existing table.

Possible values:

append (default)
    Appends data into an existing partition/table, or creates the partition/table if it does not exist.

    Same as Spark's df.write.insertInto(table, overwrite=False).

    Behavior in detail:
    - Table does not exist
      Table is created using options provided by user (format, compression, etc).

    - Table exists, but is not partitioned, and partition_by is set
      Data is appended to the table. The table is still not partitioned (DDL is unchanged).

    - Table exists and is partitioned, but has a different partitioning schema than partition_by
      Partition is created based on the table's PARTITIONED BY (...) options. The explicit partition_by value is ignored.

    - Table exists and is partitioned according to partition_by, but the partition is present only in the dataframe
      Partition is created.

    - Table exists and is partitioned according to partition_by, and the partition is present in both the dataframe and the table
      Data is appended to the existing partition.

      Warning

      This mode does not check whether the table already contains rows from the dataframe, so duplicate rows can be created.

      To implement deduplication, write data to a staging table first, and then perform deduplication logic using sql.

    - Table exists and is partitioned according to partition_by, but the partition is present only in the table, not in the dataframe
      Existing partition is left intact.
replace_overlapping_partitions
    Overwrites data in the existing partition, or creates the partition/table if it does not exist.

    Same as Spark's df.write.insertInto(table, overwrite=True) + spark.sql.sources.partitionOverwriteMode=dynamic.

    Behavior in detail:
    - Table does not exist
      Table is created using options provided by user (format, compression, etc).

    - Table exists, but is not partitioned, and partition_by is set
      Data in the entire table is overwritten. The table is still not partitioned (DDL is unchanged).

    - Table exists and is partitioned, but has a different partitioning schema than partition_by
      Partition is created based on the table's PARTITIONED BY (...) options. The explicit partition_by value is ignored.

    - Table exists and is partitioned according to partition_by, but the partition is present only in the dataframe
      Partition is created.

    - Table exists and is partitioned according to partition_by, and the partition is present in both the dataframe and the table
      Existing partition is replaced with data from the dataframe.

    - Table exists and is partitioned according to partition_by, but the partition is present only in the table, not in the dataframe
      Existing partition is left intact.
replace_entire_table
    Recreates the table (via DROP + CREATE), deleting all existing data. All existing partitions are dropped.

    Same as Spark's df.write.saveAsTable(table, mode="overwrite") (NOT insertInto)!

    Warning

    Table is recreated using options provided by user (format, compression, etc) instead of the original table options. Be careful!
ignore
    Ignores the write operation if the table/partition already exists.

    Behavior in detail:

    - Table does not exist
      Table is created using options provided by user (format, compression, etc).

    - Table exists
      No further action is taken, regardless of whether new partition values are present or the partitioning schema differs.
error
    Raises an error if the table/partition already exists.

    Behavior in detail:

    - Table does not exist
      Table is created using options provided by user (format, compression, etc).

    - Table exists
      An error is raised, regardless of whether new partition values are present or the partitioning schema differs.
Note

Unlike in pure Spark, the config option spark.sql.sources.partitionOverwriteMode does not affect this behavior.
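For illustration, selecting a mode looks like this (mode names as listed above):

    # Append new rows to existing partitions/table (default)
    Hive.WriteOptions(if_exists="append")

    # Overwrite only the partitions present in the dataframe
    Hive.WriteOptions(if_exists="replace_overlapping_partitions")

    # Drop and recreate the whole table, deleting all existing data
    Hive.WriteOptions(if_exists="replace_entire_table")

    # Do nothing, or raise an error, if the table/partition already exists
    Hive.WriteOptions(if_exists="ignore")
    Hive.WriteOptions(if_exists="error")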
- field format: str = 'orc'#
Format of files used for storing table data.

Examples: orc (default), parquet, csv (NOT recommended)

Note

It's better to use column-based formats like orc or parquet, not row-based ones (csv, json).

Warning

Used only while creating a new table, or in case of if_exists=replace_entire_table.
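For example, a sketch of choosing the file format (it takes effect only when the table is created or recreated, as noted above):

    # Store table data as Parquet files
    Hive.WriteOptions(if_exists="replace_entire_table", format="parquet")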
- field partition_by: List[str] | str | None = None (alias 'partitionBy')#
List of columns to be used for data partitioning. None means partitioning is disabled.

Each partition is a folder which contains only files with a specific column value, like myschema.db/mytable/col1=value1, myschema.db/mytable/col1=value2, and so on.

Multiple partition columns mean a nested folder structure, like myschema.db/mytable/col1=val1/col2=val2.

If the WHERE clause of a query contains an expression like partition = value, Spark will scan only the files in that specific partition.

Examples: reg_id or ["reg_id", "business_dt"]

Note

Values should be scalars (integers, strings), and either static (countryId) or incrementing (dates, years), with a low number of distinct values.

Columns like userId or datetime/timestamp should NOT be used for partitioning.

Warning

Used only while creating a new table, or in case of if_exists=replace_entire_table.
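A sketch using the example columns mentioned above purely for illustration:

    # Partition by a low-cardinality column and a business date column
    Hive.WriteOptions(
        if_exists="replace_entire_table",
        partition_by=["reg_id", "business_dt"],
    )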
- field bucket_by: Tuple[int, List[str] | str] | None = None (alias 'bucketBy')#
Number of buckets plus bucketing columns. None means bucketing is disabled.

Each bucket is created as a set of files whose names contain the result of calculating hash(columns) mod num_buckets.

This allows removing shuffle from queries containing GROUP BY or JOIN, or using = / IN predicates on specific columns.

Examples: (10, "user_id"), (10, ["user_id", "user_phone"])

Note

Bucketing should be used on columns containing a lot of unique values, like userId.

Columns like date should NOT be used for bucketing because they have too few unique values.

Warning

It is recommended to use this option ONLY if you have a large table (hundreds of GB or more) which is used mostly for JOINs with other tables, and you're inserting data using if_exists=replace_overlapping_partitions or if_exists=replace_entire_table.

Otherwise Spark will create a lot of small files (one file for each bucket and each executor), drastically decreasing HDFS performance.

Warning

Used only while creating a new table, or in case of if_exists=replace_entire_table.
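A sketch of bucketing by a high-cardinality column (10 is an arbitrary illustrative bucket count):

    # Files are grouped into 10 buckets by hash(user_id) mod 10
    Hive.WriteOptions(
        if_exists="replace_entire_table",
        bucket_by=(10, "user_id"),
    )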
- field sort_by: List[str] | str | None = None (alias 'sortBy')#
Each file in a bucket will be sorted by the values of these columns. None means sorting is disabled.

Examples: user_id or ["user_id", "user_phone"]

Note

Sorting columns should contain values which are used in ORDER BY clauses.

Warning

Can be used only together with the bucket_by option.

Warning

Used only while creating a new table, or in case of if_exists=replace_entire_table.
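A sketch combining bucket_by and sort_by (column names are illustrative):

    # Sort files inside each bucket by user_id; sort_by requires bucket_by
    Hive.WriteOptions(
        if_exists="replace_entire_table",
        bucket_by=(10, "user_id"),
        sort_by="user_id",
    )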
- field compression: str | None = None#
Compression algorithm to be used for the files created in HDFS. None means compression is disabled.

Examples: snappy, zlib

Warning

Used only while creating a new table, or in case of if_exists=replace_entire_table.
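A sketch of enabling compression for newly created files (snappy is one of the algorithms listed above):

    # Create ORC files compressed with Snappy
    Hive.WriteOptions(
        if_exists="replace_entire_table",
        format="orc",
        compression="snappy",
    )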