Snapshot Batch Strategy#

class onetl.strategy.snapshot_strategy.SnapshotBatchStrategy(*, hwm: HWM | None = None, step: Any = None, start: Any = None, stop: Any = None)#

Snapshot batch strategy for DB Reader.

Note

Cannot be used with File Downloader

Same as SnapshotStrategy, but reads data from the source in sequential batches (1..N) like:

1:  SELECT id, data
    FROM public.mydata
    WHERE id >= 1000 AND id <= 1100; -- from start to start+step (INCLUDING first row)

2:  WHERE id > 1100 AND id <= 1200; -- + step
3:  WHERE id > 1200 AND id <= 1300; -- + step
N:  WHERE id > 1300 AND id <= 1400; -- until stop

This allows using less CPU and RAM on the Spark cluster than reading all the data in parallel, but takes proportionally more time.

Note

This strategy uses HWM column value to filter data for each batch, but does NOT save it into HWM Store. So every run starts from the beginning, not from the previous HWM value.

Note

If you only need to reduce the number of rows read by Spark from an opened cursor, use onetl.connection.db_connection.postgres.Postgres.ReadOptions.fetchsize instead.
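
For example, a minimal sketch of passing fetchsize (assuming a Postgres connection object named postgres, created as in the examples below; the option name follows the fetchsize reference above, the table and value are illustrative):

from onetl.connection import Postgres
from onetl.db import DBReader

# fetchsize only controls how many rows the JDBC driver pulls from the
# opened cursor per round-trip; it does not split the read into batches
reader = DBReader(
    connection=postgres,
    source="public.mydata",
    columns=["id", "data"],
    options=Postgres.ReadOptions(fetchsize=10_000),
)
df = reader.run()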

Warning

Not every DB connection supports the batch strategy. For example, the Kafka connection doesn't support it. Make sure the connection you use is compatible with SnapshotBatchStrategy.

Parameters:
step : Any

Step size used for generating batch SQL queries like:

SELECT id, data
FROM public.mydata
WHERE id >= 1000 AND id <= 1100; -- 1000 is start value, step is 100

Note

Step defines the range of values fetched by each batch. This is not a number of rows; the row count depends on the table content and how values are distributed across rows.

Note

The step value is added to the HWM value, so it should have a compatible type; see the type-matching sketch after this parameter list.

For example, for a TIMESTAMP column the step should be a datetime.timedelta, not an int.

start : Any, default: None

If passed, the value is used in the generated WHERE clause with the hwm.expression filter, as the start value of the first batch.

If not set, the value is determined by a separate query:

SELECT MIN(id) as start
FROM public.mydata
WHERE id <= 1400; -- 1400 here is stop value (if set)

Note

start should have the same type as the hwm.expression value, e.g. datetime.datetime for a TIMESTAMP column, datetime.date for DATE, and so on.

stop : Any, default: None

If passed, the value is used in the generated WHERE clause with the hwm.expression filter, as the stop value of the last batch.

If not set, the value is determined by a separate query:

SELECT MAX(id) as stop
FROM public.mydata
WHERE id >= 1000; -- 1000 here is start value (if set)

Note

stop should have the same type as the hwm.expression value, e.g. datetime.datetime for a TIMESTAMP column, datetime.date for DATE, and so on.
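
To illustrate the type notes above, a minimal sketch for a TIMESTAMP HWM column (the boundary values are illustrative; reader and writer are assumed to be set up as in the examples below, with the hwm expression pointing at the TIMESTAMP column):

from datetime import datetime, timedelta

from onetl.strategy import SnapshotBatchStrategy

# for a TIMESTAMP column: start/stop are datetime.datetime, step is datetime.timedelta
with SnapshotBatchStrategy(
    start=datetime(2021, 1, 1, 0, 0, 0),
    step=timedelta(hours=6),
    stop=datetime(2021, 1, 31, 23, 59, 59),
) as batches:
    for _ in batches:
        df = reader.run()
        writer.run(df)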

Examples

SnapshotBatch run:

from onetl.connection import Postgres, Hive
from onetl.db import DBReader, DBWriter
from onetl.strategy import SnapshotBatchStrategy

from pyspark.sql import SparkSession

maven_packages = Postgres.get_packages()
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

postgres = Postgres(
    host="postgres.domain.com",
    user="myuser",
    password="*****",
    database="target_database",
    spark=spark,
)

hive = Hive(cluster="rnd-dwh", spark=spark)

reader = DBReader(
    connection=postgres,
    source="public.mydata",
    columns=["id", "data"],
    hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="id"),
)

writer = DBWriter(connection=hive, target="newtable")

with SnapshotBatchStrategy(step=100) as batches:
    for _ in batches:
        df = reader.run()
        writer.run(df)

-- get start and stop values

    SELECT MIN(id) as start, MAX(id) as stop
    FROM public.mydata;

-- for example, start=1000 and stop=2345

-- then each batch (1..N) will perform a query which returns some part of the input data

1:  SELECT id, data
    FROM public.mydata
    WHERE id >= 1000 AND id <= 1100; -- from start to start+step (INCLUDING first row)

2:  WHERE id > 1100 AND id <= 1200; -- + step
3:  WHERE id > 1200 AND id <= 1300; -- + step
N:  WHERE id > 2300 AND id <= 2345; -- until stop

SnapshotBatch run with stop value:

with SnapshotBatchStrategy(step=100, stop=1234) as batches:
    for _ in batches:
        df = reader.run()
        writer.run(df)

-- stop value is set, so there is no need to fetch it from DB
-- get start value

    SELECT MIN(id) as start
    FROM public.mydata
    WHERE id <= 1234; -- until stop

-- for example, start=1000.
-- then each batch (1..N) will perform a query which returns some part of the input data

1:  SELECT id, data
    FROM public.mydata
    WHERE id >= 1000 AND id <= 1100; -- from start to start+step (INCLUDING first row)

2:  WHERE id >  1100 AND id <= 1200; -- + step
N:  WHERE id >  1200 AND id <= 1234; -- until stop

SnapshotBatch run with start value:

with SnapshotBatchStrategy(step=100, start=500) as batches:
    for _ in batches:
        df = reader.run()
        writer.run(df)

-- start value is set, so there is no need to fetch it from DB
-- get only stop value

    SELECT MAX(id) as stop
    FROM public.mydata
    WHERE id >= 500; -- from start

-- for example, stop=2345.
-- then each batch (1..N) will perform a query which returns some part of the input data

1:  SELECT id, data
    FROM public.mydata
    WHERE id >= 500 AND id <=  600; -- from start to start+step (INCLUDING first row)

2:  WHERE id >  600 AND id <=  700; -- + step
3:  WHERE id >  700 AND id <=  800; -- + step
...
N:  WHERE id > 2300 AND id <= 2345; -- until stop

SnapshotBatch run with all options:

with SnapshotBatchStrategy(
    start=1000,
    step=100,
    stop=2000,
) as batches:
    for _ in batches:
        df = reader.run()
        writer.run(df)

-- start and stop values are set, so there is no need to fetch boundaries from DB
-- each batch (1..N) will perform a query which returns some part of the input data

1:  SELECT id, data
    FROM public.mydata
    WHERE id >= 1000 AND id <= 1100; -- from start to start+step (INCLUDING first row)

2:  WHERE id >  1100 AND id <= 1200; -- + step
3:  WHERE id >  1200 AND id <= 1300; -- + step
...
N:  WHERE id >  1900 AND id <= 2000; -- until stop

hwm.expression can be a date or datetime, not only integer:

from datetime import date, timedelta

reader = DBReader(
    connection=postgres,
    source="public.mydata",
    columns=["business_dt", "data"],
    hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="business_dt"),
)

with SnapshotBatchStrategy(
    start=date(2021, 1, 1),
    step=timedelta(days=5),
    stop=date(2021, 1, 31),
) as batches:
    for _ in batches:
        df = reader.run()
        writer.run(df)

-- start and stop values are set, so there is no need to fetch boundaries from DB
-- each batch will perform a query which returns some part of the input data
-- the HWM value will be cast to match the column type

1:  SELECT business_dt, data
    FROM public.mydata
    WHERE business_dt >= CAST('2021-01-01' AS DATE) -- from start to start+step (INCLUDING first row)
    AND   business_dt <= CAST('2021-01-05' AS DATE);

2:  WHERE business_dt >  CAST('2021-01-05' AS DATE) -- + step
    AND   business_dt <= CAST('2021-01-10' AS DATE);

3:  WHERE business_dt >  CAST('2021-01-10' AS DATE) -- + step
    AND   business_dt <= CAST('2021-01-15' AS DATE);

...

N:  WHERE business_dt >  CAST('2021-01-30' AS DATE)
    AND   business_dt <= CAST('2021-01-31' AS DATE); -- until stop

__init__(**kwargs)#

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.