Executing statements in Postgres#

Warning

The methods below read all rows returned by the database into Spark driver memory, and only then convert them to a DataFrame.

Do NOT use them to read large amounts of data. Use DBReader or Postgres.sql instead.

How to#

There are two ways to execute a statement in Postgres:

Use Postgres.fetch#

Use this method to execute a SELECT query that returns a small number of rows, such as reading Postgres config or reading data from a reference table. The method returns a Spark DataFrame.

Method accepts JDBCOptions.

A connection opened by this method should then be closed, either by calling connection.close() or by using the connection as a context manager (with connection:).

Warning

Please take into account Postgres <-> Spark type mapping.

Syntax support#

This method supports any query syntax supported by Postgres, like:

  • ✅︎ SELECT ... FROM ...

  • ✅︎ WITH alias AS (...) SELECT ...

  • ❌ SET ...; SELECT ...; - multiple statements not supported

Examples#

from onetl.connection import Postgres

postgres = Postgres(...)

df = postgres.fetch(
    "SELECT value FROM some.reference_table WHERE key = 'some_constant'",
    options=Postgres.JDBCOptions(query_timeout=10),
)
postgres.close()
value = df.collect()[0][0]  # get value from first row and first column
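
The same read can be wrapped in a context manager so the connection is closed even if the query fails. The following is only a sketch: `fetch_scalar` is a hypothetical helper, not part of onETL, and it assumes nothing about `connection` beyond what this page describes (a `fetch(query, options=...)` method and support for `with connection:`).

```python
from typing import Any, Optional


def fetch_scalar(connection: Any, query: str, options: Optional[Any] = None) -> Any:
    """Run a small SELECT and return the first column of the first row, or None.

    Hypothetical helper: `connection` is assumed to behave like the Postgres
    connection described on this page.
    """
    # The context manager closes the connection on exit, even if fetch() raises.
    with connection:
        df = connection.fetch(query, options=options)
    # As in the example above, rows are collected after the connection is closed.
    rows = df.collect()
    return rows[0][0] if rows else None
```

Returning None for an empty result avoids the IndexError that `df.collect()[0][0]` raises when the query matches no rows.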

Use Postgres.execute#

Use this method to execute DDL and DML operations. Each method call runs the operation in a separate transaction and then commits it.

Method accepts JDBCOptions.

A connection opened by this method should then be closed, either by calling connection.close() or by using the connection as a context manager (with connection:).

Syntax support#

This method supports any query syntax supported by Postgres, like:

  • ✅︎ CREATE TABLE ..., CREATE VIEW ..., and so on

  • ✅︎ ALTER ...

  • ✅︎ INSERT INTO ... SELECT ..., UPDATE ..., DELETE ..., and so on

  • ✅︎ DROP TABLE ..., DROP VIEW ..., and so on

  • ✅︎ CALL procedure(arg1, arg2) ...

  • ✅︎ SELECT func(arg1, arg2) or {call func(arg1, arg2)} - special syntax for calling functions

  • ✅︎ other statements not mentioned here

  • ❌ SET ...; SELECT ...; - multiple statements not supported
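
Since a call cannot carry several statements, a script has to be sent one statement per call. The sketch below shows one naive way to split a script before doing so. `split_statements` is a hypothetical helper, not part of onETL. It only understands single-quoted string literals and will mis-split dollar-quoted bodies, comments, and quoted identifiers that contain semicolons. Whether session-level settings such as SET carry over between separate calls is not covered here.

```python
from typing import List


def split_statements(script: str) -> List[str]:
    """Naively split a SQL script on semicolons outside single-quoted strings."""
    statements: List[str] = []
    current: List[str] = []
    in_string = False
    for char in script:
        if char == "'":
            # An escaped quote ('') toggles the flag twice, so the state stays correct.
            in_string = not in_string
        if char == ";" and not in_string:
            statement = "".join(current).strip()
            if statement:  # skip empty statements such as a trailing ";"
                statements.append(statement)
            current = []
        else:
            current.append(char)
    tail = "".join(current).strip()
    if tail:
        statements.append(tail)
    return statements
```

Each returned string can then be passed to its own call.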

Examples#

from onetl.connection import Postgres

postgres = Postgres(...)

with postgres:
    # automatically close connection after exiting this context manager
    postgres.execute("DROP TABLE schema.table")
    postgres.execute(
        """
        CREATE TABLE schema.table (
            id bigint GENERATED ALWAYS AS IDENTITY,
            key text,
            value real
        )
        """,
        options=Postgres.JDBCOptions(query_timeout=10),
    )
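
Because every call commits on its own, a sequence of calls is not atomic. The sketch below makes that explicit. `execute_each` is a hypothetical helper, not part of onETL, and `connection` is only assumed to offer `execute(statement, options=...)` and context-manager support as described on this page.

```python
from typing import Any, Iterable, Optional


def execute_each(
    connection: Any,
    statements: Iterable[str],
    options: Optional[Any] = None,
) -> int:
    """Execute statements one per call and return how many completed."""
    completed = 0
    with connection:  # the connection is closed on exit, even after an error
        for statement in statements:
            # Each call is committed separately, so statements that already
            # ran are NOT rolled back if a later one raises.
            connection.execute(statement, options=options)
            completed += 1
    return completed
```

If the second of three statements fails, the first one stays committed and the third never runs, so setup scripts run this way are easier to retry when each statement is safe to repeat.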

Options#

pydantic model onetl.connection.db_connection.jdbc_mixin.options.JDBCOptions#

Generic options that are passed to the underlying JDBC driver.

Note

You can pass any value supported by underlying JDBC driver class, even if it is not mentioned in this documentation.

field query_timeout: int | None = None (alias 'queryTimeout')#

The number of seconds the driver will wait for a statement to execute. Zero means there is no limit.

The exact behavior depends on the driver implementation: some drivers apply the timeout to each query rather than to an entire JDBC batch.

field fetchsize: int | None = None#

How many rows to fetch per round trip.

Tuning this option can influence performance of reading.

Warning

Default value depends on driver. For example, Oracle has default fetchsize=10.
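
To see why fetchsize matters, it helps to count round trips. The following is simple arithmetic, not a measurement. `round_trips` is a hypothetical helper that assumes the driver fetches exactly fetchsize rows per round trip and models only a positive fetchsize.

```python
def round_trips(total_rows: int, fetchsize: int) -> int:
    """Estimate round trips needed to read total_rows at fetchsize rows per trip."""
    if total_rows < 0:
        raise ValueError("total_rows must not be negative")
    if fetchsize <= 0:
        # Assumption: only a positive, fixed fetchsize is modelled here.
        raise ValueError("fetchsize must be a positive number of rows")
    # Integer ceiling division: a partially filled last batch still costs one trip.
    return (total_rows + fetchsize - 1) // fetchsize
```

Under these assumptions, reading 1000 rows with fetchsize=10 takes 100 round trips, while fetchsize=500 takes 2.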