Skip to content

Commit

Permalink
fix: trigger reset when system package changes; find dependent (#5027)
Browse files Browse the repository at this point in the history
  • Loading branch information
frostming authored Oct 16, 2024
1 parent e2c573c commit d8b6ef8
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 44 deletions.
2 changes: 1 addition & 1 deletion src/_bentoml_impl/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def import_service(
if not depend_path:
svc = root_service
else:
svc = root_service.find_dependent(depend_path)
svc = root_service.find_dependent_by_path(depend_path)
if bento is not None:
svc.on_load_bento(bento)
return svc
Expand Down
4 changes: 2 additions & 2 deletions src/_bentoml_impl/server/serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ def serve_http(
allocator = ResourceAllocator()
if dependency_map is None:
dependency_map = {}
if service_name:
svc = svc.find_dependent(service_name)
if service_name and service_name != svc.name:
svc = svc.find_dependent_by_name(service_name)
num_workers, worker_envs = allocator.get_worker_env(svc)
server_on_deployment(svc)
uds_path = tempfile.mkdtemp(prefix="bentoml-uds-")
Expand Down
2 changes: 1 addition & 1 deletion src/_bentoml_impl/worker/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def main(
service = import_service(bento_identifier)

if service_name and service_name != service.name:
service = service.find_dependent(service_name)
service = service.find_dependent_by_name(service_name)
server_context.service_type = "service"
else:
server_context.service_type = "entry_service"
Expand Down
19 changes: 13 additions & 6 deletions src/_bentoml_sdk/service/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,21 +136,28 @@ def __repr__(self) -> str:
return f"<{self.__class__.__name__} name={self.name!r}>"

@lru_cache
def find_dependent(self, name_or_path: str) -> Service[t.Any]:
"""Find a service by name or path"""
attr_name, _, path = name_or_path.partition(".")
def find_dependent_by_path(self, path: str) -> Service[t.Any]:
"""Find a service by path"""
attr_name, _, path = path.partition(".")
if attr_name not in self.dependencies:
if attr_name in self.all_services():
return self.all_services()[attr_name]
else:
raise ValueError(f"Service {attr_name} not found")
raise BentoMLException(f"Service {attr_name} not found")
dependent = self.dependencies[attr_name]
if dependent.on is None:
raise ValueError(f"Service {attr_name} not found")
raise BentoMLException(f"Service {attr_name} not found")
if path:
return dependent.on.find_dependent(path)
return dependent.on.find_dependent_by_path(path)
return dependent

def find_dependent_by_name(self, name: str) -> Service[t.Any]:
"""Find a service by name"""
try:
return self.all_services()[name]
except KeyError:
raise BentoMLException(f"Service {name} not found") from None

@property
def url(self) -> str | None:
"""Get the URL of the service, or None if the service is not served"""
Expand Down
10 changes: 10 additions & 0 deletions src/bentoml/_internal/cloud/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,16 @@ def update_deployment(
self._check_resp(resp)
return schema_from_object(resp.json(), DeploymentFullSchemaV2)

def void_update_deployment(
self, name: str, cluster: str | None
) -> DeploymentFullSchemaV2:
url = f"/api/v2/deployments/{name}/void_update"
resp = self.session.put(url, params={"cluster": cluster})
if self._is_not_found(resp):
raise NotFound(f"Deployment {name} is not found: {resp.text}")
self._check_resp(resp)
return schema_from_object(resp.json(), DeploymentFullSchemaV2)

def get_deployment(
self,
name: str,
Expand Down
93 changes: 59 additions & 34 deletions src/bentoml/_internal/cloud/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ def list_files(self) -> DeploymentFileListSchema:

def _init_deployment_files(
self, bento_dir: str, spinner: Spinner, timeout: int = 600
) -> str:
) -> tuple[str, str]:
from ..bento.build_config import BentoPathSpec

check_interval = 5
Expand Down Expand Up @@ -770,9 +770,11 @@ def _init_deployment_files(
if requirements_md5 != pod_files.get(REQUIREMENTS_TXT, ""):
upload_files.append((REQUIREMENTS_TXT, requirements_content))
setup_script = _build_setup_script(bento_dir, build_config)
upload_files.append(("setup.sh", setup_script))
setup_md5 = hashlib.md5(setup_script).hexdigest()
if setup_md5 != pod_files.get("setup.sh", ""):
upload_files.append(("setup.sh", setup_script))
self.upload_files(upload_files, console=console)
return requirements_md5
return requirements_md5, setup_md5

def watch(self, bento_dir: str) -> None:
import watchfiles
Expand All @@ -788,7 +790,7 @@ def watch(self, bento_dir: str) -> None:
build_config.include, build_config.exclude, bento_dir
)
requirements_hash: str | None = None

setup_md5: str | None = None
default_filter = watchfiles.filters.DefaultFilter()
is_editable = is_editable_bentoml()
bentoml_project = str(Path(source_locations("bentoml")).parent.parent)
Expand All @@ -806,6 +808,7 @@ def watch_filter(change: watchfiles.Change, path: str) -> bool:
return rel_path in (
"bentofile.yaml",
REQUIREMENTS_TXT,
"setup.sh",
) or bento_spec.includes(rel_path)

console = Console(highlight=False)
Expand Down Expand Up @@ -833,45 +836,50 @@ def is_bento_changed(bento_info: Bento) -> bool:
return True

spinner = Spinner(console=console)
needs_update = is_bento_changed(bento_info)
bento_changed = is_bento_changed(bento_info)
needs_update = False
spinner.log(f"💻 View Dashboard: {self.admin_console}")
endpoint_url: str | None = None
stop_event = Event()
try:
spinner.start()
upload_id = spinner.transmission_progress.add_task(
"Dummy upload task", visible=False
)
while True:
if needs_update:
console.print("✨ [green bold]Bento change detected[/]")
spinner.update("🔄 Pushing Bento to BentoCloud")
bento_api._do_push_bento(bento_info, upload_id, bare=True) # type: ignore
spinner.update("🔄 Updating deployment with new configuration")
update_config = DeploymentConfigParameters(
bento=str(bento_info.tag),
name=self.name,
cluster=self.cluster,
cli=False,
dev=True,
)
update_config.verify()
self = deployment_api.update(update_config)
target = self._refetch_target(False)
requirements_hash = self._init_deployment_files(
bento_dir, spinner=spinner
)
needs_update = False
elif not requirements_hash:
requirements_hash = self._init_deployment_files(
bento_dir, spinner=spinner
)
stop_event.clear()
if needs_update or bento_changed:
if bento_changed:
console.print("✨ [green bold]Bento change detected[/]")
spinner.update("🔄 Pushing Bento to BentoCloud")
bento_api._do_push_bento(bento_info, upload_id, bare=True) # type: ignore
spinner.update("🔄 Updating deployment with new configuration")
update_config = DeploymentConfigParameters(
bento=str(bento_info.tag)
if bento_changed
else self.get_bento(False),
name=self.name,
cluster=self.cluster,
cli=False,
dev=True,
)
update_config.verify()
self = deployment_api.update(update_config)
target = self._refetch_target(False)
else:
spinner.update("🔄 Resetting deployment")
self = deployment_api.void_update(self.name, self.cluster)
needs_update = bento_changed = False
requirements_hash, setup_md5 = self._init_deployment_files(
bento_dir, spinner=spinner
)
if endpoint_url is None:
endpoint_url = self.get_endpoint_urls(True)[0]
spinner.log(f"🌐 Endpoint: {endpoint_url}")
with self._tail_logs(spinner.console):
with self._tail_logs(spinner.console, stop_event):
spinner.update("👀 Watching for changes")
for changes in watchfiles.watch(
*watch_dirs, watch_filter=watch_filter
*watch_dirs, watch_filter=watch_filter, stop_event=stop_event
):
if not is_editable or any(
fs.path.isparent(bento_dir, p) for _, p in changes
Expand All @@ -892,7 +900,7 @@ def is_bento_changed(bento_info: Bento) -> bool:
assert isinstance(bento_info, Bento)
if is_bento_changed(bento_info):
# stop log tail and reset the deployment
needs_update = True
bento_changed = True
break

build_config = get_bento_build_config(bento_dir)
Expand All @@ -917,7 +925,14 @@ def is_bento_changed(bento_info: Bento) -> bool:
upload_files.append((rel_path, open(path, "rb").read()))
else:
delete_files.append(rel_path)

setup_script = _build_setup_script(bento_dir, build_config)
if (
new_hash := hashlib.md5(setup_script).hexdigest()
!= setup_md5
):
setup_md5 = new_hash
needs_update = True
break
requirements_content = _build_requirements_txt(
bento_dir, build_config
)
Expand Down Expand Up @@ -956,12 +971,13 @@ def is_bento_changed(bento_info: Bento) -> bool:
spinner.stop()

@contextlib.contextmanager
def _tail_logs(self, console: Console) -> t.Generator[None, None, None]:
def _tail_logs(
self, console: Console, stop_event: Event
) -> t.Generator[None, None, None]:
import itertools
from collections import defaultdict

pods = self._client.v2.list_deployment_pods(self.name, self.cluster)
stop_event = Event()
workers: list[Thread] = []

colors = itertools.cycle(["cyan", "yellow", "blue", "magenta", "green"])
Expand Down Expand Up @@ -990,6 +1006,11 @@ def pod_log_worker(pod: KubePodSchema, stop_event: Event) -> None:
current = line
break
console.print(f"[{color}]\\[{pod.runner_name}][/] {line}")
if (
"BentoMLDevSupervisor has been shut down" in line
and not stop_event.is_set()
):
stop_event.set()
if current:
console.print(f"[{color}]\\[{pod.runner_name}][/] {current}")

Expand Down Expand Up @@ -1202,6 +1223,10 @@ def update(
logger.debug("Deployment Schema: %s", config_struct)
return self._generate_deployment_info_(res, res.urls)

def void_update(self, name: str, cluster: str | None) -> Deployment:
res = self._client.v2.void_update_deployment(name, cluster)
return self._generate_deployment_info_(res, res.urls)

def apply(
self,
deployment_config_params: DeploymentConfigParameters,
Expand Down

0 comments on commit d8b6ef8

Please sign in to comment.