DB Writer#
- DBWriter — class that specifies the schema and table where your DataFrame is written.
- run — method that writes your DataFrame to the specified target.
- class onetl.db.db_writer.db_writer.DBWriter(*, connection: BaseDBConnection, target: str, options: GenericOptions | None = None)#
Class that specifies the schema and table where your DataFrame will be written.
- Parameters:
- connection
onetl.connection.DBConnection
Class which contains DB connection properties. See DB Connections section.
- target : str
Table/collection/etc. name to write data to.
If the connection has schema support, you must specify the full name of the target, including the schema, e.g. schema.name.
- options : dict | onetl.connection.DBConnection.WriteOptions, default: None
Spark write options.
For example:
{"if_exists": "replace_entire_table", "compression": "snappy"}
or Hive.WriteOptions(if_exists="replace_entire_table", compression="snappy")
Note
Some sources do not support write options.
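To make the dict / WriteOptions equivalence above concrete, here is a hypothetical pure-Python sketch. The dataclass below is only a stand-in: the real Postgres.WriteOptions is an onetl options class with field validation and many more settings.

```python
from dataclasses import dataclass, asdict

# Hypothetical stand-in for an onetl WriteOptions class; the real
# Postgres.WriteOptions validates fields and supports many more options.
@dataclass
class WriteOptions:
    truncate: bool = False
    batchsize: int = 10000

# Both forms carry the same Spark write options:
as_dict = {"truncate": True, "batchsize": 1000}
as_class = WriteOptions(truncate=True, batchsize=1000)

assert asdict(as_class) == as_dict
```

Either form can be passed to DBWriter's options parameter; the class-based form additionally catches typos in option names at construction time.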
Examples
Simple Writer creation:
from onetl.connection import Postgres
from onetl.db import DBWriter
from pyspark.sql import SparkSession

maven_packages = Postgres.get_packages()
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

postgres = Postgres(
    host="postgres.domain.com",
    user="your_user",
    password="***",
    database="target_db",
    spark=spark,
)

writer = DBWriter(
    connection=postgres,
    target="fiddle.dummy",
)
Writer creation with options:
from onetl.connection import Postgres
from onetl.db import DBWriter
from pyspark.sql import SparkSession

maven_packages = Postgres.get_packages()
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

postgres = Postgres(
    host="postgres.domain.com",
    user="your_user",
    password="***",
    database="target_db",
    spark=spark,
)

options = {"truncate": "true", "batchsize": 1000}
# or (it is the same):
options = Postgres.WriteOptions(truncate=True, batchsize=1000)

writer = DBWriter(
    connection=postgres,
    target="fiddle.dummy",
    options=options,
)
Writer to Hive with options:
from onetl.db import DBWriter
from onetl.connection import Hive
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-app-name").enableHiveSupport().getOrCreate()

hive = Hive(cluster="rnd-dwh", spark=spark)

options = {"compression": "snappy", "partitionBy": "id"}
# or (it is the same):
options = Hive.WriteOptions(compression="snappy", partitionBy="id")

writer = DBWriter(
    connection=hive,
    target="default.test",
    options=options,
)
- run(df: DataFrame)#
Method that writes your DataFrame to the specified target.
Note
This method supports only batch DataFrames; streaming DataFrames are not supported.
- Parameters:
- df : pyspark.sql.dataframe.DataFrame
Spark DataFrame to write.
Examples
Write df to target:
writer.run(df)
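To illustrate the batch-write contract (run() consumes a whole DataFrame in one call), here is a hypothetical in-memory stand-in; the class name and behavior are assumptions for illustration only, not the real onetl implementation, which pushes a Spark DataFrame to the database through the connection.

```python
# Hypothetical in-memory stand-in sketching the DBWriter.run() contract:
# run() receives a complete (batch) collection of rows and writes them
# all to the target at once.
class InMemoryWriter:
    def __init__(self, target: str):
        self.target = target
        self.rows: list[dict] = []

    def run(self, df: list[dict]) -> None:
        # whole-batch semantics: everything in `df` is written in one call
        self.rows.extend(df)

writer = InMemoryWriter(target="fiddle.dummy")
writer.run([{"id": 1}, {"id": 2}])
print(len(writer.rows))  # → 2
```

With a real DBWriter the pattern is the same: build the full DataFrame first, then pass it to run() in a single call.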