Skip to content

Commit

Permalink
Merge remote-tracking branch 'gev/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
getorymckeag committed Nov 19, 2024
2 parents b97df10 + 5eea363 commit f047733
Show file tree
Hide file tree
Showing 13 changed files with 71 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ protected String getContentType() {
}

@Override
protected void handleResponseContent(InputStream inputStream) {
new MonitoringSetsResponseParser(receiver).parseResponse(inputStream, jsonFactory);
protected Boolean handleResponseContent(InputStream inputStream) {
return new MonitoringSetsResponseParser(receiver).parseResponse(inputStream, jsonFactory);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ protected String getContentType() {
}

@Override
protected void handleResponseContent(InputStream inputStream) {
new MonitoringSetsResponseParser(receiver).parseResponse(inputStream, jsonFactory);
protected Boolean handleResponseContent(InputStream inputStream) {
return new MonitoringSetsResponseParser(receiver).parseResponse(inputStream, jsonFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ protected HttpGet createRequest() throws URISyntaxException {
}

@Override
protected void handleResponseContent(InputStream inputStream) {
new MonitoringSetsResponseParser(receiver).parseResponse(inputStream, jsonFactory);
protected Boolean handleResponseContent(InputStream inputStream) {
return new MonitoringSetsResponseParser(receiver).parseResponse(inputStream, jsonFactory);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.io.InputStream;

/**
* Implementation for parsing a real-time snapshot response shared between subscribed and on-demand requests
* Implementation for parsing a monitoring set response shared between subscribed and on-demand requests
*/
@AllArgsConstructor
@Slf4j
Expand All @@ -24,18 +24,21 @@ public class MonitoringSetsResponseParser {

MonitoringSetsReceiver receiver;

public void parseResponse(InputStream inputStream, JsonFactory jsonFactory) {
try (JsonParser parser = jsonFactory.createParser(inputStream);) {
public Boolean parseResponse(InputStream inputStream, JsonFactory jsonFactory) {

try (JsonParser parser = jsonFactory.createParser(inputStream)) {
MonitoringSet monitoringSet = parser.readValueAs(MonitoringSet.class);
receiver.monitoringSet(monitoringSet);
return true;
} catch (IOException e) {
logger.error("I/O error handling response",e);
receiver.error(new StreamingGetConnectionException(e));
} catch (Exception e) {
logger.error("Error handling response data",e);
receiver.error(new StreamingGetHandlingException(e));
}

return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ protected HttpGet createRequest() throws URISyntaxException {
}

@Override
protected void handleResponseContent(InputStream inputStream) {
protected Boolean handleResponseContent(InputStream inputStream) {

new MonitoringSetsResponseParser(receiver).parseResponse(inputStream, jsonFactory);
return new MonitoringSetsResponseParser(receiver).parseResponse(inputStream, jsonFactory);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ protected HttpGet createRequest() throws URISyntaxException {
}

@Override
protected void handleResponseContent(InputStream inputStream) {
new ForecastSnapshotResponseParser(receiver).parseResponse(inputStream, jsonFactory);
protected Boolean handleResponseContent(InputStream inputStream) {
return new ForecastSnapshotResponseParser(receiver).parseResponse(inputStream, jsonFactory);
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
package org.trolie.client.request.operatingsnapshots;

import java.io.IOException;
import java.io.InputStream;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import lombok.AllArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.trolie.client.model.operatingsnapshots.ForecastPeriodSnapshot;
import org.trolie.client.model.operatingsnapshots.ForecastSnapshotHeader;
import org.trolie.client.request.streaming.exception.StreamingGetConnectionException;
import org.trolie.client.request.streaming.exception.StreamingGetHandlingException;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;

import lombok.AllArgsConstructor;
import java.io.IOException;
import java.io.InputStream;

/**
* Implementation for parsing a real-time snapshot response shared between subscribed and on-demand requests
Expand All @@ -26,7 +24,7 @@ public class ForecastSnapshotResponseParser {

ForecastSnapshotReceiver receiver;

public void parseResponse(InputStream inputStream, JsonFactory jsonFactory) {
public Boolean parseResponse(InputStream inputStream, JsonFactory jsonFactory) {

try (JsonParser parser = jsonFactory.createParser(inputStream);) {

Expand Down Expand Up @@ -81,14 +79,17 @@ public void parseResponse(InputStream inputStream, JsonFactory jsonFactory) {
//exit loop on END_ARRAY ratings

receiver.endSnapshot();

return true;

} catch (IOException e) {
logger.error("I/O error handling response",e);
receiver.error(new StreamingGetConnectionException(e));
} catch (Exception e) {
logger.error("Error handling response data",e);
receiver.error(new StreamingGetHandlingException(e));
}

return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ protected HttpGet createRequest() throws URISyntaxException {
}

@Override
protected void handleResponseContent(InputStream inputStream) {
protected Boolean handleResponseContent(InputStream inputStream) {

new ForecastSnapshotResponseParser(receiver).parseResponse(inputStream, jsonFactory);
return new ForecastSnapshotResponseParser(receiver).parseResponse(inputStream, jsonFactory);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ protected HttpGet createRequest() throws URISyntaxException {
}

@Override
protected void handleResponseContent(InputStream inputStream) {
new RealTimeSnapshotResponseParser(receiver).parseResponse(inputStream, jsonFactory);
protected Boolean handleResponseContent(InputStream inputStream) {
return new RealTimeSnapshotResponseParser(receiver).parseResponse(inputStream, jsonFactory);
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
package org.trolie.client.request.operatingsnapshots;

import java.io.IOException;
import java.io.InputStream;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import lombok.AllArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.trolie.client.model.operatingsnapshots.RealTimeLimit;
import org.trolie.client.model.operatingsnapshots.RealTimeSnapshotHeader;
import org.trolie.client.request.streaming.exception.StreamingGetConnectionException;
import org.trolie.client.request.streaming.exception.StreamingGetHandlingException;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;

import lombok.AllArgsConstructor;
import java.io.IOException;
import java.io.InputStream;

/**
* Implementation for parsing a real-time snapshot response shared between subscribed and on-demand requests
Expand All @@ -26,7 +24,7 @@ public class RealTimeSnapshotResponseParser {

RealTimeSnapshotReceiver receiver;

public void parseResponse(InputStream inputStream, JsonFactory jsonFactory) {
public Boolean parseResponse(InputStream inputStream, JsonFactory jsonFactory) {

try (JsonParser parser = jsonFactory.createParser(inputStream);) {

Expand Down Expand Up @@ -58,6 +56,7 @@ public void parseResponse(InputStream inputStream, JsonFactory jsonFactory) {
//exit loop on END_ARRAY limits

receiver.endSnapshot();
return true;

} catch (IOException e) {
logger.error("I/O error handling response",e);
Expand All @@ -66,6 +65,8 @@ public void parseResponse(InputStream inputStream, JsonFactory jsonFactory) {
logger.error("Error handling response data",e);
receiver.error(new StreamingGetHandlingException(e));
}

return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ protected HttpGet createRequest() throws URISyntaxException {
}

@Override
protected void handleResponseContent(InputStream inputStream) {
protected Boolean handleResponseContent(InputStream inputStream) {

new RealTimeSnapshotResponseParser(receiver).parseResponse(inputStream, jsonFactory);
return new RealTimeSnapshotResponseParser(receiver).parseResponse(inputStream, jsonFactory);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ public abstract class AbstractStreamingGet<T extends StreamingResponseReceiver>

/**
* Handle new content. This method should not throw exceptions but rather report them to {@link StreamingSubscribedResponseReceiver#error(org.trolie.client.request.streaming.exception.StreamingGetException)}
* Method should return true if the response content was handled successfully, false if not.
*
* @param inputStream
*/
protected abstract void handleResponseContent(InputStream inputStream);
protected abstract Boolean handleResponseContent(InputStream inputStream);

protected AbstractStreamingGet(
HttpClient httpClient,
Expand All @@ -79,12 +80,17 @@ protected HttpClientResponseHandler<Void> createResponseHandler() {
};
}

protected void handleResponse(ClassicHttpResponse response) {
/**
* Returns true if the response was handled successfully, or if a 304 was returned.
* @param response
* @return
*/
protected Boolean handleResponse(ClassicHttpResponse response) {
if (response.getCode() == HttpStatus.SC_OK) {
//create a new thread to consume the response stream to
//allow for a buffer between HTTP I/O and whatever is handling the data
try {
threadPoolExecutor.submit(new HandlerExecutor(response.getEntity().getContent())).get();
return threadPoolExecutor.submit(new HandlerExecutor(response.getEntity().getContent())).get();
} catch (IOException e) {
logger.error("I/O error initiating request",e);
receiver.error(new StreamingGetConnectionException(e));
Expand All @@ -94,11 +100,14 @@ protected void handleResponse(ClassicHttpResponse response) {
}
} else if (response.getCode() == HttpStatus.SC_NOT_MODIFIED) {
logger.trace("Server responded with status code 304. The requested resource has not changed.");
return true;
} else {
String s = "Server responded with status code " + response.getCode();
logger.error(s);
receiver.error(new StreamingGetResponseException(s, response.getCode()));
}

return false;
}

protected HttpGet createRequest() throws URISyntaxException {
Expand All @@ -123,7 +132,7 @@ public void executeRequest() {
}
}

private class HandlerExecutor implements Callable<Void> {
private class HandlerExecutor implements Callable<Boolean> {

InputStream inputStream;

Expand All @@ -133,15 +142,16 @@ public HandlerExecutor(InputStream inputStream) {
}

@Override
public Void call() throws Exception {
public Boolean call() throws Exception {
try (BufferedInputStream bufferedIn = new BufferedInputStream(inputStream, bufferSize)) {
handleResponseContent(bufferedIn);
return handleResponseContent(bufferedIn);
} catch (IOException e) {
receiver.error(new StreamingGetConnectionException(e));
} catch (Exception e) {
receiver.error(new SubscriberInternalException(e));
}
return null;

return false;
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,24 @@ public boolean isActive() {
return active.get();
}

protected void handleResponse(ClassicHttpResponse response) {
super.handleResponse(response);
if (response.getCode() == HttpStatus.SC_OK) {
@Override
protected Boolean handleResponse(ClassicHttpResponse response) {
Boolean success = super.handleResponse(response);
// Cache the ETAG if the response was handled successfully and the status is OK
if (Boolean.TRUE.equals(success) && response.getCode() == HttpStatus.SC_OK) {
try {
eTagStore.putETag(getPath(), response.getHeader(HttpHeaders.ETAG).getValue());
} catch (ProtocolException e) {
logger.error("Error handling server response",e);
receiver.error(new StreamingGetHandlingException(e));
return false;
}
}

return success;
}


@Override
protected HttpGet createRequest() throws URISyntaxException {

HttpGet request = super.createRequest();
Expand Down

0 comments on commit f047733

Please sign in to comment.