HDFS connection
HDFS
Bases: FileConnection, RenameDirMixin
Powered by HDFS Python client.
Warning
Since onETL v0.7.0, using the HDFS connector requires installing the package with one of these extras:
pip install "onetl[hdfs]"
# or
pip install "onetl[files]"
Note
To access a Hadoop cluster with Kerberos installed, the kinit executable must be available in one of the directories listed in the PATH environment variable.
See the Kerberos support instructions for more details.
Parameters:
-
cluster (str) – Hadoop cluster name. For example: rnd-dwh.
Used for:
- HWM and lineage (as instance name for file paths), if set.
- Validation of host value, if the latter is passed and some hooks are bound to Slots.get_cluster_namenodes.
Warning
You should pass at least one of these arguments: cluster, host.
Added in 0.7.0
-
host (str) – Hadoop namenode host. For example: namenode1.domain.com.
Should be an active namenode (NOT standby).
If the value is not set, but some hooks are bound to Slots.get_cluster_namenodes and Slots.is_namenode_active, onETL will iterate over the cluster namenodes to detect which one is active.
Warning
You should pass at least one of these arguments: cluster, host.
-
webhdfs_port (int, default: 50070) – Port of Hadoop namenode (WebHDFS protocol).
If omitted, but some hooks are bound to the Slots.get_webhdfs_port slot, onETL will try to detect the port number for a specific cluster.
-
user (str) – User who has access to the file source. For example: someuser.
If set, Kerberos auth will be used. Otherwise an anonymous connection is created.
-
password (str, default: None) – User password.
Used for generating a Kerberos ticket.
Warning
You can provide only one of the parameters: password or keytab. If you provide both, an exception will be raised.
-
keytab (str, default: None) – Local path to keytab file.
Used for generating a Kerberos ticket.
Warning
You can provide only one of the parameters: password or keytab. If you provide both, an exception will be raised.
-
extra – Extra options passed to underlying HDFS client.
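The argument rules stated in the warnings above (at least one of cluster or host, and password and keytab being mutually exclusive) can be sketched as a standalone pre-flight check. This is only an illustration, not onETL's own validation code: the function name check_hdfs_args and the use of ValueError are assumptions.

```python
from typing import Optional


def check_hdfs_args(
    cluster: Optional[str] = None,
    host: Optional[str] = None,
    password: Optional[str] = None,
    keytab: Optional[str] = None,
) -> None:
    """Hypothetical pre-flight check mirroring the documented argument rules."""
    # Rule 1: at least one of `cluster` or `host` must be passed.
    if cluster is None and host is None:
        raise ValueError("Pass at least one of: cluster, host")
    # Rule 2: `password` and `keytab` are mutually exclusive.
    if password is not None and keytab is not None:
        raise ValueError("Pass only one of: password, keytab")


# Valid combination: host plus keytab, so the check passes silently.
check_hdfs_args(host="namenode1.domain.com", keytab="/path/to/keytab")
```

Running such a check before constructing the connection may give an earlier and more readable error in pipeline configuration code.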
Examples:
from onetl.connection import HDFS

# Kerberos auth with user and password
hdfs = HDFS(
    host="namenode1.domain.com",
    user="someuser",
    password="*****",
).check()

from onetl.connection import HDFS

# Kerberos auth with user and keytab
hdfs = HDFS(
    host="namenode1.domain.com",
    user="someuser",
    keytab="/path/to/keytab",
).check()

from onetl.connection import HDFS

# Anonymous connection (no user set)
hdfs = HDFS(host="namenode1.domain.com").check()
Can be used only if some third-party plugin provides HDFS Slots implementation
from onetl.connection import HDFS

hdfs = HDFS(
    cluster="rnd-dwh",
    user="someuser",
    password="*****",
).check()
from onetl.connection import HDFS
from urllib3.util.retry import Retry
from urllib3.util.timeout import Timeout

# Custom timeout and retry settings passed through extra
hdfs = HDFS(
    host="namenode1.domain.com",
    user="someuser",
    keytab="/path/to/keytab",
    extra=HDFS.Extra(
        timeout=Timeout(connect=10, read=60),
        retry=Retry(
            total=3,
            backoff_factor=0.2,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "PUT", "OPTIONS"],
        ),
    ),
).check()
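The host parameter description says that, when host is omitted, onETL iterates over the cluster namenodes to find the active one. A minimal sketch of that loop follows, assuming two hypothetical stand-ins for whatever the hooks bound to Slots.get_cluster_namenodes and Slots.is_namenode_active return. The real hook signatures are not shown in this page, so treat the names and types below as assumptions.

```python
from typing import Callable, Iterable


def pick_active_namenode(
    namenodes: Iterable[str],
    is_active: Callable[[str], bool],
) -> str:
    """Return the first namenode reported as active (illustrative only)."""
    for namenode in namenodes:
        if is_active(namenode):
            return namenode
    raise RuntimeError("No active namenode found")


# Hypothetical cluster state: the second namenode is the active one.
states = {"namenode1.domain.com": False, "namenode2.domain.com": True}
active = pick_active_namenode(states, lambda host: states[host])
print(active)  # namenode2.domain.com
```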
get_current(**kwargs)
classmethod
Create connection for current cluster.
Automatically sets up current cluster name as cluster.
Note
Can be used only if some hooks are bound to the Slots.get_current_cluster slot.
Added in 0.7.0
Parameters:
-
user (str) – User who has access to HDFS. See HDFS constructor documentation.
-
password (str | None) – User password for Kerberos. See HDFS constructor documentation.
-
keytab (str | None) – Path to keytab file for Kerberos. See HDFS constructor documentation.
-
extra (HDFSExtra) – Extra options passed to underlying HDFS client. See HDFS constructor documentation.
Examples:
from onetl.connection import HDFS
# injecting current cluster name via hooks mechanism
hdfs = HDFS.get_current(user="me", password="pass")
check()
path_exists(path)
Check if specified path exists on remote filesystem.
Added in 0.8.0
Parameters:
-
path (str | PathLike) – Path to check
Returns:
-
bool – True if path exists, False otherwise
Examples:
>>> connection.path_exists("/path/to/file.csv")
True
>>> connection.path_exists("/path/to/dir")
True
>>> connection.path_exists("/path/to/missing")
False
is_file(path)
Check if specified path is a file.
Added in 0.8.0
Parameters:
-
path (str | PathLike) – Path to check
Returns:
-
bool – True if path is a file, False otherwise.
Raises:
-
FileNotFoundError – Path does not exist
Examples:
>>> connection.is_file("/path/to/dir/file.csv")
True
>>> connection.is_file("/path/to/dir")
False
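Because is_file raises FileNotFoundError for a missing path, callers who want a plain boolean can check path_exists first. A small sketch of that pattern follows. The helper is_existing_file and the in-memory FakeConnection stand-in are hypothetical and exist only to make the example runnable without a cluster.

```python
class FakeConnection:
    """In-memory stand-in exposing path_exists() and is_file() as documented."""

    def __init__(self, files, dirs):
        self._files = set(files)
        self._dirs = set(dirs)

    def path_exists(self, path):
        return path in self._files or path in self._dirs

    def is_file(self, path):
        # Mirrors the documented behavior: missing path raises FileNotFoundError.
        if not self.path_exists(path):
            raise FileNotFoundError(path)
        return path in self._files


def is_existing_file(connection, path):
    """Return True only if the path exists and is a file; a missing path gives False."""
    return connection.path_exists(path) and connection.is_file(path)


connection = FakeConnection(files={"/path/to/dir/file.csv"}, dirs={"/path/to/dir"})
print(is_existing_file(connection, "/path/to/dir/file.csv"))  # True
print(is_existing_file(connection, "/path/to/dir"))  # False
print(is_existing_file(connection, "/path/to/missing"))  # False
```

The same pattern applies to is_dir. Note that two separate calls are not atomic, so a path removed between them could still trigger the exception.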
is_dir(path)
Check if specified path is a directory.
Added in 0.8.0
Parameters:
-
path (str | PathLike) – Path to check
Returns:
-
bool – True if path is a directory, False otherwise.
Raises:
-
DirectoryNotFoundError – Path does not exist
Examples:
>>> connection.is_dir("/path/to/dir")
True
>>> connection.is_dir("/path/to/dir/file.csv")
False
get_stat(path)
Returns stats for a specific path.
Added in 0.8.0
Parameters:
-
path (str | PathLike) – Path to get stats for
Returns:
-
PathStatProtocol – Stats object
Raises:
-
Any underlying client exception
Examples:
>>> stat = connection.get_stat("/path/to/file.csv")
>>> stat.st_size # in bytes
1024
>>> stat.st_uid # owner id or name
12345
resolve_dir(path)
Returns directory at specific path, with stats.
Added in 0.8.0
Parameters:
-
path (str | PathLike) – Path to resolve
Returns:
-
PathWithStatsProtocol – Directory path with stats
Raises:
-
DirectoryNotFoundError – Path does not exist
-
NotADirectoryError – Path is not a directory
Examples:
>>> dir_path = connection.resolve_dir("/path/to/dir")
>>> os.fspath(dir_path)
'/path/to/dir'
>>> dir_path.stat().st_uid # owner id
12345
resolve_file(path)
Returns file at specific path, with stats.
Added in 0.8.0
Parameters:
-
path (str | PathLike) – Path to resolve
Returns:
-
PathWithStatsProtocol – File path with stats
Raises:
-
FileNotFoundError – Path does not exist
-
NotAFileError – Path is not a file
Examples:
>>> file_path = connection.resolve_file("/path/to/dir/file.csv")
>>> os.fspath(file_path)
'/path/to/dir/file.csv'
>>> file_path.stat().st_uid # owner id
12345
create_dir(path)
Creates directory tree on remote filesystem.
Added in 0.8.0
Parameters:
-
path (str | PathLike) – Directory path
Returns:
-
PathWithStatsProtocol – Created directory with stats
Raises:
-
NotADirectoryError – Path exists, but is not a directory
Examples:
>>> dir_path = connection.create_dir("/path/to/dir")
>>> os.fspath(dir_path)
'/path/to/dir'
remove_file(path)
Removes file on remote filesystem.
If file does not exist, no exception is raised.
Warning
Supports only one file removal per call.
Directory removal is NOT supported, use remove_dir instead.
Added in 0.8.0
Parameters:
-
path (str | PathLike) – File path
Returns:
-
bool – True if file was removed, False if file does not exist in the first place.
Raises:
-
NotAFileError – Path is not a file
Examples:
>>> connection.remove_file("/path/to/file.csv")
True
>>> connection.path_exists("/path/to/file.csv")
False
>>> connection.remove_file("/path/to/file.csv") # already deleted, no error
False
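Since remove_file handles one path per call, removing several files means looping over them. The sketch below relies only on the documented return value (True if the file was removed, False if it was already absent). The helper remove_files and the FakeConnection stand-in are hypothetical.

```python
class FakeConnection:
    """In-memory stand-in that mimics the documented remove_file() return value."""

    def __init__(self, files):
        self._files = set(files)

    def remove_file(self, path):
        if path in self._files:
            self._files.remove(path)
            return True
        return False


def remove_files(connection, paths):
    """Call remove_file() once per path and return the paths that were actually removed."""
    return [path for path in paths if connection.remove_file(path)]


connection = FakeConnection(files={"/path/to/a.csv", "/path/to/b.csv"})
removed = remove_files(
    connection,
    ["/path/to/a.csv", "/path/to/missing.csv", "/path/to/b.csv"],
)
print(removed)  # ['/path/to/a.csv', '/path/to/b.csv']
```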
remove_dir(path, *, recursive=False)
Remove directory or directory tree.
If directory does not exist, no exception is raised.
Added in 0.8.0
Parameters:
-
path (str | PathLike) – Directory path to remove
-
recursive (bool, default: False) – If True, remove directory tree recursively (including files and subdirectories).
If False, remove only the directory itself. The directory should be empty.
Returns:
-
bool – True if directory was removed, False if directory does not exist in the first place.
Raises:
-
NotADirectoryError – Path is not a directory
-
DirectoryNotEmptyError – Directory is not empty, and recursive is False
Examples:
>>> connection.remove_dir("/path/to/dir")
Traceback (most recent call last):
...
onetl.exception.DirectoryNotEmptyError: Cannot delete non-empty directory '/path/to/dir'
>>> connection.remove_dir("/path/to/dir", recursive=True)
True
>>> connection.path_exists("/path/to/dir")
False
>>> connection.path_exists("/path/to/dir/file.csv")
False
>>> connection.remove_dir("/path/to/dir") # already deleted, no error
False
rename_dir(source_dir_path, target_dir_path, *, replace=False)
Rename or move dir on remote filesystem.
Added in 0.8.0
Parameters:
-
source_dir_path (str | PathLike) – Old directory path
-
target_dir_path (str | PathLike) – New directory path
-
replace (bool, default: False) – If True, existing directory will be replaced.
Returns:
-
PathWithStatsProtocol – New directory path with stats.
Raises:
-
NotADirectoryError – Path is not a directory
-
DirectoryNotFoundError – Path does not exist
-
DirectoryExistsError – Directory already exists, and replace=False
Examples:
>>> new_dir = connection.rename_dir("/path/to/dir1", "/path/to/dir2")
>>> os.fspath(new_dir)
'/path/to/dir2'
>>> connection.path_exists("/path/to/dir1")
False
>>> connection.path_exists("/path/to/dir2")
True
rename_file(source_file_path, target_file_path, *, replace=False)
Rename or move file on remote filesystem.
Warning
Supports only one file move per call. Directory move/rename is NOT supported.
Added in 0.8.0
Parameters:
-
source_file_path (str | PathLike) – Old file path
-
target_file_path (str | PathLike) – New file path
-
replace (bool, default: False) – If True, existing file will be replaced.
Returns:
-
PathWithStatsProtocol – New file path with stats.
Raises:
-
NotAFileError – Source or target path is not a file
-
FileNotFoundError – File does not exist
-
FileExistsError – File already exists, and replace=False
Examples:
>>> new_file = connection.rename_file("/path/to/file1.csv", "/path/to/file2.csv")
>>> os.fspath(new_file)
'/path/to/file2.csv'
>>> connection.path_exists("/path/to/file2.csv")
True
>>> connection.path_exists("/path/to/file1.csv")
False
list_dir(path, filters=None, limits=None)
Return list of child files/directories in a specific directory.
Added in 0.8.0
Changed in 0.14.0
Method returns full file/directory path instead of just name.
Parameters:
-
path (str | PathLike) – Directory path to list contents.
-
filters (list[BaseFileFilter], default: None) – Return only files/directories matching these filters. See File Filters.
-
limits (list[BaseFileLimit], default: None) – Apply limits to the list of files/directories, and stop if one of the limits is reached. See File Limits.
Returns:
-
list[PathWithStatsProtocol] – Directory contents.
Raises:
-
NotADirectoryError – Path is not a directory
-
DirectoryNotFoundError – Path does not exist
Examples:
>>> dir_content = connection.list_dir("/path/to/dir")
>>> os.fspath(dir_content[0])
'/path/to/dir/file.csv'
>>> connection.path_exists("/path/to/dir/file.csv")
True
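The interplay of filters and limits described above can be pictured as: skip entries that fail any filter, and stop as soon as a limit is reached. The following is a simplified sketch using plain callables and a counter. The real BaseFileFilter and BaseFileLimit interfaces are not reproduced here, and every name below is hypothetical.

```python
from typing import Callable, Iterable, List


def select_paths(
    paths: Iterable[str],
    filters: List[Callable[[str], bool]],
    max_count: int,
) -> List[str]:
    """Keep paths accepted by all filters, stopping once max_count items are collected."""
    selected: List[str] = []
    for path in paths:
        # A path must match every filter to be returned.
        if not all(accept(path) for accept in filters):
            continue
        # Stand-in for a limit: stop instead of exceeding the allowed count.
        if len(selected) >= max_count:
            break
        selected.append(path)
    return selected


paths = ["/data/a.csv", "/data/b.txt", "/data/c.csv", "/data/d.csv"]
result = select_paths(paths, filters=[lambda p: p.endswith(".csv")], max_count=2)
print(result)  # ['/data/a.csv', '/data/c.csv']
```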
walk(root, *, topdown=True, filters=None, limits=None)
Walk into directory tree, and iterate over its content in all nesting levels.
Just like os.walk, but with additional filter/limit logic.
Added in 0.8.0
Changed in 0.14.0
Method returns full file/directory path instead of just name.
Parameters:
-
root (str | PathLike) – Directory path to walk into.
-
topdown (bool, default: True) – If True, walk in top-down order, otherwise walk in bottom-up order.
-
filters (list[BaseFileFilter], default: None) – Return only files/directories matching these filters. See File Filters.
-
limits (list[BaseFileLimit], default: None) – Apply limits to the list of files/directories, and immediately stop if any of these limits is reached. See File Limits.
Returns:
-
Iterable[tuple] – Like os.walk, but instead of strings, paths are returned as path objects with embedded stats.
Raises:
-
NotADirectoryError – Path is not a directory
-
DirectoryNotFoundError – Path does not exist
Examples:
>>> for root, dirs, files in connection.walk("/path/to/dir"):
... break
>>> os.fspath(root)
'/path/to/dir'
>>> dirs
[]
>>> os.fspath(files[0])
'/path/to/dir/file.csv'
>>> connection.path_exists("/path/to/dir/file.csv")
True
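Since walk is described as behaving like os.walk, the effect of topdown can be previewed on a local temporary directory with the standard library alone. This sketch uses os.walk itself, not the HDFS connection, so it shows only the visiting order. The directory layout is made up for the example.

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as base:
    # Build a tiny tree: <base>/sub/file.csv
    os.makedirs(os.path.join(base, "sub"))
    with open(os.path.join(base, "sub", "file.csv"), "w") as f:
        f.write("a,b\n")

    # Record the order in which directories are visited, relative to the root.
    top_down = [os.path.relpath(root, base) for root, _, _ in os.walk(base, topdown=True)]
    bottom_up = [os.path.relpath(root, base) for root, _, _ in os.walk(base, topdown=False)]

print(top_down)  # ['.', 'sub']
print(bottom_up)  # ['sub', '.']
```

Bottom-up order is typically the one to pick when child directories must be processed before their parent, for example when cleaning up a tree.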
download_file(remote_file_path, local_file_path, *, replace=True)
Downloads file from the remote filesystem to a local path.
Warning
Supports only one file download per call. Directory download is NOT supported, use File Downloader instead.
Added in 0.8.0
Parameters:
-
remote_file_path (str | PathLike) – Remote file path to read from
-
local_file_path (str | PathLike) – Local file path to create
-
replace (bool, default: True) – If True, existing file will be replaced
Returns:
-
PathWithStatsProtocol – Local file with stats.
Raises:
-
NotAFileError – Remote or local path is not a file
-
FileNotFoundError – Remote file does not exist
-
FileExistsError – Local file already exists, and replace=False
-
FileSizeMismatchError – Target file size after download is different from source file size.
Examples:
>>> local_file = connection.download_file(
... remote_file_path="/path/to/source.csv",
... local_file_path="/path/to/target.csv",
... )
>>> os.fspath(local_file)
'/path/to/target.csv'
>>> local_file.exists()
True
>>> local_file.stat().st_size # in bytes
1024
>>> connection.get_stat("/path/to/source.csv").st_size # same size
1024
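The checks listed under Raises can be illustrated with a local-only copy: refuse to overwrite an existing target unless replace is set, then compare file sizes after the transfer. This is a stand-in built on the standard library, not onETL's implementation. The names copy_with_checks and SizeMismatchError are hypothetical.

```python
import os
import shutil
import tempfile


class SizeMismatchError(OSError):
    """Hypothetical stand-in for FileSizeMismatchError."""


def copy_with_checks(source: str, target: str, *, replace: bool) -> str:
    """Copy source to target, honoring `replace` and verifying the resulting size."""
    if not os.path.isfile(source):
        raise FileNotFoundError(source)
    if os.path.exists(target) and not replace:
        raise FileExistsError(target)
    shutil.copyfile(source, target)
    # Compare sizes after the copy, similar to the documented size check.
    if os.stat(source).st_size != os.stat(target).st_size:
        raise SizeMismatchError(f"{target} size differs from {source}")
    return target


with tempfile.TemporaryDirectory() as tmp:
    source = os.path.join(tmp, "source.csv")
    target = os.path.join(tmp, "target.csv")
    with open(source, "wb") as f:
        f.write(b"a,b\n1,2\n")  # 8 bytes
    copy_with_checks(source, target, replace=False)
    copied_size = os.stat(target).st_size

print(copied_size)  # 8
```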
upload_file(local_file_path, remote_file_path, *, replace=False)
Uploads local file to a remote filesystem.
Warning
Supports only one file upload per call. Directory upload is NOT supported, use File Uploader instead.
Added in 0.8.0
Parameters:
-
local_file_path (str | PathLike) – Local file path to read from
-
remote_file_path (str | PathLike) – Remote file path to create
-
replace (bool, default: False) – If True, existing file will be replaced
Returns:
-
PathWithStatsProtocol – Remote file with stats.
Raises:
-
NotAFileError – Remote or local path is not a file
-
FileNotFoundError – Local file does not exist
-
FileExistsError – Remote file already exists, and replace=False
-
FileSizeMismatchError – Target file size after upload is different from source file size.
Examples:
>>> remote_file = connection.upload_file(
... local_file_path="/path/to/source.csv",
... remote_file_path="/path/to/target.csv",
... )
>>> os.fspath(remote_file)
'/path/to/target.csv'
>>> connection.path_exists("/path/to/target.csv")
True
>>> remote_file.stat().st_size # in bytes
1024
>>> os.stat("/path/to/source.csv").st_size # same as source
1024
HDFSExtra
Bases: GenericOptions
Extra options for HDFS connection.
You can pass here any parameters supported by hdfs.client.Client,
without webdav_ prefix.
Parameters:
-
timeout (Timeout) – Timeout for requests, see urllib3 documentation.
-
retry (Retry) – Retry for requests, see urllib3 documentation.