Writing to Clickhouse using DBWriter#
For writing data to Clickhouse, use DBWriter.
Warning
Please take into account the Clickhouse <-> Spark type mapping.
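For example, a minimal sketch of aligning dataframe column types with the target table before writing (the column name and types below are hypothetical, used only for illustration):

from pyspark.sql.functions import col

# Cast Spark columns to types matching the Clickhouse DDL explicitly,
# e.g. when the target column is DateTime64 and the dataframe column is a string
df = df.withColumn("business_dt", col("business_dt").cast("timestamp"))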
Warning
It is always recommended to create the table explicitly using Clickhouse.execute instead of relying on Spark's table DDL generation.
This is because Spark's DDL generator can create columns with different precision and types than expected, causing precision loss or other issues.
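For example, a minimal sketch of explicit table creation via execute (the table name, columns and ORDER BY key are illustrative assumptions; adjust them to your data):

from onetl.connection import Clickhouse

clickhouse = Clickhouse(...)

# Hypothetical DDL: ENGINE and ORDER BY are required by Clickhouse MergeTree tables
clickhouse.execute(
    """
    CREATE TABLE IF NOT EXISTS schema.table (
        id UInt64,
        business_dt DateTime64(6),
        value String
    )
    ENGINE = MergeTree()
    ORDER BY id
    """
)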
Examples#
from onetl.connection import Clickhouse
from onetl.db import DBWriter

clickhouse = Clickhouse(...)

df = ...  # data is here

writer = DBWriter(
    connection=clickhouse,
    target="schema.table",
    options=Clickhouse.WriteOptions(
        if_exists="append",
        # ENGINE is required by Clickhouse
        createTableOptions="ENGINE = MergeTree() ORDER BY id",
    ),
)

writer.run(df)
Options#
The method above accepts JDBCWriteOptions.
- pydantic model onetl.connection.db_connection.jdbc_connection.options.JDBCWriteOptions#
Spark JDBC writing options.
Note
You can pass any value supported by Spark, even if it is not mentioned in this documentation. Option names should be in camelCase!
The set of supported options depends on the Spark version. See the link above.
Examples
Write options initialization
options = JDBC.WriteOptions(if_exists="append", batchsize=20_000, customOption="value")
- field if_exists: JDBCTableExistBehavior = JDBCTableExistBehavior.APPEND (alias 'mode')#
Behavior of writing data into existing table.
- Possible values:
append (default)
Adds new rows into existing table.
Behavior in details:
- Table does not exist
Table is created using options provided by user (createTableOptions, createTableColumnTypes, etc).
- Table exists
Data is appended to the table. The table has the same DDL as before writing data.
Warning
This mode does not check whether the table already contains rows from the dataframe, so duplicated rows can be created.
Also, Spark does not support passing custom options to the insert statement, like ON CONFLICT, so don't try to implement deduplication using unique indexes or constraints. Instead, write to a staging table and perform deduplication using the execute method, as shown in the sketch after this list of modes.
replace_entire_table
Table is dropped and then created, or truncated.
Behavior in details:
- Table does not exist
Table is created using options provided by user (createTableOptions, createTableColumnTypes, etc).
- Table exists
Table content is replaced with dataframe content.
After writing is completed, the target table could either have the same DDL as before writing data (truncate=True), or be recreated (truncate=False, or if the source does not support truncation).
ignore
Ignores the write operation if the table already exists.
Behavior in details:
- Table does not exist
Table is created using options provided by user (createTableOptions, createTableColumnTypes, etc).
- Table exists
The write operation is ignored, and no data is written to the table.
error
Raises an error if the table already exists.
Behavior in details:
- Table does not exist
Table is created using options provided by user (createTableOptions, createTableColumnTypes, etc).
- Table exists
An error is raised, and no data is written to the table.
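As mentioned in the append mode warning above, deduplication can be implemented via a staging table. A minimal sketch, assuming a hypothetical schema.table_stg staging table and an id column used as the deduplication key (neither comes from the original documentation):

# Write the dataframe into a staging table first, replacing its previous content
writer = DBWriter(
    connection=clickhouse,
    target="schema.table_stg",
    options=Clickhouse.WriteOptions(
        if_exists="replace_entire_table",
        createTableOptions="ENGINE = MergeTree() ORDER BY id",
    ),
)
writer.run(df)

# Then copy only rows not yet present in the target table
clickhouse.execute(
    """
    INSERT INTO schema.table
    SELECT DISTINCT *
    FROM schema.table_stg
    WHERE id NOT IN (SELECT id FROM schema.table)
    """
)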
- field batchsize: int = 20000#
How many rows can be inserted per round trip.
Tuning this option can influence writing performance.
Warning
Default value is different from Spark.
Spark uses a quite small value of 1000, which is not usable in the BigData world. Thus we've overridden the default value with 20_000, which should increase writing performance.
You can increase it even more, up to 50_000, but it depends on your database load and the number of columns in the row. Higher values do not increase performance.
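For example, a sketch of increasing the batch size (the value is illustrative; tune it for your database and row width):

options = Clickhouse.WriteOptions(if_exists="append", batchsize=50_000)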
- field isolation_level: str = 'READ_UNCOMMITTED' (alias 'isolationLevel')#
The transaction isolation level, which applies to the current connection.
- Possible values:
NONE (as a string, not Python's None)
READ_COMMITTED
READ_UNCOMMITTED
REPEATABLE_READ
SERIALIZABLE
Values correspond to transaction isolation levels defined by the JDBC standard. Please refer to the documentation for java.sql.Connection.
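For example, a sketch of setting the isolation level (whether a given level actually takes effect depends on the database and its JDBC driver):

options = Clickhouse.WriteOptions(isolation_level="READ_COMMITTED")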