DB Reader
Allows you to read data from a table with a specified database connection and parameters, and return its content as a Spark dataframe.

Methods summary:

- run(): Reads data from the source table and saves it as a Spark dataframe.
- has_data(): Returns True if there is some data in the source, False otherwise.
- raise_if_no_data(): Raises exception NoDataError if the source does not contain any data.
- class onetl.db.db_reader.db_reader.DBReader(*, connection: BaseDBConnection, table: str, columns: ConstrainedListValue[str] | None = None, where: Any | None = None, hint: Any | None = None, df_schema: StructType | None = None, hwm_column: str | tuple | None = None, hwm_expression: str | None = None, hwm: AutoDetectHWM | ColumnHWM | KeyValueHWM | None = None, options: GenericOptions | None = None)
Allows you to read data from a table with a specified database connection and parameters, and return its content as a Spark dataframe.
Note
DBReader can return different results depending on Read Strategies.
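For instance, a minimal sketch of the difference; snapshot_reader and incremental_reader are hypothetical DBReader instances created without and with the hwm parameter, respectively (see the Incremental Reader example below):

    from onetl.strategy import IncrementalStrategy

    # a reader created without `hwm`: reads the whole table on every call
    df_full = snapshot_reader.run()

    # a reader created with `hwm=...`: inside IncrementalStrategy
    # only rows above the stored HWM value are read
    with IncrementalStrategy():
        df_increment = incremental_reader.run()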
Note
This class operates with only one table at a time. It does NOT support executing JOINs.

To get a JOIN result you can instead:

- Use two instances of DBReader with different tables, call run() on each one to get a table dataframe, and then use df1.join(df2) syntax (Hive). See the sketch after this list.
- Use connection.execute("INSERT INTO ... SELECT ... JOIN ...") to execute the JOIN on the RDBMS side, write the result into a temporary table, and then use DBReader to get the data from this temporary table (MPP systems, like Greenplum).
- Use the connection.sql(query) method to pass a SQL query with a JOIN, and fetch the result (other RDBMS).
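A minimal sketch of the first approach; the connection object (hive), the table names, and the join key are placeholders:

    from onetl.db import DBReader

    # hypothetical tables sharing a "user_id" column
    users_reader = DBReader(connection=hive, source="schema.users")
    orders_reader = DBReader(connection=hive, source="schema.orders")

    users_df = users_reader.run()
    orders_df = orders_reader.run()

    # the JOIN is performed by Spark, not by the source database
    joined_df = users_df.join(orders_df, on="user_id", how="inner")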
- Parameters:
- connection : onetl.connection.BaseDBConnection
  Class which contains DB connection properties. See the DB Connections section.
- source : str
  Table/collection/etc. name to read data from.
  If the connection has schema support, you need to specify the full name of the source including the schema, e.g. schema.name.
- columns : list of str, default: None
  The list of columns to be read.
  If the RDBMS supports any kind of expressions, you can pass them too:

    columns = [
        "mycolumn",
        "another_column as alias",
        "count(*) over ()",
        "some(function) as alias2",
    ]
Note
Some sources do not have columns.
Note
It is recommended to pass column names explicitly to avoid selecting too many columns, and to avoid adding unexpected columns to dataframe if source DDL is changed.
Deprecated since version 0.10.0: The syntax DBReader(columns="col1, col2") (string instead of list) is not supported, and will be removed in v1.0.0.

- where : Any, default: None
  Custom where clause for a SQL query or a MongoDB pipeline.
  The where syntax depends on the source. For example, SQL sources accept where as a string, but MongoDB sources accept where as a dictionary:

    # SQL database connection
    where = "column_1 > 2"

    # MongoDB connection
    where = {
        "col_1": {"$gt": 1, "$lt": 100},
        "col_2": {"$gt": 2},
        "col_3": {"$eq": "hello"},
    }
Note
Some sources do not support data filtering.
- hwm : type[HWM] | None, default: None
  HWM class to be used as the HWM value:

    hwm = DBReader.AutoDetectHWM(
        name="some_unique_hwm_name",
        expression="hwm_column",
    )

  The HWM value will be fetched using the hwm_column SQL query.
  If you want to use some SQL expression as the HWM value, you can pass it as well:

    hwm = DBReader.AutoDetectHWM(
        name="some_unique_hwm_name",
        expression="cast(hwm_column_orig as date)",
    )
Note
Some sources do not support passing expressions and can be used only with column/field names present in the source.
- hint : Any, default: None
  Hint expression used for querying the data.
  The hint syntax depends on the source. For example, SQL sources accept hint as a string, but MongoDB sources accept hint as a dictionary:

    # SQL database connection
    hint = "index(myschema.mytable mycolumn)"

    # MongoDB connection
    hint = {
        "mycolumn": 1,
    }
Note
Some sources do not support hints.
- df_schema : StructType, optional, default: None
  Spark DataFrame schema, used for proper type casting of the rows:

    from pyspark.sql.types import (
        DoubleType,
        IntegerType,
        StringType,
        StructField,
        StructType,
        TimestampType,
    )

    df_schema = StructType(
        [
            StructField("_id", IntegerType()),
            StructField("text_string", StringType()),
            StructField("hwm_int", IntegerType()),
            StructField("hwm_datetime", TimestampType()),
            StructField("float_value", DoubleType()),
        ],
    )

    reader = DBReader(
        connection=connection,
        source="fiddle.dummy",
        df_schema=df_schema,
    )
Note
Some sources do not support passing a dataframe schema.
- options : dict | onetl.connection.BaseDBConnection.ReadOptions, default: None
  Spark read options, like partitioning mode:

    Postgres.ReadOptions(
        partitioningMode="hash",
        partitionColumn="some_column",
        numPartitions=20,
        fetchsize=1000,
    )
Note
Some sources do not support reading options.
Examples
Simple Reader creation:
    from onetl.db import DBReader
    from onetl.connection import Postgres
    from pyspark.sql import SparkSession

    maven_packages = Postgres.get_packages()
    spark = (
        SparkSession.builder.appName("spark-app-name")
        .config("spark.jars.packages", ",".join(maven_packages))
        .getOrCreate()
    )

    postgres = Postgres(
        host="postgres.domain.com",
        user="your_user",
        password="***",
        database="target_db",
        spark=spark,
    )

    # create reader
    reader = DBReader(connection=postgres, source="fiddle.dummy")

    # read data from table "fiddle.dummy"
    df = reader.run()
Reader creation with JDBC options:
    from onetl.db import DBReader
    from onetl.connection import Postgres
    from pyspark.sql import SparkSession

    maven_packages = Postgres.get_packages()
    spark = (
        SparkSession.builder.appName("spark-app-name")
        .config("spark.jars.packages", ",".join(maven_packages))
        .getOrCreate()
    )

    postgres = Postgres(
        host="postgres.domain.com",
        user="your_user",
        password="***",
        database="target_db",
        spark=spark,
    )

    options = {"sessionInitStatement": "select 300", "fetchsize": "100"}
    # or (it is the same):
    options = Postgres.ReadOptions(sessionInitStatement="select 300", fetchsize="100")

    # create reader and pass some options to the underlying connection object
    reader = DBReader(connection=postgres, source="fiddle.dummy", options=options)

    # read data from table "fiddle.dummy"
    df = reader.run()
Reader creation with all parameters:
    from onetl.db import DBReader
    from onetl.connection import Postgres
    from pyspark.sql import SparkSession

    maven_packages = Postgres.get_packages()
    spark = (
        SparkSession.builder.appName("spark-app-name")
        .config("spark.jars.packages", ",".join(maven_packages))
        .getOrCreate()
    )

    postgres = Postgres(
        host="postgres.domain.com",
        user="your_user",
        password="***",
        database="target_db",
        spark=spark,
    )

    options = Postgres.ReadOptions(sessionInitStatement="select 300", fetchsize="100")

    # create reader with specific columns and a rows filter
    reader = DBReader(
        connection=postgres,
        source="default.test",
        where="d_id > 100",
        hint="NOWAIT",
        columns=["d_id", "d_name", "d_age"],
        options=options,
    )

    # read data from table "default.test"
    df = reader.run()
Incremental Reader:
    from onetl.db import DBReader
    from onetl.connection import Postgres
    from onetl.strategy import IncrementalStrategy
    from pyspark.sql import SparkSession

    maven_packages = Postgres.get_packages()
    spark = (
        SparkSession.builder.appName("spark-app-name")
        .config("spark.jars.packages", ",".join(maven_packages))
        .getOrCreate()
    )

    postgres = Postgres(
        host="postgres.domain.com",
        user="your_user",
        password="***",
        database="target_db",
        spark=spark,
    )

    reader = DBReader(
        connection=postgres,
        source="fiddle.dummy",
        hwm=DBReader.AutoDetectHWM(  # mandatory for IncrementalStrategy
            name="some_unique_hwm_name",
            expression="d_age",
        ),
    )

    # read data from table "fiddle.dummy"
    # but only with new rows (`WHERE d_age > previous_hwm_value`)
    with IncrementalStrategy():
        df = reader.run()
- has_data() → bool

Returns True if there is some data in the source, False otherwise.

Note
This method can return different results depending on Read Strategies
Warning
If hwm is used, then this method should be called inside a Read Strategies context. And vice versa: if HWM is not used, this method should not be called within a strategy.
- Raises:
- RuntimeError
Current strategy is not compatible with HWM parameter.
Examples
    reader = DBReader(...)

    # handle situation when there is no data in the source
    if reader.has_data():
        df = reader.run()
    else:
        # implement your handling logic here
        ...
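If the reader was created with the hwm parameter, the same check has to happen inside a strategy context, as the warning above states. A minimal sketch, assuming reader was created with hwm=...:

    from onetl.strategy import IncrementalStrategy

    with IncrementalStrategy():
        # check for new rows above the stored HWM value
        if reader.has_data():
            df = reader.run()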
- raise_if_no_data() → None

Raises exception NoDataError if the source does not contain any data.

Note
This method can return different results depending on Read Strategies
Warning
If hwm is used, then this method should be called inside a Read Strategies context. And vice versa: if HWM is not used, this method should not be called within a strategy.
- Raises:
- RuntimeError
Current strategy is not compatible with HWM parameter.
onetl.exception.NoDataError
There is no data in the source.
Examples
    reader = DBReader(...)

    # ensure that there is some data in the source before reading it using Spark
    reader.raise_if_no_data()
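If you prefer to handle the situation yourself instead of failing, you can catch the exception. A minimal sketch using the NoDataError class documented above:

    from onetl.exception import NoDataError

    reader = DBReader(...)

    try:
        reader.raise_if_no_data()
        df = reader.run()
    except NoDataError:
        # the source is empty: skip this run instead of failing
        pass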
- run() → DataFrame
Reads data from the source table and saves it as a Spark dataframe.
Note
This method can return different results depending on Read Strategies
Warning
If hwm is used, then this method should be called inside a Read Strategies context. And vice versa: if HWM is not used, this method should not be called within a strategy.
- Returns:
- df : pyspark.sql.dataframe.DataFrame
  Spark dataframe
Note
Keep in mind that if the timezone settings of the source and Spark differ, datetime values in the source and in the Spark dataframe may not match. This depends on the spark.sql.session.timeZone option set when creating the Spark session (see the sketch after the example below).
Examples
Read data to Spark dataframe:
df = reader.run()
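To reduce such discrepancies, you can pin the Spark session timezone to match the source. A minimal sketch, assuming the source database stores datetime values in UTC:

    from pyspark.sql import SparkSession

    # assumption: the source stores datetimes in UTC
    spark = (
        SparkSession.builder.appName("spark-app-name")
        .config("spark.sql.session.timeZone", "UTC")
        .getOrCreate()
    )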