YAMLHWMStore#

class onetl.hwm.store.yaml_hwm_store.YAMLHWMStore(*, path: LocalPath = LocalPosixPath('/home/docs/.local/share/onETL/yml_hwm_store'), encoding: str = 'utf-8')#

YAML local store for HWM values. Used as default HWM store. support_hooks

Parameters:
pathpathlib.Path or str

Folder name there HWM value files will be stored.

Default:

  • ~/.local/share/onETL/yml_hwm_store on Linux

  • C:\Documents and Settings\<User>\Application Data\oneTools\onETL\yml_hwm_store on Windows

  • ~/Library/Application Support/onETL/yml_hwm_store on MacOS

encodingstr, default: utf-8

Encoding of files with HWM value

Examples

Default parameters

from onetl.connection import Hive, Postgres
from onetl.db import DBReader
from onetl.strategy import IncrementalStrategy
from onetl.hwm.store import YAMLHWMStore

from pyspark.sql import SparkSession

maven_packages = Postgres.get_packages()
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

postgres = Postgres(
    host="postgres.domain.com",
    user="myuser",
    password="*****",
    database="target_database",
    spark=spark,
)

hive = Hive(cluster="rnd-dwh", spark=spark)

reader = DBReader(
    connection=postgres,
    source="public.mydata",
    columns=["id", "data"],
    hwm=DBReader.AutoDetectHWM(name="some_unique_name", expression="id"),
)

writer = DBWriter(connection=hive, target="newtable")

with YAMLHWMStore():
    with IncrementalStrategy():
        df = reader.run()
        writer.run(df)

# will create file
# "~/.local/share/onETL/id__public.mydata__postgres_postgres.domain.com_5432__myprocess__myhostname.yml"
# with encoding="utf-8" and save a serialized HWM values to this file

With all options

with YAMLHWMStore(path="/my/store", encoding="utf-8"):
    with IncrementalStrategy():
        df = reader.run()
        writer.run(df)

# will create file
# "/my/store/id__public.mydata__postgres_postgres.domain.com_5432__myprocess__myhostname.yml"
# with encoding="utf-8" and save a serialized HWM values to this file

File content example:

- column:
    name: id
    partition: {}
  modified_time: '2023-02-11T17:10:49.659019'
  process:
      dag: ''
      host: myhostname
      name: myprocess
      task: ''
  source:
      db: public
      instance: postgres://postgres.domain.com:5432/target_database
      name: mydata
  type: int
  value: '1500'
- column:
      name: id
      partition: {}
  modified_time: '2023-02-11T16:00:31.962150'
  process:
      dag: ''
      host: myhostname
      name: myprocess
      task: ''
  source:
      db: public
      instance: postgres://postgres.domain.com:5432/target_database
      name: mydata
  type: int
  value: '1000'
__enter__()#

HWM store context manager.

Enter this context to use this HWM store instance as current one (instead default).

Examples

from etl_entities.hwm_store import HWMStoreStackManager

with SomeHWMStore(...) as hwm_store:
    assert HWMStoreStackManager.get_current() == hwm_store

assert HWMStoreStackManager.get_current() == default_hwm_store
get_hwm(name: str) HWM | None#

Get HWM by name from HWM store.

Parameters:
namestr

HWM unique name

Returns:
HWM

HWM object, if it exists in HWM store, or None

Examples

hwm = hwm_store.get_hwm(hwm_unique_name)
set_hwm(hwm: HWM) LocalPath#

Save HWM object to HWM Store.

Parameters:
hwmHWM

HWM object

Returns:
Any

HWM location, like URL of file path. Result type is implementation-specific.

Examples

from etl_entities.hwm import ColumnIntHWM

hwm = ColumnIntHWM(name=..., value=...)
hwm_location = hwm_store.set_hwm(hwm)