-
Notifications
You must be signed in to change notification settings - Fork 1
/
json2csv.py
118 lines (96 loc) · 3.53 KB
/
json2csv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#!/usr/bin/env python
try:
    import unicodecsv as csv
except ImportError:
    import csv

import json
import logging
import operator
import os
from collections import OrderedDict
from functools import reduce
logging.basicConfig(level=logging.DEBUG)
class Json2Csv(object):
    """Process a JSON object to a CSV file.

    The constructor takes an ``outline`` dict describing how each JSON item
    is flattened into a CSV row:

    * ``'map'`` (required): a list of ``(header, key)`` pairs, where ``key``
      is a dot-separated path into the item; purely numeric segments are
      treated as list indices (e.g. ``'user.emails.0'``).
    * ``'collection'`` (optional): the top-level key under which the list of
      items lives in the loaded document.
    """

    # Top-level key holding the list of items, or None when the loaded
    # document is itself the iterable of items.
    collection = None

    # Separators used by make_string(); better for single-nested dictionaries.
    SEP_CHAR = ', '
    KEY_VAL_CHAR = ': '
    DICT_SEP_CHAR = '\r'
    DICT_OPEN = ''
    DICT_CLOSE = ''

    # Better for deep-nested dictionaries
    # SEP_CHAR = ', '
    # KEY_VAL_CHAR = ': '
    # DICT_SEP_CHAR = '; '
    # DICT_OPEN = '{ '
    # DICT_CLOSE = '} '

    def __init__(self, outline):
        """Validate ``outline`` and precompute the header -> key-path map.

        Raises:
            ValueError: if ``outline`` is not a dict or has an empty 'map'.
        """
        self.rows = []

        if not isinstance(outline, dict):
            raise ValueError('You must pass in an outline for JSON2CSV to follow')
        elif 'map' not in outline or len(outline['map']) < 1:
            raise ValueError('You must specify at least one value for "map"')

        # OrderedDict preserves the outline's column order for the CSV header.
        key_map = OrderedDict()
        for header, key in outline['map']:
            splits = key.split('.')
            # Numeric path segments become list indices.
            splits = [int(s) if s.isdigit() else s for s in splits]
            key_map[header] = splits

        self.key_map = key_map
        if 'collection' in outline:
            self.collection = outline['collection']

    def load(self, json_file):
        """Parse a single JSON document from an open file object."""
        self.process_each(json.load(json_file))

    def process_each(self, data):
        """Process each item of a json-loaded dict, appending to self.rows."""
        # Descend into the configured collection only when it is present.
        if self.collection and self.collection in data:
            data = data[self.collection]

        for d in data:
            logging.info(d)
            self.rows.append(self.process_row(d))

    def process_row(self, item):
        """Process a row of json data against the key map.

        Any path that cannot be resolved (missing key, bad index, wrong
        container type) yields None for that column instead of raising.
        """
        row = {}

        for header, keys in self.key_map.items():
            try:
                # Walk the dotted path step by step. reduce comes from
                # functools: on Python 3 it is no longer a builtin.
                row[header] = reduce(operator.getitem, keys, item)
            except (KeyError, IndexError, TypeError):
                row[header] = None

        return row

    def make_strings(self):
        """Return a copy of self.rows with every value flattened to a string."""
        str_rows = []
        for row in self.rows:
            str_rows.append({k: self.make_string(val)
                             for k, val in row.items()})
        return str_rows

    def make_string(self, item):
        """Recursively flatten sequences and dicts into a single string."""
        if isinstance(item, (list, set, tuple)):
            return self.SEP_CHAR.join([self.make_string(subitem) for subitem in item])
        elif isinstance(item, dict):
            return self.DICT_OPEN + self.DICT_SEP_CHAR.join(
                [self.KEY_VAL_CHAR.join([k, self.make_string(val)])
                 for k, val in item.items()]) + self.DICT_CLOSE
        else:
            # str() replaces the Python 2-only unicode() builtin.
            return str(item)

    def write_csv(self, filename='output.csv', make_strings=False):
        """Write the processed rows to the given filename.

        Raises:
            AttributeError: if no rows have been loaded yet.
        """
        if not self.rows:
            raise AttributeError('No rows were loaded')

        if make_strings:
            out = self.make_strings()
        else:
            out = self.rows

        # Python 3's csv module requires a text-mode file opened with
        # newline='' (the old 'wb+' mode raises TypeError there).
        with open(filename, 'w', newline='') as f:
            writer = csv.DictWriter(f, self.key_map.keys())
            writer.writeheader()
            writer.writerows(out)
class MultiLineJson2Csv(Json2Csv):
    """Json2Csv variant for newline-delimited JSON: one document per line."""

    def load(self, json_file):
        """Iterate the open file directly; each line is decoded separately."""
        self.process_each(json_file)

    def process_each(self, data, collection=None):
        """Load each line of an iterable collection (ie. file)."""
        for line in data:
            # Tolerate blank/trailing lines, common in NDJSON files.
            if not line.strip():
                continue
            d = json.loads(line)
            # Guard on self.collection like the base class does, so an
            # unconfigured collection (None) is never used as a lookup key.
            if self.collection and self.collection in d:
                d = d[self.collection]
            self.rows.append(self.process_row(d))