-
Notifications
You must be signed in to change notification settings - Fork 0
/
file_operations.py
190 lines (159 loc) · 10.1 KB
/
file_operations.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# file_operations.py
from typing import Union
import hashlib
from py7zr import SevenZipFile, is_7zfile
from zipfile import ZipFile, is_zipfile
import os
import logging
from xml.dom import minidom
from special_path_resolver import resolve_special_path
import shutil
def compute_sha1(data_or_file: Union[bytes, str, os.PathLike]) -> str:
    """Return the hexadecimal SHA-1 digest of a file or of in-memory data.

    Args:
        data_or_file: Either a filesystem path (``str`` or any
            ``os.PathLike`` such as ``pathlib.Path``) whose contents are
            hashed, or a bytes-like object that is hashed directly.

    Returns:
        The 40-character hexadecimal digest. This function is best-effort:
        if reading or hashing fails, the error is logged and the digest of
        whatever was hashed so far is returned (the SHA-1 of empty input
        when nothing could be read).
    """
    digest = hashlib.sha1()
    try:
        # Path-like objects must be treated as paths too; feeding them to
        # ``update`` would raise TypeError and yield the empty-input digest.
        if isinstance(data_or_file, (str, os.PathLike)):
            with open(data_or_file, "rb") as stream:
                # Read in fixed-size chunks so large files are never loaded
                # into memory at once; ``b""`` marks end of file.
                for chunk in iter(lambda: stream.read(65536), b""):
                    digest.update(chunk)
        else:
            # Anything else is assumed to already be the data to hash.
            digest.update(data_or_file)
    except Exception as e:
        logging.error(f"Error computing SHA1: {e}")
    return digest.hexdigest()
def _store_gsba_member(results: dict, output_folder_path: str, inner_folder_name: str, member_path: str, payload: bytes) -> None:
    """Record hash/size of one archive member and write it under ``<output>/Temp/<inner folder>``."""
    results[member_path] = {
        'hash': compute_sha1(payload),
        'size': len(payload)
    }
    # The first two characters of the member path are dropped.
    # NOTE(review): presumably this removes a one-digit folder prefix such as
    # "1/" — confirm the behavior wanted for longer or empty prefixes.
    relative = member_path[2:].replace("/", "\\").replace("\\\\", "\\")
    # Strip separators at both ends so os.path.join never treats the member
    # as a rooted path and discards the output folder.
    relative = relative.strip("\\")
    # NOTE(review): member names are not checked for ".." components; an
    # archive from an untrusted source could write outside the Temp folder.
    target_dir = os.path.dirname(os.path.join(output_folder_path, "Temp", inner_folder_name, relative))
    os.makedirs(target_dir, exist_ok=True)
    target_file = os.path.join(target_dir, os.path.basename(member_path).replace("\\\\", "\\"))
    with open(target_file, "wb") as out:
        out.write(payload)


def extract_and_check_files_in_gsba(gsm_gsba_file_path: str, inner_folder_path: str, output_folder_path: str):
    """Extract the members of one inner folder of a .gsba archive and hash them.

    The archive may be a 7z or a zip container. Every non-directory member
    whose name starts with ``inner_folder_path`` is hashed, measured and
    written below ``<output_folder_path>/Temp/<inner folder>``.

    Args:
        gsm_gsba_file_path: Path of the archive; must end in ``.gsba``.
        inner_folder_path: Member-name prefix selecting the files to process.
        output_folder_path: Folder under which the ``Temp`` tree is created.

    Returns:
        A dict mapping each processed member name to
        ``{'hash': <sha1 hex>, 'size': <uncompressed bytes>}``. Errors during
        extraction (including an unrecognized container format) are logged
        and whatever was collected so far is returned.

    Raises:
        ValueError: If ``gsm_gsba_file_path`` does not end in ``.gsba``.
    """
    results = {}
    # Check if the file ends in .gsba
    if not gsm_gsba_file_path.endswith('.gsba'):
        raise ValueError(f"ERROR: File {gsm_gsba_file_path} does not end in .gsba")

    # Folder name used on disk: the prefix without any separators. Computed
    # once, into its own variable, so the prefix used for filtering members
    # stays exactly what the caller passed for the whole loop.
    inner_folder_name = inner_folder_path.replace("\\", "").replace("/", "")

    try:
        if is_7zfile(gsm_gsba_file_path):
            with SevenZipFile(gsm_gsba_file_path, mode='r') as archive:
                # read() yields a mapping of member name -> in-memory buffer.
                for member_path, buffer in archive.read().items():
                    # Skip directories and members outside the inner folder.
                    if member_path.endswith('/') or not member_path.startswith(inner_folder_path):
                        continue
                    _store_gsba_member(results, output_folder_path, inner_folder_name, member_path, buffer.getvalue())
        elif is_zipfile(gsm_gsba_file_path):
            with ZipFile(gsm_gsba_file_path, 'r') as archive:
                for member_path in archive.namelist():
                    # Skip directories and members outside the inner folder.
                    if member_path.endswith('/') or not member_path.startswith(inner_folder_path):
                        continue
                    _store_gsba_member(results, output_folder_path, inner_folder_name, member_path, archive.read(member_path))
        else:
            raise ValueError(f"ERROR: File {gsm_gsba_file_path} is not a .7z or .zip file, at heart. The .gsba file might be corrupted.")
    except Exception as e:
        # Best-effort: report the problem and return what was gathered.
        logging.error(f"Error extracting and checking files in GSBA: {e}")
    return results
def _first_text(parent, tag_name: str) -> str:
    """Return the text of the first ``tag_name`` element under *parent*, or "" if it is missing or empty."""
    matches = parent.getElementsByTagName(tag_name)
    if not matches or matches[0].firstChild is None:
        return ""
    return matches[0].firstChild.nodeValue or ""


def move_files_to_gsba_based_on_gsm_xml(inner_folder_path: str, xml_string: str, specific_output_folder_path: str, date_string: str, inner_folder_name: str):
    """Build a .gsba archive from extracted files, as described by a GSM XML.

    For every ``<Directory>`` element the listed ``<File>`` entries are copied
    from ``<inner_folder_path>/<game name>/<inner_folder_name>/...`` into a
    numbered sub-folder (1, 2, ...) of ``specific_output_folder_path``. That
    folder is then zipped, the zip is renamed to ``.gsba`` and the folder is
    deleted.

    Args:
        inner_folder_path: Root folder holding the previously extracted files.
        xml_string: XML text containing ``GameName`` and ``Directory`` elements.
        specific_output_folder_path: Staging folder; also the archive's base
            name (``<path>.gsba`` is produced next to it).
        date_string: Not used by this function; kept so existing callers
            keep working.
        inner_folder_name: Folder name inserted between the game name and the
            virtual file path when locating source files.

    Returns:
        None.
    """
    xml_data = minidom.parseString(xml_string)
    game_name = xml_data.getElementsByTagName('GameName')[0].firstChild.data
    # Fix game name bad character encoding.
    # NOTE(review): the last two pairs are no-ops as written; presumably the
    # mis-encoded source sequences were lost — confirm against the repository.
    for broken, fixed in ((":", ""), ('â„¢', '™'), ('™', '™'), ('®', '®')):
        game_name = game_name.replace(broken, fixed)
    archive_name = specific_output_folder_path

    # Each <Directory> gets its own numbered folder inside the archive.
    for folder_index, dir_element in enumerate(xml_data.getElementsByTagName('Directory'), start=1):
        special_path = _first_text(dir_element, 'SpecialPath')
        # Some special paths carry an extra per-account/per-game id component.
        if special_path == "%STEAM_CLOUD%":
            extra_pathy_id = _first_text(dir_element, 'SteamID')
        elif special_path in {"%UPLAY%", "%UPLAY_2%"}:
            extra_pathy_id = _first_text(dir_element, 'Uplay')
        else:
            extra_pathy_id = ""
        special_path_converted = resolve_special_path(special_path) if special_path else ""
        # An empty or missing <Path> is treated as "no sub-folder" rather
        # than crashing on a None text node.
        folder = os.path.join(special_path_converted, extra_pathy_id, _first_text(dir_element, 'Path'))

        for file_element in dir_element.getElementsByTagName('File'):
            if file_element.firstChild is None:
                # Empty <File/> entry: nothing to copy.
                continue
            relative_file = file_element.firstChild.data
            virtual_fullpath = (folder + os.path.sep + relative_file).replace("ó", "ó")
            # "C:\..." is stored on disk as "drive-C\...".
            if virtual_fullpath[1:2] == ":":
                virtual_fullpath = "drive-" + virtual_fullpath[0] + virtual_fullpath[2:]
            virtual_fullpath = virtual_fullpath.replace("\\", "/")
            fullpath = inner_folder_path + os.path.sep + game_name + os.path.sep + inner_folder_name + os.path.sep + virtual_fullpath
            fullpath = fullpath.replace("\\", "/")
            # Copy file over from 'fullpath' to specific_output_folder_path
            dest_folder = os.path.join(specific_output_folder_path, str(folder_index), os.path.dirname(relative_file))
            os.makedirs(dest_folder, exist_ok=True)
            shutil.copy(fullpath.replace("////", "/"), os.path.join(dest_folder, os.path.basename(fullpath)))

    # Create "<archive_name>.zip" from the staging folder, then give it the
    # .gsba extension.
    shutil.make_archive(archive_name, 'zip', specific_output_folder_path)
    try:
        os.rename(archive_name + '.zip', archive_name + ".gsba")
    except Exception as e:
        logging.error(f"Error renaming file: {e}")
        # Delete the zip file that could not be renamed.
        os.remove(archive_name + '.zip')
    # Delete the staging folder that has the same name as the .gsba file.
    shutil.rmtree(specific_output_folder_path)