Timestamp metadata and Spark #93
It turns out that Spark uses the less efficient int96 times internally, and ignores the metadata labeling these integers as times. If the intent is to export to Spark, then converting on the Spark side (from_unixtime) would probably be the most efficient, or one could write with times='int96' for compatibility. For loading with fastparquet, the default int64 is much more efficient.
What is the right way to store the data so that Spark can read it happily?
Is this the timestamp96 option?
Yes, exactly - see test_output.py::test_pyspark_roundtrip
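For concreteness, a minimal sketch of that route (the file path and column name here are illustrative, not taken from the test): write the pandas datetime column with times='int96' so Spark picks it up as a timestamp.

import pandas as pd
import fastparquet

# a frame with a proper datetime64[ns] column
df = pd.DataFrame({'when': pd.to_datetime(['2017-02-28 09:05:00', '2017-02-28 09:06:00'])})

# int96-encoded timestamps for Spark compatibility; the default is int64
fastparquet.write('/tmp/times.parquet', df, times='int96')

# in Spark, the column should then load as TimestampType, e.g.
#   spark.read.parquet('/tmp/times.parquet').printSchema()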
Do you have any thoughts on a roundtrip when you start with a pyspark-generated .parquet file instead of a fastparquet-generated one, since you will not be able to check for dtype.kind == 'M'? If I create a .parquet file using pyspark and the pyspark function current_timestamp, fastparquet seems to read it as dtype 'S12'. Unfortunately, I can't attach an example file. In my small test, I used several 'inserted_i' columns with different timestamp formats.
Reading with ParquetFile() and pf.to_pandas() returns the timestamp columns as raw S12 bytes, and a conversion is necessary afterwards to get a datetime dtype. The output of the inserted2 column does not play well with the other columns.
Use to_pandas(timestamp96=['inserted2']) to automatically convert the S12-type column to times.
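A sketch of that call, assuming a Spark-written file whose current_timestamp column is named 'inserted2' (the path is hypothetical):

from fastparquet import ParquetFile

pf = ParquetFile('spark_output.parquet')      # file produced by pyspark
# decode the 12-byte int96 column into pandas datetimes
df = pf.to_pandas(timestamp96=['inserted2'])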
Does Spark include any metadata about how these should be interpreted? What happens if someone actually has 12-byte strings? I'm looking for ways where we can automatically detect this case and prevent users from having to be clever here.
It's not in the schema, no, because this isn't a type at all according to the parquet standard. I don't know how Spark knows; it may be elsewhere in the metadata. I can look.
Could also throw up a StackOverflow question and see if someone else can do the work for us?
So additional information can be found in an extra schema definition that Spark keeps lying around. I wonder if any other parquet producers do this? The entry suggests this is Spark-specific.
Do we want to special-case this?
(My research previously indicated that this style of timestamp encoding was originally from map-reduce; but most of those tools require specifying the schema in code, so maybe automatically inferring the timestamp was not required.)
Long discussion on this issue: apache/parquet-format#49. Some people say that int96 is only ever used for this purpose (although the canonical logical type INTERVAL is listed as using it too); others say it's time that Spark et al. just used int64 in all cases, like most software libraries do. Changes to the official parquet spec are very rare - nothing in a couple of years except the interesting addition of a NULL type that I wasn't aware of.
Thanks very much for
I've a small problem with timestamps. In Pandas it is:

192913    2017-09-06T11:59:51.106Z
192914    2017-09-06T11:59:49.635Z
192915    2017-09-06T11:59:47.155Z
192916    2017-09-06T11:59:53.379Z
192917    2017-09-06T11:59:48.517Z
192918    2017-09-06T11:59:52.624Z
192919    2017-09-06T11:59:55.410Z
192920    2017-09-06T11:59:54.614Z
192921    2017-09-06T11:59:55.389Z
Name: tm_parsed, Length: 192922, dtype: object

The same thing in Presto (fastparquet -> parquet in S3) is showing up like:

tm_parsed
---------------------------
+49652-01-25 01:21:42.000
+49652-01-25 01:31:01.000
+49652-01-25 01:23:40.000
+49652-01-25 01:23:39.000
+49652-01-25 06:30:06.000
(5 rows)

Any help on this?
Did you write the data with times='int96'?
@Gowtham-Sai - on second thoughts, I notice that the data type of your column is "object" - should it not be datetime64[ns]?
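If the column really is object-typed ISO strings, one option (a sketch, assuming pandas; the column name is taken from the printout above) is to convert it to datetime64[ns] before writing, so that the times='int96' option applies to a real datetime column:

import pandas as pd
import fastparquet

# hypothetical frame with ISO-8601 strings in an object column
df = pd.DataFrame({'tm_parsed': ['2017-09-06T11:59:51.106Z', '2017-09-06T11:59:49.635Z']})

# parse to datetime64; drop the timezone to keep a naive datetime64[ns] column
df['tm_parsed'] = pd.to_datetime(df['tm_parsed'], utc=True).dt.tz_localize(None)

fastparquet.write('out.parquet', df, times='int96')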
@martindurant Sorry, I didn't check the docs. I will try with that option now and let you know.
@martindurant, also, is there a way I can convert all the existing files in S3 to int96 quickly?
From int64->int96 timestamps? Parquet does not conveniently allow you to remove or add columns in existing data, so you would have to download everything, convert, and write again. If Spark is your final destination, then you could perform a map operation on the int64 column on load to convert to times (they are in microseconds since the 1970 Unix epoch).
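As a sketch of that Spark-side conversion (assuming PySpark and an int64 column named 'ts' holding microseconds since the Unix epoch; names and path are illustrative):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet('s3://bucket/data.parquet')

# microseconds since epoch -> seconds, then cast to a real timestamp
df = df.withColumn('ts', (F.col('ts') / 1000000).cast('timestamp'))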
@martindurant, my approach is Presto + (parquet in S3). Yes, int64 to int96, and I'm converting them one by one manually:

timestamp: INT96 GZIP DO:0 FPO:11012784 SZ:201129/1693576/8.42 VC:141128 ENC:RLE,PLAIN,BIT_PACKED

The one which was giving the correct date was originally written with:

timestamp: INT96 GZIP DO:0 FPO:16868848 SZ:293854/2176288/7.41 VC:181354 ENC:PLAIN,BIT_PACKED,RLE

Is there any useful information we can extract from the lines above regarding this behaviour?
And also, if the file already exists in S3, will fastparquet overwrite it? I'm using the code below to make them int96 - let me know if you find any loophole in it.

# assumed context (not shown in the original snippet): gc, fastparquet and an
# s3fs filesystem providing fs.glob / s3open
import gc
import fastparquet
import s3fs

fp = fastparquet            # both names are used below
fs = s3fs.S3FileSystem()
s3open = fs.open

def write_parquet(df, filepath, opener):
    fastparquet.write(filepath, df, open_with=opener,
                      row_group_offsets=5000000, compression='GZIP', file_scheme='simple',
                      append=False, write_index=True, has_nulls=True, times='int96')

ls = fs.glob('vtap-sync/yr=2017/mn=09/dt=0[3-9]/*/*json')

def main():
    for i in ls:
        try:
            print "[+] Processing File: %s" % i
            write_parquet(fp.ParquetFile(i, open_with=s3open).to_pandas(), i, s3open)
            print "[+] Processing Done: %s" % i
        except Exception as e:
            print "[+] Processing Failed: %s \n due to: %s" % (i, e)
        gc.collect()
It overwrites. S3 does not have immediate consistency, so if you write to a path that already exists, the previous version may still be available for a time. If you are using a
@martindurant, I guess it's because of the S3 issue - not an issue, maybe that's a feature. It was really funny for me to see the same query giving two different results in a row. Had good fun with this; it drove me crazy. Let me know if you ever visit India.
Not any time soon :)
@martindurant, which one will be equivalent to Presto
Of course, you could use an empty string if you prefer, but then presumably your query expression would be
Yes, utm_source is a string field. But I've
@martindurant, the problem is because of the following piece of code:

str_columns = ['message', 'cod_message', 'error']
df[str_columns] = df[str_columns].astype(unicode)

This step converts np.nan to 'nan' and None to 'None'. How can I avoid this?
This would do it, although I'm sure there are better and more efficient ways, such as using the null filtering/filling functions.
Yep, maybe this will be better, or maybe not:

for str_col in str_columns:
    df[str_col] = df[str_col].dropna().astype(unicode)
    df[str_col] = df[str_col].where(pd.notnull(df[str_col]), None)
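A closely related sketch (assuming pandas and the same str_columns list; this variant converts only the non-null values in place, so missing entries are never turned into the strings 'nan' or 'None'):

str_columns = ['message', 'cod_message', 'error']
for col in str_columns:
    mask = df[col].notnull()
    # convert only the real values; NaN/None entries are left untouched
    df.loc[mask, col] = df.loc[mask, col].astype(unicode)   # use str on Python 3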
OK, so I create a pandas dataframe that has a timestamp column. I save this to parquet using fastparquet and then read the data with Spark. I find that my Spark dataframe identifies my timestamp column as an integer column. Is there perhaps some special metadata that Spark is looking out for?