Skip to content

Commit

Permalink
Moved parse_simulation_input_file function to parse_input_files.py
Browse files Browse the repository at this point in the history
  • Loading branch information
hprats committed Nov 26, 2024
1 parent cd1e5d3 commit 9bc48bc
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 133 deletions.
155 changes: 155 additions & 0 deletions zacrostools/parse_input_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,161 @@
from zacrostools.custom_exceptions import LatticeModelError


def parse_simulation_input_file(input_file: Union[str, Path]) -> Dict[str, Any]:
"""
Parses a simulation_input.dat file and extracts the simulation parameters.
Parameters
----------
input_file : Union[str, Path]
Path to the simulation input file.
Returns
-------
Dict[str, Any]
Dictionary containing the extracted parameters, including:
- General simulation parameters as key-value pairs.
- Nested dictionaries for 'reporting_scheme' and 'stopping_criteria' containing specific parameters.
Raises
------
FileNotFoundError
If the input file does not exist.
ValueError
If specific parameters have invalid formats or cannot be parsed.
"""

def process_values(kword, vals):
if not vals:
# No values, set to True
return True

# Define keywords that should always return lists
list_keywords = {'gas_specs_names', 'gas_energies', 'gas_molec_weights',
'gas_molar_fracs', 'surf_specs_names', 'surf_specs_dent'}

# For surf_specs_names, remove '*'
if kword == 'surf_specs_names':
return [name.rstrip('*') for name in vals]

# For 'override_array_bounds', store value as a string
if kword == 'override_array_bounds':
return ' '.join(vals)

# For stopping_criteria keywords, handle 'infinite' as string
if kword in stopping_keywords:
val = ' '.join(vals)
if val.lower() in ['infinity', 'infinite']:
return 'infinity'
else:
if kword == 'max_steps':
try:
return int(val)
except ValueError:
return val # Return as string if cannot parse
else:
try:
return float(val)
except ValueError:
return val # Return as string if cannot parse

# For reporting_scheme keywords, store values as strings
if kword in reporting_keywords:
return ' '.join(vals)

# For certain keywords, always return a list
if kword in list_keywords:
try:
return [int(v) for v in vals]
except ValueError:
try:
return [float(v) for v in vals]
except ValueError:
return vals # Return as list of strings

# Default handling
if len(vals) == 1:
val = vals[0]
try:
return int(val)
except ValueError:
try:
return float(val)
except ValueError:
return val # Return as string
else:
# Multiple values, try to parse as list of ints
try:
return [int(v) for v in vals]
except ValueError:
# Try to parse as list of floats
try:
return [float(v) for v in vals]
except ValueError:
# Return as list of strings
return vals

input_file = Path(input_file)
if not input_file.is_file():
raise FileNotFoundError(f"Input file '{input_file}' does not exist.")

data = {}
reporting_scheme = {}
stopping_criteria = {}
reporting_keywords = ['snapshots', 'process_statistics', 'species_numbers']
stopping_keywords = ['max_steps', 'max_time', 'wall_time']

# Initialize the special keywords with None
for key in reporting_keywords:
reporting_scheme[key] = None
for key in stopping_keywords:
stopping_criteria[key] = None

with input_file.open('r') as f:
lines = f.readlines()
current_keyword = None
current_values = []
for line in lines:
line = line.strip()
if not line or line.startswith('#'):
continue
if line == 'finish':
continue
# Check if line starts with a keyword
if not line[0].isspace():
# New keyword line
tokens = line.split()
keyword = tokens[0]
values = tokens[1:]
# If we were collecting values for a previous keyword, store them
if current_keyword is not None:
if current_keyword in reporting_keywords:
reporting_scheme[current_keyword] = process_values(current_keyword, current_values)
elif current_keyword in stopping_keywords:
stopping_criteria[current_keyword] = process_values(current_keyword, current_values)
else:
data[current_keyword] = process_values(current_keyword, current_values)
# Start collecting values for the new keyword
current_keyword = keyword
current_values = values
else:
# Continuation line, add tokens to current_values
tokens = line.split()
current_values.extend(tokens)
# After processing all lines, store the last keyword's values
if current_keyword is not None:
if current_keyword in reporting_keywords:
reporting_scheme[current_keyword] = process_values(current_keyword, current_values)
elif current_keyword in stopping_keywords:
stopping_criteria[current_keyword] = process_values(current_keyword, current_values)
else:
data[current_keyword] = process_values(current_keyword, current_values)
# Add reporting_scheme and stopping_criteria to data
data['reporting_scheme'] = reporting_scheme
data['stopping_criteria'] = stopping_criteria
return data


def parse_lattice_input_file(input_file: Union[str, Path]) -> Dict[str, Any]:
"""
Parses a lattice_input.dat file and extracts the lattice parameters.
Expand Down
137 changes: 4 additions & 133 deletions zacrostools/read_functions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import numpy as np
from zacrostools.custom_exceptions import EnergeticsModelError
from zacrostools.custom_exceptions import KMCOutputError
from zacrostools.parse_input_files import parse_simulation_input_file
from zacrostools.custom_exceptions import EnergeticsModelError, KMCOutputError


def parse_general_output(path):
Expand Down Expand Up @@ -58,138 +58,9 @@ def parse_general_output(path):
return data


def parse_simulation_input(path):
def process_values(keyword, values):
if not values:
# No values, set to True
return True

# Define keywords that should always return lists
list_keywords = {'gas_specs_names', 'gas_energies', 'gas_molec_weights',
'gas_molar_fracs', 'surf_specs_names', 'surf_specs_dent'}

# For surf_specs_names, remove '*'
if keyword == 'surf_specs_names':
return [name.rstrip('*') for name in values]

# For 'override_array_bounds', store value as a string
if keyword == 'override_array_bounds':
return ' '.join(values)

# For stopping_criteria keywords, handle 'infinite' as string
if keyword in stopping_keywords:
val = ' '.join(values)
if val.lower() in ['infinity', 'infinite']:
return 'infinity'
else:
if keyword == 'max_steps':
try:
return int(val)
except ValueError:
return val # Return as string if cannot parse
else:
try:
return float(val)
except ValueError:
return val # Return as string if cannot parse

# For reporting_scheme keywords, store values as strings
if keyword in reporting_keywords:
return ' '.join(values)

# For certain keywords, always return a list
if keyword in list_keywords:
try:
return [int(v) for v in values]
except ValueError:
try:
return [float(v) for v in values]
except ValueError:
return values # Return as list of strings

# Default handling
if len(values) == 1:
val = values[0]
try:
return int(val)
except ValueError:
try:
return float(val)
except ValueError:
return val # Return as string
else:
# Multiple values, try to parse as list of ints
try:
return [int(v) for v in values]
except ValueError:
# Try to parse as list of floats
try:
return [float(v) for v in values]
except ValueError:
# Return as list of strings
return values

data = {}
reporting_scheme = {}
stopping_criteria = {}
reporting_keywords = ['snapshots', 'process_statistics', 'species_numbers']
stopping_keywords = ['max_steps', 'max_time', 'wall_time']

# Initialize the special keywords with None
for key in reporting_keywords:
reporting_scheme[key] = None
for key in stopping_keywords:
stopping_criteria[key] = None

filename = os.path.join(path, 'simulation_input.dat')
with open(filename, 'r') as f:
lines = f.readlines()
current_keyword = None
current_values = []
for line in lines:
line = line.strip()
if not line or line.startswith('#'):
continue
if line == 'finish':
continue
# Check if line starts with a keyword
if not line[0].isspace():
# New keyword line
tokens = line.split()
keyword = tokens[0]
values = tokens[1:]
# If we were collecting values for a previous keyword, store them
if current_keyword is not None:
if current_keyword in reporting_keywords:
reporting_scheme[current_keyword] = process_values(current_keyword, current_values)
elif current_keyword in stopping_keywords:
stopping_criteria[current_keyword] = process_values(current_keyword, current_values)
else:
data[current_keyword] = process_values(current_keyword, current_values)
# Start collecting values for the new keyword
current_keyword = keyword
current_values = values
else:
# Continuation line, add tokens to current_values
tokens = line.split()
current_values.extend(tokens)
# After processing all lines, store the last keyword's values
if current_keyword is not None:
if current_keyword in reporting_keywords:
reporting_scheme[current_keyword] = process_values(current_keyword, current_values)
elif current_keyword in stopping_keywords:
stopping_criteria[current_keyword] = process_values(current_keyword, current_values)
else:
data[current_keyword] = process_values(current_keyword, current_values)
# Add reporting_scheme and stopping_criteria to data
data['reporting_scheme'] = reporting_scheme
data['stopping_criteria'] = stopping_criteria
return data


def get_partial_pressures(path):
partial_pressures = {}
simulation_data = parse_simulation_input(path)
simulation_data = parse_simulation_input_file(path)
for i, molecule in enumerate(simulation_data['gas_specs_names']):
partial_pressures[molecule] = simulation_data['pressure'] * simulation_data['gas_molar_fracs'][i]
return partial_pressures
Expand Down Expand Up @@ -264,7 +135,7 @@ def get_stiffness_scalable_steps(path):
def get_surf_specs_data(path):

# Get data from simulation_input.dat
parsed_sim_data = parse_simulation_input(path)
parsed_sim_data = parse_simulation_input_file(path)
surf_specs_names = parsed_sim_data.get('surf_specs_names')
surf_specs_dent = parsed_sim_data.get('surf_specs_dent')
species_dentates = dict(zip(surf_specs_names, surf_specs_dent))
Expand Down

0 comments on commit 9bc48bc

Please sign in to comment.