Reading from Teradata using DBReader#

DBReader supports Read Strategies for incremental data reading, but does not support custom queries, such as those containing JOINs.

Supported DBReader features#

Examples#

Snapshot strategy:

from onetl.connection import Teradata
from onetl.db import DBReader

teradata = Teradata(...)

reader = DBReader(
    connection=teradata,
    source="database.table",
    columns=["id", "key", "CAST(value AS VARCHAR) value", "updated_dt"],
    where="key = 'something'",
    options=Teradata.ReadOptions(
        partition_column="id",
        num_partitions=10,
        partitioning_mode="hash",
    ),
)
df = reader.run()

Incremental strategy:

from onetl.connection import Teradata
from onetl.db import DBReader
from onetl.strategy import IncrementalStrategy

teradata = Teradata(...)

reader = DBReader(
    connection=teradata,
    source="database.table",
    columns=["id", "key", "CAST(value AS VARCHAR) value", "updated_dt"],
    where="key = 'something'",
    hwm=DBReader.AutoDetectHWM(name="teradata_hwm", expression="updated_dt"),
    options=Teradata.ReadOptions(
        partition_column="id",
        num_partitions=10,
        partitioning_mode="hash",
    ),
)

with IncrementalStrategy():
    df = reader.run()

Recommendations#

Select only required columns#

Instead of passing "*" in DBReader(columns=[...]), prefer passing exact column names. This reduces the amount of data passed from Teradata to Spark.
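
A minimal sketch of the difference (connection and imports as in the examples above):

# bad: pulls every column of the table from Teradata
reader = DBReader(connection=teradata, source="database.table", columns=["*"])

# good: pull only the columns the job actually uses
reader = DBReader(
    connection=teradata,
    source="database.table",
    columns=["id", "key", "updated_dt"],
)
df = reader.run()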

Pay attention to where value#

Instead of filtering data on the Spark side using df.filter(df.column == 'value'), pass a proper DBReader(where="column = 'value'") clause. This both reduces the amount of data sent from Teradata to Spark and may also improve query performance, especially if there are indexes or partitions for the columns used in the where clause.
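
For example (using the same teradata connection as above):

# bad: the whole table is sent to Spark, filtering happens on Spark executors
df = DBReader(connection=teradata, source="database.table").run()
df = df.filter(df.key == "something")

# good: filtering happens on the Teradata side, only matching rows are sent
reader = DBReader(
    connection=teradata,
    source="database.table",
    where="key = 'something'",
)
df = reader.run()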

Read data in parallel#

DBReader can read data in multiple parallel connections by passing Teradata.ReadOptions(num_partitions=..., partition_column=...).

In the example above, Spark opens 10 parallel connections, and data is evenly distributed between them using the expression HASHAMP(HASHBUCKET(HASHROW({partition_column}))) MOD {num_partitions}. This sends each Spark worker only a piece of the data, reducing resource consumption. partition_column here can be a table column of any type.

It is also possible to use partitioning_mode="mod" or partitioning_mode="range", but in this case partition_column has to be an integer, must not contain NULLs, and its values should be uniformly distributed. These modes are also less performant than partitioning_mode="hash", which relies on the Teradata HASHAMP implementation.
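
For illustration, a sketch of the same read using partitioning_mode="mod", assuming id is a NOT NULL integer column with uniformly distributed values:

reader = DBReader(
    connection=teradata,
    source="database.table",
    options=Teradata.ReadOptions(
        partition_column="id",
        num_partitions=10,
        # each executor reads rows where id MOD 10 equals its partition number
        partitioning_mode="mod",
    ),
)
df = reader.run()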

Do NOT use TYPE=FASTEXPORT#

Teradata supports several different connection types:
  • TYPE=DEFAULT - perform plain SELECT queries

  • TYPE=FASTEXPORT - uses special FastExport protocol for select queries

But TYPE=FASTEXPORT takes an exclusive lock on the source table, so it is impossible to read data in parallel from multiple Spark workers. This leads to sending all the data to just one Spark worker, which is slow and consumes a lot of RAM.
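
For reference, a sketch of a connection that explicitly sticks to plain SELECT queries. Host, credentials and the spark session are placeholders; values in extra are passed through to the Teradata JDBC driver as connection URL parameters:

from onetl.connection import Teradata

# spark is an existing SparkSession
teradata = Teradata(
    host="database.host.or.ip",
    user="user",
    password="***",
    database="database",
    spark=spark,
    extra={
        # plain SELECT queries; do NOT set TYPE=FASTEXPORT here
        "TYPE": "DEFAULT",
    },
)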

Prefer using partitioning_mode="hash" from the example above.

Options#

pydantic model onetl.connection.db_connection.jdbc_connection.options.JDBCReadOptions#

Spark JDBC reading options.

Note

You can pass any value supported by Spark, even if it is not mentioned in this documentation. Option names should be in camelCase!

The set of supported options depends on the Spark version; see the Spark JDBC data source documentation for the full list.

Examples

Read options initialization

options = JDBC.ReadOptions(
    partition_column="reg_id",
    num_partitions=10,
    lower_bound=0,
    upper_bound=1000,
    customOption="value",
)
field partition_column: str | None = None (alias 'partitionColumn')#

Column used to parallelize reading from a table.

Warning

It is highly recommended to use a primary key, or at least a column with an index, to avoid performance issues.

Note

Column type depends on partitioning_mode.

  • partitioning_mode="range" requires column to be an integer or date (can be NULL, but not recommended).

  • partitioning_mode="hash" requires column to be an string (NOT NULL).

  • partitioning_mode="mod" requires column to be an integer (NOT NULL).

See documentation for partitioning_mode for more details

field num_partitions: PositiveInt = 1 (alias 'numPartitions')#

Number of jobs created by Spark to read the table content in parallel. See documentation for partitioning_mode for more details

field lower_bound: int | None = None (alias 'lowerBound')#

See documentation for partitioning_mode for more details

field upper_bound: int | None = None (alias 'upperBound')#

See documentation for partitioning_mode for more details

field session_init_statement: str | None = None (alias 'sessionInitStatement')#

After each database session is opened to the remote DB and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block).

Use this to implement session initialization code.

Example:

sessionInitStatement = """
    BEGIN
        execute immediate
        'alter session set "_serial_direct_read"=true';
    END;
"""
field fetchsize: int = 100000#

Fetch N rows from an opened cursor per one read round.

Tuning this option can influence performance of reading.

Warning

The default value is different from Spark’s.

Spark uses the driver’s own default value, which may differ between drivers, and even between versions of the same driver. For example, Oracle has a default fetchsize=10, which is absolutely not usable.

Thus we’ve overridden the default value with 100_000, which should increase reading performance.
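
A minimal example of changing it (the value here is purely illustrative):

options = JDBC.ReadOptions(
    fetchsize=10_000,  # fetch 10k rows per cursor read round instead of the default 100k
)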

field partitioning_mode: JDBCPartitioningMode = JDBCPartitioningMode.RANGE (alias 'partitioningMode')#

Defines how Spark will parallelize reading from table.

Possible values:

  • range (default)

    Allocate each executor a range of values from the column passed into partition_column.

    Spark generates for each executor an SQL query like:

    Executor 1:

    SELECT ... FROM table
    WHERE (partition_column >= lower_bound
            OR partition_column IS NULL)
    AND partition_column < (lower_bound + stride)
    

    Executor 2:

    SELECT ... FROM table
    WHERE partition_column >= (lower_bound + stride)
    AND partition_column < (lower_bound + 2 * stride)
    

    Executor N:

    SELECT ... FROM table
    WHERE partition_column >= (lower_bound + (N-1) * stride)
    AND partition_column <= upper_bound
    

    Where stride = (upper_bound - lower_bound) / num_partitions. For example, with lower_bound=0, upper_bound=1000 and num_partitions=10, each executor reads a range of stride=100 values.

    Note

    lower_bound, upper_bound and num_partitions are used just to calculate the partition stride, NOT for filtering the rows in the table. So all rows in the table will be returned (unlike Incremental Read Strategies).

    Note

    All queries are executed in parallel. To execute them sequentially, use Batch Read Strategies.

  • hash

    Allocate each executor a set of values based on a hash of the partition_column column.

    Spark generates for each executor an SQL query like:

    Executor 1:

    SELECT ... FROM table
    WHERE (some_hash(partition_column) mod num_partitions) = 0 -- lower_bound
    

    Executor 2:

    SELECT ... FROM table
    WHERE (some_hash(partition_column) mod num_partitions) = 1 -- lower_bound + 1
    

    Executor N:

    SELECT ... FROM table
    WHERE (some_hash(partition_column) mod num_partitions) = num_partitions-1 -- upper_bound
    

    Note

    The hash function implementation depends on the RDBMS. It can be MD5 or any other fast hash function, or an expression based on such a function call.

  • mod

    Allocate each executor a set of values based on the modulus of the partition_column column.

    Spark generates for each executor an SQL query like:

    Executor 1:

    SELECT ... FROM table
    WHERE (partition_column mod num_partitions) = 0 -- lower_bound
    

    Executor 2:

    SELECT ... FROM table
    WHERE (partition_column mod num_partitions) = 1 -- lower_bound + 1
    

    Executor N:

    SELECT ... FROM table
    WHERE (partition_column mod num_partitions) = num_partitions-1 -- upper_bound
    

Examples

Read data in 10 parallel jobs by range of values in id_column column:

JDBC.ReadOptions(
    partitioning_mode="range",  # default mode, can be omitted
    partition_column="id_column",
    num_partitions=10,
    # if you're using DBReader, the options below can be omitted
    # because they are calculated automatically as
    # MIN and MAX values of `partition_column`
    lower_bound=0,
    upper_bound=100_000,
)

Read data in 10 parallel jobs by hash of values in some_column column:

JDBC.ReadOptions(
    partitioning_mode="hash",
    partition_column="some_column",
    num_partitions=10,
    # lower_bound and upper_bound are automatically set to `0` and `9`
)

Read data in 10 parallel jobs by modulus of values in id_column column:

JDBC.ReadOptions(
    partitioning_mode="mod",
    partition_column="id_column",
    num_partitions=10,
    # lower_bound and upper_bound are automatically set to `0` and `9`
)
field query_timeout: int | None = None (alias 'queryTimeout')#

The number of seconds the driver will wait for a statement to execute. Zero means there is no limit.

This option depends on the driver implementation: some drivers check the timeout of each query instead of the entire JDBC batch.
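
For example, to abort any statement running longer than one hour (the value is illustrative):

options = JDBC.ReadOptions(
    query_timeout=3600,  # seconds; 0 means no limit
)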