Snapshot Strategy

class onetl.strategy.snapshot_strategy.SnapshotStrategy

Snapshot strategy for DB Reader/File Downloader.

Fetches all rows/files from the source on every run. Does not support HWM.
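To make "does not support HWM" concrete, here is a minimal pure-Python sketch. It assumes a source table modeled as an in-memory list of `(id, data)` tuples, and the functions `snapshot_read` and `incremental_read` are hypothetical illustrations, not onETL API. The snapshot reader keeps no state and returns every row on every run; the incremental reader is shown only for contrast and tracks a high-water mark on `id`.

```python
def snapshot_read(rows):
    """Hypothetical snapshot semantics: no HWM is stored, every run returns all rows."""
    return list(rows)


def incremental_read(rows, hwm):
    """Hypothetical HWM semantics (for contrast): return rows with id > hwm and the new HWM."""
    new_rows = [row for row in rows if hwm is None or row[0] > hwm]
    new_hwm = max((row[0] for row in new_rows), default=hwm)
    return new_rows, new_hwm


table = [(1, "a"), (2, "b")]

# First run: both approaches see the same two rows.
snap_1 = snapshot_read(table)
inc_1, hwm = incremental_read(table, hwm=None)

# A new row arrives between runs.
table.append((3, "c"))

# Second run: snapshot re-reads everything, the HWM-based reader only gets the new row.
snap_2 = snapshot_read(table)
inc_2, hwm = incremental_read(table, hwm)

print(len(snap_2), len(inc_2))  # 3 1
```

The trade-off is simplicity versus volume: a snapshot run needs no stored state, but it re-transfers data that earlier runs already fetched.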

Note

This is the default strategy.

For DB Reader:

Every snapshot run executes a simple query that fetches all table data:

SELECT id, data FROM public.mydata;
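A minimal sketch of how such a query could be assembled from the reader's `source` and `columns`. The helper `build_snapshot_query` is hypothetical and not part of onETL; the SQL that onETL actually generates may differ in details such as quoting or whitespace. The key point is that no `WHERE` clause is added, because there is no HWM value to filter on.

```python
def build_snapshot_query(source, columns=None):
    """Hypothetical helper: build a full-table query with no HWM filter."""
    # Fall back to all columns when none are specified.
    column_list = ", ".join(columns) if columns else "*"
    # No WHERE clause: a snapshot run always reads the whole table.
    return f"SELECT {column_list} FROM {source};"


print(build_snapshot_query("public.mydata", ["id", "data"]))
# SELECT id, data FROM public.mydata;
```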
For File Downloader:

Every snapshot run downloads all files (either everything found in the source directory or a user-defined list):

$ hdfs dfs -ls /path

/path/my/file1
/path/my/file2
assert download_result == DownloadResult(
    successful=[
        "/path/my/file1",
        "/path/my/file2",
    ]
)
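The same behavior for files can be modeled in a few lines. This is a hypothetical sketch, not onETL's FileDownloader: `snapshot_download` and `fake_listing` are illustrative names, the listing mirrors the `hdfs dfs -ls /path` output above, and no real I/O is performed. Because no HWM records which files were already fetched, a second run downloads exactly the same set again.

```python
def snapshot_download(list_remote_dir, source_path):
    """Hypothetical model: 'download' every file currently under source_path."""
    successful = []
    for remote_path in list_remote_dir(source_path):
        # A real downloader would copy the file to a local directory here;
        # this sketch only records which remote paths were fetched.
        successful.append(remote_path)
    return {"successful": successful}


def fake_listing(path):
    """Hypothetical listing that mirrors the example directory contents."""
    return [f"{path}/my/file1", f"{path}/my/file2"]


first_run = snapshot_download(fake_listing, "/path")
second_run = snapshot_download(fake_listing, "/path")
# Both runs fetch the same two files: nothing is skipped on the second run.
print(first_run == second_run)  # True
```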

Examples

Snapshot run with DB Reader:

from onetl.connection import Hive, Postgres
from onetl.db import DBReader, DBWriter
from onetl.strategy import SnapshotStrategy

from pyspark.sql import SparkSession

maven_packages = Postgres.get_packages()
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .enableHiveSupport()  # required for writing to Hive
    .getOrCreate()
)

postgres = Postgres(
    host="postgres.domain.com",
    user="myuser",
    password="*****",
    database="target_database",
    spark=spark,
)

# Hive connection used as the write target
hive = Hive(cluster="rnd-dwh", spark=spark)

# No hwm=... here: SnapshotStrategy does not support HWM
reader = DBReader(
    connection=postgres,
    source="public.mydata",
    columns=["id", "data"],
)

# Hive table names are passed in "schema.table" form
writer = DBWriter(connection=hive, target="default.newtable")

with SnapshotStrategy():
    df = reader.run()
    writer.run(df)

# the current run executes the following query:
# SELECT id, data FROM public.mydata;

Snapshot run with File Downloader:

from onetl.connection import SFTP
from onetl.file import FileDownloader
from onetl.strategy import SnapshotStrategy

sftp = SFTP(
    host="sftp.domain.com",
    user="user",
    password="*****",
)

downloader = FileDownloader(
    connection=sftp,
    source_path="/remote",
    local_path="/local",
)

with SnapshotStrategy():
    # FileDownloader.run() returns a DownloadResult, not a DataFrame
    download_result = downloader.run()

# the current run downloads all files from 'source_path' to 'local_path'
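Both examples wrap the work in `with SnapshotStrategy():`, and the note above says snapshot is the default. A toy model of that pattern is sketched below, assuming a simple class-level stack; `ToyStrategy`, `ToySnapshotStrategy`, `ToyIncrementalStrategy` and `get_current_strategy` are hypothetical names, and onETL's real strategy handling may work differently. Entering a `with` block makes a strategy current, leaving it restores the previous one, and with no active block the snapshot strategy is used.

```python
class ToyStrategy:
    """Hypothetical base: entering a `with` block makes the strategy current."""

    _stack = []  # shared, class-level stack of active strategies

    def __enter__(self):
        ToyStrategy._stack.append(self)
        return self

    def __exit__(self, exc_type, exc, tb):
        ToyStrategy._stack.pop()
        return False  # do not swallow exceptions raised inside the block


class ToySnapshotStrategy(ToyStrategy):
    pass


class ToyIncrementalStrategy(ToyStrategy):
    pass


def get_current_strategy():
    # With no active `with` block, fall back to snapshot, the default.
    return ToyStrategy._stack[-1] if ToyStrategy._stack else ToySnapshotStrategy()


outside = get_current_strategy()
with ToyIncrementalStrategy():
    inside = get_current_strategy()
after = get_current_strategy()
print(type(outside).__name__, type(inside).__name__, type(after).__name__)
# ToySnapshotStrategy ToyIncrementalStrategy ToySnapshotStrategy
```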
__init__(**kwargs)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.