AttributeError: 'IntegerArray' object has no attribute 'tobytes' #406

Open · JamesCropcho opened this issue Feb 24, 2019 · 8 comments

@JamesCropcho

Hello!

I am hoping you might know what is going on.

Using the latest development versions of Dask and fastparquet on Python 3.7.2, I execute:

df.to_parquet(
    'us_president_tax_returns.parquet', 
    engine='fastparquet', 
    append=False)

Which raises:

AttributeError                            Traceback (most recent call last)
<timed eval> in <module>

~/dask/dask/dataframe/core.py in to_parquet(self, path, *args, **kwargs)
   2828         """ See dd.to_parquet docstring for more information """
   2829         from .io import to_parquet
-> 2830         return to_parquet(self, path, *args, **kwargs)
   2831 
   2832     @derived_from(pd.DataFrame)

~/dask/dask/dataframe/io/parquet.py in to_parquet(df, path, engine, compression, write_index, append, ignore_divisions, partition_on, storage_options, compute, **kwargs)
   1232 
   1233     if compute:
-> 1234         out.compute()
   1235         return None
   1236     return out

~/dask/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

~/dask/dask/base.py in compute(*args, **kwargs)
    396     keys = [x.__dask_keys__() for x in collections]
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398     results = schedule(dsk, keys, **kwargs)
    399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400 

~/dask/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     74     results = get_async(pool.apply_async, len(pool._pool), dsk, result,
     75                         cache=cache, get_id=_thread_get_id,
---> 76                         pack_exception=pack_exception, **kwargs)
     77 
     78     # Cleanup pools associated to dead threads

~/dask/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    458                         _execute_task(task, data)  # Re-execute locally
    459                     else:
--> 460                         raise_exception(exc, tb)
    461                 res, worker_id = loads(res_info)
    462                 state['cache'][key] = res

~/dask/dask/compatibility.py in reraise(exc, tb)
    110         if exc.__traceback__ is not tb:
    111             raise exc.with_traceback(tb)
--> 112         raise exc
    113 
    114     import pickle as cPickle

~/dask/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    228     try:
    229         task, data = loads(task_info)
--> 230         result = _execute_task(task, data)
    231         id = get_id()
    232         result = dumps((result, id))

~/dask/dask/core.py in _execute_task(arg, cache, dsk)
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

~/dask/dask/dataframe/io/parquet.py in _write_partition_fastparquet(df, fs, path, filename, fmd, compression, partition_on)
    546         with fs.open(fs.sep.join([path, filename]), 'wb') as fil:
    547             rgs = make_part_file(fil, df, fmd.schema, compression=compression,
--> 548                                  fmd=fmd)
    549     return rgs
    550 

~/.pyenv/versions/3.7.2/lib/python3.7/site-packages/fastparquet/writer.py in make_part_file(f, data, schema, compression, fmd)
    629     with f as f:
    630         f.write(MARKER)
--> 631         rg = make_row_group(f, data, schema, compression=compression)
    632         if fmd is None:
    633             fmd = parquet_thrift.FileMetaData(num_rows=len(data),

~/.pyenv/versions/3.7.2/lib/python3.7/site-packages/fastparquet/writer.py in make_row_group(f, data, schema, compression)
    617                 comp = compression
    618             chunk = write_column(f, data[column.name], column,
--> 619                                  compression=comp)
    620             rg.columns.append(chunk)
    621     rg.total_byte_size = sum([c.meta_data.total_uncompressed_size for c in

~/.pyenv/versions/3.7.2/lib/python3.7/site-packages/fastparquet/writer.py in write_column(f, data, selement, compression)
    511 
    512     bdata = definition_data + repetition_data + encode[encoding](
--> 513             data, selement)
    514     bdata += 8 * b'\x00'
    515     try:

~/.pyenv/versions/3.7.2/lib/python3.7/site-packages/fastparquet/writer.py in encode_plain(data, se)
    254         return pack_byte_array(list(out))
    255     else:
--> 256         return out.tobytes()
    257 
    258 

AttributeError: 'IntegerArray' object has no attribute 'tobytes'

While I can't share the full df.dtypes.values output, here is a partial listing:

array([dtype('uint32'),
       CategoricalDtype(categories=['REDACTED'], ordered=False),
       UInt16Dtype(),
       CategoricalDtype(categories=['REDACTED'], ordered=False),
       dtype('O'),
       dtype('O'), datetime64[ns, UTC], dtype('O'), dtype('uint16'),
       dtype('O'),
       CategoricalDtype(categories=['__UNKNOWN_CATEGORIES__'], ordered=False),
       dtype('bool'),
       dtype('bool'), UInt8Dtype(), datetime64[ns, UTC],
       datetime64[ns, UTC], datetime64[ns, UTC], datetime64[ns, UTC],
       UInt8Dtype(),
       UInt32Dtype(), UInt16Dtype(), UInt32Dtype(), dtype('O'),
       dtype('O'), dtype('O'), dtype('O'), dtype('O'),
       dtype('O'), dtype('O')], dtype=object)

Note my use of the new Nullable Integer data type (https://pandas.pydata.org/pandas-docs/stable/user_guide/integer_na.html#integer-na).
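
For reference, a minimal reproducer sketch along these lines (the column name and values are hypothetical, not from my real data):

    import pandas as pd
    import dask.dataframe as dd

    # Hypothetical column using the nullable UInt16 dtype; the None entry means
    # the column is backed by an IntegerArray rather than a plain numpy array.
    pdf = pd.DataFrame({"filing_year": pd.array([2016, 2017, None], dtype="UInt16")})
    ddf = dd.from_pandas(pdf, npartitions=1)

    ddf.to_parquet("repro.parquet", engine="fastparquet", append=False)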

What do you think?

―James

@martindurant
Member

Note that you would get a much simpler traceback if you tried to write the same data using fastparquet's write() function directly.

Indeed, the code assumes that the data is a numpy array, which used to always be the case for integers. The new integer-with-nulls type should instead follow the path that was previously one of the options for object-type arrays, encoding as integer-with-nulls. This is fixable but would take a little poking around.
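
As a sketch of what the direct call looks like (assuming df is the Dask DataFrame above, so one computed partition stands in for the data):

    import fastparquet

    # Writing a single computed (pandas) partition straight through fastparquet
    # hits the same encode_plain path with a much shorter traceback.
    pdf = df.get_partition(0).compute()
    fastparquet.write("direct_write_test.parquet", pdf)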

@JamesCropcho
Author

JamesCropcho commented Feb 24, 2019 via email

@martindurant
Member

This is certainly fastparquet's remit. I would appreciate any help in fixing it, though. The code is already in place for the object([int, int, None]) case from before, but needs logic to call it correctly in the right place.
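
For context, a rough sketch of the case that already works, assuming object_encoding="int" is used for an object column of Python ints and None:

    import pandas as pd
    import fastparquet

    # An object column of Python ints and None already has a write path: the
    # values are encoded as integers and the None entries become nulls.
    obj_df = pd.DataFrame({"col": pd.Series([1, 2, None], dtype=object)})
    fastparquet.write("object_int_example.parquet", obj_df, object_encoding="int")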

@martindurant
Member

(it is likely that the IntegerArray already has the right structures internally to make writing them to parquet easy - @TomAugspurger )

@TomAugspurger
Member

We basically haven't addressed IO for extension arrays: pandas-dev/pandas#20612. Once that's solved, the idea would be for each ExtensionArray to determine how it should be serialized.

I'm not sure whether fastparquet wants to get ahead of pandas here. The internal representation of IntegerArray is likely to change in the near future.

@martindurant
Member

Quite surprised to see that IntegerArray doesn't have public methods to get the values and mask separately. They are available as attributes _data and _mask, which is what we'll have to use.
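
A rough sketch of what that would look like, assuming the private attributes keep their current (pandas 0.24) meaning:

    import pandas as pd

    arr = pd.array([1, 2, None], dtype="UInt16")  # an IntegerArray

    values = arr._data   # numpy uint16 array; slots behind missing values are undefined
    mask = arr._mask     # numpy bool array, True where the value is missing

    # The raw values can then be encoded the same way plain numpy integer columns
    # already are, with the mask feeding the definition levels for the nulls.
    raw = values.tobytes()
    valid = ~mask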

@TomAugspurger
Member

TomAugspurger commented Feb 24, 2019 via email

@martindurant
Member

Given

       IntegerArray is currently experimental, and its API or internal
       implementation may change without warning.

I think it's reasonable for fastparquet not to support this for the time being. Eventually this should become the standard type that parquet integer columns produce on reading, and, as in this case, it should be valid input when writing - but not yet. I would ask that users keep to the standard object or float representation for now, even though it is less efficient.
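
For anyone hitting this in the meantime, a rough workaround sketch (the column list is hypothetical; adjust for your own data):

    # Cast nullable-integer columns back to float64 so missing values become NaN,
    # which fastparquet already knows how to write.
    nullable_int_cols = ["filing_year"]  # hypothetical; list your own columns here
    for col in nullable_int_cols:
        df[col] = df[col].astype("float64")

    df.to_parquet("us_president_tax_returns.parquet", engine="fastparquet", append=False)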
