MSSQL <-> Spark type mapping#

Type detection & casting#

Spark DataFrames always have a schema, which is a list of columns with their corresponding Spark types. All operations on a column are performed according to its type.
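
For example, you can inspect a DataFrame's schema to see which Spark type each column has (the column names below are just placeholders):

df.printSchema()
# root
#  |-- id: long (nullable = true)
#  |-- value: decimal(18,0) (nullable = true)
#  |-- updated_at: timestamp (nullable = true)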

Reading from MSSQL#

This is how the MSSQL connector performs the read (a minimal sketch follows the list):

  • For each column in query result (SELECT column1, column2, ... FROM table ...) get column name and MSSQL type.

  • Find corresponding MSSQL type (read) -> Spark type combination (see below) for each DataFrame column. If no combination is found, raise exception.

  • Create DataFrame from query with specific column names and Spark types.
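
For illustration, a minimal read sketch, assuming a source table myschema.source_tbl with int, varchar and datetime2 columns, and the same table= parameter style used by the DBWriter examples on this page (table and column names are illustrative):

from onetl.connection import MSSQL
from onetl.db import DBReader

mssql = MSSQL(...)

reader = DBReader(
    connection=mssql,
    table="myschema.source_tbl",
)
df = reader.run()

# MSSQL types are mapped to Spark types according to the tables below, e.g.:
#   int       -> IntegerType()
#   varchar   -> StringType()
#   datetime2 -> TimestampType()
df.printSchema()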

Writing to an existing MSSQL table#

This is how the MSSQL connector performs the write:

  • Get names of columns in DataFrame. [1]

  • Perform a SELECT * FROM table WHERE 1=0 query to fetch the table schema (T-SQL has no LIMIT clause).

  • Take only columns present in DataFrame (by name, case insensitive). For each found column get MSSQL type.

  • Find corresponding Spark type -> MSSQL type (write) combination (see below) for each DataFrame column. If no combination is found, raise exception.

  • If the MSSQL type (write) matches the MSSQL type (read), no additional casts are performed, and the DataFrame column is written to MSSQL as is.

  • If the MSSQL type (write) does not match the MSSQL type (read), the DataFrame column is cast to the target column type on the MSSQL side. For example, you can write a column with text data to an int column, if the column contains valid integer values within the supported value range and precision [2]. A sketch of casting columns on the Spark side instead is shown after this list.
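
A sketch of the Spark-side alternative: given an existing DataFrame df and an already existing target table (table and column names are illustrative), cast the DataFrame columns to the target column types before writing, so no MSSQL-side casts are needed:

from onetl.connection import MSSQL
from onetl.db import DBWriter
from pyspark.sql.functions import col

mssql = MSSQL(...)

# assume the target table is: id int, value decimal(18,2), updated_at datetime2(6)
df = (
    df.withColumn("id", col("id").cast("int"))
    .withColumn("value", col("value").cast("decimal(18,2)"))
    .withColumn("updated_at", col("updated_at").cast("timestamp"))
)

writer = DBWriter(
    connection=mssql,
    table="myschema.target_tbl",
    options=MSSQL.WriteOptions(if_exists="append"),
)
writer.run(df)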

Creating a new table using Spark#

Warning

ABSOLUTELY NOT RECOMMENDED!

This is how the MSSQL connector performs the table creation:

  • Find corresponding Spark type -> MSSQL type (create) combination (see below) for each DataFrame column. If no combination is found, raise exception.

  • Generate DDL for creating the table in MSSQL, like CREATE TABLE table (col1 ...), and run it.

  • Write DataFrame to created table as is.

But in some cases this may lead to using the wrong column type. For example, Spark's TimestampType() column is created as MSSQL datetime (millisecond precision) instead of the more precise datetime2(6) (microsecond precision). This may lead to incidental precision loss, or sometimes data cannot be written to the created table at all (for example, datetime has a narrower value range, see the warning below).

So instead of relying on Spark to create tables, like this:

writer = DBWriter(
    connection=mssql,
    table="myschema.target_tbl",
    options=MSSQL.WriteOptions(
        if_exists="append",
    ),
)
writer.run(df)

Always prefer creating tables with specific types BEFORE WRITING DATA:

mssql.execute(
    """
    CREATE TABLE myschema.target_tbl (
        id bigint,
        key text,
        value datetime2(6) -- specific type and precision
    )
    """,
)

writer = DBWriter(
    connection=mssql,
    table="myschema.target_tbl",
    options=MSSQL.WriteOptions(if_exists="append"),
)
writer.run(df)


Supported types#

See official documentation

Numeric types#

| MSSQL type (read) | Spark type | MSSQL type (write) | MSSQL type (create) |
|---|---|---|---|
| decimal | DecimalType(P=18, S=0) | decimal(P=18, S=0) | decimal(P=18, S=0) |
| decimal(P=0..38) | DecimalType(P=0..38, S=0) | decimal(P=0..38, S=0) | decimal(P=0..38, S=0) |
| decimal(P=0..38, S=0..38) | DecimalType(P=0..38, S=0..38) | decimal(P=0..38, S=0..38) | decimal(P=0..38, S=0..38) |
| real | FloatType() | real | real |
| float | DoubleType() | float | float |
| smallint | ShortType() | smallint | smallint |
| tinyint | IntegerType() | int | int |
| int | IntegerType() | int | int |
| bigint | LongType() | bigint | bigint |

Temporal types#

Note

The MSSQL timestamp type is an alias for rowversion (see Special types). It is not a temporal type!

| MSSQL type (read) | Spark type | MSSQL type (write) | MSSQL type (create) |
|---|---|---|---|
| date | DateType() | date | date |
| smalldatetime, minutes | TimestampType(), microseconds | datetime2(6), microseconds | datetime, milliseconds |
| datetime, milliseconds | TimestampType(), microseconds | datetime2(6), microseconds | datetime, milliseconds |
| datetime2(0), seconds | TimestampType(), microseconds | datetime2(6), microseconds | datetime, milliseconds |
| datetime2(3), milliseconds | TimestampType(), microseconds | datetime2(6), microseconds | datetime, milliseconds |
| datetime2(6), microseconds | TimestampType(), microseconds | datetime2(6), microseconds | datetime, milliseconds, precision loss [3] |
| datetime2(7), 100s of nanoseconds | TimestampType(), microseconds, precision loss [4] | datetime2(6), microseconds, precision loss [4] | datetime, milliseconds, precision loss [3] |
| time(0), seconds | TimestampType(), microseconds, with time format quirks [5] | datetime2(6), microseconds | datetime, milliseconds |
| time(3), milliseconds | TimestampType(), microseconds, with time format quirks [5] | datetime2(6), microseconds | datetime, milliseconds |
| time(6), microseconds | TimestampType(), microseconds, with time format quirks [5] | datetime2(6), microseconds | datetime, milliseconds, precision loss [3] |
| time, 100s of nanoseconds | TimestampType(), microseconds, precision loss [4], with time format quirks [5] | datetime2(6), microseconds, precision loss [3] | datetime, milliseconds, precision loss [3] |
| time(7), 100s of nanoseconds | TimestampType(), microseconds, precision loss [4], with time format quirks [5] | datetime2(6), microseconds, precision loss [3] | datetime, milliseconds, precision loss [3] |
| datetimeoffset | StringType() | nvarchar | nvarchar |

Warning

Note that types in MSSQL and Spark have different value ranges:

| MSSQL type | MSSQL min value | MSSQL max value | Spark type | Spark min value | Spark max value |
|---|---|---|---|---|---|
| smalldatetime | 1900-01-01 00:00:00 | 2079-06-06 23:59:00 | TimestampType() | 0001-01-01 00:00:00.000000 | 9999-12-31 23:59:59.999999 |
| datetime | 1753-01-01 00:00:00.000 | 9999-12-31 23:59:59.997 | TimestampType() | 0001-01-01 00:00:00.000000 | 9999-12-31 23:59:59.999999 |
| datetime2 | 0001-01-01 00:00:00.000000 | 9999-12-31 23:59:59.999999 | TimestampType() | 0001-01-01 00:00:00.000000 | 9999-12-31 23:59:59.999999 |
| time | 00:00:00.0000000 | 23:59:59.9999999 | TimestampType() | 0001-01-01 00:00:00.000000 | 9999-12-31 23:59:59.999999 |

So not all values in a Spark DataFrame can be written to MSSQL.
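
For example, if the target column type is datetime, rows below its minimum value can be dropped (or otherwise fixed) on the Spark side before writing; a rough sketch, with an illustrative column name:

from pyspark.sql.functions import col, lit, to_timestamp

# MSSQL datetime only supports values from 1753-01-01 onwards;
# earlier rows would be rejected by a datetime column
df = df.where(col("updated_at") >= to_timestamp(lit("1753-01-01 00:00:00")))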


String types#

| MSSQL type (read) | Spark type | MSSQL type (write) | MSSQL type (create) |
|---|---|---|---|
| char | StringType() | nvarchar | nvarchar |
| char(N) | StringType() | nvarchar | nvarchar |
| nchar | StringType() | nvarchar | nvarchar |
| nchar(N) | StringType() | nvarchar | nvarchar |
| varchar | StringType() | nvarchar | nvarchar |
| varchar(N) | StringType() | nvarchar | nvarchar |
| nvarchar | StringType() | nvarchar | nvarchar |
| nvarchar(N) | StringType() | nvarchar | nvarchar |
| text | StringType() | nvarchar | nvarchar |
| ntext | StringType() | nvarchar | nvarchar |
| xml | StringType() | nvarchar | nvarchar |

Binary types#

| MSSQL type (read) | Spark type | MSSQL type (write) | MSSQL type (create) |
|---|---|---|---|
| bit | BooleanType() | bit | bit |
| binary | BinaryType() | varbinary | varbinary |
| binary(N) | BinaryType() | varbinary | varbinary |
| varbinary | BinaryType() | varbinary | varbinary |
| varbinary(N) | BinaryType() | varbinary | varbinary |
| image | BinaryType() | varbinary | varbinary |

Special types#

| MSSQL type (read) | Spark type | MSSQL type (write) | MSSQL type (create) |
|---|---|---|---|
| geography | BinaryType() | varbinary | varbinary |
| geometry | BinaryType() | varbinary | varbinary |
| hierarchyid | BinaryType() | varbinary | varbinary |
| rowversion | BinaryType() | varbinary | varbinary |
| sql_variant | unsupported | | |
| sysname | StringType() | nvarchar | nvarchar |
| uniqueidentifier | StringType() | nvarchar | nvarchar |

Explicit type cast#

DBReader#

It is possible to explicitly cast a column type using the DBReader(columns=...) syntax.

For example, you can use CAST(column AS text) to convert data to its string representation on the MSSQL side, so it will be read as Spark's StringType():

from onetl.connection import MSSQL
from onetl.db import DBReader

mssql = MSSQL(...)

reader = DBReader(
    connection=mssql,
    table="myschema.source_tbl",  # source table name is illustrative
    columns=[
        "id",
        "supported_column",
        "CAST(unsupported_column AS text) unsupported_column_str",
    ],
)
df = reader.run()

# cast column content to proper Spark type
df = df.select(
    df.id,
    df.supported_column,
    # explicit cast
    df.unsupported_column_str.cast("integer").alias("parsed_integer"),
)

DBWriter#

Convert a DataFrame column to JSON using to_json, and write it as a string column in MSSQL:

mssql.execute(
    """
    CREATE TABLE schema.target_tbl (
        id bigint,
        struct_column_json nvarchar(max) -- JSON functions require (n)varchar, not the deprecated text type
    )
    """,
)

from pyspark.sql.functions import to_json

df = df.select(
    df.id,
    to_json(df.struct_column).alias("struct_column_json"),
)

writer = DBWriter(
    connection=mssql,
    table="schema.target_tbl",
    options=MSSQL.WriteOptions(if_exists="append"),
)
writer.run(df)

Then you can parse this column on the MSSQL side, for example in a view:

SELECT
    id,
    JSON_VALUE(struct_column_json, '$.nested.field') AS nested_field
FROM schema.target_tbl

Or by using a computed column:

CREATE TABLE schema.target_table (
    id bigint,
    supported_column datetime2(6),
    struct_column_json nvarchar(max), -- JSON functions require (n)varchar, not the deprecated text type
    -- computed column
    nested_field AS (JSON_VALUE(struct_column_json, '$.nested.field'))
    -- or persisted computed column
    -- nested_field AS (JSON_VALUE(struct_column_json, '$.nested.field')) PERSISTED
)

By default, the computed column value is calculated on every read. A column marked as PERSISTED is calculated during insert or update and stored, but this requires additional disk space.