MongoDB <-> Spark type mapping#
Type detection & casting#
Spark DataFrames always have a schema, which is a list of fields with corresponding Spark types. All operations on a field are performed using that field's type.
MongoDB is, by design, schemaless, so there are two ways to handle this:
User provides DataFrame schema explicitly:
Example:
from onetl.connection import MongoDB
from onetl.db import DBReader
from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    StringType,
    TimestampType,
)

mongodb = MongoDB(...)

df_schema = StructType(
    [
        StructField("_id", StringType()),
        StructField("some", StringType()),
        StructField(
            "field",
            StructType(
                [
                    StructField("nested", IntegerType()),
                ]
            ),
        ),
    ]
)

reader = DBReader(
    connection=mongodb,
    source="some_collection",
    df_schema=df_schema,
)
df = reader.run()

# or
df = mongodb.pipeline(
    collection="some_collection",
    df_schema=df_schema,
)
Rely on MongoDB connector schema inference:
df = mongodb.pipeline(collection="some_collection")
In this case the MongoDB connector reads a sample of the collection's documents and builds the DataFrame schema based on the document fields and values.
It is highly recommended to pass df_schema explicitly to avoid type conversion issues.
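If you do rely on schema inference, it can be useful to inspect the inferred schema before building anything on top of it. A minimal sketch (the collection name is just an example):

df = mongodb.pipeline(collection="some_collection")

# print the schema inferred from a sample of documents;
# fields missing from the sampled documents will not appear here
df.printSchema()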
Supported types#
Numeric types#
MongoDB type (read) | Spark type | MongoDB type (write)
---|---|---
Temporal types#
MongoDB type (read) | Spark type | MongoDB type (write)
---|---|---
Warning
Note that types in MongoDB and Spark have different value ranges:
MongoDB type | Min value | Max value | Spark type | Min value | Max value
---|---|---|---|---|---
Date | -290 million years | 290 million years | TimestampType() | 0001-01-01 00:00:00 | 9999-12-31 23:59:59.999999
So not all values can be read from MongoDB into Spark, or written from a Spark DataFrame to MongoDB.
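One way to avoid out-of-range values is to filter them out on the MongoDB side before they reach Spark. Below is a minimal sketch using a $match stage; the updated_at field name is hypothetical, and the extended JSON $date syntax is an assumption about how the pipeline is passed to the connector:

df = mongodb.pipeline(
    collection="some_collection",
    pipeline=[
        {
            "$match": {
                # hypothetical field; keep only dates Spark TimestampType can represent
                "updated_at": {
                    "$gte": {"$date": "1900-01-01T00:00:00Z"},
                    "$lte": {"$date": "9999-12-31T23:59:59Z"},
                },
            },
        },
    ],
)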
String types#
Note: fields of the deprecated MongoDB type Symbol are excluded during read.
MongoDB type (read) | Spark type | MongoDB type (write)
---|---|---
Binary types#
MongoDB type (read) | Spark type | MongoDB type (write)
---|---|---
Struct types#
MongoDB type (read) | Spark type | MongoDB type (write)
---|---|---
Special types#
MongoDB type (read) | Spark type | MongoDB type (write)
---|---|---
Explicit type cast#
DBReader#
Currently it is not possible to cast field types using DBReader, but this can be done using MongoDB.pipeline.
MongoDB.pipeline#
You can use the $project aggregation stage to cast field types:
from pyspark.sql.types import IntegerType, StructField, StructType
from onetl.connection import MongoDB
from onetl.db import DBReader
mongodb = MongoDB(...)
df = mongodb.pipeline(
    collection="my_collection",
    pipeline=[
        {
            "$project": {
                # keep the fields needed in the result;
                # unsupported_field is not listed here, so it is excluded
                "supported_field": 1,
                # convert unsupported_field to string
                "unsupported_field_str": {
                    "$convert": {
                        "input": "$unsupported_field",
                        "to": "string",
                    },
                },
            }
        }
    ],
)

# cast field content to proper Spark type
df = df.select(
    df._id,
    df.supported_field,
    # explicit cast
    df.unsupported_field_str.cast("integer").alias("parsed_integer"),
)
DBWriter#
Convert the DataFrame field to a string on the Spark side, and then write it to MongoDB:
df = df.select(
    df.id,
    df.unsupported_field.cast("string").alias("array_field_json"),
)

writer.run(df)
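For completeness, here is a minimal sketch of how the writer used above could be set up, assuming a target collection named some_collection:

from onetl.connection import MongoDB
from onetl.db import DBWriter

mongodb = MongoDB(...)

# hypothetical target collection name
writer = DBWriter(
    connection=mongodb,
    target="some_collection",
)

writer.run(df)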