Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/urls auth #13421

Merged
merged 4 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conan/internal/conan_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def __init__(self, cache_folder):

self.hook_manager = HookManager(self.cache.hooks_path)
# Wraps an http_requester to inject proxies, certs, etc
self.requester = ConanRequester(self.cache.new_config)
self.requester = ConanRequester(self.cache.new_config, cache_folder)
# To handle remote connections
rest_client_factory = RestApiClientFactory(self.requester, self.cache.new_config)
# Wraps RestApiClient to add authentication support (same interface)
Expand Down
35 changes: 34 additions & 1 deletion conans/client/rest/conan_requester.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,57 @@
import fnmatch
import json
import logging
import os
import platform

import requests
import urllib3
from jinja2 import Template
from requests.adapters import HTTPAdapter

from conans import __version__ as client_version

# Capture SSL warnings as pointed out here:
# https://urllib3.readthedocs.org/en/latest/security.html#insecureplatformwarning
# TODO: Fix this security warning
from conans.util.files import load

logging.captureWarnings(True)


DEFAULT_TIMEOUT = (30, 60) # connect, read timeouts
INFINITE_TIMEOUT = -1


class URLCredentials:
def __init__(self, cache_folder):
self._urls = {}
if not cache_folder:
return
# TODO: Make it a jinja template so env-vars can be used too
creds_path = os.path.join(cache_folder, "creds.json")
if not os.path.exists(creds_path):
return
template = Template(load(creds_path))
content = template.render({"platform": platform, "os": os})
content = json.loads(content)
self._urls = content

def add_auth(self, url, kwargs):
for u, creds in self._urls.items():
if url.startswith(u):
token = creds.get("token")
if token:
kwargs["headers"]["Authorization"] = f"Bearer {token}"
auth = creds.get("auth")
if auth:
kwargs["auth"] = (auth["user"], auth["password"])
break


class ConanRequester(object):

def __init__(self, config):
def __init__(self, config, cache_folder=None):
# TODO: Make all this lazy, to avoid fully configuring Requester, for every api call
# even if it doesn't use it
# FIXME: Trick for testing when requests is mocked
Expand All @@ -31,6 +61,7 @@ def __init__(self, config):
self._http_requester.mount("http://", adapter)
self._http_requester.mount("https://", adapter)

self._url_creds = URLCredentials(cache_folder)
self._timeout = config.get("core.net.http:timeout", default=DEFAULT_TIMEOUT)
self._no_proxy_match = config.get("core.net.http:no_proxy_match")
self._proxies = config.get("core.net.http:proxies")
Expand Down Expand Up @@ -82,6 +113,8 @@ def _add_kwargs(self, url, kwargs):
if not kwargs.get("headers"):
kwargs["headers"] = {}

self._url_creds.add_auth(url, kwargs)

# Only set User-Agent if none was provided
if not kwargs["headers"].get("User-Agent"):
platform_info = "; ".join([
Expand Down
78 changes: 78 additions & 0 deletions conans/test/integration/test_source_download_password.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import base64
import json
import os
import textwrap

from bottle import static_file, request, HTTPError

from conans.test.utils.test_files import temp_folder
from conans.test.utils.tools import TestClient, StoppableThreadBottle
from conans.util.files import save


def test_source_download_password():
http_server = StoppableThreadBottle()
http_server_base_folder = temp_folder()
save(os.path.join(http_server_base_folder, "myfile.txt"), "hello world!")

def valid_auth():
auth = request.headers.get("Authorization")
if auth == "Bearer mytoken":
return
if auth and "Basic" in auth and \
base64.b64decode(auth[6:], validate=False) == b"myuser:mypassword":
return
return HTTPError(401, "Authentication required")

@http_server.server.get("/<file>")
def get_file(file):
ret = valid_auth()
return ret or static_file(file, http_server_base_folder)

@http_server.server.put("/<file>")
def put_file(file):
ret = valid_auth()
if ret:
return ret
dest = os.path.join(http_server_base_folder, file)
with open(dest, 'wb') as f:
f.write(request.body.read())

http_server.run_server()

c = TestClient()
conanfile = textwrap.dedent(f"""
from conan import ConanFile
from conan.tools.files import download, load
class Pkg(ConanFile):
def source(self):
download(self, "http://localhost:{http_server.port}/myfile.txt", "myfile.txt")
self.output.info(f"Content: {{load(self, 'myfile.txt')}}")
""")
c.save({"conanfile.py": conanfile})
content = {f"http://localhost:{http_server.port}": {"token": "mytoken"}}
save(os.path.join(c.cache_folder, "creds.json"), json.dumps(content))
c.run("source .")
assert "Content: hello world!" in c.out
content = {f"http://localhost:{http_server.port}": {
"auth": {"user": "myuser", "password": "mypassword"}}
}
save(os.path.join(c.cache_folder, "creds.json"), json.dumps(content))
c.run("source .")
assert "Content: hello world!" in c.out

content = {f"http://localhost:{http_server.port}": {"token": "{{mytk}}"}}
content = "{% set mytk = 'mytoken' %}\n" + json.dumps(content)
save(os.path.join(c.cache_folder, "creds.json"), content)
c.run("source .")
assert "Content: hello world!" in c.out

# Errors
for invalid in [{"token": "mytoken2"},
{},
{"auth": {}},
{"auth": {"user": "other", "password": "pass"}}]:
content = {f"http://localhost:{http_server.port}": invalid}
save(os.path.join(c.cache_folder, "creds.json"), json.dumps(content))
c.run("source .", assert_error=True)
assert "Authentication" in c.out