DB Writer

DBWriter

Class that specifies the schema and table to write your DataFrame to.

DBWriter.run(df)

Method for writing your DataFrame to the specified target.

class onetl.db.db_writer.db_writer.DBWriter(*, connection: BaseDBConnection, target: str, options: GenericOptions | None = None)

Class that specifies the schema and table to write your DataFrame to.

Parameters:
connection : onetl.connection.DBConnection

Class which contains DB connection properties. See DB Connections section.

target : str

Table/collection/etc name to write data to.

If the connection supports schemas, specify the full name of the target including the schema, e.g. schema.name.

options : dict | onetl.connection.DBConnection.WriteOptions, default: None

Spark write options.

For example: {"if_exists": "replace_entire_table", "compression": "snappy"} or Hive.WriteOptions(if_exists="replace_entire_table", compression="snappy")

Note

Some connections do not support write options.

Examples

Simple Writer creation:

from onetl.connection import Postgres
from onetl.db import DBWriter
from pyspark.sql import SparkSession

maven_packages = Postgres.get_packages()
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

postgres = Postgres(
    host="postgres.domain.com",
    user="your_user",
    password="***",
    database="target_db",
    spark=spark,
)

writer = DBWriter(
    connection=postgres,
    target="fiddle.dummy",
)
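
Creating a writer does not write anything by itself; the actual write happens when you call run(df), described below.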

Writer creation with options:

from onetl.connection import Postgres
from onetl.db import DBWriter
from pyspark.sql import SparkSession

maven_packages = Postgres.get_packages()
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

postgres = Postgres(
    host="postgres.domain.com",
    user="your_user",
    password="***",
    database="target_db",
    spark=spark,
)

options = {"truncate": "true", "batchsize": 1000}
# or (it is the same):
options = Postgres.WriteOptions(truncate=True, batchsize=1000)

writer = DBWriter(
    connection=postgres,
    target="fiddle.dummy",
    options=options,
)

Writer to Hive with options:

from onetl.db import DBWriter
from onetl.connection import Hive
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-app-name").enableHiveSupport().getOrCreate()

hive = Hive(cluster="rnd-dwh", spark=spark)

options = {"compression": "snappy", "partitionBy": "id"}
# or (it is the same):
options = Hive.WriteOptions(compression="snappy", partitionBy="id")

writer = DBWriter(
    connection=hive,
    target="default.test",
    options=options,
)
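
Here compression="snappy" enables Snappy compression for the files written to the table, and partitionBy="id" stores the data partitioned by the id column.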
run(df: DataFrame)

Method for writing your DataFrame to the specified target.

Note

This method supports only batch DataFrames; streaming DataFrames are not supported.

Parameters:
df : pyspark.sql.dataframe.DataFrame

Spark DataFrame to write.

Examples

Write df to target:

writer.run(df)
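
For a fuller picture, a minimal end-to-end sketch, assuming the spark session, postgres connection, and writer from the examples above (the demo rows are hypothetical):

from pyspark.sql import Row

# Build a small demo DataFrame (hypothetical data, for illustration only)
df = spark.createDataFrame(
    [
        Row(id=1, name="alice"),
        Row(id=2, name="bob"),
    ]
)

# Write the rows to the target table configured in the writer
writer.run(df)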