From 38b7401705fb74fd20acc1bdce7ee3db0fba9429 Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Fri, 20 Sep 2024 12:12:33 -0400 Subject: [PATCH 01/25] Add socket client --- ile/src/socketClient.cpp | 121 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 ile/src/socketClient.cpp diff --git a/ile/src/socketClient.cpp b/ile/src/socketClient.cpp new file mode 100644 index 0000000..cf036cb --- /dev/null +++ b/ile/src/socketClient.cpp @@ -0,0 +1,121 @@ +#include +#include // For memset and memcpy +#include // For socket functions +#include // For inet_addr +#include // For close + +class SocketClient { +public: + // Constructor to initialize the socket descriptor to -1 + SocketClient(){ + sock_fd = -1; + } + + // Method to open a socket and connect to the server + bool openSocket(const std::string ip, int port) { + // Create socket + sock_fd = socket(AF_INET, SOCK_STREAM, 0); + if (sock_fd < 0) { + std::cerr << "Error creating socket\n"; + return false; + } + + // Define server address + struct sockaddr_in server_address; + server_address.sin_family = AF_INET; + server_address.sin_port = htons(port); + server_address.sin_addr.s_addr = inet_addr(ip.c_str()); + + // Connect to server + if (connect(sock_fd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) { + std::cerr << "Error connecting to server\n"; + closeSocket(); + return false; + } + + std::cout << "Connected to server at " << ip << ":" << port << "\n"; + return true; + } + + // Method to send a message (string) over the socket + bool sendMessage(const std::string message) { + if (sock_fd < 0) { + std::cerr << "Socket is not open\n"; + return false; + } + + int bytes_sent = send(sock_fd, message.c_str(), message.size(), 0); + if (bytes_sent < 0) { + std::cerr << "Error sending message\n"; + return false; + } + + std::cout << "Sent message: " << message << "\n"; + return true; + } + + // Method to send a struct over the socket + template + bool sendStruct(const T& data) { + if (sock_fd < 0) { + std::cerr << "Socket is not open\n"; + return false; + } + + // Send the raw data of the struct + int bytes_sent = send(sock_fd, &data, sizeof(data), 0); + if (bytes_sent < 0) { + std::cerr << "Error sending struct\n"; + return false; + } + + std::cout << "Sent struct of size: " << sizeof(data) << " bytes\n"; + return true; + } + + // Method to close the socket + void closeSocket() { + if (sock_fd >= 0) { + close(sock_fd); + sock_fd = -1; + std::cout << "Socket closed\n"; + } + } + + // Destructor to ensure socket is closed + ~SocketClient() { + closeSocket(); + } + +private: + int sock_fd; // Socket file descriptor +}; + +// Example struct to send over the socket +struct ExampleStruct { + int id; + float value; + char name[50]; +}; + +int main() { + // Create a SocketClient instance + SocketClient client; + + // Open a socket and connect to a server + if (!client.openSocket("127.0.0.1", 8080)) { + return 1; + } + + // Send a message over the socket + client.sendMessage("Hello from C++"); + + // Prepare and send a struct over the socket + ExampleStruct data = { 1, 3.14, "TestStruct" }; + client.sendStruct(data); + + // Close the socket + client.closeSocket(); + + return 0; +} From 6957f01eb99b07f07d693e6765972fd1e2f9eb6e Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Tue, 24 Sep 2024 14:34:09 -0400 Subject: [PATCH 02/25] added sockclient.cpp --- ile/Makefile | 4 +- ile/src/{socketClient.cpp => SockClient.cpp} | 107 ++++++++++--------- ile/src/SockClient.h | 27 +++++ ile/src/handler.cpp | 39 ++++++- ile/src/pub_json.cpp | 15 ++- ile/src/pub_json.h | 5 + 6 files changed, 137 insertions(+), 60 deletions(-) rename ile/src/{socketClient.cpp => SockClient.cpp} (50%) create mode 100644 ile/src/SockClient.h diff --git a/ile/Makefile b/ile/Makefile index fcc16c5..55aac28 100644 --- a/ile/Makefile +++ b/ile/Makefile @@ -19,13 +19,13 @@ src/mzversion.h: /qsys.lib/${BUILDLIB}.lib: system "RUNSQL SQL('create schema ${BUILDLIB}') COMMIT(*NONE) NAMING(*SQL) " -/qsys.lib/${BUILDLIB}.lib/handler.pgm: /qsys.lib/${BUILDLIB}.lib/handler.module /qsys.lib/${BUILDLIB}.lib/pub_json.module /qsys.lib/${BUILDLIB}.lib/pub_db2.module /qsys.lib/${BUILDLIB}.lib/debug.module /qsys.lib/${BUILDLIB}.lib/pub_db2.module /qsys.lib/${BUILDLIB}.lib/userconf.module +/qsys.lib/${BUILDLIB}.lib/handler.pgm: /qsys.lib/${BUILDLIB}.lib/handler.module /qsys.lib/${BUILDLIB}.lib/pub_json.module /qsys.lib/${BUILDLIB}.lib/pub_db2.module /qsys.lib/${BUILDLIB}.lib/debug.module /qsys.lib/${BUILDLIB}.lib/pub_db2.module /qsys.lib/${BUILDLIB}.lib/userconf.module /qsys.lib/${BUILDLIB}.lib/SockClient.module /qsys.lib/${BUILDLIB}.lib/%.pgm: system "CRTPGM PGM(${BUILDLIB}/$*) MODULE($(patsubst %.module,$(BUILDLIB)/%,$(notdir $^))) ACTGRP(*CALLER)" /qsys.lib/${BUILDLIB}.lib/%.module: src/%.cpp src/mzversion.h - system "CRTCPPMOD MODULE(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$<') OPTION(*EVENTF) SYSIFCOPT(*IFS64IO) DBGVIEW(*SOURCE) TERASPACE(*YES *TSIFC) STGMDL(*SNGLVL) DTAMDL(*p128) DEFINE(DEBUG_ENABLED)" + system "CRTCPPMOD MODULE(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$<') OPTION(*EVENTF) SYSIFCOPT(*IFS64IO) DBGVIEW(*SOURCE) TERASPACE(*YES *TSIFC) STGMDL(*SNGLVL) DTAMDL(*p128) DEFINE(DEBUG_ENABLED) OUTPUT(*PRINT)" /qsys.lib/${BUILDLIB}.lib/%.module: src/%.sqlc system "CRTSQLCI OBJ(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$^') COMMIT(*NONE) DATFMT(*ISO) TIMFMT(*ISO) DBGVIEW(*SOURCE) CVTCCSID(*JOB) COMPILEOPT('INCDIR(''src'')') SQLPATH(${BUILDLIB}) DFTRDBCOL(${BUILDLIB}) OPTION(*SQL)" diff --git a/ile/src/socketClient.cpp b/ile/src/SockClient.cpp similarity index 50% rename from ile/src/socketClient.cpp rename to ile/src/SockClient.cpp index cf036cb..9052b15 100644 --- a/ile/src/socketClient.cpp +++ b/ile/src/SockClient.cpp @@ -3,16 +3,17 @@ #include // For socket functions #include // For inet_addr #include // For close +#include "SockClient.h" + + -class SocketClient { -public: // Constructor to initialize the socket descriptor to -1 - SocketClient(){ + SockClient::SockClient(){ sock_fd = -1; } // Method to open a socket and connect to the server - bool openSocket(const std::string ip, int port) { + bool SockClient::openSocket(const std::string ip, int port) { // Create socket sock_fd = socket(AF_INET, SOCK_STREAM, 0); if (sock_fd < 0) { @@ -24,7 +25,7 @@ class SocketClient { struct sockaddr_in server_address; server_address.sin_family = AF_INET; server_address.sin_port = htons(port); - server_address.sin_addr.s_addr = inet_addr(ip.c_str()); + server_address.sin_addr.s_addr = inet_addr(const_cast(ip.c_str())); // Connect to server if (connect(sock_fd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) { @@ -38,13 +39,13 @@ class SocketClient { } // Method to send a message (string) over the socket - bool sendMessage(const std::string message) { + bool SockClient::sendMessage(const std::string message) { if (sock_fd < 0) { std::cerr << "Socket is not open\n"; return false; } - int bytes_sent = send(sock_fd, message.c_str(), message.size(), 0); + int bytes_sent = send(sock_fd, const_cast(message.c_str()), message.size(), 0); if (bytes_sent < 0) { std::cerr << "Error sending message\n"; return false; @@ -54,27 +55,27 @@ class SocketClient { return true; } - // Method to send a struct over the socket - template - bool sendStruct(const T& data) { - if (sock_fd < 0) { - std::cerr << "Socket is not open\n"; - return false; - } - - // Send the raw data of the struct - int bytes_sent = send(sock_fd, &data, sizeof(data), 0); - if (bytes_sent < 0) { - std::cerr << "Error sending struct\n"; - return false; - } - - std::cout << "Sent struct of size: " << sizeof(data) << " bytes\n"; - return true; - } + // // Method to send a struct over the socket + // template + // bool SockClient::sendStruct(const T& data) { + // if (sock_fd < 0) { + // std::cerr << "Socket is not open\n"; + // return false; + // } + + // // Send the raw data of the struct + // int bytes_sent = send(sock_fd, &data, sizeof(data), 0); + // if (bytes_sent < 0) { + // std::cerr << "Error sending struct\n"; + // return false; + // } + + // std::cout << "Sent struct of size: " << sizeof(data) << " bytes\n"; + // return true; + // } // Method to close the socket - void closeSocket() { + void SockClient::closeSocket() { if (sock_fd >= 0) { close(sock_fd); sock_fd = -1; @@ -83,39 +84,39 @@ class SocketClient { } // Destructor to ensure socket is closed - ~SocketClient() { + SockClient::~SockClient() { closeSocket(); } -private: - int sock_fd; // Socket file descriptor -}; + + + // Socket file descriptor // Example struct to send over the socket -struct ExampleStruct { - int id; - float value; - char name[50]; -}; - -int main() { - // Create a SocketClient instance - SocketClient client; - - // Open a socket and connect to a server - if (!client.openSocket("127.0.0.1", 8080)) { - return 1; - } +// struct ExampleStruct { +// int id; +// float value; +// char name[50]; +// }; + +// int main() { +// // Create a SockClient instance +// SockClient client; + +// // Open a socket and connect to a server +// if (!client.openSocket("127.0.0.1", 8080)) { +// return 1; +// } - // Send a message over the socket - client.sendMessage("Hello from C++"); +// // Send a message over the socket +// client.sendMessage("Hello from C++"); - // Prepare and send a struct over the socket - ExampleStruct data = { 1, 3.14, "TestStruct" }; - client.sendStruct(data); +// // Prepare and send a struct over the socket +// ExampleStruct data = { 1, 3.14, "TestStruct" }; +// client.sendStruct(data); - // Close the socket - client.closeSocket(); +// // Close the socket +// client.closeSocket(); - return 0; -} +// return 0; +// } diff --git a/ile/src/SockClient.h b/ile/src/SockClient.h new file mode 100644 index 0000000..2e9c10b --- /dev/null +++ b/ile/src/SockClient.h @@ -0,0 +1,27 @@ +#include + +// extern "C" { +class SockClient { + +int sock_fd; +public: + // Constructor to initialize the socket descriptor to -1 + SockClient(); + + // Method to open a socket and connect to the server + bool openSocket(const std::string ip, int port); + + // Method to send a message (string) over the socket + bool sendMessage(const std::string message); + + // // Method to send a struct over the socket + // template + // bool sendStruct(const typename T& data); + + // Method to close the socket + void closeSocket(); + + // Destructor to ensure socket is closed + ~SockClient(); +}; +// } diff --git a/ile/src/handler.cpp b/ile/src/handler.cpp index 092fd2e..1ab1995 100644 --- a/ile/src/handler.cpp +++ b/ile/src/handler.cpp @@ -12,6 +12,8 @@ #include "event_data.h" #include "userconf.h" #include "mzversion.h" +#include "pub_json.h" +#include "SockClient.h" static FILE *fd = NULL; @@ -182,8 +184,12 @@ int main(int _argc, char **argv) for (int i = 0; i < num_publishers; i++) { msg_publish_func func = publishers->array[i].msg_publish_func_ptr; - func( - session_id.c_str(), + // This is where we send the data to the publish function which encodes it and also + // publishes it to the table + + // Instead, lets encode the data and then send it to the socket + std::string json_message = construct_json_message( + session_id.c_str(), msgid.c_str(), message_type.c_str(), message_severity, @@ -193,7 +199,34 @@ int main(int _argc, char **argv) msg_info_buf->message, sending_program_name.c_str(), sending_module_name.c_str(), - sending_procedure_name.c_str()); + sending_procedure_name.c_str() + ); + + // Create a SocketClient instance + SockClient client; + + // Open a socket and connect to server + if (!client.openSocket("127.0.0.1", 8080)) { + return 1; + } + + // Send a message over the socket + client.sendMessage(json_message); + + // Close the socket + client.closeSocket(); + // func( + // session_id.c_str(), + // msgid.c_str(), + // message_type.c_str(), + // message_severity, + // message_timestamp.c_str(), + // job.c_str(), + // sending_usrprf.c_str(), + // msg_info_buf->message, + // sending_program_name.c_str(), + // sending_module_name.c_str(), + // sending_procedure_name.c_str()); DEBUG_INFO("Published\n"); } free(msg_info_buf); diff --git a/ile/src/pub_json.cpp b/ile/src/pub_json.cpp index 5c035ec..9d7a242 100644 --- a/ile/src/pub_json.cpp +++ b/ile/src/pub_json.cpp @@ -121,7 +121,7 @@ int json_publish(const char *_session_id, std::string &_json) return 0; } -extern "C" int json_publish_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE) +std::string construct_json_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE) { std::string jsonStr; jsonStr += "{\n "; @@ -148,8 +148,19 @@ extern "C" int json_publish_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE) append_json_element(jsonStr, "sending_module_name", _sending_module_name); jsonStr += ",\n "; append_json_element(jsonStr, "sending_procedure_name", _sending_procedure_name); - jsonStr += "\n}"; + return 0; + // return jsonStr; +} + +/** + * Publish json message to DTAQ + */ +extern "C" int json_publish_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE) +{ + std::string jsonStr = construct_json_message(_session_id, _msgid, _msg_type, _msg_severity, _msg_timestamp, _job, _sending_usrprf, + _message, _sending_program_name, _sending_module_name, _sending_procedure_name); + return json_publish(_session_id, jsonStr); } diff --git a/ile/src/pub_json.h b/ile/src/pub_json.h index bbb6f97..bae920d 100644 --- a/ile/src/pub_json.h +++ b/ile/src/pub_json.h @@ -1,3 +1,5 @@ +#include + #ifndef _MANZAN_JSON_PUB_H_ #define _MANZAN_JSON_PUB_H_ extern "C" { @@ -6,4 +8,7 @@ int json_publish_vlog(PUBLISH_VLOG_FUNCTION_SIGNATURE); int json_publish_pal(PUBLISH_PAL_FUNCTION_SIGNATURE); int json_publish_other(PUBLISH_OTHER_FUNCTION_SIGNATURE); } + +std::string construct_json_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE); + #endif \ No newline at end of file From 75fe62303ad4ecde1a0d83071b9a57ccb25ede71 Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Wed, 25 Sep 2024 09:41:59 -0400 Subject: [PATCH 03/25] Handle recieved json message --- camel/pom.xml | 4 + .../theprez/manzan/routes/event/MsgEvent.java | 155 ++++++++++++++++++ .../manzan/routes/event/WatchMsgEvent.java | 17 +- ile/src/pub_json.h | 1 + 4 files changed, 176 insertions(+), 1 deletion(-) create mode 100644 camel/src/main/java/com/github/theprez/manzan/routes/event/MsgEvent.java diff --git a/camel/pom.xml b/camel/pom.xml index dbd53fa..cede079 100644 --- a/camel/pom.xml +++ b/camel/pom.xml @@ -60,6 +60,10 @@ camel-stream 3.14.7 + + org.apache.camel + camel-netty + org.apache.camel camel-jdbc diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/event/MsgEvent.java b/camel/src/main/java/com/github/theprez/manzan/routes/event/MsgEvent.java new file mode 100644 index 0000000..bb4fdd2 --- /dev/null +++ b/camel/src/main/java/com/github/theprez/manzan/routes/event/MsgEvent.java @@ -0,0 +1,155 @@ +package com.github.theprez.manzan.routes.event; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class MsgEvent { + + @JsonProperty("event_type") + private String eventType; + + @JsonProperty("session_id") + private String sessionId; + + private String job; + + @JsonProperty("msgid") + private String msgId; + + @JsonProperty("msgtype") + private String msgType; + + private int severity; + + @JsonProperty("message_timestamp") + private String messageTimestamp; + + @JsonProperty("sending_usrprf") + private String sendingUsrprf; + + private String message; + + @JsonProperty("sending_program_name") + private String sendingProgramName; + + @JsonProperty("sending_module_name") + private String sendingModuleName; + + @JsonProperty("sending_procedure_name") + private String sendingProcedureName; + + // Getters and Setters + + public String getEventType() { + return eventType; + } + + public void setEventType(String eventType) { + this.eventType = eventType; + } + + public String getSessionId() { + return sessionId; + } + + public void setSessionId(String sessionId) { + this.sessionId = sessionId; + } + + public String getJob() { + return job; + } + + public void setJob(String job) { + this.job = job; + } + + public String getMsgId() { + return msgId; + } + + public void setMsgId(String msgId) { + this.msgId = msgId; + } + + public String getMsgType() { + return msgType; + } + + public void setMsgType(String msgType) { + this.msgType = msgType; + } + + public int getSeverity() { + return severity; + } + + public void setSeverity(int severity) { + this.severity = severity; + } + + public String getMessageTimestamp() { + return messageTimestamp; + } + + public void setMessageTimestamp(String messageTimestamp) { + this.messageTimestamp = messageTimestamp; + } + + public String getSendingUsrprf() { + return sendingUsrprf; + } + + public void setSendingUsrprf(String sendingUsrprf) { + this.sendingUsrprf = sendingUsrprf; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public String getSendingProgramName() { + return sendingProgramName; + } + + public void setSendingProgramName(String sendingProgramName) { + this.sendingProgramName = sendingProgramName; + } + + public String getSendingModuleName() { + return sendingModuleName; + } + + public void setSendingModuleName(String sendingModuleName) { + this.sendingModuleName = sendingModuleName; + } + + public String getSendingProcedureName() { + return sendingProcedureName; + } + + public void setSendingProcedureName(String sendingProcedureName) { + this.sendingProcedureName = sendingProcedureName; + } + + @Override + public String toString() { + return "MessageEvent{" + + "eventType='" + eventType + '\'' + + ", sessionId='" + sessionId + '\'' + + ", job='" + job + '\'' + + ", msgId='" + msgId + '\'' + + ", msgType='" + msgType + '\'' + + ", severity=" + severity + + ", messageTimestamp='" + messageTimestamp + '\'' + + ", sendingUsrprf='" + sendingUsrprf + '\'' + + ", message='" + message + '\'' + + ", sendingProgramName='" + sendingProgramName + '\'' + + ", sendingModuleName='" + sendingModuleName + '\'' + + ", sendingProcedureName='" + sendingProcedureName + '\'' + + '}'; + } +} diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java index bd95763..b3c078d 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java @@ -3,6 +3,8 @@ import java.io.IOException; import java.util.List; +import org.apache.camel.model.dataformat.JsonLibrary; + import com.github.theprez.jcmdutils.StringUtils; import com.github.theprez.manzan.ManzanEventType; import com.github.theprez.manzan.ManzanMessageFormatter; @@ -29,7 +31,20 @@ public WatchMsgEvent(final String _name, final String _session_id, final String //@formatter:off @Override public void configure() { - from("timer://foo?synchronous=true&period=" + m_interval) + from("netty:tcp://0.0.0.0:8080?sync=false") + .log("Received raw message: ${body}") + .unmarshal().json(JsonLibrary.Jackson) + .process(exchange -> { + // Access the decoded JSON object + MsgEvent myJsonObject = exchange.getIn().getBody(MsgEvent.class); + // Process the object (e.g., log it) + exchange.getIn().setBody("Received JSON: " + myJsonObject); // Set body for logging + System.out.println("Result: " + myJsonObject); + }) + .log("body ${body}") // Log the processed message (deserialized object) + + // Optionally log the incoming message +// from("timer://foo?synchronous=true&period=" + m_interval) .routeId("manzan_msg:"+m_name) .setHeader(EVENT_TYPE, constant(ManzanEventType.WATCH_MSG)) .setBody(constant("SeLeCt * fRoM " + m_schema + ".mAnZaNmSg wHeRe SESSION_ID = '"+m_sessionId+"' limit " + m_numToProcess )) diff --git a/ile/src/pub_json.h b/ile/src/pub_json.h index bae920d..31eec73 100644 --- a/ile/src/pub_json.h +++ b/ile/src/pub_json.h @@ -7,6 +7,7 @@ int json_publish_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE); int json_publish_vlog(PUBLISH_VLOG_FUNCTION_SIGNATURE); int json_publish_pal(PUBLISH_PAL_FUNCTION_SIGNATURE); int json_publish_other(PUBLISH_OTHER_FUNCTION_SIGNATURE); +int to_utf8(char *out, size_t out_len, const char *in); } std::string construct_json_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE); From 443caac48c70fd222c6a7ee9eca89fd42038af55 Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Thu, 26 Sep 2024 15:07:47 -0400 Subject: [PATCH 04/25] Do case insensitive matching for formatting data --- .../com/github/theprez/manzan/ManzanMessageFormatter.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/camel/src/main/java/com/github/theprez/manzan/ManzanMessageFormatter.java b/camel/src/main/java/com/github/theprez/manzan/ManzanMessageFormatter.java index c1f142f..805fa33 100644 --- a/camel/src/main/java/com/github/theprez/manzan/ManzanMessageFormatter.java +++ b/camel/src/main/java/com/github/theprez/manzan/ManzanMessageFormatter.java @@ -23,7 +23,7 @@ public static String format(final String _in, final Map _mapping private final String m_fmtStr; public ManzanMessageFormatter(final String _fmtStr) { - m_fmtStr = _fmtStr; + m_fmtStr = _fmtStr.toUpperCase(); } public String format(final Map _mappings) { @@ -33,8 +33,9 @@ public String format(final Map _mappings) { ret = ret.replace("\\n", "\n").replace("\\t", "\t"); for (final Entry repl : _mappings.entrySet()) { - ret = ret.replace("$" + repl.getKey() + "$", "" + repl.getValue()); - String jsonIndicator ="$json:" + repl.getKey() + "$"; + final String key = repl.getKey().toUpperCase(); + ret = ret.replace("$" + key + "$", "" + repl.getValue()); + String jsonIndicator ="$json:" + key + "$"; if(ret.contains(jsonIndicator)) { ret = ret.replace(jsonIndicator, jsonEncode("" + repl.getValue())); } From ff364791a0eeace4d3cf95eee8ca3d56089d9e63 Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Thu, 26 Sep 2024 15:15:11 -0400 Subject: [PATCH 05/25] send message over socket --- .../manzan/routes/event/WatchMsgEvent.java | 37 ++++++------------- ile/src/handler.cpp | 18 ++++++++- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java index b3c078d..72844e1 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java @@ -2,6 +2,7 @@ import java.io.IOException; import java.util.List; +import java.util.Map; import org.apache.camel.model.dataformat.JsonLibrary; @@ -18,12 +19,14 @@ public class WatchMsgEvent extends ManzanRoute { private final String m_sessionId; private final ManzanMessageFormatter m_formatter; - public WatchMsgEvent(final String _name, final String _session_id, final String _format, final List _destinations, final String _schema, final int _interval, final int _numToProcess) throws IOException { + public WatchMsgEvent(final String _name, final String _session_id, final String _format, + final List _destinations, final String _schema, final int _interval, final int _numToProcess) + throws IOException { super(_name); m_interval = _interval; m_numToProcess = _numToProcess; m_schema = _schema; - m_formatter = StringUtils.isEmpty(_format) ? null: new ManzanMessageFormatter(_format); + m_formatter = StringUtils.isEmpty(_format) ? null : new ManzanMessageFormatter(_format); super.setRecipientList(_destinations); m_sessionId = _session_id.trim().toUpperCase(); } @@ -32,27 +35,11 @@ public WatchMsgEvent(final String _name, final String _session_id, final String @Override public void configure() { from("netty:tcp://0.0.0.0:8080?sync=false") - .log("Received raw message: ${body}") - .unmarshal().json(JsonLibrary.Jackson) - .process(exchange -> { - // Access the decoded JSON object - MsgEvent myJsonObject = exchange.getIn().getBody(MsgEvent.class); - // Process the object (e.g., log it) - exchange.getIn().setBody("Received JSON: " + myJsonObject); // Set body for logging - System.out.println("Result: " + myJsonObject); - }) - .log("body ${body}") // Log the processed message (deserialized object) - - // Optionally log the incoming message -// from("timer://foo?synchronous=true&period=" + m_interval) - .routeId("manzan_msg:"+m_name) - .setHeader(EVENT_TYPE, constant(ManzanEventType.WATCH_MSG)) - .setBody(constant("SeLeCt * fRoM " + m_schema + ".mAnZaNmSg wHeRe SESSION_ID = '"+m_sessionId+"' limit " + m_numToProcess )) - // .to("stream:out") - .to("jdbc:jt400?outputType=StreamList") - .split(body()).streaming().parallelProcessing() - .setHeader("id", simple("${body[ORDINAL_POSITION]}")) - .setHeader("session_id", simple("${body[SESSION_ID]}")) + .log("Received raw message: ${body}") + .unmarshal().json(JsonLibrary.Jackson, Map.class) + .routeId("manzan_msg:"+m_name) + .setHeader(EVENT_TYPE, constant(ManzanEventType.WATCH_MSG)) + .setHeader("session_id", simple("${body[sessionId]}")) .setHeader("data_map", simple("${body}")) .marshal().json(true) //TODO: skip this if we are applying a format .setBody(simple("${body}\n")) @@ -61,9 +48,7 @@ public void configure() { exchange.getIn().setBody(m_formatter.format(getDataMap(exchange))); } }) - .recipientList(constant(getRecipientList())).parallelProcessing().stopOnException().end() - .setBody(simple("delete fRoM " + m_schema + ".mAnZaNmSg where ORDINAL_POSITION = ${header.id} WITH NC")) - .to("jdbc:jt400").to("stream:err"); + .recipientList(constant(getRecipientList())).parallelProcessing().stopOnException().end(); } //@formatter:on diff --git a/ile/src/handler.cpp b/ile/src/handler.cpp index 1ab1995..9952349 100644 --- a/ile/src/handler.cpp +++ b/ile/src/handler.cpp @@ -14,6 +14,8 @@ #include "mzversion.h" #include "pub_json.h" #include "SockClient.h" +#include + static FILE *fd = NULL; @@ -92,6 +94,8 @@ int main(int _argc, char **argv) return 0; } + setlocale(LC_ALL, "En_US.UTF-8"); + BUFSTRN(watch_option, argv[1], 10); BUFSTRN(session_id, argv[2], 10); @@ -188,6 +192,7 @@ int main(int _argc, char **argv) // publishes it to the table // Instead, lets encode the data and then send it to the socket + DEBUG_INFO("Constructing JSON MESSAGE\n"); std::string json_message = construct_json_message( session_id.c_str(), msgid.c_str(), @@ -201,6 +206,13 @@ int main(int _argc, char **argv) sending_module_name.c_str(), sending_procedure_name.c_str() ); + int json_len = 1 + json_message.length(); + DEBUG_INFO("Sending message %s", const_cast(json_message.c_str())); + + char *json_utf8 = (char *)malloc(56 + json_message.length() * 2); + to_utf8(json_utf8, json_len, json_message.c_str()); + DEBUG_INFO("DONE JSON MESSAGE\n"); + // Create a SocketClient instance SockClient client; @@ -211,7 +223,11 @@ int main(int _argc, char **argv) } // Send a message over the socket - client.sendMessage(json_message); + DEBUG_INFO("SENDING JSON MESSAGE\n"); + + client.sendMessage(json_utf8); + DEBUG_INFO("CLOSING SOCKET\n"); + // Close the socket client.closeSocket(); From f629fff252c39f75aba524c9af7e0d46c8394dde Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Fri, 27 Sep 2024 00:33:08 -0400 Subject: [PATCH 06/25] change session_id to sessionId --- .../java/com/github/theprez/manzan/routes/event/MsgEvent.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/event/MsgEvent.java b/camel/src/main/java/com/github/theprez/manzan/routes/event/MsgEvent.java index bb4fdd2..a8ab0c9 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/event/MsgEvent.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/event/MsgEvent.java @@ -7,7 +7,7 @@ public class MsgEvent { @JsonProperty("event_type") private String eventType; - @JsonProperty("session_id") + @JsonProperty("sessionId") private String sessionId; private String job; From 8d0af5cfadb91483877a522696be8fd4b7abdb7b Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Fri, 27 Sep 2024 00:36:11 -0400 Subject: [PATCH 07/25] change db2_publish_message to use socket instead of sql --- ile/Makefile | 2 +- ile/src/handler.cpp | 49 ++------------------- ile/src/pub_db2.sqlc | 100 ++++++++++++++++++++++++++++--------------- 3 files changed, 70 insertions(+), 81 deletions(-) diff --git a/ile/Makefile b/ile/Makefile index 55aac28..b948ac4 100644 --- a/ile/Makefile +++ b/ile/Makefile @@ -28,7 +28,7 @@ src/mzversion.h: system "CRTCPPMOD MODULE(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$<') OPTION(*EVENTF) SYSIFCOPT(*IFS64IO) DBGVIEW(*SOURCE) TERASPACE(*YES *TSIFC) STGMDL(*SNGLVL) DTAMDL(*p128) DEFINE(DEBUG_ENABLED) OUTPUT(*PRINT)" /qsys.lib/${BUILDLIB}.lib/%.module: src/%.sqlc - system "CRTSQLCI OBJ(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$^') COMMIT(*NONE) DATFMT(*ISO) TIMFMT(*ISO) DBGVIEW(*SOURCE) CVTCCSID(*JOB) COMPILEOPT('INCDIR(''src'')') SQLPATH(${BUILDLIB}) DFTRDBCOL(${BUILDLIB}) OPTION(*SQL)" + system "CRTSQLCPPI OBJ(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$^') COMMIT(*NONE) DATFMT(*ISO) TIMFMT(*ISO) DBGVIEW(*SOURCE) CVTCCSID(*JOB) COMPILEOPT('INCDIR(''src'')') SQLPATH(${BUILDLIB}) DFTRDBCOL(${BUILDLIB}) OPTION(*SQL)" /qsys.lib/${BUILDLIB.lib}: -system "RUNSQL SQL('create schema ${BUILDLIB}') NAMING(*SYS)" diff --git a/ile/src/handler.cpp b/ile/src/handler.cpp index 9952349..bd22a48 100644 --- a/ile/src/handler.cpp +++ b/ile/src/handler.cpp @@ -14,8 +14,6 @@ #include "mzversion.h" #include "pub_json.h" #include "SockClient.h" -#include - static FILE *fd = NULL; @@ -94,8 +92,6 @@ int main(int _argc, char **argv) return 0; } - setlocale(LC_ALL, "En_US.UTF-8"); - BUFSTRN(watch_option, argv[1], 10); BUFSTRN(session_id, argv[2], 10); @@ -192,9 +188,8 @@ int main(int _argc, char **argv) // publishes it to the table // Instead, lets encode the data and then send it to the socket - DEBUG_INFO("Constructing JSON MESSAGE\n"); - std::string json_message = construct_json_message( - session_id.c_str(), + func( + session_id.c_str(), msgid.c_str(), message_type.c_str(), message_severity, @@ -204,45 +199,7 @@ int main(int _argc, char **argv) msg_info_buf->message, sending_program_name.c_str(), sending_module_name.c_str(), - sending_procedure_name.c_str() - ); - int json_len = 1 + json_message.length(); - DEBUG_INFO("Sending message %s", const_cast(json_message.c_str())); - - char *json_utf8 = (char *)malloc(56 + json_message.length() * 2); - to_utf8(json_utf8, json_len, json_message.c_str()); - DEBUG_INFO("DONE JSON MESSAGE\n"); - - - // Create a SocketClient instance - SockClient client; - - // Open a socket and connect to server - if (!client.openSocket("127.0.0.1", 8080)) { - return 1; - } - - // Send a message over the socket - DEBUG_INFO("SENDING JSON MESSAGE\n"); - - client.sendMessage(json_utf8); - DEBUG_INFO("CLOSING SOCKET\n"); - - - // Close the socket - client.closeSocket(); - // func( - // session_id.c_str(), - // msgid.c_str(), - // message_type.c_str(), - // message_severity, - // message_timestamp.c_str(), - // job.c_str(), - // sending_usrprf.c_str(), - // msg_info_buf->message, - // sending_program_name.c_str(), - // sending_module_name.c_str(), - // sending_procedure_name.c_str()); + sending_procedure_name.c_str()); DEBUG_INFO("Published\n"); } free(msg_info_buf); diff --git a/ile/src/pub_db2.sqlc b/ile/src/pub_db2.sqlc index 288137d..59a4f01 100644 --- a/ile/src/pub_db2.sqlc +++ b/ile/src/pub_db2.sqlc @@ -7,12 +7,20 @@ #include #include +#include +#include "pub_json.h" +#include "SockClient.h" + + #define COPY_PARM(dest, parm) \ memset(dest, 0, sizeof(dest)); \ strncpy(dest, parm ? parm : "", -1 + sizeof(dest)); -void check_sql_error(int sqlcode, const char* sqlstate) +const int PORT = 8080; +const std::string SERVER = "127.0.0.1"; + +void check_sql_error(int sqlcode, const char *sqlstate) { if (sqlcode != 0) { @@ -25,6 +33,30 @@ void check_sql_error(int sqlcode, const char* sqlstate) } } +bool send_message(std::string message){ + // Create a SocketClient instance + SockClient client; + + // Open a socket and connect to server + if (!client.openSocket(SERVER, PORT)) { + // TODO: How do we want to handle this error? Drop the message, insert into table? + DEBUG_ERROR("Failed to connect to socket: %s:%d\n", SERVER.c_str(), PORT); + return false; + } + + // Send a message over the socket + if (!client.sendMessage(message)){ + DEBUG_ERROR("Failed to send message: %s", message.c_str()); + return false; + } + + // Close the socket + client.closeSocket(); + return true; +} + + + // TODO: implement this // 1. define Db2 tables // // have autoincrement and autotimestamp columns @@ -33,37 +65,31 @@ void check_sql_error(int sqlcode, const char* sqlstate) // 4. implement! extern int db2_publish_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE) { - exec sql include SQLCA; - char msg_session_id[11]; - char msg_msg_id[8]; - char msg_msg_type[11]; - char msg_severity[32]; - char msg_timestamp[32]; - char msg_job[32]; - char msg_sending_usrprf[11]; - char msg_message[2048]; - char msg_sending_program_name[257]; - char msg_sending_module_name[11]; - char msg_sending_procedure_name[2048]; - - COPY_PARM(msg_session_id, _session_id); - COPY_PARM(msg_msg_id, _msgid); - COPY_PARM(msg_msg_type, _msg_type); - sprintf(msg_severity, "%d", _msg_severity); - COPY_PARM(msg_timestamp, _msg_timestamp); - COPY_PARM(msg_job, _job); - COPY_PARM(msg_sending_usrprf, _sending_usrprf); - COPY_PARM(msg_message, _message); - COPY_PARM(msg_sending_program_name, _sending_program_name); - COPY_PARM(msg_sending_module_name, _sending_module_name); - COPY_PARM(msg_sending_procedure_name, _sending_procedure_name); + std::string json_message = construct_json_message( + _session_id, + _msgid, + _msg_type, + _msg_severity, + _msg_timestamp, + _job, + _sending_usrprf, + _message, + _sending_program_name, + _sending_module_name, + _sending_procedure_name); - EXEC SQL - INSERT INTO MANZANMSG( - SESSION_ID, MESSAGE_ID, MESSAGE_TYPE, SEVERITY, JOB, SENDING_USRPRF, SENDING_PROGRAM_NAME, SENDING_MODULE_NAME, SENDING_PROCEDURE_NAME, MESSAGE_TIMESTAMP, MESSAGE) - VALUES( : msg_session_id, : msg_msg_id, : msg_msg_type, : msg_severity, : msg_job, : msg_sending_usrprf, : msg_sending_program_name, : msg_sending_module_name, : msg_sending_procedure_name, : msg_timestamp, : msg_message); - check_sql_error(sqlca.sqlcode, sqlca.sqlstate); - return 0; + DEBUG_INFO("Sending message %s", const_cast(json_message.c_str())); + + // Allocate memory for the utf-8 encoded json message + char *json_utf8 = get_json_utf8_output_buf(json_message); + + to_utf8(json_utf8, 1 + json_message.length(), json_message.c_str()); + + // Send the message to the socket + bool res = send_message(json_utf8); + + free(json_utf8); + return res ? 0 : -1; } extern int db2_publish_vlog(PUBLISH_VLOG_FUNCTION_SIGNATURE) @@ -74,6 +100,7 @@ extern int db2_publish_vlog(PUBLISH_VLOG_FUNCTION_SIGNATURE) int db2_publish_pal(PUBLISH_PAL_FUNCTION_SIGNATURE) { exec sql include SQLCA; + EXEC SQL BEGIN DECLARE SECTION; char pal_sessid[11]; char pal_system_reference_code[11]; char pal_device_name[11]; @@ -87,6 +114,7 @@ int db2_publish_pal(PUBLISH_PAL_FUNCTION_SIGNATURE) char pal_secondary_code[11]; char pal_table_id[11]; char pal_sequence[32]; + EXEC SQL END DECLARE SECTION; COPY_PARM(pal_sessid, _session_id); COPY_PARM(pal_system_reference_code, _system_reference_code); @@ -105,23 +133,27 @@ int db2_publish_pal(PUBLISH_PAL_FUNCTION_SIGNATURE) EXEC SQL INSERT INTO MANZANPAL(SESSION_ID, SYSTEM_REFERENCE_CODE, DEVICE_NAME, MODEL, SERIAL_NUMBER, RESOURCE_NAME, LOG_ID, PAL_TIMESTAMP, REFERENCE_CODE, SECONDARY_CODE, TABLE_ID, SEQUENCE_NUM) VALUES( : pal_sessid, : pal_system_reference_code, : pal_device_name, : pal_model, : pal_serial_number, : pal_resource_name, : pal_log_identifier, : pal_timestamp, : pal_reference_code, : pal_secondary_code, : pal_table_id, : pal_sequence); - check_sql_error(sqlca.sqlcode, sqlca.sqlstate); + check_sql_error(sqlca.sqlcode, (const char *)sqlca.sqlstate); return 0; } extern int db2_publish_other(PUBLISH_OTHER_FUNCTION_SIGNATURE) { exec sql include SQLCA; + + EXEC SQL BEGIN DECLARE SECTION; char oth_sessid[11]; + char oth_type[11]; + EXEC SQL END DECLARE SECTION; + memset(oth_sessid, 0, sizeof(oth_sessid)); strncpy(oth_sessid, _session_id, 10); - char oth_type[11]; memset(oth_type, 0, sizeof(oth_type)); strncpy(oth_type, _event_type, 10); EXEC SQL INSERT INTO MANZANOTH(SESSION_ID, EVENT) VALUES( : oth_sessid, : oth_type); - check_sql_error(sqlca.sqlcode, sqlca.sqlstate); + check_sql_error(sqlca.sqlcode, (const char *)sqlca.sqlstate); return 0; } \ No newline at end of file From 6b6b1cce747d89d24bc9f230f019eb9d1fea5c86 Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Fri, 27 Sep 2024 00:38:07 -0400 Subject: [PATCH 08/25] Add helper function to get json utf8 output buf --- ile/src/pub_json.cpp | 33 +++++++++++++++++++++++++-------- ile/src/pub_json.h | 1 + 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/ile/src/pub_json.cpp b/ile/src/pub_json.cpp index 9d7a242..162c14b 100644 --- a/ile/src/pub_json.cpp +++ b/ile/src/pub_json.cpp @@ -74,7 +74,7 @@ void json_encode(std::string &str, const char *_src) } void append_json_element(std::string &_str, const char *_key, const char *_value) { - _str += "\""; + _str += "\""; _str += _key; _str += "\":\""; std::string encoded; @@ -91,12 +91,29 @@ void append_json_element(std::string &_str, const char *_key, const int _value) _str += value; } +/* +* Return an output buffer containing enough space for the utf-8 encoded json message. +* Return NULL if there is no space remaining on the heap. +* Remember to free the buffer after use. +*/ +char* get_json_utf8_output_buf(std::string json_message){ + char *buf = (char *)malloc(56 + json_message.length() * 2); + if (buf == NULL) { + DEBUG_ERROR("No heap space available to allocate buffer for %s\n", json_message.c_str()); + return NULL; + } + return buf; +} + int json_publish(const char *_session_id, std::string &_json) { - int json_len = 1 + _json.length(); - char *utf8 = (char *)malloc(56 + _json.length() * 2); + char *utf8 = get_json_utf8_output_buf(_json); + if (utf8 == NULL){ + DEBUG_ERROR("No heap space available. Aborting publishing message %s\n", utf8); + return -1; + } - to_utf8(utf8, json_len, _json.c_str()); + to_utf8(utf8, 1 + _json.length(), _json.c_str()); DEBUG_INFO("Publishing JSON\n"); DEBUG_INFO("%s\n", _json.c_str()); @@ -123,11 +140,11 @@ int json_publish(const char *_session_id, std::string &_json) std::string construct_json_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE) { - std::string jsonStr; + std::string jsonStr; jsonStr += "{\n "; append_json_element(jsonStr, "event_type", "message"); jsonStr += ",\n "; - append_json_element(jsonStr, "session_id", _session_id); + append_json_element(jsonStr, "sessionId", _session_id); jsonStr += ",\n "; append_json_element(jsonStr, "job", _job); jsonStr += ",\n "; @@ -149,8 +166,8 @@ std::string construct_json_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE) jsonStr += ",\n "; append_json_element(jsonStr, "sending_procedure_name", _sending_procedure_name); jsonStr += "\n}"; - return 0; - // return jsonStr; + + return jsonStr; } /** diff --git a/ile/src/pub_json.h b/ile/src/pub_json.h index 31eec73..44c36a5 100644 --- a/ile/src/pub_json.h +++ b/ile/src/pub_json.h @@ -8,6 +8,7 @@ int json_publish_vlog(PUBLISH_VLOG_FUNCTION_SIGNATURE); int json_publish_pal(PUBLISH_PAL_FUNCTION_SIGNATURE); int json_publish_other(PUBLISH_OTHER_FUNCTION_SIGNATURE); int to_utf8(char *out, size_t out_len, const char *in); +char* get_json_utf8_output_buf(std::string json_message); } std::string construct_json_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE); From 8d3f32081415b52aa9b28b3e0f68b0788051df58 Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Fri, 27 Sep 2024 00:38:33 -0400 Subject: [PATCH 09/25] Add error handling to socketclient --- ile/src/SockClient.cpp | 149 ++++++++++++++--------------------------- 1 file changed, 49 insertions(+), 100 deletions(-) diff --git a/ile/src/SockClient.cpp b/ile/src/SockClient.cpp index 9052b15..eec3406 100644 --- a/ile/src/SockClient.cpp +++ b/ile/src/SockClient.cpp @@ -4,119 +4,68 @@ #include // For inet_addr #include // For close #include "SockClient.h" +#include "manzan.h" - // Constructor to initialize the socket descriptor to -1 - SockClient::SockClient(){ - sock_fd = -1; - } - - // Method to open a socket and connect to the server - bool SockClient::openSocket(const std::string ip, int port) { - // Create socket - sock_fd = socket(AF_INET, SOCK_STREAM, 0); - if (sock_fd < 0) { - std::cerr << "Error creating socket\n"; - return false; - } - - // Define server address - struct sockaddr_in server_address; - server_address.sin_family = AF_INET; - server_address.sin_port = htons(port); - server_address.sin_addr.s_addr = inet_addr(const_cast(ip.c_str())); +// Constructor to initialize the socket descriptor to -1 +SockClient::SockClient(){ + sock_fd = -1; +} - // Connect to server - if (connect(sock_fd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) { - std::cerr << "Error connecting to server\n"; - closeSocket(); - return false; - } - - std::cout << "Connected to server at " << ip << ":" << port << "\n"; - return true; +// Method to open a socket and connect to the server +bool SockClient::openSocket(const std::string ip, int port) { + // Create socket + sock_fd = socket(AF_INET, SOCK_STREAM, 0); + if (sock_fd < 0) { + DEBUG_ERROR("Error creating socket\n"); + return false; } - // Method to send a message (string) over the socket - bool SockClient::sendMessage(const std::string message) { - if (sock_fd < 0) { - std::cerr << "Socket is not open\n"; - return false; - } - - int bytes_sent = send(sock_fd, const_cast(message.c_str()), message.size(), 0); - if (bytes_sent < 0) { - std::cerr << "Error sending message\n"; - return false; - } + // Define server address + struct sockaddr_in server_address; + server_address.sin_family = AF_INET; + server_address.sin_port = htons(port); + server_address.sin_addr.s_addr = inet_addr(const_cast(ip.c_str())); - std::cout << "Sent message: " << message << "\n"; - return true; + // Connect to server + if (connect(sock_fd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) { + DEBUG_ERROR("Error connecting to server\n"); + closeSocket(); + return false; } - // // Method to send a struct over the socket - // template - // bool SockClient::sendStruct(const T& data) { - // if (sock_fd < 0) { - // std::cerr << "Socket is not open\n"; - // return false; - // } - - // // Send the raw data of the struct - // int bytes_sent = send(sock_fd, &data, sizeof(data), 0); - // if (bytes_sent < 0) { - // std::cerr << "Error sending struct\n"; - // return false; - // } + DEBUG_INFO("Connected to server at %s:%d\n", ip.c_str(), port); + return true; +} - // std::cout << "Sent struct of size: " << sizeof(data) << " bytes\n"; - // return true; - // } - - // Method to close the socket - void SockClient::closeSocket() { - if (sock_fd >= 0) { - close(sock_fd); - sock_fd = -1; - std::cout << "Socket closed\n"; - } +// Method to send a message (string) over the socket +bool SockClient::sendMessage(const std::string message) { + if (sock_fd < 0) { + DEBUG_ERROR("Socket is not open\n"); + return false; } - // Destructor to ensure socket is closed - SockClient::~SockClient() { - closeSocket(); + int bytes_sent = send(sock_fd, const_cast(message.c_str()), message.size(), 0); + if (bytes_sent < 0) { + DEBUG_ERROR("Error sending message\n"); + return false; } + DEBUG_INFO("Sent message: %s\n", message); - + return true; +} - // Socket file descriptor - -// Example struct to send over the socket -// struct ExampleStruct { -// int id; -// float value; -// char name[50]; -// }; - -// int main() { -// // Create a SockClient instance -// SockClient client; - -// // Open a socket and connect to a server -// if (!client.openSocket("127.0.0.1", 8080)) { -// return 1; -// } - -// // Send a message over the socket -// client.sendMessage("Hello from C++"); - -// // Prepare and send a struct over the socket -// ExampleStruct data = { 1, 3.14, "TestStruct" }; -// client.sendStruct(data); - -// // Close the socket -// client.closeSocket(); +// Method to close the socket +void SockClient::closeSocket() { + if (sock_fd >= 0) { + close(sock_fd); + sock_fd = -1; + DEBUG_INFO("Socket closed\n"); + } +} -// return 0; -// } +// Destructor to ensure socket is closed +SockClient::~SockClient() { + closeSocket(); +} From 7ebeed6b9a6c6563949df23e8f58610a684733a1 Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Fri, 27 Sep 2024 00:50:08 -0400 Subject: [PATCH 10/25] revert sessionId in ile code to session_id --- .../java/com/github/theprez/manzan/routes/event/MsgEvent.java | 2 +- ile/src/pub_json.cpp | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/event/MsgEvent.java b/camel/src/main/java/com/github/theprez/manzan/routes/event/MsgEvent.java index a8ab0c9..bb4fdd2 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/event/MsgEvent.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/event/MsgEvent.java @@ -7,7 +7,7 @@ public class MsgEvent { @JsonProperty("event_type") private String eventType; - @JsonProperty("sessionId") + @JsonProperty("session_id") private String sessionId; private String job; diff --git a/ile/src/pub_json.cpp b/ile/src/pub_json.cpp index 162c14b..f8ac360 100644 --- a/ile/src/pub_json.cpp +++ b/ile/src/pub_json.cpp @@ -144,7 +144,7 @@ std::string construct_json_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE) jsonStr += "{\n "; append_json_element(jsonStr, "event_type", "message"); jsonStr += ",\n "; - append_json_element(jsonStr, "sessionId", _session_id); + append_json_element(jsonStr, "session_id", _session_id); jsonStr += ",\n "; append_json_element(jsonStr, "job", _job); jsonStr += ",\n "; @@ -166,7 +166,6 @@ std::string construct_json_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE) jsonStr += ",\n "; append_json_element(jsonStr, "sending_procedure_name", _sending_procedure_name); jsonStr += "\n}"; - return jsonStr; } From ef7ce385782b255c73b58b38413dc2998f744b4c Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Fri, 27 Sep 2024 00:52:20 -0400 Subject: [PATCH 11/25] remove space --- ile/src/SockClient.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/ile/src/SockClient.cpp b/ile/src/SockClient.cpp index eec3406..d3dd799 100644 --- a/ile/src/SockClient.cpp +++ b/ile/src/SockClient.cpp @@ -6,8 +6,6 @@ #include "SockClient.h" #include "manzan.h" - - // Constructor to initialize the socket descriptor to -1 SockClient::SockClient(){ sock_fd = -1; From f9ed636a822d293f0bfd2ab73778a2a3d9bd2450 Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Fri, 27 Sep 2024 00:57:20 -0400 Subject: [PATCH 12/25] remove msgEvent class --- .../theprez/manzan/routes/event/MsgEvent.java | 155 ------------------ 1 file changed, 155 deletions(-) delete mode 100644 camel/src/main/java/com/github/theprez/manzan/routes/event/MsgEvent.java diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/event/MsgEvent.java b/camel/src/main/java/com/github/theprez/manzan/routes/event/MsgEvent.java deleted file mode 100644 index bb4fdd2..0000000 --- a/camel/src/main/java/com/github/theprez/manzan/routes/event/MsgEvent.java +++ /dev/null @@ -1,155 +0,0 @@ -package com.github.theprez.manzan.routes.event; - -import com.fasterxml.jackson.annotation.JsonProperty; - -public class MsgEvent { - - @JsonProperty("event_type") - private String eventType; - - @JsonProperty("session_id") - private String sessionId; - - private String job; - - @JsonProperty("msgid") - private String msgId; - - @JsonProperty("msgtype") - private String msgType; - - private int severity; - - @JsonProperty("message_timestamp") - private String messageTimestamp; - - @JsonProperty("sending_usrprf") - private String sendingUsrprf; - - private String message; - - @JsonProperty("sending_program_name") - private String sendingProgramName; - - @JsonProperty("sending_module_name") - private String sendingModuleName; - - @JsonProperty("sending_procedure_name") - private String sendingProcedureName; - - // Getters and Setters - - public String getEventType() { - return eventType; - } - - public void setEventType(String eventType) { - this.eventType = eventType; - } - - public String getSessionId() { - return sessionId; - } - - public void setSessionId(String sessionId) { - this.sessionId = sessionId; - } - - public String getJob() { - return job; - } - - public void setJob(String job) { - this.job = job; - } - - public String getMsgId() { - return msgId; - } - - public void setMsgId(String msgId) { - this.msgId = msgId; - } - - public String getMsgType() { - return msgType; - } - - public void setMsgType(String msgType) { - this.msgType = msgType; - } - - public int getSeverity() { - return severity; - } - - public void setSeverity(int severity) { - this.severity = severity; - } - - public String getMessageTimestamp() { - return messageTimestamp; - } - - public void setMessageTimestamp(String messageTimestamp) { - this.messageTimestamp = messageTimestamp; - } - - public String getSendingUsrprf() { - return sendingUsrprf; - } - - public void setSendingUsrprf(String sendingUsrprf) { - this.sendingUsrprf = sendingUsrprf; - } - - public String getMessage() { - return message; - } - - public void setMessage(String message) { - this.message = message; - } - - public String getSendingProgramName() { - return sendingProgramName; - } - - public void setSendingProgramName(String sendingProgramName) { - this.sendingProgramName = sendingProgramName; - } - - public String getSendingModuleName() { - return sendingModuleName; - } - - public void setSendingModuleName(String sendingModuleName) { - this.sendingModuleName = sendingModuleName; - } - - public String getSendingProcedureName() { - return sendingProcedureName; - } - - public void setSendingProcedureName(String sendingProcedureName) { - this.sendingProcedureName = sendingProcedureName; - } - - @Override - public String toString() { - return "MessageEvent{" + - "eventType='" + eventType + '\'' + - ", sessionId='" + sessionId + '\'' + - ", job='" + job + '\'' + - ", msgId='" + msgId + '\'' + - ", msgType='" + msgType + '\'' + - ", severity=" + severity + - ", messageTimestamp='" + messageTimestamp + '\'' + - ", sendingUsrprf='" + sendingUsrprf + '\'' + - ", message='" + message + '\'' + - ", sendingProgramName='" + sendingProgramName + '\'' + - ", sendingModuleName='" + sendingModuleName + '\'' + - ", sendingProcedureName='" + sendingProcedureName + '\'' + - '}'; - } -} From 995798d31f1d1a1a04444869d23ad8dc3087be00 Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Fri, 27 Sep 2024 01:34:14 -0400 Subject: [PATCH 13/25] move socket ip and port into var --- .../github/theprez/manzan/routes/event/WatchMsgEvent.java | 5 ++++- test/e2e/msg/snd2q/data.ini | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java index 72844e1..31d6095 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java @@ -18,6 +18,9 @@ public class WatchMsgEvent extends ManzanRoute { private final String m_schema; private final String m_sessionId; private final ManzanMessageFormatter m_formatter; + private final String m_socketIp = "0.0.0.0"; + private final String m_socketPort = "8080"; + public WatchMsgEvent(final String _name, final String _session_id, final String _format, final List _destinations, final String _schema, final int _interval, final int _numToProcess) @@ -34,7 +37,7 @@ public WatchMsgEvent(final String _name, final String _session_id, final String //@formatter:off @Override public void configure() { - from("netty:tcp://0.0.0.0:8080?sync=false") + from(String.format("netty:tcp://%s:%s?sync=false", m_socketIp, m_socketPort)) .log("Received raw message: ${body}") .unmarshal().json(JsonLibrary.Jackson, Map.class) .routeId("manzan_msg:"+m_name) diff --git a/test/e2e/msg/snd2q/data.ini b/test/e2e/msg/snd2q/data.ini index c9b8831..481b396 100644 --- a/test/e2e/msg/snd2q/data.ini +++ b/test/e2e/msg/snd2q/data.ini @@ -2,4 +2,4 @@ type=watch id=TESTING destinations=myfile -format=$MESSAGE_ID$ (severity $SEVERITY$): $MESSAGE$ \ No newline at end of file +format=$MSGID$ (severity $SEVERITY$): $MESSAGE$ \ No newline at end of file From 80cb27ff54558ad4244015cffcc2e19d0eaba7ab Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Fri, 27 Sep 2024 12:59:14 -0400 Subject: [PATCH 14/25] change tgtccsid to job --- ile/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ile/Makefile b/ile/Makefile index b948ac4..546b942 100644 --- a/ile/Makefile +++ b/ile/Makefile @@ -25,7 +25,7 @@ src/mzversion.h: system "CRTPGM PGM(${BUILDLIB}/$*) MODULE($(patsubst %.module,$(BUILDLIB)/%,$(notdir $^))) ACTGRP(*CALLER)" /qsys.lib/${BUILDLIB}.lib/%.module: src/%.cpp src/mzversion.h - system "CRTCPPMOD MODULE(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$<') OPTION(*EVENTF) SYSIFCOPT(*IFS64IO) DBGVIEW(*SOURCE) TERASPACE(*YES *TSIFC) STGMDL(*SNGLVL) DTAMDL(*p128) DEFINE(DEBUG_ENABLED) OUTPUT(*PRINT)" + system "CRTCPPMOD MODULE(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$<') OPTION(*EVENTF) SYSIFCOPT(*IFS64IO) DBGVIEW(*SOURCE) TERASPACE(*YES *TSIFC) STGMDL(*SNGLVL) DTAMDL(*p128) DEFINE(DEBUG_ENABLED) OUTPUT(*PRINT) TGTCCSID(*JOB)" /qsys.lib/${BUILDLIB}.lib/%.module: src/%.sqlc system "CRTSQLCPPI OBJ(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$^') COMMIT(*NONE) DATFMT(*ISO) TIMFMT(*ISO) DBGVIEW(*SOURCE) CVTCCSID(*JOB) COMPILEOPT('INCDIR(''src'')') SQLPATH(${BUILDLIB}) DFTRDBCOL(${BUILDLIB}) OPTION(*SQL)" From e2530e69524c3da406f443e5e2873fef48b9e98b Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Fri, 27 Sep 2024 13:04:05 -0400 Subject: [PATCH 15/25] add get_utf8_output_buf_length --- ile/src/pub_json.cpp | 56 +++++++++++++++++++++++++++++++++++++------- ile/src/pub_json.h | 3 ++- 2 files changed, 49 insertions(+), 10 deletions(-) diff --git a/ile/src/pub_json.cpp b/ile/src/pub_json.cpp index f8ac360..ec05c0f 100644 --- a/ile/src/pub_json.cpp +++ b/ile/src/pub_json.cpp @@ -17,7 +17,8 @@ int to_utf8(char *out, size_t out_len, const char *in) memset(&tocode, 0, sizeof(tocode)); tocode.CCSID = 1208; QtqCode_T fromcode; - fromcode.CCSID = 37; + + // Setting to 0 allows the system to automatically detect ccsid (hopefully) memset(&fromcode, 0, sizeof(fromcode)); iconv_t cd = QtqIconvOpen(&tocode, &fromcode); @@ -29,9 +30,10 @@ int to_utf8(char *out, size_t out_len, const char *in) int rc = iconv(cd, &input, &inleft, &output, &outleft); if (rc == -1) { - DEBUG_ERROR("Error in converting characters\n"); + DEBUG_ERROR("Error in converting characters. %d: %s\n", errno, strerror(errno)); return 9; } + DEBUG_INFO("Conversion to UTF-8 successful.\n"); return iconv_close(cd); } @@ -91,15 +93,20 @@ void append_json_element(std::string &_str, const char *_key, const int _value) _str += value; } +size_t get_utf8_output_buf_length(std::string str){ + // Each UTF-8 encoded byte can be up to 4 bytes, and add one for the null terminator. + return str.length() * 4 + 1; +} + /* -* Return an output buffer containing enough space for the utf-8 encoded json message. +* Return an output buffer containing enough space for the utf-8 encoded message. * Return NULL if there is no space remaining on the heap. * Remember to free the buffer after use. */ -char* get_json_utf8_output_buf(std::string json_message){ - char *buf = (char *)malloc(56 + json_message.length() * 2); +char* get_utf8_output_buf(std::string str){ + char *buf = (char *)malloc(get_utf8_output_buf_length(str)); if (buf == NULL) { - DEBUG_ERROR("No heap space available to allocate buffer for %s\n", json_message.c_str()); + DEBUG_ERROR("No heap space available to allocate buffer for %s\n", str.c_str()); return NULL; } return buf; @@ -107,13 +114,13 @@ char* get_json_utf8_output_buf(std::string json_message){ int json_publish(const char *_session_id, std::string &_json) { - char *utf8 = get_json_utf8_output_buf(_json); + char *utf8 = get_utf8_output_buf(_json); if (utf8 == NULL){ DEBUG_ERROR("No heap space available. Aborting publishing message %s\n", utf8); return -1; } - to_utf8(utf8, 1 + _json.length(), _json.c_str()); + to_utf8(utf8, get_utf8_output_buf_length(_json), _json.c_str()); DEBUG_INFO("Publishing JSON\n"); DEBUG_INFO("%s\n", _json.c_str()); @@ -140,7 +147,38 @@ int json_publish(const char *_session_id, std::string &_json) std::string construct_json_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE) { - std::string jsonStr; + std::string jsonStr; + jsonStr += "{\n "; + append_json_element(jsonStr, "event_type", "message"); + jsonStr += ",\n "; + append_json_element(jsonStr, "session_id", _session_id); + jsonStr += ",\n "; + append_json_element(jsonStr, "job", _job); + jsonStr += ",\n "; + append_json_element(jsonStr, "msgid", _msgid); + jsonStr += ",\n "; + append_json_element(jsonStr, "msgtype", _msg_type); + jsonStr += ",\n "; + append_json_element(jsonStr, "severity", _msg_severity); + jsonStr += ",\n "; + append_json_element(jsonStr, "message_timestamp", _msg_timestamp); + jsonStr += ",\n "; + append_json_element(jsonStr, "sending_usrprf", _sending_usrprf); + jsonStr += ",\n "; + append_json_element(jsonStr, "message", _message); + jsonStr += ",\n "; + append_json_element(jsonStr, "sending_program_name", _sending_program_name); + jsonStr += ",\n "; + append_json_element(jsonStr, "sending_module_name", _sending_module_name); + jsonStr += ",\n "; + append_json_element(jsonStr, "sending_procedure_name", _sending_procedure_name); + jsonStr += "\n}"; + return jsonStr; +} + +std::string kill_me_now2(PUBLISH_MESSAGE_FUNCTION_SIGNATURE) +{ + std::string jsonStr; jsonStr += "{\n "; append_json_element(jsonStr, "event_type", "message"); jsonStr += ",\n "; diff --git a/ile/src/pub_json.h b/ile/src/pub_json.h index 44c36a5..572ab4a 100644 --- a/ile/src/pub_json.h +++ b/ile/src/pub_json.h @@ -8,7 +8,8 @@ int json_publish_vlog(PUBLISH_VLOG_FUNCTION_SIGNATURE); int json_publish_pal(PUBLISH_PAL_FUNCTION_SIGNATURE); int json_publish_other(PUBLISH_OTHER_FUNCTION_SIGNATURE); int to_utf8(char *out, size_t out_len, const char *in); -char* get_json_utf8_output_buf(std::string json_message); +char* get_utf8_output_buf(std::string str); +size_t get_utf8_output_buf_length(std::string str); } std::string construct_json_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE); From 21249be82e84d575ba06ce1ae99e7725932ca9fd Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Fri, 27 Sep 2024 13:04:46 -0400 Subject: [PATCH 16/25] add get_utf8_output_buf method --- ile/src/pub_db2.sqlc | 5 ++--- ile/src/pub_json.cpp | 31 ------------------------------- 2 files changed, 2 insertions(+), 34 deletions(-) diff --git a/ile/src/pub_db2.sqlc b/ile/src/pub_db2.sqlc index 59a4f01..0412634 100644 --- a/ile/src/pub_db2.sqlc +++ b/ile/src/pub_db2.sqlc @@ -12,7 +12,6 @@ #include "SockClient.h" - #define COPY_PARM(dest, parm) \ memset(dest, 0, sizeof(dest)); \ strncpy(dest, parm ? parm : "", -1 + sizeof(dest)); @@ -81,9 +80,9 @@ extern int db2_publish_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE) DEBUG_INFO("Sending message %s", const_cast(json_message.c_str())); // Allocate memory for the utf-8 encoded json message - char *json_utf8 = get_json_utf8_output_buf(json_message); + char *json_utf8 = get_utf8_output_buf(json_message); - to_utf8(json_utf8, 1 + json_message.length(), json_message.c_str()); + to_utf8(json_utf8, get_utf8_output_buf_length(json_message), json_message.c_str()); // Send the message to the socket bool res = send_message(json_utf8); diff --git a/ile/src/pub_json.cpp b/ile/src/pub_json.cpp index ec05c0f..7f95afb 100644 --- a/ile/src/pub_json.cpp +++ b/ile/src/pub_json.cpp @@ -176,37 +176,6 @@ std::string construct_json_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE) return jsonStr; } -std::string kill_me_now2(PUBLISH_MESSAGE_FUNCTION_SIGNATURE) -{ - std::string jsonStr; - jsonStr += "{\n "; - append_json_element(jsonStr, "event_type", "message"); - jsonStr += ",\n "; - append_json_element(jsonStr, "session_id", _session_id); - jsonStr += ",\n "; - append_json_element(jsonStr, "job", _job); - jsonStr += ",\n "; - append_json_element(jsonStr, "msgid", _msgid); - jsonStr += ",\n "; - append_json_element(jsonStr, "msgtype", _msg_type); - jsonStr += ",\n "; - append_json_element(jsonStr, "severity", _msg_severity); - jsonStr += ",\n "; - append_json_element(jsonStr, "message_timestamp", _msg_timestamp); - jsonStr += ",\n "; - append_json_element(jsonStr, "sending_usrprf", _sending_usrprf); - jsonStr += ",\n "; - append_json_element(jsonStr, "message", _message); - jsonStr += ",\n "; - append_json_element(jsonStr, "sending_program_name", _sending_program_name); - jsonStr += ",\n "; - append_json_element(jsonStr, "sending_module_name", _sending_module_name); - jsonStr += ",\n "; - append_json_element(jsonStr, "sending_procedure_name", _sending_procedure_name); - jsonStr += "\n}"; - return jsonStr; -} - /** * Publish json message to DTAQ */ From 4341920396dde5ce90752e6e1907527e3ac7f51b Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Fri, 27 Sep 2024 13:05:24 -0400 Subject: [PATCH 17/25] Clear output file in snd2q test --- test/e2e/msg/snd2q/Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/test/e2e/msg/snd2q/Makefile b/test/e2e/msg/snd2q/Makefile index 8999435..d1ef8ff 100644 --- a/test/e2e/msg/snd2q/Makefile +++ b/test/e2e/msg/snd2q/Makefile @@ -11,6 +11,7 @@ SEVERITY:=80 setup: /qsys.lib/${TESTLIB}.lib/msgs.msgq echo "\nstrwch=WCHMSG((*ALL)) WCHMSGQ((${TESTLIB}/MSGS))" >> data.ini + rm -f ${OUTPUT_FILE} cleanup: system -kKv "ENDWCH SSNID(TESTING)" From 0830e027d9d04b1eee6d33b6eeee45d91d3de528 Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Fri, 27 Sep 2024 13:30:42 -0400 Subject: [PATCH 18/25] remove comments, whitespace --- ile/src/handler.cpp | 4 ---- ile/src/pub_json.cpp | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/ile/src/handler.cpp b/ile/src/handler.cpp index bd22a48..3444eac 100644 --- a/ile/src/handler.cpp +++ b/ile/src/handler.cpp @@ -184,10 +184,6 @@ int main(int _argc, char **argv) for (int i = 0; i < num_publishers; i++) { msg_publish_func func = publishers->array[i].msg_publish_func_ptr; - // This is where we send the data to the publish function which encodes it and also - // publishes it to the table - - // Instead, lets encode the data and then send it to the socket func( session_id.c_str(), msgid.c_str(), diff --git a/ile/src/pub_json.cpp b/ile/src/pub_json.cpp index 7f95afb..faf803f 100644 --- a/ile/src/pub_json.cpp +++ b/ile/src/pub_json.cpp @@ -76,7 +76,7 @@ void json_encode(std::string &str, const char *_src) } void append_json_element(std::string &_str, const char *_key, const char *_value) { - _str += "\""; + _str += "\""; _str += _key; _str += "\":\""; std::string encoded; From 72fda27903408d9e5e1959a02595a27b9d3b13ed Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Fri, 27 Sep 2024 13:32:39 -0400 Subject: [PATCH 19/25] remove whitespace --- ile/src/pub_db2.sqlc | 2 -- 1 file changed, 2 deletions(-) diff --git a/ile/src/pub_db2.sqlc b/ile/src/pub_db2.sqlc index 0412634..1d0ddeb 100644 --- a/ile/src/pub_db2.sqlc +++ b/ile/src/pub_db2.sqlc @@ -54,8 +54,6 @@ bool send_message(std::string message){ return true; } - - // TODO: implement this // 1. define Db2 tables // // have autoincrement and autotimestamp columns From c48339e5e652a8bf976b77495917d7ab5c39cdfb Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Thu, 3 Oct 2024 09:53:27 -0400 Subject: [PATCH 20/25] Remove unused method --- ile/src/SockClient.h | 4 ---- 1 file changed, 4 deletions(-) diff --git a/ile/src/SockClient.h b/ile/src/SockClient.h index 2e9c10b..0e556cb 100644 --- a/ile/src/SockClient.h +++ b/ile/src/SockClient.h @@ -14,10 +14,6 @@ int sock_fd; // Method to send a message (string) over the socket bool sendMessage(const std::string message); - // // Method to send a struct over the socket - // template - // bool sendStruct(const typename T& data); - // Method to close the socket void closeSocket(); From 999d575d2e80e59f2b3c16a88552d6d26421409e Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Thu, 3 Oct 2024 10:00:29 -0400 Subject: [PATCH 21/25] Comment out insert into manzanmsg code which we may use later --- ile/src/pub_db2.sqlc | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/ile/src/pub_db2.sqlc b/ile/src/pub_db2.sqlc index 1d0ddeb..7ac4464 100644 --- a/ile/src/pub_db2.sqlc +++ b/ile/src/pub_db2.sqlc @@ -62,6 +62,37 @@ bool send_message(std::string message){ // 4. implement! extern int db2_publish_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE) { + // exec sql include SQLCA; + // char msg_session_id[11]; + // char msg_msg_id[8]; + // char msg_msg_type[11]; + // char msg_severity[32]; + // char msg_timestamp[32]; + // char msg_job[32]; + // char msg_sending_usrprf[11]; + // char msg_message[2048]; + // char msg_sending_program_name[257]; + // char msg_sending_module_name[11]; + // char msg_sending_procedure_name[2048]; + + // COPY_PARM(msg_session_id, _session_id); + // COPY_PARM(msg_msg_id, _msgid); + // COPY_PARM(msg_msg_type, _msg_type); + // sprintf(msg_severity, "%d", _msg_severity); + // COPY_PARM(msg_timestamp, _msg_timestamp); + // COPY_PARM(msg_job, _job); + // COPY_PARM(msg_sending_usrprf, _sending_usrprf); + // COPY_PARM(msg_message, _message); + // COPY_PARM(msg_sending_program_name, _sending_program_name); + // COPY_PARM(msg_sending_module_name, _sending_module_name); + // COPY_PARM(msg_sending_procedure_name, _sending_procedure_name); + + // EXEC SQL + // INSERT INTO MANZANMSG( + // SESSION_ID, MESSAGE_ID, MESSAGE_TYPE, SEVERITY, JOB, SENDING_USRPRF, SENDING_PROGRAM_NAME, SENDING_MODULE_NAME, SENDING_PROCEDURE_NAME, MESSAGE_TIMESTAMP, MESSAGE) + // VALUES( : msg_session_id, : msg_msg_id, : msg_msg_type, : msg_severity, : msg_job, : msg_sending_usrprf, : msg_sending_program_name, : msg_sending_module_name, : msg_sending_procedure_name, : msg_timestamp, : msg_message); + // check_sql_error(sqlca.sqlcode, sqlca.sqlstate); + std::string json_message = construct_json_message( _session_id, _msgid, From 9901f84dcbac8445adf722866c55a42ce3d1490a Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Thu, 3 Oct 2024 11:19:38 -0400 Subject: [PATCH 22/25] Fix message formatting; add test --- .../manzan/ManzanMessageFormatter.java | 22 ++++++++++++++++- .../test/java/TestManzanMessageFormatter.java | 24 +++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) create mode 100644 camel/src/test/java/TestManzanMessageFormatter.java diff --git a/camel/src/main/java/com/github/theprez/manzan/ManzanMessageFormatter.java b/camel/src/main/java/com/github/theprez/manzan/ManzanMessageFormatter.java index 805fa33..f18331d 100644 --- a/camel/src/main/java/com/github/theprez/manzan/ManzanMessageFormatter.java +++ b/camel/src/main/java/com/github/theprez/manzan/ManzanMessageFormatter.java @@ -20,10 +20,30 @@ public static String format(final String _in, final Map _mapping return formatter.format(_mappings); } + public String getM_fmtStr() { + return m_fmtStr; + } + private final String m_fmtStr; public ManzanMessageFormatter(final String _fmtStr) { - m_fmtStr = _fmtStr.toUpperCase(); + StringBuilder result = new StringBuilder(); + + // Use a boolean array to avoid final constraint + boolean[] keyOpened = {false}; + _fmtStr.chars().forEach(c -> { + char next = (char) c; + if (c == '$'){ + keyOpened[0] = !keyOpened[0]; + result.append(next); + } + else if (keyOpened[0]){ + result.append(Character.toUpperCase(next)); + } else { + result.append(next); + } + }); + m_fmtStr = result.toString(); } public String format(final Map _mappings) { diff --git a/camel/src/test/java/TestManzanMessageFormatter.java b/camel/src/test/java/TestManzanMessageFormatter.java new file mode 100644 index 0000000..70e720c --- /dev/null +++ b/camel/src/test/java/TestManzanMessageFormatter.java @@ -0,0 +1,24 @@ +import com.github.theprez.manzan.ManzanMessageFormatter; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestManzanMessageFormatter { + + @Test + public void testMessageFormattedCorrectly() throws Exception { + String message = "$message_id$ (severity $SeverIty$): $MESSAGE$"; + String expectedFormat = "$MESSAGE_ID$ (severity $SEVERITY$): $MESSAGE$"; + ManzanMessageFormatter manzanMessageFormatter = new ManzanMessageFormatter(message); + assertEquals(expectedFormat, manzanMessageFormatter.getM_fmtStr()); + } + + @Test + public void testIncorrectlyFormattedMessage() throws Exception { + String message = "$message_id$ (severity $$$"; + String expectedFormat = "$MESSAGE_ID$ (severity $$$"; + ManzanMessageFormatter manzanMessageFormatter = new ManzanMessageFormatter(message); + assertEquals(expectedFormat, manzanMessageFormatter.getM_fmtStr()); + } + +} \ No newline at end of file From a896b6e968a308221af627894a79be84ca951ef5 Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Thu, 3 Oct 2024 11:36:56 -0400 Subject: [PATCH 23/25] Change msgid to message_id --- ile/src/pub_json.cpp | 2 +- test/e2e/msg/snd2q/data.ini | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ile/src/pub_json.cpp b/ile/src/pub_json.cpp index faf803f..b77ceff 100644 --- a/ile/src/pub_json.cpp +++ b/ile/src/pub_json.cpp @@ -155,7 +155,7 @@ std::string construct_json_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE) jsonStr += ",\n "; append_json_element(jsonStr, "job", _job); jsonStr += ",\n "; - append_json_element(jsonStr, "msgid", _msgid); + append_json_element(jsonStr, "message_id", _msgid); jsonStr += ",\n "; append_json_element(jsonStr, "msgtype", _msg_type); jsonStr += ",\n "; diff --git a/test/e2e/msg/snd2q/data.ini b/test/e2e/msg/snd2q/data.ini index 481b396..c9b8831 100644 --- a/test/e2e/msg/snd2q/data.ini +++ b/test/e2e/msg/snd2q/data.ini @@ -2,4 +2,4 @@ type=watch id=TESTING destinations=myfile -format=$MSGID$ (severity $SEVERITY$): $MESSAGE$ \ No newline at end of file +format=$MESSAGE_ID$ (severity $SEVERITY$): $MESSAGE$ \ No newline at end of file From 4e582ab6148cb0db75f0d25b38cd69b882abe3a6 Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Thu, 3 Oct 2024 12:05:33 -0400 Subject: [PATCH 24/25] change msgtype to message_type --- ile/src/pub_json.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ile/src/pub_json.cpp b/ile/src/pub_json.cpp index b77ceff..f19118c 100644 --- a/ile/src/pub_json.cpp +++ b/ile/src/pub_json.cpp @@ -157,7 +157,7 @@ std::string construct_json_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE) jsonStr += ",\n "; append_json_element(jsonStr, "message_id", _msgid); jsonStr += ",\n "; - append_json_element(jsonStr, "msgtype", _msg_type); + append_json_element(jsonStr, "message_type", _msg_type); jsonStr += ",\n "; append_json_element(jsonStr, "severity", _msg_severity); jsonStr += ",\n "; From a0b50e3188cf8ae2560efcd4eae3783ebdfd8cb8 Mon Sep 17 00:00:00 2001 From: Jonathan Zak Date: Thu, 3 Oct 2024 12:39:16 -0400 Subject: [PATCH 25/25] Dont uppercase message format --- .../manzan/ManzanMessageFormatter.java | 20 ++-------------- .../test/java/TestManzanMessageFormatter.java | 24 ------------------- 2 files changed, 2 insertions(+), 42 deletions(-) delete mode 100644 camel/src/test/java/TestManzanMessageFormatter.java diff --git a/camel/src/main/java/com/github/theprez/manzan/ManzanMessageFormatter.java b/camel/src/main/java/com/github/theprez/manzan/ManzanMessageFormatter.java index f18331d..976fb8e 100644 --- a/camel/src/main/java/com/github/theprez/manzan/ManzanMessageFormatter.java +++ b/camel/src/main/java/com/github/theprez/manzan/ManzanMessageFormatter.java @@ -27,23 +27,7 @@ public String getM_fmtStr() { private final String m_fmtStr; public ManzanMessageFormatter(final String _fmtStr) { - StringBuilder result = new StringBuilder(); - - // Use a boolean array to avoid final constraint - boolean[] keyOpened = {false}; - _fmtStr.chars().forEach(c -> { - char next = (char) c; - if (c == '$'){ - keyOpened[0] = !keyOpened[0]; - result.append(next); - } - else if (keyOpened[0]){ - result.append(Character.toUpperCase(next)); - } else { - result.append(next); - } - }); - m_fmtStr = result.toString(); + m_fmtStr = _fmtStr; } public String format(final Map _mappings) { @@ -53,7 +37,7 @@ public String format(final Map _mappings) { ret = ret.replace("\\n", "\n").replace("\\t", "\t"); for (final Entry repl : _mappings.entrySet()) { - final String key = repl.getKey().toUpperCase(); + final String key = repl.getKey(); ret = ret.replace("$" + key + "$", "" + repl.getValue()); String jsonIndicator ="$json:" + key + "$"; if(ret.contains(jsonIndicator)) { diff --git a/camel/src/test/java/TestManzanMessageFormatter.java b/camel/src/test/java/TestManzanMessageFormatter.java deleted file mode 100644 index 70e720c..0000000 --- a/camel/src/test/java/TestManzanMessageFormatter.java +++ /dev/null @@ -1,24 +0,0 @@ -import com.github.theprez.manzan.ManzanMessageFormatter; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -public class TestManzanMessageFormatter { - - @Test - public void testMessageFormattedCorrectly() throws Exception { - String message = "$message_id$ (severity $SeverIty$): $MESSAGE$"; - String expectedFormat = "$MESSAGE_ID$ (severity $SEVERITY$): $MESSAGE$"; - ManzanMessageFormatter manzanMessageFormatter = new ManzanMessageFormatter(message); - assertEquals(expectedFormat, manzanMessageFormatter.getM_fmtStr()); - } - - @Test - public void testIncorrectlyFormattedMessage() throws Exception { - String message = "$message_id$ (severity $$$"; - String expectedFormat = "$MESSAGE_ID$ (severity $$$"; - ManzanMessageFormatter manzanMessageFormatter = new ManzanMessageFormatter(message); - assertEquals(expectedFormat, manzanMessageFormatter.getM_fmtStr()); - } - -} \ No newline at end of file