forked from theupdateframework/python-tuf
-
Notifications
You must be signed in to change notification settings - Fork 1
/
test_updater_key_rotations.py
131 lines (108 loc) · 4.13 KB
/
test_updater_key_rotations.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/usr/bin/env python
# Copyright 2021, New York University and the TUF contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
"""Test ngclient Updater using the repository simulator
"""
import os
import sys
import tempfile
import unittest
from dataclasses import dataclass
from typing import Dict, List, Optional, Type

from securesystemslib.signer import SSlibSigner

from tuf.api.metadata import Key
from tuf.exceptions import UnsignedMetadataError
from tuf.ngclient import Updater

from tests import utils
from tests.repository_simulator import RepositorySimulator
from tests.test_metadata_serialization import run_sub_tests_with_dataset
@dataclass
class RootVersion:
    """Describe one remote root metadata version of a key rotation test case.

    Keys and signers are referred to by their index in the key/signer pools
    that the test case pre-creates, not by key id. For example
    ``RootVersion([2], 1, [2, 1])`` is a root whose only root key is pool
    key 2, with threshold 1, signed by pool signers 2 and 1.
    """

    # Pool indices of the keys assigned to the root role in this version
    keys: List[int]
    # Signature threshold set on the root role in this version
    threshold: int
    # Pool indices of the signers that sign this version
    signatures: List[int]
    # Exception class the client refresh is expected to raise (it is handed
    # to assertRaises), or None when the refresh is expected to succeed
    result: Optional[Type[Exception]] = None
class TestUpdaterKeyRotations(unittest.TestCase):
    """Run ngclient Updater.refresh() against simulated root key rotations."""

    # Point this at an existing directory to have the simulator dump its
    # repository state there (one subdirectory per test method)
    dump_dir: Optional[str] = None

    def setUp(self):
        """Create the scratch directory, the simulator and a pool of keys."""
        self.temp_dir = tempfile.TemporaryDirectory()

        # Every test method starts from a fresh simulated repository
        self.sim = RepositorySimulator()

        if self.dump_dir is not None:
            # Name the dump directory after the test method being run
            test_name = self.id().split(".")[-1]
            self.sim.dump_dir = os.path.join(self.dump_dir, test_name)
            os.mkdir(self.sim.dump_dir)

        # Pool of ten key/signer pairs; test cases refer to them by index
        created = [self.sim.create_key() for _ in range(10)]
        self.keys: List[Key] = [key for key, _ in created]
        self.signers: List[SSlibSigner] = [signer for _, signer in created]

    def tearDown(self):
        """Remove the scratch directory created in setUp."""
        self.temp_dir.cleanup()

    def _run_refresh(self) -> Updater:
        """Bootstrap a new client from root version 1 and refresh it.

        Returns the Updater after refresh() has completed; any exception
        raised by refresh() propagates to the caller.
        """
        if self.sim.dump_dir is not None:
            self.sim.write()

        # The client trusts only the first published root to begin with
        metadata_dir = tempfile.mkdtemp(dir=self.temp_dir.name)
        root_path = os.path.join(metadata_dir, "root.json")
        with open(root_path, "bw") as root_file:
            root_file.write(
                self.sim.download_bytes(
                    "https://example.com/metadata/1.root.json", 100000
                )
            )

        # The simulator is passed as the last argument so that metadata is
        # served by it
        client = Updater(
            metadata_dir,
            "https://example.com/metadata/",
            "https://example.com/targets/",
            self.sim,
        )
        client.refresh()
        return client

    # Each case is the ordered list of root versions the repository publishes.
    # Only the last version's expected result is checked.
    test_cases = {
        "1-of-1 key rotation": [
            RootVersion([1], 1, [1]),
            RootVersion([2], 1, [2, 1]),
            RootVersion([2], 1, [2]),
        ],
        "1-of-1 key rotation fail: not signed with old key": [
            RootVersion([1], 1, [1]),
            RootVersion([2], 1, [2], UnsignedMetadataError),
        ],
        "1-of-1 key rotation fail: not signed with new key": [
            RootVersion([1], 1, [1]),
            RootVersion([2], 1, [1], UnsignedMetadataError),
        ],
    }

    @run_sub_tests_with_dataset(test_cases)
    def test_root_rotation(self, root_versions: List[RootVersion]):
        """Publish the given root versions, then refresh a fresh client once.

        The refresh must succeed when the last root version expects no
        error, and must raise that version's expected exception otherwise.
        """
        # Subtests share one simulator: start each from an empty root history
        self.sim.signed_roots = []
        self.sim.root.version = 0

        expected_result = None
        for version_spec in root_versions:
            # Drop the previous version's root keys and signers
            root_role = self.sim.root.roles["root"]
            root_role.keyids.clear()
            self.sim.signers["root"].clear()

            root_role.threshold = version_spec.threshold
            for key_index in version_spec.keys:
                self.sim.root.add_key("root", self.keys[key_index])
            for signer_index in version_spec.signatures:
                self.sim.add_signer("root", self.signers[signer_index])

            self.sim.root.version += 1
            self.sim.publish_root()

            # only support errors in last root version for now
            expected_result = version_spec.result

        if expected_result is not None:
            with self.assertRaises(expected_result):
                self._run_refresh()
        else:
            self._run_refresh()
if __name__ == "__main__":
    # "--dump" is this script's own flag: act on it and take it out of
    # sys.argv before the remaining arguments are handed on
    dump_requested = "--dump" in sys.argv
    if dump_requested:
        TestUpdaterKeyRotations.dump_dir = tempfile.mkdtemp()
        print(f"Repository Simulator dumps in {TestUpdaterKeyRotations.dump_dir}")
        sys.argv.remove("--dump")

    utils.configure_test_logging(sys.argv)
    unittest.main()