-
Notifications
You must be signed in to change notification settings - Fork 0
/
Tidal_Field_Calculator.py
368 lines (287 loc) · 15.9 KB
/
Tidal_Field_Calculator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
"""
Tidal_Field_Calculator.py
-------------------------------
Author: Asit Dave
Date: 31-05-2024
License: MIT
Description:
This script is designed to compute the density field, tidal tensor, potential field, and traceless tidal
shear tensor for a given cosmological N-body simulation snapshot (pynbody.snapshot). Users have the option
to smooth the density field using a Gaussian filter and can select whether to compute the potential field
and the traceless tidal shear tensor.
Instructions:
1. Make sure the input_params.txt file is present in the current working directory.
2. Make sure the LSS_TWeb_BlackBox.py script is present in the current working directory.
3. The script expects you to have a simulation snapshot for the analysis.
4. Parameters that can be modified in this script are:
- Projection of the box for the plots (default: xy)
- Thickness of the slice to average the density field over. (default: half box thickness)
Execution:
You can either run the script once for all the smoothing scales you want to analyse,
OR
run the script separately for each smoothing scale, one at a time, and then run the Tweb_Structure_Classifier.py script.
"""
#----------------------------------------- IMPORT LIBRARIES ----------------------------------------------#
import pynbody
import numpy as np
from tqdm import tqdm
import os
import logging
import time
from LSS_TWeb_BlackBox import *
# Configure the root logger: INFO level, records formatted as "time - LEVEL - message"
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Record the start time so the total run time can be reported at the end of the script
start_time = time.time()
#------------------------------ READ INPUT FILE ----------------------------------#
def read_input_txt_file(file_path: str) -> tuple:
    """
    Read the input parameter file through the `read_input_file` helper.

    Parameters:
    - file_path (str): The path to the input parameters file.

    Returns:
    - tuple: Eight values, in this order: snapshot_path, save_path, grid_size,
      create_density, own_density_path, smoothing_scales, calculate_potential,
      calculate_traceless.

    Raises:
    - Exception: Any error raised while reading/unpacking is logged and re-raised.
    """
    try:
        # Unpacking into exactly eight names also fails loudly if the helper
        # returns a different number of values.
        (snap_file, output_dir, n_grid, make_density, density_file,
         scales, want_potential, want_traceless) = read_input_file(file_path)
    except Exception as err:
        logging.error(f"Error reading input file: {err}")
        raise

    logging.info('Input parameters read successfully.')
    return (snap_file, output_dir, n_grid, make_density, density_file,
            scales, want_potential, want_traceless)
#------------------------------ LOAD THE SNAPSHOT --------------------------------#
def get_snapshot(snapshot_path: str) -> tuple[pynbody.snapshot.SimSnap, pynbody.simdict.SimDict]:
    """
    Load a simulation snapshot through the `load_snapshot` helper.

    Parameters:
    - snapshot_path (str): The file path to the snapshot.

    Returns:
    - tuple: The two values returned by `load_snapshot`: the snapshot and its header.

    Raises:
    - Exception: Any error raised while loading is logged and re-raised.
    """
    logging.info('Loading the snapshot...')
    try:
        loaded_snapshot, loaded_header = load_snapshot(snapshot_path)
    except Exception as err:
        logging.error(f"Error loading snapshot: {err}")
        raise

    logging.info('Snapshot loaded successfully.')
    return loaded_snapshot, loaded_header
#------------------------------ EXTRACT SIMULATION PROPERTIES --------------------------------#
def extract_simulation_params(snapshot_header: pynbody.simdict.SimDict) -> dict[str, float]:
    """
    Pull the simulation properties out of the snapshot header.

    Parameters:
    - snapshot_header (pynbody.simdict.SimDict): The simulation dictionary.

    Returns:
    - dict: Whatever `extract_simdict_values` returns for this header. The main
      section of this script reads the 'boxsize' key from it.
      NOTE(review): presumably it also holds 'omegaM0', 'omegaL0', 'a' and 'h' --
      confirm against `extract_simdict_values`.

    Raises:
    - Exception: Any error raised by the helper is logged and re-raised.
    """
    try:
        return extract_simdict_values(simdict=snapshot_header)
    except Exception as err:
        logging.error(f"Error extracting simulation parameters: {err}")
        raise
#------------------------------ SAVE THE SIMULATION PROPERTIES --------------------------------#
def save_simulation_properties(snapshot_header: pynbody.simdict.SimDict, snapshot: pynbody.snapshot.SimSnap) -> None:
    """
    Write a plain-text summary of the simulation to 'simulation_properties.txt'.

    The file is opened by bare name, so it lands in the current working directory.

    Parameters:
    - snapshot_header (pynbody.simdict.SimDict): The simulation header; the keys
      'boxsize', 'omegaM0', 'omegaL0', 'a', 'h' and 'time' are read from it.
    - snapshot (pynbody.snapshot.SimSnap): The loaded snapshot; its 'mass' and
      'pos' arrays are read.

    Raises:
    - Exception: Any error raised while writing is logged and re-raised.
    """
    # (label written to the file, key looked up in the header), in output order
    header_entries = (
        ("Box size", 'boxsize'),
        ("Omega_M0", 'omegaM0'),
        ("Omega_L0", 'omegaL0'),
        ("a", 'a'),
        ("h", 'h'),
        ("time", 'time'),
    )
    rule = '-------------------------------------------------------------\n'

    try:
        with open('simulation_properties.txt', "w") as out:
            out.write(rule)
            out.write(' The file contains the simulation properties and parameters \n')
            out.write(rule + '\n')

            for label, key in header_entries:
                out.write(f"{label}: {snapshot_header[key]}\n")
            # Blank line separating the header values from the particle summary
            out.write("\n")

            particle_mass = snapshot['mass'].mean()
            out.write(f"Mass of each particle: {particle_mass} * {particle_mass.units}\n")

            n_particles = snapshot['pos'].shape[0]
            out.write(f"Total number of particles: {n_particles}\n")

        logging.info('Simulation properties saved successfully.')
    except Exception as err:
        logging.error(f"Error saving simulation properties: {err}")
        raise
#------------------------------ SAVE THE PARTICLE POSITIONS AND VELOCITIES --------------------------------#
def save_particle_positions_and_velocities(snapshot: pynbody.snapshot.SimSnap) -> None:
    """
    Save the snapshot's 'pos' and 'vel' arrays under the output directory.

    Reads the module-level `save_path`, which the main section of this script assigns.
    A failure for either array is logged but not re-raised, so the other array is
    still attempted and the caller never sees an exception from this function.

    Parameters:
    - snapshot (pynbody.snapshot.SimSnap): The loaded snapshot.
    """
    # (snapshot key, output file name, success message, error-message prefix)
    outputs = (
        ('pos', 'particle_positions.npy',
         "Particle position file ('particle_positions.npy') saved successfully.",
         "Error saving particle positions"),
        ('vel', 'particle_velocity.npy',
         "Particle velocity file ('particle_velocity.npy') saved successfully.",
         "Error saving particle velocities"),
    )

    for array_key, file_name, success_message, error_prefix in outputs:
        try:
            target_file = os.path.join(save_path, file_name)
            save_data(data=snapshot[array_key], file_path=target_file)
            logging.info(success_message)
        except Exception as err:
            logging.error(f"{error_prefix}: {err}")
#------------------------------ CREATE A DENSITY FIELD --------------------------------#
def get_density_field(snapshot: pynbody.snapshot.SimSnap, mas: str, verbose: bool) -> np.ndarray:
    """
    Compute the density field or load an existing one.

    Besides its arguments, this function reads the module-level names
    `create_density`, `own_density_path`, `save_path`, `grid_size` and `box_size`,
    which the main section of this script assigns before calling it.

    Parameters:
    - snapshot (pynbody.snapshot.SimSnap): The loaded snapshot. Only used when the
      field is computed.
    - mas (str): The mass-assignment scheme. Options: 'NGP', 'CIC', 'TSC', 'PCS', 'gaussian'.
      Only used when the field is computed.
    - verbose (bool): Print information on progress. Only used when the field is computed.

    Returns:
    - np.ndarray: The computed or loaded density field.

    Raises:
    - Exception: Any error raised while loading, computing or saving is logged and re-raised.
    """
    if not create_density:
        # The user supplied a pre-computed field: load it and return early.
        logging.info('Loading the density field...')
        try:
            rho = load_data(own_density_path)
        except Exception as e:
            logging.error(f"Error loading density field: {e}")
            raise
        logging.info('Density field loaded successfully.')
        return rho

    save_path_density = os.path.join(save_path, 'density_field.npy')
    logging.info('Computing the density field...')
    try:
        rho = compute_density_field(snapshot=snapshot, grid_size=grid_size, box_size=box_size,
                                    mas=mas, verbose=verbose)
        logging.info('Density field computed successfully.')
        # Persist the field so later runs can load it instead of recomputing
        save_data(data=rho, file_path=save_path_density)
        logging.info("Density field ('density_field.npy') saved successfully.")
    except Exception as e:
        logging.error(f"Error computing density field: {e}")
        raise
    return rho
#------------------------------ EXTRACT SMOOTHING SCALES --------------------------------#
def extract_smoothing_scales(smoothing_scales: list[float]) -> tuple[list[float], list[str]]:
    """
    Split the smoothing scales into numeric values and file-name labels via `extract_scales`.

    Parameters:
    - smoothing_scales (list[float]): List of smoothing scales.

    Returns:
    - tuple: The floating-point smoothing scales and their truncated (decimals removed)
      string forms, as returned by `extract_scales`.

    Raises:
    - Exception: Any error raised by the helper is logged and re-raised.
    """
    try:
        float_scales, scale_labels = extract_scales(smoothing_scales)
    except Exception as err:
        logging.error(f"Error extracting smoothing scales: {err}")
        raise
    return float_scales, scale_labels
#------------------------------ SMOOTH THE DENSITY FIELD --------------------------------#
def get_smoothed_field(input_field: np.ndarray, smoothing_scales: list[float], truncated_scales: list[str]) -> list[np.ndarray]:
    """
    Smooth the density field once per smoothing scale and save every result.

    Each smoothed field is written to
    '<save_path>/smoothed_density_fields/smoothed_density_field_<label>.npy'.
    Reads the module-level names `save_path`, `box_size` and `grid_size`, which the
    main section of this script assigns.

    Parameters:
    - input_field (np.ndarray): The input density field.
    - smoothing_scales (list[float]): List of smoothing scales.
    - truncated_scales (list[str]): File-name labels, indexed in step with `smoothing_scales`.

    Returns:
    - list[np.ndarray]: The smoothed density fields, in the order of `smoothing_scales`.

    Raises:
    - Exception: Any error raised while smoothing or saving is logged and re-raised.
    """
    logging.info('Smoothing the density field....')
    try:
        output_dir = os.path.join(save_path, 'smoothed_density_fields')
        create_directory(output_dir, overwrite=False)

        smoothed_fields = []
        for idx, scale in enumerate(tqdm(smoothing_scales)):
            smoothed = smooth_field(
                input_field=input_field,
                smoothing_scale=scale,
                box_size=box_size,
                grid_size=grid_size,
            )
            file_name = f'smoothed_density_field_{truncated_scales[idx]}.npy'
            save_data(data=smoothed, file_path=os.path.join(output_dir, file_name))
            smoothed_fields.append(smoothed)

        logging.info("Density field smoothed for all the smoothing scales and files saved successfully.")
    except Exception as err:
        logging.error(f"Error smoothing density field: {err}")
        raise
    return smoothed_fields
#------------------------------ PLOT THE DENSITY FIELD --------------------------------#
def plot_density_field(input_field: np.ndarray, smoothing_scales: list[float], truncated_scales: list[str],
                       projection: str, slice_thickness: list[int], save_path: str) -> None:
    """
    Plot the smoothed density fields.

    One plot is produced per smoothing scale through the `plot_field` helper, which
    receives '<save_path>/density_plots' as its output location.

    Parameters:
    - input_field (np.ndarray): The smoothed density fields, indexed in step with
      `smoothing_scales` (the main section passes a list of arrays).
    - smoothing_scales (list[float]): List of smoothing scales.
    - truncated_scales (list[str]): List of truncated smoothing scale names.
    - projection (str): The projection type ('xy', 'yz', 'xz').
    - slice_thickness (list[int]): The range of slices to average over, as a
      two-element list [Start index, End index].
    - save_path (str): Directory under which the 'density_plots' folder is created.

    Raises:
    - Exception: Any error raised while plotting is logged and re-raised.
    """
    try:
        # The output directory is the same for every scale, so build it once.
        save_sm_path = os.path.join(save_path, 'density_plots')
        create_directory(save_sm_path, overwrite=False)
        logging.info('Plotting the density field for respective smoothing scales...')
        for i, sm_scale in enumerate(smoothing_scales):
            plot_field(input_field=input_field[i], sm_scale=sm_scale, name_sm_scale=truncated_scales[i],
                       projection=projection, slice_index=slice_thickness, filepath=save_sm_path)
        logging.info('Density field plots saved successfully.')
    except Exception as e:
        logging.error(f"Error plotting density field: {e}")
        raise
#------------------------------ CALCULATE TIDAL SHEAR TENSOR --------------------------------#
def get_tidal_tensor(smoothed_density_field: list[np.ndarray]) -> None:
    """
    Calculate and save the tidal tensor for every smoothed density field.

    Depending on the module-level flags, the potential field and the traceless tidal
    shear are calculated and saved as well. Output files:
    - '<save_path>/tidal_fields/tidal_tensor_<label>.npy' (always)
    - '<save_path>/potential_field/potential_field_<label>.npy' (if `calculate_potential`)
    - '<save_path>/tidal_fields/traceless_tidal_shear_<label>.npy' (if `calculate_traceless`)

    Reads the module-level names `save_path`, `grid_size`, `truncated_scales`,
    `calculate_potential` and `calculate_traceless`, which the main section of this
    script assigns before calling it.

    Parameters:
    - smoothed_density_field (list[np.ndarray]): The smoothed density fields, one per
      smoothing scale, indexed in step with the module-level `truncated_scales`.

    Raises:
    - Exception: Any error raised while calculating or saving is logged and re-raised.
    """
    try:
        tidal_dir = os.path.join(save_path, 'tidal_fields')
        create_directory(tidal_dir, overwrite=False)
        if calculate_potential:
            # Created once up front instead of once per smoothing scale
            potential_dir = os.path.join(save_path, 'potential_field')
            create_directory(potential_dir, overwrite=False)

        for i, density_field in enumerate(tqdm(smoothed_density_field)):
            scale_label = truncated_scales[i]

            # The helper returns (tensor, potential) only when the potential is requested.
            if calculate_potential:
                logging.info('Calculating the tidal tensor and potential field...')
                tidal_tensor, grav_potential = calculate_tidal_tensor(density_field=density_field,
                                                                      calculate_potential=True)
            else:
                logging.info('Calculating the tidal tensor...')
                tidal_tensor = calculate_tidal_tensor(density_field=density_field,
                                                      calculate_potential=False)

            save_data(data=tidal_tensor,
                      file_path=os.path.join(tidal_dir, f'tidal_tensor_{scale_label}.npy'))

            if calculate_potential:
                save_data(data=grav_potential,
                          file_path=os.path.join(potential_dir, f'potential_field_{scale_label}.npy'))

            if calculate_traceless:
                traceless_tidal_shear = calculate_traceless_tidal_shear(tidal_tensor=tidal_tensor,
                                                                        grid_size=grid_size)
                save_data(data=traceless_tidal_shear,
                          file_path=os.path.join(tidal_dir, f'traceless_tidal_shear_{scale_label}.npy'))

        logging.info('Tidal tensor calculations completed.')
    except Exception as e:
        logging.error(f"Error calculating tidal tensor: {e}")
        raise
#----------------------------------------- MAIN EXECUTION ----------------------------------------------#
if __name__ == '__main__':
    # NOTE: the names assigned below stay at module level on purpose. The functions
    # above read save_path, grid_size, box_size, create_density, own_density_path,
    # calculate_potential, calculate_traceless and truncated_scales as globals.
    try:
        # --- Inputs ---
        (snapshot_path, save_path, grid_size, create_density, own_density_path,
         smoothing_scales, calculate_potential, calculate_traceless) = read_input_txt_file('input_params.txt')

        # --- Snapshot and simulation properties ---
        snap, snap_header = get_snapshot(snapshot_path)
        extracted_values = extract_simulation_params(snapshot_header=snap_header)
        box_size = int(extracted_values['boxsize'])  # the box size is truncated to an integer here

        save_simulation_properties(snapshot_header=snap_header, snapshot=snap)
        save_particle_positions_and_velocities(snapshot=snap)

        # --- Density field and its smoothed versions ---
        rho = get_density_field(snapshot=snap, mas='CIC', verbose=True)
        smth_scales, truncated_scales = extract_smoothing_scales(smoothing_scales)
        smoothened_rho = get_smoothed_field(
            input_field=rho,
            smoothing_scales=smth_scales,
            truncated_scales=truncated_scales,
        )

        # --- Plots ---
        plot_density_field(
            input_field=smoothened_rho,
            smoothing_scales=smth_scales,
            truncated_scales=truncated_scales,
            # Specify the projection of the box for classification overlay
            projection='xy',
            # Specify the range of slices to average over [Start index, End index]
            # NOTE(review): the start index 100 is hard-coded -- presumably this needs
            # grid_size // 2 > 100; confirm for small grids.
            slice_thickness=[100, grid_size // 2],
            save_path=save_path,
        )

        # --- Tidal tensor (and optional potential / traceless shear) ---
        get_tidal_tensor(smoothed_density_field=smoothened_rho)

        logging.info('All calculations are done. Exiting the program...')
    except Exception as e:
        # Top-level boundary: the failure is logged and the script still reports its run time.
        logging.error(f"An error occurred: {e}")

    # Stop the timer
    end_time = time.time()
    logging.info(f'Time taken to run the complete script: {end_time - start_time} seconds')
#----------------------------------------------END-OF-THE-SCRIPT------------------------------------------------------------#