Skip to content

Commit

Permalink
fix: find_dependent by name or path
Browse files Browse the repository at this point in the history
Signed-off-by: Frost Ming <me@frostming.com>
  • Loading branch information
frostming committed Oct 16, 2024
1 parent cee4059 commit 7cb9ebc
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/_bentoml_impl/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def import_service(
if not depend_path:
svc = root_service
else:
svc = root_service.find_dependent(depend_path)
svc = root_service.find_dependent_by_path(depend_path)
if bento is not None:
svc.on_load_bento(bento)
return svc
Expand Down
4 changes: 2 additions & 2 deletions src/_bentoml_impl/server/serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ def serve_http(
allocator = ResourceAllocator()
if dependency_map is None:
dependency_map = {}
if service_name:
svc = svc.find_dependent(service_name)
if service_name and service_name != svc.name:
svc = svc.find_dependent_by_name(service_name)
num_workers, worker_envs = allocator.get_worker_env(svc)
server_on_deployment(svc)
uds_path = tempfile.mkdtemp(prefix="bentoml-uds-")
Expand Down
2 changes: 1 addition & 1 deletion src/_bentoml_impl/worker/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def main(
service = import_service(bento_identifier)

if service_name and service_name != service.name:
service = service.find_dependent(service_name)
service = service.find_dependent_by_name(service_name)
server_context.service_type = "service"
else:
server_context.service_type = "entry_service"
Expand Down
19 changes: 13 additions & 6 deletions src/_bentoml_sdk/service/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,21 +136,28 @@ def __repr__(self) -> str:
return f"<{self.__class__.__name__} name={self.name!r}>"

@lru_cache
def find_dependent(self, name_or_path: str) -> Service[t.Any]:
"""Find a service by name or path"""
attr_name, _, path = name_or_path.partition(".")
def find_dependent_by_path(self, path: str) -> Service[t.Any]:
"""Find a service by path"""
attr_name, _, path = path.partition(".")
if attr_name not in self.dependencies:
if attr_name in self.all_services():
return self.all_services()[attr_name]
else:
raise ValueError(f"Service {attr_name} not found")
raise BentoMLException(f"Service {attr_name} not found")
dependent = self.dependencies[attr_name]
if dependent.on is None:
raise ValueError(f"Service {attr_name} not found")
raise BentoMLException(f"Service {attr_name} not found")
if path:
return dependent.on.find_dependent(path)
return dependent.on.find_dependent_by_path(path)
return dependent

def find_dependent_by_name(self, name: str) -> Service[t.Any]:
"""Find a service by name"""
try:
return self.all_services()[name]
except KeyError:
raise BentoMLException(f"Service {name} not found") from None

@property
def url(self) -> str | None:
"""Get the URL of the service, or None if the service is not served"""
Expand Down
10 changes: 10 additions & 0 deletions src/bentoml/_internal/cloud/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,16 @@ def update_deployment(
self._check_resp(resp)
return schema_from_object(resp.json(), DeploymentFullSchemaV2)

def void_update_deployment(
self, name: str, cluster: str | None
) -> DeploymentFullSchemaV2:
url = f"/api/v2/deployments/{name}/void_update"
resp = self.session.put(url, params={"cluster": cluster})
if self._is_not_found(resp):
raise NotFound(f"Deployment {name} is not found: {resp.text}")
self._check_resp(resp)
return schema_from_object(resp.json(), DeploymentFullSchemaV2)

def get_deployment(
self,
name: str,
Expand Down
57 changes: 35 additions & 22 deletions src/bentoml/_internal/cloud/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,43 +840,46 @@ def is_bento_changed(bento_info: Bento) -> bool:
needs_update = False
spinner.log(f"💻 View Dashboard: {self.admin_console}")
endpoint_url: str | None = None
stop_event = Event()
try:
spinner.start()
upload_id = spinner.transmission_progress.add_task(
"Dummy upload task", visible=False
)
while True:
stop_event.clear()
if needs_update or bento_changed:
if bento_changed:
console.print("✨ [green bold]Bento change detected[/]")
spinner.update("🔄 Pushing Bento to BentoCloud")
bento_api._do_push_bento(bento_info, upload_id, bare=True) # type: ignore
spinner.update("🔄 Updating deployment with new configuration")
update_config = DeploymentConfigParameters(
bento=str(bento_info.tag),
name=self.name,
cluster=self.cluster,
cli=False,
dev=True,
)
update_config.verify()
self = deployment_api.update(update_config)
target = self._refetch_target(False)
requirements_hash, setup_md5 = self._init_deployment_files(
bento_dir, spinner=spinner
)
spinner.update("🔄 Updating deployment with new configuration")
update_config = DeploymentConfigParameters(
bento=str(bento_info.tag)
if bento_changed
else self.get_bento(False),
name=self.name,
cluster=self.cluster,
cli=False,
dev=True,
)
update_config.verify()
self = deployment_api.update(update_config)
target = self._refetch_target(False)
else:
spinner.update("🔄 Resetting deployment")
self = deployment_api.void_update(self.name, self.cluster)
needs_update = bento_changed = False
elif not requirements_hash or not setup_md5:
requirements_hash, setup_md5 = self._init_deployment_files(
bento_dir, spinner=spinner
)
requirements_hash, setup_md5 = self._init_deployment_files(
bento_dir, spinner=spinner
)
if endpoint_url is None:
endpoint_url = self.get_endpoint_urls(False)[0]
spinner.log(f"🌐 Endpoint: {endpoint_url}")
with self._tail_logs(spinner.console):
with self._tail_logs(spinner.console, stop_event):
spinner.update("👀 Watching for changes")
for changes in watchfiles.watch(
*watch_dirs, watch_filter=watch_filter
*watch_dirs, watch_filter=watch_filter, stop_event=stop_event
):
if not is_editable or any(
fs.path.isparent(bento_dir, p) for _, p in changes
Expand Down Expand Up @@ -968,12 +971,13 @@ def is_bento_changed(bento_info: Bento) -> bool:
spinner.stop()

@contextlib.contextmanager
def _tail_logs(self, console: Console) -> t.Generator[None, None, None]:
def _tail_logs(
self, console: Console, stop_event: Event
) -> t.Generator[None, None, None]:
import itertools
from collections import defaultdict

pods = self._client.v2.list_deployment_pods(self.name, self.cluster)
stop_event = Event()
workers: list[Thread] = []

colors = itertools.cycle(["cyan", "yellow", "blue", "magenta", "green"])
Expand Down Expand Up @@ -1002,6 +1006,11 @@ def pod_log_worker(pod: KubePodSchema, stop_event: Event) -> None:
current = line
break
console.print(f"[{color}]\\[{pod.runner_name}][/] {line}")
if (
"BentoMLDevSupervisor has been shut down" in line
and not stop_event.is_set()
):
stop_event.set()
if current:
console.print(f"[{color}]\\[{pod.runner_name}][/] {current}")

Expand Down Expand Up @@ -1214,6 +1223,10 @@ def update(
logger.debug("Deployment Schema: %s", config_struct)
return self._generate_deployment_info_(res, res.urls)

def void_update(self, name: str, cluster: str | None) -> Deployment:
res = self._client.v2.void_update_deployment(name, cluster)
return self._generate_deployment_info_(res, res.urls)

def apply(
self,
deployment_config_params: DeploymentConfigParameters,
Expand Down

0 comments on commit 7cb9ebc

Please sign in to comment.