Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ private synchronized void maybeUpdateVariablesCache(ResourceResolver resolver) {
ExecutionMode.PARSE,
Code.consoleMinimal(),
new InputValues(),
resolver)) {
resolver,
new CodeOutputMemory())) {
context.getCodeContext().prepareRun(context);
variablesCache = context.getCodeContext().getBindingVariables();
variablesCacheTimestamp = currentTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import java.io.*;

public class CodeOutputString implements CodeOutput {
public class CodeOutputMemory implements CodeOutput {

private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

Expand Down
113 changes: 35 additions & 78 deletions core/src/main/java/dev/vml/es/acm/core/code/CodeOutputRepo.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
package dev.vml.es.acm.core.code;

import dev.vml.es.acm.core.AcmConstants;
import dev.vml.es.acm.core.AcmException;
import dev.vml.es.acm.core.gui.SpaSettings;
import dev.vml.es.acm.core.repo.Repo;
import dev.vml.es.acm.core.repo.RepoResource;
import dev.vml.es.acm.core.util.ResolverUtils;
import dev.vml.es.acm.core.repo.RepoChunks;
import java.io.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -20,74 +15,29 @@ public class CodeOutputRepo implements CodeOutput {

private static final Logger LOG = LoggerFactory.getLogger(CodeOutputRepo.class);

private static final String MIME_TYPE = "text/plain";

private static final String OUTPUT_ROOT = "output";

private static final int SCHEDULER_TERMINATION_TIMEOUT_SECONDS = 5;

private final ResourceResolverFactory resolverFactory;

private SpaSettings spaSettings;

private final String executionId;

private final ByteArrayOutputStream buffer;
private final RepoChunks repoChunks;

private ScheduledExecutorService scheduler;
private ScheduledExecutorService asyncFlushScheduler;

public CodeOutputRepo(ResourceResolverFactory resolverFactory, SpaSettings spaSettings, String executionId) {
this.resolverFactory = resolverFactory;
this.spaSettings = spaSettings;
this.executionId = executionId;
this.buffer = new ByteArrayOutputStream();
}

private void startAsyncSave() {
if (scheduler == null) {
scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleWithFixedDelay(
this::saveToRepo,
0,
Math.round(0.8 * spaSettings.getExecutionPollInterval()),
TimeUnit.MILLISECONDS);
}
}

private RepoResource getFile(ResourceResolver resolver) {
return Repo.quiet(resolver)
.get(String.format(
"%s/%s/%s_output.txt",
AcmConstants.VAR_ROOT, OUTPUT_ROOT, StringUtils.replace(executionId, "/", "-")));
}

private void saveToRepo() {
byte[] data = buffer.toByteArray();
if (data.length == 0) {
return;
}

try {
ResolverUtils.useContentResolver(resolverFactory, null, resolver -> {
RepoResource dataResource = getFile(resolver);
dataResource.parent().ensureRegularFolder();
dataResource.saveFile(MIME_TYPE, new ByteArrayInputStream(data));
});
} catch (Exception e) {
LOG.error("Output repo cannot save data for execution ID '{}'", executionId, e);
}
this.repoChunks = new RepoChunks(
resolverFactory,
String.format("%s/output", ExecutionContext.varPath(executionId)),
spaSettings.getExecutionCodeOutputChunkSize());
}

@Override
public InputStream read() {
try {
return ResolverUtils.queryContentResolver(resolverFactory, null, resolver -> {
RepoResource resource = getFile(resolver);
if (!resource.exists()) {
return new ByteArrayInputStream(new byte[0]);
}
return resource.readFileAsStream();
});
return repoChunks.getInputStream();
} catch (Exception e) {
throw new AcmException(
String.format("Output repo cannot open for reading for execution ID '%s'", executionId), e);
Expand All @@ -96,38 +46,45 @@ public InputStream read() {

@Override
public OutputStream write() {
startAsyncSave();
return buffer;
startAsyncFlush();
return repoChunks.getOutputStream();
}

@Override
public void flush() {
saveToRepo();
try {
repoChunks.flush();
} catch (IOException e) {
LOG.error("Output repo cannot flush for execution ID '{}'", executionId, e);
}
}

@Override
public void close() {
if (scheduler != null) {
scheduler.shutdown();
stopAsyncFlush();
try {
repoChunks.close();
} catch (IOException e) {
LOG.error("Output repo cannot close for execution ID '{}'", executionId, e);
}
}

private void startAsyncFlush() {
if (asyncFlushScheduler == null) {
asyncFlushScheduler = Executors.newSingleThreadScheduledExecutor();
asyncFlushScheduler.scheduleWithFixedDelay(
this::flush, 0, Math.round(0.8 * spaSettings.getExecutionPollInterval()), TimeUnit.MILLISECONDS);
}
}

private void stopAsyncFlush() {
if (asyncFlushScheduler != null) {
asyncFlushScheduler.shutdown();
try {
scheduler.awaitTermination(SCHEDULER_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
asyncFlushScheduler.awaitTermination(SCHEDULER_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
deleteFromRepo();
}

private void deleteFromRepo() {
try {
ResolverUtils.useContentResolver(resolverFactory, null, resolver -> {
RepoResource fileResource = getFile(resolver);
if (fileResource.exists()) {
fileResource.delete();
}
});
} catch (Exception e) {
LOG.error("Output repo cannot clean up data for execution ID '{}'", executionId, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ public String getError() {

@Override
public String getOutput() {
context.getOutput().flush();
try (InputStream stream = context.getOutput().read()) {
context.getCodeOutput().flush();
try (InputStream stream = context.getCodeOutput().read()) {
return IOUtils.toString(stream, StandardCharsets.UTF_8);
} catch (Exception e) {
return null;
Expand All @@ -92,8 +92,8 @@ public String getInstance() {
}

public InputStream readOutput() throws AcmException {
context.getOutput().flush();
return context.getOutput().read();
context.getCodeOutput().flush();
return context.getCodeOutput().read();
}

@Override
Expand Down
46 changes: 31 additions & 15 deletions core/src/main/java/dev/vml/es/acm/core/code/ExecutionContext.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
package dev.vml.es.acm.core.code;

import dev.vml.es.acm.core.gui.SpaSettings;
import dev.vml.es.acm.core.AcmConstants;
import dev.vml.es.acm.core.repo.Repo;
import dev.vml.es.acm.core.util.ResolverUtils;
import groovy.lang.Binding;
import org.apache.commons.lang3.StringUtils;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.slf4j.Logger;

public class ExecutionContext implements AutoCloseable {

public static final String VAR_ROOT = AcmConstants.VAR_ROOT + "/execution/context";

public static String varPath(String executionId) {
return String.format("%s/%s", VAR_ROOT, StringUtils.replace(executionId, "/", "-"));
}

private final String id;

private final String userId;
Expand All @@ -19,7 +28,7 @@ public class ExecutionContext implements AutoCloseable {

private final CodeContext codeContext;

private final CodeOutput output;
private final CodeOutput codeOutput;

private final CodePrintStream printStream;

Expand All @@ -46,31 +55,31 @@ public ExecutionContext(
Executor executor,
Executable executable,
InputValues inputValues,
CodeContext codeContext) {
CodeContext codeContext,
CodeOutput codeOutput) {
this.id = id;
this.userId = userId;
this.mode = mode;
this.executor = executor;
this.executable = executable;
this.inputValues = inputValues;
this.codeContext = codeContext;
this.output = determineOutput(mode, codeContext, id);
this.printStream = new CodePrintStream(output.write(), String.format("%s|%s", executable.getId(), id));
this.codeOutput = codeOutput;
this.printStream = new CodePrintStream(codeOutput.write(), String.format("%s|%s", executable.getId(), id));
this.schedules = new Schedules();
this.conditions = new Conditions(this);
this.inputs = new Inputs();
this.outputs = new Outputs();
this.outputs = new Outputs(this);

customizeBinding();
}

private CodeOutput determineOutput(ExecutionMode mode, CodeContext codeContext, String id) {
return mode == ExecutionMode.RUN
? new CodeOutputRepo(
codeContext.getOsgiContext().getService(ResourceResolverFactory.class),
codeContext.getOsgiContext().getService(SpaSettings.class),
id)
: new CodeOutputString();
private void cleanOutputs() {
ResourceResolverFactory resolverFactory =
codeContext.getOsgiContext().getService(ResourceResolverFactory.class);
ResolverUtils.useContentResolver(resolverFactory, null, resolver -> {
Repo.quiet(resolver).get(ExecutionContext.varPath(getId())).delete();
});
}

public String getId() {
Expand All @@ -93,8 +102,13 @@ public CodeContext getCodeContext() {
return codeContext;
}

public CodeOutput getCodeOutput() {
return codeOutput;
}

@Deprecated
public CodeOutput getOutput() {
return output;
return getCodeOutput();
}

public CodePrintStream getOut() {
Expand Down Expand Up @@ -172,7 +186,9 @@ private void customizeBinding() {
@Override
public void close() {
printStream.close();
output.close();
codeOutput.close();
outputs.close();
cleanOutputs();
}

public void variable(String name, Object value) {
Expand Down
28 changes: 21 additions & 7 deletions core/src/main/java/dev/vml/es/acm/core/code/ExecutionHistory.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,29 @@ private Resource saveEntry(ContextualExecution execution, Resource root) {
}

private void saveOutputs(ContextualExecution execution, Resource entry) {
for (Output definition :
for (Output output :
execution.getContext().getOutputs().getDefinitions().values()) {
RepoResource container = Repo.quiet(entry.getResourceResolver())
.get(entry.getPath())
.child(String.format("%s/%s", OUTPUT_FILES_CONTAINER_RN, definition.getName()))
.ensure(JcrConstants.NT_UNSTRUCTURED);
RepoResource file = container.child(OUTPUT_FILE_RN);
file.saveFile(definition.getMimeType(), definition.getInputStream());
if (OutputType.FILE.equals(output.getType())) {
saveFileOutput((FileOutput) output, entry);
}
}
}

private void saveFileOutput(FileOutput output, Resource entry) {
RepoResource container = Repo.quiet(entry.getResourceResolver())
.get(entry.getPath())
.child(String.format("%s/%s", OUTPUT_FILES_CONTAINER_RN, output.getName()))
.ensure(JcrConstants.NT_UNSTRUCTURED);
RepoResource file = container.child(OUTPUT_FILE_RN);
try {
output.flush();
} catch (IOException e) {
throw new AcmException(
String.format(
"Output '%s' cannot be flushed before saving to the execution history!", output.getName()),
e);
}
file.saveFile(output.getMimeType(), output.getInputStream());
}

public InputStream readOutputByName(Execution execution, String name) {
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/dev/vml/es/acm/core/code/ExecutionId.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public static String generate() {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd/HH/mm");
String formattedDate = now.format(formatter);
String uuid = UUID.randomUUID().toString();
return String.format("%s-%s_%d", formattedDate, uuid, now.getNano());
int nanoFirst3Digits = now.getNano() / 1_000_000; // first 3 digits
return String.format("%s-%s_%d", formattedDate, uuid, nanoFirst3Digits);
}
}
13 changes: 11 additions & 2 deletions core/src/main/java/dev/vml/es/acm/core/code/ExecutionQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private CodeOutput determineCodeOutput(String executionId) {
@Override
public JobExecutionResult process(Job job, JobExecutionContext context) {
ExecutionContextOptions contextOptions = ExecutionContextOptions.fromJob(job);
QueuedExecution queuedExecution = new QueuedExecution(executor, job, new CodeOutputString());
QueuedExecution queuedExecution = new QueuedExecution(executor, job, new CodeOutputMemory());

LOG.debug("Execution started '{}'", queuedExecution);

Expand Down Expand Up @@ -260,13 +260,22 @@ private Execution executeAsync(ExecutionContextOptions contextOptions, QueuedExe
contextOptions.getExecutionMode(),
execution.getExecutable(),
contextOptions.getInputs(),
resolver)) {
resolver,
determineOutput(
contextOptions.getExecutionMode(),
execution.getJob().getId()))) {
return executor.execute(context);
} catch (LoginException e) {
throw new AcmException(String.format("Cannot access repository for execution '%s'", execution.getId()), e);
}
}

private CodeOutput determineOutput(ExecutionMode mode, String executionId) {
return mode == ExecutionMode.RUN
? new CodeOutputRepo(resourceResolverFactory, spaSettings, executionId)
: new CodeOutputMemory();
}

public void reset() {
if ((jobAsyncExecutor != null) && !jobAsyncExecutor.isShutdown()) {
jobAsyncExecutor.shutdownNow();
Expand Down
Loading