-
Notifications
You must be signed in to change notification settings - Fork 11
/
process_utils.py
159 lines (121 loc) · 4.35 KB
/
process_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#!/usr/bin/env python
"""
Processes incoming data from the Office of Research and munges it into
the output formats expected by the CFPB chart display organisms.
Output formats are documented at
www.github.com/cfpb/consumer-credit-trends
"""
# Python library imports
import os
import csv
import datetime
import math
import logging
import json
# Module metadata
__author__ = "Consumer Financial Protection Bureau"
__credits__ = ["Hillary Jeffrey"]
__license__ = "CC0-1.0"
__version__ = "2.0"
__maintainer__ = "CFPB"
__email__ = "tech@cfpb.gov"
__status__ = "Development"
# Constants
# Multiplier applied by milliseconds() to turn seconds into milliseconds
SEC_TO_MS = 1000
# Set up logging
# NOTE: basicConfig() is called at import time, so importing this module
# requests INFO-level logging configuration as a side effect.
logging.basicConfig(level="INFO")
# Module-level logger shared by the utility functions below
logger = logging.getLogger(__name__)
# Utility Methods
def save_csv(filename, content, writemode='w'):
    """Saves the specified content object into a csv file.

    Any missing parent directories of ``filename`` are created first.

    Args:
        filename: Path of the CSV file to write.
        content: Iterable of rows, each row an iterable of field values,
            passed straight to ``csv.writer.writerows``.
        writemode: Mode string handed to ``open`` (default ``'w'``).
    """
    # A bare filename (no directory part) has an empty dirname, and
    # os.makedirs('') raises; only create directories when there is a
    # directory component that does not exist yet.
    dirname = os.path.dirname(filename)
    if dirname and not os.path.exists(dirname):
        os.makedirs(dirname)
        logger.info(
            "Created directories for {}".format(dirname)
        )

    # Write output as a csv file
    with open(filename, writemode) as csvfile:
        writer = csv.writer(csvfile, delimiter=',')
        writer.writerows(content)

    logger.debug("Wrote file '{}'".format(filename))
def save_json(filename, json_content, writemode='w'):
    """Dumps the specified JSON content into a .json file.

    Any missing parent directories of ``filename`` are created first. The
    output is written with sorted keys and a 4-space indent.

    Args:
        filename: Path of the JSON file to write.
        json_content: JSON-serializable object passed to ``json.dump``.
        writemode: Mode string handed to ``open`` (default ``'w'``).
    """
    # A bare filename (no directory part) has an empty dirname, and
    # os.makedirs('') raises; only create directories when there is a
    # directory component that does not exist yet.
    dirname = os.path.dirname(filename)
    if dirname and not os.path.exists(dirname):
        os.makedirs(dirname)
        logger.info(
            "Created directories for {}".format(dirname)
        )

    # Write output as a json file
    with open(filename, writemode) as fp:
        json.dump(
            json_content,
            fp,
            sort_keys=True,
            indent=4,
            separators=(',', ': ')
        )

    logger.debug("Wrote file '{}'".format(filename))
def load_csv(filename, readmode='r', skipheaderrow=True):
    """Read every row of a CSV file into a list of lists of strings.

    Args:
        filename: Path of the CSV file to read.
        readmode: Mode string handed to ``open`` (default ``'r'``).
        skipheaderrow: When true, the first row is left out of the result.

    Returns:
        The parsed rows, without the first row if ``skipheaderrow`` is set.
    """
    with open(filename, readmode) as handle:
        rows = [row for row in csv.reader(handle)]

    # Start after the header row when asked to skip it
    first_row = 1 if skipheaderrow else 0
    return rows[first_row:]
def expand_path(path):
    """Return the absolute form of a path, resolving a leading ``~`` first."""
    with_home = os.path.expanduser(path)
    return os.path.abspath(with_home)
def get_csv_list(path):
    """Return the names of the CSV files found directly inside a directory.

    An entry is included when its name ends in ``.csv`` (case-insensitive)
    and it is a regular file. Names are returned without the directory part.
    """
    csv_names = []
    for entry in os.listdir(path):
        # Cheap name check first; only matching names hit the filesystem
        if not entry.lower().endswith('.csv'):
            continue
        if os.path.isfile(os.path.join(path, entry)):
            csv_names.append(entry)
    return csv_names
def milliseconds(sec):
    """Return the given number of seconds expressed in milliseconds."""
    millis = sec * SEC_TO_MS
    return millis
# Unix Epoch conversion from http://stackoverflow.com/questions/11743019/
def epochtime(datestring, schema="%Y-%m"):
    """Convert a date string to whole seconds since 1970-01-01 00:00:00.

    Args:
        datestring: The date text to parse.
        schema: ``strptime`` format describing ``datestring``
            (default: year-month, e.g. ``"2016-03"``).

    Returns:
        The number of seconds between 1970-01-01 and the parsed date,
        rounded to the nearest integer.
    """
    parsed = datetime.datetime.strptime(datestring, schema)
    unix_epoch = datetime.datetime(1970, 1, 1)
    elapsed = parsed - unix_epoch
    return int(round(elapsed.total_seconds()))
# Modified from an answer at:
# http://stackoverflow.com/questions/3154460/
def human_numbers(num, decimal_places=1, whole_units_only=1):
    """Given a number, returns a human-modifier (million/billion) number.

    Numbers of one million or more are scaled and returned with a modifier
    to the specified number of decimal places (default: 1) - e.g. 1100000
    returns '1.1 million'. If rounding to that many places would display
    1,000 of a unit, the next unit is used instead - e.g. 999999999 returns
    '1.0 billion' rather than '1,000.0 million'.

    Numbers below one million are returned with comma separators and no
    modifier. For these, if whole_units_only is set, the number is rounded
    to a whole number, i.e. 67.012 becomes '67'; otherwise decimal_places
    is applied.

    Args:
        num: The value to format; anything accepted by ``float()``.
        decimal_places: Digits shown after the decimal point.
        whole_units_only: Truthy to round sub-million values to whole
            numbers. Has no effect on numbers with modifiers.

    Returns:
        The formatted number as a string.
    """
    numnames = [
        '',
        '',
        'million',
        'billion',
        'trillion',
        'quadrillion',
        'quintillion'
    ]
    last_idx = len(numnames) - 1

    n = float(num)

    # Index into numnames is the number of whole groups of three digits,
    # clamped to the names available (log10 is undefined for zero).
    if n == 0:
        magnitude = 0
    else:
        magnitude = int(math.floor(math.log10(abs(n)) / 3))
    idx = max(0, min(last_idx, magnitude))

    # Number portion of the output: comma separators every 3 digits and
    # the requested number of decimal places
    numfmt = '{0:,.' + str(decimal_places) + 'f}'

    # Below one million there is no modifier to append
    if idx < 2:
        if whole_units_only:
            return '{:,}'.format(int(round(n)))
        return numfmt.format(n)

    # Calculate the output number by order of magnitude
    outnum = n / 10 ** (3 * idx)

    # Rounding for display can carry up to 1,000 of the current unit;
    # when a larger unit exists, promote to it so the result stays below
    # 1,000 (compare the value exactly as it would be displayed).
    displayed = float(('{0:.' + str(decimal_places) + 'f}').format(abs(outnum)))
    if displayed >= 1000 and idx < last_idx:
        idx += 1
        outnum = n / 10 ** (3 * idx)

    return numfmt.format(outnum) + ' ' + numnames[idx]