Invocation state proxy for Function Executors and new API for functions
We don't want to have Indexify Server API credentials in Function Executors
because it's a security risk if Function Executors run untrusted code.
To achieve that, the Executor provides a gRPC API to the Function Executor to
get/set graph invocation state. Once a request is received, the Executor
finds the invocation ID of the requesting Function Executor and makes an
Indexify Server API call to get/set the invocation state key.
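
Roughly, the Executor-side proxying looks like the sketch below; everything in it (the request shape, the endpoint path, the helper names) is hypothetical for illustration, and the real protocol is the gRPC service and `InvocationStateClient` added in this commit.

```python
from dataclasses import dataclass
from typing import Dict, Optional

import httpx


@dataclass
class InvocationStateRequest:
    """Hypothetical request shape; the real one is a protobuf message."""

    task_id: str
    key: str
    value: Optional[bytes] = None  # None => get, bytes => set


async def proxy_request(
    request: InvocationStateRequest,
    client: httpx.AsyncClient,  # built with the Executor's credentials/TLS config
    base_url: str,
    task_id_to_invocation_id: Dict[str, str],
) -> Optional[bytes]:
    # Only the Executor knows which graph invocation the requesting
    # Function Executor is currently serving, so it resolves the ID here.
    invocation_id = task_id_to_invocation_id[request.task_id]
    # Hypothetical endpoint path on the Indexify Server.
    url = f"{base_url}/invocations/{invocation_id}/state/{request.key}"
    if request.value is None:
        response = await client.get(url)
        response.raise_for_status()
        return response.content
    response = await client.post(url, content=request.value)
    response.raise_for_status()
    return None
```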

There are a few customer-function-facing API changes here:

Instead of calling `get_ctx().set_state_key()` and
`get_ctx().get_state_key()`, customer functions now call
`get_ctx().invocation_state.set()` and
`get_ctx().invocation_state.get()`.

This introduces a separate API for graph invocation state, which is clearer
than adding an unbounded number of methods to the object returned
by `get_ctx()`.
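
For illustration, a call site inside a customer function changes roughly like this (the import path and the surrounding function are assumptions; only the `invocation_state` accessors come from this change):

```python
from typing import Optional

from indexify.functions_sdk.indexify_functions import get_ctx  # assumed import path


def store_progress(progress: float) -> Optional[float]:
    # Old API:
    #   get_ctx().set_state_key("progress", progress)
    #   previous = get_ctx().get_state_key("progress")
    # New API:
    previous: Optional[float] = get_ctx().invocation_state.get("progress")
    get_ctx().invocation_state.set("progress", progress)
    return previous
```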

Another change is that `set(key, value)` now accepts any value
that is serializable using CloudPickle. This provides a consistent
"no surprises" UX because we're already using CloudPickle for function inputs
and outputs. Supporting this required small changes on the Indexify Server
side, as previously JSON was explicitly used at the HTTP protocol and storage
layers. Now the server supports arbitrary binary and textual formats for
invocation state values.
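
As a standalone sketch of what "any CloudPickle-serializable value" means in practice (not tied to the SDK):

```python
import cloudpickle

# Values no longer have to be JSON: tuples, sets, custom classes, etc. are all
# fine as long as CloudPickle can round-trip them.
value = {"pages": (1, 2, 3), "seen": {"a", "b"}}
blob: bytes = cloudpickle.dumps(value)   # stored by the server as opaque bytes
assert cloudpickle.loads(blob) == value  # equivalent to what get(key) hands back
```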

Finally, `get(key)` now returns `Optional[Any]`, so `None` is returned
if the key wasn't set yet. This allows the customer code to decide what
to do in that case.
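
A short sketch of handling that case (the key name and the fallback are hypothetical):

```python
from indexify.functions_sdk.indexify_functions import get_ctx  # assumed import path, as above

cursor = get_ctx().invocation_state.get("resume_cursor")
if cursor is None:
    # The key was never set for this invocation, so the function starts fresh.
    cursor = 0
```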

Testing:

make fmt
make test
eabatalov committed Dec 20, 2024
1 parent ab93241 commit 928d9fe
Showing 30 changed files with 1,356 additions and 236 deletions.
15 changes: 4 additions & 11 deletions python-sdk/indexify/cli.py
@@ -45,9 +45,6 @@
console = Console(theme=custom_theme)

app = typer.Typer(pretty_exceptions_enable=False, no_args_is_help=True)
config_path_option: Optional[str] = typer.Option(
None, help="Path to the TLS configuration file"
)


@app.command(
@@ -209,7 +206,9 @@ def executor(
dev: Annotated[
bool, typer.Option("--dev", "-d", help="Run the executor in development mode")
] = False,
config_path: Optional[str] = config_path_option,
config_path: Optional[str] = typer.Option(
None, help="Path to the TLS configuration file"
),
executor_cache: Optional[str] = typer.Option(
"~/.indexify/executor_cache", help="Path to the executor cache directory"
),
@@ -264,27 +263,21 @@ def function_executor(
function_executor_server_address: str = typer.Option(
help="Function Executor server address"
),
indexify_server_address: str = typer.Option(help="Indexify server address"),
dev: Annotated[
bool, typer.Option("--dev", "-d", help="Run the executor in development mode")
] = False,
config_path: Optional[str] = config_path_option,
):
if not dev:
configure_production_logging()

logger.info(
"starting function executor server",
function_executor_server_address=function_executor_server_address,
indexify_server_address=indexify_server_address,
config_path=config_path,
)

FunctionExecutorServer(
server_address=function_executor_server_address,
service=FunctionExecutorService(
indexify_server_address=indexify_server_address, config_path=config_path
),
service=FunctionExecutorService(),
).run()


26 changes: 15 additions & 11 deletions python-sdk/indexify/executor/downloader.py
@@ -142,7 +142,7 @@ async def _fetch_url(
self, url: str, resource_description: str, logger: Any
) -> SerializedObject:
logger.info(f"fetching {resource_description}", url=url)
response = await self._client.get(url)
response: httpx.Response = await self._client.get(url)
try:
response.raise_for_status()
except httpx.HTTPStatusError as e:
@@ -153,13 +153,17 @@ async def _fetch_url(
)
raise

# We're hardcoding the content type currently used by Python SDK. It might change in the future.
# There's no other way for now to determine if the response is a bytes or string.
if response.headers["content-type"] == "application/octet-stream":
return SerializedObject(
bytes=response.content, content_type=response.headers["content-type"]
)
else:
return SerializedObject(
string=response.text, content_type=response.headers["content-type"]
)
return serialized_object_from_http_response(response)


def serialized_object_from_http_response(response: httpx.Response) -> SerializedObject:
# We're hardcoding the content type currently used by Python SDK. It might change in the future.
# There's no other way for now to determine if the response is a bytes or string.
if response.headers["content-type"] == "application/octet-stream":
return SerializedObject(
bytes=response.content, content_type=response.headers["content-type"]
)
else:
return SerializedObject(
string=response.text, content_type=response.headers["content-type"]
)
12 changes: 6 additions & 6 deletions python-sdk/indexify/executor/executor.py
@@ -42,16 +42,16 @@ def __init__(
self._logger.info("running the extractor with TLS enabled")
protocol = "https"

self._server_addr = server_addr
self._base_url = f"{protocol}://{self._server_addr}"
self._code_path = code_path
self._function_worker = FunctionWorker(
function_executor_factory=ProcessFunctionExecutorFactory(
indexify_server_address=server_addr,
development_mode=development_mode,
config_path=config_path,
)
),
base_url=self._base_url,
config_path=config_path,
)
self._server_addr = server_addr
self._base_url = f"{protocol}://{self._server_addr}"
self._code_path = code_path
self._downloader = Downloader(
code_path=code_path, base_url=self._base_url, config_path=config_path
)
@@ -3,6 +3,7 @@

import grpc

from indexify.common_util import get_httpx_client
from indexify.function_executor.proto.function_executor_pb2 import (
InitializeRequest,
InitializeResponse,
@@ -13,6 +14,7 @@

from .function_executor import FunctionExecutor
from .function_executor_factory import FunctionExecutorFactory
from .invocation_state_client import InvocationStateClient


class FunctionExecutorMap:
@@ -21,10 +23,17 @@ class FunctionExecutorMap:
The map is safe to use by multiple coroutines running in event loop on the same thread
but it's not thread safe (can't be used from different threads concurrently)."""

def __init__(self, factory: FunctionExecutorFactory):
def __init__(
self,
factory: FunctionExecutorFactory,
base_url: str,
config_path: Optional[str],
):
self._factory = factory
self._base_url = base_url
self._config_path = config_path
# Map of initialized Function executors ready to run tasks.
# function ID -> FunctionExecutor
# Function ID -> FunctionExecutor.
self._executors: Dict[str, FunctionExecutor] = {}
# We have to do all operations under this lock because we need to ensure
# that we don't create more Function Executors than required. This is important
@@ -52,6 +61,7 @@ async def get_or_create(
return self._executors[id]

executor: Optional[FunctionExecutor] = None
invocation_state_client: Optional[InvocationStateClient] = None
try:
executor = await self._factory.create(logger, state=initial_state)
channel: grpc.aio.Channel = await executor.channel()
@@ -61,7 +71,22 @@
)
if not initialize_response.success:
raise Exception("initialize RPC failed at function executor")
invocation_state_client = InvocationStateClient(
stub=stub,
base_url=self._base_url,
http_client=get_httpx_client(
config_path=self._config_path, make_async=True
),
graph=initialize_request.graph_name,
namespace=initialize_request.namespace,
logger=logger,
)
await invocation_state_client.start()
# This is dirty but requires refactoring to implement properly.
initial_state.invocation_state_client = invocation_state_client
except Exception:
if invocation_state_client is not None:
await invocation_state_client.destroy()
if executor is not None:
await self._factory.destroy(executor=executor, logger=logger)
# Function Executor creation or initialization failed.
@@ -82,10 +107,14 @@ async def delete(
# Function Executor was already deleted or replaced and the caller is not aware of this.
return
del self._executors[id]
if function_executor.state().invocation_state_client is not None:
await function_executor.state().invocation_state_client.destroy()
await self._factory.destroy(executor=function_executor, logger=logger)

async def clear(self, logger):
async with self._executors_lock:
while self._executors:
id, function_executor = self._executors.popitem()
if function_executor.state().invocation_state_client is not None:
await function_executor.state().invocation_state_client.destroy()
await self._factory.destroy(function_executor, logger)