Reading from MongoDB using MongoDB.pipeline

MongoDB.pipeline allows passing a custom aggregation pipeline, but does not support incremental strategies.
Warning

Please take into account the MongoDB <-> Spark type mapping.
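All examples below assume an already created connection object. A minimal sketch of how such an object might be constructed (host, credentials and database name are placeholders, not values from this documentation):

from onetl.connection import MongoDB

# Placeholder connection details; replace with your own.
# The SparkSession is assumed to already have the MongoDB
# Spark connector on its classpath.
connection = MongoDB(
    host="mongodb.domain.com",
    user="my_user",
    password="my_password",
    database="my_database",
    spark=spark,
)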
Recommendations

Pay attention to pipeline value

Instead of filtering data on the Spark side using df.filter(df.column == 'value'), pass a proper pipeline value, e.g. MongoDB.pipeline(..., pipeline={"$match": {"column": {"$eq": "value"}}}). This both reduces the amount of data sent from MongoDB to Spark and may also improve query performance, especially if there are indexes for the columns used in the pipeline.
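A minimal sketch contrasting the two approaches (collection and column names are placeholders):

# Inefficient: the whole collection is sent to Spark,
# and filtering happens on the Spark side.
df = connection.pipeline(collection="collection_name")
df = df.filter(df.column == "value")

# Efficient: the $match stage is executed by MongoDB itself,
# so only matching documents are sent to Spark.
df = connection.pipeline(
    collection="collection_name",
    pipeline={"$match": {"column": {"$eq": "value"}}},
)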
References
- MongoDB.pipeline(collection: str, pipeline: dict | list[dict] | None = None, df_schema: StructType | None = None, options: MongoDBPipelineOptions | dict | None = None)
Execute a pipeline for a specific collection, and return a DataFrame.

Almost like the aggregation pipeline syntax in MongoDB:

db.collection_name.aggregate([{"$match": ...}, {"$group": ...}])

but the pipeline is executed on Spark executors, in a distributed way.

Note

This method does not support read strategies; use DBReader instead.

- Parameters:
- collection : str
  Collection name.
- pipeline : dict | list[dict], optional
  Pipeline containing a database query. See Aggregation pipeline syntax.
- df_schema : StructType, optional
  Schema describing the resulting DataFrame.
- options : PipelineOptions | dict, optional
  Additional pipeline options, see PipelineOptions.
Examples

Get documents with a specific field value:

df = connection.pipeline(
    collection="collection_name",
    pipeline={"$match": {"field": {"$eq": 1}}},
)
Calculate aggregation and get the result:

df = connection.pipeline(
    collection="collection_name",
    pipeline={
        "$group": {
            "_id": 1,
            "min": {"$min": "$column_int"},
            "max": {"$max": "$column_int"},
        }
    },
)
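The $group stage above yields a single row, so one possible way to read the aggregated values back on the driver (assuming the df from the previous snippet) is:

# Fetch the single aggregation row to the driver.
row = df.first()
print(row["min"], row["max"])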
Explicitly pass DataFrame schema:

from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

df_schema = StructType(
    [
        StructField("_id", StringType()),
        StructField("some_string", StringType()),
        StructField("some_int", IntegerType()),
        StructField("some_datetime", TimestampType()),
        StructField("some_float", DoubleType()),
    ],
)

df = connection.pipeline(
    collection="collection_name",
    df_schema=df_schema,
    pipeline={"$match": {"some_int": {"$gt": 999}}},
)
Pass additional options to pipeline execution:

df = connection.pipeline(
    collection="collection_name",
    pipeline={"$match": {"field": {"$eq": 1}}},
    options=MongoDB.PipelineOptions(hint={"field": 1}),
)
- pydantic model onetl.connection.db_connection.mongodb.options.MongoDBPipelineOptions
Aggregation pipeline options for the MongoDB connector.

The only difference from MongoDBReadOptions is that it allows passing the hint parameter.

Note

You can pass any value supported by the connector, even if it is not mentioned in this documentation. The set of supported options depends on the connector version. See the link above.
Warning

The uri, database, collection and pipeline options are populated from connection attributes, and cannot be overridden by the user in PipelineOptions, to avoid issues.

Examples
Pipeline options initialization:

MongoDB.PipelineOptions(
    hint={"some_field": 1},
)