Skip to content

Commit

Permalink
Removal of sequence number
Browse files Browse the repository at this point in the history
  • Loading branch information
antekresic committed Jul 5, 2024
1 parent c10fae7 commit 2e21dc1
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 350 deletions.
7 changes: 6 additions & 1 deletion tsl/src/compression/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "cache.h"
#include "chunk.h"
#include "compression.h"
#include "compression_storage.h"
#include "create.h"
#include "debug_point.h"
#include "error_utils.h"
Expand Down Expand Up @@ -1104,6 +1105,11 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
Relation uncompressed_chunk_rel = table_open(uncompressed_chunk->table_id, ExclusiveLock);
Relation compressed_chunk_rel = table_open(compressed_chunk->table_id, ExclusiveLock);

if (!get_compressed_chunk_index_for_recompression(uncompressed_chunk))
{
create_compressed_chunk_indexes(compressed_chunk, settings);
}

/*************** tuplesort state *************************/
Tuplesortstate *segment_tuplesortstate;
TupleDesc compressed_rel_tupdesc = RelationGetDescr(compressed_chunk_rel);
Expand Down Expand Up @@ -1163,7 +1169,6 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
compressed_chunk_rel,
compressed_rel_tupdesc->natts,
true /*need_bistate*/,
true /*reset_sequence*/,
0 /*insert options*/);

/* create an array of the segmentby column offsets in the compressed chunk */
Expand Down
225 changes: 5 additions & 220 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -368,26 +368,12 @@ compress_chunk(Oid in_table, Oid out_table, int insert_options)
}

RowCompressor row_compressor;
/* Reset sequence is used for resetting the sequence without
* having to fetch the existing sequence for an individual
* sequence group. If we are rolling up an uncompressed chunk
* into an existing compressed chunk, we have to fetch
* sequence numbers for each batch of the segment using
* the respective index. In normal compression, this is
* not necessary because we start segment numbers from zero.
* We distinguish these cases by insert_options being zero for rollups.
*
* Sequence numbers are planned to be removed in the near future,
* which will render this flag obsolete.
*/
bool reset_sequence = insert_options > 0;
row_compressor_init(settings,
&row_compressor,
in_rel,
out_rel,
out_desc->natts,
true /*need_bistate*/,
reset_sequence,
insert_options);

if (matched_index_rel != NULL)
Expand Down Expand Up @@ -610,11 +596,11 @@ get_compressed_chunk_index(ResultRelInfo *resultRelInfo, CompressionSettings *se
Relation index_relation = resultRelInfo->ri_IndexRelationDescs[i];
IndexInfo *index_info = resultRelInfo->ri_IndexRelationInfo[i];

/* the index must include all segment by columns and sequence number */
if (index_info->ii_NumIndexKeyAttrs != num_segmentby_columns + 1)
/* the index must include all segment by columns and at least one metadata column */
if (index_info->ii_NumIndexKeyAttrs <= num_segmentby_columns)
continue;

for (int j = 0; j < index_info->ii_NumIndexKeyAttrs - 1; j++)
for (int j = 0; j < num_segmentby_columns - 1; j++)
{
AttrNumber attno = index_relation->rd_index->indkey.values[j];
const char *attname = get_attname(index_relation->rd_index->indrelid, attno, false);
Expand All @@ -629,170 +615,12 @@ get_compressed_chunk_index(ResultRelInfo *resultRelInfo, CompressionSettings *se
if (!matches)
continue;

/* Check last index column is sequence number */
AttrNumber attno = index_relation->rd_index->indkey
.values[AttrNumberGetAttrOffset(index_info->ii_NumIndexKeyAttrs)];
const char *attname = get_attname(index_relation->rd_index->indrelid, attno, false);

if (strncmp(attname, COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME, NAMEDATALEN) == 0)
return index_relation->rd_id;
return index_relation->rd_id;
}

return InvalidOid;
}

/*
 * Read the sequence number of one segment group through an index.
 *
 * Runs a backward scan over the index restricted by the given scan keys and
 * reads the last attribute of the first index tuple returned, which is
 * expected to be the sequence number column. Presumably this yields the
 * highest sequence number of the group, assuming an ordered (btree) index
 * whose key columns precede the sequence number -- confirm against the
 * index definition.
 *
 * table_rel:    relation the index belongs to
 * index_oid:    index to scan; opened and closed here with AccessShareLock
 * scankey:      scan keys handed to index_rescan()
 * num_scankeys: number of entries in scankey
 *
 * Returns the attribute value, or 0 when the scan yields no entry or the
 * attribute is NULL.
 */
static int32
index_scan_sequence_number(Relation table_rel, Oid index_oid, ScanKeyData *scankey,
						   int num_scankeys)
{
	int32 result = 0;
	Relation index_rel = index_open(index_oid, AccessShareLock);

	IndexScanDesc index_scan =
		index_beginscan(table_rel, index_rel, GetTransactionSnapshot(), num_scankeys, 0);
	/* Ask the index AM to hand back index tuples; the value is read from xs_itup below. */
	index_scan->xs_want_itup = true;
	index_rescan(index_scan, scankey, num_scankeys, NULL, 0);

	if (index_getnext_tid(index_scan, BackwardScanDirection))
	{
		bool is_null;
		Datum seq_num =
			index_getattr(index_scan->xs_itup,
						  index_scan->xs_itupdesc
							  ->natts, /* Last attribute of the index is sequence number. */
						  index_scan->xs_itupdesc,
						  &is_null);

		/*
		 * index_getattr() returns a Datum: convert it explicitly instead of
		 * relying on an implicit Datum -> int32 narrowing, and only when the
		 * attribute is not NULL.
		 */
		if (!is_null)
			result = DatumGetInt32(seq_num);
	}

	index_endscan(index_scan);
	index_close(index_rel, AccessShareLock);

	return result;
}

/*
 * Find the largest sequence number by scanning the table itself.
 *
 * Walks every tuple that table_beginscan() returns for the given scan keys
 * and keeps the maximum non-NULL value found in column seq_num_column_num.
 *
 * table_rel:          relation to scan
 * seq_num_column_num: attribute number of the sequence number column
 * scankey:            scan keys for the scan (may be NULL when num_scankeys is 0)
 * num_scankeys:       number of entries in scankey
 *
 * Returns the maximum value seen, or 0 if no tuple carried a positive,
 * non-NULL sequence number.
 */
static int32
table_scan_sequence_number(Relation table_rel, int16 seq_num_column_num, ScanKeyData *scankey,
						   int num_scankeys)
{
	int32 highest = 0;
	TupleTableSlot *tuple_slot = table_slot_create(table_rel, NULL);
	TableScanDesc heap_scan =
		table_beginscan(table_rel, GetLatestSnapshot(), num_scankeys, scankey);

	while (table_scan_getnextslot(heap_scan, ForwardScanDirection, tuple_slot))
	{
		bool value_is_null;
		Datum value = slot_getattr(tuple_slot, seq_num_column_num, &value_is_null);
		int32 candidate;

		/* NULL sequence numbers do not take part in the maximum. */
		if (value_is_null)
			continue;

		candidate = DatumGetInt32(value);
		if (candidate > highest)
			highest = candidate;
	}

	table_endscan(heap_scan);
	ExecDropSingleTupleTableSlot(tuple_slot);

	return highest;
}

/*
 * Determine the next sequence number for the current segment group by
 * scanning the compressed chunk. This is needed when merging chunks.
 *
 * One scan key is built per segmentby column (columns whose
 * segmentby_column_index is at least 1): an IS NULL key when the group's
 * value is NULL, otherwise an equality key on the group's value. With a valid
 * index_oid the lookup goes through index_scan_sequence_number(), otherwise
 * through table_scan_sequence_number().
 *
 * All scan allocations happen in a temporary memory context that is deleted
 * before returning.
 *
 * Returns the scanned sequence number plus SEQUENCE_NUM_GAP. For an empty
 * chunk the scan yields 0, so numbering starts at SEQUENCE_NUM_GAP.
 */
static int32
get_sequence_number_for_current_group(Relation table_rel, Oid index_oid,
									  int16 *uncompressed_col_to_compressed_col,
									  PerColumn *per_column, int n_input_columns,
									  int16 seq_num_column_num)
{
	/* A valid index means an index scan; without one we fall back to a heap scan. */
	bool use_index = OidIsValid(index_oid);
	int key_count = 0;
	int32 scanned = 0;
	int col;

	/* Count the segmentby columns: each of them contributes one scan key. */
	for (col = 0; col < n_input_columns; col++)
	{
		if (per_column[col].segmentby_column_index >= 1)
			key_count++;
	}

	MemoryContext scan_ctx = AllocSetContextCreate(CurrentMemoryContext,
												   "get max sequence number scan",
												   ALLOCSET_DEFAULT_SIZES);
	MemoryContext caller_ctx = MemoryContextSwitchTo(scan_ctx);

	ScanKeyData *scankey = NULL;

	if (key_count > 0)
	{
		scankey = palloc0(sizeof(ScanKeyData) * key_count);

		for (col = 0; col < n_input_columns; col++)
		{
			PerColumn *column = &per_column[col];
			ScanKeyData *key;
			int16 attno;

			if (column->segmentby_column_index < 1)
				continue;

			/* Keys are stored in segmentby order, which is 1-based. */
			key = &scankey[column->segmentby_column_index - 1];

			/*
			 * An index scan addresses the column by its segmentby position,
			 * a heap scan by its attribute number in the compressed table.
			 */
			if (use_index)
				attno = column->segmentby_column_index;
			else
				attno = AttrOffsetGetAttrNumber(uncompressed_col_to_compressed_col[col]);

			if (!column->segment_info->is_null)
			{
				ScanKeyEntryInitializeWithInfo(key,
											   0, /* flags */
											   attno,
											   BTEqualStrategyNumber,
											   InvalidOid, /* No strategy subtype. */
											   column->segment_info->collation,
											   &column->segment_info->eq_fn,
											   column->segment_info->val);
			}
			else
			{
				ScanKeyEntryInitialize(key,
									   SK_ISNULL | SK_SEARCHNULL,
									   attno,
									   InvalidStrategy, /* no strategy */
									   InvalidOid,		/* no strategy subtype */
									   InvalidOid,		/* no collation */
									   InvalidOid,		/* no reg proc for this */
									   (Datum) 0);		/* constant */
			}
		}
	}

	if (!use_index)
	{
		/* Table scan can work without scan keys. */
		scanned = table_scan_sequence_number(table_rel, seq_num_column_num, scankey, key_count);
	}
	else
	{
		/* Index scan should always use at least one scan key to get the sequence number. */
		Assert(key_count > 0);

		scanned = index_scan_sequence_number(table_rel, index_oid, scankey, key_count);
	}

	MemoryContextSwitchTo(caller_ctx);
	MemoryContextDelete(scan_ctx);

	return scanned + SEQUENCE_NUM_GAP;
}

static void
build_column_map(CompressionSettings *settings, Relation uncompressed_table,
Relation compressed_table, PerColumn **pcolumns, int16 **pmap)
Expand Down Expand Up @@ -890,28 +718,18 @@ build_column_map(CompressionSettings *settings, Relation uncompressed_table,
void
row_compressor_init(CompressionSettings *settings, RowCompressor *row_compressor,
Relation uncompressed_table, Relation compressed_table,
int16 num_columns_in_compressed_table, bool need_bistate, bool reset_sequence,
int16 num_columns_in_compressed_table, bool need_bistate,
int insert_options)
{
Name count_metadata_name = DatumGetName(
DirectFunctionCall1(namein, CStringGetDatum(COMPRESSION_COLUMN_METADATA_COUNT_NAME)));
Name sequence_num_metadata_name = DatumGetName(
DirectFunctionCall1(namein,
CStringGetDatum(COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME)));
AttrNumber count_metadata_column_num =
get_attnum(compressed_table->rd_id, NameStr(*count_metadata_name));
AttrNumber sequence_num_column_num =
get_attnum(compressed_table->rd_id, NameStr(*sequence_num_metadata_name));

if (count_metadata_column_num == InvalidAttrNumber)
elog(ERROR,
"missing metadata column '%s' in compressed table",
COMPRESSION_COLUMN_METADATA_COUNT_NAME);

if (sequence_num_column_num == InvalidAttrNumber)
elog(ERROR,
"missing metadata column '%s' in compressed table",
COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME);

*row_compressor = (RowCompressor){
.per_row_ctx = AllocSetContextCreate(CurrentMemoryContext,
Expand All @@ -922,14 +740,11 @@ row_compressor_init(CompressionSettings *settings, RowCompressor *row_compressor
.resultRelInfo = ts_catalog_open_indexes(compressed_table),
.n_input_columns = RelationGetDescr(uncompressed_table)->natts,
.count_metadata_column_offset = AttrNumberGetAttrOffset(count_metadata_column_num),
.sequence_num_metadata_column_offset = AttrNumberGetAttrOffset(sequence_num_column_num),
.compressed_values = palloc(sizeof(Datum) * num_columns_in_compressed_table),
.compressed_is_null = palloc(sizeof(bool) * num_columns_in_compressed_table),
.rows_compressed_into_current_value = 0,
.rowcnt_pre_compression = 0,
.num_compressed_rows = 0,
.sequence_num = SEQUENCE_NUM_GAP,
.reset_sequence = reset_sequence,
.first_iteration = true,
.insert_options = insert_options,
};
Expand Down Expand Up @@ -1044,26 +859,6 @@ row_compressor_update_group(RowCompressor *row_compressor, TupleTableSlot *row)
}
/* switch to original memory context */
MemoryContextSwitchTo(oldcontext);

/*
* The sequence number of the compressed tuple is per segment by grouping
* and should be reset when the grouping changes to prevent overflows with
* many segmentby columns.
*
*/
if (row_compressor->reset_sequence)
row_compressor->sequence_num = SEQUENCE_NUM_GAP; /* Start sequence from beginning */
else
row_compressor->sequence_num =
get_sequence_number_for_current_group(row_compressor->compressed_table,
row_compressor->index_oid,
row_compressor
->uncompressed_col_to_compressed_col,
row_compressor->per_column,
row_compressor->n_input_columns,
AttrOffsetGetAttrNumber(
row_compressor
->sequence_num_metadata_column_offset));
}

static bool
Expand Down Expand Up @@ -1194,16 +989,6 @@ row_compressor_flush(RowCompressor *row_compressor, CommandId mycid, bool change
Int32GetDatum(row_compressor->rows_compressed_into_current_value);
row_compressor->compressed_is_null[row_compressor->count_metadata_column_offset] = false;

row_compressor->compressed_values[row_compressor->sequence_num_metadata_column_offset] =
Int32GetDatum(row_compressor->sequence_num);
row_compressor->compressed_is_null[row_compressor->sequence_num_metadata_column_offset] = false;

/* overflow could happen only if chunk has more than 200B rows */
if (row_compressor->sequence_num > PG_INT32_MAX - SEQUENCE_NUM_GAP)
elog(ERROR, "sequence id overflow");

row_compressor->sequence_num += SEQUENCE_NUM_GAP;

compressed_tuple = heap_form_tuple(RelationGetDescr(row_compressor->compressed_table),
row_compressor->compressed_values,
row_compressor->compressed_is_null);
Expand Down
6 changes: 1 addition & 5 deletions tsl/src/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ typedef struct BulkInsertStateData *BulkInsertState;
uint8 compression_algorithm

#define TARGET_COMPRESSED_BATCH_SIZE 1000
/* Gap between the sequence ids of consecutive compressed rows; leaves room to add rows in the gap later. */
#define SEQUENCE_NUM_GAP 10

typedef struct CompressedDataHeader
{
Expand Down Expand Up @@ -259,8 +257,6 @@ typedef struct RowCompressor
bool *compressed_is_null;
int64 rowcnt_pre_compression;
int64 num_compressed_rows;
/* if recompressing segmentwise, we use this info to reset the sequence number */
bool reset_sequence;
/* flag for checking if we are working on the first tuple */
bool first_iteration;
/* the heap insert options */
Expand Down Expand Up @@ -357,7 +353,7 @@ extern void compress_chunk_populate_sort_info_for_column(CompressionSettings *se
extern void row_compressor_init(CompressionSettings *settings, RowCompressor *row_compressor,
Relation uncompressed_table, Relation compressed_table,
int16 num_columns_in_compressed_table, bool need_bistate,
bool reset_sequence, int insert_options);
int insert_options);
extern void row_compressor_reset(RowCompressor *row_compressor);
extern void row_compressor_close(RowCompressor *row_compressor);
extern void row_compressor_append_sorted_rows(RowCompressor *row_compressor,
Expand Down
Loading

0 comments on commit 2e21dc1

Please sign in to comment.