YAMLHWMStore
- class onetl.hwm.store.yaml_hwm_store.YAMLHWMStore(*, path: LocalPath = LocalPosixPath('/home/docs/.local/share/onETL/yml_hwm_store'), encoding: str = 'utf-8')
YAML local store for HWM values. Used as default HWM store.
- Parameters:
  - path : pathlib.Path or str
    Folder where HWM value files will be stored.
    Default:
    - ~/.local/share/onETL/yml_hwm_store on Linux
    - C:\Documents and Settings\<User>\Application Data\oneTools\onETL\yml_hwm_store on Windows
    - ~/Library/Application Support/onETL/yml_hwm_store on MacOS
  - encoding : str, default: utf-8
    Encoding of files with HWM value.
Examples
Default parameters
    from onetl.connection import Hive, Postgres
    from onetl.db import DBReader, DBWriter
    from onetl.strategy import IncrementalStrategy
    from onetl.hwm.store import YAMLHWMStore

    from pyspark.sql import SparkSession

    maven_packages = Postgres.get_packages()
    spark = (
        SparkSession.builder.appName("spark-app-name")
        .config("spark.jars.packages", ",".join(maven_packages))
        .getOrCreate()
    )

    postgres = Postgres(
        host="postgres.domain.com",
        user="myuser",
        password="*****",
        database="target_database",
        spark=spark,
    )

    hive = Hive(cluster="rnd-dwh", spark=spark)

    reader = DBReader(
        connection=postgres,
        source="public.mydata",
        columns=["id", "data"],
        hwm=DBReader.AutoDetectHWM(name="some_unique_name", expression="id"),
    )

    writer = DBWriter(connection=hive, target="newtable")

    with YAMLHWMStore():
        with IncrementalStrategy():
            df = reader.run()
            writer.run(df)

    # will create file
    # "~/.local/share/onETL/id__public.mydata__postgres_postgres.domain.com_5432__myprocess__myhostname.yml"
    # with encoding="utf-8" and save serialized HWM values to this file
With all options
    # reader and writer are defined as in the "Default parameters" example
    with YAMLHWMStore(path="/my/store", encoding="utf-8"):
        with IncrementalStrategy():
            df = reader.run()
            writer.run(df)

    # will create file
    # "/my/store/id__public.mydata__postgres_postgres.domain.com_5432__myprocess__myhostname.yml"
    # with encoding="utf-8" and save serialized HWM values to this file
File content example:
    - column:
        name: id
        partition: {}
      modified_time: '2023-02-11T17:10:49.659019'
      process:
        dag: ''
        host: myhostname
        name: myprocess
        task: ''
      source:
        db: public
        instance: postgres://postgres.domain.com:5432/target_database
        name: mydata
      type: int
      value: '1500'
    - column:
        name: id
        partition: {}
      modified_time: '2023-02-11T16:00:31.962150'
      process:
        dag: ''
        host: myhostname
        name: myprocess
        task: ''
      source:
        db: public
        instance: postgres://postgres.domain.com:5432/target_database
        name: mydata
      type: int
      value: '1000'
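The file content example holds one record per saved HWM value, each with its own `modified_time`. As a rough illustration of how such records could be inspected by hand, the sketch below picks the most recent one. It assumes the file has already been parsed into a list of dictionaries by a YAML parser, and it keeps only the fields it needs. How onETL itself selects a record when reading the file is not stated here, so choosing by `modified_time` is an assumption, and `latest_record` is a hypothetical helper.

```python
from datetime import datetime

# Records as they might look after parsing the example file with a YAML
# parser (assumption: only the fields used below are kept).
records = [
    {"modified_time": "2023-02-11T17:10:49.659019", "type": "int", "value": "1500"},
    {"modified_time": "2023-02-11T16:00:31.962150", "type": "int", "value": "1000"},
]


def latest_record(items):
    """Hypothetical helper: return the record with the newest modified_time."""
    return max(items, key=lambda r: datetime.fromisoformat(r["modified_time"]))


latest = latest_record(records)

# Values are stored as strings; the "type" field says how to interpret them.
latest_value = int(latest["value"]) if latest["type"] == "int" else latest["value"]
print(latest_value)
```

Comparing parsed timestamps rather than relying on list order keeps the lookup correct even if records are not written newest-first.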
- __enter__()
HWM store context manager.
Enter this context to use this HWM store instance as the current one (instead of the default).
Examples
    from etl_entities.hwm_store import HWMStoreStackManager

    with SomeHWMStore(...) as hwm_store:
        assert HWMStoreStackManager.get_current() == hwm_store

    assert HWMStoreStackManager.get_current() == default_hwm_store
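The "current store" behaviour described for `__enter__` can be pictured as a stack: entering a store pushes it, leaving the block pops it, and the default is used when the stack is empty. The following is a minimal standalone sketch of that idea. `ToyStore`, `default_store` and `get_current` are hypothetical names invented for illustration; this is not the actual implementation in `etl_entities`.

```python
class ToyStore:
    """Hypothetical store used only to illustrate the context-manager pattern."""

    _stack = []  # class-level stack of stores that are currently entered

    def __init__(self, label):
        self.label = label

    def __enter__(self):
        # Entering the context makes this instance the current store.
        ToyStore._stack.append(self)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Leaving the context restores whatever was current before.
        ToyStore._stack.pop()
        return False  # do not swallow exceptions


default_store = ToyStore("default")


def get_current():
    """Return the innermost entered store, or the default if none is entered."""
    return ToyStore._stack[-1] if ToyStore._stack else default_store


custom = ToyStore("custom")
with custom as store:
    inside = get_current()
outside = get_current()
print(inside.label, outside.label)
```

A stack (rather than a single variable) lets nested `with` blocks restore the previous store correctly on exit.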
- get_hwm(name: str) -> HWM | None
Get HWM by name from HWM store.
- Parameters:
  - name : str
    HWM unique name
- Returns:
  - HWM
    HWM object, if it exists in HWM store, or None
Examples
hwm = hwm_store.get_hwm(hwm_unique_name)
- set_hwm(hwm: HWM) -> LocalPath
Save HWM object to HWM Store.
- Parameters:
  - hwm : HWM
    HWM object
- Returns:
  - Any
    HWM location, like URL or file path. Result type is implementation-specific.
Examples
    from etl_entities.hwm import ColumnIntHWM

    hwm = ColumnIntHWM(name=..., value=...)
    hwm_location = hwm_store.set_hwm(hwm)
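To make the `get_hwm` / `set_hwm` contract concrete (a missing name yields `None`, and saving returns the location), here is a small self-contained sketch of a file-backed store. Everything in it is hypothetical: `ToyHWM` and `ToyFileStore` are illustrative stand-ins, not onETL or `etl_entities` classes, and JSON is swapped in for YAML so the example needs only the standard library.

```python
import json
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Optional


@dataclass
class ToyHWM:
    """Hypothetical stand-in for an HWM object."""

    name: str
    value: int


class ToyFileStore:
    """Hypothetical file-backed store (JSON instead of YAML, one file per name)."""

    def __init__(self, path: Path, encoding: str = "utf-8") -> None:
        self.path = path
        self.encoding = encoding
        self.path.mkdir(parents=True, exist_ok=True)

    def set_hwm(self, hwm: ToyHWM) -> Path:
        location = self.path / f"{hwm.name}.json"
        location.write_text(json.dumps(asdict(hwm)), encoding=self.encoding)
        return location  # the caller gets the file location back

    def get_hwm(self, name: str) -> Optional[ToyHWM]:
        location = self.path / f"{name}.json"
        if not location.exists():
            return None  # nothing has been saved under this name yet
        return ToyHWM(**json.loads(location.read_text(encoding=self.encoding)))


toy_store = ToyFileStore(Path(tempfile.mkdtemp()) / "toy_hwm_store")
missing = toy_store.get_hwm("some_unique_name")
saved_at = toy_store.set_hwm(ToyHWM(name="some_unique_name", value=1500))
loaded = toy_store.get_hwm("some_unique_name")
print(missing, saved_at.name, loaded)
```

Callers of a real store would likewise check the result of `get_hwm` for `None` before using it, for example on the first run of an incremental load.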