Merge pull request #28 from sara-nl/SPD-417
SPD-417: add scanner and get resources from tokens
lnauta authored Jan 14, 2025
2 parents acddc3d + d87b2e9 commit 3994721
Showing 12 changed files with 251 additions and 34 deletions.
77 changes: 77 additions & 0 deletions README.md
@@ -337,6 +337,83 @@ self.iterator = EndlessViewIterator(self.iterator)

</details>

<details closed>
<summary>Autopiloting</summary>

### Automatically start your client

This example shows how to automatically start a picas client (or pilot) to process tokens from the database.
While this example explicitly shows the case of two types of tokens, that is single-core and multi-core work, you can adjust the code to:
- Run for a single view, such as your default tokens.
- Add more than the two views, to process as many types of tokens as you want (where a type could also be GPU, high-memory, or another property of a job).
- Add properties to the tokens in your `pushTokens` code (a token with such properties is sketched after the lists below), such as
- "gpu: 1" and start a GPU-based job
- "time: 72:00:00" and then start a job with a 3-day walltime
- "cores: 8" and start a high-memory job

This can be achieved by adjusting:
1. The view code to create all the necessary views
2. The scanner code to scan these views and submit the necessary jobs
3. The job scripts (.sh) that are submitted to Slurm or your scheduler
4. The pilot jobs that scan the views containing the work
5. Finally, the tokens need to be available in your database
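
For concreteness, a token carrying such resource properties could look like the sketch below. It follows the token layout used in `pushAutoPilotExampleTokens.py`; only the `cores` field is used in this example, while the `gpu` and `time` fields are hypothetical illustrations.

```
# Hypothetical token with resource properties. Only 'cores' is used in this example;
# 'gpu' and 'time' are illustrative and would need matching views and job scripts.
token = {
    '_id': 'token_0',
    'type': 'token',
    'lock': 0,
    'done': 0,
    'hostname': '',
    'scrub_count': 0,
    'input': 'echo $SLURM_CPUS_PER_TASK',
    'exit_code': '',
    'cores': '4',           # matched by the MultiCore view in this example
    # 'gpu': '1',           # hypothetical: filter in a GPU view, submit to a GPU partition
    # 'time': '72:00:00',   # hypothetical: request a 3-day walltime in the job script
}
```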

### Running the autopilot

In this example, two types of tokens are executed: single-core tokens and multi-core (4-core) tokens. The example is written for a Slurm cluster, so you may have to adjust the code to run it elsewhere. Like the other examples in the `examples` folder, it needs a running CouchDB instance.

To run this example, first the design documents for specific resources have to be created. This is explained next, and then the execution of the autopiloting code is shown.

#### Creating custom-made design documents

To select tokens based on some property in the body of the token, we create design documents with views that filter on that property.
The code for this is already present in the `createViews.py` script. Open the script and _uncomment_ the two extra `createViews` calls at the bottom. Then execute:

```
python createViews.py
```

This will create two extra design documents with the same views (todo, error, done, etc.) but with the extra logic added to check for the property `doc.cores`. The documents are called `SingleCore` and `MultiCore`: one for tokens that will use 1 CPU core, and one for tokens that need 4 CPU cores (the number 4 is arbitrary).
The property in the token can be any property you want; in this case it is coupled to the number of cores given to the job in Slurm. Set the value to what the job requires; it will then be used at execution time.
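
For reference, these are the two calls you uncomment at the bottom of `createViews.py`; the `logic_appendix` string is appended to every view condition of the new design document:

```
# At the bottom of createViews.py (after db = get_db()): uncomment to create the
# two extra design documents that filter on doc.cores.
createViews(db, design_doc_name='SingleCore', logic_appendix=' && doc.cores == 1')
createViews(db, design_doc_name='MultiCore', logic_appendix=' && doc.cores == 4')
```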

These design documents and their views are now present in the database and can be used. To push some tokens with the `cores` property to the database, run:

```
python pushAutoPilotExampleTokens.py
```

If you inspect the `pushAutoPilotExampleTokens.py` script, you will see that the `cores` property is added and set to either 1 or 4 for this example.
Now we want to select the tokens that need a specific number of cores and start a picas pilot with that many cores to execute the token body.
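
A minimal sketch of that selection step (assuming the `todo` view), using the `is_view_nonempty` helper that this commit adds to `picas.clients.CouchDB`; `core_scanner.py`, described below, does essentially this:

```
import picasconfig
from picas.clients import CouchDB

# Connect as in the other examples, then only act if the SingleCore todo view has work
client = CouchDB(url=picasconfig.PICAS_HOST_URL, db=picasconfig.PICAS_DATABASE,
                 username=picasconfig.PICAS_USERNAME, password=picasconfig.PICAS_PASSWORD)
if client.is_view_nonempty("todo", design_doc="SingleCore"):
    print("Work available: start a 1-core pilot (core_scanner.py submits one via sbatch)")
```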

#### Running picas with different design documents and views

To start scanning the different design documents (for example, to execute the work with different numbers of cores), run:

```
python core_scanner.py
```

which defaults to the `SingleCore` design document created above and filters on a core count of 1. This is equivalent to running explicitly:

```
python core_scanner.py --cores 1 --design_doc SingleCore
```

To run this with multiple cores and a different design document, run:

```
python core_scanner.py --cores 4 --design_doc MultiCore
```

And now your process will start the picas clients needed to evaluate your tokens. The scanner checks for either single-core or multi-core tokens and starts the corresponding jobs on the cluster: a job with 1 core or a job with 4 cores, so that the different kinds of work get the resources they require. The number of cores is passed through `core_scanner.py` to `sbatch`, as sketched below.
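
Roughly, the submission inside `core_scanner.py` looks as follows (see `examples/core_scanner.py` in this commit); the core count goes to Slurm, and the view and design document are exported to the pilot job script:

```
# Pass the core count to Slurm and export the view and design document,
# so the pilot started by N-core-job.sh knows which tokens to process.
command = ["sbatch", f"--cpus-per-task={args.cores}",
           f"--export=VIEW={args.view},DOC={args.design_doc}",
           "N-core-job.sh"]
execute(command)
```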

This example can be adjusted to use any user-defined design document and any type of job your cluster needs. Different numbers of cores, GPUs, or other resources can now be requested with jobs tailor-made for each resource.

### Running autopilot on a schedule

To run the scanner on a schedule, one can start it with scrontab (in Slurm), as described at https://doc.spider.surfsara.nl/en/latest/Pages/workflows.html#recurring-jobs and https://slurm.schedmd.com/scrontab.html, or use other automation tools.

</details>

# PiCaS overview

3 changes: 3 additions & 0 deletions examples/N-core-job.sh
@@ -0,0 +1,3 @@
#!/bin/bash

python local_example.py --design_doc $DOC --view $VIEW  # DOC and VIEW are exported by core_scanner.py via sbatch --export
28 changes: 28 additions & 0 deletions examples/core_scanner.py
@@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-

import logging
import sys

import picasconfig
from picas.picaslogger import picaslogger
from picas.clients import CouchDB
from picas.executers import execute
from picas.util import arg_parser

picaslogger.propagate = False

# expand the parser for the example
parser = arg_parser()
parser.add_argument("--cores", default=1, type=int, help="Number of cores for the job")
parser.set_defaults(design_doc="SingleCore")
args = parser.parse_args()

client = CouchDB(url=picasconfig.PICAS_HOST_URL, db=picasconfig.PICAS_DATABASE, username=picasconfig.PICAS_USERNAME, password=picasconfig.PICAS_PASSWORD)

work_avail = client.is_view_nonempty(args.view, design_doc=args.design_doc)
if work_avail:
picaslogger.info(f"Starting a picas clients checking view {args.view} in design document {args.design_doc}")
command = ["sbatch", f"--cpus-per-task={args.cores}", f"--export=VIEW={args.view},DOC={args.design_doc}", "N-core-job.sh"]
execute(command)
else:
picaslogger.info(f"Not starting a picas client, there is nothing to do in view {args.view} and for the designdocs {args.design_doc}.")
61 changes: 36 additions & 25 deletions examples/createViews.py
@@ -15,57 +15,65 @@
import picasconfig


def createViews(db):
def getViewCode(s):
# double { } are needed for formatting
generalViewCode='''
function(doc) {
if(doc.type == "token") {
if(%s) {
function(doc) {{
if(doc.type == "token") {{
if({0}) {{
emit(doc._id, doc._id);
}
}
}
}}
}}
}}
'''
return generalViewCode.format(s)

def createViews(db, design_doc_name='Monitor', logic_appendix=''):
# todo View
todoCondition = 'doc.lock == 0 && doc.done == 0'
todo_view = ViewDefinition('Monitor', 'todo', generalViewCode %(todoCondition))
todoCondition = todoCondition + logic_appendix
todo_view = ViewDefinition(design_doc_name, 'todo', getViewCode(todoCondition))
todo_view.sync(db)
# locked View
lockedCondition = 'doc.lock > 0 && doc.done == 0'
locked_view = ViewDefinition('Monitor', 'locked', generalViewCode %(lockedCondition))
lockedCondition = lockedCondition + logic_appendix
locked_view = ViewDefinition(design_doc_name, 'locked', getViewCode(lockedCondition))
locked_view.sync(db)
# done View
doneCondition = 'doc.lock > 0 && doc.done > 0 && parseInt(doc.exit_code) == 0'
done_view = ViewDefinition('Monitor', 'done', generalViewCode %(doneCondition))
doneCondition = doneCondition + logic_appendix
done_view = ViewDefinition(design_doc_name, 'done', getViewCode(doneCondition))
done_view.sync(db)
# error View
errorCondition = 'doc.lock > 0 && doc.done > 0 && parseInt(doc.exit_code) != 0'
error_view = ViewDefinition('Monitor', 'error', generalViewCode %(errorCondition))
errorCondition = errorCondition + logic_appendix
error_view = ViewDefinition(design_doc_name, 'error', getViewCode(errorCondition))
error_view.sync(db)
# overview_total View -- lists all views and the number of tokens in each view
overviewMapCode='''
function(doc) {
if(doc.type == "token") {
if (doc.lock == 0 && doc.done == 0){
overviewMapCode=f'''
function(doc) {{
if(doc.type == "token") {{
if ({todoCondition}){{
emit('todo', 1);
}
if(doc.lock > 0 && doc.done == 0) {
}}
if({lockedCondition}) {{
emit('locked', 1);
}
if(doc.lock > 0 && doc.done > 0 && parseInt(doc.exit_code) == 0) {
}}
if({doneCondition}) {{
emit('done', 1);
}
if(doc.lock > 0 && doc.done > 0 && parseInt(doc.exit_code) != 0) {
}}
if({errorCondition}) {{
emit('error', 1);
}
}
}
}}
}}
}}
'''
overviewReduceCode='''
function (key, values, rereduce) {
return sum(values);
}
'''
overview_total_view = ViewDefinition('Monitor', 'overview_total', overviewMapCode, overviewReduceCode)
overview_total_view = ViewDefinition(design_doc_name, 'overview_total', overviewMapCode, overviewReduceCode)
overview_total_view.sync(db)


@@ -83,3 +91,6 @@ def get_db():
db = get_db()
# Create the Views in database
createViews(db)
# Create the Views for the autopilot example
#createViews(db, design_doc_name='SingleCore', logic_appendix=' && doc.cores == 1')
#createViews(db, design_doc_name='MultiCore', logic_appendix=' && doc.cores == 4')
2 changes: 1 addition & 1 deletion examples/grid-example.jdl
@@ -7,6 +7,6 @@
Arguments = "startpilot.sh";
Stdoutput = "parametricjob.out";
StdError = "parametricjob.err";
InputSandbox = {"grid-sandbox/CouchDB-1.2.tar.gz", "grid-sandbox/picas.tar", "grid-sandbox/startpilot.sh", "local-example.py", "grid-sandbox/process_task.sh", "bin/fractals", "picasconfig.py"};
InputSandbox = {"grid-sandbox/CouchDB-1.2.tar.gz", "grid-sandbox/picas.tar", "grid-sandbox/startpilot.sh", "local_example.py", "grid-sandbox/process_task.sh", "bin/fractals", "picasconfig.py"};
OutputSandbox = {"parametricjob.out", "parametricjob.err"};
]
2 changes: 1 addition & 1 deletion examples/grid-sandbox/startpilot.sh
@@ -20,4 +20,4 @@ chmod u+x ${JOBDIR}/process_task.sh
chmod u+x ${JOBDIR}/fractals
ls -l ${JOBDIR}

python ${JOBDIR}/local-example.py
python ${JOBDIR}/local_example.py
9 changes: 6 additions & 3 deletions examples/local-example.py → examples/local_example.py
@@ -1,7 +1,7 @@
'''
@helpdesk: SURF helpdesk <helpdesk@surf.nl>
usage: python local-example.py
usage: python local_example.py
description:
Connect to PiCaS server
Get the next token in todo View
@@ -11,6 +11,7 @@
Attach the logs to the token
'''

import argparse
import logging
import os
import time
@@ -23,7 +24,7 @@
from picas.iterators import TaskViewIterator
from picas.iterators import EndlessViewIterator
from picas.modifiers import BasicTokenModifier
from picas.util import Timer
from picas.util import Timer, arg_parser

log = logging.getLogger(__name__)

@@ -79,13 +80,15 @@ def process_task(self, token):


def main():
# parse user arguments
args = arg_parser().parse_args()
# setup connection to db
client = CouchDB(url=picasconfig.PICAS_HOST_URL, db=picasconfig.PICAS_DATABASE, username=picasconfig.PICAS_USERNAME, password=picasconfig.PICAS_PASSWORD)
print("Connected to the database %s sucessfully. Now starting work..." %(picasconfig.PICAS_DATABASE))
# Create token modifier
modifier = BasicTokenModifier()
# Create actor
actor = ExampleActor(client, modifier)
actor = ExampleActor(client, modifier, view=args.view, design_doc=args.design_doc)
# Start work!
actor.run(max_token_time=1800, max_total_time=3600, max_tasks=10, max_scrub=2)

56 changes: 56 additions & 0 deletions examples/pushAutoPilotExampleTokens.py
@@ -0,0 +1,56 @@
'''
@helpdesk: SURF helpdesk <helpdesk@surf.nl>
usage: python pushAutoPilotExampleTokens.py
description:
Connects to PiCaS server
Creates three tokens, two for single-core, one for multi-core
Loads the tokens
'''

import sys
import os
import couchdb
import random
import picasconfig

def getNextIndex():
db = get_db()

index = 0
while db.get(f"token_{index}") is not None:
index+=1

return index

def loadTokens(db, ncores=1):
i = getNextIndex()

token = {
'_id': 'token_' + str(i),
'type': 'token',
'lock': 0,
'done': 0,
'hostname': '',
'scrub_count': 0,
'input': 'echo $SLURM_CPUS_PER_TASK',
'exit_code': '',
'cores': f'{ncores}'
}
db.update([token])

def get_db():
server = couchdb.Server(picasconfig.PICAS_HOST_URL)
username = picasconfig.PICAS_USERNAME
pwd = picasconfig.PICAS_PASSWORD
server.resource.credentials = (username,pwd)
db = server[picasconfig.PICAS_DATABASE]
return db

if __name__ == '__main__':
#Create a connection to the server
db = get_db()
#Load the tokens to the database
loadTokens(db, ncores=1)
loadTokens(db, ncores=1)
loadTokens(db, ncores=4)
3 changes: 2 additions & 1 deletion examples/slurm-example.sh
@@ -14,7 +14,8 @@
# Attach the logs to the token



# You may set environmental variables needed in the SLURM job
# For example, when using the LUMI container wrapper:
# export PATH="/path/to/install_dir/bin:$PATH"
python local-example.py
python local_example.py
2 changes: 1 addition & 1 deletion examples/snakemake.jdl
@@ -8,7 +8,7 @@
Arguments = "startpilot.sh";
Stdoutput = "parametricjob.out";
StdError = "parametricjob.err";
InputSandbox = {"grid-sandbox/CouchDB-1.2.tar.gz", "grid-sandbox/picas.tar", "grid-sandbox/startpilot.sh", "local-example.py", "grid-sandbox/process_task.sh", "picasconfig.py",
InputSandbox = {"grid-sandbox/CouchDB-1.2.tar.gz", "grid-sandbox/picas.tar", "grid-sandbox/startpilot.sh", "local_example.py", "grid-sandbox/process_task.sh", "picasconfig.py",
"grid-sandbox/Snakefile", "grid-sandbox/plot-quals.py"};
OutputSandbox = {"parametricjob.out", "parametricjob.err"};
]
29 changes: 27 additions & 2 deletions picas/clients.py
Expand Up @@ -11,9 +11,9 @@

import couchdb
from couchdb.design import ViewDefinition
from couchdb.http import ResourceConflict
from couchdb.http import ResourceConflict, ResourceNotFound

from .documents import Document
from .documents import Document, Task
from .picaslogger import picaslogger


@@ -225,3 +225,28 @@ def try_set(value, d, key, subkey):
try_set(member_roles, security, 'members', 'roles')

self.db.resource.put("_security", security)

def is_view_nonempty(self, view, **view_params):
"""
Database view scanner
Useful for starting pilot jobs automatically. When a view is non-empty, returns true: a pilot can be started.
:param view: database view to scan for tokens
:return: bool
"""
        # If design_doc is not passed to is_view_nonempty, fall back to the default
        # used by self.view ("Monitor"), so the log messages below can report it.
design_doc = view_params.setdefault('design_doc', "Monitor")
try:
doc = self.get_single_from_view(view, **view_params)
task = Task(doc)
picaslogger.debug(doc)
picaslogger.debug(task['input'])
picaslogger.info(f"View {view} under design document {design_doc} is non-empty.")
return True
except IndexError as e:
picaslogger.info(f"View {view} under design document {design_doc} is empty: {e}")
return False
        except ResourceNotFound:
            picaslogger.info(f"Non-existing view or design document passed: {view} in {design_doc}")
return False
