Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support to anyopen for taking filelike objects; remove broken .zip support #197

Merged
merged 10 commits into from
Jun 30, 2022
64 changes: 48 additions & 16 deletions src/alchemlyb/parsing/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

"""
import os
from os import PathLike
from typing import IO, Optional, Union
import bz2
import gzip
import zipfile
Expand All @@ -10,41 +12,71 @@ def bz2_open(filename, mode):
mode += 't' if mode in ['r','w','a','x'] else ''
return bz2.open(filename, mode)


def gzip_open(filename, mode):
    """Open `filename` with :func:`gzip.open`, using text mode for bare modes.

    Parameters
    ----------
    filename : path or file object
        Passed straight through to :func:`gzip.open`.
    mode : str
        If exactly one of 'r', 'w', 'a' or 'x', a 't' is appended so the
        stream is opened in text mode. Any other mode string (for example
        'rb' or 'wt') is passed through unchanged.

    Returns
    -------
    stream
        Whatever :func:`gzip.open` returns for the resulting mode.

    """
    # gzip.open treats a bare mode as binary; request text explicitly
    if mode in ('r', 'w', 'a', 'x'):
        mode = mode + 't'
    return gzip.open(filename, mode)

def anyopen(datafile: Union[PathLike, IO], mode='r', compression=None):
    """Return a file stream for file or stream, even if compressed.

    Supports files compressed with bzip2 (.bz2) and gzip (.gz) compression
    schemes. The appropriate extension must be present for the function to
    properly handle the file, unless `compression` is given explicitly.

    If giving a stream for `datafile`, then you must specify `compression` if
    the stream is compressed.

    Parameters
    ----------
    datafile : PathLike | IO
        Path to file to use, or an open IO stream. If an IO stream, use
        `compression` to specify the type of compression used, if any.
    mode : str
        Mode for stream; usually 'r' or 'w'.
    compression : str
        Use to specify compression. Must be one of 'bzip2', 'gzip'.
        Overrides use of extension for determining compression if `datafile`
        is a file.

    Returns
    -------
    stream : stream
        Open stream for reading or writing, depending on mode. If `datafile`
        is a stream and `compression` is None, `datafile` itself is returned.

    Raises
    ------
    ValueError
        If `compression` is given but is not one of the supported values.

    """
    # opener for each recognised file extension
    extensions = {'.bz2': bz2_open,
                  '.gz': gzip_open}

    # compression selections available
    compressions = {'bzip2': bz2_open,
                    'gzip': gzip_open}

    # `datafile` counts as a stream when it offers the method matching `mode`
    is_stream = (
        (hasattr(datafile, 'read') and 'r' in mode)
        or (hasattr(datafile, 'write')
            and any(flag in mode for flag in ('w', 'a', 'x')))
    )

    if is_stream:
        # if no compression specified, just pass the stream through
        if compression is None:
            return datafile
        if compression in compressions:
            return compressions[compression](datafile, mode=mode)
        # f-string so the offending value actually appears in the message
        raise ValueError(
            "`datafile` is a stream, but specified `compression` "
            f"'{compression}' is not supported")

    # otherwise, treat as a file
    if compression is None:
        # use extension to determine the compression used, if present
        ext = os.path.splitext(datafile)[1]
        opener = extensions.get(ext, open)
    elif compression in compressions:
        # allow compression to override any extension on the file
        opener = compressions[compression]
    else:
        # previously fell through with `opener` unbound (UnboundLocalError)
        raise ValueError(
            f"specified `compression` '{compression}' is not supported; "
            f"must be one of {sorted(compressions)}")

    return opener(datafile, mode)
107 changes: 107 additions & 0 deletions src/alchemlyb/tests/parsing/test_util.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import io
import pytest

from alchemtest.gmx import load_expanded_ensemble_case_1
from alchemlyb.parsing.util import anyopen

Expand All @@ -12,3 +15,107 @@ def test_gzip():
for filename in dataset['data'][leg]:
with anyopen(filename, 'r') as f:
assert type(f.readline()) is str

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to another comment: maybe add a test for what happens when `compression` is given together with a path-like, to ensure the API does not break on whatever behavior is decided on.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added explicit test forcing compression despite wrong extension.


def test_gzip_stream():
    """Check that `anyopen` reads a binary stream when told it is gzip.

    Each dataset file is opened in binary mode and handed to `anyopen` with
    ``compression='gzip'``; a line read from the result must be `str`.

    """
    legs = load_expanded_ensemble_case_1()['data']

    for leg in legs:
        for path in legs[leg]:
            with open(path, 'rb') as raw, \
                    anyopen(raw, mode='r', compression='gzip') as text:
                line = text.readline()
                assert type(line) is str


def test_gzip_stream_wrong():
    """Check that reading with the wrong decompressor fails.

    Each dataset file is opened in binary mode and handed to `anyopen` with
    ``compression='bzip2'``; reading a line is expected to raise `OSError`
    whose message matches 'Invalid data stream'.

    """
    legs = load_expanded_ensemble_case_1()['data']

    for leg in legs:
        for path in legs[leg]:
            with open(path, 'rb') as raw, \
                    anyopen(raw, mode='r', compression='bzip2') as wrapped:
                # the failure is expected on read, not when wrapping
                with pytest.raises(OSError, match='Invalid data stream'):
                    assert type(wrapped.readline()) is str


def test_gzip_stream_wrong_no_compression():
    """Check that a stream is passed through when no compression is given.

    Each dataset file is opened in binary mode and handed to `anyopen`
    without `compression`; a line read from the result must be `bytes`.

    """
    legs = load_expanded_ensemble_case_1()['data']

    for leg in legs:
        for path in legs[leg]:
            with open(path, 'rb') as raw, anyopen(raw, mode='r') as same:
                line = same.readline()
                assert type(line) is bytes


@pytest.mark.parametrize('extension', ['bz2', 'gz'])
def test_file_roundtrip(extension, tmp_path):
    """Check that text written to a file via `anyopen` reads back unchanged.

    The opener is chosen from the file extension alone.

    """
    expected = "my momma told me to pick the very best one and you are not it"
    target = tmp_path / f'testfile.txt.{extension}'

    with anyopen(target, mode='w') as sink:
        sink.write(expected)

    with anyopen(target, 'r') as source:
        actual = source.read()

    assert actual == expected


@pytest.mark.parametrize('extension,compression',
                         [('bz2', 'gzip'), ('gz', 'bzip2')])
def test_file_roundtrip_force_compression(extension, compression, tmp_path):
    """Check a file round trip when `compression` disagrees with the extension.

    The same explicit `compression` is passed for both the write and the
    read, so the mismatched extension must not affect the result.

    """
    expected = "my momma told me to pick the very best one and you are not it"
    target = tmp_path / f'testfile.txt.{extension}'

    with anyopen(target, mode='w', compression=compression) as sink:
        sink.write(expected)

    with anyopen(target, 'r', compression=compression) as source:
        actual = source.read()

    assert actual == expected


@pytest.mark.parametrize('compression', ['bzip2', 'gzip'])
def test_stream_roundtrip(compression):
    """Check that text written to a stream via `anyopen` reads back unchanged.

    An in-memory `io.BytesIO` is used as the underlying stream for both the
    write and the read.

    """
    expected = "my momma told me to pick the very best one and you are not it"

    with io.BytesIO() as buffer:

        # write compressed text into the buffer
        with anyopen(buffer, mode='w', compression=compression) as sink:
            sink.write(expected)

        # rewind so the read starts from the first byte
        buffer.seek(0)

        # read it back through the same compression
        with anyopen(buffer, 'r', compression=compression) as source:
            actual = source.read()

    assert actual == expected