Skip to content

Commit

Permalink
#3995 [YSQL] Support deferrable for foreign key constraints
Browse files Browse the repository at this point in the history
Summary:
Deferred triggers access tuplestore at the end of subtransaction (after call of `AfterTriggerEndQuery` function).
But it is no accessible as pointer to tuplestore is stored in `query_stack` which is already empty at this point.
To solve the issue pointer to tuplestore for deferred trigger is be stored in new field `ybc_txn_fdw_tuplestore` of the `AfterTriggerSharedData` structure.
**Additional changes**
Deferred triggers are fired from the `CommitTransaction` function and YB transaction must not be committed before that time. Calling of `YBCPgCommitTransaction` function is moved inside `CommitTransaction` function.

Test Plan:
Postgresql regression tests were uncommented
```
./yb_build.sh --java-test org.yb.pgsql.TestPgForeignKey
./yb_build.sh --cxx-test pg_on_conflict-test --gtest_filter PgOnConflictTest.ValidSessionAfterTxnCommitConflict
```

Reviewers: mikhail, neha, mihnea

Reviewed By: mihnea

Subscribers: yql

Differential Revision: https://phabricator.dev.yugabyte.com/D8379
  • Loading branch information
d-uspenskiy committed May 31, 2020
1 parent 13b0c35 commit 83a63c2
Show file tree
Hide file tree
Showing 16 changed files with 381 additions and 332 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -332,25 +332,19 @@ public void addUniqueConstraintAttributes() throws Exception {
stmt,
"ALTER TABLE test ADD CONSTRAINT test_constr UNIQUE " +
"USING INDEX test_idx DEFERRABLE INITIALLY DEFERRED",
"DEFERRABLE constraint not supported yet"
"DEFERRABLE unique constraints are not supported yet"
);
runInvalidQuery(
stmt,
"ALTER TABLE test ADD CONSTRAINT test_constr UNIQUE " +
"USING INDEX test_idx DEFERRABLE INITIALLY IMMEDIATE",
"DEFERRABLE constraint not supported yet"
"DEFERRABLE unique constraints are not supported yet"
);
runInvalidQuery(
stmt,
"ALTER TABLE test ADD CONSTRAINT test_constr UNIQUE " +
"USING INDEX test_idx INITIALLY DEFERRED",
"INITIALLY DEFERRED constraint not supported yet"
);
runInvalidQuery(
stmt,
"ALTER TABLE test ADD CONSTRAINT test_constr UNIQUE " +
"USING INDEX test_idx INITIALLY IMMEDIATE",
"INITIALLY IMMEDIATE constraint not supported yet"
"DEFERRABLE unique constraints are not supported yet"
);
stmt.execute("ALTER TABLE test ADD CONSTRAINT test_constr UNIQUE " +
"USING INDEX test_idx NOT DEFERRABLE");
Expand Down
47 changes: 9 additions & 38 deletions src/postgres/src/backend/access/transam/xact.c
Original file line number Diff line number Diff line change
Expand Up @@ -2065,6 +2065,15 @@ CommitTransaction(void)
break;
}

/*
* Firing the triggers may abort current transaction.
* At this point all the them has been fired already.
* It is time to commit YB transaction.
* Postgres transaction can be aborted at this point without an issue
* in case of YBCCommitTransaction failure.
*/
YBCCommitTransaction();

CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT
: XACT_EVENT_PRE_COMMIT);

Expand Down Expand Up @@ -2863,26 +2872,6 @@ IsCurrentTxnWithPGRel(void)
return CurrentTransactionState->isYBTxnWithPostgresRel;
}

void
YBCCommitTransactionAndUpdateBlockState() {
TransactionState s = CurrentTransactionState;
if (YBCCommitTransaction()) {
/*
* This is still needed in the YugaByte case because we need to manage the
* PostgreSQL transaction state correctly.
*/
CommitTransaction();
s->blockState = TBLOCK_DEFAULT;
} else {
/*
* TBLOCK_STARTED means that we aren't in a transaction block, so should switch to
* default state in this case.
*/
s->blockState = s->blockState == TBLOCK_STARTED ? TBLOCK_DEFAULT : TBLOCK_ABORT;
YBCHandleCommitError();
}
}

/*
* CommitTransactionCommand
*/
Expand Down Expand Up @@ -2910,11 +2899,6 @@ CommitTransactionCommand(void)
* transaction commit, and return to the idle state.
*/
case TBLOCK_STARTED:
if (YBTransactionsEnabled())
{
YBCCommitTransactionAndUpdateBlockState();
break;
}
CommitTransaction();
s->blockState = TBLOCK_DEFAULT;
break;
Expand Down Expand Up @@ -2945,11 +2929,6 @@ CommitTransactionCommand(void)
* idle state.
*/
case TBLOCK_END:
if (YBTransactionsEnabled())
{
YBCCommitTransactionAndUpdateBlockState();
break;
}
CommitTransaction();
s->blockState = TBLOCK_DEFAULT;
break;
Expand Down Expand Up @@ -3698,14 +3677,6 @@ EndTransactionBlock(void)
*/
case TBLOCK_INPROGRESS:
s->blockState = TBLOCK_END;
if (YBTransactionsEnabled()) {
/*
* YugaByte transaction commit happens here, but could also happen in
* CommitTransactionCommand if this function is not called first.
*/
result = YBCCommitTransaction();
break;
}
result = true;
break;

Expand Down
88 changes: 78 additions & 10 deletions src/postgres/src/backend/commands/trigger.c
Original file line number Diff line number Diff line change
Expand Up @@ -3664,6 +3664,7 @@ typedef struct AfterTriggerSharedData
Oid ats_relid; /* the relation it's on */
CommandId ats_firing_id; /* ID for firing cycle */
struct AfterTriggersTableData *ats_table; /* transition table access */
Tuplestorestate *ybc_txn_fdw_tuplestore; /* tuplestore for deferred triggers */
} AfterTriggerSharedData;

typedef struct AfterTriggerEventData *AfterTriggerEvent;
Expand Down Expand Up @@ -3888,15 +3889,21 @@ static SetConstraintState SetConstraintStateAddItem(SetConstraintState state,
static void cancel_prior_stmt_triggers(Oid relid, CmdType cmdType, int tgevent);


static bool afterTriggerCheckState(AfterTriggerShared evtshared);

/*
* Get the FDW tuplestore for the current trigger query level, creating it
* if necessary.
*/
static Tuplestorestate *
GetCurrentFDWTuplestore(void)
GetCurrentFDWTuplestore(AfterTriggerShared evtshared)
{
Tuplestorestate *ret;

/* Check trigger has subtransaction level tuplestore (deferred trigger). */
if (evtshared->ybc_txn_fdw_tuplestore)
return evtshared->ybc_txn_fdw_tuplestore;

ret = afterTriggers.query_stack[afterTriggers.query_depth].fdw_tuplestore;
if (ret == NULL)
{
Expand All @@ -3906,17 +3913,23 @@ GetCurrentFDWTuplestore(void)
/*
* Make the tuplestore valid until end of subtransaction. We really
* only need it until AfterTriggerEndQuery().
* In YugaByte mode deferred trigger will access tuplestore
* at the end of subtransaction (after AfterTriggerEndQuery).
*/
oldcxt = MemoryContextSwitchTo(CurTransactionContext);
saveResourceOwner = CurrentResourceOwner;
CurrentResourceOwner = CurTransactionResourceOwner;

ret = tuplestore_begin_heap(false, false, work_mem);

/* Save tuplestore for trigger in DEFERRED state in ybc_txn_fdw_tuplestore field. */
if (IsYugaByteEnabled() && afterTriggerCheckState(evtshared))
evtshared->ybc_txn_fdw_tuplestore = ret;
else
afterTriggers.query_stack[afterTriggers.query_depth].fdw_tuplestore = ret;

CurrentResourceOwner = saveResourceOwner;
MemoryContextSwitchTo(oldcxt);

afterTriggers.query_stack[afterTriggers.query_depth].fdw_tuplestore = ret;
}

return ret;
Expand Down Expand Up @@ -3974,7 +3987,7 @@ afterTriggerCheckState(AfterTriggerShared evtshared)
* The passed-in event data is copied.
* ----------
*/
static void
static AfterTriggerEvent
afterTriggerAddEvent(AfterTriggerEventList *events,
AfterTriggerEvent event, AfterTriggerShared evtshared)
{
Expand Down Expand Up @@ -4059,10 +4072,22 @@ afterTriggerAddEvent(AfterTriggerEventList *events,
(char *) newshared >= chunk->endfree;
newshared--)
{
/*
* Deferred event migrates from one AfterTriggerEventList to another.
* Tuple is written into tuplestore when event is stored in one list, and read when
* event is migrated to another. This list may already have same AfterTriggerShared data,
* but with different ybc_txn_fdw_tuplestore.
* So ybc_txn_fdw_tuplestore must be checked before reuse existing AfterTriggerShared data,
* in other case tuple will be read from wrong tuplestore.
*
* Due to checking of the ybc_txn_fdw_tuplestore field postgres in YB mode will reuse
* AfterTriggerShared objects less often than vanilla postgres.
*/
if (newshared->ats_tgoid == evtshared->ats_tgoid &&
newshared->ats_relid == evtshared->ats_relid &&
newshared->ats_event == evtshared->ats_event &&
newshared->ats_table == evtshared->ats_table &&
newshared->ybc_txn_fdw_tuplestore == evtshared->ybc_txn_fdw_tuplestore &&
newshared->ats_firing_id == 0)
break;
}
Expand All @@ -4082,6 +4107,7 @@ afterTriggerAddEvent(AfterTriggerEventList *events,

chunk->freeptr += eventsize;
events->tailfree = chunk->freeptr;
return newevent;
}

/* ----------
Expand Down Expand Up @@ -4252,7 +4278,7 @@ AfterTriggerExecute(AfterTriggerEvent event,
{
case AFTER_TRIGGER_FDW_FETCH:
{
Tuplestorestate *fdw_tuplestore = GetCurrentFDWTuplestore();
Tuplestorestate *fdw_tuplestore = GetCurrentFDWTuplestore(evtshared);

if (!tuplestore_gettupleslot(fdw_tuplestore, true, false,
trig_tuple_slot1))
Expand Down Expand Up @@ -5031,6 +5057,28 @@ AfterTriggerEndXact(bool isCommit)
*/
if (afterTriggers.event_cxt)
{
/* Close all subtransaction level tuplestores. */
if (IsYugaByteEnabled())
{
AfterTriggerEventChunk *chunk;
for_each_chunk(chunk, afterTriggers.events)
{
AfterTriggerEvent event;
for_each_event(event, chunk)
{
AfterTriggerShared evtshared = GetTriggerSharedData(event);
if (evtshared->ybc_txn_fdw_tuplestore)
{
tuplestore_end(evtshared->ybc_txn_fdw_tuplestore);
/*
* Multiple events may have same shared data,
* so tuplestore must be nullified to avoid double closing.
*/
evtshared->ybc_txn_fdw_tuplestore = NULL;
}
}
}
}
MemoryContextDelete(afterTriggers.event_cxt);
afterTriggers.event_cxt = NULL;
afterTriggers.events.head = NULL;
Expand Down Expand Up @@ -5746,6 +5794,7 @@ AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
int tgtype_level;
int i;
Tuplestorestate *fdw_tuplestore = NULL;
Tuplestorestate *ybc_txn_fdw_tuplestore = NULL;

/*
* Check state. We use a normal test not Assert because it is possible to
Expand Down Expand Up @@ -5990,10 +6039,7 @@ AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
if ((IsYBBackedRelation(rel) || relkind == RELKIND_FOREIGN_TABLE) && row_trigger)
{
if (fdw_tuplestore == NULL)
{
fdw_tuplestore = GetCurrentFDWTuplestore();
new_event.ate_flags = AFTER_TRIGGER_FDW_FETCH;
}
else
/* subsequent event for the same tuple */
new_event.ate_flags = AFTER_TRIGGER_FDW_REUSE;
Expand Down Expand Up @@ -6028,9 +6074,31 @@ AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo,
new_shared.ats_table = transition_capture->tcs_private;
else
new_shared.ats_table = NULL;
/*
* Set current ybc_txn_fdw_tuplestore (if any) to new_shared field
* to reuse existing AfterTriggerShared object (is any)
* because ybc_txn_fdw_tuplestore is checked.
* In vanilla postgres ybc_txn_fdw_tuplestore is always NULL so reuse strategy of
* AfterTriggerShared object will not be changed.
*/
new_shared.ybc_txn_fdw_tuplestore =
ybc_txn_fdw_tuplestore && afterTriggerCheckState(&new_shared) ? ybc_txn_fdw_tuplestore
: NULL;
AfterTriggerEvent ev = afterTriggerAddEvent(
&afterTriggers.query_stack[afterTriggers.query_depth].events, &new_event, &new_shared);

afterTriggerAddEvent(&afterTriggers.query_stack[afterTriggers.query_depth].events,
&new_event, &new_shared);
/*
* In YugaByte mode we also use the tuplestore to store/pass tuples
* within a query execution.
*/
if ((IsYBBackedRelation(rel) || relkind == RELKIND_FOREIGN_TABLE) &&
row_trigger && fdw_tuplestore == NULL)
{
AfterTriggerShared evtshared = GetTriggerSharedData(ev);
fdw_tuplestore = GetCurrentFDWTuplestore(evtshared);
/* Save tuplestore for deferred triggers only. */
ybc_txn_fdw_tuplestore = evtshared->ybc_txn_fdw_tuplestore;
}
}

/*
Expand Down
7 changes: 0 additions & 7 deletions src/postgres/src/backend/parser/gram.y
Original file line number Diff line number Diff line change
Expand Up @@ -3788,31 +3788,27 @@ generated_when:
ConstraintAttr:
DEFERRABLE
{
parser_ybc_signal_unsupported(@1, "DEFERRABLE constraint", 1129);
Constraint *n = makeNode(Constraint);
n->contype = CONSTR_ATTR_DEFERRABLE;
n->location = @1;
$$ = (Node *)n;
}
| NOT DEFERRABLE
{
parser_ybc_signal_unsupported(@1, "NOT DEFERRABLE constraint", 1129);
Constraint *n = makeNode(Constraint);
n->contype = CONSTR_ATTR_NOT_DEFERRABLE;
n->location = @1;
$$ = (Node *)n;
}
| INITIALLY DEFERRED
{
parser_ybc_signal_unsupported(@1, "INITIALLY DEFERRED constraint", 1129);
Constraint *n = makeNode(Constraint);
n->contype = CONSTR_ATTR_DEFERRED;
n->location = @1;
$$ = (Node *)n;
}
| INITIALLY IMMEDIATE
{
parser_ybc_signal_unsupported(@1, "INITIALLY IMMEDIATE constraint", 1129);
Constraint *n = makeNode(Constraint);
n->contype = CONSTR_ATTR_IMMEDIATE;
n->location = @1;
Expand Down Expand Up @@ -5920,15 +5916,12 @@ ConstraintAttributeSpec:
ConstraintAttributeElem:
NOT DEFERRABLE { $$ = CAS_NOT_DEFERRABLE; }
| DEFERRABLE {
parser_ybc_signal_unsupported(@1, "DEFERRABLE constraint", 1129);
$$ = CAS_DEFERRABLE;
}
| INITIALLY IMMEDIATE {
parser_ybc_signal_unsupported(@1, "INITIALLY IMMEDIATE constraint", 1129);
$$ = CAS_INITIALLY_IMMEDIATE;
}
| INITIALLY DEFERRED {
parser_ybc_signal_unsupported(@1, "INITIALLY DEFERRED constraint", 1129);
$$ = CAS_INITIALLY_DEFERRED;
}
| NOT VALID { $$ = CAS_NOT_VALID; }
Expand Down
Loading

0 comments on commit 83a63c2

Please sign in to comment.