DB Reader#

DBReader

Allows you to read data from a table using a specified database connection and parameters, and return its content as a Spark dataframe.

DBReader.run()

Reads data from the source table and saves it as a Spark dataframe.

DBReader.has_data()

Returns True if there is some data in the source, False otherwise.

DBReader.raise_if_no_data()

Raises a NoDataError exception if the source does not contain any data.

class onetl.db.db_reader.db_reader.DBReader(*, connection: BaseDBConnection, table: str, columns: ConstrainedListValue[str] | None = None, where: Any | None = None, hint: Any | None = None, df_schema: StructType | None = None, hwm_column: str | tuple | None = None, hwm_expression: str | None = None, hwm: AutoDetectHWM | ColumnHWM | KeyValueHWM | None = None, options: GenericOptions | None = None)#

Allows you to read data from a table using a specified database connection and parameters, and return its content as a Spark dataframe.

Note

DBReader can return different results depending on Read Strategies

Note

This class operates with only one table at a time. It does NOT support executing JOINs.

To get the JOIN result you can instead:

  1. Use two instances of DBReader with different tables, call run() on each one to get a table dataframe, and then use df1.join(df2) syntax (Hive), as shown in the sketch after this list

  2. Use connection.execute("INSERT INTO ... SELECT ... JOIN ...") to execute the JOIN on the RDBMS side, write the result into a temporary table, and then use DBReader to get the data from this temporary table (MPP systems, like Greenplum)

  3. Use the connection.sql(query) method to pass an SQL query with a JOIN, and fetch the result (other RDBMS)
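
For illustration, a minimal sketch of option 1 under assumed names: a Hive connection object hive and two tables schema.table1 / schema.table2 sharing an id column (all hypothetical):

from onetl.db import DBReader

# two readers, one per table (table names are hypothetical)
reader1 = DBReader(connection=hive, source="schema.table1")
reader2 = DBReader(connection=hive, source="schema.table2")

# read each table into its own Spark dataframe
df1 = reader1.run()
df2 = reader2.run()

# perform the JOIN on the Spark side
joined_df = df1.join(df2, on="id", how="inner")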

Parameters:
connection : onetl.connection.BaseDBConnection

Class which contains DB connection properties. See DB Connections section

source : str

Table/collection/etc name to read data from.

If connection has schema support, you need to specify the full name of the source including the schema, e.g. schema.name.

columns : list of str, default: None

The list of columns to be read.

If RDBMS supports any kind of expressions, you can pass them too.

columns = [
    "mycolumn",
    "another_column as alias",
    "count(*) over ()",
    "some(function) as alias2",
]

Note

Some sources do not have columns.

Note

It is recommended to pass column names explicitly to avoid selecting too many columns, and to avoid adding unexpected columns to dataframe if source DDL is changed.

Deprecated since version 0.10.0: Syntax DBReader(columns="col1, col2") (string instead of list) is not supported, and will be removed in v1.0.0

where : Any, default: None

Custom where clause for a SQL query or MongoDB pipeline.

where syntax depends on the source. For example, SQL sources accept where as a string, but MongoDB sources accept where as a dictionary.

# SQL database connection
where = "column_1 > 2"

# MongoDB connection
where = {
    "col_1": {"$gt": 1, "$lt": 100},
    "col_2": {"$gt": 2},
    "col_3": {"$eq": "hello"},
}

Note

Some sources do not support data filtering.

hwm : type[HWM] | None, default: None

HWM class to be used as HWM value.

hwm = DBReader.AutoDetectHWM(
    name="some_unique_hwm_name",
    expression="hwm_column",
)

The HWM value will be fetched by an SQL query over the hwm_column column.

If you want to use a SQL expression as the HWM value, you can pass it as well:

hwm = DBReader.AutoDetectHWM(
    name="some_unique_hwm_name",
    expression="cast(hwm_column_orig as date)",
)

Note

Some sources do not support passing expressions and can be used only with column/field names which are present in the source.

hint : Any, default: None

Hint expression used for querying the data.

hint syntax depends on the source. For example, SQL sources accept hint as a string, but MongoDB sources accept hint as a dictionary.

# SQL database connection
hint = "index(myschema.mytable mycolumn)"

# MongoDB connection
hint = {
    "mycolumn": 1,
}

Note

Some sources do not support hints.

df_schema : StructType, optional, default: None

Spark DataFrame schema, used for proper type casting of the rows.

from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

df_schema = StructType(
    [
        StructField("_id", IntegerType()),
        StructField("text_string", StringType()),
        StructField("hwm_int", IntegerType()),
        StructField("hwm_datetime", TimestampType()),
        StructField("float_value", DoubleType()),
    ],
)

reader = DBReader(
    connection=connection,
    source="fiddle.dummy",
    df_schema=df_schema,
)

Note

Some sources do not support passing a dataframe schema.

options : dict, onetl.connection.BaseDBConnection.ReadOptions, default: None

Spark read options, like partitioning mode.

Postgres.ReadOptions(
    partitioningMode="hash",
    partitionColumn="some_column",
    numPartitions=20,
    fetchsize=1000,
)

Note

Some sources do not support reading options.

Examples

Simple Reader creation:

from onetl.db import DBReader
from onetl.connection import Postgres
from pyspark.sql import SparkSession

maven_packages = Postgres.get_packages()
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

postgres = Postgres(
    host="postgres.domain.com",
    user="your_user",
    password="***",
    database="target_db",
    spark=spark,
)

# create reader
reader = DBReader(connection=postgres, source="fiddle.dummy")

# read data from table "fiddle.dummy"
df = reader.run()

Reader creation with JDBC options:

from onetl.db import DBReader
from onetl.connection import Postgres
from pyspark.sql import SparkSession

maven_packages = Postgres.get_packages()
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

postgres = Postgres(
    host="postgres.domain.com",
    user="your_user",
    password="***",
    database="target_db",
    spark=spark,
)
options = {"sessionInitStatement": "select 300", "fetchsize": "100"}
# or (it is the same):
options = Postgres.ReadOptions(sessionInitStatement="select 300", fetchsize="100")

# create reader and pass some options to the underlying connection object
reader = DBReader(connection=postgres, source="fiddle.dummy", options=options)

# read data from table "fiddle.dummy"
df = reader.run()

Reader creation with all parameters:

from onetl.db import DBReader
from onetl.connection import Postgres
from pyspark.sql import SparkSession

maven_packages = Postgres.get_packages()
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

postgres = Postgres(
    host="postgres.domain.com",
    user="your_user",
    password="***",
    database="target_db",
    spark=spark,
)
options = Postgres.ReadOptions(sessionInitStatement="select 300", fetchsize="100")

# create reader with specific columns and a row filter
reader = DBReader(
    connection=postgres,
    source="default.test",
    where="d_id > 100",
    hint="NOWAIT",
    columns=["d_id", "d_name", "d_age"],
    options=options,
)

# read data from table "fiddle.dummy"
df = reader.run()

Incremental Reader:

from onetl.db import DBReader
from onetl.connection import Postgres
from onetl.strategy import IncrementalStrategy
from pyspark.sql import SparkSession

maven_packages = Postgres.get_packages()
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

postgres = Postgres(
    host="postgres.domain.com",
    user="your_user",
    password="***",
    database="target_db",
    spark=spark,
)

reader = DBReader(
    connection=postgres,
    source="fiddle.dummy",
    hwm=DBReader.AutoDetectHWM(  # mandatory for IncrementalStrategy
        name="some_unique_hwm_name",
        expression="d_age",
    ),
)

# read data from table "fiddle.dummy"
# but only with new rows (`WHERE d_age > previous_hwm_value`)
with IncrementalStrategy():
    df = reader.run()

has_data() → bool#

Returns True if there is some data in the source, False otherwise.

Note

This method can return different results depending on Read Strategies

Warning

If hwm is used, then the method should be called inside a Read Strategies context. And vice versa, if HWM is not used, this method should not be called within a strategy.

Raises:
RuntimeError

Current strategy is not compatible with HWM parameter.

Examples

reader = DBReader(...)

# handle situation when there is no data in the source
if reader.has_data():
    df = reader.run()
else:
    # implement your handling logic here
    ...
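
If hwm is set on the reader, has_data() must be wrapped in the strategy context, as the warning above states. A minimal sketch, assuming reader was created with hwm=DBReader.AutoDetectHWM(...) as in the incremental example earlier:

from onetl.strategy import IncrementalStrategy

with IncrementalStrategy():
    # has_data() here checks for rows newer than the previous HWM value
    if reader.has_data():
        df = reader.run()
    else:
        # implement your handling logic here
        ...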

raise_if_no_data() → None#

Raises a NoDataError exception if the source does not contain any data.

Note

This method can behave differently depending on Read Strategies

Warning

If hwm is used, then the method should be called inside a Read Strategies context. And vice versa, if HWM is not used, this method should not be called within a strategy.

Raises:
RuntimeError

Current strategy is not compatible with HWM parameter.

onetl.exception.NoDataError

There is no data in source.

Examples

reader = DBReader(...)

# ensure that there is some data in the source before reading it using Spark
reader.raise_if_no_data()
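
To handle the missing-data case explicitly instead of letting the exception propagate, it can be caught; a minimal sketch, assuming the onetl.exception.NoDataError import path listed in the Raises section above:

from onetl.exception import NoDataError

try:
    # fail fast if the source is empty
    reader.raise_if_no_data()
    df = reader.run()
except NoDataError:
    # implement your handling logic here
    ...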

run() → DataFrame#

Reads data from the source table and saves it as a Spark dataframe.

Note

This method can return different results depending on Read Strategies

Warning

If hwm is used, then the method should be called inside a Read Strategies context. And vice versa, if HWM is not used, this method should not be called within a strategy.

Returns:
df : pyspark.sql.dataframe.DataFrame

Spark dataframe

Note

Keep in mind that if the timezone settings of the source and Spark differ, datetime values in the source and in the Spark dataframe may not match. This depends on the spark.sql.session.timeZone option set when creating the Spark session.
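
For example, to make Spark interpret datetimes in UTC regardless of the driver's local timezone, the option can be set when building the session (a sketch; the UTC value is just an illustration):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("spark-app-name")
    # align Spark's session timezone with the source, e.g. UTC
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()
)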

Examples

Read data to Spark dataframe:

df = reader.run()