From 651d5c9842df53574d4c34ef3ccc8138a1a8a188 Mon Sep 17 00:00:00 2001 From: Alvaro Vega Date: Mon, 18 Nov 2024 12:11:18 +0100 Subject: [PATCH 1/5] Raise CygnusPersistenceError when ArcgisException to be handled by batch --- .../iot/cygnus/sinks/NGSIArcgisFeatureTableSink.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIArcgisFeatureTableSink.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIArcgisFeatureTableSink.java index 77a7e98c4..f0fbff573 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIArcgisFeatureTableSink.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIArcgisFeatureTableSink.java @@ -396,7 +396,7 @@ public void expirateRecords(long expirationTime) throws CygnusExpiratingError { * @param aggregator * @throws CygnusRuntimeError */ - public void persistAggregation(NGSIArcgisAggregator aggregator) throws CygnusRuntimeError { + public void persistAggregation(NGSIArcgisAggregator aggregator) throws CygnusPersistenceError, CygnusRuntimeError, CygnusBadContextData { try { List aggregationList = aggregator.getListArcgisAggregatorDomain(); LOGGER.debug("[" + this.getName() + "] persisting aggregation, " @@ -424,6 +424,10 @@ public void persistAggregation(NGSIArcgisAggregator aggregator) throws CygnusRun String stackTrace = ExceptionUtils.getFullStackTrace(e); LOGGER.debug(" PersistAggregation Error: " + stackTrace); throw (e); + } catch (ArcgisException e) { + LOGGER.error("[" + this.getName() + "] Error persisting batch, " + e.getClass().getSimpleName() + " - " + + e.getMessage()); + throw new CygnusPersistenceError(e.getMessage()); } catch (Exception e) { LOGGER.error("[" + this.getName() + "] Error persisting batch, " + e.getClass().getSimpleName() + " - " + e.getMessage()); From c4b1338ea6495d50044e6944d5fe3874492a957b Mon Sep 17 00:00:00 2001 From: Alvaro Vega Date: Mon, 18 Nov 2024 16:15:25 +0100 Subject: [PATCH 2/5] fix persistBatch catch --- .../sinks/NGSIArcgisFeatureTableSink.java | 53 +++++++++---------- 1 file changed, 26 insertions(+), 27 deletions(-) diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIArcgisFeatureTableSink.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIArcgisFeatureTableSink.java index f0fbff573..cee6d251d 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIArcgisFeatureTableSink.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIArcgisFeatureTableSink.java @@ -167,7 +167,7 @@ protected int featuresBatched() { * @return The persistence backend * @throws CygnusRuntimeError */ - protected ArcgisFeatureTable getPersistenceBackend(String featureServiceUrl) throws CygnusRuntimeError { + protected ArcgisFeatureTable getPersistenceBackend(String featureServiceUrl) throws CygnusPersistenceError, CygnusRuntimeError { LOGGER.debug("Current persistenceBackend has " + arcgisPersistenceBackend.size() +" tables "); @@ -187,7 +187,7 @@ protected ArcgisFeatureTable getPersistenceBackend(String featureServiceUrl) thr if (newTable.hasError() || !newTable.connected()) { LOGGER.error("Error creating new persistence Backend. " + newTable.getErrorDesc()); - throw new CygnusRuntimeError("[" + this.getName() + "Error creating Persistence backend: " + throw new CygnusPersistenceError("[" + this.getName() + "Error creating Persistence backend: " + newTable.getErrorCode() + " - " + newTable.getErrorDesc()); } else { arcgisPersistenceBackend.put(featureServiceUrl, newTable); @@ -199,7 +199,7 @@ protected ArcgisFeatureTable getPersistenceBackend(String featureServiceUrl) thr LOGGER.error("Error creating new persistence Backend. " +e.getClass().getSimpleName()); LOGGER.debug(stackTrace); - throw new CygnusRuntimeError("Error creating new persistence Backend. ", e.getClass().getName(), + throw new CygnusPersistenceError("Error creating new persistence Backend. ", e.getClass().getName(), e.getMessage()); } } @@ -306,34 +306,29 @@ void persistBatch(NGSIBatch batch) return; } // if - try { - // Iterate on the destinations - batch.startIterator(); + // Iterate on the destinations + batch.startIterator(); - while (batch.hasNext()) { - String destination = batch.getNextDestination(); - LOGGER.debug( - "[" + this.getName() + "] Processing sub-batch regarding the " + destination + " destination"); + while (batch.hasNext()) { + String destination = batch.getNextDestination(); + LOGGER.debug( + "[" + this.getName() + "] Processing sub-batch regarding the " + destination + " destination"); - // Get the events within the current sub-batch - ArrayList events = batch.getNextEvents(); + // Get the events within the current sub-batch + ArrayList events = batch.getNextEvents(); - // Get an aggregator for this destination and initialize it - NGSIArcgisAggregator aggregator = new NGSIArcgisAggregator(getrAcgisServicesUrl(), enableNameMappings); - - for (NGSIEvent event : events) { - aggregator.aggregate(event); - } // for + // Get an aggregator for this destination and initialize it + NGSIArcgisAggregator aggregator = new NGSIArcgisAggregator(getrAcgisServicesUrl(), enableNameMappings); - // Persist the aggregation - persistAggregation(aggregator); - batch.setNextPersisted(true); + for (NGSIEvent event : events) { + aggregator.aggregate(event); + } // for - } // while - } catch (Exception e) { - LOGGER.error("[" + this.getName() + "] Error persisting batch, " + e.getClass().getSimpleName() + "." + e.getMessage()); - throw new CygnusRuntimeError(e.getMessage()); - } + // Persist the aggregation + persistAggregation(aggregator); + batch.setNextPersisted(true); + + } // while } // persistBatch /* @@ -422,7 +417,11 @@ public void persistAggregation(NGSIArcgisAggregator aggregator) throws CygnusPer } } catch (CygnusRuntimeError e) { String stackTrace = ExceptionUtils.getFullStackTrace(e); - LOGGER.debug(" PersistAggregation Error: " + stackTrace); + LOGGER.debug(" PersistAggregation CygnusRuntimeError: " + stackTrace); + throw (e); + } catch (CygnusPersistenceError e) { + String stackTrace = ExceptionUtils.getFullStackTrace(e); + LOGGER.debug(" PersistAggregation CygnusPersistenceError: " + stackTrace); throw (e); } catch (ArcgisException e) { LOGGER.error("[" + this.getName() + "] Error persisting batch, " + e.getClass().getSimpleName() + " - " From 72f55a170148e1ce4368c6afbadd6a6a7089f20a Mon Sep 17 00:00:00 2001 From: Alvaro Vega Date: Mon, 18 Nov 2024 16:21:01 +0100 Subject: [PATCH 3/5] update CNR --- CHANGES_NEXT_RELEASE | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES_NEXT_RELEASE b/CHANGES_NEXT_RELEASE index 8b1378917..713bc85b1 100644 --- a/CHANGES_NEXT_RELEASE +++ b/CHANGES_NEXT_RELEASE @@ -1 +1 @@ - +- Fix way to handle CygnusPersistenceException to allow batch retries in arcgis-sink From af88a682b37de5793ad5cc4676b2067271df3a34 Mon Sep 17 00:00:00 2001 From: Alvaro Vega Date: Tue, 19 Nov 2024 10:35:06 +0100 Subject: [PATCH 4/5] Update CHANGES_NEXT_RELEASE --- CHANGES_NEXT_RELEASE | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES_NEXT_RELEASE b/CHANGES_NEXT_RELEASE index 713bc85b1..e85aac94b 100644 --- a/CHANGES_NEXT_RELEASE +++ b/CHANGES_NEXT_RELEASE @@ -1 +1 @@ -- Fix way to handle CygnusPersistenceException to allow batch retries in arcgis-sink +- Fix way to handle CygnusPersistenceException to allow batch retries in arcgis-sink if `.batch_ttl` configured From 73edc743bf0753011546dd21b1df4047938192db Mon Sep 17 00:00:00 2001 From: Alvaro Vega Date: Wed, 20 Nov 2024 11:04:51 +0100 Subject: [PATCH 5/5] Update CHANGES_NEXT_RELEASE --- CHANGES_NEXT_RELEASE | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES_NEXT_RELEASE b/CHANGES_NEXT_RELEASE index e85aac94b..176c50b9c 100644 --- a/CHANGES_NEXT_RELEASE +++ b/CHANGES_NEXT_RELEASE @@ -1 +1 @@ -- Fix way to handle CygnusPersistenceException to allow batch retries in arcgis-sink if `.batch_ttl` configured +- [cygnus-ngsi][cygnus-common] Fix way to handle CygnusPersistenceException to allow batch retries in arcgis-sink if `.batch_ttl` configured