|
2 | 2 |
|
3 | 3 | from base64 import b32encode
|
4 | 4 | from functools import lru_cache
|
5 |
| -from typing import Optional |
| 5 | +from pathlib import Path |
| 6 | +from random import SystemRandom |
| 7 | +from typing import Optional, List |
6 | 8 | from typing import TYPE_CHECKING
|
7 | 9 |
|
8 | 10 | from cryptography.hazmat.backends import default_backend
|
|
13 | 15 | import models
|
14 | 16 |
|
15 | 17 | if TYPE_CHECKING:
|
16 |
| - from crypto_util import CryptoUtil |
17 | 18 | from passphrases import DicewarePassphrase
|
18 | 19 | from store import Storage
|
19 | 20 |
|
@@ -67,23 +68,41 @@ class SourceDesignationCollisionError(Exception):
|
67 | 68 | def create_source_user(
|
68 | 69 | db_session: Session,
|
69 | 70 | source_passphrase: "DicewarePassphrase",
|
70 |
| - source_app_crypto_util: "CryptoUtil", |
71 | 71 | source_app_storage: "Storage",
|
72 | 72 | ) -> SourceUser:
|
73 | 73 | # Derive the source's info from their passphrase
|
74 | 74 | scrypt_manager = _SourceScryptManager.get_default()
|
75 | 75 | filesystem_id = scrypt_manager.derive_source_filesystem_id(source_passphrase)
|
76 | 76 | gpg_secret = scrypt_manager.derive_source_gpg_secret(source_passphrase)
|
77 | 77 |
|
78 |
| - # Create a designation for the source |
79 |
| - try: |
80 |
| - # TODO: Move display_id() out of CryptoUtil |
81 |
| - journalist_designation = source_app_crypto_util.display_id() |
82 |
| - except ValueError: |
| 78 | + # Create a unique journalist designation for the source |
| 79 | + # TODO: Add unique=True to models.Source.journalist_designation to enforce uniqueness |
| 80 | + # as the logic below has a race condition (time we check VS time when we add to the DB) |
| 81 | + designation_generation_attempts = 0 |
| 82 | + valid_designation = None |
| 83 | + designation_generator = _DesignationGenerator.get_default() |
| 84 | + while designation_generation_attempts < 50: |
| 85 | + # Generate a designation |
| 86 | + designation_generation_attempts += 1 |
| 87 | + new_designation = designation_generator.generate_journalist_designation() |
| 88 | + |
| 89 | + # Check to see if it's already used by an existing source |
| 90 | + existing_source_with_same_designation = db_session.query( |
| 91 | + models.Source |
| 92 | + ).filter_by(journalist_designation=new_designation).one_or_none() |
| 93 | + if not existing_source_with_same_designation: |
| 94 | + # The designation is not already used - good to go |
| 95 | + valid_designation = new_designation |
| 96 | + break |
| 97 | + |
| 98 | + if not valid_designation: |
| 99 | + # Could not generate a designation that is not already used |
83 | 100 | raise SourceDesignationCollisionError()
|
84 | 101 |
|
85 | 102 | # Store the source in the DB
|
86 |
| - source_db_record = models.Source(filesystem_id, journalist_designation) |
| 103 | + source_db_record = models.Source( |
| 104 | + filesystem_id=filesystem_id, journalist_designation=valid_designation |
| 105 | + ) |
87 | 106 | db_session.add(source_db_record)
|
88 | 107 | try:
|
89 | 108 | db_session.commit()
|
@@ -164,3 +183,51 @@ def get_default(cls) -> "_SourceScryptManager":
|
164 | 183 | scrypt_p=config.SCRYPT_PARAMS["p"],
|
165 | 184 | )
|
166 | 185 | return _default_scrypt_mgr
|
| 186 | + |
| 187 | + |
| 188 | +_default_designation_generator: Optional["_DesignationGenerator"] = None |
| 189 | + |
| 190 | + |
| 191 | +class _DesignationGenerator: |
| 192 | + |
| 193 | + def __init__(self, nouns: List[str], adjectives: List[str]): |
| 194 | + self._random_generator = SystemRandom() |
| 195 | + |
| 196 | + # Ensure that there are no empty lists or empty strings |
| 197 | + if not nouns: |
| 198 | + raise ValueError("Nouns word list is empty") |
| 199 | + shortest_noun = min(nouns, key=len) |
| 200 | + shortest_noun_length = len(shortest_noun) |
| 201 | + if shortest_noun_length < 1: |
| 202 | + raise ValueError("Nouns word list contains an empty string") |
| 203 | + |
| 204 | + if not adjectives: |
| 205 | + raise ValueError("Adjectives word list is empty") |
| 206 | + shortest_adjective = min(adjectives, key=len) |
| 207 | + shortest_adjective_length = len(shortest_adjective) |
| 208 | + if shortest_adjective_length < 1: |
| 209 | + raise ValueError("Adjectives word list contains an empty string") |
| 210 | + |
| 211 | + self._nouns = nouns |
| 212 | + self._adjectives = adjectives |
| 213 | + |
| 214 | + def generate_journalist_designation(self) -> str: |
| 215 | + random_adjective = self._random_generator.choice(self._adjectives) |
| 216 | + random_noun = self._random_generator.choice(self._nouns) |
| 217 | + return f"{random_adjective} {random_noun}" |
| 218 | + |
| 219 | + @classmethod |
| 220 | + def get_default(cls) -> "_DesignationGenerator": |
| 221 | + # Late import so _SourceScryptManager can be used without a config.py in the parent folder |
| 222 | + from sdconfig import config |
| 223 | + |
| 224 | + global _default_designation_generator |
| 225 | + if _default_designation_generator is None: |
| 226 | + # Parse the nouns and adjectives files from the config |
| 227 | + nouns = Path(config.NOUNS).read_text().strip().splitlines() |
| 228 | + adjectives = Path(config.ADJECTIVES).read_text().strip().splitlines() |
| 229 | + |
| 230 | + # Create the generator |
| 231 | + _default_designation_generator = cls(nouns=nouns, adjectives=adjectives) |
| 232 | + |
| 233 | + return _default_designation_generator |
0 commit comments