Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rnd): Split Execution Manager #8008

Merged
merged 23 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,3 @@ rnd/autogpt_builder/.env.example
rnd/autogpt_builder/.env.local
rnd/autogpt_server/.env
rnd/autogpt_server/.venv/

1 change: 1 addition & 0 deletions rnd/autogpt_server/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ REDIS_PASSWORD=password

AUTH_ENABLED=false
APP_ENV="local"
PYRO_HOST=localhost
SENTRY_DSN=

## ===== OPTIONAL API KEYS ===== ##
Expand Down
9 changes: 7 additions & 2 deletions rnd/autogpt_server/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ FROM python:3.11-slim-buster as server_base
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1


WORKDIR /app

RUN apt-get update \
Expand All @@ -17,7 +16,6 @@ RUN apt-get update \
&& make prefix=/usr all \
&& make prefix=/usr install


ENV POETRY_VERSION=1.8.3 \
POETRY_HOME="/opt/poetry" \
POETRY_NO_INTERACTION=1 \
Expand All @@ -44,3 +42,10 @@ ENV PORT=8000
ENV DATABASE_URL=""

CMD ["poetry", "run", "rest"]

FROM server_base as executor

ENV PORT=8002
ENV DATABASE_URL=""

CMD ["poetry", "run", "executor"]
7 changes: 7 additions & 0 deletions rnd/autogpt_server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,13 @@ A communication layer (`service.py`) is created to decouple the communication li

Currently, the IPC is done using Pyro5 and abstracted in a way that allows a function decorated with `@expose` to be called from a different process.


By default the daemons run on the following ports:

Execution Manager Daemon: 8002
Execution Scheduler Daemon: 8003
Rest Server Daemon: 8004

Comment on lines +187 to +192
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add this to docs/server too

## Adding a New Agent Block

To add a new agent block, you need to create a new class that inherits from `Block` and provides the following information:
Expand Down
2 changes: 0 additions & 2 deletions rnd/autogpt_server/autogpt_server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@ def main(**kwargs):

from autogpt_server.executor import ExecutionManager, ExecutionScheduler
from autogpt_server.server import AgentServer, WebsocketServer
from autogpt_server.util.service import PyroNameServer

run_processes(
PyroNameServer(),
aarushik93 marked this conversation as resolved.
Show resolved Hide resolved
ExecutionManager(),
ExecutionScheduler(),
WebsocketServer(),
Expand Down
15 changes: 15 additions & 0 deletions rnd/autogpt_server/autogpt_server/exec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from autogpt_server.app import run_processes
from autogpt_server.executor import ExecutionManager


def main():
"""
Run all the processes required for the AutoGPT-server REST API.
"""
run_processes(
ExecutionManager(),
)


if __name__ == "__main__":
main()
3 changes: 2 additions & 1 deletion rnd/autogpt_server/autogpt_server/executor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ def validate_exec(
def get_agent_server_client() -> "AgentServer":
from autogpt_server.server.rest_api import AgentServer

return get_service_client(AgentServer)
return get_service_client(AgentServer, Config().agent_server_port)


class Executor:
Expand Down Expand Up @@ -648,6 +648,7 @@ def callback(_):

class ExecutionManager(AppService):
def __init__(self):
super().__init__(port=Config().execution_manager_port)
self.use_db = True
self.pool_size = Config().num_graph_workers
self.queue = ExecutionQueue[GraphExecution]()
Expand Down
5 changes: 4 additions & 1 deletion rnd/autogpt_server/autogpt_server/executor/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from autogpt_server.data.block import BlockInput
from autogpt_server.executor.manager import ExecutionManager
from autogpt_server.util.service import AppService, expose, get_service_client
from autogpt_server.util.settings import Config

logger = logging.getLogger(__name__)

Expand All @@ -19,13 +20,15 @@ def log(msg, **kwargs):

class ExecutionScheduler(AppService):
def __init__(self, refresh_interval=10):
super().__init__(port=Config().execution_scheduler_port)
self.use_db = True
self.last_check = datetime.min
self.refresh_interval = refresh_interval
self.use_redis = False

@property
def execution_manager_client(self) -> ExecutionManager:
return get_service_client(ExecutionManager)
return get_service_client(ExecutionManager, Config().execution_manager_port)

def run_service(self):
scheduler = BackgroundScheduler()
Expand Down
5 changes: 1 addition & 4 deletions rnd/autogpt_server/autogpt_server/rest.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
from autogpt_server.app import run_processes
from autogpt_server.executor import ExecutionManager, ExecutionScheduler
from autogpt_server.executor import ExecutionScheduler
from autogpt_server.server import AgentServer
from autogpt_server.util.service import PyroNameServer


def main():
"""
Run all the processes required for the AutoGPT-server REST API.
"""
run_processes(
PyroNameServer(),
ExecutionManager(),
ExecutionScheduler(),
AgentServer(),
)
Expand Down
7 changes: 4 additions & 3 deletions rnd/autogpt_server/autogpt_server/server/rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from autogpt_server.server.model import CreateGraph, SetGraphActiveVersion
from autogpt_server.util.lock import KeyedMutex
from autogpt_server.util.service import AppService, expose, get_service_client
from autogpt_server.util.settings import Settings
from autogpt_server.util.settings import Config, Settings

from .utils import get_user_id

Expand All @@ -34,6 +34,7 @@ class AgentServer(AppService):
_test_dependency_overrides = {}

def __init__(self, event_queue: AsyncEventQueue | None = None):
super().__init__(port=Config().agent_server_port)
self.event_queue = event_queue or AsyncRedisEventQueue()

@asynccontextmanager
Expand Down Expand Up @@ -239,11 +240,11 @@ async def wrapper(*args, **kwargs):

@property
def execution_manager_client(self) -> ExecutionManager:
return get_service_client(ExecutionManager)
return get_service_client(ExecutionManager, Config().execution_manager_port)

@property
def execution_scheduler_client(self) -> ExecutionScheduler:
return get_service_client(ExecutionScheduler)
return get_service_client(ExecutionScheduler, Config().execution_scheduler_port)

@classmethod
def handle_internal_http_error(cls, request: Request, exc: Exception):
Expand Down
40 changes: 17 additions & 23 deletions rnd/autogpt_server/autogpt_server/util/service.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import asyncio
import logging
import os
import threading
import time
from abc import abstractmethod
from typing import Any, Callable, Coroutine, Type, TypeVar, cast

import Pyro5.api
from Pyro5 import api as pyro
from Pyro5 import nameserver

from autogpt_server.data import db
from autogpt_server.data.queue import AsyncEventQueue, AsyncRedisEventQueue
Expand Down Expand Up @@ -42,25 +43,16 @@ def wrapper(*args, **kwargs):
return pyro.expose(wrapper) # type: ignore


class PyroNameServer(AppProcess):
def run(self):
nameserver.start_ns_loop(host=pyro_host, port=9090)

@conn_retry
def _wait_for_ns(self):
pyro.locate_ns(host="localhost", port=9090)

def health_check(self):
self._wait_for_ns()
logger.info(f"{__class__.__name__} is ready")


class AppService(AppProcess):
shared_event_loop: asyncio.AbstractEventLoop
event_queue: AsyncEventQueue = AsyncRedisEventQueue()
use_db: bool = False
use_redis: bool = False

def __init__(self, port):
self.port = port
self.uri = None

@classmethod
@property
def service_name(cls) -> str:
Expand Down Expand Up @@ -108,11 +100,10 @@ def cleanup(self):

@conn_retry
def __start_pyro(self):
daemon = pyro.Daemon(host=pyro_host)
ns = pyro.locate_ns(host=pyro_host, port=9090)
uri = daemon.register(self)
ns.register(self.service_name, uri)
logger.info(f"[{self.service_name}] Connected to Pyro; URI = {uri}")
host = Config().pyro_host
daemon = Pyro5.api.Daemon(host=host, port=self.port)
self.uri = daemon.register(self, objectId=self.service_name)
aarushik93 marked this conversation as resolved.
Show resolved Hide resolved
logger.info(f"[{self.service_name}] Connected to Pyro; URI = {self.uri}")
daemon.requestLoop()

def __start_async_loop(self):
Expand All @@ -122,16 +113,19 @@ def __start_async_loop(self):
AS = TypeVar("AS", bound=AppService)


def get_service_client(service_type: Type[AS]) -> AS:
def get_service_client(service_type: Type[AS], port: int) -> AS:
service_name = service_type.service_name

class DynamicClient:
@conn_retry
def __init__(self):
ns = pyro.locate_ns()
uri = ns.lookup(service_name)
self.proxy = pyro.Proxy(uri)
host = os.environ.get(f"{service_name.upper()}_HOST", "localhost")
uri = f"PYRO:{service_type.service_name}@{host}:{port}"
logger.debug(f"Connecting to service [{service_name}]. URI = {uri}")
self.proxy = Pyro5.api.Proxy(uri)
# Attempt to bind to ensure the connection is established
self.proxy._pyroBind()
logger.debug(f"Successfully connected to service [{service_name}]")

def __getattr__(self, name: str) -> Callable[..., Any]:
return getattr(self.proxy, name)
Expand Down
15 changes: 15 additions & 0 deletions rnd/autogpt_server/autogpt_server/util/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,21 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
extra="allow",
)

execution_manager_port: int = Field(
default=8002,
description="The port for execution manager daemon to run on",
)

execution_scheduler_port: int = Field(
default=8003,
description="The port for execution scheduler daemon to run on",
)

agent_server_port: int = Field(
default=8004,
description="The port for agent server daemon to run on",
)

@classmethod
def settings_customise_sources(
cls,
Expand Down
4 changes: 0 additions & 4 deletions rnd/autogpt_server/autogpt_server/util/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from autogpt_server.executor import ExecutionManager, ExecutionScheduler
from autogpt_server.server import AgentServer
from autogpt_server.server.rest_api import get_user_id
from autogpt_server.util.service import PyroNameServer

log = print

Expand Down Expand Up @@ -48,7 +47,6 @@ async def get(self):

class SpinTestServer:
def __init__(self):
self.name_server = PyroNameServer()
self.exec_manager = ExecutionManager()
self.in_memory_queue = InMemoryAsyncEventQueue()
self.agent_server = AgentServer(event_queue=self.in_memory_queue)
Expand All @@ -59,7 +57,6 @@ def test_get_user_id():
return "3e53486c-cf57-477e-ba2a-cb02dc828e1a"

async def __aenter__(self):
self.name_server.__enter__()
self.setup_dependency_overrides()
self.agent_server.__enter__()
self.exec_manager.__enter__()
Expand All @@ -76,7 +73,6 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
self.scheduler.__exit__(exc_type, exc_val, exc_tb)
self.exec_manager.__exit__(exc_type, exc_val, exc_tb)
self.agent_server.__exit__(exc_type, exc_val, exc_tb)
self.name_server.__exit__(exc_type, exc_val, exc_tb)

def setup_dependency_overrides(self):
# Override get_user_id for testing
Expand Down
1 change: 1 addition & 0 deletions rnd/autogpt_server/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ build-backend = "poetry.core.masonry.api"
app = "autogpt_server.app:main"
rest = "autogpt_server.rest:main"
ws = "autogpt_server.ws:main"
executor = "autogpt_server.exec:main"
cli = "autogpt_server.cli:main"
format = "linter:format"
lint = "linter:lint"
Expand Down
5 changes: 4 additions & 1 deletion rnd/autogpt_server/test/executor/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from autogpt_server.executor import ExecutionScheduler
from autogpt_server.usecases.sample import create_test_graph, create_test_user
from autogpt_server.util.service import get_service_client
from autogpt_server.util.settings import Config
from autogpt_server.util.test import SpinTestServer


Expand All @@ -13,7 +14,9 @@ async def test_agent_schedule(server: SpinTestServer):
test_user = await create_test_user()
test_graph = await graph.create_graph(create_test_graph(), user_id=test_user.id)

scheduler = get_service_client(ExecutionScheduler)
scheduler = get_service_client(
ExecutionScheduler, Config().execution_scheduler_port
)

schedules = scheduler.get_execution_schedules(test_graph.id, test_user.id)
assert len(schedules) == 0
Expand Down
3 changes: 2 additions & 1 deletion rnd/autogpt_server/test/util/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

class TestService(AppService):
def __init__(self):
super().__init__(port=8005)
self.use_redis = False

def run_service(self):
Expand All @@ -29,7 +30,7 @@ async def add_async(a: int, b: int) -> int:
@pytest.mark.asyncio(scope="session")
async def test_service_creation(server):
with TestService():
client = get_service_client(TestService)
client = get_service_client(TestService, 8005)
aarushik93 marked this conversation as resolved.
Show resolved Hide resolved
assert client.add(5, 3) == 8
assert client.subtract(10, 4) == 6
assert client.fun_with_async(5, 3) == 8
Loading
Loading