From f9461117f587bd5df756970e6725d80544387860 Mon Sep 17 00:00:00 2001 From: Aman Khanchandani Date: Thu, 23 Jan 2025 17:16:00 +0530 Subject: [PATCH 1/3] Return segments in COMMITTING status in the pauseStatus API for pauseless enabled tables --- .../PinotLLCRealtimeSegmentManager.java | 13 ++++ .../PinotLLCRealtimeSegmentManagerTest.java | 59 +++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 3ed88967c67f..9d481398dffc 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -67,6 +67,7 @@ import org.apache.pinot.common.restlet.resources.TableLLCSegmentUploadResponse; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.PauselessConsumptionUtils; import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.controller.ControllerConf; @@ -1985,6 +1986,18 @@ private Set findConsumingSegments(IdealState idealState) { public PauseStatusDetails getPauseStatusDetails(String tableNameWithType) { IdealState idealState = getIdealState(tableNameWithType); Set consumingSegments = findConsumingSegments(idealState); + // For pauseless tables, a segment marked ONLINE in the ideal state may not have been committed yet. + // We rely on SegmentZkMetadata to determine whether a segment has been committed (status is DONE) + // instead of relying solely on the ideal state. + // A segment in COMMITTING state is treated as consuming for pauseStatus. 
+ if (PauselessConsumptionUtils.isPauselessEnabled(getTableConfig(tableNameWithType))) { + getLatestSegmentZKMetadataMap(tableNameWithType).values() + .stream() + .filter(segmentZKMetadata -> !consumingSegments.contains(segmentZKMetadata.getSegmentName())) + .filter(segmentZKMetadata -> segmentZKMetadata.getStatus() == Status.COMMITTING) + .map(SegmentZKMetadata::getSegmentName) + .forEach(consumingSegments::add); + } PauseState pauseState = extractTablePauseState(idealState); if (pauseState != null) { return new PauseStatusDetails(pauseState.isPaused(), consumingSegments, pauseState.getReasonCode(), diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index dbe640d36400..ff3359badff2 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.net.URISyntaxException; import java.nio.charset.Charset; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -56,6 +57,7 @@ import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.resources.PauseStatusDetails; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor; @@ -63,10 +65,13 @@ import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import 
org.apache.pinot.segment.spi.creator.SegmentVersion; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.spi.config.table.PauseState; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.stream.LongMsgOffset; @@ -1247,6 +1252,60 @@ public void testGetPartitionIds() Assert.assertEquals(partitionIds.size(), 2); } + @Test + void testPauseStatus() { + // Set up a new table with 2 replicas, 5 instances, 4 partitions + PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class); + FakePinotLLCRealtimeSegmentManager segmentManager = + new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager); + setUpNewTable(segmentManager, 2, 5, 4); + + // pause the table + PauseState pauseState = new PauseState(true, PauseState.ReasonCode.ADMINISTRATIVE, "comment", + new Timestamp(CURRENT_TIME_MS).toString()); + segmentManager._idealState.getRecord() + .getSimpleFields() + .put(PinotLLCRealtimeSegmentManager.PAUSE_STATE, pauseState.toJsonString()); + + // update the ideal state to ONLINE for the segment + String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName(); + Map instanceStatesMap = + segmentManager._idealState.getRecord().getMapFields().get(committingSegment); + instanceStatesMap.replaceAll((k, v) -> "ONLINE"); + segmentManager._idealState.getRecord().getMapFields().put(committingSegment, instanceStatesMap); + + // update the segment metadata to COMMITTING + SegmentZKMetadata 
segmentZKMetadata = segmentManager._segmentZKMetadataMap.get(committingSegment); + segmentZKMetadata.setStatus(Status.COMMITTING); + segmentManager._segmentZKMetadataMap.put(committingSegment, segmentZKMetadata); + + PauseStatusDetails pauseStatusDetails = + segmentManager.getPauseStatusDetails(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME)); + + // assert that the segment is ONLINE and is not present as a consuming segment in the PauseStatusDetails + // because, for tables without pauseless enabled, the consuming segments are derived solely from the ideal state + assertEquals(SegmentStateModel.ONLINE, segmentManager._idealState.getRecord().getMapFields().get(committingSegment) + .values().stream().findFirst().orElseThrow()); + assertFalse(pauseStatusDetails.getConsumingSegments().contains(committingSegment)); + + // enable pauseless for table + StreamIngestionConfig streamIngestionConfig = new StreamIngestionConfig(null); + streamIngestionConfig.setPauselessConsumptionEnabled(true); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(streamIngestionConfig); + segmentManager._tableConfig.setIngestionConfig(ingestionConfig); + + pauseStatusDetails = + segmentManager.getPauseStatusDetails(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME)); + + // assert that the segment is ONLINE and is present as a consuming segment in the PauseStatusDetails + // because its segment ZK metadata status is still COMMITTING + + assertEquals(SegmentStateModel.ONLINE, segmentManager._idealState.getRecord().getMapFields().get(committingSegment) + .values().stream().findFirst().orElseThrow()); + assertTrue(pauseStatusDetails.getConsumingSegments().contains(committingSegment)); + } + ////////////////////////////////////////////////////////////////////////////////// // Fake classes ///////////////////////////////////////////////////////////////////////////////// From eb2535c9095565aea6e0751d3c38db17132e226a Mon Sep 17 00:00:00 2001 From: Aman Khanchandani Date: Fri, 
24 Jan 2025 13:41:48 +0530 Subject: [PATCH 2/3] refactoring the consuming segment logic and moving it to the right function --- .../PinotLLCRealtimeSegmentManager.java | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 9d481398dffc..63fa0820cd53 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -1975,21 +1975,11 @@ private Set findConsumingSegments(IdealState idealState) { } } }); - return consumingSegments; - } - - /** - * Return pause status: - * - Information from the 'pauseState' in the table ideal state - * - list of consuming segments - */ - public PauseStatusDetails getPauseStatusDetails(String tableNameWithType) { - IdealState idealState = getIdealState(tableNameWithType); - Set consumingSegments = findConsumingSegments(idealState); // For pauseless tables, a segment marked ONLINE in the ideal state may not have been committed yet. // We rely on SegmentZkMetadata to determine whether a segment has been committed (status is DONE) // instead of relying solely on the ideal state. // A segment in COMMITTING state is treated as consuming for pauseStatus. 
+ String tableNameWithType = idealState.getResourceName(); if (PauselessConsumptionUtils.isPauselessEnabled(getTableConfig(tableNameWithType))) { getLatestSegmentZKMetadataMap(tableNameWithType).values() .stream() @@ -1998,6 +1988,17 @@ public PauseStatusDetails getPauseStatusDetails(String tableNameWithType) { .map(SegmentZKMetadata::getSegmentName) .forEach(consumingSegments::add); } + return consumingSegments; + } + + /** + * Return pause status: + * - Information from the 'pauseState' in the table ideal state + * - list of consuming segments + */ + public PauseStatusDetails getPauseStatusDetails(String tableNameWithType) { + IdealState idealState = getIdealState(tableNameWithType); + Set consumingSegments = findConsumingSegments(idealState); PauseState pauseState = extractTablePauseState(idealState); if (pauseState != null) { return new PauseStatusDetails(pauseState.isPaused(), consumingSegments, pauseState.getReasonCode(), From 0c77011aedc91d32e6b096e352080d0f578aa0fb Mon Sep 17 00:00:00 2001 From: Aman Khanchandani Date: Fri, 24 Jan 2025 13:51:07 +0530 Subject: [PATCH 3/3] Adding null check for empty response --- .../realtime/PinotLLCRealtimeSegmentManager.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 63fa0820cd53..113ee5207af1 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -1981,12 +1981,15 @@ private Set findConsumingSegments(IdealState idealState) { // A segment in COMMITTING state is treated as consuming for pauseStatus. 
String tableNameWithType = idealState.getResourceName(); if (PauselessConsumptionUtils.isPauselessEnabled(getTableConfig(tableNameWithType))) { - getLatestSegmentZKMetadataMap(tableNameWithType).values() - .stream() - .filter(segmentZKMetadata -> !consumingSegments.contains(segmentZKMetadata.getSegmentName())) - .filter(segmentZKMetadata -> segmentZKMetadata.getStatus() == Status.COMMITTING) - .map(SegmentZKMetadata::getSegmentName) - .forEach(consumingSegments::add); + Map metadataMap = getLatestSegmentZKMetadataMap(tableNameWithType); + if (metadataMap != null) { + metadataMap.values() + .stream() + .filter(segmentZKMetadata -> !consumingSegments.contains(segmentZKMetadata.getSegmentName())) + .filter(segmentZKMetadata -> segmentZKMetadata.getStatus() == Status.COMMITTING) + .map(SegmentZKMetadata::getSegmentName) + .forEach(consumingSegments::add); + } } return consumingSegments; }