Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use sockets #154

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
38b7401
Add socket client
jonnyz32 Sep 20, 2024
6957f01
added sockclient.cpp
jonnyz32 Sep 24, 2024
75fe623
Handle recieved json message
jonnyz32 Sep 25, 2024
443caac
Do case insensitive matching for formatting data
jonnyz32 Sep 26, 2024
ff36479
send message over socket
jonnyz32 Sep 26, 2024
f629fff
change session_id to sessionId
jonnyz32 Sep 27, 2024
8d0af5c
change db2_publish_message to use socket instead of sql
jonnyz32 Sep 27, 2024
6b6b1cc
Add helper function to get json utf8 output buf
jonnyz32 Sep 27, 2024
8d3f320
Add error handling to socketclient
jonnyz32 Sep 27, 2024
7ebeed6
revert sessionId in ile code to session_id
jonnyz32 Sep 27, 2024
ef7ce38
remove space
jonnyz32 Sep 27, 2024
f9ed636
remove msgEvent class
jonnyz32 Sep 27, 2024
995798d
move socket ip and port into var
jonnyz32 Sep 27, 2024
b111d99
Merge remote-tracking branch 'origin/main' into use-sockets
jonnyz32 Sep 27, 2024
80cb27f
change tgtccsid to job
jonnyz32 Sep 27, 2024
e2530e6
add get_utf8_output_buf_length
jonnyz32 Sep 27, 2024
21249be
add get_utf8_output_buf method
jonnyz32 Sep 27, 2024
4341920
Clear output file in snd2q test
jonnyz32 Sep 27, 2024
0830e02
remove comments, whitespace
jonnyz32 Sep 27, 2024
72fda27
remove whitespace
jonnyz32 Sep 27, 2024
c48339e
Remove unused method
jonnyz32 Oct 3, 2024
999d575
Comment out insert into manzanmsg code which we may use later
jonnyz32 Oct 3, 2024
9901f84
Fix message formatting; add test
jonnyz32 Oct 3, 2024
a896b6e
Change msgid to message_id
jonnyz32 Oct 3, 2024
4e582ab
change msgtype to message_type
jonnyz32 Oct 3, 2024
a0b50e3
Dont uppercase message format
jonnyz32 Oct 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions camel/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@
<artifactId>camel-stream</artifactId>
<version>3.14.7</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-netty</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-jdbc</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ public static String format(final String _in, final Map<String, Object> _mapping
return formatter.format(_mappings);
}

public String getM_fmtStr() {
return m_fmtStr;
}

private final String m_fmtStr;

public ManzanMessageFormatter(final String _fmtStr) {
Expand All @@ -33,8 +37,9 @@ public String format(final Map<String, Object> _mappings) {
ret = ret.replace("\\n", "\n").replace("\\t", "\t");

for (final Entry<String, Object> repl : _mappings.entrySet()) {
ret = ret.replace("$" + repl.getKey() + "$", "" + repl.getValue());
String jsonIndicator ="$json:" + repl.getKey() + "$";
final String key = repl.getKey();
ret = ret.replace("$" + key + "$", "" + repl.getValue());
String jsonIndicator ="$json:" + key + "$";
if(ret.contains(jsonIndicator)) {
ret = ret.replace(jsonIndicator, jsonEncode("" + repl.getValue()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.camel.model.dataformat.JsonLibrary;

import com.github.theprez.jcmdutils.StringUtils;
import com.github.theprez.manzan.ManzanEventType;
Expand All @@ -15,29 +18,31 @@ public class WatchMsgEvent extends ManzanRoute {
private final String m_schema;
private final String m_sessionId;
private final ManzanMessageFormatter m_formatter;
private final String m_socketIp = "0.0.0.0";
private final String m_socketPort = "8080";
SanjulaGanepola marked this conversation as resolved.
Show resolved Hide resolved


public WatchMsgEvent(final String _name, final String _session_id, final String _format, final List<String> _destinations, final String _schema, final int _interval, final int _numToProcess) throws IOException {
public WatchMsgEvent(final String _name, final String _session_id, final String _format,
final List<String> _destinations, final String _schema, final int _interval, final int _numToProcess)
throws IOException {
super(_name);
m_interval = _interval;
m_numToProcess = _numToProcess;
m_schema = _schema;
m_formatter = StringUtils.isEmpty(_format) ? null: new ManzanMessageFormatter(_format);
m_formatter = StringUtils.isEmpty(_format) ? null : new ManzanMessageFormatter(_format);
super.setRecipientList(_destinations);
m_sessionId = _session_id.trim().toUpperCase();
}

//@formatter:off
@Override
public void configure() {
from("timer://foo?synchronous=true&period=" + m_interval)
.routeId("manzan_msg:"+m_name)
.setHeader(EVENT_TYPE, constant(ManzanEventType.WATCH_MSG))
.setBody(constant("SeLeCt * fRoM " + m_schema + ".mAnZaNmSg wHeRe SESSION_ID = '"+m_sessionId+"' limit " + m_numToProcess ))
// .to("stream:out")
.to("jdbc:jt400?outputType=StreamList")
.split(body()).streaming().parallelProcessing()
.setHeader("id", simple("${body[ORDINAL_POSITION]}"))
.setHeader("session_id", simple("${body[SESSION_ID]}"))
from(String.format("netty:tcp://%s:%s?sync=false", m_socketIp, m_socketPort))
.log("Received raw message: ${body}")
SanjulaGanepola marked this conversation as resolved.
Show resolved Hide resolved
.unmarshal().json(JsonLibrary.Jackson, Map.class)
.routeId("manzan_msg:"+m_name)
.setHeader(EVENT_TYPE, constant(ManzanEventType.WATCH_MSG))
.setHeader("session_id", simple("${body[sessionId]}"))
.setHeader("data_map", simple("${body}"))
.marshal().json(true) //TODO: skip this if we are applying a format
.setBody(simple("${body}\n"))
Expand All @@ -46,9 +51,7 @@ public void configure() {
exchange.getIn().setBody(m_formatter.format(getDataMap(exchange)));
}
})
.recipientList(constant(getRecipientList())).parallelProcessing().stopOnException().end()
.setBody(simple("delete fRoM " + m_schema + ".mAnZaNmSg where ORDINAL_POSITION = ${header.id} WITH NC"))
.to("jdbc:jt400").to("stream:err");
.recipientList(constant(getRecipientList())).parallelProcessing().stopOnException().end();
}
//@formatter:on

Expand Down
6 changes: 3 additions & 3 deletions ile/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ src/mzversion.h:
/qsys.lib/${BUILDLIB}.lib:
system "RUNSQL SQL('create schema ${BUILDLIB}') COMMIT(*NONE) NAMING(*SQL) "

/qsys.lib/${BUILDLIB}.lib/handler.pgm: /qsys.lib/${BUILDLIB}.lib/handler.module /qsys.lib/${BUILDLIB}.lib/pub_json.module /qsys.lib/${BUILDLIB}.lib/pub_db2.module /qsys.lib/${BUILDLIB}.lib/debug.module /qsys.lib/${BUILDLIB}.lib/pub_db2.module /qsys.lib/${BUILDLIB}.lib/userconf.module
/qsys.lib/${BUILDLIB}.lib/handler.pgm: /qsys.lib/${BUILDLIB}.lib/handler.module /qsys.lib/${BUILDLIB}.lib/pub_json.module /qsys.lib/${BUILDLIB}.lib/pub_db2.module /qsys.lib/${BUILDLIB}.lib/debug.module /qsys.lib/${BUILDLIB}.lib/pub_db2.module /qsys.lib/${BUILDLIB}.lib/userconf.module /qsys.lib/${BUILDLIB}.lib/SockClient.module

/qsys.lib/${BUILDLIB}.lib/%.pgm:
system "CRTPGM PGM(${BUILDLIB}/$*) MODULE($(patsubst %.module,$(BUILDLIB)/%,$(notdir $^))) ACTGRP(*CALLER)"

/qsys.lib/${BUILDLIB}.lib/%.module: src/%.cpp src/mzversion.h
system "CRTCPPMOD MODULE(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$<') OPTION(*EVENTF) SYSIFCOPT(*IFS64IO) DBGVIEW(*SOURCE) TERASPACE(*YES *TSIFC) STGMDL(*SNGLVL) DTAMDL(*p128) DEFINE(DEBUG_ENABLED)"
system "CRTCPPMOD MODULE(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$<') OPTION(*EVENTF) SYSIFCOPT(*IFS64IO) DBGVIEW(*SOURCE) TERASPACE(*YES *TSIFC) STGMDL(*SNGLVL) DTAMDL(*p128) DEFINE(DEBUG_ENABLED) OUTPUT(*PRINT) TGTCCSID(*JOB)"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding TGTCCSID(*JOB) seemed to resolve encoding issues


/qsys.lib/${BUILDLIB}.lib/%.module: src/%.sqlc
system "CRTSQLCI OBJ(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$^') COMMIT(*NONE) DATFMT(*ISO) TIMFMT(*ISO) DBGVIEW(*SOURCE) CVTCCSID(*JOB) COMPILEOPT('INCDIR(''src'')') SQLPATH(${BUILDLIB}) DFTRDBCOL(${BUILDLIB}) OPTION(*SQL)"
system "CRTSQLCPPI OBJ(${BUILDLIB}/$*) SRCSTMF('$(CURDIR)/$^') COMMIT(*NONE) DATFMT(*ISO) TIMFMT(*ISO) DBGVIEW(*SOURCE) CVTCCSID(*JOB) COMPILEOPT('INCDIR(''src'')') SQLPATH(${BUILDLIB}) DFTRDBCOL(${BUILDLIB}) OPTION(*SQL)"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The program uses cpp functions now, so we need to use CRTSQLCPPI instead


/qsys.lib/${BUILDLIB.lib}:
-system "RUNSQL SQL('create schema ${BUILDLIB}') NAMING(*SYS)"
Expand Down
69 changes: 69 additions & 0 deletions ile/src/SockClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#include <iostream>
#include <cstring> // For memset and memcpy
#include <sys/socket.h> // For socket functions
#include <arpa/inet.h> // For inet_addr
#include <unistd.h> // For close
#include "SockClient.h"
#include "manzan.h"

// Constructor to initialize the socket descriptor to -1
SockClient::SockClient(){
sock_fd = -1;
}

// Method to open a socket and connect to the server
bool SockClient::openSocket(const std::string ip, int port) {
// Create socket
sock_fd = socket(AF_INET, SOCK_STREAM, 0);
if (sock_fd < 0) {
DEBUG_ERROR("Error creating socket\n");
return false;
}

// Define server address
struct sockaddr_in server_address;
server_address.sin_family = AF_INET;
server_address.sin_port = htons(port);
server_address.sin_addr.s_addr = inet_addr(const_cast<char*>(ip.c_str()));

// Connect to server
if (connect(sock_fd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) {
DEBUG_ERROR("Error connecting to server\n");
closeSocket();
return false;
}

DEBUG_INFO("Connected to server at %s:%d\n", ip.c_str(), port);
return true;
}

// Method to send a message (string) over the socket
bool SockClient::sendMessage(const std::string message) {
if (sock_fd < 0) {
DEBUG_ERROR("Socket is not open\n");
return false;
}

int bytes_sent = send(sock_fd, const_cast<char*>(message.c_str()), message.size(), 0);
if (bytes_sent < 0) {
DEBUG_ERROR("Error sending message\n");
return false;
}
DEBUG_INFO("Sent message: %s\n", message);

return true;
}

// Method to close the socket
void SockClient::closeSocket() {
if (sock_fd >= 0) {
close(sock_fd);
sock_fd = -1;
DEBUG_INFO("Socket closed\n");
}
}

// Destructor to ensure socket is closed
SockClient::~SockClient() {
closeSocket();
}
23 changes: 23 additions & 0 deletions ile/src/SockClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#include <string>

// extern "C" {
class SockClient {

int sock_fd;
public:
// Constructor to initialize the socket descriptor to -1
SockClient();

// Method to open a socket and connect to the server
bool openSocket(const std::string ip, int port);

// Method to send a message (string) over the socket
bool sendMessage(const std::string message);

// Method to close the socket
void closeSocket();

// Destructor to ensure socket is closed
~SockClient();
};
// }
2 changes: 2 additions & 0 deletions ile/src/handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include "event_data.h"
#include "userconf.h"
#include "mzversion.h"
#include "pub_json.h"
#include "SockClient.h"

static FILE *fd = NULL;

Expand Down
Loading
Loading