Add support for RoundRobinAssignor #511

Open
2 tasks done
alexakra opened this issue May 27, 2023 · 9 comments
Labels
documentation (Improvements or additions to documentation), enhancement (New feature or request)

Comments

@alexakra

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Expected behavior

I would like to use RoundRobinAssignor to distribute available partitions evenly across all consumers.

Actual behavior

Today Faust groups the same partition numbers onto the same consumers for all topics with the same number of partitions.
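To illustrate the difference, here is a minimal, library-independent sketch. It does not use Faust or aiokafka; `same_number_grouping` is a simplified model of the behavior described above (the modulo rule is an assumption, not Faust's actual algorithm), and the topic and consumer names are made up.

```python
from collections import defaultdict
from itertools import cycle


def same_number_grouping(topics, consumers):
    """Simplified model: partition N of every topic lands on the same consumer."""
    assignment = defaultdict(list)
    for topic, num_partitions in topics.items():
        for partition in range(num_partitions):
            # Assumed rule for illustration only: pick the consumer by partition number.
            consumer = consumers[partition % len(consumers)]
            assignment[consumer].append((topic, partition))
    return dict(assignment)


def round_robin(topics, consumers):
    """Deal every (topic, partition) pair out to the consumers one at a time."""
    assignment = defaultdict(list)
    consumer_cycle = cycle(consumers)
    for topic in sorted(topics):
        for partition in range(topics[topic]):
            assignment[next(consumer_cycle)].append((topic, partition))
    return dict(assignment)


# Hypothetical topics with the same partition count, and two consumers.
topics = {"orders": 3, "payments": 3}
consumers = ["c0", "c1"]

grouped = same_number_grouping(topics, consumers)
spread = round_robin(topics, consumers)

print({c: len(p) for c, p in grouped.items()})
print({c: len(p) for c, p in spread.items()})
```

With three partitions per topic and two consumers, the grouped model gives one consumer four partitions and the other two, while round-robin gives each consumer three.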

Versions

  • Python version: 3.10
  • Faust version: 0.10.1
  • Operating system: Debian GNU/Linux 11 (bullseye)
  • Kafka version: 3.4
@wbarnha
Member

wbarnha commented May 27, 2023

Faust can support using aiokafka's RoundRobinAssignor! It already uses it by default in some cases, such as #402.

Try setting app.assignor=RoundRobinPartitionAssignor on startup.

@wbarnha added the documentation (Improvements or additions to documentation) label May 28, 2023
@alexakra
Author

It is failing:
[ERROR] [^-App]: Crashed reason=AttributeError("type object 'RoundRobinPartitionAssignor' has no attribute 'assigned_standbys'")
I think that there is no assignor field on App.

@wbarnha added the enhancement (New feature or request) label May 30, 2023
@tynianovddi

same issue here

@dada-engineer
Collaborator

Have you tried setting PartitionAssignor when initializing the app? https://faust-streaming.github.io/faust/userguide/settings.html#partitionassignor

@alexakra
Author

alexakra commented Nov 6, 2023

> Have you tried setting PartitionAssignor when initializing the app? https://faust-streaming.github.io/faust/userguide/settings.html#partitionassignor

Still failing:
app = App(..., PartitionAssignor=RoundRobinPartitionAssignor)

  File "../lib/python3.11/site-packages/mode/services.py", line 807, in _default_start
    await self._actually_start()
  File "../lib/python3.11/site-packages/mode/services.py", line 824, in _actually_start
    await self.on_start()
  File "../lib/python3.11/site-packages/faust/transport/drivers/aiokafka.py", line 478, in on_start
    self._consumer = self._create_consumer(loop=self.thread_loop)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "../lib/python3.11/site-packages/faust/transport/drivers/aiokafka.py", line 497, in _create_consumer
    return self._create_worker_consumer(transport)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "../lib/python3.11/site-packages/faust/transport/drivers/aiokafka.py", line 507, in _create_worker_consumer
    self.app.assignor
  File "../lib/python3.11/site-packages/mode/utils/objects.py", line 659, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
                                          ^^^^^^^^^^^^^^^
  File "../lib/python3.11/site-packages/faust/app/base.py", line 2069, in assignor
    assignor = self.conf.PartitionAssignor(  # type: ignore
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: RoundRobinPartitionAssignor() takes no arguments
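
For reference, the same kind of TypeError can be reproduced without Faust or aiokafka. This is a minimal sketch under stated assumptions: `StandInAssignor` and `create_assignor` are hypothetical names, and the `(app, replicas=...)` call shape is assumed, since the traceback cuts off the actual arguments.

```python
class StandInAssignor:
    """Hypothetical stand-in: a class that defines no __init__ of its own."""


def create_assignor(assignor_cls, app, replicas):
    # Assumed call shape: the configured class is instantiated with arguments.
    return assignor_cls(app, replicas=replicas)


try:
    create_assignor(StandInAssignor, app=object(), replicas=0)
except TypeError as exc:
    # Python rejects constructor arguments for a class without a custom __init__/__new__.
    message = str(exc)
    print(message)
```

Any class without its own `__init__` or `__new__` fails this way as soon as it is instantiated with arguments.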

@dada-engineer
Collaborator

This is weird, as Faust itself sets this assignor when table standby replicas is set to zero. If you do not need standby replicas, can you try setting TABLE_STANDBY_REPLICAS to zero?

If not, you would need to build your own class that implements the interface of Faust's PartitionAssignorT type.
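
A rough sketch of what such a wrapper could look like, written without importing Faust or aiokafka so it runs on its own. Everything here is an assumption for illustration: `StandInRoundRobin` stands in for the real assignor class, the `(app, replicas)` constructor shape is guessed, and `assigned_standbys` is the only method included because it is the one named in the error above. The real PartitionAssignorT interface may require more.

```python
class StandInRoundRobin:
    """Hypothetical stand-in for an assignor class that takes no constructor arguments."""

    name = "roundrobin"


class RoundRobinAdapter(StandInRoundRobin):
    """Accepts the arguments the app is assumed to pass and adds the missing method."""

    def __init__(self, app, replicas=0):
        # The wrapped class takes no arguments, so none are forwarded.
        super().__init__()
        self.app = app
        self.replicas = replicas

    def assigned_standbys(self):
        # Assumption: with zero standby replicas there is nothing to report.
        return set()


adapter = RoundRobinAdapter(object(), replicas=0)
print(adapter.name, adapter.assigned_standbys())
```

The adapter only changes the constructor signature and fills in the attribute from the first error message; the assignment logic itself is left to the wrapped class.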

@alexakra
Author

alexakra commented Nov 6, 2023

I did, and it works. But I opened this issue to get it solved properly. I am not looking for a workaround, because I cannot rely on one.

@dada-engineer
Collaborator

I do not see an issue. Faust says that if you want to assign your own PartitionAssignor, you need to provide a PartitionAssignorT-compatible class. RoundRobinPartitionAssignor clearly isn't one, as it has no init function accepting the args Faust provides.

What's the issue in summary, then? 🤔

@alexakra
Author

alexakra commented Nov 6, 2023

My thoughts:

  1. Faust internally uses RoundRobinPartitionAssignor, which is not a PartitionAssignorT-compatible class. Why is that?
  2. This is a common use case, and many would like to use the standard implementation. I don't need a custom one, so there should be no need for each of us to implement a custom RoundRobinPartitionAssignor.
