Commit

Support pathlib objects
cbrnr committed Apr 16, 2019
1 parent 6402f61 commit b619562
Showing 1 changed file with 113 additions and 108 deletions.
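
The new code detects gzip-compressed recordings via pathlib's suffix attributes rather than str.endswith, which also picks up files named *.xdf.gz (the old endswith('.xdfz') check did not). A minimal standalone sketch of that detection logic, with file names invented for illustration:

from pathlib import Path

for name in ['rec.xdf', 'rec.xdfz', 'rec.xdf.gz']:
    p = Path(name)
    # .suffix is the final extension; .suffixes lists all extensions
    compressed = p.suffix == '.xdfz' or p.suffixes == ['.xdf', '.gz']
    print(name, compressed)  # False, True, True
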
pyxdf/pyxdf.py: 221 changes (113 additions & 108 deletions)
@@ -12,6 +12,7 @@
 import xml.etree.ElementTree as ET
 from collections import OrderedDict, defaultdict
 import logging
+from pathlib import Path
 
 import numpy as np
 
@@ -213,120 +214,124 @@
     filesize = os.path.getsize(filename)
 
     # read file contents ([SomeText] below refers to items in the XDF Spec)
-    with gzip.GzipFile(filename, 'rb') if filename.endswith('.xdfz') else open(filename, 'rb') as f:
+    filename = Path(filename)  # convert to pathlib object
+    if filename.suffix == '.xdfz' or filename.suffixes == ['.xdf', '.gz']:
+        f = gzip.open(filename, 'rb')
+    else:
+        f = open(filename, 'rb')
 
     # read [MagicCode]
     if f.read(4) != b'XDF:':
         raise Exception('not a valid XDF file: %s' % filename)
 
     # for each chunk...
     StreamId = None
     while True:
 
         # noinspection PyBroadException
         try:
             # read [NumLengthBytes], [Length]
             chunklen = _read_varlen_int(f)
         except Exception:
             if f.tell() < filesize - 1024:
                 logger.warn('got zero-length chunk, scanning forward to '
                             'next boundary chunk.')
                 _scan_forward(f)
                 continue
             else:
                 logger.info(' reached end of file.')
                 break
 
         # read [Tag]
         tag = struct.unpack('<H', f.read(2))[0]
         log_str = ' Read tag: {} at {} bytes, length={}'.format(tag, f.tell(), chunklen)
         if tag in [2, 3, 4, 6]:
             StreamId = struct.unpack('<I', f.read(4))[0]
             log_str += ', StreamId={}'.format(StreamId)
 
         logger.debug(log_str)
 
         # read the chunk's [Content]...
         if tag == 1:
             # read [FileHeader] chunk
             xml_string = f.read(chunklen - 2)
             fileheader = _xml2dict(ET.fromstring(xml_string))
         elif tag == 2:
             # read [StreamHeader] chunk...
             # read [Content]
             xml_string = f.read(chunklen - 6)
             decoded_string = xml_string.decode('utf-8', 'replace')
             hdr = _xml2dict(ET.fromstring(decoded_string))
             streams[StreamId] = hdr
             logger.debug(' found stream ' + hdr['info']['name'][0])
             # initialize per-stream temp data
             temp[StreamId] = StreamData(hdr)
         elif tag == 3:
             # read [Samples] chunk...
             # noinspection PyBroadException
             try:
                 # read [NumSampleBytes], [NumSamples]
                 nsamples = _read_varlen_int(f)
                 # allocate space
                 stamps = np.zeros((nsamples,))
                 if temp[StreamId].fmt == 'string':
                     # read a sample comprised of strings
                     values = [[None] * temp[StreamId].nchns
                               for _ in range(nsamples)]
                     # for each sample...
                     for k in range(nsamples):
                         # read or deduce time stamp
                         if struct.unpack('B', f.read(1))[0]:
                             stamps[k] = struct.unpack('<d', f.read(8))[0]
                         else:
                             stamps[k] = (temp[StreamId].last_timestamp +
                                          temp[StreamId].tdiff)
                         temp[StreamId].last_timestamp = stamps[k]
                         # read the values
                         for ch in range(temp[StreamId].nchns):
                             raw = f.read(_read_varlen_int(f))
                             values[k][ch] = raw.decode(errors='replace')
                 else:
                     # read a sample comprised of numeric values
                     values = np.zeros((nsamples, temp[StreamId].nchns))
                     # for each sample...
                     for k in range(nsamples):
                         # read or deduce time stamp
                         if struct.unpack('B', f.read(1))[0]:
                             stamps[k] = struct.unpack('<d', f.read(8))[0]
                         else:
                             stamps[k] = (temp[StreamId].last_timestamp +
                                          temp[StreamId].tdiff)
                         temp[StreamId].last_timestamp = stamps[k]
                         # read the values
                         raw = f.read(temp[StreamId].samplebytes)
                         values[k, :] = struct.unpack(temp[StreamId].structfmt, raw)
                 logger.debug(' reading [%s,%s]' % (temp[StreamId].nchns,
                                                    nsamples))
                 # optionally send through the on_chunk function
                 if on_chunk is not None:
                     values, stamps, streams[StreamId] = on_chunk(values, stamps,
                                                                  streams[StreamId], s)
                 # append to the time series...
                 temp[StreamId].time_series.append(values)
                 temp[StreamId].time_stamps.append(stamps)
             except Exception as e:
                 # an error occurred (perhaps a chopped-off file): emit a
                 # warning and scan forward to the next recognized chunk
                 logger.error('found likely XDF file corruption (%s), '
                              'scanning forward to next boundary chunk.' % e)
                 _scan_forward(f)
         elif tag == 6:
             # read [StreamFooter] chunk
             xml_string = f.read(chunklen - 6)
             streams[StreamId]['footer'] = _xml2dict(ET.fromstring(xml_string))
         elif tag == 4:
             # read [ClockOffset] chunk
             temp[StreamId].clock_times.append(struct.unpack('<d', f.read(8))[0])
             temp[StreamId].clock_values.append(struct.unpack('<d', f.read(8))[0])
         else:
             # skip other chunk types (Boundary, ...)
             f.read(chunklen - 2)
 
     # Concatenate the signal across chunks
     for stream in temp.values():
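
For reference, the [NumLengthBytes]/[Length] fields read by _read_varlen_int above follow the XDF spec's variable-length integer encoding: a single byte stating how many bytes the integer occupies (1, 4, or 8), followed by the little-endian value. A sketch of that decoding, assuming the actual helper matches the spec:

import struct

def read_varlen_int(f):
    # first byte: how many bytes the integer itself occupies
    nbytes = f.read(1)
    if nbytes == b'\x01':
        return ord(f.read(1))
    elif nbytes == b'\x04':
        return struct.unpack('<I', f.read(4))[0]
    elif nbytes == b'\x08':
        return struct.unpack('<Q', f.read(8))[0]
    raise RuntimeError('invalid variable-length integer')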
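
With this change, load_xdf should accept a pathlib.Path as well as a plain string; a minimal usage sketch (recording.xdf is a hypothetical local file):

from pathlib import Path
import pyxdf

# both calls are expected to behave identically after this commit
streams, header = pyxdf.load_xdf('recording.xdf')
streams, header = pyxdf.load_xdf(Path('recording.xdf'))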
