Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] #996 Add two-factor authentication #4373

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dev/environment
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ STATUSPAGE_URL=https://2p66nmmycsj3.statuspage.io

TOKEN_PASSWORD_SECRET="an insecure password reset secret key"
TOKEN_EMAIL_SECRET="an insecure email verification secret key"
TOKEN_TWO_FACTOR_SECRET="an insecure two-factor auth secret key"

DATADOG_HOST=notdatadog

Expand Down
1 change: 1 addition & 0 deletions requirements/main.in
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ paginate>=0.5.2
paginate_sqlalchemy
passlib>=1.6.4
psycopg2
pyotp
pyramid>=1.9a2
pyramid_jinja2>=2.5
pyramid_mailer>=0.14.1
Expand Down
2 changes: 2 additions & 0 deletions requirements/main.txt
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,8 @@ pycparser==2.18 \
Pygments==2.2.0 \
--hash=sha256:78f3f434bcc5d6ee09020f92ba487f95ba50f1e3ef83ae96b9d5ffa1bab25c5d \
--hash=sha256:dbae1046def0efb574852fab9e90209b23f556367b5a320c0bcb871c77c3e8cc
pyotp==2.2.6 \
--hash=sha256:8f0df1fcf9e86cec41f0a31c91212b1a04fca6dd353426917222b21864b9310b
pyparsing==2.2.0 \
--hash=sha256:fee43f17a9c4087e7ed1605bd6df994c6173c1e977d7ade7b651292fab2bd010 \
--hash=sha256:0832bcf47acd283788593e7a0f542407bd9550a55a8a8435214a1960e04bcb04 \
Expand Down
26 changes: 26 additions & 0 deletions tests/unit/accounts/test_forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,3 +369,29 @@ def test_passwords_match_success(self):
)

assert form.validate()


class TestTwoFactorForm:
def test_creation(self):
user_service = pretend.stub()
form = forms.TwoFactorForm(user_service=user_service)

assert form.user_service is user_service

def test_opt_secret_exists(self):
form = forms.TwoFactorForm(
data={
"otp_secret": "",
},
user_service=pretend.stub()
)
assert not form.validate()
assert form.otp_secret.errors.pop() == "This field is required."

form = forms.TwoFactorForm(
data={
"otp_secret": "otp_code",
},
user_service=pretend.stub()
)
assert form.validate()
187 changes: 185 additions & 2 deletions tests/unit/accounts/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ def test_post_validate_redirects(self, monkeypatch, pyramid_request, with_user):
user_service = pretend.stub(
find_userid=pretend.call_recorder(lambda username: user_id),
update_user=pretend.call_recorder(lambda *a, **kw: None),
has_two_factor=lambda userid: False,
)
pyramid_request.find_service = pretend.call_recorder(
lambda iface, context: user_service
Expand Down Expand Up @@ -206,7 +207,6 @@ def test_post_validate_redirects(self, monkeypatch, pyramid_request, with_user):
assert pyramid_request.session.invalidate.calls == [pretend.call()]
assert pyramid_request.find_service.calls == [
pretend.call(IUserService, context=None),
pretend.call(IUserService, context=None),
]
assert pyramid_request.session.new_csrf_token.calls == [pretend.call()]

Expand All @@ -222,6 +222,7 @@ def test_post_validate_no_redirects(
user_service = pretend.stub(
find_userid=pretend.call_recorder(lambda username: 1),
update_user=lambda *a, **k: None,
has_two_factor=lambda userid: False,
)
pyramid_request.find_service = pretend.call_recorder(
lambda iface, context: user_service
Expand All @@ -247,6 +248,189 @@ def test_redirect_authenticated_user(self):
assert isinstance(result, HTTPSeeOther)
assert result.headers["Location"] == "/the-redirect"

@pytest.mark.parametrize("redirect_url", ["test_redirect_url", None])
def test_two_factor_auth(self, pyramid_request, redirect_url, token_service):
user_service = pretend.stub(
find_userid=pretend.call_recorder(lambda username: 1),
update_user=lambda *a, **k: None,
has_two_factor=lambda userid: True,
)

token_service = pretend.stub(
dumps=pretend.call_recorder(lambda token_data: "test-secure-token")
)
pyramid_request.find_service = lambda interface, **kwargs: {
IUserService: user_service,
ITokenService: token_service,
}[interface]

pyramid_request.method = "POST"
if redirect_url:
pyramid_request.POST["next"] = redirect_url

form_obj = pretend.stub(
validate=pretend.call_recorder(lambda: True),
username=pretend.stub(data="theuser"),
)
form_class = pretend.call_recorder(lambda d, user_service: form_obj)
pyramid_request.route_path = pretend.call_recorder(
lambda a: "/account/two-factor"
)
result = views.login(pyramid_request, _form_class=form_class)

token_expected_data = {"userid": 1}
if redirect_url:
token_expected_data['redirect_to'] = redirect_url
assert token_service.dumps.calls == [pretend.call(token_expected_data)]

assert isinstance(result, HTTPSeeOther)
assert result.headerlist == [
('Content-Type', 'text/html; charset=UTF-8'),
('Content-Length', '0'),
('Location', '/account/two-factor'),
('Set-Cookie', "{}={}; Path=/".
format(views.TWO_FACTOR_COOKIE_KEY, "test-secure-token"))]


class TestTwoFactor:
@pytest.mark.parametrize("redirect_url", [None, "/foo/bar/", "/wat/"])
def test_get_returns_form(self, pyramid_request, redirect_url):
user_service = pretend.stub(
find_userid=pretend.call_recorder(lambda username: 1),
update_user=lambda *a, **k: None,
send_otp_secret=pretend.call_recorder(lambda userid: None),
)

token_data = {"userid": 1}
if redirect_url:
token_data['redirect_to'] = redirect_url

token_service = pretend.stub(
loads=pretend.call_recorder(lambda token: token_data)
)
pyramid_request.find_service = lambda interface, **kwargs: {
ITokenService: token_service,
IUserService: user_service,
}[interface]

form_obj = pretend.stub()
form_class = pretend.call_recorder(lambda d, user_service: form_obj)

result = views.two_factor(pyramid_request, _form_class=form_class)

assert result == {
"form": form_obj,
}
assert user_service.send_otp_secret.calls == [
pretend.call(1)
]
assert form_class.calls == [
pretend.call(pyramid_request.POST, user_service=user_service)
]

@pytest.mark.parametrize("redirect_url", ["test_redirect_url", None])
def test_two_factor_auth(self, monkeypatch, pyramid_request, redirect_url,
token_service):
remember = pretend.call_recorder(lambda request, user_id: [("foo", "bar")])
monkeypatch.setattr(views, "remember", remember)
user_service = pretend.stub(
find_userid=pretend.call_recorder(lambda username: 1),
update_user=lambda *a, **k: None,
send_otp_secret=lambda userid: None,
check_otp_secret=lambda userid, otp_token: True,
)

new_session = {}

token_data = {"userid": str(1)}
if redirect_url:
token_data['redirect_to'] = redirect_url

token_service = pretend.stub(
loads=pretend.call_recorder(lambda token: token_data)
)
pyramid_request.find_service = lambda interface, **kwargs: {
IUserService: user_service,
ITokenService: token_service,
}[interface]

pyramid_request.method = "POST"
pyramid_request.session = pretend.stub(
items=lambda: [("a", "b"), ("foo", "bar")],
update=new_session.update,
invalidate=pretend.call_recorder(lambda: None),
new_csrf_token=pretend.call_recorder(lambda: None),
)

pyramid_request.set_property(
lambda r: str(uuid.uuid4()),
name="unauthenticated_userid",
)

form_obj = pretend.stub(
validate=pretend.call_recorder(lambda: True),
otp_secret=pretend.stub(data="test-otp-secret"),
)
form_class = pretend.call_recorder(lambda d, user_service: form_obj)
pyramid_request.route_path = pretend.call_recorder(
lambda a: "/account/two-factor"
)
pyramid_request.cookies[views.TWO_FACTOR_COOKIE_KEY] = "test-secure-token"
result = views.two_factor(pyramid_request, _form_class=form_class)

token_expected_data = {"userid": str(1)}
if redirect_url:
token_expected_data['redirect_to'] = redirect_url
assert token_service.loads.calls == [pretend.call("test-secure-token")]

assert isinstance(result, HTTPSeeOther)

assert remember.calls == [pretend.call(pyramid_request, str(1))]
assert pyramid_request.session.invalidate.calls == [pretend.call()]
assert pyramid_request.session.new_csrf_token.calls == [pretend.call()]

@pytest.mark.parametrize("redirect_url", ["test_redirect_url", None])
def test_two_factor_auth_invalid(self, pyramid_request, redirect_url,
token_service):
user_service = pretend.stub(
find_userid=pretend.call_recorder(lambda username: 1),
update_user=lambda *a, **k: None,
send_otp_secret=lambda userid: None,
check_otp_secret=lambda userid, otp_token: False,
)

token_data = {"userid": str(1)}
if redirect_url:
token_data['redirect_to'] = redirect_url

token_service = pretend.stub(loads=pretend.call_recorder(
lambda token: token_data)
)
pyramid_request.find_service = lambda interface, **kwargs: {
IUserService: user_service,
ITokenService: token_service,
}[interface]

pyramid_request.method = "POST"

form_obj = pretend.stub(
validate=pretend.call_recorder(lambda: True),
otp_secret=pretend.stub(data="test-otp-secret"),
)
form_class = pretend.call_recorder(lambda d, user_service: form_obj)
pyramid_request.route_path = pretend.call_recorder(
lambda a: "/account/two-factor"
)
pyramid_request.cookies[views.TWO_FACTOR_COOKIE_KEY] = "test-secure-token"
result = views.two_factor(pyramid_request, _form_class=form_class)

token_expected_data = {"userid": str(1)}
if redirect_url:
token_expected_data['redirect_to'] = redirect_url
assert token_service.loads.calls == [pretend.call("test-secure-token")]

assert isinstance(result, HTTPSeeOther)


class TestLogout:
@pytest.mark.parametrize("next_url", [None, "/foo/bar/", "/wat/"])
Expand Down Expand Up @@ -670,7 +854,6 @@ def test_reset_password(self, db_request, user_service, token_service):
assert db_request.find_service.calls == [
pretend.call(IUserService, context=None),
pretend.call(ITokenService, name="password"),
pretend.call(IUserService, context=None),
]

@pytest.mark.parametrize(
Expand Down
1 change: 1 addition & 0 deletions tests/unit/test_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ def add_policy(name, filename):
domain=warehouse,
),
pretend.call("accounts.login", "/account/login/", domain=warehouse),
pretend.call("accounts.two-factor", "/account/two-factor/", domain=warehouse),
pretend.call("accounts.logout", "/account/logout/", domain=warehouse),
pretend.call("accounts.register", "/account/register/", domain=warehouse),
pretend.call(
Expand Down
52 changes: 52 additions & 0 deletions tests/unit/utils/test_otp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import base64

import pyotp

from warehouse.utils.otp import (
generate_totp_secret,
generate_totp_provisioning_uri,
verify_totp,
)


def test_generate_totp_secret():
secret = generate_totp_secret()

# ensure it decodes as base32 and is 160 bits long
assert len(base64.b32decode(secret)) == 20


def test_generate_totp_provisioning_uri():
secret = "F" * 32
username = "pony"
issuer_name = "pypi.org"
uri = generate_totp_provisioning_uri(secret, username, issuer_name=issuer_name)
expected_uri = "otpauth://totp/{0}:{1}?secret={2}&issuer={3}".format(
issuer_name, username, secret, issuer_name
)
assert uri == expected_uri


def test_verify_totp_success():
secret = generate_totp_secret()
value = pyotp.TOTP(secret).now()
assert verify_totp(secret, value)


def test_verify_totp_failure():
secret = generate_totp_secret()
value = pyotp.TOTP(secret).now()
value_plus_one = str((int(value) + 1) % (999999 + 1)).zfill(6)
assert not verify_totp(secret, value_plus_one, valid_window=0)
3 changes: 3 additions & 0 deletions warehouse/accounts/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ def includeme(config):
config.register_service_factory(
TokenServiceFactory(name="email"), ITokenService, name="email"
)
config.register_service_factory(
TokenServiceFactory(name="two_factor"), ITokenService, name="two_factor"
)

# Register our authentication and authorization policies
config.set_authentication_policy(
Expand Down
11 changes: 11 additions & 0 deletions warehouse/accounts/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ def validate_username(self, field):
raise wtforms.validators.ValidationError("No user found with that username")


class OtpSecretMixin:

otp_secret = wtforms.StringField(validators=[wtforms.validators.DataRequired()])


class NewUsernameMixin:

username = wtforms.StringField(
Expand Down Expand Up @@ -166,6 +171,12 @@ def __init__(self, *args, user_service, **kwargs):
self.user_service = user_service


class TwoFactorForm(OtpSecretMixin, forms.Form):
def __init__(self, *args, user_service, **kwargs):
super().__init__(*args, **kwargs)
self.user_service = user_service


class RequestPasswordResetForm(forms.Form):
username_or_email = wtforms.StringField(
validators=[wtforms.validators.DataRequired()]
Expand Down
Loading