Skip to content

Commit

Permalink
[FileCopy based bootstrap] GetDataChunk Api (#3013)
Browse files Browse the repository at this point in the history
* Api2 WIP

* Api2 WIP

* Api2 WIP

* Api2 WIP

* Api2 WIP

* Fixing broken test (#3020)

* Fixing broken test

* Adding tests

* PR comments

* PR comments
  • Loading branch information
DevenAhluwalia authored Mar 3, 2025
1 parent 36aefbb commit d14df63
Show file tree
Hide file tree
Showing 17 changed files with 923 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,12 @@ default void handleUndeleteRequest(NetworkRequest request) throws InterruptedExc
* @throws InterruptedException if request processing is interrupted.
*/
void handleFileCopyGetMetaDataRequest(NetworkRequest request) throws InterruptedException, IOException;

/**
* Get a chunk of a file (log segment, index file, bloom file) in a partition.
* @param request the request that contains the partition, filename, size and offset of the requested file chunk.
* @throws InterruptedException if request processing is interrupted.
* @throws IOException if there are I/O errors carrying out the required operation.
*/
void handleFileCopyGetChunkRequest(NetworkRequest request) throws InterruptedException, IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@
*/
package com.github.ambry.server;

import com.github.ambry.clustermap.ClusterParticipant;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.ReplicaId;
import com.github.ambry.store.PartitionFileStore;
import com.github.ambry.store.Store;
import com.github.ambry.store.StoreException;
import java.io.IOException;
import java.nio.file.FileStore;
import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -92,7 +91,7 @@ public interface StoreManager {
* @return the {@link FileStore} corresponding to the given {@link PartitionId}, or {@code null} if no store was found for
* that partition, or that store was not started.
*/
FileStore getFileStore(PartitionId id);
PartitionFileStore getFileStore(PartitionId id);

/**
* Get replicaId on current node by partition name. (There should be at most one replica belonging to specific
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Copyright 2025 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.store;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.List;


/**
* Represents a store that contains log segments.
* Provides methods to get a ByteBuffer for a file chunk and to put a chunk to a file.
* Also provides methods to persist and read metadata to/from a partition.
*/
public interface PartitionFileStore {
/**
* Get a ByteBuffer for a file chunk.
* @param fileName the name of the requested file. This could be a log segment, index segment or bloom filter.
* @param offset the start offset of the requested chunk.
* @param size the size of the requested chunk in bytes.
* @return a StoreFileChunk representing the chunk stream of the file requested.
* @throws StoreException if the requested file or range cannot be read from the store.
*/
StoreFileChunk getByteBufferForFileChunk(String fileName, long offset, long size) throws StoreException;

/**
* Put a chunk to a file.
* @param outputFilePath the path of the file to put the chunk to.
* @param dataInputStream the chunk stream of the file to put.
* @throws IOException if writing the chunk to the target file fails.
*/
void putChunkToFile(String outputFilePath, DataInputStream dataInputStream) throws IOException;

/**
* Persist metadata for a partition. This metadata contains information about log segments, associated index segments and bloom filters.
* @param logInfoList the list of LogInfo objects to persist.
* @throws IOException if persisting the metadata to disk fails.
*/
void persistMetaDataToFile(List<LogInfo> logInfoList) throws IOException;

/**
* Read metadata from a partition. This metadata contains information about log segments, associated index segments and bloom filters.
* @return the list of LogInfo objects read from the partition's metadata file.
* @throws IOException if reading the metadata from disk fails.
*/
List<LogInfo> readMetaDataFromFile() throws IOException;
}
86 changes: 86 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/store/StoreFileChunk.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* Copyright 2025 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.store;

import com.github.ambry.utils.ByteBufferInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;


/**
 * Represents a file chunk that exists in the store.
 */
public class StoreFileChunk {
  /** The chunk stream of the file requested. */
  private final DataInputStream stream;

  /** The size of the chunk in bytes. */
  private final long chunkLength;

  /**
   * Constructor to create a StoreFileChunk.
   * @param stream the chunk stream of the file requested.
   * @param chunkLength the size of the chunk in bytes; must be non-negative.
   * @throws NullPointerException if {@code stream} is null.
   * @throws IllegalArgumentException if {@code chunkLength} is negative.
   */
  public StoreFileChunk(DataInputStream stream, long chunkLength) {
    Objects.requireNonNull(stream, "DataInputStream cannot be null");
    // Validate eagerly: a negative length would otherwise surface later as a
    // NegativeArraySizeException inside toBuffer(), far from the faulty caller.
    if (chunkLength < 0) {
      throw new IllegalArgumentException("chunkLength cannot be negative: " + chunkLength);
    }
    this.stream = stream;
    this.chunkLength = chunkLength;
  }

  /**
   * Create a StoreFileChunk from a ByteBuffer.
   * @param buf the ByteBuffer to create the StoreFileChunk from.
   * @return StoreFileChunk representing the chunk stream of the file requested.
   * @throws NullPointerException if {@code buf} is null.
   */
  public static StoreFileChunk from(ByteBuffer buf) {
    Objects.requireNonNull(buf, "ByteBuffer cannot be null");
    return new StoreFileChunk(
        new DataInputStream(new ByteBufferInputStream(buf)), buf.remaining());
  }

  /**
   * Convert the chunk stream to a ByteBuffer. Consumes {@code chunkLength} bytes from the stream.
   * @return ByteBuffer representing the chunk stream of the file requested.
   * @throws IOException if the stream ends before {@code chunkLength} bytes are read.
   * @throws ArithmeticException if {@code chunkLength} exceeds Integer.MAX_VALUE.
   */
  public ByteBuffer toBuffer() throws IOException {
    // Math.toIntExact fails loudly for chunks larger than Integer.MAX_VALUE,
    // where the previous unchecked (int) cast would have silently truncated.
    byte[] buf = new byte[Math.toIntExact(chunkLength)];
    stream.readFully(buf);
    return ByteBuffer.wrap(buf);
  }

  /**
   * Get the chunk stream of the file requested.
   * @return DataInputStream representing the chunk stream of the file requested.
   */
  public DataInputStream getStream() {
    return stream;
  }

  /**
   * Get the size of the chunk in bytes.
   * @return the size of the chunk in bytes.
   */
  public long getChunkLength() {
    return chunkLength;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.server.ServerErrorCode;
import com.github.ambry.server.StoreManager;
import com.github.ambry.store.PartitionFileStore;
import com.github.ambry.store.Store;
import java.nio.file.FileStore;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -110,7 +110,7 @@ public Store getStore(PartitionId id) {
}

@Override
public FileStore getFileStore(PartitionId id) {
public PartitionFileStore getFileStore(PartitionId id) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ public class ServerMetrics {
public final Meter fileCopyGetMetadataRequestRate;
public final Meter fileCopyGetMetadataDroppedRate;

public final Histogram fileCopyGetChunkRequestQueueTimeInMs;
public final Histogram fileCopyGetChunkProcessingTimeInMs;
public final Histogram fileCopyGetChunkResponseQueueTimeInMs;
public final Histogram fileCopyGetChunkSendTimeInMs;
public final Histogram fileCopyGetChunkTotalTimeInMs;
public final Meter fileCopyGetChunkRequestRate;
public final Meter fileCopyGetChunkDroppedRate;


public final Histogram batchDeleteBlobRequestQueueTimeInMs;
public final Histogram batchDeleteBlobProcessingTimeInMs;
Expand Down Expand Up @@ -444,6 +452,21 @@ public ServerMetrics(MetricRegistry registry, Class<?> requestClass, Class<?> se
fileCopyGetMetadataDroppedRate =
registry.meter(MetricRegistry.name(requestClass, "FileCopyGetMetadataDroppedRate"));

fileCopyGetChunkRequestQueueTimeInMs =
registry.histogram(MetricRegistry.name(requestClass, "FileCopyGetChunkRequestQueueTimeInMs"));
fileCopyGetChunkProcessingTimeInMs =
registry.histogram(MetricRegistry.name(requestClass, "FileCopyGetChunkProcessingTimeInMs"));
fileCopyGetChunkResponseQueueTimeInMs =
registry.histogram(MetricRegistry.name(requestClass, "FileCopyGetChunkResponseQueueTimeInMs"));
fileCopyGetChunkSendTimeInMs =
registry.histogram(MetricRegistry.name(requestClass, "FileCopyGetChunkSendTimeInMs"));
fileCopyGetChunkTotalTimeInMs =
registry.histogram(MetricRegistry.name(requestClass, "FileCopyGetChunkTotalTimeInMs"));
fileCopyGetChunkRequestRate =
registry.meter(MetricRegistry.name(requestClass, "FileCopyGetChunkRequestRate"));
fileCopyGetChunkDroppedRate =
registry.meter(MetricRegistry.name(requestClass, "FileCopyGetChunkDroppedRate"));

batchDeleteBlobRequestQueueTimeInMs =
registry.histogram(MetricRegistry.name(requestClass, "BatchDeleteBlobRequestQueueTimeInMs"));
batchDeleteBlobProcessingTimeInMs = registry.histogram(MetricRegistry.name(requestClass, "BatchDeleteBlobProcessingTimeInMs"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.github.ambry.protocol.AdminResponse;
import com.github.ambry.protocol.DeleteRequest;
import com.github.ambry.protocol.DeleteResponse;
import com.github.ambry.protocol.FileCopyGetChunkRequest;
import com.github.ambry.protocol.FileCopyGetMetaDataRequest;
import com.github.ambry.protocol.FileCopyGetMetaDataResponse;
import com.github.ambry.protocol.GetRequest;
Expand Down Expand Up @@ -101,6 +102,9 @@ public RequestOrResponse getDecodedRequest(NetworkRequest networkRequest) throws
case FileCopyGetMetaDataRequest:
request = FileCopyGetMetaDataRequest.readFrom(dis, clusterMap);
break;
case FileCopyGetChunkRequest:
request = FileCopyGetChunkRequest.readFrom(dis, clusterMap);
break;
default:
throw new UnsupportedOperationException("Request type not supported");
}
Expand Down Expand Up @@ -161,6 +165,9 @@ public Response createErrorResponse(RequestOrResponse request, ServerErrorCode s
case FileCopyGetMetaDataRequest:
response = new FileCopyGetMetaDataResponse(request.getCorrelationId(), request.getClientId(), serverErrorCode);
break;
case FileCopyGetChunkRequest:
// NOTE(review): this builds a FileCopyGetMetaDataResponse for a FileCopyGetChunkRequest —
// looks like a copy-paste from the FileCopyGetMetaDataRequest case above. Confirm whether a
// FileCopyGetChunkResponse type exists and should be constructed here instead.
response = new FileCopyGetMetaDataResponse(request.getCorrelationId(), request.getClientId(), serverErrorCode);
break;
default:
throw new UnsupportedOperationException("Request type not supported");
}
Expand Down
Loading

0 comments on commit d14df63

Please sign in to comment.