Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support optional destinations #182

Merged
merged 1 commit into from
Jan 2, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.github.theprez.jcmdutils.StringUtils;
import com.github.theprez.manzan.ManzanEventType;
import com.github.theprez.manzan.ManzanEventType;
import com.github.theprez.manzan.WatchStarter;
import com.github.theprez.manzan.routes.ManzanRoute;
import com.github.theprez.manzan.routes.event.FileEvent;
Expand Down Expand Up @@ -61,7 +60,7 @@ public synchronized Map<String, ManzanRoute> getRoutes() throws IOException, AS4
}
if ("false".equalsIgnoreCase(getIni().get(section, "enabled"))) {
continue;
} else if ( type.equals("watch")){
} else if (type.equals("watch")) {
// We will handle the watch events separately as the logic is a bit more complicated
watchEvents.add(section);
continue;
Expand Down Expand Up @@ -100,50 +99,63 @@ public synchronized Map<String, ManzanRoute> getRoutes() throws IOException, AS4
for (int i = 0; i < watchEvents.size(); i++) {
final String section = watchEvents.get(i);
final String name = section;
int userNumToProcess = getOptionalInt(name, "numToProcess");
final int numToProcess = userNumToProcess != -1 ? userNumToProcess : DEFAULT_NUM_TO_PROCESS;
final String format = getOptionalString(name, "format");
String strwch = getRequiredString(name, "strwch");
String id = getRequiredString(name, "id");

ManzanEventType eventType;
if(strwch.contains("WCHMSGQ")) {
eventType = ManzanEventType.WATCH_MSG;
} else if(strwch.contains("WCHLICLOG")) {
eventType = ManzanEventType.WATCH_VLOG;
} else if(strwch.contains("WCHPAL")) {
eventType = ManzanEventType.WATCH_PAL;
} else {
throw new RuntimeException("Watch for message, LIC log entry, or PAL entry not specified");
}
// Required fields
String id = getRequiredString(name, "id");
String strwch = getRequiredString(name, "strwch");

int userInterval = getOptionalInt(name, "interval");
final int interval = userInterval != -1 ? userInterval : DEFAULT_INTERVAL;
final List<String> destinations = new LinkedList<String>();
for (String d : getRequiredString(name, "destinations").split("\\s*,\\s*")) {
d = d.trim();
if (!m_destinations.contains(d)) {
throw new RuntimeException(
"No destination configured named '" + d + "' for data source '" + name + "'");
String userDestinations = getOptionalString(name, "destinations");
if (userDestinations != null) {
// Optional fields
int userNumToProcess = getOptionalInt(name, "numToProcess");
final int numToProcess = userNumToProcess != -1 ? userNumToProcess : DEFAULT_NUM_TO_PROCESS;
int userInterval = getOptionalInt(name, "interval");
final int interval = userInterval != -1 ? userInterval : DEFAULT_INTERVAL;
final String format = getOptionalString(name, "format");

// Determine the event type
ManzanEventType eventType;
if (strwch.contains("WCHMSGQ")) {
eventType = ManzanEventType.WATCH_MSG;
} else if (strwch.contains("WCHLICLOG")) {
eventType = ManzanEventType.WATCH_VLOG;
} else if (strwch.contains("WCHPAL")) {
eventType = ManzanEventType.WATCH_PAL;
} else {
throw new RuntimeException("Watch for message, LIC log entry, or PAL entry not specified");
}
if (StringUtils.isNonEmpty(d)) {
destinations.add(d);

// Process the destinations
final List<String> destinations = new LinkedList<String>();
for (String d : userDestinations.split("\\s*,\\s*")) {
d = d.trim();
if (!m_destinations.contains(d)) {
throw new RuntimeException(
"No destination configured named '" + d + "' for data source '" + name + "'");
}
if (StringUtils.isNonEmpty(d)) {
destinations.add(d);
}
}
}

// Build the maps
String destString = createRecipientList(destinations);
formatMap.put(id.toUpperCase(), format);
destMap.put(id.toUpperCase(), destString);
// Build the maps
String destString = createRecipientList(destinations);
formatMap.put(id.toUpperCase(), format);
destMap.put(id.toUpperCase(), destString);

String sqlRouteName = name + "sql";
ret.put(sqlRouteName, new WatchMsgEventSql(sqlRouteName, id, format, destinations, schema, eventType,
interval, numToProcess));
}

String sqlRouteName = name + "sql";
ret.put(sqlRouteName, new WatchMsgEventSql(sqlRouteName, id, format, destinations, schema, eventType, interval, numToProcess));
// Create the watcher
WatchStarter ws = new WatchStarter(id, strwch);
ws.strwch();
}

if (watchEvents.size() > 0){
// After iterating over the loop, the formatMap and destMap are complete. Now create the route.
if (watchEvents.size() > 0) {
// After iterating over the loop, the formatMap and destMap are complete. Now
// create the route.
final String routeName = "socketWatcher";
ret.put(routeName, new WatchMsgEventSockets(routeName, formatMap, destMap));
}
Expand Down
Loading