File Downloader
- class onetl.file.file_downloader.file_downloader.FileDownloader(*, connection: BaseFileConnection, local_path: LocalPath, source_path: RemotePath | None = None, temp_path: LocalPath | None = None, filter: List[BaseFileFilter] = None, limit: List[BaseFileLimit] = None, hwm: FileHWM | None = None, hwm_type: Type[FileListHWM] | str | None = None, options: FileDownloaderOptions = FileDownloaderOptions(if_exists=<FileExistBehavior.ERROR: 'error'>, delete_source=False, workers=1))
Allows you to download files from a remote source with specified file connection and parameters, and return an object with download result summary.
Note
FileDownloader can return different results depending on Read Strategies
Note
This class is used to download files only from a remote directory to a local one.
It does NOT support direct file transfer between filesystems, like FTP -> SFTP. You should use FileDownloader + File Uploader to implement FTP -> local dir -> SFTP.
- Parameters:
- connection
onetl.connection.FileConnection
Class which contains File system connection properties. See File Connections section.
- local_path
os.PathLike or str
Local path to download files to.
- source_path
os.PathLike or str, optional, default: None
Remote path to download files from.
Could be None, but only if you pass absolute file paths directly to the run method.
- temp_path
os.PathLike or str, optional, default: None
If set, each file is first downloaded to this path and then renamed to the target file path. If None is passed, files are downloaded directly to target_path.
Warning
In production ETL pipelines, please set a value for temp_path (NOT None). This allows download interruptions to be handled properly, without leaving half-downloaded files in the target, because unlike a file download, a rename call is atomic.
Warning
For connections like SFTP or FTP, which can have multiple underlying filesystems, please pass to temp_path a path on the SAME filesystem as target_path. Otherwise, instead of a rename, the remote OS will move the file between filesystems, which is NOT an atomic operation.
- filters
list of BaseFileFilter
Return only files/directories matching these filters. See File Filters
- limits
list of BaseFileLimit
Apply limits to the list of files/directories, and stop if one of the limits is reached. See File Limits
- options
Options | dict | None, default: None
File downloading options. See Options
- hwm
type[HWM] | None, default: None
HWM class to detect changes in incremental run. See File HWM
Warning
Used only in IncrementalStrategy.
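The temp_path warnings above rely on a general download-then-rename pattern. The following is a minimal standalone sketch of that pattern using only the Python standard library. It is not onETL's implementation, and the names download_atomically, chunks and temp_dir are hypothetical, chosen only for illustration.

```python
import os
import tempfile
from pathlib import Path
from typing import Iterable


def download_atomically(chunks: Iterable[bytes], target: Path, temp_dir: Path) -> Path:
    """Write all chunks to a temporary file, then rename it to ``target``.

    ``chunks`` stands in for the bytes arriving from a remote source.
    ``temp_dir`` is assumed to be on the same filesystem as ``target``,
    otherwise the final rename is not guaranteed to be atomic.
    """
    target.parent.mkdir(parents=True, exist_ok=True)
    temp_dir.mkdir(parents=True, exist_ok=True)

    # Create a uniquely named temporary file next to other in-progress downloads
    fd, temp_name = tempfile.mkstemp(dir=temp_dir, suffix=".part")
    try:
        with os.fdopen(fd, "wb") as temp_file:
            for chunk in chunks:
                temp_file.write(chunk)
        # The target appears only after the whole file has been written
        os.replace(temp_name, target)
    except BaseException:
        # Interrupted download: drop the partial file, leave the target untouched
        if os.path.exists(temp_name):
            os.unlink(temp_name)
        raise
    return target
```

If the chunk source fails halfway, the target path is never created, which is the behavior the first warning describes.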
Examples
Simple Downloader creation
    from onetl.connection import SFTP
    from onetl.file import FileDownloader

    sftp = SFTP(...)

    # create downloader
    downloader = FileDownloader(
        connection=sftp,
        source_path="/path/to/remote/source",
        local_path="/path/to/local",
    )

    # download files to "/path/to/local"
    downloader.run()
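The note earlier on this page says that FTP -> SFTP transfers must go through a local directory. A rough sketch of that two-step relay is shown below. The helper relay_files is hypothetical, and it assumes that the uploader object exposes a run(files) method accepting local paths, which should be checked against the File Uploader documentation.

```python
def relay_files(downloader, uploader):
    """Download files to a local directory, then upload what was downloaded.

    ``downloader`` is assumed to behave like FileDownloader: ``run()`` returns
    a result object with a ``successful`` collection of local paths.
    ``uploader`` is assumed to accept those paths in ``run(files)``
    (an assumption, not confirmed by this page).
    """
    download_result = downloader.run()

    # Nothing was downloaded, so there is nothing to upload
    if not download_result.successful:
        return download_result, None

    upload_result = uploader.run(download_result.successful)
    return download_result, upload_result
```

Both result objects are returned so the caller can inspect failures on either side of the relay.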
Downloader with all parameters
    from onetl.connection import SFTP
    from onetl.file import FileDownloader
    from onetl.file.filter import Glob, ExcludeDir
    from onetl.file.limit import MaxFilesCount

    sftp = SFTP(...)

    # create downloader with a bunch of options
    downloader = FileDownloader(
        connection=sftp,
        source_path="/path/to/remote/source",
        local_path="/path/to/local",
        temp_path="/tmp",
        filters=[
            Glob("*.txt"),
            ExcludeDir("/path/to/remote/source/exclude_dir"),
        ],
        limits=[MaxFilesCount(100)],
        options=FileDownloader.Options(delete_source=True, if_exists="replace_file"),
    )

    # download files to "/path/to/local",
    # but only *.txt,
    # excluding files from "/path/to/remote/source/exclude_dir" directory
    # and stop before downloading the 101st file
    downloader.run()
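To make the combined effect of filters and limits in the example above more concrete, here is a standalone sketch of the selection logic in plain Python. It does not use onETL classes. The function select_files and its parameters are hypothetical, and matching the glob against the file name only is an assumption.

```python
from fnmatch import fnmatchcase
from pathlib import PurePosixPath


def select_files(paths, pattern, excluded_dir, max_files):
    """Keep paths matching ``pattern``, skip ``excluded_dir``, stop at ``max_files``."""
    excluded = PurePosixPath(excluded_dir)
    selected = []
    for raw_path in paths:
        path = PurePosixPath(raw_path)
        # Filter 1: file name must match the glob pattern
        if not fnmatchcase(path.name, pattern):
            continue
        # Filter 2: file must not be located inside the excluded directory
        if excluded in path.parents:
            continue
        # Limit: stop as soon as the maximum number of files is reached
        if len(selected) >= max_files:
            break
        selected.append(path)
    return selected
```

Filters decide whether a single file is kept, while a limit ends the whole listing, which is why the limit check uses break rather than continue.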
Incremental download
    from onetl.connection import SFTP
    from onetl.file import FileDownloader
    from onetl.strategy import IncrementalStrategy
    from etl_entities.hwm import FileListHWM

    sftp = SFTP(...)

    # create downloader
    downloader = FileDownloader(
        connection=sftp,
        source_path="/path/to/remote/source",
        local_path="/path/to/local",
        hwm=FileListHWM(
            name="my_unique_hwm_name", directory="/path/to/remote/source"
        ),  # mandatory for IncrementalStrategy
    )

    # download files to "/path/to/local", but only new ones
    with IncrementalStrategy():
        downloader.run()
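Conceptually, "only new ones" in the incremental example means comparing the current remote listing with the files handled by previous runs. The sketch below illustrates that idea with plain sets. It is an assumption about the general technique, not a description of how FileListHWM stores or updates its value, and select_new_files is a hypothetical helper.

```python
def select_new_files(remote_listing, known_files):
    """Return files not seen before, plus the updated set of known files."""
    known = set(known_files)
    # Preserve listing order, drop everything handled by earlier runs
    new_files = [path for path in remote_listing if path not in known]
    updated_known = known.union(new_files)
    return new_files, updated_known
```

Running it twice over the same listing with the updated set yields an empty list the second time, which mirrors how a repeated incremental run has nothing left to download.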
- run(files: Iterable[str | PathLike] | None = None) -> DownloadResult
Method for downloading files from source to local directory.
Note
This method can return different results depending on Read Strategies
- Parameters:
- files
Iterable[str | os.PathLike] | None, default: None
File list to download.
If empty, download files from source_path to local_path, applying filter, limit and hwm to each one (if set).
If not, download all input files to local_path, ignoring filters, limits and HWM.
- Returns:
- downloaded_files
DownloadResult
Download result object
- Raises:
- onetl.exception.DirectoryNotFoundError
source_path is not found
- NotADirectoryError
source_path or local_path is not a directory
Examples
Download files from source_path to local_path
    from onetl.impl import FailedRemoteFile, LocalPath, RemoteFile, RemotePath
    from onetl.file import FileDownloader

    downloader = FileDownloader(source_path="/remote", local_path="/local", ...)
    downloaded_files = downloader.run()

    assert downloaded_files.successful == {
        LocalPath("/local/file1.txt"),
        LocalPath("/local/file2.txt"),
        LocalPath("/local/nested/path/file3.txt"),  # directory structure is preserved
    }
    assert downloaded_files.failed == {FailedRemoteFile("/remote/failed.file")}
    assert downloaded_files.skipped == {RemoteFile("/remote/already.exists")}
    assert downloaded_files.missing == {RemotePath("/remote/missing.file")}
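In a pipeline it is often useful to turn the four result categories shown above into a single pass/fail decision. The sketch below is one possible way to do that. The helper check_download is hypothetical and relies only on the successful, failed, skipped and missing attributes used in the example above.

```python
def check_download(result):
    """Count files per category and raise if anything failed or was missing."""
    categories = ("successful", "failed", "skipped", "missing")
    counts = {name: len(getattr(result, name)) for name in categories}

    # Skipped files are tolerated here; failed or missing ones are not
    if counts["failed"] or counts["missing"]:
        raise RuntimeError(f"Download incomplete: {counts}")
    return counts
```

Whether skipped files should also be treated as an error depends on the if_exists option in use, so that check is left to the caller.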
Download only certain files from source_path
    from onetl.impl import RemoteFile, LocalPath
    from onetl.file import FileDownloader

    downloader = FileDownloader(source_path="/remote", local_path="/local", ...)

    # paths could be relative or absolute, but all should be in "/remote"
    downloaded_files = downloader.run(
        [
            "/remote/file1.txt",
            "/remote/nested/path/file3.txt",
            # excluding "/remote/file2.txt"
        ]
    )

    assert downloaded_files.successful == {
        LocalPath("/local/file1.txt"),
        LocalPath("/local/nested/path/file3.txt"),  # directory structure is preserved
    }
    assert not downloaded_files.failed
    assert not downloaded_files.skipped
    assert not downloaded_files.missing
Download certain files from any folder
    from onetl.impl import RemoteFile, LocalPath
    from onetl.file import FileDownloader

    downloader = FileDownloader(local_path="/local", ...)  # no source_path set

    # only absolute paths
    downloaded_files = downloader.run(
        [
            "/remote/file1.txt",
            "/any/nested/path/file2.txt",
        ]
    )

    assert downloaded_files.successful == {
        LocalPath("/local/file1.txt"),
        LocalPath("/local/file2.txt"),  # directory structure is NOT preserved without source_path
    }
    assert not downloaded_files.failed
    assert not downloaded_files.skipped
    assert not downloaded_files.missing
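The last two examples differ in how a remote file path is mapped to a local one. The sketch below reproduces that mapping with pathlib only, as an illustration of the documented behavior rather than onETL's actual code. The function local_target is a hypothetical name.

```python
from pathlib import PurePosixPath


def local_target(remote_file, local_path, source_path=None):
    """Compute the local path a remote file would be downloaded to."""
    remote = PurePosixPath(remote_file)
    local_root = PurePosixPath(local_path)

    if source_path is None:
        # Without source_path only absolute paths make sense,
        # and the directory structure is NOT preserved
        if not remote.is_absolute():
            raise ValueError(f"Absolute path required, got {remote_file!r}")
        return local_root / remote.name

    source_root = PurePosixPath(source_path)
    if not remote.is_absolute():
        # Relative paths are resolved against source_path
        remote = source_root / remote
    # relative_to raises ValueError if the file is outside source_path
    return local_root / remote.relative_to(source_root)
```

With source_path="/remote", the file "/remote/nested/path/file3.txt" maps to "/local/nested/path/file3.txt", while without source_path the file "/any/nested/path/file2.txt" maps to "/local/file2.txt", matching the assertions in the examples above.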
- view_files() -> FileSet[RemoteFile]
Get file list in the source_path, after filter, limit and hwm are applied (if any).
Note
This method can return different results depending on Read Strategies
- Returns:
- FileSet[RemoteFile]
Set of files in source_path, which will be downloaded by the run method
- Raises:
- onetl.exception.DirectoryNotFoundError
source_path is not found
- NotADirectoryError
source_path is not a directory
Examples
View files
    from onetl.impl import RemoteFile
    from onetl.file import FileDownloader

    downloader = FileDownloader(source_path="/remote", ...)
    view_files = downloader.view_files()

    assert view_files == {
        RemoteFile("/remote/file1.txt"),
        RemoteFile("/remote/file3.txt"),
        RemoteFile("/remote/nested/file3.txt"),
    }
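Because view_files reports what run would download, it can serve as a cheap pre-check before starting a transfer. A minimal sketch of that usage follows. The helper download_if_any is hypothetical and uses only the view_files() and run() methods documented on this page.

```python
def download_if_any(downloader):
    """Run the download only when view_files() reports pending files."""
    pending = downloader.view_files()
    if not pending:
        # Nothing matches the current filters, limits and HWM
        return None
    # run() lists the source again, so the final set of files may differ
    # slightly if the remote directory changed in between
    return downloader.run()
```

Returning None for the empty case lets the caller distinguish "nothing to do" from a download result with zero successful files.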