Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding file copy thread #3021

Merged
merged 2 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Copyright 2025 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.filetransfer;

/**
* Interface for FileCopyHandler. This Handler does file copy of
* data from the given node to current node
*/
public interface FileCopyHandler {

/**
* do the file copy
* @throws Exception exception
*/
void copy() throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* Copyright 2025 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.filetransfer;

/**
* Interface for Factory class which returns the {@link FileCopyHandler} depending on the implementation
*/
public interface FileCopyHandlerFactory {

/**
* @return returns the {@link FileCopyHandler}
*/
FileCopyHandler getFileCopyHandler();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Copyright 2025 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.filetransfer;

/**
* This interface contains methods which will be called post {@link FileCopyHandler} is
* completed in {@link FileCopyThread}
*/
public interface FileCopyStatusListener {
/**
* This will be called when file copy is successful i.e.
* no exception from {@link FileCopyHandler}
*/
void onFileCopySuccess();

/**
* This will be called when file copy is failed i.e.
* exception from {@link FileCopyHandler}
* @param e exception
*/
void onFileCopyFailure(Exception e);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Copyright 2025 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.filetransfer;

import java.util.Objects;
import javax.annotation.Nonnull;


/**
* Thread which will run the logic for FileCopy and will notify the listener
* whether File Copy succeeded or Failed.
*/
public class FileCopyThread implements Runnable {
private final FileCopyStatusListener fileCopyStatusListener;
private final FileCopyHandler fileCopyHandler;

FileCopyThread(@Nonnull FileCopyHandler fileCopyHandler, @Nonnull FileCopyStatusListener fileCopyStatusListener) {
Objects.requireNonNull(fileCopyHandler, "fileCopyHandler must not be null");
Objects.requireNonNull(fileCopyStatusListener, "fileCopyStatusListener must not be null");

this.fileCopyStatusListener = fileCopyStatusListener;
this.fileCopyHandler = fileCopyHandler;
}

@Override
public void run() {
try {
//TODO add required params for File copy handler
fileCopyHandler.copy();
fileCopyStatusListener.onFileCopySuccess();
} catch (Exception e) {
fileCopyStatusListener.onFileCopyFailure(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Copyright 2025 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.filetransfer;

import com.github.ambry.filecopy.MockFileCopyHandlerFactory;
import com.github.ambry.filecopy.MockNoOpFileCopyHandler;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;

import static org.junit.Assert.*;


public class FileCopyThreadTest {

/**
* Tests when fileCopyHandler is successful and onFileCopySuccess is called.
* and onFileCopyFailure is not called
*/
@Test
public void testFileCopyThreadHandlerSuccess() {
final Map<String, Integer> successFailCount = new HashMap<>();
successFailCount.putIfAbsent("success", 0);
successFailCount.putIfAbsent("fail", 0);
FileCopyHandler fileCopyHandler = new MockFileCopyHandlerFactory().getFileCopyHandler();
Thread fileCopyThreadThread = getThread(successFailCount, fileCopyHandler);
fileCopyThreadThread.start();
try {
fileCopyThreadThread.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertEquals(1, successFailCount.get("success").intValue());
assertEquals(0, successFailCount.get("fail").intValue());
}

/**
* Tests when fileCopyHandler is failed and onFileCopyFailure is called.
* and onFileCopySuccess is not called
*/
@Test
public void testFileCopyThreadHandlerFailure() {
final Map<String, Integer> successFailCount = new HashMap<>();
successFailCount.putIfAbsent("success", 0);
successFailCount.putIfAbsent("fail", 0);
FileCopyHandler fileCopyHandler = new MockFileCopyHandlerFactory().getFileCopyHandler();
((MockNoOpFileCopyHandler) fileCopyHandler).setException(new Exception());
Thread fileCopyThreadThread = getThread(successFailCount, fileCopyHandler);
fileCopyThreadThread.start();
try {
fileCopyThreadThread.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertEquals(0, successFailCount.get("success").intValue());
assertEquals(1, successFailCount.get("fail").intValue());
}

private static Thread getThread(Map<String, Integer> successFailCount, FileCopyHandler fileCopyHandler) {
FileCopyStatusListener fileCopyStatusListener = new FileCopyStatusListener() {
@Override
public void onFileCopySuccess() {
successFailCount.put("success", successFailCount.get("success") + 1);
}

@Override
public void onFileCopyFailure(Exception e) {
successFailCount.put("fail", successFailCount.get("fail") + 1);
}
};

FileCopyThread fileCopyThread = new FileCopyThread(fileCopyHandler, fileCopyStatusListener);
Thread fileCopyThreadThread = new Thread(fileCopyThread);
return fileCopyThreadThread;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package com.github.ambry.filetransfer;

import com.github.ambry.clustermap.HardwareState;
import com.github.ambry.clustermap.MockClusterMap;
import com.github.ambry.clustermap.MockDataNodeId;
import com.github.ambry.clustermap.MockPartitionId;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Copyright 2025 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.filecopy;

import com.github.ambry.filetransfer.FileCopyHandler;
import com.github.ambry.filetransfer.FileCopyHandlerFactory;


public class MockFileCopyHandlerFactory implements FileCopyHandlerFactory {
public MockFileCopyHandlerFactory() {

}

@Override
public FileCopyHandler getFileCopyHandler() {
return new MockNoOpFileCopyHandler();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Copyright 2025 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.filecopy;

import com.github.ambry.filetransfer.FileCopyHandler;


public class MockNoOpFileCopyHandler implements FileCopyHandler {

private Exception exception;

public MockNoOpFileCopyHandler() {
this.exception = null;
}

@Override
public void copy() throws Exception {
if (exception != null) {
throw exception;
}
}

public void setException(Exception exception) {
this.exception = exception;
}

public void clearException() {
this.exception = null;
}
}
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ project(':ambry-test-utils') {
compile project(":ambry-router")
compile project(":ambry-server")
compile project(":ambry-store")
compile project(":ambry-file-transfer")
compile "org.bouncycastle:bcpkix-jdk15on:$bouncycastleVersion"
compile("org.apache.hadoop:hadoop-common:$hadoopCommonVersion") {
exclude group: "org.bouncycastle"
Expand Down
Loading