Clickhouse <-> Spark type mapping#
Type detection & casting#
Spark DataFrames always have a schema, which is a list of columns with corresponding Spark types. All operations on a column are performed using the column type.
Reading from Clickhouse#
This is how the Clickhouse connector performs this:
- For each column in the query result (SELECT column1, column2, ... FROM table ...), get the column name and Clickhouse type.
- Find the corresponding Clickhouse type (read) -> Spark type combination (see below) for each DataFrame column. If no combination is found, raise an exception.
- Create a DataFrame from the query with specific column names and Spark types.
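The read steps above can be sketched in plain Python. This is only an illustration of the lookup-or-fail logic, not the connector's code: `READ_MAPPING` and `resolve_read_schema` are hypothetical names, and the two mapping entries are an assumed subset of the type tables below.

```python
# Hypothetical two-entry subset of the "Clickhouse type (read) -> Spark type" table.
READ_MAPPING = {
    "Int32": "IntegerType()",
    "String": "StringType()",
}


def resolve_read_schema(query_columns):
    """Map (name, Clickhouse type) pairs to (name, Spark type) pairs."""
    schema = []
    for name, clickhouse_type in query_columns:
        spark_type = READ_MAPPING.get(clickhouse_type)
        if spark_type is None:
            # No combination found: fail instead of guessing a type.
            raise TypeError(
                f"No Spark type for column {name!r} of Clickhouse type {clickhouse_type!r}"
            )
        schema.append((name, spark_type))
    return schema
```

Calling `resolve_read_schema([("id", "Int32"), ("name", "String")])` returns the column names paired with Spark type names, while an unmapped type such as `Point` raises `TypeError`.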
Writing to some existing Clickhouse table#
This is how the Clickhouse connector performs this:
- Get names of columns in the DataFrame. [1]
- Perform a SELECT * FROM table LIMIT 0 query.
- Take only columns present in the DataFrame (by name, case insensitive). For each found column, get its Clickhouse type.
- Find the corresponding Clickhouse type (read) -> Spark type combination (see below) for each DataFrame column. If no combination is found, raise an exception. [2]
- Find the corresponding Spark type -> Clickhouse type (write) combination (see below) for each DataFrame column. If no combination is found, raise an exception.
- If Clickhouse type (write) matches Clickhouse type (read), no additional casts are performed, and the DataFrame column is written to Clickhouse as is.
- If Clickhouse type (write) does not match Clickhouse type (read), the DataFrame column is cast to the target column type on the Clickhouse side. For example, you can write a column with text data to an Int32 column if the column contains valid integer values within the supported value range and precision.
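The decision between "write as is" and "cast on the Clickhouse side" can be sketched as below. This is a simplified illustration under stated assumptions: `WRITE_MAPPING` and `plan_write` are hypothetical names, the mapping is an assumed subset of the tables below, the read-mapping check is skipped, and raising `KeyError` for a DataFrame column missing from the table is an assumption rather than documented behavior.

```python
# Hypothetical subset of the "Spark type -> Clickhouse type (write)" table.
WRITE_MAPPING = {
    "IntegerType()": "Int32",
    "StringType()": "String",
}


def plan_write(df_schema, table_schema):
    """Return {DataFrame column: True if Clickhouse has to cast the value}."""
    # Table columns are matched by name, case insensitively.
    table_types = {name.lower(): ch_type for name, ch_type in table_schema.items()}
    plan = {}
    for name, spark_type in df_schema.items():
        target_type = table_types.get(name.lower())
        if target_type is None:
            # Assumption: a DataFrame column missing from the table is an error.
            raise KeyError(f"Column {name!r} not found in target table")
        write_type = WRITE_MAPPING.get(spark_type)
        if write_type is None:
            raise TypeError(f"No Clickhouse type for Spark type {spark_type!r}")
        # Matching types are written as is; mismatches are cast by Clickhouse.
        plan[name] = write_type != target_type
    return plan
```

With a DataFrame of `ID: IntegerType()` and `value: StringType()` written to a table where both columns are `Int32`, only `value` is marked as needing a cast.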
Create new table using Spark#
Warning
ABSOLUTELY NOT RECOMMENDED!
This is how the Clickhouse connector performs this:
- Find the corresponding Spark type -> Clickhouse type (create) combination (see below) for each DataFrame column. If no combination is found, raise an exception.
- Generate DDL for creating the table in Clickhouse, like CREATE TABLE (col1 ...), and run it.
- Write the DataFrame to the created table as is.
But Spark does not have a specific dialect for Clickhouse, so the Generic JDBC dialect is used. The Generic dialect uses ANSI SQL type names when creating tables in the target database, not database-specific types.
In some cases this may lead to using the wrong column type. For example, Spark creates a column of type TIMESTAMP, which corresponds to Clickhouse type DateTime32 (precision up to seconds), instead of the more precise DateTime64 (precision up to nanoseconds).
This may lead to incidental precision loss, or sometimes data cannot be written to the created table at all.
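To make the problem concrete, here is a rough sketch of how a generic dialect ends up emitting TIMESTAMP. It is not Spark's implementation: `GENERIC_JDBC_TYPES` and `generate_ddl` are hypothetical names, the four type names are what Spark's common JDBC mapping is understood to use, and identifier quoting and nullability are left out.

```python
# Assumed ANSI-style names emitted by the generic JDBC dialect for a few Spark types.
GENERIC_JDBC_TYPES = {
    "IntegerType": "INTEGER",
    "LongType": "BIGINT",
    "StringType": "TEXT",
    "TimestampType": "TIMESTAMP",
}


def generate_ddl(table, columns, create_table_options=""):
    """Build a CREATE TABLE statement from (name, Spark type name) pairs."""
    column_defs = ", ".join(
        f"{name} {GENERIC_JDBC_TYPES[spark_type]}" for name, spark_type in columns
    )
    # createTableOptions is appended verbatim after the column list.
    return f"CREATE TABLE {table} ({column_defs}) {create_table_options}".strip()
```

For columns `id: IntegerType` and `value: TimestampType`, the generated statement declares `value TIMESTAMP`, which Clickhouse treats as DateTime32, so sub-second precision is not available in the created table.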
So instead of relying on Spark to create tables:
writer = DBWriter(
    connection=clickhouse,
    table="default.target_tbl",
    options=Clickhouse.WriteOptions(
        if_exists="append",
        # ENGINE is required by Clickhouse
        createTableOptions="ENGINE = MergeTree() ORDER BY id",
    ),
)
writer.run(df)
Always prefer creating tables with specific types BEFORE WRITING DATA:
clickhouse.execute(
    """
    CREATE TABLE default.target_tbl (
        id UInt8,
        value DateTime64(6) -- specific type and precision
    )
    ENGINE = MergeTree()
    ORDER BY id
    """,
)
writer = DBWriter(
    connection=clickhouse,
    table="default.target_tbl",
    options=Clickhouse.WriteOptions(if_exists="append"),
)
writer.run(df)
References#
Here you can find source code with type conversions:
Supported types#
Generic types#
LowCardinality(T) is the same as T
Nullable(T) is the same as T, but the Spark column is inferred as nullable=True
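A minimal sketch of how these wrappers could be stripped before looking up the inner type. `unwrap` is a hypothetical helper written for illustration; it only reports whether a Nullable wrapper was seen and makes no claim about how the connector parses type names.

```python
import re

# Matches one outer Nullable(...) or LowCardinality(...) wrapper.
_WRAPPER = re.compile(r"^(Nullable|LowCardinality)\((.+)\)$")


def unwrap(clickhouse_type):
    """Strip wrappers; return (inner type, whether Nullable was seen)."""
    saw_nullable = False
    match = _WRAPPER.match(clickhouse_type)
    while match:
        wrapper, clickhouse_type = match.groups()
        saw_nullable = saw_nullable or wrapper == "Nullable"
        match = _WRAPPER.match(clickhouse_type)
    return clickhouse_type, saw_nullable
```

For example, `unwrap("LowCardinality(Nullable(String))")` yields `("String", True)`, so the column is looked up as String and marked nullable.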
Numeric types#
Clickhouse type (read) |
Spark type |
Clickhouse type (write) |
Clickhouse type (create) |
---|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
unsupported [3] |
||
|
|
|
|
|
|
|
|
|
|
|
|
|
unsupported [3] |
||
|
|
|
|
|
|||
|
|
|
|
|
|
|
|
|
|||
|
|||
|
|
|
|
|
|
|
|
|
unsupported [3] |
||
|
|
|
|
|
|
|
|
|
|
|
|
|
|||
|
|
|
|
|
|||
|
|||
|
unsupported [3] |
Clickhouse supports numeric types up to 256 bit: Int256, UInt256, Decimal256(S), Decimal(P=39..76, S=0..76).
But Spark’s DecimalType(P, S) supports a maximum of P=38 (128 bit). It is impossible to read, write or operate on values of larger precision; any attempt leads to an exception.
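A quick pre-flight check for such columns could look like the sketch below. `exceeds_spark_decimal` is a hypothetical helper that only recognizes the type spellings listed above; it is meant for screening table definitions before reading, not as a full type parser.

```python
import re

SPARK_MAX_PRECISION = 38  # DecimalType(P, S) limit stated above
_DECIMAL = re.compile(r"^Decimal\((\d+),\s*(\d+)\)$")


def exceeds_spark_decimal(clickhouse_type):
    """Return True for 256-bit numeric types that Spark cannot represent."""
    if clickhouse_type in ("Int256", "UInt256"):
        return True
    if clickhouse_type.startswith("Decimal256("):
        return True
    match = _DECIMAL.match(clickhouse_type)
    if match:
        # Only the precision P matters for the 38-digit limit.
        return int(match.group(1)) > SPARK_MAX_PRECISION
    return False
```

`Decimal(39, 2)` is flagged, while `Decimal(38, 10)` is not.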
Temporal types#
- Notes:
Datetime with timezone has the same precision as without timezone
DateTime is an alias for DateTime32
TIMESTAMP is an alias for DateTime32, but TIMESTAMP(N) is an alias for DateTime64(N)
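The alias rules in these notes can be written down as a small normalization function. `canonical_datetime_type` is a hypothetical helper that encodes only the aliases listed in the notes.

```python
import re

_TIMESTAMP_N = re.compile(r"^TIMESTAMP\((\d+)\)$")


def canonical_datetime_type(type_name):
    """Resolve the datetime aliases listed in the notes above."""
    if type_name in ("DateTime", "TIMESTAMP"):
        return "DateTime32"
    match = _TIMESTAMP_N.match(type_name)
    if match:
        # TIMESTAMP(N) keeps its precision argument.
        return f"DateTime64({match.group(1)})"
    return type_name
```

This makes the pitfall visible: a bare `TIMESTAMP` resolves to second precision, and only `TIMESTAMP(N)` carries fractional seconds.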
Clickhouse type (read) |
Spark type |
Clickhouse type (write) |
Clickhouse type (create) |
---|---|---|---|
|
|
|
|
|
unsupported |
||
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
unsupported |
||
|
|||
|
|||
|
|
|
|
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
Warning
Note that types in Clickhouse and Spark have different value ranges:
Clickhouse type |
Min value |
Max value |
Spark type |
Min value |
Max value |
---|---|---|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|||
|
|
|
So not all values in a Spark DataFrame can be written to Clickhouse.
Clickhouse supports datetime up to nanoseconds precision (23:59:59.999999999), but Spark TimestampType() supports datetime up to microseconds precision (23:59:59.999999). Nanoseconds will be lost during read or write operations.
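The loss can be illustrated with plain Python, whose `datetime` also stops at microseconds. This sketch assumes the extra digits are truncated rather than rounded; `nanos_to_micro_timestamp` is a hypothetical helper and does not call Spark.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def nanos_to_micro_timestamp(epoch_nanos):
    """Convert nanoseconds since epoch to a microsecond-precision datetime.

    Returns the datetime and the number of nanoseconds that were dropped.
    """
    micros, lost_nanos = divmod(epoch_nanos, 1_000)
    return EPOCH + timedelta(microseconds=micros), lost_nanos
```

Converting 23:59:59.999999999 on 1970-01-01 gives 23:59:59.999999 and reports 999 dropped nanoseconds.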
Generic JDBC dialect generates DDL with Clickhouse type TIMESTAMP, which is an alias for DateTime32 with precision up to seconds (23:59:59). Inserting data with milliseconds precision (23:59:59.999) will lead to throwing away milliseconds.
Clickhouse will raise an exception that data in format 2001-01-01 23:59:59.999999 has data .999999 which does not match format YYYY-MM-DD hh:mm:ss. So you can create a Clickhouse table with Spark, but cannot write data to a column of this type.
String types#
Clickhouse type (read) |
Spark type |
Clickhouse type (write) |
Clickhouse type (create) |
---|---|---|---|
|
|
|
|
|
|||
|
|||
|
|||
|
|||
|
|||
|
|
Unsupported types#
- Columns of these Clickhouse types cannot be read by Spark:
AggregateFunction(func, T)
Array(T)
JSON
Map(K, V)
MultiPolygon
Nested(field1 T1, ...)
Nothing
Point
Polygon
Ring
SimpleAggregateFunction(func, T)
Tuple(T1, T2, ...)
UUID
- DataFrames with these Spark types cannot be written to Clickhouse:
ArrayType(T)
BinaryType()
CharType(N)
DayTimeIntervalType(P, S)
MapType(K, V)
NullType()
StructType([...])
TimestampNTZType()
VarcharType(N)
This is because Spark does not have a dedicated Clickhouse dialect, and uses the Generic JDBC dialect instead. This dialect does not have type conversion between some types, like Clickhouse Array -> Spark ArrayType(), and vice versa.
There is a way to avoid this: just cast everything to String.
Explicit type cast#
DBReader#
Use CAST or toJSONString to get column data as a string in JSON format, and then cast the string column in the resulting DataFrame to the proper type using from_json:
from pyspark.sql.functions import from_json
from pyspark.sql.types import ArrayType, IntegerType

reader = DBReader(
    connection=clickhouse,
    columns=[
        "id",
        "toJSONString(array_column) array_column",
    ],
)
df = reader.run()

# Spark requires all columns to have some specific type, describe it
column_type = ArrayType(IntegerType())
df = df.select(
    df.id,
    from_json(df.array_column, column_type).alias("array_column"),
)
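When several columns need this treatment, the `columns` list can be generated instead of typed by hand. The helper below is a hypothetical convenience, not part of the connector; it only builds the same `toJSONString(col) col` expressions used in the example above.

```python
def json_string_columns(plain_columns, complex_columns):
    """Build a column list where complex columns are read as JSON strings."""
    # Each complex column keeps its own name as the alias.
    wrapped = [f"toJSONString({name}) {name}" for name in complex_columns]
    return list(plain_columns) + wrapped
```

`json_string_columns(["id"], ["array_column"])` produces the same two entries as the reader example, and each wrapped column still needs its own `from_json` call with an explicit Spark type afterwards.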
DBWriter#
Convert the DataFrame column to JSON using to_json, and write it as a String column in Clickhouse:
from pyspark.sql.functions import to_json

clickhouse.execute(
    """
    CREATE TABLE default.target_tbl (
        id Int32,
        array_column_json String
    )
    ENGINE = MergeTree()
    ORDER BY id
    """,
)

df = df.select(
    df.id,
    to_json(df.array_column).alias("array_column_json"),
)
writer.run(df)
Then you can parse this column on the Clickhouse side, for example by creating a view:
SELECT
    id,
    JSONExtract(array_column_json, 'Array(String)') AS array_column
FROM target_tbl
You can also use ALIAS or MATERIALIZED columns to avoid writing such an expression in every SELECT clause:
CREATE TABLE default.target_tbl (
    id Int32,
    array_column_json String,
    -- computed column
    array_column Array(String) ALIAS JSONExtract(array_column_json, 'Array(String)')
    -- or materialized column
    -- array_column Array(String) MATERIALIZED JSONExtract(array_column_json, 'Array(String)')
)
ENGINE = MergeTree()
ORDER BY id
Downsides:
- Using SELECT JSONExtract(...) or an ALIAS column can be expensive, because the value is calculated on every row access. This can be especially harmful if such a column is used in a WHERE clause.
- ALIAS and MATERIALIZED columns are not included in the SELECT * clause; they should be added explicitly: SELECT *, calculated_column FROM table.
Warning
EPHEMERAL columns are not supported by Spark because they cannot be selected to determine target column type.