-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconvert_to_csv_pandas.py
161 lines (134 loc) · 5.98 KB
/
convert_to_csv_pandas.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import pandas as pd
import os
import logging
import argparse
from urllib.parse import urlparse, urlunparse
class PwSafeProcessor:
    """Convert a tab-separated Password Safe export into a cleaned CSV.

    Pipeline: load the export, drop bookkeeping columns, normalize
    titles/usernames/URLs, validate URLs, remove unusable rows, and write
    the result to ./output/output.csv.
    """

    def __init__(self, input_file_path):
        """Prepare output paths, configure logging, and load the input file.

        Args:
            input_file_path: Path to the tab-separated pwsafe export file.
        """
        self.input_file_path = input_file_path
        self.output_dir = os.path.join(os.getcwd(), 'output')
        self.output_file_path = os.path.join(self.output_dir, 'output.csv')
        # Initialize logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        # exist_ok=True avoids the check-then-create race of the previous
        # os.path.exists() + os.makedirs() pair.
        os.makedirs(self.output_dir, exist_ok=True)
        self.cleanup_output_files()
        self.df = self.load_input_file()

    def cleanup_output_files(self):
        """Remove previous output files if they exist."""
        if os.path.exists(self.output_file_path):
            os.remove(self.output_file_path)
            logging.info(f"Deleted previous file: {self.output_file_path}")

    def load_input_file(self):
        """Load the tab-separated input file into a pandas DataFrame.

        Returns:
            DataFrame with every NaN cell replaced by the empty string, so
            downstream string operations never see NaN.

        Raises:
            Exception: re-raised after logging when the file cannot be read.
        """
        try:
            df = pd.read_csv(self.input_file_path, sep='\t', na_values=[""]).fillna("")
            logging.info(f"Loaded input file: {self.input_file_path}")
            return df
        except Exception as e:
            logging.error(f"Error loading input file: {e}")
            raise

    def process_data(self):
        """Run every transformation step, in order, on self.df.

        Raises:
            Exception: re-raised after logging if any step fails.
        """
        try:
            self.drop_unnecessary_columns()
            self.rename_columns()
            self.remove_empty_rows()
            self.transform_rows()
            self.validate_urls()
            self.remove_invalid_rows()
            logging.info("Data processing completed.")
        except Exception as e:
            logging.error(f"Error processing data: {e}")
            raise

    def drop_unnecessary_columns(self):
        """Drop pwsafe bookkeeping columns that are not needed in the output."""
        columns_to_remove = [
            'Created Time', 'Password Modified Time', 'Record Modified Time',
            'Password Policy', 'Password Policy Name', 'History', 'Symbols'
        ]
        # errors='ignore' tolerates exports that lack some of these columns
        # instead of raising KeyError (fixes a crash on partial exports).
        self.df = self.df.drop(columns=columns_to_remove, errors='ignore')

    def rename_columns(self):
        """Rename columns for better readability."""
        self.df = self.df.rename(columns={'Group/Title': 'Title'})

    def remove_empty_rows(self):
        """Remove rows whose every cell is empty.

        load_input_file() already replaced NaN with "", so the previous
        dropna(how='all') could never match anything; compare against the
        empty string instead.
        """
        self.df = self.df[(self.df != "").any(axis=1)]

    def transform_rows(self):
        """Apply per-row transformations to the whole DataFrame."""
        self.df = self.df.apply(self.process_row, axis=1)

    def process_row(self, row):
        """Transform a single row: clean Title, backfill Username, build URL.

        Rules:
        - Title keeps only its last dot-separated segment ("Group.Site" -> "Site").
        - An empty Username is backfilled from the e-mail column.
        - A distinct, non-empty e-mail is appended to Notes as "email - <addr>".
        - A missing URL is derived from the Title plus ".com"; spaces are
          removed from any URL.

        On error the row is logged and returned unchanged (best effort).
        """
        try:
            # Update Title values. str() guards against purely numeric
            # titles that pandas may have parsed as numbers.
            row['Title'] = str(row['Title']).split('.')[-1].strip()
            # Replace missing Username with e-mail if available
            if row['Username'] == '' and row['e-mail'] != '':
                row['Username'] = row['e-mail']
            # Add e-mail to Notes if both Username and e-mail are present and e-mail is not in Username
            if row['Username'] != row['e-mail'] and row['e-mail'] != '':
                row['Notes'] = (row['Notes'] + ('; ' if row['Notes'] else '') + 'email - ' + row['e-mail']).strip()
            # Create URL value using the Title followed by ".com"
            if row['URL'] == '':
                row['URL'] = row['Title'].replace(" ", "") + '.com'
            else:
                row['URL'] = row['URL'].replace(" ", "")
            return row
        except Exception as e:
            logging.error(f"Error processing row: {e}")
            return row

    def validate_urls(self):
        """Validate and fix every URL in the DataFrame."""
        self.df['URL'] = self.df['URL'].apply(self.validate_url)

    def validate_url(self, url):
        """Normalize a URL to lowercase with an explicit scheme.

        Bare hosts get an "https" scheme; a host parsed into the path
        component is moved into netloc. Returns the normalized URL, or
        None when no valid scheme+host can be produced.
        """
        try:
            parsed_url = urlparse(url)
            if not parsed_url.scheme:
                parsed_url = parsed_url._replace(scheme="https")
            if not parsed_url.netloc:
                # "example.com" parses as a path, not a host; promote it.
                parsed_url = parsed_url._replace(netloc=parsed_url.path, path="")
            valid_url = urlunparse(parsed_url).lower()
            parsed_valid_url = urlparse(valid_url)
            if parsed_valid_url.scheme and parsed_valid_url.netloc:
                return valid_url
            return None
        except Exception as e:
            logging.error(f"Error validating URL: {e}")
            return None

    def remove_invalid_rows(self):
        """Remove rows with invalid URLs or with both Username and Notes empty."""
        self.df = self.df[self.df['URL'].notna()]
        self.df = self.df[~((self.df['Username'] == '') & (self.df['Notes'] == ''))]

    def drop_email_column(self):
        """Drop the e-mail column from the DataFrame."""
        self.df = self.df.drop(columns=['e-mail'], errors='ignore')
        logging.info("Dropped e-mail column.")

    def save_output_files(self):
        """Save the processed DataFrame to the output CSV file.

        Raises:
            Exception: re-raised after logging when the write fails.
        """
        try:
            self.df.to_csv(self.output_file_path, index=False)
            logging.info(f"File has been converted to CSV and saved as {self.output_file_path}")
        except Exception as e:
            logging.error(f"Error saving output file: {e}")
            raise

    def run(self):
        """Run the full processing pipeline: process, drop e-mail, save."""
        try:
            self.process_data()
            self.drop_email_column()
            self.save_output_files()
        except Exception as e:
            logging.error(f"Error in run: {e}")
            raise
def main():
    """Entry point: parse command-line arguments and run the converter."""
    parser = argparse.ArgumentParser(description="Process pwsafe data")
    parser.add_argument(
        "--input",
        type=str,
        default=os.path.join(os.getcwd(), 'pwsafe.txt'),
        help="Input file path",
    )
    cli_args = parser.parse_args()
    # Build the processor and execute the full pipeline.
    PwSafeProcessor(cli_args.input).run()


if __name__ == "__main__":
    main()