diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyHandler.java b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyHandler.java new file mode 100644 index 0000000000..0e46c5e416 --- /dev/null +++ b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyHandler.java @@ -0,0 +1,27 @@ +/** + * Copyright 2025 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.filetransfer; + +/** + * Interface for FileCopyHandler. This Handler does file copy of + * data from the given node to current node + */ +public interface FileCopyHandler { + + /** + * do the file copy + * @throws Exception exception + */ + void copy() throws Exception; +} diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyHandlerFactory.java b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyHandlerFactory.java new file mode 100644 index 0000000000..54216ddc20 --- /dev/null +++ b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyHandlerFactory.java @@ -0,0 +1,25 @@ +/** + * Copyright 2025 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.filetransfer; + +/** + * Interface for Factory class which returns the {@link FileCopyHandler} depending on the implementation + */ +public interface FileCopyHandlerFactory { + + /** + * @return returns the {@link FileCopyHandler} + */ + FileCopyHandler getFileCopyHandler(); +} diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyStatusListener.java b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyStatusListener.java new file mode 100644 index 0000000000..4507acf1af --- /dev/null +++ b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyStatusListener.java @@ -0,0 +1,33 @@ +/** + * Copyright 2025 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.filetransfer; + +/** + * This interface contains methods which will be called post {@link FileCopyHandler} is + * completed in {@link FileCopyThread} + */ +public interface FileCopyStatusListener { + /** + * This will be called when file copy is successful i.e. + * no exception from {@link FileCopyHandler} + */ + void onFileCopySuccess(); + + /** + * This will be called when file copy is failed i.e. + * exception from {@link FileCopyHandler} + * @param e exception + */ + void onFileCopyFailure(Exception e); +} diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyThread.java b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyThread.java new file mode 100644 index 0000000000..3527b71595 --- /dev/null +++ b/ambry-file-transfer/src/main/java/com/github/ambry/filetransfer/FileCopyThread.java @@ -0,0 +1,46 @@ +/** + * Copyright 2025 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.filetransfer; + +import java.util.Objects; +import javax.annotation.Nonnull; + + +/** + * Thread which will run the logic for FileCopy and will notify the listener + * whether File Copy succeeded or Failed. + */ +public class FileCopyThread implements Runnable { + private final FileCopyStatusListener fileCopyStatusListener; + private final FileCopyHandler fileCopyHandler; + + FileCopyThread(@Nonnull FileCopyHandler fileCopyHandler, @Nonnull FileCopyStatusListener fileCopyStatusListener) { + Objects.requireNonNull(fileCopyHandler, "fileCopyHandler must not be null"); + Objects.requireNonNull(fileCopyStatusListener, "fileCopyStatusListener must not be null"); + + this.fileCopyStatusListener = fileCopyStatusListener; + this.fileCopyHandler = fileCopyHandler; + } + + @Override + public void run() { + try { + //TODO add required params for File copy handler + fileCopyHandler.copy(); + fileCopyStatusListener.onFileCopySuccess(); + } catch (Exception e) { + fileCopyStatusListener.onFileCopyFailure(e); + } + } +} diff --git a/ambry-file-transfer/src/test/java/com/github/ambry/filetransfer/FileCopyThreadTest.java b/ambry-file-transfer/src/test/java/com/github/ambry/filetransfer/FileCopyThreadTest.java new file mode 100644 index 0000000000..353465bf38 --- /dev/null +++ b/ambry-file-transfer/src/test/java/com/github/ambry/filetransfer/FileCopyThreadTest.java @@ -0,0 +1,87 @@ +/** + * Copyright 2025 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.filetransfer; + +import com.github.ambry.filecopy.MockFileCopyHandlerFactory; +import com.github.ambry.filecopy.MockNoOpFileCopyHandler; +import java.util.HashMap; +import java.util.Map; +import org.junit.Test; + +import static org.junit.Assert.*; + + +public class FileCopyThreadTest { + + /** + * Tests when fileCopyHandler is successful and onFileCopySuccess is called. + * and onFileCopyFailure is not called + */ + @Test + public void testFileCopyThreadHandlerSuccess() { + final Map successFailCount = new HashMap<>(); + successFailCount.putIfAbsent("success", 0); + successFailCount.putIfAbsent("fail", 0); + FileCopyHandler fileCopyHandler = new MockFileCopyHandlerFactory().getFileCopyHandler(); + Thread fileCopyThreadThread = getThread(successFailCount, fileCopyHandler); + fileCopyThreadThread.start(); + try { + fileCopyThreadThread.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertEquals(1, successFailCount.get("success").intValue()); + assertEquals(0, successFailCount.get("fail").intValue()); + } + + /** + * Tests when fileCopyHandler is failed and onFileCopyFailure is called. + * and onFileCopySuccess is not called + */ + @Test + public void testFileCopyThreadHandlerFailure() { + final Map successFailCount = new HashMap<>(); + successFailCount.putIfAbsent("success", 0); + successFailCount.putIfAbsent("fail", 0); + FileCopyHandler fileCopyHandler = new MockFileCopyHandlerFactory().getFileCopyHandler(); + ((MockNoOpFileCopyHandler) fileCopyHandler).setException(new Exception()); + Thread fileCopyThreadThread = getThread(successFailCount, fileCopyHandler); + fileCopyThreadThread.start(); + try { + fileCopyThreadThread.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertEquals(0, successFailCount.get("success").intValue()); + assertEquals(1, successFailCount.get("fail").intValue()); + } + + private static Thread getThread(Map successFailCount, FileCopyHandler fileCopyHandler) { + FileCopyStatusListener fileCopyStatusListener = new FileCopyStatusListener() { + @Override + public void onFileCopySuccess() { + successFailCount.put("success", successFailCount.get("success") + 1); + } + + @Override + public void onFileCopyFailure(Exception e) { + successFailCount.put("fail", successFailCount.get("fail") + 1); + } + }; + + FileCopyThread fileCopyThread = new FileCopyThread(fileCopyHandler, fileCopyStatusListener); + Thread fileCopyThreadThread = new Thread(fileCopyThread); + return fileCopyThreadThread; + } +} diff --git a/ambry-file-transfer/src/test/java/com/github/ambry/filetransfer/FileCopyUtilsTest.java b/ambry-file-transfer/src/test/java/com/github/ambry/filetransfer/FileCopyUtilsTest.java index a71c585fba..a203695c2e 100644 --- a/ambry-file-transfer/src/test/java/com/github/ambry/filetransfer/FileCopyUtilsTest.java +++ b/ambry-file-transfer/src/test/java/com/github/ambry/filetransfer/FileCopyUtilsTest.java @@ -13,7 +13,6 @@ */ package com.github.ambry.filetransfer; -import com.github.ambry.clustermap.HardwareState; import com.github.ambry.clustermap.MockClusterMap; import com.github.ambry.clustermap.MockDataNodeId; import com.github.ambry.clustermap.MockPartitionId; diff --git a/ambry-test-utils/src/main/java/com/github/ambry/filecopy/MockFileCopyHandlerFactory.java b/ambry-test-utils/src/main/java/com/github/ambry/filecopy/MockFileCopyHandlerFactory.java new file mode 100644 index 0000000000..bb9b1ad180 --- /dev/null +++ b/ambry-test-utils/src/main/java/com/github/ambry/filecopy/MockFileCopyHandlerFactory.java @@ -0,0 +1,29 @@ +/** + * Copyright 2025 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.filecopy; + +import com.github.ambry.filetransfer.FileCopyHandler; +import com.github.ambry.filetransfer.FileCopyHandlerFactory; + + +public class MockFileCopyHandlerFactory implements FileCopyHandlerFactory { + public MockFileCopyHandlerFactory() { + + } + + @Override + public FileCopyHandler getFileCopyHandler() { + return new MockNoOpFileCopyHandler(); + } +} diff --git a/ambry-test-utils/src/main/java/com/github/ambry/filecopy/MockNoOpFileCopyHandler.java b/ambry-test-utils/src/main/java/com/github/ambry/filecopy/MockNoOpFileCopyHandler.java new file mode 100644 index 0000000000..5babb2d76b --- /dev/null +++ b/ambry-test-utils/src/main/java/com/github/ambry/filecopy/MockNoOpFileCopyHandler.java @@ -0,0 +1,41 @@ +/** + * Copyright 2025 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.filecopy; + +import com.github.ambry.filetransfer.FileCopyHandler; + + +public class MockNoOpFileCopyHandler implements FileCopyHandler { + + private Exception exception; + + public MockNoOpFileCopyHandler() { + this.exception = null; + } + + @Override + public void copy() throws Exception { + if (exception != null) { + throw exception; + } + } + + public void setException(Exception exception) { + this.exception = exception; + } + + public void clearException() { + this.exception = null; + } +} diff --git a/build.gradle b/build.gradle index 27f67fa869..109994cf75 100644 --- a/build.gradle +++ b/build.gradle @@ -223,6 +223,7 @@ project(':ambry-test-utils') { compile project(":ambry-router") compile project(":ambry-server") compile project(":ambry-store") + compile project(":ambry-file-transfer") compile "org.bouncycastle:bcpkix-jdk15on:$bouncycastleVersion" compile("org.apache.hadoop:hadoop-common:$hadoopCommonVersion") { exclude group: "org.bouncycastle"