Skip to content

Commit 4bb2de2

Browse files
committedMar 6, 2025
feat(core): allow reading file from any namespaces
Fixes kestra-io/kestra-ee#1934
1 parent 27c8762 commit 4bb2de2

File tree

5 files changed

+125
-75
lines changed

5 files changed

+125
-75
lines changed
 
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package io.kestra.core.runners.pebble.functions;
2+
3+
import io.kestra.core.services.FlowService;
4+
import io.kestra.core.utils.Slugify;
5+
import io.pebbletemplates.pebble.error.PebbleException;
6+
import io.pebbletemplates.pebble.extension.Function;
7+
import io.pebbletemplates.pebble.template.EvaluationContext;
8+
import io.pebbletemplates.pebble.template.PebbleTemplate;
9+
import jakarta.inject.Inject;
10+
11+
import java.net.URI;
12+
import java.util.Map;
13+
import java.util.regex.Pattern;
14+
15+
abstract class AbstractFileFunction implements Function {
16+
static final String KESTRA_SCHEME = "kestra:///";
17+
static final String TRIGGER = "trigger";
18+
static final String NAMESPACE = "namespace";
19+
static final String TENANT_ID = "tenantId";
20+
static final String ID = "id";
21+
22+
private static final Pattern EXECUTION_FILE = Pattern.compile(".*/.*/executions/.*/tasks/.*/.*");
23+
24+
@Inject
25+
private FlowService flowService;
26+
27+
URI getUriFromThePath(Object path, int lineNumber, PebbleTemplate self) {
28+
if (path instanceof URI u) {
29+
return u;
30+
} else if (path instanceof String str && str.startsWith(KESTRA_SCHEME)) {
31+
return URI.create(str);
32+
} else {
33+
throw new PebbleException(null, "Unable to create the URI from the path " + path, lineNumber, self.getName());
34+
}
35+
}
36+
37+
boolean isFileUriValid(String namespace, String flowId, String executionId, URI path) {
38+
// Internal storage URI should be: kestra:///$namespace/$flowId/executions/$executionId/tasks/$taskName/$taskRunId/$random.ion or kestra:///$namespace/$flowId/executions/$executionId/trigger/$triggerName/$random.ion
39+
// We check that the file is for the given flow execution
40+
if (namespace == null || flowId == null || executionId == null) {
41+
return false;
42+
}
43+
44+
String authorizedBasePath = KESTRA_SCHEME + namespace.replace(".", "/") + "/" + Slugify.of(flowId) + "/executions/" + executionId + "/";
45+
return path.toString().startsWith(authorizedBasePath);
46+
}
47+
48+
@SuppressWarnings("unchecked")
49+
String checkAllowedFileAndReturnNamespace(EvaluationContext context, URI path) {
50+
Map<String, String> flow = (Map<String, String>) context.getVariable("flow");
51+
Map<String, String> execution = (Map<String, String>) context.getVariable("execution");
52+
53+
// check if the file is from the current execution, the parent execution or an allowed namespaces
54+
boolean isFileFromCurrentExecution = isFileUriValid(flow.get(NAMESPACE), flow.get(ID), execution.get(ID), path);
55+
if (isFileFromCurrentExecution) {
56+
return flow.get(NAMESPACE);
57+
} else {
58+
if (isFileFromParentExecution(context, path)) {
59+
Map<String, String> trigger = (Map<String, String>) context.getVariable(TRIGGER);
60+
return trigger.get(NAMESPACE);
61+
}
62+
else {
63+
return checkIfFileFromAllowedNamespaceAndReturnIt(context, path, flow.get(TENANT_ID), flow.get(NAMESPACE));
64+
}
65+
}
66+
}
67+
68+
@SuppressWarnings("unchecked")
69+
private boolean isFileFromParentExecution(EvaluationContext context, URI path) {
70+
if (context.getVariable(TRIGGER) != null) {
71+
// if there is a trigger of type execution, we also allow accessing a file from the parent execution
72+
Map<String, String> trigger = (Map<String, String>) context.getVariable(TRIGGER);
73+
74+
if (!isFileUriValid(trigger.get(NAMESPACE), trigger.get("flowId"), trigger.get("executionId"), path)) {
75+
throw new IllegalArgumentException("Unable to read the file '" + path + "' as it didn't belong to the parent execution");
76+
}
77+
return true;
78+
}
79+
return false;
80+
}
81+
82+
private String checkIfFileFromAllowedNamespaceAndReturnIt(EvaluationContext context, URI path, String tenantId, String fromNamespace) {
83+
// Extract namespace from the path, it should be of the form: kestra:///({tenantId}/){namespace}/{flowId}/executions/{executionId}/tasks/{taskId}/{taskRunId}/{fileName}'
84+
// To extract the namespace, we must do it step by step as tenantId, namespace and taskId can contain the words 'executions' and 'tasks'
85+
String namespace = path.toString().substring(KESTRA_SCHEME.length());
86+
if (!EXECUTION_FILE.matcher(namespace).matches()) {
87+
throw new IllegalArgumentException("Unable to read the file '" + path + "' as it is not an execution file");
88+
}
89+
90+
// 1. remove the tenantId if existing
91+
if (tenantId != null) {
92+
namespace = namespace.substring(tenantId.length() + 1);
93+
}
94+
// 2. remove everything after tasks
95+
namespace = namespace.substring(0, namespace.lastIndexOf("/tasks/"));
96+
// 3. remove everything after executions
97+
namespace = namespace.substring(0, namespace.lastIndexOf("/executions/"));
98+
// 4. remove the flowId
99+
namespace = namespace.substring(0, namespace.lastIndexOf('/'));
100+
// 5. replace '/' with '.'
101+
namespace = namespace.replace("/", ".");
102+
103+
flowService.checkAllowedNamespace(tenantId, namespace, tenantId, fromNamespace);
104+
105+
return namespace;
106+
}
107+
}

‎core/src/main/java/io/kestra/core/runners/pebble/functions/FileSizeFunction.java

+4-9
Original file line numberDiff line numberDiff line change
@@ -61,16 +61,11 @@ private URI getUriFromThePath(Object path, int lineNumber, PebbleTemplate self)
6161

6262
@SuppressWarnings("unchecked")
6363
private long getFileSizeFromInternalStorageUri(EvaluationContext context, URI path) throws IOException {
64-
Map<String, String> flow = (Map<String, String>) context.getVariable("flow");
65-
Map<String, String> execution = (Map<String, String>) context.getVariable("execution");
66-
67-
boolean isFileFromCurrentExecution = isFileUriValid(flow.get(NAMESPACE), flow.get(ID), execution.get(ID), path);
64+
// check if the file is from the current execution, the parent execution, or an allowed namespace
65+
String namespace = checkAllowedFileAndReturnNamespace(context, path);
6866

69-
if (!isFileFromCurrentExecution) {
70-
checkIfFileFromParentExecution(context, path);
71-
}
72-
73-
FileAttributes fileAttributes = storageInterface.getAttributes(flow.get("tenantId"), flow.get("namespace"), path);
67+
Map<String, String> flow = (Map<String, String>) context.getVariable("flow");
68+
FileAttributes fileAttributes = storageInterface.getAttributes(flow.get(TENANT_ID), namespace, path);
7469
return fileAttributes.getSize();
7570
}
7671

‎core/src/main/java/io/kestra/core/runners/pebble/functions/ReadFileFunction.java

+8-24
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
import io.kestra.core.storages.StorageContext;
44
import io.kestra.core.storages.StorageInterface;
5-
import io.kestra.core.utils.Slugify;
6-
import io.micronaut.context.annotation.Value;
75
import io.pebbletemplates.pebble.error.PebbleException;
86
import io.pebbletemplates.pebble.extension.Function;
97
import io.pebbletemplates.pebble.template.EvaluationContext;
@@ -26,8 +24,8 @@ public class ReadFileFunction implements Function {
2624
@Inject
2725
private StorageInterface storageInterface;
2826

29-
@Value("${kestra.server-type:}") // default to empty as tests didn't set this property
30-
private String serverType;
27+
// @Value("${kestra.server-type:}") // default to empty as tests didn't set this property
28+
// private String serverType;
3129

3230
@Override
3331
public List<String> getArgumentNames() {
@@ -70,33 +68,19 @@ public Object execute(Map<String, Object> args, PebbleTemplate self, EvaluationC
7068
@SuppressWarnings("unchecked")
7169
private String readFromNamespaceFile(EvaluationContext context, String path) throws IOException {
7270
Map<String, String> flow = (Map<String, String>) context.getVariable("flow");
73-
URI namespaceFile = URI.create(StorageContext.namespaceFilePrefix(flow.get("namespace")) + "/" + path);
74-
try (InputStream inputStream = storageInterface.get(flow.get("tenantId"), flow.get("namespace"), namespaceFile)) {
71+
URI namespaceFile = URI.create(StorageContext.namespaceFilePrefix(flow.get(NAMESPACE)) + "/" + path);
72+
try (InputStream inputStream = storageInterface.get(flow.get(TENANT_ID), flow.get(NAMESPACE), namespaceFile)) {
7573
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
7674
}
7775
}
7876

7977
@SuppressWarnings("unchecked")
8078
private String readFromInternalStorageUri(EvaluationContext context, URI path) throws IOException {
81-
Map<String, String> flow = (Map<String, String>) context.getVariable("flow");
82-
Map<String, String> execution = (Map<String, String>) context.getVariable("execution");
83-
84-
// check if the file is from the current execution
85-
if (!validateFileUri(flow.get("namespace"), flow.get("id"), execution.get("id"), path)) {
86-
// if not, it can be from the parent execution, so we check if there is a trigger of type execution
87-
if (context.getVariable("trigger") != null) {
88-
// if there is a trigger of type execution, we also allow accessing a file from the parent execution
89-
Map<String, String> trigger = (Map<String, String>) context.getVariable("trigger");
90-
if (!validateFileUri(trigger.get("namespace"), trigger.get("flowId"), trigger.get("executionId"), path)) {
91-
throw new IllegalArgumentException("Unable to read the file '" + path + "' as it didn't belong to the current execution");
92-
}
93-
}
94-
else {
95-
throw new IllegalArgumentException("Unable to read the file '" + path + "' as it didn't belong to the current execution");
96-
}
97-
}
79+
// check if the file is from the current execution, the parent execution, or an allowed namespace
80+
String namespace = checkAllowedFileAndReturnNamespace(context, path);
9881

99-
try (InputStream inputStream = storageInterface.get(flow.get("tenantId"), flow.get("namespace"), path)) {
82+
Map<String, String> flow = (Map<String, String>) context.getVariable("flow");
83+
try (InputStream inputStream = storageInterface.get(flow.get(TENANT_ID), namespace, path)) {
10084
return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
10185
}
10286
}

‎core/src/test/java/io/kestra/core/runners/pebble/functions/FileSizeFunctionTest.java

+3-7
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ void returnsCorrectSize_givenStringUri_andParentExecution() throws IOException,
7373
}
7474

7575
@Test
76-
void shouldThrowIllegalArgumentException_givenMissingTrigger_andParentExecution() throws IOException {
76+
void shouldReadFromAnotherExecution() throws IOException, IllegalVariableEvaluationException {
7777
String executionId = IdUtils.create();
7878
URI internalStorageURI = getInternalStorageURI(executionId);
7979
URI internalStorageFile = getInternalStorageFile(internalStorageURI);
@@ -85,12 +85,8 @@ void shouldThrowIllegalArgumentException_givenMissingTrigger_andParentExecution(
8585
"execution", Map.of("id", IdUtils.create())
8686
);
8787

88-
Exception ex = assertThrows(
89-
IllegalArgumentException.class,
90-
() -> variableRenderer.render("{{ fileSize('" + internalStorageFile + "') }}", variables)
91-
);
92-
93-
assertTrue(ex.getMessage().startsWith("Unable to read the file"), "Exception message doesn't match expected one");
88+
String size = variableRenderer.render("{{ fileSize('" + internalStorageFile + "') }}", variables);
89+
assertThat(size, is(FILE_SIZE));
9490
}
9591

9692
@Test

‎core/src/test/java/io/kestra/core/runners/pebble/functions/ReadFileFunctionTest.java

+3-35
Original file line numberDiff line numberDiff line change
@@ -125,54 +125,22 @@ void readInternalStorageURI() throws IOException, IllegalVariableEvaluationExcep
125125
}
126126

127127
@Test
128-
void readUnauthorizedInternalStorageFile() throws IOException {
128+
void readInternalStorageFileFromAnotherExecution() throws IOException, IllegalVariableEvaluationException {
129129
String namespace = "my.namespace";
130130
String flowId = "flow";
131131
String executionId = IdUtils.create();
132132
URI internalStorageURI = URI.create("/" + namespace.replace(".", "/") + "/" + flowId + "/executions/" + executionId + "/tasks/task/" + IdUtils.create() + "/123456.ion");
133133
URI internalStorageFile = storageInterface.put(null, namespace, internalStorageURI, new ByteArrayInputStream("Hello from a task output".getBytes()));
134134

135-
// test for an un-authorized execution with no trigger
136135
Map<String, Object> variables = Map.of(
137136
"flow", Map.of(
138137
"id", "notme",
139138
"namespace", "notme"),
140139
"execution", Map.of("id", "notme")
141140
);
142141

143-
var exception = assertThrows(IllegalArgumentException.class, () -> variableRenderer.render("{{ read('" + internalStorageFile + "') }}", variables));
144-
assertThat(exception.getMessage(), is("Unable to read the file '" + internalStorageFile + "' as it didn't belong to the current execution"));
145-
146-
// test for an un-authorized execution with a trigger of type execution
147-
Map<String, Object> executionTriggerVariables = Map.of(
148-
"flow", Map.of(
149-
"id", "notme",
150-
"namespace", "notme"),
151-
"execution", Map.of("id", "notme"),
152-
"trigger", Map.of(
153-
"flowId", "notme",
154-
"namespace", "notme",
155-
"executionId", "notme"
156-
)
157-
);
158-
159-
exception = assertThrows(IllegalArgumentException.class, () -> variableRenderer.render("{{ read('" + internalStorageFile + "') }}", executionTriggerVariables));
160-
assertThat(exception.getMessage(), is("Unable to read the file '" + internalStorageFile + "' as it didn't belong to the current execution"));
161-
162-
// test for an un-authorized execution with a trigger of another type
163-
Map<String, Object> triggerVariables = Map.of(
164-
"flow", Map.of(
165-
"id", "notme",
166-
"namespace", "notme"),
167-
"execution", Map.of("id", "notme"),
168-
"trigger", Map.of(
169-
"date", "somedate",
170-
"row", "somerow"
171-
)
172-
);
173-
174-
exception = assertThrows(IllegalArgumentException.class, () -> variableRenderer.render("{{ read('" + internalStorageFile + "') }}", triggerVariables));
175-
assertThat(exception.getMessage(), is("Unable to read the file '" + internalStorageFile + "' as it didn't belong to the current execution"));
142+
String render = variableRenderer.render("{{ read('" + internalStorageFile + "') }}", variables);
143+
assertThat(render, is("Hello from a task output"));
176144
}
177145

178146
@Test

0 commit comments

Comments
 (0)
Please sign in to comment.