Skip to content

Commit

Permalink
fix: log errors in entity stream worker, fix includeOld types, do not…
Browse files Browse the repository at this point in the history
… remove key from oldValue (#440)
  • Loading branch information
sam-goodwin authored Sep 7, 2023
1 parent 351ddec commit e5e94de
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 29 deletions.
21 changes: 21 additions & 0 deletions apps/tests/aws-runtime/test/test-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1286,3 +1286,24 @@ export const searchBlog = command(
};
}
);

// check types of entity
function streamShouldHaveEmptyIfNoInclude() {
counter.stream("", {}, (item) => {
if (item.operation === "modify") {
// @ts-expect-error - no oldValue without includeOld: true
item.oldValue!.namespace;
}
});
counter.stream(
"",
{
includeOld: true,
},
(item) => {
if (item.operation === "modify") {
item.oldValue.namespace;
}
}
);
}
1 change: 1 addition & 0 deletions packages/@eventual/aws-cdk/src/entity-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ export class EntityStream extends Construct implements EventualResource {
: Duration.seconds(0),
reportBatchItemFailures: true,
startingPosition: StartingPosition.TRIM_HORIZON,

...(filters.length > 0 ? { filters } : {}),
}),
],
Expand Down
25 changes: 13 additions & 12 deletions packages/@eventual/aws-runtime/src/handlers/entity-stream-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,18 @@ export default (async (event) => {
const normalizedKey = normalizeCompositeKey(entity, bestValue);
const keyMap = convertNormalizedEntityKeyToMap(normalizedKey);

if (newValue) {
delete newValue[EntityEntityRecord.VERSION_FIELD];
delete newValue[normalizedKey.partition.keyAttribute];
if (normalizedKey.sort) {
delete newValue[normalizedKey.sort.keyAttribute];
}
}
if (oldValue) {
delete oldValue[EntityEntityRecord.VERSION_FIELD];
delete oldValue[normalizedKey.partition.keyAttribute];
if (normalizedKey.sort) {
delete oldValue[normalizedKey.sort.keyAttribute];
removeSyntheticKey(newValue);
removeSyntheticKey(oldValue);

function removeSyntheticKey(value: Record<string, any> | undefined) {
if (value) {
delete value[EntityEntityRecord.VERSION_FIELD];
if (normalizedKey.partition.attributes.length > 0) {
delete value[normalizedKey.partition.keyAttribute];
}
if (normalizedKey.sort && normalizedKey.sort.attributes.length > 0) {
delete value[normalizedKey.sort.keyAttribute];
}
}
}

Expand All @@ -104,6 +104,7 @@ export default (async (event) => {
newValue: newValue as any,
newVersion,
operation,
// @ts-ignore
oldValue,
oldVersion,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ export function createEntityStreamWorker(
if (result !== false) {
continue;
}
} catch {}
} catch (err) {
console.error(err);
}
// if the handler fails or returns false, return the rest of the items
return itemGroup.slice(Number(i)).map((i) => i.id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ export class LocalEntityStore extends EntityStore {
item: {
key: convertNormalizedEntityKeyToMap(key),
operation: "remove" as const,
oldValue: item.value,
oldValue: item.value as any,
oldVersion: item.version,
} as EntityStreamItem,
});
Expand Down
7 changes: 4 additions & 3 deletions packages/@eventual/core/src/entity/entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,12 @@ export interface Entity<
): EntityIndexMapper<Name, Attr, Partition, IndexPartition, IndexSort>;
stream<
Name extends string = string,
Operations extends EntityStreamOperation[] = EntityStreamOperation[]
Operations extends EntityStreamOperation[] = EntityStreamOperation[],
IncludeOld extends boolean = false
>(
name: Name,
options: EntityStreamOptions<Attr, Partition, Sort, Operations>,
handler: EntityStreamHandler<Attr, Partition, Sort, Operations>
options: EntityStreamOptions<Attr, Partition, Sort, Operations, IncludeOld>,
handler: EntityStreamHandler<Attr, Partition, Sort, Operations, IncludeOld>
): EntityStream<Name, Attr, Partition, Sort>;
stream<Name extends string = string>(
name: string,
Expand Down
24 changes: 14 additions & 10 deletions packages/@eventual/core/src/entity/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ export interface EntityStreamHandler<
Sort extends EntityCompositeKeyPart<Attr> | undefined =
| EntityCompositeKeyPart<Attr>
| undefined,
Operations extends EntityStreamOperation[] = EntityStreamOperation[]
Operations extends EntityStreamOperation[] = EntityStreamOperation[],
IncludeOld extends boolean = false
> {
/**
* Provides the keys, new value
*/
(
item: EntityStreamItem<Attr, Partition, Sort, Operations>,
item: EntityStreamItem<Attr, Partition, Sort, Operations, IncludeOld>,
context: EntityStreamContext
): Promise<void | false> | void | false;
}
Expand Down Expand Up @@ -76,11 +77,12 @@ export type EntityStreamItem<
Sort extends EntityCompositeKeyPart<Attr> | undefined =
| EntityCompositeKeyPart<Attr>
| undefined,
Operations extends EntityStreamOperation[] = EntityStreamOperation[]
Operations extends EntityStreamOperation[] = EntityStreamOperation[],
IncludeOld extends boolean = false
> = (
| EntityStreamInsertItem<Attr, Partition, Sort>
| EntityStreamModifyItem<Attr, Partition, Sort>
| EntityStreamRemoveItem<Attr, Partition, Sort>
| EntityStreamModifyItem<Attr, Partition, Sort, IncludeOld>
| EntityStreamRemoveItem<Attr, Partition, Sort, IncludeOld>
) & { id: string; operation: Operations[number] };

export interface EntityStreamInsertItem<
Expand All @@ -100,24 +102,26 @@ export interface EntityStreamModifyItem<
Partition extends EntityCompositeKeyPart<Attr> = EntityCompositeKeyPart<Attr>,
Sort extends EntityCompositeKeyPart<Attr> | undefined =
| EntityCompositeKeyPart<Attr>
| undefined
| undefined,
IncludeOld extends boolean = false
> extends EntityStreamItemBase<Attr, Partition, Sort> {
operation: "modify";
newValue: Attr;
newVersion: number;
oldValue?: Attr;
oldVersion?: number;
oldValue: IncludeOld extends true ? Attr : undefined;
oldVersion: number;
}

export interface EntityStreamRemoveItem<
Attr extends Attributes = Attributes,
Partition extends EntityCompositeKeyPart<Attr> = EntityCompositeKeyPart<Attr>,
Sort extends EntityCompositeKeyPart<Attr> | undefined =
| EntityCompositeKeyPart<Attr>
| undefined
| undefined,
IncludeOld extends boolean = false
> extends EntityStreamItemBase<Attr, Partition, Sort> {
operation: "remove";
oldValue?: Attr;
oldValue?: IncludeOld extends true ? Attr : undefined;
oldVersion?: number;
}

Expand Down
5 changes: 3 additions & 2 deletions packages/@eventual/core/src/internal/service-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ export interface EntityStreamOptions<
Sort extends EntityCompositeKeyPart<Attr> | undefined =
| EntityCompositeKeyPart<Attr>
| undefined,
Operations extends EntityStreamOperation[] = EntityStreamOperation[]
Operations extends EntityStreamOperation[] = EntityStreamOperation[],
IncludeOld extends boolean = false
> extends FunctionRuntimeProps {
/**
* A list of operations to be send to the stream.
Expand All @@ -238,7 +239,7 @@ export interface EntityStreamOptions<
/**
* When true, the old value will be sent with the new value.
*/
includeOld?: boolean;
includeOld?: IncludeOld;
/**
* One or more key queries that will be included in the stream.
*/
Expand Down
2 changes: 2 additions & 0 deletions packages/@eventual/core/src/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ export function task<Name extends string, Input = any, Output = any>(
return sendTaskHeartbeat(request.taskToken);
};
func.sourceLocation = sourceLocation;
func.options = opts;
func.kind = "Task";

// @ts-ignore
func.handler = handler;
Expand Down

0 comments on commit e5e94de

Please sign in to comment.