HDFS connection
- class onetl.connection.file_connection.hdfs.connection.HDFS(*, cluster: Cluster | None = None, host: Host | None = None, port: int = 50070, user: str | None = None, password: SecretStr | None = None, keytab: FilePath | None = None, timeout: int = 10)
Powered by HDFS Python client.
Warning
Since onETL v0.7.0, the HDFS connector requires installing the package with the corresponding extra:
    pip install onetl[hdfs]
    # or
    pip install onetl[files]
See File connections installation instruction for more details.
Note
To access a Hadoop cluster with Kerberos installed, the kinit executable must be available in one of the directories listed in the PATH environment variable.
See Kerberos support instruction for more details.
- Parameters:
- cluster: str, optional
Hadoop cluster name. For example: rnd-dwh.
Used for:
- HWM and lineage (as instance name for file paths), if set.
- Validation of the host value, if the latter is passed and some hooks are bound to Slots.get_cluster_namenodes.
- host: str, optional
Hadoop namenode host. For example: namenode1.domain.com.
Should be an active namenode (NOT standby).
If the value is not set, but some hooks are bound to Slots.get_cluster_namenodes and Slots.is_namenode_active, onETL will iterate over the cluster namenodes to detect which one is active.
- webhdfs_port: int, default: 50070
Port of the Hadoop namenode (WebHDFS protocol).
If omitted, but some hooks are bound to the Slots.get_webhdfs_port slot, onETL will try to detect the port number for the specific cluster.
- user: str, optional
User that has access to the file source. For example: someuser.
If set, Kerberos auth will be used. Otherwise an anonymous connection is created.
- password: str, default: None
User password.
Used for generating a Kerberos ticket.
Warning
You can provide only one of the parameters: password or keytab. If you provide both, an exception will be raised.
- keytab: str, default: None
Local path to the keytab file.
Used for generating a Kerberos ticket.
Warning
You can provide only one of the parameters: password or keytab. If you provide both, an exception will be raised.
- timeout: int, default: 10
Connection timeout.
Examples
HDFS connection with password:
    from onetl.connection import HDFS

    hdfs = HDFS(
        host="namenode1.domain.com",
        user="someuser",
        password="*****",
    ).check()
HDFS connection with keytab:
    from onetl.connection import HDFS

    hdfs = HDFS(
        host="namenode1.domain.com",
        user="someuser",
        keytab="/path/to/keytab",
    ).check()
HDFS file connection initialization without auth (HDFS without Kerberos support):
    from onetl.connection import HDFS

    hdfs = HDFS(host="namenode1.domain.com").check()
HDFS connection with both cluster and host names:
    from onetl.connection import HDFS

    hdfs = HDFS(
        cluster="rnd-dwh",
        host="namenode1.domain.com",
        user="someuser",
        password="*****",
    ).check()
HDFS connection with cluster name only:
    from onetl.connection import HDFS

    hdfs = HDFS(
        cluster="rnd-dwh",
        user="someuser",
        password="*****",
    ).check()
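The fallback described for the host parameter, where onETL iterates over cluster namenodes to find the active one, can be pictured as a simple loop. The sketch below is an illustration only, not onETL's implementation: find_active_namenode and the is_active callback are hypothetical names standing in for whatever is bound to Slots.get_cluster_namenodes and Slots.is_namenode_active, and the host names are made up.

```python
from typing import Callable, Iterable, Optional


def find_active_namenode(
    namenodes: Iterable[str],
    is_active: Callable[[str], bool],
) -> Optional[str]:
    """Return the first namenode reported as active, or None if none is."""
    for host in namenodes:
        if is_active(host):
            return host
    return None


# Hypothetical cluster layout and activity check, for illustration only
namenodes = ["namenode1.domain.com", "namenode2.domain.com"]
active = find_active_namenode(namenodes, lambda host: host.startswith("namenode2"))
print(active)
```

Passing host explicitly skips this kind of lookup, which is why the value should point at an active namenode rather than a standby one.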
- check()
Check source availability. If the source is not available, an exception will be raised.
- Returns:
- Connection itself
- Raises:
- RuntimeError
If the connection is not available
Examples
connection.check()
- create_dir(path: PathLike | str) → RemoteDirectory
Creates directory tree on remote filesystem.
- Parameters:
- path: str or os.PathLike
Directory path
- Returns:
- Created directory with stats
- Raises:
onetl.exception.NotAFileError
Path is not a file
Examples
dir_path = connection.create_dir("/path/to/dir")
- download_file(remote_file_path: PathLike | str, local_file_path: PathLike | str, replace: bool = True) → LocalPath
Downloads file from the remote filesystem to a local path.
Warning
Supports only one file download per call. Directory download is NOT supported, use File Downloader instead.
- Parameters:
- remote_file_path: str or os.PathLike
Remote file path to read from
- local_file_path: str or os.PathLike
Local file path to create
- replace: bool, default True
If True, existing file will be replaced
- Returns:
- Local file with stats.
- Raises:
onetl.exception.NotAFileError
Remote or local path is not a file
- FileNotFoundError
Remote file does not exist
- FileExistsError
Local file already exists, and
replace=False
onetl.exception.FileSizeMismatchError
Target file size after download is different from source file size.
Examples
    import os

    local_file = connection.download_file(
        remote_file_path="/path/to/source.csv",
        local_file_path="/path/to/target.csv",
    )
    assert local_file.exists()
    assert os.fspath(local_file) == "/path/to/target.csv"
    assert local_file.stat().st_size == connection.get_stat("/path/to/source.csv").st_size
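When an existing local copy should be kept, the documented FileExistsError raised with replace=False can be turned into a simple "skip" signal. The helper below is a minimal sketch: download_unless_present is a hypothetical name, and connection is assumed to be any object exposing download_file with the parameters listed above.

```python
import os
from typing import Any, Union

PathType = Union[str, os.PathLike]


def download_unless_present(
    connection: Any,
    remote_file_path: PathType,
    local_file_path: PathType,
) -> bool:
    """Download a single remote file, keeping any existing local copy.

    Returns True if the file was downloaded, False if a local file already existed.
    """
    try:
        connection.download_file(
            remote_file_path=remote_file_path,
            local_file_path=local_file_path,
            replace=False,  # ask for FileExistsError instead of overwriting
        )
    except FileExistsError:
        return False
    return True
```

Other documented exceptions, such as FileNotFoundError for a missing remote file, are deliberately left to propagate to the caller.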
- classmethod get_current(**kwargs)
Create connection for current cluster.
Automatically sets up current cluster name as cluster.
Note
Can be used only if there are some hooks bound to the slot Slots.get_current_cluster.
- Parameters:
- user: str
- password: str | None
- keytab: str | None
- timeout: int
See HDFS constructor documentation.
Examples
    from onetl.connection import HDFS

    # injecting current cluster name via hooks mechanism
    hdfs = HDFS.get_current(user="me", password="pass")
- get_stat(path: PathLike | str) → PathStatProtocol
Returns stats for a specific path.
- Parameters:
- path: str or os.PathLike
Path to get stats for
- Returns:
- Stats object
- Raises:
- Any underlying client exception
Examples
    stat = connection.get_stat("/path/to/file.csv")
    assert stat.st_size > 0
    assert stat.st_uid == 12345  # owner id
- is_dir(path: PathLike | str) → bool
Check if specified path is a directory.
- Parameters:
- path: str or os.PathLike
Path to check
- Returns:
True if path is a directory, False otherwise.
- Raises:
- onetl.exception.DirectoryNotFoundError
Path does not exist
Examples
    assert connection.is_dir("/path/to/dir")
    assert not connection.is_dir("/path/to/dir/file.csv")
- is_file(path: PathLike | str) → bool
Check if specified path is a file.
- Parameters:
- path: str or os.PathLike
Path to check
- Returns:
True if path is a file, False otherwise.
- Raises:
- FileNotFoundError
Path does not exist
Examples
    assert connection.is_file("/path/to/dir/file.csv")
    assert not connection.is_file("/path/to/dir")
- list_dir(path: PathLike | str, filters: Iterable[BaseFileFilter] | None = None, limits: Iterable[BaseFileLimit] | None = None) → list[RemoteDirectory | RemoteFile]
Return list of child files/directories in a specific directory.
- Parameters:
- path: str or os.PathLike
Directory path to list contents.
- filters: list of BaseFileFilter, optional
Return only files/directories matching these filters. See File Filters
- limits: list of BaseFileLimit, optional
Apply limits to the list of files/directories, and stop if one of the limits is reached. See File Limits
- Returns:
- List of onetl.base.PathWithStatsProtocol
- Raises:
- NotADirectoryError
Path is not a directory
- onetl.exception.DirectoryNotFoundError
Path does not exist
Examples
    import os

    dir_content = connection.list_dir("/path/to/dir")
    assert os.fspath(dir_content[0]) == "/path/to/dir/file.csv"
    assert connection.path_exists("/path/to/dir/file.csv")
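Since list_dir returns files and directories in one list, callers often need to separate the two. The sketch below does so using only list_dir and is_dir as documented here; split_dir_content is a hypothetical helper name, and connection is assumed to be any object exposing those two methods.

```python
import os
from typing import Any, List, Tuple


def split_dir_content(connection: Any, path: str) -> Tuple[List[str], List[str]]:
    """Return (directories, files) found directly under path, as plain strings."""
    dirs: List[str] = []
    files: List[str] = []
    for item in connection.list_dir(path):
        item_path = os.fspath(item)  # items are path-like, as in the example above
        if connection.is_dir(item_path):
            dirs.append(item_path)
        else:
            files.append(item_path)
    return dirs, files
```

This issues one is_dir call per entry, which keeps the sketch independent of the returned classes but may be slow for large directories.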
- path_exists(path: PathLike | str) → bool
Check if specified path exists on remote filesystem.
- Parameters:
- path: str or os.PathLike
Path to check
- Returns:
True if path exists, False otherwise
Examples
    assert connection.path_exists("/path/to/file.csv")
    assert connection.path_exists("/path/to/dir")
    assert not connection.path_exists("/path/to/missing")
- remove_dir(path: PathLike | str, recursive: bool = False) → bool
Remove directory or directory tree.
If directory does not exist, no exception is raised.
- Parameters:
- path: str or os.PathLike
Directory path to remove
- recursive: bool, default False
If True, remove directory tree recursively.
- Returns:
True if directory was removed, False if directory does not exist in the first place.
- Raises:
- NotADirectoryError
Path is not a directory
Examples
    assert connection.remove_dir("/path/to/dir")
    assert not connection.path_exists("/path/to/dir/file.csv")
    assert not connection.path_exists("/path/to/dir")
    assert not connection.remove_dir("/path/to/dir")  # already deleted
- remove_file(path: PathLike | str) → bool
Removes file on remote filesystem.
If file does not exist, no exception is raised.
Warning
Supports only one file removal per call. Directory removal is NOT supported, use remove_dir instead.
- Parameters:
- path: str or os.PathLike
File path
- Returns:
True if file was removed, False if file does not exist in the first place.
- Raises:
- onetl.exception.NotAFileError
Path is not a file
Examples
    assert connection.remove_file("/path/to/file.csv")
    assert not connection.path_exists("/path/to/file.csv")
    assert not connection.remove_file("/path/to/file.csv")  # already deleted
- rename_dir(source_dir_path: PathLike | str, target_dir_path: PathLike | str, replace: bool = False) → RemoteDirectory
Rename or move dir on remote filesystem.
- Parameters:
- source_dir_path: str or os.PathLike
Old directory path
- target_dir_path: str or os.PathLike
New directory path
- replace: bool, default False
If True, existing directory will be replaced.
- Returns:
- New directory path with stats.
- Raises:
- NotADirectoryError
Path is not a directory
- onetl.exception.DirectoryNotFoundError
Path does not exist
- onetl.exception.DirectoryExistsError
Directory already exists, and replace=False
Examples
    new_dir = connection.rename_dir("/path/to/dir1", "/path/to/dir2")
    assert connection.path_exists("/path/to/dir2")
    assert not connection.path_exists("/path/to/dir1")
- rename_file(source_file_path: PathLike | str, target_file_path: PathLike | str, replace: bool = False) → RemoteFile
Rename or move file on remote filesystem.
Warning
Supports only one file move per call. Directory move/rename is NOT supported.
- Parameters:
- source_file_path: str or os.PathLike
Old file path
- target_file_path: str or os.PathLike
New file path
- replace: bool, default False
If True, existing file will be replaced.
- Returns:
- New file path with stats.
- Raises:
- onetl.exception.NotAFileError
Source or target path is not a file
- FileNotFoundError
File does not exist
- FileExistsError
File already exists, and replace=False
Examples
    new_file = connection.rename_file("/path/to/file1.csv", "/path/to/file2.csv")
    assert connection.path_exists("/path/to/file2.csv")
    assert not connection.path_exists("/path/to/file1.csv")
- resolve_dir(path: PathLike | str) → RemoteDirectory
Returns directory at specific path, with stats.
- Parameters:
- path: str or os.PathLike
Path to resolve
- Returns:
- Directory path with stats
- Raises:
- onetl.exception.DirectoryNotFoundError
Path does not exist
- NotADirectoryError
Path is not a directory
Examples
    import os

    dir_path = connection.resolve_dir("/path/to/dir")
    assert os.fspath(dir_path) == "/path/to/dir"
    assert dir_path.stat.st_uid == 12345  # owner id
- resolve_file(path: PathLike | str) → RemoteFile
Returns file at specific path, with stats.
- Parameters:
- path: str or os.PathLike
Path to resolve
- Returns:
- File path with stats
- Raises:
- FileNotFoundError
Path does not exist
- onetl.exception.NotAFileError
Path is not a file
Examples
    import os

    file_path = connection.resolve_file("/path/to/dir/file.csv")
    assert os.fspath(file_path) == "/path/to/dir/file.csv"
    assert file_path.stat.st_uid == 12345  # owner id
- upload_file(local_file_path: PathLike | str, remote_file_path: PathLike | str, replace: bool = False) → RemoteFile
Uploads local file to a remote filesystem.
Warning
Supports only one file upload per call. Directory upload is NOT supported, use File Uploader instead.
- Parameters:
- local_file_path: str or os.PathLike
Local file path to read from
- remote_file_path: str or os.PathLike
Remote file path to create
- replace: bool, default False
If True, existing file will be replaced
- Returns:
- Remote file with stats.
- Raises:
- onetl.exception.NotAFileError
Remote or local path is not a file
- FileNotFoundError
Local file does not exist
- FileExistsError
Remote file already exists, and replace=False
- onetl.exception.FileSizeMismatchError
Target file size after upload is different from source file size.
Examples
    import os

    remote_file = connection.upload_file(
        local_file_path="/path/to/source.csv",
        remote_file_path="/path/to/target.csv",
    )
    assert connection.path_exists("/path/to/target.csv")
    assert remote_file.stat().st_size == os.stat("/path/to/source.csv").st_size
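A common way to combine upload_file and rename_file is to upload under a temporary name and rename once the upload has finished, so that readers of the final path do not observe a partially written file. The sketch below assumes this pattern suits the target cluster; upload_via_temp_name and the ".tmp" suffix are hypothetical choices, not part of onETL.

```python
from typing import Any


def upload_via_temp_name(connection: Any, local_file_path: str, remote_file_path: str) -> Any:
    """Upload to a temporary remote name, then rename to the final path."""
    temp_path = remote_file_path + ".tmp"  # hypothetical naming scheme
    connection.upload_file(
        local_file_path=local_file_path,
        remote_file_path=temp_path,
        replace=True,  # overwrite leftovers from a previously failed attempt
    )
    # Move the finished upload into place, replacing any previous version
    return connection.rename_file(
        source_file_path=temp_path,
        target_file_path=remote_file_path,
        replace=True,
    )
```

If the upload raises, the final path is left untouched and only the temporary file may need cleanup, for example with remove_file.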
- walk(root: PathLike | str, topdown: bool = True, filters: Iterable[BaseFileFilter] | None = None, limits: Iterable[BaseFileLimit] | None = None) → Iterator[tuple[RemoteDirectory, list[RemoteDirectory], list[RemoteFile]]]
Walk into directory tree, and iterate over its content in all nesting levels.
Just like os.walk, but with additional filter/limit logic.
- Parameters:
- root: str or os.PathLike
Directory path to walk into.
- topdown: bool, default True
If True, walk in top-down order, otherwise walk in bottom-up order.
- filters: list of BaseFileFilter, optional
Return only files/directories matching these filters. See File Filters
- limits: list of BaseFileLimit, optional
Apply limits to the list of files/directories, and stop if one of the limits is reached. See File Limits
- Returns:
Iterator[tuple[root, dirs, files]], like os.walk.
However, the paths are not strings; path classes with embedded stats are returned instead.
- Raises:
- NotADirectoryError
Path is not a directory
- onetl.exception.DirectoryNotFoundError
Path does not exist
Examples
    import os

    for root, dirs, files in connection.walk("/path/to/dir"):
        assert os.fspath(root) == "/path/to/dir"
        assert dirs == []
        assert os.fspath(files[0]) == "/path/to/dir/file.csv"
        assert connection.path_exists("/path/to/dir/file.csv")
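Because walk visits every nesting level, it can be combined with get_stat to aggregate information over a whole tree. The sketch below sums file sizes; total_size is a hypothetical helper name, and connection is assumed to be any object exposing walk and get_stat as documented above.

```python
import os
from typing import Any


def total_size(connection: Any, root: str) -> int:
    """Sum the sizes (in bytes) of all files under root, at any nesting level."""
    size = 0
    for _root, _dirs, files in connection.walk(root):
        for file in files:
            size += connection.get_stat(os.fspath(file)).st_size
    return size
```

The returned paths already carry stats, so a real implementation could likely avoid the extra get_stat call per file; the explicit call is used here only because its behavior is documented on this page.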