
Implement concurrent GCS reads #256

Closed · tarekziade opened this issue Dec 20, 2022 · 17 comments
Labels: enhancement (New feature or request)

@tarekziade (Contributor)

Right now the Google connector reads all documents sequentially:

https://github.com/elastic/connectors-python/blob/main/connectors/sources/gcs.py#L345

We could push concurrent self.get_blob_document calls using a ConcurrentTasks instance: https://github.com/elastic/connectors-python/blob/main/connectors/utils.py#L342

tarekziade added the enhancement (New feature or request) label on Dec 20, 2022
@jignesh-crest (Collaborator)

Sure, I'll check and get back to you.

@jignesh-crest (Collaborator)

Hey, I tried running the connector with your suggestion to wrap get_blob_document in a concurrent task, and here are my findings. The caller method (prepare_docs, from byoc:419) expects the return value to be a single dictionary, so get_doc must yield a single dictionary at a time. If I execute get_blob_document in a concurrent task, I still have to pull out and yield the resulting documents one by one, so the throughput of this method ends up the same as with the current approach. The bottleneck is that the caller expects a single dictionary as a return value.

Modified get_docs:
[screenshot of the modified get_docs]

Modified get_blob_document:
[screenshot of the modified get_blob_document]

Let us know if anything needs to be changed.

We feel that applying concurrency over get_blob_document will not make much difference, as it involves no I/O-bound work and very little CPU-bound work (just the doc mapping). The connector would be much faster if we applied concurrency to the get_content method for content extraction. What do you think?

@tarekziade (Contributor, Author)

tarekziade commented Dec 29, 2022

I had in mind a different approach. You can think of it a bit differently:

  1. get_docs() can create an asyncio.Queue object and start tasks that add docs to it.
  2. get_docs() can yield items from that queue.
  3. The tasks created to fill the queue are the ones that call the Google API concurrently to get the content, using ConcurrentTasks.

Untested pseudo-code:

```python
import asyncio

from connectors.utils import ConcurrentTasks


async def get_docs(self):
    q = asyncio.Queue()
    fetchers = ConcurrentTasks(10)

    async def grab_content(blob_document):
        # download the blob content concurrently, then queue the pair
        content = await self.get_content(blob_document)
        await q.put((blob_document, content))

    async def producer():
        async for blobs in self.fetch_blobs():
            for blob_document in self.get_blob_document(blobs=blobs):
                await fetchers.put(grab_content(blob_document))
        # wait for in-flight downloads before signalling the end,
        # so nothing lands in the queue after the sentinel
        await fetchers.wait()
        await q.put("FINISHED")

    t = asyncio.create_task(producer())
    while True:
        item = await q.get()
        if item == "FINISHED":
            break
        yield item
    await t
```

With this code, you can grab 10 documents in parallel.

@jignesh-crest (Collaborator)

Hey @tarekziade,
We have tried the suggestion you provided, and the time taken to index the docs has been reduced significantly. However, there are a few drawbacks to this approach:

  1. The content of the document will be downloaded on every sync.
     The source class has no way to implement the "doit" functionality for fetching an object's content.

  2. We need to limit the queue size to prevent memory bloat.

Do you think it would be possible to implement this change at the framework level, so all the connectors benefit from it, and leave the content extraction job to the framework only?

@tarekziade (Contributor, Author)

@jignesh-crest can you publish the PR so I can see it? We can iterate from there.

We can make it reusable, but we need to think about how to do it so the connector class keeps the straightforward get_docs API. I think the deferred doit feature has to be removed anyway, because we want to index documents in two different ways in the future.

For the first point, maybe a v1 could live in the base class of the connector class.

> We need to limit the queue size to prevent memory bloat.

For this, you can use MemQueue in utils.
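
A minimal sketch of what that could look like. The exact constructor arguments and return shape are assumptions here; check MemQueue in connectors/utils.py for the real signature:

```python
import asyncio

from connectors.utils import MemQueue  # memory-aware asyncio.Queue subclass


async def main():
    # hypothetical limits: at most 1024 queued items and ~5 MiB of data;
    # the actual parameters live in connectors/utils.py
    queue = MemQueue(maxsize=1024, maxmemsize=5 * 1024 * 1024)

    # put() waits once a limit is reached, so producers cannot outrun
    # the consumer and bloat memory
    await queue.put(("blob-document", "some content"))
    print(await queue.get())


asyncio.run(main())
```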

@jignesh-crest (Collaborator)

Sure, I'll do it.

@tarekziade (Contributor, Author)

The patch looks good in general; I added a few comments.

> The content of the document will be downloaded on every sync.

Right now you are grabbing the content right away with:

```python
async def grab_content(blob):
    blob_document = await self.prepare_blob_document(blob)
    await self.queue.put((blob_document, None))
```

So you are returning None for lazy_download. If you change it to:

(pseudo-code, untested)

```python
async def grab_content(blob):
    document = some_metadata  # metadata dict built from the blob
    ts = document['the_ts']

    async def lazy(doit, es_ts):
        if not doit:
            return
        if ts == es_ts:
            # timestamps match: no change, skip the download
            return
        return await self.prepare_blob_document(blob)

    await self.queue.put((document, lazy))
```

@jignesh-crest (Collaborator)

> The patch looks good in general; I added a few comments. […] So you are returning None for lazy_download. If you change it to: […]

We tried it as per your suggestion, and we observed that with this approach get_content is not completely concurrent. We have to modify our code to handle lazy_download concurrently, as it is responsible for content extraction.

As of now, with this approach, the time taken to run the connector is very similar to what we were getting previously without the concurrency.

What do you suggest?

@tarekziade (Contributor, Author)

Right, I realize byoei.Fetcher.get_docs blocks on each lazy_download call. It needs to be changed to run lazy downloads concurrently in the same way. Once that's done, the last approach should work.
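
A rough sketch of that direction (hypothetical names, not the actual byoei.Fetcher code): instead of awaiting each lazy_download inline, the framework schedules them on a ConcurrentTasks pool.

```python
# rough sketch only: schedule each lazy_download on a ConcurrentTasks
# pool instead of awaiting it inline, so content extraction overlaps
async def get_docs(self, generator):
    lazy_downloads = ConcurrentTasks(10)
    async for doc, lazy_download in generator:
        if lazy_download is not None:
            # es_ts would come from the document already stored in Elasticsearch
            await lazy_downloads.put(lazy_download(doit=True, es_ts=None))
        yield doc
    # make sure every scheduled download has finished before returning
    await lazy_downloads.wait()
```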

I will work on that change today and ping you when I have a patch.

@tarekziade (Contributor, Author)

This is what I have in mind: #279. Along with this change, I think this will unlock concurrency.

@jignesh-crest (Collaborator)

Hey @tarekziade,
Moving concurrency to the framework for lazy downloads looks great.
It was get_content() that was taking longer than expected; with the new approach, the content will be fetched comparatively quickly.
With the concurrency moved to the framework, I suggest removing the overhead of the queue and fetchers from the source class, since listing buckets and objects doesn't take much time.
What do you suggest?

@tarekziade (Contributor, Author)

> Moving concurrency to the framework for lazy downloads looks great. […] What do you suggest?

Sure, let's try it like this, if you don't mind re-running the test. One thing we need to add before we land, though, is a way for the source to set a value for the concurrency, because this change will impact all sources and some might not support 10 concurrent calls. We will need to verify that GCS and the other sources work OK with this.

@tarekziade (Contributor, Author)

I added tweak_bulk_options, which gets a default implementation in BaseDataSource.

A source class can use this to override the options values:

```python
class MySource1(BaseDataSource):
    def tweak_bulk_options(self, options):
        # this source does not support concurrent downloads
        options['concurrent_downloads'] = 1


class MySource2(BaseDataSource):
    def tweak_bulk_options(self, options):
        # this source supports heavy concurrency
        options['concurrent_downloads'] = 100
```
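
Presumably the default implementation in BaseDataSource is a no-op, so existing sources keep the framework defaults unless they opt in. A sketch based on the description above, not the actual patch:

```python
class BaseDataSource:
    def tweak_bulk_options(self, options):
        # default: leave the framework's bulk options untouched;
        # subclasses override this to adjust e.g. 'concurrent_downloads'
        pass
```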

@tarekziade (Contributor, Author)

#279 was merged

@jignesh-crest (Collaborator)

Hey @tarekziade,
We tried executing the GCS connector with and without the concurrency at the connector level, on top of your branch tarekziade/concurrentdl. The difference observed was ~2 minutes for a total of 19743 documents.

With the concurrency moved to the framework, I suggest removing the overhead of the queue and fetchers from the source class, since listing buckets and objects doesn't take much time.
What do you think?

@tarekziade (Contributor, Author)

Sounds good, thanks for the test.

@jignesh-crest (Collaborator)

@tarekziade we're closing the draft PR #271, assuming you will close this issue, as we are not going to add concurrency at the connector level.
