
DML Support

Currently, we support Insert, Insert Overwrite and Delete on Oracle tables. We have a Basic Insert Pipeline that supports all use cases, and we are working on specialized pipelines for certain use cases, such as when the destination is an external table. We are also working on support for Create, CTAS and Replace for External Tables that are stored in OCI object store in Parquet format.
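
For illustration, here is a minimal sketch of the kinds of statements covered, issued as Spark SQL from Scala; the schema, table and column names (sparktest.dest_tbl, sparktest.src_tbl, c_int) are hypothetical placeholders, and an Oracle catalog is assumed to be already configured for the session.

```scala
// Minimal sketch of the supported DML statements; all identifiers are hypothetical.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Insert: append the rows produced by the source query to the Oracle table.
spark.sql("INSERT INTO sparktest.dest_tbl SELECT * FROM sparktest.src_tbl")

// Insert Overwrite: replace the overwritten rows with the result of the source query.
spark.sql("INSERT OVERWRITE TABLE sparktest.dest_tbl SELECT * FROM sparktest.src_tbl")

// Delete: remove the rows matching a predicate.
spark.sql("DELETE FROM sparktest.dest_tbl WHERE c_int > 1000")
```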

The Basic Insert Pipeline

See Write Path Flow for technical details on how we plug into Spark's DataSource V2 data write flows. At a high level, the basic flow proceeds by (a sketch of the corresponding statements appears after the diagram below):

  • Creating a temporary table in Oracle for the job
  • Spark tasks then write to this temporary table
  • When all the tasks are done, on the driver:
    • Overwritten rows are deleted from the destination table
    • Temporary table rows are copied into the destination table
    • The above two steps are done in one transaction.
    • Finally, the temporary table is dropped.

(Diagram: basic_insert_flow)
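
To make the flow concrete, below is a rough sketch (plain JDBC, for illustration only) of the kind of statement sequence issued against Oracle. The connection URL, credentials and identifiers are hypothetical, and the exact SQL the connector generates may differ.

```scala
// Rough sketch of the statement sequence; not the connector's verbatim SQL.
// URL, credentials and identifiers (dest_tbl, dest_tbl_spark_tmp, src_tbl) are hypothetical.
import java.sql.DriverManager

val conn = DriverManager.getConnection(
  "jdbc:oracle:thin:@//dbhost:1521/ORCLPDB1", "scott", "tiger")
conn.setAutoCommit(false)
val stmt = conn.createStatement()

// 1. Driver: create a nologging, non-partitioned staging table for the job.
stmt.execute(
  "CREATE TABLE dest_tbl_spark_tmp NOLOGGING AS SELECT * FROM dest_tbl WHERE 1 = 0")

// 2. Executor tasks: each task appends its rows to the staging table over its own
//    connection; this single statement stands in for those per-task writes.
stmt.execute(
  "INSERT /*+ APPEND */ INTO dest_tbl_spark_tmp SELECT * FROM src_tbl WHERE c_part = 1")
conn.commit()

// 3. Driver, at job commit, in a single transaction: delete overwritten rows
//    (Insert Overwrite only) and copy the staged rows into the destination.
stmt.execute("DELETE FROM dest_tbl")
stmt.execute(
  "INSERT INTO dest_tbl SELECT /*+ PARALLEL(dest_tbl_spark_tmp) */ * FROM dest_tbl_spark_tmp")
conn.commit()

// 4. Finally, drop the staging table.
stmt.execute("DROP TABLE dest_tbl_spark_tmp")
conn.close()
```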

The basic Insert pipeline handles all use cases but may not provide optimal performance. The behavior is transactional, so:

  • During the job execution, all other jobs will see the state of the table as of the start of the job.
  • On success the destination will be observed in the new state.
  • On failure the state of the destination table will not change.
  • We don't prevent concurrent DML jobs on a table.
    • On success, the destination table will be in a state that matches some serialization of the jobs.
    • During the 'job commit' phase, the final transaction in each job is where old rows are deleted and new rows inserted. There may be contention between the transactions of two jobs; in that case Oracle will roll back one of the transactions, leaving the destination table in a consistent state.

It has the following behavior and parameters:

  • A nologging, non-partitioned temporary table is created. We cannot create it as an Oracle temporary table because the data needs to be accessible from many connections (connections in executor tasks write rows, and the connection on the driver moves data from the temporary table to the destination table). Use the spark.sql.oracle.temp_table.tablespace parameter to specify the tablespace in which to create this temporary table. By default this parameter is not set, and the temp data is stored in the default tablespace of the connection.
  • Executor tasks write rows to the temporary table using an Insert statement with the /*+ APPEND */ hint. This ensures no redo/undo log is created for writes to the temporary table.
  • There is no special logic for deleting existing rows
    • When the user issues an insert overwrite with no condition, we don't issue a truncate on Oracle because we don't know the recovery requirements of the customer. So we are conservative and issue a delete, which allows for recovery on failure.
    • In this pipeline there is no special handling for partition-wise inserts/overwrites.
  • There is no special processing for inserting new rows
    • An insert with select * from temp_table is issued.
    • The select query block has a PARALLEL hint on temp_table.
    • Again we don't know the recovery or data management requirements of the customer, so we are conservative and have no hints by default.
  • We allow the user to specify hints for the INSERT by configuring spark.sql.oracle.insert_into_dest_table.hints. So the user can set hints such as APPEND, PARALLEL or NOLOGGING; but please check the Oracle docs to ensure your hints comply with your recovery and data management requirements. A configuration sketch follows this list.
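
For illustration, both parameters are ordinary Spark SQL configuration entries and can be set when building the session (or via --conf). The tablespace name and the hint string below are hypothetical examples, and the exact format expected for the hints value is an assumption here.

```scala
// Minimal sketch of setting the two parameters mentioned above when building a session.
// The tablespace name SPARK_TMP and the hint string "APPEND PARALLEL" are hypothetical
// examples; the expected format of the hints value is an assumption.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  // Tablespace for the per-job temporary table; unset by default.
  .config("spark.sql.oracle.temp_table.tablespace", "SPARK_TMP")
  // Hints applied to the final INSERT into the destination table; none by default.
  // Check the Oracle docs that the hints match your recovery and data management requirements.
  .config("spark.sql.oracle.insert_into_dest_table.hints", "APPEND PARALLEL")
  .getOrCreate()
```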

Planned specialized pipelines for certain use-cases

We are working on optimized insert pipelines for certain cases:

  • If the source query of the insert statement is an Oracle Scan operation, then we can perform the operation by issuing an Oracle SQL Insert statement.
  • For external tables (that reside in an object store, in Parquet file format)
    • We can have the Spark tasks directly write Parquet files to the object store.
  • When the destination table is partitioned, for insert overwrite:
    • We are investigating doing a partition exchange insert based on an insert into table select...
  • When the destination table is partitioned:
    • We plan to set up the pipeline such that a Spark task only writes to a single file or partition.
    • This means we will introduce the necessary shuffle prior to the writing stage in Spark.
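
As an example of the first case above: when the source of the insert is itself an Oracle scan, the whole operation can in principle be collapsed into a single Oracle-side statement instead of staging rows through Spark tasks. A hypothetical sketch of the pushed-down statement follows; the table and column names and connection details are placeholders, and this pipeline is planned work, not current behavior.

```scala
// Hypothetical sketch: source and destination both live in Oracle, so a Spark query like
//   INSERT INTO dest_tbl SELECT c1, c2 FROM src_tbl WHERE c2 > 100
// could be executed as a single Oracle statement. All identifiers are placeholders.
import java.sql.DriverManager

val conn = DriverManager.getConnection(
  "jdbc:oracle:thin:@//dbhost:1521/ORCLPDB1", "scott", "tiger")
conn.setAutoCommit(false)
val stmt = conn.createStatement()

stmt.execute("INSERT INTO dest_tbl SELECT c1, c2 FROM src_tbl WHERE c2 > 100")
conn.commit()
conn.close()
```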