Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[simple-api] simplified method for JNI bridges #221

Closed
wants to merge 4 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 116 additions & 56 deletions src/main/java/SimpleAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -36,9 +39,10 @@

/**
* A simplified Java API consisting of a {@link #startJob()} method that starts a job based on a
* script name and a list of options, and {@link #getNewMessages()} and {@link #getLastJobStatus()}
* methods. It is used to build a simple Java CLI (see the {@link #main()} method). The simplified
* API also makes it easier to bridge with other programming languages using JNI.
* script name and a list of options and returns a {@link CommandLineJob}. This object provices
* convenience methods for monitoring the status and messages. This class is used to build a simple
* Java CLI (see the {@link #main()} method). The simplified API also makes it easier to bridge with
* other programming languages using JNI.
*/
@Component(
name = "SimpleAPI",
Expand Down Expand Up @@ -71,7 +75,8 @@ public void setJobFactory(JobFactory jobFactory) {
this.jobFactory = jobFactory;
}

private void _startJob(String scriptName, Map<String,? extends Iterable<String>> options) throws IllegalArgumentException, FileNotFoundException {
private CommandLineJob _startJob(String scriptName, Map<String,? extends Iterable<String>> options)
throws IllegalArgumentException, FileNotFoundException, URISyntaxException {
ScriptService<?> scriptService = scriptRegistry.getScript(scriptName);
if (scriptService == null)
throw new IllegalArgumentException(scriptName + " script not found");
Expand All @@ -82,24 +87,31 @@ private void _startJob(String scriptName, Map<String,? extends Iterable<String>>
for (String value : e.getValue())
parser.withArgument(e.getKey(), value);
CommandLineJob job = parser.createJob(jobFactory);
MessageAccessor accessor = job.getMonitor().getMessageAccessor();
accessor.listen(
num -> {
consumeMessage(accessor, num);
}
);
job.getMonitor().getStatusUpdates().listen(s -> updateJobStatus(s));
new Thread(job).start();
return job;
}

public static void startJob(String scriptName, Map<String,? extends Iterable<String>> options) throws IllegalArgumentException, FileNotFoundException {
getInstance()._startJob(scriptName, options);
/**
* Start a new job
*
* @param scriptName the name of the script
* @param options the command line arguments, providing the inputs and option values for the
* job, and file locations where results must be stored.
* @return The job, wrapped in a {@link CommandLineJob} object for easy monitoring.
*/
public static CommandLineJob startJob(String scriptName, Map<String,? extends Iterable<String>> options)
throws IllegalArgumentException, FileNotFoundException, URISyntaxException {
return getInstance()._startJob(scriptName, options);
}

/**
* Singleton thread safe instance of SimpleAPI.
*/
private static SimpleAPI INSTANCE;

/**
* Get the singleton {@link SimpleAPI} instance.
*/
private static SimpleAPI getInstance() {
if (INSTANCE == null) {
for (CreateOnStart o : ServiceLoader.load(CreateOnStart.class))
Expand All @@ -111,39 +123,6 @@ private static SimpleAPI getInstance() {
return INSTANCE;
}

private static Job.Status lastJobStatus = null;
private static synchronized void updateJobStatus(Job.Status status) {
lastJobStatus = status;
}
private static List<Message> messagesQueue = new ArrayList<>();
private static int lastMessage = -1;
private static synchronized void consumeMessage(MessageAccessor accessor, int seqNum) {
for (Message m :
accessor.createFilter()
.greaterThan(lastMessage)
.filterLevels(Collections.singleton(Level.INFO))
.getMessages()) {
if (m.getSequence() > lastMessage) {
messagesQueue.add(m);
}
}
lastMessage = seqNum;
}

/**
* Get the list of new top-level messages (messages that have not
* been returned yet by a previous call to {@link #getNewMessages()}).
*/
public static synchronized List<Message> getNewMessages() {
List<Message> result = List.copyOf(messagesQueue);
messagesQueue.clear();
return result;
}

public static synchronized Job.Status getLastJobStatus() {
return lastJobStatus;
}

/**
* Simple command line interface
*/
Expand Down Expand Up @@ -171,20 +150,21 @@ public static void main(String[] args) throws InterruptedException, IOException
}
list.add(args[i + 1]);
}
CommandLineJob job = null;
try {
SimpleAPI.startJob(script, options);
job = SimpleAPI.startJob(script, options);
} catch (IllegalArgumentException e) {
System.err.println(e.getMessage());
System.exit(1);
} catch (FileNotFoundException e) {
} catch (FileNotFoundException|URISyntaxException e) {
System.err.println("File does not exist: " + e.getMessage());
System.exit(1);
}
while (true) {
for (Message m : SimpleAPI.getNewMessages()) {
for (Message m : job.getNewMessages()) {
System.err.println(m.getText());
}
switch (SimpleAPI.getLastJobStatus()) {
switch (job.getStatus()) {
case SUCCESS:
case FAIL:
case ERROR:
Expand All @@ -197,6 +177,9 @@ public static void main(String[] args) throws InterruptedException, IOException
}
}

/**
* Builder class to create a {@link CommandLineJob} object by parsing command line arguments.
*/
private static class CommandLineJobParser {

private final Script script;
Expand All @@ -214,7 +197,10 @@ public CommandLineJobParser(Script script, File fileBase) {
/**
* Parse command line argument
*/
public CommandLineJobParser withArgument(String key, String value) throws IllegalArgumentException, FileNotFoundException {
public CommandLineJobParser withArgument(String key, String value)
throws IllegalArgumentException, FileNotFoundException, URISyntaxException {
if (value == null)
throw new IllegalArgumentException();
if (script.getInputPort(key) != null)
return withInput(key, value);
else if (script.getOption(key) != null)
Expand Down Expand Up @@ -256,13 +242,22 @@ private CommandLineJobParser withInput(String port, String source) throws Illega
* the value is not valid according to the option type.
* @throws FileNotFoundException if the option type is "anyFileURI" and the value can not be
* resolved to a document.
* @throws URISyntaxException if the option type is "anyFileURI" or "anyDirURI" and the
* value starts with "file:/" but is an invalid URI
*/
private CommandLineJobParser withOption(String name, String value) throws IllegalArgumentException, FileNotFoundException {
private CommandLineJobParser withOption(String name, String value)
throws IllegalArgumentException, FileNotFoundException, URISyntaxException {
ScriptOption o = script.getOption(name);
if (o != null) {
String type = o.getType().getId();
if ("anyFileURI".equals(type)) {
File file = new File(value);
File file; {
if (value.startsWith("file:/")) {
file = new File(new URI(value));
} else {
file = new File(value);
}
}
if (!file.isAbsolute()) {
if (fileBase == null)
throw new FileNotFoundException("File must be an absolute path, but got " + file);
Expand All @@ -274,7 +269,13 @@ private CommandLineJobParser withOption(String name, String value) throws Illega
throw new UncheckedIOException(e);
}
} else if ("anyDirURI".equals(type)) {
File dir = new File(value);
File dir; {
if (value.startsWith("file:/")) {
dir = new File(new URI(value));
} else {
dir = new File(value);
}
}
if (!dir.isAbsolute()) {
if (fileBase == null)
throw new FileNotFoundException("File must be an absolute path, but got " + dir);
Expand Down Expand Up @@ -334,7 +335,7 @@ else if (file.list().length > 0)
if (file.isDirectory()) {
if (file.list().length > 0)
throw new IllegalArgumentException("Directory is not empty: " + file);
resultLocations.put(port, URI.create(file.toURI() + "/"));
resultLocations.put(port, file.toURI());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

Copy link
Contributor Author

@NPavie NPavie Nov 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If i recall, It could lead to invalid/malformed path on windows ending by "//" or "\/"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Shouldn't we instead change the

if (result.endsWith("/")) {

above to

if (result.endsWith("/") || result.endsWith("\\")) {

?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, I did not treat this in the results side (only the parameters initialization side was creating issues during developement of the connection between word addin and pipeline)

} else {
if (p.isSequence())
throw new IllegalArgumentException("Not a directory: " + file);
Expand Down Expand Up @@ -368,8 +369,18 @@ public static class CommandLineJob implements Runnable, AutoCloseable {
private CommandLineJob(Job job, Map<String,URI> resultLocations) {
this.job = job;
this.resultLocations = resultLocations;
// Simplify monitoring of messages
MessageAccessor accessor = job.getMonitor().getMessageAccessor();
accessor.listen(
num -> {
consumeMessage(accessor, num);
}
);
}

/**
* Run the job and store the results
*/
public void run() {
job.run();
try {
Expand All @@ -383,7 +394,7 @@ public void run() {
File f = new File(u);
if (u.toString().endsWith("/"))
for (JobResult r : job.getResults().getResults(port)) {
File dest = new File(f, r.strip().getIdx());
File dest = new File(f, URLDecoder.decode(r.strip().getIdx(), StandardCharsets.UTF_8));
if (dest.exists())
existingFiles.add(dest);
else
Expand All @@ -407,6 +418,9 @@ public void run() {
completed.set(true);
}

/**
* Get the current status
*/
public Job.Status getStatus() {
Job.Status s = job.getStatus();
switch (s) {
Expand All @@ -421,6 +435,52 @@ public Job.Status getStatus() {
}
}

private final List<Message> messagesQueue = new ArrayList<>();
private int lastMessage = -1;

/**
* Fill the message buffer queue for logging. The queue is returned and emptied on each
* {@link #getNewMessages()} call.
*
* @param accessor the job's {@link MessageAccessor}
* @param seqNum see {@link MessageAccessor#listen()}
*/
private synchronized void consumeMessage(MessageAccessor accessor, int seqNum) {
for (Message m :
accessor.createFilter()
.greaterThan(lastMessage)
.filterLevels(Collections.singleton(Level.INFO))
.getMessages()) {
if (m.getSequence() > lastMessage) {
messagesQueue.add(m);
}
}
lastMessage = seqNum;
}

/**
* Get the list of new top-level messages (messages that have not been returned yet by a
* previous call to {@link #getNewMessages()}).
*/
public synchronized List<Message> getNewMessages() {
List<Message> result = List.copyOf(messagesQueue);
messagesQueue.clear();
return result;
}

/**
* Get the list of all error messages reported during the job execution.
*/
public List<Message> getErrors() {
return job.getMonitor().getMessageAccessor().getErrors();
}

/**
* For advanced job monitoring, get the job's {@link JobMonitor}. From this object, you can
* access all messages reported for the job through {@link JobMonitor#getMessageAccessor()},
* or register your own status notifications callback through {@link
* JobMonitor#getStatusUpdates()}.
*/
public JobMonitor getMonitor() {
return job.getMonitor();
}
Expand Down
Loading