Skip to content

Commit

Permalink
#1773: JSR356WebSocket - acquire semaphore with timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitry-treskunov authored and jfarcand committed Nov 21, 2014
1 parent 79d6a4e commit ae43e56
Showing 1 changed file with 47 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
* Asynchronous based {@link Session} websocket
Expand All @@ -41,13 +42,14 @@ public class JSR356WebSocket extends WebSocket {

private final Logger logger = LoggerFactory.getLogger(JSR356WebSocket.class);
private final Session session;
private final Semaphore semaphore = new Semaphore(1, true);
private final Semaphore semaphore = new Semaphore(1, true);// https://issues.apache.org/bugzilla/show_bug.cgi?id=56026
private final int writeTimeout;

public JSR356WebSocket(Session session, AtmosphereConfig config) {
super(config);
this.session = session;
// https://issues.apache.org/bugzilla/show_bug.cgi?id=56026
session.getAsyncRemote().setSendTimeout(config.getInitParameter(ApplicationConfig.WEBSOCKET_WRITE_TIMEOUT, 60 * 1000));
this.writeTimeout = config.getInitParameter(ApplicationConfig.WEBSOCKET_WRITE_TIMEOUT, 60 * 1000);
session.getAsyncRemote().setSendTimeout(writeTimeout);
}

@Override
Expand All @@ -57,36 +59,60 @@ public boolean isOpen() {

@Override
public WebSocket write(String s) throws IOException {
boolean acquired = false;
try {
semaphore.acquireUninterruptibly();
session.getAsyncRemote().sendText(s, new WriteResult(resource(), s));
} catch (NullPointerException e) {
patchGlassFish(e);
semaphore.release();
} catch (RuntimeException e){
semaphore.release();
throw e;
acquired = semaphore.tryAcquire(writeTimeout, TimeUnit.MILLISECONDS);
if (acquired) {
session.getAsyncRemote().sendText(s, new WriteResult(resource(), s));
} else {
logger.warn("Timeout while waiting until socket is available, message {} will be lost", s);
}
} catch (Throwable e) {
handleError(e, acquired);
}
return this;
}

@Override
public WebSocket write(byte[] data, int offset, int length) throws IOException {
boolean acquired = false;
try {
semaphore.acquireUninterruptibly();
ByteBuffer b = ByteBuffer.wrap(data, offset, length);
session.getAsyncRemote().sendBinary(ByteBuffer.wrap(data, offset, length),
new WriteResult(resource(), b.array()));
} catch (NullPointerException e) {
patchGlassFish(e);
semaphore.release();
} catch (RuntimeException e){
semaphore.release();
throw e;
acquired = semaphore.tryAcquire(writeTimeout, TimeUnit.MILLISECONDS);
if (acquired) {
ByteBuffer b = ByteBuffer.wrap(data, offset, length);
session.getAsyncRemote().sendBinary(ByteBuffer.wrap(data, offset, length),
new WriteResult(resource(), b.array()));
} else {
logger.warn("Timeout while waiting until socket is available, message will be lost");
}
} catch (Throwable e) {
handleError(e, acquired);
}
return this;
}

private void handleError(Throwable e, boolean acquired) {
if (acquired) {
semaphore.release();
}

if (e instanceof NullPointerException) {
patchGlassFish((NullPointerException) e);
return;
}

if (e instanceof InterruptedException) {
logger.info("Writing to the socket {} was interrupted", session.getId());
return;
}

if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}

throw new RuntimeException("Unexpected error while writing to socket", e);
}

void patchGlassFish(NullPointerException e) {
// https://java.net/jira/browse/TYRUS-175
logger.trace("", e);
Expand Down

0 comments on commit ae43e56

Please sign in to comment.