-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
list.py
213 lines (194 loc) · 7.78 KB
/
list.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
import json
from dbt.contracts.graph.nodes import Exposure, SourceDefinition, Metric, SemanticModel
from dbt.flags import get_flags
from dbt.graph import ResourceTypeSelector
from dbt.task.runnable import GraphRunnableTask
from dbt.task.test import TestSelector
from dbt.node_types import NodeType
from dbt.events.functions import (
fire_event,
warn_or_error,
)
from dbt.events.types import (
NoNodesSelected,
ListCmdOut,
)
from dbt.exceptions import DbtRuntimeError, DbtInternalError
from dbt.events.contextvars import task_contextvars
class ListTask(GraphRunnableTask):
    """Task that lists the selected resources, one entry per result.

    The shape of each entry is chosen by ``args.output``: ``"selector"``,
    ``"name"``, ``"json"`` or ``"path"`` (see ``run``).
    """

    # Resource types listed when none are requested, or when "default" is given.
    DEFAULT_RESOURCE_VALUES = frozenset(
        (
            NodeType.Model,
            NodeType.Snapshot,
            NodeType.Seed,
            NodeType.Test,
            NodeType.Source,
            NodeType.Exposure,
            NodeType.Metric,
            NodeType.SemanticModel,
        )
    )
    # Resource types listed for "all": the defaults plus analyses.
    ALL_RESOURCE_VALUES = DEFAULT_RESOURCE_VALUES | frozenset((NodeType.Analysis,))
    # Keys kept in JSON output when ``args.output_keys`` is empty.
    ALLOWED_KEYS = frozenset(
        (
            "alias",
            "name",
            "package_name",
            "depends_on",
            "tags",
            "config",
            "resource_type",
            "source_name",
            "original_file_path",
            "unique_id",
        )
    )

    def __init__(self, args, config, manifest) -> None:
        """Initialise the task and reject conflicting selection arguments.

        Raises:
            DbtRuntimeError: if ``models`` is combined with ``select`` or
                with ``resource_types``.
        """
        super().__init__(args, config, manifest)
        if self.args.models:
            if self.args.select:
                raise DbtRuntimeError('"models" and "select" are mutually exclusive arguments')
            if self.args.resource_types:
                raise DbtRuntimeError(
                    '"models" and "resource_type" are mutually exclusive ' "arguments"
                )

    def _iterate_selected_nodes(self):
        """Yield the manifest entry for every selected unique id, in sorted id order.

        Fires ``NoNodesSelected`` through ``warn_or_error`` and yields nothing
        when the selection is empty.

        Raises:
            DbtInternalError: if the manifest is unset.
            DbtRuntimeError: if a selected id is in none of the manifest
                collections searched here.
        """
        selector = self.get_node_selector()
        spec = self.get_selection_spec()
        unique_ids = sorted(selector.get_selected(spec))
        if not unique_ids:
            warn_or_error(NoNodesSelected())
            return
        if self.manifest is None:
            raise DbtInternalError("manifest is None in _iterate_selected_nodes")

        # Searched in this order; the first collection containing the id wins.
        collections = (
            self.manifest.nodes,
            self.manifest.sources,
            self.manifest.exposures,
            self.manifest.metrics,
            self.manifest.semantic_models,
        )
        for unique_id in unique_ids:
            for collection in collections:
                if unique_id in collection:
                    yield collection[unique_id]
                    break
            else:
                # The two sentences used to be concatenated with no separator.
                raise DbtRuntimeError(
                    f'Got an unexpected result from node selection: "{unique_id}". '
                    "Expected a node, source, exposure, metric, or semantic model!"
                )

    def generate_selectors(self):
        """Yield a selector string for each selected resource."""
        for node in self._iterate_selected_nodes():
            if node.resource_type == NodeType.Source:
                assert isinstance(node, SourceDefinition)
                # sources are searched for by pkg.source_name.table_name
                source_selector = ".".join([node.package_name, node.source_name, node.name])
                yield f"source:{source_selector}"
            elif node.resource_type == NodeType.Exposure:
                assert isinstance(node, Exposure)
                # exposures are searched for by pkg.exposure_name
                exposure_selector = ".".join([node.package_name, node.name])
                yield f"exposure:{exposure_selector}"
            elif node.resource_type == NodeType.Metric:
                assert isinstance(node, Metric)
                # metrics are searched for by pkg.metric_name
                metric_selector = ".".join([node.package_name, node.name])
                yield f"metric:{metric_selector}"
            elif node.resource_type == NodeType.SemanticModel:
                assert isinstance(node, SemanticModel)
                # semantic models are searched for by pkg.semantic_model_name
                semantic_model_selector = ".".join([node.package_name, node.name])
                yield f"semantic_model:{semantic_model_selector}"
            else:
                # everything else is from `fqn`
                yield ".".join(node.fqn)

    def generate_names(self):
        """Yield the ``search_name`` of each selected resource."""
        for node in self._iterate_selected_nodes():
            yield node.search_name

    def generate_json(self):
        """Yield one JSON object string per selected resource.

        Each object keeps only the keys named in ``args.output_keys``, or the
        keys in ``ALLOWED_KEYS`` when ``args.output_keys`` is empty.
        """
        # Resolve the key filter once rather than for every key of every node.
        wanted_keys = self.args.output_keys or self.ALLOWED_KEYS
        for node in self._iterate_selected_nodes():
            yield json.dumps(
                {k: v for k, v in node.to_dict(omit_none=False).items() if k in wanted_keys}
            )

    def generate_paths(self):
        """Yield the ``original_file_path`` of each selected resource."""
        for node in self._iterate_selected_nodes():
            yield node.original_file_path

    def run(self):
        """Compile the manifest and output the selection in the requested format.

        Returns:
            The accumulated results, as returned by ``output_results``.

        Raises:
            DbtInternalError: if ``args.output`` is not one of ``"selector"``,
                ``"name"``, ``"json"`` or ``"path"``.
        """
        # We set up a context manager here with "task_contextvars" because
        # we need the project_root in compile_manifest.
        with task_contextvars(project_root=self.config.project_root):
            self.compile_manifest()
            output = self.args.output
            generators = {
                "selector": self.generate_selectors,
                "name": self.generate_names,
                "json": self.generate_json,
                "path": self.generate_paths,
            }
            generator = generators.get(output)
            if generator is None:
                raise DbtInternalError("Invalid output {}".format(output))
            # The generators are lazy, so they are consumed inside the context.
            return self.output_results(generator())

    def output_results(self, results):
        """Log, or output a plain, newline-delimited, and ready-to-pipe list of nodes found."""
        for result in results:
            self.node_results.append(result)
            if get_flags().LOG_FORMAT == "json":
                fire_event(ListCmdOut(msg=result))
            else:
                # Cleaner to leave as print than to mutate the logger not to print timestamps.
                print(result)
        return self.node_results

    @property
    def resource_types(self):
        """Return the resource types to list, as a new list.

        ``args.models`` forces ``[NodeType.Model]``. An empty
        ``args.resource_types`` yields the defaults. Otherwise the requested
        values are used, with ``"default"`` and ``"all"`` expanded to
        ``DEFAULT_RESOURCE_VALUES`` and ``ALL_RESOURCE_VALUES``.
        """
        if self.args.models:
            return [NodeType.Model]
        if not self.args.resource_types:
            return list(self.DEFAULT_RESOURCE_VALUES)

        values = set(self.args.resource_types)
        if "default" in values:
            values.remove("default")
            values.update(self.DEFAULT_RESOURCE_VALUES)
        if "all" in values:
            values.remove("all")
            values.update(self.ALL_RESOURCE_VALUES)
        return list(values)

    @property
    def selection_arg(self):
        """Return ``args.models`` when it is set, otherwise ``args.select``."""
        # for backwards compatibility, list accepts both --models and --select,
        # with slightly different behavior: --models implies --resource-type model
        if self.args.models:
            return self.args.models
        return self.args.select

    def defer_to_manifest(self, adapter, selected_uids):
        """Do nothing: the list task doesn't defer."""
        return

    def get_node_selector(self):
        """Build the selector used to resolve the selection.

        Returns a ``TestSelector`` when tests are the only requested resource
        type, otherwise a ``ResourceTypeSelector`` restricted to
        ``resource_types`` that also includes empty nodes.

        Raises:
            DbtInternalError: if the manifest or the graph is unset.
        """
        if self.manifest is None or self.graph is None:
            raise DbtInternalError("manifest and graph must be set to get perform node selection")
        # The property builds a new list on every access, so read it once.
        resource_types = self.resource_types
        if resource_types == [NodeType.Test]:
            return TestSelector(
                graph=self.graph,
                manifest=self.manifest,
                previous_state=self.previous_state,
            )
        return ResourceTypeSelector(
            graph=self.graph,
            manifest=self.manifest,
            previous_state=self.previous_state,
            resource_types=resource_types,
            include_empty_nodes=True,
        )

    def interpret_results(self, results):
        """Always report success, whatever ``results`` contains."""
        # list command should always return 0 as exit code
        return True