- Quick Start
- Manual Setup
- Project Structure
- Architecture
- Database Schema
- API Documentation
- CLI Usage
- Additional Information
- Plugin Model
- Testing in Kubernetes with Kind
- Improvement
To quickly get started with the Task Service, use the following command:
make bootstrap
docker-compose -f docker-compose.demo.yaml up
This will start all the necessary services, including the server, database, and worker.
For a more detailed setup process, follow these steps:
Install Pixi and activate the shell:
make bootstrap
# Run Database
docker-compose up -d
Start the server:
make run-server
Access the server at http://127.0.0.1:8080
Build and test the CLI:
make build-cli
./bin/task-cli --help
Install the dashboard dependencies and start the development server:
npm install
npm run dev
Access the dashboard at http://127.0.0.1:3000
Start worker instances:
./bin/task-cli serve --log-level debug
task/
├── cmd/
│ ├── cli/ # CLI for task management
│ └── server/ # Server entry point
├── pkg/
│ ├── config/ # Configuration management
│ ├── gen/ # GRPC generated code
│ ├── plugins/ # Plugin model
│ ├── worker/ # Worker code
│ └── x/ # Utility functions
├── idl/
│ └── proto/ # Protocol buffer definitions
├── clients/
│ └── dashboard/ # NextJS Dashboard
├── charts/
│ └── task/ # Helm charts for deployment
├── server/
│ ├── repository/ # Database ORM
│ └── root/ # Server Root
│ └── route/ # All Server Routes
└── docs/ # Documentation files
The Task Service follows a distributed architecture with separate components for the control plane and data plane. Here's a high-level overview of the system:
graph TD
%% Clients
A[Dashboard Client] -->|Sends Request| B(Server)
C[CLI Client] -->|Sends Request| B(Server)
%% Control Plane
subgraph Control Plane
B(Server) -->|Reads/Writes| D[(PostgreSQL Database)]
end
%% Data Plane
subgraph Data Plane
E[Agent] -->|Initiates Connection| B
B -->|Publish W| E
E -->|Creates CRD| H[CRD]
F[Controller] -->|Watches CRD| H
F -->|Executes Task| J[Task Execution]
F -->|Sends Status Update| B
end
This architecture allows for:
- Separation of concerns between the control plane (server) and data plane (workers)
- Scalability of worker nodes to handle increased workloads
- Asynchronous task execution through message queuing
- Real-time status updates from workers to the server
The server interacts with the database for persistent storage of tasks and their history. Here's a summary of the database operations:
- Read Operations
  - Get Task by ID
    - Purpose: Retrieve details of a specific task
    - Frequency: On-demand, triggered by API requests
  - List All Tasks
    - Purpose: Retrieve a list of all tasks
    - Frequency: On-demand, typically for dashboard or reporting
  - List Task History
    - Purpose: Retrieve the status change history of a specific task
    - Frequency: On-demand, for detailed task analysis
- Write Operations
  - Create New Task
    - Purpose: Store a newly created task
    - Frequency: Each time a new task is submitted
  - Update Task Status
    - Purpose: Modify the status of an existing task
    - Frequency: As task states change (e.g., from queued to running to completed)
  - Create Task History Entry
    - Purpose: Log task status changes and creation events
    - Frequency: On task creation and each status change
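The pairing of write operations with history entries can be sketched in Go. This is a minimal in-memory illustration, not the service's actual repository: the `memoryRepo` type, its method names, and the status strings are all assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Task and TaskHistory are simplified, hypothetical stand-ins for the stored records.
type Task struct {
	ID     int
	Name   string
	Status string
}

type TaskHistory struct {
	TaskID    int
	Status    string
	Details   string
	CreatedAt time.Time
}

// memoryRepo is an illustrative in-memory store (assumed name, not the real repository).
type memoryRepo struct {
	nextID  int
	tasks   map[int]*Task
	history map[int][]TaskHistory
}

func newMemoryRepo() *memoryRepo {
	return &memoryRepo{nextID: 1, tasks: map[int]*Task{}, history: map[int][]TaskHistory{}}
}

// addHistory appends one history entry for the given task.
func (r *memoryRepo) addHistory(taskID int, status, details string) {
	entry := TaskHistory{TaskID: taskID, Status: status, Details: details, CreatedAt: time.Now()}
	r.history[taskID] = append(r.history[taskID], entry)
}

// CreateTask stores a new task and logs the creation event in its history.
func (r *memoryRepo) CreateTask(name string) Task {
	task := &Task{ID: r.nextID, Name: name, Status: "PENDING"}
	r.nextID++
	r.tasks[task.ID] = task
	r.addHistory(task.ID, task.Status, "task created")
	return *task
}

// UpdateTaskStatus changes the status and appends a matching history entry.
func (r *memoryRepo) UpdateTaskStatus(id int, status string) error {
	task, ok := r.tasks[id]
	if !ok {
		return errors.New("task not found")
	}
	task.Status = status
	r.addHistory(id, status, "status changed")
	return nil
}

// GetTask returns a copy of the task with the given ID, if it exists.
func (r *memoryRepo) GetTask(id int) (Task, bool) {
	task, ok := r.tasks[id]
	if !ok {
		return Task{}, false
	}
	return *task, true
}

// ListTaskHistory returns the status change history of one task.
func (r *memoryRepo) ListTaskHistory(id int) []TaskHistory {
	return r.history[id]
}

func main() {
	repo := newMemoryRepo()
	created := repo.CreateTask("Send Newsletter")
	_ = repo.UpdateTaskStatus(created.ID, "RUNNING")
	_ = repo.UpdateTaskStatus(created.ID, "SUCCEEDED")
	for _, h := range repo.ListTaskHistory(created.ID) {
		fmt.Println(h.Status, "-", h.Details)
	}
}
```

In this sketch every write goes through `addHistory`, so the history of a task always contains one entry for creation plus one per status change.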
The Task Service uses a PostgreSQL database to store task and task history information. Below is an Entity-Relationship Diagram (ERD) representing the database schema:
erDiagram
%% Task Model
TASK {
int id PK
string name
int type
int status
jsonb payload
int retries
int priority
timestamp created_at
}
%% TaskHistory Model
TASK_HISTORY {
int id PK
int task_id FK
int status
string details
timestamp created_at
}
%% Relationships
TASK ||--o{ TASK_HISTORY : has
%% Indexes (described as comments)
%% Indexes for TASK
%% - idx_type_status (type, status)
%% - idx_created_at (created_at)
%% - idx_status_created_at (status, created_at)
%% Indexes for TASK_HISTORY
%% - idx_task_id_created_at (task_id, created_at)
Note: Ideally, we should create separate tables for tasks 📝 and task executions ⚙️. When a task is created, it should be added to the task table. Upon triggering an execution, a corresponding entry should be created in the execution table, and the execution data should be published to the PostgreSQL queue for processing 📬. This way, the task status remains unchanged, and only the execution status is updated in the execution table ✅.
- TASK: Stores information about individual tasks
  - `id`: Unique identifier for the task (Primary Key)
  - `name`: Name of the task
  - `type`: Type of the task (e.g., send_email, run_query)
  - `status`: Current status of the task (e.g., pending, running, completed)
  - `payload`: JSON object containing task-specific parameters
  - `retries`: Number of retry attempts for the task
  - `priority`: Priority level of the task
  - `created_at`: Timestamp of task creation
- TASK_HISTORY: Tracks the history of status changes for tasks
  - `id`: Unique identifier for the history entry (Primary Key)
  - `task_id`: Foreign Key referencing the TASK table
  - `status`: Status of the task at the time of the history entry
  - `details`: Additional details about the status change
  - `created_at`: Timestamp of the history entry creation
- One TASK can have many TASK_HISTORY entries (one-to-many relationship)
To optimize query performance, the following indexes are implemented:
- TASK table
  - `idx_type_status`: Composite index on `type` and `status` columns
  - `idx_created_at`: Index on `created_at` column
  - `idx_status_created_at`: Composite index on `status` and `created_at` columns
- TASK_HISTORY table
  - `idx_task_id_created_at`: Composite index on `task_id` and `created_at` columns
These indexes improve the efficiency of common queries such as filtering tasks by type and status, sorting by creation time, and retrieving task history.
The worker process follows a specific flow for task execution and error handling. Here's a detailed view of the worker's operation:
graph TD
A[Receive Message] --> B{Update Status: RUNNING}
B -->|Success| C[Run Task]
B -->|Failure| D[Log Error]
D --> L[Move to Next Message]
C --> E{Task Execution}
E -->|Success| F[Update Status: SUCCEEDED]
E -->|Failure| G[Retry Logic]
G --> H{Retry Attempt <= 3?}
H -->|Yes| I[Backoff]
I --> J[Update Status: RETRYING]
J --> C
H -->|No| K[Update Status: FAILED]
F --> L[Move to Next Message]
K --> L
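The retry branch of the flow above can be sketched in Go as follows. This is an illustration under stated assumptions rather than the worker's real code: `runWithRetry` and `errTransient` are hypothetical names, and exponential backoff is assumed because the diagram only says "Backoff". The limit of 3 retries comes from the diagram.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// maxRetries is taken from the "Retry Attempt <= 3?" decision in the diagram.
const maxRetries = 3

// errTransient is a hypothetical error used to simulate a failing task.
var errTransient = errors.New("transient failure")

// runWithRetry runs task and retries it up to maxRetries times, sleeping
// longer before each retry. It returns the final status and the ordered
// list of statuses the task passed through.
func runWithRetry(task func() error, baseDelay time.Duration) (string, []string) {
	statuses := []string{"RUNNING"}
	for attempt := 1; ; attempt++ {
		if err := task(); err == nil {
			statuses = append(statuses, "SUCCEEDED")
			return "SUCCEEDED", statuses
		}
		if attempt > maxRetries {
			statuses = append(statuses, "FAILED")
			return "FAILED", statuses
		}
		// Exponential backoff (assumed): baseDelay, 2x, 4x, ...
		time.Sleep(baseDelay << (attempt - 1))
		statuses = append(statuses, "RETRYING")
	}
}

func main() {
	calls := 0
	// flaky fails twice and then succeeds.
	flaky := func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	}
	final, statuses := runWithRetry(flaky, 10*time.Millisecond)
	fmt.Println(final, statuses, "runs:", calls)
}
```

With three retries allowed, a task that never succeeds is run four times in total before it is marked FAILED.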
The Task Service CLI provides several commands to manage tasks. Here's a detailed overview of each command and its available flags:
Create a new task with the specified name, type, and parameters.
task-cli task create [task name] --type [task type] --parameter [key=value]
Flags:
- `--type`, `-t`: Type of the task (e.g., send_email, run_query)
- `--parameter`, `-p`: Additional parameters for the task as key=value pairs (can be used multiple times)
Example:
task-cli task create "Send Newsletter" --type send_email --parameter recipient=user@example.com --parameter subject="Weekly Update"
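Repeated `--parameter` values end up as key=value pairs, which fits the `map[string]string` that plugins receive. A minimal Go sketch of that conversion is shown below; `parseParameters` is a hypothetical helper, not a function taken from the CLI source.

```go
package main

import (
	"fmt"
	"strings"
)

// parseParameters converts repeated key=value arguments into a map.
// Only the first "=" separates key and value, so values may contain "=".
func parseParameters(pairs []string) (map[string]string, error) {
	params := make(map[string]string, len(pairs))
	for _, pair := range pairs {
		key, value, found := strings.Cut(pair, "=")
		if !found || key == "" {
			return nil, fmt.Errorf("invalid parameter %q: expected key=value", pair)
		}
		params[key] = value
	}
	return params, nil
}

func main() {
	params, err := parseParameters([]string{
		"recipient=user@example.com",
		"subject=Weekly Update",
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(params["recipient"], "|", params["subject"])
}
```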
Retrieve and display the details of a specific task by its ID.
task-cli task get --id [task ID] [flags]
Flags:
- `--id`, `-i`: ID of the task (required)
- `--output`, `-o`: Output format (table, json, yaml) (default: "table")
Example:
task-cli task get --id 123 --output json
Retrieve and display the history of a specific task by its ID.
task-cli history --id [task ID] [flags]
Flags:
- `--id`, `-i`: ID of the task (required)
- `--output`, `-o`: Output format (table, json, yaml) (default: "table")
Example:
task-cli history --id 123 --output yaml
Retrieve and display a list of all tasks.
task-cli task list [flags]
Flags:
- `--output`, `-o`: Output format (table, json, yaml) (default: "table")
- `--pageNumber`, `-n`: Page number for pagination (default: 1)
- `--pageCount`, `-c`: Number of items per page (default: 30)
Examples:
task-cli task list
task-cli task list --output json
task-cli task list --pageNumber 2 --pageCount 20
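How `--pageNumber` and `--pageCount` could map onto a slice of results is sketched below in Go. The `pageBounds` helper is hypothetical, and falling back to the documented defaults for invalid input is an assumption about behavior, not something confirmed by the source.

```go
package main

import "fmt"

// pageBounds converts a 1-based page number and a page size into a
// half-open [start, end) range over a collection of total items.
func pageBounds(pageNumber, pageCount, total int) (start, end int) {
	if pageNumber < 1 {
		pageNumber = 1 // documented default page
	}
	if pageCount < 1 {
		pageCount = 30 // documented default page size
	}
	start = (pageNumber - 1) * pageCount
	if start > total {
		start = total
	}
	end = start + pageCount
	if end > total {
		end = total
	}
	return start, end
}

func main() {
	items := make([]int, 45)
	start, end := pageBounds(2, 20, len(items))
	fmt.Printf("page 2 covers items[%d:%d]\n", start, end)
}
```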
Retrieve the status counts of all tasks in the system.
task-cli task status
Aliases: `s`, `stat`
Example:
task-cli task status
task-cli task s
This command will display the count of tasks for each status (e.g., PENDING, RUNNING, SUCCEEDED, FAILED).
Run end-to-end tests against the system to verify its functionality.
task-cli end2end [flags]
Flags:
- `--num-tasks`, `-n`: Number of tasks to create for the test (default: 100, max: 100)
Example:
task-cli end2end
task-cli end2end -n 50
This command will:
- Create the specified number of tasks (default 100)
- Monitor the tasks' completion status for up to 3 minutes
- Display progress every 5 seconds
- Report the final result (success or partial completion)
The test creates a mix of "run_query" and "send_email" task types to simulate a realistic workload.
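The monitoring phase described above (poll, print progress, stop on completion or timeout) can be sketched in Go. The `waitForCompletion` function and the fake progress source are hypothetical; the real command polls every 5 seconds for up to 3 minutes, while the example uses much shorter durations so it finishes quickly.

```go
package main

import (
	"fmt"
	"time"
)

// waitForCompletion polls done() every interval until it reports that all
// total tasks have finished or the timeout would be exceeded. It returns the
// last observed count and whether every task completed in time.
func waitForCompletion(done func() int, total int, interval, timeout time.Duration) (int, bool) {
	deadline := time.Now().Add(timeout)
	for {
		completed := done()
		fmt.Printf("progress: %d/%d tasks completed\n", completed, total)
		if completed >= total {
			return completed, true
		}
		// Stop if the next poll would land after the deadline.
		if time.Now().Add(interval).After(deadline) {
			return completed, false
		}
		time.Sleep(interval)
	}
}

func main() {
	completed := 0
	// fake simulates 40 more tasks finishing between polls, capped at 100.
	fake := func() int {
		completed += 40
		if completed > 100 {
			completed = 100
		}
		return completed
	}
	count, ok := waitForCompletion(fake, 100, 50*time.Millisecond, 2*time.Second)
	fmt.Println("all tasks completed:", ok, "count:", count)
}
```

Returning the last observed count alongside the boolean lets the caller report either full success or partial completion.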
The following flags are available for all task commands:
- `--log-level`: Set the logging level (default: "error")
- `--address`: Control plane address (default: "http://127.0.0.1:8080")
Example:
task-cli task list --log-level debug
All commands that display task information support three output formats:
- `table`: Displays the information in a formatted table (default)
- `json`: Outputs the data in JSON format
- `yaml`: Outputs the data in YAML format

Use the `--output` or `-o` flag to specify the desired format.
- Control plane (server) manages task creation, scheduling, and status updates
- Data plane (workers) executes tasks (currently part of the same binary as the server)
- RiverQueue handles communication between the control and data planes, using PostgreSQL as the queue backend
- Explore the UI or CLI to create and manage tasks
The Task Service uses a plugin-based architecture to allow for extensibility and customization of task execution. This model enables users to create their own task types and implement custom logic for task execution.
- Plugin Interface: All plugins must implement the `Plugin` interface defined in `@pkg/plugins/plugins.go`. This interface requires a `Run` method:

      type Plugin interface {
          Run(parameters map[string]string) error
      }

- Plugin Registration: Plugins are registered in the `NewPlugin` function in `@pkg/plugins/plugins.go`. This function acts as a factory, creating the appropriate plugin based on the task type:

      func NewPlugin(pluginType string) (Plugin, error) {
          switch pluginType {
          case email.PLUGIN_NAME:
              return &email.Email{}, nil
          case query.PLUGIN_NAME:
              return &query.Query{}, nil
          // Add more plugin types here
          default:
              return nil, fmt.Errorf("unknown plugin type: %s", pluginType)
          }
      }

- Custom Plugin Implementation: Users can create their own plugins by implementing the `Plugin` interface. For example, the `Email` plugin in `@pkg/email/email.go`:

      var PLUGIN_NAME = "send_email"

      type Email struct {}

      func (e *Email) Run(parameters map[string]string) error {
          // Implementation of email sending logic
          return nil
      }

- Task Execution: When a task is executed, the system uses the `NewPlugin` function to create the appropriate plugin based on the task type. It then calls the `Run` method of the plugin, passing any necessary parameters.
To create a new plugin:
- Create a new package in the `@pkg/plugins` directory for your plugin.
- Implement the `Plugin` interface in your new package.
- Add your plugin to the `NewPlugin` function in `@pkg/plugins/plugins.go`.
This modular approach allows for easy extension of the Task Service with new task types and functionalities.
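As an illustration of those steps, the sketch below adds a made-up `call_webhook` plugin to a trimmed copy of the factory. The `Webhook` type, its `url` parameter, and the `call_webhook` name are all hypothetical, and everything is placed in one file so the example runs on its own. In the real project the plugin would live in its own package.

```go
package main

import (
	"errors"
	"fmt"
)

// Plugin mirrors the interface described in this section.
type Plugin interface {
	Run(parameters map[string]string) error
}

// webhookPluginName is a hypothetical task type used only for this example.
const webhookPluginName = "call_webhook"

// Webhook is a hypothetical plugin that validates its input and reports
// what it would do. A real plugin would perform the HTTP call.
type Webhook struct{}

func (w *Webhook) Run(parameters map[string]string) error {
	url, ok := parameters["url"]
	if !ok || url == "" {
		return errors.New(`call_webhook: missing required parameter "url"`)
	}
	fmt.Println("would call webhook at", url)
	return nil
}

// NewPlugin is a trimmed copy of the factory with the new case added.
func NewPlugin(pluginType string) (Plugin, error) {
	switch pluginType {
	case webhookPluginName:
		return &Webhook{}, nil
	default:
		return nil, fmt.Errorf("unknown plugin type: %s", pluginType)
	}
}

func main() {
	plugin, err := NewPlugin("call_webhook")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	if err := plugin.Run(map[string]string{"url": "https://example.com/hook"}); err != nil {
		fmt.Println("error:", err)
	}
}
```

Validating required parameters at the top of `Run` gives the worker a clear error to record when a task is created with incomplete input.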
This section guides you through setting up and testing the Task Service in a local Kubernetes cluster using Kind (Kubernetes in Docker) and Helm charts.
- Create a Kind cluster:
kind create cluster --name task-service
- Verify that kubectl can reach the new cluster:
kubectl cluster-info --context kind-task-service
- Add the necessary Helm repositories:
make helm
- Install the Task Service Helm chart:
helm install task-service ./charts/task -n task
- Check that all pods are running:
kubectl get pods -n task
- Port-forward the Task Service:
kubectl port-forward -n task svc/task 8080:80
- Access the service at http://127.0.0.1:8080
- Use the CLI to verify the connection:
./bin/task-cli task l --address http://127.0.0.1:8080
To delete the Kind cluster and all resources:
kind delete cluster --name task-service
This setup allows you to test the entire Task Service stack, including the server, workers, and dependencies, in a local Kubernetes environment. It's an excellent way to validate the Helm charts and ensure everything works together as expected in a Kubernetes setting.