Skip to content

Commit

Permalink
26 reader for brml Bruker files (#43)
Browse files Browse the repository at this point in the history
* Adding brml reader from Carsten's repo

* Added meta_info property in IKZ BRML module

* Added units to the quantities in IKZ BRML reader

* Adding regex for filetype brml

* Added read_bruker_brml function

* Cleaning and commenting

* Adding xmltodict import

* Added xmltodict dependency to pyproject

* Fixing bug from rebase

* Added get methods

* Modified read_bruker_brml
  • Loading branch information
ka-sarthak authored Dec 8, 2023
1 parent 28c1409 commit f191c00
Show file tree
Hide file tree
Showing 4 changed files with 281 additions and 4 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies = [
"nomad-lab>=1.2.1",
"pytest",
"structlog==22.3.0",
"xmltodict==0.13.0",
]

[project.license]
Expand Down
220 changes: 219 additions & 1 deletion src/nomad_measurements/xrd/IKZ.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import collections
import numpy as np
import time
import xmltodict
from structlog.stdlib import (
BoundLogger,
)
Expand Down Expand Up @@ -259,7 +260,7 @@ def get_1d_scan(self, logger: BoundLogger=None):
if not self.data.shape[0] == 1:
if logger is not None:
logger.warning(
'Multiple/2D scan currently not supported. '
'Multiple/2D/RSM scan currently not supported. '
'Taking the data from the first line scan.'
)
for key, data in output.items():
Expand Down Expand Up @@ -326,3 +327,220 @@ def get_starttime(self, idx=0, to_seconds=True):
return time.mktime(parsed_time)
else:
return parsed_time


class BRMLfile(object):
    '''
    Reader for Bruker .brml files.

    The file is opened as a zip archive. The raw-data XML documents referenced by
    `Experiment<exp_nbr>/DataContainer.xml` are parsed with xmltodict and collected
    in `self.data`.

    Attributes:
        path: path of the file as passed to the constructor.
        data (collections.defaultdict): parsed content keyed by the logic name of
            the data views, the scan axis names, the drive names and the scan
            information keys. Entries with a single element are collapsed to a
            plain Python object, all others are numpy arrays.
        motors: the same object as `data`; drive positions are stored next to the
            scan data so that fixed axes can be looked up by name.
        mounted_optics_info (list): 'InfoData' entries of the mounted optics of the
            primary track of the last raw file. Empty list if not available.
    '''

    def __init__(self, path, exp_nbr=0, encoding="utf-8", verbose=True):
        '''
        Args:
            path: path of the .brml file.
            exp_nbr (int): number of the experiment folder inside the archive.
            encoding (str): encoding passed on to xmltodict.parse.
            verbose (bool): if True, the index of every loaded frame is printed.
        '''
        self.path = path
        self.mounted_optics_info = []
        self.data = collections.defaultdict(list)
        self.motors = self.data # collections.defaultdict(list)

        with zipfile.ZipFile(path, 'r') as fh:
            experiment = "Experiment%i"%exp_nbr
            datacontainer = "%s/DataContainer.xml"%experiment

            with fh.open(datacontainer, "r") as xml:
                container = xmltodict.parse(xml.read(), encoding=encoding)
            # rawlist contains the reference to all the raw files
            # (multiple in case of RSM)
            rawlist = container["DataContainer"]["RawDataReferenceList"]["string"]
            if not isinstance(rawlist, list):
                rawlist = [rawlist]

            for i, rawpath in enumerate(rawlist):
                if verbose:
                    if not i:
                        print("Loading frame %i"%i, end="")
                    else:
                        print(", %i"%i, end="")
                with fh.open(rawpath, "r") as xml:
                    # entering RawData<int>.xml
                    raw = xmltodict.parse(xml.read(), encoding=encoding)
                self._read_frame(raw)

        # `motors` is the same dict as `data`, so one pass covers both
        self._collapse(self.data)

    @staticmethod
    def _as_list(node):
        '''
        Wraps `node` in a list unless it already is one; None becomes an empty list.
        xmltodict returns a bare item instead of a list for a non-repeated element.
        '''
        if node is None:
            return []
        if isinstance(node, list):
            return node
        return [node]

    @staticmethod
    def _axis_values(start, stop, step):
        '''
        Returns the equally spaced axis positions from `start` to `stop`.
        A zero increment gives a single point; the sign of `step` is ignored
        because the direction is already defined by `start` and `stop`.
        '''
        nint = int(round(abs(stop - start) / abs(step))) if step else 0
        return np.linspace(start, stop, nint + 1)

    @staticmethod
    def _collapse(container):
        '''
        Converts every entry of `container` to a squeezed numpy array and unpacks
        zero-dimensional results into the contained Python object.
        '''
        for key in container:
            val = np.array(container[key]).squeeze()
            container[key] = val if val.shape else val.item()

    @staticmethod
    def _value_with_unit(node, default_unit):
        '''
        Returns [float(@Value), @Unit] for an xmltodict node, or [] if the node is
        not a dict or carries no '@Value'.
        '''
        if not isinstance(node, dict):
            return []
        val = node.get('@Value')
        if val is None:
            return []
        return [float(val), node.get('@Unit', default_unit)]

    def _read_frame(self, raw):
        '''
        Appends the content of one parsed RawData<int>.xml document to `self.data`
        and updates `self.mounted_optics_info`.
        '''
        dataroute = raw["RawData"]["DataRoutes"]["DataRoute"]
        scaninfo = dataroute["ScanInformation"]

        # a single measured point arrives as one string, several points as a list;
        # deciding on the type avoids iterating a lone string char by char
        datum = dataroute["Datum"]
        if isinstance(datum, str):
            rawdata = np.array(datum.split(","))
        else:
            rawdata = np.array([d.split(",") for d in datum])
        rawdata = rawdata.astype(float).T

        for view in self._as_list(dataroute["DataViews"]["RawDataView"]):
            viewtype = view["@xsi:type"]
            vstart = int(view["@Start"])
            vlen = int(view["@Length"])
            if viewtype=="FixedRawDataView":
                vname = view["@LogicName"]
            elif viewtype=="RecordedRawDataView":
                vname = view["Recording"]["@LogicName"]
            else:
                continue
            self.data[vname].append(rawdata[vstart:(vstart+vlen)])

        self.data["ScanName"].append(scaninfo["@ScanName"])
        self.data["TimePerStep"].append(scaninfo["TimePerStep"])
        self.data["TimePerStepEffective"].append(scaninfo["TimePerStepEffective"])
        self.data["ScanMode"].append(scaninfo["ScanMode"])

        for axis in self._as_list(scaninfo["ScanAxes"]["ScanAxisInfo"]):
            aref = float(axis["Reference"])
            astart = float(axis["Start"]) + aref
            astop = float(axis["Stop"]) + aref
            astep = float(axis["Increment"])
            adata = {} # not originally part of Carsten's code
            adata["Value"] = self._axis_values(astart, astop, astep)
            adata["Unit"] = axis["Unit"]["@Base"].lower()
            self.data[axis["@AxisName"]].append(adata)

        fixed = raw["RawData"]["FixedInformation"]
        for axis in self._as_list(fixed["Drives"]["InfoData"]):
            adata = {} # not originally part of Carsten's code
            adata["Value"] = float(axis["Position"]["@Value"])
            adata["Unit"] = axis["Position"]["@Unit"].lower()
            self.motors[axis["@LogicName"]].append(adata)

        # (block starts) not originally part of Carsten's code
        try:
            optics = (
                fixed["Instrument"]
                ["PrimaryTracks"]["TrackInfoData"]["MountedOptics"]["InfoData"]
            )
        except (KeyError, TypeError):
            optics = []
        self.mounted_optics_info = self._as_list(optics)
        # (block end) not originally part of Carsten's code

    def get_1d_scan(self, logger: 'BoundLogger'=None):
        '''
        Collect the values and units of intensity, two_theta, and axis positions. Adapts
        the output if collected data has multiple/2d scans: only the first line scan
        is kept (this also replaces the corresponding entries of `self.data`).

        Args:
            logger (BoundLogger): optional logger used for the multiple-scan warning.

        Returns:
            Dict[str, Any]: Each dict item contains a list with numerical value
                (numpy.ndarray) at index 0 and unit (str) at index 1. If quantity
                is not available, the dict item will default to []. If units are not
                available for two_theta or axis positions, they will default to 'deg'.

        Raises:
            ValueError: if more than one intensity counter is found.
        '''
        output = collections.defaultdict(list)
        axis_keys = ['TwoTheta', 'Theta', 'Chi', 'Phi']

        counter_keys = [key for key in self.data if 'counter' in key.lower()]
        if len(counter_keys) > 1:
            raise ValueError("More than one intensity counters found.")

        # without a counter there is no intensity, but axes are still reported
        if counter_keys:
            counter = counter_keys[0]
            if np.ndim(self.data[counter]) > 1:
                if logger is not None:
                    logger.warning(
                        'Multiple/2D/RSM scan currently not supported. '
                        'Taking the data from the first line scan.'
                    )
                for key in [counter] + axis_keys:
                    val = self.data.get(key)
                    # entries of a single frame are plain dicts and stay as they are
                    if isinstance(val, np.ndarray) and val.ndim and len(val):
                        self.data[key] = val[0]
            # atleast_1d keeps a one-point scan (stored as scalar) an array
            output['intensity'] = [np.atleast_1d(self.data[counter]), '']

        for key in axis_keys:
            data = self.data.get(key)
            # several entries stored under one name: use the first one
            if isinstance(data, np.ndarray) and data.ndim and len(data):
                data = data[0]
            if not isinstance(data, dict):
                continue
            val = data.get('Value')
            if val is None:
                continue
            if not isinstance(val, np.ndarray):
                val = np.array([val])
            output[key] = [
                val,
                data.get('Unit', 'deg'),
            ]

        return output

    def get_scan_info(self):
        '''
        Collects the scan information from self.data if available. In case of
        multiple scans, the information of the first scan is returned.

        Returns:
            Dict[str, Any]: contains information about the scan. Each item is a
                list with the value at index 0 and an empty unit str at index 1.
        '''
        output = collections.defaultdict(list)
        for key in ['ScanName']:
            val = self.data.get(key)
            if val is None:
                continue
            # a single scan is stored as plain str: indexing it would return
            # only its first character
            if isinstance(val, (list, tuple, np.ndarray)):
                if np.ndim(val) == 0 or len(val) == 0:
                    continue
                val = val[0]
            output[key] = [
                val,
                '',
            ]

        return output

    def get_source_info(self):
        '''
        Collects meta information of the X-ray source along with associated units.

        Returns:
            Dict[str, Any]: Each dict item contains a list with numerical value
                (float or int) at index 0 and unit (str) at index 1. One exception
                is the item with key 'TubeMaterial' which has str at both indices.
                If quantity is not available, the dict item will default to [].
        '''
        output = collections.defaultdict(list)

        source = {}
        optics = self._as_list(getattr(self, 'mounted_optics_info', None))
        for component in optics:
            if not isinstance(component, dict):
                continue
            if component.get("@xsi:type") == "TubeMountInfoData":
                source = component.get("Tube", {})
        if not source:
            return output

        if source.get('TubeMaterial'):
            output['TubeMaterial'] = [
                source['TubeMaterial'],
                '',
            ]

        # an empty <Generator/> element is parsed as None
        generator = source.get('Generator') or {}
        for name, default_unit in [('Voltage', 'kV'), ('Current', 'mA')]:
            quantity = self._value_with_unit(generator.get(name), default_unit)
            if quantity:
                output[name] = quantity

        for wavelength in [
            'WaveLengthAlpha1', 'WaveLengthAlpha2',
            'WaveLengthBeta', 'WaveLengthRatio',
        ]:
            quantity = self._value_with_unit(source.get(wavelength), 'angstrom')
            if quantity:
                output[wavelength] = quantity

        return output
2 changes: 1 addition & 1 deletion src/nomad_measurements/xrd/nomad_plugin.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ description: This is a plugin schema generated from a yaml schema.
name: parsers/xrd
plugin_type: parser
mainfile_mime_re: text/.*|application/zip
mainfile_name_re: ^.*\.xrdml$|^.*\.rasx$
mainfile_name_re: ^.*\.xrdml$|^.*\.rasx$|^.*\.brml$
parser_class_name: nomad_measurements.xrd.XRDParser
code_name: XRD Parser
62 changes: 60 additions & 2 deletions src/nomad_measurements/xrd/readers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
BoundLogger,
)
from nomad.units import ureg
from nomad_measurements.xrd.IKZ import RASXfile
from nomad_measurements.xrd.IKZ import RASXfile, BRMLfile


def read_panalytical_xrdml(file_path: str, logger: BoundLogger=None) -> Dict[str, Any]:
Expand Down Expand Up @@ -163,7 +163,7 @@ def find_string(path):
def read_rigaku_rasx(file_path: str, logger: BoundLogger=None) -> Dict[str, Any]:
'''
Reads .rasx files from Rigaku instruments
- reader is based on IKZ submodule
- reader is based on IKZ module
- currently supports one scan per file
- in case of multiple scans per file, only the first scan is read
Expand Down Expand Up @@ -235,6 +235,62 @@ def set_quantity(value: Any=None, unit: str=None) -> Any:

return output

def read_bruker_brml(file_path: str, logger: BoundLogger=None) -> Dict[str, Any]:
    '''
    Reads .brml files from Bruker instruments
        - reader is based on IKZ module

    Args:
        file_path (string): absolute path of the file.
        logger (BoundLogger): A structlog logger for propagating errors and warnings.

    Returns:
        Dict[str, Any]: The X-ray diffraction data in a Python dictionary.
    '''
    reader = BRMLfile(file_path, verbose=False)
    data = reader.get_1d_scan(logger)
    scan_info = reader.get_scan_info()
    source = reader.get_source_info()

    def set_quantity(value: Any=None, unit: str=None) -> Any:
        '''
        Sets the quantity based on whether value or/and unit are available.

        Args:
            value (Any): Value of the quantity.
            unit (str): Unit of the quantity.

        Returns:
            Any: Processed quantity with datatype depending on the value. The value
                is returned as it is (possibly None) if value or unit is missing.
        '''
        # a missing value must stay None instead of being multiplied with a unit
        if value is None or not unit:
            return value
        return value * ureg(unit)

    # quantities that are not available are empty lists, which unpack to no
    # arguments and hence resolve to None
    source_metadata = {
        'anode_material': set_quantity(*source['TubeMaterial']),
        'kAlpha1': set_quantity(*source['WaveLengthAlpha1']),
        'kAlpha2': set_quantity(*source['WaveLengthAlpha2']),
        'kBeta': set_quantity(*source['WaveLengthBeta']),
        'ratioKAlpha2KAlpha1': set_quantity(*source['WaveLengthRatio']),
        'voltage': set_quantity(*source['Voltage']),
        'current': set_quantity(*source['Current']),
    }

    output = {
        'detector': set_quantity(*data['intensity']),
        '2Theta': set_quantity(*data['TwoTheta']),
        'Omega': set_quantity(*data['Theta']), # theta and omega are synonymous in .brml
        'Chi': set_quantity(*data['Chi']),
        'Phi': set_quantity(*data['Phi']),
        'countTime': None,
        'metadata': {
            'sample_id': None,
            'scan_axis': set_quantity(*scan_info['ScanName']),
            'source': source_metadata,
        },
    }

    return output


def read_xrd(file_path: str, logger: BoundLogger) -> Dict[str, Any]:
'''
Expand All @@ -253,4 +309,6 @@ def read_xrd(file_path: str, logger: BoundLogger) -> Dict[str, Any]:
return read_panalytical_xrdml(file_path, logger)
if file_path.endswith('.rasx'):
return read_rigaku_rasx(file_path, logger)
if file_path.endswith('.brml'):
return read_bruker_brml(file_path,logger)
raise ValueError(f'Unsupported file format: {file_path.split(".")[-1]}')

0 comments on commit f191c00

Please sign in to comment.