@@ -1,20 +1,19 @@
 import asyncio
 import json
 import os
-
-from datetime import datetime
-
 import boto3
-import pandas as pd
-
 from botocore.client import Config, ClientError
 
 from csvapi.parseview import ParseView
 from csvapi.profileview import ProfileView
+from csvapi.utils import enrich_db_with_metadata
 from csvapi.setup_logger import logger
-from csvapi.utils import get_hash, create_connection
-
-from config import DB_ROOT_DIR
+from csvapi.utils import (
+    get_hash,
+    check_message_structure,
+    check_csv_detective_report_structure,
+    check_profile_report_structure
+)
 
 MINIO_URL = os.environ.get("MINIO_URL", "http://localhost:9000")
 MINIO_USER = os.environ.get("MINIO_USER", "minio")
@@ -26,167 +25,87 @@ def run_process_message(key: str, data: dict, topic: str) -> None:
 
 
 async def process_message(key: str, message: dict, topic: str) -> None:
-    # Get url
-    # Should think if we keep that
-    # r = requests.get('https://www.data.gouv.fr/api/1/datasets/{}/resources/{}'.format(message['meta']['dataset_id'], key)) # noqa
-    # url = r.json()['url']
-    if message is not None and message['service'] == 'csvdetective':
-        url = 'https://www.data.gouv.fr/fr/datasets/r/{}'.format(key)
-        urlhash = get_hash(url)
-        logger.info(urlhash)
-
-        # Connect to minio
-        s3_client = boto3.client(
-            "s3",
-            endpoint_url=MINIO_URL,
-            aws_access_key_id=MINIO_USER,
-            aws_secret_access_key=MINIO_PASSWORD,
-            config=Config(signature_version="s3v4"),
-        )
-
-        try:
-            s3_client.head_bucket(Bucket=message['value']['data_location']['bucket'])
-        except ClientError as e:
-            logger.error(e)
-            logger.error(
-                "Bucket {} does not exist or credentials are invalid".format(
-                    message['value']['location']['bucket']
+    if message['service'] == "csvdetective":
+        if check_message_structure(message):
+            url = 'https://www.data.gouv.fr/fr/datasets/r/{}'.format(key)
+            urlhash = get_hash(url)
+            logger.info(urlhash)
+
+            # Connect to minio
+            s3_client = boto3.client(
+                "s3",
+                endpoint_url=MINIO_URL,
+                aws_access_key_id=MINIO_USER,
+                aws_secret_access_key=MINIO_PASSWORD,
+                config=Config(signature_version="s3v4"),
+            )
+
+            try:
+                s3_client.head_bucket(Bucket=message['value']['report_location']['bucket'])
+            except ClientError as e:
+                logger.error(e)
+                logger.error(
+                    "Bucket {} does not exist or credentials are invalid".format(
+                        message['value']['report_location']['bucket']
+                    )
+                )
+                return
+
+            # Load csv-detective report
+            try:
+                response = s3_client.get_object(
+                    Bucket=message['value']['report_location']['bucket'],
+                    Key=message['value']['report_location']['key']
+                )
+                content = response['Body']
+                csv_detective_report = json.loads(content.read())
+            except ClientError as e:
+                logger.error(e)
+                logger.error(
+                    "Report does not exist in bucket or content is not valid json"
                 )
+                return
+
+            if not check_csv_detective_report_structure(csv_detective_report):
+                logger.error(
+                    "csvdetective report malformed"
+                )
+                return
+
+            # Parse file and store it to sqlite
+            parseViewInstance = ParseView()
+            await parseViewInstance.parse_from_consumer(
+                parseViewInstance,
+                url=url,
+                urlhash=urlhash,
+                csv_detective_report=csv_detective_report
             )
-            return
-
-        # Load csv-detective report
-        response = s3_client.get_object(
-            Bucket=message['value']['report_location']['bucket'],
-            Key=message['value']['report_location']['key']
-        )
-        content = response['Body']
-        csv_detective_report = json.loads(content.read())
-
-        # Parse file and store it to sqlite
-        parseViewInstance = ParseView()
-        await parseViewInstance.parse_from_consumer(
-            parseViewInstance,
-            url=url,
-            urlhash=urlhash,
-            csv_detective_report=csv_detective_report,
-        )
-
-        # Profile file
-        profileViewInstance = ProfileView()
-        profile_report = await profileViewInstance.get_minimal_profile(
-            profileViewInstance,
-            urlhash=urlhash,
-            csv_detective_report=csv_detective_report,
-        )
-
-        # Save to sql
-        conn = create_connection(DB_ROOT_DIR + '/' + urlhash + '.db')
-        # c = conn.cursor()
-
-        general_infos = [
-            {
-                'encoding': csv_detective_report['encoding'],
-                'separator': csv_detective_report['separator'],
-                'header_row_idx': csv_detective_report['header_row_idx'],
-                'total_lines': profile_report['table']['n'],
-                'nb_columns': profile_report['table']['n_var'],
-                'nb_cells_missing': profile_report['table']['n_cells_missing'],
-                'nb_vars_with_missing': profile_report['table']['n_vars_with_missing'],
-                'nb_vars_all_missing': profile_report['table']['n_vars_all_missing'],
-                'date_last_check': datetime.today().strftime('%Y-%m-%d'),
-                'dataset_id': message['meta']['dataset_id'],
-                'resource_id': key
-            }
-        ]
-        df = pd.DataFrame(general_infos)
-        df.to_sql('general_infos', con=conn, if_exists='replace', index=False)
-
-        columns_infos = []
-        categorical_infos = []
-        top_infos = []
-        numeric_infos = []
-        numeric_plot_infos = []
-        for col in profile_report['variables']:
-            column_info = {}
-            column_info['name'] = col
-            column_info['format'] = csv_detective_report['columns'][col]['format']
-            column_info['nb_distinct'] = profile_report['variables'][col]['n_distinct']
-            column_info['is_unique'] = profile_report['variables'][col]['is_unique']
-            column_info['nb_unique'] = profile_report['variables'][col]['n_unique']
-            column_info['type'] = profile_report['variables'][col]['type']
-            column_info['nb_missing'] = profile_report['variables'][col]['n_missing']
-            column_info['count'] = profile_report['variables'][col]['count']
-            columns_infos.append(column_info)
-
-            cat_cols = [
-                'siren', 'siret', 'code_postal', 'code_commune_insee',
-                'code_departement', 'code_region', 'tel_fr',
-            ]
-            if csv_detective_report['columns'][col]['format'] in cat_cols:
-                column_info['type'] = 'Categorical'
-
-            if column_info['type'] == 'Categorical' and \
-                    len(profile_report['variables'][col]['value_counts_without_nan']) < 10:
-                for cat in profile_report['variables'][col]['value_counts_without_nan']:
-                    categorical_info = {}
-                    categorical_info['column'] = col
-                    categorical_info['value'] = cat
-                    categorical_info['nb'] = profile_report['variables'][col]['value_counts_without_nan'][cat]
-                    categorical_infos.append(categorical_info)
-
-            if column_info['type'] == 'Numeric':
-                numeric_info = {}
-                numeric_info['column'] = col
-                numeric_info['mean'] = profile_report['variables'][col]['mean']
-                numeric_info['std'] = profile_report['variables'][col]['std']
-                numeric_info['min'] = profile_report['variables'][col]['min']
-                numeric_info['max'] = profile_report['variables'][col]['max']
-                numeric_infos.append(numeric_info)
-                for i in range(len(profile_report['variables'][col]['histogram']['bin_edges'])):
-                    numeric_plot_info = {}
-                    numeric_plot_info['column'] = col
-                    numeric_plot_info['value'] = profile_report['variables'][col]['histogram']['bin_edges'][i]
-                    numeric_plot_info['type'] = 'bin_edges'
-                    numeric_plot_infos.append(numeric_plot_info)
-
-                for i in range(len(profile_report['variables'][col]['histogram']['counts'])):
-                    numeric_plot_info = {}
-                    numeric_plot_info['column'] = col
-                    numeric_plot_info['value'] = profile_report['variables'][col]['histogram']['counts'][i]
-                    numeric_plot_info['type'] = 'counts'
-                    numeric_plot_infos.append(numeric_plot_info)
-
-            cpt = 0
-            for top in profile_report['variables'][col]['value_counts_without_nan']:
-                if (cpt < 10):
-                    top_info = {}
-                    top_info['column'] = col
-                    top_info['value'] = top
-                    top_info['nb'] = profile_report['variables'][col]['value_counts_without_nan'][top]
-                    top_infos.append(top_info)
-                cpt = cpt + 1
-
-        df = pd.DataFrame(columns_infos)
-        if df.shape[0] > 0:
-            df.to_sql('columns_infos', con=conn, if_exists='replace', index=False)
-
-        df = pd.DataFrame(categorical_infos)
-        if df.shape[0] > 0:
-            df.to_sql('categorical_infos', con=conn, if_exists='replace', index=False)
-
-        df = pd.DataFrame(top_infos)
-        if df.shape[0] > 0:
-            df.to_sql('top_infos', con=conn, if_exists='replace', index=False)
-
-        df = pd.DataFrame(numeric_infos)
-        if df.shape[0] > 0:
-            df.to_sql('numeric_infos', con=conn, if_exists='replace', index=False)
-
-        df = pd.DataFrame(numeric_plot_infos)
-        if df.shape[0] > 0:
-            df.to_sql('numeric_plot_infos', con=conn, if_exists='replace', index=False)
-
-        conn.commit()
-
-        print('ok')
+
+            # Profile file
+            profileViewInstance = ProfileView()
+            profile_report = await profileViewInstance.get_minimal_profile(
+                profileViewInstance,
+                urlhash=urlhash,
+                csv_detective_report=csv_detective_report
+            )
+
+            if not check_profile_report_structure(profile_report):
+                logger.error(
+                    "pandas profiling report malformed"
+                )
+                return
+
+            enrich_db_with_metadata(
+                urlhash,
+                csv_detective_report,
+                profile_report,
+                message['meta']['dataset_id'],
+                key
+            )
+
+            logger.info('Enrichment done!')
+
+        else:
+            logger.error('Problem with structure message')
+    else:
+        logger.info('Message received from {} service - do not process'.format(message['service']))
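
Note: the new guard clauses depend on csvapi.utils helpers whose implementations are not part of this diff. As a rough illustration only, based on the keys that process_message dereferences (service, value.report_location.bucket, value.report_location.key, meta.dataset_id), a structure check could look like the sketch below; the actual check_message_structure in csvapi.utils may differ.

# Hypothetical sketch, not the csvapi.utils implementation.
# It only mirrors the keys that process_message reads above.
def check_message_structure_sketch(message: dict) -> bool:
    try:
        return all(
            isinstance(value, str)
            for value in (
                message['service'],
                message['value']['report_location']['bucket'],
                message['value']['report_location']['key'],
                message['meta']['dataset_id'],
            )
        )
    except (KeyError, TypeError):
        return False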