MSSQL <-> Spark type mapping#
Type detection & casting#
Spark’s DataFrames always have a schema, which is a list of columns with corresponding Spark types. All operations on a column are performed using the column’s type.
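For illustration, you can inspect a DataFrame’s schema at any time (the column names and types below are purely hypothetical):

df.printSchema()
# root
#  |-- id: long (nullable = true)
#  |-- business_dt: timestamp (nullable = true)
#  |-- value: decimal(38,18) (nullable = true)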
Reading from MSSQL#
This is how MSSQL connector performs this:

1. For each column in the query result (SELECT column1, column2, ... FROM table ...), get the column name and MSSQL type.
2. Find the corresponding MSSQL type (read) -> Spark type combination (see below) for each DataFrame column. If no combination is found, an exception is raised.
3. Create a DataFrame from the query with the resolved column names and Spark types.
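A minimal read sketch (the table name myschema.source_tbl is hypothetical, and parameter names follow the other examples on this page):

from onetl.connection import MSSQL
from onetl.db import DBReader

mssql = MSSQL(...)  # connection parameters omitted

reader = DBReader(
    connection=mssql,
    table="myschema.source_tbl",
)
df = reader.run()

# each DataFrame column gets a Spark type derived from its MSSQL type
df.printSchema()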
Writing to some existing MSSQL table#
This is how MSSQL connector performs this:

1. Get the names of columns in the DataFrame. [1]
2. Perform a SELECT * FROM table WHERE 1=0 query.
3. Take only the columns present in the DataFrame (matched by name, case-insensitive). For each matched column, get its MSSQL type.
4. Find the corresponding Spark type -> MSSQL type (write) combination (see below) for each DataFrame column. If no combination is found, an exception is raised.
5. If MSSQL type (write) matches MSSQL type (read), no additional casts are performed, and the DataFrame column is written to MSSQL as is.
6. If MSSQL type (write) does not match MSSQL type (read), the DataFrame column is cast to the target column type on the MSSQL side. For example, you can write a column with text data to an int column, as long as the column contains valid integer values within the supported value range and precision [2]. See the sketch after this list.
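A minimal write sketch for that case (the table name myschema.events and the column contents are hypothetical; the table is assumed to exist already, and mssql and df come from the previous examples):

from onetl.connection import MSSQL
from onetl.db import DBWriter

# hypothetical target table created beforehand:
#   CREATE TABLE myschema.events (id bigint, amount int)
# df.amount contains strings like "123"; MSSQL casts them to int during the write
writer = DBWriter(
    connection=mssql,
    table="myschema.events",
    options=MSSQL.WriteOptions(if_exists="append"),
)
writer.run(df)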
Create new table using Spark#
Warning
ABSOLUTELY NOT RECOMMENDED!
This is how MSSQL connector performs this:
1. Find the corresponding Spark type -> MSSQL type (create) combination (see below) for each DataFrame column. If no combination is found, an exception is raised.
2. Generate DDL for creating the table in MSSQL, like CREATE TABLE (col1 ...), and run it.
3. Write the DataFrame to the created table as is.
But in some cases this may lead to using a wrong column type. For example, Spark creates a column of Spark type timestamp, which by default is mapped to MSSQL type datetime (precision up to milliseconds) instead of the more precise datetime2(6) (precision up to microseconds).
This may lead to unexpected precision loss, or sometimes data cannot be written to the created table at all.
So instead of relying on Spark to create tables:
writer = DBWriter(
    connection=mssql,
    table="myschema.target_tbl",
    options=MSSQL.WriteOptions(
        if_exists="append",
    ),
)
writer.run(df)
Always prefer creating tables with specific types BEFORE WRITING DATA:
mssql.execute(
    """
    CREATE TABLE myschema.target_tbl (
        id bigint,
        key text,
        value datetime2(6) -- specific type and precision
    )
    """,
)

writer = DBWriter(
    connection=mssql,
    table="myschema.target_tbl",
    options=MSSQL.WriteOptions(if_exists="append"),
)
writer.run(df)
References#
Here you can find source code with type conversions:
Supported types#
Numeric types#
MSSQL type (read) | Spark type | MSSQL type (write) | MSSQL type (create)
---|---|---|---
Temporal types#
Note

MSSQL timestamp type is an alias for rowversion (see Special types). It is not a temporal type!
MSSQL type (read) | Spark type | MSSQL type (write) | MSSQL type (create)
---|---|---|---
Warning
Note that types in MSSQL and Spark have different value ranges:
MSSQL type | Min value | Max value | Spark type | Min value | Max value
---|---|---|---|---|---
So not all values in a Spark DataFrame can be written to MSSQL.
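For example, a minimal sketch that drops rows which do not fit into MSSQL’s int range before writing (the column name count is hypothetical, and writer is a DBWriter like in the examples above):

from pyspark.sql.functions import col

# keep only values that fit into MSSQL `int` range (-2147483648 .. 2147483647)
df = df.where(col("count").between(-2147483648, 2147483647))
writer.run(df)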
MSSQL dialect for Spark generates DDL with type datetime, which has precision up to milliseconds (23:59:59.999, 10^-3 seconds).
Inserting data with microsecond or higher precision (23:59:59.999999 .. 23:59:59.9999999, 10^-6 .. 10^-7 seconds) will lead to throwing away the sub-millisecond part.
MSSQL supports datetime values with up to 100 nanoseconds precision (23:59:59.9999999, 10^-7 seconds), but Spark’s TimestampType() supports values only up to microseconds precision (23:59:59.999999, 10^-6 seconds). The last digit will be lost during read or write operations.
The time type is handled as a timestamp with the date part set to 1970-01-01. So instead of reading data from MSSQL like 23:59:59.999999, it is actually read as 1970-01-01 23:59:59.999999, and vice versa.
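A minimal sketch to avoid silent truncation when the target column is of type datetime (the column name business_dt is hypothetical, and writer is a DBWriter like in the examples above):

from pyspark.sql.functions import date_trunc

# explicitly truncate to millisecond precision so the write into a `datetime`
# column does not silently drop microseconds
df = df.withColumn("business_dt", date_trunc("millisecond", df.business_dt))
writer.run(df)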
String types#
MSSQL type (read) | Spark type | MSSQL type (write) | MSSQL type (create)
---|---|---|---
Binary types#
MSSQL type (read) | Spark type | MSSQL type (write) | MSSQL type (create)
---|---|---|---
Special types#
MSSQL type (read) | Spark type | MSSQL type (write) | MSSQL type (create)
---|---|---|---
Explicit type cast#
DBReader#
It is possible to explicitly cast a column type using the DBReader(columns=...) syntax.
For example, you can use CAST(column AS text) to convert data to its string representation on the MSSQL side, so it will be read as Spark’s StringType():
from onetl.connection import MSSQL
from onetl.db import DBReader
mssql = MSSQL(...)
reader = DBReader(
    connection=mssql,
    columns=[
        "id",
        "supported_column",
        "CAST(unsupported_column AS text) unsupported_column_str",
    ],
)
df = reader.run()

# cast column content to proper Spark type
df = df.select(
    df.id,
    df.supported_column,
    # explicit cast
    df.unsupported_column_str.cast("integer").alias("parsed_integer"),
)
DBWriter#
Convert a DataFrame column to JSON using to_json, and write it as a text column in MSSQL:
mssql.execute(
    """
    CREATE TABLE schema.target_tbl (
        id bigint,
        struct_column_json text -- any string type, actually
    )
    """,
)

from pyspark.sql.functions import to_json

df = df.select(
    df.id,
    to_json(df.struct_column).alias("struct_column_json"),
)

writer = DBWriter(
    connection=mssql,
    table="schema.target_tbl",
    options=MSSQL.WriteOptions(if_exists="append"),
)
writer.run(df)
Then you can parse this column on the MSSQL side - for example, by creating a view:

SELECT
    id,
    JSON_VALUE(struct_column_json, '$.nested.field') AS nested_field
FROM target_tbl
Or by using a computed column:

CREATE TABLE schema.target_table (
    id bigint,
    supported_column datetime2(6),
    struct_column_json text, -- any string type, actually
    -- computed column
    nested_field AS (JSON_VALUE(struct_column_json, '$.nested.field'))
    -- or persisted column
    -- nested_field AS (JSON_VALUE(struct_column_json, '$.nested.field')) PERSISTED
)
By default, the computed column value is calculated on every table read.
A column marked as PERSISTED is calculated during insert instead, but this requires additional disk space.