Skip to content

Commit

Permalink
[columnar] Do index update during columnar update
Browse files Browse the repository at this point in the history
* If we do update of tuple and index is on table, we should also update
  index table.
  • Loading branch information
mkaruza authored and wuputah committed Feb 8, 2023
1 parent 4f939f4 commit f5e0cc1
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 20 deletions.
65 changes: 47 additions & 18 deletions columnar/src/backend/columnar/columnar_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ typedef struct ChunkGroupReadState
List *projectedColumnList; /* borrowed reference */
ChunkData *chunkGroupData;
bytea *rowMask;
bool rowMaskCached; /* If rowMask metadata is cached and borrowed */
uint32 chunkStripeRowOffset;
} ChunkGroupReadState;

Expand Down Expand Up @@ -109,12 +110,12 @@ static MemoryContext CreateStripeReadMemoryContext(void);
static bool ColumnarReadIsCurrentStripe(ColumnarReadState *readState,
uint64 rowNumber);
static StripeMetadata * ColumnarReadGetCurrentStripe(ColumnarReadState *readState);
static void ReadStripeRowByRowNumber(ColumnarReadState *readState,
static bool ReadStripeRowByRowNumber(ColumnarReadState *readState,
uint64 rowNumber, Datum *columnValues,
bool *columnNulls);
static bool StripeReadIsCurrentChunkGroup(StripeReadState *stripeReadState,
int chunkGroupIndex);
static void ReadChunkGroupRowByRowOffset(ChunkGroupReadState *chunkGroupReadState,
static bool ReadChunkGroupRowByRowOffset(ChunkGroupReadState *chunkGroupReadState,
StripeMetadata *stripeMetadata,
uint64 stripeRowOffset, Datum *columnValues,
bool *columnNulls);
Expand Down Expand Up @@ -438,9 +439,7 @@ ColumnarReadRowByRowNumber(ColumnarReadState *readState,
readState->currentStripeMetadata = stripeMetadata;
}

ReadStripeRowByRowNumber(readState, rowNumber, columnValues, columnNulls);

return true;
return ReadStripeRowByRowNumber(readState, rowNumber, columnValues, columnNulls);
}


Expand Down Expand Up @@ -483,7 +482,7 @@ ColumnarReadGetCurrentStripe(ColumnarReadState *readState)
* stripeReadState into columnValues and columnNulls.
* Errors out if no such row exists in the stripe being read.
*/
static void
static bool
ReadStripeRowByRowNumber(ColumnarReadState *readState,
uint64 rowNumber, Datum *columnValues,
bool *columnNulls)
Expand Down Expand Up @@ -514,11 +513,43 @@ ReadStripeRowByRowNumber(ColumnarReadState *readState,
stripeReadState->tupleDescriptor,
stripeReadState->projectedColumnList,
stripeReadState->stripeReadContext);

uint64 chunkFirstRowNumber =
stripeMetadata->firstRowNumber +
stripeReadState->chunkGroupReadState->chunkStripeRowOffset;

if (columnar_enable_dml)
{

RowMaskWriteStateEntry *rowMaskEntry =
RowMaskFindWriteState(stripeReadState->relation->rd_node.relNode,
GetCurrentSubTransactionId(), rowNumber);

if (rowMaskEntry != NULL)
{
stripeReadState->chunkGroupReadState->rowMask = rowMaskEntry->mask;
stripeReadState->chunkGroupReadState->rowMaskCached = true;
}
else
{
stripeReadState->chunkGroupReadState->rowMask =
ReadChunkRowMask(stripeReadState->relation->rd_node,
readState->snapshot,
stripeReadState->stripeReadContext,
chunkFirstRowNumber,
stripeReadState->chunkGroupReadState->rowCount);
stripeReadState->chunkGroupReadState->rowMaskCached = false;
}
}
else
{
stripeReadState->chunkGroupReadState->rowMask = NULL;
}
}

ReadChunkGroupRowByRowOffset(stripeReadState->chunkGroupReadState,
stripeMetadata, stripeRowOffset,
columnValues, columnNulls);
return ReadChunkGroupRowByRowOffset(stripeReadState->chunkGroupReadState,
stripeMetadata, stripeRowOffset,
columnValues, columnNulls);
}


Expand All @@ -543,7 +574,7 @@ StripeReadIsCurrentChunkGroup(StripeReadState *stripeReadState, int chunkGroupIn
* chunkGroupReadState into columnValues and columnNulls.
* Errors out if no such row exists in the chunk group being read.
*/
static void
static bool
ReadChunkGroupRowByRowOffset(ChunkGroupReadState *chunkGroupReadState,
StripeMetadata *stripeMetadata,
uint64 stripeRowOffset, Datum *columnValues,
Expand All @@ -552,13 +583,10 @@ ReadChunkGroupRowByRowOffset(ChunkGroupReadState *chunkGroupReadState,
/* set the exact row number to be read from given chunk roup */
chunkGroupReadState->currentRow = stripeRowOffset %
stripeMetadata->chunkGroupRowCount;
int dummyDeletedColumnNumber = 0;
if (!ReadChunkGroupNextRow(chunkGroupReadState, columnValues, columnNulls,
&dummyDeletedColumnNumber))
{
/* not expected but be on the safe side */
ereport(ERROR, (errmsg("could not find the row in stripe")));
}
int isDeleted = 0;
ReadChunkGroupNextRow(chunkGroupReadState, columnValues, columnNulls,
&isDeleted);
return !isDeleted;
}


Expand Down Expand Up @@ -850,6 +878,7 @@ ReadStripeNextRow(StripeReadState *stripeReadState, Datum *columnValues,
stripeReadState->stripeReadContext,
chunkFirstRowNumber,
stripeReadState->chunkGroupReadState->rowCount);
stripeReadState->chunkGroupReadState->rowMaskCached = false;
}
else
{
Expand Down Expand Up @@ -927,7 +956,7 @@ EndChunkGroupRead(ChunkGroupReadState *chunkGroupReadState)
{
FreeChunkBufferValueArray(chunkGroupReadState->chunkGroupData);
FreeChunkData(chunkGroupReadState->chunkGroupData);
if (chunkGroupReadState->rowMask)
if (chunkGroupReadState->rowMask != NULL && !chunkGroupReadState->rowMaskCached)
pfree(chunkGroupReadState->rowMask);
chunkGroupReadState->rowMask = NULL;
pfree(chunkGroupReadState);
Expand Down
2 changes: 2 additions & 0 deletions columnar/src/backend/columnar/columnar_tableam.c
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,8 @@ columnar_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,

columnar_tuple_insert(relation, slot, cid, 0, NULL);

*update_indexes = true;

return TM_Ok;
}

Expand Down
4 changes: 2 additions & 2 deletions columnar/src/backend/columnar/write_state_row_mask.c
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ InitRowMaskEntry(uint64 storageId, bytea *mask)
RowMaskWriteStateEntry *rowMask = palloc0(sizeof(RowMaskWriteStateEntry));

rowMask->storageId = storageId;
rowMask->mask = (bytea *)palloc0(VARSIZE(mask));
memcpy(rowMask->mask, mask, VARSIZE(mask));
rowMask->mask = (bytea *)palloc0(VARSIZE(mask) + VARHDRSZ);
memcpy(rowMask->mask, mask, VARSIZE(mask) + VARHDRSZ);
// rest of structure members needs be populated where RowMaskState was created

return rowMask;
Expand Down

0 comments on commit f5e0cc1

Please sign in to comment.