HDFS connection

class onetl.connection.file_connection.hdfs.connection.HDFS(*, cluster: Cluster | None = None, host: Host | None = None, port: int = 50070, user: str | None = None, password: SecretStr | None = None, keytab: FilePath | None = None, timeout: int = 10)

HDFS file connection. support_hooks

Powered by the HDFS Python client.

Warning

Since onETL v0.7.0, to use the HDFS connector you should install the package as follows:

pip install onetl[hdfs]

# or
pip install onetl[files]

See File connections installation instruction for more details.

Note

To access a Hadoop cluster with Kerberos enabled, the kinit executable must be available in one of the directories listed in the PATH environment variable.

See Kerberos support instruction for more details.
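To verify this prerequisite from Python, you can look the executable up with the standard library's shutil.which, which searches the directories listed in PATH. This is a minimal sketch; the helper name kinit_available is hypothetical and not part of onETL.

```python
import shutil


def kinit_available() -> bool:
    """Return True if a `kinit` executable is found in one of the PATH directories."""
    # shutil.which returns the full path to the executable, or None if it is not found
    return shutil.which("kinit") is not None


if not kinit_available():
    print("kinit not found in PATH: Kerberos authentication will not work")
```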

Parameters:
cluster : str, optional

Hadoop cluster name. For example: rnd-dwh.

Used for:
  • HWM and lineage (as instance name for file paths), if set.

  • Validation of the host value, if the latter is passed and some hooks are bound to Slots.get_cluster_namenodes.

host : str, optional

Hadoop namenode host. For example: namenode1.domain.com.

Should be an active namenode (NOT standby).

If the value is not set, but some hooks are bound to Slots.get_cluster_namenodes and Slots.is_namenode_active, onETL will iterate over the cluster namenodes to detect which one is active.

webhdfs_port : int, default: 50070

Port of the Hadoop namenode (WebHDFS protocol). In the constructor signature above, this parameter is named port.

If omitted, but some hooks are bound to the Slots.get_webhdfs_port slot, onETL will try to detect the port number for the specific cluster.

user : str, optional

User who has access to the file source. For example: someuser.

If set, Kerberos auth will be used. Otherwise, an anonymous connection is created.

password : str, default: None

User password.

Used for generating Kerberos ticket.

Warning

You can provide only one of the parameters: password or keytab. If you provide both, an exception will be raised.

keytab : str, default: None

Local path to the keytab file.

Used for generating Kerberos ticket.

Warning

You can provide only one of the parameters: password or keytab. If you provide both, an exception will be raised.

timeout : int, default: 10

Connection timeout.
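The interplay of user, password and keytab can be summarized in a small sketch. It is an illustration under assumptions, not onETL's code: build_kinit_command is a hypothetical helper that only builds the argument list for the MIT Kerberos kinit utility (kinit -k -t <keytab> <principal> for keytab auth; for password auth kinit prompts for the password, which would have to be supplied on its standard input). Treating a user without password or keytab as "reuse the existing ticket cache" is also an assumption of this sketch.

```python
from typing import List, Optional


def build_kinit_command(
    user: Optional[str],
    password: Optional[str] = None,
    keytab: Optional[str] = None,
) -> Optional[List[str]]:
    """Return the kinit argument list for the given credentials (hypothetical helper).

    None means that no new Kerberos ticket has to be generated.
    """
    if password is not None and keytab is not None:
        # mirrors the documented rule: password and keytab are mutually exclusive
        raise ValueError("Only one of password or keytab can be provided")

    if user is None:
        # no user: anonymous connection, Kerberos is not used at all
        return None

    if keytab is not None:
        # -k -t <keytab>: obtain the ticket using the key stored in the keytab file
        return ["kinit", "-k", "-t", keytab, user]

    if password is not None:
        # kinit prompts for the password; it would have to be written to kinit's stdin
        return ["kinit", user]

    # assumption of this sketch: no credentials means an existing ticket cache is reused
    return None


print(build_kinit_command("someuser", keytab="/path/to/keytab"))
# ['kinit', '-k', '-t', '/path/to/keytab', 'someuser']
```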

Examples

HDFS connection with password:

from onetl.connection import HDFS

hdfs = HDFS(
    host="namenode1.domain.com",
    user="someuser",
    password="*****",
).check()

HDFS connection with keytab:

from onetl.connection import HDFS

hdfs = HDFS(
    host="namenode1.domain.com",
    user="someuser",
    keytab="/path/to/keytab",
).check()

HDFS connection without authentication (HDFS without Kerberos support):

from onetl.connection import HDFS

hdfs = HDFS(host="namenode1.domain.com").check()

HDFS connection with both cluster and host names:

from onetl.connection import HDFS

hdfs = HDFS(
    cluster="rnd-dwh",
    host="namenode1.domain.com",
    user="someuser",
    password="*****",
).check()

HDFS connection with cluster name only:

from onetl.connection import HDFS

hdfs = HDFS(
    cluster="rnd-dwh",
    user="someuser",
    password="*****",
).check()
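The last two examples rely on onETL validating or detecting the namenode from the cluster name. The rules described in the Parameters section can be approximated in plain Python as below. This is a sketch under assumptions, not onETL's implementation: resolve_namenode and resolve_webhdfs_port are hypothetical functions, the callables passed to them stand in for hooks bound to Slots.get_cluster_namenodes, Slots.is_namenode_active and Slots.get_webhdfs_port, and the order in which namenodes are probed is arbitrary here.

```python
from typing import Callable, Dict, Optional, Set

DEFAULT_WEBHDFS_PORT = 50070


def resolve_namenode(
    cluster: Optional[str],
    host: Optional[str],
    get_cluster_namenodes: Callable[[str], Optional[Set[str]]],
    is_namenode_active: Callable[[str, Optional[str]], bool],
) -> str:
    """Validate an explicit host, or detect the active namenode of a cluster."""
    namenodes = get_cluster_namenodes(cluster) if cluster else None

    if host:
        # an explicit host is validated only if the cluster namenodes are known
        if namenodes is not None and host not in namenodes:
            raise ValueError(f"{host!r} is not a namenode of cluster {cluster!r}")
        return host

    if not namenodes:
        raise ValueError("host is not set, and cluster namenodes cannot be determined")

    # probe namenodes one by one, returning the first active (not standby) one
    for namenode in sorted(namenodes):
        if is_namenode_active(namenode, cluster):
            return namenode
    raise RuntimeError(f"Cannot detect an active namenode of cluster {cluster!r}")


def resolve_webhdfs_port(
    cluster: Optional[str],
    port: Optional[int],
    get_webhdfs_port: Callable[[str], Optional[int]],
) -> int:
    """Use an explicit port, else a hook-provided one, else the default."""
    if port is not None:
        return port
    if cluster:
        detected = get_webhdfs_port(cluster)
        if detected is not None:
            return detected
    return DEFAULT_WEBHDFS_PORT


# hypothetical hook data for the "rnd-dwh" cluster
cluster_namenodes: Dict[str, Set[str]] = {"rnd-dwh": {"namenode1.domain.com", "namenode2.domain.com"}}
active_namenodes = {"namenode2.domain.com"}
cluster_ports = {"rnd-dwh": 9870}

host = resolve_namenode(
    "rnd-dwh",
    None,
    cluster_namenodes.get,
    lambda namenode, cluster: namenode in active_namenodes,
)
port = resolve_webhdfs_port("rnd-dwh", None, cluster_ports.get)
print(host, port)  # namenode2.domain.com 9870
```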
check()

Check source availability. support_hooks

If not, an exception will be raised.

Returns:
Connection itself
Raises:
RuntimeError

If the connection is not available

Examples

connection.check()
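Because check() returns the connection itself, it can be chained right after the constructor, as in the class examples above. The hypothetical DemoConnection class below sketches that contract in plain Python (it is not onETL's class): any failure is re-raised as RuntimeError, and success returns self.

```python
from typing import Callable


class DemoConnection:
    """Hypothetical stand-in illustrating the check() contract; not onETL's class."""

    def __init__(self, ping: Callable[[], None]) -> None:
        # ping is any callable that raises if the source is unreachable
        self._ping = ping

    def check(self) -> "DemoConnection":
        try:
            self._ping()
        except Exception as e:
            # any low-level failure is reported as RuntimeError, as documented for check()
            raise RuntimeError("Connection is not available") from e
        # returning self is what makes `conn = DemoConnection(...).check()` possible
        return self


connection = DemoConnection(ping=lambda: None).check()
```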
create_dir(path: PathLike | str) -> RemoteDirectory

Creates a directory tree on the remote filesystem. support_hooks

Parameters:
path : str or os.PathLike

Directory path

Returns:
Created directory with stats
Raises:
onetl.exception.NotAFileError

Path is not a file

Examples

dir_path = connection.create_dir("/path/to/dir")
download_file(remote_file_path: PathLike | str, local_file_path: PathLike | str, replace: bool = True) -> LocalPath

Downloads a file from the remote filesystem to a local path. support_hooks

Warning

Supports only one file download per call. Directory download is NOT supported; use File Downloader instead.

Parameters:
remote_file_path : str or os.PathLike

Remote file path to read from

local_file_path : str or os.PathLike

Local file path to create

replace : bool, default True

If True, an existing local file will be replaced.

Returns:
Local file with stats.
Raises:
onetl.exception.NotAFileError

Remote or local path is not a file

FileNotFoundError

Remote file does not exist

FileExistsError

Local file already exists, and replace=False

onetl.exception.FileSizeMismatchError

Target file size after download is different from source file size.

Examples

import os

local_file = connection.download_file(
    remote_file_path="/path/to/source.csv", local_file_path="/path/to/target.csv"
)
assert local_file.exists()
assert os.fspath(local_file) == "/path/to/target.csv"
assert local_file.stat().st_size == connection.get_stat("/path/to/source.csv").st_size
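The replace flag and the post-transfer size check can be mimicked on the local filesystem. In this sketch, copy_with_size_check and the local FileSizeMismatchError class are hypothetical stand-ins (the real exception lives in onetl.exception), shutil.copyfile replaces the actual HDFS transfer, and the default replace=True follows the download_file signature above.

```python
import shutil
from pathlib import Path


class FileSizeMismatchError(Exception):
    """Hypothetical local stand-in for onetl.exception.FileSizeMismatchError."""


def copy_with_size_check(source: str, target: str, replace: bool = True) -> Path:
    """Copy one file, honoring `replace` and verifying the size afterwards (hypothetical helper)."""
    source_path = Path(source)
    target_path = Path(target)

    if not source_path.exists():
        raise FileNotFoundError(source)
    if target_path.exists() and not replace:
        # mirrors the documented FileExistsError for replace=False
        raise FileExistsError(target)

    target_path.parent.mkdir(parents=True, exist_ok=True)
    shutil.copyfile(source_path, target_path)  # stands in for the actual HDFS transfer

    # compare sizes after the transfer, like the documented FileSizeMismatchError check
    source_size = source_path.stat().st_size
    target_size = target_path.stat().st_size
    if source_size != target_size:
        raise FileSizeMismatchError(f"source has {source_size} bytes, target has {target_size} bytes")
    return target_path
```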
classmethod get_current(**kwargs)

Create connection for current cluster. support_hooks

Automatically sets the current cluster name as the cluster parameter.

Note

Can be used only if some hooks are bound to the Slots.get_current_cluster slot.

Parameters:
user : str
password : str | None
keytab : str | None
timeout : int

See the HDFS constructor documentation.

Examples

from onetl.connection import HDFS

# injecting current cluster name via hooks mechanism
hdfs = HDFS.get_current(user="me", password="pass")
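get_current is essentially a constructor call in which cluster is filled in from a hook. The sketch below models that with a hypothetical DemoHDFS dataclass and a plain callable standing in for the hook bound to Slots.get_current_cluster; it is not onETL's implementation, and raising RuntimeError when no cluster name is available is an assumption.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class DemoHDFS:
    """Hypothetical stand-in for HDFS with only a few of its fields."""

    cluster: Optional[str] = None
    user: Optional[str] = None
    password: Optional[str] = None

    @classmethod
    def get_current(cls, get_current_cluster: Callable[[], Optional[str]], **kwargs: Any) -> "DemoHDFS":
        # onETL takes the cluster name from a hook; here it is passed in as a plain callable
        cluster = get_current_cluster()
        if not cluster:
            raise RuntimeError("Cannot detect the current cluster name")
        # the remaining arguments are forwarded to the constructor unchanged
        return cls(cluster=cluster, **kwargs)


hdfs = DemoHDFS.get_current(lambda: "rnd-dwh", user="me", password="pass")
```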
get_stat(path: PathLike | str) -> PathStatProtocol

Returns stats for a specific path. support_hooks

Parameters:
path : str or os.PathLike

Path to get stats for

Returns:
Stats object
Raises:
Any underlying client exception

Examples

stat = connection.get_stat("/path/to/file.csv")
assert stat.st_size > 0
assert stat.st_uid == 12345  # owner id
is_dir(path: PathLike | str) -> bool

Check if specified path is a directory. support_hooks

Parameters:
path : str or os.PathLike

Path to check

Returns:
True if path is a directory, False otherwise.
Raises:
onetl.exception.DirectoryNotFoundError

Path does not exist

Examples

assert connection.is_dir("/path/to/dir")
assert not connection.is_dir("/path/to/dir/file.csv")
is_file(path: PathLike | str) -> bool

Check if specified path is a file. support_hooks

Parameters:
path : str or os.PathLike

Path to check

Returns:
True if path is a file, False otherwise.
Raises:
FileNotFoundError

Path does not exist

Examples

assert connection.is_file("/path/to/dir/file.csv")
assert not connection.is_file("/path/to/dir")
list_dir(path: PathLike | str, filters: Iterable[BaseFileFilter] | None = None, limits: Iterable[BaseFileLimit] | None = None) -> list[RemoteDirectory | RemoteFile]

Returns a list of child files/directories in a specific directory. support_hooks

Parameters:
path : str or os.PathLike

Directory path to list contents.

filters : list of BaseFileFilter, optional

Return only files/directories matching these filters. See File Filters.

limits : list of BaseFileLimit, optional

Apply limits to the list of files/directories, and stop if one of the limits is reached. See File Limits.

Returns:
List of onetl.base.PathWithStatsProtocol
Raises:
NotADirectoryError

Path is not a directory

onetl.exception.DirectoryNotFoundError

Path does not exist

Examples

import os

dir_content = connection.list_dir("/path/to/dir")
assert os.fspath(dir_content[0]) == "/path/to/dir/file.csv"
assert connection.path_exists("/path/to/dir/file.csv")
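As a rough model of how filters and limits combine: every filter must accept an entry, and listing stops as soon as any limit is reached. This is a plain-Python sketch; the callable filters, MaxCountLimit and list_with_filters are hypothetical simplifications, not onETL's BaseFileFilter/BaseFileLimit API.

```python
from typing import Callable, Iterable, List

FileFilter = Callable[[str], bool]


class MaxCountLimit:
    """Hypothetical limit which is reached once more than `max_count` entries were accepted."""

    def __init__(self, max_count: int) -> None:
        self.max_count = max_count
        self.count = 0

    def is_reached(self, name: str) -> bool:
        self.count += 1
        return self.count > self.max_count


def list_with_filters(
    entries: Iterable[str],
    filters: Iterable[FileFilter] = (),
    limits: Iterable[MaxCountLimit] = (),
) -> List[str]:
    filter_list = list(filters)
    limit_list = list(limits)
    result = []
    for entry in entries:
        if not all(matches(entry) for matches in filter_list):
            continue  # entry rejected by at least one filter
        # evaluate every limit (no short-circuit) so that all counters stay in sync
        reached = [limit.is_reached(entry) for limit in limit_list]
        if any(reached):
            break  # stop listing as soon as one of the limits is reached
        result.append(entry)
    return result


entries = ["a.csv", "b.txt", "c.csv", "d.csv"]
print(list_with_filters(entries, [lambda name: name.endswith(".csv")], [MaxCountLimit(2)]))
# ['a.csv', 'c.csv']
```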
path_exists(path: PathLike | str) -> bool

Check if specified path exists on remote filesystem. support_hooks

Parameters:
path : str or os.PathLike

Path to check

Returns:
True if path exists, False otherwise

Examples

assert connection.path_exists("/path/to/file.csv")
assert connection.path_exists("/path/to/dir")
assert not connection.path_exists("/path/to/missing")
remove_dir(path: PathLike | str, recursive: bool = False) -> bool

Remove directory or directory tree. support_hooks

If directory does not exist, no exception is raised.

Parameters:
path : str or os.PathLike

Directory path to remove

recursive : bool, default False

If True, remove directory tree recursively.

Returns:
True if directory was removed, False if directory does not exist in the first place.
Raises:
NotADirectoryError

Path is not a directory

Examples

assert connection.remove_dir("/path/to/dir")
assert not connection.path_exists("/path/to/dir/file.csv")
assert not connection.path_exists("/path/to/dir")

assert not connection.remove_dir("/path/to/dir")  # already deleted
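A local-filesystem analogue of the remove_dir contract, built on pathlib and shutil.rmtree, is sketched below; remove_local_dir is a hypothetical helper. The text above does not say what happens to a non-empty directory when recursive=False, so this sketch simply lets Path.rmdir raise OSError in that case.

```python
import shutil
from pathlib import Path


def remove_local_dir(path: str, recursive: bool = False) -> bool:
    """Local analogue of the documented remove_dir contract (hypothetical helper)."""
    dir_path = Path(path)
    if not dir_path.exists():
        # missing directory: no exception, just report that nothing was removed
        return False
    if not dir_path.is_dir():
        raise NotADirectoryError(path)
    if recursive:
        shutil.rmtree(dir_path)  # remove the whole directory tree
    else:
        dir_path.rmdir()  # raises OSError if the directory is not empty
    return True
```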
remove_file(path: PathLike | str) -> bool

Removes a file on the remote filesystem. support_hooks

If file does not exist, no exception is raised.

Warning

Supports only one file removal per call. Directory removal is NOT supported; use remove_dir instead.

Parameters:
path : str or os.PathLike

File path

Returns:
True if file was removed, False if file does not exist in the first place.
Raises:
onetl.exception.NotAFileError

Path is not a file

Examples

assert connection.remove_file("/path/to/file.csv")
assert not connection.path_exists("/path/to/file.csv")

assert not connection.remove_file("/path/to/file.csv")  # already deleted
rename_dir(source_dir_path: PathLike | str, target_dir_path: PathLike | str, replace: bool = False) -> RemoteDirectory

Renames or moves a directory on the remote filesystem.

Parameters:
source_dir_path : str or os.PathLike

Old directory path

target_dir_path : str or os.PathLike

New directory path

replace : bool, default False

If True, an existing target directory will be replaced.

Returns:
New directory path with stats.
Raises:
NotADirectoryError

Path is not a directory

onetl.exception.DirectoryNotFoundError

Path does not exist

onetl.exception.DirectoryExistsError

Directory already exists, and replace=False

Examples

new_dir = connection.rename_dir("/path/to/dir1", "/path/to/dir2")
assert connection.path_exists("/path/to/dir2")
assert not connection.path_exists("/path/to/dir1")
rename_file(source_file_path: PathLike | str, target_file_path: PathLike | str, replace: bool = False) -> RemoteFile

Renames or moves a file on the remote filesystem. support_hooks

Warning

Supports only one file move per call. Directory move/rename is NOT supported; use rename_dir instead.

Parameters:
source_file_path : str or os.PathLike

Old file path

target_file_path : str or os.PathLike

New file path

replace : bool, default False

If True, an existing target file will be replaced.

Returns:
New file path with stats.
Raises:
onetl.exception.NotAFileError

Source or target path is not a file

FileNotFoundError

File does not exist

FileExistsError

File already exists, and replace=False

Examples

new_file = connection.rename_file("/path/to/file1.csv", "/path/to/file2.csv")
assert connection.path_exists("/path/to/file2.csv")
assert not connection.path_exists("/path/to/file1.csv")
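The replace semantics of rename_file can be illustrated with a local-filesystem analogue built on os.replace. rename_local_file is a hypothetical helper, and IsADirectoryError is used here only as a standard-library stand-in for onETL's NotAFileError.

```python
import os
from pathlib import Path


def rename_local_file(source: str, target: str, replace: bool = False) -> Path:
    """Local analogue of the documented rename_file contract (hypothetical helper)."""
    source_path = Path(source)
    target_path = Path(target)

    if not source_path.exists():
        raise FileNotFoundError(source)
    if not source_path.is_file():
        raise IsADirectoryError(source)  # onETL raises its own NotAFileError here
    if target_path.exists() and not replace:
        # mirrors the documented FileExistsError for replace=False
        raise FileExistsError(target)

    target_path.parent.mkdir(parents=True, exist_ok=True)
    # os.replace overwrites an existing target file; on one filesystem the move is atomic
    os.replace(source_path, target_path)
    return target_path
```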
resolve_dir(path: PathLike | str) -> RemoteDirectory

Returns directory at specific path, with stats. support_hooks

Parameters:
path : str or os.PathLike

Path to resolve

Returns:
Directory path with stats
Raises:
onetl.exception.DirectoryNotFoundError

Path does not exist

NotADirectoryError

Path is not a directory

Examples

import os

dir_path = connection.resolve_dir("/path/to/dir")
assert os.fspath(dir_path) == "/path/to/dir"
assert dir_path.stat().st_uid == 12345  # owner id
resolve_file(path: PathLike | str) -> RemoteFile

Returns file at specific path, with stats. support_hooks

Parameters:
path : str or os.PathLike

Path to resolve

Returns:
File path with stats
Raises:
FileNotFoundError

Path does not exist

onetl.exception.NotAFileError

Path is not a file

Examples

import os

file_path = connection.resolve_file("/path/to/dir/file.csv")
assert os.fspath(file_path) == "/path/to/dir/file.csv"
assert file_path.stat().st_uid == 12345  # owner id
upload_file(local_file_path: PathLike | str, remote_file_path: PathLike | str, replace: bool = False) -> RemoteFile

Uploads a local file to the remote filesystem. support_hooks

Warning

Supports only one file upload per call. Directory upload is NOT supported; use File Uploader instead.

Parameters:
local_file_path : str or os.PathLike

Local file path to read from

remote_file_path : str or os.PathLike

Remote file path to create

replace : bool, default False

If True, an existing remote file will be replaced.

Returns:
Remote file with stats.
Raises:
onetl.exception.NotAFileError

Remote or local path is not a file

FileNotFoundError

Local file does not exist

FileExistsError

Remote file already exists, and replace=False

onetl.exception.FileSizeMismatchError

Target file size after upload is different from source file size.

Examples

import os

remote_file = connection.upload_file(
    local_file_path="/path/to/source.csv",
    remote_file_path="/path/to/target.csv",
)
assert connection.path_exists("/path/to/target.csv")
assert remote_file.stat().st_size == os.stat("/path/to/source.csv").st_size
walk(root: PathLike | str, topdown: bool = True, filters: Iterable[BaseFileFilter] | None = None, limits: Iterable[BaseFileLimit] | None = None) -> Iterator[tuple[RemoteDirectory, list[RemoteDirectory], list[RemoteFile]]]

Walks into a directory tree and iterates over its contents at all nesting levels. support_hooks

Just like os.walk, but with additional filter/limit logic.

Parameters:
root : str or os.PathLike

Directory path to walk into.

topdown : bool, default True

If True, walk in top-down order, otherwise walk in bottom-up order.

filters : list of BaseFileFilter, optional

Return only files/directories matching these filters. See File Filters.

limits : list of BaseFileLimit, optional

Apply limits to the list of files/directories, and stop if one of the limits is reached. See File Limits

Returns:
Iterator[tuple[root, dirs, files]], like os.walk.
However, paths are returned not as strings but as path objects with embedded stats.
Raises:
NotADirectoryError

Path is not a directory

onetl.exception.DirectoryNotFoundError

Path does not exist

Examples

import os

for root, dirs, files in connection.walk("/path/to/dir"):
    assert os.fspath(root) == "/path/to/dir"
    assert dirs == []
    assert os.fspath(files[0]) == "/path/to/dir/file.csv"
    assert connection.path_exists("/path/to/dir/file.csv")
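Since walk works like os.walk, its behavior can be approximated on the local filesystem by layering a file filter and a file-count limit on top of os.walk. In this sketch, walk_filtered, the plain callable filter and the max_files counter are hypothetical simplifications; onETL itself accepts BaseFileFilter and BaseFileLimit objects and yields path objects with stats rather than strings.

```python
import os
from typing import Callable, Iterator, List, Optional, Tuple


def walk_filtered(
    root: str,
    file_filter: Callable[[str], bool] = lambda path: True,
    max_files: Optional[int] = None,
    topdown: bool = True,
) -> Iterator[Tuple[str, List[str], List[str]]]:
    """os.walk with a per-file filter and a global file-count limit (hypothetical helper)."""
    if not os.path.exists(root):
        raise FileNotFoundError(root)  # onETL raises DirectoryNotFoundError instead
    if not os.path.isdir(root):
        raise NotADirectoryError(root)

    accepted = 0
    for current, dirs, files in os.walk(root, topdown=topdown):
        selected: List[str] = []
        for name in sorted(files):
            if not file_filter(os.path.join(current, name)):
                continue  # skip files rejected by the filter
            if max_files is not None and accepted >= max_files:
                # limit reached: emit what was collected in this directory and stop walking
                yield current, dirs, selected
                return
            selected.append(name)
            accepted += 1
        yield current, dirs, selected
```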