1
1
import os
2
2
3
3
from base64 import b32encode
4
- from typing import Optional
4
+ from pathlib import Path
5
+ from random import SystemRandom
6
+ from typing import Optional , List
5
7
from typing import TYPE_CHECKING
6
8
7
9
from cryptography .hazmat .backends import default_backend
12
14
import models
13
15
14
16
if TYPE_CHECKING :
15
- from crypto_util import CryptoUtil
16
17
from passphrases import DicewarePassphrase
17
18
from store import Storage
18
19
@@ -66,23 +67,41 @@ class SourceDesignationCollisionError(Exception):
66
67
def create_source_user (
67
68
db_session : Session ,
68
69
source_passphrase : "DicewarePassphrase" ,
69
- source_app_crypto_util : "CryptoUtil" ,
70
70
source_app_storage : "Storage" ,
71
71
) -> SourceUser :
72
72
# Derive the source's info from their passphrase
73
73
scrypt_manager = _SourceScryptManager .get_default ()
74
74
filesystem_id = scrypt_manager .derive_source_filesystem_id (source_passphrase )
75
75
gpg_secret = scrypt_manager .derive_source_gpg_secret (source_passphrase )
76
76
77
- # Create a designation for the source
78
- try :
79
- # TODO: Move display_id() out of CryptoUtil
80
- journalist_designation = source_app_crypto_util .display_id ()
81
- except ValueError :
77
+ # Create a unique journalist designation for the source
78
+ # TODO: Add unique=True to models.Source.journalist_designation to enforce uniqueness
79
+ # as the logic below has a race condition (time we check VS time when we add to the DB)
80
+ designation_generation_attempts = 0
81
+ valid_designation = None
82
+ designation_generator = _DesignationGenerator .get_default ()
83
+ while designation_generation_attempts < 50 :
84
+ # Generate a designation
85
+ designation_generation_attempts += 1
86
+ new_designation = designation_generator .generate_journalist_designation ()
87
+
88
+ # Check to see if it's already used by an existing source
89
+ existing_source_with_same_designation = db_session .query (
90
+ models .Source
91
+ ).filter_by (journalist_designation = new_designation ).one_or_none ()
92
+ if not existing_source_with_same_designation :
93
+ # The designation is not already used - good to go
94
+ valid_designation = new_designation
95
+ break
96
+
97
+ if not valid_designation :
98
+ # Could not generate a designation that is not already used
82
99
raise SourceDesignationCollisionError ()
83
100
84
101
# Store the source in the DB
85
- source_db_record = models .Source (filesystem_id , journalist_designation )
102
+ source_db_record = models .Source (
103
+ filesystem_id = filesystem_id , journalist_designation = valid_designation
104
+ )
86
105
db_session .add (source_db_record )
87
106
try :
88
107
db_session .commit ()
@@ -99,7 +118,7 @@ def create_source_user(
99
118
return SourceUser (source_db_record , filesystem_id , gpg_secret )
100
119
101
120
102
- _default_scrypt_mgr = None # type : Optional[_SourceScryptManager]
121
+ _default_scrypt_mgr : Optional [" _SourceScryptManager" ] = None
103
122
104
123
105
124
class _SourceScryptManager :
@@ -160,3 +179,51 @@ def get_default(cls) -> "_SourceScryptManager":
160
179
scrypt_p = config .SCRYPT_PARAMS ["p" ],
161
180
)
162
181
return _default_scrypt_mgr
182
+
183
+
184
+ _default_designation_generator : Optional ["_DesignationGenerator" ] = None
185
+
186
+
187
+ class _DesignationGenerator :
188
+
189
+ def __init__ (self , nouns : List [str ], adjectives : List [str ]):
190
+ self ._random_generator = SystemRandom ()
191
+
192
+ # Ensure that there are no empty lists or empty strings
193
+ if not nouns :
194
+ raise ValueError ("Nouns word list is empty" )
195
+ shortest_noun = min (nouns , key = len )
196
+ shortest_noun_length = len (shortest_noun )
197
+ if shortest_noun_length < 1 :
198
+ raise ValueError ("Nouns word list contains an empty string" )
199
+
200
+ if not adjectives :
201
+ raise ValueError ("Adjectives word list is empty" )
202
+ shortest_adjective = min (adjectives , key = len )
203
+ shortest_adjective_length = len (shortest_adjective )
204
+ if shortest_adjective_length < 1 :
205
+ raise ValueError ("Adjectives word list contains an empty string" )
206
+
207
+ self ._nouns = nouns
208
+ self ._adjectives = adjectives
209
+
210
+ def generate_journalist_designation (self ) -> str :
211
+ random_adjective = self ._random_generator .choice (self ._adjectives )
212
+ random_noun = self ._random_generator .choice (self ._nouns )
213
+ return f"{ random_adjective } { random_noun } "
214
+
215
+ @classmethod
216
+ def get_default (cls ) -> "_DesignationGenerator" :
217
+ # Late import so _SourceScryptManager can be used without a config.py in the parent folder
218
+ from sdconfig import config
219
+
220
+ global _default_designation_generator
221
+ if _default_designation_generator is None :
222
+ # Parse the nouns and adjectives files from the config
223
+ nouns = Path (config .NOUNS ).read_text ().strip ().splitlines ()
224
+ adjectives = Path (config .ADJECTIVES ).read_text ().strip ().splitlines ()
225
+
226
+ # Create the generator
227
+ _default_designation_generator = cls (nouns = nouns , adjectives = adjectives )
228
+
229
+ return _default_designation_generator
0 commit comments