-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathload_data_to_bigQuery.py
132 lines (106 loc) · 3.56 KB
/
load_data_to_bigQuery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from csv import field_size_limit
from google.cloud import bigquery, storage
import os
from datetime import datetime
# Point the Google Cloud client libraries at the local service-account key.
# NOTE(review): this overwrites any GOOGLE_APPLICATION_CREDENTIALS already set
# in the environment, and the key path is resolved relative to the current
# working directory.
os.environ.update(GOOGLE_APPLICATION_CREDENTIALS='ServiceKey_GoogleCloud.json')
# Module-level BigQuery client shared by the functions in this file.
bigquery_client = bigquery.Client()
# Today's local date as 'YYYY-M-D' with NO zero padding (e.g. '2024-3-5').
# This string is used as the default object name to load, so the unpadded
# format is kept exactly as before. The clock is read once so that year,
# month and day always come from the same instant; three separate
# datetime.today() calls could straddle midnight.
today_date = '{0.year}-{0.month}-{0.day}'.format(datetime.today())
# Fetch a BigQuery dataset by name, creating it first if it does not exist.
def getOrCreate_dataset(dataset_name):
    '''
    Get dataset. If dataset does not exist, create one.

    Args:
        - dataset_name(String): passed unchanged to
          bigquery_client.get_dataset / create_dataset.
    Returns:
        - dataset (google.cloud.bigquery.Dataset)
    Raises:
        - Any client error other than "not found" (code 404). Such errors
          are printed and then re-raised, instead of being swallowed and
          returning None.
    '''
    print('Fetching Dataset...')
    try:
        # Get the dataset if it already exists.
        dataset = bigquery_client.get_dataset(dataset_name)
    except Exception as e:
        # Only a 404 means "missing". Exceptions without a `code` attribute,
        # or with any other code, are genuine failures and must propagate.
        if getattr(e, 'code', None) != 404:
            print(e)
            raise
        print('Dataset does not exist. Creating a new one.')
        # exists_ok=True tolerates another process creating the dataset
        # between the lookup above and this call. The returned Dataset is
        # used directly, so no second fetch is needed.
        dataset = bigquery_client.create_dataset(dataset_name, exists_ok=True)
    print('Done')
    print(dataset.self_link)
    return dataset
def getOrCreate_table(dataset_name, table_name):
    '''
    Get table. If table does not exist, create one. If dataset does not exist, create one.

    Args:
        - dataset_name(String)
        - table_name(String)
    Returns:
        - table (google.cloud.bigquery.Table)
    Raises:
        - Any client error other than "not found" (code 404). The original
          `return` inside `finally` silently discarded such errors, or
          replaced them with an UnboundLocalError.
    '''
    # Resolve (or create) the parent dataset, then build the fully-qualified
    # 'project.dataset.table' ID.
    dataset = getOrCreate_dataset(dataset_name)
    table_id = '{}.{}.{}'.format(dataset.project, dataset.dataset_id, table_name)
    print('\nFetching Table...')
    try:
        # Get the table if it exists.
        t = bigquery_client.get_table(table_id)
        print('Done')
    except Exception as e:
        # Only a 404 means "missing"; any other error must propagate.
        if getattr(e, 'code', None) != 404:
            raise
        print('Table does not exist. Creating a new one.')
        # Created from the ID alone (no schema). exists_ok=True tolerates a
        # concurrent creator, and the returned Table is used directly.
        t = bigquery_client.create_table(table_id, exists_ok=True)
    print(t.self_link)
    return t
def load_to_bigQuery(dataset_name='log_activity',
                     table_name='log_table',
                     date_to_load=today_date,
                     bucket_name='orders_etl_bar'):
    '''
    Load a pipe-delimited CSV object from Cloud Storage into a BigQuery table.

    The object gs://<bucket_name>/<date_to_load> is loaded into
    <dataset_name>.<table_name>. The dataset and table are created first if
    they do not exist.

    Args:
        - dataset_name(String): target dataset. Default - 'log_activity'
        - table_name(String): target table. Default - 'log_table'
        - date_to_load(String): object name inside the bucket.
          Default - today (module-level today_date, evaluated once when the
          module is imported)
        - bucket_name(String): source bucket. Default - 'orders_etl_bar'
    Returns:
        - None
    '''
    # Creating a storage client
    storage_client = storage.Client()
    # get_bucket performs an API lookup, so a missing or inaccessible bucket
    # should surface here, before a load job is started.
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(blob_name=date_to_load)
    table = getOrCreate_table(dataset_name=dataset_name, table_name=table_name)
    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("Action", "STRING"),
            bigquery.SchemaField("orderNumber", "INTEGER"),
            bigquery.SchemaField("orderDate", "DATE"),
            bigquery.SchemaField("requiredDate", "DATE"),
            bigquery.SchemaField("shippedDate", "DATE"),
            bigquery.SchemaField("status", "STRING"),
            bigquery.SchemaField("comments", "STRING"),
            bigquery.SchemaField("customerNumber", "INTEGER"),
        ],
        field_delimiter='|',
        source_format=bigquery.SourceFormat.CSV,
    )
    # BigQuery load jobs accept Cloud Storage sources only as gs:// URIs.
    # The previous https://storage.cloud.google.com/... browser URL is not a
    # valid load source.
    uri = 'gs://{}/{}'.format(bucket.name, blob.name)
    table_id = '{}.{}.{}'.format(table.project, table.dataset_id, table.table_id)
    print('\nLoading log activity data...')
    load_job = bigquery_client.load_table_from_uri(uri,
                                                   table_id,
                                                   job_config=job_config)
    # Block until the job finishes; raises if the load job failed.
    load_job.result()
    print("Done. Loaded {} rows.".format(load_job.output_rows))
# Script entry point: run the load with the default dataset, table and date.
# NOTE(review): this call also executes when the module is imported; add an
# `if __name__ == '__main__':` guard if the module is ever imported.
load_to_bigQuery(dataset_name='log_activity',
                 table_name='log_table',
                 date_to_load=today_date)