Skip to content

Commit

Permalink
- Avoiding synchronized in BaseServer as it would pin the threads whe…
Browse files Browse the repository at this point in the history
…n using virtual threads (JGRP-2864)

- Replaced 'synchronized' with locks where feasible
  • Loading branch information
ahus1 authored and belaban committed Feb 10, 2025
1 parent 3f845c1 commit daf075c
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 26 deletions.
2 changes: 1 addition & 1 deletion bin/jgroups.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ FLAGS="-server -Xmx1G -Xms500m -XX:+HeapDumpOnOutOfMemoryError -Djava.net.prefer
# DUMP_VTHREADS=-Djdk.trackAllThreads=true

# Dump when virtual threads are pinned on a carrier thread
# FLAGS="$FLAGS Djdk.tracePinnedThreads=full"
# FLAGS="$FLAGS -Djdk.tracePinnedThreads=full"

java $GC $DUMP_VTHREADS $Z1 -cp $CP $SSL_FLAGS $DEBUG $LOG $JG_FLAGS $FLAGS $JMX $JMC $*

93 changes: 68 additions & 25 deletions src/org/jgroups/blocks/cs/BaseServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@
import java.io.DataInput;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

Expand All @@ -35,6 +37,7 @@
*/
@MBean(description="Server used to accept connections from other servers (or clients) and send data to servers")
public abstract class BaseServer implements Closeable, ConnectionListener {
protected final Lock lock=new ReentrantLock();
protected Address local_addr; // typically the address of the server socket or channel
protected final List<ConnectionListener> conn_listeners=new CopyOnWriteArrayList<>();
protected final Map<Address,Connection> conns=new ConcurrentHashMap<>();
Expand Down Expand Up @@ -159,10 +162,13 @@ public void stop() {
Util.close(reaper);
reaper=null;

synchronized(this) {
lock.lock();
try {
for(Connection c: conns.values())
Util.close(c);
conns.clear();
} finally {
lock.unlock();
}
conn_listeners.clear();
}
Expand All @@ -171,17 +177,29 @@ public void close() throws IOException {
stop();
}

public synchronized void flush(Address dest) {
public void flush(Address dest) {
if(dest != null) {
Connection conn=conns.get(dest);
if(conn != null)
conn.flush();
lock.lock();
try {
Connection conn=conns.get(dest);
if(conn != null)
conn.flush();
}
finally {
lock.unlock();
}
}
}

public synchronized void flushAll() {
for(Connection c: conns.values())
c.flush();
public void flushAll() {
lock.lock();
try {
for(Connection c: conns.values())
c.flush();
}
finally {
lock.unlock();
}
}


Expand Down Expand Up @@ -322,7 +340,8 @@ public Connection getConnection(Address dest) throws Exception {
if(connected(conn=conns.get(dest)))
return conn;

synchronized(this) {
lock.lock();
try {
if(connected(conn=conns.get(dest)))
return conn;
conn=createConnection(dest);
Expand All @@ -340,6 +359,8 @@ public Connection getConnection(Address dest) throws Exception {
removeConnectionIfPresent(dest, conn); // removes and closes the conn
throw connect_ex;
}
} finally {
lock.unlock();
}
return conn;
}
Expand Down Expand Up @@ -378,17 +399,23 @@ public boolean closeConnection(Address addr, boolean notify) {
}


public synchronized void addConnection(Address peer_addr, Connection conn) throws Exception {
boolean conn_exists=hasConnection(peer_addr),
replace=conn_exists && local_addr.compareTo(peer_addr) < 0; // bigger conn wins
public void addConnection(Address peer_addr, Connection conn) throws Exception {
lock.lock();
try {
boolean conn_exists=hasConnection(peer_addr),
replace=conn_exists && local_addr.compareTo(peer_addr) < 0; // bigger conn wins

if(!conn_exists || replace) {
replaceConnection(peer_addr, conn); // closes old conn
conn.start();
if(!conn_exists || replace) {
replaceConnection(peer_addr, conn); // closes old conn
conn.start();
}
else {
log.trace("%s: rejected connection from %s %s", local_addr, peer_addr, explanation(conn_exists, replace));
Util.close(conn); // keep our existing conn, reject accept() and close client_sock
}
}
else {
log.trace("%s: rejected connection from %s %s", local_addr, peer_addr, explanation(conn_exists, replace));
Util.close(conn); // keep our existing conn, reject accept() and close client_sock
finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -427,11 +454,15 @@ public void removeConnectionIfPresent(Address address, Connection conn) {
if(address == null || conn == null)
return;
Connection tmp=null;
synchronized(this) {

lock.lock();
try {
Connection existing=conns.get(address);
if(conn == existing) {
tmp=conns.remove(address);
}
} finally {
lock.unlock();
}
if(tmp != null) { // Moved conn close outside of sync block (https://issues.redhat.com/browse/JGRP-2053)
log.trace("%s: removed connection to %s", local_addr, address);
Expand All @@ -440,9 +471,15 @@ public void removeConnectionIfPresent(Address address, Connection conn) {
}

/** Used only for testing ! */
public synchronized void clearConnections() {
conns.values().forEach(Util::close);
conns.clear();
public void clearConnections() {
lock.lock();
try {
conns.values().forEach(Util::close);
conns.clear();
}
finally {
lock.unlock();
}
}

public void forAllConnections(BiConsumer<Address,Connection> c) {
Expand All @@ -455,9 +492,12 @@ public void retainAll(Collection<Address> current_mbrs) {
return;

Map<Address,Connection> copy=null;
synchronized(this) {
lock.lock();
try {
copy=new HashMap<>(conns);
conns.keySet().retainAll(current_mbrs);
} finally {
lock.unlock();
}
copy.keySet().removeAll(current_mbrs);
for(Map.Entry<Address,Connection> entry: copy.entrySet())
Expand Down Expand Up @@ -599,7 +639,8 @@ public synchronized void stop() {

public void run() {
while(!Thread.currentThread().isInterrupted()) {
synchronized(BaseServer.this) {
lock.lock();
try {
for(Iterator<Entry<Address,Connection>> it=conns.entrySet().iterator();it.hasNext();) {
Entry<Address,Connection> entry=it.next();
Connection c=entry.getValue();
Expand All @@ -608,6 +649,8 @@ public void run() {
it.remove();
}
}
} finally {
lock.unlock();
}
Util.sleep(reaperInterval);
}
Expand Down

0 comments on commit daf075c

Please sign in to comment.