diff --git a/app/src/main/java/i5/las2peer/services/mobsos/dataProcessing/MobSOSDataProcessingService.java b/app/src/main/java/i5/las2peer/services/mobsos/dataProcessing/MobSOSDataProcessingService.java index 9933237..ce5a838 100644 --- a/app/src/main/java/i5/las2peer/services/mobsos/dataProcessing/MobSOSDataProcessingService.java +++ b/app/src/main/java/i5/las2peer/services/mobsos/dataProcessing/MobSOSDataProcessingService.java @@ -1,6 +1,10 @@ package i5.las2peer.services.mobsos.dataProcessing; import java.io.Serializable; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -129,6 +133,7 @@ private boolean processMessages(MonitoringMessage[] messages) { boolean returnStatement = true; int counter = 0; botMessages = new ArrayList(); + HashMap webhookCalls = new HashMap<>(); for (MonitoringMessage message : messages) { // Happens when a node has sent its last messages if (message == null) { @@ -272,9 +277,19 @@ else if (Math.abs(message.getEvent().getCode()) >= 7000 //} JSONParser p = new JSONParser(JSONParser.MODE_PERMISSIVE); try { - Object jo = p.parse(message.getRemarks()); - if (jo instanceof JSONObject) { - String function = ((JSONObject) jo).getAsString("functionName"); + Object obj = p.parse(message.getRemarks()); + if (obj instanceof JSONObject) { + JSONObject jsonObj = (JSONObject) obj; + + // check if the monitoring message should trigger a webhook call + if(jsonObj.containsKey("webhook")) { + JSONObject webhook = (JSONObject) jsonObj.get("webhook"); + String url = webhook.getAsString("url"); + JSONObject payload = (JSONObject) webhook.get("payload"); + webhookCalls.put(url, payload); + } + + String function = jsonObj.getAsString("functionName"); if (function != null && hasBot() && triggerFunctions.contains(function.toLowerCase())) { BotMessage m = new BotMessage(message.getTimestamp(), message.getEvent(), message.getSourceNode(), message.getSourceAgentId(), @@ -361,6 +376,28 @@ else if (Math.abs(message.getEvent().getCode()) >= 7000 e.printStackTrace(); } } + + // perform webhook calls + if(!webhookCalls.isEmpty()) { + HttpClient client = HttpClient.newHttpClient(); + for (Map.Entry entry : webhookCalls.entrySet()) { + String url = entry.getKey(); + JSONObject payload = entry.getValue(); + + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(url)) + .POST(HttpRequest.BodyPublishers.ofString(payload.toJSONString())) + .build(); + + try { + client.send(request, + HttpResponse.BodyHandlers.ofString()); + } catch (Exception e) { + System.out.println("Unable to call webhook"); + } + } + } + System.out.println((messages.length - counter) + "/" + messageCount + " messages were handled."); return returnStatement; } diff --git a/app/src/test/java/i5/las2peer/services/mobsos/dataProcessing/MobSOSDataProcessingServiceTest.java b/app/src/test/java/i5/las2peer/services/mobsos/dataProcessing/MobSOSDataProcessingServiceTest.java index ea022d2..ad0242d 100644 --- a/app/src/test/java/i5/las2peer/services/mobsos/dataProcessing/MobSOSDataProcessingServiceTest.java +++ b/app/src/test/java/i5/las2peer/services/mobsos/dataProcessing/MobSOSDataProcessingServiceTest.java @@ -13,6 +13,8 @@ import java.sql.Statement; import java.util.Properties; +import i5.las2peer.connectors.webConnector.WebConnector; +import net.minidev.json.JSONObject; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -55,6 +57,8 @@ public class MobSOSDataProcessingServiceTest { private final static String dNode = "1234567891022"; private final static String sAgent = "c4ca4238a0b923820dcc509a6f75849b"; // md5 for 1 + private static WebConnector connector; + @BeforeClass public static void setUpDatabase() { Properties prop = new Properties(); @@ -101,11 +105,22 @@ public void startServer() throws Exception { testService.unlock("a pass"); node.registerReceiver(testService); + + node.startService(new ServiceNameVersion(WebhookTestService.class.getName(), "1.0.0"), "a pass"); + + // start connector + connector = new WebConnector(true, 0, false, 0); // port 0 means use system defined port + connector.start(node); } @After public void stopNetwork() { try { + if (connector != null) { + connector.stop(); + connector = null; + } + System.out.println("stopping test network..."); node.shutDown(); } catch (Exception e) { @@ -238,4 +253,42 @@ public void testDefaultStartup() { } } + /** + * Test to verify that specific monitoring messages trigger a webhook call. + * Uses a helper las2peer service that receives the webhook call. + */ + @Test + public void testWebhookCallMessage() { + // create message content + JSONObject messageContent = new JSONObject(); + JSONObject webhook = new JSONObject(); + webhook.put("url", connector.getHttpEndpoint() + "/webhooktestservice/webhook"); + webhook.put("payload", new JSONObject()); + messageContent.put("webhook", webhook); + + // create monitoring message + MonitoringMessage msg = new MonitoringMessage((long) 1376750476, MonitoringEvent.SERVICE_CUSTOM_MESSAGE_1, + sNode, "1", dNode, "2", messageContent.toJSONString()); + + // verify that no webhook has been delivered until now + assertEquals(false, WebhookTestService.webhookDelivered); + + try { + // get monitoring agent + Object result = node.invoke(testService, testServiceClass, "getReceivingAgentId", new Serializable[] { "Test" }); + MonitoringAgent mAgent = (MonitoringAgent) node.getAgent((String) result); + mAgent.unlock("ProcessingAgentPass"); + + // send monitoring message + MonitoringMessage[] messages = { msg }; + node.invoke(mAgent, testServiceClass, "getMessages", new Serializable[] { messages }); + } catch (Exception e) { + e.printStackTrace(); + fail("Exception: " + e); + } + + // verify that the webhook has been delivered + assertEquals(true, WebhookTestService.webhookDelivered); + } + } diff --git a/app/src/test/java/i5/las2peer/services/mobsos/dataProcessing/WebhookTestService.java b/app/src/test/java/i5/las2peer/services/mobsos/dataProcessing/WebhookTestService.java new file mode 100644 index 0000000..37fea0c --- /dev/null +++ b/app/src/test/java/i5/las2peer/services/mobsos/dataProcessing/WebhookTestService.java @@ -0,0 +1,24 @@ +package i5.las2peer.services.mobsos.dataProcessing; + +import i5.las2peer.restMapper.RESTService; +import i5.las2peer.restMapper.annotations.ServicePath; + +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.core.Response; + +/** + * Helper service used to test whether webhook calls are executed. + */ +@ServicePath("/webhooktestservice") +public class WebhookTestService extends RESTService { + + public static boolean webhookDelivered = false; + + @POST + @Path("/webhook") + public Response webhook(String body) { + webhookDelivered = true; + return Response.status(200).build(); + } +} diff --git a/gradle.properties b/gradle.properties index 27365e6..7e546cc 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,7 +1,7 @@ -core.version=1.2.2 +core.version=1.2.3 service.name=i5.las2peer.services.mobsos.dataProcessing service.class=MobSOSDataProcessingService -service.version=1.2.0 +service.version=1.2.3 java.version=17 las2peer_user1.name=alice