Skip to content

Commit

Permalink
[CELEBORN-1190][FOLLOWUP] Fix WARNING of error prone
Browse files Browse the repository at this point in the history
  • Loading branch information
SteNicholas committed Jun 11, 2024
1 parent 6f77bd9 commit 374a160
Show file tree
Hide file tree
Showing 41 changed files with 109 additions and 187 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
public class RemoteShuffleInputGateDelegation {
private static final Logger LOG = LoggerFactory.getLogger(RemoteShuffleInputGateDelegation.class);
/** Lock to protect {@link #receivedBuffers} and {@link #cause} and {@link #closed}. */
private Object lock = new Object();
private final Object lock = new Object();

/** Name of the corresponding computing task. */
private String taskName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class ShuffleTaskInfo {
private ConcurrentHashMap<Integer, ConcurrentHashMap<Integer, AtomicInteger>>
shuffleIdMapAttemptIdIndex = JavaUtils.newConcurrentHashMap();
// task shuffle id -> celeborn shuffle id
private ConcurrentHashMap<String, Integer> taskShuffleIdToShuffleId =
private final ConcurrentHashMap<String, Integer> taskShuffleIdToShuffleId =
JavaUtils.newConcurrentHashMap();
// celeborn shuffle id -> task shuffle id
private ConcurrentHashMap<Integer, String> shuffleIdToTaskShuffleId =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,16 +182,14 @@ private int assignCredits(CreditListener creditListener) {
private List<CreditAssignment> dispatchReservedCredits() {
assert Thread.holdsLock(lock);

if (numAvailableBuffers < MIN_CREDITS_TO_NOTIFY || listeners.size() <= 0) {
return Collections.emptyList();
}

List<CreditAssignment> creditAssignments = new ArrayList<>();
while (numAvailableBuffers > 0 && listeners.size() > 0) {
CreditListener creditListener = listeners.poll();
int numCredits = assignCredits(creditListener);
if (numCredits > 0) {
creditAssignments.add(new CreditAssignment(numCredits, creditListener));
if (numAvailableBuffers >= MIN_CREDITS_TO_NOTIFY && !listeners.isEmpty()) {
while (numAvailableBuffers > 0 && !listeners.isEmpty()) {
CreditListener creditListener = listeners.poll();
int numCredits = assignCredits(creditListener);
if (numCredits > 0) {
creditAssignments.add(new CreditAssignment(numCredits, creditListener));
}
}
}
return creditAssignments;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class CelebornBufferStream {
private FlinkShuffleClientImpl mapShuffleClient;
private boolean isClosed;
private boolean isOpenSuccess;
private Object lock = new Object();
private final Object lock = new Object();
private Supplier<ByteBuf> bufferSupplier;
private int initialCredit;
private Consumer<RequestMessage> messageConsumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package org.apache.celeborn.plugin.flink.utils;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import com.google.common.collect.ImmutableSet;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
Expand All @@ -31,19 +31,16 @@
public class FlinkUtils {
private static final JobID ZERO_JOB_ID = new JobID(0, 0);
public static final Set<String> pluginConfNames =
new HashSet<String>() {
{
add("remote-shuffle.job.min.memory-per-partition");
add("remote-shuffle.job.min.memory-per-gate");
add("remote-shuffle.job.concurrent-readings-per-gate");
add("remote-shuffle.job.memory-per-partition");
add("remote-shuffle.job.memory-per-gate");
add("remote-shuffle.job.support-floating-buffer-per-input-gate");
add("remote-shuffle.job.enable-data-compression");
add("remote-shuffle.job.support-floating-buffer-per-output-gate");
add("remote-shuffle.job.compression.codec");
}
};
ImmutableSet.of(
"remote-shuffle.job.min.memory-per-partition",
"remote-shuffle.job.min.memory-per-gate",
"remote-shuffle.job.concurrent-readings-per-gate",
"remote-shuffle.job.memory-per-partition",
"remote-shuffle.job.memory-per-gate",
"remote-shuffle.job.support-floating-buffer-per-input-gate",
"remote-shuffle.job.enable-data-compression",
"remote-shuffle.job.support-floating-buffer-per-output-gate",
"remote-shuffle.job.compression.codec");

public static CelebornConf toCelebornConf(Configuration configuration) {
CelebornConf tmpCelebornConf = new CelebornConf();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,7 @@ public void testFailedToHandleRipeBufferAndClose() throws Exception {
packer.process(buffers.get(0), 0);
try {
packer.drain();
} catch (RuntimeException e) {
e.printStackTrace();
} catch (Exception e) {
throw e;
} catch (RuntimeException ignored) {
}

// this should never throw any exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void tesSimpleFlush() throws IOException, InterruptedException {

private List<SupplierWithException<BufferPool, IOException>> createBufferPoolFactory() {
NetworkBufferPool networkBufferPool =
new NetworkBufferPool(256 * 8, 32 * 1024, Duration.ofMillis(1000));
new NetworkBufferPool(256 * 8, 32 * 1024, Duration.ofSeconds(1));

int numBuffersPerPartition = 64 * 1024 / 32;
int numForResultPartition = numBuffersPerPartition * 7 / 8;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void tesSimpleFlush() throws IOException, InterruptedException {

private List<SupplierWithException<BufferPool, IOException>> createBufferPoolFactory() {
NetworkBufferPool networkBufferPool =
new NetworkBufferPool(256 * 8, 32 * 1024, Duration.ofMillis(1000));
new NetworkBufferPool(256 * 8, 32 * 1024, Duration.ofSeconds(1));

int numBuffersPerPartition = 64 * 1024 / 32;
int numForResultPartition = numBuffersPerPartition * 7 / 8;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void tesSimpleFlush() throws IOException, InterruptedException {

private List<SupplierWithException<BufferPool, IOException>> createBufferPoolFactory() {
NetworkBufferPool networkBufferPool =
new NetworkBufferPool(256 * 8, 32 * 1024, Duration.ofMillis(1000));
new NetworkBufferPool(256 * 8, 32 * 1024, Duration.ofSeconds(1));

int numBuffersPerPartition = 64 * 1024 / 32;
int numForResultPartition = numBuffersPerPartition * 7 / 8;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void tesSimpleFlush() throws IOException, InterruptedException {

private List<SupplierWithException<BufferPool, IOException>> createBufferPoolFactory() {
NetworkBufferPool networkBufferPool =
new NetworkBufferPool(256 * 8, 32 * 1024, Duration.ofMillis(1000));
new NetworkBufferPool(256 * 8, 32 * 1024, Duration.ofSeconds(1));

int numBuffersPerPartition = 64 * 1024 / 32;
int numForResultPartition = numBuffersPerPartition * 7 / 8;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void tesSimpleFlush() throws IOException, InterruptedException {

private List<SupplierWithException<BufferPool, IOException>> createBufferPoolFactory() {
NetworkBufferPool networkBufferPool =
new NetworkBufferPool(256 * 8, 32 * 1024, Duration.ofMillis(1000));
new NetworkBufferPool(256 * 8, 32 * 1024, Duration.ofSeconds(1));

int numBuffersPerPartition = 64 * 1024 / 32;
int numForResultPartition = numBuffersPerPartition * 7 / 8;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@
import org.apache.spark.rdd.DeterministicLevel;
import org.apache.spark.shuffle.*;
import org.apache.spark.shuffle.sort.SortShuffleManager;
import org.apache.spark.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.celeborn.client.LifecycleManager;
import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.ShuffleMode;
import org.apache.celeborn.reflect.DynMethods;

public class SparkShuffleManager implements ShuffleManager {

Expand Down Expand Up @@ -117,7 +115,7 @@ public <K, V, C> ShuffleHandle registerShuffle(

lifecycleManager.registerAppShuffleDeterminate(
shuffleId,
dependency.rdd().getOutputDeterministicLevel() != DeterministicLevel.INDETERMINATE());
!DeterministicLevel.INDETERMINATE().equals(dependency.rdd().getOutputDeterministicLevel()));

if (fallbackPolicyRunner.applyAllFallbackPolicy(
lifecycleManager, dependency.partitioner().numPartitions())) {
Expand Down Expand Up @@ -244,19 +242,6 @@ public <K, C> ShuffleReader<K, C> getReader(
return _sortShuffleManager.getReader(handle, startPartition, endPartition, context);
}

private int executorCores(SparkConf conf) {
if (Utils.isLocalMaster(conf)) {
// SparkContext.numDriverCores is package private.
return DynMethods.builder("numDriverCores")
.impl("org.apache.spark.SparkContext$", String.class)
.build()
.bind(SparkContext$.MODULE$)
.invoke(conf.get("spark.master"));
} else {
return conf.getInt(SparkLauncher.EXECUTOR_CORES, 1);
}
}

private void checkUserClassPathFirst(ShuffleHandle handle) {
if ((Boolean) conf.get(package$.MODULE$.EXECUTOR_USER_CLASS_PATH_FIRST())
&& !Objects.equals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public <K, V, C> ShuffleHandle registerShuffle(

lifecycleManager.registerAppShuffleDeterminate(
shuffleId,
dependency.rdd().getOutputDeterministicLevel() != DeterministicLevel.INDETERMINATE());
!DeterministicLevel.INDETERMINATE().equals(dependency.rdd().getOutputDeterministicLevel()));

if (fallbackPolicyRunner.applyAllFallbackPolicy(
lifecycleManager, dependency.partitioner().numPartitions())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ public class ShuffleClientImpl extends ShuffleClient {
protected final Map<String, PushState> pushStates = JavaUtils.newConcurrentHashMap();

private final boolean pushExcludeWorkerOnFailureEnabled;
private final boolean shuffleCompressionEnabled;
private final Set<String> pushExcludedWorkers = ConcurrentHashMap.newKeySet();
private final ConcurrentHashMap<String, Long> fetchExcludedWorkers =
JavaUtils.newConcurrentHashMap();
Expand All @@ -126,13 +125,7 @@ public class ShuffleClientImpl extends ShuffleClient {
private final boolean authEnabled;
private final TransportConf dataTransportConf;

private final ThreadLocal<Compressor> compressorThreadLocal =
new ThreadLocal<Compressor>() {
@Override
protected Compressor initialValue() {
return Compressor.getCompressor(conf);
}
};
private ThreadLocal<Compressor> compressorThreadLocal;

private final ReviveManager reviveManager;

Expand Down Expand Up @@ -178,7 +171,9 @@ public ShuffleClientImpl(String appUniqueId, CelebornConf conf, UserIdentifier u
testRetryRevive = conf.testRetryRevive();
pushBufferMaxSize = conf.clientPushBufferMaxSize();
pushExcludeWorkerOnFailureEnabled = conf.clientPushExcludeWorkerOnFailureEnabled();
shuffleCompressionEnabled = !conf.shuffleCompressionCodec().equals(CompressionCodec.NONE);
if (!CompressionCodec.NONE.equals(conf.shuffleCompressionCodec())) {
compressorThreadLocal = ThreadLocal.withInitial(() -> Compressor.getCompressor(conf));
}
if (conf.clientPushReplicateEnabled()) {
pushDataTimeout = conf.pushDataTimeoutMs() * 2;
} else {
Expand Down Expand Up @@ -722,6 +717,7 @@ void excludeWorkerByCause(StatusCode cause, PartitionLocation oldLocation) {
case PUSH_DATA_TIMEOUT_REPLICA:
pushExcludedWorkers.add(oldLocation.getPeer().hostAndPushPort());
break;
default: // fall out
}
}
}
Expand Down Expand Up @@ -917,7 +913,7 @@ public int pushOrMergeData(
// increment batchId
final int nextBatchId = pushState.nextBatchId();

if (shuffleCompressionEnabled) {
if (compressorThreadLocal != null) {
// compress data
final Compressor compressor = compressorThreadLocal.get();
compressor.compress(data, offset, length);
Expand Down Expand Up @@ -1635,6 +1631,8 @@ protected Tuple2<ReduceFileGroups, String> loadFileGroupInternal(int shuffleId)
"Request %s return %s for %s.",
getReducerFileGroup, response.status(), shuffleId);
logger.warn(exceptionMsg);
break;
default: // fall out
}
}
} catch (Exception e) {
Expand All @@ -1645,6 +1643,7 @@ protected Tuple2<ReduceFileGroups, String> loadFileGroupInternal(int shuffleId)
}
}

@Override
public ReduceFileGroups updateFileGroup(int shuffleId, int partitionId)
throws CelebornIOException {
Tuple2<ReduceFileGroups, String> fileGroupTuple =
Expand Down Expand Up @@ -1820,6 +1819,7 @@ private StatusCode getPushDataFailCause(String message) {
}

@VisibleForTesting
@Override
public TransportClientFactory getDataClientFactory() {
return dataClientFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
public class LocalPartitionReader implements PartitionReader {

private static final Logger logger = LoggerFactory.getLogger(LocalPartitionReader.class);
private static volatile ThreadPoolExecutor readLocalShufflePool;
private volatile ThreadPoolExecutor readLocalShufflePool;
private final LinkedBlockingQueue<ByteBuf> results;
private final AtomicReference<IOException> exception = new AtomicReference<>();
private final int fetchMaxReqsInFlight;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public long getFileLength() {
return bytesFlushed;
}

public void updateBytesFlushed(long bytes) {
public synchronized void updateBytesFlushed(long bytes) {
bytesFlushed += bytes;
if (isReduceFileMeta) {
getReduceFileMeta().updateChunkOffset(bytesFlushed, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public interface TransportClientBootstrap {
* Performs the bootstrapping operation, throwing an exception on failure.
*
* @param client the transport client to bootstrap
* @param channel the associated channel with the transport client
* @throws RuntimeException
*/
void doBootstrap(TransportClient client) throws RuntimeException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
private final AtomicLong timeOfLastRequestNs;

private final long pushTimeoutCheckerInterval;
private static ScheduledExecutorService pushTimeoutChecker = null;
private ScheduledExecutorService pushTimeoutChecker;
private ScheduledFuture pushCheckerScheduleFuture;

private final long fetchTimeoutCheckerInterval;
private static ScheduledExecutorService fetchTimeoutChecker = null;
private ScheduledExecutorService fetchTimeoutChecker;
private ScheduledFuture fetchCheckerScheduleFuture;

public TransportResponseHandler(TransportConf conf, Channel channel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public boolean needCopyOut() {
return false;
}

@SuppressWarnings("NonOverridingEquals")
protected boolean equals(Message other) {
return Objects.equals(body, other.body);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.celeborn.common.network.sasl.SaslUtils.*;

import java.nio.charset.StandardCharsets;
import java.util.Map;

import javax.security.auth.callback.CallbackHandler;
Expand Down Expand Up @@ -70,7 +71,7 @@ public String[] getMechanismNames(Map<String, ?> props) {
return new String[] {ANONYMOUS};
}

class CelebornAnonymousSaslClient implements SaslClient {
static class CelebornAnonymousSaslClient implements SaslClient {

private boolean isCompleted = false;

Expand All @@ -90,7 +91,7 @@ public byte[] evaluateChallenge(byte[] challenge) throws SaslException {
throw new IllegalStateException("Authentication has already completed.");
}
isCompleted = true;
return ANONYMOUS.getBytes();
return ANONYMOUS.getBytes(StandardCharsets.UTF_8);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public String[] getMechanismNames(Map<String, ?> props) {
return new String[] {ANONYMOUS};
}

class CelebornAnonymousSaslServer implements SaslServer {
static class CelebornAnonymousSaslServer implements SaslServer {
private boolean isCompleted = false;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,20 +200,22 @@ public void run() {
} catch (InterruptedException e) {
running = false;
}
try {
if (running && needsReload()) {
try {
trustManagerRef.set(loadTrustManager());
this.reloadCount += 1;
} catch (Exception ex) {
logger.warn(
"Could not load truststore (keep using existing one) : " + ex.toString(), ex);
synchronized (this) {
try {
if (running && needsReload()) {
try {
trustManagerRef.set(loadTrustManager());
this.reloadCount += 1;
} catch (Exception ex) {
logger.warn(
"Could not load truststore (keep using existing one) : " + ex.toString(), ex);
}
}
} catch (IOException ex) {
logger.warn("Could not check whether truststore needs reloading: " + ex.toString(), ex);
}
} catch (IOException ex) {
logger.warn("Could not check whether truststore needs reloading: " + ex.toString(), ex);
needsReloadCheckCounts++;
}
needsReloadCheckCounts++;
}
}

Expand Down
Loading

0 comments on commit 374a160

Please sign in to comment.