Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(workflow): add pause and resume functionality to team workflow #180

Open
wants to merge 17 commits into
base: main
Choose a base branch
from

Conversation

anthonydevs17
Copy link
Collaborator

No description provided.

- Added a 'Stop Workflow' button in AgentsBoardDebugger component.
- Introduced a stop method in the Team class to halt the workflow and update the state.
- Updated ReactChampionAgent to handle the STOPPED status during workflow execution.
- Enhanced workflowController to clear the task queue when the workflow is stopped.
… functionalities

- Added handlePauseResume and handleStop methods in AgentWithToolPreviewer for managing team workflow states.
- Introduced new buttons for pause/resume and stop actions in the UI, with appropriate disabling logic based on workflow status.
- Updated Team class to handle stopping workflows more robustly, including abort handling in ReactChampionAgent.
- Enhanced error handling for task abortion and logging for better debugging and user feedback.
- Introduced AbortError class to manage operation interruptions effectively.
- Adjusted the table formatting in README.md for the tools package to enhance clarity and consistency.
- Ensured proper alignment of tool descriptions and documentation links for improved user experience.
@@ -157,6 +157,47 @@ class Team {
this.store.getState().addTasks(tasks);
}

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We must follow the Flux architecture as much as possible because:

  1. KaibanJS is built on the idea that Stores are the brain, while Agents and Tasks are the actors
  2. When state management is scattered across components, debugging becomes a nightmare
  3. Having a single source of truth (Stores) makes the system predictable and reliable
  4. As we add more features, centralized state management will keep the codebase maintainable

Current Issues:

  1. Direct State Management
  • The Team class is directly managing workflow state through store.setState
  • State transition logic is mixed with the Team class responsibilities
  • Violates Flux pattern where stores should manage state
  1. Validation Logic Location
  • State transition validations are in the Team class
  • These validations are part of the workflow state machine and should be in the store
  1. Error Handling
  • Error messages are hardcoded in the Team class
  • Error handling strategy should be consistent and managed by stores

Suggested Implementation:

class Team {
  /**
   * Pauses the team's workflow.
   * @returns {void}
   */
  pause() {
    this.store.getState().pauseWorkflow();
  }

  /**
   * Resumes the team's workflow.
   * @returns {void}
   */
  resume() {
    this.store.getState().resumeWorkflow();
  }

  /**
   * Stops the team's workflow.
   * @returns {void}
   */
  stop() {
    this.store.getState().stopWorkflow();
  }
}

And in the workflow store:

// In workflowStore.js
const workflowStore = {
  teamWorkflowStatus: WORKFLOW_STATUS_enum.INITIAL,

  // State transition validations
  allowedTransitions: {
    [WORKFLOW_STATUS_enum.RUNNING]: [WORKFLOW_STATUS_enum.PAUSED, WORKFLOW_STATUS_enum.STOPPING],
    [WORKFLOW_STATUS_enum.PAUSED]: [WORKFLOW_STATUS_enum.RUNNING, WORKFLOW_STATUS_enum.STOPPING],
    [WORKFLOW_STATUS_enum.STOPPING]: [WORKFLOW_STATUS_enum.STOPPED],
  },

  // Actions
  pauseWorkflow: () => {
    const currentStatus = get().teamWorkflowStatus;
    get().validateTransition(currentStatus, WORKFLOW_STATUS_enum.PAUSED);
    set({ teamWorkflowStatus: WORKFLOW_STATUS_enum.PAUSED });
  },

  resumeWorkflow: () => {
    const currentStatus = get().teamWorkflowStatus;
    get().validateTransition(currentStatus, WORKFLOW_STATUS_enum.RUNNING);
    set({ teamWorkflowStatus: WORKFLOW_STATUS_enum.RUNNING });
  },

  stopWorkflow: () => {
    const currentStatus = get().teamWorkflowStatus;
    get().validateTransition(currentStatus, WORKFLOW_STATUS_enum.STOPPING);
    set({ teamWorkflowStatus: WORKFLOW_STATUS_enum.STOPPING });
  },

  // Validation helper
  validateTransition: (from, to) => {
    const allowedNext = get().allowedTransitions[from] || [];
    if (!allowedNext.includes(to)) {
      throw new WorkflowError(
        'INVALID_TRANSITION',
        `Cannot transition from ${from} to ${to}`
      );
    }
  }
}

Benefits of this Approach:

  1. Clear Separation of Concerns

    • Team class only dispatches actions
    • Store manages all state transitions and validations
    • Workflow logic is centralized
  2. Better State Management

    • All state transitions are tracked in one place
    • Explicit state machine through allowedTransitions
    • Consistent error handling
  3. More Maintainable

    • Easier to add new workflow states
    • Clearer validation rules
    • Better testability
  4. Follows Flux Pattern

    • Unidirectional data flow
    • State changes only happen in stores
    • Components dispatch actions

Would you like me to elaborate on any part of this suggestion?

@@ -161,6 +164,24 @@ class ReactChampionAgent extends BaseAgent {
iterations < maxAgentIterations &&
!loopCriticalError
) {
while (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FEEDBACK: Agent Loop Control

What the code is trying to do:

  1. Pause the agent's thinking process when workflow status is PAUSED
  2. Stop the agent's thinking process when workflow status is STOPPED
  3. Return the current results when stopped
  4. Wait and check periodically when paused

Issues with current approach:

  1. Using a polling loop (while + setTimeout) is inefficient
  2. Agent is directly managing workflow control
  3. Multiple redundant store checks in the loop
  4. Blocking execution with a busy-wait pattern

The intention is correct (controlling agent execution based on workflow state), but the implementation should leverage Flux patterns and store subscriptions instead of direct polling.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another Note... I think this could be a direct approach....

The main caveat here is to store the intermmidiate Agentic Loop States, and be able to recover it once needed. In a pause or resume.

class ReactChampionAgent {
constructor(config) {
// ... other constructor logic ...

// Subscribe to workflow status changes
this.store.subscribe(
  (state) => state.teamWorkflowStatus,
  async (status) => {
    switch (status) {
      case WORKFLOW_STATUS_enum.PAUSED:
        await this.pauseLoop();
        break;
      case WORKFLOW_STATUS_enum.RUNNING:
        await this.resumeLoop();
        break;
      case WORKFLOW_STATUS_enum.STOPPED:
        // Handle stop if needed
        break;
    }
  }
);

}

#loopState = {
currentIteration: 0,
lastThinkingResult: null,
isPaused: false
};

async pauseLoop() {
this.#loopState.isPaused = true;
}

async resumeLoop() {
this.#loopState.isPaused = false;
await this.executeAgentLoop(this.currentTask, this.#loopState);
}
}.

@@ -485,14 +513,36 @@ class ReactChampionAgent extends BaseAgent {
}

async executeThinking(agent, task, ExecutableAgent, feedbackMessage) {
const abortController =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LEt's try to do this control handler in an outer level... to try to follow the Flux principles...

The main thing is having a list of promises that we can kill anytime we needed in a directly way. Instead to wire each part to an external workflow controller.

Example

1. Promise Tracking

class ReactChampionAgent {
  #thinkingPromises = new Set();
  #toolPromises = new Set();

  async executeAgentLoop(task) {
    while (iterations < maxAgentIterations && !loopCriticalError) {
      try {
        // 1. Thinking Phase
        const thinkingPromise = new Promise((resolve, reject) => {
          this.#executableAgent.invoke(
            { feedbackMessage },
            {
              configurable: { sessionId: 'foo-bar-baz' },
              callbacks: [
                {
                  handleChatModelStart: async (llm, messages) => {
                    await agent.handleThinkingStart({ agent, task, messages });
                  },
                  handleLLMEnd: async (output) => {
                    const result = await agent.handleThinkingEnd({ agent, task, output });
                    resolve(result);
                  }
                }
              ]
            }
          ).catch(error => reject(error));
        });

        // Track thinking promise
        this.#thinkingPromises.add({ promise: thinkingPromise, reject });
        const thinkingResult = await thinkingPromise;
        this.#thinkingPromises.delete({ promise: thinkingPromise, reject });

        // 2. Tool Phase (if needed)
        if (thinkingResult.action) {
          const toolPromise = new Promise((resolve, reject) => {
            const tool = this.getToolByName(thinkingResult.action);
            tool.call(thinkingResult.actionInput)
              .then(result => resolve(result))
              .catch(error => reject(error));
          });

          // Track tool promise
          this.#toolPromises.add({ promise: toolPromise, reject });
          const toolResult = await toolPromise;
          this.#toolPromises.delete({ promise: toolPromise, reject });
        }

      } catch (error) {
        if (error instanceof AbortError) {
          // Clean exit if we're stopping/pausing
          break;
        }
        // Handle other errors
      }
    }
  }

  // Abort handler remains the same
  abortAllPromises() {
    for (const { reject } of this.#thinkingPromises) {
      reject(new AbortError('Workflow stopped or paused'));
    }
    this.#thinkingPromises.clear();

    for (const { reject } of this.#toolPromises) {
      reject(new AbortError('Workflow stopped or paused'));
    }
    this.#toolPromises.clear();
  }

  constructor(config) {
    super(config);
    this.store.subscribe(
      (state) => state.teamWorkflowStatus,
      (status) => {
        if (status === WORKFLOW_STATUS_enum.STOPPED || 
            status === WORKFLOW_STATUS_enum.PAUSED) {
          this.abortAllPromises();
        }
      }
    );
  }
}

Implementation Flow

  1. Track all promises (thinking and tools)
  2. Subscribe to workflow status changes
  3. When paused/stopped, abort all tracked promises
  4. Promises auto-cleanup when completed
  5. AbortError triggers clean exit from agentic loop

…esume, and stop methods

- Refactored Team class methods (pause, resume, stop) to utilize new workflow management functions directly from the store, improving code clarity and reducing redundancy.
- Updated ReactChampionAgent to track the last feedback message and handle task execution more effectively, including abort handling.
- Introduced new error classes (StopAbortError, PauseAbortError) for better error management during workflow interruptions.
- Enhanced task logging for aborted tasks, capturing relevant statistics and error details for improved debugging.
- Integrated workflow action enums to standardize workflow control actions across the application.
…edbackMessage fields

- Added `currentIterations` and `lastFeedbackMessage` fields to various agent instances across multiple test snapshots, enhancing the state tracking of agents during workflows.
- Ensured consistency in the agent configurations for improved testing accuracy and reliability.
- Updated snapshots for `customLLMs`, `llmProxy`, `outputSchemaTeam`, `productSpecTeam`, `resumeCreationTeam`, `sportNewsTeam`, and `tripPlanningTeam` to reflect these changes.
- Cleaned up the README.md by removing an unnecessary newline before the Compatibility section, improving the overall formatting and readability of the document.
});
const tasks = get().tasks;

tasks.forEach((task) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why Blocked tasks are being reset here to the Doing state?

…LMEnd method

- Eliminated the unnecessary check for PAUSED workflow status in the handleLLMEnd method of ReactChampionAgent, streamlining the task handling process.
- This change enhances the agent's responsiveness during task execution by allowing it to proceed without being blocked by the workflow status.
…status management

- Introduced a new `handleAgentTaskPaused` method to manage agent status when tasks are paused, improving workflow control during interruptions.
- Updated `handleTaskAborted` and `handleTaskPaused` methods to log task status changes and relevant error details, enhancing debugging capabilities.
- Refactored task status management to include a new PAUSED state for both agents and tasks, allowing for better tracking of workflow interruptions.
- Enhanced error handling in the workflow loop to ensure proper task management during pauses, improving overall system reliability.
- Updated taskSubscriber.js to include the PAUSED state in the subscribeTaskStatusUpdates function, enhancing task status management.
- This change aligns with recent enhancements to workflow control, allowing for better tracking of tasks during interruptions.
- Integrated ChatMessageHistory into agent instances for improved interaction tracking.
- Updated agent status management to reset interactions history and feedback upon workflow reset.
- Added functionality to start the task queue if paused, ensuring smoother workflow execution.
- Removed redundant clearAgentLoopState method from workflow loop store, streamlining state management.
- Improved overall task handling and agent responsiveness during workflow interruptions.
…arious workflows

- Enhanced snapshot files for `llmProxy`, `outputSchemaTeam`, `productSpecTeam`, `resumeCreationTeam`, `sportNewsTeam`, and `tripPlanningTeam` to include comprehensive `lastFeedbackMessage` fields.
- Updated messages to reflect specific tasks and expected outputs, improving clarity and context for each workflow scenario.
- Ensured consistency in agent configurations across multiple test snapshots, enhancing the accuracy of state tracking during workflows.
…management

- Introduced workOnTaskResume method in Agent and BaseAgent classes to handle task resumption.
- Implemented workOnTaskResume in ReactChampionAgent to manage task execution with last feedback context.
- Updated teamStore to support task resumption in the workflow controller, enhancing task handling during workflow interruptions.
- Improved overall agent state management and task queue handling for better responsiveness and control.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants