Skip to content

Commit

Permalink
update to output api (#981)
Browse files Browse the repository at this point in the history
  • Loading branch information
diptanu authored Oct 26, 2024
1 parent 0bad781 commit f1873a1
Show file tree
Hide file tree
Showing 13 changed files with 50 additions and 27 deletions.
11 changes: 0 additions & 11 deletions dockerfiles/Dockerfile.ui

This file was deleted.

5 changes: 5 additions & 0 deletions python-sdk/indexify/error.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
class ApiException(Exception):
def __init__(self, message: str) -> None:
super().__init__(message)


class GraphStillProcessing(Exception):
def __init__(self) -> None:
super().__init__("graph is still processing")
6 changes: 5 additions & 1 deletion python-sdk/indexify/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from pydantic import BaseModel, Json
from rich import print

from indexify.error import ApiException
from indexify.error import ApiException, GraphStillProcessing
from indexify.functions_sdk.data_objects import IndexifyData
from indexify.functions_sdk.graph import ComputeGraphMetadata, Graph
from indexify.functions_sdk.indexify_functions import IndexifyFunction
Expand All @@ -36,7 +36,9 @@ class GraphOutputMetadata(BaseModel):


class GraphOutputs(BaseModel):
status: str
outputs: List[GraphOutputMetadata]
cursor: Optional[str] = None


class IndexifyClient:
Expand Down Expand Up @@ -329,6 +331,8 @@ def graph_outputs(
)
response.raise_for_status()
graph_outputs = GraphOutputs(**response.json())
if graph_outputs.status == "pending":
raise GraphStillProcessing()
outputs = []
for output in graph_outputs.outputs:
if output.compute_fn == fn_name:
Expand Down
2 changes: 2 additions & 0 deletions server/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
target
indexify_storage
1 change: 0 additions & 1 deletion server/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ build-container: ## Build container
docker image prune -f

build-container-dev: ## Build container for local development
docker build -f dockerfiles/Dockerfile.builder --tag ${DOCKER_USERNAME}/builder .
docker build -f dockerfiles/Dockerfile.local --tag ${DOCKER_USERNAME}/${APPLICATION_NAME} .

push-container: ## Push container to docker hub
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ WORKDIR /indexify

COPY --from=rust-builder /app/target/release/indexify-server ./

COPY sample_config.yaml ./config/indexify.yaml

ENV PATH="/indexify:${PATH}"

ENTRYPOINT [ "/indexify/indexify-server" ]
1 change: 1 addition & 0 deletions server/src/http_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ impl From<data_model::NodeOutput> for FnOutput {

#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct FnOutputs {
pub status: String,
pub outputs: Vec<FnOutput>,
pub cursor: Option<Vec<u8>>,
}
Expand Down
19 changes: 18 additions & 1 deletion server/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,19 @@ async fn list_outputs(
Query(params): Query<ListParams>,
State(state): State<RouteState>,
) -> Result<Json<FnOutputs>, IndexifyAPIError> {
let invocation_ctx = state
.indexify_state
.reader()
.invocation_ctx(&namespace, &compute_graph, &invocation_id)
.map_err(IndexifyAPIError::internal_error)?
.ok_or(IndexifyAPIError::not_found("Compute Graph not found"))?;
if !invocation_ctx.completed {
return Ok(Json(FnOutputs {
status: "pending".to_string(),
outputs: vec![],
cursor: None,
}));
}
let (outputs, cursor) = state
.indexify_state
.reader()
Expand All @@ -744,7 +757,11 @@ async fn list_outputs(
)
.map_err(IndexifyAPIError::internal_error)?;
let outputs = outputs.into_iter().map(Into::into).collect();
Ok(Json(FnOutputs { outputs, cursor }))
Ok(Json(FnOutputs {
status: "finalized".to_string(),
outputs,
cursor,
}))
}

/// Delete a specific invocation
Expand Down
2 changes: 1 addition & 1 deletion server/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ mod tests {
)
.unwrap();

assert!(invocation_ctx.completed);
assert!(invocation_ctx.unwrap().completed);

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/system_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ mod tests {
state
.reader()
.invocation_ctx(&graph.namespace, &graph.name, &invocation_payload.id)?;
assert_eq!(graph_ctx.outstanding_tasks, 0);
assert_eq!(graph_ctx.unwrap().outstanding_tasks, 0);

let request = RequestPayload::RerunComputeGraph(RerunComputeGraphRequest {
namespace: graph.namespace.clone(),
Expand Down
10 changes: 6 additions & 4 deletions server/state_store/src/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,16 +587,18 @@ impl StateReader {
namespace: &str,
compute_graph: &str,
invocation_id: &str,
) -> Result<GraphInvocationCtx> {
) -> Result<Option<GraphInvocationCtx>> {
let key = GraphInvocationCtx::key_from(namespace, compute_graph, invocation_id);
let value = self.db.get_cf(
&IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&self.db),
&key,
)?;
match value {
Some(value) => Ok(JsonEncoder::decode(&value)?),
None => Err(anyhow!("invocation ctx not found")),
if let None = value {
return Ok(None);
}
let invocation_ctx: GraphInvocationCtx = JsonEncoder::decode(&value.unwrap())
.map_err(|e| anyhow!("unable to decode invocation ctx: {}", e))?;
Ok(Some(invocation_ctx))
}

pub fn task_analytics(
Expand Down
16 changes: 11 additions & 5 deletions server/task_scheduler/src/task_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,17 @@ pub async fn handle_task_finished(
task: Task,
compute_graph: ComputeGraph,
) -> Result<TaskCreationResult> {
let invocation_ctx = indexify_state.reader().invocation_ctx(
&task.namespace,
&task.compute_graph_name,
&task.invocation_id,
)?;
let invocation_ctx = indexify_state
.reader()
.invocation_ctx(
&task.namespace,
&task.compute_graph_name,
&task.invocation_id,
)?
.ok_or(anyhow!(
"invocation context not found for invocation_id {}",
task.invocation_id
))?;

if task.outcome == TaskOutcome::Failure {
let mut invocation_finished = false;
Expand Down

0 comments on commit f1873a1

Please sign in to comment.