[med-svn] [snakemake] 01/05: Imported Upstream version 3.7.1+dfsg

Kevin Murray daube-guest at moszumanska.debian.org
Tue Jun 28 05:07:39 UTC 2016


This is an automated email from the git hooks/post-receive script.

daube-guest pushed a commit to branch master
in repository snakemake.

commit 944514417a5c6f0c73994869e17e9f798e31c277
Author: Kevin Murray <spam at kdmurray.id.au>
Date:   Tue Jun 28 14:53:12 2016 +1000

    Imported Upstream version 3.7.1+dfsg
---
 CHANGELOG.md             | 14 ++++++++++++++
 environment.yml          | 15 +++++++++++++++
 setup.py                 |  2 +-
 snakemake/__init__.py    | 39 ++++++++++++++++++++++++++++++---------
 snakemake/common.py      |  1 +
 snakemake/dag.py         |  3 ++-
 snakemake/executors.py   | 40 +++++++++++++++++++++++++++++++++-------
 snakemake/io.py          | 11 +++++------
 snakemake/jobs.py        | 15 ++++++++++-----
 snakemake/logging.py     | 27 +++++++++++++++++++++------
 snakemake/remote/HTTP.py | 14 ++++++++++----
 snakemake/rules.py       | 16 ++++++----------
 snakemake/scheduler.py   |  7 +++++--
 snakemake/utils.py       | 24 ++++++++++++++++++++++++
 snakemake/version.py     |  2 +-
 snakemake/workflow.py    | 12 +++++++-----
 16 files changed, 185 insertions(+), 57 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 581916d..43dddda 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,19 @@
 # Change Log
 
+## [3.7.1] - 2016-05-16
+### Changed
+- Fixed a missing import of the multiprocessing module.
+
+## [3.7.0] - 2016-05-05
+### Added
+- The entries in `resources` and the `threads` job attribute can now be callables that must return `int` values.
+- Multiple `--cluster-config` arguments can be given to the Snakemake command line. Later ones override earlier ones.
+- In the API, multiple `cluster_config` paths can be given as a list, as an alternative to the previous behaviour of expecting a single string for this parameter.
+- When submitting cluster jobs (either through `--cluster` or `--drmaa`), you can now use `--max-jobs-per-second` to limit the number of jobs being submitted (also available through Snakemake API). Some cluster installations have problems with too many jobs per second.
+- Wildcard values are now printed upon job execution in addition to input and output files.
+### Changed
+- Fixed a bug with HTTP remote providers.
+
 ## [3.6.1] - 2016-04-08
 ### Changed
 - Work around missing RecursionError in Python < 3.5
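
The callable `resources`/`threads` feature described above can be used directly in a Snakefile. A minimal sketch (rule, file, and wildcard names are hypothetical):

    rule align:
        input: "reads/{sample}.fastq"
        output: "mapped/{sample}.bam"
        # a callable receives the job's wildcards and must return an int
        threads: lambda wildcards: 8 if wildcards.sample.startswith("large") else 2
        resources:
            mem_mb=lambda wildcards: 16000 if wildcards.sample.startswith("large") else 4000
        shell: "do_align --threads {threads} {input} > {output}"

Multiple cluster configurations and the new submission rate limit combine on the command line along these lines (file names illustrative):

    snakemake --cluster qsub --cluster-config defaults.yaml --cluster-config project.yaml --max-jobs-per-second 5
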
diff --git a/environment.yml b/environment.yml
new file mode 100644
index 0000000..828922d
--- /dev/null
+++ b/environment.yml
@@ -0,0 +1,15 @@
+channels:
+  - bioconda
+  - r
+dependencies:
+  - rpy2 >=0.7.6
+  - boto
+  - moto
+  - httpretty ==0.8.10
+  - filechunkio
+  - pyyaml
+  - nose
+  - ftputil
+  - pysftp
+  - requests
+  - dropbox
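
The new environment.yml captures the test dependencies; assuming a conda installation with the listed channels available, it can be instantiated with, e.g.:

    conda env create --name snakemake-test --file environment.yml
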
diff --git a/setup.py b/setup.py
index 6e8abeb..3536c17 100644
--- a/setup.py
+++ b/setup.py
@@ -53,7 +53,7 @@ setup(
          "snakemake-bash-completion = snakemake:bash_completion"]
     },
     package_data={'': ['*.css', '*.sh', '*.html']},
-    tests_require=['rpy2', 'docutils', 'nose>=1.3', 'boto>=2.38.0', 'filechunkio>=1.6', 
+    tests_require=['rpy2', 'httpretty==0.8.10', 'docutils', 'nose>=1.3', 'boto>=2.38.0', 'filechunkio>=1.6', 
                      'moto>=0.4.14', 'ftputil>=3.2', 'pysftp>=0.2.8', 'requests>=2.8.1', 'dropbox>=5.2'],
     cmdclass={'test': NoseTestCommand},
     classifiers=
diff --git a/snakemake/__init__.py b/snakemake/__init__.py
index 2e8a2ad..a5f040a 100644
--- a/snakemake/__init__.py
+++ b/snakemake/__init__.py
@@ -9,7 +9,6 @@ import glob
 import argparse
 from argparse import ArgumentError
 import logging as _logging
-import multiprocessing
 import re
 import sys
 import inspect
@@ -23,6 +22,7 @@ from snakemake.logging import setup_logger, logger
 from snakemake.version import __version__
 from snakemake.io import load_configfile
 from snakemake.shell import shell
+from snakemake.utils import update_config, available_cpu_count
 
 
 def snakemake(snakefile,
@@ -93,6 +93,7 @@ def snakemake(snakefile,
               updated_files=None,
               log_handler=None,
               keep_logger=False,
+              max_jobs_per_second=None,
               verbose=False):
     """Run snakemake on a given snakefile.
 
@@ -125,7 +126,7 @@ def snakemake(snakefile,
         quiet (bool):               do not print any default job information (default False)
         keepgoing (bool):           keep going upon errors (default False)
         cluster (str):              submission command of a cluster or batch system to use, e.g. qsub (default None)
-        cluster_config (str):       configuration file for cluster options (default None)
+        cluster_config (str,list):  configuration file for cluster options, or list thereof (default None)
         cluster_sync (str):         blocking cluster submission command (like SGE 'qsub -sync y')  (default None)
         drmaa (str):                if not None use DRMAA for cluster support, str specifies native args passed to the cluster when submitting a job
         jobname (str):              naming scheme for cluster job scripts (default "snakejob.{rulename}.{jobid}.sh")
@@ -163,6 +164,7 @@ def snakemake(snakefile,
         updated_files(list):        a list that will be filled with the files that are updated or created during the workflow execution
         verbose(bool):              show additional debug output (default False)
+        max_jobs_per_second:        maximum number of cluster/drmaa jobs per second; None imposes no limit (default None)
         log_handler (function):     redirect snakemake output to this custom log handler, a function that takes a log message dictionary (see below) as its only argument (default None). The log message dictionary for the log handler has the following entries:
 
             :level:
                 the log level ("info", "error", "debug", "progress", "job_info")
@@ -216,8 +218,18 @@ def snakemake(snakefile,
     else:
         nodes = sys.maxsize
 
+    if isinstance(cluster_config, str):
+        # Loading configuration from one file is still supported for
+        # backward compatibility
+        cluster_config = [cluster_config]
     if cluster_config:
-        cluster_config = load_configfile(cluster_config)
+        # Load all configuration files
+        configs = [load_configfile(f) for f in cluster_config]
+        # Merge in the specified order, overriding earlier values with
+        # later ones
+        cluster_config = configs[0]
+        for other in configs[1:]:
+            update_config(cluster_config, other)
     else:
         cluster_config = dict()
 
@@ -356,6 +368,7 @@ def snakemake(snakefile,
                     cluster_sync=cluster_sync,
                     jobname=jobname,
                     drmaa=drmaa,
+                    max_jobs_per_second=max_jobs_per_second,
                     printd3dag=printd3dag,
                     immediate_submit=immediate_submit,
                     ignore_ambiguity=ignore_ambiguity,
@@ -478,7 +491,7 @@ def get_argument_parser():
     parser.add_argument(
         "--cores", "--jobs", "-j",
         action="store",
-        const=multiprocessing.cpu_count(),
+        const=available_cpu_count(),
         nargs="?",
         metavar="N",
         type=int,
@@ -488,7 +501,7 @@ def get_argument_parser():
     parser.add_argument(
         "--local-cores",
         action="store",
-        default=multiprocessing.cpu_count(),
+        default=available_cpu_count(),
         metavar="N",
         type=int,
         help=
@@ -679,12 +692,15 @@ def get_argument_parser():
     parser.add_argument(
         "--cluster-config", "-u",
         metavar="FILE",
+        default=[],
+        action="append",
         help=
         ("A JSON or YAML file that defines the wildcards used in 'cluster' "
-         "for specific rules, instead of having them specified in the Snakefile."
+         "for specific rules, instead of having them specified in the Snakefile. "
          "For example, for rule 'job' you may define: "
-         "{ 'job' : { 'time' : '24:00:00' } } "
-         "to specify the time for rule 'job'.\n")),
+         "{ 'job' : { 'time' : '24:00:00' } } to specify the time for rule 'job'. "
+         "You can specify more than one file.  The configuration files are merged "
+         "with later values overriding earlier ones.")),
     parser.add_argument(
         "--immediate-submit", "--is",
         action="store_true",
@@ -811,6 +827,10 @@ def get_argument_parser():
         nargs="+",
         help=
         "Only use given rules. If omitted, all rules in Snakefile are used.")
+    parser.add_argument(
+        "--max-jobs-per-second", default=None, type=float,
+        help=
+        "Maximum number of cluster/drmaa jobs per second; default is no limit.")
     parser.add_argument('--timestamp', '-T',
                         action='store_true',
                         help='Add a timestamp to all logging output')
@@ -993,7 +1013,8 @@ def main():
                             wait_for_files=args.wait_for_files,
                             keep_target_files=args.keep_target_files,
                             keep_shadow=args.keep_shadow,
-                            allowed_rules=args.allowed_rules)
+                            allowed_rules=args.allowed_rules,
+                            max_jobs_per_second=args.max_jobs_per_second)
 
     if args.profile:
         with open(args.profile, "w") as out:
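
The cluster-config merging above relies on snakemake.utils.update_config, which updates a dictionary recursively, so later files win key by key instead of replacing whole sections. A standalone sketch of the resulting behaviour (file contents inlined for illustration):

    from snakemake.utils import update_config

    configs = [
        {"__default__": {"time": "01:00:00", "mem": "4G"}},  # first --cluster-config
        {"__default__": {"time": "24:00:00"}},               # second --cluster-config
    ]
    cluster_config = configs[0]
    for other in configs[1:]:
        update_config(cluster_config, other)
    # cluster_config == {'__default__': {'time': '24:00:00', 'mem': '4G'}}
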
diff --git a/snakemake/common.py b/snakemake/common.py
new file mode 100644
index 0000000..916866c
--- /dev/null
+++ b/snakemake/common.py
@@ -0,0 +1 @@
+DYNAMIC_FILL = "__snakemake_dynamic__"
diff --git a/snakemake/dag.py b/snakemake/dag.py
index 1b33df1..cdc57a0 100644
--- a/snakemake/dag.py
+++ b/snakemake/dag.py
@@ -24,6 +24,7 @@ from snakemake.exceptions import RemoteFileException, WorkflowError
 from snakemake.exceptions import UnexpectedOutputException, InputFunctionException
 from snakemake.logging import logger
 from snakemake.output_index import OutputIndex
+from snakemake.common import DYNAMIC_FILL
 
 # Workaround for Py <3.5 prior to existence of RecursionError
 try:
@@ -936,7 +937,7 @@ class DAG:
 
         def format_wildcard(wildcard):
             name, value = wildcard
-            if _IOFile.dynamic_fill in value:
+            if DYNAMIC_FILL in value:
                 value = "..."
             return "{}: {}".format(name, value)
 
diff --git a/snakemake/executors.py b/snakemake/executors.py
index 27f1061..310cf50 100644
--- a/snakemake/executors.py
+++ b/snakemake/executors.py
@@ -93,6 +93,7 @@ class AbstractExecutor:
                                                  job.dynamic_output)),
                         log=list(job.log),
                         benchmark=job.benchmark,
+                        wildcards=job.wildcards_dict,
                         reason=str(self.dag.reason(job)),
                         resources=job.resources_dict,
                         priority="highest"
@@ -269,7 +270,9 @@ class ClusterExecutor(RealExecutor):
                  printshellcmds=False,
                  latency_wait=3,
                  benchmark_repeats=1,
-                 cluster_config=None, local_input=None):
+                 cluster_config=None,
+                 local_input=None,
+                 max_jobs_per_second=None):
         local_input = local_input or []
         super().__init__(workflow, dag,
                          printreason=printreason,
@@ -315,6 +318,12 @@ class ClusterExecutor(RealExecutor):
         self.cores = cores if cores else ""
         self.cluster_config = cluster_config if cluster_config else dict()
 
+        self.max_jobs_per_second = max_jobs_per_second
+        if self.max_jobs_per_second:
+            self.rate_lock = threading.RLock()
+            self.rate_interval = 1 / self.max_jobs_per_second
+            self.rate_last_called = 0
+
         self.active_jobs = list()
         self.lock = threading.Lock()
         self.wait = True
@@ -331,7 +340,18 @@ class ClusterExecutor(RealExecutor):
     def cancel(self):
         self.shutdown()
 
+    def _limit_rate(self):
+        """Called in ``_run()`` to keep submissions below max_jobs_per_second."""
+        with self.rate_lock:
+            elapsed = time.monotonic() - self.rate_last_called
+            wait = self.rate_interval - elapsed
+            if wait > 0:
+                time.sleep(wait)
+            self.rate_last_called = time.monotonic()
+
     def _run(self, job, callback=None, error_callback=None):
+        if self.max_jobs_per_second:
+            self._limit_rate()
         super()._run(job, callback=callback, error_callback=error_callback)
         logger.shellcmd(job.shellcmd)
 
@@ -420,7 +440,8 @@ class GenericClusterExecutor(ClusterExecutor):
                  quiet=False,
                  printshellcmds=False,
                  latency_wait=3,
-                 benchmark_repeats=1):
+                 benchmark_repeats=1,
+                 max_jobs_per_second=None):
         super().__init__(workflow, dag, cores,
                          jobname=jobname,
                          printreason=printreason,
@@ -428,7 +449,8 @@ class GenericClusterExecutor(ClusterExecutor):
                          printshellcmds=printshellcmds,
                          latency_wait=latency_wait,
                          benchmark_repeats=benchmark_repeats,
-                         cluster_config=cluster_config)
+                         cluster_config=cluster_config,
+                         max_jobs_per_second=max_jobs_per_second)
         self.submitcmd = submitcmd
         self.external_jobid = dict()
         self.exec_job += ' && touch "{jobfinished}" || touch "{jobfailed}"'
@@ -525,7 +547,8 @@ class SynchronousClusterExecutor(ClusterExecutor):
                  quiet=False,
                  printshellcmds=False,
                  latency_wait=3,
-                 benchmark_repeats=1):
+                 benchmark_repeats=1,
+                 max_jobs_per_second=None):
         super().__init__(workflow, dag, cores,
                          jobname=jobname,
                          printreason=printreason,
@@ -533,7 +556,8 @@ class SynchronousClusterExecutor(ClusterExecutor):
                          printshellcmds=printshellcmds,
                          latency_wait=latency_wait,
                          benchmark_repeats=benchmark_repeats,
-                         cluster_config=cluster_config, )
+                         cluster_config=cluster_config,
+                         max_jobs_per_second=max_jobs_per_second)
         self.submitcmd = submitcmd
         self.external_jobid = dict()
 
@@ -609,7 +633,8 @@ class DRMAAExecutor(ClusterExecutor):
                  drmaa_args="",
                  latency_wait=3,
                  benchmark_repeats=1,
-                 cluster_config=None, ):
+                 cluster_config=None,
+                 max_jobs_per_second=None):
         super().__init__(workflow, dag, cores,
                          jobname=jobname,
                          printreason=printreason,
@@ -617,7 +642,8 @@ class DRMAAExecutor(ClusterExecutor):
                          printshellcmds=printshellcmds,
                          latency_wait=latency_wait,
                          benchmark_repeats=benchmark_repeats,
-                         cluster_config=cluster_config, )
+                         cluster_config=cluster_config,
+                         max_jobs_per_second=max_jobs_per_second)
         try:
             import drmaa
         except ImportError:
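
The submission throttle added to ClusterExecutor above is a minimum-interval rate limiter guarded by a lock. The same pattern as a self-contained sketch (class and variable names are illustrative):

    import threading
    import time

    class RateLimiter:
        """Block callers so successive calls are at least 1/max_per_second apart."""

        def __init__(self, max_per_second):
            self.interval = 1 / max_per_second
            self.last = 0.0
            self.lock = threading.RLock()

        def wait(self):
            with self.lock:
                # a monotonic clock is unaffected by system clock adjustments
                pause = self.interval - (time.monotonic() - self.last)
                if pause > 0:
                    time.sleep(pause)
                self.last = time.monotonic()

    limiter = RateLimiter(max_per_second=5)
    for i in range(10):
        limiter.wait()  # at most ~5 iterations per second
        print("submit job", i)
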
diff --git a/snakemake/io.py b/snakemake/io.py
index 16ee45c..384e300 100644
--- a/snakemake/io.py
+++ b/snakemake/io.py
@@ -15,6 +15,7 @@ from collections import Iterable, namedtuple
 from snakemake.exceptions import MissingOutputException, WorkflowError, WildcardError, RemoteFileException
 from snakemake.logging import logger
 from inspect import isfunction, ismethod
+from snakemake.common import DYNAMIC_FILL
 
 
 def lstat(f):
@@ -47,8 +48,6 @@ class _IOFile(str):
     A file that is either input or output of a rule.
     """
 
-    dynamic_fill = "__snakemake_dynamic__"
-
     def __new__(cls, file):
         obj = str.__new__(cls, file)
         obj._is_function = isfunction(file) or ismethod(file)
@@ -168,7 +167,7 @@ class _IOFile(str):
             self.remote_object.upload()
 
     def prepare(self):
-        path_until_wildcard = re.split(self.dynamic_fill, self.file)[0]
+        path_until_wildcard = re.split(DYNAMIC_FILL, self.file)[0]
         dir = os.path.dirname(path_until_wildcard)
         if len(dir) > 0 and not os.path.exists(dir):
             try:
@@ -191,7 +190,7 @@ class _IOFile(str):
             lchmod(self.file, mode)
 
     def remove(self, remove_non_empty_dir=False):
-        remove(self.file, remove_non_empty_dir=False)
+        remove(self.file, remove_non_empty_dir)
 
     def touch(self, times=None):
         """ times must be 2-tuple: (atime, mtime) """
@@ -231,7 +230,7 @@ class _IOFile(str):
                             wildcards,
                             fill_missing=fill_missing,
                             fail_dynamic=fail_dynamic,
-                            dynamic_fill=self.dynamic_fill),
+                            dynamic_fill=DYNAMIC_FILL),
             rule=self.rule)
 
         file_with_wildcards_applied.clone_flags(self)
@@ -260,7 +259,7 @@ class _IOFile(str):
         return self.regex().match(target) or None
 
     def format_dynamic(self):
-        return self.replace(self.dynamic_fill, "{*}")
+        return self.replace(DYNAMIC_FILL, "{*}")
 
     def clone_flags(self, other):
         if isinstance(self._file, str):
diff --git a/snakemake/jobs.py b/snakemake/jobs.py
index 4e3a835..53bab86 100644
--- a/snakemake/jobs.py
+++ b/snakemake/jobs.py
@@ -18,6 +18,7 @@ from snakemake.utils import format, listfiles
 from snakemake.exceptions import RuleException, ProtectedOutputException
 from snakemake.exceptions import UnexpectedOutputException
 from snakemake.logging import logger
+from snakemake.common import DYNAMIC_FILL
 
 
 def jobfiles(jobs, type):
@@ -41,11 +42,15 @@ class Job:
          self.ruleio,
          self.dependencies) = rule.expand_wildcards(self.wildcards_dict)
 
-        self.resources_dict = {
-            name: min(
+        self.resources_dict = {}
+        for name, res in rule.resources.items():
+            if callable(res):
+                res = res(self.wildcards)
+                if not isinstance(res, int):
+                    raise ValueError("Callable for resources must return int")
+            self.resources_dict[name] = min(
                 self.rule.workflow.global_resources.get(name, res), res)
-            for name, res in rule.resources.items()
-        }
+
         self.threads = self.resources_dict["_cores"]
         self.resources = Resources(fromdict=self.resources_dict)
         self.shadow_dir = None
@@ -491,7 +496,7 @@ class Job:
         """ Expand dynamic files. """
         return list(listfiles(pattern,
                               restriction=self.wildcards,
-                              omit_value=_IOFile.dynamic_fill))
+                              omit_value=DYNAMIC_FILL))
 
 
 class Reason:
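
The resources handling above evaluates callables against the job's wildcards and then caps every value at the workflow-wide limit. A reduced sketch of that logic with stand-in values:

    global_resources = {"mem_mb": 8000}   # e.g. set via --resources mem_mb=8000
    rule_resources = {"mem_mb": lambda wildcards: 16000, "io": 1}
    wildcards = None                      # stand-in for the job's wildcards object

    resources_dict = {}
    for name, res in rule_resources.items():
        if callable(res):
            res = res(wildcards)
            if not isinstance(res, int):
                raise ValueError("Callable for resources must return int")
        # a job never claims more of a resource than is globally available
        resources_dict[name] = min(global_resources.get(name, res), res)
    # resources_dict == {'mem_mb': 8000, 'io': 1}
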
diff --git a/snakemake/logging.py b/snakemake/logging.py
index 9e52546..b604cca 100644
--- a/snakemake/logging.py
+++ b/snakemake/logging.py
@@ -11,6 +11,9 @@ import os
 import json
 from multiprocessing import Lock
 import tempfile
+from functools import partial
+
+from snakemake.common import DYNAMIC_FILL
 
 
 class ColorizingStreamHandler(_logging.StreamHandler):
@@ -171,25 +174,32 @@ class Logger:
 
             yield "{}rule {}:".format("local" if msg["local"] else "",
                                       msg["name"])
-            for item in "input output log".split():
+            for item in ["input", "output", "log"]:
                 fmt = format_item(item, omit=[], valueformat=", ".join)
                 if fmt != None:
                     yield fmt
+
             singleitems = ["benchmark"]
             if self.printreason:
                 singleitems.append("reason")
             for item in singleitems:
                 fmt = format_item(item, omit=None)
                 if fmt != None:
                     yield fmt
+
+            wildcards = format_wildcards(msg["wildcards"])
+            if wildcards:
+                yield "\twildcards: " + wildcards
+
             for item, omit in zip("priority threads".split(), [0, 1]):
                 fmt = format_item(item, omit=omit)
                 if fmt != None:
                     yield fmt
+
             resources = format_resources(msg["resources"])
             if resources:
                 yield "\tresources: " + resources
 
         level = msg["level"]
         if level == "info":
             self.logger.warning(msg["msg"])
@@ -228,10 +238,14 @@ class Logger:
             print(json.dumps({"nodes": msg["nodes"], "links": msg["edges"]}))
 
 
-def format_resources(resources, omit_resources="_cores _nodes".split()):
+def format_dict(dct, omit_keys=(), omit_values=()):
     return ", ".join("{}={}".format(name, value)
-                     for name, value in resources.items()
-                     if name not in omit_resources)
+                     for name, value in dct.items()
+                     if name not in omit_keys and value not in omit_values)
+
+
+format_resources = partial(format_dict, omit_keys={"_cores", "_nodes"})
+format_wildcards = partial(format_dict, omit_values={DYNAMIC_FILL})
 
 
 def format_resource_names(resources, omit_resources="_cores _nodes".split()):
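
With the refactoring above, format_resources and format_wildcards become two partial applications of one generic dictionary formatter, so both share the same output format. A usage sketch:

    from functools import partial

    def format_dict(dct, omit_keys=(), omit_values=()):
        return ", ".join("{}={}".format(name, value)
                         for name, value in dct.items()
                         if name not in omit_keys and value not in omit_values)

    format_resources = partial(format_dict, omit_keys={"_cores", "_nodes"})
    format_wildcards = partial(format_dict, omit_values={"__snakemake_dynamic__"})  # DYNAMIC_FILL

    print(format_resources({"_cores": 4, "mem_mb": 8000}))   # -> mem_mb=8000
    print(format_wildcards({"sample": "a", "i": "__snakemake_dynamic__"}))  # -> sample=a
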
diff --git a/snakemake/remote/HTTP.py b/snakemake/remote/HTTP.py
index 11959f6..af035ee 100644
--- a/snakemake/remote/HTTP.py
+++ b/snakemake/remote/HTTP.py
@@ -56,11 +56,17 @@ class RemoteObject(DomainObject):
         for k,v in self.kwargs.items():
             kwargs_to_use[k] = v
 
+        # If authentication kwargs are provided, they must be either a ("username", "password")
+        # combination or "auth", but not both.
+        if kwargs_to_use["username"] and kwargs_to_use["password"] and kwargs_to_use["auth"]:
+            raise TypeError("Authentication accepts either username and password or a requests.auth object, not both")
+        # If "username" and "password" kwargs are provided, use them to construct a tuple for "auth";
+        # neither requests.head() nor requests.get() accepts them as-is.
         if kwargs_to_use["username"] and kwargs_to_use["password"]:
-            kwargs_to_use["auth"] = ('user', 'pass')
-        else:
-            del kwargs_to_use["username"]
-            del kwargs_to_use["password"]
+            kwargs_to_use["auth"] = (kwargs_to_use["username"], kwargs_to_use["password"])
+        # Delete "username" and "password" from kwargs
+        del kwargs_to_use["username"]
+        del kwargs_to_use["password"]
 
         url = self._iofile._file + self.additional_request_string
         # default to HTTPS
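
The fix above passes the caller's actual credentials instead of the literal ('user', 'pass') placeholder. In the requests library, a (username, password) tuple supplied as auth= enables HTTP basic authentication; the URL and credentials below are purely illustrative:

    import requests
    from requests.auth import HTTPBasicAuth

    # a plain tuple is shorthand for HTTPBasicAuth
    r = requests.get("https://example.com/data.txt", auth=("alice", "s3cret"))
    r = requests.get("https://example.com/data.txt", auth=HTTPBasicAuth("alice", "s3cret"))
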
diff --git a/snakemake/rules.py b/snakemake/rules.py
index 1f40a4e..f4ad9b1 100644
--- a/snakemake/rules.py
+++ b/snakemake/rules.py
@@ -238,18 +238,14 @@ class Rule:
                 self.dependencies[item] = item.rule
             _item = IOFile(item, rule=self)
             if is_flagged(item, "temp"):
-                if not output:
-                    raise SyntaxError("Only output files may be temporary")
-                self.temp_output.add(_item)
+                if output:
+                    self.temp_output.add(_item)
             if is_flagged(item, "protected"):
-                if not output:
-                    raise SyntaxError("Only output files may be protected")
-                self.protected_output.add(_item)
+                if output:
+                    self.protected_output.add(_item)
             if is_flagged(item, "touch"):
-                if not output:
-                    raise SyntaxError(
-                        "Only output files may be marked for touching.")
-                self.touch_output.add(_item)
+                if output:
+                    self.touch_output.add(_item)
             if is_flagged(item, "dynamic"):
                 if output:
                     self.dynamic_output.add(_item)
diff --git a/snakemake/scheduler.py b/snakemake/scheduler.py
index ca7a1ce..fa7f58f 100644
--- a/snakemake/scheduler.py
+++ b/snakemake/scheduler.py
@@ -40,6 +40,7 @@ class JobScheduler:
                  printreason=False,
                  printshellcmds=False,
                  keepgoing=False,
+                 max_jobs_per_second=None,
                  latency_wait=3,
                  benchmark_repeats=1,
                  greediness=1.0):
@@ -111,7 +112,8 @@ class JobScheduler:
                     quiet=quiet,
                     printshellcmds=printshellcmds,
                     latency_wait=latency_wait,
-                    benchmark_repeats=benchmark_repeats, )
+                    benchmark_repeats=benchmark_repeats,
+                    max_jobs_per_second=max_jobs_per_second)
                 if immediate_submit:
                     self.job_reward = self.dryrun_job_reward
                     self._submit_callback = partial(self._proceed,
@@ -128,7 +130,8 @@ class JobScheduler:
                     printshellcmds=printshellcmds,
                     latency_wait=latency_wait,
                     benchmark_repeats=benchmark_repeats,
-                    cluster_config=cluster_config, )
+                    cluster_config=cluster_config,
+                    max_jobs_per_second=max_jobs_per_second)
         else:
             # local execution or execution of cluster job
             # calculate how many parallel workers the executor shall spawn
diff --git a/snakemake/utils.py b/snakemake/utils.py
index fbfa9ed..daf742e 100644
--- a/snakemake/utils.py
+++ b/snakemake/utils.py
@@ -11,6 +11,7 @@ import inspect
 import textwrap
 from itertools import chain
 from collections import Mapping
+import multiprocessing
 
 from snakemake.io import regex, Namedlist
 from snakemake.logging import logger
@@ -259,3 +260,26 @@ def set_protected_output(*rules):
         logger.debug(
             "setting output of rule '{rule}' to protected".format(rule=rule))
         rule.protected_output = set(rule.output)
+
+
+def available_cpu_count():
+    """
+    Return the number of available virtual or physical CPUs on this system.
+    The number of available CPUs can be smaller than the total number of CPUs
+    when the cpuset(7) mechanism is in use, as is the case on some cluster
+    systems.
+
+    Adapted from http://stackoverflow.com/a/1006301/715090
+    """
+    try:
+        with open('/proc/self/status') as f:
+            status = f.read()
+        m = re.search(r'(?m)^Cpus_allowed:\s*(.*)$', status)
+        if m:
+            res = bin(int(m.group(1).replace(',', ''), 16)).count('1')
+            if res > 0:
+                return res
+    except IOError:
+        pass
+
+    return multiprocessing.cpu_count()
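
available_cpu_count above parses the Cpus_allowed bitmask from /proc/self/status; each set bit marks one CPU the process may use. A worked example of the parsing step:

    >>> mask = "ff"                            # as in a line: Cpus_allowed:   ff
    >>> bin(int(mask.replace(",", ""), 16))
    '0b11111111'
    >>> bin(int(mask.replace(",", ""), 16)).count("1")
    8
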
diff --git a/snakemake/version.py b/snakemake/version.py
index b202327..975f691 100644
--- a/snakemake/version.py
+++ b/snakemake/version.py
@@ -1 +1 @@
-__version__ = "3.6.1"
+__version__ = "3.7.1"
diff --git a/snakemake/workflow.py b/snakemake/workflow.py
index 0363dae..3cee989 100644
--- a/snakemake/workflow.py
+++ b/snakemake/workflow.py
@@ -212,6 +212,7 @@ class Workflow:
                 keep_target_files=False,
                 keep_shadow=False,
                 allowed_rules=None,
+                max_jobs_per_second=None,
                 greediness=1.0,
                 no_hooks=False):
 
@@ -234,7 +235,7 @@ class Workflow:
         if not targets:
             targets = [self.first_rule
                        ] if self.first_rule is not None else list()
-                       
+
         if prioritytargets is None:
             prioritytargets = list()
         if forcerun is None:
@@ -422,6 +423,7 @@ class Workflow:
                                  cluster_config=cluster_config,
                                  cluster_sync=cluster_sync,
                                  jobname=jobname,
+                                 max_jobs_per_second=max_jobs_per_second,
                                  immediate_submit=immediate_submit,
                                  quiet=quiet,
                                  keepgoing=keepgoing,
@@ -567,8 +569,8 @@ class Workflow:
             if ruleinfo.params:
                 rule.set_params(*ruleinfo.params[0], **ruleinfo.params[1])
             if ruleinfo.threads:
-                if not isinstance(ruleinfo.threads, int):
-                    raise RuleException("Threads value has to be an integer.",
+                if not isinstance(ruleinfo.threads, int) and not callable(ruleinfo.threads):
+                    raise RuleException("Threads value has to be an integer or a callable.",
                                         rule=rule)
                 rule.resources["_cores"] = ruleinfo.threads
             if ruleinfo.shadow_depth:
@@ -584,10 +586,10 @@ class Workflow:
                 args, resources = ruleinfo.resources
                 if args:
                     raise RuleException("Resources have to be named.")
-                if not all(map(lambda r: isinstance(r, int),
+                if not all(map(lambda r: isinstance(r, int) or callable(r),
                                resources.values())):
                     raise RuleException(
-                        "Resources values have to be integers.",
+                        "Resource values have to be integers or callables.",
                         rule=rule)
                 rule.resources.update(resources)
             if ruleinfo.priority:

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/snakemake.git