Writing to Greenplum using DBWriter#
For writing data to Greenplum, use DBWriter with GreenplumWriteOptions.
Warning
Please take into account Greenplum <-> Spark type mapping.
Warning
It is always recommended to create the table explicitly using Greenplum.execute instead of relying on Spark’s table DDL generation, because Spark’s DDL generator can create columns with different types than expected (see the second example below).
Examples#
from onetl.connection import Greenplum
from onetl.db import DBWriter

greenplum = Greenplum(...)

df = ...  # data is here

writer = DBWriter(
    connection=greenplum,
    target="schema.table",
    options=Greenplum.WriteOptions(
        if_exists="append",
        # by default distribution is random
        distributedBy="id",
        # partitionBy is not supported
    ),
)

writer.run(df)
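As recommended in the warning above, it is safer to create the target table explicitly via Greenplum.execute before writing. A minimal sketch, assuming the dataframe has id and value columns (the DDL below is illustrative, adjust column types and distribution to your data):

greenplum.execute(
    """
    CREATE TABLE schema.table (
        id bigint,
        value varchar(2000)
    )
    DISTRIBUTED BY (id)
    """,
)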
Interaction schema#
The high-level schema is described in Prerequisites. A detailed interaction schema can be found below.
[Diagram: Spark <-> Greenplum interaction during DBWriter.run()]
Options#
- pydantic model onetl.connection.db_connection.greenplum.options.GreenplumWriteOptions#
VMware’s Greenplum Spark connector writing options.
Note
You can pass any value supported by the connector, even if it is not mentioned in this documentation.
The set of supported options depends on the connector version. See the link above.
Warning
Some options, like url, dbtable, server.*, pool.*, etc. are populated from connection attributes and cannot be overridden by the user in WriteOptions to avoid issues.
Examples
Write options initialization
options = Greenplum.WriteOptions(
    if_exists="append",
    truncate="false",
    distributedBy="mycolumn",
)
- field if_exists: GreenplumTableExistBehavior = GreenplumTableExistBehavior.APPEND (alias 'mode')#
Behavior of writing data into existing table.
- Possible values:
- append (default)
Adds new rows into existing table.
Behavior in details:
- Table does not exist
Table is created using options provided by user (distributedBy and others).
- Table exists
Data is appended to the table. Table has the same DDL as before writing data.
Warning
This mode does not check whether the table already contains rows from the dataframe, so duplicated rows can be created.
Also, Spark does not support passing custom options to the INSERT statement, like ON CONFLICT, so do not try to implement deduplication using unique indexes or constraints.
Instead, write to a staging table and perform deduplication using the execute method (see the sketch after this list).
- replace_entire_table
Table is dropped and then created.
Behavior in details:
- Table does not exist
Table is created using options provided by user (distributedBy and others).
- Table exists
Table content is replaced with dataframe content.
After writing is completed, the target table could either have the same DDL as before writing data (truncate=True), or be recreated (truncate=False).
- ignore
Ignores the write operation if the table already exists.
Behavior in details:
- Table does not exist
Table is created using options provided by user (distributedBy and others).
- Table exists
The write operation is ignored, and no data is written to the table.
- error
Raises an error if the table already exists.
Behavior in details:
- Table does not exist
Table is created using options provided by user (distributedBy and others).
- Table exists
An error is raised, and no data is written to the table.
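For the append mode, a minimal sketch of the staging-table deduplication approach mentioned above (the staging table, column names, and SQL are illustrative, not part of the connector API):

from onetl.connection import Greenplum
from onetl.db import DBWriter

greenplum = Greenplum(...)

# write the dataframe into a staging table first
writer = DBWriter(
    connection=greenplum,
    target="schema.table_stg",
    options=Greenplum.WriteOptions(if_exists="replace_entire_table"),
)
writer.run(df)

# then insert only the rows not yet present in the target table
greenplum.execute(
    """
    INSERT INTO schema.table (id, value)
    SELECT stg.id, stg.value
    FROM schema.table_stg AS stg
    LEFT JOIN schema.table AS tgt ON tgt.id = stg.id
    WHERE tgt.id IS NULL
    """,
)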