diff --git a/LICENSE b/LICENSE old mode 100644 new mode 100755 diff --git a/MANIFEST.in b/MANIFEST.in old mode 100644 new mode 100755 diff --git a/README.md b/README.md old mode 100644 new mode 100755 index f8b8150..7ee2b6f --- a/README.md +++ b/README.md @@ -1,23 +1,13 @@ ![SciLuigi Logo](http://i.imgur.com/2aMT04J.png) -* ***UPDATE, Nov, 2016: A paper with the motivation and design decisions behind SciLuigi [now available](http://dx.doi.org/10.1186/s13321-016-0179-6)*** - * If you use SciLuigi in your research, please cite it like this:
- Lampa S, Alvarsson J, Spjuth O. Towards agile large-scale predictive modelling in drug discovery with flow-based programming design principles. *J Cheminform*. 2016. doi:[10.1186/s13321-016-0179-6](http://dx.doi.org/10.1186/s13321-016-0179-6). -* ***A Virtual Machine with a realistic, runnable, example workflow in a Jupyter Notebook, is available [here](https://github.com/pharmbio/bioimg-sciluigi-casestudy)*** -* ***Watch a 10 minute screencast going through the basics of using SciLuigi [here](https://www.youtube.com/watch?v=gkKUWskRbjw)*** -* ***See a poster describing the motivations behind SciLuigi [here](http://dx.doi.org/10.13140/RG.2.1.1143.6246)*** +# Scientific Luigi +(SciLuigi for short) is a light-weight wrapper library around [Spotify](http://spotify.com)'s [Luigi](http://github.com/spotify/luigi) workflow system that aims to make writing scientific workflows more fluent, flexible and modular. -Scientific Luigi (SciLuigi for short) is a light-weight wrapper library around [Spotify](http://spotify.com)'s [Luigi](http://github.com/spotify/luigi) -workflow system that aims to make writing scientific workflows more fluent, flexible and -modular. +Luigi is a flexile and fun-to-use library. It has turned out though that its default way of defining dependencies by hard coding them in each task's requires() function is not optimal for some type of workflows common e.g. in bioinformatics where multiple inputs and outputs, complex dependencies, and the need to quickly try different workflow connectivity in an explorative fashion is central to the way of working. -Luigi is a flexile and fun-to-use library. It has turned out though -that its default way of defining dependencies by hard coding them in each task's -requires() function is not optimal for some type of workflows common e.g. in bioinformatics where multiple inputs and outputs, complex dependencies, -and the need to quickly try different workflow connectivity in an explorative fashion is central to the way of working. +Sciluigi can (optionally) complete tasks by running commands in containers. This can improve reproducibility (as a container can be portably run on the cloud, on private clusters, or for lightweight tasks on a users computer via docker) and ease of use (not requiring the end-user of a workflow to install finicky bioinformatics software while avoiding the problem of conflicting dependencies). Sciluigi can facilitate running software that only runs on linux when hosted on a Windows or Macintosh computer, and leverage cloud computing resources (AWS batch). -SciLuigi was designed to solve some of these problems, by providing the following -"features" over vanilla Luigi: +SciLuigi was designed to solve some of these problems, by providing the following "features" over vanilla Luigi: - Separation of dependency definitions from the tasks themselves, for improved modularity and composability. @@ -30,39 +20,21 @@ SciLuigi was designed to solve some of these problems, by providing the followin - Inputs and outputs are connected with an intuitive "single-assignment syntax". - "Good default" high-level logging of workflow tasks and execution times. - Produces an easy to read audit-report with high level information per task. -- Integration with some HPC workload managers. - (So far only [SLURM](http://slurm.schedmd.com/) though). +- Integration with some HPC workload managers, currently AWS batch. +- Integration with cloud-bucket stores (currently AWS S3). +- When containers are used, one can prototype and test a task on test data locally + with docker, and then run it on cloud resources (e.g. AWS batch) when confronted + with a large dataset with only a change in a single parameter. Because of Luigi's easy-to-use API these changes have been implemented as a very thin layer on top of luigi's own API with no changes at all to the luigi core, which means that you can continue leveraging the work already being put into maintaining and further developing luigi by the team at Spotify and others. -## Workflow code quick demo - -***For a brief 10 minute screencast going through the basics below, see [this link](https://www.youtube.com/watch?v=gkKUWskRbjw)*** - -Just to give a quick feel for how a workflow definition might look like in SciLuigi, check this code example -(implementation of tasks hidden here for brevity. See Usage section further below for more details): - -```python -import sciluigi as sl - -class MyWorkflow(sl.WorkflowTask): - def workflow(self): - # Initialize tasks: - foowrt = self.new_task('foowriter', MyFooWriter) - foorpl = self.new_task('fooreplacer', MyFooReplacer, - replacement='bar') - - # Here we do the *magic*: Connecting outputs to inputs: - foorpl.in_foo = foowrt.out_foo - - # Return the last task(s) in the workflow chain. - return foorpl -``` - -That's it! And again, see the "usage" section just below for a more detailed description of getting to this! +* ***UPDATE, Nov, 2016: A paper with the motivation and design decisions behind SciLuigi [now available](http://dx.doi.org/10.1186/s13321-016-0179-6)*** + * If you use SciLuigi in your research, please cite it like this:
+ Lampa S, Alvarsson J, Spjuth O. Towards agile large-scale predictive modelling in drug discovery with flow-based programming design principles. *J Cheminform*. 2016. doi:[10.1186/s13321-016-0179-6](http://dx.doi.org/10.1186/s13321-016-0179-6).* +* ***See a poster describing the motivations behind SciLuigi [here](http://dx.doi.org/10.13140/RG.2.1.1143.6246)*** ## Support: Getting help @@ -72,6 +44,8 @@ Please use the [issue queue](https://github.com/pharmbio/sciluigi/issues) for an - Python 2.7 - 3.4 - Luigi 1.3.x - 2.0.1 +- boto3 > 1.7.10 +- docker >= 3.2.1 ## Install @@ -129,31 +103,37 @@ Then, you need to define some tasks that can be done in this workflow. This is done by: -1. Creating a subclass of `sciluigi.Task` (or `sciluigi.SlurmTask` if you want Slurm support) +1. Creating a subclass of `sciluigi.ContainerTask` 2. Adding fields named `in_` for each input, in the new task class -3. Define methods named `out_()` for each output, that return `sciluigi.TargetInfo` objects. (sciluigi.TargetInfo is initialized with a reference to the task object itself - typically `self` - and a path name, where upstream tasks paths can be used). +3. Define methods named `out_()` for each output, that return `sciluigi.ContainerTargetInfo` objects. sciluigi.TargetInfo is initialized with a reference to the task object itself - typically `self` - and an url. ContainerTargets can silently change where they are hosted, including on local filesystems (/path/to/file.txt) or in buckets (s3://bucket/key/file.txt). 4. Define luigi parameters to the task. -5. Implement the `run()` method of the task. +5. Define the container engine and parameters that the container will be run. +6. Implement the `run()` method of the task. #### Example: -Let's define a simple task that just writes "foo" to a file named `foo.txt`: +##### Let's define a simple task that just writes "foo" to a file named `foo.txt`. + +For this very simple task, we do not need a container, and thus we can base the task on the sciluigi.Task class. We do use the sciluigi.ContainerTargetInfo class here. The path/url we gave is for the local filesystem. If instead we gave an S3 bucket/key url (s3://bucket/foo.txt), this class will handle uploading (and later downloading if needed) from S3. ```python class MyFooWriter(sciluigi.Task): # We have no inputs here # Define outputs: def out_foo(self): - return sciluigi.TargetInfo(self, 'foo.txt') + return sciluigi.ContainerTargetInfo(self, 'foo.txt') def run(self): with self.out_foo().open('w') as foofile: foofile.write('foo\n') ``` -Then, let's create a task that replaces "foo" with "bar": +##### Then, let's create a task that replaces "foo" with "bar": + +This task will be run in a container, in this case, the alpine linux container. This way (say if we are running sciluigi on a Windows machine without sed), we can still run the command wihtout fuss. In fact, no matter where this is hosted, the task will reliably run in the docker container the same way. ```python -class MyFooReplacer(sciluigi.Task): +class MyFooReplacer(sciluigi.ContainerTask): + container = 'alpine:3.7' replacement = sciluigi.Parameter() # Here, we take as a parameter # what to replace foo with. # Here we have one input, a "foo file": @@ -162,24 +142,27 @@ class MyFooReplacer(sciluigi.Task): def out_replaced(self): # As the path to the returned target(info), we # use the path of the foo file: - return sciluigi.TargetInfo(self, self.in_foo().path + '.bar.txt') + return sciluigi.ContainerTargetInfo(self, self.in_foo().path + '.bar.txt') def run(self): - with self.in_foo().open() as in_f: - with self.out_replaced().open('w') as out_f: - # Here we see that we use the parameter self.replacement: - out_f.write(in_f.read().replace('foo', self.replacement)) + self.ex( + command="sed 's/foo/$repl/g' $infile > $outfile", + input_targets={ + 'infile': self.in_foo(), + }, + output_targets={ + 'outfile': self.out_replaced(), + }, + extra_parameters={ + 'repl': self.replacement, + } + ) ``` +Several things have happened here: -The last lines, we could have instead written using the command-line `sed` utility, available in linux, by calling it on the commandline, with the built-in `ex()` method: - -```python - def run(self): - # Here, we use the in-built self.ex() method, to execute commands: - self.ex("sed 's/foo/{repl}/g' {inpath} > {outpath}".format( - repl=self.replacement, - inpath=self.in_foo().path, - outpath=self.out_replaced().path)) -``` +- We've specified which container the command should be run in. This can be any docker-style URI +- The command now uses the [python string template system](https://docs.python.org/3.5/library/string.html#string.Template) to replace parameters, input and output targets +- We use a ContainerTargetInfo in place of a ContainerTarget. This replacement target takes a URL, and can seemlessly handle +local files, S3 buckets (and in the future SFTP, etc). ### Write the workflow definition @@ -189,7 +172,8 @@ We do this by: 1. Instantiating the tasks, using the `self.new_task(, , *args, **kwargs)` method, of the workflow task. 2. Connect the tasks together, by pointing the right `out_*` method to the right `in_*` field. -3. Returning the last task in the chain, from the workflow method. +3. Giving some basic parameters as to which sort of container engine should be used for the container task via defining a `sciluigi.ContainerInfo` class. +4. Returning the last task in the chain, from the workflow method. #### Example: @@ -197,9 +181,20 @@ We do this by: import sciluigi class MyWorkflow(sciluigi.WorkflowTask): def workflow(self): - foowriter = self.new_task('foowriter', MyFooWriter) - fooreplacer = self.new_task('fooreplacer', MyFooReplacer, - replacement='bar') + foowriter = self.new_task( + 'foowriter', + MyFooWriter + ) + fooreplacer = self.new_task( + 'fooreplacer', + MyFooReplacer, + containerinfo=sciluigi.ContainerInfo( + vcpu=1, + mem=512, + engine='docker', + ), + replacement='bar' + ) # Here we do the *magic*: Connecting outputs to inputs: fooreplacer.in_foo = foowriter.out_foo @@ -269,6 +264,8 @@ If you run into any of these problems, you might be interested in a new workflow Changelog --------- +- 0.9.6b7_ct + - Support for containerized tasks and `ContainerTargetInfo` - 0.9.3b4 - Support for Python 3 (Thanks to @jeffcjohnson for contributing this!). - Bug fixes. diff --git a/README.rst b/README.rst old mode 100644 new mode 100755 diff --git a/example-config/containerinfo.ini b/example-config/containerinfo.ini new file mode 100644 index 0000000..390d281 --- /dev/null +++ b/example-config/containerinfo.ini @@ -0,0 +1,79 @@ +# Sciluigi needs to know how to run your containers +# This configuration file helps specify the options needed +[DEFAULT] +# Which container engine to use. Options include: +# docker -> docker on the hosting machine +# aws_batch -> AWS batch +# pbs -> PBS / torque via qsub +# slurm -> slurm HPC management engine, via srun +engine = docker + +# How many vcpu to request (concurrent threads) +vcpu = 1 + +# Maximum memory, in MB +mem = 4096 + +# Time limit in minutes +timeout = 10080 + +container_working_dir = /tmp/ + +# Some engine specific options +# ** singularity (for slurm and pbs) ** +# where should we store our singularity containers. +# Should be some shared filesystem between nodes +container_cache = + +# ** slurm ** +# To which partition should we submit +slurm_partition = + +# ** PBS ** +# Under which account should jobs be submitted +pbs_account = +# to which queue? +pbs_queue = +# Path on shared filesystem between nodes +# To use to store scripts. +pbs_scriptpath = + + +# ** AWS batch ** +# The role ID needed for tasks to access S3 +aws_jobRoleArn = +# S3 bucket to use for temporary upload / download of files +aws_s3_scratch_loc = +# Which batch job queue should jobs be submitted +aws_batch_job_queue = +# Prefix to add to jobs (human readable) +aws_batch_job_prefix = +# How often should we poll batch (secs) +aws_batch_job_poll_sec = 10 +# Where can we find credentials (defaults to ~/.aws if not specified) +aws_secrets_loc = +# How many times to try submitting via boto before being killed +aws_boto_max_tries = 10 + +# Now specify some defaults for tasks with specific resource need types +# Overriding only the relevant options + +# High memory relative to number of CPU +[highmem] +mem = 120000 +vcpu = 1 + +# Mixed needs for moderate mulitthreaded tasks +[midcpu] +mem = 4096 +vcpu = 4 + +# Big cpu and memory +[heavy] +mem = 120000 +;vcpu = 12 + +# Minimal CPU and memory needs (suitable for IO limited tasks) +[light] +vcpu = 1 +mem = 1024 diff --git a/examples/Dockerfile b/examples/Dockerfile new file mode 100644 index 0000000..d07c024 --- /dev/null +++ b/examples/Dockerfile @@ -0,0 +1,20 @@ +# sciluigi-example +# +# VERSION 0.1.0__bcw.0.3.0 + +FROM ubuntu:16.04 +# Create some mount points in the container for use by bucket-command-wrapper +RUN mkdir -p /mnt/inputs/file && mkdir -p /mnt/outputs/file && mkdir /scratch && mkdir /working +# Install at least python3 (used by BCW). It's OK to change the specific version of python3 used. +RUN apt-get update && apt-get install -y \ +python3>=3.5.1-3 \ +python3-pip>=3.5.1-3 +# Since we are ONLY installing python3 link to it to make it the default python +RUN ln -s /usr/bin/python3 /usr/bin/python +# Install bucket_command_wrapper via pip, along with boto3 / awscli if we want to use AWS at all +RUN pip3 install \ +awscli>=1.15.14 \ +boto3>=1.7.14 \ +bucket_command_wrapper==0.3.0 + +# Feel free to make this more useful by installing software, etc diff --git a/examples/clean.sh b/examples/clean.sh old mode 100644 new mode 100755 diff --git a/examples/data/a.txt b/examples/data/a.txt old mode 100644 new mode 100755 diff --git a/examples/data/acgt.txt b/examples/data/acgt.txt old mode 100644 new mode 100755 diff --git a/examples/data/afolder/hej.txt b/examples/data/afolder/hej.txt old mode 100644 new mode 100755 diff --git a/examples/data/c.txt b/examples/data/c.txt old mode 100644 new mode 100755 diff --git a/examples/data/g.txt b/examples/data/g.txt old mode 100644 new mode 100755 diff --git a/examples/data/t.txt b/examples/data/t.txt old mode 100644 new mode 100755 diff --git a/examples/example-containertask-1.py b/examples/example-containertask-1.py new file mode 100755 index 0000000..ae1d451 --- /dev/null +++ b/examples/example-containertask-1.py @@ -0,0 +1,217 @@ +#!/usr/bin/env python3 + +# This is an example of how the ContainerTask and ContainerTargetInfo +# classes extend sciluigi, and allow commands to be run in containers +# seamlessly on different container engines / HPC systems +# Start, ironically, at the bottom and work your way up! + +import logging +import luigi +import sciluigi as sl +import argparse +import os +from subprocess import call + +log = logging.getLogger('sciluigi-interface') + +# ------------------------------------------------------------------------ +# Workflow class(es) +# ------------------------------------------------------------------------ + + +class MyWorkflow(sl.WorkflowTask): + # Here are some parameters to define how we want to run our container + engine = sl.Parameter() + aws_secrets_loc = sl.Parameter() + # Only when using AWS_batch + jobRoleArn = sl.Parameter(default="") + s3_scratch_loc = sl.Parameter(default="") + batch_job_queue = sl.Parameter(default="") + + def workflow(self): + rawdata = self.new_task('rawdata', RawData) + + # Run first without a container + atot = self.new_task( + 'atot', + AToT) + atot.in_data = rawdata.out_rawdata + + # And now in a container! + # To run in a container, we have to + # specify which engine, and parameters we need + # This is done through a ContainerInfo class + # We will initialize via the parameters we recieved + # from the command line + test_containerinfo = sl.ContainerInfo( + engine=self.engine, + vcpu=1, # Number of vCPU to request + mem=256, # Memory in MB + timeout=5, # time in minutes + aws_secrets_loc=self.aws_secrets_loc, + aws_jobRoleArn=self.jobRoleArn, + aws_s3_scratch_loc=self.s3_scratch_loc, + aws_batch_job_queue=self.batch_job_queue, + ) + # Now actually start the task. + # Note: This allows different instances of the same task + # to use different engines, queues, etc as needed + atot_in_container = self.new_task( + 'atot_in_container', + AToT_ContainerTask, + containerinfo=test_containerinfo, + ) + atot_in_container.in_data = rawdata.out_rawdata + return (atot, atot_in_container) + +# ------------------------------------------------------------------------ +# Task classes +# ------------------------------------------------------------------------ + + +class RawData(sl.ExternalTask): + # It's perfectly fine to combine local, external and container tasks + # all in one workflow. + def out_rawdata(self): + return sl.ContainerTargetInfo(self, 'data/acgt.txt') + + +class AToT(sl.Task): + # Here is the non-containerized version of this task + in_data = None + + def out_replatot(self): + return sl.TargetInfo(self, self.in_data().path + '.atot') + + # ------------------------------------------------ + + def run(self): + cmd = 'cat ' + self.in_data().path + ' | sed "s/A/T/g" > ' + self.out_replatot().path + log.info("COMMAND TO EXECUTE: " + cmd) + call(cmd, shell=True) + + +class AToT_ContainerTask(sl.ContainerTask): + # Here is the containerized version of this task. + # In this simple example, there isn't much advantage of running in a container + # But when dealing with specialized software (requiring complex and brittle dependencies) + # or with heavy tasks needing big harware to run, there is an advantage. + # This task will run identically locally via docker, on AWS, or via PBS. + + # ALL ContainerTasks must specify which container is to be used + container = 'golob/sciluigi-example:0.1.0__bcw.0.3.0' + + # Dependencies (inputs) are the same as in a non-containerized task + in_data = None + + def out_replatot(self): + # ContainerTargetInfo will take care of shifting files to and from + # cloud providers (S3 at this time) and your local filesystems + return sl.ContainerTargetInfo(self, self.in_data().path + '.container.atot') + + # ------------------------------------------------ + + def run(self): + # ContainerTasks use the python string template system to handle inputs and outputs + # Same command as above, but with template placeholders $inFile for in and $outFile. + # This often works out neater than the more complex string combinations as above + # in the non-containerized task. + cmd = 'cat $inFile | sed "s/A/T/g" > $outFile' + self.ex( + command=cmd, + input_targets={ # A dictionary with the key being the template placeholder + 'inFile': self.in_data(), # Value is a ContainerTarget + }, + output_targets={ + 'outFile': self.out_replatot() # Same drill for outputs + }, + extra_params={} # Optional dict of other placeholders to fill in the command string + ) + + +# Run this file as script +# ------------------------------------------------------------------------ + +if __name__ == '__main__': + # Depending on the container engine used, you must specify some basic settings + # This can be done (as here) via CLI settings, a config file, or even hard-wired + # into scripts. + + parser = argparse.ArgumentParser(description=""" + Containertask example for sciluigi""") + subparsers = parser.add_subparsers( + help='Which container engine to use', + dest='engine', + required=True, + ) + # If we are going to shuffle to-from AWS-S3 we need to know secrets + parser.add_argument( + '--aws-secrets-loc', + help="""Where are the AWS secrets located""", + default=os.path.expanduser('~/.aws'), + metavar='~/.aws' + ) + # The simplest case is docker; all one needs for docker is to have it installed. + docker_parser = subparsers.add_parser('docker') + + # AWS-batch has a few options that must be specified to work + # Including which account, queue, and a directory to store temporary scripts. + aws_parser = subparsers.add_parser("aws_batch") + aws_parser.add_argument( + '--jobRoleArn', + help="""Job role to use when submitting to batch""", + required=True, + metavar='arn:aws:iam::12345:role/somerole' + ) + aws_parser.add_argument( + '--s3-scratch-loc', + help="""Temporary S3 location to transiently keep input/output files. + format: s3://bucket/key/prefix/""", + required=True, + metavar='s3://bucket/key/prefix/to/temp/loc/' + ) + aws_parser.add_argument( + '--batch-job-queue', + help="""To which batch queue should the jobs be submitted?""", + required=True, + metavar='some_queue_name' + ) + # PBS has a few options that must be specified to work + # Including which account, queue, and distinct from AWS, + # a directory to store temporary scripts AND singularity containers; + # these directories must be on a shared file system visible to nodes + pbs_parser = subparsers.add_parser("pbs") + pbs_parser.add_argument( + '--container_cache', '-cc', + help="""Location to store temporary singularity containers for pbs / slurm. + Must be on a shared file system to work properly.""", + required=True, + ) + pbs_parser.add_argument( + '--account', + help="""Account to use for PBS job submission""", + required=True, + ) + pbs_parser.add_argument( + '--queue', + help="""Into which PBS queue should the jobs be submitted.""", + required=True, + ) + pbs_parser.add_argument( + '--scriptpath', + help="""Location on a shared file system to store temporary scripts""", + required=True, + ) + + args = parser.parse_args() + # Extract these parameters to the arguments for our workflow + args_list = [ + "--{}={}".format(k.replace('_', '-'), v) + for k, v in vars(args).items() + ] + + sl.run( + local_scheduler=True, + main_task_cls=MyWorkflow, + cmdline_args=args_list, + ) diff --git a/examples/example1.py b/examples/example1.py old mode 100644 new mode 100755 diff --git a/examples/example2_ngi.py b/examples/example2_ngi.py old mode 100644 new mode 100755 diff --git a/examples/example3_components.py b/examples/example3_components.py old mode 100644 new mode 100755 diff --git a/examples/example3_workflow.py b/examples/example3_workflow.py old mode 100644 new mode 100755 diff --git a/examples/example4_multiwf.py b/examples/example4_multiwf.py old mode 100644 new mode 100755 diff --git a/examples/sciluigi b/examples/sciluigi deleted file mode 120000 index 79eca18..0000000 --- a/examples/sciluigi +++ /dev/null @@ -1 +0,0 @@ -../sciluigi \ No newline at end of file diff --git a/sciluigi/AWSBatchTaskWatcher.py b/sciluigi/AWSBatchTaskWatcher.py new file mode 100644 index 0000000..b4734d5 --- /dev/null +++ b/sciluigi/AWSBatchTaskWatcher.py @@ -0,0 +1,113 @@ +# Class to monitor and wait on AWS Batch Jobs + +import boto3 +import multiprocessing as mp +import time +import logging + + +class AWSBatchTaskWatcher(): + COMPLETED_JOB_STATES = { + 'SUCCEEDED', + 'FAILED', + # A state I've added for jobs that no longer exist on batch. + # This can be for older jobs whose status is deleted from AWS + 'DOESNOTEXIST' + } + POLLING_DELAY_SEC = 10 + JOB_WAIT_SECS = 1 + + def pollJobState(self): + while True: + try: + self.__log__.debug("Poll tick. {} jobs".format( + len(self.__jobStateDict__)) + ) + jobIDs_needing_update = [ + jID for jID, state in self.__jobStateDict__.items() + if state not in self.COMPLETED_JOB_STATES + ] + self.__active_job_ids__ = set(jobIDs_needing_update) + if len(jobIDs_needing_update) > 0: + self.__log__.debug("Polling AWS about {} jobs".format( + len(jobIDs_needing_update)) + ) + update_result = self.__batch_client__.describe_jobs( + jobs=jobIDs_needing_update + ) + update_result_jobs = update_result.get('jobs', []) + updated_job_status = { + j['jobId']: j['status'] + for j in update_result_jobs + } + jobIdsWithoutResult = list(set(jobIDs_needing_update) - set(updated_job_status.keys())) + updated_job_status.update({ + jID: "DOESNOTEXIST" + for jID in jobIdsWithoutResult + }) + self.__jobStateDict__.update(updated_job_status) + + time.sleep(self.POLLING_DELAY_SEC) + except BrokenPipeError: + # Handle if the calling process ends, destroying the manager. + # We should terminate too + return + + def waitOnJob(self, jobID): + # Works by adding this jobID to the dict if it does not exist + if jobID not in self.__jobStateDict__: + self.__log__.info("Adding jobId {} to our watch list".format(jobID)) + self.__jobStateDict__[jobID] = None + # And then waiting for the polling child process to update the job status + while self.__jobStateDict__[jobID] not in self.COMPLETED_JOB_STATES: + self.__log__.debug("Still waiting on {}".format(jobID)) + time.sleep(self.JOB_WAIT_SECS) + # Implicitly our job has reached a completed state + self.__log__.info("JobID {} returned with status {}".format( + jobID, + self.__jobStateDict__[jobID] + )) + if self.__jobStateDict__[jobID] == 'DOESNOTEXIST': + self.__log__.warning("JobID {} did not exist on batch".format(jobID)) + return self.__jobStateDict__[jobID] + + def __init__( + self, + session_options={}, + debug=False): + # Logging first: + self.__log__ = logging.getLogger('AWSBatchTaskWatcher') + self.__log__.setLevel(logging.INFO) + console_handler = logging.StreamHandler() + if debug: + console_handler.setLevel(logging.DEBUG) + else: + console_handler.setLevel(logging.INFO) + self.__log__.addHandler(console_handler) + # BOTO3 / Batch client + self.__session__ = boto3.Session(session_options) + self.__batch_client__ = self.__session__.client( + 'batch' + ) + # Holder for active jobs + self.__active_job_ids__ = set() + # Use the multiprocessing manager to create a job state dict + # that can safely be shared among processes + self.__manager__ = mp.Manager() + self.__jobStateDict__ = self.__manager__.dict() + # Start a child process to poll batch for job status + self.__jobStatePoller__ = mp.Process(target=self.pollJobState) + self.__jobStatePoller__.start() + + def __del__(self): + # Delete active jobs + if hasattr(self, '__active_job_ids__') and self.__active_job_ids__ is not None: + for jobId in self.__active_job_ids__: + self.__batch_client__.terminate_job( + jobId=jobId, + reason='Workflow cancelled' + ) + # Explicitly stop the polling process when this class is destroyed. + if hasattr(self, '__jobStatePoller__'): + self.__jobStatePoller__.terminate() + diff --git a/sciluigi/__init__.py b/sciluigi/__init__.py old mode 100644 new mode 100755 index 97a5345..76db16a --- a/sciluigi/__init__.py +++ b/sciluigi/__init__.py @@ -9,6 +9,7 @@ from sciluigi.audit import AuditTrailHelpers from sciluigi import dependencies +from sciluigi.dependencies import ContainerTargetInfo from sciluigi.dependencies import TargetInfo from sciluigi.dependencies import S3TargetInfo from sciluigi.dependencies import DependencyHelpers @@ -43,3 +44,25 @@ from sciluigi.util import timepath from sciluigi.util import recordfile_to_dict from sciluigi.util import dict_to_recordfile + +from sciluigi import containertask +from sciluigi.containertask import ContainerInfo +from sciluigi.containertask import ContainerTask +from sciluigi.containertask import ContainerHelpers + +from sciluigi.AWSBatchTaskWatcher import AWSBatchTaskWatcher +try: + batch_task_watcher = AWSBatchTaskWatcher() +except: + batch_task_watcher = None + +import threading + +# Lock to ensure only one singularity image is created +singularity_lock = threading.Lock() + +def getBatchTaskWatcher(): + global batch_task_watcher + if batch_task_watcher is None: + raise NotImplementedError + return batch_task_watcher diff --git a/sciluigi/audit.py b/sciluigi/audit.py old mode 100644 new mode 100755 diff --git a/sciluigi/containertask.py b/sciluigi/containertask.py new file mode 100644 index 0000000..8407414 --- /dev/null +++ b/sciluigi/containertask.py @@ -0,0 +1,1151 @@ +import luigi +import sciluigi +import json +import logging +import subprocess +import docker +import os +import stat +from string import Template +import shlex +import uuid +import time +import io +from botocore.exceptions import ClientError +import tempfile +import datetime +import configparser + +try: + from urlparse import urlsplit, urljoin +except ImportError: + from urllib.parse import urlsplit, urljoin + +# Setup logging +log = logging.getLogger('sciluigi-interface') + + +class ContainerInfo(): + """ + A data object to store parameters related to running a specific + tasks in a container (docker / batch / etc). Mostly around resources. + """ + # Which container system to use + # Docker by default. Extensible in the future for batch, slurm-singularity, etc + engine = None + # num vcpu required + vcpu = None + # max memory (mb) + mem = None + # Env + env = None + # Timeout in minutes + timeout = None + # Format is {'source_path': {'bind': '/container/path', 'mode': mode}} + mounts = None + + # Location within the container for scratch work. Can be paired with a mount + container_working_dir = None + # Local Container cache location. For things like singularity that need to pull + # And create a local container + container_cache = None + + # AWS specific stuff + aws_jobRoleArn = None + aws_s3_scratch_loc = None + aws_batch_job_queue = None + aws_batch_job_prefix = None + aws_batch_job_poll_sec = None + aws_secrets_loc = None + aws_boto_max_tries = None + aws_batch_job_poll_sec = None + + # PBS STUFF + pbs_account = None + pbs_queue = None + pbs_scriptpath = None + + # SLURM specifics + slurm_partition = None + + def __init__(self, + engine='docker', + vcpu=1, + mem=4096, + timeout=10080, # Seven days of minutes + mounts={}, + container_cache='.', + aws_jobRoleArn='', + aws_s3_scratch_loc='', + aws_batch_job_queue='', + aws_batch_job_prefix=None, + aws_batch_job_poll_sec=10, + aws_secrets_loc=os.path.expanduser('~/.aws'), + aws_boto_max_tries=10, + slurm_partition=None, + pbs_account='', + pbs_queue='', + pbs_scriptpath=None, + container_working_dir='/tmp/' + ): + self.engine = engine + self.vcpu = vcpu + self.mem = mem + self.timeout = timeout + self.mounts = mounts + self.container_cache = container_cache + + self.aws_jobRoleArn = aws_jobRoleArn + self.aws_s3_scratch_loc = aws_s3_scratch_loc + self.aws_batch_job_queue = aws_batch_job_queue + self.aws_batch_job_prefix = aws_batch_job_prefix + self.aws_batch_job_poll_sec = aws_batch_job_poll_sec + self.aws_secrets_loc = aws_secrets_loc + self.aws_boto_max_tries = aws_boto_max_tries + + self.slurm_partition = slurm_partition + + self.pbs_account = pbs_account + self.pbs_queue = pbs_queue + self.pbs_scriptpath = pbs_scriptpath + + # Method to allow population from a config file + # Sparing the user from having to repeat this + def from_config( + self, + configfile_path=os.path.expanduser('~/.sciluigi/containerinfo.ini'), + section='DEFAULT'): + config = configparser.ConfigParser() + if not os.path.exists(configfile_path): + log.error( + """Could not find a sciluigi configuration file at {}""".format( + configfile_path) + ) + return + # Implicit else + config.read(configfile_path) + if section not in config.sections(): + log.error( + """Section {} not found in the sciluigi configuration file at {}""".format( + section, + configfile_path + ) + ) + return + # Implicit else, override values if the config value is not a blank string + config_values = config[section] + if config_values.get('engine', "") != "": + self.engine = config_values['engine'] + if config_values.get('vcpu', "") != "": + try: + self.vcpu = int(config_values['vcpu']) + except ValueError: + log.error("Could not convert vcpu {} to int".format(config_values['vcpu'])) + if config_values.get('mem', "") != "": + try: + self.mem = int(config_values['mem']) + except ValueError: + log.error("Could not convert mem {} to int".format(config_values['mem'])) + if config_values.get('timeout', "") != "": + try: + self.timeout = int(config_values['timeout']) + except ValueError: + log.error("Could not convert timeout {} to int".format(config_values['timeout'])) + + if config_values.get('mounts', "") != "": + try: + self.mounts = json.loads(config_values['mounts']) + except ValueError: + log.error("Could not convert {} to a dict".format(config_values['mounts'])) + + if config_values.get('container_cache', "") != "": + self.container_cache = config_values['container_cache'] + + if config_values.get('aws_jobRoleArn', "") != "": + self.aws_jobRoleArn = config_values['aws_jobRoleArn'] + if config_values.get('aws_s3_scratch_loc', "") != "": + self.aws_s3_scratch_loc = config_values['aws_s3_scratch_loc'] + if config_values.get('aws_batch_job_queue', "") != "": + self.aws_batch_job_queue = config_values['aws_batch_job_queue'] + if config_values.get('aws_batch_job_prefix', "") != "": + self.aws_batch_job_prefix = config_values['aws_batch_job_prefix'] + if config_values.get('aws_batch_job_poll_sec', "") != "": + try: + self.aws_batch_job_poll_sec = int(config_values['aws_batch_job_poll_sec']) + except ValueError: + log.error("Could not convert batch poll time of {} to int".format( + config_values['aws_batch_job_poll_sec']) + ) + if config_values.get('aws_secrets_loc', "") != "": + self.aws_secrets_loc = config_values['aws_secrets_loc'] + + if config_values.get('aws_boto_max_tries', "") != "": + try: + self.aws_boto_max_tries = int(config_values['aws_boto_max_tries']) + except ValueError: + log.error("Could not convert boto max tries {} to int".format( + config_values['aws_boto_max_tries']) + ) + + if config_values.get('slurm_partition', "") != "": + self.slurm_partition = config_values['slurm_partition'] + + if config_values.get('pbs_account', "") != "": + self.pbs_account = config_values['pbs_account'] + + if config_values.get('pbs_queue', "") != "": + self.pbs_queue = config_values['pbs_queue'] + + if config_values.get('pbs_scriptpath', "") != "": + self.pbs_scriptpath = config_values['pbs_scriptpath'] + + if config_values.get('container_working_dir', "") != "": + self.container_working_dir = config_values['container_working_dir'] + + def __str__(self): + """ + Return string of this information + """ + return( + "{} with Cpu {}, Mem {} MB, timeout {} secs, and container cache {}".format( + self.engine, + self.vcpu, + self.mem, + self.timeout, + self.container_cache + )) + + +class ContainerInfoParameter(sciluigi.parameter.Parameter): + ''' + A specialized luigi parameter, taking ContainerInfo objects. + ''' + + def parse(self, x): + if isinstance(x, ContainerInfo): + return x + else: + log.error('parameter is not instance of ContainerInfo. It is instead {}' + .format(type(x))) + raise Exception('parameter is not instance of ContainerInfo. It is instead {}' + .format(type(x))) + + +class ContainerHelpers(): + """ + Mixin with various methods and variables for running commands in containers using (Sci)-Luigi + """ + # Other class-fields + # Resource guidance for this container at runtime. + containerinfo = ContainerInfoParameter(default=None) + + # The ID of the container (docker registry style). + container = None + + def map_targets_to_container(self, targets): + """ + Accepts a dictionary where the keys are identifiers for various targets + and the value is the target + + This breaks down the targets by their schema (file, s3, etc). + For each schema a lowest-common-path is found and a suggested container + mountpoint is generated + + What one gets back is a nested dict + { + 'scheme': { + 'common_prefix': '/path/on/source/shared/by/all/targets/of/schema', + 'rel_paths': { + 'identifier': 'path_rel_to_common_prefix' + } + 'targets': { + 'identifier': target, + } + } + } + """ + # Determine the schema for these targets via comprehension + schema = {t.scheme for t in targets.values()} + return_dict = {} + for scheme in schema: + return_dict[scheme] = {} + # Get only the targets for this scheme + scheme_targets = {i: t for i, t in targets.items() if t.scheme == scheme} + common_prefix = os.path.commonpath( + [os.path.dirname( + os.path.join( + urlsplit(t.path).netloc, + urlsplit(t.path).path + ) + ) for t in scheme_targets.values()]) + return_dict[scheme]['common_prefix'] = common_prefix + return_dict[scheme]['targets'] = scheme_targets + return_dict[scheme]['relpaths'] = { + i: os.path.relpath( + os.path.join( + urlsplit(t.path).netloc, + urlsplit(t.path).path + ), + common_prefix) + for i, t in scheme_targets.items() + } + return return_dict + + def mounts_CP_DF_UF( + self, + input_targets, + output_targets, + inputs_mode, + outputs_mode, + input_mount_point, + output_mount_point): + + container_paths = {} + mounts = self.containerinfo.mounts.copy() + UF = [] + DF = [] + + output_target_maps = self.map_targets_to_container( + output_targets, + ) + out_schema = set(output_target_maps.keys()) + # Local file targets can just be mapped. + file_output_common_prefix = None + if 'file' in out_schema: + file_output_common_prefix = output_target_maps['file']['common_prefix'] + # Be sure the output directory exists + try: + os.makedirs(os.path.abspath(output_target_maps['file']['common_prefix'])) + except FileExistsError: + # No big deal + pass + mounts[os.path.abspath(output_target_maps['file']['common_prefix'])] = { + 'bind': os.path.join(output_mount_point, 'file'), + 'mode': outputs_mode + } + container_paths.update({ + i: os.path.join(output_mount_point, 'file', rp) + for i, rp in output_target_maps['file']['relpaths'].items() + }) + out_schema.remove('file') + # Handle other schema here using BCW, creating the appropriate UF parameters + for scheme in out_schema: + for identifier in output_target_maps[scheme]['targets']: + container_paths[identifier] = os.path.join( + output_mount_point, + scheme, + output_target_maps[scheme]['relpaths'][identifier] + ) + UF.append("{}::{}".format( + container_paths[identifier], + output_target_maps[scheme]['targets'][identifier].path + )) + + input_target_maps = self.map_targets_to_container( + input_targets + ) + in_schema = set(input_target_maps.keys()) + if 'file' in in_schema: + # Check for the edge case where our common prefix for input and output is the same + if file_output_common_prefix and file_output_common_prefix == input_target_maps['file']['common_prefix']: + # It is! Skip adding a mount for inputs then, and reset our input mountpoint + input_mount_point = output_mount_point + pass + else: # Add our mount + mounts[os.path.abspath(input_target_maps['file']['common_prefix'])] = { + 'bind': os.path.join(input_mount_point, 'file'), + 'mode': inputs_mode + } + container_paths.update({ + i: os.path.join(input_mount_point, 'file', rp) + for i, rp in input_target_maps['file']['relpaths'].items() + }) + in_schema.remove('file') + + # Handle other schema here using BCW, creating the appropriate DF parameters + for scheme in in_schema: + for identifier in input_target_maps[scheme]['targets']: + container_paths[identifier] = os.path.join( + input_mount_point, + scheme, + input_target_maps[scheme]['relpaths'][identifier] + ) + DF.append("{}::{}::{}".format( + input_target_maps[scheme]['targets'][identifier].path, + container_paths[identifier], + inputs_mode, + )) + + # Mount the AWS secrets if we have some AND s3 is in one of our schema + if self.containerinfo.aws_secrets_loc and ('s3' in out_schema or 's3' in in_schema): + mounts[self.containerinfo.aws_secrets_loc] = {'bind': '/root/.aws', 'mode': 'ro'} + + return (mounts, container_paths, DF, UF) + + def make_fs_name(self, uri): + uri_list = uri.split('://') + if len(uri_list) == 1: + name = uri_list[0] + else: + name = uri_list[1] + keepcharacters = ('.', '_') + return "".join(c if (c.isalnum() or c in keepcharacters) else '_' for c in name).rstrip() + + def timeout_to_walltime(self): + td = datetime.timedelta(minutes=self.containerinfo.timeout) + hours = td.days * 7 + td.seconds // 3600 + if hours > 99: + hours = 99 + minutes = (td.seconds - (td.seconds // 3600) * 3600) // 60 + seconds = 0 + return "{:02d}:{:02d}:{:02d}".format( + hours, + minutes, + seconds + ) + + def ex( + self, + command, + input_targets={}, + output_targets={}, + extra_params={}, + inputs_mode='ro', + outputs_mode='rw', + input_mount_point='/mnt/inputs', + output_mount_point='/mnt/outputs'): + if self.containerinfo.engine == 'docker': + return self.ex_docker( + command, + input_targets, + output_targets, + extra_params, + inputs_mode, + outputs_mode, + input_mount_point, + output_mount_point + ) + elif self.containerinfo.engine == 'aws_batch': + return self.ex_aws_batch( + command, + input_targets, + output_targets, + extra_params, + inputs_mode, + outputs_mode, + input_mount_point, + output_mount_point + ) + elif self.containerinfo.engine == 'slurm': + return self.ex_singularity_slurm( + command, + input_targets, + output_targets, + extra_params, + inputs_mode, + outputs_mode, + input_mount_point, + output_mount_point + ) + elif self.containerinfo.engine == 'pbs': + return self.ex_singularity_pbs( + command, + input_targets, + output_targets, + extra_params, + inputs_mode, + outputs_mode, + input_mount_point, + output_mount_point + ) + else: + raise Exception("Container engine {} is invalid".format(self.containerinfo.engine)) + + def ex_singularity_pbs( + self, + command, + input_targets={}, + output_targets={}, + extra_params={}, + inputs_mode='ro', + outputs_mode='rw', + input_mount_point='/mnt/inputs', + output_mount_point='/mnt/outputs' + ): + """ + Run command in the container using singularity on slurm, with mountpoints + command is assumed to be in python template substitution format + """ + mounts, container_paths, DF, UF = self.mounts_CP_DF_UF( + input_targets, + output_targets, + inputs_mode, + outputs_mode, + input_mount_point, + output_mount_point) + # Use singularity_lock to ensure only one singularity image is created at a time + with sciluigi.singularity_lock: + img_location = os.path.join( + self.containerinfo.container_cache, + "{}.singularity.simg".format(self.make_fs_name(self.container)) + ) + log.info("Looking for singularity image {}".format(img_location)) + if not os.path.exists(img_location): + log.info("No image at {} Creating....".format(img_location)) + try: + os.makedirs(os.path.dirname(img_location)) + except FileExistsError: + # No big deal + pass + # Singularity is dumb and can only pull images to the working dir + # So, get our current working dir. + cwd = os.getcwd() + # Move to our target dir + os.chdir(os.path.dirname(img_location)) + # Attempt to pull our image + pull_proc = subprocess.run( + [ + 'singularity', + 'pull', + '--name', + os.path.basename(img_location), + "docker://{}".format(self.container) + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + log.info(pull_proc) + # Move back + os.chdir(cwd) + + template_dict = container_paths.copy() + template_dict.update(extra_params) + command = Template(command).substitute(template_dict) + + log.info( + "Attempting to run {} in {}".format( + command, + self.container + ) + ) + command_list = [ + 'singularity', 'exec', '--contain', '-e', '--scratch', self.containerinfo.container_working_dir, + ] + for mp in mounts: + command_list += ['-B', "{}:{}:{}".format(mp, mounts[mp]['bind'], mounts[mp]['mode'])] + command_list.append(img_location) + command_list += ['bucket_command_wrapper', '-c', shlex.quote(command)] + for uf in UF: + command_list += ['-UF', uf] + for df in DF: + command_list += ['-DF', df] + + # Write the command to a script for PBS / QSUB to consume + + with tempfile.NamedTemporaryFile( + mode='wt', + dir=self.containerinfo.pbs_scriptpath, + delete=False + ) as script_h: + # Make executable, readable, and writable by owner + os.chmod( + script_h.name, + stat.S_IRUSR | + stat.S_IWUSR | + stat.S_IXUSR + ) + script_h.write("#!/bin/bash\n") + script_h.write(" ".join(command_list)) + script_h.close() + command_proc = subprocess.run( + [ + 'qsub', + '-I', + '-x', + '-V', + '-A', self.containerinfo.pbs_account, + '-q', self.containerinfo.pbs_queue, + '-l', + 'nodes={}:ppn={},mem={}gb,walltime={}'.format( + 1, + self.containerinfo.vcpu, + int(self.containerinfo.mem / 1024), + self.containerinfo.timeout * 60 + ), + script_h.name + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + os.unlink(script_h.name) + log.info(command_proc.stdout) + if command_proc.stderr: + log.warn(command_proc.stderr) + + def ex_singularity_slurm( + self, + command, + input_targets={}, + output_targets={}, + extra_params={}, + inputs_mode='ro', + outputs_mode='rw', + input_mount_point='/mnt/inputs', + output_mount_point='/mnt/outputs'): + """ + Run command in the container using singularity on slurm, with mountpoints + command is assumed to be in python template substitution format + """ + mounts, container_paths, DF, UF = self.mounts_CP_DF_UF( + input_targets, + output_targets, + inputs_mode, + outputs_mode, + input_mount_point, + output_mount_point) + + with sciluigi.singularity_lock: + img_location = os.path.join( + self.containerinfo.container_cache, + "{}.singularity.img".format(self.make_fs_name(self.container)) + ) + log.info("Looking for singularity image {}".format(img_location)) + if not os.path.exists(img_location): + log.info("No image at {} Creating....".format(img_location)) + try: + os.makedirs(os.path.dirname(img_location)) + except FileExistsError: + # No big deal + pass + # Singularity is dumb and can only pull images to the working dir + # So, get our current working dir. + cwd = os.getcwd() + # Move to our target dir + os.chdir(os.path.dirname(img_location)) + # Attempt to pull our image + pull_proc = subprocess.run( + [ + 'singularity', + 'pull', + '--name', + os.path.basename(img_location), + "docker://{}".format(self.container) + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + log.info(pull_proc) + # Move back + os.chdir(cwd) + + template_dict = container_paths.copy() + template_dict.update(extra_params) + command = Template(command).substitute(template_dict) + + log.info("Attempting to run {} in {}".format( + command, + self.container + )) + + command_list = [ + 'singularity', 'exec', '--contain', '-e', '--scratch', self.containerinfo.container_working_dir, + ] + for mp in mounts: + command_list += ['-B', "{}:{}:{}".format(mp, mounts[mp]['bind'], mounts[mp]['mode'])] + command_list.append(img_location) + command_list += ['bucket_command_wrapper', '-c', command] + for uf in UF: + command_list += ['-UF', uf] + for df in DF: + command_list += ['-DF', df] + + if not self.containerinfo.slurm_partition: # No slurm partition. Run without slurm + command_proc = subprocess.run( + command_list, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + else: + """ + out_fn = os.path.join( + next(tempfile._get_candidate_names()) + ) + command_proc = subprocess.run( + [ + 'sbatch', + '-c', str(self.containerinfo.vcpu), + '--mem={}M'.format(self.containerinfo.mem), + '-t', str(self.containerinfo.timeout), + '-p', self.containerinfo.slurm_partition, + '--wait', + '--output={}'.format(out_fn) + ], + input="#!/bin/bash\n"+subprocess.list2cmdline(command_list)+"\n", + encoding='ascii' + ) + if command_proc.returncode == 0 and os.path.exists(out_fn): + log.info( + open(out_fn, 'rt').read() + ) + elif command_proc.returncode != 0 and os.path.exists(out_fn): + log.error( + open(out_fn, 'rt').read() + ) + try: + os.remove(out_fn) + except: + pass + """ + command_proc = subprocess.run( + [ + 'srun', + '-c', str(self.containerinfo.vcpu), + '--mem={}M'.format(self.containerinfo.mem), + '-t', str(self.containerinfo.timeout), + '-p', self.containerinfo.slurm_partition, + ] + command_list, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + log.info(command_proc.stdout) + if command_proc.stderr: + log.warn(command_proc.stderr) + + def ex_aws_batch( + self, + command, + input_targets={}, + output_targets={}, + extra_params={}, + inputs_mode='ro', + outputs_mode='rw', + input_mount_point='/working/inputs', + output_mount_point='/working/outputs'): + """ + Run a command in a container using AWS batch. + Handles uploading of files to / from s3 and then into the container. + Assumes the container has batch_command_wrapper.py + """ + # + # The steps: + # 1) Upload local input files to S3 scratch bucket/key + # 2) Register / retrieve the job definition + # 3) submit the job definition with parameters filled with this specific command + # 4) Retrieve the output paths from the s3 scratch bucket / key + # + + # Only import AWS libs as needed + import boto3 + batch_client = boto3.client('batch') + s3_client = boto3.client('s3') + # And batch_task_watcher from module + batch_task_watcher = sciluigi.getBatchTaskWatcher() + + if self.containerinfo.aws_batch_job_prefix is None: + run_uuid = str(uuid.uuid4()) + else: + run_uuid = "{}-{}".format( + self.containerinfo.aws_batch_job_prefix, + str(uuid.uuid4()) + ) + # Get a task-specific working dir + input_container_path = os.path.join( + self.containerinfo.container_working_dir, + run_uuid, + 'inputs' + ) + output_container_path = os.path.join( + self.containerinfo.container_working_dir, + run_uuid, + 'outputs' + ) + + # We need mappings for both to and from S3 and from S3 to within the container + # <-> <-> + # The script in the container, bucket_command_wrapper.py, handles the second half + # practically, but we need to provide the link s3://bucket/key::/container/path/file::mode + # the first half we have to do here. + + container_paths = {} # Dict key is command template key. Value is in-container path + UF = set() # Set of UF lines to be added. Format is container_path::bucket_file_uri + DF = set() # Set of UF lines to be added. Format is bucket_file_uri::container_path::mode + needs_s3_download = set() # Set of Tuples. (s3::/bucket/key, target) + s3_temp_to_be_deleted = set() # S3 paths to be deleted. + + # Group our output targets by schema + output_target_maps = self.map_targets_to_container( + output_targets, + ) + # Make our container paths + for schema, schema_targets in output_target_maps.items(): + for k, relpath in schema_targets['relpaths'].items(): + container_paths[k] = os.path.join( + output_container_path, + schema, + relpath + ) + # Inputs too + # Group by schema + input_target_maps = self.map_targets_to_container( + input_targets, + ) + # Make our container paths + for schema, schema_targets in input_target_maps.items(): + for k, relpath in schema_targets['relpaths'].items(): + container_paths[k] = os.path.join( + input_container_path, + schema, + relpath + ) + # Container paths should be done now. + + # Now the need to handle our mapping to-from S3. + # Inputs + for scheme, schema_targets in input_target_maps.items(): + if scheme == 's3': # Already coming from S3. Just make our DF entry + for k, target in schema_targets['targets'].items(): + DF.add('{}::{}::{}'.format( + target.path, + container_paths[k], + inputs_mode + )) + else: # NOT in S3. Will need to be upload to a temp location + for k, target in schema_targets['targets'].items(): + s3_temp_loc = os.path.join( + self.containerinfo.aws_s3_scratch_loc, + run_uuid, + scheme, + 'in', + schema_targets['relpaths'][k] + ) + # Add to DF for inside the container + DF.add('{}::{}::{}'.format( + s3_temp_loc, + container_paths[k], + inputs_mode + )) + # If we are read-write, we can add this to our todo list later + if inputs_mode == 'rw': + needs_s3_download.add(( + s3_temp_loc, + target + )) + # And actually upload to the S3 temp location now + if scheme == 'file' or scheme == '': + s3_client.upload_file( + Filename=os.path.abspath(target.path), + Bucket=urlsplit(s3_temp_loc).netloc, + Key=urlsplit(s3_temp_loc).path.strip('/'), + ExtraArgs={ + 'ServerSideEncryption': 'AES256' + } + ) + else: + # Have to use BytesIO because luigi targets can ONLY be opened in + # text mode, and upload / download fileobj can ONLY accept binary mode files + # For reasons. + s3_client.upload_fileobj( + Fileobj=io.BytesIO( + target.open('r').read().encode('utf-8') + ), + Bucket=urlsplit(s3_temp_loc).netloc, + Key=urlsplit(s3_temp_loc).path.strip('/'), + ExtraArgs={ + 'ServerSideEncryption': 'AES256' + } + ) + s3_temp_to_be_deleted.add(s3_temp_loc) + + # Outputs + for scheme, schema_targets in output_target_maps.items(): + if scheme == 's3': # Already going to S3. Just make our UF entry + for k, target in schema_targets['targets'].items(): + UF.add('{}::{}'.format( + container_paths[k], + target.path, + )) + else: + # NOT ending in S3. Will need to download to target + # and make a temp destination in s3 + for k, target in schema_targets['targets'].items(): + s3_temp_loc = os.path.join( + self.containerinfo.aws_s3_scratch_loc, + run_uuid, + scheme, + 'out', + schema_targets['relpaths'][k] + ) + # Add to UF for inside the container + UF.add('{}::{}'.format( + container_paths[k], + s3_temp_loc + )) + # add this to our download from s3 list later + needs_s3_download.add(( + s3_temp_loc, + target + )) + s3_temp_to_be_deleted.add(s3_temp_loc) + + # 2) Register / retrieve job definition for this container, command, and job role arn + + # Make a UUID based on the container / command + job_def_name = "sl_containertask__{}".format( + uuid.uuid5( + uuid.NAMESPACE_URL, + self.container + str(self.containerinfo.mounts) + self.containerinfo.aws_jobRoleArn + ) + ) + + # Search to see if this job is ALREADY defined. + boto_tries = 0 + while boto_tries < self.containerinfo.aws_boto_max_tries: + boto_tries += 1 + try: + job_def_search = batch_client.describe_job_definitions( + maxResults=1, + status='ACTIVE', + jobDefinitionName=job_def_name, + ) + break + + except ClientError as e: + log.info("Caught boto3 client error, sleeping for 10 seconds ({})".format( + e.response['Error']['Message'] + )) + time.sleep(self.containerinfo.aws_batch_job_poll_sec) + + if len(job_def_search['jobDefinitions']) == 0: + # Not registered yet. Register it now + log.info( + """Registering job definition for {} with role {} and mounts {} under name {} + """.format( + self.container, + self.containerinfo.aws_jobRoleArn, + self.containerinfo.mounts, + job_def_name, + )) + # To be passed along for container properties + aws_volumes = [] + aws_mountPoints = [] + for (host_path, container_details) in self.containerinfo.mounts.items(): + name = str(uuid.uuid5(uuid.NAMESPACE_URL, host_path)) + aws_volumes.append({ + 'host': {'sourcePath': host_path}, + 'name': name + }) + if container_details['mode'].lower() == 'ro': + read_only = True + else: + read_only = False + aws_mountPoints.append({ + 'containerPath': container_details['bind'], + 'sourceVolume': name, + 'readOnly': read_only, + }) + log.info("AWS Volumes: {}".format(str(aws_volumes))) + log.info("AWS mounts: {}".format(str(aws_mountPoints))) + boto_tries = 0 + while boto_tries < self.containerinfo.aws_boto_max_tries: + boto_tries += 1 + try: + batch_client.register_job_definition( + jobDefinitionName=job_def_name, + type='container', + containerProperties={ + 'image': self.container, + 'vcpus': 1, + 'memory': 1024, + 'command': shlex.split(command), + 'jobRoleArn': self.containerinfo.aws_jobRoleArn, + 'mountPoints': aws_mountPoints, + 'volumes': aws_volumes + }, + timeout={ + 'attemptDurationSeconds': self.containerinfo.timeout * 60 + } + ) + break + + except ClientError as e: + log.info("Caught boto3 client error, sleeping for 10 seconds ({})".format( + e.response['Error']['Message'] + )) + time.sleep(self.containerinfo.aws_batch_job_poll_sec) + + else: # Already registered + aws_job_def = job_def_search['jobDefinitions'][0] + log.info('Found job definition for {} with job role {} under name {}'.format( + aws_job_def['containerProperties']['image'], + aws_job_def['containerProperties']['jobRoleArn'], + job_def_name, + )) + + # Build our container command list + template_dict = container_paths.copy() + template_dict.update(extra_params) + container_command_list = [ + 'bucket_command_wrapper', + '--command', Template(command).safe_substitute(template_dict) + ] + # Add in our inputs + for df in DF: + container_command_list += [ + '-DF', + df + ] + + # And our outputs + for uf in UF: + container_command_list += [ + '-UF', + uf + ] + + # Submit the job + boto_tries = 0 + while boto_tries < self.containerinfo.aws_boto_max_tries: + boto_tries += 1 + try: + job_submission = batch_client.submit_job( + jobName=run_uuid, + jobQueue=self.containerinfo.aws_batch_job_queue, + jobDefinition=job_def_name, + containerOverrides={ + 'vcpus': self.containerinfo.vcpu, + 'memory': self.containerinfo.mem, + 'command': container_command_list, + }, + ) + break + + except ClientError as e: + log.info("Caught boto3 client error, sleeping for 10 seconds ({})".format( + e.response['Error']['Message'] + )) + time.sleep(self.containerinfo.aws_batch_job_poll_sec) + + job_submission_id = job_submission.get('jobId') + log.info("Running {} under jobId {}".format( + container_command_list, + job_submission_id + )) + # Wait for the job here + job_final_status = batch_task_watcher.waitOnJob( + job_submission_id + ) + if job_final_status != 'SUCCEEDED': + log.error("Job {} failed with status".format( + job_submission_id, + job_final_status + )) + return + # Implicit else we succeeded + # Now we need to copy back from S3 to our local filesystem + for (s3_loc, target) in needs_s3_download: + if target.scheme == 'file': + try: + os.makedirs( + os.path.dirname( + target.path + ) + ) + except FileExistsError: + pass + s3_client.download_file( + Bucket=urlsplit(s3_loc).netloc, + Key=urlsplit(s3_loc).path.strip('/'), + Filename=os.path.abspath(target.path), + ) + else: + with target.open('w') as target_h: + s3_client.download_file( + Bucket=urlsplit(s3_loc).netloc, + Key=urlsplit(s3_loc).path.strip('/'), + Fileobj=target_h, + ) + # Cleanup the temp S3 + for s3_path in s3_temp_to_be_deleted: + s3_client.delete_object( + Bucket=urlsplit(s3_path).netloc, + Key=urlsplit(s3_path).path.strip('/'), + ) + + # And done + + def ex_docker( + self, + command, + input_targets={}, + output_targets={}, + extra_params={}, + inputs_mode='ro', + outputs_mode='rw', + input_mount_point='/mnt/inputs', + output_mount_point='/mnt/outputs'): + """ + Run command in the container using docker, with mountpoints + command is assumed to be in python template substitution format + """ + client = docker.from_env() + + mounts, container_paths, DF, UF = self.mounts_CP_DF_UF( + input_targets, + output_targets, + inputs_mode, + outputs_mode, + input_mount_point, + output_mount_point) + + template_dict = container_paths.copy() + template_dict.update(extra_params) + command = Template(command).substitute(template_dict) + + command_list = [ + 'bucket_command_wrapper', + '--command', command, + ] + for df in DF: + command_list.append('-DF') + command_list.append(df) + for uf in UF: + command_list.append('-UF') + command_list.append(uf) + + try: + log.info("Attempting to run {} in {} with mounts {}".format( + command_list, + self.container, + mounts, + )) + stdout = client.containers.run( + image=self.container, + command=command_list, + volumes=mounts, + mem_limit="{}m".format(self.containerinfo.mem), + ) + log.info(stdout) + return (0, stdout, "") + except docker.errors.ContainerError as e: + log.error("Non-zero return code from the container: {}".format(e)) + return (-1, "", "") + except docker.errors.ImageNotFound: + log.error("Could not find container {}".format( + self.container) + ) + return (-1, "", "") + except docker.errors.APIError as e: + log.error("Docker Server failed {}".format(e)) + return (-1, "", "") + except Exception as e: + log.error("Unknown error occurred: {}".format(e)) + return (-1, "", "") + + +# ================================================================================ + +class ContainerTask(ContainerHelpers, sciluigi.task.Task): + ''' + luigi task that includes the ContainerHelpers mixin. + ''' + pass diff --git a/sciluigi/dependencies.py b/sciluigi/dependencies.py old mode 100644 new mode 100755 index 2a83454..a41d5bd --- a/sciluigi/dependencies.py +++ b/sciluigi/dependencies.py @@ -8,8 +8,14 @@ from luigi.contrib.s3 import S3Target from luigi.six import iteritems +try: + from urlparse import urlsplit +except ImportError: + from urllib.parse import urlsplit + # ============================================================================== + class TargetInfo(object): ''' Class to be used for sending specification of which target, from which @@ -32,6 +38,35 @@ def open(self, *args, **kwargs): # ============================================================================== + +class ContainerTargetInfo(TargetInfo): + ''' + Class to be used for sending specification of which target, from which + task, to use, when stitching workflow tasks' outputs and inputs together. + Accepts a url as a path, and then can properly create the proper target type + for a given scheme (e.g. s3, file, etc) + ''' + scheme = None + + def __init__(self, task, path, format=None, is_tmp=False, client=None): + self.task = task + self.path = path + sr = urlsplit(path) + self.scheme = sr.scheme + + if sr.scheme == 's3': + self.target = S3Target(path, format=format, client=client) + elif sr.scheme == 'file' or sr.scheme == '': + self.target = luigi.LocalTarget(path, format, is_tmp) + self.scheme = 'file' + self.path = sr.path + else: + raise ValueError("URL scheme {} is not supported".format(sr.scheme)) + + +# ============================================================================== + + class S3TargetInfo(TargetInfo): def __init__(self, task, path, format=None, client=None): self.task = task @@ -40,6 +75,7 @@ def __init__(self, task, path, format=None, client=None): # ============================================================================== + class PostgresTargetInfo(TargetInfo): def __init__(self, task, host, database, user, password, update_id, table=None, port=None): self.task = task @@ -50,10 +86,19 @@ def __init__(self, task, host, database, user, password, update_id, table=None, self.update_id = update_id self.table = table self.port = port - self.target = PostgresTarget(host=host, database=database, user=user, password=password, table=table, update_id=update_id, port=port) + self.target = PostgresTarget( + host=host, + database=database, + user=user, + password=password, + table=table, + update_id=update_id, + port=port + ) # ============================================================================== + class DependencyHelpers(object): ''' Mixin implementing methods for supporting dynamic, and target-based diff --git a/sciluigi/interface.py b/sciluigi/interface.py old mode 100644 new mode 100755 index cca0532..3d1db01 --- a/sciluigi/interface.py +++ b/sciluigi/interface.py @@ -11,6 +11,7 @@ LOGFMT_SCILUIGI = '%(asctime)s %(levelname)8s SCILUIGI %(message)s' DATEFMT = '%Y-%m-%d %H:%M:%S' + def setup_logging(): ''' Set up SciLuigi specific logging @@ -42,21 +43,26 @@ def setup_logging(): luigi_logger.addHandler(luigi_file_handler) luigi_logger.addHandler(stream_handler) luigi_logger.setLevel(logging.WARN) - luigi.interface.setup_interface_logging.has_run = True + try: + luigi.interface.setup_interface_logging.has_run = True + except: + pass sciluigi_logger = logging.getLogger('sciluigi-interface') sciluigi_logger.addHandler(stream_handler) sciluigi_logger.addHandler(sciluigi_file_handler) - sciluigi_logger.setLevel(logging.DEBUG) + sciluigi_logger.setLevel(logging.INFO) setup_logging() + def run(*args, **kwargs): ''' Forwarding luigi's run method ''' luigi.run(*args, **kwargs) + def run_local(*args, **kwargs): ''' Forwarding luigi's run method, with local scheduler diff --git a/sciluigi/parameter.py b/sciluigi/parameter.py old mode 100644 new mode 100755 diff --git a/sciluigi/slurm.py b/sciluigi/slurm.py old mode 100644 new mode 100755 diff --git a/sciluigi/task.py b/sciluigi/task.py old mode 100644 new mode 100755 index 6a9e4f6..42609e4 --- a/sciluigi/task.py +++ b/sciluigi/task.py @@ -15,12 +15,14 @@ # ============================================================================== + def new_task(name, cls, workflow_task, **kwargs): ''' Instantiate a new task. Not supposed to be used by the end-user (use WorkflowTask.new_task() instead). ''' slurminfo = None + containerinfo = None for key, val in [(key, val) for key, val in iteritems(kwargs)]: # Handle non-string keys if not isinstance(key, string_types): @@ -29,19 +31,26 @@ def new_task(name, cls, workflow_task, **kwargs): if isinstance(val, sciluigi.slurm.SlurmInfo): slurminfo = val kwargs[key] = val + if isinstance(val, sciluigi.containertask.ContainerInfo): + containerinfo = val + kwargs[key] = val elif not isinstance(val, string_types): try: - kwargs[key] = json.dumps(val) # Force conversion into string + kwargs[key] = json.dumps(val) # Force conversion into string except TypeError: kwargs[key] = str(val) kwargs['instance_name'] = name kwargs['workflow_task'] = workflow_task kwargs['slurminfo'] = slurminfo + kwargs['containerinfo'] = containerinfo newtask = cls.from_str_params(kwargs) if slurminfo is not None: newtask.slurminfo = slurminfo + if containerinfo is not None: + newtask.containerinfo = containerinfo return newtask + class Task(sciluigi.audit.AuditTrailHelpers, sciluigi.dependencies.DependencyHelpers, luigi.Task): ''' SciLuigi Task, implementing SciLuigi specific functionality for dependency resolution @@ -88,6 +97,7 @@ def ex(self, command): # ============================================================================== + class ExternalTask( sciluigi.audit.AuditTrailHelpers, sciluigi.dependencies.DependencyHelpers, diff --git a/sciluigi/util.py b/sciluigi/util.py old mode 100644 new mode 100755 diff --git a/sciluigi/workflow.py b/sciluigi/workflow.py old mode 100644 new mode 100755 diff --git a/setup.py b/setup.py old mode 100644 new mode 100755 index 02e3f24..9e54fbc --- a/setup.py +++ b/setup.py @@ -18,7 +18,7 @@ setup( name='sciluigi', - version='0.9.6b7', + version='2.0.1', description='Helper library for writing dynamic, flexible workflows in luigi', long_description=long_description, author='Samuel Lampa', @@ -30,7 +30,10 @@ 'sciluigi', ], install_requires=[ - 'luigi' + 'luigi', + 'boto3', + 'mongo', + 'docker', ], classifiers=[ 'Development Status :: 4 - Beta', diff --git a/test/test_dependencies.py b/test/test_dependencies.py old mode 100644 new mode 100755 diff --git a/test/test_paramval.py b/test/test_paramval.py old mode 100644 new mode 100755 diff --git a/tools/.logging.conf.template b/tools/.logging.conf.template old mode 100644 new mode 100755 diff --git a/tools/bucket_command_wrapper.py b/tools/bucket_command_wrapper.py new file mode 100755 index 0000000..d6adbc6 --- /dev/null +++ b/tools/bucket_command_wrapper.py @@ -0,0 +1,245 @@ +#!/usr/bin/env python +import argparse +import os +import sys +import re +import subprocess + +# +# Script to be placed in containers to help with making mount points +# and pulling / pushing to buckets (eg S3) to accomodate utilities +# that are unaware of S3 / buckets in containers +# (to run on things like AWS Batch) +# + + +class BCW(): + INPUT_RE = re.compile( + r'^(?P\w+)://(?P[^/]+)/(?P.+?(?P[^/]+))::(?P[^\0]+)::(?Prw|ro)$' + ) + OUTPUT_RE = re.compile( + r'^(?P[^\0]+)::(?P\w+)://(?P[^/]+)/(?P.+?(?P[^/]+))$' + ) + VALID_BUCKET_PROVIDERS = ( + 's3', + ) + + def __init__(self): + parser = self.build_parser() + args = parser.parse_args() + + if args.command: + self.command = args.command + elif os.environ.get('bcw_command'): + self.command = os.environ.get('bcw_command').strip() + else: + raise Exception(""" + No command provided on command line or as an environmental variable (bcw_command) + """) + + print(type(args.download_files)) + print(args.download_files) + + if not args.download_files: + self.download_files = [] + else: + raw_download_files = [f.strip() for f in args.download_files if f.strip() != ""] + if len(raw_download_files) > 0: + self.download_files = self.parse_download_files(raw_download_files) + else: + self.download_files = [] + + if not args.upload_files: + self.upload_files = [] + else: + raw_upload_files = [f.strip() for f in args.upload_files if f.strip() != ""] + if len(raw_upload_files) > 0: + self.upload_files = self.parse_upload_files(raw_upload_files) + else: + self.upload_files = [] + + # Download from the bucket + self.download_files_from_bucket() + + # Run the command + subprocess.run( + args.command, + shell=True + ) + + # Upload files + self.upload_files_to_bucket() + + def build_parser(self): + parser = argparse.ArgumentParser(description=""" + Wrapper to pull from buckets, run a command, and push back to buckets. + example: + bucket_command_wrapper.py -c 'echo hello' \ + -DF s3://bucket/key/path.txt::/mnt/inputs/path.txt::rw \ + s3://bucket/key/path2.txt::/mnt/inputs/path2.txt::ro \ + -UF /mnt/outputs/path.txt::s3://bucket/key/path.txt + """) + + if len(sys.argv) < 2: + parser.print_help() + + # Implicit else + parser.add_argument( + '--command', + '-c', + type=str, + help=""" + Command to be run AFTER downloads BEFORE uploads. + Please enclose in quotes. + Will be passed unaltered as a shell command. + Can also be provided as an environmental variable bcw_command""" + ) + parser.add_argument( + '--download-files', + '-DF', + action='append', + help="""Format is + bucket_file_uri::container_path::mode + Where mode can be 'ro' or 'rw'. + If 'rw' the file will be pushed back to the bucket after the command + IF 'ro, the file will only be pulled from the bucket + e.g: s3://bucket/key/path.txt::/mnt/inputs/path.txt::ro""", + ) + parser.add_argument( + '--upload-files', + '-UF', + action='append', + help="""Format is + container_path::bucket_file_uri + Mode is presumed to be w. (If you want rw / a / use input in mode 'rw') + e.g: /mnt/outputs/path.txt::s3://bucket/key/path.txt""", + ) + + return parser + + def parse_upload_files(self, raw_upload_files): + upload_files = [] + for d in raw_upload_files: + m = self.OUTPUT_RE.search(d.strip()) + if not m: + raise Exception("Invalid upload file {}".format(d)) + # Implicit else + bucket_provider = m.group('bucket_provider').lower() + bucket = m.group('bucket') + key = m.group('key') + bucket_fn = m.group('bucket_fn') + container_path = os.path.abspath(m.group('container_path')) + + # Be sure this is one of the providers we know how to handle + if bucket_provider not in self.VALID_BUCKET_PROVIDERS: + raise Exception("Invalid bucket provider {}. Valid choices are {}".format( + bucket_provider, + ", ".join(self.VALID_BUCKET_PROVIDERS) + )) + + # Be sure we can create the path to the proposed container mount point + try: + os.makedirs( + os.path.dirname( + container_path + ) + ) + except FileExistsError: + # Fine if this path already exists + pass + + upload_files.append({ + 'bucket_provider': bucket_provider, + 'bucket': bucket, + 'key': key, + 'bucket_fn': bucket_fn, + 'container_path': container_path, + }) + return(upload_files) + + def parse_download_files(self, raw_download_files): + download_files = [] + for d in raw_download_files: + m = self.INPUT_RE.search(d.strip()) + if not m: + raise Exception("Invalid download file {}".format(d)) + # Implicit else + bucket_provider = m.group('bucket_provider') + bucket = m.group('bucket') + key = m.group('key') + bucket_fn = m.group('bucket_fn') + container_path = os.path.abspath(m.group('container_path')) + mode = m.group('mode').lower() + + # Be sure this is one of the providers we know how to handle + if bucket_provider not in self.VALID_BUCKET_PROVIDERS: + raise Exception("Invalid bucket provider {}. Valid choices are {}".format( + bucket_provider, + ", ".join(self.VALID_BUCKET_PROVIDERS) + )) + + # Be sure we can create the path to the proposed container mount point + try: + os.makedirs( + os.path.dirname( + container_path + ) + ) + except FileExistsError: + # Fine if this path already exists + pass + + download_files.append({ + 'bucket_provider': bucket_provider, + 'bucket': bucket, + 'key': key, + 'bucket_fn': bucket_fn, + 'container_path': container_path, + 'mode': mode, + }) + return(download_files) + + def download_file_s3(self, df): + import boto3 + s3_client = boto3.client('s3') + s3_client.download_file( + Bucket=df['bucket'], + Key=df['key'], + Filename=df['container_path'], + ) + + def download_files_from_bucket(self): + for df in self.download_files: + if df['bucket_provider'] == 's3': + self.download_file_s3(df) + else: + raise Exception("Invalid bucket provider {}".format( + df['bucket_provider']) + ) + + def upload_file_s3(self, df): + import boto3 + s3_client = boto3.client('s3') + s3_client.upload_file( + Bucket=df['bucket'], + Key=df['key'], + Filename=df['container_path'], + ) + + def upload_files_to_bucket(self): + upload_files = self.upload_files+[f for f in self.download_files if f['mode']=='rw'] + for df in upload_files: + if df['bucket_provider'] == 's3': + self.upload_file_s3(df) + else: + raise Exception("Invalid bucket provider {}".format( + df['bucket_provider']) + ) + + +def main(): + """Entrypoint for main script.""" + BCW() + +if __name__ == "__main__": + main() diff --git a/tools/init_projdir.py b/tools/init_projdir.py old mode 100644 new mode 100755