Skip to content

Commit

Permalink
[Expressions] Fix the execution pipeline not to stop on a flaky subex…
Browse files Browse the repository at this point in the history
…pression (#143852)

* Fix the execution pipeline not to stop on a flaky subexpression
* Fix the execution pipeline not to stop on an invalid or incorrect value
  • Loading branch information
dokmic committed Oct 24, 2022
1 parent 9f312e8 commit ee6aeba
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 38 deletions.
106 changes: 106 additions & 0 deletions src/plugins/expressions/common/execution/execution.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { parseExpression, ExpressionAstExpression } from '../ast';
import { createUnitTestExecutor } from '../test_helpers';
import { ExpressionFunctionDefinition } from '..';
import { ExecutionContract } from './execution_contract';
import { ExpressionValueBoxed } from '../expression_types';

beforeAll(() => {
if (typeof performance === 'undefined') {
Expand Down Expand Up @@ -744,6 +745,79 @@ describe('Execution', () => {
});
});
});

test('continues execution when error state is gone', async () => {
testScheduler.run(({ cold, expectObservable, flush }) => {
const a = 1;
const b = 2;
const c = 3;
const d = 4;
const observable$ = cold('abcd|', { a, b, c, d });
const flakyFn = jest
.fn()
.mockImplementationOnce((value) => value)
.mockImplementationOnce(() => {
throw new Error('Some error.');
})
.mockReturnValueOnce({ type: 'something' })
.mockImplementationOnce((value) => value);
const spyFn = jest.fn((input, { arg }) => arg);

const executor = createUnitTestExecutor();
executor.registerFunction({
name: 'observable',
args: {},
help: '',
fn: () => observable$,
});
executor.registerFunction({
name: 'flaky',
args: {},
help: '',
fn: (value) => flakyFn(value),
});
executor.registerFunction({
name: 'spy',
args: {
arg: {
help: '',
types: ['number'],
},
},
help: '',
fn: (input, args) => spyFn(input, args),
});

const result = executor.run('spy arg={observable | flaky}', null, {});

expectObservable(result).toBe('abcd|', {
a: { partial: true, result: a },
b: {
partial: true,
result: {
type: 'error',
error: expect.objectContaining({ message: '[spy] > [flaky] > Some error.' }),
},
},
c: {
partial: true,
result: {
type: 'error',
error: expect.objectContaining({
message: `[spy] > Can not cast 'something' to any of 'number'`,
}),
},
},
d: { partial: false, result: d },
});

flush();

expect(spyFn).toHaveBeenCalledTimes(2);
expect(spyFn).toHaveBeenNthCalledWith(1, null, { arg: a });
expect(spyFn).toHaveBeenNthCalledWith(2, null, { arg: d });
});
});
});

describe('when arguments are missing', () => {
Expand Down Expand Up @@ -847,6 +921,38 @@ describe('Execution', () => {
});
});

describe('when arguments are incorrect', () => {
it('when required argument is missing and has not alias, returns error', async () => {
const incorrectArg: ExpressionFunctionDefinition<
'incorrectArg',
unknown,
{ arg: ExpressionValueBoxed<'something'> },
unknown
> = {
name: 'incorrectArg',
args: {
arg: {
help: '',
required: true,
types: ['something'],
},
},
help: '',
fn: jest.fn(),
};
const executor = createUnitTestExecutor();
executor.registerFunction(incorrectArg);
const { result } = await lastValueFrom(executor.run('incorrectArg arg="string"', null, {}));

expect(result).toMatchObject({
type: 'error',
error: {
message: `[incorrectArg] > Can not cast 'string' to any of 'something'`,
},
});
});
});

describe('debug mode', () => {
test('can execute expression in debug mode', async () => {
const execution = createExecution('add val=1 | add val=2 | add val=3', {}, true);
Expand Down
103 changes: 65 additions & 38 deletions src/plugins/expressions/common/execution/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -352,20 +352,30 @@ export class Execution<
// actually have `then` or `subscribe` methods which would be treated as a `Promise`
// or an `Observable` accordingly.
return this.resolveArgs(fn, currentInput, fnArgs).pipe(
tap((args) => this.execution.params.debug && Object.assign(head.debug, { args })),
switchMap((args) => this.invokeFunction(fn, currentInput, args)),
switchMap((output) => (getType(output) === 'error' ? throwError(output) : of(output))),
tap((output) => this.execution.params.debug && Object.assign(head.debug, { output })),
switchMap((output) => this.invokeChain<ChainOutput>(tail, output)),
catchError((rawError) => {
const error = createError(rawError);
error.error.message = `[${fnName}] > ${error.error.message}`;

if (this.execution.params.debug) {
Object.assign(head.debug, { error, rawError, success: false });
}

return of(error);
switchMap((resolvedArgs) => {
const args$ = isExpressionValueError(resolvedArgs)
? throwError(resolvedArgs.error)
: of(resolvedArgs);

return args$.pipe(
tap((args) => this.execution.params.debug && Object.assign(head.debug, { args })),
switchMap((args) => this.invokeFunction(fn, currentInput, args)),
switchMap((output) =>
getType(output) === 'error' ? throwError(output) : of(output)
),
tap((output) => this.execution.params.debug && Object.assign(head.debug, { output })),
switchMap((output) => this.invokeChain<ChainOutput>(tail, output)),
catchError((rawError) => {
const error = createError(rawError);
error.error.message = `[${fnName}] > ${error.error.message}`;

if (this.execution.params.debug) {
Object.assign(head.debug, { error, rawError, success: false });
}

return of(error);
})
);
}),
finalize(() => {
if (this.execution.params.debug) {
Expand Down Expand Up @@ -449,7 +459,10 @@ export class Execution<
}
}

throw new Error(`Can not cast '${fromTypeName}' to any of '${toTypeNames.join(', ')}'`);
throw createError({
name: 'invalid value',
message: `Can not cast '${fromTypeName}' to any of '${toTypeNames.join(', ')}'`,
});
}

validate<Type = unknown>(value: Type, argDef: ExpressionFunctionParameter<Type>): void {
Expand All @@ -459,7 +472,10 @@ export class Execution<
}': '${argDef.options.join("', '")}'`;

if (argDef.strict) {
throw new Error(message);
throw createError({
message,
name: 'invalid argument',
});
}

this.logger?.warn(message);
Expand All @@ -471,7 +487,7 @@ export class Execution<
fnDef: Fn,
input: unknown,
argAsts: Record<string, ExpressionAstArgument[]>
): Observable<Record<string, unknown>> {
): Observable<Record<string, unknown> | ExpressionValueError> {
return defer(() => {
const { args: argDefs } = fnDef;

Expand All @@ -481,7 +497,10 @@ export class Execution<
(acc, argAst, argName) => {
const argDef = getByAlias(argDefs, argName);
if (!argDef) {
throw new Error(`Unknown argument '${argName}' passed to function '${fnDef.name}'`);
throw createError({
name: 'unknown argument',
message: `Unknown argument '${argName}' passed to function '${fnDef.name}'`,
});
}
if (argDef.deprecated && !acc[argDef.name]) {
this.logger?.warn(`Argument '${argName}' is deprecated in function '${fnDef.name}'`);
Expand All @@ -502,7 +521,10 @@ export class Execution<
continue;
}

throw new Error(`${fnDef.name} requires the "${name}" argument`);
throw createError({
name: 'missing argument',
message: `${fnDef.name} requires the "${name}" argument`,
});
}

// Create the functions to resolve the argument ASTs into values
Expand All @@ -513,14 +535,17 @@ export class Execution<
(subInput = input) =>
this.interpret(item, subInput).pipe(
pluck('result'),
map((output) => {
switchMap((output) => {
if (isExpressionValueError(output)) {
throw output.error;
return of(output);
}

return this.cast(output, argDefs[argName].types);
}),
tap((value) => this.validate(value, argDefs[argName]))
return of(output).pipe(
map((value) => this.cast(value, argDefs[argName].types)),
tap((value) => this.validate(value, argDefs[argName])),
catchError((error) => of(error))
);
})
)
)
);
Expand All @@ -531,7 +556,7 @@ export class Execution<
return from([{}]);
}

const resolvedArgValuesObservable = combineLatest(
return combineLatest(
argNames.map((argName) => {
const interpretFns = resolveArgFns[argName];

Expand All @@ -542,23 +567,25 @@ export class Execution<
}

return argDefs[argName].resolve
? combineLatest(interpretFns.map((fn) => fn()))
? combineLatest(interpretFns.map((fn) => fn())).pipe(
map((values) => values.find(isExpressionValueError) ?? values)
)
: of(interpretFns);
})
);

return resolvedArgValuesObservable.pipe(
map((resolvedArgValues) =>
mapValues(
// Return an object here because the arguments themselves might actually have a 'then'
// function which would be treated as a promise
zipObject(argNames, resolvedArgValues),
// Just return the last unless the argument definition allows multiple
(argValues, argName) => (argDefs[argName].multi ? argValues : last(argValues))
)
).pipe(
map(
(values) =>
values.find(isExpressionValueError) ??
mapValues(
// Return an object here because the arguments themselves might actually have a 'then'
// function which would be treated as a promise
zipObject(argNames, values as unknown[][]),
// Just return the last unless the argument definition allows multiple
(argValues, argName) => (argDefs[argName].multi ? argValues : last(argValues))
)
)
);
});
}).pipe(catchError((error) => of(error)));
}

interpret<T>(ast: ExpressionAstNode, input: T): Observable<ExecutionResult<unknown>> {
Expand Down

0 comments on commit ee6aeba

Please sign in to comment.