Reading from Greenplum using DBReader#

Data can be read from Greenplum to Spark using DBReader. It also supports Read Strategies for incremental data reading.

Warning

Please take into account Greenplum <-> Spark type mapping.

Note

Unlike JDBC connectors, the Greenplum connector for Spark does not support executing custom SQL queries using the .sql method. The connector can only be used to read data from a table or view.

Supported DBReader features#

Warning

In the case of the Greenplum connector, DBReader does not generate a raw SELECT query. Instead, it relies on Spark SQL syntax, which in some cases (column projection and predicate pushdown) can be converted to Greenplum SQL.

So columns, where and hwm.expression should be specified in Spark SQL syntax, not Greenplum SQL.

This is OK:

DBReader(
    columns=[
        "some_column",
        # this cast is executed on Spark side
        "CAST(another_column AS STRING)",
    ],
    # this predicate is parsed by Spark, and can be pushed down to Greenplum
    where="some_column LIKE 'val1%'",
)

This will fail:

DBReader(
    columns=[
        "some_column",
        # Spark does not have `text` type
        "CAST(another_column AS text)",
    ],
    # Spark does not support ~ syntax for regexp matching
    where="some_column ~ 'val1.*'",
)

Examples#

Snapshot strategy:

from onetl.connection import Greenplum
from onetl.db import DBReader

greenplum = Greenplum(...)

reader = DBReader(
    connection=greenplum,
    source="schema.table",
    columns=["id", "key", "CAST(value AS string) value", "updated_dt"],
    where="key = 'something'",
)
df = reader.run()

Incremental strategy:

from onetl.connection import Greenplum
from onetl.db import DBReader
from onetl.strategy import IncrementalStrategy

greenplum = Greenplum(...)

reader = DBReader(
    connection=greenplum,
    source="schema.table",
    columns=["id", "key", "CAST(value AS string) value", "updated_dt"],
    where="key = 'something'",
    hwm=DBReader.AutoDetectHWM(name="greenplum_hwm", expression="updated_dt"),
)

with IncrementalStrategy():
    df = reader.run()

Interaction schema#

The high-level schema is described in Prerequisites. You can find the detailed interaction schema below.

Spark <-> Greenplum interaction during DBReader.run()
https://www.plantuml.com/plantuml/svg/hLVRRk8m47tlLupwiEeLYJooswefX18jTQeatQvMICXDJB4Q7DOnlKZxyJifWLo1f0NbWSHpFMTcR5z5Yagrc0K7Ygk0mOraJCo3nGncD59Cme_pIt3cL3w3B_aBamUG_1x3DpZyM3bSVyCtnJq-fq8bQudMLX7snho52cMxbfHMIugaKapuSR07-SZNYMumLmHYfrZ4dcPCg4r9b0GtvL4It9RAnKNAMCkRCk_vw1W_fWBKK_o5AhQ9ZlndGBBBcSFfANIWQn5YTDs3ZQhLkgWuCBe_mUo3gryF3J1-cuxhmDDyhEXZm59pPTqFataldPoLPcbqm1bO3-1WGBSwz2txyq4SSII2Ft6Xpi0BWpQi_ccWs1l-RRLQnpcQUjSsepwJBRaG1OugWYysnf82nzLj5rCtklSkHXzdu4zgXG9tg6cwfaMWNT-lja-_Mds_P9z-IGgq2B4H97-QAWWd5PKrvH7WuuU2eTMuBMaGiE1i95QHHK1blEQpEVOYDkT8_5izlue9PkZRrbUseRi6FDZcgh9TmoRwC2dqA3OrNfkgLtbmxvZa1gZlInP55sljyJICL2TzdOTIVHy9apW60cqIrzhblczRj-ksWOU1OHlnQpAgnXZSHQR22e1NjQH1lIcyJZaA8Wqu1ca00KS2fYn66W674B5cFkOe1CDn5WfGOMAXDHBtqG9PUZ-3anDosZuvoSWjsV8amm7emqI6isGhG_rYB3j7GCMaJwhK6E9SX9xkwdbunZvLT1oA5JfjG7mBWEw6AlNM0HA2poKc1vBzNR18HV38r2kAGaaXWc_BR16Z7WO2Ib8HJN1BZq4DNkax5hlWYd-0y_XUOQ5Lcb6PoR2lajQ-LD9QlgIrnYMjrIffhNbvQ4tBGokNXxQH0B8_0iYN221v0aXZ0aWj0aXp0aXJ0aYPWCpcKTcIvvTXrRG_HxScmVr77vf_ZDNHATwDq4D1V2Jkxft1ChpoQdu-16oItReoBf5pBhrWEJ75nyY4cxj-3qz943E2UrZ04IsnxFMLvhRqjChqj6RfQLlJgsohbBuAzwIPUxBLFScTq9R3CbBXlFfybYDumm6pPrjtrJldsfhFqZFvvaj65UfDVMfVyqa-uvFKzBaBtEBT8lPf4MCNFWtLo0wrvdUTeULaAx8X0rcf_7tj2e-RU4l_3m==

Recommendations#

Select only required columns#

Instead of passing "*" in DBReader(columns=[...]) prefer passing exact column names. This reduces the amount of data passed from Greenplum to Spark.

Pay attention to where value#

Instead of filtering data on the Spark side using df.filter(df.column == 'value'), pass a proper DBReader(where="column = 'value'") clause. This both reduces the amount of data sent from Greenplum to Spark and may improve query performance, especially if there are indexes or partitions on the columns used in the where clause.
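For example (a sketch; some_column here is a placeholder):

Pushing the filter down to Greenplum
from onetl.connection import Greenplum
from onetl.db import DBReader

greenplum = Greenplum(...)

# inefficient: all rows are sent to Spark, filtering happens on the Spark side
df = DBReader(connection=greenplum, source="schema.table").run()
df = df.filter(df.some_column == "value")

# better: the filter is applied by Greenplum, only matching rows are sent to Spark
df = DBReader(
    connection=greenplum,
    source="schema.table",
    where="some_column = 'value'",
).run()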

Read data in parallel#

When used with the Greenplum connector, DBReader requires the view or table to have a column which Spark can use for parallel reads.

Choosing a proper column allows each Spark executor to read only the data stored in a specific segment, avoiding moving large amounts of data between segments, which improves reading performance.

Using gp_segment_id#

By default, DBReader uses the gp_segment_id column for parallel data reading. Each DataFrame partition will contain the data of a specific Greenplum segment.

This allows each Spark executor to read data only from a specific Greenplum segment, avoiding moving large amounts of data between segments.

If a view is used, it is recommended to include the gp_segment_id column in that view:

Reading from view with gp_segment_id column
from onetl.connection import Greenplum
from onetl.db import DBReader

greenplum = Greenplum(...)

greenplum.execute(
    """
    CREATE VIEW schema.view_with_gp_segment_id AS
    SELECT
        id,
        some_column,
        another_column,
        gp_segment_id  -- IMPORTANT
    FROM schema.some_table
    """,
)

reader = DBReader(
    connection=greenplum,
    source="schema.view_with_gp_segment_id",
)
df = reader.run()

Using custom partition_column#

Sometimes a table or view lacks the gp_segment_id column, but has another column whose value range correlates with the Greenplum segment distribution.

In this case, a custom column can be used instead:

Reading from view with custom partition_column
from onetl.connection import Greenplum
from onetl.db import DBReader

greenplum = Greenplum(...)

greenplum.execute(
    """
    CREATE VIEW schema.view_with_partition_column AS
    SELECT
        id,
        some_column,
        part_column  -- correlated to greenplum segment ID
    FROM schema.some_table
    """,
)

reader = DBReader(
    connection=greenplum,
    source="schema.view_with_partition_column",
    options=Greenplum.ReadOptions(
        # parallelize data using specified column
        partition_column="part_column",
        # create 10 Spark tasks, each will read only part of table data
        num_partitions=10,
    ),
)
df = reader.run()

Reading DISTRIBUTED REPLICATED tables#

Replicated tables do not have a gp_segment_id column at all, so you need to set partition_column to the name of some column of type integer/bigint/smallint.
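For example, a minimal sketch assuming the replicated table has an integer id column:

Reading a DISTRIBUTED REPLICATED table
from onetl.connection import Greenplum
from onetl.db import DBReader

greenplum = Greenplum(...)

reader = DBReader(
    connection=greenplum,
    source="schema.replicated_table",
    options=Greenplum.ReadOptions(
        # integer column used to split data between Spark tasks
        partition_column="id",
        num_partitions=10,
    ),
)
df = reader.run()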

Parallel JOIN execution#

For views which require data motion between Greenplum segments, such as JOIN queries, another approach should be used.

Each of the N Spark executors will run the same query, so each of the N queries will start its own JOIN process, leading to a very heavy load on Greenplum segments. This should be avoided.

Instead, it is recommended to run the JOIN query on the Greenplum side, save the result to an intermediate table, and then read this table using DBReader:

Reading from view using intermediate table
from onetl.connection import Greenplum
from onetl.db import DBReader

greenplum = Greenplum(...)

greenplum.execute(
    """
    CREATE UNLOGGED TABLE schema.intermediate_table AS
    SELECT
        id,
        tbl1.col1,
        tbl1.data,
        tbl2.another_data
    FROM
        schema.table1 as tbl1
    JOIN
        schema.table2 as tbl2
    ON
        tbl1.col1 = tbl2.col2
    WHERE ...
    """,
)

reader = DBReader(
    connection=greenplum,
    source="schema.intermediate_table",
)
df = reader.run()

# write dataframe somewhere

greenplum.execute(
    """
    DROP TABLE schema.intermediate_table
    """,
)

Warning

NEVER do this:

df1 = DBReader(connection=greenplum, source="public.table1", ...).run()
df2 = DBReader(connection=greenplum, source="public.table2", ...).run()

joined_df = df1.join(df2, on="col")

This will send all the data from both table1 and table2 to Spark executor memory, and the JOIN will then be performed on the Spark side, not inside Greenplum. This is VERY inefficient.

TEMPORARY tables notice#

One might think that writing data from a view or the result of a JOIN to a TEMPORARY table, and then passing it to DBReader, is an efficient way to read data from Greenplum, because temporary tables do not generate WAL files and are automatically deleted after the transaction finishes.

That will NOT work. Each Spark executor establishes its own connection to Greenplum, and each connection starts its own transaction, which means that every executor will read an empty temporary table.

You should use UNLOGGED tables instead to write data to an intermediate table without generating WAL logs.

Options#

pydantic model onetl.connection.db_connection.greenplum.options.GreenplumReadOptions#

VMware’s Greenplum Spark connector reading options.

Note

You can pass any value supported by the connector, even if it is not mentioned in this documentation.

The set of supported options depends on the connector version. See the link above.

Warning

Some options, like url, dbtable, server.*, pool.*, etc., are populated from connection attributes and cannot be overridden by the user in ReadOptions, to avoid issues.

Examples

Read options initialization

Greenplum.ReadOptions(
    partition_column="reg_id",
    num_partitions=10,
)
field partition_column: str | None = None (alias 'partitionColumn')#

Column used to parallelize reading from a table.

Warning

You should not change this option unless you know what you're doing.

It is preferable to use the default value, which reads data in parallel based on the number of segments in the Greenplum cluster.

Possible values:
  • None (default):

    Spark generates N jobs (where N == number of segments in the Greenplum cluster), and each job reads only the data of a specific segment (filtering data by the gp_segment_id column).

    This is a very effective way to fetch data from the cluster.

  • table column

    Each executor is allocated a range of values from the specified column.

    Note

    Column type must be numeric. Other types are not supported.

    Spark generates an SQL query like the following for each executor:

    Executor 1:

    SELECT ... FROM table
    WHERE (partition_column >= lower_bound
            OR partition_column IS NULL)
    AND partition_column < (lower_bound + stride)
    

    Executor 2:

    SELECT ... FROM table
    WHERE partition_column >= (lower_bound + stride)
    AND partition_column < (lower_bound + 2 * stride)
    

    Executor N:

    SELECT ... FROM table
    WHERE partition_column >= (lower_bound + (N-1) * stride)
    AND partition_column <= upper_bound
    

    Where stride=(upper_bound - lower_bound) / num_partitions, lower_bound=MIN(partition_column), upper_bound=MAX(partition_column). A worked example of this calculation is shown after this list.

    Note

    num_partitions is used just to calculate the partition stride, NOT for filtering the rows in table. So all rows in the table will be returned (unlike Incremental Read Strategies).

    Note

    All queries are executed in parallel. To execute them sequentially, use Batch Read Strategies.
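A worked example of the stride calculation (the values below are purely illustrative):

# illustrative values, not taken from a real table
lower_bound = 0      # MIN(partition_column)
upper_bound = 100    # MAX(partition_column)
num_partitions = 4

stride = (upper_bound - lower_bound) / num_partitions  # 25.0

# Executor 1 reads rows with partition_column in [0, 25), plus NULL values
# Executor 2 reads rows with partition_column in [25, 50)
# Executor 3 reads rows with partition_column in [50, 75)
# Executor 4 reads rows with partition_column in [75, 100]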

Warning

Both options partition_column and num_partitions should have a value, or both should be None.

Examples

Read data in 10 parallel jobs by range of values in id_column column:

Greenplum.ReadOptions(
    partition_column="id_column",
    num_partitions=10,
)
field num_partitions: int | None = None (alias 'partitions')#

Number of jobs created by Spark to read the table content in parallel.

See the documentation for partition_column for more details.

Warning

By default, the connector uses the number of segments in the Greenplum cluster. You should not change this option unless you know what you're doing.

Warning

Both options partition_column and num_partitions should have a value, or both should be None.