diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 2b9cf8f954ef..7e7ac809d808 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -67,6 +67,7 @@ import org.apache.pinot.common.restlet.resources.TableLLCSegmentUploadResponse; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.PauselessConsumptionUtils; import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.controller.ControllerConf; @@ -1974,6 +1975,22 @@ private Set findConsumingSegments(IdealState idealState) { } } }); + // For pauseless tables, a segment marked ONLINE in the ideal state may not have been committed yet. + // We rely on SegmentZkMetadata to determine whether a segment has been committed (status is DONE) + // instead of relying solely on the ideal state. + // A segment in COMMITTING state is treated as consuming for pauseStatus. + String tableNameWithType = idealState.getResourceName(); + if (PauselessConsumptionUtils.isPauselessEnabled(getTableConfig(tableNameWithType))) { + Map metadataMap = getLatestSegmentZKMetadataMap(tableNameWithType); + if (metadataMap != null) { + metadataMap.values() + .stream() + .filter(segmentZKMetadata -> !consumingSegments.contains(segmentZKMetadata.getSegmentName())) + .filter(segmentZKMetadata -> segmentZKMetadata.getStatus() == Status.COMMITTING) + .map(SegmentZKMetadata::getSegmentName) + .forEach(consumingSegments::add); + } + } return consumingSegments; } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index d5969e611f91..e36644e53b11 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.net.URISyntaxException; import java.nio.charset.Charset; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -57,6 +58,7 @@ import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.resources.PauseStatusDetails; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor; @@ -64,10 +66,13 @@ import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import org.apache.pinot.segment.spi.creator.SegmentVersion; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.spi.config.table.PauseState; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.stream.LongMsgOffset; @@ -1248,6 +1253,60 @@ public void testGetPartitionIds() Assert.assertEquals(partitionIds.size(), 2); } + @Test + void testPauseStatus() { + // Set up a new table with 2 replicas, 5 instances, 4 partition + PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class); + FakePinotLLCRealtimeSegmentManager segmentManager = + new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager); + setUpNewTable(segmentManager, 2, 5, 4); + + // pause the table + PauseState pauseState = new PauseState(true, PauseState.ReasonCode.ADMINISTRATIVE, "comment", + new Timestamp(CURRENT_TIME_MS).toString()); + segmentManager._idealState.getRecord() + .getSimpleFields() + .put(PinotLLCRealtimeSegmentManager.PAUSE_STATE, pauseState.toJsonString()); + + // update the ideal state to ONLINE for the segment + String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName(); + Map instanceStatesMap = + segmentManager._idealState.getRecord().getMapFields().get(committingSegment); + instanceStatesMap.replaceAll((k, v) -> "ONLINE"); + segmentManager._idealState.getRecord().getMapFields().put(committingSegment, instanceStatesMap); + + // update the segment metadata to COMMITTING + SegmentZKMetadata segmentZKMetadata = segmentManager._segmentZKMetadataMap.get(committingSegment); + segmentZKMetadata.setStatus(Status.COMMITTING); + segmentManager._segmentZKMetadataMap.put(committingSegment, segmentZKMetadata); + + PauseStatusDetails pauseStatusDetails = + segmentManager.getPauseStatusDetails(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME)); + + // assert that the segment is ONLINE and is not present as a consuming segment in the PauseStatusDetails + // as for non pauseless enabled tables the consuming segments are derived solely from ideal state + assertEquals(SegmentStateModel.ONLINE, segmentManager._idealState.getRecord().getMapFields().get(committingSegment) + .values().stream().findFirst().orElseThrow()); + assertFalse(pauseStatusDetails.getConsumingSegments().contains(committingSegment)); + + // enable pauseless for table + StreamIngestionConfig streamIngestionConfig = new StreamIngestionConfig(null); + streamIngestionConfig.setPauselessConsumptionEnabled(true); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(streamIngestionConfig); + segmentManager._tableConfig.setIngestionConfig(ingestionConfig); + + pauseStatusDetails = + segmentManager.getPauseStatusDetails(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME)); + + // assert that the segment is ONLINE and is present as a consuming segment in the PauseStatusDetails + // as it is still COMMITTING + + assertEquals(SegmentStateModel.ONLINE, segmentManager._idealState.getRecord().getMapFields().get(committingSegment) + .values().stream().findFirst().orElseThrow()); + assertTrue(pauseStatusDetails.getConsumingSegments().contains(committingSegment)); + } + @Test public void getSegmentsYetToBeCommitted() { PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);