Concepts#

Here you can find detailed documentation about each of the onETL concepts and how to use them.

Connection#

Basics#

onETL is used to pull data from and push data into other systems, so it has a first-class Connection concept for storing the credentials used to communicate with external systems.

A Connection is essentially a set of parameters, such as username, password, and hostname.

To create a connection to a specific storage type, use the class that matches that storage type. The class name is the same as the storage type name (Oracle, MSSQL, SFTP, etc.):

from onetl.connection import SFTP

sftp = SFTP(
    host="sftp.test.com",
    user="onetl",
    password="onetl",
)

All connection types inherit from the parent class BaseConnection.
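This means any concrete connection can be handled generically. A minimal sketch, assuming BaseConnection is importable from onetl.base:

from onetl.base import BaseConnection
from onetl.connection import SFTP

sftp = SFTP(host="sftp.test.com", user="onetl", password="onetl")

# every concrete connection class is a subclass of BaseConnection
assert isinstance(sftp, BaseConnection)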

Class diagram#

https://www.plantuml.com/plantuml/svg/ZPNTReCW7CNlVGfUOCcotZPDPurhrYxhGhDTKwMMYD00Bka-tdsArQ82palF-HtX_0CkfK92bGKDA3uge3WG93ifa1A14qKu2sHEs1a9L8248YbZpXGaNnWyt0yby1s0wWbHacU2bonTSSh5V1w5AyvO4xTOW6tzWIU1CRFROq9n3zYSK4xStYWUnuUqn9g02WlCrUMCwuMxgRr0J5PxmKovkS56_iMCTcDIlq7GD1SYQRYg8Zl5_2OOBg7szGntTp-BGLWRj2QVs1gW1GsQMelfGPrfMLZPc-eCUE4iur5enLlD2M_GCKTMj56Su7FrSgAuWSKPjQ8aoK-ybFRw35b7z6Ex5Y1yszexq8eFlJZHomIwur9b0aihtOcUW5U14chVzrNom7iiK8gKVNwTw0dednqmFanrqC3aEG7nVkVAg1GDzaK3X6uI-j1r5BjGBNdG3to87jyTy5Nqu10L1-IW6yq7kxk24sN1cGkSUJiUtCd0Tasty-uPFHE7rJsR6oCOfzqStTfCKozeonD4fx9QqtHSLvGZoUoh7xB4BAr-sNy=

DBConnection#

Classes inherited from DBConnection can be used to access databases.

A DBConnection can be instantiated as follows:

from onetl.connection import MSSQL

mssql = MSSQL(
    host="mssqldb.demo.com",
    user="onetl",
    password="onetl",
    database="Telecom",
    spark=spark,
)

where spark is the current SparkSession. onETL uses Spark and specific Java connectors under the hood to work with databases.
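For completeness, here is a bare-bones sketch of creating such a SparkSession. This is only a sketch: the application name is arbitrary, and the MSSQL JDBC connector package must also be made available to Spark (see the MSSQL connection documentation for the exact coordinates for your onETL and Spark versions):

from pyspark.sql import SparkSession

# minimal SparkSession for local experiments; in a real job you would also
# configure spark.jars.packages with the MSSQL JDBC connector
spark = SparkSession.builder.appName("onetl-demo").getOrCreate()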

For a description of other parameters, see the documentation for the available DBConnections.

FileConnection#

Classes inherited from FileConnection can be used to access files stored on different file systems/file servers.

A FileConnection can be instantiated as follows:

from onetl.connection import SFTP

sftp = SFTP(
    host="sftp.test.com",
    user="onetl",
    password="onetl",
)

For a description of other parameters, see the documentation for the available FileConnections.

FileDFConnection#

Classes inherited from FileDFConnection can be used to access files as Spark DataFrames.

A FileDFConnection can be instantiated as follows:

from onetl.connection import SparkHDFS

spark_hdfs = SparkHDFS(
    host="namenode1.domain.com",
    cluster="mycluster",
    spark=spark,
)

where spark is the current SparkSession. onETL uses Spark and specific Java connectors under the hood to work with DataFrames.

For a description of other parameters, see the documentation for the available FileDFConnections.

Checking connection availability#

Once you have created a connection, you can check that the database/filesystem is accessible using the check() method:

mssql.check()
sftp.check()
spark_hdfs.check()

It will raise an exception if the database/filesystem cannot be accessed.
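If you would rather handle an unavailable system than fail immediately, wrap the call yourself. A minimal sketch; the exact exception type raised depends on the connection class:

import logging

log = logging.getLogger(__name__)

try:
    sftp.check()
except Exception:  # the concrete exception type depends on the connection
    log.exception("SFTP server is not available, skipping this source")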

This method returns the connection itself, so you can create a connection and immediately check its availability:

mssql = MSSQL(
    host="mssqldb.demo.com",
    user="onetl",
    password="onetl",
    database="Telecom",
    spark=spark,
).check()  # <--

Extract/Load data#

Basics#

As we said above, onETL is used to extract data from and load data into remote systems.

onETL provides several classes for this, such as DBReader, DBWriter, FileDownloader, FileUploader, FileDFReader and FileDFWriter (see the tables below).

All of these classes have a run() method that starts extracting/loading the data:

from onetl.db import DBReader, DBWriter

reader = DBReader(
    connection=mssql,
    source="dbo.demo_table",
    columns=["column_1", "column_2"],
)

# Read data as Spark DataFrame
df = reader.run()

db_writer = DBWriter(
    connection=hive,  # an existing Hive connection object
    target="dl_sb.demo_table",
)

# Save Spark DataFrame to Hive table
db_writer.run(df)

Extract data#

To extract data you can use the following classes (a short usage sketch follows the table):

| Class | Use case | Connection | run() gets | run() returns |
|---|---|---|---|---|
| DBReader | Reading data from a database | Any DBConnection | - | Spark DataFrame |
| FileDFReader | Read data from a file or set of files | Any FileDFConnection | No input, or List[File path on FileSystem] | Spark DataFrame |
| FileDownloader | Download files from remote FS to local FS | Any FileConnection | No input, or List[File path on remote FileSystem] | DownloadResult |
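For example, FileDownloader can be run with no input (it scans source_path) or with an explicit list of remote paths, and returns a DownloadResult. A minimal sketch reusing the sftp connection from above; the successful/failed attribute names are assumptions here, see the DownloadResult documentation:

from onetl.file import FileDownloader

downloader = FileDownloader(
    connection=sftp,        # the SFTP connection created above
    source_path="/source",  # remote directory to scan
    local_path="/local",    # where to put downloaded files
)

# no input: download every file found under source_path
result = downloader.run()

# or pass an explicit list of remote paths
result = downloader.run(["/source/file1.csv", "/source/file2.csv"])

# DownloadResult groups files by outcome (attribute names assumed)
print(result.successful)
print(result.failed)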

Load data#

To load data you can use the following classes:

| Class | Use case | Connection | run() gets | run() returns |
|---|---|---|---|---|
| DBWriter | Writing data from a DataFrame to a database | Any DBConnection | Spark DataFrame | None |
| FileDFWriter | Writing data from a DataFrame to a folder | Any FileDFConnection | Spark DataFrame | None |
| FileUploader | Uploading files from a local FS to remote FS | Any FileConnection | List[File path on local FileSystem] | UploadResult |

Manipulate data#

To manipulate data you can use the following classes:

| Class | Use case | Connection | run() gets | run() returns |
|---|---|---|---|---|
| FileMover | Move files between directories in remote FS | Any FileConnection | List[File path on remote FileSystem] | MoveResult |

Options#

Extract and load classes have an options parameter, which has a special meaning:

  • all other parameters - WHAT we extract / WHERE we load to

  • the options parameter - HOW we extract/load the data

from onetl.db import DBReader, DBWriter
from onetl.file import FileDownloader, FileUploader, FileMover, FileDFReader, FileDFWriter
from onetl.file.filter import Glob
from onetl.file.limit import MaxFilesCount
from onetl.file.format import CSV

db_reader = DBReader(
    # WHAT do we read:
    connection=mssql,
    source="dbo.demo_table",  # some table from MSSQL
    columns=["column_1", "column_2"],  # but only specific set of columns
    where="column_2 > 1000",  # only rows matching the clause
    # HOW do we read:
    options=MSSQL.ReadOptions(
        numPartitions=10,  # read in 10 parallel jobs
        partitionColumn="id",  # balance data read by assigning each job a part of data using `hash(id) mod N` expression
        partitioningMode="hash",
        fetchsize=1000,  # each job will fetch block of 1000 rows each on every read attempt
    ),
)

db_writer = DBWriter(
    # WHERE do we write to - to some table in Hive
    connection=hive,
    target="dl_sb.demo_table",
    # HOW do we write - overwrite all the data in the existing table
    options=Hive.WriteOptions(if_exists="replace_entire_table"),
)

file_downloader = FileDownloader(
    # WHAT do we download - files from some dir in SFTP
    connection=sftp,
    source_path="/source",
    filters=[Glob("*.csv")],  # only CSV files
    limits=[MaxFilesCount(1000)],  # 1000 files max
    # WHERE do we download to - a specific dir on local FS
    local_path="/some",
    # HOW do we download:
    options=FileDownloader.Options(
        delete_source=True,  # after downloading each file remove it from source_path
        if_exists="replace_file",  # replace existing files in the local_path
    ),
)

file_uploader = FileUploader(
    # WHAT do we upload - files from some local dir
    local_path="/source",
    # WHERE do we upload to - a specific remote dir in HDFS
    connection=hdfs,
    target_path="/some",
    # HOW do we upload:
    options=FileUploader.Options(
        delete_local=True,  # after uploading each file remove it from local_path
        if_exists="replace_file",  # replace existing files in the target_path
    ),
)

file_mover = FileMover(
    # WHAT do we move - files in some remote dir in HDFS
    source_path="/source",
    connection=hdfs,
    # WHERE do we move files to
    target_path="/some",  # a specific remote dir within the same HDFS connection
    # HOW do we load - replace existing files in the target_path
    options=FileMover.Options(if_exists="replace_file"),
)

file_df_reader = FileDFReader(
    # WHAT do we read - *.csv files from some dir in S3
    connection=s3,
    source_path="/source",
    file_format=CSV(),
    # HOW do we read - load files from /source/*.csv, not from /source/nested/*.csv
    options=FileDFReader.Options(recursive=False),
)

file_df_writer = FileDFWriter(
    # WHERE do we write to - as .csv files in some dir in S3
    connection=s3,
    target_path="/target",
    file_format=CSV(),
    # HOW do we write - replace all existing files in /target, if exists
    options=FileDFWriter.Options(if_exists="replace_entire_directory"),
)

More information about options can be found in the DB connection, File Downloader, File Uploader, File Mover, FileDF Reader and FileDF Writer documentation.

Read Strategies#

onETL has several built-in strategies for reading data:

  1. Snapshot strategy (default strategy)

  2. Incremental strategy

  3. Snapshot batch strategy

  4. Incremental batch strategy

For example, an incremental strategy allows you to get only new data from the table:

from time import sleep

from onetl.strategy import IncrementalStrategy

reader = DBReader(
    connection=mssql,
    source="dbo.demo_table",
    hwm_column="id",  # detect new data based on value of "id" column
)

# first run
with IncrementalStrategy():
    df = reader.run()

sleep(3600)

# second run
with IncrementalStrategy():
    # only rows that appeared in the source since the previous run
    df = reader.run()

or get only files that were not downloaded before:

from time import sleep

from onetl.strategy import IncrementalStrategy

file_downloader = FileDownloader(
    connection=sftp,
    source_path="/remote",
    local_path="/local",
    hwm_type="file_list",  # save all downloaded files to a list, and exclude files already present in this list
)

# first run
with IncrementalStrategy():
    files = file_downloader.run()

sleep(3600)

# second run
with IncrementalStrategy():
    # only files that appeared in the source since the previous run
    files = file_downloader.run()
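The batch strategies listed above split one large read into several smaller chunks. A rough sketch of how this might look, reusing the mssql connection and db_writer from above; the step value and the iteration pattern are assumptions, check the strategy documentation for the exact API:

from onetl.strategy import SnapshotBatchStrategy

reader = DBReader(
    connection=mssql,
    source="dbo.demo_table",
    hwm_column="id",  # column used to split the data into chunks
)

# read the table in chunks instead of one huge DataFrame
with SnapshotBatchStrategy(step=10_000) as batches:
    for _ in batches:
        df = reader.run()   # one chunk per iteration
        db_writer.run(df)   # load each chunk before reading the next one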

Most strategies are based on HWM. Please check each strategy's documentation for more details.

Why not just use the Connection class for extract/load?#

Connections are very simple: they provide only a set of basic operations, like mkdir, remove_file, get_table_schema, and so on.

High-level operations, like:
  • Read Strategies support

  • Handling metadata push/pull

  • Handling different options, like if_exists="replace_file" in case of file download/upload

are moved to separate classes which call the connection object's methods to perform the complex logic.