Pampa is a Ruby library for async & distributed computing, providing the following features:
- cluster-management with dynamic reconfiguration (joining and leaving nodes);
- distribution of the computation jobs to the (active) nodes;
- error handling, job-retry, and fault tolerance;
- fast (non-direct) communication to ensure real-time capabilities.
The Pampa framework may be widely used for:
- large scale web scraping with what we call a "bot-farm";
- payments processing for large-scale eCommerce websites;
- reports generation for highly demanded SaaS platforms;
- heavy mathematical model computing;
and any other task that requires a virtually unlimited amount of CPU and memory resources.
As a final note, Pampa supports PostgreSQL and CockroachDB, and it has been tested on Ubuntu 18.04 with Ruby 3.1.2p20.
Outline
- Installation
- Getting Started
- Define Your Cluster
- Define a Job
- Setup Your Database
- Connect To Your Database
- Running Dispatcher
- Running Workers
- Selection Snippet
- Relaunching Snippet
- Elastic Workers Assignation
- Reporting
- Stand Alone Processes
gem install pampa
This example sets up a cluster of workers to process all the numbers from 1 to 100,000 and build the list of odd numbers within that range.
Create a new file ~/config.rb where you will define your cluster, jobs, database connection, and logging.
touch ~/config.rb
Additionally, you may want to add the path ~ to the environment variable RUBYLIB, so that you can require the config.rb file from your Ruby code.
export RUBYLIB=~
As a first step, you have to define a cluster of workers.
- A cluster is composed of one or more nodes (computers).
- Each worker is a process running on a node.
The code below defines your computer as a node running 2 workers.
Add this code to your config.rb file:
# setup one or more nodes (computers) where to launch worker processes
BlackStack::Pampa.add_nodes(
[
{
:name => 'local',
# setup SSH connection parameters
:net_remote_ip => '127.0.0.1',
:ssh_username => '<your ssh username>', # example: root
:ssh_port => 22,
:ssh_password => '<your ssh password>',
# setup max number of worker processes
:max_workers => 2,
},
]
)
A job is a sequence of tasks.
Each task performs the same function on different records of a table.
The code below processes all the records in a table numbers, updating the field is_odd with true or false.
Add this code to your config.rb file:
# define the job
BlackStack::Pampa.add_job({
:name => 'search_odd_numbers',
# any worker can be assigned for processing this job.
:filter_worker_id => /.*/,
# no more than 5 workers can be assigned for processing this job.
:max_assigned_workers => 5,
# add more workers if the number of pending tasks is over 500.
:max_pending_tasks => 500,
# Minimum number of tasks that a worker must have in queue.
# Default: 5
:queue_size => 5,
# Maximum number of minutes that a task should take to process.
# If a task didn't finish X minutes after it started, it is restarted and assigned to another worker.
# Default: 15
:max_job_duration_minutes => 15,
# Maximum number of times that a task can be restarted.
# Default: 3
:max_try_times => 3,
# Define the tasks table: each record is a task.
# The tasks table must have some specific fields for handling the tasks dispatching.
:table => :numbers, # the tasks table, referenced here by its name as a symbol
:field_primary_key => :value,
:field_id => :odd_checking_reservation_id,
:field_time => :odd_checking_reservation_time,
:field_times => :odd_checking_reservation_times,
:field_start_time => :odd_checking_start_time,
:field_end_time => :odd_checking_end_time,
:field_success => :odd_checking_success,
:field_error_description => :odd_checking_error_description,
# Function to execute for each task.
:processing_function => Proc.new do |task, l, job, worker, *args|
l.logs 'Checking if '+task[:value].to_s+' is odd... '
if task[:value] % 2 == 0
task[:is_odd] = false
l.logf 'No.'.red
else
task[:is_odd] = true
l.logf 'Yes.'.green
end
DB[:numbers].where(:value=>task[:value]).update(:is_odd=>task[:is_odd])
end
})
Obviously, you have to create the table in your database, and you have to insert some seed data too.
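Here is a minimal sketch of such a PostgreSQL script, using the table and field names from the job definition above; the column types and sizes are assumptions, so adjust them to your needs.
CREATE TABLE numbers (
    value BIGINT PRIMARY KEY,
    is_odd BOOLEAN,
    -- task-handling fields required by the search_odd_numbers job
    odd_checking_reservation_id VARCHAR(500),
    odd_checking_reservation_time TIMESTAMP,
    odd_checking_reservation_times INT,
    odd_checking_start_time TIMESTAMP,
    odd_checking_end_time TIMESTAMP,
    odd_checking_success BOOLEAN,
    odd_checking_error_description TEXT
);
-- seed data: the numbers from 1 to 100,000
INSERT INTO numbers (value) SELECT generate_series(1, 100000);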
In order to operate with the table numbers, you have to connect Pampa to your database.
Add this code to your config.rb file:
# DB ACCESS - KEEP IT SECRET
BlackStack::PostgreSQL::set_db_params({
:db_url => '89.116.25.250', # n04
:db_port => '5432',
:db_name => 'micro.data',
:db_user => 'blackstack',
:db_password => '*****',
:db_sslmode => 'disable',
})
The dispatcher will run an infinite loop, assigning tasks to each worker at each iteration of such a loop.
Step 1: Create your dispatcher.rb script file.
touch ~/dispatcher.rb
Step 2: Write this code into your dispatcher.rb file.
require 'pampa/dispatcher'
Step 3: Run the dispatcher.
Run the command below on your local node in order to run the dispatcher.
export RUBYLIB=~/
ruby ~/dispatcher.rb
Parameters:
- delay: You may set the delay in seconds between iterations:
ruby ~/dispatcher.rb delay=30
- config: By default, dispatcher.rb will require 'config.rb'. That is why you have to execute export RUBYLIB=~/ before running the dispatcher. However, you can define a custom location for the configuration file too.
ruby ~/dispatcher.rb config=~/foo/config.rb
- db: Use this parameter to choose a database driver. The supported values are postgres and crdb. By default, it is postgres.
ruby ~/dispatcher.rb db=crdb
- log: Use this parameter to indicate whether the process should write its log to the file ./dispatcher.log or not. The default value is yes.
ruby ~/dispatcher.rb log=no
Code Snippets:
You can define code to be executed right after the dispatcher connects to the database.
Add code like this to your config.rb file.
# Pampa Code Snippets
BlackStack::Pampa.set_snippets({
:dispatcher_function => Proc.new do |l, *args|
require 'my-project/model'
end,
})
You can also pass a logger to the snippet.
# Pampa Code Snippets
BlackStack::Pampa.set_snippets({
:dispatcher_function => Proc.new do |l, *args|
l = BlackStack::DummyLogger.new(nil) if l.nil?
l.logs 'Loading model.. '
require 'my-project/model'
l.logf 'done'.green
end,
})
The worker will run an infinite loop, processing all its assigned tasks at each iteration of such a loop.
Step 1: Create a new worker.rb file.
touch ~/worker.rb
Step 2: Write this code into your worker.rb file.
require 'pampa/worker'
Step 3: Run a worker.
Run the command below on your local node in order to run your worker.
export RUBYLIB=~/
ruby ~/worker.rb id=localhost.1
Parameters:
- delay: You may set the delay in seconds between iterations:
ruby ~/worker.rb id=localhost.1 delay=30
- config: By default, worker.rb will require 'config.rb'. That is why you have to execute export RUBYLIB=~/ before running the worker. However, you can define a custom location for the configuration file too.
ruby ~/worker.rb id=localhost.1 config=~/foo/config.rb
- db: Use this parameter to choose a database driver. The supported values are postgres and crdb. By default, it is postgres.
ruby ~/worker.rb id=localhost.1 db=crdb
- log: Use this parameter to indicate whether the process should write its log to the file ./worker.#{id}.log or not. The default value is yes.
ruby ~/worker.rb id=localhost.1 log=no
Code Snippets:
You can define code to be executed right after a worker connects to the database.
Add code like this to your config.rb file.
# Pampa Code Snippets
BlackStack::Pampa.set_snippets({
:dispatcher_function => Proc.new do |l, *args|
initialize_scraper()
end,
})
You can also pass a logger to the snippet.
# Pampa Code Snippets
BlackStack::Pampa.set_snippets({
:dispatcher_function => Proc.new do |l, *args|
l = BlackStack::DummyLogger.new(nil) if l.nil?
l.logs 'Initializing scraper... '
initialize_scraper()
l.logf 'done'.green
end,
})
You can rewrite the default function used by the dispatcher to choose the records it will assign to the workers.
Example: You want to dispatch the records in the table numbers, sorted by value in descending order.
# define the job
BlackStack::Pampa.add_job({
:name => 'search_odd_numbers',
:queue_size => 5,
:max_job_duration_minutes => 15,
:max_try_times => 3,
:table => :numbers, # the tasks table, referenced here by its name as a symbol
:field_primary_key => :value,
:field_id => :odd_checking_reservation_id,
:field_time => :odd_checking_reservation_time,
:field_times => :odd_checking_reservation_times,
:field_start_time => :odd_checking_start_time,
:field_end_time => :odd_checking_end_time,
:field_success => :odd_checking_success,
:field_error_description => :odd_checking_error_description,
:filter_worker_id => /\.1$/, # only worker number 1 will receive tasks of this job.
:max_pending_tasks => 10,
:max_assigned_workers => 5,
:processing_function => Proc.new do |task, l, job, worker, *args|
l.logs 'Checking if '+task[:value].to_s.blue+' is odd... '
if task[:value] % 2 == 0
task[:is_odd] = false
l.logf 'No.'.yellow
else
task[:is_odd] = true
l.logf 'Yes.'.green
end
DB[:numbers].where(:value=>task[:value]).update(:is_odd=>task[:is_odd])
end,
# write a snippet for selecting records to dispatch.
:selecting_function => Proc.new do |n, *args|
DB["
SELECT *
FROM numbers
WHERE odd_checking_reservation_id IS NULL -- record not reserved yet
AND odd_checking_start_time IS NULL -- record is not pending to relaunch
AND COALESCE(odd_checking_reservation_times,0) < 3 -- record didn't fail more than 3 times
ORDER BY value DESC -- I want to order by number reversely
LIMIT #{n} -- don't dispatch more than n records at a time
"].all
end,
})
Example: You define another job to submit the odd numbers to a server.
# define the job
BlackStack::Pampa.add_job({
:name => 'submit_odd_numbers',
:queue_size => 5,
:max_job_duration_minutes => 15,
:max_try_times => 3,
:table => :numbers, # the tasks table, referenced here by its name as a symbol
:field_primary_key => :value,
:field_id => :submit_odd_reservation_id,
:field_time => :submit_odd_reservation_time,
:field_times => :submit_odd_reservation_times,
:field_start_time => :submit_odd_start_time,
:field_end_time => :submit_odd_end_time,
:field_success => :submit_odd_success,
:field_error_description => :submit_odd_error_description,
:filter_worker_id => /\.2/,
:max_pending_tasks => 10,
:max_assigned_workers => 5,
:processing_function => Proc.new do |task, l, job, worker, *args|
# sketch: submit the record with an HTTP POST; the endpoint URL below is hypothetical.
require 'net/http'
uri = URI('https://example.com/api/odd-numbers') # hypothetical endpoint
res = Net::HTTP.post_form(uri, 'value' => task[:value].to_s)
l.log "submitted #{task[:value]} (HTTP #{res.code})"
end,
# write a snippet for selecting records to dispatch.
:selecting_function => Proc.new do |n, *args|
DB["
SELECT *
FROM numbers
WHERE submit_odd_reservation_id IS NULL -- record not reserved yet
AND submit_odd_start_time IS NULL -- record is not pending to relaunch
AND COALESCE(submit_odd_reservation_times,0) < 3 -- record didn't fail more than 3 times
AND odd_checking_end_time IS NOT NULL -- records that have been checked
AND COALESCE(odd_checking_success, FALSE) = TRUE -- records that have been checked successfully
AND COALESCE(is_odd, FALSE) = TRUE -- only submit odd values
ORDER BY odd_checking_end_time DESC -- submit records in the order they have been checked
LIMIT #{n} -- don't dispatch more than n records at a time
"].all
end,
})
Other Examples:
- You may want to deliver emails of active email campaigns only (active=true).
- You may want to process orders in the order they have been created (order by create_time), as sketched below.
- You may want to add a delay of 1 day between the moment a new user signs up and the moment he/she receives an email notification.
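As a sketch of the second case, a selecting function could order the records by their creation time. Note that the orders table, its create_time column, and the order_processing_* task fields below are hypothetical, used only to illustrate the pattern.
# a sketch only: the `orders` table and its columns are hypothetical.
BlackStack::Pampa.add_job({
  :name => 'process_orders',
  # ... table, field and worker parameters as in the examples above ...
  :selecting_function => Proc.new do |n, *args|
    DB["
      SELECT *
      FROM orders
      WHERE order_processing_reservation_id IS NULL -- record not reserved yet
      AND order_processing_start_time IS NULL -- record is not pending to relaunch
      ORDER BY create_time ASC -- process orders in the order they have been created
      LIMIT #{n} -- don't dispatch more than n records at a time
    "].all
  end,
})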
Use :relaunching_function to write your own snippet code that will choose the records you want to relaunch.
Example: You may want to check if each number is still odd every 10 minutes, because you are afraid the laws of the universe have suddenly changed.
# define the job
BlackStack::Pampa.add_job({
:name => 'search_odd_numbers',
:queue_size => 5,
:max_job_duration_minutes => 15,
:max_try_times => 3,
:table => :numbers, # the tasks table, referenced here by its name as a symbol
:field_primary_key => :value,
:field_id => :odd_checking_reservation_id,
:field_time => :odd_checking_reservation_time,
:field_times => :odd_checking_reservation_times,
:field_start_time => :odd_checking_start_time,
:field_end_time => :odd_checking_end_time,
:field_success => :odd_checking_success,
:field_error_description => :odd_checking_error_description,
:filter_worker_id => /\.1$/, # only worker number 1 will receive tasks of this job.
:max_pending_tasks => 10,
:max_assigned_workers => 5,
:processing_function => Proc.new do |task, l, job, worker, *args|
l.logs 'Checking if '+task[:value].to_s.blue+' is odd... '
if task[:value] % 2 == 0
task[:is_odd] = false
l.logf 'No.'.yellow
else
task[:is_odd] = true
l.logf 'Yes.'.green
end
DB[:numbers].where(:value=>task[:value]).update(:is_odd=>task[:is_odd])
end,
# you want to check if each number is still odd every 10 minutes,
# because you are afraid the laws of the universe have suddenly changed.
:relaunching_function => Proc.new do |n, *args|
DB["
SELECT *
FROM numbers
WHERE odd_checking_end_time IS NOT NULL -- records that have been checked
AND odd_checking_end_time < CAST('#{now}' AS TIMESTAMP) - INTERVAL '10 MINUTES' -- checked more than 10 minutes ago
LIMIT #{n} -- don't relaunch more than n records at a time
"].all
end,
})
Other Examples:
- Every 5 minutes, you want to check if a list of websites is online (see the sketch below).
- You want to trace the CPU usage of a pool of servers.
- You want to keep mirroring information between databases.
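As a sketch of the first case, the relaunching function of such a job could re-queue every website whose last check finished more than 5 minutes ago. The websites table and its uptime_checking_* columns below are hypothetical.
# a sketch only: the `websites` table and its columns are hypothetical.
BlackStack::Pampa.add_job({
  :name => 'check_website_uptime',
  # ... table, field and worker parameters as in the examples above ...
  :relaunching_function => Proc.new do |n, *args|
    DB["
      SELECT *
      FROM websites
      WHERE uptime_checking_end_time IS NOT NULL -- checked at least once
      AND uptime_checking_end_time < CAST('#{now}' AS TIMESTAMP) - INTERVAL '5 MINUTES' -- checked more than 5 minutes ago
      LIMIT #{n} -- don't relaunch more than n records at a time
    "].all
  end,
})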
The dispatcher process not only assigns tasks to workers, but it also assigns and unassigns workers to each job, depending on the number of tasks in queue for such a job.
When you define a job:
- use the parameter :max_pending_tasks to tell the dispatcher when it should assign more workers to this job: if the number of pending tasks is higher than :max_pending_tasks, the dispatcher will scale up the number of assigned workers;
- also use the parameter :max_assigned_workers to prevent your job from monopolizing the entire pool of workers and getting in the way of other jobs if its number of pending tasks rises too high;
- finally, you can use :filter_worker_id to define a regular expression that filters the workers that may be assigned to your job. E.g., use /.*/ to allow any worker to be assigned, or use /\.1$/ to allow only the first worker of each node to be assigned.
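Putting these three parameters together, a job definition could tune them like the sketch below; the thresholds are illustrative values, not recommendations.
# a sketch only: the values below are illustrative.
BlackStack::Pampa.add_job({
  :name => 'search_odd_numbers',
  # ... table, field and processing parameters as in the examples above ...
  :max_pending_tasks => 1000, # assign more workers when over 1,000 tasks are pending
  :max_assigned_workers => 10, # but never assign more than 10 workers to this job
  :filter_worker_id => /\.(1|2)$/, # only the first and second worker of each node
})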
You can get the number of
- total tasks,
- completed tasks,
- pending tasks, and
- failed tasks;
by calling the methods shown below.
j = BlackStack::Pampa.jobs.first
p j.name
p j.total.to_label
p j.completed.to_label
p j.pending.to_label
p j.failed.to_label
If you wrote snippets for either selecting or relaunching records, you may need to write snippets for the reporting methods too, in order to keep their numbers consistent.
# define the job
BlackStack::Pampa.add_job({
:name => 'search_odd_numbers',
# ...
:total_function => Proc.new do |*args|
# TODO: return a number here
end,
:completed_function => Proc.new do |*args|
# TODO: return a number here
end,
:pending_function => Proc.new do |*args|
# TODO: return a number here
end,
:failed_function => Proc.new do |*args|
# TODO: return a number here
end,
})
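For the search_odd_numbers job of this tutorial, the reporting snippets could simply count rows of the numbers table, as in the sketch below. The counting criteria are assumptions, so adapt them to whatever "completed" and "failed" mean for your job.
# a sketch only: the counting criteria below are assumptions.
BlackStack::Pampa.add_job({
  :name => 'search_odd_numbers',
  # ... other parameters as in the examples above ...
  :total_function => Proc.new do |*args|
    DB[:numbers].count
  end,
  :completed_function => Proc.new do |*args|
    DB[:numbers].exclude(:odd_checking_end_time => nil).count # finished at least once
  end,
  :pending_function => Proc.new do |*args|
    DB[:numbers].where(:odd_checking_end_time => nil).count # never finished yet
  end,
  :failed_function => Proc.new do |*args|
    DB[:numbers].where(:odd_checking_success => false).count # finished with an error
  end,
})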
The logo has been taken from here.
Use this library at your own risk.
We use SemVer for versioning. For the versions available, see the last ruby gem.
- Leandro Daniel Sardi - Initial work - LeandroSardi
This project is licensed under the MIT License - see the LICENSE.md file for details.
The :occupied_function function returns an array with the pending tasks in the queue of a worker.
The default function returns all the tasks with :field_id equal to the name of the worker, and :field_start_time empty.
You can set up a custom version of this function.
Example: You may want to sort the tasks in the queue of each worker.
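Here is a sketch of such a custom function for the numbers table, returning the worker's reserved-but-not-started tasks sorted by value. Note that the block signature and the worker.id accessor are assumptions modeled on the other snippet functions, so check the gem's source before relying on them.
# a sketch only: the block signature and `worker.id` are assumptions.
:occupied_function => Proc.new do |worker, job, *args|
  DB[:numbers].where(
    :odd_checking_reservation_id => worker.id, # tasks reserved by this worker
    :odd_checking_start_time => nil            # that have not started yet
  ).order(:value).all                          # sorted by value, ascending
end,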
There is also an additional function to decide how many records are pending for processing. It should return an integer. Keep it nil if you want to run the default function.
(This feature is pending development.)
There is also an additional function to decide whether the worker can dispatch or not; for example, use it when you want to decide based on the remaining credits of the client. It should return true or false. Keep it nil if you want it to always return true.
(This feature is pending development.)
# setup nodes (computers) where to launch worker processes
n = BlackStack::Pampa.add_nodes([{
# setup SSH connection parameters
:net_remote_ip => '54.160.137.218',
:ssh_username => 'ubuntu',
:ssh_port => 22,
:ssh_private_key_file => './plank.pem',
# setup max number of worker processes
:max_workers => 2,
# setup max memory consumption per worker (MBs)
:max_ram => 512,
# setup max CPU usage per worker (%)
:max_cpu => 10,
# setup max disk I/O per worker (mbps)
:max_disk_io => 1000,
# setup max ethernet I/O per worker (mbps)
:max_net_io => 1000,
}])
Pampa provides a standard framework for running stand-alone processes too.
Use the function run_stand_alone to execute a process repeatedly, every few seconds.
Example:
require 'pampa'
DELAY = 10 # seconds
ONCE = false # if false, execute this process every DELAY seconds
BlackStack::Pampa.run_stand_alone({
:log_filename => 'example.log',
:delay => DELAY,
:run_once => ONCE,
:function => Proc.new do |l, *args|
begin
l.log 'Hello, World!'
l.log "1. Executing this process recursively, with a timeframe of #{DELAY} seconds between the starting of each one."
l.log "2. If an execution takes more than #{DELAY} seconds, the next execution will start immediatelly."
l.log "3. If an error occurs, the process will stop. So be sure to catch all exceptions here."
# your code here
rescue => e
l.logf "Error: #{e.to_console.red}"
rescue Interrupt => e
# CTRL+C will be caught here
l.logf "Interrupted".red
exit(0)
ensure
l.log "End of the process."
end
end,
})