-
Notifications
You must be signed in to change notification settings - Fork 53
/
Copy path__init__.py
228 lines (200 loc) · 8.95 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
"""Fetches Shopify Orders and Products."""
from typing import Any, Dict, Iterator, Iterator, Optional, Iterable
import dlt
from dlt.sources import DltResource
from dlt.common.typing import TDataItem, TAnyDateTime
from dlt.common.time import ensure_pendulum_datetime
from dlt.common import pendulum
from dlt.common import jsonpath as jp
from .settings import (
DEFAULT_API_VERSION,
FIRST_DAY_OF_MILLENNIUM,
DEFAULT_ITEMS_PER_PAGE,
DEFAULT_PARTNER_API_VERSION,
)
from .helpers import ShopifyApi, TOrderStatus, ShopifyPartnerApi
@dlt.source(name="shopify")
def shopify_source(
    private_app_password: str = dlt.secrets.value,
    api_version: str = DEFAULT_API_VERSION,
    shop_url: str = dlt.config.value,
    start_date: TAnyDateTime = FIRST_DAY_OF_MILLENNIUM,
    end_date: Optional[TAnyDateTime] = None,
    created_at_min: TAnyDateTime = FIRST_DAY_OF_MILLENNIUM,
    items_per_page: int = DEFAULT_ITEMS_PER_PAGE,
    order_status: TOrderStatus = "any",
) -> Iterable[DltResource]:
    """
    The source for the Shopify pipeline. Available resources are products, orders, and customers.

    `start_date` can be used on its own or together with `end_date`. When both are provided,
    data is limited to items updated in that time range: `start_date` is sent to the API as
    `updated_at_min` and `end_date` as `updated_at_max`.

    All resources opt in to external schedulers (`allow_external_schedulers=True`), e.g. when
    run as an Airflow task.

    Args:
        private_app_password: The app password to the app on your shop.
        api_version: The API version to use (e.g. 2023-01). Defaults to `DEFAULT_API_VERSION`.
        shop_url: The URL of your shop (e.g. https://my-shop.myshopify.com).
        start_date: Items updated on or after this date are imported.
            Defaults to `FIRST_DAY_OF_MILLENNIUM`.
            If `end_date` is not provided, this is used as the initial value for incremental
            loading and after the initial run, only new data will be retrieved.
            Accepts any `date`/`datetime` object or a date/datetime string in ISO 8601 format.
        end_date: The end of the range for which to load data.
            Should be used together with `start_date` to limit the data to items updated in
            that time range. If it is not provided, incremental loading is enabled and after
            the initial run, only new data will be retrieved.
        created_at_min: The minimum creation date of items to import. Items created on or
            after this date are loaded. Defaults to `FIRST_DAY_OF_MILLENNIUM`.
        items_per_page: The max number of items to fetch per page.
            Defaults to `DEFAULT_ITEMS_PER_PAGE`.
        order_status: The order status to filter by. Can be 'open', 'closed', 'cancelled',
            or 'any'. Defaults to 'any'.

    Returns:
        Iterable[DltResource]: The `products`, `orders` and `customers` resources.
    """
    # build client
    client = ShopifyApi(shop_url, private_app_password, api_version)

    # Normalize every date-like input once so all resources share the same values.
    start_date_obj = ensure_pendulum_datetime(start_date)
    end_date_obj = ensure_pendulum_datetime(end_date) if end_date else None
    created_at_min_obj = ensure_pendulum_datetime(created_at_min)

    def _build_params(
        updated_at: dlt.sources.incremental[pendulum.DateTime],
        created_after: pendulum.DateTime,
        page_size: int,
        **extra: Any,
    ) -> Dict[str, Any]:
        """Builds the query parameters shared by all resources of this source.

        `extra` carries resource-specific filters (e.g. `status` for orders); they are
        placed right after `limit`, so the resulting key order is the same as when each
        resource built its parameters inline.
        """
        params: Dict[str, Any] = {
            "updated_at_min": updated_at.last_value.isoformat(),
            "limit": page_size,
            **extra,
            "order": "updated_at asc",
            "created_at_min": created_after.isoformat(),
        }
        # The upper bound is only sent when an end value was configured.
        if updated_at.end_value is not None:
            params["updated_at_max"] = updated_at.end_value.isoformat()
        return params

    # define resources
    @dlt.resource(primary_key="id", write_disposition="merge")
    def products(
        updated_at: dlt.sources.incremental[
            pendulum.DateTime
        ] = dlt.sources.incremental(
            "updated_at",
            initial_value=start_date_obj,
            end_value=end_date_obj,
            allow_external_schedulers=True,
        ),
        created_at_min: pendulum.DateTime = created_at_min_obj,
        items_per_page: int = items_per_page,
    ) -> Iterable[TDataItem]:
        """
        The resource for products on your shop, supports incremental loading and pagination.

        Args:
            updated_at: The saved state of the last 'updated_at' value.
            created_at_min: Only products created on or after this date are requested.
            items_per_page: The max number of items to fetch per page.

        Returns:
            Iterable[TDataItem]: A generator of products.
        """
        yield from client.get_pages(
            "products", _build_params(updated_at, created_at_min, items_per_page)
        )

    @dlt.resource(primary_key="id", write_disposition="merge")
    def orders(
        updated_at: dlt.sources.incremental[
            pendulum.DateTime
        ] = dlt.sources.incremental(
            "updated_at",
            initial_value=start_date_obj,
            end_value=end_date_obj,
            allow_external_schedulers=True,
        ),
        created_at_min: pendulum.DateTime = created_at_min_obj,
        items_per_page: int = items_per_page,
        status: TOrderStatus = order_status,
    ) -> Iterable[TDataItem]:
        """
        The resource for orders on your shop, supports incremental loading and pagination.

        Args:
            updated_at: The saved state of the last 'updated_at' value.
            created_at_min: Only orders created on or after this date are requested.
            items_per_page: The max number of items to fetch per page.
            status: The order status to filter by.

        Returns:
            Iterable[TDataItem]: A generator of orders.
        """
        yield from client.get_pages(
            "orders",
            _build_params(updated_at, created_at_min, items_per_page, status=status),
        )

    @dlt.resource(primary_key="id", write_disposition="merge")
    def customers(
        updated_at: dlt.sources.incremental[
            pendulum.DateTime
        ] = dlt.sources.incremental(
            "updated_at",
            initial_value=start_date_obj,
            end_value=end_date_obj,
            allow_external_schedulers=True,
        ),
        created_at_min: pendulum.DateTime = created_at_min_obj,
        items_per_page: int = items_per_page,
    ) -> Iterable[TDataItem]:
        """
        The resource for customers on your shop, supports incremental loading and pagination.

        Args:
            updated_at: The saved state of the last 'updated_at' value.
            created_at_min: Only customers created on or after this date are requested.
            items_per_page: The max number of items to fetch per page.

        Returns:
            Iterable[TDataItem]: A generator of customers.
        """
        yield from client.get_pages(
            "customers", _build_params(updated_at, created_at_min, items_per_page)
        )

    return (products, orders, customers)
@dlt.resource
def shopify_partner_query(
    query: str,
    data_items_path: jp.TJsonPath,
    pagination_cursor_path: jp.TJsonPath,
    pagination_variable_name: str = "after",
    variables: Optional[Dict[str, Any]] = None,
    access_token: str = dlt.secrets.value,
    organization_id: str = dlt.config.value,
    api_version: str = DEFAULT_PARTNER_API_VERSION,
) -> Iterable[TDataItem]:
    """
    Resource that yields paginated results from the Shopify Partner GraphQL API.

    A `ShopifyPartnerApi` client is created from the given credentials and everything
    produced by its `get_graphql_pages` method for the given query is yielded.
    The data items are located with `data_items_path`, and the cursor found at
    `pagination_cursor_path` is handed to the following query through the variable
    named by `pagination_variable_name`.

    Example:
        query = '''query Transactions($after: String) {
            transactions(after: $after, first: 100) {
                edges {
                    cursor
                    node {
                        id
                    }
                }
            }
        }'''

        shopify_partner_query(
            query,
            data_items_path="data.transactions.edges[*].node",
            pagination_cursor_path="data.transactions.edges[-1].cursor",
            pagination_variable_name="after",
        )

    Args:
        query: The GraphQL query to run.
        data_items_path: The JSONPath to the data items in the query result. Should resolve to array items.
        pagination_cursor_path: The JSONPath to the pagination cursor in the query result, will be piped to the next query via variables.
        pagination_variable_name: The name of the variable to pass the pagination cursor to.
        variables: Mapping of extra variables used in the query.
        access_token: The Partner API Client access token, created in the Partner Dashboard.
        organization_id: Your Organization ID, found in the Partner Dashboard.
        api_version: The API version to use (e.g. 2024-01). Use `unstable` for the latest version.

    Returns:
        Iterable[TDataItem]: A generator of the query results.
    """
    partner_api = ShopifyPartnerApi(
        access_token=access_token,
        organization_id=organization_id,
        api_version=api_version,
    )
    # Paging is handled entirely by the client; this resource only forwards its arguments.
    result_pages = partner_api.get_graphql_pages(
        query,
        data_items_path=data_items_path,
        pagination_cursor_path=pagination_cursor_path,
        pagination_variable_name=pagination_variable_name,
        variables=variables,
    )
    yield from result_pages