[Piuparts-commits] [piuparts] 06/09: dwke: move classes Problem and FailureManager to piupartslib/dwke.py

Holger Levsen holger at moszumanska.debian.org
Mon Feb 10 13:03:04 UTC 2014

This is an automated email from the git hooks/post-receive script.

holger pushed a commit to branch develop
in repository piuparts.

commit e76e44387976caca2e6642c2fccec883f5ea770f
Author: Andreas Beckmann <anbe at debian.org>
Date:   Sun Feb 9 17:42:36 2014 +0100

    dwke: move classes Problem and FailureManager to piupartslib/dwke.py
    along with some globals that need to be cleaned up later
    Signed-off-by: Andreas Beckmann <anbe at debian.org>
 debian/changelog                       |   3 +
 master-bin/detect_well_known_errors.py | 188 +---------------------------
 piupartslib/dwke.py                    | 220 +++++++++++++++++++++++++++++++++
 3 files changed, 224 insertions(+), 187 deletions(-)

diff --git a/debian/changelog b/debian/changelog
index 04df2bb..e7d5706 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -45,6 +45,9 @@ piuparts (0.57) UNRELEASED; urgency=low
     - No longer special-case packages as essential-required and test them like
       normal packages (no-op test plus adequate run).  (Closes: #735907)
     - Improve ordering of packages to be tested/recycled.
+  * piupartslib/dwke.py:
+    - Factored out classes Problem and FailureManager and some helpers from
+      detect_well_known_errors.py
   * piuparts-master-backend.py:
     - Improve master.log verbosity.
   * piuparts-report.py:
diff --git a/master-bin/detect_well_known_errors.py b/master-bin/detect_well_known_errors.py
index 7674bc8..b4827f7 100755
--- a/master-bin/detect_well_known_errors.py
+++ b/master-bin/detect_well_known_errors.py
@@ -26,20 +26,17 @@ import sys
 import time
 import logging
 import re
-from collections import namedtuple
 import argparse
 import piupartslib
 from piupartslib.conf import MissingSection
+from piupartslib.dwke import *
 CONFIG_FILE = "/etc/piuparts/piuparts.conf"
 DISTRO_CONFIG_FILE = "/etc/piuparts/distros.conf"
 KPR_DIRS = ('pass', 'bugged', 'affected', 'fail')
-KPR_EXT = '.kpr'
-BUG_EXT = '.bug'
-LOG_EXT = '.log'
 TPL_EXT = '.tpl'
@@ -112,162 +109,6 @@ def setup_logging(log_level):
-class Problem():
-    """ Encapsulate a particular known problem """
-    def __init__(self, probpath):
-        """probpath is the path to the problem definition file"""
-        self.probpath = probpath
-        self.name = os.path.basename(probpath)
-        self.short_name = os.path.splitext(self.name)[0]
-        self.tags_are_valid = True
-        self.required_tags = ["PATTERN", "WHERE", "ISSUE",
-                              "HEADER", "HELPTEXT"]
-        self.optional_tags = ["EXCLUDE_PATTERN", "EXPLAIN", "PRIORITY"]
-        self.init_problem()
-        for tag in self.required_tags:
-            if not tag in self.__dict__:
-                self.tags_are_valid = False
-        if "PATTERN" in self.__dict__:
-            self.inc_re = re.compile(self.PATTERN)
-        else:
-            self.inc_re = None
-        if "EXCLUDE_PATTERN" in self.__dict__:
-            self.exc_re = re.compile(self.EXCLUDE_PATTERN)
-        else:
-            self.exc_re = None
-    def valid(self):
-        return self.tags_are_valid
-    def init_problem(self):
-        """Load problem file parameters (HELPTEXT="foo" -> self.HELPTEXT)"""
-        pb = open(self.probpath, 'r')
-        probbody = pb.read()
-        pb.close()
-        tagged = re.sub("^([A-Z]+=)", "<hdr>\g<0>", probbody, 0, re.MULTILINE)
-        for chub in re.split('<hdr>', tagged)[1:]:
-            (name, value) = re.split("=", chub, 1, re.MULTILINE)
-            while value[-1] == '\n':
-                value = value[:-1]
-            if  re.search("^\'.+\'$", value, re.MULTILINE|re.DOTALL) \
-             or re.search('^\".+\"$', value, re.MULTILINE|re.DOTALL):
-                value = value[1:-1]
-            if name in self.required_tags or name in self.optional_tags:
-                self.__dict__[name] = value
-            else:
-                self.tags_are_valid = False
-        self.WHERE = self.WHERE.split(" ")
-    def has_problem(self, logbody, where):
-        """Does the log text 'logbody' contain this known problem?"""
-        if where in self.WHERE:
-            if self.inc_re.search(logbody, re.MULTILINE):
-                for line in logbody.splitlines():
-                    if self.inc_re.search(line):
-                        if self.exc_re == None \
-                               or not self.exc_re.search(line):
-                            return True
-        return False
-    def get_command(self):
-        cmd = "grep -E \"%s\"" % self.PATTERN
-        if "EXCLUDE_PATTERN" in self.__dict__:
-            cmd += " | grep -v -E \"%s\"" % self.EXCLUDE_PATTERN
-        return cmd
-class FailureManager():
-    """Class to track known failures encountered, by package,
-       where (e.g. 'fail'), and known problem type"""
-    def __init__(self, logdict):
-        """logdict is {pkgspec: fulllogpath} across all log files"""
-        self.logdict = logdict
-        self.failures = []
-        self.load_failures()
-    def load_failures(self):
-        """Collect failures across all kpr files, as named tuples"""
-        for pkgspec in self.logdict:
-            logpath = self.logdict[pkgspec]
-            try:
-                kp = open(get_kpr_path(logpath), 'r')
-                for line in kp.readlines():
-                    (where, problem) = self.parse_kpr_line(line)
-                    self.failures.append(make_failure(where, problem, pkgspec))
-                kp.close()
-            except IOError:
-                logging.error("Error processing %s" % get_kpr_path(logpath))
-    def parse_kpr_line(self, line):
-        """Parse a line in a kpr file into where (e.g. 'pass') and problem name"""
-        m = re.search("^([a-z]+)/.+ (.+)$", line)
-        return (m.group(1), m.group(2))
-    def sort_by_path(self):
-        self.failures.sort(key=lambda x: self.logdict[x.pkgspec])
-    def sort_by_bugged_and_rdeps(self, pkgsdb):
-        self.pkgsdb = pkgsdb
-        def keyfunc(x, pkgsdb=self.pkgsdb, logdict=self.logdict):
-            rdeps = pkgsdb.rrdep_count(get_pkg(x.pkgspec))
-            is_failed = get_where(logdict[x.pkgspec]) == "fail"
-            return (not is_failed, -rdeps, logdict[x.pkgspec])
-        self.failures.sort(key=keyfunc)
-    def filtered(self, problem):
-        return [x for x in self.failures if problem==x.problem]
-def make_failure(where, problem, pkgspec):
-    return (namedtuple('Failure', 'where problem pkgspec')(where, problem, pkgspec))
-def get_where(logpath):
-    """Convert a path to a log file to the 'where' component (e.g. 'pass')"""
-    return logpath.split('/')[-2]
-def replace_ext(fpath, newext):
-    basename = os.path.splitext(os.path.split(fpath)[1])[0]
-    return '/'.join(fpath.split('/')[:-1] + [basename + newext])
-def get_pkg(pkgspec):
-    return pkgspec.split('_')[0]
-def get_kpr_path(logpath):
-    """Return the kpr file path for a particular log path"""
-    return replace_ext(logpath, KPR_EXT)
 def pts_subdir(source):
     if source[:3] == "lib":
         return source[:4]
@@ -279,20 +120,6 @@ def source_pkg(pkgspec, db):
     return source_name
-def get_file_dict(workdirs, ext):
-    """For files in [workdirs] with extension 'ext', create a dict of
-       <pkgname>_<version>: <path>"""
-    filedict = {}
-    for dir in workdirs:
-        for fl in os.listdir(dir):
-            if os.path.splitext(fl)[1] == ext:
-                filedict[os.path.splitext(os.path.basename(fl))[0]] \
-                    = os.path.join(dir, fl)
-    return filedict
 def get_pkgspec(logpath):
     """For a log full file spec, return the pkgspec (<pkg>_<version)"""
     return logpath.split('/')[-1]
@@ -515,19 +342,6 @@ def detect_well_known_errors(sections, config, problem_list, recheck, recheck_fa
     logging.info(time.strftime("%a %b %2d %H:%M:%S %Z %Y", time.localtime()))
-def create_problem_list(pdir):
-    plist = []
-    for pfile in [x for x in sorted(os.listdir(pdir)) if x.endswith(".conf")]:
-        prob = Problem(os.path.join(pdir, pfile))
-        if prob.valid():
-            plist.append(prob)
-        else:
-            logging.error("Keyword error in %s - skipping" % pfile)
-    return plist
 if __name__ == '__main__':
diff --git a/piupartslib/dwke.py b/piupartslib/dwke.py
new file mode 100644
index 0000000..5d9b899
--- /dev/null
+++ b/piupartslib/dwke.py
@@ -0,0 +1,220 @@
+# -*- coding: utf-8 -*-
+# Copyright 2013 David Steele (dsteele at gmail.com)
+# Copyright © 2014 Andreas Beckmann (anbe at debian.org)
+# This file is part of Piuparts
+# Piuparts is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+# Piuparts is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
+# Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, see <http://www.gnu.org/licenses/>.
+import os
+import logging
+import re
+from collections import namedtuple
+KPR_EXT = '.kpr'
+BUG_EXT = '.bug'
+LOG_EXT = '.log'
+class Problem():
+    """ Encapsulate a particular known problem """
+    def __init__(self, probpath):
+        """probpath is the path to the problem definition file"""
+        self.probpath = probpath
+        self.name = os.path.basename(probpath)
+        self.short_name = os.path.splitext(self.name)[0]
+        self.tags_are_valid = True
+        self.required_tags = ["PATTERN", "WHERE", "ISSUE",
+                              "HEADER", "HELPTEXT"]
+        self.optional_tags = ["EXCLUDE_PATTERN", "EXPLAIN", "PRIORITY"]
+        self.init_problem()
+        for tag in self.required_tags:
+            if not tag in self.__dict__:
+                self.tags_are_valid = False
+        if "PATTERN" in self.__dict__:
+            self.inc_re = re.compile(self.PATTERN)
+        else:
+            self.inc_re = None
+        if "EXCLUDE_PATTERN" in self.__dict__:
+            self.exc_re = re.compile(self.EXCLUDE_PATTERN)
+        else:
+            self.exc_re = None
+    def valid(self):
+        return self.tags_are_valid
+    def init_problem(self):
+        """Load problem file parameters (HELPTEXT="foo" -> self.HELPTEXT)"""
+        pb = open(self.probpath, 'r')
+        probbody = pb.read()
+        pb.close()
+        tagged = re.sub("^([A-Z]+=)", "<hdr>\g<0>", probbody, 0, re.MULTILINE)
+        for chub in re.split('<hdr>', tagged)[1:]:
+            (name, value) = re.split("=", chub, 1, re.MULTILINE)
+            while value[-1] == '\n':
+                value = value[:-1]
+            if  re.search("^\'.+\'$", value, re.MULTILINE|re.DOTALL) \
+             or re.search('^\".+\"$', value, re.MULTILINE|re.DOTALL):
+                value = value[1:-1]
+            if name in self.required_tags or name in self.optional_tags:
+                self.__dict__[name] = value
+            else:
+                self.tags_are_valid = False
+        self.WHERE = self.WHERE.split(" ")
+    def has_problem(self, logbody, where):
+        """Does the log text 'logbody' contain this known problem?"""
+        if where in self.WHERE:
+            if self.inc_re.search(logbody, re.MULTILINE):
+                for line in logbody.splitlines():
+                    if self.inc_re.search(line):
+                        if self.exc_re == None \
+                               or not self.exc_re.search(line):
+                            return True
+        return False
+    def get_command(self):
+        cmd = "grep -E \"%s\"" % self.PATTERN
+        if "EXCLUDE_PATTERN" in self.__dict__:
+            cmd += " | grep -v -E \"%s\"" % self.EXCLUDE_PATTERN
+        return cmd
+class FailureManager():
+    """Class to track known failures encountered, by package,
+       where (e.g. 'fail'), and known problem type"""
+    def __init__(self, logdict):
+        """logdict is {pkgspec: fulllogpath} across all log files"""
+        self.logdict = logdict
+        self.failures = []
+        self.load_failures()
+    def load_failures(self):
+        """Collect failures across all kpr files, as named tuples"""
+        for pkgspec in self.logdict:
+            logpath = self.logdict[pkgspec]
+            try:
+                kp = open(get_kpr_path(logpath), 'r')
+                for line in kp.readlines():
+                    (where, problem) = self.parse_kpr_line(line)
+                    self.failures.append(make_failure(where, problem, pkgspec))
+                kp.close()
+            except IOError:
+                logging.error("Error processing %s" % get_kpr_path(logpath))
+    def parse_kpr_line(self, line):
+        """Parse a line in a kpr file into where (e.g. 'pass') and problem name"""
+        m = re.search("^([a-z]+)/.+ (.+)$", line)
+        return (m.group(1), m.group(2))
+    def sort_by_path(self):
+        self.failures.sort(key=lambda x: self.logdict[x.pkgspec])
+    def sort_by_bugged_and_rdeps(self, pkgsdb):
+        self.pkgsdb = pkgsdb
+        def keyfunc(x, pkgsdb=self.pkgsdb, logdict=self.logdict):
+            rdeps = pkgsdb.rrdep_count(get_pkg(x.pkgspec))
+            is_failed = get_where(logdict[x.pkgspec]) == "fail"
+            return (not is_failed, -rdeps, logdict[x.pkgspec])
+        self.failures.sort(key=keyfunc)
+    def filtered(self, problem):
+        return [x for x in self.failures if problem==x.problem]
+def make_failure(where, problem, pkgspec):
+    return (namedtuple('Failure', 'where problem pkgspec')(where, problem, pkgspec))
+def get_where(logpath):
+    """Convert a path to a log file to the 'where' component (e.g. 'pass')"""
+    return logpath.split('/')[-2]
+def replace_ext(fpath, newext):
+    basename = os.path.splitext(os.path.split(fpath)[1])[0]
+    return '/'.join(fpath.split('/')[:-1] + [basename + newext])
+def get_pkg(pkgspec):
+    return pkgspec.split('_')[0]
+def get_kpr_path(logpath):
+    """Return the kpr file path for a particular log path"""
+    return replace_ext(logpath, KPR_EXT)
+def get_file_dict(workdirs, ext):
+    """For files in [workdirs] with extension 'ext', create a dict of
+       <pkgname>_<version>: <path>"""
+    filedict = {}
+    for dir in workdirs:
+        for fl in os.listdir(dir):
+            if os.path.splitext(fl)[1] == ext:
+                filedict[os.path.splitext(os.path.basename(fl))[0]] \
+                    = os.path.join(dir, fl)
+    return filedict
+def create_problem_list(pdir):
+    plist = []
+    for pfile in [x for x in sorted(os.listdir(pdir)) if x.endswith(".conf")]:
+        prob = Problem(os.path.join(pdir, pfile))
+        if prob.valid():
+            plist.append(prob)
+        else:
+            logging.error("Keyword error in %s - skipping" % pfile)
+    return plist
+# vi:set et ts=4 sw=4 :

Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/piuparts/piuparts.git

More information about the Piuparts-commits mailing list