Skip to content

Commit a22150f

Browse files
Merge pull request #228 from wttech/output-memory-efficient
Output - memory efficient with repo chunks
2 parents 0bbe208 + 9931359 commit a22150f

File tree

89 files changed

+1500
-876
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

89 files changed

+1500
-876
lines changed

core/src/main/java/dev/vml/es/acm/core/assist/Assistancer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,8 @@ private synchronized void maybeUpdateVariablesCache(ResourceResolver resolver) {
155155
ExecutionMode.PARSE,
156156
Code.consoleMinimal(),
157157
new InputValues(),
158-
resolver)) {
158+
resolver,
159+
new CodeOutputMemory())) {
159160
context.getCodeContext().prepareRun(context);
160161
variablesCache = context.getCodeContext().getBindingVariables();
161162
variablesCacheTimestamp = currentTime;

core/src/main/java/dev/vml/es/acm/core/code/CodeOutputString.java renamed to core/src/main/java/dev/vml/es/acm/core/code/CodeOutputMemory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import java.io.*;
44

5-
public class CodeOutputString implements CodeOutput {
5+
public class CodeOutputMemory implements CodeOutput {
66

77
private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
88

Lines changed: 35 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,12 @@
11
package dev.vml.es.acm.core.code;
22

3-
import dev.vml.es.acm.core.AcmConstants;
43
import dev.vml.es.acm.core.AcmException;
54
import dev.vml.es.acm.core.gui.SpaSettings;
6-
import dev.vml.es.acm.core.repo.Repo;
7-
import dev.vml.es.acm.core.repo.RepoResource;
8-
import dev.vml.es.acm.core.util.ResolverUtils;
5+
import dev.vml.es.acm.core.repo.RepoChunks;
96
import java.io.*;
107
import java.util.concurrent.Executors;
118
import java.util.concurrent.ScheduledExecutorService;
129
import java.util.concurrent.TimeUnit;
13-
import org.apache.commons.lang3.StringUtils;
14-
import org.apache.sling.api.resource.ResourceResolver;
1510
import org.apache.sling.api.resource.ResourceResolverFactory;
1611
import org.slf4j.Logger;
1712
import org.slf4j.LoggerFactory;
@@ -20,74 +15,29 @@ public class CodeOutputRepo implements CodeOutput {
2015

2116
private static final Logger LOG = LoggerFactory.getLogger(CodeOutputRepo.class);
2217

23-
private static final String MIME_TYPE = "text/plain";
24-
25-
private static final String OUTPUT_ROOT = "output";
26-
2718
private static final int SCHEDULER_TERMINATION_TIMEOUT_SECONDS = 5;
2819

29-
private final ResourceResolverFactory resolverFactory;
30-
3120
private SpaSettings spaSettings;
3221

3322
private final String executionId;
3423

35-
private final ByteArrayOutputStream buffer;
24+
private final RepoChunks repoChunks;
3625

37-
private ScheduledExecutorService scheduler;
26+
private ScheduledExecutorService asyncFlushScheduler;
3827

3928
public CodeOutputRepo(ResourceResolverFactory resolverFactory, SpaSettings spaSettings, String executionId) {
40-
this.resolverFactory = resolverFactory;
4129
this.spaSettings = spaSettings;
4230
this.executionId = executionId;
43-
this.buffer = new ByteArrayOutputStream();
44-
}
45-
46-
private void startAsyncSave() {
47-
if (scheduler == null) {
48-
scheduler = Executors.newSingleThreadScheduledExecutor();
49-
scheduler.scheduleWithFixedDelay(
50-
this::saveToRepo,
51-
0,
52-
Math.round(0.8 * spaSettings.getExecutionPollInterval()),
53-
TimeUnit.MILLISECONDS);
54-
}
55-
}
56-
57-
private RepoResource getFile(ResourceResolver resolver) {
58-
return Repo.quiet(resolver)
59-
.get(String.format(
60-
"%s/%s/%s_output.txt",
61-
AcmConstants.VAR_ROOT, OUTPUT_ROOT, StringUtils.replace(executionId, "/", "-")));
62-
}
63-
64-
private void saveToRepo() {
65-
byte[] data = buffer.toByteArray();
66-
if (data.length == 0) {
67-
return;
68-
}
69-
70-
try {
71-
ResolverUtils.useContentResolver(resolverFactory, null, resolver -> {
72-
RepoResource dataResource = getFile(resolver);
73-
dataResource.parent().ensureRegularFolder();
74-
dataResource.saveFile(MIME_TYPE, new ByteArrayInputStream(data));
75-
});
76-
} catch (Exception e) {
77-
LOG.error("Output repo cannot save data for execution ID '{}'", executionId, e);
78-
}
31+
this.repoChunks = new RepoChunks(
32+
resolverFactory,
33+
String.format("%s/output", ExecutionContext.varPath(executionId)),
34+
spaSettings.getExecutionCodeOutputChunkSize());
7935
}
8036

8137
@Override
8238
public InputStream read() {
8339
try {
84-
return ResolverUtils.queryContentResolver(resolverFactory, null, resolver -> {
85-
RepoResource resource = getFile(resolver);
86-
if (!resource.exists()) {
87-
return new ByteArrayInputStream(new byte[0]);
88-
}
89-
return resource.readFileAsStream();
90-
});
40+
return repoChunks.getInputStream();
9141
} catch (Exception e) {
9242
throw new AcmException(
9343
String.format("Output repo cannot open for reading for execution ID '%s'", executionId), e);
@@ -96,38 +46,45 @@ public InputStream read() {
9646

9747
@Override
9848
public OutputStream write() {
99-
startAsyncSave();
100-
return buffer;
49+
startAsyncFlush();
50+
return repoChunks.getOutputStream();
10151
}
10252

10353
@Override
10454
public void flush() {
105-
saveToRepo();
55+
try {
56+
repoChunks.flush();
57+
} catch (IOException e) {
58+
LOG.error("Output repo cannot flush for execution ID '{}'", executionId, e);
59+
}
10660
}
10761

10862
@Override
10963
public void close() {
110-
if (scheduler != null) {
111-
scheduler.shutdown();
64+
stopAsyncFlush();
65+
try {
66+
repoChunks.close();
67+
} catch (IOException e) {
68+
LOG.error("Output repo cannot close for execution ID '{}'", executionId, e);
69+
}
70+
}
71+
72+
private void startAsyncFlush() {
73+
if (asyncFlushScheduler == null) {
74+
asyncFlushScheduler = Executors.newSingleThreadScheduledExecutor();
75+
asyncFlushScheduler.scheduleWithFixedDelay(
76+
this::flush, 0, Math.round(0.8 * spaSettings.getExecutionPollInterval()), TimeUnit.MILLISECONDS);
77+
}
78+
}
79+
80+
private void stopAsyncFlush() {
81+
if (asyncFlushScheduler != null) {
82+
asyncFlushScheduler.shutdown();
11283
try {
113-
scheduler.awaitTermination(SCHEDULER_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
84+
asyncFlushScheduler.awaitTermination(SCHEDULER_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
11485
} catch (InterruptedException e) {
11586
Thread.currentThread().interrupt();
11687
}
11788
}
118-
deleteFromRepo();
119-
}
120-
121-
private void deleteFromRepo() {
122-
try {
123-
ResolverUtils.useContentResolver(resolverFactory, null, resolver -> {
124-
RepoResource fileResource = getFile(resolver);
125-
if (fileResource.exists()) {
126-
fileResource.delete();
127-
}
128-
});
129-
} catch (Exception e) {
130-
LOG.error("Output repo cannot clean up data for execution ID '{}'", executionId, e);
131-
}
13289
}
13390
}

core/src/main/java/dev/vml/es/acm/core/code/ContextualExecution.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ public String getError() {
7878

7979
@Override
8080
public String getOutput() {
81-
context.getOutput().flush();
82-
try (InputStream stream = context.getOutput().read()) {
81+
context.getCodeOutput().flush();
82+
try (InputStream stream = context.getCodeOutput().read()) {
8383
return IOUtils.toString(stream, StandardCharsets.UTF_8);
8484
} catch (Exception e) {
8585
return null;
@@ -92,8 +92,8 @@ public String getInstance() {
9292
}
9393

9494
public InputStream readOutput() throws AcmException {
95-
context.getOutput().flush();
96-
return context.getOutput().read();
95+
context.getCodeOutput().flush();
96+
return context.getCodeOutput().read();
9797
}
9898

9999
@Override

core/src/main/java/dev/vml/es/acm/core/code/ExecutionContext.java

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,21 @@
11
package dev.vml.es.acm.core.code;
22

3-
import dev.vml.es.acm.core.gui.SpaSettings;
3+
import dev.vml.es.acm.core.AcmConstants;
4+
import dev.vml.es.acm.core.repo.Repo;
5+
import dev.vml.es.acm.core.util.ResolverUtils;
46
import groovy.lang.Binding;
7+
import org.apache.commons.lang3.StringUtils;
58
import org.apache.sling.api.resource.ResourceResolverFactory;
69
import org.slf4j.Logger;
710

811
public class ExecutionContext implements AutoCloseable {
912

13+
public static final String VAR_ROOT = AcmConstants.VAR_ROOT + "/execution/context";
14+
15+
public static String varPath(String executionId) {
16+
return String.format("%s/%s", VAR_ROOT, StringUtils.replace(executionId, "/", "-"));
17+
}
18+
1019
private final String id;
1120

1221
private final String userId;
@@ -19,7 +28,7 @@ public class ExecutionContext implements AutoCloseable {
1928

2029
private final CodeContext codeContext;
2130

22-
private final CodeOutput output;
31+
private final CodeOutput codeOutput;
2332

2433
private final CodePrintStream printStream;
2534

@@ -46,31 +55,31 @@ public ExecutionContext(
4655
Executor executor,
4756
Executable executable,
4857
InputValues inputValues,
49-
CodeContext codeContext) {
58+
CodeContext codeContext,
59+
CodeOutput codeOutput) {
5060
this.id = id;
5161
this.userId = userId;
5262
this.mode = mode;
5363
this.executor = executor;
5464
this.executable = executable;
5565
this.inputValues = inputValues;
5666
this.codeContext = codeContext;
57-
this.output = determineOutput(mode, codeContext, id);
58-
this.printStream = new CodePrintStream(output.write(), String.format("%s|%s", executable.getId(), id));
67+
this.codeOutput = codeOutput;
68+
this.printStream = new CodePrintStream(codeOutput.write(), String.format("%s|%s", executable.getId(), id));
5969
this.schedules = new Schedules();
6070
this.conditions = new Conditions(this);
6171
this.inputs = new Inputs();
62-
this.outputs = new Outputs();
72+
this.outputs = new Outputs(this);
6373

6474
customizeBinding();
6575
}
6676

67-
private CodeOutput determineOutput(ExecutionMode mode, CodeContext codeContext, String id) {
68-
return mode == ExecutionMode.RUN
69-
? new CodeOutputRepo(
70-
codeContext.getOsgiContext().getService(ResourceResolverFactory.class),
71-
codeContext.getOsgiContext().getService(SpaSettings.class),
72-
id)
73-
: new CodeOutputString();
77+
private void cleanOutputs() {
78+
ResourceResolverFactory resolverFactory =
79+
codeContext.getOsgiContext().getService(ResourceResolverFactory.class);
80+
ResolverUtils.useContentResolver(resolverFactory, null, resolver -> {
81+
Repo.quiet(resolver).get(ExecutionContext.varPath(getId())).delete();
82+
});
7483
}
7584

7685
public String getId() {
@@ -93,8 +102,13 @@ public CodeContext getCodeContext() {
93102
return codeContext;
94103
}
95104

105+
public CodeOutput getCodeOutput() {
106+
return codeOutput;
107+
}
108+
109+
@Deprecated
96110
public CodeOutput getOutput() {
97-
return output;
111+
return getCodeOutput();
98112
}
99113

100114
public CodePrintStream getOut() {
@@ -172,7 +186,9 @@ private void customizeBinding() {
172186
@Override
173187
public void close() {
174188
printStream.close();
175-
output.close();
189+
codeOutput.close();
190+
outputs.close();
191+
cleanOutputs();
176192
}
177193

178194
public void variable(String name, Object value) {

core/src/main/java/dev/vml/es/acm/core/code/ExecutionHistory.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,15 +69,29 @@ private Resource saveEntry(ContextualExecution execution, Resource root) {
6969
}
7070

7171
private void saveOutputs(ContextualExecution execution, Resource entry) {
72-
for (Output definition :
72+
for (Output output :
7373
execution.getContext().getOutputs().getDefinitions().values()) {
74-
RepoResource container = Repo.quiet(entry.getResourceResolver())
75-
.get(entry.getPath())
76-
.child(String.format("%s/%s", OUTPUT_FILES_CONTAINER_RN, definition.getName()))
77-
.ensure(JcrConstants.NT_UNSTRUCTURED);
78-
RepoResource file = container.child(OUTPUT_FILE_RN);
79-
file.saveFile(definition.getMimeType(), definition.getInputStream());
74+
if (OutputType.FILE.equals(output.getType())) {
75+
saveFileOutput((FileOutput) output, entry);
76+
}
77+
}
78+
}
79+
80+
private void saveFileOutput(FileOutput output, Resource entry) {
81+
RepoResource container = Repo.quiet(entry.getResourceResolver())
82+
.get(entry.getPath())
83+
.child(String.format("%s/%s", OUTPUT_FILES_CONTAINER_RN, output.getName()))
84+
.ensure(JcrConstants.NT_UNSTRUCTURED);
85+
RepoResource file = container.child(OUTPUT_FILE_RN);
86+
try {
87+
output.flush();
88+
} catch (IOException e) {
89+
throw new AcmException(
90+
String.format(
91+
"Output '%s' cannot be flushed before saving to the execution history!", output.getName()),
92+
e);
8093
}
94+
file.saveFile(output.getMimeType(), output.getInputStream());
8195
}
8296

8397
public InputStream readOutputByName(Execution execution, String name) {

core/src/main/java/dev/vml/es/acm/core/code/ExecutionId.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public static String generate() {
1818
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd/HH/mm");
1919
String formattedDate = now.format(formatter);
2020
String uuid = UUID.randomUUID().toString();
21-
return String.format("%s-%s_%d", formattedDate, uuid, now.getNano());
21+
int nanoFirst3Digits = now.getNano() / 1_000_000; // first 3 digits
22+
return String.format("%s-%s_%d", formattedDate, uuid, nanoFirst3Digits);
2223
}
2324
}

core/src/main/java/dev/vml/es/acm/core/code/ExecutionQueue.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ private CodeOutput determineCodeOutput(String executionId) {
196196
@Override
197197
public JobExecutionResult process(Job job, JobExecutionContext context) {
198198
ExecutionContextOptions contextOptions = ExecutionContextOptions.fromJob(job);
199-
QueuedExecution queuedExecution = new QueuedExecution(executor, job, new CodeOutputString());
199+
QueuedExecution queuedExecution = new QueuedExecution(executor, job, new CodeOutputMemory());
200200

201201
LOG.debug("Execution started '{}'", queuedExecution);
202202

@@ -260,13 +260,22 @@ private Execution executeAsync(ExecutionContextOptions contextOptions, QueuedExe
260260
contextOptions.getExecutionMode(),
261261
execution.getExecutable(),
262262
contextOptions.getInputs(),
263-
resolver)) {
263+
resolver,
264+
determineOutput(
265+
contextOptions.getExecutionMode(),
266+
execution.getJob().getId()))) {
264267
return executor.execute(context);
265268
} catch (LoginException e) {
266269
throw new AcmException(String.format("Cannot access repository for execution '%s'", execution.getId()), e);
267270
}
268271
}
269272

273+
private CodeOutput determineOutput(ExecutionMode mode, String executionId) {
274+
return mode == ExecutionMode.RUN
275+
? new CodeOutputRepo(resourceResolverFactory, spaSettings, executionId)
276+
: new CodeOutputMemory();
277+
}
278+
270279
public void reset() {
271280
if ((jobAsyncExecutor != null) && !jobAsyncExecutor.isShutdown()) {
272281
jobAsyncExecutor.shutdownNow();

0 commit comments

Comments
 (0)