From 83e0f3d28270f72a6af4341982cf4597eeeea308 Mon Sep 17 00:00:00 2001 From: Sanjula Ganepola Date: Wed, 4 Dec 2024 10:31:45 -0500 Subject: [PATCH 01/10] Add LIC and PAL log support Signed-off-by: Sanjula Ganepola --- .../manzan/configuration/DataConfig.java | 24 ++++-- .../theprez/manzan/routes/ManzanRoute.java | 30 +++++++- .../routes/dest/GrafanaLokiDestination.java | 77 ++++++++++++++----- .../manzan/routes/dest/SentryDestination.java | 73 ++++++++++-------- .../manzan/routes/event/WatchMsgEventSql.java | 31 +++++--- docs/README.md | 2 +- docs/config/data.md | 10 +-- ile/src/pub_db2.sqlc | 37 +++++++++ 8 files changed, 206 insertions(+), 78 deletions(-) diff --git a/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java b/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java index b5aaca2..b95315b 100644 --- a/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java +++ b/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java @@ -12,6 +12,7 @@ import org.ini4j.InvalidFileFormatException; import com.github.theprez.jcmdutils.StringUtils; +import com.github.theprez.manzan.ManzanEventType; import com.github.theprez.manzan.WatchStarter; import com.github.theprez.manzan.routes.ManzanRoute; import com.github.theprez.manzan.routes.event.FileEvent; @@ -48,7 +49,7 @@ public synchronized Map getRoutes() throws IOException, AS4 for (final String section : getIni().keySet()) { final String type = getIni().get(section, "type"); if (StringUtils.isEmpty(type)) { - throw new RuntimeException("type not specified for data source [" + section + "]"); + throw new RuntimeException("Type not specified for data source [" + section + "]"); } if ("false".equalsIgnoreCase(getIni().get(section, "enabled"))) { continue; @@ -65,7 +66,7 @@ public synchronized Map getRoutes() throws IOException, AS4 d = d.trim(); if (!m_destinations.contains(d)) { throw new RuntimeException( - "no destination configured named '" + d + "' for data source '" + name + "'"); + "No destination configured named '" + d + "' for data source '" + name + "'"); } if (StringUtils.isNonEmpty(d)) { destinations.add(d); @@ -74,16 +75,25 @@ public synchronized Map getRoutes() throws IOException, AS4 switch (type) { case "watch": String id = getRequiredString(name, "id"); + String strwch = getRequiredString(name, "strwch"); String sqlRouteName = name + "sql"; String socketRouteName = name + "socket"; - ret.put(sqlRouteName, new WatchMsgEventSql(sqlRouteName, id, format, destinations, schema, interval, numToProcess)); - String strwch = getOptionalString(name, "strwch"); - if (StringUtils.isNonEmpty(strwch)) { - WatchStarter ws = new WatchStarter(id, strwch); - ws.strwch(); + ManzanEventType eventType; + if(strwch.contains("WCHMSGQ")) { + eventType = ManzanEventType.WATCH_MSG; + } else if(strwch.contains("WCHLICLOG")) { + eventType = ManzanEventType.WATCH_VLOG; + } else if(strwch.contains("WCHPAL")) { + eventType = ManzanEventType.WATCH_PAL; + } else { + throw new RuntimeException("Watch for message, LIC log entry, or PAL entry not specified"); } + + ret.put(sqlRouteName, new WatchMsgEventSql(sqlRouteName, id, format, destinations, schema, eventType, interval, numToProcess)); ret.put(socketRouteName, new WatchMsgEventSockets(socketRouteName, format, destinations, schema, interval, numToProcess)); + WatchStarter ws = new WatchStarter(id, strwch); + ws.strwch(); break; case "file": String file = getRequiredString(name, "file"); diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/ManzanRoute.java b/camel/src/main/java/com/github/theprez/manzan/routes/ManzanRoute.java index f3c92cd..4fc6f62 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/ManzanRoute.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/ManzanRoute.java @@ -14,6 +14,9 @@ public abstract class ManzanRoute extends RouteBuilder { protected static final String EVENT_TYPE = "event_type"; protected static final String SESSION_ID = "SESSION_ID"; + protected static final String MSG_ORDINAL_POSITION = "ORDINAL_POSITION"; + protected static final String HANDLED_TIMESTAMP = "HANDLED_TIMESTAMP"; + protected static final String MSG_MESSAGE_ID = "MESSAGE_ID"; protected static final String MSG_MESSAGE_TYPE = "MESSAGE_TYPE"; protected static final String MSG_SEVERITY = "SEVERITY"; @@ -22,10 +25,34 @@ public abstract class ManzanRoute extends RouteBuilder { protected static final String MSG_SENDING_PROGRAM_NAME = "SENDING_PROGRAM_NAME"; protected static final String MSG_SENDING_MODULE_NAME = "SENDING_MODULE_NAME"; protected static final String MSG_SENDING_PROCEDURE_NAME = "SENDING_PROCEDURE_NAME"; - protected static final String MSG_ORDINAL_POSITION = "ORDINAL_POSITION"; protected static final String MSG_MESSAGE_TIMESTAMP = "MESSAGE_TIMESTAMP"; protected static final String MSG_MESSAGE = "MESSAGE"; + protected static final String MAJOR_CODE = "MAJOR_CODE"; + protected static final String MINOR_CODE = "MINOR_CODE"; + protected static final String LOG_ID = "LOG_ID"; + protected static final String LOG_TIMESTAMP = "LOG_TIMESTAMP"; + protected static final String TDE_NUM = "TDE_NUM"; + protected static final String TASK_NAME = "TASK_NAME"; + protected static final String SERVER_TYPE = "SERVER_TYPE"; + protected static final String EXCEPTION_ID = "EXCEPTION_ID"; + protected static final String THREAD_ID = "THREAD_ID"; + protected static final String MODULE_OFFSET = "MODULE_OFFSET"; + protected static final String MODULE_RU_NAME = "MODULE_RU_NAME"; + protected static final String MODULE_NAME = "MODULE_NAME"; + protected static final String MODULE_ENTRY_POINT_NAME = "MODULE_ENTRY_POINT_NAME"; + + protected static final String SYSTEM_REFERENCE_CODE = "SYSTEM_REFERENCE_CODE"; + protected static final String DEVICE_NAME = "DEVICE_NAME"; + protected static final String MODEL = "MODEL"; + protected static final String SERIAL_NUMBER = "SERIAL_NUMBER"; + protected static final String RESOURCE_NAME = "RESOURCE_NAME"; + protected static final String PAL_TIMESTAMP = "PAL_TIMESTAMP"; + protected static final String REFERENCE_CODE = "REFERENCE_CODE"; + protected static final String SECONDARY_CODE = "SECONDARY_CODE"; + protected static final String TABLE_ID = "TABLE_ID"; + protected static final String SEQUENCE_NUM = "SEQUENCE_NUM"; + protected static final int SEVERITY_LIMIT = 29; protected static String getWatchName(final Exchange exchange) { @@ -90,5 +117,4 @@ protected void setRecipientList(final List _destinations) throws IOExcep throw new IOException("Message watch for '" + m_name + "' has no valid destinations"); } } - } diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/dest/GrafanaLokiDestination.java b/camel/src/main/java/com/github/theprez/manzan/routes/dest/GrafanaLokiDestination.java index 4ca64ca..ce869aa 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/dest/GrafanaLokiDestination.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/dest/GrafanaLokiDestination.java @@ -44,17 +44,21 @@ public void run() { public void configure() { from(getInUri()) .routeId(m_name).process(exchange -> { + StreamBuilder builder = logController + .stream() + .l(appLabelName, appLabelValue) + .l(SESSION_ID, getWatchName(exchange)); + String timestamp; + String[] keys; + final ManzanEventType type = (ManzanEventType) exchange.getIn().getHeader(EVENT_TYPE); - if (ManzanEventType.WATCH_MSG == type) { - StreamBuilder builder = logController - .stream() - .l(appLabelName, appLabelValue) - .l(Labels.LEVEL, - ((Integer) get(exchange, MSG_SEVERITY)) > SEVERITY_LIMIT ? Labels.FATAL - : Labels.INFO) - .l(SESSION_ID, getWatchName(exchange)); + if (type == ManzanEventType.WATCH_MSG) { + builder + .l(Labels.LEVEL, ((Integer) get(exchange, MSG_SEVERITY)) > SEVERITY_LIMIT ? Labels.FATAL + : Labels.INFO); - String[] keys = { + timestamp = MSG_MESSAGE_TIMESTAMP; + keys = new String[] { MSG_MESSAGE_ID, MSG_MESSAGE_TYPE, MSG_SEVERITY, @@ -64,20 +68,55 @@ public void configure() { MSG_SENDING_MODULE_NAME, MSG_SENDING_PROCEDURE_NAME }; + } else if (type == ManzanEventType.WATCH_VLOG) { + // TODO: Set log level - for (String key : keys) { - String value = getString(exchange, key); - if (!value.equals("")) { - builder.l(key, value); - } - } + timestamp = LOG_TIMESTAMP; + keys = new String[] { + MAJOR_CODE, + MINOR_CODE, + LOG_ID, + TDE_NUM, + TASK_NAME, + SERVER_TYPE, + EXCEPTION_ID, + JOB, + THREAD_ID, + MODULE_OFFSET, + MODULE_RU_NAME, + MODULE_NAME, + MODULE_ENTRY_POINT_NAME + }; + } else if (type == ManzanEventType.WATCH_PAL) { + // TODO: Set log level - ILogStream stream = builder.build(); - stream.log(Timestamp.valueOf(getString(exchange, MSG_MESSAGE_TIMESTAMP)).getTime(), - getBody(exchange, String.class)); + timestamp = PAL_TIMESTAMP; + keys = new String[] { + SYSTEM_REFERENCE_CODE, + DEVICE_NAME, + MODEL, + SERIAL_NUMBER, + RESOURCE_NAME, + LOG_ID, + REFERENCE_CODE, + SECONDARY_CODE, + TABLE_ID, + SEQUENCE_NUM + }; } else { throw new RuntimeException("Grafana Loki route doesn't know how to process type " + type); } + + for (String key : keys) { + String value = getString(exchange, key); + if (!value.equals("")) { + builder.l(key, value); + } + } + + ILogStream stream = builder.build(); + stream.log(Timestamp.valueOf(getString(exchange, timestamp)).getTime(), + getBody(exchange, String.class)); }); } -} +} \ No newline at end of file diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/dest/SentryDestination.java b/camel/src/main/java/com/github/theprez/manzan/routes/dest/SentryDestination.java index 2c1f5d8..85a1b2d 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/dest/SentryDestination.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/dest/SentryDestination.java @@ -1,5 +1,6 @@ package com.github.theprez.manzan.routes.dest; +import java.sql.Date; import java.util.LinkedList; import java.util.List; import java.util.UUID; @@ -37,48 +38,52 @@ public void configure() { .routeId(m_name) .convertBodyTo(String.class) .process(exchange -> { - final ManzanEventType type = (ManzanEventType) exchange.getIn().getHeader(EVENT_TYPE); - if (ManzanEventType.WATCH_MSG == type) { - System.out.println("sentry"); - final SentryEvent event = new SentryEvent(); - final String watch = getWatchName(exchange); - final SentryId id = new SentryId(UUID.randomUUID()); - event.setTag("session id", watch); - event.setEventId(id); - event.setExtras(getDataMap(exchange)); - final User user = new User(); - user.setUsername(getString(exchange, MSG_SENDING_USRPRF)); - event.setUser(user); - event.setPlatform("IBM i"); - event.setTag("runtime", "IBM i"); - event.setTag("runtime.name", "IBM i"); - event.setDist("PASE"); - event.setTransaction(getString(exchange, MSG_ORDINAL_POSITION)); + final SentryEvent event = new SentryEvent(); + event.setTag(SESSION_ID, getWatchName(exchange)); // TODO: Check if SESSION_ID or just session id + event.setEventId(new SentryId(UUID.randomUUID())); + event.setExtras(getDataMap(exchange)); + event.setPlatform("IBM i"); + event.setTag("runtime", "IBM i"); + event.setTag("runtime.name", "IBM i"); + event.setDist("PASE"); + event.setTransaction(getString(exchange, MSG_ORDINAL_POSITION)); + + final User user = new User(); + user.setUsername(getString(exchange, MSG_SENDING_USRPRF)); + event.setUser(user); + + final Message message = new Message(); + message.setMessage(getBody(exchange, String.class)); + event.setMessage(message); - SentryLevel level; - final int sev = (Integer) get(exchange, MSG_SEVERITY); - if (sev > SEVERITY_LIMIT) { - level = SentryLevel.ERROR; - } else { - level = SentryLevel.INFO; - } + String timestamp; + final List fingerprints = new LinkedList(); - event.setLevel(level); - final Message message = new Message(); - final String messageStr = getString(exchange, MSG_MESSAGE_ID) + ": " - + getString(exchange, MSG_MESSAGE); - message.setMessage(messageStr); - final List fingerprints = new LinkedList(); + final ManzanEventType type = (ManzanEventType) exchange.getIn().getHeader(EVENT_TYPE); + if (ManzanEventType.WATCH_MSG == type) { + event.setLevel(((Integer) get(exchange, MSG_SEVERITY)) > SEVERITY_LIMIT ? SentryLevel.ERROR + : SentryLevel.INFO); + timestamp = MSG_MESSAGE_TIMESTAMP; fingerprints.add(getString(exchange, MSG_MESSAGE_ID)); fingerprints.add(getString(exchange, MSG_SENDING_PROCEDURE_NAME)); fingerprints.add(getString(exchange, MSG_SENDING_MODULE_NAME)); fingerprints.add(getString(exchange, MSG_SENDING_PROGRAM_NAME)); - event.setFingerprints(fingerprints); - event.setMessage(message); - Sentry.captureEvent(event); + } else if (type == ManzanEventType.WATCH_VLOG) { + // TODO: Set log level + timestamp = LOG_TIMESTAMP; + fingerprints.add(getString(exchange, MAJOR_CODE)); + fingerprints.add(getString(exchange, MINOR_CODE)); + } else if (type == ManzanEventType.WATCH_PAL) { + // TODO: Set log level + timestamp = PAL_TIMESTAMP; + fingerprints.add(getString(exchange, SYSTEM_REFERENCE_CODE)); } else { throw new RuntimeException("Sentry route doesn't know how to process type " + type); } + + event.setTimestamp(Date.valueOf(getString(exchange, timestamp))); // TODO: Verify date is valid + event.setFingerprints(fingerprints); + Sentry.captureEvent(event); }); } -} +} \ No newline at end of file diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSql.java b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSql.java index 44d3337..79e41b8 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSql.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSql.java @@ -12,22 +12,33 @@ public class WatchMsgEventSql extends ManzanRoute { - private final int m_interval; - private final int m_numToProcess; private final String m_schema; private final String m_sessionId; + private final ManzanEventType m_eventType; + private final String m_table; + private final int m_interval; + private final int m_numToProcess; private final ManzanMessageFormatter m_formatter; public WatchMsgEventSql(final String _name, final String _session_id, final String _format, - final List _destinations, final String _schema, final int _interval, final int _numToProcess) + final List _destinations, final String _schema, final ManzanEventType _eventType, + final int _interval, final int _numToProcess) throws IOException { super(_name); + m_schema = _schema; + m_sessionId = _session_id.trim().toUpperCase(); + m_eventType = _eventType; m_interval = _interval; m_numToProcess = _numToProcess; - m_schema = _schema; m_formatter = StringUtils.isEmpty(_format) ? null : new ManzanMessageFormatter(_format); + if (m_eventType == ManzanEventType.WATCH_MSG) { + m_table = "MANZANMSG"; + } else if (m_eventType == ManzanEventType.WATCH_VLOG) { + m_table = "MANZANVLOG"; + } else { + m_table = "MANZANPAL"; + } super.setRecipientList(_destinations); - m_sessionId = _session_id.trim().toUpperCase(); } @Override @@ -38,9 +49,9 @@ public void configure() { // Reset the list of ordinal positions at the start of each execution exchange.setProperty("ordinalPositions", new ArrayList()); }) - .setHeader(EVENT_TYPE, constant(ManzanEventType.WATCH_MSG)) - .setBody(constant("SeLeCt * fRoM " + m_schema + ".mAnZaNmSg wHeRe SESSION_ID = '" + m_sessionId - + "' limit " + m_numToProcess)) + .setHeader(EVENT_TYPE, constant(m_eventType)) + .setBody(constant("SELECT * FROM " + m_schema + "." + m_table + " WHERE SESSION_ID = '" + m_sessionId + + "' LIMIT " + m_numToProcess)) .to("jdbc:jt400?outputType=StreamList") .split(body()).streaming().parallelProcessing() .setHeader("id", simple("${body[ORDINAL_POSITION]}")) @@ -69,8 +80,8 @@ public void configure() { .end() .process(exchange -> { // Constructing the WHERE clause for ORDINAL_POSITIONs - StringBuilder deleteQuery = new StringBuilder("DELETE FROM " + m_schema - + ".MANZANMSG WHERE SESSION_ID = '" + m_sessionId + "' AND ORDINAL_POSITION IN ("); + StringBuilder deleteQuery = new StringBuilder("DELETE FROM " + m_schema + "." + m_table + + " WHERE SESSION_ID = '" + m_sessionId + "' AND ORDINAL_POSITION IN ("); @SuppressWarnings("unchecked") List ordinalPositions = exchange.getProperty("ordinalPositions", List.class); if (ordinalPositions != null && !ordinalPositions.isEmpty()) { diff --git a/docs/README.md b/docs/README.md index c26cd75..569c998 100644 --- a/docs/README.md +++ b/docs/README.md @@ -47,7 +47,7 @@ Many other destinations will be available. Examples include: - [Mezmo](http://mezmo.com) ⏳ - [Microsoft Teams](http://teams.microsoft.com) ⏳ - [PagerDuty](http://pagerduty.com) ⏳ -- [Sentry](http://sentry.io) 🌗 +- [Sentry](http://sentry.io) ✅ - [Slack](http://slack.com) ✅ - SMS (via [Twilio](http://www.twilio.com)) ✅ - [Splunk](http://splunk.com) ⏳ diff --git a/docs/config/data.md b/docs/config/data.md index a5de931..84bf921 100644 --- a/docs/config/data.md +++ b/docs/config/data.md @@ -14,11 +14,11 @@ Here are the requirements for each section. These are optional properties available on all types: -* `format` can be used to define a nicer messages to be sent to the destination. - * For file events we have the following variables available: `FILE_DATA`, `FILE_NAME`, `FILE_PATH`. - * For watch events we have the following variables available: `SESSION_ID`, `MESSAGE_ID`, `MESSAGE_TYPE`, `SEVERITY`, `JOB`, `SENDING_USRPRF`, `SENDING_PROGRAM_NAME`, `SENDING_MODULE_NAME`, `SENDING_PROCEDURE_NAME`, `MESSAGE_TIMESTAMP`, `MESSAGE`.\ - \ - By specifying the variable in your format string surrounded by dollar signs, the variables value will be replaced in your format string. Ex. For a file `a.txt` that received the data `hello world` the format string `Data: $FILE_DATA$, Name: $FILE_NAME$` will evaluate to `Data: hello world, Name: a.txt`. `format` can be provided in both data sources and destinations. +* `format` can be used to define a nicer messages to be sent to the destination. By specifying the variable in your format string surrounded by dollar signs, the variables value will be replaced in your format string. Ex. For a file `a.txt` that received the data `hello world` the format string `Data: $FILE_DATA$, Name: $FILE_NAME$` will evaluate to `Data: hello world, Name: a.txt`. `format` can be provided in both data sources and destinations. + * **File Events**: `FILE_DATA`, `FILE_NAME`, `FILE_PATH` + * **Message Queue Watch Event**: `SESSION_ID`, `MESSAGE_ID`, `MESSAGE_TYPE`, `SEVERITY`, `JOB`, `SENDING_USRPRF`, `SENDING_PROGRAM_NAME`, `SENDING_MODULE_NAME`, `SENDING_PROCEDURE_NAME`, `MESSAGE_TIMESTAMP`, `MESSAGE` + * **Licensed Internal Code (LIC) Log Watch Event**: `SESSION_ID`, `MAJOR_CODE`, `MINOR_CODE`, `LOG_ID`, `LOG_TIMESTAMP`, `TDE_NUM`, `TASK_NAME`, `SERVER_TYPE`, `EXCEPTION_ID`, `JOB`, `THREAD_ID`, `MODULE_OFFSET`, `MODULE_RU_NAME`, `MODULE_NAME`, `MODULE_ENTRY_POINT_NAME` + * **Product Activity Log Watch Event**: `SESSION_ID`, `SYSTEM_REFERENCE_CODE`, `DEVICE_NAME`, `MODEL`, `SERIAL_NUMBER`, `RESOURCE_NAME`, `LOG_ID`, `PAL_TIMESTAMP`, `REFERENCE_CODE`, `SECONDARY_CODE`, `TABLE_ID`, `SEQUENCE_NUM` * `interval` can be used to configure how often the distributor checks for events in milliseconds (default `5`) * `enabled` is a boolean (`true` or `false`) so a data source can be defined but disabled diff --git a/ile/src/pub_db2.sqlc b/ile/src/pub_db2.sqlc index e21c284..a81e0b1 100644 --- a/ile/src/pub_db2.sqlc +++ b/ile/src/pub_db2.sqlc @@ -200,6 +200,43 @@ extern int db2_publish_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE) extern int db2_publish_vlog(PUBLISH_VLOG_FUNCTION_SIGNATURE) { + exec sql include SQLCA; + char vlog_session_id[11]; + char vlog_major_code[11]; + char vlog_minor_code[11]; + char vlog_log_id[11]; + char vlog_log_timestamp[11]; + char vlog_tde_num[11]; + char vlog_task_name[11]; + char vlog_server_type[11]; + char vlog_exception_id[11]; + char vlog_job[11]; + char vlog_thread_id[11]; + char vlog_module_offset[11]; + char vlog_module_ru_name[11]; + char vlog_module_name[11]; + char vlog_module_entry_point_name[11]; + + COPY_PARM(vlog_session_id, _session_id); + COPY_PARM(vlog_major_code, _major_code); + COPY_PARM(vlog_minor_code, _minor_code); + COPY_PARM(vlog_log_id, _log_id); + COPY_PARM(vlog_log_timestamp, _timestamp); + COPY_PARM(vlog_tde_num, _tde_number); + COPY_PARM(vlog_task_name, _task_name); + COPY_PARM(vlog_server_type, _server_type); + COPY_PARM(vlog_exception_id, _exception_id); + COPY_PARM(vlog_job, _job); + COPY_PARM(vlog_thread_id, _thread_id); + COPY_PARM(vlog_module_offset, _module_offset); + COPY_PARM(vlog_module_ru_name, _module_ru_name); + COPY_PARM(vlog_module_name, _module_name); + COPY_PARM(vlog_module_entry_point_name, _module_entry_point_name); + + EXEC SQL + INSERT INTO MANZANVLOG(SESSION_ID, MAJOR_CODE, MINOR_CODE, LOG_ID, LOG_TIMESTAMP, TDE_NUM, TASK_NAME, SERVER_TYPE, EXCEPTION_ID, JOB, THREAD_ID, MODULE_OFFSET, MODULE_RU_NAME, MODULE_NAME, MODULE_ENTRY_POINT_NAME) + VALUES( : vlog_session_id, : vlog_major_code, : vlog_minor_code, : vlog_log_id, : vlog_log_timestamp, : vlog_tde_num, : vlog_task_name, : vlog_server_type, : vlog_exception_id, : vlog_job, : vlog_thread_id, : vlog_module_offset, : vlog_module_ru_name, : vlog_module_name, : vlog_module_entry_point_name:); + check_sql_error(sqlca.sqlcode, sqlca.sqlstate); return 0; } From c738086d6bfb0286916b94dc1b097cc9dd68971c Mon Sep 17 00:00:00 2001 From: Sanjula Ganepola Date: Wed, 4 Dec 2024 14:45:42 -0500 Subject: [PATCH 02/10] Fix c errors Signed-off-by: Sanjula Ganepola --- ile/src/pub_db2.sqlc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ile/src/pub_db2.sqlc b/ile/src/pub_db2.sqlc index a81e0b1..c663324 100644 --- a/ile/src/pub_db2.sqlc +++ b/ile/src/pub_db2.sqlc @@ -201,6 +201,7 @@ extern int db2_publish_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE) extern int db2_publish_vlog(PUBLISH_VLOG_FUNCTION_SIGNATURE) { exec sql include SQLCA; + EXEC SQL BEGIN DECLARE SECTION; char vlog_session_id[11]; char vlog_major_code[11]; char vlog_minor_code[11]; @@ -216,6 +217,7 @@ extern int db2_publish_vlog(PUBLISH_VLOG_FUNCTION_SIGNATURE) char vlog_module_ru_name[11]; char vlog_module_name[11]; char vlog_module_entry_point_name[11]; + EXEC SQL END DECLARE SECTION; COPY_PARM(vlog_session_id, _session_id); COPY_PARM(vlog_major_code, _major_code); @@ -235,7 +237,7 @@ extern int db2_publish_vlog(PUBLISH_VLOG_FUNCTION_SIGNATURE) EXEC SQL INSERT INTO MANZANVLOG(SESSION_ID, MAJOR_CODE, MINOR_CODE, LOG_ID, LOG_TIMESTAMP, TDE_NUM, TASK_NAME, SERVER_TYPE, EXCEPTION_ID, JOB, THREAD_ID, MODULE_OFFSET, MODULE_RU_NAME, MODULE_NAME, MODULE_ENTRY_POINT_NAME) - VALUES( : vlog_session_id, : vlog_major_code, : vlog_minor_code, : vlog_log_id, : vlog_log_timestamp, : vlog_tde_num, : vlog_task_name, : vlog_server_type, : vlog_exception_id, : vlog_job, : vlog_thread_id, : vlog_module_offset, : vlog_module_ru_name, : vlog_module_name, : vlog_module_entry_point_name:); + VALUES( : vlog_session_id, : vlog_major_code, : vlog_minor_code, : vlog_log_id, : vlog_log_timestamp, : vlog_tde_num, : vlog_task_name, : vlog_server_type, : vlog_exception_id, : vlog_job, : vlog_thread_id, : vlog_module_offset, : vlog_module_ru_name, : vlog_module_name, : vlog_module_entry_point_name); check_sql_error(sqlca.sqlcode, sqlca.sqlstate); return 0; } From 6cd54585e41a60e82610709b92b964249ad2128b Mon Sep 17 00:00:00 2001 From: Sanjula Ganepola Date: Wed, 4 Dec 2024 14:57:57 -0500 Subject: [PATCH 03/10] Fix check_sql_error Signed-off-by: Sanjula Ganepola --- ile/src/pub_db2.sqlc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ile/src/pub_db2.sqlc b/ile/src/pub_db2.sqlc index c663324..18a1bd1 100644 --- a/ile/src/pub_db2.sqlc +++ b/ile/src/pub_db2.sqlc @@ -238,7 +238,7 @@ extern int db2_publish_vlog(PUBLISH_VLOG_FUNCTION_SIGNATURE) EXEC SQL INSERT INTO MANZANVLOG(SESSION_ID, MAJOR_CODE, MINOR_CODE, LOG_ID, LOG_TIMESTAMP, TDE_NUM, TASK_NAME, SERVER_TYPE, EXCEPTION_ID, JOB, THREAD_ID, MODULE_OFFSET, MODULE_RU_NAME, MODULE_NAME, MODULE_ENTRY_POINT_NAME) VALUES( : vlog_session_id, : vlog_major_code, : vlog_minor_code, : vlog_log_id, : vlog_log_timestamp, : vlog_tde_num, : vlog_task_name, : vlog_server_type, : vlog_exception_id, : vlog_job, : vlog_thread_id, : vlog_module_offset, : vlog_module_ru_name, : vlog_module_name, : vlog_module_entry_point_name); - check_sql_error(sqlca.sqlcode, sqlca.sqlstate); + check_sql_error(sqlca.sqlcode, (const char *)sqlca.sqlstate); return 0; } From beec85f2d543b4a5053d6bfc9e2f2ae781bc2c17 Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Mon, 16 Dec 2024 15:58:29 -0500 Subject: [PATCH 04/10] Only create one socket listener --- .../manzan/configuration/DataConfig.java | 70 ++++++++++++++----- .../theprez/manzan/routes/ManzanRoute.java | 14 ++-- .../routes/event/WatchMsgEventSockets.java | 29 ++++---- 3 files changed, 79 insertions(+), 34 deletions(-) diff --git a/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java b/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java index b5aaca2..585fdca 100644 --- a/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java +++ b/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java @@ -4,6 +4,8 @@ import java.io.File; import java.io.IOException; import java.util.LinkedHashMap; +import java.util.HashMap; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -21,6 +23,8 @@ import com.ibm.as400.access.ErrorCompletingRequestException; import com.ibm.as400.access.ObjectDoesNotExistException; +import static com.github.theprez.manzan.routes.ManzanRoute.createRecipientList; + public class DataConfig extends Config { private final static int DEFAULT_INTERVAL = 5; @@ -45,6 +49,7 @@ public synchronized Map getRoutes() throws IOException, AS4 return m_routes; } final Map ret = new LinkedHashMap(); + final List watchEvents = new ArrayList<>(); for (final String section : getIni().keySet()) { final String type = getIni().get(section, "type"); if (StringUtils.isEmpty(type)) { @@ -52,14 +57,14 @@ public synchronized Map getRoutes() throws IOException, AS4 } if ("false".equalsIgnoreCase(getIni().get(section, "enabled"))) { continue; + } else if ( type.equals("watch")){ + watchEvents.add(section); + continue; } final String name = section; - final String schema = ApplicationConfig.get().getLibrary(); final String format = getOptionalString(name, "format"); int userInterval = getOptionalInt(name, "interval"); final int interval = userInterval != -1 ? userInterval : DEFAULT_INTERVAL; - int userNumToProcess = getOptionalInt(name, "numToProcess"); - final int numToProcess = userNumToProcess != -1 ? userNumToProcess : DEFAULT_NUM_TO_PROCESS; final List destinations = new LinkedList(); for (String d : getRequiredString(name, "destinations").split("\\s*,\\s*")) { d = d.trim(); @@ -72,19 +77,6 @@ public synchronized Map getRoutes() throws IOException, AS4 } } switch (type) { - case "watch": - String id = getRequiredString(name, "id"); - String sqlRouteName = name + "sql"; - String socketRouteName = name + "socket"; - - ret.put(sqlRouteName, new WatchMsgEventSql(sqlRouteName, id, format, destinations, schema, interval, numToProcess)); - String strwch = getOptionalString(name, "strwch"); - if (StringUtils.isNonEmpty(strwch)) { - WatchStarter ws = new WatchStarter(id, strwch); - ws.strwch(); - } - ret.put(socketRouteName, new WatchMsgEventSockets(socketRouteName, format, destinations, schema, interval, numToProcess)); - break; case "file": String file = getRequiredString(name, "file"); String filter = getOptionalString(name, "filter"); @@ -94,6 +86,52 @@ public synchronized Map getRoutes() throws IOException, AS4 throw new RuntimeException("Unknown destination type: " + type); } } + + final Map formatMap = new HashMap<>(); + final Map destMap = new HashMap<>(); + + for (int i = 0; i < watchEvents.size(); i++) { + final String section = watchEvents.get(i); + final String name = section; + final String schema = ApplicationConfig.get().getLibrary(); + final String format = getOptionalString(name, "format"); + String id = getRequiredString(name, "id"); + + int userInterval = getOptionalInt(name, "interval"); + final int interval = userInterval != -1 ? userInterval : DEFAULT_INTERVAL; + int userNumToProcess = getOptionalInt(name, "numToProcess"); + final int numToProcess = userNumToProcess != -1 ? userNumToProcess : DEFAULT_NUM_TO_PROCESS; + final List destinations = new LinkedList(); + for (String d : getRequiredString(name, "destinations").split("\\s*,\\s*")) { + d = d.trim(); + if (!m_destinations.contains(d)) { + throw new RuntimeException( + "no destination configured named '" + d + "' for data source '" + name + "'"); + } + if (StringUtils.isNonEmpty(d)) { + destinations.add(d); + } + } + String destString = createRecipientList(destinations); + formatMap.put(id.toUpperCase(), format); + destMap.put(id.toUpperCase(), destString); + + String sqlRouteName = name + "sql"; + + ret.put(sqlRouteName, new WatchMsgEventSql(sqlRouteName, id, format, destinations, schema, interval, numToProcess)); + String strwch = getOptionalString(name, "strwch"); + if (StringUtils.isNonEmpty(strwch)) { + WatchStarter ws = new WatchStarter(id, strwch); + ws.strwch(); + } + + if (i == watchEvents.size() - 1){ + // This is the last watch event, so we've built the whole map + final String routeName = "socketWatcher"; + ret.put(routeName, new WatchMsgEventSockets(routeName, formatMap, destMap)); + } + } + return m_routes = ret; } diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/ManzanRoute.java b/camel/src/main/java/com/github/theprez/manzan/routes/ManzanRoute.java index f3c92cd..007bbe2 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/ManzanRoute.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/ManzanRoute.java @@ -77,6 +77,13 @@ protected String getString(final Exchange _exchange, final String _attr) { } protected void setRecipientList(final List _destinations) throws IOException { + m_recipientList = createRecipientList(_destinations); + if (StringUtils.isEmpty(m_recipientList)) { + throw new IOException("Message watch for '" + m_name + "' has no valid destinations"); + } + } + + public static String createRecipientList(final List _destinations) throws IOException { String destinationsStr = ""; for (final String dest : _destinations) { if (StringUtils.isEmpty(dest)) { @@ -84,11 +91,6 @@ protected void setRecipientList(final List _destinations) throws IOExcep } destinationsStr += "direct:" + dest.toLowerCase().trim() + ","; } - m_recipientList = destinationsStr.replaceFirst(",$", "").trim(); - - if (StringUtils.isEmpty(m_recipientList)) { - throw new IOException("Message watch for '" + m_name + "' has no valid destinations"); - } + return destinationsStr.replaceFirst(",$", "").trim(); } - } diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java index f0c30ed..8f500a3 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java @@ -1,47 +1,52 @@ package com.github.theprez.manzan.routes.event; -import java.io.IOException; -import java.util.List; import java.util.Map; import org.apache.camel.model.dataformat.JsonLibrary; -import com.github.theprez.jcmdutils.StringUtils; import com.github.theprez.manzan.ManzanEventType; import com.github.theprez.manzan.ManzanMessageFormatter; import com.github.theprez.manzan.routes.ManzanRoute; public class WatchMsgEventSockets extends ManzanRoute { - private final ManzanMessageFormatter m_formatter; +// private final ManzanMessageFormatter m_formatter; + private final Map m_formatMap; + final Map m_destMap; private final String m_socketIp = "0.0.0.0"; private final String m_socketPort = "8080"; - public WatchMsgEventSockets(final String _name, final String _format, - final List _destinations, final String _schema, final int _interval, final int _numToProcess) - throws IOException { + public WatchMsgEventSockets(final String _name, final Map _formatMap, + final Map _destMap) { super(_name); - m_formatter = StringUtils.isEmpty(_format) ? null : new ManzanMessageFormatter(_format); - super.setRecipientList(_destinations); + m_formatMap = _formatMap; + m_destMap = _destMap; } //@formatter:off @Override public void configure() { + getContext().getRegistry().bind("manzanSocketBean", this); from(String.format("netty:tcp://%s:%s?sync=false", m_socketIp, m_socketPort)) .unmarshal().json(JsonLibrary.Jackson, Map.class) .routeId("manzan_msg:"+m_name) .setHeader(EVENT_TYPE, constant(ManzanEventType.WATCH_MSG)) - .setHeader("session_id", simple("${body[sessionId]}")) + .setHeader("session_id", simple("${body[SESSION_ID]}")) .setHeader("data_map", simple("${body}")) .marshal().json(true) //TODO: skip this if we are applying a format .setBody(simple("${body}\n")) .process(exchange -> { - if (null != m_formatter) { + String sessionId = exchange.getIn().getHeader("session_id", String.class); + String format = m_formatMap.get(sessionId); + if (format != null) { + ManzanMessageFormatter m_formatter = new ManzanMessageFormatter(format); exchange.getIn().setBody(m_formatter.format(getDataMap(exchange))); + String destinations = m_destMap.get(sessionId); // Get destinations from m_destMap + exchange.getIn().setHeader("destinations", destinations); } }) - .recipientList(constant(getRecipientList())).parallelProcessing().stopOnException().end(); + .recipientList(header("destinations")) + .parallelProcessing().stopOnException(); } //@formatter:on } \ No newline at end of file From ff07d3f23603696910d97c9aa4295db0b519514e Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Mon, 16 Dec 2024 16:03:05 -0500 Subject: [PATCH 05/10] remove unneccessary bind --- .../github/theprez/manzan/routes/event/WatchMsgEventSockets.java | 1 - 1 file changed, 1 deletion(-) diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java index 8f500a3..384da65 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java @@ -26,7 +26,6 @@ public WatchMsgEventSockets(final String _name, final Map _forma //@formatter:off @Override public void configure() { - getContext().getRegistry().bind("manzanSocketBean", this); from(String.format("netty:tcp://%s:%s?sync=false", m_socketIp, m_socketPort)) .unmarshal().json(JsonLibrary.Jackson, Map.class) .routeId("manzan_msg:"+m_name) From db7b64c675adfd70e18784181e97e993d9f17528 Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Mon, 16 Dec 2024 16:16:44 -0500 Subject: [PATCH 06/10] Add comments --- .../github/theprez/manzan/configuration/DataConfig.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java b/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java index 585fdca..1568ce1 100644 --- a/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java +++ b/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java @@ -58,6 +58,7 @@ public synchronized Map getRoutes() throws IOException, AS4 if ("false".equalsIgnoreCase(getIni().get(section, "enabled"))) { continue; } else if ( type.equals("watch")){ + // We will handle the watch events separately as the logic is a bit more complicated watchEvents.add(section); continue; } @@ -87,6 +88,8 @@ public synchronized Map getRoutes() throws IOException, AS4 } } + // We will create a formatMap to store the format for each watch session, as well + // as a destMap to store the destinations for each watch session final Map formatMap = new HashMap<>(); final Map destMap = new HashMap<>(); @@ -112,6 +115,8 @@ public synchronized Map getRoutes() throws IOException, AS4 destinations.add(d); } } + + // Build the maps String destString = createRecipientList(destinations); formatMap.put(id.toUpperCase(), format); destMap.put(id.toUpperCase(), destString); @@ -126,7 +131,7 @@ public synchronized Map getRoutes() throws IOException, AS4 } if (i == watchEvents.size() - 1){ - // This is the last watch event, so we've built the whole map + // This is the last watch event, so we've built the whole map. Now create the route final String routeName = "socketWatcher"; ret.put(routeName, new WatchMsgEventSockets(routeName, formatMap, destMap)); } From 256a0230d8072e70df3b20d62332c755382f6bee Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Wed, 18 Dec 2024 09:53:10 -0500 Subject: [PATCH 07/10] Remove duplicate line --- .../java/com/github/theprez/manzan/configuration/DataConfig.java | 1 - 1 file changed, 1 deletion(-) diff --git a/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java b/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java index 94630b1..3de4544 100644 --- a/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java +++ b/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java @@ -58,7 +58,6 @@ public synchronized Map getRoutes() throws IOException, AS4 final String type = getIni().get(section, "type"); if (StringUtils.isEmpty(type)) { throw new RuntimeException("Type not specified for data source [" + section + "]"); - throw new RuntimeException("Type not specified for data source [" + section + "]"); } if ("false".equalsIgnoreCase(getIni().get(section, "enabled"))) { continue; From 62cb6510c0c1ac9c2daea23d1c52f8c2eb6a17a1 Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Wed, 18 Dec 2024 10:47:46 -0500 Subject: [PATCH 08/10] Address pr comments --- .../theprez/manzan/routes/event/WatchMsgEventSockets.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java index 384da65..3ad35bd 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java @@ -40,12 +40,12 @@ public void configure() { if (format != null) { ManzanMessageFormatter m_formatter = new ManzanMessageFormatter(format); exchange.getIn().setBody(m_formatter.format(getDataMap(exchange))); - String destinations = m_destMap.get(sessionId); // Get destinations from m_destMap - exchange.getIn().setHeader("destinations", destinations); } + String destinations = m_destMap.get(sessionId); // Get destinations from m_destMap + exchange.getIn().setHeader("destinations", destinations); }) .recipientList(header("destinations")) - .parallelProcessing().stopOnException(); + .parallelProcessing().stopOnException().end(); } //@formatter:on } \ No newline at end of file From 182dbe667965580fb2452160f8e9a2b04e659fac Mon Sep 17 00:00:00 2001 From: Jonathan <42983653+jonnyz32@users.noreply.github.com> Date: Wed, 18 Dec 2024 10:49:41 -0500 Subject: [PATCH 09/10] Apply suggestions from code review Co-authored-by: Sanjula Ganepola <32170854+SanjulaGanepola@users.noreply.github.com> --- .../theprez/manzan/routes/event/WatchMsgEventSockets.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java index 3ad35bd..e8e55ac 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEventSockets.java @@ -10,9 +10,8 @@ public class WatchMsgEventSockets extends ManzanRoute { -// private final ManzanMessageFormatter m_formatter; private final Map m_formatMap; - final Map m_destMap; + private final Map m_destMap; private final String m_socketIp = "0.0.0.0"; private final String m_socketPort = "8080"; From 96f2eb090a0e79fdc1cac28263469ea98feb880e Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Wed, 18 Dec 2024 10:56:17 -0500 Subject: [PATCH 10/10] Create socket route outside of loop --- .../theprez/manzan/configuration/DataConfig.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java b/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java index 3de4544..a4a81ce 100644 --- a/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java +++ b/camel/src/main/java/com/github/theprez/manzan/configuration/DataConfig.java @@ -140,12 +140,12 @@ public synchronized Map getRoutes() throws IOException, AS4 ret.put(sqlRouteName, new WatchMsgEventSql(sqlRouteName, id, format, destinations, schema, eventType, interval, numToProcess)); WatchStarter ws = new WatchStarter(id, strwch); ws.strwch(); + } - if (i == watchEvents.size() - 1){ - // This is the last watch event, so we've built the whole map. Now create the route - final String routeName = "socketWatcher"; - ret.put(routeName, new WatchMsgEventSockets(routeName, formatMap, destMap)); - } + if (watchEvents.size() > 0){ + // After iterating over the loop, the formatMap and destMap are complete. Now create the route. + final String routeName = "socketWatcher"; + ret.put(routeName, new WatchMsgEventSockets(routeName, formatMap, destMap)); } return m_routes = ret; }