Skip to content

Commit

Permalink
[pinpoint-apm#11278] Fix atomicity of transportTerminated
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Oct 16, 2024
1 parent 2ac7d3d commit fccae40
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@

import io.grpc.Attributes;
import io.grpc.ServerTransportFilter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class ConnectionCountServerTransportFilter extends ServerTransportFilter {

private static final Attributes.Key<AtomicInteger> TERMINATED = Attributes.Key.create("TransportTerminated");

private final Logger logger = LogManager.getLogger(this.getClass());
private final AtomicLong currentConnection = new AtomicLong();

public ConnectionCountServerTransportFilter() {
Expand All @@ -31,13 +37,25 @@ public ConnectionCountServerTransportFilter() {

@Override
public Attributes transportReady(Attributes transportAttrs) {
Attributes.Builder builder = transportAttrs.toBuilder();
builder.set(TERMINATED, new AtomicInteger());
Attributes attributes = builder.build();

currentConnection.incrementAndGet();
return transportAttrs;
return attributes;
}

@Override
public void transportTerminated(Attributes transportAttrs) {
currentConnection.decrementAndGet();
final AtomicInteger terminated = transportAttrs.get(TERMINATED);
if (terminated == null) {
return;
}
if (terminated.getAndIncrement() == 0) {
currentConnection.decrementAndGet();
} else {
logger.info("transportTerminated() already terminated attribute:{}", transportAttrs);
}
}

public long getCurrentConnection() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.navercorp.pinpoint.grpc.server;

import io.grpc.Attributes;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class ConnectionCountServerTransportFilterTest {

@Test
void transportReady() {
ConnectionCountServerTransportFilter filter = new ConnectionCountServerTransportFilter();
Attributes attributes = Attributes.newBuilder().build();
filter.transportReady(attributes);

Assertions.assertEquals(1, filter.getCurrentConnection());
}

@Test
void transportReadyAndTerminated() {
ConnectionCountServerTransportFilter filter = new ConnectionCountServerTransportFilter();
Attributes attributes = Attributes.newBuilder().build();
attributes = filter.transportReady(attributes);
filter.transportTerminated(attributes);

Assertions.assertEquals(0, filter.getCurrentConnection());
}

@Test
void transportTerminated_duplicate() {
ConnectionCountServerTransportFilter filter = new ConnectionCountServerTransportFilter();
Attributes attributes = Attributes.newBuilder().build();
attributes = filter.transportReady(attributes);
filter.transportTerminated(attributes);
filter.transportTerminated(attributes);
filter.transportTerminated(attributes);

Assertions.assertEquals(0, filter.getCurrentConnection());


}
}

0 comments on commit fccae40

Please sign in to comment.