MongoDB Connection

class onetl.connection.db_connection.mongodb.connection.MongoDB(*, spark: SparkSession, database: str, host: Host, user: str, password: SecretStr, port: int = 27017, extra: MongoDBExtra = MongoDBExtra())

MongoDB connection. Supports hooks.

Based on the Maven package org.mongodb.spark:mongo-spark-connector:10.1.1 (MongoDB connector for Spark).

Warning

Before using this connector, please take the Prerequisites into account.

Parameters:
host : str

Host of MongoDB. For example: test.mongodb.com or 193.168.1.17.

port : int, default: 27017

Port of MongoDB.

user : str

User who has proper access to the database. For example: some_user.

password : str

Password for the database connection.

database : str

Name of the database in MongoDB.

extra : dict, optional

Extra parameters used by the client when connecting to the instance.

For example: {"tls": "false"}

See the Connection string options documentation for more details.

spark : pyspark.sql.SparkSession

Spark session.
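The extra parameters end up as options of a MongoDB connection string. As a minimal sketch, assuming the standard `mongodb://user:password@host:port/database?options` format, the hypothetical helper `build_mongodb_uri` below (not part of onETL, which may assemble its URI differently) shows how the parameters listed above could map onto such a string.

```python
from __future__ import annotations

from urllib.parse import quote_plus, urlencode


def build_mongodb_uri(
    host: str,
    port: int,
    user: str,
    password: str,
    database: str,
    extra: dict[str, str] | None = None,
) -> str:
    """Hypothetical helper: assemble a standard MongoDB connection string.

    Illustration only, not onETL's internal code.
    """
    # Credentials are percent-encoded so characters like '@' or ':'
    # cannot break the URI structure.
    credentials = f"{quote_plus(user)}:{quote_plus(password)}"
    # The database name goes into the URI path.
    uri = f"mongodb://{credentials}@{host}:{port}/{database}"
    # Extra parameters become connection string options after '?'.
    if extra:
        uri += "?" + urlencode(extra)
    return uri


print(build_mongodb_uri("test.mongodb.com", 27017, "some_user", "p@ss", "target_database", {"tls": "false"}))
# mongodb://some_user:p%40ss@test.mongodb.com:27017/target_database?tls=false
```

Percent-encoding the password matters in practice: without it, an `@` in the password would be read as the end of the credentials section.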

Examples

MongoDB connection initialization.

from onetl.connection import MongoDB
from pyspark.sql import SparkSession

# Create Spark session with MongoDB connector loaded
maven_packages = MongoDB.get_packages(spark_version="3.4")
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)

# Create connection
mongo = MongoDB(
    host="master.host.or.ip",
    user="user",
    password="*****",
    database="target_database",
    spark=spark,
).check()

classmethod get_packages(scala_version: str | Version | None = None, spark_version: str | Version | None = None) -> list[str]

Get package names to be downloaded by Spark. Supports hooks.

Warning

You should pass at least one parameter.

Parameters:
scala_version : str, optional

Scala version in the format major.minor.

If None, spark_version is used to determine Scala version.

spark_version : str, optional

Spark version in the format major.minor.

Used only if scala_version=None.
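The rule that scala_version takes precedence and spark_version is only a fallback can be sketched in plain Python. The function `resolve_packages` is hypothetical and is not onETL's implementation; it assumes that Spark 3.x uses Scala 2.12 by default and that the connector is published under the usual Scala-suffixed Maven artifact name (`mongo-spark-connector_<scala>`), reusing the 10.1.1 version stated above.

```python
from __future__ import annotations

# Assumption: connector version taken from the package name stated above.
CONNECTOR_VERSION = "10.1.1"


def resolve_packages(scala_version: str | None = None, spark_version: str | None = None) -> list[str]:
    """Hypothetical sketch of the documented get_packages() parameter rules."""
    if scala_version is None and spark_version is None:
        # Mirrors the warning: at least one parameter must be passed.
        raise ValueError("Pass scala_version or spark_version")
    if scala_version is None:
        # spark_version is used only when scala_version is not given.
        spark_major = spark_version.split(".")[0]
        if spark_major != "3":
            raise ValueError(f"Spark {spark_version} is not covered by this sketch")
        scala_version = "2.12"  # assumed default Scala version for Spark 3.x
    # Keep only major.minor, e.g. "2.12.17" -> "2.12".
    scala_major_minor = ".".join(scala_version.split(".")[:2])
    return [f"org.mongodb.spark:mongo-spark-connector_{scala_major_minor}:{CONNECTOR_VERSION}"]


print(resolve_packages(spark_version="3.4"))
# ['org.mongodb.spark:mongo-spark-connector_2.12:10.1.1']
```

When both arguments are given, scala_version wins and spark_version is ignored, matching "Used only if scala_version=None".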

Examples

from onetl.connection import MongoDB

MongoDB.get_packages(scala_version="2.12")
MongoDB.get_packages(spark_version="3.4")

check()

Check source availability. Supports hooks.

If the source is not available, an exception is raised.

Returns:
The connection itself.

Raises:
RuntimeError

If the connection is not available.

Examples

connection.check()
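Because check() returns the connection itself, it can be chained directly after the constructor, as in the initialization example. The toy class below is a hypothetical stand-in (not onETL code) that only illustrates this contract: return self on success, raise RuntimeError otherwise. Its `reachable` flag is an assumption that replaces a real availability probe.

```python
class ToyConnection:
    """Toy stand-in illustrating the check() contract; not onETL's code."""

    def __init__(self, reachable: bool) -> None:
        # Hypothetical flag standing in for a real network check.
        self.reachable = reachable

    def check(self) -> "ToyConnection":
        if not self.reachable:
            raise RuntimeError("Connection is not available")
        # Returning self is what makes constructor(...).check() chainable.
        return self


# Chained call: conn is the ToyConnection instance itself.
conn = ToyConnection(reachable=True).check()

try:
    ToyConnection(reachable=False).check()
except RuntimeError as exc:
    print(f"check failed: {exc}")
# check failed: Connection is not available
```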