# Improved redis performance in large keyspace, added pagination, and auto-expiration #119

**Merged** · 1 commit · Aug 19, 2024
README.md (48 additions, 3 deletions)

@@ -256,6 +256,28 @@ flow.status

`reload` is needed to see the latest status, since workflows are updated asynchronously.
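
For example (a minimal sketch; `NotifyWorkflow` stands in for any workflow class defined in your application):

```
flow = NotifyWorkflow.create
flow.start!

flow.reload  # re-fetches the workflow state from Redis
flow.status  # now reflects jobs that finished in the background
```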

## Loading workflows

### Finding a workflow by id

```
flow = Workflow.find(id)
```
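
`Workflow.find` raises an error for an unknown id, so lookups can be guarded (a sketch; assumes gush's `Gush::WorkflowNotFound` error class):

```
begin
  flow = Workflow.find(id)
rescue Gush::WorkflowNotFound
  flow = nil
end
```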

### Paging through workflows

To get workflows with pagination, use start and stop (inclusive) index values:

```
flows = Workflow.page(0, 99)
```

Or in reverse order:

```
flows = Workflow.page(0, 99, order: :desc)
```
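
Pages can be chained to walk the whole collection; a minimal sketch (the page size of 100 is arbitrary):

```
page_size = 100
start = 0

loop do
  flows = Workflow.page(start, start + page_size - 1)
  break if flows.empty?

  flows.each { |flow| puts "#{flow.id} #{flow.status}" }
  start += page_size
end
```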

## Advanced features

### Global parameters for jobs
@@ -449,12 +471,18 @@ end
bundle exec gush show <workflow_id>
```

- of a page of workflows:

```
bundle exec gush list
```

- of the most recent 100 workflows:

```
bundle exec gush list -100 -1
```

### Visualizing workflows as an image

This requires that you have ImageMagick installed on your computer:

@@ -480,7 +508,9 @@ end

### Cleaning up afterwards

Running `NotifyWorkflow.create` inserts multiple keys into Redis every time it is run. This data might be useful for analysis, but at a certain point it can be purged. By default gush and Redis keep keys forever. To configure expiration, you need to do two things.

1. Create an initializer that specifies `config.ttl` in seconds. Avoid setting the TTL too short (minutes); a value around a week works well.

```ruby
# config/initializers/gush.rb
Gush.configure do |config|
  config.ttl = 3600 * 24 * 7 # one week, in seconds
end
```

2. Call `Client#expire_workflows` periodically, which clears all expired workflow and job data and their index entries. This method can be called at any rate, but ideally should run at least once for every 1000 workflows created.

If you need more control over individual workflow expiration, you can call `flow.expire!(ttl)` with a TTL different from the Gush configuration, or with -1 to never expire the workflow.
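
For example, the periodic call can live in a small scheduled task. A sketch as a rake task (hypothetical, not shipped with gush; `:environment` assumes a Rails app, and the actual scheduling would come from cron, sidekiq-cron, or similar):

```ruby
# lib/tasks/gush.rake (hypothetical)
namespace :gush do
  desc "Purge expired gush workflow and job data from Redis"
  task expire: :environment do
    # Removes expired workflow/job hashes and their index entries
    Gush::Client.new.expire_workflows
  end
end
```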

### Avoid overlapping workflows

@@ -509,6 +541,19 @@ def find_by_class klass
end
```

## Gush 3.0 Migration

Gush 3.0 adds indexing for fast workflow pagination and changes the mechanism for expiring workflow data from Redis.

### Migration

Run `bundle exec gush migrate` after upgrading. This will update internal data structures.

### Expiration API

Periodically run `Gush::Client.new.expire_workflows` to expire data. Workflows will be automatically enrolled in this expiration, so there is no longer a need to call `workflow.expire!`.


## Contributors

- [Mateusz Lenik](https://github.com/mlen)
lib/gush.rb (1 addition)

@@ -15,6 +15,7 @@
require "gush/configuration"
require "gush/errors"
require "gush/job"
require "gush/migration"
require "gush/worker"
require "gush/workflow"

lib/gush/cli.rb (26 additions, 3 deletions)

@@ -70,9 +70,14 @@ def rm(workflow_id)
client.destroy_workflow(workflow)
end

desc "list", "Lists all workflows with their statuses"
def list
workflows = client.all_workflows
desc "list START STOP", "Lists workflows from START index through STOP index with their statuses"
option :start, type: :numeric, default: nil
option :stop, type: :numeric, default: nil
def list(start=nil, stop=nil)
workflows = client.workflow_ids(start, stop).map do |id|
client.find_workflow(id)
end

rows = workflows.map do |workflow|
[workflow.id, (Time.at(workflow.started_at) if workflow.started_at), workflow.class, {alignment: :center, value: status_for(workflow)}]
end
@@ -120,6 +125,24 @@ def viz(class_or_id)
end
end

desc "migrate", "Runs all unapplied migrations to Gush storage"
def migrate
Dir[File.join(__dir__, 'migrate', '*.rb')].each {|file| require file }

applied = Gush::Migration.subclasses.sort_by(&:version).count do |klass|
migration = klass.new
next if migration.migrated?

puts "Migrating to #{klass.name} (#{migration.version})"
migration.migrate
puts "== #{migration.version} #{klass.name}: migrated ==="

true
end

puts "#{applied} #{'migrations'.pluralize(applied)} applied"
end

private

def client
lib/gush/client.rb (93 additions, 5 deletions)

@@ -80,6 +80,41 @@ def next_free_workflow_id
id
end

# Returns the specified range of workflow ids, sorted by created timestamp.
#
# @param start, stop [Integer] see https://redis.io/docs/latest/commands/zrange/#index-ranges
# for details on the start and stop parameters.
# @param by_ts [Boolean] if true, start and stop are treated as timestamps
# rather than as element indexes, which allows the workflows to be indexed
# by created timestamp
# @param order [Symbol] if :asc, finds ids in ascending created timestamp;
# if :desc, finds ids in descending created timestamp
# @return [Array<String>] array of workflow ids
def workflow_ids(start=nil, stop=nil, by_ts: false, order: :asc)
start ||= 0
stop ||= 99

redis.zrange(
"gush.idx.workflows.created_at",
start,
stop,
by_score: by_ts,
rev: order&.to_sym == :desc
)
end

def workflows(start=nil, stop=nil, **kwargs)
workflow_ids(start, stop, **kwargs).map { |id| find_workflow(id) }
end

def workflows_count
redis.zcard('gush.idx.workflows.created_at')
end
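
# Illustrative usage (not part of the diff; assumes a configured client):
#
#   client = Gush::Client.new
#   client.workflow_ids(0, 99)                # ids of the 100 oldest workflows
#   client.workflow_ids(0, 99, order: :desc)  # ids of the 100 newest workflows
#   client.workflows(0, 9)                    # the 10 oldest Workflow objects
#   an_hour_ago = Time.now.to_f - 3600
#   client.workflow_ids(an_hour_ago, "+inf", by_ts: true) # created in the last hour
#   client.workflows_count                    # total number of indexed workflows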

# Deprecated.
#
# This method is not performant when there are a large number of workflows
# or when the redis keyspace is large. Use workflows instead with pagination.
def all_workflows
redis.scan_each(match: "gush.workflows.*").map do |key|
id = key.sub("gush.workflows.", "")
@@ -92,7 +127,13 @@ def find_workflow(id)

unless data.nil?
hash = Gush::JSON.decode(data, symbolize_keys: true)

if hash[:job_klasses]
keys = hash[:job_klasses].map { |klass| "gush.jobs.#{id}.#{klass}" }
else
# For backwards compatibility, get job keys via a full keyspace scan
keys = redis.scan_each(match: "gush.jobs.#{id}.*")
end

nodes = keys.each_with_object([]) do |key, array|
array.concat(redis.hvals(key).map { |json| Gush::JSON.decode(json, symbolize_keys: true) })
@@ -105,15 +146,25 @@ end
end

def persist_workflow(workflow)
created_at = Time.now.to_f
added = redis.zadd("gush.idx.workflows.created_at", created_at, workflow.id, nx: true)

if added && configuration.ttl&.positive?
expires_at = created_at + configuration.ttl
redis.zadd("gush.idx.workflows.expires_at", expires_at, workflow.id, nx: true)
end

redis.set("gush.workflows.#{workflow.id}", workflow.to_json)

workflow.jobs.each {|job| persist_job(workflow.id, job) }
workflow.jobs.each {|job| persist_job(workflow.id, job, expires_at: expires_at) }
workflow.mark_as_persisted

true
end

def persist_job(workflow_id, job, expires_at: nil)
redis.zadd("gush.idx.jobs.expires_at", expires_at, "#{workflow_id}.#{job.klass}", nx: true) if expires_at

redis.hset("gush.jobs.#{workflow_id}.#{job.klass}", job.id, job.to_json)
end

@@ -134,22 +185,59 @@ def find_job(workflow_id, job_name)

def destroy_workflow(workflow)
redis.del("gush.workflows.#{workflow.id}")
redis.zrem("gush.idx.workflows.created_at", workflow.id)
redis.zrem("gush.idx.workflows.expires_at", workflow.id)
workflow.jobs.each {|job| destroy_job(workflow.id, job) }
end

def destroy_job(workflow_id, job)
redis.del("gush.jobs.#{workflow_id}.#{job.klass}")
redis.zrem("gush.idx.jobs.expires_at", "#{workflow_id}.#{job.klass}")
end

def expire_workflows(expires_at=nil)
expires_at ||= Time.now.to_f

ids = redis.zrange("gush.idx.workflows.expires_at", "-inf", expires_at, by_score: true)
return if ids.empty?

redis.del(ids.map { |id| "gush.workflows.#{id}" })
redis.zrem("gush.idx.workflows.created_at", ids)
redis.zrem("gush.idx.workflows.expires_at", ids)

expire_jobs(expires_at)
end

def expire_jobs(expires_at=nil)
expires_at ||= Time.now.to_f

keys = redis.zrange("gush.idx.jobs.expires_at", "-inf", expires_at, by_score: true)
return if keys.empty?

redis.del(keys.map { |key| "gush.jobs.#{key}" })
redis.zrem("gush.idx.jobs.expires_at", keys)
end

def expire_workflow(workflow, ttl=nil)
ttl ||= configuration.ttl
redis.expire("gush.workflows.#{workflow.id}", ttl)

if ttl&.positive?
redis.zadd("gush.idx.workflows.expires_at", Time.now.to_f + ttl, workflow.id)

---

**@natemontgomery** commented on Aug 8, 2024:

> I worry a bit about using the `#to_f` value for this. That value is explicitly an approximation (of the rational representation), and doing math with floats is notoriously... not fun.
>
> We could try to do something using the integer (i.e. `#to_i`) and sub-second (i.e. `#usec` or `#nsec`) values themselves to avoid any approximation and float messiness down the road. The first thing that comes to mind would be some concatenated integer/BigDecimal representation. This kind of approach would even let us be explicit about the precision of our value here, which I do like. The limit being just how big a number Redis will let us put in there :)
>
> This might be 'not worth it' right now (which is a fine answer), but I do worry we might set up some nasty surprises later with a float here.

The author replied:

> Redis uses floating point numbers for sorted set scores. I think the rationale for using them here is:
>
> 1. We need the sub-second precision for ordering workflows by creation time.
> 2. We want the integer part to represent seconds (rather than milliseconds or something) so that clients can easily pass in a timestamp value with or without fractional seconds.
> 3. We want minimal friction with the redis implementation.
> 4. This score is solely for sorting and filtering on timestamp values, so I wouldn't anticipate the kind of problematic floating-point imprecision you see with hand-entered values (like money or hours worked).

**@natemontgomery** replied on Aug 12, 2024:

> True, the Redis side is a float regardless. The imprecision of the estimate for the `Time#to_f` value should be at the nanosecond (or at worst tens of nanoseconds) level, so it's unlikely that we would get any ordering wonkiness unless our throughput here was extremely high (or extremely distributed, etc.). Even then, the ordering being off may or may not be significant. I can definitely see this being a tradeoff that works well for this code.
>
> The other concern would really be on the implementation side, for anyone doing calculations based on the Gush expires_at values in their own code, but they should take care to check things before they do math with Time floats anyway :)
>
> Thanks for replying!

---

else
redis.zrem("gush.idx.workflows.expires_at", workflow.id)
end

workflow.jobs.each {|job| expire_job(workflow.id, job, ttl) }
end

def expire_job(workflow_id, job, ttl=nil)
ttl ||= configuration.ttl
redis.expire("gush.jobs.#{workflow_id}.#{job.klass}", ttl)

if ttl&.positive?
redis.zadd("gush.idx.jobs.expires_at", Time.now.to_f + ttl, "#{workflow_id}.#{job.klass}")
else
redis.zrem("gush.idx.jobs.expires_at", "#{workflow_id}.#{job.klass}")
end
end
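
# Illustrative usage (not part of the diff):
#
#   client = Gush::Client.new
#   client.expire_workflows                  # purge everything already past its TTL
#   client.expire_workflow(flow, 3600 * 24)  # re-schedule one workflow to expire in a day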

def enqueue_job(workflow_id, job)
lib/gush/migrate/1_create_gush_workflows_created.rb (new file, 21 additions)

@@ -0,0 +1,21 @@
module Gush
class IndexWorkflowsByCreatedAtAndExpiresAt < Gush::Migration
def self.version
1
end

def up
redis.scan_each(match: "gush.workflows.*").map do |key|
id = key.sub("gush.workflows.", "")
workflow = client.find_workflow(id)

ttl = redis.ttl(key)
redis.persist(key)
workflow.jobs.each { |job| redis.persist("gush.jobs.#{id}.#{job.klass}") }

client.persist_workflow(workflow)
client.expire_workflow(workflow, ttl.positive? ? ttl : -1)
end
end
end
end
lib/gush/migration.rb (new file, 37 additions)

@@ -0,0 +1,37 @@
module Gush
class Migration
def migrate
return if migrated?

up
migrated!
end

def up
# subclass responsibility

**Review comment from @natemontgomery:**

> Should we consider raising a `NotImplementedError` here? To guide implementation, I think it might help clarify.

raise NotImplementedError
end

def version
self.class.version
end

def migrated?
redis.sismember("gush.migration.schema_migrations", version)
end

private

def migrated!
redis.sadd("gush.migration.schema_migrations", version)
end

def client
@client ||= Client.new
end

def redis
Gush::Client.redis_connection(client.configuration)
end
end
end
lib/gush/workflow.rb (5 additions)

@@ -23,6 +23,10 @@ def self.find(id)
Gush::Client.new.find_workflow(id)
end

def self.page(start=0, stop=99, order: :asc)
Gush::Client.new.workflows(start, stop, order: order)

end

---

**@natemontgomery** commented:

> Personally, I prefer to have 'all positional or all kw' args unless I have a really strong reason not to. Mixing them can be confusing as usage contexts multiply. I think you do a good job handling the differences in the code; just a style thought.

The author replied:

> I was mirroring the redis zrange argument style here (though one difference is that that method doesn't have default values for start and stop). I don't have a strong preference either way. I'm out on vacation at the moment, but if @pokonski also prefers the all-kwarg style I can make that change in a week or two.

**@pokonski** (Contributor) replied:

> I don't have a preference here, so up to you 👍

---

def self.create(*args, **kwargs)
flow = new(*args, **kwargs)
flow.save
@@ -185,6 +189,7 @@ def to_hash
total: jobs.count,
finished: jobs.count(&:finished?),
klass: name,
job_klasses: jobs.map(&:class).map(&:to_s).uniq,
status: status,
stopped: stopped,
started_at: started_at,