Reading from MongoDB using DBReader

DBReader supports Read Strategies for incremental data reading, but does not support custom pipelines, e.g. aggregation.

Warning

Please take into account MongoDB <-> Spark type mapping

Supported DBReader features

Examples

Snapshot strategy:

from onetl.connection import MongoDB
from onetl.db import DBReader

from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    StringType,
    TimestampType,
)

mongodb = MongoDB(...)

# df_schema is mandatory
df_schema = StructType(
    [
        StructField("_id", StringType()),
        StructField("some", StringType()),
        StructField(
            "field",
            StructType(
                [
                    StructField("nested", IntegerType()),
                ],
            ),
        ),
        StructField("updated_dt", TimestampType()),
    ]
)

reader = DBReader(
    connection=mongodb,
    source="some_collection",
    df_schema=df_schema,
    where={"field": {"$eq": 123}},
    hint={"field": 1},
    options=MongoDB.ReadOptions(batchSize=10000),
)
df = reader.run()

Incremental strategy:

from onetl.connection import MongoDB
from onetl.db import DBReader
from onetl.strategy import IncrementalStrategy

from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    StringType,
    TimestampType,
)

mongodb = MongoDB(...)

# df_schema is mandatory
df_schema = StructType(
    [
        StructField("_id", StringType()),
        StructField("some", StringType()),
        StructField(
            "field",
            StructType(
                [
                    StructField("nested", IntegerType()),
                ],
            ),
        ),
        StructField("updated_dt", TimestampType()),
    ]
)

reader = DBReader(
    connection=mongodb,
    source="some_collection",
    df_schema=df_schema,
    where={"field": {"$eq": 123}},
    hint={"field": 1},
    hwm=DBReader.AutoDetectHWM(name="mongodb_hwm", expression="updated_dt"),
    options=MongoDB.ReadOptions(batchSize=10000),
)

with IncrementalStrategy():
    df = reader.run()
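
Conceptually, an incremental read only fetches documents whose HWM field is greater than the value saved by the previous run, while the first run reads everything. The sketch below shows one way such a condition could be merged with a user-supplied where document. The helper name with_hwm_filter is hypothetical, and this is an assumption about the general idea, not onETL's actual implementation.

```python
from datetime import datetime
from typing import Any, Dict, Optional


def with_hwm_filter(
    where: Dict[str, Any],
    hwm_field: str,
    last_value: Optional[Any],
) -> Dict[str, Any]:
    """Combine a MongoDB filter document with a "greater than HWM" condition.

    Hypothetical helper for illustration only; not part of onETL.
    """
    if last_value is None:
        # No saved HWM yet (first run): read everything matching `where`.
        return dict(where)

    hwm_condition = {hwm_field: {"$gt": last_value}}
    if not where:
        # No user filter: the HWM condition is the whole filter.
        return hwm_condition

    # Both conditions must hold, so join them with $and.
    return {"$and": [where, hwm_condition]}


first_run = with_hwm_filter({"field": {"$eq": 123}}, "updated_dt", None)
next_run = with_hwm_filter(
    {"field": {"$eq": 123}},
    "updated_dt",
    datetime(2024, 1, 1),
)
```

Here first_run is just the original filter, while next_run wraps it in $and together with the updated_dt condition.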

Recommendations

Pay attention to the where value

Instead of filtering data on the Spark side using df.filter(df.column == 'value'), pass a proper DBReader(where={"column": {"$eq": "value"}}) clause. This reduces the amount of data sent from MongoDB to Spark and may also improve query performance, especially if there are indexes on the columns used in the where clause.
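
As a small illustration of this advice, the helper below builds a where document from column/value pairs using MongoDB's $eq operator. The name eq_filter is hypothetical and not part of onETL. The resulting dict is what would be passed as DBReader(where=...).

```python
from typing import Any, Dict


def eq_filter(**columns: Any) -> Dict[str, Dict[str, Any]]:
    """Build a MongoDB filter document matching each column by equality.

    Hypothetical helper for illustration only; not part of onETL.
    """
    return {name: {"$eq": value} for name, value in columns.items()}


where = eq_filter(column="value")
print(where)  # {'column': {'$eq': 'value'}}
```

Because the filter is evaluated by MongoDB, only matching documents are transferred to Spark.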

Read options

pydantic model onetl.connection.db_connection.mongodb.options.MongoDBReadOptions

Reading options for MongoDB connector.

Note

You can pass any value supported by the connector, even if it is not mentioned in this documentation.

The set of supported options depends on the connector version. See link above.

Warning

The options uri, database, collection, pipeline and hint are populated from connection attributes and cannot be overridden by the user in ReadOptions, to avoid issues.

Examples

Read options initialization

MongoDB.ReadOptions(
    batchSize=10000,
)
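
The restriction on uri, database, collection, pipeline and hint can be pictured as a simple guard over the option names. The sketch below is illustrative only: check_read_options and RESERVED_OPTIONS are hypothetical names, raising ValueError is an assumption, and the real validation is done by the MongoDBReadOptions model itself.

```python
from typing import Any, Dict

# Option names taken from the warning in this section.
RESERVED_OPTIONS = frozenset({"uri", "database", "collection", "pipeline", "hint"})


def check_read_options(options: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `options`, rejecting names reserved for the connection.

    Hypothetical helper for illustration only; not part of onETL.
    """
    reserved = sorted(RESERVED_OPTIONS.intersection(options))
    if reserved:
        raise ValueError(
            f"Options {reserved} are populated from the connection "
            "and cannot be overridden"
        )
    return dict(options)


accepted = check_read_options({"batchSize": 10000})
```

Any other key, such as batchSize, passes through unchanged.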