diff --git a/securedrop/alembic/versions/c5a02eb52f2d_dropped_session_nonce_from_journalist_.py b/securedrop/alembic/versions/c5a02eb52f2d_dropped_session_nonce_from_journalist_.py index dc6c7051fe9..dc3c7a9065c 100644 --- a/securedrop/alembic/versions/c5a02eb52f2d_dropped_session_nonce_from_journalist_.py +++ b/securedrop/alembic/versions/c5a02eb52f2d_dropped_session_nonce_from_journalist_.py @@ -5,88 +5,95 @@ Create Date: 2022-04-16 21:25:22.398189 """ -from alembic import op import sqlalchemy as sa - +from alembic import op # revision identifiers, used by Alembic. -revision = 'c5a02eb52f2d' -down_revision = 'b7f98cfd6a70' +revision = "c5a02eb52f2d" +down_revision = "b7f98cfd6a70" branch_labels = None depends_on = None def upgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### - op.drop_table('revoked_tokens') - with op.batch_alter_table('journalists', schema=None) as batch_op: - batch_op.drop_column('session_nonce') + op.drop_table("revoked_tokens") + with op.batch_alter_table("journalists", schema=None) as batch_op: + batch_op.drop_column("session_nonce") # ### end Alembic commands ### def downgrade() -> None: - '''This would have been the easy way, however previous does not have - default value and thus up/down assertion fails''' - #op.add_column('journalists', sa.Column('session_nonce', sa.Integer(), nullable=False, server_default='0')) + """This would have been the easy way, however previous does not have + default value and thus up/down assertion fails""" + # op.add_column('journalists', sa.Column('session_nonce', sa.Integer(), nullable=False, server_default='0')) conn = op.get_bind() conn.execute("PRAGMA legacy_alter_table=ON") # Save existing journalist table. - op.rename_table('journalists', 'journalists_tmp') + op.rename_table("journalists", "journalists_tmp") # Add nonce column. - op.add_column('journalists_tmp', sa.Column('session_nonce', sa.Integer())) + op.add_column("journalists_tmp", sa.Column("session_nonce", sa.Integer())) # Populate nonce column. - journalists = conn.execute( - sa.text("SELECT * FROM journalists_tmp")).fetchall() + journalists = conn.execute(sa.text("SELECT * FROM journalists_tmp")).fetchall() for journalist in journalists: conn.execute( - sa.text("""UPDATE journalists_tmp SET session_nonce=0 WHERE - id=:id""").bindparams(id=journalist.id) - ) + sa.text( + """UPDATE journalists_tmp SET session_nonce=0 WHERE + id=:id""" + ).bindparams(id=journalist.id) + ) # Now create new table with null constraint applied. - op.create_table('journalists', - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('uuid', sa.String(length=36), nullable=False), - sa.Column('username', sa.String(length=255), nullable=False), - sa.Column('first_name', sa.String(length=255), nullable=True), - sa.Column('last_name', sa.String(length=255), nullable=True), - sa.Column('pw_salt', sa.Binary(), nullable=True), - sa.Column('pw_hash', sa.Binary(), nullable=True), - sa.Column('passphrase_hash', sa.String(length=256), nullable=True), - sa.Column('is_admin', sa.Boolean(), nullable=True), - sa.Column('session_nonce', sa.Integer(), nullable=False), - sa.Column('otp_secret', sa.String(length=32), nullable=True), - sa.Column('is_totp', sa.Boolean(), nullable=True), - sa.Column('hotp_counter', sa.Integer(), nullable=True), - sa.Column('last_token', sa.String(length=6), nullable=True), - sa.Column('created_on', sa.DateTime(), nullable=True), - sa.Column('last_access', sa.DateTime(), nullable=True), - sa.PrimaryKeyConstraint('id'), - sa.UniqueConstraint('username'), - sa.UniqueConstraint('uuid') + op.create_table( + "journalists", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("uuid", sa.String(length=36), nullable=False), + sa.Column("username", sa.String(length=255), nullable=False), + sa.Column("first_name", sa.String(length=255), nullable=True), + sa.Column("last_name", sa.String(length=255), nullable=True), + sa.Column("pw_salt", sa.Binary(), nullable=True), + sa.Column("pw_hash", sa.Binary(), nullable=True), + sa.Column("passphrase_hash", sa.String(length=256), nullable=True), + sa.Column("is_admin", sa.Boolean(), nullable=True), + sa.Column("session_nonce", sa.Integer(), nullable=False), + sa.Column("otp_secret", sa.String(length=32), nullable=True), + sa.Column("is_totp", sa.Boolean(), nullable=True), + sa.Column("hotp_counter", sa.Integer(), nullable=True), + sa.Column("last_token", sa.String(length=6), nullable=True), + sa.Column("created_on", sa.DateTime(), nullable=True), + sa.Column("last_access", sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("username"), + sa.UniqueConstraint("uuid"), ) - conn.execute(''' + conn.execute( + """ INSERT INTO journalists SELECT id, uuid, username, first_name, last_name, pw_salt, pw_hash, passphrase_hash, is_admin, session_nonce, otp_secret, is_totp, hotp_counter, last_token, created_on, last_access FROM journalists_tmp - ''') + """ + ) # Now delete the old table. - op.drop_table('journalists_tmp') - - op.create_table('revoked_tokens', - sa.Column('id', sa.INTEGER(), nullable=False), - sa.Column('journalist_id', sa.INTEGER(), nullable=False), - sa.Column('token', sa.TEXT(), nullable=False), - sa.ForeignKeyConstraint(['journalist_id'], ['journalists.id'], ), - sa.PrimaryKeyConstraint('id'), - sa.UniqueConstraint('token') + op.drop_table("journalists_tmp") + + op.create_table( + "revoked_tokens", + sa.Column("id", sa.INTEGER(), nullable=False), + sa.Column("journalist_id", sa.INTEGER(), nullable=False), + sa.Column("token", sa.TEXT(), nullable=False), + sa.ForeignKeyConstraint( + ["journalist_id"], + ["journalists.id"], + ), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("token"), ) diff --git a/securedrop/journalist_app/__init__.py b/securedrop/journalist_app/__init__.py index fba53d8e7ef..445597ca28a 100644 --- a/securedrop/journalist_app/__init__.py +++ b/securedrop/journalist_app/__init__.py @@ -1,14 +1,7 @@ # -*- coding: utf-8 -*- import typing -from datetime import datetime, timedelta, timezone -from pathlib import Path from datetime import datetime -from flask import (Flask, redirect, url_for, g, request, - render_template, json, abort) -from flask_assets import Environment -from flask_babel import gettext -from flask_wtf.csrf import CSRFProtect, CSRFError from os import path from pathlib import Path @@ -16,20 +9,14 @@ import template_filters import version from db import db -from flask import Flask, flash, g, json, redirect, render_template, request, session, url_for +from flask import Flask, abort, g, json, redirect, render_template, request, url_for from flask_babel import gettext from flask_wtf.csrf import CSRFError, CSRFProtect from journalist_app import account, admin, api, col, main -from journalist_app.utils import ( - JournalistInterfaceSessionInterface, - cleanup_expired_revoked_tokens, - get_source, - logged_in, -) -from journalist_app import account, admin, api, main, col from journalist_app.sessions import Session, session from journalist_app.utils import get_source from models import InstanceConfig +from werkzeug.exceptions import default_exceptions # https://www.python.org/dev/peps/pep-0484/#runtime-or-type-checking if typing.TYPE_CHECKING: @@ -43,8 +30,8 @@ from werkzeug import Response # noqa: F401 from werkzeug.exceptions import HTTPException # noqa: F401 -_insecure_views = ['main.login', 'static'] -_insecure_api_views = ['api.get_token', 'api.get_endpoints'] +_insecure_views = ["main.login", "static"] +_insecure_api_views = ["api.get_token", "api.get_endpoints"] # Timezone-naive datetime format expected by SecureDrop Client API_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" @@ -96,9 +83,9 @@ def default(self, obj: "Any") -> "Any": @app.errorhandler(CSRFError) # type: ignore def handle_csrf_error(e: CSRFError) -> "Response": app.logger.error("The CSRF token is invalid.") - msg = gettext('You have been logged out due to inactivity.') - session.destroy(('error', msg), session.get('locale')) - return redirect(url_for('main.login')) + msg = gettext("You have been logged out due to inactivity.") + session.destroy(("error", msg), session.get("locale")) + return redirect(url_for("main.login")) def _handle_http_exception( error: "HTTPException", @@ -149,12 +136,12 @@ def setup_g() -> "Optional[Response]": except FileNotFoundError: app.logger.error("Site logo not found.") - if request.path.split('/')[1] == 'api': + if request.path.split("/")[1] == "api": if request.endpoint not in _insecure_api_views and not session.logged_in(): abort(403) else: if request.endpoint not in _insecure_views and not session.logged_in(): - return redirect(url_for('main.login')) + return redirect(url_for("main.login")) if request.method == "POST": filesystem_id = request.form.get("filesystem_id") diff --git a/securedrop/journalist_app/account.py b/securedrop/journalist_app/account.py index 9a3328c52f2..d9a28b2e901 100644 --- a/securedrop/journalist_app/account.py +++ b/securedrop/journalist_app/account.py @@ -2,14 +2,16 @@ from typing import Union import werkzeug -from flask import (Blueprint, render_template, request, g, redirect, url_for, - flash) -from flask_babel import gettext - from db import db -from journalist_app.sessions import session -from journalist_app.utils import (set_diceware_password, set_name, validate_user, - validate_hotp_secret) +from flask import Blueprint, flash, g, redirect, render_template, request, url_for +from flask_babel import gettext +from journalist_app.sessions import logout_user, session +from journalist_app.utils import ( + set_diceware_password, + set_name, + validate_hotp_secret, + validate_user, +) from passphrases import PassphraseGenerator from sdconfig import SDConfig @@ -26,40 +28,42 @@ def edit() -> str: @view.route("/change-name", methods=("POST",)) def change_name() -> werkzeug.Response: - first_name = request.form.get('first_name') - last_name = request.form.get('last_name') + first_name = request.form.get("first_name") + last_name = request.form.get("last_name") set_name(session.get_user(), first_name, last_name) - return redirect(url_for('account.edit')) + return redirect(url_for("account.edit")) @view.route("/new-password", methods=("POST",)) def new_password() -> werkzeug.Response: user = session.get_user() - current_password = request.form.get('current_password') - token = request.form.get('token') - error_message = gettext('Incorrect password or two-factor code.') + current_password = request.form.get("current_password") + token = request.form.get("token") + error_message = gettext("Incorrect password or two-factor code.") # If the user is validated, change their password - if validate_user(user.username, current_password, token, - error_message): - password = request.form.get('password') + if validate_user(user.username, current_password, token, error_message): + password = request.form.get("password") if set_diceware_password(user, password): - return redirect(url_for('main.login')) - return redirect(url_for('account.edit')) + logout_user(user.id) + return redirect(url_for("main.login")) + return redirect(url_for("account.edit")) @view.route("/2fa", methods=("GET", "POST")) def new_two_factor() -> Union[str, werkzeug.Response]: - if request.method == 'POST': - token = request.form['token'] + if request.method == "POST": + token = request.form["token"] if session.get_user().verify_token(token): - flash(gettext("Your two-factor credentials have been reset successfully."), - "notification") - return redirect(url_for('account.edit')) + flash( + gettext("Your two-factor credentials have been reset successfully."), + "notification", + ) + return redirect(url_for("account.edit")) else: flash( gettext("There was a problem verifying the two-factor code. Please try again."), "error", ) - return render_template('account_new_two_factor.html', user=session.get_user()) + return render_template("account_new_two_factor.html", user=session.get_user()) @view.route("/reset-2fa-totp", methods=["POST"]) def reset_two_factor_totp() -> werkzeug.Response: @@ -73,7 +77,7 @@ def reset_two_factor_hotp() -> Union[str, werkzeug.Response]: otp_secret = request.form.get("otp_secret", None) if otp_secret: if not validate_hotp_secret(session.get_user(), otp_secret): - return render_template('account_edit_hotp_secret.html') + return render_template("account_edit_hotp_secret.html") session.get_user().set_hotp_secret(otp_secret) db.session.commit() return redirect(url_for("account.new_two_factor")) diff --git a/securedrop/journalist_app/admin.py b/securedrop/journalist_app/admin.py index fbe09545920..b0fe2d9717a 100644 --- a/securedrop/journalist_app/admin.py +++ b/securedrop/journalist_app/admin.py @@ -5,12 +5,6 @@ from typing import Optional, Union import werkzeug -from flask import (Blueprint, render_template, request, url_for, redirect, g, - current_app, flash, abort) -from flask_babel import gettext -from sqlalchemy.exc import IntegrityError -from sqlalchemy.orm.exc import NoResultFound - from db import db from flask import ( Blueprint, @@ -25,11 +19,17 @@ ) from flask_babel import gettext from journalist_app.decorators import admin_required +from journalist_app.forms import LogoForm, NewUserForm, OrgNameForm, SubmissionPreferencesForm from journalist_app.sessions import logout_user, session -from journalist_app.utils import (commit_account_changes, set_diceware_password, - validate_hotp_secret) -from journalist_app.forms import LogoForm, NewUserForm, SubmissionPreferencesForm, OrgNameForm -from sdconfig import SDConfig +from journalist_app.utils import commit_account_changes, set_diceware_password, validate_hotp_secret +from models import ( + FirstOrLastNameError, + InstanceConfig, + InvalidUsernameException, + Journalist, + PasswordError, + Submission, +) from passphrases import PassphraseGenerator from sdconfig import SDConfig from sqlalchemy.exc import IntegrityError @@ -245,28 +245,26 @@ def new_user_two_factor() -> Union[str, werkzeug.Response]: @view.route("/reset-2fa-totp", methods=["POST"]) @admin_required def reset_two_factor_totp() -> werkzeug.Response: - # nosemgrep: python.flask.security.open-redirect.open-redirect uid = request.form["uid"] user = Journalist.query.get(uid) user.is_totp = True user.regenerate_totp_shared_secret() db.session.commit() - return redirect(url_for("admin.new_user_two_factor", uid=uid)) + return redirect(url_for("admin.new_user_two_factor", uid=user.id)) @view.route("/reset-2fa-hotp", methods=["POST"]) @admin_required def reset_two_factor_hotp() -> Union[str, werkzeug.Response]: - # nosemgrep: python.flask.security.open-redirect.open-redirect uid = request.form["uid"] + user = Journalist.query.get(uid) otp_secret = request.form.get("otp_secret", None) if otp_secret: - user = Journalist.query.get(uid) if not validate_hotp_secret(user, otp_secret): - return render_template("admin_edit_hotp_secret.html", uid=uid) + return render_template("admin_edit_hotp_secret.html", uid=user.id) db.session.commit() - return redirect(url_for("admin.new_user_two_factor", uid=uid)) + return redirect(url_for("admin.new_user_two_factor", uid=user.id)) else: - return render_template("admin_edit_hotp_secret.html", uid=uid) + return render_template("admin_edit_hotp_secret.html", uid=user.id) @view.route("/edit/", methods=("GET", "POST")) @admin_required @@ -280,7 +278,10 @@ def edit_user(user_id: int) -> Union[str, werkzeug.Response]: try: Journalist.check_username_acceptable(new_username) except InvalidUsernameException as e: - flash(gettext("Invalid username: {message}").format(message=e), "error") + flash( + gettext("Invalid username: {message}").format(message=e), + "error", + ) return redirect(url_for("admin.edit_user", user_id=user_id)) if new_username == user.username: @@ -330,24 +331,31 @@ def delete_user(user_id: int) -> werkzeug.Response: # Do not flash because the interface already has safe guards. # It can only happen by manually crafting a POST request current_app.logger.error( - "Admin {} tried to delete itself".format(session.get_user().username)) + "Admin {} tried to delete itself".format(session.get_user().username) + ) abort(403) elif not user: current_app.logger.error( "Admin {} tried to delete nonexistent user with pk={}".format( - session.get_user().username, user_id)) + session.get_user().username, user_id + ) + ) abort(404) elif user.is_deleted_user(): # Do not flash because the interface does not expose this. # It can only happen by manually crafting a POST request current_app.logger.error( - "Admin {} tried to delete \"deleted\" user".format(session.get_user().username)) + 'Admin {} tried to delete "deleted" user'.format(session.get_user().username) + ) abort(403) else: user.delete() - logout_user(user.id, is_current=False) + logout_user(user.id) db.session.commit() - flash(gettext("Deleted user '{user}'.").format(user=user.username), "notification") + flash( + gettext("Deleted user '{user}'.").format(user=user.username), + "notification", + ) return redirect(url_for("admin.index")) @@ -358,10 +366,9 @@ def new_password(user_id: int) -> werkzeug.Response: user = Journalist.query.get(user_id) except NoResultFound: abort(404) - - password = request.form.get('password') - if set_diceware_password(user, password) is not False: - logout_user(user.id, is_current=False) + password = request.form.get("password") + if set_diceware_password(user, password, admin=True) is not False: + logout_user(user.id) db.session.commit() return redirect(url_for("admin.edit_user", user_id=user_id)) @@ -369,7 +376,10 @@ def new_password(user_id: int) -> werkzeug.Response: @admin_required def ossec_test() -> werkzeug.Response: current_app.logger.error("This is a test OSSEC alert") - flash(gettext("Test alert sent. Please check your email."), "testalert-notification") + flash( + gettext("Test alert sent. Please check your email."), + "testalert-notification", + ) return redirect(url_for("admin.manage_config") + "#config-testalert") return view diff --git a/securedrop/journalist_app/api.py b/securedrop/journalist_app/api.py index 24f22090cd4..0f46e9f09c0 100644 --- a/securedrop/journalist_app/api.py +++ b/securedrop/journalist_app/api.py @@ -1,27 +1,28 @@ import collections.abc import json from datetime import datetime, timezone -from typing import Tuple, Set, Union - -import flask -import werkzeug -from flask import abort, Blueprint, jsonify, request - -from sqlalchemy import Column -from sqlalchemy.exc import IntegrityError from os import path -from typing import Any, Callable, Set, Tuple, Union +from typing import Set, Tuple, Union from uuid import UUID import flask import werkzeug from db import db +from flask import Blueprint, abort, jsonify, request from journalist_app import utils from journalist_app.sessions import session -from models import (Journalist, Reply, SeenReply, Source, Submission, - LoginThrottledException, InvalidUsernameException, - BadTokenException, InvalidOTPSecretException, - WrongPasswordException) +from models import ( + BadTokenException, + InvalidOTPSecretException, + InvalidUsernameException, + Journalist, + LoginThrottledException, + Reply, + SeenReply, + Source, + Submission, + WrongPasswordException, +) from sdconfig import SDConfig from sqlalchemy import Column from sqlalchemy.exc import IntegrityError @@ -93,32 +94,39 @@ def get_token() -> Tuple[flask.Response, int]: try: journalist = Journalist.login(username, passphrase, one_time_code) - response = jsonify({ - 'token': session.get_token(), - 'expiration': session.get_lifetime(), - 'journalist_uuid': journalist.uuid, - 'journalist_first_name': journalist.first_name, - 'journalist_last_name': journalist.last_name, - }) + response = jsonify( + { + "token": session.get_token(), + "expiration": session.get_lifetime(), + "journalist_uuid": journalist.uuid, + "journalist_first_name": journalist.first_name, + "journalist_last_name": journalist.last_name, + } + ) # Update access metadata journalist.last_access = datetime.now(timezone.utc) db.session.add(journalist) db.session.commit() - session['uid'] = journalist.id + session["uid"] = journalist.id return response, 200 - except (LoginThrottledException, InvalidUsernameException, - BadTokenException, InvalidOTPSecretException, WrongPasswordException): - return abort(403, 'Token authentication failed.') - - @api.route('/sources', methods=['GET']) + except ( + LoginThrottledException, + InvalidUsernameException, + BadTokenException, + InvalidOTPSecretException, + WrongPasswordException, + ): + return abort(403, "Token authentication failed.") + + @api.route("/sources", methods=["GET"]) def get_all_sources() -> Tuple[flask.Response, int]: sources = Source.query.filter_by(pending=False, deleted_at=None).all() return jsonify({"sources": [source.to_json() for source in sources]}), 200 - @api.route('/sources/', methods=['GET', 'DELETE']) + @api.route("/sources/", methods=["GET", "DELETE"]) def single_source(source_uuid: str) -> Tuple[flask.Response, int]: if request.method == "GET": source = get_or_404(Source, source_uuid, column=Source.uuid) @@ -130,25 +138,28 @@ def single_source(source_uuid: str) -> Tuple[flask.Response, int]: else: abort(405) - @api.route('/sources//add_star', methods=['POST']) + @api.route("/sources//add_star", methods=["POST"]) def add_star(source_uuid: str) -> Tuple[flask.Response, int]: source = get_or_404(Source, source_uuid, column=Source.uuid) utils.make_star_true(source.filesystem_id) db.session.commit() return jsonify({"message": "Star added"}), 201 - @api.route('/sources//remove_star', methods=['DELETE']) + @api.route("/sources//remove_star", methods=["DELETE"]) def remove_star(source_uuid: str) -> Tuple[flask.Response, int]: source = get_or_404(Source, source_uuid, column=Source.uuid) utils.make_star_false(source.filesystem_id) db.session.commit() return jsonify({"message": "Star removed"}), 200 - @api.route('/sources//flag', methods=['POST']) + @api.route("/sources//flag", methods=["POST"]) def flag(source_uuid: str) -> Tuple[flask.Response, int]: - return jsonify({"message": "Sources no longer need to be flagged for reply"}), 200 + return ( + jsonify({"message": "Sources no longer need to be flagged for reply"}), + 200, + ) - @api.route('/sources//conversation', methods=['DELETE']) + @api.route("/sources//conversation", methods=["DELETE"]) def source_conversation(source_uuid: str) -> Tuple[flask.Response, int]: if request.method == "DELETE": source = get_or_404(Source, source_uuid, column=Source.uuid) @@ -157,7 +168,7 @@ def source_conversation(source_uuid: str) -> Tuple[flask.Response, int]: else: abort(405) - @api.route('/sources//submissions', methods=['GET']) + @api.route("/sources//submissions", methods=["GET"]) def all_source_submissions(source_uuid: str) -> Tuple[flask.Response, int]: source = get_or_404(Source, source_uuid, column=Source.uuid) return ( @@ -171,16 +182,17 @@ def download_submission(source_uuid: str, submission_uuid: str) -> flask.Respons submission = get_or_404(Submission, submission_uuid, column=Submission.uuid) return utils.serve_file_with_etag(submission) - @api.route('/sources//replies//download', - methods=['GET']) + @api.route("/sources//replies//download", methods=["GET"]) def download_reply(source_uuid: str, reply_uuid: str) -> flask.Response: get_or_404(Source, source_uuid, column=Source.uuid) reply = get_or_404(Reply, reply_uuid, column=Reply.uuid) return utils.serve_file_with_etag(reply) - @api.route('/sources//submissions/', - methods=['GET', 'DELETE']) + @api.route( + "/sources//submissions/", + methods=["GET", "DELETE"], + ) def single_submission(source_uuid: str, submission_uuid: str) -> Tuple[flask.Response, int]: if request.method == "GET": get_or_404(Source, source_uuid, column=Source.uuid) @@ -194,11 +206,14 @@ def single_submission(source_uuid: str, submission_uuid: str) -> Tuple[flask.Res else: abort(405) - @api.route('/sources//replies', methods=['GET', 'POST']) + @api.route("/sources//replies", methods=["GET", "POST"]) def all_source_replies(source_uuid: str) -> Tuple[flask.Response, int]: if request.method == "GET": source = get_or_404(Source, source_uuid, column=Source.uuid) - return jsonify({"replies": [reply.to_json() for reply in source.replies]}), 200 + return ( + jsonify({"replies": [reply.to_json() for reply in source.replies]}), + 200, + ) elif request.method == "POST": source = get_or_404(Source, source_uuid, column=Source.uuid) if request.json is None: @@ -262,8 +277,7 @@ def all_source_replies(source_uuid: str) -> Tuple[flask.Response, int]: else: abort(405) - @api.route('/sources//replies/', - methods=['GET', 'DELETE']) + @api.route("/sources//replies/", methods=["GET", "DELETE"]) def single_reply(source_uuid: str, reply_uuid: str) -> Tuple[flask.Response, int]: get_or_404(Source, source_uuid, column=Source.uuid) reply = get_or_404(Reply, reply_uuid, column=Reply.uuid) @@ -275,16 +289,27 @@ def single_reply(source_uuid: str, reply_uuid: str) -> Tuple[flask.Response, int else: abort(405) - @api.route('/submissions', methods=['GET']) + @api.route("/submissions", methods=["GET"]) def get_all_submissions() -> Tuple[flask.Response, int]: submissions = Submission.query.all() - return jsonify({'submissions': [submission.to_json() for - submission in submissions if submission.source]}), 200 + return ( + jsonify( + { + "submissions": [ + submission.to_json() for submission in submissions if submission.source + ] + } + ), + 200, + ) - @api.route('/replies', methods=['GET']) + @api.route("/replies", methods=["GET"]) def get_all_replies() -> Tuple[flask.Response, int]: replies = Reply.query.all() - return jsonify({"replies": [reply.to_json() for reply in replies if reply.source]}), 200 + return ( + jsonify({"replies": [reply.to_json() for reply in replies if reply.source]}), + 200, + ) @api.route("/seen", methods=["POST"]) def seen() -> Tuple[flask.Response, int]: @@ -327,19 +352,19 @@ def seen() -> Tuple[flask.Response, int]: abort(405) - @api.route('/user', methods=['GET']) + @api.route("/user", methods=["GET"]) def get_current_user() -> Tuple[flask.Response, int]: return jsonify(session.get_user().to_json()), 200 - @api.route('/users', methods=['GET']) + @api.route("/users", methods=["GET"]) def get_all_users() -> Tuple[flask.Response, int]: users = Journalist.query.all() return jsonify({"users": [user.to_json(all_info=False) for user in users]}), 200 - @api.route('/logout', methods=['POST']) + @api.route("/logout", methods=["POST"]) def logout() -> Tuple[flask.Response, int]: session.destroy() - return jsonify({'message': 'Your token has been revoked.'}), 200 + return jsonify({"message": "Your token has been revoked."}), 200 def _handle_api_http_exception( error: werkzeug.exceptions.HTTPException, diff --git a/securedrop/journalist_app/col.py b/securedrop/journalist_app/col.py index 31d064a6e11..cda17ba5f7d 100644 --- a/securedrop/journalist_app/col.py +++ b/securedrop/journalist_app/col.py @@ -21,10 +21,20 @@ from flask_babel import gettext from journalist_app.forms import ReplyForm from journalist_app.sessions import session -from journalist_app.utils import (make_star_true, make_star_false, get_source, - delete_collection, col_download_unread, - col_download_all, col_star, col_un_star, - col_delete, col_delete_data, mark_seen) +from journalist_app.utils import ( + col_delete, + col_delete_data, + col_download_all, + col_download_unread, + col_star, + col_un_star, + delete_collection, + get_source, + make_star_false, + make_star_true, + mark_seen, +) +from models import Reply, Submission from sdconfig import SDConfig from sqlalchemy.orm.exc import NoResultFound from store import Storage @@ -155,7 +165,8 @@ def download_single_file(filesystem_id: str, fn: str) -> werkzeug.Response: current_app.logger.error("Could not mark {} as seen: {}".format(fn, e)) return send_file( - Storage.get_default().path(filesystem_id, fn), mimetype="application/pgp-encrypted" + Storage.get_default().path(filesystem_id, fn), + mimetype="application/pgp-encrypted", ) return view diff --git a/securedrop/journalist_app/decorators.py b/securedrop/journalist_app/decorators.py index a3ae61cc197..82fec096ce7 100644 --- a/securedrop/journalist_app/decorators.py +++ b/securedrop/journalist_app/decorators.py @@ -1,13 +1,11 @@ # -*- coding: utf-8 -*- -from typing import Any +from functools import wraps +from typing import Any, Callable -from flask import redirect, url_for, flash +from flask import flash, redirect, url_for from flask_babel import gettext -from functools import wraps from journalist_app.sessions import session -from typing import Callable - def admin_required(func: Callable) -> Callable: @wraps(func) diff --git a/securedrop/journalist_app/main.py b/securedrop/journalist_app/main.py index c756ae629d0..4d3dcbd750d 100644 --- a/securedrop/journalist_app/main.py +++ b/securedrop/journalist_app/main.py @@ -3,13 +3,6 @@ from pathlib import Path from typing import Union -import werkzeug -from flask import (Blueprint, request, current_app, url_for, redirect, - render_template, g, flash, abort, Markup, escape) -from flask_babel import gettext -from sqlalchemy.orm import joinedload -from sqlalchemy.sql import func - import store import werkzeug from db import db @@ -25,14 +18,13 @@ redirect, render_template, request, - session, url_for, ) from flask_babel import gettext from journalist_app.forms import ReplyForm from journalist_app.sessions import session -from journalist_app.utils import (validate_user, bulk_delete, download, - get_source) +from journalist_app.utils import bulk_delete, download, get_source, validate_user +from models import Reply, SeenReply, Source, SourceStar, Submission from sdconfig import SDConfig from sqlalchemy.orm import joinedload from sqlalchemy.sql import func @@ -46,7 +38,9 @@ def make_blueprint(config: SDConfig) -> Blueprint: def login() -> Union[str, werkzeug.Response]: if request.method == "POST": user = validate_user( - request.form["username"], request.form["password"], request.form["token"] + request.form["username"], + request.form["password"], + request.form["token"], ) if user: current_app.logger.info( @@ -60,16 +54,16 @@ def login() -> Union[str, werkzeug.Response]: db.session.add(user) db.session.commit() - session['uid'] = user.id + session["uid"] = user.id session.regenerate() - return redirect(url_for('main.index')) + return redirect(url_for("main.index")) return render_template("login.html") @view.route("/logout") def logout() -> werkzeug.Response: session.destroy() - return redirect(url_for('main.index')) + return redirect(url_for("main.index")) @view.route("/") def index() -> str: @@ -165,14 +159,18 @@ def reply() -> werkzeug.Response: db.session.commit() store.async_add_checksum_for_file(reply, Storage.get_default()) except Exception as exc: - flash(gettext("An unexpected error occurred! Please " "inform your admin."), "error") + flash( + gettext("An unexpected error occurred! Please " "inform your admin."), + "error", + ) # We take a cautious approach to logging here because we're dealing # with responses to sources. It's possible the exception message # could contain information we don't want to write to disk. current_app.logger.error( - "Reply from '{}' (ID {}) failed: {}!".format(session.get_user().username, - session.get_uid(), - exc.__class__)) + "Reply from '{}' (ID {}) failed: {}!".format( + session.get_user().username, session.get_uid(), exc.__class__ + ) + ) else: flash( @@ -227,7 +225,9 @@ def bulk() -> Union[str, werkzeug.Response]: if action == "download": source = get_source(g.filesystem_id) return download( - source.journalist_filename, selected_docs, on_error_redirect=error_redirect + source.journalist_filename, + selected_docs, + on_error_redirect=error_redirect, ) elif action == "delete": return bulk_delete(g.filesystem_id, selected_docs) diff --git a/securedrop/journalist_app/sessions.py b/securedrop/journalist_app/sessions.py index c08b915b65b..4da6b7a6d5f 100644 --- a/securedrop/journalist_app/sessions.py +++ b/securedrop/journalist_app/sessions.py @@ -1,18 +1,18 @@ import typing from datetime import datetime, timedelta, timezone +from json.decoder import JSONDecodeError from secrets import token_urlsafe -from flask_babel import gettext +from typing import Any, Dict, Optional, Tuple + from flask import Flask, Request, Response from flask import current_app as app -from typing import Optional, Any, Dict, Tuple from flask.sessions import SessionInterface as FlaskSessionInterface from flask.sessions import SessionMixin, session_json_serializer -from json.decoder import JSONDecodeError +from flask_babel import gettext +from itsdangerous import BadSignature, URLSafeTimedSerializer +from models import Journalist from redis import Redis from werkzeug.datastructures import CallbackDict -from itsdangerous import URLSafeTimedSerializer, BadSignature - -from models import Journalist class ServerSideSession(CallbackDict, SessionMixin): @@ -21,8 +21,9 @@ class ServerSideSession(CallbackDict, SessionMixin): def __init__(self, sid: str, token: str, lifetime: int = 0, initial: Any = None) -> None: def on_update(self: ServerSideSession) -> None: self.modified = True - if initial and 'uid' in initial: - self.set_uid(initial['uid']) + + if initial and "uid" in initial: + self.set_uid(initial["uid"]) self.set_user() else: self.uid: Optional[int] = None @@ -69,9 +70,7 @@ def logged_in(self) -> bool: return False def destroy( - self, - flash: Optional[Tuple[str, str]] = None, - locale: Optional[str] = None + self, flash: Optional[Tuple[str, str]] = None, locale: Optional[str] = None ) -> None: # The parameters are needed to pass the information to the new session self.locale = locale @@ -85,15 +84,13 @@ def regenerate(self) -> None: class SessionInterface(FlaskSessionInterface): - def _generate_sid(self) -> str: return token_urlsafe(32) def _get_signer(self, app: Flask) -> URLSafeTimedSerializer: if not app.secret_key: - raise RuntimeError('No secret key set') - return URLSafeTimedSerializer(app.secret_key, - salt=self.salt) + raise RuntimeError("No secret key set") + return URLSafeTimedSerializer(app.secret_key, salt=self.salt) """Uses the Redis key-value store as a session backend. @@ -103,18 +100,23 @@ def _get_signer(self, app: Flask) -> URLSafeTimedSerializer: :param header_name: if use_header, set the header name to parse """ - def __init__(self, lifetime: int, renew_count: int, redis: Optional[Redis], - key_prefix: str, salt: str, header_name: str) -> None: + def __init__( + self, + lifetime: int, + renew_count: int, + redis: Redis, + key_prefix: str, + salt: str, + header_name: str, + ) -> None: self.serializer = session_json_serializer - if redis is None: - redis = Redis() self.redis = redis self.lifetime = lifetime self.renew_count = renew_count self.key_prefix = key_prefix - self.api_key_prefix = 'api_' + key_prefix + self.api_key_prefix = "api_" + key_prefix self.salt = salt - self.api_salt = 'api_' + salt + self.api_salt = "api_" + salt self.header_name = header_name self.new = False self.has_same_site_capability = hasattr(self, "get_cookie_samesite") @@ -122,19 +124,16 @@ def __init__(self, lifetime: int, renew_count: int, redis: Optional[Redis], def _new_session(self, is_api: bool = False, initial: Any = None) -> ServerSideSession: sid = self._generate_sid() token: str = self._get_signer(app).dumps(sid) # type: ignore - session = ServerSideSession(sid=sid, - token=token, - lifetime=self.lifetime, - initial=initial) + session = ServerSideSession(sid=sid, token=token, lifetime=self.lifetime, initial=initial) session.new = True session.is_api = is_api return session def open_session(self, app: Flask, request: Request) -> Optional[ServerSideSession]: - '''This function is called by the flask session interface at the + """This function is called by the flask session interface at the beginning of each request. - ''' - is_api = (request.path.split('/')[1] == 'api') + """ + is_api = request.path.split("/")[1] == "api" if is_api: self.key_prefix = self.api_key_prefix @@ -142,7 +141,7 @@ def open_session(self, app: Flask, request: Request) -> Optional[ServerSideSessi auth_header = request.headers.get(self.header_name) if auth_header: split = auth_header.split(" ") - if len(split) != 2 or split[0] != 'Token': + if len(split) != 2 or split[0] != "Token": return self._new_session(is_api) sid: Optional[str] = split[1] else: @@ -160,7 +159,7 @@ def open_session(self, app: Flask, request: Request) -> Optional[ServerSideSessi val = self.redis.get(self.key_prefix + sid) if val is not None: try: - data = self.serializer.loads(val.decode()) + data = self.serializer.loads(val.decode("utf-8")) token: str = self._get_signer(app).dumps(sid) # type: ignore return ServerSideSession(sid=sid, token=token, initial=data) except (JSONDecodeError, NotImplementedError): @@ -171,14 +170,11 @@ def open_session(self, app: Flask, request: Request) -> Optional[ServerSideSessi return self._new_session(is_api, initial={"_flashes": [("error", msg)]}) def save_session( # type: ignore[override] # noqa - self, - app: Flask, - session: ServerSideSession, - response: Response + self, app: Flask, session: ServerSideSession, response: Response ) -> None: - '''This is called at the end of each request, just + """This is called at the end of each request, just before sending the response. - ''' + """ domain = self.get_cookie_domain(app) path = self.get_cookie_path(app) if session.to_destroy: @@ -192,11 +188,11 @@ def save_session( # type: ignore[override] # noqa session = self._new_session(False, initial=initial) expires = self.redis.ttl(name=self.key_prefix + session.sid) if session.new: - session['renew_count'] = self.renew_count + session["renew_count"] = self.renew_count expires = self.lifetime else: - if expires < (30 * 60) and session['renew_count'] > 0: - session['renew_count'] -= 1 + if expires < (30 * 60) and session["renew_count"] > 0: + session["renew_count"] -= 1 expires += self.lifetime session.modified = True conditional_cookie_kwargs = {} @@ -210,28 +206,31 @@ def save_session( # type: ignore[override] # noqa session.sid = self._generate_sid() session.token = self._get_signer(app).dumps(session.sid) # type: ignore if session.new or session.to_regenerate: - self.redis.setex(name=self.key_prefix + session.sid, value=val, - time=expires) + self.redis.setex(name=self.key_prefix + session.sid, value=val, time=expires) elif session.modified: # To prevent race conditions where session is delete by an admin in the middle of a req # accept to save the session object if and only if alrady exists using the xx flag - self.redis.set(name=self.key_prefix + session.sid, value=val, - ex=expires, xx=True) + self.redis.set(name=self.key_prefix + session.sid, value=val, ex=expires, xx=True) if not session.is_api and (session.new or session.to_regenerate): - response.headers.add('Vary', 'Cookie') - response.set_cookie(app.session_cookie_name, session.token, - httponly=httponly, domain=domain, path=path, - secure=secure, **conditional_cookie_kwargs) # type: ignore - - -class Session(object): - - def __init__(self, app: Optional[Flask] = None) -> None: + response.headers.add("Vary", "Cookie") + response.set_cookie( + app.session_cookie_name, + session.token, + httponly=httponly, + domain=domain, + path=path, + secure=secure, + **conditional_cookie_kwargs # type: ignore + ) + + +class Session: + def __init__(self, app: Flask) -> None: self.app = app if app is not None: self.init_app(app) - def init_app(self, app: Flask) -> 'None': + def init_app(self, app: Flask) -> "None": """This is used to set up session for your app object. :param app: the Flask app object with proper configuration. """ @@ -239,40 +238,46 @@ def init_app(self, app: Flask) -> 'None': def _get_interface(self, app: Flask) -> SessionInterface: config = app.config.copy() - config.setdefault('SESSION_REDIS', None) - config.setdefault('SESSION_LIFETIME', 2 * 60 * 60) - config.setdefault('SESSION_RENEW_COUNT', 5) - config.setdefault('SESSION_SIGNER_SALT', 'session') - config.setdefault('SESSION_KEY_PREFIX', 'session:') - config.setdefault('SESSION_HEADER_NAME', 'authorization') + config.setdefault("SESSION_REDIS", Redis()) + config.setdefault("SESSION_LIFETIME", 2 * 60 * 60) + config.setdefault("SESSION_RENEW_COUNT", 5) + config.setdefault("SESSION_SIGNER_SALT", "session") + config.setdefault("SESSION_KEY_PREFIX", "session:") + config.setdefault("SESSION_HEADER_NAME", "authorization") session_interface = SessionInterface( - config['SESSION_LIFETIME'], config['SESSION_RENEW_COUNT'], - config['SESSION_REDIS'], config['SESSION_KEY_PREFIX'], - config['SESSION_SIGNER_SALT'], config['SESSION_HEADER_NAME']) + config["SESSION_LIFETIME"], + config["SESSION_RENEW_COUNT"], + config["SESSION_REDIS"], + config["SESSION_KEY_PREFIX"], + config["SESSION_SIGNER_SALT"], + config["SESSION_HEADER_NAME"], + ) return session_interface -def logout_user(uid: int, is_current: bool = True) -> None: +def logout_user(uid: int) -> None: redis = Redis() - for key in (redis.keys(app.config['SESSION_KEY_PREFIX'] + "*") + - redis.keys("api_" + app.config['SESSION_KEY_PREFIX'] + "*")): + for key in redis.keys(app.config["SESSION_KEY_PREFIX"] + "*") + redis.keys( + "api_" + app.config["SESSION_KEY_PREFIX"] + "*" + ): found = redis.get(key) if found: - sess = session_json_serializer.loads(found.decode()) - if 'uid' in sess and sess['uid'] == uid: + sess = session_json_serializer.loads(found.decode("utf-8")) + if "uid" in sess and sess["uid"] == uid: redis.delete(key) - if is_current: - session.pop('uid') def logout_all() -> None: redis = Redis() - for key in (redis.keys(app.config['SESSION_KEY_PREFIX'] + "*") + - redis.keys("api_" + app.config['SESSION_KEY_PREFIX'] + "*")): + for key in redis.keys(app.config["SESSION_KEY_PREFIX"] + "*") + redis.keys( + "api_" + app.config["SESSION_KEY_PREFIX"] + "*" + ): redis.delete(key) + # Re-export flask.session, but with the correct type information for mypy. from flask import session # noqa + session = typing.cast(ServerSideSession, session) diff --git a/securedrop/journalist_app/utils.py b/securedrop/journalist_app/utils.py index 2a36ce7a475..17f5664670b 100644 --- a/securedrop/journalist_app/utils.py +++ b/securedrop/journalist_app/utils.py @@ -1,32 +1,16 @@ # -*- coding: utf-8 -*- import binascii import os -from typing import Optional, List, Union +from datetime import datetime, timezone +from typing import List, Optional, Union import flask import werkzeug -from flask import (flash, current_app, abort, send_file, redirect, url_for, - Markup, escape) -from flask_babel import gettext, ngettext -from journalist_app.sessions import session -from sqlalchemy.exc import IntegrityError - from db import db from encryption import EncryptionManager -from flask import ( - Markup, - abort, - current_app, - escape, - flash, - g, - redirect, - request, - send_file, - sessions, - url_for, -) +from flask import Markup, abort, current_app, escape, flash, redirect, send_file, url_for from flask_babel import gettext, ngettext +from journalist_app.sessions import session from models import ( HOTP_SECRET_LENGTH, BadTokenException, @@ -57,7 +41,10 @@ def commit_account_changes(user: Journalist) -> None: db.session.add(user) db.session.commit() except Exception as e: - flash(gettext("An unexpected error occurred! Please " "inform your admin."), "error") + flash( + gettext("An unexpected error occurred! Please " "inform your admin."), + "error", + ) current_app.logger.error("Account changes for '{}' failed: {}".format(user, e)) db.session.rollback() else: @@ -170,7 +157,10 @@ def validate_hotp_secret(user: Journalist, otp_secret: str) -> bool: ) return False else: - flash(gettext("An unexpected error occurred! " "Please inform your admin."), "error") + flash( + gettext("An unexpected error occurred! " "Please inform your admin."), + "error", + ) current_app.logger.error( "set_hotp_secret '{}' (id {}) failed: {}".format(otp_secret, user.id, e) ) @@ -444,12 +434,17 @@ def set_name(user: Journalist, first_name: Optional[str], last_name: Optional[st flash(gettext("Name not updated: {message}").format(message=e), "error") -def set_diceware_password(user: Journalist, password: Optional[str]) -> bool: +def set_diceware_password( + user: Journalist, password: Optional[str], admin: Optional[bool] = False +) -> bool: try: # nosemgrep: python.django.security.audit.unvalidated-password.unvalidated-password user.set_password(password) except PasswordError: - flash(gettext("The password you submitted is invalid. Password not changed."), "error") + flash( + gettext("The password you submitted is invalid. Password not changed."), + "error", + ) return False try: @@ -467,9 +462,26 @@ def set_diceware_password(user: Journalist, password: Optional[str]) -> bool: return False # using Markup so the HTML isn't escaped - session.destroy( - ( - 'success', + if not admin: + session.destroy( + ( + "success", + Markup( + "

{message} {password}

".format( + message=Markup.escape( + gettext( + "Password updated. Don't forget to save it in your KeePassX database. " # noqa: E501 + "New password:" + ) + ), + password=Markup.escape("" if password is None else password), + ) + ), + ), + session.get("locale"), + ) + else: + flash( Markup( "

{message} {password}

".format( message=Markup.escape( @@ -478,12 +490,11 @@ def set_diceware_password(user: Journalist, password: Optional[str]) -> bool: "New password:" ) ), - password=Markup.escape("" if password is None else password) + password=Markup.escape("" if password is None else password), ) - ) - ), - session.get('locale') - ) + ), + "success", + ) return True diff --git a/securedrop/models.py b/securedrop/models.py index 716163bbd72..b606f198fdb 100644 --- a/securedrop/models.py +++ b/securedrop/models.py @@ -15,6 +15,8 @@ import qrcode.image.svg from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.kdf import scrypt +from db import db +from encryption import EncryptionManager, GpgKeyNotFoundError from flask import url_for from flask_babel import gettext, ngettext from markupsafe import Markup @@ -150,7 +152,11 @@ def to_json(self) -> "Dict[str, object]": "is_starred": starred, "last_updated": last_updated, "interaction_count": self.interaction_count, - "key": {"type": "PGP", "public": self.public_key, "fingerprint": self.fingerprint}, + "key": { + "type": "PGP", + "public": self.public_key, + "fingerprint": self.fingerprint, + }, "number_of_documents": docs_msg_count["documents"], "number_of_messages": docs_msg_count["messages"], "submissions_url": url_for("api.all_source_submissions", source_uuid=self.uuid), @@ -215,7 +221,9 @@ def to_json(self) -> "Dict[str, Any]": if self.source else None, "submission_url": url_for( - "api.single_submission", source_uuid=self.source.uuid, submission_uuid=self.uuid + "api.single_submission", + source_uuid=self.source.uuid, + submission_uuid=self.uuid, ) if self.source else None, @@ -226,7 +234,9 @@ def to_json(self) -> "Dict[str, Any]": "is_read": self.seen, "uuid": self.uuid, "download_url": url_for( - "api.download_submission", source_uuid=self.source.uuid, submission_uuid=self.uuid + "api.download_submission", + source_uuid=self.source.uuid, + submission_uuid=self.uuid, ) if self.source else None, @@ -411,9 +421,7 @@ class Journalist(db.Model): passphrase_hash = Column(String(256)) login_attempts = relationship( - "JournalistLoginAttempt", - backref="journalist", - cascade="all, delete" + "JournalistLoginAttempt", backref="journalist", cascade="all, delete" ) MIN_USERNAME_LEN = 3 @@ -709,8 +717,8 @@ def login( def to_json(self, all_info: bool = True) -> Dict[str, Any]: """Returns a JSON representation of the journalist user. If all_info is - False, potentially sensitive or extraneous fields are excluded. Note - that both representations do NOT include credentials.""" + False, potentially sensitive or extraneous fields are excluded. Note + that both representations do NOT include credentials.""" json_user = { "username": self.username, "uuid": self.uuid, @@ -807,7 +815,8 @@ class SeenFile(db.Model): file_id = Column(Integer, ForeignKey("submissions.id"), nullable=False) journalist_id = Column(Integer, ForeignKey("journalists.id"), nullable=False) file = relationship( - "Submission", backref=backref("seen_files", lazy="dynamic", cascade="all,delete") + "Submission", + backref=backref("seen_files", lazy="dynamic", cascade="all,delete"), ) journalist = relationship("Journalist", backref=backref("seen_files")) @@ -819,7 +828,8 @@ class SeenMessage(db.Model): message_id = Column(Integer, ForeignKey("submissions.id"), nullable=False) journalist_id = Column(Integer, ForeignKey("journalists.id"), nullable=False) message = relationship( - "Submission", backref=backref("seen_messages", lazy="dynamic", cascade="all,delete") + "Submission", + backref=backref("seen_messages", lazy="dynamic", cascade="all,delete"), ) journalist = relationship("Journalist", backref=backref("seen_messages")) @@ -860,7 +870,10 @@ class InstanceConfig(db.Model): __tablename__ = "instance_config" version = Column(Integer, primary_key=True) valid_until = Column( - DateTime, default=datetime.datetime.fromtimestamp(0), nullable=False, unique=True + DateTime, + default=datetime.datetime.fromtimestamp(0), + nullable=False, + unique=True, ) allow_document_uploads = Column(Boolean, default=True) organization_name = Column(String(255), nullable=True, default="SecureDrop") diff --git a/securedrop/tests/functional/sd_config_v2.py b/securedrop/tests/functional/sd_config_v2.py index cb84d45d95a..49c0e303cea 100644 --- a/securedrop/tests/functional/sd_config_v2.py +++ b/securedrop/tests/functional/sd_config_v2.py @@ -19,6 +19,12 @@ class FlaskAppConfig: # This is recommended for performance, and also resolves #369 USE_X_SENDFILE: bool = True + # additional config for JI Redis sessions + SESSION_SIGNER_SALT: str = "js_session" + SESSION_KEY_PREFIX: str = "js_session:" + SESSION_LIFETIME: int = 2 * 60 * 60 + SESSION_RENEW_COUNT: int = 5 + DEFAULT_SECUREDROP_ROOT = Path(__file__).absolute().parent.parent.parent diff --git a/securedrop/tests/migrations/migration_c5a02eb52f2d.py b/securedrop/tests/migrations/migration_c5a02eb52f2d.py index 2c52790d5e4..22066f20d22 100644 --- a/securedrop/tests/migrations/migration_c5a02eb52f2d.py +++ b/securedrop/tests/migrations/migration_c5a02eb52f2d.py @@ -2,22 +2,20 @@ import random import uuid +from db import db +from journalist_app import create_app from sqlalchemy import text from sqlalchemy.exc import OperationalError -from db import db -from journalist_app import create_app from .helpers import random_chars class Helper: - def __init__(self): self.journalist_id = None class UpgradeTester(Helper): - def __init__(self, config): Helper.__init__(self) self.config = config @@ -25,22 +23,22 @@ def __init__(self, config): def create_journalist(self): params = { - 'uuid': str(uuid.uuid4()), - 'username': random_chars(50), - 'nonce': random.randint(20, 100) + "uuid": str(uuid.uuid4()), + "username": random_chars(50), + "nonce": random.randint(20, 100), } - sql = '''INSERT INTO journalists (uuid, username, session_nonce) - VALUES (:uuid, :username, :nonce)''' + sql = """INSERT INTO journalists (uuid, username, session_nonce) + VALUES (:uuid, :username, :nonce)""" return db.engine.execute(text(sql), **params).lastrowid def add_revoked_token(self): params = { - 'journalist_id': self.journalist_id, - 'token': 'abc123', + "journalist_id": self.journalist_id, + "token": "abc123", } - sql = '''INSERT INTO revoked_tokens (journalist_id, token) + sql = """INSERT INTO revoked_tokens (journalist_id, token) VALUES (:journalist_id, :token) - ''' + """ db.engine.execute(text(sql), **params) def load_data(self): @@ -51,12 +49,12 @@ def load_data(self): def check_upgrade(self): with self.app.app_context(): sql = "SELECT session_nonce FROM journalists WHERE id = :id" - params = {'id': self.journalist_id} + params = {"id": self.journalist_id} try: db.engine.execute(text(sql), **params).fetchall() except OperationalError: pass - sql = 'SELECT * FROM revoked_tokens WHERE id = :id' + sql = "SELECT * FROM revoked_tokens WHERE id = :id" try: db.engine.execute(text(sql), **params).fetchall() except OperationalError: @@ -64,19 +62,15 @@ def check_upgrade(self): class DowngradeTester(Helper): - def __init__(self, config): Helper.__init__(self) self.config = config self.app = create_app(config) def create_journalist(self): - params = { - 'uuid': str(uuid.uuid4()), - 'username': random_chars(50) - } - sql = '''INSERT INTO journalists (uuid, username) - VALUES (:uuid, :username)''' + params = {"uuid": str(uuid.uuid4()), "username": random_chars(50)} + sql = """INSERT INTO journalists (uuid, username) + VALUES (:uuid, :username)""" return db.engine.execute(text(sql), **params).lastrowid def load_data(self): @@ -88,9 +82,9 @@ def check_downgrade(self): sql = "SELECT session_nonce FROM journalists WHERE id = :id" params = {"id": self.journalist_id} res = db.engine.execute(text(sql), **params).fetchone() - assert(isinstance(res['session_nonce'], int)) + assert isinstance(res["session_nonce"], int) sql = """INSERT INTO revoked_tokens (journalist_id, token) VALUES (:journalist_id, :token)""" params = {"journalist_id": self.journalist_id, "token": "abc789"} res = db.engine.execute(text(sql), **params).lastrowid - assert(isinstance(res, int)) + assert isinstance(res, int) diff --git a/securedrop/tests/test_2fa.py b/securedrop/tests/test_2fa.py index 142b273a561..11fab0ecf24 100644 --- a/securedrop/tests/test_2fa.py +++ b/securedrop/tests/test_2fa.py @@ -62,7 +62,9 @@ def test_totp_reuse_protections(journalist_app, test_journo, hardening): resp = app.post( url_for("main.login"), data=dict( - username=test_journo["username"], password=test_journo["password"], token=token + username=test_journo["username"], + password=test_journo["password"], + token=token, ), ) @@ -136,19 +138,8 @@ def test_bad_token_fails_to_verify_on_admin_new_user_two_factor_page( assert resp.status_code == 200 ins.assert_message_flashed( - "There was a problem verifying the two-factor code. Please try again.", "error" - ) - - with journalist_app.test_client() as app: - login_user(app, test_admin) - # Submit the same invalid token again - with InstrumentedApp(journalist_app) as ins: - resp = app.post( - url_for("admin.new_user_two_factor", uid=test_admin["id"]), - data=dict(token=invalid_token), - ) - ins.assert_message_flashed( - "There was a problem verifying the two-factor code. Please try again.", "error" + "There was a problem verifying the two-factor code. Please try again.", + "error", ) @@ -169,15 +160,6 @@ def test_bad_token_fails_to_verify_on_new_user_two_factor_page( assert resp.status_code == 200 ins.assert_message_flashed( - "There was a problem verifying the two-factor code. Please try again.", "error" - ) - - with journalist_app.test_client() as app: - login_user(app, test_journo) - - # Submit the same invalid token again - with InstrumentedApp(journalist_app) as ins: - resp = app.post(url_for("account.new_two_factor"), data=dict(token=invalid_token)) - ins.assert_message_flashed( - "There was a problem verifying the two-factor code. Please try again.", "error" + "There was a problem verifying the two-factor code. Please try again.", + "error", ) diff --git a/securedrop/tests/test_integration.py b/securedrop/tests/test_integration.py index ec64a1dd45c..30b5d9c309d 100644 --- a/securedrop/tests/test_integration.py +++ b/securedrop/tests/test_integration.py @@ -8,19 +8,15 @@ from base64 import b32encode from binascii import unhexlify from io import BytesIO -import mock - -from bs4 import BeautifulSoup -from flask import escape -from pyotp import HOTP, TOTP import journalist_app as journalist_app_module import mock from bs4 import BeautifulSoup from db import db from encryption import EncryptionManager -from store import Storage +from flask import escape from journalist_app.sessions import session +from pyotp import HOTP, TOTP from source_app.session_manager import SessionManager from store import Storage @@ -311,7 +307,9 @@ def _helper_test_reply( resp = app.post( "/bulk", data=dict( - filesystem_id=filesystem_id, action="download", doc_names_selected=checkbox_values + filesystem_id=filesystem_id, + action="download", + doc_names_selected=checkbox_values, ), follow_redirects=True, ) @@ -360,7 +358,11 @@ def _helper_filenames_delete(journalist_app, soup, i): # delete resp = journalist_app.post( "/bulk", - data=dict(filesystem_id=filesystem_id, action="delete", doc_names_selected=checkbox_values), + data=dict( + filesystem_id=filesystem_id, + action="delete", + doc_names_selected=checkbox_values, + ), follow_redirects=True, ) assert resp.status_code == 200 @@ -405,7 +407,12 @@ def test_reply_normal(journalist_app, source_app, test_journo, config): encryption_mgr = EncryptionManager.get_default() with mock.patch.object(encryption_mgr._gpg, "_encoding", "ansi_x3.4_1968"): _helper_test_reply( - journalist_app, source_app, config, test_journo, "This is a test reply.", True + journalist_app, + source_app, + config, + test_journo, + "This is a test reply.", + True, ) @@ -421,7 +428,12 @@ def test_unicode_reply_with_ansi_env(journalist_app, source_app, test_journo, co encryption_mgr = EncryptionManager.get_default() with mock.patch.object(encryption_mgr._gpg, "_encoding", "ansi_x3.4_1968"): _helper_test_reply( - journalist_app, source_app, config, test_journo, "ᚠᛇᚻ᛫ᛒᛦᚦ᛫ᚠᚱᚩᚠᚢᚱ᛫ᚠᛁᚱᚪ᛫ᚷᛖᚻᚹᛦᛚᚳᚢᛗ", True + journalist_app, + source_app, + config, + test_journo, + "ᚠᛇᚻ᛫ᛒᛦᚦ᛫ᚠᚱᚩᚠᚢᚱ᛫ᚠᛁᚱᚪ᛫ᚷᛖᚻᚹᛦᛚᚳᚢᛗ", + True, ) @@ -712,7 +724,9 @@ def test_prevent_document_uploads(source_app, journalist_app, test_admin): prevent_document_uploads=True, min_message_length=0 ) resp = app.post( - "/admin/update-submission-preferences", data=form.data, follow_redirects=True + "/admin/update-submission-preferences", + data=form.data, + follow_redirects=True, ) assert resp.status_code == 200 @@ -742,7 +756,9 @@ def test_no_prevent_document_uploads(source_app, journalist_app, test_admin): prevent_document_uploads=False, min_message_length=0 ) resp = app.post( - "/admin/update-submission-preferences", data=form.data, follow_redirects=True + "/admin/update-submission-preferences", + data=form.data, + follow_redirects=True, ) assert resp.status_code == 200 diff --git a/securedrop/tests/test_journalist.py b/securedrop/tests/test_journalist.py index c810ff9cd7a..b3b97bb3862 100644 --- a/securedrop/tests/test_journalist.py +++ b/securedrop/tests/test_journalist.py @@ -3,11 +3,9 @@ import base64 import binascii import io -import time - -from mock import call import os import random +import time import zipfile from base64 import b64decode from io import BytesIO @@ -21,15 +19,6 @@ from flaky import flaky from flask import current_app, escape, g, url_for from flask_babel import gettext, ngettext -from mock import patch -from pyotp import TOTP -from sqlalchemy.exc import IntegrityError -from sqlalchemy.orm.exc import StaleDataError -from sqlalchemy.sql.expression import func -from html import escape as htmlescape - -import journalist_app as journalist_app_module -from encryption import EncryptionManager, GpgKeyNotFoundError from journalist_app.sessions import session from journalist_app.utils import mark_seen from mock import call, patch @@ -73,11 +62,15 @@ def _login_user(app, username, password, otp_secret, success=True): resp = app.post( url_for("main.login"), - data={"username": username, "password": password, "token": TOTP(otp_secret).now()}, + data={ + "username": username, + "password": password, + "token": TOTP(otp_secret).now(), + }, follow_redirects=True, ) assert resp.status_code == 200 - assert ((session.get_user() is not None) == success) + assert (session.get_user() is not None) == success @pytest.mark.parametrize("otp_secret", ["", "GA", "GARBAGE", "JHCOGO7VCER3EJ4"]) @@ -117,13 +110,19 @@ def test_reply_error_logging(journalist_app, test_journo, test_source): with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) with patch.object(journalist_app.logger, "error") as mocked_error_logger: with patch.object(db.session, "commit", side_effect=exception_class(exception_msg)): resp = app.post( url_for("main.reply"), - data={"filesystem_id": test_source["filesystem_id"], "message": "_"}, + data={ + "filesystem_id": test_source["filesystem_id"], + "message": "_", + }, follow_redirects=True, ) assert resp.status_code == 200 @@ -144,14 +143,20 @@ def test_reply_error_flashed_message(config, journalist_app, test_journo, test_s with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) with InstrumentedApp(app) as ins: with patch.object(db.session, "commit", side_effect=exception_class()): resp = app.post( url_for("main.reply", l=locale), - data={"filesystem_id": test_source["filesystem_id"], "message": "_"}, + data={ + "filesystem_id": test_source["filesystem_id"], + "message": "_", + }, follow_redirects=True, ) @@ -166,7 +171,10 @@ def test_reply_error_flashed_message(config, journalist_app, test_journo, test_s def test_empty_replies_are_rejected(config, journalist_app, test_journo, test_source, locale): with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) resp = app.post( url_for("main.reply", l=locale), @@ -185,7 +193,10 @@ def test_empty_replies_are_rejected(config, journalist_app, test_journo, test_so def test_nonempty_replies_are_accepted(config, journalist_app, test_journo, test_source, locale): with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) resp = app.post( url_for("main.reply", l=locale), @@ -245,7 +256,9 @@ def test_login_throttle(config, journalist_app, test_journo, locale): resp = app.post( url_for("main.login"), data=dict( - username=test_journo["username"], password="invalid", token="invalid" + username=test_journo["username"], + password="invalid", + token="invalid", ), ) assert resp.status_code == 200 @@ -255,7 +268,9 @@ def test_login_throttle(config, journalist_app, test_journo, locale): resp = app.post( url_for("main.login", l=locale), data=dict( - username=test_journo["username"], password="invalid", token="invalid" + username=test_journo["username"], + password="invalid", + token="invalid", ), ) assert page_language(resp.data) == language_tag(locale) @@ -296,7 +311,9 @@ def test_login_throttle_is_not_global(config, journalist_app, test_journo, test_ resp = app.post( url_for("main.login", l=locale), data=dict( - username=test_journo["username"], password="invalid", token="invalid" + username=test_journo["username"], + password="invalid", + token="invalid", ), ) assert page_language(resp.data) == language_tag(locale) @@ -307,7 +324,9 @@ def test_login_throttle_is_not_global(config, journalist_app, test_journo, test_ resp = app.post( url_for("main.login", l=locale), data=dict( - username=test_journo["username"], password="invalid", token="invalid" + username=test_journo["username"], + password="invalid", + token="invalid", ), ) assert page_language(resp.data) == language_tag(locale) @@ -488,7 +507,10 @@ def test_admin_logout_redirects_to_index(journalist_app, test_admin): with journalist_app.test_client() as app: with InstrumentedApp(journalist_app) as ins: _login_user( - app, test_admin["username"], test_admin["password"], test_admin["otp_secret"] + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], ) resp = app.get(url_for("main.logout")) ins.assert_redirects(resp, url_for("main.index")) @@ -498,7 +520,10 @@ def test_user_logout_redirects_to_index(journalist_app, test_journo): with journalist_app.test_client() as app: with InstrumentedApp(journalist_app) as ins: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) resp = app.get(url_for("main.logout")) ins.assert_redirects(resp, url_for("main.index")) @@ -508,7 +533,12 @@ def test_user_logout_redirects_to_index(journalist_app, test_journo): @pytest.mark.parametrize("locale", get_test_locales()) def test_admin_index(config, journalist_app, test_admin, locale): with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) resp = app.get(url_for("admin.index", l=locale)) assert page_language(resp.data) == language_tag(locale) msgids = ["Admin Interface"] @@ -524,7 +554,12 @@ def test_admin_delete_user(config, journalist_app, test_admin, test_journo, loca assert Journalist.query.get(test_journo["id"]) is not None with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) with InstrumentedApp(journalist_app) as ins: resp = app.post( url_for("admin.delete_user", user_id=test_journo["id"], l=locale), @@ -535,7 +570,8 @@ def test_admin_delete_user(config, journalist_app, test_admin, test_journo, loca msgids = ["Deleted user '{user}'."] with xfail_untranslated_messages(config, locale, msgids): ins.assert_message_flashed( - gettext(msgids[0]).format(user=test_journo["username"]), "notification" + gettext(msgids[0]).format(user=test_journo["username"]), + "notification", ) # Verify journalist is no longer in the database @@ -551,9 +587,15 @@ def test_admin_cannot_delete_self(config, journalist_app, test_admin, test_journ assert Journalist.query.get(test_journo["id"]) is not None with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) resp = app.post( - url_for("admin.delete_user", user_id=test_admin["id"], l=locale), follow_redirects=True + url_for("admin.delete_user", user_id=test_admin["id"], l=locale), + follow_redirects=True, ) # Assert correct interface behavior @@ -593,7 +635,12 @@ def test_admin_edits_user_password_success_response( config, journalist_app, test_admin, test_journo, locale ): with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) resp = app.post( url_for("admin.new_password", user_id=test_journo["id"], l=locale), @@ -619,13 +666,19 @@ def test_admin_edits_user_password_session_invalidate( # Start the journalist session. with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) # Change the journalist password via an admin session. with journalist_app.test_client() as admin_app: _login_user( - admin_app, test_admin["username"], test_admin["password"], test_admin["otp_secret"] + admin_app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], ) resp = admin_app.post( @@ -649,7 +702,12 @@ def test_admin_deletes_invalid_user_404(journalist_app, test_admin): invalid_id = db.session.query(func.max(Journalist.id)).scalar() + 1 with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) resp = app.post(url_for("admin.delete_user", user_id=invalid_id)) assert resp.status_code == 404 @@ -661,7 +719,12 @@ def test_admin_deletes_deleted_user_403(journalist_app, test_admin): deleted_id = deleted.id with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) resp = app.post(url_for("admin.delete_user", user_id=deleted_id)) assert resp.status_code == 403 @@ -672,7 +735,12 @@ def test_admin_edits_user_password_error_response( config, journalist_app, test_admin, test_journo, locale ): with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) with patch("sqlalchemy.orm.scoping.scoped_session.commit", side_effect=Exception()): with InstrumentedApp(journalist_app) as ins: @@ -703,13 +771,18 @@ def test_user_edits_password_success_response(config, journalist_app, test_journ with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) token = TOTP(test_journo["otp_secret"]).now() resp = app.post( url_for("account.new_password", l=locale), data=dict( - current_password=test_journo["password"], token=token, password=VALID_PASSWORD_2 + current_password=test_journo["password"], + token=token, + password=VALID_PASSWORD_2, ), follow_redirects=True, ) @@ -734,7 +807,10 @@ def test_user_edits_password_expires_session(journalist_app, test_journo): models.LOGIN_HARDENING = False with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) assert "uid" in session @@ -752,7 +828,7 @@ def test_user_edits_password_expires_session(journalist_app, test_journo): ins.assert_redirects(resp, url_for("main.login")) # verify the session was expired after the password was changed - assert (session.uid is None and session.user is None) + assert session.uid is None and session.user is None finally: models.LOGIN_HARDENING = original_hardening @@ -769,7 +845,10 @@ def test_user_edits_password_error_response(config, journalist_app, test_journo, with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) # patch token verification because there are multiple commits @@ -806,7 +885,10 @@ def test_user_edits_password_error_response(config, journalist_app, test_journo, def test_admin_add_user_when_username_already_taken(config, journalist_app, test_admin, locale): with journalist_app.test_client() as client: _login_user( - client, test_admin["username"], test_admin["password"], test_admin["otp_secret"] + client, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], ) with InstrumentedApp(journalist_app) as ins: resp = client.post( @@ -876,7 +958,12 @@ def test_admin_edits_user_password_too_long_warning(journalist_app, test_admin, ) with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) with InstrumentedApp(journalist_app) as ins: app.post( url_for("admin.new_password", user_id=test_journo["id"]), @@ -891,7 +978,8 @@ def test_admin_edits_user_password_too_long_warning(journalist_app, test_admin, ) ins.assert_message_flashed( - "The password you submitted is invalid. " "Password not changed.", "error" + "The password you submitted is invalid. " "Password not changed.", + "error", ) @@ -902,7 +990,10 @@ def test_user_edits_password_too_long_warning(journalist_app, test_journo): with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) with InstrumentedApp(journalist_app) as ins: @@ -921,7 +1012,8 @@ def test_user_edits_password_too_long_warning(journalist_app, test_journo): ) ins.assert_message_flashed( - "The password you submitted is invalid. " "Password not changed.", "error" + "The password you submitted is invalid. " "Password not changed.", + "error", ) @@ -932,7 +1024,12 @@ def test_admin_add_user_password_too_long_warning(config, journalist_app, test_a Journalist.MAX_PASSWORD_LEN - len(VALID_PASSWORD) + 1 ) with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) with InstrumentedApp(journalist_app) as ins: resp = app.post( @@ -961,7 +1058,12 @@ def test_admin_add_user_password_too_long_warning(config, journalist_app, test_a def test_admin_add_user_first_name_too_long_warning(config, journalist_app, test_admin, locale): with journalist_app.test_client() as app: overly_long_name = "a" * (Journalist.MAX_NAME_LEN + 1) - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) resp = app.post( url_for("admin.add_user", l=locale), @@ -989,7 +1091,12 @@ def test_admin_add_user_first_name_too_long_warning(config, journalist_app, test def test_admin_add_user_last_name_too_long_warning(config, journalist_app, test_admin, locale): with journalist_app.test_client() as app: overly_long_name = "a" * (Journalist.MAX_NAME_LEN + 1) - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) resp = app.post( url_for("admin.add_user", l=locale), @@ -1021,7 +1128,12 @@ def test_admin_edits_user_invalid_username_deleted( username to deleted""" new_username = "deleted" with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) with InstrumentedApp(journalist_app) as ins: resp = app.post( @@ -1044,7 +1156,12 @@ def test_admin_edits_user_invalid_username_deleted( def test_admin_resets_user_hotp_format_non_hexa(journalist_app, test_admin, test_journo): with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) journo = test_journo["journalist"] # guard to ensure check below tests the correct condition @@ -1080,7 +1197,12 @@ def test_admin_resets_user_hotp_format_too_short( ): with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) journo = test_journo["journalist"] # guard to ensure check below tests the correct condition @@ -1112,7 +1234,12 @@ def test_admin_resets_user_hotp_format_too_short( def test_admin_resets_user_hotp(journalist_app, test_admin, test_journo): with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) journo = test_journo["journalist"] old_secret = journo.otp_secret @@ -1143,9 +1270,17 @@ def test_admin_resets_user_hotp_error(mocker, journalist_app, test_admin, test_j old_secret = test_journo["otp_secret"] with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) - mocker.patch("models.Journalist.set_hotp_secret", side_effect=binascii.Error(error_message)) + mocker.patch( + "models.Journalist.set_hotp_secret", + side_effect=binascii.Error(error_message), + ) with InstrumentedApp(journalist_app) as ins: app.post( @@ -1178,12 +1313,16 @@ def test_user_resets_hotp(journalist_app, test_journo): with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) with InstrumentedApp(journalist_app) as ins: resp = app.post( - url_for("account.reset_two_factor_hotp"), data=dict(otp_secret=new_secret) + url_for("account.reset_two_factor_hotp"), + data=dict(otp_secret=new_secret), ) # should redirect to verification page ins.assert_redirects(resp, url_for("account.new_two_factor")) @@ -1201,12 +1340,16 @@ def test_user_resets_user_hotp_format_non_hexa(journalist_app, test_journo): non_hexa_secret = "0123456789ABCDZZ0123456789ABCDEF01234567" with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) with InstrumentedApp(journalist_app) as ins: app.post( - url_for("account.reset_two_factor_hotp"), data=dict(otp_secret=non_hexa_secret) + url_for("account.reset_two_factor_hotp"), + data=dict(otp_secret=non_hexa_secret), ) ins.assert_message_flashed( "Invalid HOTP secret format: " "please only submit letters A-F and numbers 0-9.", @@ -1228,13 +1371,22 @@ def test_user_resets_user_hotp_error(mocker, journalist_app, test_journo): with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) - mocker.patch("models.Journalist.set_hotp_secret", side_effect=binascii.Error(error_message)) + mocker.patch( + "models.Journalist.set_hotp_secret", + side_effect=binascii.Error(error_message), + ) with InstrumentedApp(journalist_app) as ins: - app.post(url_for("account.reset_two_factor_hotp"), data=dict(otp_secret=bad_secret)) + app.post( + url_for("account.reset_two_factor_hotp"), + data=dict(otp_secret=bad_secret), + ) ins.assert_message_flashed( "An unexpected error occurred! Please inform your " "admin.", "error" ) @@ -1255,7 +1407,12 @@ def test_admin_resets_user_totp(journalist_app, test_admin, test_journo): old_secret = test_journo["otp_secret"] with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) with InstrumentedApp(journalist_app) as ins: resp = app.post( @@ -1275,7 +1432,10 @@ def test_user_resets_totp(journalist_app, test_journo): with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) with InstrumentedApp(journalist_app) as ins: @@ -1294,7 +1454,12 @@ def test_user_resets_totp(journalist_app, test_journo): @pytest.mark.parametrize("locale", get_test_locales()) def test_admin_resets_hotp_with_missing_otp_secret_key(config, journalist_app, test_admin, locale): with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) resp = app.post( url_for("admin.reset_two_factor_hotp", l=locale), data=dict(uid=test_admin["id"]), @@ -1308,7 +1473,12 @@ def test_admin_resets_hotp_with_missing_otp_secret_key(config, journalist_app, t def test_admin_new_user_2fa_redirect(journalist_app, test_admin, test_journo): with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) with InstrumentedApp(journalist_app) as ins: resp = app.post( url_for("admin.new_user_two_factor", uid=test_journo["id"]), @@ -1323,7 +1493,12 @@ def test_http_get_on_admin_new_user_two_factor_page( config, journalist_app, test_admin, test_journo, locale ): with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) resp = app.get( url_for("admin.new_user_two_factor", uid=test_journo["id"], l=locale), ) @@ -1338,7 +1513,12 @@ def test_http_get_on_admin_new_user_two_factor_page( @pytest.mark.parametrize("locale", get_test_locales()) def test_http_get_on_admin_add_user_page(config, journalist_app, test_admin, locale): with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) resp = app.get(url_for("admin.add_user", l=locale)) assert page_language(resp.data) == language_tag(locale) msgids = ["ADD USER"] @@ -1350,7 +1530,12 @@ def test_admin_add_user(journalist_app, test_admin): username = "dellsberg" with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) with InstrumentedApp(journalist_app) as ins: resp = app.post( @@ -1375,7 +1560,12 @@ def test_admin_add_user_with_invalid_username(config, journalist_app, test_admin username = "deleted" with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) resp = app.post( url_for("admin.add_user", l=locale), @@ -1450,7 +1640,12 @@ def test_deleted_user_cannot_login_exception(journalist_app, locale): @pytest.mark.parametrize("locale", get_test_locales()) def test_admin_add_user_without_username(config, journalist_app, test_admin, locale): with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) resp = app.post( url_for("admin.add_user", l=locale), @@ -1476,7 +1671,12 @@ def test_admin_add_user_too_short_username(config, journalist_app, test_admin, l username = "a" * (Journalist.MIN_USERNAME_LEN - 1) with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) resp = app.post( url_for("admin.add_user", l=locale), @@ -1512,7 +1712,12 @@ def test_admin_add_user_too_short_username(config, journalist_app, test_admin, l ) def test_admin_add_user_yubikey_odd_length(journalist_app, test_admin, locale, secret): with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) resp = app.post( url_for("admin.add_user", l=locale), @@ -1540,11 +1745,17 @@ def test_admin_add_user_yubikey_odd_length(journalist_app, test_admin, locale, s @flaky(rerun_filter=utils.flaky_filter_xfail) @pytest.mark.parametrize( - "locale, secret", ((locale, " " * i) for locale in get_test_locales() for i in range(3)) + "locale, secret", + ((locale, " " * i) for locale in get_test_locales() for i in range(3)), ) def test_admin_add_user_yubikey_blank_secret(journalist_app, test_admin, locale, secret): with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) resp = app.post( url_for("admin.add_user", l=locale), @@ -1573,7 +1784,12 @@ def test_admin_add_user_yubikey_valid_length(journalist_app, test_admin, locale) otp = "1234567890123456789012345678901234567890" with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) resp = app.post( url_for("admin.add_user", l=locale), @@ -1602,7 +1818,12 @@ def test_admin_add_user_yubikey_correct_length_with_whitespace(journalist_app, t otp = "12 34 56 78 90 12 34 56 78 90 12 34 56 78 90 12 34 56 78 90" with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) resp = app.post( url_for("admin.add_user", l=locale), @@ -1629,7 +1850,12 @@ def test_admin_sets_user_to_admin(journalist_app, test_admin): new_user = "admin-set-user-to-admin-test" with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) resp = app.post( url_for("admin.add_user"), @@ -1662,7 +1888,12 @@ def test_admin_renames_user(journalist_app, test_admin): new_user = "admin-renames-user-test" with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) resp = app.post( url_for("admin.add_user"), @@ -1694,7 +1925,12 @@ def test_admin_adds_first_name_last_name_to_user(journalist_app, test_admin): new_user = "admin-first-name-last-name-user-test" with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) resp = app.post( url_for("admin.add_user"), @@ -1728,7 +1964,10 @@ def test_admin_adds_invalid_first_last_name_to_user(config, journalist_app, test new_user = "admin-invalid-first-name-last-name-user-test" _login_user( - client, test_admin["username"], test_admin["password"], test_admin["otp_secret"] + client, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], ) resp = client.post( @@ -1750,7 +1989,11 @@ def test_admin_adds_invalid_first_last_name_to_user(config, journalist_app, test with InstrumentedApp(journalist_app) as ins: resp = client.post( url_for("admin.edit_user", user_id=journo.id, l=locale), - data=dict(username=new_user, first_name=overly_long_name, last_name="test name"), + data=dict( + username=new_user, + first_name=overly_long_name, + last_name="test name", + ), follow_redirects=True, ) assert resp.status_code == 200 @@ -1773,7 +2016,12 @@ def test_admin_add_user_integrity_error(config, journalist_app, test_admin, mock ) with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) with InstrumentedApp(journalist_app) as ins: resp = app.post( @@ -1805,12 +2053,19 @@ def test_admin_add_user_integrity_error(config, journalist_app, test_admin, mock @pytest.mark.parametrize("locale", get_test_locales()) def test_prevent_document_uploads(config, journalist_app, test_admin, locale): with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) form = journalist_app_module.forms.SubmissionPreferencesForm( prevent_document_uploads=True, min_message_length=0 ) app.post( - url_for("admin.update_submission_preferences"), data=form.data, follow_redirects=True + url_for("admin.update_submission_preferences"), + data=form.data, + follow_redirects=True, ) assert InstanceConfig.get_current().allow_document_uploads is False with InstrumentedApp(journalist_app) as ins: @@ -1829,10 +2084,17 @@ def test_prevent_document_uploads(config, journalist_app, test_admin, locale): @pytest.mark.parametrize("locale", get_test_locales()) def test_no_prevent_document_uploads(config, journalist_app, test_admin, locale): with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) form = journalist_app_module.forms.SubmissionPreferencesForm(min_message_length=0) app.post( - url_for("admin.update_submission_preferences"), data=form.data, follow_redirects=True + url_for("admin.update_submission_preferences"), + data=form.data, + follow_redirects=True, ) assert InstanceConfig.get_current().allow_document_uploads is True with InstrumentedApp(journalist_app) as ins: @@ -1850,7 +2112,12 @@ def test_no_prevent_document_uploads(config, journalist_app, test_admin, locale) def test_prevent_document_uploads_invalid(journalist_app, test_admin): with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) form_true = journalist_app_module.forms.SubmissionPreferencesForm( prevent_document_uploads=True, min_message_length=0 ) @@ -1876,7 +2143,12 @@ def test_prevent_document_uploads_invalid(journalist_app, test_admin): def test_message_filtering(config, journalist_app, test_admin): with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) # Assert status quo assert InstanceConfig.get_current().initial_message_min_len == 0 @@ -1885,7 +2157,9 @@ def test_message_filtering(config, journalist_app, test_admin): prevent_short_messages=False, min_message_length=10 ) app.post( - url_for("admin.update_submission_preferences"), data=form.data, follow_redirects=True + url_for("admin.update_submission_preferences"), + data=form.data, + follow_redirects=True, ) # Still 0 assert InstanceConfig.get_current().initial_message_min_len == 0 @@ -1895,7 +2169,9 @@ def test_message_filtering(config, journalist_app, test_admin): prevent_short_messages=True, min_message_length=0 ) resp = app.post( - url_for("admin.update_submission_preferences"), data=form.data, follow_redirects=True + url_for("admin.update_submission_preferences"), + data=form.data, + follow_redirects=True, ) # Still 0 assert InstanceConfig.get_current().initial_message_min_len == 0 @@ -1907,7 +2183,9 @@ def test_message_filtering(config, journalist_app, test_admin): prevent_short_messages=True, min_message_length=10 ) app.post( - url_for("admin.update_submission_preferences"), data=form.data, follow_redirects=True + url_for("admin.update_submission_preferences"), + data=form.data, + follow_redirects=True, ) assert InstanceConfig.get_current().initial_message_min_len == 10 @@ -1923,7 +2201,9 @@ def test_message_filtering(config, journalist_app, test_admin): assert InstanceConfig.get_current().reject_message_with_codename is False form = journalist_app_module.forms.SubmissionPreferencesForm(reject_codename_messages=True) app.post( - url_for("admin.update_submission_preferences"), data=form.data, follow_redirects=True + url_for("admin.update_submission_preferences"), + data=form.data, + follow_redirects=True, ) assert InstanceConfig.get_current().reject_message_with_codename is True @@ -1936,7 +2216,10 @@ class dummy_current: with journalist_app.test_client() as app: iMock.return_value = dummy_current() _login_user( - app, test_admin["username"], test_admin["password"], test_admin["otp_secret"] + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], ) assert g.organization_name == "SecureDrop" @@ -1946,12 +2229,19 @@ class dummy_current: def test_orgname_valid_succeeds(config, journalist_app, test_admin, locale): test_name = "Walden Inquirer" with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) form = journalist_app_module.forms.OrgNameForm(organization_name=test_name) assert InstanceConfig.get_current().organization_name == "SecureDrop" with InstrumentedApp(journalist_app) as ins: resp = app.post( - url_for("admin.update_org_name", l=locale), data=form.data, follow_redirects=True + url_for("admin.update_org_name", l=locale), + data=form.data, + follow_redirects=True, ) assert page_language(resp.data) == language_tag(locale) msgids = ["Preferences saved."] @@ -1964,12 +2254,19 @@ def test_orgname_valid_succeeds(config, journalist_app, test_admin, locale): @pytest.mark.parametrize("locale", get_test_locales()) def test_orgname_null_fails(config, journalist_app, test_admin, locale): with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) form = journalist_app_module.forms.OrgNameForm(organization_name=None) assert InstanceConfig.get_current().organization_name == "SecureDrop" with InstrumentedApp(journalist_app) as ins: resp = app.post( - url_for("admin.update_org_name", l=locale), data=form.data, follow_redirects=True + url_for("admin.update_org_name", l=locale), + data=form.data, + follow_redirects=True, ) assert page_language(resp.data) == language_tag(locale) msgids = ["This field is required."] @@ -1983,11 +2280,18 @@ def test_orgname_null_fails(config, journalist_app, test_admin, locale): def test_orgname_oversized_fails(config, journalist_app, test_admin, locale): test_name = "1234567812345678123456781234567812345678123456781234567812345678a" with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) form = journalist_app_module.forms.OrgNameForm(organization_name=test_name) assert InstanceConfig.get_current().organization_name == "SecureDrop" resp = app.post( - url_for("admin.update_org_name", l=locale), data=form.data, follow_redirects=True + url_for("admin.update_org_name", l=locale), + data=form.data, + follow_redirects=True, ) assert page_language(resp.data) == language_tag(locale) msgids = ["Cannot be longer than {num} character."] @@ -2029,13 +2333,20 @@ def test_logo_upload_with_valid_image_succeeds(config, journalist_app, test_admi with journalist_app.test_client() as app: _login_user( - app, test_admin["username"], test_admin["password"], test_admin["otp_secret"] + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], ) # Create 1px * 1px 'white' PNG file from its base64 string form = journalist_app_module.forms.LogoForm(logo=(BytesIO(logo_bytes), "test.png")) + # Create 1px * 1px 'white' PNG file from its base64 string + form = journalist_app_module.forms.LogoForm(logo=(BytesIO(logo_bytes), "test.png")) with InstrumentedApp(journalist_app) as ins: resp = app.post( - url_for("admin.manage_config", l=locale), data=form.data, follow_redirects=True + url_for("admin.manage_config", l=locale), + data=form.data, + follow_redirects=True, ) assert page_language(resp.data) == language_tag(locale) msgids = ["Image updated."] @@ -2058,12 +2369,19 @@ def test_logo_upload_with_valid_image_succeeds(config, journalist_app, test_admi @pytest.mark.parametrize("locale", get_test_locales()) def test_logo_upload_with_invalid_filetype_fails(config, journalist_app, test_admin, locale): with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) form = journalist_app_module.forms.LogoForm(logo=(BytesIO(b"filedata"), "bad.exe")) with InstrumentedApp(journalist_app) as ins: resp = app.post( - url_for("admin.manage_config", l=locale), data=form.data, follow_redirects=True + url_for("admin.manage_config", l=locale), + data=form.data, + follow_redirects=True, ) assert page_language(resp.data) == language_tag(locale) @@ -2083,7 +2401,10 @@ def test_logo_upload_save_fails(config, journalist_app, test_admin, locale): try: with journalist_app.test_client() as app: _login_user( - app, test_admin["username"], test_admin["password"], test_admin["otp_secret"] + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], ) # Create 1px * 1px 'white' PNG file from its base64 string form = journalist_app_module.forms.LogoForm( @@ -2119,7 +2440,12 @@ def test_logo_upload_save_fails(config, journalist_app, test_admin, locale): def test_creation_of_ossec_test_log_event(journalist_app, test_admin, mocker): mocked_error_logger = mocker.patch("journalist.app.logger.error") with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) app.post(url_for("admin.ossec_test")) mocked_error_logger.assert_called_once_with("This is a test OSSEC alert") @@ -2129,13 +2455,20 @@ def test_creation_of_ossec_test_log_event(journalist_app, test_admin, mocker): @pytest.mark.parametrize("locale", get_test_locales()) def test_logo_upload_with_empty_input_field_fails(config, journalist_app, test_admin, locale): with journalist_app.test_client() as app: - _login_user(app, test_admin["username"], test_admin["password"], test_admin["otp_secret"]) + _login_user( + app, + test_admin["username"], + test_admin["password"], + test_admin["otp_secret"], + ) form = journalist_app_module.forms.LogoForm(logo=(BytesIO(b""), "")) with InstrumentedApp(journalist_app) as ins: resp = app.post( - url_for("admin.manage_config", l=locale), data=form.data, follow_redirects=True + url_for("admin.manage_config", l=locale), + data=form.data, + follow_redirects=True, ) assert page_language(resp.data) == language_tag(locale) @@ -2153,7 +2486,10 @@ def test_admin_page_restriction_http_gets(journalist_app, test_journo): with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) for admin_url in admin_urls: resp = app.get(admin_url) @@ -2173,7 +2509,10 @@ def test_admin_page_restriction_http_posts(journalist_app, test_journo): ] with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) for admin_url in admin_urls: resp = app.post(admin_url) @@ -2218,7 +2557,10 @@ def test_user_authorization_for_posts(journalist_app): def test_incorrect_current_password_change(config, journalist_app, test_journo, locale): with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) with InstrumentedApp(journalist_app) as ins: resp = app.post( @@ -2315,7 +2657,10 @@ def test_too_long_user_password_change(config, journalist_app, test_journo, loca with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) with InstrumentedApp(journalist_app) as ins: @@ -2340,7 +2685,10 @@ def test_too_long_user_password_change(config, journalist_app, test_journo, loca def test_valid_user_password_change(config, journalist_app, test_journo, locale): with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) resp = app.post( @@ -2366,7 +2714,10 @@ def test_valid_user_password_change(config, journalist_app, test_journo, locale) def test_valid_user_first_last_name_change(config, journalist_app, test_journo, locale): with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) with InstrumentedApp(journalist_app) as ins: @@ -2390,7 +2741,10 @@ def test_valid_user_invalid_first_last_name_change(journalist_app, test_journo, with journalist_app.test_client() as app: overly_long_name = "a" * (Journalist.MAX_NAME_LEN + 1) _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) with InstrumentedApp(journalist_app) as ins: @@ -2412,7 +2766,10 @@ def test_regenerate_totp(journalist_app, test_journo): with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) with InstrumentedApp(journalist_app) as ins: @@ -2433,12 +2790,16 @@ def test_edit_hotp(journalist_app, test_journo): with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) with InstrumentedApp(journalist_app) as ins: resp = app.post( - url_for("account.reset_two_factor_hotp"), data=dict(otp_secret=valid_secret) + url_for("account.reset_two_factor_hotp"), + data=dict(otp_secret=valid_secret), ) new_secret = Journalist.query.get(test_journo["id"]).otp_secret @@ -2682,7 +3043,9 @@ def test_login_with_invalid_password_doesnt_call_argon2(mocker, test_journo): def test_valid_login_calls_argon2(mocker, test_journo): mock_argon2 = mocker.patch("models.argon2.verify") Journalist.login( - test_journo["username"], test_journo["password"], TOTP(test_journo["otp_secret"]).now() + test_journo["username"], + test_journo["password"], + TOTP(test_journo["otp_secret"]).now(), ) assert mock_argon2.called @@ -2707,7 +3070,10 @@ def test_render_locales(config, journalist_app, test_journo, test_source): with app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) resp = app.get(url + "?l=fr_FR") @@ -2934,7 +3300,12 @@ def selected_missing_files(journalist_app, test_source, app_storage): def test_download_selected_submissions_missing_files( - journalist_app, test_journo, test_source, mocker, selected_missing_files, app_storage + journalist_app, + test_journo, + test_source, + mocker, + selected_missing_files, + app_storage, ): """Tests download of selected submissions with missing files in storage.""" mocked_error_logger = mocker.patch("journalist.app.logger.error") @@ -2967,7 +3338,12 @@ def test_download_selected_submissions_missing_files( def test_download_single_submission_missing_file( - journalist_app, test_journo, test_source, mocker, selected_missing_files, app_storage + journalist_app, + test_journo, + test_source, + mocker, + selected_missing_files, + app_storage, ): """Tests download of single submissions with missing files in storage.""" mocked_error_logger = mocker.patch("journalist.app.logger.error") @@ -3150,7 +3526,10 @@ def test_download_all_selected_sources(journalist_app, test_journo, app_storage) def test_single_source_is_successfully_starred(journalist_app, test_journo, test_source): with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) with InstrumentedApp(journalist_app) as ins: resp = app.post(url_for("col.add_star", filesystem_id=test_source["filesystem_id"])) @@ -3164,7 +3543,10 @@ def test_single_source_is_successfully_starred(journalist_app, test_journo, test def test_single_source_is_successfully_unstarred(journalist_app, test_journo, test_source): with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) # First star the source app.post(url_for("col.add_star", filesystem_id=test_source["filesystem_id"])) @@ -3197,18 +3579,18 @@ def test_journalist_session_expiration(config, journalist_app, test_journo, loca # Wait 2s for the redis key to expire time.sleep(2) - resp = app.get(url_for('account.edit'), follow_redirects=True) + resp = app.get(url_for("account.edit"), follow_redirects=True) # because the session is being cleared when it expires, the # response should always be in English. - assert page_language(resp.data) == 'en-US' - assert 'Login to access the journalist interface' in resp.data.decode('utf-8') + assert page_language(resp.data) == "en-US" + assert "Login to access the journalist interface" in resp.data.decode("utf-8") # check that the session was cleared (apart from 'expires' # which is always present and 'csrf_token' which leaks no info) - session.pop('expires', None) - session.pop('csrf_token', None) - session.pop('locale', None) - session.pop('renew_count', None) + session.pop("expires", None) + session.pop("csrf_token", None) + session.pop("locale", None) + session.pop("renew_count", None) assert not session, session @@ -3241,10 +3623,16 @@ def test_col_process_aborts_with_bad_action(journalist_app, test_journo): """If the action is not a valid choice, a 500 should occur""" with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) - form_data = {"cols_selected": "does not matter", "action": "this action does not exist"} + form_data = { + "cols_selected": "does not matter", + "action": "this action does not exist", + } resp = app.post(url_for("col.process"), data=form_data) assert resp.status_code == 500 @@ -3263,7 +3651,10 @@ def test_col_process_successfully_deletes_multiple_sources( with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) form_data = { @@ -3291,7 +3682,10 @@ def test_col_process_successfully_stars_sources( with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) form_data = {"cols_selected": [test_source["filesystem_id"]], "action": "star"} @@ -3310,7 +3704,10 @@ def test_col_process_successfully_unstars_sources( with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) # First star the source @@ -3318,7 +3715,10 @@ def test_col_process_successfully_unstars_sources( app.post(url_for("col.process"), data=form_data, follow_redirects=True) # Now unstar the source - form_data = {"cols_selected": [test_source["filesystem_id"]], "action": "un-star"} + form_data = { + "cols_selected": [test_source["filesystem_id"]], + "action": "un-star", + } resp = app.post(url_for("col.process"), data=form_data, follow_redirects=True) assert resp.status_code == 200 @@ -3337,7 +3737,10 @@ def test_source_with_null_last_updated(journalist_app, test_journo, test_files): with journalist_app.test_client() as app: _login_user( - app, test_journo["username"], test_journo["password"], test_journo["otp_secret"] + app, + test_journo["username"], + test_journo["password"], + test_journo["otp_secret"], ) resp = app.get(url_for("main.index")) assert resp.status_code == 200 diff --git a/securedrop/tests/test_journalist_api.py b/securedrop/tests/test_journalist_api.py index 4f1c237d677..818094df319 100644 --- a/securedrop/tests/test_journalist_api.py +++ b/securedrop/tests/test_journalist_api.py @@ -2,19 +2,13 @@ import json import random from datetime import datetime -from flask import url_for -from pyotp import TOTP from uuid import UUID, uuid4 from db import db from encryption import EncryptionManager -from models import ( - Journalist, - Reply, - Source, - SourceStar, - Submission, -) +from flask import url_for +from models import Journalist, Reply, Source, SourceStar, Submission +from pyotp import TOTP from .utils.api_helper import get_api_headers @@ -48,25 +42,32 @@ def test_unauthenticated_user_gets_all_endpoints(journalist_app): def test_valid_user_can_get_an_api_token(journalist_app, test_journo): with journalist_app.test_client() as app: - valid_token = TOTP(test_journo['otp_secret']).now() - response = app.post(url_for('api.get_token'), - data=json.dumps( - {'username': test_journo['username'], - 'passphrase': test_journo['password'], - 'one_time_code': valid_token}), - headers=get_api_headers()) - - assert response.json['journalist_uuid'] == test_journo['uuid'] + valid_token = TOTP(test_journo["otp_secret"]).now() + response = app.post( + url_for("api.get_token"), + data=json.dumps( + { + "username": test_journo["username"], + "passphrase": test_journo["password"], + "one_time_code": valid_token, + } + ), + headers=get_api_headers(), + ) + + assert response.json["journalist_uuid"] == test_journo["uuid"] assert response.status_code == 200 assert response.json["journalist_first_name"] == test_journo["first_name"] assert response.json["journalist_last_name"] == test_journo["last_name"] assert_valid_timestamp(response.json["expiration"]) - response = app.get(url_for('api.get_current_user'), - headers=get_api_headers(response.json['token'])) + response = app.get( + url_for("api.get_current_user"), + headers=get_api_headers(response.json["token"]), + ) assert response.status_code == 200 - assert response.json['uuid'] == test_journo['uuid'] + assert response.json["uuid"] == test_journo["uuid"] def test_user_cannot_get_an_api_token_with_wrong_password(journalist_app, test_journo): @@ -139,7 +140,10 @@ def test_user_cannot_get_an_api_token_with_no_otp_field(journalist_app, test_jou response = app.post( url_for("api.get_token"), data=json.dumps( - {"username": test_journo["username"], "passphrase": test_journo["password"]} + { + "username": test_journo["username"], + "passphrase": test_journo["password"], + } ), headers=get_api_headers(), ) @@ -152,7 +156,8 @@ def test_user_cannot_get_an_api_token_with_no_otp_field(journalist_app, test_jou def test_authorized_user_gets_all_sources(journalist_app, test_submissions, journalist_api_token): with journalist_app.test_client() as app: response = app.get( - url_for("api.get_all_sources"), headers=get_api_headers(journalist_api_token) + url_for("api.get_all_sources"), + headers=get_api_headers(journalist_api_token), ) assert response.status_code == 200 @@ -185,7 +190,11 @@ def test_user_without_token_cannot_get_protected_endpoints(journalist_app, test_ ), url_for("api.get_all_submissions"), url_for("api.get_all_replies"), - url_for("api.single_reply", source_uuid=uuid, reply_uuid=test_files["replies"][0].uuid), + url_for( + "api.single_reply", + source_uuid=uuid, + reply_uuid=test_files["replies"][0].uuid, + ), url_for("api.all_source_replies", source_uuid=uuid), url_for("api.get_current_user"), url_for("api.get_all_users"), @@ -219,9 +228,9 @@ def test_user_without_token_cannot_del_protected_endpoints(journalist_app, test_ assert response.status_code == 403 -def test_attacker_cannot_use_token_after_admin_deletes(journalist_app, - test_source, - journalist_api_token): +def test_attacker_cannot_use_token_after_admin_deletes( + journalist_app, test_source, journalist_api_token +): with journalist_app.test_client() as app: uuid = test_source["source"].uuid @@ -229,10 +238,12 @@ def test_attacker_cannot_use_token_after_admin_deletes(journalist_app, # In a scenario where an attacker compromises a journalist workstation # the admin should be able to delete the user and their token should # no longer be valid. - attacker = app.get(url_for('api.get_current_user'), - headers=get_api_headers(journalist_api_token)).json + attacker = app.get( + url_for("api.get_current_user"), + headers=get_api_headers(journalist_api_token), + ).json - attacker = Journalist.query.filter_by(uuid=attacker['uuid']).first() + attacker = Journalist.query.filter_by(uuid=attacker["uuid"]).first() db.session.delete(attacker) db.session.commit() @@ -258,7 +269,9 @@ def test_user_without_token_cannot_post_protected_endpoints(journalist_app, test with journalist_app.test_client() as app: for protected_route in protected_routes: response = app.post( - protected_route, headers=get_api_headers(""), data=json.dumps({"some": "stuff"}) + protected_route, + headers=get_api_headers(""), + data=json.dumps({"some": "stuff"}), ) assert response.status_code == 403 @@ -322,7 +335,8 @@ def test_authorized_user_can_star_a_source(journalist_app, test_source, journali uuid = test_source["source"].uuid source_id = test_source["source"].id response = app.post( - url_for("api.add_star", source_uuid=uuid), headers=get_api_headers(journalist_api_token) + url_for("api.add_star", source_uuid=uuid), + headers=get_api_headers(journalist_api_token), ) assert response.status_code == 201 @@ -343,7 +357,8 @@ def test_authorized_user_can_unstar_a_source(journalist_app, test_source, journa uuid = test_source["source"].uuid source_id = test_source["source"].id response = app.post( - url_for("api.add_star", source_uuid=uuid), headers=get_api_headers(journalist_api_token) + url_for("api.add_star", source_uuid=uuid), + headers=get_api_headers(journalist_api_token), ) assert response.status_code == 201 @@ -368,7 +383,8 @@ def test_disallowed_methods_produces_405(journalist_app, test_source, journalist with journalist_app.test_client() as app: uuid = test_source["source"].uuid response = app.delete( - url_for("api.add_star", source_uuid=uuid), headers=get_api_headers(journalist_api_token) + url_for("api.add_star", source_uuid=uuid), + headers=get_api_headers(journalist_api_token), ) assert response.status_code == 405 @@ -380,7 +396,8 @@ def test_authorized_user_can_get_all_submissions( ): with journalist_app.test_client() as app: response = app.get( - url_for("api.get_all_submissions"), headers=get_api_headers(journalist_api_token) + url_for("api.get_all_submissions"), + headers=get_api_headers(journalist_api_token), ) assert response.status_code == 200 @@ -400,7 +417,8 @@ def test_authorized_user_get_all_submissions_with_disconnected_submissions( "DELETE FROM sources WHERE id = :id", {"id": test_submissions["source"].id} ) response = app.get( - url_for("api.get_all_submissions"), headers=get_api_headers(journalist_api_token) + url_for("api.get_all_submissions"), + headers=get_api_headers(journalist_api_token), ) assert response.status_code == 200 @@ -434,7 +452,11 @@ def test_authorized_user_can_get_single_submission( submission_uuid = test_submissions["source"].submissions[0].uuid uuid = test_submissions["source"].uuid response = app.get( - url_for("api.single_submission", source_uuid=uuid, submission_uuid=submission_uuid), + url_for( + "api.single_submission", + source_uuid=uuid, + submission_uuid=submission_uuid, + ), headers=get_api_headers(journalist_api_token), ) @@ -452,7 +474,8 @@ def test_authorized_user_can_get_all_replies_with_disconnected_replies( with journalist_app.test_client() as app: db.session.execute("DELETE FROM sources WHERE id = :id", {"id": test_files["source"].id}) response = app.get( - url_for("api.get_all_replies"), headers=get_api_headers(journalist_api_token) + url_for("api.get_all_replies"), + headers=get_api_headers(journalist_api_token), ) assert response.status_code == 200 @@ -461,7 +484,8 @@ def test_authorized_user_can_get_all_replies_with_disconnected_replies( def test_authorized_user_can_get_all_replies(journalist_app, test_files, journalist_api_token): with journalist_app.test_client() as app: response = app.get( - url_for("api.get_all_replies"), headers=get_api_headers(journalist_api_token) + url_for("api.get_all_replies"), + headers=get_api_headers(journalist_api_token), ) assert response.status_code == 200 @@ -541,7 +565,11 @@ def test_authorized_user_can_delete_single_submission( submission_uuid = test_submissions["source"].submissions[0].uuid uuid = test_submissions["source"].uuid response = app.delete( - url_for("api.single_submission", source_uuid=uuid, submission_uuid=submission_uuid), + url_for( + "api.single_submission", + source_uuid=uuid, + submission_uuid=submission_uuid, + ), headers=get_api_headers(journalist_api_token), ) @@ -670,7 +698,8 @@ def test_authorized_user_can_get_current_user_endpoint( ): with journalist_app.test_client() as app: response = app.get( - url_for("api.get_current_user"), headers=get_api_headers(journalist_api_token) + url_for("api.get_current_user"), + headers=get_api_headers(journalist_api_token), ) assert response.status_code == 200 @@ -1053,7 +1082,11 @@ def test_reply_download_generates_checksum( with journalist_app.test_client() as app: response = app.get( - url_for("api.download_reply", source_uuid=test_source["uuid"], reply_uuid=reply.uuid), + url_for( + "api.download_reply", + source_uuid=test_source["uuid"], + reply_uuid=reply.uuid, + ), headers=get_api_headers(journalist_api_token), ) assert response.status_code == 200 @@ -1066,7 +1099,11 @@ def test_reply_download_generates_checksum( mock_add_checksum = mocker.patch("journalist_app.utils.add_checksum_for_file") with journalist_app.test_client() as app: response = app.get( - url_for("api.download_reply", source_uuid=test_source["uuid"], reply_uuid=reply.uuid), + url_for( + "api.download_reply", + source_uuid=test_source["uuid"], + reply_uuid=reply.uuid, + ), headers=get_api_headers(journalist_api_token), ) assert response.status_code == 200 diff --git a/securedrop/tests/test_journalist_session.py b/securedrop/tests/test_journalist_session.py index 65cd2597064..a915aec056a 100644 --- a/securedrop/tests/test_journalist_session.py +++ b/securedrop/tests/test_journalist_session.py @@ -1,305 +1,472 @@ # -*- coding: utf-8 -*- -from urllib.parse import urlparse import json -from datetime import datetime, timedelta, timezone import re +from datetime import datetime, timedelta, timezone +from urllib.parse import urlparse -from flask import url_for, Response +from flask import Response, url_for from flask.sessions import session_json_serializer -from redis import Redis -from pyotp import TOTP from itsdangerous import URLSafeTimedSerializer +from pyotp import TOTP +from redis import Redis from .utils.api_helper import get_api_headers redis = Redis() -NEW_PASSWORD = 'another correct horse battery staple generic passphrase' +NEW_PASSWORD = "another correct horse battery staple generic passphrase" +# Helper function to check if session cookie are properly signed +# Returns just the session id without signature def _check_sig(session_cookie, journalist_app, api=False): if api: - salt = 'api_' + journalist_app.config['SESSION_SIGNER_SALT'] + salt = "api_" + journalist_app.config["SESSION_SIGNER_SALT"] else: - salt = journalist_app.config['SESSION_SIGNER_SALT'] + salt = journalist_app.config["SESSION_SIGNER_SALT"] signer = URLSafeTimedSerializer(journalist_app.secret_key, salt) return signer.loads(session_cookie) +# Helper function to get a session payload from redis +# Returns the unserialized session payload def _get_session(sid, journalist_app, api=False): if api: - key = 'api_' + journalist_app.config['SESSION_KEY_PREFIX'] + sid + key = "api_" + journalist_app.config["SESSION_KEY_PREFIX"] + sid else: - key = journalist_app.config['SESSION_KEY_PREFIX'] + sid + key = journalist_app.config["SESSION_KEY_PREFIX"] + sid return session_json_serializer.loads(redis.get(key)) +# Helper function to login and return the response +# Returns the raw response object def _login_user(app, journo): - resp = app.post(url_for('main.login'), - data={'username': journo['username'], - 'password': journo['password'], - 'token': TOTP(journo['otp_secret']).now()}, - follow_redirects=False) + resp = app.post( + url_for("main.login"), + data={ + "username": journo["username"], + "password": journo["password"], + "token": TOTP(journo["otp_secret"]).now(), + }, + follow_redirects=False, + ) assert resp.status_code == 302 - assert urlparse(resp.headers['Location']).path == url_for('main.index') + assert urlparse(resp.headers["Location"]).path == url_for("main.index") return resp +# Helper function to extract a the session cookie from the cookiejar when testing as client +# Returns the session cookie value def _session_from_cookiejar(cookie_jar, journalist_app): return next( - (cookie for cookie in cookie_jar - if cookie.name == journalist_app.config['SESSION_COOKIE_NAME']), - None) + ( + cookie + for cookie in cookie_jar + if cookie.name == journalist_app.config["SESSION_COOKIE_NAME"] + ), + None, + ) +# Test a standard login sequence def test_session_login(journalist_app, test_journo): + # Given a test client and a valid journalist user with journalist_app.test_client() as app: + # When sending a correct login request resp = _login_user(app, test_journo) - assert 'Set-Cookie' in list(resp.headers.keys()) - assert 'Cookie' in resp.headers['Vary'] + # Then a set-cookie header and a cookie: vary header are returned + assert "Set-Cookie" in list(resp.headers.keys()) + assert "Cookie" in resp.headers["Vary"] + # When checking the local session cookie jar session_cookie = _session_from_cookiejar(app.cookie_jar, journalist_app) + # Then there is a session cookie in it assert session_cookie is not None + # Then such cookie is properly signed sid = _check_sig(session_cookie.value, journalist_app) + # Then such session cookie has a corresponding payload in redis redis_session = _get_session(sid, journalist_app) - ttl = redis.ttl(journalist_app.config['SESSION_KEY_PREFIX'] + sid) + ttl = redis.ttl(journalist_app.config["SESSION_KEY_PREFIX"] + sid) - assert ((journalist_app.config['SESSION_LIFETIME'] - 10) < ttl - <= journalist_app.config['SESSION_LIFETIME']) + # Then the TTL of such key in redis conforms to the lifetime configuration + assert ( + (journalist_app.config["SESSION_LIFETIME"] - 10) + < ttl + <= journalist_app.config["SESSION_LIFETIME"] + ) - assert redis_session['uid'] == test_journo['id'] + # Then the user id of the user who logged in is the same as the user id in session + assert redis_session["uid"] == test_journo["id"] - resp = app.get(url_for('main.index')) + # Finally load the main page + resp = app.get(url_for("main.index")) + # And expect a successfull status code assert resp.status_code == 200 +# Test a standard session renewal sequence def test_session_renew(journalist_app, test_journo): + # Given a test client and a valid journalist user with journalist_app.test_client() as app: + # When sending a correct login request resp = _login_user(app, test_journo) + # Then check session existance, signature, and redis payload session_cookie = _session_from_cookiejar(app.cookie_jar, journalist_app) assert session_cookie is not None sid = _check_sig(session_cookie.value, journalist_app) redis_session = _get_session(sid, journalist_app) - assert redis_session['renew_count'] == journalist_app.config['SESSION_RENEW_COUNT'] + # The `renew_count` must exists in the session payload and must be equal to the app config + assert redis_session["renew_count"] == journalist_app.config["SESSION_RENEW_COUNT"] + # When forcing the session TTL in redis to be below the threshold # Threshold for auto renew is less than 60*30 - redis.setex(name=journalist_app.config['SESSION_KEY_PREFIX'] + sid, - value=session_json_serializer.dumps(redis_session), - time=15*60) - resp = app.get(url_for('main.index')) + redis.setex( + name=journalist_app.config["SESSION_KEY_PREFIX"] + sid, + value=session_json_serializer.dumps(redis_session), + time=15 * 60, + ) + # When doing a generic request to trigger the auto-renew + resp = app.get(url_for("main.index")) + # Then the session must still be valid assert resp.status_code == 200 + # Then the corresponding renew_count in redis must have been decreased redis_session = _get_session(sid, journalist_app) - assert redis_session['renew_count'] == (journalist_app.config['SESSION_RENEW_COUNT'] - 1) + assert redis_session["renew_count"] == (journalist_app.config["SESSION_RENEW_COUNT"] - 1) - ttl = redis.ttl(journalist_app.config['SESSION_KEY_PREFIX'] + sid) - assert ttl > journalist_app.config['SESSION_LIFETIME'] + # Then the ttl must have been updated and the new lifetime must be > of app confing lifetime + # (Bigger because there is also a variable amount of time in the threshold that is kept) + ttl = redis.ttl(journalist_app.config["SESSION_KEY_PREFIX"] + sid) + assert ttl > journalist_app.config["SESSION_LIFETIME"] +# Test a standard login then logout sequence def test_session_logout(journalist_app, test_journo): + # Given a test client and a valid journalist user with journalist_app.test_client() as app: + # When sending a correct login request resp = _login_user(app, test_journo) + # Then check session as in the previous tests session_cookie = _session_from_cookiejar(app.cookie_jar, journalist_app) assert session_cookie is not None sid = _check_sig(session_cookie.value, journalist_app) - assert (redis.get(journalist_app.config['SESSION_KEY_PREFIX'] + sid)) is not None + assert (redis.get(journalist_app.config["SESSION_KEY_PREFIX"] + sid)) is not None - resp = app.get(url_for('main.logout'), follow_redirects=False) + # When sending a logout request from a logged in journalist + resp = app.get(url_for("main.logout"), follow_redirects=False) + # Then it redirects to login assert resp.status_code == 302 - assert (redis.get(journalist_app.config['SESSION_KEY_PREFIX'] + sid)) is None + # Then the session no longer exists in redis + assert (redis.get(journalist_app.config["SESSION_KEY_PREFIX"] + sid)) is None - resp = app.get(url_for('main.index'), follow_redirects=False) + # Then a request to the index redirects back to login + resp = app.get(url_for("main.index"), follow_redirects=False) assert resp.status_code == 302 +# Test the user forced logout when an admin changes the user password def test_session_admin_change_password_logout(journalist_app, test_journo, test_admin): + # Given a test client and a valid journalist user with journalist_app.test_client() as app: + # When sending a correct login request resp = _login_user(app, test_journo) session_cookie = _session_from_cookiejar(app.cookie_jar, journalist_app) assert session_cookie is not None - # Save the cookie we were given for later - cookie_val = re.search(r'js=(.*?);', resp.headers['set-cookie']).group(1) + # Then save the cookie for later + cookie_val = re.search(r"js=(.*?);", resp.headers["set-cookie"]).group(1) + # Then also save the session id for later sid = _check_sig(session_cookie.value, journalist_app) - assert (redis.get(journalist_app.config['SESSION_KEY_PREFIX'] + sid)) is not None + assert (redis.get(journalist_app.config["SESSION_KEY_PREFIX"] + sid)) is not None + # Given a another test client and a valid admin user with journalist_app.test_client() as admin_app: + # When sending a valid login request as the admin user _login_user(admin_app, test_admin) - resp = admin_app.post(url_for('admin.new_password', user_id=test_journo['id']), - data=dict(password=NEW_PASSWORD), - follow_redirects=False) + # When changing password of the journalist (non-admin) user + resp = admin_app.post( + url_for("admin.new_password", user_id=test_journo["id"]), + data=dict(password=NEW_PASSWORD), + follow_redirects=False, + ) + # Then the password change has been successful assert resp.status_code == 302 - # Verify session deleted from redis - assert (redis.get(journalist_app.config['SESSION_KEY_PREFIX'] + sid)) is None + # Then the journalis (non-admin) user session does no longer exist in redis + assert (redis.get(journalist_app.config["SESSION_KEY_PREFIX"] + sid)) is None with journalist_app.test_client() as app: # Add our original cookie back in to the session, and try to re-use it - app.set_cookie('localhost.localdomain', 'js', cookie_val, - domain='.localhost.localdomain', httponly=True, path='/') - resp = app.get(url_for('main.index'), follow_redirects=False) + app.set_cookie( + "localhost.localdomain", + "js", + cookie_val, + domain=".localhost.localdomain", + httponly=True, + path="/", + ) + resp = app.get(url_for("main.index"), follow_redirects=False) + # Then trying to reuse the same journalist user cookie fails and redirects assert resp.status_code == 302 +# Test the forced logout when the user changes its password def test_session_change_password_logout(journalist_app, test_journo): + # Given a test client and a valid journalist user with journalist_app.test_client() as app: + # When sending a correct login request resp = _login_user(app, test_journo) + # Then check session as the previous tests session_cookie = _session_from_cookiejar(app.cookie_jar, journalist_app) assert session_cookie is not None sid = _check_sig(session_cookie.value, journalist_app) - assert (redis.get(journalist_app.config['SESSION_KEY_PREFIX'] + sid)) is not None - - resp = app.post(url_for('account.new_password'), - data=dict(current_password=test_journo['password'], - token=TOTP(test_journo['otp_secret']).now(), - password=NEW_PASSWORD) - ) + assert (redis.get(journalist_app.config["SESSION_KEY_PREFIX"] + sid)) is not None + + # When sending a self change password request + resp = app.post( + url_for("account.new_password"), + data=dict( + current_password=test_journo["password"], + token=TOTP(test_journo["otp_secret"]).now(), + password=NEW_PASSWORD, + ), + ) + # Then the session is no longer valid assert resp.status_code == 302 - assert (redis.get(journalist_app.config['SESSION_KEY_PREFIX'] + sid)) is None + # Then the session no longer exists in redis + assert (redis.get(journalist_app.config["SESSION_KEY_PREFIX"] + sid)) is None - resp = app.get(url_for('main.index'), follow_redirects=False) + # Then a request for the index redirects back to login + resp = app.get(url_for("main.index"), follow_redirects=False) assert resp.status_code == 302 +# Test that the session id is regenerated after a valid login def test_session_login_regenerate_sid(journalist_app, test_journo): + # Given a test client and a valid journalist user with journalist_app.test_client() as app: - resp = app.get(url_for('main.login')) + # When sending an anonymous get request + resp = app.get(url_for("main.login")) + # Then check the response code is correct assert resp.status_code == 200 + # Given a valid unauthenticated session id from the previous request session_cookie_pre_login = _session_from_cookiejar(app.cookie_jar, journalist_app) assert session_cookie_pre_login is not None + # When sending a valid login request using the same client (same cookiejar) resp = _login_user(app, test_journo) session_cookie_post_login = _session_from_cookiejar(app.cookie_jar, journalist_app) + # Then the two session ids are different as the session id gets regenerated post login assert session_cookie_post_login != session_cookie_pre_login +# Test a standard `get_token` API login def test_session_api_login(journalist_app, test_journo): + # Given a test client and a valid journalist user with journalist_app.test_client() as app: - resp = app.post(url_for('api.get_token'), - data=json.dumps( - {'username': test_journo['username'], - 'passphrase': test_journo['password'], - 'one_time_code': TOTP(test_journo['otp_secret']).now()}), - headers=get_api_headers()) - - assert resp.json['journalist_uuid'] == test_journo['uuid'] + # When sending a `get_token` request to the API with valid creds + resp = app.post( + url_for("api.get_token"), + data=json.dumps( + { + "username": test_journo["username"], + "passphrase": test_journo["password"], + "one_time_code": TOTP(test_journo["otp_secret"]).now(), + } + ), + headers=get_api_headers(), + ) + + # Then the API token is issued and returned with the correct journalist id + assert resp.json["journalist_uuid"] == test_journo["uuid"] assert resp.status_code == 200 - sid = _check_sig(resp.json['token'], journalist_app, api=True) + # Then such token is properly signed + sid = _check_sig(resp.json["token"], journalist_app, api=True) redis_session = _get_session(sid, journalist_app, api=True) - assert redis_session['uid'] == test_journo['id'] - - ttl = redis.ttl('api_' + journalist_app.config['SESSION_KEY_PREFIX'] + sid) - assert ((journalist_app.config['SESSION_LIFETIME'] - 10) < ttl - <= journalist_app.config['SESSION_LIFETIME']) - - assert (datetime.now(timezone.utc) < - datetime.strptime(resp.json['expiration'], "%Y-%m-%dT%H:%M:%S.%f%z") - < (datetime.now(timezone.utc) + - timedelta(seconds=journalist_app.config['SESSION_LIFETIME']))) - - response = app.get(url_for('api.get_current_user'), - headers=get_api_headers(resp.json['token'])) + # Then the session id in redis match that of the credentials + assert redis_session["uid"] == test_journo["id"] + + # Then the ttl of the session in redis is lower than the lifetime configured in the app + ttl = redis.ttl("api_" + journalist_app.config["SESSION_KEY_PREFIX"] + sid) + assert ( + (journalist_app.config["SESSION_LIFETIME"] - 10) + < ttl + <= journalist_app.config["SESSION_LIFETIME"] + ) + + # Then the expiration date returned in `get_token` response also conforms to the same rules + assert ( + datetime.now(timezone.utc) + < datetime.strptime(resp.json["expiration"], "%Y-%m-%dT%H:%M:%S.%f%z") + < ( + datetime.now(timezone.utc) + + timedelta(seconds=journalist_app.config["SESSION_LIFETIME"]) + ) + ) + + # When querying the endpoint that return the corrent user with the token + response = app.get( + url_for("api.get_current_user"), headers=get_api_headers(resp.json["token"]) + ) + # Then the reuqest is successful and the correct journalist id is returned assert response.status_code == 200 - assert response.json['uuid'] == test_journo['uuid'] + assert response.json["uuid"] == test_journo["uuid"] +# test a standard login then logout from API def test_session_api_logout(journalist_app, test_journo): + # Given a test client and a valid journalist user with journalist_app.test_client() as app: - resp = app.post(url_for('api.get_token'), - data=json.dumps( - {'username': test_journo['username'], - 'passphrase': test_journo['password'], - 'one_time_code': TOTP(test_journo['otp_secret']).now()}), - headers=get_api_headers()) - - assert resp.json['journalist_uuid'] == test_journo['uuid'] + # When sending a valid login request and asking an API token + resp = app.post( + url_for("api.get_token"), + data=json.dumps( + { + "username": test_journo["username"], + "passphrase": test_journo["password"], + "one_time_code": TOTP(test_journo["otp_secret"]).now(), + } + ), + headers=get_api_headers(), + ) + + # Then the token is issued successfully with the correct attributed + assert resp.json["journalist_uuid"] == test_journo["uuid"] assert resp.status_code == 200 - token = resp.json['token'] + token = resp.json["token"] sid = _check_sig(token, journalist_app, api=True) - resp = app.get(url_for('api.get_current_user'), - headers=get_api_headers(token)) + # When querying the endpoint for the current user information + resp = app.get(url_for("api.get_current_user"), headers=get_api_headers(token)) + # Then the request is successful and the returned id matches the creds journalist id assert resp.status_code == 200 - assert resp.json['uuid'] == test_journo['uuid'] + assert resp.json["uuid"] == test_journo["uuid"] - resp = app.post(url_for('api.logout'), - headers=get_api_headers(token)) + # When sending a logout request using the API token + resp = app.post(url_for("api.logout"), headers=get_api_headers(token)) + # Then it is successful assert resp.status_code == 200 - assert (redis.get('api_' + journalist_app.config['SESSION_KEY_PREFIX'] + sid)) is None + # Then the token and the corresponding payload no longer exist in redis + assert (redis.get("api_" + journalist_app.config["SESSION_KEY_PREFIX"] + sid)) is None - resp = app.get(url_for('api.get_current_user'), - headers=get_api_headers(token)) + # When sending an authenticated request with the deleted token + resp = app.get(url_for("api.get_current_user"), headers=get_api_headers(token)) + # Then the request is unsuccessful assert resp.status_code == 403 +# Test a few cases of valid session token with bad signatures def test_session_bad_signature(journalist_app, test_journo): + # Given a test client and a valid journalist user with journalist_app.test_client() as app: - resp = app.post(url_for('api.get_token'), - data=json.dumps( - {'username': test_journo['username'], - 'passphrase': test_journo['password'], - 'one_time_code': TOTP(test_journo['otp_secret']).now()}), - headers=get_api_headers()) - - assert resp.json['journalist_uuid'] == test_journo['uuid'] + # When sending a valid login request and asking an API token + resp = app.post( + url_for("api.get_token"), + data=json.dumps( + { + "username": test_journo["username"], + "passphrase": test_journo["password"], + "one_time_code": TOTP(test_journo["otp_secret"]).now(), + } + ), + headers=get_api_headers(), + ) + + # Then the request is successful and the uid matched the creds one + assert resp.json["journalist_uuid"] == test_journo["uuid"] assert resp.status_code == 200 - token = resp.json['token'] + # Given the valid token in the response + token = resp.json["token"] + # When checking the signature and sreipping it sid = _check_sig(token, journalist_app, api=True) - resp = app.get(url_for('api.get_current_user'), - headers=get_api_headers(sid)) + # When requesting an authenticated endpoint with a valid unsigned token + resp = app.get(url_for("api.get_current_user"), headers=get_api_headers(sid)) + # Then the request is refused assert resp.status_code == 403 - resp = app.get(url_for('api.get_current_user'), - headers=get_api_headers(sid + '.')) + # When requesting an authenticated endpoint with a valid unsigned token with a trailing dot + resp = app.get(url_for("api.get_current_user"), headers=get_api_headers(sid + ".")) + # Then the request is refused assert resp.status_code == 403 - signer = URLSafeTimedSerializer(journalist_app.secret_key, 'wrong_salt') + # Given the correct app secret key and a wrong salt + signer = URLSafeTimedSerializer(journalist_app.secret_key, "wrong_salt") + # Given a valid token signed with the correct secret key and the wrong salt token_wrong_salt = signer.dumps(sid) - resp = app.get(url_for('api.get_current_user'), - headers=get_api_headers(token_wrong_salt)) + # When requesting an authenticated endpoint with a valid token signed with the wrong salt + resp = app.get(url_for("api.get_current_user"), headers=get_api_headers(token_wrong_salt)) + # Then the request is refused assert resp.status_code == 403 - signer = URLSafeTimedSerializer(journalist_app.secret_key, - journalist_app.config['SESSION_SIGNER_SALT']) + # Given the correct app secret and the Journalist Interface salt + signer = URLSafeTimedSerializer( + journalist_app.secret_key, journalist_app.config["SESSION_SIGNER_SALT"] + ) + # Given a valid token signed with the corrects secret key and tje Journalist Interface salt token_not_api_salt = signer.dumps(sid) - resp = app.get(url_for('api.get_current_user'), - headers=get_api_headers(token_not_api_salt)) + # When requesting an authenticated endpoint with such token + resp = app.get(url_for("api.get_current_user"), headers=get_api_headers(token_not_api_salt)) + # Then the request is refused since the JI salt is not valid for the API assert resp.status_code == 403 - resp = app.get(url_for('api.get_current_user'), - headers=get_api_headers(token)) + # When sending again an authenticated request with the original, valid, signed token + resp = app.get(url_for("api.get_current_user"), headers=get_api_headers(token)) + # Then the request is successful assert resp.status_code == 200 - assert resp.json['uuid'] == test_journo['uuid'] + assert resp.json["uuid"] == test_journo["uuid"] +# Context: there are many cases in which as users may logout or be forcibly logged out. +# For the latter case, a user session is destroyed by an admin when a password change is +# enforced or when the user is deleted. When that happens, the session gets deleted from redis. +# What could happen is that if a user session is deleted between open_session() and save_session(), +# then the session might be restored and the admin deletion might fail. +# To avoid this, save_session() uses a setxx call, which writes in redis only if the key exists. +# This does not apply when the session is new or is being regenerated. +# Test that a deleted session cannot be rewritten by a race condition def test_session_race_condition(mocker, journalist_app, test_journo): + # Given a test client and a valid journalist user with journalist_app.test_request_context() as app: + # When manually creating a session in the context session = journalist_app.session_interface.open_session(journalist_app, app.request) assert session.sid is not None - session['uid'] = test_journo['id'] + # When manually setting the journalist uid in session + session["uid"] = test_journo["id"] + + # When manually builfing a Flask repsonse object app.response = Response() + # When manually calling save_session() to write the session in redis journalist_app.session_interface.save_session(journalist_app, session, app.response) - assert redis.get(journalist_app.config['SESSION_KEY_PREFIX'] + session.sid) is not None - app.request.cookies = {journalist_app.config['SESSION_COOKIE_NAME']: session.token} + # Then the session gets written in redis + assert redis.get(journalist_app.config["SESSION_KEY_PREFIX"] + session.sid) is not None + + # When manually adding the created session token in the request cookies + app.request.cookies = {journalist_app.config["SESSION_COOKIE_NAME"]: session.token} + # When getting the session object by supplying a request context to open_session() session2 = journalist_app.session_interface.open_session(journalist_app, app.request) + # Then the properties of the two sessions are the same + # (They are indeed the same session) assert session2.sid == session.sid - assert session2['uid'] == test_journo['id'] - # Force entering the redis set xx case + assert session2["uid"] == test_journo["id"] + # When setting the modified properties to issue a write in redis + # (To force entering the redis set xx case) session.modified = True session.new = False session.to_regenerate = False - redis.delete(journalist_app.config['SESSION_KEY_PREFIX'] + session.sid) + # When deleting the original session token and object from redis + redis.delete(journalist_app.config["SESSION_KEY_PREFIX"] + session.sid) + # Then the session_save() fails since the original object no longer exists journalist_app.session_interface.save_session(journalist_app, session, app.response) - assert redis.get(journalist_app.config['SESSION_KEY_PREFIX'] + session.sid) is None + assert redis.get(journalist_app.config["SESSION_KEY_PREFIX"] + session.sid) is None diff --git a/securedrop/tests/utils/__init__.py b/securedrop/tests/utils/__init__.py index 464b5681241..758f06a2850 100644 --- a/securedrop/tests/utils/__init__.py +++ b/securedrop/tests/utils/__init__.py @@ -29,4 +29,4 @@ def login_user(app, test_user): follow_redirects=True, ) assert resp.status_code == 200 - assert test_user['username'] in resp.data.decode('utf-8') + assert test_user["username"] in resp.data.decode("utf-8")