Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage] Internal avro parser. #10764

Conversation

kasobol-msft
Copy link
Contributor

No description provided.

value = value.encode('utf-8')
self._meta[key] = value

def _GetInputFileLength(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this will work with Quick Query, since QQ streams don't have a defined length.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I managed to get rid of that. Looks like stream-like object starts to return empty when end is reached.

@annatisch
Copy link
Member

Thanks @kasobol-msft!
Is any part of this public facing? Or all entirely internal?

Adding @rakshith91 for review.

@@ -0,0 +1,5 @@
# -------------------------------------------------------------------------
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please note that the contents of the _shared directory must be copied across all storage SDKs (queues/fileshare/datalake/etc). This includes the _shared directory in the test suite. If you feel this should not be in the other SDKs (i.e. that avro will not be a part of queue messages, or datalake etc), then please move this directory up a layer :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@annatisch this code is going to be used in azure-storage-blob and azure-storage-blob-changefeed (that one is work in progress) modules. So shared seems to be better fit. If so do we need to copy avro to all storage modules or only two that take dependency on it?

For the context. It's imperative that we hide avro parser and not expose it. So having it packaged separately and adding as dependency isn't an option unfortunately.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can keep it in shared - but it will be distributed in every storage SDK (queues, datalake, etc).
At some point we will add a CI step to ensure the _shared module is kept consistent between all the SDKs.

If you do not wish this to be included in all the storage SDKs, then you could move it up a level to azure.storage.blob._avro

Copy link
Contributor

@xiafu-msft xiafu-msft Apr 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Anna! @annatisch
We created a package azure-storage-blob-changefeed package which depends on azure-storage-blob. Previously I planned to add _shared folder to azure-storage-blob-changefeed to use avro(that is what I told kamil), but now we plan to directly import _shared.avro from azure-storage-blob so that we don't need to put _shared folder in blob-changefeed pkg.

In the future we will have changefeed feature in fileshare and datalake, so probably we can also import the _shared folder from azure-storage-file-share and azure-storage-file-datalake so we don't need to add _shared folder in azure-storage-fileshare-changefeed and azure-storage-datalake-changefeed(these changefeed packages haven't been created yet)

So I guess probably it makes more sense to put leave the avro in shared folder?
What do you think ^_^?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it stays in _shared.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @xiafu-msft! Yes if the plan is to support this in files and datalake as well, then shared is a good place for it. Could you please duplicate it across the other packages?

Copy link
Contributor

@xiafu-msft xiafu-msft Apr 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Anna! @annatisch
Thanks for your feedback! We still need to tweak the DafaFileReader a bit to fit ChangeFeed feature. After that we will duplicate it! So that will be in another pr.

@kasobol-msft
Copy link
Contributor Author

Thanks @kasobol-msft!
Is any part of this public facing? Or all entirely internal?

Adding @rakshith91 for review.

@annatisch All internal. No intent of exposing that.

@kasobol-msft kasobol-msft marked this pull request as ready for review April 16, 2020 22:12
return self.read_data(self.writer_schema, self.reader_schema, decoder)

def read_data(self, writer_schema, reader_schema, decoder):
# schema matching
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need both writer and reader schema?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In most general case we can have two schemas - schema in the file that was used to write it. Reader schema can be supplied by someone trying to parse avro file to narrow down things they care about. It's kind of equivalent of "casting" in strongly type language.

However, I think we are not going to use it in foreseeable future, so I'm removing that and just leaving writer_schema.

Please also notice how writer_schema is defined:
As defined in the Avro specification, we call the schema encoded in the data the "writer's schema".

# read the header: magic, meta, sync
self._read_header()

# ensure codec is valid
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't really need codec for QQ or ChangeFeed, are we sure it works?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should be validating that so that we don't attempt parsing something we can't support. For example server side can at some point add codec support before SDK does that Or server side adds it and customer is using SDK that didn't catch up. In such case we want clear message about what's going on.

I believe we have that validation in .Net as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I'm debating whether to keep or drop "deflate" support from sync version. It's working (and covered by tests). Though we won't use it in nearest future.

class DataFileReader(object):
"""Read files written by DataFileWriter."""

# TODO: allow user to specify expected schema?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to delete these?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed TODO's.

avro_name = self.GetName(name=name, namespace=namespace)
return self._names.get(avro_name.fullname, None)

def GetSchema(self, name, namespace=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to follow the same naming format? to make all method names in the format like get_schema()?
Ctrl+Shift+r will help you replace the name across files, just be careful not to replace the name in other packages lol

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

del prunable['namespace']
return prunable

def Register(self, schema):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also let's make this method lower case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

# ------------------------------------------------------------------------------


def _SchemaFromJSONString(json_string, names):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's make all methods below in lower case ^_^ eg. _schema_from_json_string

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Member

@gapra-msft gapra-msft left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me on the Avro side of things. Could you work with Emily and eventually maybe get a QQ file so we can add a test for that, just for completion sake. Doesnt have to be part of this PR

from azure.storage.blob._shared.avro.datafile import DataFileReader
from azure.storage.blob._shared.avro.avro_io import DatumReader

SCHEMAS_TO_VALIDATE = (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any specific reason to place the tests in the _shared folder? fyi, Just like the _shared folder in the code; we need to follow the same norms in the tests too. _shared folder will be common across packages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rakshith91 It's testing _shared code so I was trying to match paths. But I see your point, so I'll move it.
However I wonder what would be a good place for such tests when _shared is actually automagically cloned across packages.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would put it an avro folder - looks like you already did - thanks :)

@kasobol-msft kasobol-msft merged commit 155c166 into Azure:feature/storage-stg73 Apr 23, 2020
xiafu-msft pushed a commit to xiafu-msft/azure-sdk-for-python that referenced this pull request May 1, 2020
* initial avro parser

* try fixing test...

* falling in love with python compatibility...

* make linter happy.

* raise StopIteration when there is no more bytes instead of tracking file length.

* async avro parser

* fix syntax for Python 3.5

* get rid of 'readers_schema' as we only honor schema that has been written to file ('writer_schema').

* pr feedback

* trim unused code.

* pr feedback.

* simplify skip sync in next.

* move avro tests from _shared.
xiafu-msft pushed a commit to xiafu-msft/azure-sdk-for-python that referenced this pull request May 2, 2020
* initial avro parser

* try fixing test...

* falling in love with python compatibility...

* make linter happy.

* raise StopIteration when there is no more bytes instead of tracking file length.

* async avro parser

* fix syntax for Python 3.5

* get rid of 'readers_schema' as we only honor schema that has been written to file ('writer_schema').

* pr feedback

* trim unused code.

* pr feedback.

* simplify skip sync in next.

* move avro tests from _shared.
xiafu-msft pushed a commit to xiafu-msft/azure-sdk-for-python that referenced this pull request May 12, 2020
* initial avro parser

* try fixing test...

* falling in love with python compatibility...

* make linter happy.

* raise StopIteration when there is no more bytes instead of tracking file length.

* async avro parser

* fix syntax for Python 3.5

* get rid of 'readers_schema' as we only honor schema that has been written to file ('writer_schema').

* pr feedback

* trim unused code.

* pr feedback.

* simplify skip sync in next.

* move avro tests from _shared.
xiafu-msft pushed a commit that referenced this pull request Jun 10, 2020
* initial avro parser

* try fixing test...

* falling in love with python compatibility...

* make linter happy.

* raise StopIteration when there is no more bytes instead of tracking file length.

* async avro parser

* fix syntax for Python 3.5

* get rid of 'readers_schema' as we only honor schema that has been written to file ('writer_schema').

* pr feedback

* trim unused code.

* pr feedback.

* simplify skip sync in next.

* move avro tests from _shared.
xiafu-msft added a commit that referenced this pull request Jul 7, 2020
* [Storage]STG73

* [Blob][Swagger]Update Swagger (#10943)

* [Blob][Swagger]Regenerate Swagger Code

* fix container test failure caused by list_containers include type change

* [Storage] Internal avro parser. (#10764)

* initial avro parser

* try fixing test...

* falling in love with python compatibility...

* make linter happy.

* raise StopIteration when there is no more bytes instead of tracking file length.

* async avro parser

* fix syntax for Python 3.5

* get rid of 'readers_schema' as we only honor schema that has been written to file ('writer_schema').

* pr feedback

* trim unused code.

* pr feedback.

* simplify skip sync in next.

* move avro tests from _shared.

* Jumbo blob support (#11176)

* wip

* initial test coverage.

* wip.

* wip

* single upload.

* add async tests.

* disable 50k block tests.

* datalake append.

* async datalake

* disable tests that send large payload over network.

* pr feedback.

* Undelete share (#11394)

* Undelete container (#11339)

* [Storage][Blob] Added support for Object Replication (#11525)

* Blob versioning (#11154)

* [Blob][QuickQuery]Add Quick Query Support (#10946)

* [Blob][STG73]Blob Tags (#11418)

* regenerate code (#11964)

* fix the bug which caused only showing fatal error (#11997)

* [Storage][STG73]Address API Review Comments (#12111)

* [Storage][STG73]Address API Review Comments

* [Storage][STG73]dict<policy, rules> -> list(ObjectReplicationPolicy)

* fix blob tag_value test

* expose ObjectReplicationPolicy and ObjectReplicationRule, fix test

* fix test

* Changefeed (#10755)

* [ChangeFeed]Add ChangeFeed Package

* test_avro failure

* update dev_requirement.txt

* change namespace to azure.storage.blob.changefeed

* address comments

* optimize memory when reading changefeed events

* namespace change

* set up package change

* fix failed tests

* readme and kwargs

* Update sdk/storage/azure-storage-blob-changefeed/azure/storage/blob/changefeed/_change_feed_client.py

Co-authored-by: Rakshith Bhyravabhotla <sabhyrav@microsoft.com>

* address comments

* 'azure-storage-blob>=12.3.0' which does not match the frozen requirement 'azure-storage-blob~=1.3'

Co-authored-by: Rakshith Bhyravabhotla <sabhyrav@microsoft.com>

* [Storage-Blob] Quick Query API (#11991)

* Renamed query error

* Renamed query reader

* Updated config models

* Updated format request params

* Updated iterator

* fix the bug which caused only showing fatal error

* Updated Error message

* Fixed query helper

* Started test conversion

* small fix

* Fixed tests

* Updated error handling + json model

* Updated recordings

* Removed old recording

* Added iter tests

* Iter test recordings

* Fix test

* Remove extra recording

* Fix pylint

* Some docs cleanup

* Renamed iter_records -> iter_stream

* Review feedback

* Updated tests

* Missing commas

* Fix syntax

* Fix pylint

Co-authored-by: xiafu <xiafu@microsoft.com>

* tag sas (#12258)

* tag sas

* disable undelete_container

* pylint

* skip undelete_container tests

* [Blob][Versioning]Disable Versioning Live Test (#12281)

* [Blob][QQ]Default output_format to input_format (#12283)

* [Storage][Jumbo]Remove super (#12314)

* [Storage][JumboBlob]remove empty super()

* pypy3

* change sas version to latest

* set tags account location to central canada

* re-recording queue

* changefeed paths generator

* mark tests for vid as playback only

* fix changefeed

* fix pylint
make the test account location for tags to central canada

* add a delay before calling find_blobs_by_tags

* remove tags header

* mark a large file test playback only

* revert "mark a large file test playback only"
skip upload large file test
address comment

* move tag permission and filter_by_tags permission to kwargs

Co-authored-by: Kamil Sobol <61715331+kasobol-msft@users.noreply.github.com>
Co-authored-by: Ze Qian Zhang <zezha@microsoft.com>
Co-authored-by: Rakshith Bhyravabhotla <sabhyrav@microsoft.com>
Co-authored-by: annatisch <antisch@microsoft.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants