[med-svn] [python-pyflow] 01/04: New upstream version 1.1.14

Andreas Tille tille at debian.org
Sun Feb 12 07:21:37 UTC 2017


This is an automated email from the git hooks/post-receive script.

tille pushed a commit to branch master
in repository python-pyflow.

commit 2e215d91603725da789e6129fc5e2e14f7399c1a
Author: Andreas Tille <tille at debian.org>
Date:   Sat Feb 11 20:44:24 2017 +0100

    New upstream version 1.1.14
---
 pyflow/doc/ChangeLog.txt     |   2 +
 pyflow/src/pyflow.py         | 142 +++++++++++++++++++++++++++++++++++++------
 scratch/bench/README.md      |   3 +
 scratch/bench/manyThreads.py |  41 +++++++++++++
 4 files changed, 171 insertions(+), 17 deletions(-)

diff --git a/pyflow/doc/ChangeLog.txt b/pyflow/doc/ChangeLog.txt
index bc0bcab..0ce9f92 100644
--- a/pyflow/doc/ChangeLog.txt
+++ b/pyflow/doc/ChangeLog.txt
@@ -1,3 +1,5 @@
+v1.1.14 20170112
+* STREL-391 improve task throughput for nodes with 100's of cores
 v1.1.13 20160414
 * fix rare issue with sets of dependent checkpoint tasks
 * fix for travis CI script from Dominic Jodoin
diff --git a/pyflow/src/pyflow.py b/pyflow/src/pyflow.py
index 0ed516d..8af84ed 100644
--- a/pyflow/src/pyflow.py
+++ b/pyflow/src/pyflow.py
@@ -1132,8 +1132,6 @@ class CommandTaskRunner(BaseTaskRunner) :
         @param tmpDir: location to write files containing output from
                   the task wrapper script (and not the wrapped task)
         """
-        import pickle
-
         BaseTaskRunner.__init__(self, runStatus, taskStr, sharedFlowLog, setRunstate)
 
         self.cmd = cmd
@@ -1145,29 +1143,38 @@ class CommandTaskRunner(BaseTaskRunner) :
         self.errFile = errFile
         self.tmpDir = tmpDir
         self.schedulerArgList = schedulerArgList
+        self.runid = runid
+        self.taskStr = taskStr
         if not os.path.isfile(self.taskWrapper) :
             raise Exception("Can't find task wrapper script: %s" % self.taskWrapper)
 
+
+    def initFileSystemItems(self):
+        import pickle
+
         ensureDir(self.tmpDir)
         self.wrapFile = os.path.join(self.tmpDir, "pyflowTaskWrapper.signal.txt")
 
         # setup all the data to be passed to the taskWrapper and put this in argFile:
-        taskInfo = { 'nCores' : nCores,
-                   'outFile' : outFile, 'errFile' : errFile,
-                   'cwd' : cmd.cwd, 'env' : cmd.env,
-                   'cmd' : cmd.cmd, 'isShellCmd' : (cmd.type == "str") }
+        taskInfo = { 'nCores' : self.nCores,
+                     'outFile' : self.outFile, 'errFile' : self.errFile,
+                     'cwd' : self.cmd.cwd, 'env' : self.cmd.env,
+                     'cmd' : self.cmd.cmd, 'isShellCmd' : (self.cmd.type == "str") }
 
         argFile = os.path.join(self.tmpDir, "taskWrapperParameters.pickle")
         pickle.dump(taskInfo, open(argFile, "w"))
 
-        self.wrapperCmd = [self.taskWrapper, runid, taskStr, argFile]
-
+        self.wrapperCmd = [self.taskWrapper, self.runid, self.taskStr, argFile]
 
 
     def _run(self) :
         """
         Outer loop of _run() handles task retry behavior:
         """
+
+        # these initialization steps only need to happen once:
+        self.initFileSystemItems()
+
         startTime = time.time()
         retries = 0
         retInfo = Bunch(retval=1, taskExitMsg="", isAllowRetry=False)
@@ -1610,6 +1617,39 @@ class SGETaskRunner(CommandTaskRunner) :
 
 
 
+class TaskFileWriter(StoppableThread) :
+    """
+    This class runs on a separate thread and is
+    responsible for updating the task state and
+    info files
+    """
+
+    def __init__(self, writeFunc) :
+        StoppableThread.__init__(self)
+        # parameter copy:
+        self.writeFunc = writeFunc
+        # thread settings:
+        self.setDaemon(True)
+        self.setName("TaskFileWriter-Thread")
+
+        self.isWrite = threading.Event()
+
+    def run(self) :
+        while not self.stopped() :
+            self._writeIfSet()
+            time.sleep(5)
+            self.isWrite.wait()
+
+    def flush(self):
+        self._writeIfSet()
+
+    def _writeIfSet(self) :
+        if self.isWrite.isSet() :
+            self.isWrite.clear()
+            self.writeFunc()
+
+
+
 class TaskManager(StoppableThread) :
     """
     This class runs on a separate thread from workflowRunner,
@@ -1993,14 +2033,14 @@ class TaskNode(object) :
     Represents an individual task in the task graph
     """
 
-    def __init__(self, tdag, lock, init_id, namespace, label, payload, isContinued, isFinishedEvent) :
-        self.tdag = tdag
+    def __init__(self, lock, init_id, namespace, label, payload, isContinued, isFinishedEvent, isWriteTaskStatus) :
         self.lock = lock
         self.id = init_id
         self.namespace = namespace
         self.label = label
         self.payload = payload
         self.isContinued = isContinued
+        self.isWriteTaskStatus = isWriteTaskStatus
 
         # if true, do not execute this task or honor it as a dependency for child tasks
         self.isIgnoreThis = False
@@ -2107,7 +2147,7 @@ class TaskNode(object) :
         else :
             self.runstateUpdateTimeStamp = updateTimeStamp
         self.runstate = runstate
-        self.tdag.writeTaskStatus()
+        self.isWriteTaskStatus.set()
 
     #def getParents(self) :
     #    return self.parents
@@ -2176,6 +2216,10 @@ class TaskDAG(object) :
         # unique id for each task in each run -- not persistent across continued runs:
         self.taskId = 0
 
+        # as tasks are added, occasionally spool task info to disk, and record the last
+        # task index written + 1
+        self.lastTaskIdWritten = 0
+
         # it will be easier for people to read the task status file if
         # the tasks are in approximately the same order as they were
         # added by the workflow:
@@ -2191,6 +2235,9 @@ class TaskDAG(object) :
         # cycle applies
         self.isFinishedEvent = threading.Event()
 
+        self.isWriteTaskInfo = None
+        self.isWriteTaskStatus = None
+
     @lockMethod
     def isTaskPresent(self, namespace, label) :
         return ((namespace, label) in self.labelMap)
@@ -2368,7 +2415,7 @@ class TaskDAG(object) :
             else:
                 raise Exception("Task: '%s' is already in TaskDAG" % (fullLabel))
 
-        task = TaskNode(self, self.lock, self.taskId, namespace, label, payload, isContinued, self.isFinishedEvent)
+        task = TaskNode(self.lock, self.taskId, namespace, label, payload, isContinued, self.isFinishedEvent, self.isWriteTaskStatus)
 
         self.taskId += 1
 
@@ -2399,8 +2446,8 @@ class TaskDAG(object) :
                 task.isReset=True
 
         if not isContinued:
-            self.writeTaskInfo(task)
-            self.writeTaskStatus()
+            self.isWriteTaskInfo.set()
+            self.isWriteTaskStatus.set()
 
         # determine if this is an ignoreTasksAfter node
         if label in self.ignoreTasksAfter :
@@ -2518,9 +2565,8 @@ class TaskDAG(object) :
 
         return val
 
-
     @lockMethod
-    def writeTaskInfo(self, task) :
+    def writeTaskInfoOld(self, task) :
         """
         appends a description of new tasks to the taskInfo file
         """
@@ -2553,6 +2599,57 @@ class TaskDAG(object) :
         fp.write(taskline + "\n")
         fp.close()
 
+    @lockMethod
+    def writeTaskInfo(self) :
+        """
+        appends a description of all new tasks to the taskInfo file
+        """
+
+        def getTaskLineFromTask(task) :
+            """
+            translate a task into its single-line summary format in the taskInfo file
+            """
+            depstring = ""
+            if len(task.parents) :
+                depstring = ",".join([p.label for p in task.parents])
+
+            cmdstring = ""
+            nCores = "0"
+            memMb = "0"
+            priority = "0"
+            isForceLocal = "0"
+            payload = task.payload
+            cwdstring = ""
+            if   payload.type() == "command" :
+                cmdstring = str(payload.cmd)
+                nCores = str(payload.nCores)
+                memMb = str(payload.memMb)
+                priority = str(payload.priority)
+                isForceLocal = boolToStr(payload.isForceLocal)
+                cwdstring = payload.cmd.cwd
+            elif payload.type() == "workflow" :
+                cmdstring = payload.name()
+            else :
+                assert 0
+            return "\t".join((task.label, task.namespace, payload.type(),
+                              nCores, memMb, priority,
+                              isForceLocal, depstring, cwdstring, cmdstring))
+
+        assert (self.lastTaskIdWritten <= self.taskId)
+
+        if self.lastTaskIdWritten == self.taskId : return
+
+        newTaskLines = []
+        while self.lastTaskIdWritten < self.taskId :
+            task = self.labelMap[self.addOrder[self.lastTaskIdWritten]]
+            newTaskLines.append(getTaskLineFromTask(task))
+            self.lastTaskIdWritten += 1
+
+        fp = open(self.taskInfoFile, "a")
+        for taskLine in newTaskLines :
+            fp.write(taskLine + "\n")
+        fp.close()
+
 
 
 # workflowRunner:
@@ -3925,7 +4022,6 @@ class WorkflowRunner(object) :
             stackDumpFp.close()
 
 
-
     def _runWorkflow(self, param) :
         #
         # Primary workflow logic when nothing goes wrong:
@@ -3964,6 +4060,9 @@ class WorkflowRunner(object) :
             runStatus.errorCode = 1
             runStatus.errorMessage = "Thread: '%s', has stopped without a traceable cause" % (trun.getName())
 
+        self._taskInfoWriter.flush()
+        self._taskStatusWriter.flush()
+
         return self._evalWorkflow(runStatus)
 
 
@@ -3999,6 +4098,15 @@ class WorkflowRunner(object) :
         if cdata.param.isContinue :
             self._setupContinuedWorkflow()
 
+        self._taskInfoWriter = TaskFileWriter(self._tdag.writeTaskInfo)
+        self._taskStatusWriter = TaskFileWriter(self._tdag.writeTaskStatus)
+
+        self._tdag.isWriteTaskInfo = self._taskInfoWriter.isWrite
+        self._tdag.isWriteTaskStatus = self._taskStatusWriter.isWrite
+
+        self._taskInfoWriter.start()
+        self._taskStatusWriter.start()
+
 
 
     def _createContinuedStateFile(self) :
diff --git a/scratch/bench/README.md b/scratch/bench/README.md
new file mode 100644
index 0000000..f013811
--- /dev/null
+++ b/scratch/bench/README.md
@@ -0,0 +1,3 @@
+The manyThreads benchmark differentiates the optimizations introduced by STREL-391 to help
+improve total task throughput when a very high number of cores is available on a single
+machine. 
diff --git a/scratch/bench/manyThreads.py b/scratch/bench/manyThreads.py
new file mode 100644
index 0000000..68d7643
--- /dev/null
+++ b/scratch/bench/manyThreads.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python
+
+import os.path
+import sys
+
+# add module path by hand
+#
+scriptDir=os.path.abspath(os.path.dirname(__file__))
+sys.path.append(scriptDir+"/../../pyflow/src")
+
+from pyflow import WorkflowRunner
+
+
+
+class SimpleWorkflow(WorkflowRunner) :
+    """
+    A workflow designed to differentiate the runtime impact
+    of STREL-391
+    """
+
+    def __init__(self) :
+        pass
+
+    def workflow(self) :
+        for i in range(4000) :
+            self.addTask("task%s" % (i),["sleep","0"])
+
+
+
+# Instantiate the workflow
+#
+# parameters are passed into the workflow via its constructor:
+#
+wflow = SimpleWorkflow()
+
+# Run the workflow:
+#
+retval=wflow.run(mode="local",nCores=400,isQuiet=True)
+
+sys.exit(retval)
+

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/python-pyflow.git



More information about the debian-med-commit mailing list