Add note about large workflows #3

Merged
merged 1 commit into from
Dec 20, 2024
44 changes: 44 additions & 0 deletions README.md
@@ -153,6 +153,50 @@ workflow = Workflow(
In this example, Task 2 will run roughly 1 second after Task 1 finishes, and
Task 3 will run 2 seconds after Task 2 finishes.

### Large Workflows

Because of how `dramatiq-workflow` is implemented, each task in a workflow has
to know about the remaining tasks in the workflow that could potentially run
after it. When a workflow has a large number of tasks, this can lead to
increased memory usage in the broker and increased network traffic between
the broker and the workers, especially when using `Group` tasks: each task in a
`Group` can potentially be the last one to finish, so each task has to retain a
copy of the remaining tasks that run after the `Group`.
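
To get a feel for the scale, the overhead described above can be approximated with a back-of-the-envelope model. The sketch below is an estimate under a stated assumption, not a measurement of what `dramatiq-workflow` actually stores: it assumes each message in a `Group` carries one full serialized copy of the tasks that follow the group. The function name and the figures are hypothetical.

```python
def estimated_continuation_bytes(group_size: int, continuation_bytes: int) -> int:
    """Rough estimate of the bytes spent on copies of the continuation.

    Assumption: every message in the group embeds one full copy of the
    serialized tasks that run after the group (a simplification).
    """
    if group_size < 0 or continuation_bytes < 0:
        raise ValueError("sizes must be non-negative")
    return group_size * continuation_bytes


# Hypothetical figures: 1000 tasks in a group, 20 KiB of follow-up tasks.
total = estimated_continuation_bytes(1000, 20 * 1024)
print(f"{total / (1024 * 1024):.1f} MiB spent on continuation copies")
```

Under this model the overhead grows linearly with both the group size and the size of whatever follows the group, which is why the suggestions below target those two factors.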

There are a few things you can do to alleviate this issue:

- Minimize the use of parameters in the `message` method. Instead, consider
  using a database to store data that is required by your tasks.
- Limit the size of groups to a reasonable number of tasks. Instead of
  scheduling one group with 1000 tasks, consider scheduling 10 groups with 100
  tasks each and chaining them together.
- Consider breaking down large workflows into smaller partial workflows that
  then schedule a subsequent workflow at the very end of the outermost `Chain`.
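
The second suggestion, splitting one large group into several chained groups, could be sketched as follows. This is a minimal illustration, not part of `dramatiq-workflow`: `chunked` and `chain_of_groups` are hypothetical helpers, and the `group` and `chain` arguments are passed in as plain callables so the snippet runs without the library installed.

```python
from typing import Any, Callable, Iterator, Sequence


def chunked(items: Sequence[Any], size: int) -> Iterator[Sequence[Any]]:
    """Yield consecutive slices of `items` with at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


def chain_of_groups(
    messages: Sequence[Any],
    size: int,
    group: Callable[..., Any],
    chain: Callable[..., Any],
) -> Any:
    """Wrap each batch of messages in `group(...)`, then all batches in `chain(...)`."""
    return chain(*(group(*batch) for batch in chunked(messages, size)))
```

With `dramatiq-workflow`, `group` and `chain` would presumably be the `Group` and `Chain` classes, for example `Workflow(chain_of_groups(messages, 100, Group, Chain))`. This assumes both accept their children as positional arguments, as the other examples in this README suggest. Note that chained groups run one after another, so this trades some parallelism for smaller groups.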

Lastly, you can use compression to reduce the size of the messages in your
queue. While dramatiq does not provide a compression implementation by default,
one can be added with just a few lines of code. For example:

```python
import dramatiq
from dramatiq.encoder import JSONEncoder, MessageData
import lz4.frame


class DramatiqLz4JSONEncoder(JSONEncoder):
    def encode(self, data: MessageData) -> bytes:
        # Serialize to JSON first, then compress the resulting bytes.
        return lz4.frame.compress(super().encode(data))

    def decode(self, data: bytes) -> MessageData:
        try:
            decompressed = lz4.frame.decompress(data)
        except RuntimeError:
            # Uncompressed data from before the switch to lz4
            decompressed = data
        return super().decode(decompressed)


# Register the encoder globally so all messages are compressed.
dramatiq.set_encoder(DramatiqLz4JSONEncoder())
```
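
Before adopting compression, it may be worth checking how well a representative payload compresses. The sketch below uses `zlib` from the standard library rather than lz4, purely so that it runs without extra dependencies. The payload shape and field names are hypothetical and do not reflect the actual message format.

```python
import json
import zlib


def compression_ratio(payload: dict) -> float:
    """Return compressed size divided by raw size for a JSON-encoded payload."""
    raw = json.dumps(payload).encode("utf-8")
    return len(zlib.compress(raw)) / len(raw)


# Hypothetical payload: many near-identical task descriptions, which is the
# kind of repetition a large workflow tends to produce.
payload = {
    "remaining": [
        {"actor_name": "process_item", "args": [i], "kwargs": {}}
        for i in range(1000)
    ]
}
ratio = compression_ratio(payload)
print(f"compressed size is {ratio:.0%} of the original")
```

Highly repetitive payloads like this one tend to compress well. Actual savings depend on the real message contents and on the compressor chosen.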

## License

This project is licensed under the MIT License. See the [LICENSE](LICENSE) file