Reading from Greenplum using DBReader#
Data can be read from Greenplum to Spark using DBReader.
It also supports Read Strategies for incremental data reading.
Warning
Please take into account Greenplum <-> Spark type mapping.
Note
Unlike JDBC connectors, the Greenplum connector for Spark does not support executing custom SQL queries using the .sql method. The connector can only be used to read data from a table or view.
Supported DBReader features#
✅︎ columns (see note below)
✅︎ where (see note below)
✅︎ hwm (see note below), supported strategies: see Read Strategies
❌ hint (is not supported by Greenplum)
❌ df_schema
✅︎ options (see GreenplumReadOptions)
Warning
In case of the Greenplum connector, DBReader does not generate a raw SELECT query. Instead it relies on Spark SQL syntax, which in some cases (using column projection and predicate pushdown) can be converted to Greenplum SQL.
So columns, where and hwm.expression should be specified in Spark SQL syntax, not Greenplum SQL.
This is OK:
DBReader(
    columns=[
        "some_column",
        # this cast is executed on Spark side
        "CAST(another_column AS STRING)",
    ],
    # this predicate is parsed by Spark, and can be pushed down to Greenplum
    where="some_column LIKE 'val1%'",
)
This will fail:
DBReader(
    columns=[
        "some_column",
        # Spark does not have `text` type
        "CAST(another_column AS text)",
    ],
    # Spark does not support ~ syntax for regexp matching
    where="some_column ~ 'val1.*'",
)
Examples#
Snapshot strategy:
from onetl.connection import Greenplum
from onetl.db import DBReader

greenplum = Greenplum(...)

reader = DBReader(
    connection=greenplum,
    source="schema.table",
    columns=["id", "key", "CAST(value AS string) value", "updated_dt"],
    where="key = 'something'",
)
df = reader.run()
Incremental strategy:
from onetl.connection import Greenplum
from onetl.db import DBReader
from onetl.strategy import IncrementalStrategy

greenplum = Greenplum(...)

reader = DBReader(
    connection=greenplum,
    source="schema.table",
    columns=["id", "key", "CAST(value AS string) value", "updated_dt"],
    where="key = 'something'",
    hwm=DBReader.AutoDetectHWM(name="greenplum_hwm", expression="updated_dt"),
)

with IncrementalStrategy():
    df = reader.run()
Interaction schema#
High-level schema is described in Prerequisites. You can find detailed interaction schema below.
Spark <-> Greenplum interaction during DBReader.run()
Recommendations#
Select only required columns#
Instead of passing "*" in DBReader(columns=[...]), prefer passing exact column names. This reduces the amount of data passed from Greenplum to Spark.
Pay attention to where value#
Instead of filtering data on the Spark side using df.filter(df.column == 'value'), pass a proper DBReader(where="column = 'value'") clause.
This both reduces the amount of data sent from Greenplum to Spark, and may also improve query performance, especially if there are indexes or partitions for the columns used in the where clause.
Read data in parallel#
DBReader in case of the Greenplum connector requires the view or table to have a column which is used by Spark for parallel reads.
Choosing a proper column allows each Spark executor to read only the part of the data stored in a specific segment, avoiding moving large amounts of data between segments, which improves reading performance.
Using gp_segment_id#
By default, DBReader will use the gp_segment_id column for parallel data reading. Each DataFrame partition will contain data of a specific Greenplum segment.
This allows each Spark executor to read data only from a specific Greenplum segment, avoiding moving large amounts of data between segments.
If a view is used, it is recommended to include the gp_segment_id column in this view:
Reading from view with gp_segment_id column
from onetl.connection import Greenplum
from onetl.db import DBReader

greenplum = Greenplum(...)
greenplum.execute(
    """
    CREATE VIEW schema.view_with_gp_segment_id AS
    SELECT
        id,
        some_column,
        another_column,
        gp_segment_id -- IMPORTANT
    FROM schema.some_table
    """,
)

reader = DBReader(
    connection=greenplum,
    source="schema.view_with_gp_segment_id",
)
df = reader.run()
Using custom partition_column#
Sometimes a table or view lacks the gp_segment_id column, but there is some column with a value range correlated with Greenplum segment distribution.
In this case, a custom column can be used instead:
Reading from view with custom partition_column
from onetl.connection import Greenplum
from onetl.db import DBReader

greenplum = Greenplum(...)
greenplum.execute(
    """
    CREATE VIEW schema.view_with_partition_column AS
    SELECT
        id,
        some_column,
        part_column -- correlated to greenplum segment ID
    FROM schema.some_table
    """,
)

reader = DBReader(
    connection=greenplum,
    source="schema.view_with_partition_column",
    options=Greenplum.ReadOptions(
        # parallelize data using specified column
        partition_column="part_column",
        # create 10 Spark tasks, each will read only part of table data
        num_partitions=10,
    ),
)
df = reader.run()
Reading DISTRIBUTED REPLICATED tables#
Replicated tables do not have a gp_segment_id column at all, so you need to set partition_column to some column name of type integer/bigint/smallint.
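For example, reading a replicated table could look like this (a minimal sketch; the table name schema.replicated_table and the id column are hypothetical, assuming id is of integer type):

```python
from onetl.connection import Greenplum
from onetl.db import DBReader

greenplum = Greenplum(...)

reader = DBReader(
    connection=greenplum,
    source="schema.replicated_table",
    options=Greenplum.ReadOptions(
        # replicated tables have no gp_segment_id column,
        # so an explicit integer column must be chosen
        partition_column="id",
        num_partitions=10,
    ),
)
df = reader.run()
```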
Parallel JOIN execution#
In case of using views which require data motion between Greenplum segments, like JOIN queries, another approach should be used.
Each of the N Spark executors will run the same query, so each of the N queries will start its own JOIN process, leading to a really heavy load on Greenplum segments. This should be avoided.
Instead, it is recommended to run the JOIN query on the Greenplum side, save the result to an intermediate table, and then read this table using DBReader:
Reading from view using intermediate table
from onetl.connection import Greenplum
from onetl.db import DBReader

greenplum = Greenplum(...)
greenplum.execute(
    """
    CREATE UNLOGGED TABLE schema.intermediate_table AS
    SELECT
        id,
        tbl1.col1,
        tbl1.data,
        tbl2.another_data
    FROM
        schema.table1 as tbl1
    JOIN
        schema.table2 as tbl2
    ON
        tbl1.col1 = tbl2.col2
    WHERE ...
    """,
)

reader = DBReader(
    connection=greenplum,
    source="schema.intermediate_table",
)
df = reader.run()

# write dataframe somewhere

greenplum.execute(
    """
    DROP TABLE schema.intermediate_table
    """,
)
Warning
NEVER do that:
df1 = DBReader(connection=greenplum, table="public.table1", ...).run()
df2 = DBReader(connection=greenplum, table="public.table2", ...).run()
joined_df = df1.join(df2, on="col")
This will lead to sending all the data from both table1 and table2 to Spark executor memory, and then the JOIN will be performed on the Spark side, not inside Greenplum. This is VERY inefficient.
TEMPORARY tables notice#
Someone could think that writing data from a view or the result of a JOIN to a TEMPORARY table, and then passing it to DBReader, is an efficient way to read data from Greenplum, because temp tables do not generate WAL files and are automatically deleted after the transaction finishes.
That will NOT work. Each Spark executor establishes its own connection to Greenplum, and each connection starts its own transaction, which means that every executor will read an empty temporary table.
You should use UNLOGGED tables to write data to an intermediate table without generating WAL logs.
Options#
- pydantic model onetl.connection.db_connection.greenplum.options.GreenplumReadOptions#
VMware’s Greenplum Spark connector reading options.
Note
You can pass any value supported by the connector, even if it is not mentioned in this documentation. The set of supported options depends on the connector version. See the link above.
Warning
Some options, like url, dbtable, server.*, pool.*, etc. are populated from connection attributes, and cannot be overridden by the user in ReadOptions to avoid issues.
Examples
Read options initialization
Greenplum.ReadOptions(
    partition_column="reg_id",
    num_partitions=10,
)
- field partition_column: str | None = None (alias 'partitionColumn')#
Column used to parallelize reading from a table.
Warning
You should not change this option, unless you know what you’re doing. It’s preferable to use the default values to read data in parallel by the number of segments in the Greenplum cluster.
Possible values:
- None (default):
Spark generates N jobs (where N == number of segments in the Greenplum cluster), each job reading only data from a specific segment (filtering data by the gp_segment_id column).
This is a very effective way to fetch the data from a cluster.
- table column:
Allocate each executor a range of values from a specific column.
Note
Column type must be numeric. Other types are not supported.
Spark generates for each executor an SQL query like:
Executor 1:
SELECT ... FROM table WHERE (partition_column >= lower_bound OR partition_column IS NULL) AND partition_column < (lower_bound + stride)
Executor 2:
SELECT ... FROM table WHERE partition_column >= (lower_bound + stride) AND partition_column < (lower_bound + 2 * stride)
…
Executor N:
SELECT ... FROM table WHERE partition_column >= (lower_bound + (N-1) * stride) AND partition_column <= upper_bound
where stride = (upper_bound - lower_bound) / num_partitions, lower_bound = MIN(partition_column), upper_bound = MAX(partition_column).
Note
num_partitions is used just to calculate the partition stride, NOT for filtering the rows in the table. So all rows in the table will be returned (unlike Incremental Read Strategies).
Note
All queries are executed in parallel. To execute them sequentially, use Batch Read Strategies.
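The value-range splitting described above can be sketched in plain Python (a simplified illustration of the stride arithmetic, not the connector’s actual code; partition_predicates is a hypothetical helper):

```python
def partition_predicates(lower_bound, upper_bound, num_partitions, column="partition_column"):
    """Build per-executor WHERE clauses from a value range (simplified sketch)."""
    stride = (upper_bound - lower_bound) / num_partitions
    predicates = []
    for i in range(num_partitions):
        low = lower_bound + i * stride
        if i == 0:
            # the first partition also captures NULL values
            predicates.append(
                f"({column} >= {low} OR {column} IS NULL) AND {column} < {low + stride}"
            )
        elif i == num_partitions - 1:
            # the last partition is inclusive of the upper bound
            predicates.append(f"{column} >= {low} AND {column} <= {upper_bound}")
        else:
            predicates.append(f"{column} >= {low} AND {column} < {low + stride}")
    return predicates


# a column with values 0..100, split across 4 executors (stride = 25.0)
for predicate in partition_predicates(0, 100, 4, column="id"):
    print(f"SELECT ... FROM table WHERE {predicate}")
```

Each predicate covers a disjoint slice of the value range, so every row is read exactly once even though all executors query the same table.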
Warning
Both options partition_column and num_partitions should have a value, or both should be None.
Examples
Read data in 10 parallel jobs by range of values in the id_column column:
Greenplum.ReadOptions(
    partition_column="id_column",
    num_partitions=10,
)
- field num_partitions: int | None = None (alias 'partitions')#
Number of jobs created by Spark to read the table content in parallel.
See the documentation for partition_column for more details.
Warning
By default the connector uses the number of segments in the Greenplum cluster. You should not change this option, unless you know what you’re doing.
Warning
Both options partition_column and num_partitions should have a value, or both should be None.