-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Resolve Lightning App with remote storage #17426
Changes from all commits
50b9172
ef77a84
1220b33
1ce64b7
525c1d1
4680b40
76050cb
a3cbdf3
ccf036d
ce0d73d
7867342
e9acf43
0d37de8
f13db97
2d18bdf
9883588
0efd806
f58fd07
d5fdc2f
44d8641
88875a1
a64743e
423f9e9
dbecd0d
dbb3bd1
1b04de3
f0832d9
b7b41df
355264d
cd004fa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
venv/ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this used? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is automatically generated when the app runs. Better to have it in the repo. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,9 @@ | |
|
||
|
||
def get_lightning_cloud_url() -> str: | ||
# detect local development | ||
if os.getenv("VSCODE_PROXY_URI", "").startswith("http://localhost:9800"): | ||
return "http://localhost:9800" | ||
tchaton marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# DO NOT CHANGE! | ||
return os.getenv("LIGHTNING_CLOUD_URL", "https://lightning.ai") | ||
|
||
|
@@ -115,17 +118,5 @@ def enable_interruptible_works() -> bool: | |
return bool(int(os.getenv("LIGHTNING_INTERRUPTIBLE_WORKS", "0"))) | ||
|
||
|
||
# Get Cluster Driver | ||
_CLUSTER_DRIVERS = [None, "k8s", "direct"] | ||
|
||
|
||
def get_cluster_driver() -> Optional[str]: | ||
value = os.getenv("LIGHTNING_CLUSTER_DRIVER", None) | ||
if value is None: | ||
if enable_interruptible_works(): | ||
value = "direct" | ||
else: | ||
value = None | ||
if value not in _CLUSTER_DRIVERS: | ||
raise ValueError(f"Found {value} cluster driver. The value needs to be in {_CLUSTER_DRIVERS}.") | ||
return value | ||
return "direct" | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why unset? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This isn't used and we can drop it entirely. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. For another PR once the fire is behind us. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -388,19 +388,30 @@ def get(self, timeout: Optional[float] = None) -> Any: | |
if timeout is None: | ||
while True: | ||
try: | ||
return self._get() | ||
try: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's use |
||
return self._get() | ||
except requests.exceptions.HTTPError: | ||
pass | ||
except queue.Empty: | ||
time.sleep(HTTP_QUEUE_REFRESH_INTERVAL) | ||
|
||
# make one request and return the result | ||
if timeout == 0: | ||
return self._get() | ||
try: | ||
return self._get() | ||
except requests.exceptions.HTTPError: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you please add a note about why this error happens and why it is fine to ignore it? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It can happen if the queue isn't responsive. This would cause the app to die entirely. This adds some fault tolerance during CP re-deployment. |
||
return None | ||
|
||
# timeout is some value - loop until the timeout is reached | ||
start_time = time.time() | ||
while (time.time() - start_time) < timeout: | ||
try: | ||
return self._get() | ||
try: | ||
return self._get() | ||
except requests.exceptions.HTTPError: | ||
if timeout > self.default_timeout: | ||
return None | ||
raise queue.Empty | ||
except queue.Empty: | ||
# Note: In theory, there isn't a need for a sleep as the queue shouldn't | ||
# block the flow if the queue is empty. | ||
|
@@ -441,8 +452,11 @@ def length(self) -> int: | |
if not self.app_id: | ||
raise ValueError(f"App ID couldn't be extracted from the queue name: {self.name}") | ||
|
||
val = self.client.get(f"/v1/{self.app_id}/{self._name_suffix}/length") | ||
return int(val.text) | ||
try: | ||
val = self.client.get(f"/v1/{self.app_id}/{self._name_suffix}/length") | ||
return int(val.text) | ||
except requests.exceptions.HTTPError: | ||
return 0 | ||
|
||
@staticmethod | ||
def _split_app_id_and_queue_name(queue_name: str) -> Tuple[str, str]: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,6 +53,7 @@ def check_content(button_name, text_content): | |
has_logs = False | ||
while not has_logs: | ||
for log in fetch_logs(["flow"]): | ||
print(log) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Remove? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good catch! |
||
if "'a': 'a', 'b': 'b'" in log: | ||
has_logs = True | ||
sleep(1) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,25 +1,9 @@ | ||
import os | ||
from unittest import mock | ||
|
||
import pytest | ||
|
||
from lightning.app.core.constants import get_cluster_driver, get_lightning_cloud_url | ||
from lightning.app.core.constants import get_lightning_cloud_url | ||
|
||
|
||
@mock.patch.dict(os.environ, {"LIGHTNING_CLOUD_URL": "https://beta.lightning.ai"}) | ||
def test_defaults(): | ||
assert get_lightning_cloud_url() == "https://beta.lightning.ai" | ||
|
||
|
||
def test_cluster_drive(monkeypatch): | ||
tchaton marked this conversation as resolved.
Show resolved
Hide resolved
|
||
assert get_cluster_driver() is None | ||
|
||
monkeypatch.setenv("LIGHTNING_INTERRUPTIBLE_WORKS", "1") | ||
assert get_cluster_driver() == "direct" | ||
|
||
monkeypatch.setenv("LIGHTNING_CLUSTER_DRIVER", "k8s") | ||
assert get_cluster_driver() == "k8s" | ||
|
||
with pytest.raises(ValueError, match="The value needs to be in"): | ||
monkeypatch.setenv("LIGHTNING_CLUSTER_DRIVER", "something_else") | ||
assert get_cluster_driver() == "k8s" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ethanwharris why do we drop redis?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this won't be supported going forward.