Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework automatic elevation logic #156

Merged
merged 7 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 107 additions & 53 deletions menuinst/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
import shlex
import subprocess
import sys
import traceback
import xml.etree.ElementTree as XMLTree
from contextlib import suppress
from functools import wraps
from functools import lru_cache, wraps
from logging import getLogger
from pathlib import Path
from typing import Callable, Iterable, Literal, Mapping, Optional, Sequence, Union
Expand Down Expand Up @@ -246,6 +245,52 @@ def deep_update(mapping: Mapping, *updating_mappings: Iterable[Mapping]) -> Mapp
return updated_mapping


def needs_admin(target_prefix: os.PathLike, base_prefix: os.PathLike) -> bool:
"""
Checks if the current installation needs admin permissions.
"""
if user_is_admin():
return False

if Path(target_prefix, ".nonadmin").exists():
# This file is planted by the constructor installer
# and signals we don't need admin permissions
return False

try:
Path(target_prefix, ".nonadmin").touch()
return False
marcoesters marked this conversation as resolved.
Show resolved Hide resolved
except Exception as exc:
logger.debug("Attempt to write %s/.nonadmin failed.", target_prefix, exc_info=exc)

if base_prefix == target_prefix:
# We are already in the base env, no need to check further
return True

# I can't think of cases where users can't write to target_prefix but can to base
# so maybe we can skip everything underneath?

if Path(base_prefix, ".nonadmin").exists():
return False

if os.name == "nt":
# Absence of $base_prefix/.nonadmin in Windows means we need admin permissions
return True

if os.name == "posix":
# Absence of $base_prefix/.nonadmin in Linux, macOS and other posix systems
# has no meaning for historic reasons, so let's try to see if we can
# write to the installation root
try:
Path(base_prefix, ".nonadmin").touch()
except Exception as exc:
logger.debug("Attempt to write %s/.nonadmin failed.", target_prefix, exc_info=exc)
return True
else:
return False


@lru_cache(maxsize=1)
def user_is_admin() -> bool:
if os.name == "nt":
from .platforms.win_utils.win_elevate import isUserAdmin
Expand Down Expand Up @@ -316,65 +361,70 @@ def elevate_as_needed(func: Callable) -> Callable:
@wraps(func)
def wrapper_elevate(
*args,
target_prefix: os.PathLike = None,
base_prefix: os.PathLike = None,
**kwargs,
):
kwargs.pop("_mode", None)
target_prefix = target_prefix or DEFAULT_BASE_PREFIX
base_prefix = base_prefix or DEFAULT_BASE_PREFIX
if not (Path(base_prefix) / ".nonadmin").exists():
if user_is_admin():
return func(
base_prefix=base_prefix,
_mode="system",
*args,
**kwargs,
)
if os.environ.get("_MENUINST_RECURSING") != "1":
# call the wrapped func with elevated prompt...
# from the command line; not pretty!
try:
if func.__module__ == "__main__":
import_func = (
f"import runpy;"
f"{func.__name__} = runpy.run_path('{__file__}')"
f"['{func.__name__}'];"
)
else:
import_func = f"from {func.__module__} import {func.__name__};"
env_vars = ";".join(
[
f"os.environ.setdefault('{k}', '{v}')"
for (k, v) in os.environ.items()
if k.startswith(("CONDA_", "CONSTRUCTOR_", "MENUINST_"))
]
)
cmd = [
*python_executable(),
"-c",
f"import os;"
f"os.environ.setdefault('_MENUINST_RECURSING', '1');"
f"{env_vars};"
f"{import_func}"
f"{func.__name__}("
f"*{args!r},"
f"base_prefix={base_prefix!r},"
f"_mode='system',"
f"**{kwargs!r}"
")",
]
logger.debug("Elevating command: %s", cmd)
return_code = run_as_admin(cmd)
except Exception:
logger.warn(
"Error occurred! Falling back to user mode. Exception:\n%s",
traceback.format_exc(),
if (
needs_admin(target_prefix, base_prefix)
and os.environ.get("_MENUINST_RECURSING") != "1"
):
# call the wrapped func with elevated prompt...
# from the command line; not pretty!
try:
if func.__module__ == "__main__":
import_func = (
f"import runpy;"
f"{func.__name__} = runpy.run_path('{__file__}')"
f"['{func.__name__}'];"
)
else:
os.environ.pop("_MENUINST_RECURSING", None)
if return_code == 0: # success, no need to fallback
return
import_func = f"from {func.__module__} import {func.__name__};"
env_vars = ";".join(
[
f"os.environ.setdefault('{k}', '{v}')"
for (k, v) in os.environ.items()
if k.startswith(("CONDA_", "CONSTRUCTOR_", "MENUINST_"))
]
)
cmd = [
*python_executable(),
"-c",
f"import os;"
f"os.environ.setdefault('_MENUINST_RECURSING', '1');"
f"{env_vars};"
f"{import_func}"
f"{func.__name__}("
f"*{args!r},"
f"target_prefix={target_prefix!r},"
f"base_prefix={base_prefix!r},"
f"_mode='system',"
f"**{kwargs!r}"
")",
]
logger.debug("Elevating command: %s", cmd)
return_code = run_as_admin(cmd)
except Exception as exc:
logger.warn("Elevation failed! Falling back to user mode.", exc_info=exc)
else:
os.environ.pop("_MENUINST_RECURSING", None)
if return_code == 0: # success, we are done
return
elif user_is_admin():
# We are already running as admin, no need to elevate
return func(
target_prefix=target_prefix,
base_prefix=base_prefix,
_mode="system",
*args,
**kwargs,
)
# We have not returned yet? Well, let's try as a normal user
return func(
target_prefix=target_prefix,
base_prefix=base_prefix,
_mode="user",
*args,
Expand All @@ -384,7 +434,11 @@ def wrapper_elevate(
return wrapper_elevate


def _test_elevation(base_prefix: Optional[os.PathLike] = None, _mode: _UserOrSystem = "user"):
def _test_elevation(
target_prefix: Optional[os.PathLike] = None,
base_prefix: Optional[os.PathLike] = None,
_mode: _UserOrSystem = "user",
):
if os.name == "nt":
if base_prefix:
output = os.path.join(base_prefix, "_test_output.txt")
Expand Down
19 changes: 19 additions & 0 deletions news/156-elevation
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
### Enhancements

* Implement auto-elevation logic in Unix systems so it doesn't depend on pre-existing `.nonadmin` files. (#150 via #156)

### Bug fixes

* <news item>

### Deprecations

* <news item>

### Docs

* <news item>

### Other

* <news item>
15 changes: 10 additions & 5 deletions tests/test_elevation.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ def test_elevation(tmp_path, capfd):
# Windows runners on GHA always run as admin
assert not is_admin

_test_elevation(str(tmp_path))
_test_elevation(target_prefix=str(tmp_path), base_prefix=str(tmp_path))
output = (tmp_path / "_test_output.txt").read_text().strip()
if on_ci:
assert output.endswith("env_var: TEST _mode: user")
else:
assert output.endswith("user_is_admin(): False env_var: TEST _mode: user")

elevate_as_needed(_test_elevation)(base_prefix=str(tmp_path))
elevate_as_needed(_test_elevation)(target_prefix=str(tmp_path), base_prefix=str(tmp_path))
output = (tmp_path / "_test_output.txt").read_text().strip()
if on_ci:
assert output.endswith("env_var: TEST _mode: system")
Expand All @@ -31,11 +31,16 @@ def test_elevation(tmp_path, capfd):
_test_elevation(str(tmp_path))
assert capfd.readouterr().out.strip() == "user_is_admin(): False env_var: TEST _mode: user"

elevate_as_needed(_test_elevation)(base_prefix=str(tmp_path))
# make tmp_path not writable by the current user to force elevation
tmp_path.chmod(0o500)
elevate_as_needed(_test_elevation)(target_prefix=str(tmp_path), base_prefix=str(tmp_path))
assert (
capfd.readouterr().out.strip() == "user_is_admin(): True env_var: TEST _mode: system"
)
assert not (tmp_path / ".nonadmin").exists()

(tmp_path / ".nonadmin").touch()
elevate_as_needed(_test_elevation)(base_prefix=str(tmp_path))
# restore permissions
tmp_path.chmod(0o700)
elevate_as_needed(_test_elevation)(target_prefix=str(tmp_path), base_prefix=str(tmp_path))
assert capfd.readouterr().out.strip() == "user_is_admin(): False env_var: TEST _mode: user"
assert (tmp_path / ".nonadmin").exists()
Loading