-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
Copy pathversion_solver.py
487 lines (399 loc) · 19.5 KB
/
version_solver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
from __future__ import annotations
import functools
import time
from typing import TYPE_CHECKING
from poetry.core.packages.dependency import Dependency
from poetry.mixology.failure import SolveFailure
from poetry.mixology.incompatibility import Incompatibility
from poetry.mixology.incompatibility_cause import ConflictCause
from poetry.mixology.incompatibility_cause import NoVersionsCause
from poetry.mixology.incompatibility_cause import RootCause
from poetry.mixology.partial_solution import PartialSolution
from poetry.mixology.result import SolverResult
from poetry.mixology.set_relation import SetRelation
from poetry.mixology.term import Term
if TYPE_CHECKING:
from poetry.core.packages.project_package import ProjectPackage
from poetry.packages import DependencyPackage
from poetry.puzzle.provider import Provider
# Sentinel returned by VersionSolver._propagate_incompatibility() when every term
# of an incompatibility is satisfied by the current partial solution; callers
# compare against it by identity (``result is _conflict``).
_conflict = object()
class DependencyCache:
    """
    A cache of the valid dependencies.

    The key observation here is that during the search - except at backtracking
    - once we have decided that a dependency is invalid, we never need check it
    again.
    """

    def __init__(self, provider: Provider) -> None:
        """
        Creates an empty cache that resolves misses through provider.search_for().
        """
        self.provider = provider

        # Candidate packages per dependency identity: (complete name, source type,
        # source url, source reference, source subdirectory). The constraint is
        # deliberately not part of the key, so a later lookup for the same
        # identity narrows the stored list instead of asking the provider again.
        self.cache: dict[
            tuple[str, str | None, str | None, str | None, str | None],
            list[DependencyPackage],
        ] = {}

        # Public lookup entry point: a bounded, per-instance memo in front of
        # _search_for(), keyed on the dependency object itself.
        self.search_for = functools.lru_cache(maxsize=128)(self._search_for)

    def _search_for(self, dependency: Dependency) -> list[DependencyPackage]:
        """
        Returns the candidate packages for dependency and stores them in the cache.

        On a miss the provider is queried. On a hit the previously stored
        candidates are filtered down to the versions allowed by
        dependency.constraint, and the filtered list replaces the stored one.
        """
        key = (
            dependency.complete_name,
            dependency.source_type,
            dependency.source_url,
            dependency.source_reference,
            dependency.source_subdirectory,
        )

        packages = self.cache.get(key)
        if packages is None:
            packages = self.provider.search_for(dependency)
        else:
            packages = [
                p for p in packages if dependency.constraint.allows(p.package.version)
            ]

        self.cache[key] = packages

        return packages

    def clear(self) -> None:
        """
        Forgets every cached result.

        Both layers are reset: the per-identity candidate lists and the
        lru_cache memo behind search_for(). Clearing only the dictionary would
        leave search_for() answering from results computed before the reset.
        """
        self.cache.clear()
        self.search_for.cache_clear()
class VersionSolver:
    """
    The version solver that finds a set of package versions that satisfy the
    root package's dependencies.

    See https://github.com/dart-lang/pub/tree/master/doc/solver.md for details
    on how this solver works.
    """

    def __init__(self, root: ProjectPackage, provider: Provider) -> None:
        """
        Prepares a solver for root that queries provider for packages.
        """
        self._root = root
        self._provider = provider
        self._dependency_cache = DependencyCache(provider)
        # Known incompatibilities, indexed by the complete name of every
        # dependency they mention.
        self._incompatibilities: dict[str, list[Incompatibility]] = {}
        # Incompatibilities that need not be looked at again until the next
        # backtrack (see _propagate_incompatibility() and _resolve_conflict()).
        self._contradicted_incompatibilities: set[Incompatibility] = set()
        self._solution = PartialSolution()

    @property
    def solution(self) -> PartialSolution:
        """
        The partial solution built up so far.
        """
        return self._solution

    def solve(self) -> SolverResult:
        """
        Finds a set of dependencies that match the root package's constraints,
        or raises an error if no such set is available.

        Any exception raised while solving (for instance the SolveFailure raised
        by _resolve_conflict()) propagates unchanged; the timing summary is
        logged in either case.
        """
        # A monotonic clock is used because only the elapsed duration matters.
        start = time.perf_counter()
        root_dependency = Dependency(self._root.name, self._root.version)
        root_dependency.is_root = True

        self._add_incompatibility(
            Incompatibility([Term(root_dependency, False)], RootCause())
        )

        try:
            next_package: str | None = self._root.name
            while next_package is not None:
                self._propagate(next_package)
                next_package = self._choose_package_version()

            return self._result()
        finally:
            elapsed = time.perf_counter() - start
            self._log(
                f"Version solving took {elapsed:.3f} seconds.\n"
                f"Tried {self._solution.attempted_solutions} solutions."
            )

    def _propagate(self, package: str) -> None:
        """
        Performs unit propagation on incompatibilities transitively
        related to package to derive new assignments for _solution.
        """
        changed = {package}
        while changed:
            package = changed.pop()

            # Iterate in reverse because conflict resolution tends to produce more
            # general incompatibilities as time goes on. If we look at those first,
            # we can derive stronger assignments sooner and more eagerly find
            # conflicts.
            for incompatibility in reversed(self._incompatibilities[package]):
                if incompatibility in self._contradicted_incompatibilities:
                    continue

                result = self._propagate_incompatibility(incompatibility)

                if result is _conflict:
                    # If the incompatibility is satisfied by the solution, we use
                    # _resolve_conflict() to determine the root cause of the
                    # conflict as a new incompatibility.
                    #
                    # It also backjumps to a point in the solution
                    # where that incompatibility will allow us to derive new
                    # assignments that avoid the conflict.
                    root_cause = self._resolve_conflict(incompatibility)

                    # Back jumping erases all the assignments we did at the previous
                    # decision level, so we clear [changed] and refill it with the
                    # newly-propagated assignment.
                    changed.clear()
                    changed.add(str(self._propagate_incompatibility(root_cause)))
                    break
                elif result is not None:
                    changed.add(str(result))

    def _propagate_incompatibility(
        self, incompatibility: Incompatibility
    ) -> str | object | None:
        """
        If incompatibility is almost satisfied by _solution, adds the
        negation of the unsatisfied term to _solution.

        If incompatibility is satisfied by _solution, returns _conflict. If
        incompatibility is almost satisfied by _solution, returns the
        unsatisfied term's package name.

        Otherwise, returns None.
        """
        # The first entry in incompatibility.terms that's not yet satisfied by
        # _solution, if one exists. If we find more than one, _solution is
        # inconclusive for incompatibility and we can't deduce anything.
        unsatisfied = None

        for term in incompatibility.terms:
            relation = self._solution.relation(term)

            if relation == SetRelation.DISJOINT:
                # If term is already contradicted by _solution, then
                # incompatibility is contradicted as well and there's nothing new we
                # can deduce from it.
                self._contradicted_incompatibilities.add(incompatibility)
                return None
            elif relation == SetRelation.OVERLAPPING:
                # If more than one term is inconclusive, we can't deduce anything
                # about incompatibility.
                if unsatisfied is not None:
                    return None

                # If exactly one term in incompatibility is inconclusive, then it's
                # almost satisfied and [term] is the unsatisfied term. We can add the
                # inverse of the term to _solution.
                unsatisfied = term

        # If *all* terms in incompatibility are satisfied by _solution, then
        # incompatibility is satisfied and we have a conflict.
        if unsatisfied is None:
            return _conflict

        # Once the negation of the unsatisfied term has been derived below there
        # is nothing more to learn from this incompatibility, so _propagate() is
        # told to skip it from now on.
        self._contradicted_incompatibilities.add(incompatibility)

        adverb = "not " if unsatisfied.is_positive() else ""
        self._log(f"derived: {adverb}{unsatisfied.dependency}")

        self._solution.derive(
            unsatisfied.dependency, not unsatisfied.is_positive(), incompatibility
        )

        complete_name: str = unsatisfied.dependency.complete_name
        return complete_name

    def _resolve_conflict(self, incompatibility: Incompatibility) -> Incompatibility:
        """
        Given an incompatibility that's satisfied by _solution, `conflict
        resolution`_ constructs a new incompatibility that encapsulates the root
        cause of the conflict and backtracks _solution until the new
        incompatibility will allow _propagate() to deduce new assignments.

        Adds the new incompatibility to _incompatibilities and returns it.

        Raises SolveFailure when the incompatibility (as given, or as rewritten
        during resolution) reports is_failure().

        .. _conflict resolution:
        https://github.com/dart-lang/pub/tree/master/doc/solver.md#conflict-resolution
        """
        self._log(f"conflict: {incompatibility}")

        # Whether incompatibility has been rewritten at least once, i.e. whether
        # it is not yet recorded in _incompatibilities.
        new_incompatibility = False
        while not incompatibility.is_failure():
            # The term in incompatibility.terms that was most recently satisfied by
            # _solution.
            most_recent_term = None

            # The earliest assignment in _solution such that incompatibility is
            # satisfied by _solution up to and including this assignment.
            most_recent_satisfier = None

            # The difference between most_recent_satisfier and most_recent_term;
            # that is, the versions that are allowed by most_recent_satisfier and
            # not by most_recent_term. This is None if most_recent_satisfier totally
            # satisfies most_recent_term.
            difference = None

            # The decision level of the earliest assignment in _solution *before*
            # most_recent_satisfier such that incompatibility is satisfied by
            # _solution up to and including this assignment plus
            # most_recent_satisfier.
            #
            # Decision level 1 is the level where the root package was selected.
            # It's safe to go back to decision level 0, but stopping at 1 tends to
            # produce better error messages, because references to the root package
            # end up closer to the final conclusion that no solution exists.
            previous_satisfier_level = 1

            for term in incompatibility.terms:
                satisfier = self._solution.satisfier(term)

                if most_recent_satisfier is None:
                    most_recent_term = term
                    most_recent_satisfier = satisfier
                elif most_recent_satisfier.index < satisfier.index:
                    previous_satisfier_level = max(
                        previous_satisfier_level, most_recent_satisfier.decision_level
                    )
                    most_recent_term = term
                    most_recent_satisfier = satisfier
                    difference = None
                else:
                    previous_satisfier_level = max(
                        previous_satisfier_level, satisfier.decision_level
                    )

                if most_recent_term == term:
                    # If most_recent_satisfier doesn't satisfy most_recent_term on
                    # its own, then the next-most-recent satisfier may be the one
                    # that satisfies the remainder.
                    difference = most_recent_satisfier.difference(most_recent_term)
                    if difference is not None:
                        previous_satisfier_level = max(
                            previous_satisfier_level,
                            self._solution.satisfier(difference.inverse).decision_level,
                        )

            # If most_recent_satisfier is the only satisfier left at its decision
            # level, or if it has no cause (indicating that it's a decision rather
            # than a derivation), then incompatibility is the root cause. We then
            # backjump to previous_satisfier_level, where incompatibility is
            # guaranteed to allow _propagate to produce more assignments.

            # using assert to suppress mypy [union-attr]
            assert most_recent_satisfier is not None
            if (
                previous_satisfier_level < most_recent_satisfier.decision_level
                or most_recent_satisfier.cause is None
            ):
                self._solution.backtrack(previous_satisfier_level)
                # Both caches are reset here, right after the backtrack.
                self._contradicted_incompatibilities.clear()
                self._dependency_cache.clear()
                if new_incompatibility:
                    self._add_incompatibility(incompatibility)

                return incompatibility

            # Create a new incompatibility by combining incompatibility with the
            # incompatibility that caused most_recent_satisfier to be assigned.
            # Doing this iteratively constructs an incompatibility that's guaranteed
            # to be true (that is, we know for sure no solution will satisfy the
            # incompatibility) while also approximating the intuitive notion of the
            # "root cause" of the conflict.
            new_terms = [
                term for term in incompatibility.terms if term != most_recent_term
            ]

            for term in most_recent_satisfier.cause.terms:
                if term.dependency != most_recent_satisfier.dependency:
                    new_terms.append(term)

            # The most_recent_satisfier may not satisfy most_recent_term on its own
            # if there are a collection of constraints on most_recent_term that
            # only satisfy it together. For example, if most_recent_term is
            # `foo ^1.0.0` and _solution contains `[foo >=1.0.0,
            # foo <2.0.0]`, then most_recent_satisfier will be `foo <2.0.0` even
            # though it doesn't totally satisfy `foo ^1.0.0`.
            #
            # In this case, we add `not (most_recent_satisfier \ most_recent_term)`
            # to the incompatibility as well. See the `algorithm documentation`_ for
            # details.
            #
            # .. _algorithm documentation:
            # https://github.com/dart-lang/pub/tree/master/doc/solver.md#conflict-resolution  # noqa: E501
            if difference is not None:
                inverse = difference.inverse
                if inverse.dependency != most_recent_satisfier.dependency:
                    new_terms.append(inverse)

            incompatibility = Incompatibility(
                new_terms, ConflictCause(incompatibility, most_recent_satisfier.cause)
            )
            new_incompatibility = True

            partially = "" if difference is None else " partially"
            self._log(
                f"! {most_recent_term} is{partially} satisfied by"
                f" {most_recent_satisfier}"
            )
            self._log(f'! which is caused by "{most_recent_satisfier.cause}"')
            self._log(f"! thus: {incompatibility}")

        raise SolveFailure(incompatibility)

    def _choose_package_version(self) -> str | None:
        """
        Tries to select a version of a required package.

        Returns the name of the package whose incompatibilities should be
        propagated by _propagate(), or None indicating that version solving is
        complete and a solution has been found.
        """
        unsatisfied = self._solution.unsatisfied
        if not unsatisfied:
            return None

        class Preference:
            """
            Preference is one of the criteria for choosing which dependency to solve
            first. A higher value means that there are "more options" to satisfy
            a dependency. A lower value takes precedence.
            """

            DIRECT_ORIGIN = 0
            NO_CHOICE = 1
            USE_LATEST = 2
            LOCKED = 3
            DEFAULT = 4

        # Prefer packages with as few remaining versions as possible,
        # so that if a conflict is necessary it's forced quickly.
        # In order to provide results that are as deterministic as possible
        # and consistent between `poetry lock` and `poetry update`, the return value
        # of two different dependencies should not be equal if possible.
        def _get_min(dependency: Dependency) -> tuple[bool, int, int]:
            # Direct origin dependencies must be handled first: we don't want to
            # resolve a regular dependency for some package only to find later that
            # we had a direct-origin dependency.
            if dependency.is_direct_origin():
                return False, Preference.DIRECT_ORIGIN, 1

            is_specific_marker = not dependency.marker.is_any()

            use_latest = dependency.name in self._provider.use_latest
            if not use_latest:
                locked = self._provider.get_locked(dependency)
                if locked:
                    return is_specific_marker, Preference.LOCKED, 1

            num_packages = len(self._dependency_cache.search_for(dependency))

            if num_packages < 2:
                preference = Preference.NO_CHOICE
            elif use_latest:
                preference = Preference.USE_LATEST
            else:
                preference = Preference.DEFAULT
            return is_specific_marker, preference, num_packages

        if len(unsatisfied) == 1:
            # A single candidate needs no ranking, which also avoids the lookups
            # that _get_min() would perform for it.
            dependency = unsatisfied[0]
        else:
            dependency = min(unsatisfied, key=_get_min)

        locked = self._provider.get_locked(dependency)
        if locked is None:
            packages = self._dependency_cache.search_for(dependency)
            package = next(iter(packages), None)

            if package is None:
                # If there are no versions that satisfy the constraint,
                # add an incompatibility that indicates that.
                self._add_incompatibility(
                    Incompatibility([Term(dependency, True)], NoVersionsCause())
                )

                complete_name = dependency.complete_name
                return complete_name
        else:
            package = locked

        package = self._provider.complete_package(package)

        conflict = False
        for incompatibility in self._provider.incompatibilities_for(package):
            self._add_incompatibility(incompatibility)

            # If an incompatibility is already satisfied, then selecting version
            # would cause a conflict.
            #
            # We'll continue adding its dependencies, then go back to
            # unit propagation which will guide us to choose a better version.
            conflict = conflict or all(
                term.dependency.complete_name == dependency.complete_name
                or self._solution.satisfies(term)
                for term in incompatibility.terms
            )

        if not conflict:
            self._solution.decide(package.package)
            self._log(
                f"selecting {package.package.complete_name}"
                f" ({package.package.full_pretty_version})"
            )

        complete_name = dependency.complete_name
        return complete_name

    def _result(self) -> SolverResult:
        """
        Creates a #SolverResult from the decisions in _solution
        """
        decisions = self._solution.decisions

        return SolverResult(
            self._root,
            [p for p in decisions if not p.is_root()],
            self._solution.attempted_solutions,
        )

    def _add_incompatibility(self, incompatibility: Incompatibility) -> None:
        """
        Logs incompatibility as a fact and indexes it under the complete name of
        every dependency it mentions, skipping names that already list it.
        """
        self._log(f"fact: {incompatibility}")

        for term in incompatibility.terms:
            known = self._incompatibilities.setdefault(
                term.dependency.complete_name, []
            )
            if incompatibility not in known:
                known.append(incompatibility)

    def _log(self, text: str) -> None:
        """
        Forwards text to the provider's debug output together with the number of
        solutions attempted so far.
        """
        self._provider.debug(text, self._solution.attempted_solutions)