
Fix races in conductor with dynamic agents #568

Merged
merged 3 commits into faust-streaming:master from fix-dynamic-agents on Nov 27, 2023

Conversation

GodTamIt
Contributor

Description

This change fixes issues where dynamically added agents would cause races in the conductor's internal state.

Simple Python code to reproduce:
import time
import faust
import os

version = os.getenv("VERSION") or "v1"
app = faust.App(
    f"test-app-repro-bug-{version}",
    debug=True,
    autodiscover=[],
    store="rocksdb://",
    topic_partitions=4,
    topic_allow_declare=True,
    topic_disable_leader=False,
    stream_wait_empty=False,
)

source_topic = app.topic(
    "source-topic",
    internal=True,
    partitions=1,
    value_type=str,
)

in_progress = False
current = 0

# Every 2 seconds (on the leader only), dynamically create a new chain of
# agents and topics; this is what triggers the race in the conductor.
@app.timer(2, on_leader=True)
async def timer_task(app):
    global current
    global in_progress
    if in_progress:
        return
    in_progress = True
    current += 1
    await create_agent_and_topic_then_wait(app, str(current), f"hello {current}")
    in_progress = False

# Every 5 seconds, publish a message to the source topic so the agents have
# something to consume.
@app.timer(5)
async def sender(app):
    ts = time.time()
    print(f"sending value: {ts}")
    await source_topic.send(key="key", value=f"value {ts}")


# Trivial agent body: log each value and forward it to the agent's sink.
async def dummy_consumer(stream):
    print("starting consumer", stream)
    async for value in stream:
        print(f"consuming value: {value}")
        yield value

# Create a small chain of topics and agents at runtime; each call adds new
# subscriptions while the app is already running.
async def create_agent_and_topic_then_wait(app, key, value):
    print(f"creating agent and topic for {value}")
    internal_topic_1 = app.topic(
        f"internal-topic-1-{key}-{version}",
        internal=True,
        value_type=str,
    )
    await app.agent(source_topic, sinks=internal_topic_1)(dummy_consumer).start()

    internal_topic_2 = app.topic(
        f"internal-topic-2-{key}-{version}",
        internal=True,
        value_type=str,
    )
    await app.agent(internal_topic_1, sinks=internal_topic_2)(dummy_consumer).start()

    internal_topic_3 = app.topic(
        f"internal-topic-3-{key}-{version}",
        internal=True,
        value_type=str,
    )
    await app.agent(internal_topic_3)(dummy_consumer).start()

With this change, the program above no longer races and crashes. Note that it may have to run for a while, since _resubscribe_sleep_lock_seconds is somewhat high.
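If you want to run the repro directly as a script, one option (assuming the code above is saved as repro.py) is to append faust's standard App.main() entry point:

# Hypothetical launcher for the repro above (assumed filename: repro.py).
# App.main() hands control to faust's worker CLI, so the script can be run as
#   python repro.py worker -l info
# or equivalently as
#   faust -A repro worker -l info
if __name__ == "__main__":
    app.main()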

@wbarnha
Member

wbarnha commented Nov 21, 2023

I'll revisit this after fixing the pyproject.toml file. Thank you for the PR!

@wbarnha
Member

wbarnha commented Nov 21, 2023

Could I get permission to edit this PR? I'd like to bring it up to date with the main branch.

@GodTamIt
Contributor Author

I've granted access now, and I've also rebased on top of master for your convenience.


codecov bot commented Nov 21, 2023

Codecov Report

All modified and coverable lines are covered by tests ✅

Comparison: base (ad8dd0e) 93.71% vs. head (413eeab) 93.72%.

Additional details and impacted files
@@           Coverage Diff           @@
##           master     #568   +/-   ##
=======================================
  Coverage   93.71%   93.72%           
=======================================
  Files         102      102           
  Lines       11156    11166   +10     
  Branches     1534     1537    +3     
=======================================
+ Hits        10455    10465   +10     
  Misses        613      613           
  Partials       88       88           


@patkivikram
Collaborator

Do we need to modify the cython bindings as well?

@GodTamIt
Contributor Author

Do we need to modify the cython bindings as well?

I'm not sure. Mind pointing me in the right direction for this?

@wbarnha
Member

wbarnha commented Nov 26, 2023

Do we need to modify the cython bindings as well?

I'm not sure. Mind pointing me in the right direction for this?

Cython bindings are under https://github.com/faust-streaming/faust/tree/master/faust/_cython, but it's not immediately apparent to me if any changes need to be made there.

I'd merge this ASAP because I know it fixes an important issue, but I'd like to understand how the value for _resubscribe_sleep_lock_seconds can be fine-tuned to optimize performance.

Edit: Looks like that value has been there since 2018. I'm cautious about changing it.

@GodTamIt
Contributor Author

Looking at the bindings, I don't believe they need to be modified.

I'd merge this ASAP because I know it fixes an important issue, but I'd like to understand how the value for _resubscribe_sleep_lock_seconds can be fine-tuned to optimize performance.

Edit: Looks like that value has been there since 2018. I'm cautious about changing it.

@wbarnha I know the code around this value quite well, since I was modifying all the code that uses it.

_resubscribe_sleep_lock_seconds trades off between the latency of receiving messages for a newly added topic (usually only applies to the dynamic case) and the cost of resubscribing agents (iff there are changes to the topics list). It attempts to coalesce topic list changes that happen in quick succession and prevents the framework from constantly resubscribing to topics after every change.

If the value is set too low and an agent is adding topics very frequently, resubscription will happen extremely often and put unnecessary work on the async loop. If the value is set too high, it will take a long time for a newly added agent to start receiving messages (this delay is bounded by _resubscribe_sleep_lock_seconds, barring something hogging the async loop).
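To make the coalescing behaviour concrete, here is a minimal asyncio sketch of the pattern (illustrative only, not faust's actual conductor code; ConductorSketch, RESUBSCRIBE_SLEEP, and _resubscribe are made-up names):

import asyncio

RESUBSCRIBE_SLEEP = 10.0  # illustrative stand-in for _resubscribe_sleep_lock_seconds

class ConductorSketch:
    """Toy model of coalesced resubscription; not faust's real Conductor."""

    def __init__(self) -> None:
        self._topics: set = set()
        self._changed = asyncio.Event()

    def add_topic(self, topic: str) -> None:
        # Called whenever an agent dynamically adds a topic subscription.
        self._topics.add(topic)
        self._changed.set()

    async def run(self) -> None:
        while True:
            # Wait until at least one topic change is pending.
            await self._changed.wait()
            # Sleep so a burst of changes collapses into a single resubscribe
            # instead of one resubscribe per change.
            await asyncio.sleep(RESUBSCRIBE_SLEEP)
            self._changed.clear()
            await self._resubscribe(frozenset(self._topics))

    async def _resubscribe(self, topics: frozenset) -> None:
        # Stand-in for re-issuing the consumer subscription.
        print(f"resubscribing to {sorted(topics)}")

Lowering RESUBSCRIBE_SLEEP means more frequent resubscribes; raising it means a newly added topic waits longer before it is actually subscribed, which is exactly the trade-off described above.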

@wbarnha
Member

wbarnha commented Nov 27, 2023

Looking at the bindings, I don't believe they need to be modified.

_resubscribe_sleep_lock_seconds trades off between the latency of receiving messages for a newly added topic and the cost of resubscribing agents. [...]

Thanks for the thorough write-up; I think this is worth placing in the documentation. I also agree that the bindings don't need to be modified.

Just to be sure nothing is missed in the unit tests, I'll test this locally on my own machine, and if everything seems nominal, I'll approve this PR.

Member

@wbarnha wbarnha left a comment

Tested the changes locally; everything looks good on my end. Thanks for the PR!

@GodTamIt
Contributor Author

Thanks for the thorough write-up; I think this is worth placing in the documentation. I also agree that the bindings don't need to be modified.

I added documentation around _resubscribe_sleep_lock_seconds.

In the future, we should expose this as a setting to clients of the framework so they can tune it themselves.

Just to be sure nothing is missed in the unit tests, I'll test this locally on my own machine, and if everything seems nominal, I'll approve this PR.

There's nothing missing AFAICT. I couldn't find a good way to incorporate my repro code into a unit test, given how the tests are currently structured.

@wbarnha wbarnha merged commit 58b18a1 into faust-streaming:master Nov 27, 2023
19 of 22 checks passed
@GodTamIt GodTamIt deleted the fix-dynamic-agents branch December 30, 2023 06:03