Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: trigger reset when system package changes; find dependent #5027

Merged
merged 2 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/_bentoml_impl/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def import_service(
if not depend_path:
svc = root_service
else:
svc = root_service.find_dependent(depend_path)
svc = root_service.find_dependent_by_path(depend_path)
if bento is not None:
svc.on_load_bento(bento)
return svc
Expand Down
4 changes: 2 additions & 2 deletions src/_bentoml_impl/server/serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ def serve_http(
allocator = ResourceAllocator()
if dependency_map is None:
dependency_map = {}
if service_name:
svc = svc.find_dependent(service_name)
if service_name and service_name != svc.name:
svc = svc.find_dependent_by_name(service_name)
num_workers, worker_envs = allocator.get_worker_env(svc)
server_on_deployment(svc)
uds_path = tempfile.mkdtemp(prefix="bentoml-uds-")
Expand Down
2 changes: 1 addition & 1 deletion src/_bentoml_impl/worker/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def main(
service = import_service(bento_identifier)

if service_name and service_name != service.name:
service = service.find_dependent(service_name)
service = service.find_dependent_by_name(service_name)
server_context.service_type = "service"
else:
server_context.service_type = "entry_service"
Expand Down
19 changes: 13 additions & 6 deletions src/_bentoml_sdk/service/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,21 +136,28 @@ def __repr__(self) -> str:
return f"<{self.__class__.__name__} name={self.name!r}>"

@lru_cache
def find_dependent_by_path(self, path: str) -> Service[t.Any]:
    """Find a dependent service by a dot-separated attribute path.

    The first segment of ``path`` is looked up in ``self.dependencies``.
    If it is not a dependency attribute, the segment is tried as a key of
    ``self.all_services()`` instead. Any remaining segments are resolved
    recursively on the service the dependency points at.

    Args:
        path: Attribute path such as ``"a"`` or ``"a.b"``.

    Returns:
        The service reached by following ``path``.

    Raises:
        BentoMLException: If the first segment matches neither a dependency
            nor a known service, or if the matching dependency has no
            service attached (its ``on`` is ``None``).
    """
    attr_name, _, remainder = path.partition(".")
    if attr_name not in self.dependencies:
        # Not a dependency attribute: fall back to a lookup by that key
        # among all known services.
        services = self.all_services()
        if attr_name in services:
            return services[attr_name]
        raise BentoMLException(f"Service {attr_name} not found")
    dependent = self.dependencies[attr_name]
    if dependent.on is None:
        raise BentoMLException(f"Service {attr_name} not found")
    if remainder:
        return dependent.on.find_dependent_by_path(remainder)
    # Return the service itself, not the dependency wrapper around it, so
    # the result matches the declared return type and the recursive branch.
    return dependent.on

def find_dependent_by_name(self, name: str) -> Service[t.Any]:
    """Return the service stored under ``name`` in ``self.all_services()``.

    Raises:
        BentoMLException: If the lookup raises ``KeyError``; the original
            ``KeyError`` is suppressed from the traceback.
    """
    try:
        registry = self.all_services()
        match = registry[name]
    except KeyError:
        raise BentoMLException(f"Service {name} not found") from None
    return match

@property
def url(self) -> str | None:
"""Get the URL of the service, or None if the service is not served"""
Expand Down
10 changes: 10 additions & 0 deletions src/bentoml/_internal/cloud/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,16 @@ def update_deployment(
self._check_resp(resp)
return schema_from_object(resp.json(), DeploymentFullSchemaV2)

def void_update_deployment(
    self, name: str, cluster: str | None
) -> DeploymentFullSchemaV2:
    """PUT to the deployment's ``void_update`` endpoint and parse the reply.

    Args:
        name: Deployment name, interpolated into the request URL.
        cluster: Sent as the ``cluster`` query parameter (may be ``None``).

    Returns:
        The response JSON converted to ``DeploymentFullSchemaV2``.

    Raises:
        NotFound: If ``self._is_not_found`` reports the response as missing.
    """
    response = self.session.put(
        f"/api/v2/deployments/{name}/void_update",
        params={"cluster": cluster},
    )
    if self._is_not_found(response):
        raise NotFound(f"Deployment {name} is not found: {response.text}")
    # Any other error status is handled by the shared response check.
    self._check_resp(response)
    payload = response.json()
    return schema_from_object(payload, DeploymentFullSchemaV2)

def get_deployment(
self,
name: str,
Expand Down
93 changes: 59 additions & 34 deletions src/bentoml/_internal/cloud/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ def list_files(self) -> DeploymentFileListSchema:

def _init_deployment_files(
self, bento_dir: str, spinner: Spinner, timeout: int = 600
) -> str:
) -> tuple[str, str]:
from ..bento.build_config import BentoPathSpec

check_interval = 5
Expand Down Expand Up @@ -770,9 +770,11 @@ def _init_deployment_files(
if requirements_md5 != pod_files.get(REQUIREMENTS_TXT, ""):
upload_files.append((REQUIREMENTS_TXT, requirements_content))
setup_script = _build_setup_script(bento_dir, build_config)
upload_files.append(("setup.sh", setup_script))
setup_md5 = hashlib.md5(setup_script).hexdigest()
if setup_md5 != pod_files.get("setup.sh", ""):
upload_files.append(("setup.sh", setup_script))
self.upload_files(upload_files, console=console)
return requirements_md5
return requirements_md5, setup_md5

def watch(self, bento_dir: str) -> None:
import watchfiles
Expand All @@ -788,7 +790,7 @@ def watch(self, bento_dir: str) -> None:
build_config.include, build_config.exclude, bento_dir
)
requirements_hash: str | None = None

setup_md5: str | None = None
default_filter = watchfiles.filters.DefaultFilter()
is_editable = is_editable_bentoml()
bentoml_project = str(Path(source_locations("bentoml")).parent.parent)
Expand All @@ -806,6 +808,7 @@ def watch_filter(change: watchfiles.Change, path: str) -> bool:
return rel_path in (
"bentofile.yaml",
REQUIREMENTS_TXT,
"setup.sh",
) or bento_spec.includes(rel_path)

console = Console(highlight=False)
Expand Down Expand Up @@ -833,45 +836,50 @@ def is_bento_changed(bento_info: Bento) -> bool:
return True

spinner = Spinner(console=console)
needs_update = is_bento_changed(bento_info)
bento_changed = is_bento_changed(bento_info)
needs_update = False
spinner.log(f"💻 View Dashboard: {self.admin_console}")
endpoint_url: str | None = None
stop_event = Event()
try:
spinner.start()
upload_id = spinner.transmission_progress.add_task(
"Dummy upload task", visible=False
)
while True:
if needs_update:
console.print("✨ [green bold]Bento change detected[/]")
spinner.update("🔄 Pushing Bento to BentoCloud")
bento_api._do_push_bento(bento_info, upload_id, bare=True) # type: ignore
spinner.update("🔄 Updating deployment with new configuration")
update_config = DeploymentConfigParameters(
bento=str(bento_info.tag),
name=self.name,
cluster=self.cluster,
cli=False,
dev=True,
)
update_config.verify()
self = deployment_api.update(update_config)
target = self._refetch_target(False)
requirements_hash = self._init_deployment_files(
bento_dir, spinner=spinner
)
needs_update = False
elif not requirements_hash:
requirements_hash = self._init_deployment_files(
bento_dir, spinner=spinner
)
stop_event.clear()
if needs_update or bento_changed:
if bento_changed:
console.print("✨ [green bold]Bento change detected[/]")
spinner.update("🔄 Pushing Bento to BentoCloud")
bento_api._do_push_bento(bento_info, upload_id, bare=True) # type: ignore
spinner.update("🔄 Updating deployment with new configuration")
update_config = DeploymentConfigParameters(
bento=str(bento_info.tag)
if bento_changed
else self.get_bento(False),
name=self.name,
cluster=self.cluster,
cli=False,
dev=True,
)
update_config.verify()
self = deployment_api.update(update_config)
target = self._refetch_target(False)
else:
spinner.update("🔄 Resetting deployment")
self = deployment_api.void_update(self.name, self.cluster)
needs_update = bento_changed = False
requirements_hash, setup_md5 = self._init_deployment_files(
bento_dir, spinner=spinner
)
if endpoint_url is None:
endpoint_url = self.get_endpoint_urls(False)[0]
spinner.log(f"🌐 Endpoint: {endpoint_url}")
with self._tail_logs(spinner.console):
with self._tail_logs(spinner.console, stop_event):
spinner.update("👀 Watching for changes")
for changes in watchfiles.watch(
*watch_dirs, watch_filter=watch_filter
*watch_dirs, watch_filter=watch_filter, stop_event=stop_event
):
if not is_editable or any(
fs.path.isparent(bento_dir, p) for _, p in changes
Expand All @@ -892,7 +900,7 @@ def is_bento_changed(bento_info: Bento) -> bool:
assert isinstance(bento_info, Bento)
if is_bento_changed(bento_info):
# stop log tail and reset the deployment
needs_update = True
bento_changed = True
break

build_config = get_bento_build_config(bento_dir)
Expand All @@ -917,7 +925,14 @@ def is_bento_changed(bento_info: Bento) -> bool:
upload_files.append((rel_path, open(path, "rb").read()))
else:
delete_files.append(rel_path)

setup_script = _build_setup_script(bento_dir, build_config)
if (
new_hash := hashlib.md5(setup_script).hexdigest()
!= setup_md5
):
setup_md5 = new_hash
needs_update = True
break
requirements_content = _build_requirements_txt(
bento_dir, build_config
)
Expand Down Expand Up @@ -956,12 +971,13 @@ def is_bento_changed(bento_info: Bento) -> bool:
spinner.stop()

@contextlib.contextmanager
def _tail_logs(self, console: Console) -> t.Generator[None, None, None]:
def _tail_logs(
self, console: Console, stop_event: Event
) -> t.Generator[None, None, None]:
import itertools
from collections import defaultdict

pods = self._client.v2.list_deployment_pods(self.name, self.cluster)
stop_event = Event()
workers: list[Thread] = []

colors = itertools.cycle(["cyan", "yellow", "blue", "magenta", "green"])
Expand Down Expand Up @@ -990,6 +1006,11 @@ def pod_log_worker(pod: KubePodSchema, stop_event: Event) -> None:
current = line
break
console.print(f"[{color}]\\[{pod.runner_name}][/] {line}")
if (
"BentoMLDevSupervisor has been shut down" in line
and not stop_event.is_set()
):
stop_event.set()
if current:
console.print(f"[{color}]\\[{pod.runner_name}][/] {current}")

Expand Down Expand Up @@ -1202,6 +1223,10 @@ def update(
logger.debug("Deployment Schema: %s", config_struct)
return self._generate_deployment_info_(res, res.urls)

def void_update(self, name: str, cluster: str | None) -> Deployment:
    """Issue a void-update for a deployment and wrap the result.

    Delegates to ``self._client.v2.void_update_deployment`` and converts the
    returned schema, together with its ``urls``, into a ``Deployment`` via
    ``self._generate_deployment_info_``.
    """
    schema = self._client.v2.void_update_deployment(name, cluster)
    endpoint_urls = schema.urls
    return self._generate_deployment_info_(schema, endpoint_urls)

def apply(
self,
deployment_config_params: DeploymentConfigParameters,
Expand Down
Loading