File Downloader

FileDownloader

Allows you to download files from a remote source with specified file connection and parameters, and return an object with download result summary.

FileDownloader.run([files])

Method for downloading files from source to local directory.

FileDownloader.view_files()

Get the list of files in source_path after filter, limit and hwm are applied (if any).

class onetl.file.file_downloader.file_downloader.FileDownloader(
    *,
    connection: onetl.base.base_file_connection.BaseFileConnection,
    local_path: onetl.impl.local_path.LocalPath,
    source_path: onetl.impl.remote_path.RemotePath | None = None,
    temp_path: onetl.impl.local_path.LocalPath | None = None,
    filter: typing.List[onetl.base.base_file_filter.BaseFileFilter] = None,
    limit: typing.List[onetl.base.base_file_limit.BaseFileLimit] = None,
    hwm: etl_entities.hwm.file.file_hwm.FileHWM | None = None,
    hwm_type: typing.Type[etl_entities.old_hwm.file_list_hwm.FileListHWM] | str | None = None,
    options: onetl.file.file_downloader.options.FileDownloaderOptions = FileDownloaderOptions(if_exists=<FileExistBehavior.ERROR: 'error'>, delete_source=False, workers=1),
)

Allows you to download files from a remote source with specified file connection and parameters, and return an object with download result summary. Supports hooks.

Note

FileDownloader can return different results depending on the Read Strategy in use. See Read Strategies.

Note

This class is used to download files only from remote directory to the local one.

It does NOT support direct file transfer between filesystems, like FTP -> SFTP. You should use FileDownloader + File Uploader to implement FTP -> local dir -> SFTP.

Parameters:
connection : onetl.connection.FileConnection

Class which contains file system connection properties. See File Connections section.

local_path : os.PathLike or str

Local path to download files to.

source_path : os.PathLike or str, optional, default: None

Remote path to download files from.

Can be None, but only if you pass absolute file paths directly to the run method.

temp_path : os.PathLike or str, optional, default: None

If set, each file is first downloaded to this path and then renamed to the target file path. If None is passed, files are downloaded directly to local_path.

Warning

In production ETL pipelines, please set a value for temp_path (NOT None). This makes it possible to handle download interruptions properly, without leaving half-downloaded files in the target directory, because unlike a file download, a rename call is atomic.

Warning

For connections like SFTP or FTP, which can have multiple underlying filesystems, please set temp_path to a path on the SAME filesystem as the target path. Otherwise, instead of a rename, the remote OS will move the file between filesystems, which is NOT an atomic operation.

filters : list of BaseFileFilter

Return only files/directories matching these filters. See File Filters.

limits : list of BaseFileLimit

Apply limits to the list of files/directories, and stop if one of the limits is reached. See File Limits.

options : Options | dict | None, default: None

File downloading options. See Options.

hwm : type[HWM] | None, default: None

HWM class used to detect changes in an incremental run. See File HWM.

Warning

Used only in IncrementalStrategy.
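
The temp_path behavior described above (write the file somewhere else first, then rename it onto the target) can be illustrated with the standard library alone. This is a minimal sketch, not onetl's implementation: download_via_temp and its read_chunks argument are hypothetical names, and the iterable of bytes stands in for a remote read.

```python
import os
import tempfile
from pathlib import Path


def download_via_temp(read_chunks, target: Path, temp_dir: Path) -> Path:
    """Write chunks to a temporary file in temp_dir, then rename it onto target.

    read_chunks is a hypothetical iterable of bytes standing in for a remote read.
    """
    temp_dir.mkdir(parents=True, exist_ok=True)
    target.parent.mkdir(parents=True, exist_ok=True)

    fd, temp_name = tempfile.mkstemp(dir=temp_dir, suffix=".part")
    try:
        with os.fdopen(fd, "wb") as temp_file:
            for chunk in read_chunks:
                temp_file.write(chunk)
        # The rename is atomic only if both paths are on the same filesystem
        os.replace(temp_name, target)
    except BaseException:
        # An interrupted download leaves nothing at the target path
        if os.path.exists(temp_name):
            os.remove(temp_name)
        raise
    return target
```

If the read fails halfway, only the temporary file is affected, so readers of the target directory never see a partially written file.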

Examples

Simple Downloader creation

from onetl.connection import SFTP
from onetl.file import FileDownloader

sftp = SFTP(...)

# create downloader
downloader = FileDownloader(
    connection=sftp,
    source_path="/path/to/remote/source",
    local_path="/path/to/local",
)

# download files to "/path/to/local"
downloader.run()

Downloader with all parameters

from onetl.connection import SFTP
from onetl.file import FileDownloader
from onetl.file.filter import Glob, ExcludeDir
from onetl.file.limit import MaxFilesCount

sftp = SFTP(...)

# create downloader with a bunch of options
downloader = FileDownloader(
    connection=sftp,
    source_path="/path/to/remote/source",
    local_path="/path/to/local",
    temp_path="/tmp",
    filters=[
        Glob("*.txt"),
        ExcludeDir("/path/to/remote/source/exclude_dir"),
    ],
    limits=[MaxFilesCount(100)],
    options=FileDownloader.Options(delete_source=True, if_exists="replace_file"),
)

# download files to "/path/to/local",
# but only *.txt,
# excluding files from "/path/to/remote/source/exclude_dir" directory
# and stop before downloading the 101st file
downloader.run()
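
To make the combined effect of the filters and the limit in the example above more concrete, here is a rough standalone sketch. It only approximates what Glob, ExcludeDir and MaxFilesCount are described as doing; select_files and its arguments are hypothetical and are not part of onetl.

```python
from fnmatch import fnmatch
from pathlib import PurePosixPath


def select_files(paths, pattern, excluded_dir, max_files):
    """Keep paths whose name matches pattern and that lie outside excluded_dir,
    stopping once max_files paths have been collected."""
    excluded = PurePosixPath(excluded_dir)
    selected = []
    for raw in paths:
        path = PurePosixPath(raw)
        if not fnmatch(path.name, pattern):
            continue  # rejected by the glob filter
        if excluded in path.parents:
            continue  # rejected by the directory exclusion
        if len(selected) >= max_files:
            break  # limit reached, stop looking at further files
        selected.append(path)
    return selected
```

In this sketch a file has to pass every filter to be kept, and the limit is checked only for files that passed the filters.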

Incremental download:

from onetl.connection import SFTP
from onetl.file import FileDownloader
from onetl.strategy import IncrementalStrategy
from etl_entities.hwm import FileListHWM

sftp = SFTP(...)

# create downloader
downloader = FileDownloader(
    connection=sftp,
    source_path="/path/to/remote/source",
    local_path="/path/to/local",
    hwm=FileListHWM(
        name="my_unique_hwm_name", directory="/path/to/remote/source"
    ),  # mandatory for IncrementalStrategy
)

# download files to "/path/to/local", but only new ones
with IncrementalStrategy():
    downloader.run()
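
The idea behind "only new ones" can be sketched with plain sets. This assumes a file-list HWM behaves like a remembered set of already handled paths; the in-memory set and the incremental_run helper are hypothetical stand-ins, not how onetl stores or updates HWM values.

```python
def incremental_run(remote_files, already_downloaded):
    """Return (files to download now, updated set of known files)."""
    known = set(already_downloaded)
    # Only files that were not seen in previous runs are downloaded
    new_files = sorted(set(remote_files) - known)
    # After the run, the remembered state also covers the new files
    return new_files, known | set(new_files)
```

Calling it twice, with the second call receiving the state returned by the first, yields only the files that appeared between the two runs.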
run(files: Iterable[str | PathLike] | None = None) -> DownloadResult

Method for downloading files from source to local directory. Supports hooks.

Note

This method can return different results depending on the Read Strategy in use. See Read Strategies.

Parameters:
files : Iterable[str | os.PathLike] | None, default: None

File list to download.

If empty, download files from source_path to local_path, applying filter, limit and hwm to each one (if set).

If not, download to local_path all input files, ignoring filters, limits and HWM.

Returns:
downloaded_files : DownloadResult

Download result object.

Raises:
onetl.exception.DirectoryNotFoundError

source_path is not found

NotADirectoryError

source_path or local_path is not a directory

Examples

Download files from source_path to local_path

from onetl.impl import FailedRemoteFile, LocalPath, RemoteFile, RemotePath
from onetl.file import FileDownloader

downloader = FileDownloader(source_path="/remote", local_path="/local", ...)
downloaded_files = downloader.run()

assert downloaded_files.successful == {
    LocalPath("/local/file1.txt"),
    LocalPath("/local/file2.txt"),
    LocalPath("/local/nested/path/file3.txt"),  # directory structure is preserved
}
assert downloaded_files.failed == {FailedRemoteFile("/remote/failed.file")}
assert downloaded_files.skipped == {RemoteFile("/remote/already.exists")}
assert downloaded_files.missing == {RemotePath("/remote/missing.file")}

Download only certain files from source_path

from onetl.impl import RemoteFile, LocalPath
from onetl.file import FileDownloader

downloader = FileDownloader(source_path="/remote", local_path="/local", ...)

# paths could be relative or absolute, but all should be in "/remote"
downloaded_files = downloader.run(
    [
        "/remote/file1.txt",
        "/remote/nested/path/file3.txt",
        # excluding "/remote/file2.txt"
    ]
)

assert downloaded_files.successful == {
    LocalPath("/local/file1.txt"),
    LocalPath("/local/nested/path/file3.txt"),  # directory structure is preserved
}
assert not downloaded_files.failed
assert not downloaded_files.skipped
assert not downloaded_files.missing

Download certain files from any folder

from onetl.impl import RemoteFile, LocalPath
from onetl.file import FileDownloader

downloader = FileDownloader(local_path="/local", ...)  # no source_path set

# only absolute paths
downloaded_files = downloader.run(
    [
        "/remote/file1.txt",
        "/any/nested/path/file2.txt",
    ]
)

assert downloaded_files.successful == {
    LocalPath("/local/file1.txt"),
    LocalPath("/local/file2.txt"),
    # directory structure is NOT preserved without source_path
}
assert not downloaded_files.failed
assert not downloaded_files.skipped
assert not downloaded_files.missing
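
The three examples above differ mainly in how a remote path is mapped to a local one. The mapping they show can be sketched as follows; local_target is a hypothetical helper that mirrors the examples and is not taken from onetl.

```python
from pathlib import PurePosixPath


def local_target(remote_file, local_path, source_path=None):
    """Compute the local path for a remote file, as shown in the examples."""
    remote = PurePosixPath(remote_file)
    local = PurePosixPath(local_path)
    if source_path is None:
        # Without source_path only the file name is kept
        return local / remote.name
    if not remote.is_absolute():
        # Relative paths are taken as relative to source_path
        return local / remote
    # With source_path the nested directory structure is preserved
    return local / remote.relative_to(PurePosixPath(source_path))
```

Because relative_to raises ValueError for a path outside source_path, the sketch also reflects the note that all paths should be inside "/remote" when source_path is set.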
view_files() -> FileSet[RemoteFile]

Get the list of files in source_path after filter, limit and hwm are applied (if any). Supports hooks.

Note

This method can return different results depending on the Read Strategy in use. See Read Strategies.

Returns:
FileSet[RemoteFile]

Set of files in source_path which will be downloaded by the run method.

Raises:
onetl.exception.DirectoryNotFoundError

source_path is not found

NotADirectoryError

source_path is not a directory

Examples

View files

from onetl.impl import RemoteFile
from onetl.file import FileDownloader

downloader = FileDownloader(source_path="/remote", ...)

view_files = downloader.view_files()

assert view_files == {
    RemoteFile("/remote/file1.txt"),
    RemoteFile("/remote/file3.txt"),
    RemoteFile("/remote/nested/file3.txt"),
}