Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve Lightning App with remote storage #17426

Merged
merged 30 commits into from
Apr 23, 2023
Merged
24 changes: 4 additions & 20 deletions .azure/app-cloud-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,57 +67,42 @@ jobs:
'App: v0_app':
name: "v0_app"
dir: "public"
queue_type: "redis"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ethanwharris why do we drop redis?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the redis queue type won't be supported going forward.

'App: boring_app':
name: "boring_app"
dir: "public"
queue_type: "redis"
'App: boring_app / HTTP':
name: "boring_app"
dir: "public"
queue_type: "http"
'App: template_streamlit_ui':
name: "template_streamlit_ui"
dir: "public"
queue_type: "redis"
# TODO: RESOLVE ME ASAP
# 'App: template_streamlit_ui':
# name: "template_streamlit_ui"
# dir: "public"
'App: template_react_ui':
name: "template_react_ui"
dir: "public"
queue_type: "redis"
# 'App: template_jupyterlab': # TODO: clarify where these files live
# name: "template_jupyterlab"
'App: installation_commands_app':
name: "installation_commands_app"
dir: "public"
queue_type: "redis"
'App: drive':
name: "drive"
dir: "public"
queue_type: "redis"
'App: payload':
name: "payload"
dir: "public"
queue_type: "redis"
'App: commands_and_api':
name: "commands_and_api"
dir: "public"
queue_type: "redis"
#'App: quick_start': # todo: consider adding back when fixed
# name: "quick_start"
# dir: "public"
# queue_type: "redis"
'App: idle_timeout':
name: "idle_timeout"
dir: "local"
queue_type: "redis"
'App: collect_failures':
name: "collect_failures"
dir: "local"
queue_type: "redis"
'App: custom_work_dependencies':
name: "custom_work_dependencies"
dir: "local"
queue_type: "redis"
timeoutInMinutes: "15"
cancelTimeoutInMinutes: "1"
# values: https://docs.microsoft.com/en-us/azure/devops/pipelines/process/phases?view=azure-devops&tabs=yaml#workspace
Expand All @@ -135,7 +120,6 @@ jobs:
HAR_LOCATION: './artifacts/hars'
SLOW_MO: '50'
LIGHTNING_DEBUG: '1'
LIGHTNING_CLOUD_QUEUE_TYPE: $(queue_type)
steps:

- script: echo '##vso[task.setvariable variable=local_id]$(System.PullRequest.PullRequestNumber)'
Expand Down
1 change: 1 addition & 0 deletions examples/app/commands_and_api/.lightningignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
venv/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is automatically generated when the app runs. Better to have it in the repo.

17 changes: 4 additions & 13 deletions src/lightning/app/core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@


def get_lightning_cloud_url() -> str:
# detect local development
if os.getenv("VSCODE_PROXY_URI", "").startswith("http://localhost:9800"):
return "http://localhost:9800"
tchaton marked this conversation as resolved.
Show resolved Hide resolved
# DO NOT CHANGE!
return os.getenv("LIGHTNING_CLOUD_URL", "https://lightning.ai")

Expand Down Expand Up @@ -115,17 +118,5 @@ def enable_interruptible_works() -> bool:
return bool(int(os.getenv("LIGHTNING_INTERRUPTIBLE_WORKS", "0")))


# Get Cluster Driver
_CLUSTER_DRIVERS = [None, "k8s", "direct"]


def get_cluster_driver() -> Optional[str]:
value = os.getenv("LIGHTNING_CLUSTER_DRIVER", None)
if value is None:
if enable_interruptible_works():
value = "direct"
else:
value = None
if value not in _CLUSTER_DRIVERS:
raise ValueError(f"Found {value} cluster driver. The value needs to be in {_CLUSTER_DRIVERS}.")
return value
return "direct"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is unsetting LIGHTNING_CLUSTER_DRIVER not enough?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't used and we can drop it entirely.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's for another PR, once the fire is behind us.

24 changes: 19 additions & 5 deletions src/lightning/app/core/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,19 +388,30 @@ def get(self, timeout: Optional[float] = None) -> Any:
if timeout is None:
while True:
try:
return self._get()
try:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use contextlib.suppress(...)

return self._get()
except requests.exceptions.HTTPError:
pass
except queue.Empty:
time.sleep(HTTP_QUEUE_REFRESH_INTERVAL)

# make one request and return the result
if timeout == 0:
return self._get()
try:
return self._get()
except requests.exceptions.HTTPError:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add a note about why this error happens and why it is fine to ignore it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can happen if the queue isn't responsive. Without this handling, the app would die entirely. This adds some fault tolerance during CP re-deployment.

return None

# timeout is some value - loop until the timeout is reached
start_time = time.time()
while (time.time() - start_time) < timeout:
try:
return self._get()
try:
return self._get()
except requests.exceptions.HTTPError:
if timeout > self.default_timeout:
return None
raise queue.Empty
except queue.Empty:
# Note: In theory, there isn't a need for a sleep as the queue shouldn't
# block the flow if the queue is empty.
Expand Down Expand Up @@ -441,8 +452,11 @@ def length(self) -> int:
if not self.app_id:
raise ValueError(f"App ID couldn't be extracted from the queue name: {self.name}")

val = self.client.get(f"/v1/{self.app_id}/{self._name_suffix}/length")
return int(val.text)
try:
val = self.client.get(f"/v1/{self.app_id}/{self._name_suffix}/length")
return int(val.text)
except requests.exceptions.HTTPError:
return 0

@staticmethod
def _split_app_id_and_queue_name(queue_name: str) -> Tuple[str, str]:
Expand Down
7 changes: 0 additions & 7 deletions src/lightning/app/runners/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
ENABLE_PULLING_STATE_ENDPOINT,
ENABLE_PUSHING_STATE_ENDPOINT,
get_cloud_queue_type,
get_cluster_driver,
get_lightning_cloud_url,
LIGHTNING_CLOUD_PRINT_SPECS,
SYS_CUSTOMIZATIONS_SYNC_ROOT,
Expand Down Expand Up @@ -874,12 +873,6 @@ def _get_env_vars(
if not ENABLE_PUSHING_STATE_ENDPOINT:
v1_env_vars.append(V1EnvVar(name="ENABLE_PUSHING_STATE_ENDPOINT", value="0"))

if get_cloud_queue_type():
v1_env_vars.append(V1EnvVar(name="LIGHTNING_CLOUD_QUEUE_TYPE", value=get_cloud_queue_type()))

if get_cluster_driver():
v1_env_vars.append(V1EnvVar(name="LIGHTNING_CLUSTER_DRIVER", value=get_cluster_driver()))

if enable_interruptible_works():
v1_env_vars.append(
V1EnvVar(
Expand Down
16 changes: 6 additions & 10 deletions src/lightning/app/storage/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,16 +427,12 @@ def _filesystem() -> AbstractFileSystem:
endpoint_url = os.getenv("LIGHTNING_BUCKET_ENDPOINT_URL", "")
bucket_name = os.getenv("LIGHTNING_BUCKET_NAME", "")
if endpoint_url != "" and bucket_name != "":
key = os.getenv("LIGHTNING_AWS_ACCESS_KEY_ID", "")
secret = os.getenv("LIGHTNING_AWS_SECRET_ACCESS_KEY", "")
# TODO: Remove when updated on the platform side.
if key == "" or secret == "":
key = os.getenv("AWS_ACCESS_KEY_ID", "")
secret = os.getenv("AWS_SECRET_ACCESS_KEY", "")
if key == "" or secret == "":
raise RuntimeError("missing S3 bucket credentials")

fs = S3FileSystem(key=key, secret=secret, use_ssl=False, client_kwargs={"endpoint_url": endpoint_url})
# FIXME: Temporary fix until we remove the injection from the platform
if "AWS_ACCESS_KEY_ID" in os.environ:
del os.environ["AWS_ACCESS_KEY_ID"]
del os.environ["AWS_SECRET_ACCESS_KEY"]

fs = S3FileSystem()
ethanwharris marked this conversation as resolved.
Show resolved Hide resolved

app_id = os.getenv("LIGHTNING_CLOUD_APP_ID", "")
if app_id == "":
Expand Down
23 changes: 22 additions & 1 deletion src/lightning/app/testing/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,8 @@ def run_app_in_cloud(
[constants.LIGHTNING_CLOUD_PROJECT_ID],
)

admin_page.reload()

view_page = context.new_page()
i = 1
while True:
Expand All @@ -385,10 +387,10 @@ def run_app_in_cloud(

# wait until the app is running and openapi.json is ready
if app.status.phase == V1LightningappInstanceState.RUNNING:
view_page.goto(f"{app.status.url}/view")
status_code = requests.get(f"{app.status.url}/openapi.json").status_code
if status_code == 200:
print("App is running, continuing with testing...")
view_page.goto(f"{app.status.url}/view")
break
msg = f"Received status code {status_code} at {app.status.url!r}"
elif app.status.phase not in (V1LightningappInstanceState.PENDING, V1LightningappInstanceState.NOT_STARTED):
Expand Down Expand Up @@ -478,6 +480,19 @@ def _delete_lightning_app(client, project_id, app_id, app_name):
print(f"Failed to delete {app_name}. Exception {ex}")


def _delete_cloud_space(client, project_id, cloud_space_id, app_name):
"""Used to delete the parent cloudspace."""
print(f"Deleting {app_name} id: {cloud_space_id}")
try:
res = client.cloud_space_service_delete_cloud_space(
project_id=project_id,
id=cloud_space_id,
)
assert res == {}
except ApiException as ex:
print(f"Failed to delete {app_name}. Exception {ex}")


def delete_cloud_lightning_apps():
"""Cleanup cloud apps that start with the name test-{PR_NUMBER}-{TEST_APP_NAME}.

Expand All @@ -502,10 +517,16 @@ def delete_cloud_lightning_apps():
if pr_number and app_name and not lit_app.name.startswith(f"test-{pr_number}-{app_name}-"):
continue
_delete_lightning_app(client, project_id=project_id, app_id=lit_app.id, app_name=lit_app.name)
_delete_cloud_space(
client, project_id=project_id, cloud_space_id=lit_app.spec.cloud_space_id, app_name=lit_app.name
)

print("deleting apps that were created more than 1 hour ago.")

for lit_app in list_apps.lightningapps:

if lit_app.created_at < datetime.datetime.now(lit_app.created_at.tzinfo) - datetime.timedelta(hours=1):
_delete_lightning_app(client, project_id=project_id, app_id=lit_app.id, app_name=lit_app.name)
_delete_cloud_space(
client, project_id=project_id, cloud_space_id=lit_app.spec.cloud_space_id, app_name=lit_app.name
)
17 changes: 10 additions & 7 deletions src/lightning/app/utilities/app_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@

@dataclass
class _LogEventLabels:
app: str
container: str
filename: str
job: str
namespace: str
node_name: str
pod: str
app: Optional[str] = None
container: Optional[str] = None
filename: Optional[str] = None
job: Optional[str] = None
namespace: Optional[str] = None
node_name: Optional[str] = None
pod: Optional[str] = None
clusterID: Optional[str] = None
component: Optional[str] = None
projectID: Optional[str] = None
stream: Optional[str] = None


Expand Down
7 changes: 6 additions & 1 deletion src/lightning/app/utilities/packaging/cloud_compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ def __post_init__(self) -> None:
if "gpu" not in self.name:
raise ValueError("CloudCompute `interruptible=True` is supported only with GPU.")

# FIXME: Clean the mess on the platform side
if self.name == "default" or self.name == "cpu":
self.name = "cpu-small"
self._internal_id = "default"

# TODO: Remove from the platform first.
self.preemptible = self.interruptible

Expand Down Expand Up @@ -147,7 +152,7 @@ def id(self) -> Optional[str]:
return self._internal_id

def is_default(self) -> bool:
return self.name == "default"
return self.name in ("default", "cpu-small")

def _generate_id(self):
return "default" if self.name == "default" else uuid4().hex[:7]
Expand Down
1 change: 1 addition & 0 deletions tests/integrations_app/public/test_v0_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def check_content(button_name, text_content):
has_logs = False
while not has_logs:
for log in fetch_logs(["flow"]):
print(log)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

if "'a': 'a', 'b': 'b'" in log:
has_logs = True
sleep(1)
Expand Down
18 changes: 1 addition & 17 deletions tests/tests_app/core/test_constants.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,9 @@
import os
from unittest import mock

import pytest

from lightning.app.core.constants import get_cluster_driver, get_lightning_cloud_url
from lightning.app.core.constants import get_lightning_cloud_url


@mock.patch.dict(os.environ, {"LIGHTNING_CLOUD_URL": "https://beta.lightning.ai"})
def test_defaults():
assert get_lightning_cloud_url() == "https://beta.lightning.ai"


def test_cluster_drive(monkeypatch):
tchaton marked this conversation as resolved.
Show resolved Hide resolved
assert get_cluster_driver() is None

monkeypatch.setenv("LIGHTNING_INTERRUPTIBLE_WORKS", "1")
assert get_cluster_driver() == "direct"

monkeypatch.setenv("LIGHTNING_CLUSTER_DRIVER", "k8s")
assert get_cluster_driver() == "k8s"

with pytest.raises(ValueError, match="The value needs to be in"):
monkeypatch.setenv("LIGHTNING_CLUSTER_DRIVER", "something_else")
assert get_cluster_driver() == "k8s"
5 changes: 2 additions & 3 deletions tests/tests_app/core/test_lightning_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -987,8 +987,8 @@ def run(self):
def test_state_size_constant_growth():
app = LightningApp(SizeFlow())
MultiProcessRuntime(app, start_server=False).dispatch()
assert app.root._state_sizes[0] <= 7952
assert app.root._state_sizes[20] <= 26500
assert app.root._state_sizes[0] <= 7965
assert app.root._state_sizes[20] <= 26550


class FlowUpdated(LightningFlow):
Expand Down Expand Up @@ -1108,7 +1108,6 @@ def __init__(self, flow):


def test_cloud_compute_binding():

cloud_compute.ENABLE_MULTIPLE_WORKS_IN_NON_DEFAULT_CONTAINER = True

assert cloud_compute._CLOUD_COMPUTE_STORE == {}
Expand Down
8 changes: 4 additions & 4 deletions tests/tests_app/core/test_lightning_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ def run(self):
"_display_name": "",
"_cloud_compute": {
"type": "__cloud_compute__",
"name": "default",
"name": "cpu-small",
"disk_size": 0,
"idle_timeout": None,
"mounts": None,
Expand All @@ -358,7 +358,7 @@ def run(self):
"_display_name": "",
"_cloud_compute": {
"type": "__cloud_compute__",
"name": "default",
"name": "cpu-small",
"disk_size": 0,
"idle_timeout": None,
"mounts": None,
Expand Down Expand Up @@ -399,7 +399,7 @@ def run(self):
"_display_name": "",
"_cloud_compute": {
"type": "__cloud_compute__",
"name": "default",
"name": "cpu-small",
"disk_size": 0,
"idle_timeout": None,
"mounts": None,
Expand All @@ -424,7 +424,7 @@ def run(self):
"_display_name": "",
"_cloud_compute": {
"type": "__cloud_compute__",
"name": "default",
"name": "cpu-small",
"disk_size": 0,
"idle_timeout": None,
"mounts": None,
Expand Down
Loading