File Uploader

FileUploader

Allows you to upload files to a remote source with specified file connection and parameters, and return an object with upload result summary.

FileUploader.run([files])

Method for uploading files to remote host.

FileUploader.view_files()

Get file list in the local_path.

class onetl.file.file_uploader.file_uploader.FileUploader(*, connection: BaseFileConnection, target_path: RemotePath, local_path: LocalPath | None = None, temp_path: RemotePath | None = None, options: FileUploaderOptions = FileUploaderOptions(if_exists=<FileExistBehavior.ERROR: 'error'>, delete_local=False, workers=1))

Allows you to upload files to a remote filesystem using the specified file connection and parameters, and returns an object with an upload result summary. Supports hooks.

Note

This class only uploads files from a local directory to a remote one.

It does NOT support direct file transfer between filesystems, like FTP -> SFTP. Use FileDownloader + FileUploader to implement FTP -> local dir -> SFTP.

Warning

This class does not support read strategies.

Parameters:
connection : onetl.connection.FileConnection

Object containing file system connection properties. See the File Connections section.

target_path : os.PathLike or str

Remote path to upload files to.

local_path : os.PathLike or str, optional, default: None

The local directory from which files are uploaded.

Could be None, but only if you pass absolute file paths directly to the run method.

temp_path : os.PathLike or str, optional, default: None

If set, each file is first uploaded to this path and then renamed to its target file path. If None (default since v0.5.0) is passed, files are uploaded directly to target_path.

Warning

In production ETL pipelines, set temp_path to a value other than None. This makes it possible to handle upload interruptions properly, without leaving half-uploaded files in the target, because a rename call is atomic, unlike a file upload.

Warning

For connections like SFTP or FTP, which can have multiple underlying filesystems, pass a temp_path on the SAME filesystem as target_path. Otherwise, instead of renaming, the remote OS will move the file between filesystems, which is NOT an atomic operation.
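
The upload-then-rename pattern behind temp_path can be illustrated with a minimal standard-library sketch. This is not onETL's implementation: the helper name upload_via_temp is hypothetical, and local directories stand in for the remote filesystem.

```python
import os
import shutil
import tempfile
from pathlib import Path


def upload_via_temp(source: Path, target: Path, temp_dir: Path) -> Path:
    """Copy `source` into `temp_dir`, then rename it to `target`.

    Hypothetical helper: local directories stand in for the remote filesystem.
    """
    temp_dir.mkdir(parents=True, exist_ok=True)
    target.parent.mkdir(parents=True, exist_ok=True)
    temp_file = temp_dir / f"{target.name}.part"
    # Slow, interruptible step: a partial file can only ever appear in temp_dir.
    shutil.copyfile(source, temp_file)
    # Fast step: when both paths are on one filesystem, the rename is atomic.
    os.replace(temp_file, target)
    return target


with tempfile.TemporaryDirectory() as root:
    base = Path(root)
    src = base / "local" / "data.csv"
    src.parent.mkdir()
    src.write_text("id,value\n1,42\n")

    result = upload_via_temp(src, base / "remote" / "data.csv", base / "tmp")
    uploaded_text = result.read_text()
    temp_leftovers = list((base / "tmp").iterdir())
```

If the copy is interrupted, only the `.part` file in the temporary directory is affected, and the target directory never contains a half-written file.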

options : Options | dict | None, default: None

File upload options. See Options.

Examples

Simple Uploader creation

from onetl.connection import HDFS
from onetl.file import FileUploader

hdfs = HDFS(...)

uploader = FileUploader(
    connection=hdfs,
    target_path="/path/to/remote/source",
)

Uploader with all parameters

from onetl.connection import HDFS
from onetl.file import FileUploader

hdfs = HDFS(...)

uploader = FileUploader(
    connection=hdfs,
    target_path="/path/to/remote/source",
    temp_path="/user/onetl",
    local_path="/some/local/directory",
    options=FileUploader.Options(delete_local=True, if_exists="overwrite"),
)
run(files: Iterable[str | PathLike] | None = None) -> UploadResult

Uploads files to the remote host. Supports hooks.

Parameters:
files : Iterable[str | os.PathLike] | None, default None

File list to upload.

If empty, upload files from local_path.

Returns:
uploaded_files : UploadResult

Upload result object

Raises:
onetl.exception.DirectoryNotFoundError

local_path was not found

NotADirectoryError

local_path is not a directory

ValueError

A file in the files argument does not match local_path
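
The ValueError condition can be pictured with a small sketch of the path check. It assumes that relative paths are interpreted relative to local_path and that absolute paths must lie inside it; the helper name resolve_against is hypothetical and this is not the library's actual validation code.

```python
from pathlib import PurePosixPath


def resolve_against(local_path: str, file: str) -> PurePosixPath:
    """Hypothetical check: return an absolute path inside local_path or raise ValueError.

    Note: this sketch does not normalize ".." segments.
    """
    base = PurePosixPath(local_path)
    path = PurePosixPath(file)
    if not path.is_absolute():
        # Assumption: relative paths are interpreted relative to local_path.
        return base / path
    try:
        # relative_to raises ValueError when `path` is not located under `base`.
        path.relative_to(base)
    except ValueError:
        raise ValueError(f"File '{path}' does not match local_path '{base}'") from None
    return path


resolved_relative = resolve_against("/local", "nested/path/file3")
resolved_absolute = resolve_against("/local", "/local/file1")

try:
    resolve_against("/local", "/other/file1")
except ValueError as error:
    rejected_message = str(error)
```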

Examples

Upload files from local_path to target_path

from onetl.impl import (
    FailedLocalFile,
    RemoteFile,
    LocalPath,
)
from onetl.file import FileUploader

uploader = FileUploader(local_path="/local", target_path="/remote", ...)
uploaded_files = uploader.run()

assert uploaded_files.successful == {
    RemoteFile("/remote/file1"),
    RemoteFile("/remote/file2"),
    RemoteFile("/remote/nested/path/file3"),  # directory structure is preserved
}
assert uploaded_files.failed == {FailedLocalFile("/local/failed.file")}
assert uploaded_files.skipped == {LocalPath("/local/already.exists")}
assert uploaded_files.missing == {LocalPath("/local/missing.file")}
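
A pipeline usually needs to act on the four sets shown in the example above. The following sketch uses a hypothetical stand-in class, ResultSketch, instead of the real UploadResult, so that it runs without a connection; summarize and ensure_complete are also illustrative names, and the real result object may offer its own helpers for the same purpose.

```python
from dataclasses import dataclass, field
from typing import Set


@dataclass
class ResultSketch:
    """Hypothetical stand-in exposing the four sets used in the example above."""

    successful: Set[str] = field(default_factory=set)
    failed: Set[str] = field(default_factory=set)
    skipped: Set[str] = field(default_factory=set)
    missing: Set[str] = field(default_factory=set)


def summarize(result) -> str:
    """One-line summary that works with any object exposing the four sets."""
    counts = {
        name: len(getattr(result, name))
        for name in ("successful", "failed", "skipped", "missing")
    }
    return ", ".join(f"{name}={count}" for name, count in counts.items())


def ensure_complete(result) -> None:
    """Raise if any file failed to upload or was missing locally."""
    problems = sorted(str(path) for path in (*result.failed, *result.missing))
    if problems:
        raise RuntimeError(f"Upload incomplete: {problems}")


result = ResultSketch(
    successful={"/remote/file1", "/remote/file2"},
    failed={"/local/failed.file"},
    skipped={"/local/already.exists"},
)
summary = summarize(result)

try:
    ensure_complete(result)
except RuntimeError as error:
    failure = str(error)
```

Because both functions only read the four attributes, they could be pointed at the object returned by run() as well, assuming it exposes them as shown in the example.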

Upload only certain files from local_path

from onetl.impl import (
    RemoteFile,
    LocalPath,
)
from onetl.file import FileUploader

uploader = FileUploader(local_path="/local", target_path="/remote", ...)

# paths can be relative or absolute, but all of them must be inside "/local"
uploaded_files = uploader.run(
    [
        "/local/file1",
        "/local/nested/path/file3",
        # excluding "/local/file2",
    ]
)

assert uploaded_files.successful == {
    RemoteFile("/remote/file1"),
    RemoteFile("/remote/nested/path/file3"),  # directory structure is preserved
}
assert not uploaded_files.failed
assert not uploaded_files.skipped
assert not uploaded_files.missing

Upload only certain files from any folder

from onetl.impl import (
    RemoteFile,
    LocalPath,
)
from onetl.file import FileUploader

uploader = FileUploader(target_path="/remote", ...)  # no local_path set

# only absolute paths
uploaded_files = uploader.run(
    [
        "/local/file1.txt",
        "/any/nested/path/file3.txt",
    ]
)

assert uploaded_files.successful == {
    RemoteFile("/remote/file1.txt"),
    RemoteFile("/remote/file3.txt"),
    # directory structure is NOT preserved without local_path
}
assert not uploaded_files.failed
assert not uploaded_files.skipped
assert not uploaded_files.missing
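
The difference between the examples above (nested directories preserved with local_path, flattened without it) can be summarized as a path-mapping rule. The sketch below is an illustration inferred from those examples, not the library's code; remote_path_for is a hypothetical helper.

```python
from pathlib import PurePosixPath
from typing import Optional


def remote_path_for(
    local_file: str,
    target_path: str,
    local_path: Optional[str] = None,
) -> PurePosixPath:
    """Hypothetical helper mirroring the mapping shown in the examples."""
    file = PurePosixPath(local_file)
    target = PurePosixPath(target_path)
    if local_path is None:
        # No base directory to compute a relative path from: keep the file name only.
        return target / file.name
    # Keep the part of the path below local_path, so nested directories survive.
    return target / file.relative_to(PurePosixPath(local_path))


with_base = remote_path_for("/local/nested/path/file3", "/remote", local_path="/local")
without_base = remote_path_for("/any/nested/path/file3.txt", "/remote")
```

One consequence of the flattening rule is that two files with the same name from different local folders would map to the same remote path when local_path is not set.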
view_files() -> FileSet[LocalPath]

Gets the list of files in local_path. Supports hooks.

Returns:
FileSet[LocalPath]

Set of files in local_path

Raises:
onetl.exception.DirectoryNotFoundError

local_path was not found

NotADirectoryError

local_path is not a directory

Examples

View files

from onetl.impl import LocalPath
from onetl.file import FileUploader

uploader = FileUploader(local_path="/local", ...)

view_files = uploader.view_files()

assert view_files == {
    LocalPath("/local/file1.txt"),
    LocalPath("/local/file3.txt"),
    LocalPath("/local/nested/path/file3.txt"),
}
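
For intuition, a recursive listing with the same two failure modes can be sketched with the standard library alone. This is an assumption-laden illustration rather than the method's implementation: list_local_files is a hypothetical name, and the built-in FileNotFoundError stands in for onetl.exception.DirectoryNotFoundError.

```python
import tempfile
from pathlib import Path
from typing import Set


def list_local_files(local_path: Path) -> Set[Path]:
    """Hypothetical helper: recursively collect regular files under local_path."""
    if not local_path.exists():
        # Stand-in for onetl.exception.DirectoryNotFoundError.
        raise FileNotFoundError(f"'{local_path}' does not exist")
    if not local_path.is_dir():
        raise NotADirectoryError(f"'{local_path}' is not a directory")
    # rglob("*") also yields directories, so keep regular files only.
    return {path for path in local_path.rglob("*") if path.is_file()}


with tempfile.TemporaryDirectory() as root:
    base = Path(root)
    (base / "nested" / "path").mkdir(parents=True)
    (base / "file1.txt").write_text("a")
    (base / "nested" / "path" / "file3.txt").write_text("b")

    found = {path.relative_to(base).as_posix() for path in list_local_files(base)}

    try:
        list_local_files(base / "file1.txt")
    except NotADirectoryError:
        rejected_file = True
```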