MySQL <-> Spark type mapping#
Type detection & casting#
Spark DataFrames always have a schema, which is a list of columns with corresponding Spark types. All operations on a column are performed using the column type.
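For example, the schema of any DataFrame can be inspected directly; the column names below are purely illustrative:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# every DataFrame carries a schema: column names plus Spark types
df = spark.createDataFrame([(1, "a")], schema="id int, value string")
df.printSchema()
# root
#  |-- id: integer (nullable = true)
#  |-- value: string (nullable = true)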
Reading from MySQL#
This is how the MySQL connector performs this:
1. For each column in the query result (SELECT column1, column2, ... FROM table ...), get the column name and MySQL type.
2. Find the corresponding MySQL type (read) -> Spark type combination (see below) for each DataFrame column. If no combination is found, raise an exception.
3. Create a DataFrame from the query with the specific column names and Spark types.
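A minimal sketch of this read flow, assuming a hypothetical myschema.source_tbl table (the source parameter name follows recent onETL versions):
from onetl.connection import MySQL
from onetl.db import DBReader

mysql = MySQL(...)

reader = DBReader(
    connection=mysql,
    source="myschema.source_tbl",  # hypothetical table name
)
df = reader.run()

# the resulting Spark types are derived from the MySQL column types
df.printSchema()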
Writing to an existing MySQL table#
This is how the MySQL connector performs this:
1. Get the names of the DataFrame columns.
2. Perform a SELECT * FROM table LIMIT 0 query to fetch the table's columns.
3. Take only the columns present in the DataFrame (matched by name, case-insensitive). For each matched column, get the MySQL type.
4. Find the corresponding Spark type -> MySQL type (write) combination (see below) for each DataFrame column. If no combination is found, raise an exception.
5. If MySQL type (write) matches MySQL type (read), no additional casts are performed, and the DataFrame column is written to MySQL as is.
6. If MySQL type (write) does not match MySQL type (read), the DataFrame column is cast to the target column type on the MySQL side. For example, you can write a column with text data to an int column, if the column contains valid integer values within the supported value range and precision.
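A sketch of this cast-on-write behaviour, assuming hypothetical table and column names:
from onetl.connection import MySQL
from onetl.db import DBWriter

mysql = MySQL(...)

# the target table already exists, with an int column
mysql.execute("CREATE TABLE myschema.target_tbl (id bigint, amount int) ENGINE = InnoDB")

# df is an existing DataFrame whose amount column contains valid integer values stored as strings,
# so MySQL casts the text data to int on its side during the write
writer = DBWriter(
    connection=mysql,
    table="myschema.target_tbl",
    options=MySQL.WriteOptions(if_exists="append"),
)
writer.run(df)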
Create new table using Spark#
Warning
ABSOLUTELY NOT RECOMMENDED!
This is how the MySQL connector performs this:
1. Find the corresponding Spark type -> MySQL type (create) combination (see below) for each DataFrame column. If no combination is found, raise an exception.
2. Generate DDL for creating the table in MySQL, like CREATE TABLE (col1 ...), and run it.
3. Write the DataFrame to the created table as is.
In some cases this may lead to using the wrong column type. For example, Spark creates a column of type timestamp, which corresponds to MySQL type timestamp(0) (precision up to seconds), instead of the more precise timestamp(6) (precision up to microseconds).
This may lead to accidental precision loss, or sometimes data cannot be written to the created table at all.
So instead of relying on Spark to create tables like this:
writer = DBWriter(
    connection=mysql,
    table="myschema.target_tbl",
    options=MySQL.WriteOptions(
        if_exists="append",
        createTableOptions="ENGINE = InnoDB",
    ),
)
writer.run(df)
Always prefer creating tables with specific types BEFORE WRITING DATA:
mysql.execute(
    """
    CREATE TABLE myschema.target_tbl (
        id bigint,
        `key` text,
        value timestamp(6) -- specific type and precision
    )
    ENGINE = InnoDB
    """,
)

writer = DBWriter(
    connection=mysql,
    table="myschema.target_tbl",
    options=MySQL.WriteOptions(if_exists="append"),
)
writer.run(df)
Supported types#
Numeric types#
[Table: MySQL type (read) | Spark type | MySQL type (write) | MySQL type (create)]
MySQL supports decimal types with precision P up to 65, but Spark's DecimalType(P, S) supports a maximum of P=38. It is impossible to read, write or operate on values with larger precision; this leads to an exception.
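One possible workaround (an assumption, not part of the mapping above) is to cast such a column to a string on the MySQL side while reading, and parse it later in Spark if needed:
from onetl.connection import MySQL
from onetl.db import DBReader

mysql = MySQL(...)

reader = DBReader(
    connection=mysql,
    source="myschema.source_tbl",  # hypothetical table with a decimal(65, 0) column
    columns=[
        "id",
        # read the high-precision value as a string instead of DecimalType
        "CAST(huge_decimal AS char) AS huge_decimal_str",
    ],
)
df = reader.run()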
Temporal types#
[Table: MySQL type (read) | Spark type | MySQL type (write) | MySQL type (create)]
Warning
Note that types in MySQL and Spark have different value ranges:
[Table: MySQL type | Min value | Max value | Spark type | Min value | Max value]
So Spark can read all the values from MySQL, but not all values in a Spark DataFrame can be written to MySQL.
The MySQL dialect generates DDL with MySQL type timestamp, which is an alias for timestamp(0) with precision up to seconds (23:59:59).
Inserting data with microsecond precision (23:59:59.999999) will lead to the microseconds being thrown away.
The time type is handled as timestamp with the date 1970-01-01. So instead of reading data from MySQL as 23:59:59, it is actually read as 1970-01-01 23:59:59, and vice versa.
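For example, a MySQL time value read into Spark can be converted back to a plain time string on the Spark side (the column name here is an assumption):
from pyspark.sql.functions import date_format

# MySQL time 23:59:59 is read into Spark as timestamp 1970-01-01 23:59:59;
# strip the artificial date part to get the original time back as a string
df = df.withColumn("time_str", date_format("time_column", "HH:mm:ss"))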
String types#
[Table: MySQL type (read) | Spark type | MySQL type (write) | MySQL type (create)]
Binary types#
[Table: MySQL type (read) | Spark type | MySQL type (write) | MySQL type (create)]
Geometry types#
[Table: MySQL type (read) | Spark type | MySQL type (write) | MySQL type (create)]
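If a geometry column cannot be handled with these mappings, one possible approach (an assumption, not taken from the table above) is to convert it to WKT text on the MySQL side while reading:
from onetl.connection import MySQL
from onetl.db import DBReader

mysql = MySQL(...)

reader = DBReader(
    connection=mysql,
    source="myschema.source_tbl",  # hypothetical table with a geometry column
    columns=[
        "id",
        # ST_AsText returns the geometry as WKT, which Spark reads as StringType()
        "ST_AsText(geom_column) AS geom_wkt",
    ],
)
df = reader.run()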
Explicit type cast#
DBReader#
It is possible to explicitly cast a column type using the DBReader(columns=...) syntax.
For example, you can use CAST(column AS text) to convert data to its string representation on the MySQL side, so it will be read as Spark's StringType().
It is also possible to use the JSON_OBJECT MySQL function to convert a column of any type to its string representation, and then parse this column on the Spark side using from_json:
from pyspark.sql.functions import from_json
from pyspark.sql.types import IntegerType, StructField, StructType

from onetl.connection import MySQL
from onetl.db import DBReader

mysql = MySQL(...)

reader = DBReader(
    connection=mysql,
    source="myschema.source_tbl",
    columns=[
        "id",
        "supported_column",
        "CAST(unsupported_column AS text) unsupported_column_str",
        # or
        "JSON_OBJECT('key', value_column) json_column",
    ],
)
df = reader.run()

# Spark requires all columns to have some type, describe it
column_type = StructType([StructField("key", IntegerType())])

# cast column content to proper Spark type
df = df.select(
    df.id,
    df.supported_column,
    # explicit cast
    df.unsupported_column_str.cast("integer").alias("parsed_integer"),
    # or explicit json parsing
    from_json(df.json_column, column_type).alias("struct_column"),
)
DBWriter#
Convert a DataFrame column to JSON using to_json, and write it as a text column in MySQL:
mysql.execute(
    """
    CREATE TABLE schema.target_tbl (
        id bigint,
        array_column_json json -- any string type, actually
    )
    ENGINE = InnoDB
    """,
)

from pyspark.sql.functions import to_json

df = df.select(
    df.id,
    to_json(df.array_column).alias("array_column_json"),
)

writer = DBWriter(
    connection=mysql,
    table="schema.target_tbl",
    options=MySQL.WriteOptions(if_exists="append"),
)
writer.run(df)
Then you can parse this column on the MySQL side, for example in a query or a view:
SELECT
    id,
    array_column_json->"$[0]" AS array_item
FROM schema.target_tbl
Or by using a GENERATED column:
CREATE TABLE schema.target_tbl (
    id bigint,
    supported_column timestamp,
    array_column_json json, -- any string type, actually
    -- virtual column
    array_item_0 json GENERATED ALWAYS AS (array_column_json->"$[0]") VIRTUAL
    -- or stored column
    -- array_item_0 json GENERATED ALWAYS AS (array_column_json->"$[0]") STORED
)
A VIRTUAL column value is calculated on every table read.
A STORED column value is calculated during insert, but this requires additional disk space.