Skip to content

Commit

Permalink
feat: Add support to automatically cleanup on final states, as well a…
Browse files Browse the repository at this point in the history
…s to cleanup all other instances when starting a machine
  • Loading branch information
nklomp committed Mar 9, 2024
1 parent f6baae0 commit 484fc21
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,12 @@ export default (testContext: { getAgent: () => ConfiguredAgent; setup: () => Pro

it('should automatically store xstate state changes', async (): Promise<void> => {
instance.start()
const init = await machineStatePersistRegistration({ context, interpreter: instance, machineName: instance.machine.id })
const init = await machineStatePersistRegistration({
context,
interpreter: instance,
machineName: instance.machine.id,
cleanupOnFinalState: false,
})
console.log(JSON.stringify(init, null, 2))
if (!init) {
return Promise.reject(new Error('No init'))
Expand Down
39 changes: 28 additions & 11 deletions packages/xstate-persistence/src/agent/MachineStatePersistence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,31 +79,43 @@ export class MachineStatePersistence implements IAgentPlugin {
await context.agent.machineStateInit({ ...event.data })
break
case MachineStatePersistEventType.EVERY:
if (event.data.state.done) {
// TODO: Cleanup on done
}
// We are keeping track of the update counter in the events, ensuring we do not process out of order
await context.agent.machineStatePersist({ ...event.data, updatedCount: event.data._eventCounter ?? event.data.updatedCount })
await context.agent.machineStatePersist({
...event.data,
cleanupOnFinalState: event.data.cleanupOnFinalState ?? event.data._cleanupOnFinalState,
updatedCount: event.data._eventCounter ?? event.data.updatedCount,
})
break
default:
return Promise.reject(Error('Event type not supported'))
return Promise.reject(Error(`Event type ${event.type} not supported`))
}
}

private async machineStateInit(args: InitMachineStateArgs): Promise<MachineStateInit> {
const { tenantId, machineName, expiresAt, customInstanceId, existingInstanceId } = args
private async machineStateInit(args: InitMachineStateArgs, context: RequiredContext): Promise<MachineStateInit> {
const { tenantId, machineName, expiresAt, customInstanceId, existingInstanceId, cleanupAllOtherInstances } = args
debug(
`machineStateInit for machine name ${machineName}, tenant ${tenantId}, custom instance ${customInstanceId}, existing id ${existingInstanceId}`
)
let machineInit: MachineStateInit | undefined = undefined
if (customInstanceId && existingInstanceId) {
return Promise.reject(new Error(`Cannot have both a custom and existing instance id at the same time`))
} else if (existingInstanceId) {
}
if (cleanupAllOtherInstances !== false) {
await context.agent.machineStatesDeleteExpired({ machineName, tenantId, deleteDoneStates: true })
await context.agent.machineStatesDeleteExpired({ machineName, tenantId, deleteDoneStates: false })
const activeMachineStates = (await context.agent.machineStatesFindActive({ machineName, tenantId })).filter(
(state) => !existingInstanceId || state.instanceId !== existingInstanceId
)
await Promise.all(activeMachineStates.map((state) => context.agent.machineStateDelete({ instanceId: state.instanceId, tenantId })))
}
let machineInit: MachineStateInit | undefined = undefined

if (existingInstanceId) {
// A existing instanceId is provided. First lookup whether this id is persisted, if not an error is thrown
debug(`machineStateInit is using a previously persisted instance id (${existingInstanceId})`)
const state = await this.store.getMachineState({ tenantId, instanceId: existingInstanceId })
machineInit = storeInfoToMachineInit({ ...state, stateType: 'existing' })
} else if (customInstanceId) {
}
if (customInstanceId) {
// A custom instanceId is provided.
debug(`machineStateInit is using a custom instance id (${customInstanceId})`)
}
Expand All @@ -120,8 +132,9 @@ export class MachineStatePersistence implements IAgentPlugin {
debug(`machineStateInit result: ${JSON.stringify(machineInit)}`)
return machineInit
}
private async machineStatePersist(args: MachineStatePersistArgs): Promise<MachineStateInfo> {
private async machineStatePersist(args: MachineStatePersistArgs, context: RequiredContext): Promise<MachineStateInfo> {
const { instanceId, tenantId, machineName, updatedCount } = args
const cleanupOnFinalState = args.cleanupOnFinalState !== false
debug(`machineStatePersist for machine name ${machineName}, updateCount: ${updatedCount}, instance ${instanceId} and tenant ${tenantId}...`)
const queriedStates = await this.store.findMachineStates({ filter: [{ instanceId, tenantId }] })
const existingState = queriedStates.length === 1 ? queriedStates[0] : undefined
Expand All @@ -131,6 +144,10 @@ export class MachineStatePersistence implements IAgentPlugin {
debug(
`machineStatePersist success for machine name ${machineName}, instance ${instanceId}, update count ${machineStateInfo.updatedCount}, tenant ${tenantId}, last event: ${machineStateInfo.latestEventType}, last state: ${machineStateInfo.latestStateName}`
)
if (cleanupOnFinalState && machineStateInfo.state.done) {
debug(`reached final state for machine ${machineName} instance ${instanceId} and auto cleanup was enabled. Deleting machine state`)
await context.agent.machineStateDelete(machineStateInfo)
}
return machineStateInfo
}

Expand Down
27 changes: 23 additions & 4 deletions packages/xstate-persistence/src/functions/machineRegistration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ export const machineStatePersistOnTransition = async <
interpreter: Interpreter<TContext, TStateSchema, TEvent, TTypestate, TResolvedTypesMeta>
context: IAgentContext<any> // We use any as this method could be called from an agent with access to, but not exposing this plugin
init: MachineStateInit
cleanupOnFinalState?: boolean
}): Promise<void> => {
const { context, init, interpreter } = opts
const { cleanupOnFinalState, context, init, interpreter } = opts
if (!(context.agent.availableMethods().includes('machineStatePersist') && 'machineStatePersist' in context.agent)) {
console.log(`IMachineStatePersistence was not exposed in the current agent. Disabling machine state persistence events`)
return
Expand All @@ -78,6 +79,7 @@ export const machineStatePersistOnTransition = async <
state,
_eventCounter: _eventCounter++,
_eventDate: new Date(),
_cleanupOnFinalState: cleanupOnFinalState !== false,
},
},
context
Expand Down Expand Up @@ -105,6 +107,8 @@ export const machineStatePersistRegistration = async <
args: Omit<InitMachineStateArgs, 'machineName'> &
Partial<Pick<InitMachineStateArgs, 'machineName'>> &
MachineStatePersistenceOpts & {
cleanupOnFinalState?: boolean
cleanupAllOtherInstances?: boolean
interpreter: Interpreter<TContext, TStateSchema, TEvent, TTypestate, TResolvedTypesMeta>
context: IAgentContext<any> // We use any as this method could be called from an agent with access to, but not exposing this plugin
}
Expand Down Expand Up @@ -208,11 +212,12 @@ export const interpreterStartOrResumeFromInit = async <
TResolvedTypesMeta = TypegenDisabled
>(args: {
init: MachineStateInit & { stateType?: MachineStateInitType }
cleanupAllOtherInstances?: boolean
noRegistration?: boolean
interpreter: Interpreter<TContext, TStateSchema, TEvent, TTypestate, TResolvedTypesMeta>
context: IAgentContext<IMachineStatePersistence>
}): Promise<StartedInterpreterInfo<TContext, TStateSchema, TEvent, TTypestate, TResolvedTypesMeta>> => {
const { init, noRegistration, interpreter, context } = args
const { init, noRegistration, interpreter, cleanupAllOtherInstances, context } = args
const { stateType, instanceId, machineName, tenantId, expiresAt } = init
if (init.machineName !== interpreter.id) {
throw new Error(`Machine state init machine name ${init.machineName} does not match name from state machine interpreter ${interpreter.id}`)
Expand All @@ -225,6 +230,7 @@ export const interpreterStartOrResumeFromInit = async <
tenantId,
...(stateType === 'existing' && { existingInstanceId: instanceId }),
...(stateType === 'new' && { customInstanceId: instanceId }),
cleanupAllOtherInstances,
context,
interpreter,
})
Expand Down Expand Up @@ -278,16 +284,27 @@ export const interpreterStartOrResume = async <
tenantId?: string
singletonCheck: boolean
noRegistration?: boolean
cleanupAllOtherInstances?: boolean
interpreter: Interpreter<TContext, TStateSchema, TEvent, TTypestate, TResolvedTypesMeta>
context: IAgentContext<IMachineStatePersistence>
}): Promise<StartedInterpreterInfo<TContext, TStateSchema, TEvent, TTypestate, TResolvedTypesMeta>> => {
const { stateType, singletonCheck, instanceId, tenantId, noRegistration, context, interpreter } = args
const { stateType, singletonCheck, instanceId, tenantId, noRegistration, context, interpreter, cleanupAllOtherInstances } = args
const machineName = args.machineName ?? interpreter.id
const activeStates = await context.agent.machineStatesFindActive({
let activeStates = await context.agent.machineStatesFindActive({
machineName,
tenantId,
instanceId,
})
if (activeStates.length > 0 && cleanupAllOtherInstances) {
// We cleanup here to not influence the logic below. Normally the agent machineStateInit method does the cleanup
await Promise.all(activeStates.map((state) => context.agent.machineStateDelete({ tenantId: args.tenantId, instanceId: state.instanceId })))
// We search again, given the delete is using the passed in tenantId, instead of relying on the persisted tenantId. Should not matter, but just making sure
activeStates = await context.agent.machineStatesFindActive({
machineName,
tenantId,
instanceId,
})
}
if (singletonCheck && activeStates.length > 0) {
if (stateType === 'new' || activeStates.every((state) => state.instanceId !== instanceId)) {
return Promise.reject(new Error(`Found ${activeStates.length} active '${machineName}' instances, but only one is allows at the same time`))
Expand All @@ -305,6 +322,7 @@ export const interpreterStartOrResume = async <
customInstanceId: instanceId,
machineName: machineName ?? interpreter.id,
tenantId,
cleanupAllOtherInstances,
})
return await interpreterStartOrResumeFromInit({ init, noRegistration, interpreter, context })
}
Expand All @@ -317,6 +335,7 @@ export const interpreterStartOrResume = async <
customInstanceId: instanceId,
machineName: machineName ?? interpreter.id,
tenantId,
cleanupAllOtherInstances,
})
return await interpreterStartOrResumeFromInit({ init, noRegistration, interpreter, context })
}
Expand Down
6 changes: 3 additions & 3 deletions packages/xstate-persistence/src/types/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export type DeleteStateResult = number
*/
export type MachineStatePersistEvent = {
type: MachineStatePersistEventType
data: MachineStatePersistArgs & { _eventCounter: number; _eventDate: Date }
data: MachineStatePersistArgs & { _eventCounter: number; _eventDate: Date; _cleanupOnFinalState: boolean }
}

/**
Expand Down Expand Up @@ -120,14 +120,14 @@ export type MachineStateInit = Pick<MachineStateInfo, 'instanceId' | 'machineNam
*/
export type InitMachineStateArgs = Omit<Partial<MachineStateInit>, 'instanceId'> &
Pick<MachineStateInfo, 'machineName'> &
Pick<MachineStatePersistenceOpts, 'customInstanceId' | 'existingInstanceId'>
Pick<MachineStatePersistenceOpts, 'customInstanceId' | 'existingInstanceId'> & { cleanupAllOtherInstances?: boolean }

/**
* Represents the arguments required to persist the machine state.
*/
export type MachineStatePersistArgs = Omit<MachineStateInit, 'createdAt'> &
Pick<MachineStateInfo, 'state' | 'instanceId'> &
Partial<Pick<MachineStateInfo, 'updatedCount'>>
Partial<Pick<MachineStateInfo, 'updatedCount'>> & { cleanupOnFinalState?: boolean }

/**
* Represents the arguments required to get machine state.
Expand Down

0 comments on commit 484fc21

Please sign in to comment.