Prototype for batch Python interface #4937
Conversation
This is really great. I have some thoughts below, mostly brainstorming -- don't take any of it too seriously.
I don't have a no-brainer suggestion. Happy to brainstorm ideas offline. I think ultimately this is a minor syntactic choice.
Then the question becomes: how do we associate …
I wonder, will we ever want arrays to be formatted other than joined with spaces? I worry the user will want more flexibility in formatting, and we'll want that in Python. What if the argument could be a function that takes a dictionary from resource names to their string representations, so you can format however you want? Then you could write the last command as: …
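A minimal sketch of that formatter-function idea (every name here is hypothetical, chosen only to illustrate the dispatch -- none of it is from the PR):

```python
# Hypothetical sketch of the "argument can be a function" idea: command
# arguments that are callables receive a dict mapping resource names to
# their string representations and return the formatted fragment.
# run_command, merge_cmd, and the resource names are illustrative only.

def run_command(arg, resources):
    """Plain strings pass through; callables format the resources themselves."""
    if callable(arg):
        return arg(resources)
    return str(arg)

# A user-supplied formatter that joins files with commas instead of spaces:
merge_cmd = lambda r: 'cat ' + ','.join(r['shapeit_outputs']) + ' >> ' + r['ofile']

resources = {'shapeit_outputs': ['a.haps', 'b.haps'], 'ofile': 'merged.haps'}
print(run_command(merge_cmd, resources))  # cat a.haps,b.haps >> merged.haps
```

This would keep the default space-joined behavior while giving an escape hatch for custom formatting.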
What's the left hand side? Why isn't this just …? This suggests another issue: what if you want to use …
All of these things I completely agree with!
@cseed I don't really like how this looks with lists as the inputs to the commands. To me, it's much harder to read and modify. I like the commands looking as much like writing a shell script as possible. Maybe others feel differently though... Also, you can't do something like this:

from pyapi import Pipeline
p = Pipeline()
bfile_root = 'gs://jigold/input'
bed = bfile_root + '.bed'
bim = bfile_root + '.bim'
fam = bfile_root + '.fam'
p.write_input(bed=bed, bim=bim, fam=fam)
subset = p.new_task()
subset = (subset
.label('subset')
.command(['plink', '--bed', p.bed, '--bim', p.bim, '--fam', p.fam, '--make-bed', '--out', subset.tmp1])
.command(['awk', "'{ print $1, $2}'", subset.tmp1 + '.fam', "| sort | uniq -c | awk '{ if ($1 != 1) print $2, $3 }'",
'>', subset.tmp2])
.command(['plink', '--bed', p.bed, '--bim', p.bim, '--fam', p.fam, '--remove', subset.tmp2,
'--make-bed', '--out', subset.ofile]))
shapeit_tasks = []
for contig in [str(x) for x in range(1, 4)]:
shapeit = p.new_task()
shapeit = (shapeit
.label('shapeit')
.command(['shapeit', '--bed-file', subset.ofile, '--chr', contig, '--out', shapeit.ofile]))
shapeit_tasks.append(shapeit)
merger = p.new_task()
merger = (merger
.label('merge')
.command(['cat', ' '.join([task.ofile for task in shapeit_tasks]), '>>', merger.ofile]))
p.write_output(merger.ofile + ".haps", "gs://jigold/final_output.txt")
p.run()
should this have an assigned reviewer?
@cseed Sorry if this doesn't make sense -- we can discuss in person. Here's my attempt to fix the problems outlined above. The tradeoff made is that we have to refer to the object (ex: …). I tried hacking the Python AST to not have to refer to the object, but I think it's going to be difficult to get the AST parsing exactly right and not have too many implicit rules within our language. I also considered writing a DSL, but found that it's hard to specify the part with the …

from pyapi import Pipeline, resource_group
p = Pipeline()
input_bfile = p.new_resource_group(bed="gs://jigold/input_root.bed",
bim="gs://jigold/input_root.bim",
fam="gs://jigold/input_root.fam")
def bfile(root):
return resource_group(root, lambda x: {"bed": x + ".bed", "bim": x + ".bim", "fam": x + ".fam"})
subset = (p.new_task()
.label('subset'))
subset = (subset
.command(f'plink --bfile {input_bfile} --make-bed {bfile(subset.tmp1)}')
.command("awk '{print $1, $2}'" +
subset.tmp1.fam +
" | sort | uniq -c | awk '{ if ($1 != 1) print $2, $3 }' > " +
subset.tmp2)
.command(f"plink --bfile {input_bfile} --remove {subset.tmp2} --make-bed {bfile(subset.ofile)}"))
def shapeit_output(root):
return resource_group(root, lambda x: {"haps": x + ".haps", "log": x + ".log"})
for contig in [str(x) for x in range(1, 4)]:
shapeit = (p.new_task()
.label('shapeit'))
shapeit = (shapeit
.command(f'shapeit --bed-file {subset.ofile} --chr {contig} --out {shapeit_output(shapeit.ofile)}'))
merger = (p.new_task()
.label('merge'))
merger = (merger
.command('cat {files} >> {ofile}'.format(files=" ".join([task.ofile.haps for task in p.select_tasks("shapeit")]), ofile=merger.ofile)))
p.write_output(merger.ofile, "gs://jigold/final_output.txt")
p.run()
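One way `resource_group(root, mapper)` could work under the hood is attribute lookup over the expanded mapping, with `str()` returning the bare root. A sketch with hypothetical internals (only the `resource_group(root, mapper)` shape comes from the comment above):

```python
# Sketch of a resource group: a file root plus a mapper that expands it
# into named member files (bed/bim/fam, haps/log, ...). The class and its
# internals are illustrative, not the PR's actual implementation.

class ResourceGroup:
    def __init__(self, root, mapper):
        self._root = root
        self._files = mapper(root)  # e.g. {'bed': root + '.bed', ...}

    def __getattr__(self, name):
        # bf.fam -> '<root>.fam'
        try:
            return object.__getattribute__(self, '_files')[name]
        except KeyError:
            raise AttributeError(name)

    def __str__(self):
        # Interpolating the group itself yields the bare root, which is
        # what tools like plink expect for --bfile.
        return self._root

def resource_group(root, mapper):
    return ResourceGroup(root, mapper)

bf = resource_group('/tmp/abc',
                    lambda x: {'bed': x + '.bed', 'bim': x + '.bim', 'fam': x + '.fam'})
print(f'plink --bfile {bf} --keep {bf.fam}')  # plink --bfile /tmp/abc --keep /tmp/abc.fam
```

The key design point is that the same object can stand in for either the whole group (f-string interpolation) or a single member (attribute access).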
@cseed I'm really happy with the interface now! Could you please look over this again and let me know if there are any suggestions you have before I write some tests and give this to someone to code review. I also called this …

from pyapi import Pipeline, resource_group_builder
p = Pipeline() # initialize a pipeline
# Define resource group builders (used with `declare_resource_group`)
rgb_bfile = resource_group_builder(bed="{root}.bed",
bim="{root}.bim",
fam="{root}.fam")
rgb_shapeit = resource_group_builder(haps="{root}.haps",
log="{root}.log")
# Import a file as a resource
file = p.write_input('gs://hail-jigold/random_file.txt')
# Import a set of input files as a resource group
input_bfile = p.write_input_group(bed='gs://hail-jigold/input.bed',
bim='gs://hail-jigold/input.bim',
fam='gs://hail-jigold/input.fam')
# Remove duplicate samples from a PLINK dataset
subset = p.new_task()
subset = (subset
.label('subset')
.declare_resource_group(tmp1=rgb_bfile, ofile=rgb_bfile)
.command(f'plink --bfile {input_bfile} --make-bed {subset.tmp1}')
.command(f"awk '{{ print $1, $2}}' {subset.tmp1.fam} | sort | uniq -c | awk '{{ if ($1 != 1) print $2, $3 }}' > {subset.tmp2}")
.command(f"plink --bed {input_bfile.bed} --bim {input_bfile.bim} --fam {input_bfile.fam} --remove {subset.tmp2} --make-bed {subset.ofile}"
))
# Run shapeit for each contig from 1-3 with the output from subset
for contig in [str(x) for x in range(1, 4)]:
shapeit = p.new_task()
shapeit = (shapeit
.label('shapeit')
.declare_resource_group(ofile=rgb_shapeit)
.command(f'shapeit --bed-file {subset.ofile} --chr {contig} --out {shapeit.ofile}'))
# Merge the shapeit output files together
merger = p.new_task()
merger = (merger
.label('merge')
.command('cat {files} >> {ofile}'.format(files=" ".join([t.ofile.haps for t in p.select_tasks('shapeit')]),
ofile=merger.ofile)))
# Write the result of the merger to a permanent location
p.write_output(merger.ofile, "gs://jigold/final_output.txt")
# Execute the pipeline
p.run()

#!/bin/bash
set -ex
# define tmp directory
__TMP_DIR__=/tmp//pipeline.yG41vqpS/
# __TASK__0 write_input
cp gs://hail-jigold/random_file.txt ${__TMP_DIR__}/rsfKylng
# __TASK__1 write_input
cp gs://hail-jigold/input.bed ${__TMP_DIR__}/xJONBVn7.bed
# __TASK__2 write_input
cp gs://hail-jigold/input.bim ${__TMP_DIR__}/xJONBVn7.bim
# __TASK__3 write_input
cp gs://hail-jigold/input.fam ${__TMP_DIR__}/xJONBVn7.fam
# __TASK__4 subset
__RESOURCE_GROUP__0=${__TMP_DIR__}/xJONBVn7
__RESOURCE_GROUP__1=${__TMP_DIR__}/TB7ZUbj8
__RESOURCE__6=${__TMP_DIR__}/TB7ZUbj8.fam
__RESOURCE__10=${__TMP_DIR__}/EVeRHf7V
__RESOURCE__1=${__TMP_DIR__}/xJONBVn7.bed
__RESOURCE__2=${__TMP_DIR__}/xJONBVn7.bim
__RESOURCE__3=${__TMP_DIR__}/xJONBVn7.fam
__RESOURCE_GROUP__2=${__TMP_DIR__}/MXBQugBx
plink --bfile ${__RESOURCE_GROUP__0} --make-bed ${__RESOURCE_GROUP__1}
awk '{ print $1, $2}' ${__RESOURCE__6} | sort | uniq -c | awk '{ if ($1 != 1) print $2, $3 }' > ${__RESOURCE__10}
plink --bed ${__RESOURCE__1} --bim ${__RESOURCE__2} --fam ${__RESOURCE__3} --remove ${__RESOURCE__10} --make-bed ${__RESOURCE_GROUP__2}
# __TASK__5 shapeit
__RESOURCE_GROUP__2=${__TMP_DIR__}/MXBQugBx
__RESOURCE_GROUP__3=${__TMP_DIR__}/YSm1XkKf
shapeit --bed-file ${__RESOURCE_GROUP__2} --chr 1 --out ${__RESOURCE_GROUP__3}
# __TASK__6 shapeit
__RESOURCE_GROUP__2=${__TMP_DIR__}/MXBQugBx
__RESOURCE_GROUP__4=${__TMP_DIR__}/1HyBvsdN
shapeit --bed-file ${__RESOURCE_GROUP__2} --chr 2 --out ${__RESOURCE_GROUP__4}
# __TASK__7 shapeit
__RESOURCE_GROUP__2=${__TMP_DIR__}/MXBQugBx
__RESOURCE_GROUP__5=${__TMP_DIR__}/jtM69Ahm
shapeit --bed-file ${__RESOURCE_GROUP__2} --chr 3 --out ${__RESOURCE_GROUP__5}
# __TASK__8 merge
__RESOURCE__11=${__TMP_DIR__}/YSm1XkKf.haps
__RESOURCE__13=${__TMP_DIR__}/1HyBvsdN.haps
__RESOURCE__15=${__TMP_DIR__}/jtM69Ahm.haps
__RESOURCE__17=${__TMP_DIR__}/z6ccazmC
cat ${__RESOURCE__11} ${__RESOURCE__13} ${__RESOURCE__15} >> ${__RESOURCE__17}
# __TASK__9 write_output
__RESOURCE__17=${__TMP_DIR__}/z6ccazmC
cp ${__RESOURCE__17} gs://jigold/final_output.txt
# remove tmp directory
rm -r ${__TMP_DIR__}
@danking suggested we move this to a separate project from …
This really does look great! I have two small suggestions:
This is now ready to be reviewed. @danking Could you please help me set up the tests to run on the CI? @catoverdrive This is an example of the interface and the output generated. There's also a tests file in there. I'm happy to explain the design to you if you'd like.

from pipeline import Pipeline
p = Pipeline() # initialize a pipeline
# Define mapping for taking a file root to a set of output files
bfile = {'bed': '{root}.bed', 'bim': '{root}.bim', 'fam': '{root}.fam'}
# Import a file as a resource
file = p.read_input('gs://hail-jigold/random_file.txt')
# Import a set of input files as a resource group
input_bfile = p.read_input_group(bed='gs://hail-jigold/input.bed',
bim='gs://hail-jigold/input.bim',
fam='gs://hail-jigold/input.fam')
# Remove duplicate samples from a PLINK dataset
subset = p.new_task()
subset = (subset
.label('subset')
.docker('ubuntu')
.declare_resource_group(tmp1=bfile, ofile=bfile)
.command(f'plink --bfile {input_bfile} --make-bed {subset.tmp1}')
.command(f"awk '{{ print $1, $2}}' {subset.tmp1.fam} | sort | uniq -c | awk '{{ if ($1 != 1) print $2, $3 }}' > {subset.tmp2}")
.command(f"plink --bed {input_bfile.bed} --bim {input_bfile.bim} --fam {input_bfile.fam} --remove {subset.tmp2} --make-bed {subset.ofile}"
))
# Run shapeit for each contig from 1-3 with the output from subset
for contig in [str(x) for x in range(1, 4)]:
shapeit = p.new_task()
shapeit = (shapeit
.label('shapeit')
.declare_resource_group(ofile={'haps': "{root}.haps", 'log': "{root}.log"})
.command(f'shapeit --bed-file {subset.ofile} --chr {contig} --out {shapeit.ofile}'))
# Merge the shapeit output files together
merger = p.new_task()
merger = (merger
.label('merge')
.command('cat {files} >> {ofile}'.format(files=" ".join([t.ofile.haps for t in p.select_tasks('shapeit')]),
ofile=merger.ofile)))
# Write the result of the merger to a permanent location
p.write_output(merger.ofile, "gs://jigold/final_output.txt")
# Execute the pipeline
p.run(dry_run=True)

#!/bin/bash
set -ex
# change cd to tmp directory
cd /tmp//pipeline.jlQrNJZW/
# __TASK__0 read_input
cp gs://hail-jigold/random_file.txt nfVpMp4n
# __TASK__1 read_input
cp gs://hail-jigold/input.bed 33qZtfwg.bed
# __TASK__2 read_input
cp gs://hail-jigold/input.bim 33qZtfwg.bim
# __TASK__3 read_input
cp gs://hail-jigold/input.fam 33qZtfwg.fam
# __TASK__4 subset
__RESOURCE_GROUP__0=33qZtfwg
__RESOURCE_GROUP__1=yibUlBkL
__RESOURCE__6=yibUlBkL.fam
__RESOURCE__10=29aBQihd
__RESOURCE__1=33qZtfwg.bed
__RESOURCE__2=33qZtfwg.bim
__RESOURCE__3=33qZtfwg.fam
__RESOURCE_GROUP__2=YXS0tQKi
plink --bfile ${__RESOURCE_GROUP__0} --make-bed ${__RESOURCE_GROUP__1}
awk '{ print $1, $2}' ${__RESOURCE__6} | sort | uniq -c | awk '{ if ($1 != 1) print $2, $3 }' > ${__RESOURCE__10}
plink --bed ${__RESOURCE__1} --bim ${__RESOURCE__2} --fam ${__RESOURCE__3} --remove ${__RESOURCE__10} --make-bed ${__RESOURCE_GROUP__2}
# __TASK__5 shapeit
__RESOURCE_GROUP__2=YXS0tQKi
__RESOURCE_GROUP__3=gidGmbcC
shapeit --bed-file ${__RESOURCE_GROUP__2} --chr 1 --out ${__RESOURCE_GROUP__3}
# __TASK__6 shapeit
__RESOURCE_GROUP__2=YXS0tQKi
__RESOURCE_GROUP__4=W5hjCmPK
shapeit --bed-file ${__RESOURCE_GROUP__2} --chr 2 --out ${__RESOURCE_GROUP__4}
# __TASK__7 shapeit
__RESOURCE_GROUP__2=YXS0tQKi
__RESOURCE_GROUP__5=ySM8T0lZ
shapeit --bed-file ${__RESOURCE_GROUP__2} --chr 3 --out ${__RESOURCE_GROUP__5}
# __TASK__8 merge
__RESOURCE__11=gidGmbcC.haps
__RESOURCE__13=W5hjCmPK.haps
__RESOURCE__15=ySM8T0lZ.haps
__RESOURCE__17=Z5OLJG6Y
cat ${__RESOURCE__11} ${__RESOURCE__13} ${__RESOURCE__15} >> ${__RESOURCE__17}
# __TASK__9 write_output
__RESOURCE__17=Z5OLJG6Y
cp ${__RESOURCE__17} gs://jigold/final_output.txt
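The `__RESOURCE__*` variables in the dry-run output above come from binding each placeholder in a command template to a generated name before the script is emitted. A deterministic sketch of that substitution step (illustrative names, not the actual Pipeline internals, which use random 8-character names):

```python
# Illustrative sketch of binding resource placeholders in a command template
# to generated shell variable names, as in the dry-run output above.
import itertools

_counter = itertools.count()

def temp_name(prefix='__RESOURCE__'):
    # The real tool emits random names; a counter keeps this sketch deterministic.
    return f'{prefix}{next(_counter)}'

def bind_resources(template, resource_keys):
    """Assign each resource a shell variable name and render the command."""
    names = {key: temp_name() for key in resource_keys}
    command = template.format(**{k: '${' + v + '}' for k, v in names.items()})
    return command, names

cmd, names = bind_resources('plink --bfile {input} --make-bed {out}', ['input', 'out'])
print(cmd)  # plink --bfile ${__RESOURCE__0} --make-bed ${__RESOURCE__1}
```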
Other things to add in separate PRs:
Looks great!
Let me know when you get the tests working and I'll approve it.
pipeline/setup.py
Outdated
setup(
name = 'pipeline',
version = '0.0.1',
url = 'https://github.com/hail-is/pipeline.git',
I don't think this URL works
@catoverdrive Here's the output with docker commands:

#!/bin/bash
# change cd to tmp directory
cd /tmp//pipeline.S9YTZap5/
# __TASK__0 read_input
cp gs://hail-jigold/random_file.txt DWRmR1Lh
# __TASK__1 read_input
cp gs://hail-jigold/input.bed Aw2arWP9.bed
# __TASK__2 read_input
cp gs://hail-jigold/input.bim Aw2arWP9.bim
# __TASK__3 read_input
cp gs://hail-jigold/input.fam Aw2arWP9.fam
# __TASK__4 subset
docker run -v /tmp//pipeline.S9YTZap5/:/tmp//pipeline.S9YTZap5/ -w /tmp//pipeline.S9YTZap5/ ubuntu /bin/bash -c '__RESOURCE_GROUP__0=Aw2arWP9; __RESOURCE_GROUP__1=srXTmGQE; __RESOURCE__6=srXTmGQE.fam; __RESOURCE__10=8ueGZQqn; __RESOURCE__1=Aw2arWP9.bed; __RESOURCE__2=Aw2arWP9.bim; __RESOURCE__3=Aw2arWP9.fam; __RESOURCE_GROUP__2=ESEFn8Tm; plink --bfile ${__RESOURCE_GROUP__0} --make-bed ${__RESOURCE_GROUP__1}&& awk '"'"'{ print $1, $2}'"'"' ${__RESOURCE__6} | sort | uniq -c | awk '"'"'{ if ($1 != 1) print $2, $3 }'"'"' > ${__RESOURCE__10}&& plink --bed ${__RESOURCE__1} --bim ${__RESOURCE__2} --fam ${__RESOURCE__3} --remove ${__RESOURCE__10} --make-bed ${__RESOURCE_GROUP__2}'
# __TASK__5 shapeit
docker run -v /tmp//pipeline.S9YTZap5/:/tmp//pipeline.S9YTZap5/ -w /tmp//pipeline.S9YTZap5/ gcr.io/shapeit /bin/bash -c '__RESOURCE_GROUP__2=ESEFn8Tm; __RESOURCE_GROUP__3=K1TfWX3n; shapeit --bed-file ${__RESOURCE_GROUP__2} --chr 1 --out ${__RESOURCE_GROUP__3}'
# __TASK__6 shapeit
docker run -v /tmp//pipeline.S9YTZap5/:/tmp//pipeline.S9YTZap5/ -w /tmp//pipeline.S9YTZap5/ gcr.io/shapeit /bin/bash -c '__RESOURCE_GROUP__2=ESEFn8Tm; __RESOURCE_GROUP__4=8dRi0LwZ; shapeit --bed-file ${__RESOURCE_GROUP__2} --chr 2 --out ${__RESOURCE_GROUP__4}'
# __TASK__7 shapeit
docker run -v /tmp//pipeline.S9YTZap5/:/tmp//pipeline.S9YTZap5/ -w /tmp//pipeline.S9YTZap5/ gcr.io/shapeit /bin/bash -c '__RESOURCE_GROUP__2=ESEFn8Tm; __RESOURCE_GROUP__5=NIqfevqS; shapeit --bed-file ${__RESOURCE_GROUP__2} --chr 3 --out ${__RESOURCE_GROUP__5}'
# __TASK__8 merge
docker run -v /tmp//pipeline.S9YTZap5/:/tmp//pipeline.S9YTZap5/ -w /tmp//pipeline.S9YTZap5/ ubuntu /bin/bash -c '__RESOURCE__11=K1TfWX3n.haps; __RESOURCE__13=8dRi0LwZ.haps; __RESOURCE__15=NIqfevqS.haps; __RESOURCE__17=GLxOwBss; cat ${__RESOURCE__11} ${__RESOURCE__13} ${__RESOURCE__15} >> ${__RESOURCE__17}'
# __TASK__9 write_output
__RESOURCE__17=GLxOwBss
cp ${__RESOURCE__17} gs://jigold/final_output.txt
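The `'"'"'` runs in the awk commands above are the standard trick for embedding a literal single quote inside the single-quoted `bash -c` string: close the quote, emit a double-quoted `'`, and reopen. A sketch of that escaping in Python (the helper name is hypothetical; the stdlib's `shlex.quote` does essentially the same thing):

```python
# Sketch of the single-quote escaping seen in the generated docker commands:
# inside a single-quoted bash string, a literal ' is written as '"'"'.

def quote_for_bash_c(command):
    """Wrap a command for `bash -c '...'`, escaping embedded single quotes."""
    return "'" + command.replace("'", "'\"'\"'") + "'"

inner = "awk '{ print $1, $2}' input.fam"
print('/bin/bash -c ' + quote_for_bash_c(inner))
# /bin/bash -c 'awk '"'"'{ print $1, $2}'"'"' input.fam'
```

In practice `shlex.quote` is the safer choice, since it also handles other shell metacharacters.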
Requesting changes so this doesn't keep getting tested; Batch testing is broken.
@cseed I got a version working!!!! I'd like to test on other potential pipelines and make a real example that will run for demo purposes.