Timestamp metadata and Spark #93

Closed
mrocklin opened this issue Feb 27, 2017 · 31 comments

Comments

@mrocklin
Member

OK, so I create a pandas dataframe that has a timestamp column. I save this to parquet using fastparquet and then read the data with Spark. I find that my spark dataframe identifies my timestamp column as an integer column. Is there perhaps some special metadata that Spark is looking out for?

Example

In [1]: import pandas as pd

In [2]: import pyspark

In [3]: import fastparquet

In [4]: df = pd.DataFrame({'x': [1, 2, 3]})

In [5]: df['x'] = pd.to_datetime(df.x)

In [6]: df
Out[6]: 
                              x
0 1970-01-01 00:00:00.000000001
1 1970-01-01 00:00:00.000000002
2 1970-01-01 00:00:00.000000003

In [7]: sc = pyspark.SparkContext('local[4]')
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
17/02/27 17:13:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/02/27 17:13:11 WARN Utils: Your hostname, carbon resolves to a loopback address: 127.0.1.1; using 192.168.1.115 instead (on interface wlp4s0)
17/02/27 17:13:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/02/27 17:13:11 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.

In [8]: sql = pyspark.SQLContext(sc)

In [9]: fastparquet.write('foo.parquet', df)

In [10]: sdf = sql.read.parquet('foo.parquet')
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

In [11]: sdf
Out[11]: DataFrame[x: bigint]
@martindurant
Member

It turns out that Spark uses the less efficient int96 times internally, and ignores the metadata labelling these integers as times. If the intent is to export to Spark, then converting on the Spark side (from_unixtime) would probably be the most efficient, or one could write with times='int96' for compatibility. For loading with fastparquet, the default int64 is much more efficient.
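
For example, a minimal sketch of the second option (writing int96 timestamps so that Spark picks the column up as a timestamp; the file name is reused from the example above):

import pandas as pd
import fastparquet

df = pd.DataFrame({'x': pd.to_datetime([1, 2, 3])})

# times='int96' stores the column as Spark-style int96 timestamps
# instead of the default (and more compact) int64 encoding.
fastparquet.write('foo.parquet', df, times='int96')

Reading 'foo.parquet' back in Spark should then show x as a timestamp column rather than bigint.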

@mrocklin
Member Author

mrocklin commented Feb 28, 2017 via email

@martindurant
Member

Yes, exactly - see test_output.py::test_pyspark_roundtrip

@kruhly

kruhly commented Mar 6, 2017

Do you have any thoughts on a round trip that starts with a pyspark-generated .parquet file instead of a fastparquet-generated one, since then you will not be able to check for dtype.kind == 'M'?

If I create a .parquet file using pyspark and the pyspark function current_timestamp, fastparquet seems to read it as dtype 'S12'.

Unfortunately, I can't attach an example file. In my small test, I used several 'inserted_i' columns with different timestamp formats:

Example

import pandas as pd
from pyspark.sql.functions import current_timestamp, from_unixtime

df = pd.DataFrame([['csdved.conf.xml', 1], ['adrvedsev.conf.xml', 2]], columns=['a', 'b'])

df['inserted1'] = pd.Timestamp.now()
df['inserted3'] = df['inserted1'].values.astype(int) / pow(10, 6)   # ns -> ms

ddf = spark.createDataFrame(df)

ddf = ddf.select('*', current_timestamp().alias('inserted2'))
ddf = ddf.withColumn('inserted4', from_unixtime(ddf.inserted1 / pow(10, 9)))  # ns -> s

ddf.coalesce(1).write.parquet('batch_id=1', mode='overwrite', compression='snappy')

reading with

ParquetFile(...).to_pandas().dtypes

returns

u'inserted1': dtype('int64'),
 u'inserted2': dtype('S12'),
 u'inserted3': dtype('int64'),
 u'inserted4': dtype('O')

Conversions necessary to get a datetime dtype after reading with ParquetFile() and pf.to_pandas():

inserted1 => pf.to_pandas()['inserted1'].astype('<M8[ns]')
inserted2 => ???
inserted3 => pf.to_pandas()['inserted3'].astype('<M8[ms]')
inserted4 => pf.to_pandas()['inserted4'].astype('<M8[ns]')

The output of the inserted2 column does not play well with the other columns:

pf.to_pandas().inserted2

0    ��ݖ�$ۀ%
1    ��ݖ�$ۀ%
Name: inserted2, dtype: object

@martindurant
Member

Use to_pandas(timestamp96=['inserted2']) to automatically convert the S12-type column to times.
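
A minimal usage sketch, assuming the directory written by the pyspark snippet above (the path is illustrative, and fastparquet needs a _metadata file or an explicit list of part files to open a directory):

from fastparquet import ParquetFile

pf = ParquetFile('batch_id=1')
df = pf.to_pandas(timestamp96=['inserted2'])
df.dtypes  # inserted2 should now come back as datetime64[ns] instead of S12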

@mrocklin
Member Author

mrocklin commented Mar 6, 2017 via email

@martindurant
Member

It's not in the schema, no, because this isn't a type at all according to the parquet standard. I don't know how Spark knows, it may be elsewhere in the metadata. I can look.

@mrocklin
Member Author

mrocklin commented Mar 6, 2017 via email

@martindurant
Member

martindurant commented Mar 6, 2017

So the additional information can be found in an extra schema definition that Spark keeps in the key-value metadata. I wonder if any other parquet producers do this? The entry suggests this is Spark-specific.

>>> import json
>>> import fastparquet
>>> pf = fastparquet.ParquetFile(...)
>>> json.loads([kv.value for kv in pf.fmd.key_value_metadata
...             if kv.key == 'org.apache.spark.sql.parquet.row.metadata'][0])['fields']
[...
 {'metadata': {}, 'name': 'inserted2', 'nullable': True, 'type': 'timestamp'}
]

Do we want to special-case this?
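
One possible way to special-case it, sketched on top of the snippet above (the path is illustrative, and this is not current fastparquet behaviour): read the Spark row metadata if present and pass any timestamp fields to timestamp96.

import json
import fastparquet

pf = fastparquet.ParquetFile('batch_id=1')  # illustrative path
ts_cols = []
for kv in (pf.fmd.key_value_metadata or []):
    if kv.key == 'org.apache.spark.sql.parquet.row.metadata':
        fields = json.loads(kv.value)['fields']
        ts_cols = [f['name'] for f in fields if f['type'] == 'timestamp']

df = pf.to_pandas(timestamp96=ts_cols)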

@martindurant
Member

martindurant commented Mar 6, 2017

(my research previously indicated that this style of timestamp encoding was originally from map-reduce; but most of those tools require specifying the schema in code, so maybe automatically inferring the timestamp was not required)

@martindurant
Member

martindurant commented Mar 6, 2017

Long discussion on this issue: apache/parquet-format#49 ; some people saying that int96 is only ever used for this purpose (although the canonical logical type INTERVAL is listed as using it too), others saying it's time that spark et al just used int64 in all cases, like most software libraries do... Changes to the official parquet spec are very rare, nothing in a couple of years except the interesting addition of a NULL type that I wasn't aware of.

@kruhly

kruhly commented Mar 6, 2017

Thanks very much for to_pandas(timestamp96=['inserted2']), that is just what I needed. I was going to suggest the timestamp type but you beat me to it. Thanks again.

@fsck-mount

I have a small problem with timestamps. In pandas it looks like this:

192913    2017-09-06T11:59:51.106Z
192914    2017-09-06T11:59:49.635Z
192915    2017-09-06T11:59:47.155Z
192916    2017-09-06T11:59:53.379Z
192917    2017-09-06T11:59:48.517Z
192918    2017-09-06T11:59:52.624Z
192919    2017-09-06T11:59:55.410Z
192920    2017-09-06T11:59:54.614Z
192921    2017-09-06T11:59:55.389Z
Name: tm_parsed, Length: 192922, dtype: object

The same thing in Presto (fastparquet -> parquet in S3) is showing up like:

         tm_parsed
---------------------------
 +49652-01-25 01:21:42.000
 +49652-01-25 01:31:01.000
 +49652-01-25 01:23:40.000
 +49652-01-25 01:23:39.000
 +49652-01-25 06:30:06.000
(5 rows)

any help on this?

@martindurant
Member

Did you write the data with times='int96' as described in the docs?

@martindurant
Member

@Gowtham-Sai - on second thoughts, I notice that the data type of your column is "object" - should it not be datetime64? The pandas function pd.to_datetime is probably what you need.
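
For instance, something like this (assuming df['tm_parsed'] holds the ISO-8601 strings shown above) would give a proper datetime column before writing:

import pandas as pd

df['tm_parsed'] = pd.to_datetime(df['tm_parsed'])
print(df['tm_parsed'].dtype)  # should now be datetime64 (possibly tz-aware because of the trailing 'Z')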

@fsck-mount

fsck-mount commented Sep 7, 2017

@martindurant Sorry, I didn't check the docs. I will try that option now and let you know.
Regarding the second thought: that was just pasted content. I was actually applying pd.to_datetime, so it is a datetime.

@fsck-mount

@martindurant, also, is there a way I can convert all the existing files in S3 to int96 quickly?

@martindurant
Member

martindurant commented Sep 7, 2017

From int64 -> int96 timestamps? Parquet does not conveniently allow you to remove or add columns in existing data; you would have to download everything, convert, and write again. If Spark is your final destination, then you could perform a map operation on the int64 column on load to convert it to times (they are in microseconds since the 1970 unix epoch).
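
As a sketch of that Spark-side conversion, reusing the sql context from the first example (the column name x and the microsecond unit are assumptions for illustration):

from pyspark.sql import functions as F

sdf = sql.read.parquet('foo.parquet')
# int64 microseconds since the epoch -> seconds -> timestamp
sdf = sdf.withColumn('x', (F.col('x') / 1000000).cast('timestamp'))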

@fsck-mount

fsck-mount commented Sep 8, 2017

@martindurant, my setup is Presto + (parquet in S3). Yes, int64 to int96, and yep, I'm converting them one by one manually.
To my surprise, the file that was rewritten is still giving the bad date.
6th date (rewritten using times="int96"):

timestamp: INT96 GZIP DO:0 FPO:11012784 SZ:201129/1693576/8.42 VC:141128 ENC:RLE,PLAIN,BIT_PACKED

The one that was originally written with times="int96" gives the correct date.
7th date:

timestamp: INT96 GZIP DO:0 FPO:16868848 SZ:293854/2176288/7.41 VC:181354 ENC:PLAIN,BIT_PACKED,RLE

Is there any useful information we can extract from the above lines regarding this behaviour?

@fsck-mount

Also, if the file already exists in S3, will fastparquet overwrite it?
That is the only way I can think the above behaviour is possible: if the files are not actually being written to S3. But in that case it should raise an error.

I'm using the code below to make them int96. Let me know if you find any loophole in it.

import gc

import fastparquet
import s3fs

# assumes an S3 filesystem and opener; adjust credentials/paths as needed
fs = s3fs.S3FileSystem()
s3open = fs.open

def write_parquet(df, filepath, opener):
    fastparquet.write(filepath, df, open_with=opener,
                      row_group_offsets=5000000, compression='GZIP', file_scheme='simple',
                      append=False, write_index=True, has_nulls=True, times='int96')

ls = fs.glob('vtap-sync/yr=2017/mn=09/dt=0[3-9]/*/*json')

def main():
    for i in ls:
        try:
            print "[+] Processing file: %s" % i
            write_parquet(fastparquet.ParquetFile(i, open_with=s3open).to_pandas(), i, s3open)
            print "[+] Processing done: %s" % i
        except Exception as e:
            print "[+] Processing failed: %s \n due to: %s" % (i, e)
        gc.collect()

@martindurant
Member

It overwrites. S3 does not have immediate consistency, so if you write to a path that already exists, the previous version may still be served for a time. If you are using a _metadata file, you will have to remake it after your loop to update the schema within.
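
A hedged sketch of remaking the _metadata file after the loop, assuming fastparquet's merge helper is available and reusing the same fs, s3open and glob pattern as in the code above:

import fastparquet

parts = fs.glob('vtap-sync/yr=2017/mn=09/dt=0[3-9]/*/*json')
# rebuilds _metadata (and _common_metadata) from the individual part files
fastparquet.writer.merge(parts, open_with=s3open)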

@fsck-mount

@martindurant, I guess it's because of the S3 issue. Not an issue, maybe a feature.

Because for me it's really funny to see the same query giving two different results in a row. The first time the query is executed it gives the wrong result; running the same query again, just by pressing the up arrow and hitting enter, gives the correct result; and running it once more gives the wrong result again.

Had good fun with this. It drove me crazy.

Let me know if you ever visit India.

@martindurant
Member

Not any time soon :)

@fsck-mount

fsck-mount commented Sep 8, 2017

@martindurant, which one will be equivalent to Presto's utm_source is not null: the empty string '' or np.nan?

@martindurant
Member

utm_source is a string field? The equivalent of NULL in python is None, and this is used in any object-type field. Pandas uses np.nan as NULL for float columns (and the similar NaT for times).

@martindurant
Member

Of course, you could use an empty string if you prefer, but then presumably your query expression would be utm_source is not "".
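
A small sketch of how the different markers end up as NULLs when writing with fastparquet (the column names are made up for illustration):

import numpy as np
import pandas as pd
import fastparquet

df = pd.DataFrame({
    'utm_source': ['google', None, 'direct'],                        # object column: None -> NULL
    'amount': [1.5, np.nan, 2.0],                                    # float column: NaN -> NULL
    'seen_at': pd.to_datetime(['2017-09-06', None, '2017-09-08']),   # datetime column: NaT -> NULL
})

# has_nulls marks the columns as OPTIONAL in the parquet schema
fastparquet.write('nulls_example.parquet', df, has_nulls=True)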

@fsck-mount

Yes, utm_source is a string field, but I have None as the utm_source value. I'm not really sure whether I have the value as "None", or it's because of the empty string '', or None got converted to "None". I will spend some time on this.

@fsck-mount

fsck-mount commented Sep 10, 2017

@martindurant, the problem is because of the following piece of code.

str_columns = ['message', 'cod_message', 'error']
df[str_columns] = df[str_columns].astype(unicode)  # this step converts np.nan to 'nan' and None to 'None'

How can I avoid this?
I have to do astype(unicode) because I'm using Python 2 and I have some problems with the dataset.

@martindurant
Member

This would do it, although I'm sure there are better and more efficient ways, such as using the null filtering/filling functions.

for col in str_columns:
    df[col] = df[col].map(lambda x: unicode(x) if x is not None else None)

@fsck-mount

Yep, maybe this will be better, or maybe not.

for str_col in str_columns:
    df[str_col] = df[str_col].dropna().astype(unicode)
    df[str_col] = df[str_col].where(pd.notnull(df[str_col]), None)
