Postgres <-> Spark type mapping#

Type detection & casting#

Spark DataFrames always have a schema: a list of columns with their corresponding Spark types. All operations on a column are performed according to its type.
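
For illustration, here is a minimal PySpark sketch (not specific to Postgres) showing that a DataFrame always carries explicit Spark types:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# schema is declared explicitly using a DDL-formatted string
df = spark.createDataFrame([(1, "abc")], schema="id BIGINT, name STRING")

df.printSchema()
# root
#  |-- id: long (nullable = true)
#  |-- name: string (nullable = true)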

Reading from Postgres#

This is how the Postgres connector reads data (see the sketch after this list):

  • For each column in the query result (SELECT column1, column2, ... FROM table ...), get the column name and Postgres type.

  • Find the corresponding Postgres type (read) -> Spark type combination (see below) for each DataFrame column [1]. If no combination is found, raise an exception.

  • Create a DataFrame from the query with the resulting column names and Spark types.
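
For example, after reading a table you can inspect which Spark types were assigned to each column. A minimal sketch (the table and column names are made up; the DBReader parameter names may differ between onETL versions):

from onetl.connection import Postgres
from onetl.db import DBReader

postgres = Postgres(...)

reader = DBReader(connection=postgres, source="public.some_table")
df = reader.run()

# each Postgres column got a Spark type according to the mapping tables below,
# e.g. bigint -> LongType(), timestamp -> TimestampType(), json -> StringType()
df.printSchema()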

Writing to an existing Postgres table#

This is how the Postgres connector writes data (see the sketch after this list):

  • Get the names of the columns in the DataFrame [1].

  • Perform a SELECT * FROM table LIMIT 0 query to fetch the schema of the existing table.

  • Take only the columns present in the DataFrame (matched by name, case-insensitive). For each matched column, get its Postgres type.

  • Find the corresponding Spark type -> Postgres type (write) combination (see below) for each DataFrame column. If no combination is found, raise an exception.

  • If the Postgres type (write) matches the Postgres type (read), no additional casts are performed, and the DataFrame column is written to Postgres as is.

  • If the Postgres type (write) does not match the Postgres type (read), the DataFrame column is cast to the target column type on the Postgres side. For example, you can write a column with text data to an int column, as long as the column contains valid integer values within the supported value range and precision [3].
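
A minimal sketch of such a write-time cast (the table name is made up; the existing Postgres table is assumed to have a column value of type integer, while the DataFrame column value is a StringType() holding integer-looking values):

from onetl.connection import Postgres
from onetl.db import DBWriter

postgres = Postgres(...)

writer = DBWriter(
    connection=postgres,
    target="public.existing_table",  # existing table with column `value integer`
    options=Postgres.WriteOptions(if_exists="append"),
)

# df.value is StringType() with values like "123";
# the text -> integer cast is performed on the Postgres side during the write
writer.run(df)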

Creating a new table using Spark#

Warning

ABSOLUTELY NOT RECOMMENDED!

This is how the Postgres connector creates a new table:

  • Find the corresponding Spark type -> Postgres type (create) combination (see below) for each DataFrame column. If no combination is found, raise an exception.

  • Generate DDL for creating the table in Postgres, like CREATE TABLE schema.table (col1 ...), and run it.

  • Write the DataFrame to the created table as is.

But the Postgres connector supports only a limited number of types and almost no custom clauses (like PARTITION BY, INDEX, and so on). So instead of relying on Spark to create tables like this:

writer = DBWriter(
    connection=postgres,
    table="public.table",
    options=Postgres.WriteOptions(
        if_exists="append",
        createTableOptions="PARTITION BY RANGE (id)",
    ),
)
writer.run(df)

always prefer creating the table with the desired DDL before writing data:

postgres.execute(
    """
    CREATE TABLE public.table (
        id bigint,
        business_dt timestamp(6),
        value json
    )
    PARTITION BY RANGE (id)
    """,
)

writer = DBWriter(
    connection=postgres,
    table="public.table",
    options=Postgres.WriteOptions(if_exists="append"),
)
writer.run(df)

See Postgres CREATE TABLE documentation.

Supported types#

References#

See the List of Postgres types, and the connector source code for the exact type conversions.

Numeric types#

| Postgres type (read)       | Spark type                     | Postgres type (write)     | Postgres type (create)    |
|----------------------------|--------------------------------|---------------------------|---------------------------|
| decimal                    | DecimalType(P=38, S=18)        | decimal(P=38, S=18)       | decimal (unbounded)       |
| decimal(P=0..38)           | DecimalType(P=0..38, S=0)      | decimal(P=0..38, S=0)     | decimal(P=0..38, S=0)     |
| decimal(P=0..38, S=0..38)  | DecimalType(P=0..38, S=0..38)  | decimal(P=0..38, S=0..38) | decimal(P=0..38, S=0..38) |
| decimal(P=39.., S=0..)     | unsupported [4]                |                           |                           |
| decimal(P=.., S=..-1)      | unsupported [5]                |                           |                           |
| real                       | FloatType()                    | real                      | real                      |
| double precision           | DoubleType()                   | double precision          | double precision          |
| smallint                   | ShortType()                    | smallint                  | smallint                  |
| -                          | ByteType()                     | smallint                  | smallint                  |
| integer                    | IntegerType()                  | integer                   | integer                   |
| bigint                     | LongType()                     | bigint                    | bigint                    |
| money                      | StringType() [1]               | text                      | text                      |
| int4range                  | StringType() [1]               | text                      | text                      |
| int8range                  | StringType() [1]               | text                      | text                      |
| numrange                   | StringType() [1]               | text                      | text                      |
| int2vector                 | StringType() [1]               | text                      | text                      |

Temporal types#

| Postgres type (read)            | Spark type                                   | Postgres type (write) | Postgres type (create) |
|---------------------------------|----------------------------------------------|-----------------------|------------------------|
| date                            | DateType()                                   | date                  | date                   |
| time                            | TimestampType(), with time format quirks [6] | timestamp(6)          | timestamp(6)           |
| time(0..6)                      | TimestampType(), with time format quirks [6] | timestamp(6)          | timestamp(6)           |
| time with time zone             | TimestampType(), with time format quirks [6] | timestamp(6)          | timestamp(6)           |
| time(0..6) with time zone       | TimestampType(), with time format quirks [6] | timestamp(6)          | timestamp(6)           |
| timestamp                       | TimestampType()                              | timestamp(6)          | timestamp(6)           |
| timestamp(0..6)                 | TimestampType()                              | timestamp(6)          | timestamp(6)           |
| timestamp with time zone        | TimestampType()                              | timestamp(6)          | timestamp(6)           |
| timestamp(0..6) with time zone  | TimestampType()                              | timestamp(6)          | timestamp(6)           |
| -                               | TimestampNTZType()                           | timestamp(6)          | timestamp(6)           |
| interval of any precision       | StringType() [1]                             | text                  | text                   |
| -                               | DayTimeIntervalType()                        | unsupported           | unsupported            |
| -                               | YearMonthIntervalType()                      | unsupported           | unsupported            |
| daterange                       | StringType() [1]                             | text                  | text                   |
| tsrange                         | StringType() [1]                             | text                  | text                   |
| tstzrange                       | StringType() [1]                             | text                  | text                   |

Warning

Note that types in Postgres and Spark have different value ranges:

| Postgres type | Min value                   | Max value                    | Spark type      | Min value                  | Max value                  |
|---------------|-----------------------------|------------------------------|-----------------|----------------------------|----------------------------|
| date          | -4713-01-01                 | 5874897-01-01                | DateType()      | 0001-01-01                 | 9999-12-31                 |
| timestamp     | -4713-01-01 00:00:00.000000 | 294276-12-31 23:59:59.999999 | TimestampType() | 0001-01-01 00:00:00.000000 | 9999-12-31 23:59:59.999999 |
| time          | 00:00:00.000000             | 24:00:00.000000              | TimestampType() | 0001-01-01 00:00:00.000000 | 9999-12-31 23:59:59.999999 |

So not all values can be read from Postgres to Spark.
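
If the source table may contain values outside Spark's supported range, one option is to normalize them on the Postgres side while reading, using the DBReader(columns=...) approach described in the Explicit type cast section below. A minimal sketch (the table and column names reuse the examples above; parameter names may differ between onETL versions):

from onetl.connection import Postgres
from onetl.db import DBReader

postgres = Postgres(...)

reader = DBReader(
    connection=postgres,
    source="public.table",
    columns=[
        "id",
        # timestamps outside Spark's supported range are replaced with NULL on the Postgres side
        "CASE WHEN business_dt BETWEEN '0001-01-01' AND '9999-12-31 23:59:59.999999' THEN business_dt END AS business_dt",
    ],
)
df = reader.run()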


String types#

| Postgres type (read)    | Spark type       | Postgres type (write) | Postgres type (create) |
|-------------------------|------------------|-----------------------|------------------------|
| character               | StringType()     | text                  | text                   |
| character(N)            | StringType()     | text                  | text                   |
| character varying       | StringType()     | text                  | text                   |
| character varying(N)    | StringType()     | text                  | text                   |
| text                    | StringType()     | text                  | text                   |
| json                    | StringType() [1] | text                  | text                   |
| jsonb                   | StringType() [1] | text                  | text                   |
| xml                     | StringType() [1] | text                  | text                   |
| CREATE TYPE ... AS ENUM | StringType() [1] | text                  | text                   |
| tsvector                | StringType() [1] | text                  | text                   |
| tsquery                 | StringType() [1] | text                  | text                   |
| -                       | CharType()       | unsupported           | unsupported            |
| -                       | VarcharType()    | unsupported           | unsupported            |

Binary types#

| Postgres type (read) | Spark type       | Postgres type (write)         | Postgres type (create) |
|----------------------|------------------|-------------------------------|------------------------|
| boolean              | BooleanType()    | boolean                       | boolean                |
| bit                  | BooleanType()    | bool, cannot insert data [3]  | bool                   |
| bit(N=1)             | BooleanType()    | bool, cannot insert data [3]  | bool                   |
| bit(N=2..)           | ByteType()       | bytea, cannot insert data [3] | bytea                  |
| bit varying          | StringType() [1] | text                          | text                   |
| bit varying(N)       | StringType() [1] | text                          | text                   |
| bytea                | BinaryType()     | bytea                         | bytea                  |

Struct types#

| Postgres type (read)       | Spark type       | Postgres type (write) | Postgres type (create) |
|----------------------------|------------------|-----------------------|------------------------|
| T[]                        | ArrayType(T)     | T[]                   | T[]                    |
| T[][]                      | unsupported      |                       |                        |
| CREATE TYPE sometype (...) | StringType() [1] | text                  | text                   |
| -                          | StructType()     | unsupported           | unsupported            |
| -                          | MapType()        | unsupported           | unsupported            |

Network types#

| Postgres type (read) | Spark type       | Postgres type (write) | Postgres type (create) |
|----------------------|------------------|-----------------------|------------------------|
| cidr                 | StringType() [1] | text                  | text                   |
| inet                 | StringType() [1] | text                  | text                   |
| macaddr              | StringType() [1] | text                  | text                   |
| macaddr8             | StringType() [1] | text                  | text                   |

Geo types#

Postgres type (read)

Spark type

Postgres type (write)

Postgres type (create)

circle

StringType() [1]

text

text

box

line

lseg

path

point

polygon

polygon

Explicit type cast#

DBReader#

It is possible to explicitly cast a column of an unsupported type using the DBReader(columns=...) syntax.

For example, you can use CAST(column AS text) to convert data to its string representation on the Postgres side, so it will be read as Spark’s StringType().

It is also possible to use the to_json Postgres function to convert a column of any type to its string representation, and then parse this column on the Spark side using from_json:

from pyspark.sql.functions import from_json
from pyspark.sql.types import ArrayType, IntegerType

from onetl.connection import Postgres
from onetl.db import DBReader

postgres = Postgres(...)

reader = DBReader(
    connection=postgres,
    columns=[
        "id",
        "supported_column",
        "CAST(unsupported_column AS text) unsupported_column_str",
        # or
        "to_json(unsupported_column) array_column_json",
    ],
)
df = reader.run()

# Spark requires all columns to have some type, describe it
column_type = ArrayType(IntegerType())

# cast column content to proper Spark type
df = df.select(
    df.id,
    df.supported_column,
    # explicit cast
    df.unsupported_column_str.cast("integer").alias("parsed_integer"),
    # or explicit json parsing
    from_json(df.array_column_json, column_type).alias("array_column"),
)

DBWriter#

It is always possible to convert data to a string on the Spark side, and then write it to a text column in a Postgres table.

Using to_json#

For example, you can convert data using the to_json function:

from pyspark.sql.functions import to_json

from onetl.connection import Postgres
from onetl.db import DBWriter

postgres = Postgres(...)

postgres.execute(
    """
    CREATE TABLE schema.target_table (
        id int,
        supported_column timestamp,
        array_column_json jsonb -- any column type, actually
    )
    """,
)

write_df = df.select(
    df.id,
    df.supported_column,
    to_json(df.unsupported_column).alias("array_column_json"),
)

writer = DBWriter(
    connection=postgres,
    target="schema.target_table",
)
writer.run(write_df)

Then you can parse this column on the Postgres side (for example, by creating a view):

SELECT
    id,
    supported_column,
    array_column_json->0 AS array_item_0
FROM
    schema.target_table
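
For instance, a view wrapping this query could look like this (a sketch; the view name is made up):

postgres.execute(
    """
    CREATE VIEW schema.target_table_parsed AS
    SELECT
        id,
        supported_column,
        array_column_json->0 AS array_item_0
    FROM schema.target_table
    """,
)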

To avoid casting the value on every table read, you can use a GENERATED ALWAYS AS (...) STORED column, but this requires twice the storage (for both the original and the parsed value).
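
A minimal sketch of such a generated column, assuming PostgreSQL 12+ and the schema.target_table from the example above (the generated column name is made up):

postgres.execute(
    """
    ALTER TABLE schema.target_table
    ADD COLUMN array_item_0 int
    GENERATED ALWAYS AS ((array_column_json->>0)::int) STORED
    """,
)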

Manual conversion to string#

The Postgres connector also supports converting a text value directly to the target column type, if the value has a proper format.

For example, you can write data like [123,345) to an int8range column, because Postgres allows the cast '[123,345)'::int8range:

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

from onetl.connection import Postgres
from onetl.db import DBWriter

postgres = Postgres(...)

postgres.execute(
    """
    CREATE TABLE schema.target_table (
        id int,
        range_column int8range -- any column type, actually
    )
    """,
)


@udf(returnType=StringType())
def array_to_range(value: list):
    """Convert a [start, end] pair into Postgres' range literal format"""
    start, end = value
    return f"[{start},{end})"


write_df = df.select(
    df.id,
    array_to_range(df.range_column).alias("range_column"),
)

writer = DBWriter(
    connection=postgres,
    target="schema.target_table",
)
writer.run(write_df)

This can be tricky to implement and may slow down the write process. But it does not require extra space on the Postgres side, and avoids an explicit value cast on every table read.