0.10.0 (2023-12-18)#

Breaking Changes#

  • Upgrade etl-entities from v1 to v2 (#172).

    This means that HWM classes now have a different internal structure than they used to.

    Before:

    from etl_entities.old_hwm import IntHWM as OldIntHWM
    from etl_entities.source import Column, Table
    from etl_entities.process import Process
    
    hwm = OldIntHWM(
        process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
        source=Table(name="schema.table", instance="postgres://host:5432/db"),
        column=Column(name="col1"),
        value=123,
    )
    

    After:

    from etl_entities.hwm import ColumnIntHWM
    
    hwm = ColumnIntHWM(
        name="some_unique_name",
        description="any value you want",
        source="schema.table",
        expression="col1",
        value=123,
    )
    

    Breaking change: If you used HWM classes from the etl_entities module, you should rewrite your code to make it compatible with the new version.

    More details
    • HWM classes used by previous onETL versions were moved from etl_entities to the etl_entities.old_hwm submodule. They are kept for compatibility reasons, but are planned to be removed in the etl-entities v3 release.

    • New HWM classes have a flat structure instead of a nested one.

    • New HWM classes have a mandatory name attribute (previously known as qualified_name).

    • Type aliases used while serializing and deserializing HWM objects to a dict representation were changed too: int -> column_int (see the sketch below).
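
    For illustration, a simplified sketch of how the type alias changes in the dict representation (only the type and value keys are shown here; full examples appear in the migration procedure below):

    # etl-entities v1 (onETL 0.9.x) serialized an integer column HWM with type alias "int"
    old_hwm_dict = {"type": "int", "value": 123}

    # etl-entities v2 (onETL 0.10.x) uses "column_int" instead
    new_hwm_dict = {"type": "column_int", "value": 123}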

    To make migration simpler, you can use a new method:

    old_hwm = OldIntHWM(...)
    new_hwm = old_hwm.as_new_hwm()
    

    This method automatically converts all fields from the old structure to the new one, including qualified_name -> name.
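
    For example, a minimal sketch reusing the objects from the snippets above:

    from etl_entities.old_hwm import IntHWM as OldIntHWM
    from etl_entities.source import Column, Table
    from etl_entities.process import Process

    old_hwm = OldIntHWM(
        process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
        source=Table(name="schema.table", instance="postgres://host:5432/db"),
        column=Column(name="col1"),
        value=123,
    )

    new_hwm = old_hwm.as_new_hwm()

    # the old qualified_name becomes the new name; other fields are converted as well
    assert new_hwm.name == old_hwm.qualified_name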

  • Breaking changes:

    • Methods BaseHWMStore.get() and BaseHWMStore.save() were renamed to get_hwm() and set_hwm().

    • They can now be used only with new HWM classes from etl_entities.hwm; old HWM classes are not supported.

    If you used them in your code, please update it accordingly.
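
    A rough before/after sketch, assuming a MemoryHWMStore instance (mentioned below) and a new-style HWM object:

    from etl_entities.hwm import ColumnIntHWM
    from etl_entities.hwm_store import MemoryHWMStore

    hwm_store = MemoryHWMStore()
    hwm = ColumnIntHWM(name="some_unique_name", source="schema.table", expression="col1", value=123)

    # onETL 0.9.x and older used:
    #   hwm_store.save(old_hwm)
    #   hwm_store.get(old_hwm.qualified_name)

    # onETL 0.10.x and newer:
    hwm_store.set_hwm(hwm)
    hwm_store.get_hwm("some_unique_name")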

  • YAMLHWMStore CANNOT read files created by older onETL versions (0.9.x or older).

    Update procedure
    # pip install onetl==0.9.5
    
    # Get qualified_name for HWM
    
    
    # Option 1. HWM is built manually
    from etl_entities import IntHWM, FileListHWM
    from etl_entities.source import Column, Table, RemoteFolder
    from etl_entities.process import Process
    
    # for column HWM
    old_column_hwm = IntHWM(
        process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
        source=Table(name="schema.table", instance="postgres://host:5432/db"),
        column=Column(name="col1"),
    )
    qualified_name = old_column_hwm.qualified_name
    # "col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost"
    
    # for file HWM
    old_file_hwm = FileListHWM(
        process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
        source=RemoteFolder(name="/absolute/path", instance="ftp://ftp.server:21"),
    )
    qualified_name = old_file_hwm.qualified_name
    # "file_list#/absolute/path@ftp://ftp.server:21#cde.abc.myprocess@myhost"
    
    
    # Option 2. HWM is generated automatically (by DBReader/FileDownloader)
    # See onETL logs and search for string like qualified_name = '...'
    
    qualified_name = "col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost"
    
    
    # Get .yml file path by qualified_name
    
    import os
    from pathlib import PurePosixPath
    from onetl.hwm.store import YAMLHWMStore
    
    # here you should pass the same arguments as used on production, if any
    yaml_hwm_store = YAMLHWMStore()
    hwm_path = yaml_hwm_store.get_file_path(qualified_name)
    print(hwm_path)
    
    # for column HWM
    # LocalPosixPath('/home/maxim/.local/share/onETL/yml_hwm_store/col1__schema.table__postgres_host_5432_db__cde.abc.myprocess__myhost.yml')
    
    # for file HWM
    # LocalPosixPath('/home/maxim/.local/share/onETL/yml_hwm_store/file_list__absolute_path__ftp_ftp.server_21__cde.abc.myprocess__myhost.yml')
    
    
    # Read raw .yml file content
    
    from yaml import safe_load, dump
    
    raw_old_hwm_items = safe_load(hwm_path.read_text())
    print(raw_old_hwm_items)
    
    # for column HWM
    # [
    #   {
    #     "column": { "name": "col1", "partition": {} },
    #     "modified_time": "2023-12-18T10: 39: 47.377378",
    #     "process": { "dag": "cde", "host": "myhost", "name": "myprocess", "task": "abc" },
    #     "source": { "instance": "postgres: //host:5432/db", "name": "schema.table" },
    #     "type": "int",
    #     "value": "123",
    #   },
    # ]
    
    # for file HWM
    # [
    #   {
    #     "modified_time": "2023-12-18T11:15:36.478462",
    #     "process": { "dag": "cde", "host": "myhost", "name": "myprocess", "task": "abc" },
    #     "source": { "instance": "ftp://ftp.server:21", "name": "/absolute/path" },
    #     "type": "file_list",
    #     "value": ["file1.txt", "file2.txt"],
    #   },
    # ]
    
    
    # Convert file content to new structure, compatible with onETL 0.10.x
    raw_new_hwm_items = []
    for old_hwm in raw_old_hwm_items:
        new_hwm = {"name": qualified_name, "modified_time": old_hwm["modified_time"]}
    
        if "column" in old_hwm:
            new_hwm["expression"] = old_hwm["column"]["name"]
        new_hwm["entity"] = old_hwm["source"]["name"]
        old_hwm.pop("process", None)
    
        if old_hwm["type"] == "int":
            new_hwm["type"] = "column_int"
            new_hwm["value"] = old_hwm["value"]
    
        elif old_hwm["type"] == "date":
            new_hwm["type"] = "column_date"
            new_hwm["value"] = old_hwm["value"]
    
        elif old_hwm["type"] == "datetime":
            new_hwm["type"] = "column_datetime"
            new_hwm["value"] = old_hwm["value"]
    
        elif old_hwm["type"] == "file_list":
            new_hwm["type"] = "file_list"
            new_hwm["value"] = [
                os.fspath(PurePosixPath(old_hwm["source"]["name"]).joinpath(path))
                for path in old_hwm["value"]
            ]
    
        else:
            raise ValueError("WAT?")
    
        raw_new_hwm_items.append(new_hwm)
    
    
    print(raw_new_hwm_items)
    # for column HWM
    # [
    #   {
    #     "name": "col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost",
    #     "modified_time": "2023-12-18T10:39:47.377378",
    #     "expression": "col1",
    #     "source": "schema.table",
    #     "type": "column_int",
    #     "value": 123,
    #   },
    # ]
    
    # for file HWM
    # [
    #   {
    #     "name": "file_list#/absolute/path@ftp://ftp.server:21#cde.abc.myprocess@myhost",
    #     "modified_time": "2023-12-18T11:15:36.478462",
    #     "entity": "/absolute/path",
    #     "type": "file_list",
    #     "value": ["/absolute/path/file1.txt", "/absolute/path/file2.txt"],
    #   },
    # ]
    
    
    # Save file with new content
    with open(hwm_path, "w") as file:
        dump(raw_new_hwm_items, file)
    
    
    # Stop Python interpreter and update onETL
    # pip install onetl==0.10.0
    # Check that new .yml file can be read
    
    from onetl.hwm.store import YAMLHWMStore
    
    qualified_name = ...
    
    # here you should pass the same arguments as used on production, if any
    yaml_hwm_store = YAMLHWMStore()
    yaml_hwm_store.get_hwm(qualified_name)
    
    # for column HWM
    # ColumnIntHWM(
    #     name='col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost',
    #     description='',
    #     entity='schema.table',
    #     value=123,
    #     expression='col1',
    #     modified_time=datetime.datetime(2023, 12, 18, 10, 39, 47, 377378),
    # )
    
    # for file HWM
    # FileListHWM(
    #     name='file_list#/absolute/path@ftp://ftp.server:21#cde.abc.myprocess@myhost',
    #     description='',
    #     entity=AbsolutePath('/absolute/path'),
    #     value=frozenset({AbsolutePath('/absolute/path/file1.txt'), AbsolutePath('/absolute/path/file2.txt')}),
    #     expression=None,
    #     modified_time=datetime.datetime(2023, 12, 18, 11, 15, 36, 478462)
    # )
    
    
    # That's all!
    

    But most users use other HWM Store implementations, which do not have this issue.

  • Several classes and functions were moved from onetl to etl_entities:

    onETL 0.9.x and older:

    from onetl.hwm.store import (
        detect_hwm_store,
        BaseHWMStore,
        HWMStoreClassRegistry,
        register_hwm_store_class,
        HWMStoreManager,
        MemoryHWMStore,
    )

    onETL 0.10.x and newer:

    from etl_entities.hwm_store import (
        detect_hwm_store,
        BaseHWMStore,
        HWMStoreClassRegistry,
        register_hwm_store_class,
        HWMStoreManager,
        MemoryHWMStore,
    )
    

    They can still be imported from the old module, but this is deprecated and will be removed in the v1.0.0 release.

  • Change the way of passing HWM to DBReader and FileDownloader classes:

    onETL 0.9.x and older:

    reader = DBReader(
        connection=...,
        source=...,
        hwm_column="col1",
    )

    onETL 0.10.x and newer:

    reader = DBReader(
        connection=...,
        source=...,
        hwm=DBReader.AutoDetectHWM(
            # name is mandatory now!
            name="my_unique_hwm_name",
            expression="col1",
        ),
    )

    onETL 0.9.x and older:

    reader = DBReader(
        connection=...,
        source=...,
        hwm_column=(
            "col1",
            "cast(col1 as date)",
        ),
    )

    onETL 0.10.x and newer:

    reader = DBReader(
        connection=...,
        source=...,
        hwm=DBReader.AutoDetectHWM(
            # name is mandatory now!
            name="my_unique_hwm_name",
            expression="cast(col1 as date)",
        ),
    )

    onETL 0.9.x and older:

    downloader = FileDownloader(
        connection=...,
        source_path=...,
        target_path=...,
        hwm_type="file_list",
    )

    onETL 0.10.x and newer:

    downloader = FileDownloader(
        connection=...,
        source_path=...,
        target_path=...,
        hwm=FileListHWM(
            # name is mandatory now!
            name="another_unique_hwm_name",
        ),
    )
    

    New HWM classes have a mandatory name attribute which should be passed explicitly, instead of being generated automatically under the hood.

    Automatic name generation using the old DBReader.hwm_column / FileDownloader.hwm_type syntax is still supported, but will be removed in v1.0.0 release. (#179)
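
    For reference, a minimal sketch of the imports used by the new-style examples above (assuming the onETL 0.10.x module layout):

    from onetl.db import DBReader
    from onetl.file import FileDownloader
    from etl_entities.hwm import FileListHWM

    # DBReader.AutoDetectHWM is a nested class, so no separate import is needed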

  • Performance of reads in Incremental and Batch strategies has been drastically improved. (#182).

    Before and after in detail

    DBReader.run() + incremental/batch strategy behavior in versions 0.9.x and older:

    • Get the table schema by making a query SELECT * FROM table WHERE 1=0 (if DBReader.columns contains *).

    • Expand * to the real column names from the table, append hwm_column, and remove duplicates (as some RDBMSes do not allow duplicate columns).

    • Create a dataframe from a query like SELECT hwm_expression AS hwm_column, ...other table columns... FROM table WHERE hwm_expression > prev_hwm.value.

    • Determine HWM class using dataframe schema: df.schema[hwm_column].dataType.

    • Determine the max HWM column value using Spark: df.select(max(hwm_column)).collect().

    • Use max(hwm_column) as next HWM value, and save it to HWM Store.

    • Return dataframe to user.

    This was far from ideal:

    • Dataframe content (all rows or just changed ones) was loaded from the source to Spark only to get min/max values of specific column.

      The step of fetching the table schema and then substituting column names into the next query caused some unexpected errors.

      For example, the source may contain columns with mixed-case or spaced names, like "CamelColumn" or "spaced column".

      Column names were not escaped during query generation, leading to queries that could not be executed by the database.

      So users had to explicitly pass column names to DBReader, wrapping columns with mixed naming in double quotes ("):

      reader = DBReader(
          connection=...,
          source=...,
          columns=[  # passing '*' here leads to wrong SQL query generation
              "normal_column",
              '"CamelColumn"',
              '"spaced column"',
              ...,
          ],
      )
      
    • Using DBReader with IncrementalStrategy could lead to reading rows already read before.

      The dataframe was created from a query with a WHERE clause like hwm.expression > prev_hwm.value, not hwm.expression > prev_hwm.value AND hwm.expression <= current_hwm.value.

      So if new rows appeared in the source after the HWM value was determined, they could still be read when accessing the dataframe content (because Spark dataframes are lazy), leading to inconsistencies between the HWM value and the dataframe content.

      This could lead to issues where DBReader.run() read some data and updated the HWM value, and the next call of DBReader.run() read rows that had already been read in the previous run.

    DBReader.run() + incremental/batch strategy behavior in versions 0.10.x and newer:

    • Detect type of HWM expression: SELECT hwm.expression FROM table WHERE 1=0.

    • Determine the corresponding Spark type from df.schema[0], and then determine the matching HWM class (if DBReader.AutoDetectHWM is used).

    • Get min/max values by querying the source: SELECT MAX(hwm.expression) FROM table WHERE hwm.expression >= prev_hwm.value.

    • Use max(hwm.expression) as next HWM value, and save it to HWM Store.

    • Create dataframe from query SELECT ... table columns ... FROM table WHERE hwm.expression > prev_hwm.value AND hwm.expression <= current_hwm.value, baking new HWM value into the query.

    • Return dataframe to user.

    Improvements:

    • Allow the source to calculate min/max instead of loading everything into Spark. This should be faster on large amounts of data (up to 2x), because we do not transfer all the data from the source to Spark. It can be even faster if the source has indexes on the HWM column.

    • The columns list is passed to the source as-is, without any resolving on the DBReader side. So you can pass DBReader(columns=["*"]) to read tables with mixed-case column names.

    • Restrict dataframe content to always match HWM values, which leads to never reading the same row twice.

    Breaking change: the HWM column is no longer implicitly added to the dataframe. It used to be a part of the SELECT clause, but now it appears only in the WHERE clause.

    So if you had code like this, you have to rewrite it:

    onETL 0.9.x and older:

    reader = DBReader(
        connection=...,
        source=...,
        columns=[
            "col1",
            "col2",
        ],
        hwm_column="hwm_col",
    )

    df = reader.run()
    # hwm_column value is in the dataframe
    assert df.columns == ["col1", "col2", "hwm_col"]

    onETL 0.10.x and newer:

    reader = DBReader(
        connection=...,
        source=...,
        columns=[
            "col1",
            "col2",
            # add hwm_column explicitly
            "hwm_col",
        ],
        hwm_column="hwm_col",
    )

    df = reader.run()
    # if the columns list is not updated,
    # this will fail
    assert df.columns == ["col1", "col2", "hwm_col"]

    onETL 0.9.x and older:

    reader = DBReader(
        connection=...,
        source=...,
        columns=[
            "col1",
            "col2",
        ],
        hwm_column=(
            "hwm_col",
            "cast(hwm_col as int)",
        ),
    )

    df = reader.run()
    # hwm_expression value is in the dataframe
    assert df.columns == ["col1", "col2", "hwm_col"]

    onETL 0.10.x and newer:

    reader = DBReader(
        connection=...,
        source=...,
        columns=[
            "col1",
            "col2",
            # add hwm_expression explicitly
            "cast(hwm_col as int) as hwm_col",
        ],
        hwm_column=(
            "hwm_col",
            "cast(hwm_col as int)",
        ),
    )

    df = reader.run()
    # if the columns list is not updated,
    # this will fail
    assert df.columns == ["col1", "col2", "hwm_col"]
    

    But most users just use columns=["*"] anyway, so they won't see any changes.

  • FileDownloader.run() now updates the HWM in the HWM Store after all files have been handled, not after each successfully downloaded file.

    This is because:

    • FileDownloader can be used with DownloadOptions(workers=N), which could lead to a race condition: one thread saves one HWM value to the HWM Store while another thread saves a different value.

    • FileDownloader can download hundreds or thousands of files, and issuing a request to the HWM Store for each file could potentially DDoS the HWM Store. (#189)

    There is an exception handler which tries to save the HWM to the HWM Store if the download process was interrupted. But if the process was killed forcefully, e.g. by SIGKILL, the HWM will not be saved, so some already downloaded files may be downloaded again next time.

    An unexpected process kill may have other negative impacts as well, like partially downloaded files, so this is considered expected behavior.

Features#

  • Add Python 3.12 compatibility. (#167)

  • The Excel file format can now be used with Spark 3.5.0. (#187)

  • SnapshotBatchStrategy and IncrementalBatchStrategy no longer raise exceptions if the source does not contain any data. Instead, they stop at the first iteration and return an empty dataframe. (#188)

  • Cache the result of connection.check() in high-level classes like DBReader, FileDownloader and so on. This makes logs less verbose. (#190)

Bug Fixes#

  • Fix @slot and @hook decorators returning methods with missing arguments in signature (Pylance, VS Code). (#183)

  • The Kafka connector documentation claimed that it supports reading topic data incrementally by passing group.id or groupIdPrefix. Actually, this is not true, because Spark does not send information to Kafka about which messages were consumed. So currently users can only read the whole topic; incremental reads are not supported.