diff --git a/camel/pom.xml b/camel/pom.xml
index 8ab90eb..5129ef7 100644
--- a/camel/pom.xml
+++ b/camel/pom.xml
@@ -60,6 +60,10 @@
camel-stream
3.14.7
+
+ org.apache.camel
+ camel-netty
+
org.apache.camel
camel-jdbc
diff --git a/camel/src/main/java/com/github/theprez/manzan/ManzanMessageFormatter.java b/camel/src/main/java/com/github/theprez/manzan/ManzanMessageFormatter.java
index c1f142f..976fb8e 100644
--- a/camel/src/main/java/com/github/theprez/manzan/ManzanMessageFormatter.java
+++ b/camel/src/main/java/com/github/theprez/manzan/ManzanMessageFormatter.java
@@ -20,6 +20,10 @@ public static String format(final String _in, final Map _mapping
return formatter.format(_mappings);
}
+ public String getM_fmtStr() {
+ return m_fmtStr;
+ }
+
private final String m_fmtStr;
public ManzanMessageFormatter(final String _fmtStr) {
@@ -33,8 +37,9 @@ public String format(final Map _mappings) {
ret = ret.replace("\\n", "\n").replace("\\t", "\t");
for (final Entry repl : _mappings.entrySet()) {
- ret = ret.replace("$" + repl.getKey() + "$", "" + repl.getValue());
- String jsonIndicator ="$json:" + repl.getKey() + "$";
+ final String key = repl.getKey();
+ ret = ret.replace("$" + key + "$", "" + repl.getValue());
+ String jsonIndicator ="$json:" + key + "$";
if(ret.contains(jsonIndicator)) {
ret = ret.replace(jsonIndicator, jsonEncode("" + repl.getValue()));
}
diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java
index bd95763..31d6095 100644
--- a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java
+++ b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java
@@ -2,6 +2,9 @@
import java.io.IOException;
import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.model.dataformat.JsonLibrary;
import com.github.theprez.jcmdutils.StringUtils;
import com.github.theprez.manzan.ManzanEventType;
@@ -15,13 +18,18 @@ public class WatchMsgEvent extends ManzanRoute {
private final String m_schema;
private final String m_sessionId;
private final ManzanMessageFormatter m_formatter;
+ private final String m_socketIp = "0.0.0.0";
+ private final String m_socketPort = "8080";
+
- public WatchMsgEvent(final String _name, final String _session_id, final String _format, final List _destinations, final String _schema, final int _interval, final int _numToProcess) throws IOException {
+ public WatchMsgEvent(final String _name, final String _session_id, final String _format,
+ final List _destinations, final String _schema, final int _interval, final int _numToProcess)
+ throws IOException {
super(_name);
m_interval = _interval;
m_numToProcess = _numToProcess;
m_schema = _schema;
- m_formatter = StringUtils.isEmpty(_format) ? null: new ManzanMessageFormatter(_format);
+ m_formatter = StringUtils.isEmpty(_format) ? null : new ManzanMessageFormatter(_format);
super.setRecipientList(_destinations);
m_sessionId = _session_id.trim().toUpperCase();
}
@@ -29,15 +37,12 @@ public WatchMsgEvent(final String _name, final String _session_id, final String
//@formatter:off
@Override
public void configure() {
- from("timer://foo?synchronous=true&period=" + m_interval)
- .routeId("manzan_msg:"+m_name)
- .setHeader(EVENT_TYPE, constant(ManzanEventType.WATCH_MSG))
- .setBody(constant("SeLeCt * fRoM " + m_schema + ".mAnZaNmSg wHeRe SESSION_ID = '"+m_sessionId+"' limit " + m_numToProcess ))
- // .to("stream:out")
- .to("jdbc:jt400?outputType=StreamList")
- .split(body()).streaming().parallelProcessing()
- .setHeader("id", simple("${body[ORDINAL_POSITION]}"))
- .setHeader("session_id", simple("${body[SESSION_ID]}"))
+ from(String.format("netty:tcp://%s:%s?sync=false", m_socketIp, m_socketPort))
+ .log("Received raw message: ${body}")
+ .unmarshal().json(JsonLibrary.Jackson, Map.class)
+ .routeId("manzan_msg:"+m_name)
+ .setHeader(EVENT_TYPE, constant(ManzanEventType.WATCH_MSG))
+ .setHeader("session_id", simple("${body[sessionId]}"))
.setHeader("data_map", simple("${body}"))
.marshal().json(true) //TODO: skip this if we are applying a format
.setBody(simple("${body}\n"))
@@ -46,9 +51,7 @@ public void configure() {
exchange.getIn().setBody(m_formatter.format(getDataMap(exchange)));
}
})
- .recipientList(constant(getRecipientList())).parallelProcessing().stopOnException().end()
- .setBody(simple("delete fRoM " + m_schema + ".mAnZaNmSg where ORDINAL_POSITION = ${header.id} WITH NC"))
- .to("jdbc:jt400").to("stream:err");
+ .recipientList(constant(getRecipientList())).parallelProcessing().stopOnException().end();
}
//@formatter:on
diff --git a/ile/Makefile b/ile/Makefile
index fcc16c5..546b942 100644
--- a/ile/Makefile
+++ b/ile/Makefile
@@ -19,16 +19,16 @@ src/mzversion.h:
/qsys.lib/${BUILDLIB}.lib:
system "RUNSQL SQL('create schema ${BUILDLIB}') COMMIT(*NONE) NAMING(*SQL) "
-/qsys.lib/${BUILDLIB}.lib/handler.pgm: /qsys.lib/${BUILDLIB}.lib/handler.module /qsys.lib/${BUILDLIB}.lib/pub_json.module /qsys.lib/${BUILDLIB}.lib/pub_db2.module /qsys.lib/${BUILDLIB}.lib/debug.module /qsys.lib/${BUILDLIB}.lib/pub_db2.module /qsys.lib/${BUILDLIB}.lib/userconf.module
+/qsys.lib/${BUILDLIB}.lib/handler.pgm: /qsys.lib/${BUILDLIB}.lib/handler.module /qsys.lib/${BUILDLIB}.lib/pub_json.module /qsys.lib/${BUILDLIB}.lib/pub_db2.module /qsys.lib/${BUILDLIB}.lib/debug.module /qsys.lib/${BUILDLIB}.lib/pub_db2.module /qsys.lib/${BUILDLIB}.lib/userconf.module /qsys.lib/${BUILDLIB}.lib/SockClient.module
/qsys.lib/${BUILDLIB}.lib/%.pgm:
system "CRTPGM PGM(${BUILDLIB}/$*) MODULE($(patsubst %.module,$(BUILDLIB)/%,$(notdir $^))) ACTGRP(*CALLER)"
/qsys.lib/${BUILDLIB}.lib/%.module: src/%.cpp src/mzversion.h
- system "CRTCPPMOD MODULE(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$<') OPTION(*EVENTF) SYSIFCOPT(*IFS64IO) DBGVIEW(*SOURCE) TERASPACE(*YES *TSIFC) STGMDL(*SNGLVL) DTAMDL(*p128) DEFINE(DEBUG_ENABLED)"
+ system "CRTCPPMOD MODULE(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$<') OPTION(*EVENTF) SYSIFCOPT(*IFS64IO) DBGVIEW(*SOURCE) TERASPACE(*YES *TSIFC) STGMDL(*SNGLVL) DTAMDL(*p128) DEFINE(DEBUG_ENABLED) OUTPUT(*PRINT) TGTCCSID(*JOB)"
/qsys.lib/${BUILDLIB}.lib/%.module: src/%.sqlc
- system "CRTSQLCI OBJ(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$^') COMMIT(*NONE) DATFMT(*ISO) TIMFMT(*ISO) DBGVIEW(*SOURCE) CVTCCSID(*JOB) COMPILEOPT('INCDIR(''src'')') SQLPATH(${BUILDLIB}) DFTRDBCOL(${BUILDLIB}) OPTION(*SQL)"
+ system "CRTSQLCPPI OBJ(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$^') COMMIT(*NONE) DATFMT(*ISO) TIMFMT(*ISO) DBGVIEW(*SOURCE) CVTCCSID(*JOB) COMPILEOPT('INCDIR(''src'')') SQLPATH(${BUILDLIB}) DFTRDBCOL(${BUILDLIB}) OPTION(*SQL)"
/qsys.lib/${BUILDLIB.lib}:
-system "RUNSQL SQL('create schema ${BUILDLIB}') NAMING(*SYS)"
diff --git a/ile/src/SockClient.cpp b/ile/src/SockClient.cpp
new file mode 100644
index 0000000..d3dd799
--- /dev/null
+++ b/ile/src/SockClient.cpp
@@ -0,0 +1,69 @@
+#include
+#include // For memset and memcpy
+#include // For socket functions
+#include // For inet_addr
+#include // For close
+#include "SockClient.h"
+#include "manzan.h"
+
+// Constructor to initialize the socket descriptor to -1
+SockClient::SockClient(){
+ sock_fd = -1;
+}
+
+// Method to open a socket and connect to the server
+bool SockClient::openSocket(const std::string ip, int port) {
+ // Create socket
+ sock_fd = socket(AF_INET, SOCK_STREAM, 0);
+ if (sock_fd < 0) {
+ DEBUG_ERROR("Error creating socket\n");
+ return false;
+ }
+
+ // Define server address
+ struct sockaddr_in server_address;
+ server_address.sin_family = AF_INET;
+ server_address.sin_port = htons(port);
+ server_address.sin_addr.s_addr = inet_addr(const_cast(ip.c_str()));
+
+ // Connect to server
+ if (connect(sock_fd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) {
+ DEBUG_ERROR("Error connecting to server\n");
+ closeSocket();
+ return false;
+ }
+
+ DEBUG_INFO("Connected to server at %s:%d\n", ip.c_str(), port);
+ return true;
+}
+
+// Method to send a message (string) over the socket
+bool SockClient::sendMessage(const std::string message) {
+ if (sock_fd < 0) {
+ DEBUG_ERROR("Socket is not open\n");
+ return false;
+ }
+
+ int bytes_sent = send(sock_fd, const_cast(message.c_str()), message.size(), 0);
+ if (bytes_sent < 0) {
+ DEBUG_ERROR("Error sending message\n");
+ return false;
+ }
+ DEBUG_INFO("Sent message: %s\n", message);
+
+ return true;
+}
+
+// Method to close the socket
+void SockClient::closeSocket() {
+ if (sock_fd >= 0) {
+ close(sock_fd);
+ sock_fd = -1;
+ DEBUG_INFO("Socket closed\n");
+ }
+}
+
+// Destructor to ensure socket is closed
+SockClient::~SockClient() {
+ closeSocket();
+}
diff --git a/ile/src/SockClient.h b/ile/src/SockClient.h
new file mode 100644
index 0000000..0e556cb
--- /dev/null
+++ b/ile/src/SockClient.h
@@ -0,0 +1,23 @@
+#include
+
+// extern "C" {
+class SockClient {
+
+int sock_fd;
+public:
+ // Constructor to initialize the socket descriptor to -1
+ SockClient();
+
+ // Method to open a socket and connect to the server
+ bool openSocket(const std::string ip, int port);
+
+ // Method to send a message (string) over the socket
+ bool sendMessage(const std::string message);
+
+ // Method to close the socket
+ void closeSocket();
+
+ // Destructor to ensure socket is closed
+ ~SockClient();
+};
+// }
diff --git a/ile/src/handler.cpp b/ile/src/handler.cpp
index 092fd2e..3444eac 100644
--- a/ile/src/handler.cpp
+++ b/ile/src/handler.cpp
@@ -12,6 +12,8 @@
#include "event_data.h"
#include "userconf.h"
#include "mzversion.h"
+#include "pub_json.h"
+#include "SockClient.h"
static FILE *fd = NULL;
diff --git a/ile/src/pub_db2.sqlc b/ile/src/pub_db2.sqlc
index 288137d..7ac4464 100644
--- a/ile/src/pub_db2.sqlc
+++ b/ile/src/pub_db2.sqlc
@@ -7,12 +7,19 @@
#include
#include
+#include
+#include "pub_json.h"
+#include "SockClient.h"
+
#define COPY_PARM(dest, parm) \
memset(dest, 0, sizeof(dest)); \
strncpy(dest, parm ? parm : "", -1 + sizeof(dest));
-void check_sql_error(int sqlcode, const char* sqlstate)
+const int PORT = 8080;
+const std::string SERVER = "127.0.0.1";
+
+void check_sql_error(int sqlcode, const char *sqlstate)
{
if (sqlcode != 0)
{
@@ -25,6 +32,28 @@ void check_sql_error(int sqlcode, const char* sqlstate)
}
}
+bool send_message(std::string message){
+ // Create a SocketClient instance
+ SockClient client;
+
+ // Open a socket and connect to server
+ if (!client.openSocket(SERVER, PORT)) {
+ // TODO: How do we want to handle this error? Drop the message, insert into table?
+ DEBUG_ERROR("Failed to connect to socket: %s:%d\n", SERVER.c_str(), PORT);
+ return false;
+ }
+
+ // Send a message over the socket
+ if (!client.sendMessage(message)){
+ DEBUG_ERROR("Failed to send message: %s", message.c_str());
+ return false;
+ }
+
+ // Close the socket
+ client.closeSocket();
+ return true;
+}
+
// TODO: implement this
// 1. define Db2 tables
// // have autoincrement and autotimestamp columns
@@ -33,37 +62,62 @@ void check_sql_error(int sqlcode, const char* sqlstate)
// 4. implement!
extern int db2_publish_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE)
{
- exec sql include SQLCA;
- char msg_session_id[11];
- char msg_msg_id[8];
- char msg_msg_type[11];
- char msg_severity[32];
- char msg_timestamp[32];
- char msg_job[32];
- char msg_sending_usrprf[11];
- char msg_message[2048];
- char msg_sending_program_name[257];
- char msg_sending_module_name[11];
- char msg_sending_procedure_name[2048];
-
- COPY_PARM(msg_session_id, _session_id);
- COPY_PARM(msg_msg_id, _msgid);
- COPY_PARM(msg_msg_type, _msg_type);
- sprintf(msg_severity, "%d", _msg_severity);
- COPY_PARM(msg_timestamp, _msg_timestamp);
- COPY_PARM(msg_job, _job);
- COPY_PARM(msg_sending_usrprf, _sending_usrprf);
- COPY_PARM(msg_message, _message);
- COPY_PARM(msg_sending_program_name, _sending_program_name);
- COPY_PARM(msg_sending_module_name, _sending_module_name);
- COPY_PARM(msg_sending_procedure_name, _sending_procedure_name);
+ // exec sql include SQLCA;
+ // char msg_session_id[11];
+ // char msg_msg_id[8];
+ // char msg_msg_type[11];
+ // char msg_severity[32];
+ // char msg_timestamp[32];
+ // char msg_job[32];
+ // char msg_sending_usrprf[11];
+ // char msg_message[2048];
+ // char msg_sending_program_name[257];
+ // char msg_sending_module_name[11];
+ // char msg_sending_procedure_name[2048];
- EXEC SQL
- INSERT INTO MANZANMSG(
- SESSION_ID, MESSAGE_ID, MESSAGE_TYPE, SEVERITY, JOB, SENDING_USRPRF, SENDING_PROGRAM_NAME, SENDING_MODULE_NAME, SENDING_PROCEDURE_NAME, MESSAGE_TIMESTAMP, MESSAGE)
- VALUES( : msg_session_id, : msg_msg_id, : msg_msg_type, : msg_severity, : msg_job, : msg_sending_usrprf, : msg_sending_program_name, : msg_sending_module_name, : msg_sending_procedure_name, : msg_timestamp, : msg_message);
- check_sql_error(sqlca.sqlcode, sqlca.sqlstate);
- return 0;
+ // COPY_PARM(msg_session_id, _session_id);
+ // COPY_PARM(msg_msg_id, _msgid);
+ // COPY_PARM(msg_msg_type, _msg_type);
+ // sprintf(msg_severity, "%d", _msg_severity);
+ // COPY_PARM(msg_timestamp, _msg_timestamp);
+ // COPY_PARM(msg_job, _job);
+ // COPY_PARM(msg_sending_usrprf, _sending_usrprf);
+ // COPY_PARM(msg_message, _message);
+ // COPY_PARM(msg_sending_program_name, _sending_program_name);
+ // COPY_PARM(msg_sending_module_name, _sending_module_name);
+ // COPY_PARM(msg_sending_procedure_name, _sending_procedure_name);
+
+ // EXEC SQL
+ // INSERT INTO MANZANMSG(
+ // SESSION_ID, MESSAGE_ID, MESSAGE_TYPE, SEVERITY, JOB, SENDING_USRPRF, SENDING_PROGRAM_NAME, SENDING_MODULE_NAME, SENDING_PROCEDURE_NAME, MESSAGE_TIMESTAMP, MESSAGE)
+ // VALUES( : msg_session_id, : msg_msg_id, : msg_msg_type, : msg_severity, : msg_job, : msg_sending_usrprf, : msg_sending_program_name, : msg_sending_module_name, : msg_sending_procedure_name, : msg_timestamp, : msg_message);
+ // check_sql_error(sqlca.sqlcode, sqlca.sqlstate);
+
+ std::string json_message = construct_json_message(
+ _session_id,
+ _msgid,
+ _msg_type,
+ _msg_severity,
+ _msg_timestamp,
+ _job,
+ _sending_usrprf,
+ _message,
+ _sending_program_name,
+ _sending_module_name,
+ _sending_procedure_name);
+
+ DEBUG_INFO("Sending message %s", const_cast(json_message.c_str()));
+
+ // Allocate memory for the utf-8 encoded json message
+ char *json_utf8 = get_utf8_output_buf(json_message);
+
+ to_utf8(json_utf8, get_utf8_output_buf_length(json_message), json_message.c_str());
+
+ // Send the message to the socket
+ bool res = send_message(json_utf8);
+
+ free(json_utf8);
+ return res ? 0 : -1;
}
extern int db2_publish_vlog(PUBLISH_VLOG_FUNCTION_SIGNATURE)
@@ -74,6 +128,7 @@ extern int db2_publish_vlog(PUBLISH_VLOG_FUNCTION_SIGNATURE)
int db2_publish_pal(PUBLISH_PAL_FUNCTION_SIGNATURE)
{
exec sql include SQLCA;
+ EXEC SQL BEGIN DECLARE SECTION;
char pal_sessid[11];
char pal_system_reference_code[11];
char pal_device_name[11];
@@ -87,6 +142,7 @@ int db2_publish_pal(PUBLISH_PAL_FUNCTION_SIGNATURE)
char pal_secondary_code[11];
char pal_table_id[11];
char pal_sequence[32];
+ EXEC SQL END DECLARE SECTION;
COPY_PARM(pal_sessid, _session_id);
COPY_PARM(pal_system_reference_code, _system_reference_code);
@@ -105,23 +161,27 @@ int db2_publish_pal(PUBLISH_PAL_FUNCTION_SIGNATURE)
EXEC SQL
INSERT INTO MANZANPAL(SESSION_ID, SYSTEM_REFERENCE_CODE, DEVICE_NAME, MODEL, SERIAL_NUMBER, RESOURCE_NAME, LOG_ID, PAL_TIMESTAMP, REFERENCE_CODE, SECONDARY_CODE, TABLE_ID, SEQUENCE_NUM)
VALUES( : pal_sessid, : pal_system_reference_code, : pal_device_name, : pal_model, : pal_serial_number, : pal_resource_name, : pal_log_identifier, : pal_timestamp, : pal_reference_code, : pal_secondary_code, : pal_table_id, : pal_sequence);
- check_sql_error(sqlca.sqlcode, sqlca.sqlstate);
+ check_sql_error(sqlca.sqlcode, (const char *)sqlca.sqlstate);
return 0;
}
extern int db2_publish_other(PUBLISH_OTHER_FUNCTION_SIGNATURE)
{
exec sql include SQLCA;
+
+ EXEC SQL BEGIN DECLARE SECTION;
char oth_sessid[11];
+ char oth_type[11];
+ EXEC SQL END DECLARE SECTION;
+
memset(oth_sessid, 0, sizeof(oth_sessid));
strncpy(oth_sessid, _session_id, 10);
- char oth_type[11];
memset(oth_type, 0, sizeof(oth_type));
strncpy(oth_type, _event_type, 10);
EXEC SQL
INSERT INTO MANZANOTH(SESSION_ID, EVENT) VALUES( : oth_sessid, : oth_type);
- check_sql_error(sqlca.sqlcode, sqlca.sqlstate);
+ check_sql_error(sqlca.sqlcode, (const char *)sqlca.sqlstate);
return 0;
}
\ No newline at end of file
diff --git a/ile/src/pub_json.cpp b/ile/src/pub_json.cpp
index 5c035ec..f19118c 100644
--- a/ile/src/pub_json.cpp
+++ b/ile/src/pub_json.cpp
@@ -17,7 +17,8 @@ int to_utf8(char *out, size_t out_len, const char *in)
memset(&tocode, 0, sizeof(tocode));
tocode.CCSID = 1208;
QtqCode_T fromcode;
- fromcode.CCSID = 37;
+
+ // Setting to 0 allows the system to automatically detect ccsid (hopefully)
memset(&fromcode, 0, sizeof(fromcode));
iconv_t cd = QtqIconvOpen(&tocode, &fromcode);
@@ -29,9 +30,10 @@ int to_utf8(char *out, size_t out_len, const char *in)
int rc = iconv(cd, &input, &inleft, &output, &outleft);
if (rc == -1)
{
- DEBUG_ERROR("Error in converting characters\n");
+ DEBUG_ERROR("Error in converting characters. %d: %s\n", errno, strerror(errno));
return 9;
}
+ DEBUG_INFO("Conversion to UTF-8 successful.\n");
return iconv_close(cd);
}
@@ -91,12 +93,34 @@ void append_json_element(std::string &_str, const char *_key, const int _value)
_str += value;
}
+size_t get_utf8_output_buf_length(std::string str){
+ // Each UTF-8 encoded byte can be up to 4 bytes, and add one for the null terminator.
+ return str.length() * 4 + 1;
+}
+
+/*
+* Return an output buffer containing enough space for the utf-8 encoded message.
+* Return NULL if there is no space remaining on the heap.
+* Remember to free the buffer after use.
+*/
+char* get_utf8_output_buf(std::string str){
+ char *buf = (char *)malloc(get_utf8_output_buf_length(str));
+ if (buf == NULL) {
+ DEBUG_ERROR("No heap space available to allocate buffer for %s\n", str.c_str());
+ return NULL;
+ }
+ return buf;
+}
+
int json_publish(const char *_session_id, std::string &_json)
{
- int json_len = 1 + _json.length();
- char *utf8 = (char *)malloc(56 + _json.length() * 2);
+ char *utf8 = get_utf8_output_buf(_json);
+ if (utf8 == NULL){
+ DEBUG_ERROR("No heap space available. Aborting publishing message %s\n", utf8);
+ return -1;
+ }
- to_utf8(utf8, json_len, _json.c_str());
+ to_utf8(utf8, get_utf8_output_buf_length(_json), _json.c_str());
DEBUG_INFO("Publishing JSON\n");
DEBUG_INFO("%s\n", _json.c_str());
@@ -121,7 +145,7 @@ int json_publish(const char *_session_id, std::string &_json)
return 0;
}
-extern "C" int json_publish_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE)
+std::string construct_json_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE)
{
std::string jsonStr;
jsonStr += "{\n ";
@@ -131,9 +155,9 @@ extern "C" int json_publish_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE)
jsonStr += ",\n ";
append_json_element(jsonStr, "job", _job);
jsonStr += ",\n ";
- append_json_element(jsonStr, "msgid", _msgid);
+ append_json_element(jsonStr, "message_id", _msgid);
jsonStr += ",\n ";
- append_json_element(jsonStr, "msgtype", _msg_type);
+ append_json_element(jsonStr, "message_type", _msg_type);
jsonStr += ",\n ";
append_json_element(jsonStr, "severity", _msg_severity);
jsonStr += ",\n ";
@@ -148,8 +172,18 @@ extern "C" int json_publish_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE)
append_json_element(jsonStr, "sending_module_name", _sending_module_name);
jsonStr += ",\n ";
append_json_element(jsonStr, "sending_procedure_name", _sending_procedure_name);
-
jsonStr += "\n}";
+ return jsonStr;
+}
+
+/**
+ * Publish json message to DTAQ
+ */
+extern "C" int json_publish_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE)
+{
+ std::string jsonStr = construct_json_message(_session_id, _msgid, _msg_type, _msg_severity, _msg_timestamp, _job, _sending_usrprf,
+ _message, _sending_program_name, _sending_module_name, _sending_procedure_name);
+
return json_publish(_session_id, jsonStr);
}
diff --git a/ile/src/pub_json.h b/ile/src/pub_json.h
index bbb6f97..572ab4a 100644
--- a/ile/src/pub_json.h
+++ b/ile/src/pub_json.h
@@ -1,3 +1,5 @@
+#include
+
#ifndef _MANZAN_JSON_PUB_H_
#define _MANZAN_JSON_PUB_H_
extern "C" {
@@ -5,5 +7,11 @@ int json_publish_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE);
int json_publish_vlog(PUBLISH_VLOG_FUNCTION_SIGNATURE);
int json_publish_pal(PUBLISH_PAL_FUNCTION_SIGNATURE);
int json_publish_other(PUBLISH_OTHER_FUNCTION_SIGNATURE);
+int to_utf8(char *out, size_t out_len, const char *in);
+char* get_utf8_output_buf(std::string str);
+size_t get_utf8_output_buf_length(std::string str);
}
+
+std::string construct_json_message(PUBLISH_MESSAGE_FUNCTION_SIGNATURE);
+
#endif
\ No newline at end of file
diff --git a/test/e2e/msg/snd2q/Makefile b/test/e2e/msg/snd2q/Makefile
index 8999435..d1ef8ff 100644
--- a/test/e2e/msg/snd2q/Makefile
+++ b/test/e2e/msg/snd2q/Makefile
@@ -11,6 +11,7 @@ SEVERITY:=80
setup: /qsys.lib/${TESTLIB}.lib/msgs.msgq
echo "\nstrwch=WCHMSG((*ALL)) WCHMSGQ((${TESTLIB}/MSGS))" >> data.ini
+ rm -f ${OUTPUT_FILE}
cleanup:
system -kKv "ENDWCH SSNID(TESTING)"