-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbulk_select.py
86 lines (65 loc) · 1.86 KB
/
bulk_select.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import csv
import psycopg2
from cStringIO import StringIO
import time
def getResults(stream):
    """
    Parse a CSV string and lazily yield each data row as a dict.

    :param stream: str of CSV text whose first line is the header row
    :yield: one dict per data row, mapping header name -> cell value
    """
    f = StringIO(stream)
    try:
        # restkey=None is the DictReader default; kept for explicitness.
        for item in csv.DictReader(f, restkey=None):
            yield item
    finally:
        # Runs even when the consumer abandons the generator early
        # (GeneratorExit); the original close() after the loop only ran
        # when the generator was fully exhausted, leaking the buffer.
        f.close()
class BulkSelect:
    """
    Benchmark two ways of pulling a large result set out of PostgreSQL:
    plain cursor iteration (getData) vs. COPY ... TO STDOUT as CSV
    (getDataCopy). Both methods print their elapsed wall-clock time.
    """

    def __init__(self):
        """
        Open a connection and cursor to the local PostgreSQL server.
        """
        # NOTE(review): placeholder DSN — substitute real credentials
        # and database name before running.
        conStr = "postgresql://<user>:<pwd>@localhost:5432/<db_name>"
        self.conn = psycopg2.connect(conStr)
        self.cursor = self.conn.cursor()

    def __del__(self):
        """
        Best-effort cleanup of the cursor and connection.
        """
        # Guard with getattr: if connect() raised in __init__ the
        # attributes were never set, and the original code then raised
        # AttributeError from __del__, obscuring the real error.
        # __del__ must never raise, so close-time errors are swallowed.
        cursor = getattr(self, "cursor", None)
        if cursor is not None:
            try:
                cursor.close()
            except Exception:
                pass
        conn = getattr(self, "conn", None)
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass

    def getData(self):
        """
        Load the full result set by iterating the server cursor.

        :return list: all joined rows as tuples
        """
        start_time = time.time()
        query = """
        SELECT * from big_data inner join big_data as t1 USING(fname)
        """
        self.cursor.execute(query)
        # fetchall() replaces the manual append loop; same result list.
        result = self.cursor.fetchall()
        print("--- %s seconds ---" % (time.time() - start_time))
        return result

    def getDataCopy(self):
        """
        Load the same result set via COPY ... TO STDOUT in CSV form,
        then parse it client-side with the csv module.

        :return list: all joined rows as dicts (header -> value),
                      with empty CSV cells normalized back to None
        """
        start_time = time.time()
        query = """
        SELECT * from big_data inner join big_data as t1 USING(fname)
        """
        output = StringIO()
        try:
            # query is module-internal SQL, not user input, so the
            # string interpolation here is not an injection vector.
            self.cursor.copy_expert(
                "COPY (%s) TO STDOUT (FORMAT 'csv', HEADER true)" % query,
                output,
            )
            data = output.getvalue()
        finally:
            # Original leaked the buffer if copy_expert raised.
            output.close()
        # COPY writes SQL NULL as an empty field; map it back to None.
        result = [
            {k: None if v == "" else v for k, v in item.items()}
            for item in getResults(data)
        ]
        print("--- %s seconds ---" % (time.time() - start_time))
        return result
if __name__ == "__main__":
    # Run the benchmark only when executed as a script; the original
    # opened a database connection as a side effect of merely importing
    # this module.
    dalc = BulkSelect()
    dalc.getData()
    dalc.getDataCopy()