diff --git a/tsl/src/compression/api.c b/tsl/src/compression/api.c index a305757cc15..502270239fa 100644 --- a/tsl/src/compression/api.c +++ b/tsl/src/compression/api.c @@ -32,6 +32,7 @@ #include "cache.h" #include "chunk.h" #include "compression.h" +#include "compression_storage.h" #include "create.h" #include "debug_point.h" #include "error_utils.h" @@ -1104,6 +1105,11 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk) Relation uncompressed_chunk_rel = table_open(uncompressed_chunk->table_id, ExclusiveLock); Relation compressed_chunk_rel = table_open(compressed_chunk->table_id, ExclusiveLock); + if (!get_compressed_chunk_index_for_recompression(uncompressed_chunk)) + { + create_compressed_chunk_indexes(compressed_chunk, settings); + } + /*************** tuplesort state *************************/ Tuplesortstate *segment_tuplesortstate; TupleDesc compressed_rel_tupdesc = RelationGetDescr(compressed_chunk_rel); @@ -1163,7 +1169,6 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk) compressed_chunk_rel, compressed_rel_tupdesc->natts, true /*need_bistate*/, - true /*reset_sequence*/, 0 /*insert options*/); /* create an array of the segmentby column offsets in the compressed chunk */ diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c index 5db42ada1d9..ea2f3d979b8 100644 --- a/tsl/src/compression/compression.c +++ b/tsl/src/compression/compression.c @@ -368,26 +368,12 @@ compress_chunk(Oid in_table, Oid out_table, int insert_options) } RowCompressor row_compressor; - /* Reset sequence is used for resetting the sequence without - * having to fetch the existing sequence for an individual - * sequence group. If we are rolling up an uncompressed chunk - * into an existing compressed chunk, we have to fetch - * sequence numbers for each batch of the segment using - * the respective index. In normal compression, this is - * not necessary because we start segment numbers from zero. - * We distinguish these cases by insert_options being zero for rollups. - * - * Sequence numbers are planned to be removed in the near future, - * which will render this flag obsolete. - */ - bool reset_sequence = insert_options > 0; row_compressor_init(settings, &row_compressor, in_rel, out_rel, out_desc->natts, true /*need_bistate*/, - reset_sequence, insert_options); if (matched_index_rel != NULL) @@ -610,11 +596,11 @@ get_compressed_chunk_index(ResultRelInfo *resultRelInfo, CompressionSettings *se Relation index_relation = resultRelInfo->ri_IndexRelationDescs[i]; IndexInfo *index_info = resultRelInfo->ri_IndexRelationInfo[i]; - /* the index must include all segment by columns and sequence number */ - if (index_info->ii_NumIndexKeyAttrs != num_segmentby_columns + 1) + /* the index must include all segment by columns and at least one metadata column */ + if (index_info->ii_NumIndexKeyAttrs <= num_segmentby_columns) continue; - for (int j = 0; j < index_info->ii_NumIndexKeyAttrs - 1; j++) + for (int j = 0; j < num_segmentby_columns - 1; j++) { AttrNumber attno = index_relation->rd_index->indkey.values[j]; const char *attname = get_attname(index_relation->rd_index->indrelid, attno, false); @@ -629,170 +615,12 @@ get_compressed_chunk_index(ResultRelInfo *resultRelInfo, CompressionSettings *se if (!matches) continue; - /* Check last index column is sequence number */ - AttrNumber attno = index_relation->rd_index->indkey - .values[AttrNumberGetAttrOffset(index_info->ii_NumIndexKeyAttrs)]; - const char *attname = get_attname(index_relation->rd_index->indrelid, attno, false); - - if (strncmp(attname, COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME, NAMEDATALEN) == 0) - return index_relation->rd_id; + return index_relation->rd_id; } return InvalidOid; } -static int32 -index_scan_sequence_number(Relation table_rel, Oid index_oid, ScanKeyData *scankey, - int num_scankeys) -{ - int32 result = 0; - bool is_null; - Relation index_rel = index_open(index_oid, AccessShareLock); - - IndexScanDesc index_scan = - index_beginscan(table_rel, index_rel, GetTransactionSnapshot(), num_scankeys, 0); - index_scan->xs_want_itup = true; - index_rescan(index_scan, scankey, num_scankeys, NULL, 0); - - if (index_getnext_tid(index_scan, BackwardScanDirection)) - { - result = index_getattr(index_scan->xs_itup, - index_scan->xs_itupdesc - ->natts, /* Last attribute of the index is sequence number. */ - index_scan->xs_itupdesc, - &is_null); - if (is_null) - result = 0; - } - - index_endscan(index_scan); - index_close(index_rel, AccessShareLock); - - return result; -} - -static int32 -table_scan_sequence_number(Relation table_rel, int16 seq_num_column_num, ScanKeyData *scankey, - int num_scankeys) -{ - int32 max_seq_num = 0; - TupleTableSlot *slot; - TableScanDesc scan; - - slot = table_slot_create(table_rel, NULL); - scan = table_beginscan(table_rel, GetLatestSnapshot(), num_scankeys, scankey); - - while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) - { - bool is_null; - Datum seq_num = slot_getattr(slot, seq_num_column_num, &is_null); - - if (!is_null) - { - int32 curr_seq_num = DatumGetInt32(seq_num); - - if (max_seq_num < curr_seq_num) - max_seq_num = curr_seq_num; - } - } - - table_endscan(scan); - ExecDropSingleTupleTableSlot(slot); - - return max_seq_num; -} - -/* Scan compressed chunk to get the sequence number for current group. - * This is necessary to do when merging chunks. If the chunk is empty, - * scan will always return 0 and the sequence number will start from - * SEQUENCE_NUM_GAP. - */ -static int32 -get_sequence_number_for_current_group(Relation table_rel, Oid index_oid, - int16 *uncompressed_col_to_compressed_col, - PerColumn *per_column, int n_input_columns, - int16 seq_num_column_num) -{ - /* If there is a suitable index, use index scan otherwise fallback to heap scan. */ - bool is_index_scan = OidIsValid(index_oid); - - int i, num_scankeys = 0; - int32 result = 0; - - for (i = 0; i < n_input_columns; i++) - { - if (per_column[i].segmentby_column_index < 1) - continue; - - num_scankeys++; - } - - MemoryContext scan_ctx = AllocSetContextCreate(CurrentMemoryContext, - "get max sequence number scan", - ALLOCSET_DEFAULT_SIZES); - MemoryContext old_ctx; - old_ctx = MemoryContextSwitchTo(scan_ctx); - - ScanKeyData *scankey = NULL; - - if (num_scankeys > 0) - { - scankey = palloc0(sizeof(ScanKeyData) * num_scankeys); - - for (i = 0; i < n_input_columns; i++) - { - if (per_column[i].segmentby_column_index < 1) - continue; - - PerColumn col = per_column[i]; - int16 attno = is_index_scan ? - col.segmentby_column_index : - AttrOffsetGetAttrNumber(uncompressed_col_to_compressed_col[i]); - - if (col.segment_info->is_null) - { - ScanKeyEntryInitialize(&scankey[col.segmentby_column_index - 1], - SK_ISNULL | SK_SEARCHNULL, - attno, - InvalidStrategy, /* no strategy */ - InvalidOid, /* no strategy subtype */ - InvalidOid, /* no collation */ - InvalidOid, /* no reg proc for this */ - (Datum) 0); /* constant */ - } - else - { - ScanKeyEntryInitializeWithInfo(&scankey[col.segmentby_column_index - 1], - 0, /* flags */ - attno, - BTEqualStrategyNumber, - InvalidOid, /* No strategy subtype. */ - col.segment_info->collation, - &col.segment_info->eq_fn, - col.segment_info->val); - } - } - } - - if (is_index_scan) - { - /* Index scan should always use at least one scan key to get the sequence number. */ - Assert(num_scankeys > 0); - - result = index_scan_sequence_number(table_rel, index_oid, scankey, num_scankeys); - } - else - { - /* Table scan can work without scan keys. */ - result = table_scan_sequence_number(table_rel, seq_num_column_num, scankey, num_scankeys); - } - - MemoryContextSwitchTo(old_ctx); - MemoryContextDelete(scan_ctx); - - return result + SEQUENCE_NUM_GAP; -} - static void build_column_map(CompressionSettings *settings, Relation uncompressed_table, Relation compressed_table, PerColumn **pcolumns, int16 **pmap) @@ -890,28 +718,18 @@ build_column_map(CompressionSettings *settings, Relation uncompressed_table, void row_compressor_init(CompressionSettings *settings, RowCompressor *row_compressor, Relation uncompressed_table, Relation compressed_table, - int16 num_columns_in_compressed_table, bool need_bistate, bool reset_sequence, + int16 num_columns_in_compressed_table, bool need_bistate, int insert_options) { Name count_metadata_name = DatumGetName( DirectFunctionCall1(namein, CStringGetDatum(COMPRESSION_COLUMN_METADATA_COUNT_NAME))); - Name sequence_num_metadata_name = DatumGetName( - DirectFunctionCall1(namein, - CStringGetDatum(COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME))); AttrNumber count_metadata_column_num = get_attnum(compressed_table->rd_id, NameStr(*count_metadata_name)); - AttrNumber sequence_num_column_num = - get_attnum(compressed_table->rd_id, NameStr(*sequence_num_metadata_name)); - if (count_metadata_column_num == InvalidAttrNumber) elog(ERROR, "missing metadata column '%s' in compressed table", COMPRESSION_COLUMN_METADATA_COUNT_NAME); - if (sequence_num_column_num == InvalidAttrNumber) - elog(ERROR, - "missing metadata column '%s' in compressed table", - COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME); *row_compressor = (RowCompressor){ .per_row_ctx = AllocSetContextCreate(CurrentMemoryContext, @@ -922,14 +740,11 @@ row_compressor_init(CompressionSettings *settings, RowCompressor *row_compressor .resultRelInfo = ts_catalog_open_indexes(compressed_table), .n_input_columns = RelationGetDescr(uncompressed_table)->natts, .count_metadata_column_offset = AttrNumberGetAttrOffset(count_metadata_column_num), - .sequence_num_metadata_column_offset = AttrNumberGetAttrOffset(sequence_num_column_num), .compressed_values = palloc(sizeof(Datum) * num_columns_in_compressed_table), .compressed_is_null = palloc(sizeof(bool) * num_columns_in_compressed_table), .rows_compressed_into_current_value = 0, .rowcnt_pre_compression = 0, .num_compressed_rows = 0, - .sequence_num = SEQUENCE_NUM_GAP, - .reset_sequence = reset_sequence, .first_iteration = true, .insert_options = insert_options, }; @@ -1044,26 +859,6 @@ row_compressor_update_group(RowCompressor *row_compressor, TupleTableSlot *row) } /* switch to original memory context */ MemoryContextSwitchTo(oldcontext); - - /* - * The sequence number of the compressed tuple is per segment by grouping - * and should be reset when the grouping changes to prevent overflows with - * many segmentby columns. - * - */ - if (row_compressor->reset_sequence) - row_compressor->sequence_num = SEQUENCE_NUM_GAP; /* Start sequence from beginning */ - else - row_compressor->sequence_num = - get_sequence_number_for_current_group(row_compressor->compressed_table, - row_compressor->index_oid, - row_compressor - ->uncompressed_col_to_compressed_col, - row_compressor->per_column, - row_compressor->n_input_columns, - AttrOffsetGetAttrNumber( - row_compressor - ->sequence_num_metadata_column_offset)); } static bool @@ -1194,16 +989,6 @@ row_compressor_flush(RowCompressor *row_compressor, CommandId mycid, bool change Int32GetDatum(row_compressor->rows_compressed_into_current_value); row_compressor->compressed_is_null[row_compressor->count_metadata_column_offset] = false; - row_compressor->compressed_values[row_compressor->sequence_num_metadata_column_offset] = - Int32GetDatum(row_compressor->sequence_num); - row_compressor->compressed_is_null[row_compressor->sequence_num_metadata_column_offset] = false; - - /* overflow could happen only if chunk has more than 200B rows */ - if (row_compressor->sequence_num > PG_INT32_MAX - SEQUENCE_NUM_GAP) - elog(ERROR, "sequence id overflow"); - - row_compressor->sequence_num += SEQUENCE_NUM_GAP; - compressed_tuple = heap_form_tuple(RelationGetDescr(row_compressor->compressed_table), row_compressor->compressed_values, row_compressor->compressed_is_null); diff --git a/tsl/src/compression/compression.h b/tsl/src/compression/compression.h index 7d400ffe5ed..391652c4d74 100644 --- a/tsl/src/compression/compression.h +++ b/tsl/src/compression/compression.h @@ -33,8 +33,6 @@ typedef struct BulkInsertStateData *BulkInsertState; uint8 compression_algorithm #define TARGET_COMPRESSED_BATCH_SIZE 1000 -/* gap in sequence id between rows, potential for adding rows in gap later */ -#define SEQUENCE_NUM_GAP 10 typedef struct CompressedDataHeader { @@ -259,8 +257,6 @@ typedef struct RowCompressor bool *compressed_is_null; int64 rowcnt_pre_compression; int64 num_compressed_rows; - /* if recompressing segmentwise, we use this info to reset the sequence number */ - bool reset_sequence; /* flag for checking if we are working on the first tuple */ bool first_iteration; /* the heap insert options */ @@ -357,7 +353,7 @@ extern void compress_chunk_populate_sort_info_for_column(CompressionSettings *se extern void row_compressor_init(CompressionSettings *settings, RowCompressor *row_compressor, Relation uncompressed_table, Relation compressed_table, int16 num_columns_in_compressed_table, bool need_bistate, - bool reset_sequence, int insert_options); + int insert_options); extern void row_compressor_reset(RowCompressor *row_compressor); extern void row_compressor_close(RowCompressor *row_compressor); extern void row_compressor_append_sorted_rows(RowCompressor *row_compressor, diff --git a/tsl/src/compression/compression_storage.c b/tsl/src/compression/compression_storage.c index 7d37a3ce5c8..bdaeaa0a4c3 100644 --- a/tsl/src/compression/compression_storage.c +++ b/tsl/src/compression/compression_storage.c @@ -33,6 +33,7 @@ #include "extension_constants.h" #include "guc.h" #include "hypertable.h" +#include "ts_catalog/array_utils.h" #include "ts_catalog/catalog.h" #include "ts_catalog/compression_settings.h" #include "utils.h" @@ -51,7 +52,6 @@ static void set_toast_tuple_target_on_chunk(Oid compressed_table_id); static void set_statistics_on_compressed_chunk(Oid compressed_table_id); -static void create_compressed_chunk_indexes(Chunk *chunk, CompressionSettings *settings); static void clone_constraints_to_chunk(Oid ht_reloid, const Chunk *compressed_chunk); static List *get_fk_constraints(Oid reloid); @@ -268,7 +268,7 @@ modify_compressed_toast_table_storage(CompressionSettings *settings, List *colde } } -static void +void create_compressed_chunk_indexes(Chunk *chunk, CompressionSettings *settings) { IndexStmt stmt = { @@ -278,10 +278,7 @@ create_compressed_chunk_indexes(Chunk *chunk, CompressionSettings *settings) .relation = makeRangeVar(NameStr(chunk->fd.schema_name), NameStr(chunk->fd.table_name), 0), .tableSpace = get_tablespace_name(get_rel_tablespace(chunk->table_id)), }; - IndexElem sequence_num_elem = { - .type = T_IndexElem, - .name = COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME, - }; + NameData index_name; ObjectAddress index_addr; HeapTuple index_tuple; @@ -309,8 +306,62 @@ create_compressed_chunk_indexes(Chunk *chunk, CompressionSettings *settings) return; } - appendStringInfoString(buf, COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME); - indexcols = lappend(indexcols, &sequence_num_elem); + SortByDir ordering; + SortByNulls nulls_ordering; + + + StringInfo orderby_buf = makeStringInfo(); + for (int i = 1; i <= ts_array_length(settings->fd.orderby); i++) + { + + resetStringInfo(orderby_buf); + /* Add min metadata column */ + IndexElem *orderby_min_elem = makeNode(IndexElem); + orderby_min_elem->name = column_segment_min_name(i); + if (ts_array_get_element_bool(settings->fd.orderby_desc, i)) + { + appendStringInfoString(orderby_buf, " DESC"); + ordering = SORTBY_DESC; + } else { + appendStringInfoString(orderby_buf, " ASC"); + ordering = SORTBY_ASC; + } + orderby_min_elem->ordering = ordering; + + if (ts_array_get_element_bool(settings->fd.orderby_nullsfirst, i)) + { + if (orderby_min_elem->ordering != SORTBY_DESC) + { + appendStringInfoString(orderby_buf, " NULLS FIRST"); + nulls_ordering = SORTBY_NULLS_FIRST; + } else { + nulls_ordering = SORTBY_NULLS_DEFAULT; + } + } else { + if (orderby_min_elem->ordering != SORTBY_DESC) + { + nulls_ordering = SORTBY_NULLS_DEFAULT; + } else { + appendStringInfoString(orderby_buf, " NULLS LAST"); + nulls_ordering = SORTBY_NULLS_LAST; + } + } + orderby_min_elem->nulls_ordering = nulls_ordering; + appendStringInfoString(buf, orderby_min_elem->name); + appendStringInfoString(buf, orderby_buf->data); + appendStringInfoString(buf, ", "); + indexcols = lappend(indexcols, orderby_min_elem); + + /* Add max metadata column */ + IndexElem *orderby_max_elem = makeNode(IndexElem); + orderby_max_elem->name = column_segment_max_name(i); + orderby_max_elem->ordering = orderby_min_elem->ordering; + orderby_max_elem->nulls_ordering = orderby_min_elem->nulls_ordering; + appendStringInfoString(buf, orderby_max_elem->name); + appendStringInfoString(buf, orderby_buf->data); + appendStringInfoString(buf, ", "); + indexcols = lappend(indexcols, orderby_max_elem); + } stmt.indexParams = indexcols; index_addr = DefineIndexCompat(chunk->table_id, diff --git a/tsl/src/compression/compression_storage.h b/tsl/src/compression/compression_storage.h index 832bb96eefe..30801ad8e42 100644 --- a/tsl/src/compression/compression_storage.h +++ b/tsl/src/compression/compression_storage.h @@ -19,3 +19,4 @@ int32 compression_hypertable_create(Hypertable *ht, Oid owner, Oid tablespace_oi Oid compression_chunk_create(Chunk *src_chunk, Chunk *chunk, List *column_defs, Oid tablespace_oid); void modify_compressed_toast_table_storage(CompressionSettings *settings, List *coldefs, Oid compress_relid); +void create_compressed_chunk_indexes(Chunk *chunk, CompressionSettings *settings); diff --git a/tsl/src/compression/create.c b/tsl/src/compression/create.c index d6d6e8eba76..8b3e7c7b6e5 100644 --- a/tsl/src/compression/create.c +++ b/tsl/src/compression/create.c @@ -329,11 +329,7 @@ build_columndefs(CompressionSettings *settings, Oid src_relid) * Sequence number should probably go after all orderby columns, but we * put it here for simplicity. */ - List *all_column_defs = list_make2(makeColumnDef(COMPRESSION_COLUMN_METADATA_COUNT_NAME, - INT4OID, - -1 /* typemod */, - 0 /*collation*/), - makeColumnDef(COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME, + List *all_column_defs = list_make1(makeColumnDef(COMPRESSION_COLUMN_METADATA_COUNT_NAME, INT4OID, -1 /* typemod */, 0 /*collation*/)); diff --git a/tsl/src/nodes/decompress_chunk/decompress_chunk.c b/tsl/src/nodes/decompress_chunk/decompress_chunk.c index 0ae2f2948af..6f113331acc 100644 --- a/tsl/src/nodes/decompress_chunk/decompress_chunk.c +++ b/tsl/src/nodes/decompress_chunk/decompress_chunk.c @@ -79,73 +79,6 @@ is_compressed_column(CompressionInfo *info, Oid type) return type == info->compresseddata_oid; } -static EquivalenceClass * -append_ec_for_seqnum(PlannerInfo *root, CompressionInfo *info, SortInfo *sort_info, Var *var, - Oid sortop, bool nulls_first) -{ - MemoryContext oldcontext = MemoryContextSwitchTo(root->planner_cxt); - - Oid opfamily, opcintype, equality_op; - int16 strategy; - List *opfamilies; - EquivalenceClass *newec = makeNode(EquivalenceClass); - EquivalenceMember *em = makeNode(EquivalenceMember); - - /* Find the operator in pg_amop --- failure shouldn't happen */ - if (!get_ordering_op_properties(sortop, &opfamily, &opcintype, &strategy)) - elog(ERROR, "operator %u is not a valid ordering operator", sortop); - - /* - * EquivalenceClasses need to contain opfamily lists based on the family - * membership of mergejoinable equality operators, which could belong to - * more than one opfamily. So we have to look up the opfamily's equality - * operator and get its membership. - */ - equality_op = get_opfamily_member(opfamily, opcintype, opcintype, BTEqualStrategyNumber); - if (!OidIsValid(equality_op)) /* shouldn't happen */ - elog(ERROR, - "missing operator %d(%u,%u) in opfamily %u", - BTEqualStrategyNumber, - opcintype, - opcintype, - opfamily); - opfamilies = get_mergejoin_opfamilies(equality_op); - if (!opfamilies) /* certainly should find some */ - elog(ERROR, "could not find opfamilies for equality operator %u", equality_op); - - em->em_expr = (Expr *) var; - em->em_relids = bms_make_singleton(info->compressed_rel->relid); -#if PG16_LT - em->em_nullable_relids = NULL; -#endif - em->em_is_const = false; - em->em_is_child = false; - em->em_datatype = INT4OID; - - newec->ec_opfamilies = list_copy(opfamilies); - newec->ec_collation = 0; - newec->ec_members = list_make1(em); - newec->ec_sources = NIL; - newec->ec_derives = NIL; - newec->ec_relids = bms_make_singleton(info->compressed_rel->relid); - newec->ec_has_const = false; - newec->ec_has_volatile = false; -#if PG16_LT - newec->ec_below_outer_join = false; -#endif - newec->ec_broken = false; - newec->ec_sortref = 0; - newec->ec_min_security = UINT_MAX; - newec->ec_max_security = 0; - newec->ec_merged = NULL; - - root->eq_classes = lappend(root->eq_classes, newec); - - MemoryContextSwitchTo(oldcontext); - - return newec; -} - static void build_compressed_scan_pathkeys(SortInfo *sort_info, PlannerInfo *root, List *chunk_pathkeys, CompressionInfo *info) @@ -153,6 +86,7 @@ build_compressed_scan_pathkeys(SortInfo *sort_info, PlannerInfo *root, List *chu Var *var; int varattno; List *required_compressed_pathkeys = NIL; + ListCell *lc = NULL; PathKey *pk; /* @@ -170,7 +104,6 @@ build_compressed_scan_pathkeys(SortInfo *sort_info, PlannerInfo *root, List *chu * seen from the start, so that we arrive at the proper counts of seen * segmentby columns in the end. */ - ListCell *lc; for (lc = list_head(chunk_pathkeys); lc; lc = lnext(chunk_pathkeys, lc)) { PathKey *pk = lfirst(lc); @@ -205,39 +138,111 @@ build_compressed_scan_pathkeys(SortInfo *sort_info, PlannerInfo *root, List *chu */ if (sort_info->needs_sequence_num) { - bool nulls_first; - Oid sortop; - varattno = - get_attnum(info->compressed_rte->relid, COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME); - var = makeVar(info->compressed_rel->relid, varattno, INT4OID, -1, InvalidOid, 0); - - if (sort_info->reverse) + /* If there are no segmentby pathkeys, start from the beginning of the list */ + if (info->num_segmentby_columns == 0) { - sortop = get_commutator(Int4LessOperator); - nulls_first = true; + lc = list_head(chunk_pathkeys); } - else + Assert(lc != NULL); + Expr *expr; + char *column_name; + for (; lc != NULL; lc = lnext(chunk_pathkeys, lc)) { - sortop = Int4LessOperator; - nulls_first = false; - } - - /* - * Create the EquivalenceClass for the sequence number column of this - * compressed chunk, so that we can build the PathKey that refers to it. - */ - EquivalenceClass *ec = - append_ec_for_seqnum(root, info, sort_info, var, sortop, nulls_first); + pk = lfirst(lc); + expr = find_em_expr_for_rel(pk->pk_eclass, info->chunk_rel); - /* Find the operator in pg_amop --- failure shouldn't happen. */ - Oid opfamily, opcintype; - int16 strategy; - if (!get_ordering_op_properties(sortop, &opfamily, &opcintype, &strategy)) - elog(ERROR, "operator %u is not a valid ordering operator", sortop); + Assert(expr != NULL && IsA(expr, Var)); + var = castNode(Var, expr); + Assert(var->varattno > 0); - pk = make_canonical_pathkey(root, ec, opfamily, strategy, nulls_first); + column_name = get_attname(info->chunk_rte->relid, var->varattno, false); + int16 orderby_index = ts_array_position(info->settings->fd.orderby, column_name); + varattno = get_attnum(info->compressed_rte->relid, column_segment_min_name(orderby_index)); + Assert(orderby_index != 0); + var = makeVar(info->compressed_rel->relid, varattno, var->vartype, var->vartypmod, var->varcollid, var->varlevelsup); + bool orderby_desc = + ts_array_get_element_bool(info->settings->fd.orderby_desc, orderby_index); + bool orderby_nullsfirst = + ts_array_get_element_bool(info->settings->fd.orderby_nullsfirst, orderby_index); + + bool nulls_first; + int16 strategy; + + if (sort_info->reverse) + { + strategy = orderby_desc ? BTLessStrategyNumber : BTGreaterStrategyNumber; + nulls_first = !orderby_nullsfirst; + } + else + { + strategy = orderby_desc ? BTGreaterStrategyNumber : BTLessStrategyNumber; + nulls_first = orderby_nullsfirst; + } + MemoryContext oldcontext = MemoryContextSwitchTo(root->planner_cxt); + EquivalenceMember *em = makeNode(EquivalenceMember); - required_compressed_pathkeys = lappend(required_compressed_pathkeys, pk); + em->em_expr = (Expr *)var; + em->em_relids = bms_make_singleton(info->compressed_rte->relid); + em->em_is_const = false; + em->em_is_child = false; + em->em_datatype = var->vartype; + EquivalenceClass *ec = makeNode(EquivalenceClass); + ec->ec_opfamilies = pk->pk_eclass->ec_opfamilies; + ec->ec_collation = pk->pk_eclass->ec_collation; + ec->ec_members = list_make1(em); + ec->ec_sources = list_copy(pk->pk_eclass->ec_sources); + ec->ec_derives = list_copy(pk->pk_eclass->ec_derives); + ec->ec_relids = bms_make_singleton(info->compressed_rel->relid); + ec->ec_has_const = pk->pk_eclass->ec_has_const; + ec->ec_has_volatile = pk->pk_eclass->ec_has_volatile; +#if PG16_LT + ec->ec_below_outer_join = pk->pk_eclass->ec_below_outer_join; +#endif + ec->ec_broken = pk->pk_eclass->ec_broken; + ec->ec_sortref = pk->pk_eclass->ec_sortref; + ec->ec_min_security = pk->pk_eclass->ec_min_security; + ec->ec_max_security = pk->pk_eclass->ec_max_security; + ec->ec_merged = pk->pk_eclass->ec_merged; + MemoryContextSwitchTo(oldcontext); + root->eq_classes = lappend(root->eq_classes, ec); + PathKey *min = + make_canonical_pathkey(root, ec, pk->pk_opfamily, strategy, nulls_first); + required_compressed_pathkeys = lappend(required_compressed_pathkeys, min); + + varattno = get_attnum(info->compressed_rte->relid, column_segment_max_name(orderby_index)); + var = makeVar(info->compressed_rel->relid, varattno, var->vartype, var->vartypmod, var->varcollid, var->varlevelsup); + oldcontext = MemoryContextSwitchTo(root->planner_cxt); + EquivalenceMember *emm = makeNode(EquivalenceMember); + + emm->em_expr = (Expr *)var; + emm->em_relids = bms_make_singleton(info->compressed_rte->relid); + emm->em_is_const = false; + emm->em_is_child = false; + emm->em_datatype = var->vartype; + ec = makeNode(EquivalenceClass); + ec->ec_opfamilies = pk->pk_eclass->ec_opfamilies; + ec->ec_collation = pk->pk_eclass->ec_collation; + ec->ec_members = list_make1(emm); + ec->ec_sources = list_copy(pk->pk_eclass->ec_sources); + ec->ec_derives = list_copy(pk->pk_eclass->ec_derives); + ec->ec_relids = bms_make_singleton(info->compressed_rel->relid); + ec->ec_has_const = pk->pk_eclass->ec_has_const; + ec->ec_has_volatile = pk->pk_eclass->ec_has_volatile; +#if PG16_LT + ec->ec_below_outer_join = pk->pk_eclass->ec_below_outer_join; +#endif + ec->ec_broken = pk->pk_eclass->ec_broken; + ec->ec_sortref = pk->pk_eclass->ec_sortref; + ec->ec_min_security = pk->pk_eclass->ec_min_security; + ec->ec_max_security = pk->pk_eclass->ec_max_security; + ec->ec_merged = pk->pk_eclass->ec_merged; + MemoryContextSwitchTo(oldcontext); + root->eq_classes = lappend(root->eq_classes, ec); + PathKey *max = + make_canonical_pathkey(root, ec, pk->pk_opfamily, strategy, nulls_first); + + required_compressed_pathkeys = lappend(required_compressed_pathkeys, max); + } } sort_info->required_compressed_pathkeys = required_compressed_pathkeys; } @@ -1216,13 +1221,20 @@ compressed_rel_setup_reltarget(RelOptInfo *compressed_rel, CompressionInfo *info COMPRESSION_COLUMN_METADATA_COUNT_NAME, &attrs_used); - /* add the segment order column if we may try to order by it */ + /* add the orderby metadata columns if we try to order by them*/ if (needs_sequence_num) { - compressed_reltarget_add_var_for_column(compressed_rel, - compressed_relid, - COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME, - &attrs_used); + for (int i = 1; i <= ts_array_length(info->settings->fd.orderby); i++) + { + compressed_reltarget_add_var_for_column(compressed_rel, + compressed_relid, + column_segment_min_name(i), + &attrs_used); + compressed_reltarget_add_var_for_column(compressed_rel, + compressed_relid, + column_segment_max_name(i), + &attrs_used); + } } /* diff --git a/tsl/src/nodes/decompress_chunk/planner.c b/tsl/src/nodes/decompress_chunk/planner.c index cb03bb26eb0..e786ab95d25 100644 --- a/tsl/src/nodes/decompress_chunk/planner.c +++ b/tsl/src/nodes/decompress_chunk/planner.c @@ -241,7 +241,6 @@ build_decompression_map(DecompressionMapContext *context, List *compressed_scan_ * targetlist. */ bool missing_count = true; - bool missing_sequence = path->needs_sequence_num; Bitmapset *uncompressed_attrs_found = NULL; Bitmapset *selectedCols = NULL; @@ -365,12 +364,6 @@ build_decompression_map(DecompressionMapContext *context, List *compressed_scan_ destination_attno = DECOMPRESS_CHUNK_COUNT_ID; missing_count = false; } - else if (path->needs_sequence_num && - strcmp(column_name, COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME) == 0) - { - destination_attno = DECOMPRESS_CHUNK_SEQUENCE_NUM_ID; - missing_sequence = false; - } } const bool is_segment = ts_array_is_member(info->settings->fd.segmentby, column_name); @@ -428,10 +421,6 @@ build_decompression_map(DecompressionMapContext *context, List *compressed_scan_ elog(ERROR, "the count column was not found in the compressed targetlist"); } - if (missing_sequence) - { - elog(ERROR, "the sequence column was not found in the compressed scan targetlist"); - } /* * If possible, try to make the custom scan targetlist same as the required