Implement concurrent GCS reads #256
Comments
Sure, I'll check and get back to you.
Hey, I tried executing the connector following your suggestion to wrap `get_blob_document` in a concurrent task, and here are my findings. The caller method (`prepare_docs`, byoc:419) expects the return value to be a single dictionary, so `get_doc` must yield one dictionary at a time. If I execute `get_blob_document` in a concurrent task, I still have to pull out and yield the resulting documents one by one, so the throughput of this method ends up the same as with the current approach. The bottleneck is that the caller method (`prepare_docs`) expects a single dictionary as the return value. Let us know if anything needs to be changed. We feel that applying concurrency to `get_blob_document` will not make much difference, as it has no I/O-bound and very few CPU-bound operations (just the doc mapping); the connector would be faster if we could apply concurrency to the `get_content` method for content extraction. What do you think?
I had in mind a different approach. You can think of it a bit differently (untested pseudo-code):

```python
async def get_docs(self):
    q = asyncio.Queue()
    fetchers = ConcurrentTasks(10)

    async def grab_content(blob_document):
        content = await self.get_content(blob_document)
        await q.put((blob_document, content))

    async def producer():
        async for blobs in self.fetch_blobs():
            for blob_document in self.get_blob_document(blobs=blobs):
                await fetchers.put(grab_content(blob_document))
        await fetchers.wait()
        await q.put('FINISHED')

    t = asyncio.create_task(producer())
    while True:
        item = await q.get()
        if item == 'FINISHED':
            break
        yield item
    await t
```

With this code, you can grab 10 documents in parallel.
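The pattern above can be sketched as a runnable example. This is a hedged sketch: `ConcurrentTasks` here is a minimal stand-in for the framework helper in `connectors/utils.py`, not the real implementation, and `get_content` fakes the download with a short sleep. Waiting on the fetchers before queueing the sentinel avoids a race where `FINISHED` could arrive before all results are queued:

```python
import asyncio

# Minimal hypothetical stand-in for the framework's ConcurrentTasks helper:
# runs coroutines as tasks while capping how many are in flight.
class ConcurrentTasks:
    def __init__(self, max_concurrency):
        self._sem = asyncio.Semaphore(max_concurrency)
        self._tasks = []

    async def put(self, coro):
        # Block until a slot is free, then run the coroutine as a task.
        await self._sem.acquire()
        task = asyncio.create_task(coro)
        task.add_done_callback(lambda _t: self._sem.release())
        self._tasks.append(task)

    async def wait(self):
        await asyncio.gather(*self._tasks)

async def get_content(blob_document):
    await asyncio.sleep(0.01)  # simulate network I/O
    return f"content-of-{blob_document}"

async def get_docs(blob_documents):
    q = asyncio.Queue()
    fetchers = ConcurrentTasks(10)

    async def grab_content(blob_document):
        content = await get_content(blob_document)
        await q.put((blob_document, content))

    async def producer():
        for blob_document in blob_documents:
            await fetchers.put(grab_content(blob_document))
        await fetchers.wait()    # make sure every result has been queued
        await q.put("FINISHED")  # sentinel: tells the consumer to stop

    t = asyncio.create_task(producer())
    while True:
        item = await q.get()
        if item == "FINISHED":
            break
        yield item
    await t

async def main():
    return [item async for item in get_docs([f"blob-{i}" for i in range(25)])]

results = asyncio.run(main())
```

With 25 fake blobs and a cap of 10, downloads overlap in batches of at most 10 while the consumer yields results as they arrive.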
Hey @tarekziade,
Do you think it would be possible to implement this change at the framework level, so that all the connectors benefit from it and the content extraction job stays at the framework level only?
@jignesh-crest can you publish the PR so I can see it? We can iterate from there. We can make it reusable, but we need to think about how to do it so the connector class stays straightforward.
For the first, maybe a v1 could be in the base class of the connector class.
For this, you can use
Sure, I'll do it.
The patch looks good in general; I added a few comments.
Right now you are grabbing the content right away with
so you are returning pseudo-code, untested
We tried as per your suggestion, and we observed that with this approach `get_content` is not completely concurrent. We have to modify our code to handle `lazy_download` concurrently, as it is responsible for content extraction. As of now, the time taken to run the connector is very similar to what we were getting previously without the concurrency. What do you suggest?
Right, I see. I will work on that change today and ping you when I have a patch.
This is what I have in mind: #279. Along with this change, I think this will unlock concurrency.
Hey @tarekziade,
Sure, let's try it like this, if you don't mind re-running the test with this change. One thing we need to add before we land, though, is a way for the source to provide a value for the concurrency, because this change will impact all sources and some might not support 10 concurrent calls. We will need to verify that GCS and other sources work OK with this.
I added
A source class can use this to override the option values:
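As a sketch of that idea: a source class overrides a hook to adjust the framework-wide defaults. Note the hook name `tweak_bulk_options` and the option key `concurrent_downloads` are illustrative assumptions here, not necessarily the framework's confirmed API:

```python
# Illustrative sketch only: hook and option names are assumed,
# not the framework's confirmed API.
DEFAULT_BULK_OPTIONS = {"concurrent_downloads": 10}

class BaseDataSource:
    def tweak_bulk_options(self, options):
        # Default: a source accepts the framework-wide settings unchanged.
        pass

class GoogleCloudStorageDataSource(BaseDataSource):
    def tweak_bulk_options(self, options):
        # A source that cannot handle 10 concurrent calls lowers the cap.
        options["concurrent_downloads"] = 5

options = dict(DEFAULT_BULK_OPTIONS)
GoogleCloudStorageDataSource().tweak_bulk_options(options)
# options["concurrent_downloads"] is now 5
```

This keeps the concurrency machinery in the framework while letting each source declare what it can tolerate.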
#279 was merged |
Hey @tarekziade, with the concurrency moved to the framework, I suggest removing the overhead of the queue and fetchers from the source class, as listing buckets and objects doesn't take much time.
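For reference, once downloads are made concurrent at the framework level, the source side can go back to a plain sequential async generator. This is a hedged sketch with simplified stand-ins for the real `fetch_blobs` / `get_blob_document` GCS listing helpers:

```python
import asyncio

# Sketch: the queue/fetchers machinery removed from the source; listing is
# fast enough to stay sequential. fetch_blobs and get_blob_document are
# simplified stand-ins for the real GCS listing calls.
async def fetch_blobs():
    # The real source pages through the GCS object-listing API here.
    for page in (["a.txt", "b.txt"], ["c.txt"]):
        yield page

def get_blob_document(blobs):
    # Map each blob to an Elasticsearch document dictionary.
    for blob in blobs:
        yield {"_id": blob, "name": blob}

async def get_docs():
    async for blobs in fetch_blobs():
        for blob_document in get_blob_document(blobs=blobs):
            yield blob_document

async def main():
    return [doc async for doc in get_docs()]

docs = asyncio.run(main())
```

The generator stays a drop-in for `prepare_docs`, yielding one dictionary at a time, while the framework fans out the content downloads.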
Sounds good, thanks for the test |
@tarekziade we're closing the draft PR #271, assuming you will close this issue, as we are not going to add concurrency at the connector level.
Right now the google connector reads all documents in a sequence:
https://github.com/elastic/connectors-python/blob/main/connectors/sources/gcs.py#L345
We can push concurrent `self.get_blob_document` calls using a `ConcurrentTasks` instance:
https://github.com/elastic/connectors-python/blob/main/connectors/utils.py#L342