[apt-proxy-devel] r613 - in trunk: . apt_proxy apt_proxy/test debian/po doc doc/po

Chris Halls halls at costa.debian.org
Thu Aug 3 23:54:49 UTC 2006


Author: halls
Date: Thu Aug  3 23:54:46 2006
New Revision: 613

Added:
   trunk/apt_proxy/cache.py
   trunk/apt_proxy/fetchers.py
   trunk/apt_proxy/test/test_cache.py
   trunk/apt_proxy/test/test_fetchers.py
   trunk/apt_proxy/test/test_requests.py
Removed:
   trunk/debian/TODO
Modified:
   trunk/apt_proxy/apt_proxy.py
   trunk/apt_proxy/apt_proxy_conf.py
   trunk/apt_proxy/misc.py
   trunk/apt_proxy/packages.py
   trunk/apt_proxy/test/test_apt_proxy.py
   trunk/apt_proxy/test/test_config.py
   trunk/apt_proxy/test/test_packages.py
   trunk/debian/changelog
   trunk/debian/control
   trunk/debian/po/cs.po
   trunk/debian/po/da.po
   trunk/debian/po/fr.po
   trunk/debian/po/nl.po
   trunk/debian/po/vi.po
   trunk/debian/postinst
   trunk/debian/rules
   trunk/doc/TODO
   trunk/doc/apt-proxy-import.8.inc
   trunk/doc/apt-proxy.8
   trunk/doc/apt-proxy.conf
   trunk/doc/apt-proxy.conf.5
   trunk/doc/po/apt-proxy.pot
   trunk/doc/po/fr.po
   trunk/doc/po4a.cfg
   trunk/runtests

Log:
Merge branch people/halls/rework to trunk


Modified: trunk/apt_proxy/apt_proxy.py
==============================================================================
--- trunk/apt_proxy/apt_proxy.py	(original)
+++ trunk/apt_proxy/apt_proxy.py	Thu Aug  3 23:54:46 2006
@@ -14,21 +14,19 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-from twisted.internet import reactor, defer, abstract, protocol
-from twisted.protocols import ftp, basic
-
 import os, stat, signal, fcntl, exceptions
 from os.path import dirname, basename
-import tempfile
-import glob
-import re
-import urlparse
-import time
-import string
-import packages
+import tempfile, glob, re, urlparse, time
+from twisted.internet import reactor
 from twisted.python.failure import Failure
+from twisted.internet import error, protocol
+from twisted.web import http
+
 import memleak
-from twisted.internet import error
+import fetchers, cache, packages
+from misc import log, MirrorRecycler
+import twisted_compat
+
 #from posixfile import SEEK_SET, SEEK_CUR, SEEK_END
 #since posixfile is considered obsolete I'll define the SEEK_* constants
 #myself.
@@ -38,1173 +36,8 @@
 
 from types import *
 
-#sibling imports
-import misc
-log = misc.log
-
-from twisted_compat import compat
-from twisted_compat import http
-
 status_dir = '.apt-proxy'
 
-class FileType:
-    """
-    This is just a way to distinguish between different filetypes.
-
-    self.regex: regular expression that files of this type should
-    match. It could probably be replaced with something simpler,
-    but... o well, it works.
-    
-    self.contype: mime string for the content-type http header.
-    
-    mutable: do the contents of this file ever change?  Files such as
-    .deb and .dsc are never changed once they are created.
-    
-    """
-    def __init__ (self, regex, contype, mutable):
-        self.regex = regex
-        self.contype = contype
-        self.mutable = mutable
-
-    def check (self, name):
-        "Returns true if name is of this filetype"
-        if self.regex.search(name):
-            return 1
-        else:
-            return 0
-
-# Set up the list of filetypes that we are prepared to deal with.
-# If it is not in this list, then we will ignore the file and
-# return an error.
-filetypes = (
-    FileType(re.compile(r"\.deb$"), "application/dpkg", 0),
-    FileType(re.compile(r"\.udeb$"), "application/dpkg", 0),
-    FileType(re.compile(r"\.tar\.gz$"), "application/x-gtar", 0),
-    FileType(re.compile(r"\.dsc$"),"text/plain", 0),
-    FileType(re.compile(r"\.diff\.gz$"), "application/x-gzip", 0),
-    FileType(re.compile(r"\.gz$"), "application/x-gzip", 1),
-    FileType(re.compile(r"\.bin$"), "application/octet-stream", 0),
-    FileType(re.compile(r"\.tgz$"), "application/x-gtar", 0),
-    FileType(re.compile(r"\.txt$"), "application/plain-text", 1),
-    FileType(re.compile(r"\.html$"), "application/text-html", 1),
-
-    FileType(re.compile(r"/(Packages|Release(\.gpg)?|Sources|Contents-.*)"
-                        r"(\.(gz|bz2))?$"), 
-             "text/plain", 1),
-
-    FileType(re.compile(r"\.rpm$"), "application/rpm", 0),
-
-    FileType(re.compile(r"/(pkglist|release|srclist)(\.(\w|-)+)?"
-                        r"(\.(gz|bz2))?$"), 
-             "text/plain", 1),
-    )
-
-class FileVerifier(protocol.ProcessProtocol):
-    """
-    Verifies the integrity of a file by running an external
-    command.
-
-    self.deferred: a deferred that will be triggered when the command
-    completes, or if a timeout occurs.
-
-    Sample:
-    
-            verifier = FileVerifier(self)
-            verifier.deferred.addCallbacks(callback_if_ok, callback_if_fail)
-
-        then either callback_if_ok or callback_if_fail will be called
-        when the subprocess finishes execution.
-
-    Checkout twisted.internet.defer.Deferred on how to use self.deferred
-    
-    """
-    def __init__(self, request):
-        self.factory = request.factory
-        self.deferred = defer.Deferred() # Deferred that passes status back
-        self.path = request.local_file
-
-        if re.search(r"\.deb$", self.path):
-            exe = '/usr/bin/dpkg'
-            args = (exe, '--fsys-tarfile', self.path)
-        elif re.search(r"\.gz$", self.path):
-            exe = '/bin/gunzip'
-            args = (exe, '-t', '-v', self.path)
-        elif re.search(r"\.bz2$", self.path):
-            exe = '/usr/bin/bunzip2'
-            args = (exe, '--test', self.path)
-        else:
-            # Unknown file, just check it is not 0 size
-            try:
-                filesize = os.stat(self.path)[stat.ST_SIZE]
-            except:
-                filesize = 0
-
-            if(os.stat(self.path)[stat.ST_SIZE]) < 1:
-                log.debug('Verification failed for ' + self.path)
-                self.failed()
-            else:
-                log.debug('Verification skipped for ' + self.path)
-                self.deferred.callback(None)
-            return
-
-        log.debug("starting verification: " + exe + " " + str(args))
-	self.nullhandle = open("/dev/null", "w")
-        self.process = reactor.spawnProcess(self, exe, args, childFDs = { 0:"w", 1:self.nullhandle.fileno(), 2:"r" })
-        self.laterID = reactor.callLater(self.factory.config.timeout, self.timedout)
-
-    def connectionMade(self):
-        self.data = ''
-
-    def outReceived(self, data):
-        #we only care about errors
-        pass
-    
-    def errReceived(self, data):
-        self.data = self.data + data
-
-    def failed(self):
-        log.debug("verification failed: %s"%(self.path), 'verify', 1)
-        os.unlink(self.path)
-        self.deferred.errback(None)
-
-    def timedout(self):
-        """
-        this should not happen, but if we timeout, we pretend that the
-        operation failed.
-        """
-        self.laterID=None
-        log.debug("Process Timedout:",'verify')
-        self.failed()
-        
-    def processEnded(self, reason=None):
-        """
-        This get's automatically called when the process finishes, we check
-        the status and report through the Deferred.
-        """
-        __pychecker__ = 'unusednames=reason'
-        #log.debug("Process Status: %d" %(self.process.status),'verify')
-        #log.debug(self.data, 'verify')
-        if self.laterID:
-            self.laterID.cancel()
-            if self.process.status == 0:
-                self.deferred.callback(None)
-            else:
-                self.failed()
-
-def findFileType(name):
-    "Look for the FileType of 'name'"
-    for type in filetypes:
-        if type.check(name):
-            return type
-    return None
-
-class TempFile (file):
-    def __init__(self, mode='w+b', bufsize=-1):
-        (fd, name) = tempfile.mkstemp('.apt-proxy')
-        os.close(fd)
-        file.__init__(self, name, mode, bufsize)
-        os.unlink(name)
-    def append(self, data):
-        self.seek(0, SEEK_END)
-        self.write(data)
-    def size(self):
-        return self.tell()
-    def read_from(self, size=-1, start=None):
-        if start != None:
-            self.seek(start, SEEK_SET)
-        data = file.read(self, size)
-        return data
-
-
-class Fetcher:
-    """
-    This is the base class for all Fetcher*, it tries to hold as much
-    common code as posible.
-
-    Subclasses of this class are the ones responsible for contacting
-    the backend servers and fetching the actual data.
-    """
-    gzip_convert = re.compile(r"/Packages$")
-    post_convert = re.compile(r"/Packages.gz$")
-    status_code = http.OK
-    status_message = None
-    requests = None
-    request = None
-    length = None
-    transport = None
-        
-    def insert_request(self, request):
-        """
-        Request should be served through this Fetcher because it asked for
-        the same uri that we are waiting for.
-        
-        We also have to get it up to date, give it all received data, send it
-        the appropriate headers and set the response code.
-        """
-        if request in self.requests:
-            raise RuntimeError, \
-                  'this request is already assigned to this Fetcher'
-        self.requests.append(request)
-        request.apFetcher = self
-        if (self.request):
-            self.update_request(request)
-
-    def update_request(self, request):
-        """
-        get a new request up to date
-        """
-        request.local_mtime = self.request.local_mtime
-        request.local_size = self.request.local_size
-        if(self.status_code != None):
-            request.setResponseCode(self.status_code, self.status_message)
-        for name, value in self.request.headers.items():
-            request.setHeader(name, value)
-        if self.transfered.size() != 0:
-            request.write(self.transfered.read_from(start=0))
-
-    def remove_request(self, request):
-        """
-        Request should NOT be served through this Fetcher, the client
-        probably closed the connection.
-        
-        If this is our last request, we may also close the connection with the
-        server depending on the configuration.
-
-        We keep the last request for reference even if the client closed the
-        connection.
-        """
-        self.requests.remove(request)
-        if len(self.requests) == 0:
-            log.debug("Last request removed",'Fetcher')
-            if not self.factory.config.complete_clientless_downloads:
-                if self.transport:
-                    log.debug(
-                        "telling the transport to loseConnection",'Fetcher')
-                    try:
-                        self.transport.loseConnection()
-                    except KeyError:
-                        # Rsync fetcher already loses conneciton for us
-                        pass
-                if hasattr(self, 'loseConnection'):
-                    self.loseConnection()
-        else:
-            self.request = self.requests[0]
-        request.apFetcher = None
-
-    def transfer_requests(self, fetcher):
-        "Transfer all requests from self to fetcher"
-        for req in self.requests:
-            self.remove_request(req)
-            fetcher.insert_request(req)
-
-    def setResponseCode(self, code, message=None):
-        "Set response code for all requests"
-        #log.debug('Response code: %d - %s' % (code, message),'Fetcher')
-        self.status_code = code
-        self.status_message = message
-        for req in self.requests:
-            req.setResponseCode(code, message)
-
-    def setResponseHeader(self, name, value):
-        "set 'value' for header 'name' on all requests"
-        for req in self.requests:
-            req.setHeader(name, value)
-
-    def __init__(self, request=None):
-        self.requests = []
-        self.transfered = TempFile()
-        if(request):
-            self.activate(request)
-            
-    def activate(self, request):
-        log.debug(str(request.backend) + request.uri, 'Fetcher.activate')
-        self.local_file = request.local_file
-        self.local_mtime = request.local_mtime
-        self.factory = request.factory
-        self.request = request
-        request.content.read()
-
-        for req in self.requests:
-            self.update_request(req)
-        self.requests.append(request)
-
-        request.apFetcher = self
-        if self.factory.runningFetchers.has_key(request.uri):
-            raise RuntimeError, 'There already is a running fetcher'
-        self.factory.runningFetchers[request.uri]=self
-
-    def apDataReceived(self, data):
-        """
-        Should be called from the subclasses when data is available for
-        streaming.
-
-        Keeps all transfered data in 'self.transfered' for requests which arrive
-        later and to write it in the cache at the end.
-
-        Note: self.length if != None is the amount of data pending to be
-        received.
-        """
-        if self.length != None:
-            self.transfered.append(data[:self.length])
-            for req in self.requests:
-                req.write(data[:self.length])
-        else:
-            self.transfered.append(data)
-            for req in self.requests:
-                req.write(data)
-
-    def apDataEnd(self, data, saveData=True):
-        """
-        Called by subclasses when the data transfer is over.
-
-           -caches the received data if everyting went well (if saveData=True)
-           -takes care of mtime and atime
-           -finishes connection with server and the requests
-           
-        """
-        import shutil
-        log.debug("Finished receiving data, status:%d saveData:%d" %(self.status_code, saveData), 'Fetcher');
-        if (self.status_code == http.OK):
-            if saveData:
-                dir = dirname(self.local_file)
-                if(not os.path.exists(dir)):
-                    os.makedirs(dir)
-                f = open(self.local_file, "w")
-                fcntl.lockf(f.fileno(), fcntl.LOCK_EX)
-                f.truncate(0)
-                if type(data) is StringType:
-                    f.write(data)
-                else:
-                    data.seek(0, SEEK_SET)
-                    shutil.copyfileobj(data, f)
-                f.close()
-                if self.local_mtime != None:
-                    os.utime(self.local_file, (time.time(), self.local_mtime))
-                else:
-                    log.debug("no local time: "+self.local_file,'Fetcher')
-                    os.utime(self.local_file, (time.time(), 0))
-
-            self.factory.file_served(self.request.uri)
-
-            #self.request.backend.get_packages_db().packages_file(self.request.uri)
-        
-        if self.transport:
-            try:
-              self.transport.loseConnection()
-            except exceptions.KeyError:
-              # Couldn't close connection - already closed?
-              log.debug("transport.loseConnection() - "
-                        "connection already closed", 'Fetcher')
-              pass
-                
-        for req in self.requests:
-            req.finish()
-
-        self.transfered.close()
-        self.apEnd()
-
-    def apEnd(self):
-        """
-        Called by subclasses when apDataEnd does too many things.
-
-        Let's everyone know that we are not the active Fetcher for our uri.
-        """
-        try:
-            del self.factory.runningFetchers[self.request.uri]
-        except exceptions.KeyError:
-            log.debug("We are not on runningFetchers!!!",'Fetcher')
-            log.debug("Class is not in runningFetchers: "+str(self.__class__),
-                      'Fetcher')
-            if self.request:
-                log.debug(' URI:' + self.request.uri, 'Fetcher')
-            log.debug('Running fetchers: '
-                      +str(self.factory.runningFetchers),'Fetcher')
-            #raise exceptions.KeyError
-        for req in self.requests[:]:
-            self.remove_request(req)
-
-        import gc
-        #Cleanup circular references
-        reactor.callLater(5, gc.collect)
-
-    def apEndCached(self):
-        """
-        A backend has indicated that this file has not changed,
-        so serve the file from the disk cache
-        """
-        self.setResponseCode(http.OK)
-        self.apEndTransfer(FetcherCachedFile)
-        
-    def apEndTransfer(self, fetcher_class):
-        """
-        Remove this Fetcher and transfer all it's requests to a new instance of
-        'fetcher_class'.
-        """
-        #Consider something like this:
-        #req = dummyFetcher.fix_ref_request()
-        #fetcher = fetcher_class()
-        #dummyFetcher.transfer_requests(fetcher)
-        #dummyFetcher.apEnd()
-        #fetcher.activate(req)
-
-        #self.setResponseCode(http.OK)
-        requests = self.requests[:]
-        self.apEnd()  # Remove requests from this fetcher
-        fetcher = None
-        for req in requests:
-            if (fetcher_class != FetcherCachedFile or req.serve_if_cached):
-                running = req.factory.runningFetchers
-                if (running.has_key(req.uri)):
-                    #If we have an active Fetcher just use that
-                    log.debug("have active Fetcher",'Fetcher')
-                    running[req.uri].insert_request(req)
-                    fetcher = running[req.uri]
-                else:
-                    fetcher = fetcher_class(req)
-            else:
-                req.finish()
-        return fetcher
-            
-    def connectionFailed(self, reason=None):
-        """
-        Tell our requests that the connection with the server failed.
-        """
-        msg = '[%s] Connection Failed: %s/%s'%(
-            self.request.backend.base,
-            self.request.backendServer.path, self.request.backend_uri)
-
-        if reason:
-            msg = '%s (%s)'%(msg, reason.getErrorMessage())
-            log.debug("Connection Failed: "+str(reason), 'Fetcher')
-        log.err(msg)
-
-        # Look for alternative fetchers
-        if not self.request.activateNextBackendServer(self):
-            # No more backends, send error response back to client
-            if reason.check(error.ConnectError):
-                self.setResponseCode(http.SERVICE_UNAVAILABLE, "Connect Error")
-            else:
-                self.setResponseCode(http.SERVICE_UNAVAILABLE)
-            self.apDataReceived("")
-            self.apDataEnd(self.transfered, False)
-            #Because of a bug in tcp.Client we may be called twice,
-            #Make sure that next time nothing will happen
-            #FIXME: This hack is probably not anymore pertinent.
-            self.connectionFailed = lambda : log.debug('connectionFailed(2)',
-                                                    'Fetcher','9')
-            
-
-class FetcherDummy(Fetcher):
-    """
-    """
-    gzip_convert = re.compile(r"^Nothing should match this$")
-    post_convert = re.compile(r"^Nothing should match this$")
-    status_code = http.INTERNAL_SERVER_ERROR
-    status_message = None
-        
-    def insert_request(self, request):
-        """
-        """
-        if request in self.requests:
-            raise RuntimeError, \
-                  'this request is already assigned to this Fetcher'
-        self.requests.append(request)
-        request.apFetcher = self
-
-    def remove_request(self, request):
-        """
-        """
-        #make sure that it has updated values, since the requests
-        #may be cached and we need them to serve it.
-        request.local_mtime = self.request.local_mtime
-        request.local_size = self.request.local_size
-
-        self.requests.remove(request)
-        request.apFetcher = None
-
-    def fix_ref_request(self):
-        if self.requests != []:
-            if self.request not in self.requests:
-                request = self.requests[0]
-                request.local_mtime = self.request.local_mtime
-                request.local_size = self.request.local_size
-                self.request = request
-            self.remove_request(self.request)
-        else:
-            self.request = None
-            
-        return self.request
-
-class FetcherFile(Fetcher):
-
-    def activate(self, request):
-        Fetcher.activate(self, request)
-        log.debug("FetcherFile.activate(): uri='%s' server='%s'" % (request.uri, request.backendServer.uri))
-        if not request.apFetcher:
-            log.debug("no request.apFetcher")
-            return
-
-        self.factory.file_served(request.uri)
-
-        # start the transfer
-        self.local_file = request.backendServer.uri[len("file:"):]+ request.uri
-        if not os.path.exists(self.local_file):
-            log.debug("not found: %s" % self.local_file)
-            request.setResponseCode(http.NOT_FOUND)
-            request.write("")
-            request.finish()
-            self.remove_request(request)
-            Fetcher.apEnd(self)
-            return
-        self.local_size = os.stat(self.local_file)[stat.ST_SIZE]
-        
-        log.debug("Serving local file: " + self.local_file + " size:" + str(self.local_size), 'FetcherCachedFile')
-        file = open(self.local_file,'rb')
-        fcntl.lockf(file.fileno(), fcntl.LOCK_SH)
-            
-        request.setHeader("Content-Length", self.local_size)
-        #request.setHeader("Last-modified",
-        #                  http.datetimeToString(request.local_mtime))
-        basic.FileSender().beginFileTransfer(file, request) \
-                          .addBoth(self.file_transfer_complete, request) \
-                          .addBoth(lambda r: file.close())
-
-    # A file transfer has completed
-    def file_transfer_complete(self, result, request):
-        log.debug("transfer complete", 'FetcherCachedFile')
-        request.finish()
-        # Remove this client from request list
-        self.remove_request(request)
-        if len(self.requests) == 0:
-            Fetcher.apEnd(self)
-
-class FetcherHttp(Fetcher, http.HTTPClient):
-
-    forward_headers = [
-        'last-modified',
-        'content-length'
-        ]
-    log_headers = None
-
-    proxy_host = None
-    proxy_port = None
-
-    def activate(self, request):
-        Fetcher.activate(self, request)
-
-        if not self.factory.config.http_proxy is '':
-            (self.proxy_host, self.proxy_port) = request.factory.config.http_proxy.split(':')
-
-        if not request.apFetcher:
-            return
-
-        class ClientFactory(protocol.ClientFactory):
-            "Dummy ClientFactory to comply with current twisted API"
-	    #FIXME: Double check this, haggai thinks it is to blame for the
-	    #hangs.
-            def __init__(self, instance):
-                self.instance = instance
-            def buildProtocol(self, addr):
-                return self.instance
-            def clientConnectionFailed(self, connector, reason):
-                self.instance.connectionFailed(reason)
-            def clientConnectionLost(self, connector, reason):
-                log.debug("XXX clientConnectionLost", "http-client")
-
-        if not self.proxy_host:
-            reactor.connectTCP(request.backendServer.host, request.backendServer.port,
-                               ClientFactory(self), request.backend.config.timeout)
-        else:
-            reactor.connectTCP(self.proxy_host, int(self.proxy_port),
-                               ClientFactory(self), request.backend.config.timeout)
-    def connectionMade(self):
-        if not self.proxy_host:
-            self.sendCommand(self.request.method, self.request.backendServer.path
-                             + "/" + self.request.backend_uri)
-        else:
-            self.sendCommand(self.request.method, "http://"
-                             + self.request.backendServer.host + ":" + str(self.request.backendServer.port)
-                             + "/" + self.request.backendServer.path
-                             + "/" + self.request.backend_uri)
-            
-        self.sendHeader('host', self.request.backendServer.host)
-
-        if self.local_mtime != None:
-            datetime = http.datetimeToString(self.local_mtime)
-            self.sendHeader('if-modified-since', datetime)
-
-        self.endHeaders()
-
-    def handleStatus(self, version, code, message):
-        __pychecker__ = 'unusednames=version,message'
-        log.debug('handleStatus %s - %s' % (code, message), 'http_client')
-        self.status_code = int(code)
-        
-        # Keep a record of server response even if overriden later by setReponseCode
-        self.http_status = self.status_code  
-
-        self.setResponseCode(self.status_code)
-        
-    def handleHeader(self, key, value):
-
-        log.debug("Received: " + key + " " + str(value))
-        key = string.lower(key)
-
-        if key == 'last-modified':
-            self.local_mtime = http.stringToDatetime(value)
-
-        if key in self.forward_headers:
-            self.setResponseHeader(key, value)
-
-    def handleEndHeaders(self):
-        if self.http_status == http.NOT_MODIFIED:
-            log.debug("NOT_MODIFIED " + str(self.status_code),'http_client')
-            self.apEndCached()
-
-    def rawDataReceived(self, data):
-        self.apDataReceived(data)
-
-    def handleResponse(self, buffer):
-        if self.length == 0:
-            self.setResponseCode(http.NOT_FOUND)
-        # print "length: " + str(self.length), "response:", self.status_code
-        if self.http_status == http.NOT_MODIFIED:
-            self.apDataEnd(self.transfered, False)
-        else:
-            self.apDataEnd(self.transfered, True)
-
-    def lineReceived(self, line):
-        """
-        log the line and handle it to the appropriate the base classe.
-        
-        The location header gave me trouble at some point, so I filter it just
-        in case.
-
-        Note: when running a class method directly and not from an object you
-        have to give the 'self' parameter manualy.
-        """
-        #log.debug(line,'http_client')
-        if self.log_headers == None:
-            self.log_headers = line
-        else:
-            self.log_headers += ", " + line;
-        if not re.search('^Location:', line):
-            http.HTTPClient.lineReceived(self, line)
-
-    def sendCommand(self, command, path):
-        "log the line and handle it to the base class."
-        log.debug(command + ":" + path,'http_client')
-        http.HTTPClient.sendCommand(self, command, path)
-
-    def endHeaders(self):
-        "log and handle to the base class."
-        if self.log_headers != None:
-            log.debug(" Headers: " + self.log_headers, 'http_client')
-            self.log_headers = None;
-        http.HTTPClient.endHeaders(self)
-
-    def sendHeader(self, name, value):
-        "log and handle to the base class."
-        log.debug(name + ":" + value,'http_client')
-        http.HTTPClient.sendHeader(self, name, value)
-
-class FetcherFtp(Fetcher, protocol.Protocol):
-    """
-    This is the secuence here:
-
-        -Start and connect the FTPClient
-        -Ask for mtime
-        -Ask for size
-        -if couldn't get the size
-            -try to get it by listing
-        -get all that juicy data
-        
-    NOTE: Twisted's FTPClient code uses it's own timeouts here and there,
-    so the timeout specified for the backend may not always be used
-    """
-    def activate (self, request):
-        Fetcher.activate(self, request)
-        if not request.apFetcher:
-            return
-
-        self.passive_ftp = self.request.backend.config.passive_ftp
-        
-        self.remote_file = (self.request.backendServer.path + "/" 
-                            + self.request.backend_uri)
-
-        from twisted.internet.protocol import ClientCreator
-
-        if not request.backendServer.username:
-            creator = ClientCreator(reactor, ftp.FTPClient, passive=0)
-        else:
-            creator = ClientCreator(reactor, ftp.FTPClient, request.backendServer.username,
-                                    request.backendServer.password, passive=0)
-        d = creator.connectTCP(request.backendServer.host, request.backendServer.port,
-                               request.backend.config.timeout)
-        d.addCallback(self.controlConnectionMade)
-        d.addErrback(self.connectionFailed)
-
-    def controlConnectionMade(self, ftpclient):
-        self.ftpclient = ftpclient
-        
-        if(self.passive_ftp):
-            log.debug('Got control connection, using passive ftp', 'ftp_client')
-            self.ftpclient.passive = 1
-        else:
-            log.debug('Got control connection, using active ftp', 'ftp_client')
-            self.ftpclient.passive = 0
-
-        if log.isEnabled('ftp_client'):
-            self.ftpclient.debug = 1
-
-        self.ftpFetchMtime()
-
-    def ftpFinish(self, code, message=None):
-        "Finish the transfer with code 'code'"
-        self.ftpclient.quit()
-        self.setResponseCode(code, message)
-        self.apDataReceived("")
-        self.apDataEnd(self.transfered)
-
-    def ftpFinishCached(self):
-        "Finish the transfer giving the requests the cached file."
-        self.ftpclient.quit()
-        self.apEndCached()
-
-    def ftpFetchMtime(self):
-        "Get the modification time from the server."
-        def apFtpMtimeFinish(msgs, fetcher, fail):
-            """
-            Got an answer to the mtime request.
-            
-            Someone should check that this is timezone independent.
-            """
-            code = None
-            if not fail:
-                code, msg = msgs[0].split()
-            mtime = None
-            if code == '213':
-                time_tuple=time.strptime(msg[:14], "%Y%m%d%H%M%S")
-                #replace day light savings with -1 (current)
-                time_tuple = time_tuple[:8] + (-1,)
-                #correct the result to GMT
-                mtime = time.mktime(time_tuple) - time.altzone
-            if (fetcher.local_mtime and mtime
-                and fetcher.local_mtime >= mtime):
-                fetcher.ftpFinishCached()
-            else:
-                fetcher.local_mtime = mtime
-                fetcher.ftpFetchSize()
-
-        d = self.ftpclient.queueStringCommand('MDTM ' + self.remote_file)
-        d.addCallbacks(apFtpMtimeFinish, apFtpMtimeFinish,
-                       (self, 0), None, (self, 1), None)
-
-    def ftpFetchSize(self):
-        "Get the size of the file from the server"
-        def apFtpSizeFinish(msgs, fetcher, fail):
-            code = None
-            if not fail:
-                code, msg = msgs[0].split()
-            if code != '213':
-                log.debug("SIZE FAILED",'ftp_client')
-                fetcher.ftpFetchList()
-            else:
-                fetcher.setResponseHeader('content-length', msg)
-                fetcher.ftpFetchFile()
-
-        d = self.ftpclient.queueStringCommand('SIZE ' + self.remote_file)
-        d.addCallbacks(apFtpSizeFinish, apFtpSizeFinish,
-                       (self, 0), None, (self, 1), None)
-
-    def ftpFetchList(self):
-        "If ftpFetchSize didn't work try to get the size with a list command."
-        def apFtpListFinish(msg, filelist, fetcher, fail):
-            __pychecker__ = 'unusednames=msg'
-            if fail:
-                fetcher.ftpFinish(http.INTERNAL_SERVER_ERROR)
-                return
-            if len(filelist.files)== 0:
-                fetcher.ftpFinish(http.NOT_FOUND)
-                return
-            file = filelist.files[0]
-            fetcher.setResponseHeader('content-length', file['size'])
-            fetcher.ftpFetchFile()
-        filelist = ftp.FTPFileListProtocol()
-        d = self.ftpclient.list(self.remote_file, filelist)
-        d.addCallbacks(apFtpListFinish, apFtpListFinish,
-                       (filelist, self, 0), None,
-                       (filelist, self, 1), None)
-
-    def ftpFetchFile(self):
-        "And finally, we ask for the file."
-        def apFtpFetchFinish(msg, code, status, fetcher):
-            __pychecker__ = 'unusednames=msg,status'
-            fetcher.ftpFinish(code)
-        log.debug('ftpFetchFile: ' + self.remote_file, 'ftp_client')
-        d = self.ftpclient.retrieveFile(self.remote_file, self)
-        d.addCallbacks(apFtpFetchFinish, apFtpFetchFinish,
-                       (http.OK, "good", self), None,
-                       (http.NOT_FOUND, "fail", self), None)
-
-    def dataReceived(self, data):
-        self.setResponseCode(http.OK)
-        self.apDataReceived(data)
-
-    def connectionLost(self, reason=None):
-        """
-        Maybe we should do some recovery here, I don't know, but the Deferred
-        should be enough.
-        """
-        log.debug("lost connection: %s"%(reason),'ftp_client')
-
-class FetcherGzip(Fetcher, protocol.ProcessProtocol):
-    """
-    This is a fake Fetcher, it uses the real Fetcher from the request's
-    backend via LoopbackRequest to get the data and gzip's or gunzip's as
-    needed.
-
-    NOTE: We use the serve_cached=0 parameter to Request.fetch so if
-    it is cached it doesn't get uselessly read, we just get it from the cache.
-    """
-    post_convert = re.compile(r"^Should not match anything$")
-    gzip_convert = post_convert
-
-    exe = '/bin/gzip'
-    def activate(self, request, postconverting=0):
-        log.debug("FetcherGzip request:" + str(request.uri) + " postconvert:" + str(postconverting), 'gzip')
-        Fetcher.activate(self, request)
-        if not request.apFetcher:
-            return
-
-        self.args = (self.exe, '-c', '-9', '-n')
-        if(log.isEnabled('gzip',9)):
-            self.args += ('-v',)
-
-        if request.uri[-3:] == '.gz':
-            host_uri = request.uri[:-3]
-        else:
-            host_uri = request.uri+'.gz'
-            self.args += ('-d',)
-        self.host_file = self.factory.config.cache_dir + host_uri
-        self.args += (self.host_file,)
-
-        running = self.factory.runningFetchers
-        if not postconverting or running.has_key(host_uri):
-            #Make sure that the file is there
-            loop = LoopbackRequest(request, self.host_transfer_done)
-            loop.uri = host_uri
-            loop.local_file = self.host_file
-            loop.process()
-            self.loop_req = loop
-            loop.serve_if_cached=0
-            if running.has_key(host_uri):
-                #the file is on it's way, wait for it.
-                running[host_uri].insert_request(loop)
-            else:
-                #we are not postconverting, so we need to fetch the host file.
-                loop.fetch(serve_cached=0)
-        else:
-            #The file should be there already.
-            self.loop_req = None
-            self.host_transfer_done()
-
-    def host_transfer_done(self):
-        """
-        Called by our LoopbackRequest when the real Fetcher calls
-        finish() on it.
-
-        If everything went well, check mtimes and only do the work if needed.
-
-        If posible arrange things so the target file gets the same mtime as
-        the host file.
-        """
-        log.debug('transfer done', 'gzip')
-        if self.loop_req and self.loop_req.code != http.OK:
-            self.setResponseCode(self.loop_req.code,
-                                 self.loop_req.code_message)
-            self.apDataReceived("")
-            self.apDataEnd("")
-            return
-
-        if os.path.exists(self.host_file):
-            self.local_mtime = os.stat(self.host_file)[stat.ST_MTIME]
-        old_mtime = None
-        if os.path.exists(self.local_file):
-            old_mtime = os.stat(self.local_file)[stat.ST_MTIME]
-        if self.local_mtime == old_mtime:
-            self.apEndCached()
-        else:
-            log.debug("Starting process: " + self.exe + " " + str(self.args), 'gzip')
-            self.process = reactor.spawnProcess(self, self.exe, self.args)
-
-    def outReceived(self, data):
-        self.setResponseCode(http.OK)
-        self.apDataReceived(data)
-
-    def errReceived(self, data):
-        log.debug('gzip: ' + data,'gzip')
-
-    def loseConnection(self):
-        """
-        This is a bad workaround Process.loseConnection not doing it's
-        job right.
-        The problem only happends when we try to finish the process
-        while decompresing.
-        """
-        if hasattr(self, 'process') and self.process.pid:
-            try:
-                os.kill(self.process.pid, signal.SIGTERM)
-                self.process.connectionLost()
-            except exceptions.OSError, Error:
-                import errno
-                (Errno, Errstr) = Error
-                if Errno != errno.ESRCH:
-                    log.debug('Passing OSError exception '+Errstr)
-                    raise 
-                else:
-                    log.debug('Threw away exception OSError no such process')
-
-    def processEnded(self, reason=None):
-        __pychecker__ = 'unusednames=reason'
-        log.debug("Status: %d" %(self.process.status),'gzip')
-        if self.process.status != 0:
-            self.setResponseCode(http.NOT_FOUND)
-
-        self.apDataReceived("")
-        self.apDataEnd(self.transfered)
-
-class FetcherRsync(Fetcher, protocol.ProcessProtocol):
-    """
-    I frequently am not called directly, Request.fetch makes the
-    arrangement for FetcherGzip to use us and gzip the result if needed.
-    """
-    post_convert = re.compile(r"^Should not match anything$")
-    gzip_convert = re.compile(r"/Packages.gz$")
-    
-    "Temporary filename that rsync streams to"
-    rsyncTempFile = None
-    
-    "Number of bytes sent to client already"
-    bytes_sent = 0
-
-    def activate (self, request):
-        Fetcher.activate(self, request)
-        if not request.apFetcher:
-            return
-
-        # Change /path/to/FILE -> /path/to/.FILE.* to match rsync tempfile
-        self.globpattern = re.sub(r'/([^/]*)$', r'/.\1.*', self.local_file)
-        
-        for file in glob.glob(self.globpattern):
-          log.msg('Deleting stale tempfile:' + file)
-          unlink(file)
-                
-        uri = 'rsync://'+request.backendServer.host\
-              +request.backendServer.path+'/'+request.backend_uri
-        self.local_dir=re.sub(r"/[^/]*$", "", self.local_file)+'/'
-
-        exe = '/usr/bin/rsync'
-        if(log.isEnabled('rsync',9)):
-            args = (exe, '--partial', '--progress', '--verbose', '--times',
-                    '--timeout', "%d"%(request.backend.config.timeout),
-                    uri, '.',)
-        else:
-            args = (exe, '--quiet', '--times', uri, '.',
-                    '--timeout',  "%d"%(request.backend.config.timeout),
-                    )
-        if(not os.path.exists(self.local_dir)):
-            os.makedirs(self.local_dir)
-        self.process = reactor.spawnProcess(self, exe, args, None,
-                                            self.local_dir)
-
-    def findRsyncTempFile(self):
-        """
-        Look for temporary file created by rsync during streaming
-        """
-        files = glob.glob(self.globpattern)
-        
-        if len(files)==1:
-            self.rsyncTempFile = files[0]
-            log.debug('tempfile: ' + self.rsyncTempFile, 'rsync_client')
-        elif not files:
-            # No file created yet
-            pass
-        else:
-            log.err('found more than one tempfile, abort rsync')
-            self.transport.loseConnection()
-             
-    def connectionMade(self):
-        pass
-
-    "Data received from rsync process to stdout"
-    def outReceived(self, data):
-        for s in string.split(data, '\n'):
-            if len(s):
-                log.debug('rsync: ' + s, 'rsync_client')
-        #self.apDataReceived(data)
-        if not self.rsyncTempFile:
-            self.findRsyncTempFile()
-            # Got tempfile?
-            if self.rsyncTempFile:
-                self.setResponseCode(http.OK)
-        if self.rsyncTempFile:
-            self.sendData()
-
-
-    "Data received from rsync process to stderr"
-    def errReceived(self, data):
-        for s in string.split(data, '\n'):
-            if len(s):
-                log.err('rsync error: ' + s, 'rsync_client')
-
-    def sendData(self):
-        f = None
-        if self.rsyncTempFile:
-            try:
-                f = open(self.rsyncTempFile, 'rb')
-            except IOError:
-                return
-        else:
-            # Tempfile has gone, stream main file
-            #log.debug("sendData open dest " + str(self.bytes_sent))
-            f = open(self.local_file, 'rb')
-            
-        if f:
-            f.seek(self.bytes_sent)
-            data = f.read(abstract.FileDescriptor.bufferSize)
-            #log.debug("sendData got " + str(len(data)))
-            f.close()
-            if data:
-                self.apDataReceived(data)
-                self.bytes_sent = self.bytes_sent + len(data)
-                reactor.callLater(0, self.sendData)
-            elif not self.rsyncTempFile:
-                # Finished reading final file
-                #self.transport = None
-                log.debug("sendData complete")
-                # Tell clients, but data is already saved by rsync so don't
-                # write file again
-                self.apDataEnd(self.transfered, False)
-                
-        
-    def processEnded(self, status_object):
-        __pychecker__ = 'unusednames=reason'
-        log.debug("Status: %d" %(status_object.value.exitCode)
-                  ,'rsync_client')
-        self.rsyncTempFile = None
-        
-        # Success?
-        exitcode = status_object.value.exitCode
-        
-        if exitcode == 0:
-            # File received.  Send to clients.
-            self.local_mtime = os.stat(self.local_file)[stat.ST_MTIME]
-            reactor.callLater(0, self.sendData)
-        else:
-            if exitcode == 10:
-                # Host not found
-                self.setResponseCode(http.INTERNAL_SERVER_ERROR)
-            else:
-                self.setResponseCode(http.NOT_FOUND)
-                
-            if not os.path.exists(self.local_file):
-                try:
-                    os.removedirs(self.local_dir)
-                except:
-                    pass
-            self.apDataReceived("")
-            self.apDataEnd(self.transfered)
-
-    def loseConnection(self):
-        "Kill rsync process"
-        if self.transport:
-            if self.transport.pid:
-                log.debug("killing rsync child" + 
-                          str(self.transport.pid), 'rsync_client')
-                os.kill(self.transport.pid, signal.SIGTERM)
-            #self.transport.loseConnection()
-        
-        
-
-class FetcherCachedFile(Fetcher):
-    """
-    Sends the cached file or tells the client that the file was not
-    'modified-since' if appropriate.
-    """
-    post_convert = re.compile(r"/Packages.gz$")
-    gzip_convert = re.compile(r"^Should not match anything$")
-
-    request = None
-    def if_modified(self, request):
-        """
-        Check if the file was 'modified-since' and tell the client if it
-        wasn't.
-        """
-        if_modified_since = request.getHeader('if-modified-since')
-        if if_modified_since != None:
-            if_modified_since = http.stringToDatetime(
-                    if_modified_since)
-
-        if request.local_mtime <= if_modified_since:
-            request.setResponseCode(http.NOT_MODIFIED)
-            request.setHeader("Content-Length", 0)
-            request.write("")
-            request.finish()
-            self.remove_request(request)
-        
-    def insert_request(self, request):
-        if not request.serve_if_cached:
-            request.finish()
-            return
-        Fetcher.insert_request(self, request)
-        
-        log.debug("Serving from cache for additional client: " + self.local_file + " size:" + str(self.size))
-        self.start_transfer(request)
-        
-    def activate(self, request):
-        Fetcher.activate(self, request)
-        if not request.apFetcher:
-            return
-        self.factory.file_served(request.uri)
-        self.size = request.local_size
-        
-        self.start_transfer(request)
-        
-    def start_transfer(self, request):
-        self.if_modified(request)
-        
-        if len(self.requests) == 0:
-            #we had a single request and didn't have to send it
-            self.apEnd()
-            return
-
-        if self.size:
-            log.debug("Serving from cache: " + self.local_file + " size:" + str(self.size), 'FetcherCachedFile')
-            file = open(self.local_file,'rb')
-            fcntl.lockf(file.fileno(), fcntl.LOCK_SH)
-            
-            request.setHeader("Content-Length", request.local_size)
-            request.setHeader("Last-modified",
-                            http.datetimeToString(request.local_mtime))
-            basic.FileSender().beginFileTransfer(file, request) \
-                            .addBoth(self.file_transfer_complete, request) \
-                            .addBoth(lambda r: file.close())
-#                            .addBoth(lambda r: request.transport.loseConnection())
-        else:
-            log.debug("Zero length file! " + self.local_file, 'FetcherCachedFile')
-            self.file_transfer_complete(None, request)
-            request.finish()
-
-    # A file transfer has completed
-    def file_transfer_complete(self, result, request):
-        log.debug("transfer complete", 'FetcherCachedFile')
-        request.finish()
-        # Remove this client from request list
-        self.remove_request(request)
-        if len(self.requests) == 0:
-            Fetcher.apEnd(self)
-                                         
 class Backend:
     """
     A backend repository.  There is one Backend for each [...] section
@@ -1214,25 +47,40 @@
     "Sequence of BackendServers, in order of preference"
     uris = []
 
+    "Hash of active cache entries"
+    entries = {}
+
     "Packages database for this backend"
     packages = None
-    base = None
+    name = None
+
+    downloadQueuePerClient = True # Set to true if a download queue should be created per client
 
     def __init__(self, factory, config):
+        log.debug("Creating Backend: " + config.name)
         self.factory = factory
         self.config = config # apBackendConfig configuration information
         self.base = config.name # Name of backend
-        self.uris=[]
+        self.uris = [] # Sequence of BackendServers, in order of preference
+
+        if self.downloadQueuePerClient:
+            self.queue = fetchers.DownloadQueuePerClient()
+        else:
+            self.queue = fetchers.DownloadQueue()
+
+        self.entries = {} # Hash of active cache entries
+        self.packages = None # Packages database for this backend
 
         for uri in config.backends:
             self.addURI(uri)
+
         #self.get_packages_db().load()
 
     def addURI(self, uri):
         newBackend = BackendServer(self, uri)
         self.uris.append(newBackend)
 
-    def get_first_server(self): 
+    def get_first_server(self):
         "Provide first BackendServer for this Backend"
         return self.uris[0]
 
@@ -1246,6 +94,25 @@
     def __str__(self):
         return '('+self.base+')'+' servers:'+str(len(self.uris))
 
+    def get_cache_entry(self, path):
+        """
+        Return CacheEntry for given path
+        a new object is created if it does not already exist
+        """
+        if self.entries.has_key(path):
+            log.debug("Cache entry exists: %s, %s entries" %(path,len(self.entries)))
+            return self.entries[path]
+        else:
+            log.debug("New Cache entry: "+path)
+            e = cache.CacheEntry(self, path)
+            self.entries[path] = e
+            return e
+    def entry_done(self, entry):
+        "A cache entry is finished and clients are disconnected"
+        #if self.entries.has_key(entry.path):
+        log.debug("entry_done: %s" %(entry.path), 'Backend')
+        del self.entries[entry.path]
+
     def get_packages_db(self):
         "Return packages parser object for the backend, creating one if necessary"
         if self.packages == None:
@@ -1255,12 +122,22 @@
     def get_path(self, path):
         """
         'path' is the original uri of the request.
-        
+
         We return the path to be appended to the backend path to
         request the file from the backend server
         """
         return path[len(self.base)+2:]
-        
+
+    def file_served(self, entry):
+        "A cache entry has served a file in this backend"
+        self.get_packages_db().file_updated(entry)
+
+    def start_download(self, entry):
+        """
+        A CacheEntry has requested that a file should be downloaded from the backend
+        """
+        self.queue.addFile(entry)
+
 class BackendServer:
     """
     A repository server.  A BackendServer is created for each URI defined in 'backends'
@@ -1271,10 +148,10 @@
     uri = None            # URI of server
 
     fetchers = {
-        'http' : FetcherHttp,
-        'ftp'  : FetcherFtp,
-        'rsync': FetcherRsync,
-        'file' : FetcherFile,
+        'http' : fetchers.HttpFetcher,
+        'ftp'  : fetchers.FtpFetcher,
+        'rsync': fetchers.RsyncFetcher,
+        'file' : fetchers.FileFetcher,
         }
     ports = {
         'http' : 80,
@@ -1289,7 +166,7 @@
         log.debug("Created new BackendServer: " + uri)
 
         # hack because urlparse doesn't support rsync
-        if uri[0:5] == 'rsync':
+        if uri[0:6] == 'rsync:':
             uri = 'http'+uri[5:]
             is_rsync=1
         else:
@@ -1297,6 +174,8 @@
 
         self.scheme, netloc, self.path, parameters, \
                      query, fragment = urlparse.urlparse(uri)
+        if is_rsync:
+            self.scheme = 'rsync'
 
         if '@' in netloc:
             auth = netloc[:netloc.rindex('@')]
@@ -1304,13 +183,12 @@
             self.username, self.password = auth.split(':')
         else:
             self.username = None
+            self.password = None
         if ':' in netloc:
             self.host, self.port = netloc.split(':')
         else:
             self.host = netloc
             self.port = self.ports[self.scheme]
-        if is_rsync:
-            self.scheme = 'rsync'
         self.fetcher = self.fetchers[self.scheme]
         try:
             self.port = int(self.port)
@@ -1320,21 +198,23 @@
     def __str__(self):
         return ('(' + self.backend.base + ') ' + self.scheme + '://' +
                self.host + ':' + str(self.port))
-              
+
 class Request(http.Request):
     """
     Each new request from connected clients generates a new instance of this
     class, and process() is called.
     """
-    local_mtime = None
+    if_modified_since = None
     local_size = None
     serve_if_cached = 1
     apFetcher = None
     uriIndex = 0             # Index of backend URI
     backend = None           # Backend for this request
     backendServer = None     # Current server to be tried
+    cacheEntry = None        # Cache entry for file requested
     
     def __init__(self, channel, queued):
+        log.debug("New Request, queued=%s" % (queued),'Request');
         self.factory=channel.factory
         http.Request.__init__(self, channel, queued)
 
@@ -1342,298 +222,118 @@
         """
         Each new request begins processing here
         """
-        log.debug("Request: " + self.method + " " + self.uri);
-        # Clean up URL
-        self.uri = self.simplify_path(self.uri)
+        self.uri = self.clean_path(self.uri)
 
-        self.local_file = self.factory.config.cache_dir + self.uri
-        backendName = self.uri[1:].split('/')[0]
-        log.debug("Request: %s %s backend=%s local_file=%s"%(self.method, self.uri, backendName, self.local_file))
+        if_modified_since = self.getHeader('if-modified-since')
+        if if_modified_since != None:
+            self.if_modified_since = http.stringToDatetime(
+                    if_modified_since)
+
+        if self.uri[0] != '/':
+            log.debug("Request must include at least one '/'")
+            self.finishCode(http.FORBIDDEN, "Request must include at least one '/'")
+            return
 
-        if self.factory.config.disable_pipelining:
-            self.setHeader('Connection','close')
-            self.channel.persistent = 0
+        backendName = self.uri[1:].split('/')[0]
+        log.debug("Request: %s %s backend=%s uri=%s"
+                    % (self.method, self.uri, backendName, self.uri),'Request')
 
         if self.method != 'GET':
             #we currently only support GET
-            log.debug("abort - method not implemented")
+            log.debug("abort - method not implemented", 'Request')
             self.finishCode(http.NOT_IMPLEMENTED)
             return
 
         if re.search('/\.\./', self.uri):
-            log.debug("/../ in simplified uri ("+self.uri+")")
+            log.debug("/../ in simplified uri ("+self.uri+")", 'Request')
             self.finishCode(http.FORBIDDEN)
             return
 
         self.backend = self.factory.getBackend(backendName)
         if self.backend is None:
-            if not self.factory.config.dynamic_backends:
-                log.debug("abort - non existent Backend")
-                self.finishCode(http.NOT_FOUND, "NON-EXISTENT BACKEND")
-                return
-
-            # We are using dynamic backends so we will use the name as
-            # the hostname to get the files.
-            backendName = self.uri[1:].split('/')[0]
-            backendServer = "http://" + backendName
-            log.debug("Adding " + backendName + " backend dynamicaly")
-            backendConfig = self.factory.config.addBackend(None, backendName, (backendServer,))
-            self.backend = Backend(self.factory, backendConfig)
-        self.backend_uri = self.backend.get_path(self.uri)
+            self.finishCode(http.NOT_FOUND, "NON-EXISTENT BACKEND")
+            return None
 
         log.debug("backend: %s %s" % (self.backend.base, self.backend.uris))
-        self.backendServer = self.backend.get_first_server()
-        self.filetype = findFileType(self.uri)
 
-        if not self.filetype:
-            log.debug("abort - unknown extension")
-            self.finishCode(http.NOT_FOUND)
+        backend_path = self.uri.split('/',2)[2]
+        self.cacheEntry = self.backend.get_cache_entry(backend_path)
+
+        if not self.cacheEntry.filetype:
+            log.debug("abort - unknown extension for file %s" % (backend_path), 'Request')
+            self.finishCode(http.FORBIDDEN, 'File not found - unknown extension')
             return
 
-        self.setHeader('content-type', self.filetype.contype)
+        self.setHeader('content-type', self.cacheEntry.filetype.contype)
 
-        if os.path.isdir(self.local_file):
-            log.debug("abort - Directory listing not allowed")
-            self.finishCode(http.FORBIDDEN)
+        if os.path.isdir(self.cacheEntry.file_path):
+            log.debug("abort - Directory listing not allowed", 'Request')
+            self.finishCode(http.FORBIDDEN, 'Directory listing not permitted')
             return
 
-        self.fetch()
+        self.cacheEntry.add_request(self)
+
+    def clean_path(self, uri):
+        # Clean up URL given
+        scheme, netloc, path, params, query, fragment = urlparse.urlparse(uri)
+        return os.path.normpath(path)
 
-    def fetch(self, serve_cached=1):
+
+    def start_streaming(self, size, mtime):
         """
-        Serve 'self' from cache or through the appropriate Fetcher
-        depending on the asociated backend.
-    
-        Use post_convert and gzip_convert regular expresions of the Fetcher
-        to gzip/gunzip file before and after download.
-    
-        'serve_cached': this is somewhat of a hack only useful for
-        LoopbackRequests (See LoopbackRequest class for more information).
+        Prepare client to stream file
+        Return false if streaming is not necessary (i.e. cache hit)
         """
-        def fetch_real(result, dummyFetcher, cached, running):
-            """
-            This is called after verifying if the file is properly cached.
-            
-            If 'cached' the requested file is properly cached.
-            If not 'cached' the requested file was not there, didn't pass the
-            integrity check or may be outdated.
-            """
-            __pychecker__ = 'unusednames=result'
-
-            if len(dummyFetcher.requests)==0:
-                #The request's are gone, the clients probably closed the
-                #conection
-                log.debug("THE REQUESTS ARE GONE (Clients closed conection)", 
-                          'fetch')
-                dummyFetcher.apEnd()
-                return
-
-
-            req = dummyFetcher.request
-
-            log.debug("cached: %s" % cached)
-            
-            if cached:
-                msg = ("Using cached copy of %s"
-                       %(dummyFetcher.request.local_file))
-                fetcher_class = FetcherCachedFile
-            else:
-                msg = ("Consulting server about %s"
-                       %(dummyFetcher.request.local_file))
-                fetcher_class = req.backendServer.fetcher
-
-            if fetcher_class.gzip_convert.search(req.uri):
-                msg = ("Using gzip/gunzip to get %s"
-                       %(dummyFetcher.request.local_file))
-                fetcher_class = FetcherGzip
-
-            log.debug(msg, 'fetch_real')
-            fetcher = dummyFetcher.apEndTransfer(fetcher_class)
-            if (fetcher and fetcher.post_convert.search(req.uri)
-                and not running.has_key(req.uri[:-3])):
-                log.debug("post converting: "+req.uri,'convert')
-                loop = LoopbackRequest(req)
-                loop.uri = req.uri[:-3]
-                loop.local_file = req.local_file[:-3]
-                loop.process()
-                loop.serve_if_cached=0
-                #FetcherGzip will attach as a request of the
-                #original Fetcher, efectively waiting for the
-                #original file if needed
-                gzip = FetcherGzip()
-                gzip.activate(loop, postconverting=1)
-
-        self.serve_if_cached = serve_cached
-        running = self.factory.runningFetchers
-        if (running.has_key(self.uri)):
-            #If we have an active fetcher just use that
-            log.debug("have active fetcher: "+self.uri,'client')
-            running[self.uri].insert_request(self)
-            return running[self.uri]
+        if self.if_modified_since is None or self.if_modified_since < mtime:
+            log.debug("start_streaming size=%s mtime=%s if_modified_since=%s" % (size, mtime, self.if_modified_since) , 'Request')
+            self.setResponseCode(http.OK, 'Streaming file')
+            if mtime is not None:
+                self.setHeader('last-modified', http.datetimeToString(mtime))
+            if size is not None:
+                self.setHeader('content-length', size)
+            return True
         else:
-            #we make a FetcherDummy instance to hold other requests for the
-            #same file while the check is in process. We will transfer all
-            #the requests to a real fetcher when the check is done.
-            dummyFetcher = FetcherDummy(self)
-            #Standard Deferred practice
-            d = self.check_cached()
-            d.addCallbacks(fetch_real, fetch_real,
-                           (dummyFetcher, 1, running,), None,
-                           (dummyFetcher, 0, running,), None)
-            return None
-    
-    def simplify_path(self, old_path):
-        """
-        change //+ with /
-        change /directory/../ with /
-        More than three ocurrences of /../ together will not be
-        properly handled
-        
-        NOTE: os.path.normpath could probably be used here.
-        """
-        path = re.sub(r"//+", "/", old_path)
-        path = re.sub(r"/\./+", "/", path)
-        new_path = re.sub(r"/[^/]+/\.\./", "/", path)
-        while (new_path != path):
-            path = new_path
-            new_path = re.sub(r"/[^/]+/\.\./", "/", path)
-        if (new_path != old_path):
-            log.debug("simplified path from " + old_path + 
-                      " to " + new_path,'simplify_path')
-        return path
+            log.debug("file not modified: mtime=%s if_modified_since=%s" % (mtime, self.if_modified_since) , 'Request')
+            self.setHeader("content-length", 0)
+            self.finishCode(http.NOT_MODIFIED, 'File is up to date')
+            return False
 
     def finishCode(self, responseCode, message=None):
-        "Finish the request with an status code"
+        "Finish the request with a status code and no streamed data"
+        log.debug("finishCode: %s, %s" % (responseCode, message), 'Request')
         self.setResponseCode(responseCode, message)
         self.write("")
         self.finish()
 
     def finish(self):
+        "Finish request after streaming"
+        log.debug("finish. Queued: %s" % (self.queued) , 'Request')
         http.Request.finish(self)
-        if self.factory.config.disable_pipelining:
-            if hasattr(self.transport, 'loseConnection'):
-                self.transport.loseConnection()
-
-    def check_cached(self):
-        """
-        check the existence and ask for the integrity of the requested file and
-        return a Deferred to be trigered when we find out.
-        """
-        def file_ok(result, deferred, self):
-            """
-            called if FileVerifier has determined that the file is cached and
-            in good shape.
-
-            Now we check NOTE: The file may still be too old or not fresh
-            enough.
-            """
-            __pychecker__ = 'unusednames=result'
-            stat_tuple = os.stat(self.local_file)
-
-            self.local_mtime = stat_tuple[stat.ST_MTIME]
-            self.local_size = stat_tuple[stat.ST_SIZE]
-            log.debug("Modification time:" + 
-                      time.asctime(time.localtime(self.local_mtime)), 
-                      "file_ok")
-            update_times = self.factory.update_times
-
-            if update_times.has_key(self.uri): 
-                last_access = update_times[self.uri]
-                log.debug("last_access from db: " + 
-                          time.asctime(time.localtime(last_access)), 
-                          "file_ok")
-            else:
-                last_access = self.local_mtime
-
 
-            cur_time = time.time()
-            min_time = cur_time - self.factory.config.min_refresh_delay
+        if self.cacheEntry:
+            self.cacheEntry.remove_request(self)
+            self.cacheEntry = None
 
-            if not self.filetype.mutable:
-                log.debug("file is immutable: "+self.local_file, 'file_ok')
-                deferred.callback(None)
-            elif last_access < min_time:
-                log.debug("file is too old: "+self.local_file, 'file_ok')
-                update_times[self.uri] = cur_time
-                deferred.errback()
-            else:
-                log.debug("file is ok: "+self.local_file, 'file_ok')
-                deferred.callback(None)
-
-        log.debug("check_cached: "+self.local_file, 'file_ok')
-        deferred = defer.Deferred()
-        if os.path.exists(self.local_file):
-            verifier = FileVerifier(self)
-            verifier.deferred.addCallbacks(file_ok, deferred.errback,
-                                           (deferred, self), None,
-                                           None, None)
-        else:
-            deferred.errback()
-        return deferred
-        
     def connectionLost(self, reason=None):
         """
         The connection with the client was lost, remove this request from its
         Fetcher.
         """
-        __pychecker__ = 'unusednames=reason'
-        #If it is waiting for a file verification it may not have an
-        #apFetcher assigned
-        if self.apFetcher:
-            self.apFetcher.remove_request(self)
-        self.finish()
+        log.debug("connectionLost" , 'Request')
+        if self.cacheEntry:
+            self.cacheEntry.remove_request(self)
+        #self.finish()
 
-    def activateNextBackendServer(self, fetcher):
+    def getFileno(self):
         """
-        The attempt to retrieve a file from the BackendServer failed.
-        Look for the next possible BackendServer and transfer requests to that
-        Returns true if another BackendServer was found
-        """
-        self.backendServer = self.backend.get_next_server(self.backendServer)
-        if(self.backendServer == None):
-            log.debug("no more Backends", "fetcher")
-            return False
-        
-        fetcher_class = self.backendServer.fetcher
-        log.debug('Trying next backendServer', 'fetcher')
-        fetcher.apEndTransfer(fetcher_class)
-        
-        return True
-        
-        
-class LoopbackRequest(Request):
-    """
-    This is just a fake Request so a Fetcher can attach to another
-    Fetcher and be notified when then transaction is completed.
-
-    Look at FetcherGzip for a sample.
-    """
-    __pychecker__ = 'no-callinit'
-    import cStringIO
-    local_mtime = None
-    headers = {}
-    content = cStringIO.StringIO()
-    
-    def __init__(self, other_req, finish=None):
-
-        self.finish_cb = finish
-        http.Request.__init__(self, None, 1)
-        self.backend = other_req.backend
-        self.factory = other_req.factory
-        self.filetype = other_req.filetype
-        self.method = other_req.method
-        self.clientproto = other_req.clientproto
-    def process(self):
-        self.backend_uri = self.backend.get_path(self.uri)
-    def write(self, data):
-        "We don't care for the data, just want to know then it is served."
-        pass
-    def finish(self):
-        "If he wanted to know, tell daddy that we are served."
-        if self.finish_cb:
-            self.finish_cb()
-        self.transport = None
-        pass
+        Get identifier which is unique per apt client
+        """
+        try:
+            fileno = self.channel.transport.fileno()
+        except:
+            fileno = -1
+            log.msg("could not get transport's file descriptor", 'Request')
+        return fileno
 
 class Channel(http.HTTPChannel):
     """
@@ -1647,7 +347,7 @@
 
     def headerReceived(self, line):
         "log and pass over to the base class"
-        #log.debug("Header: " + line)
+        log.debug("Header: " + line)
         if self.log_headers == None:
             self.log_headers = line
         else:
@@ -1663,13 +363,16 @@
     def connectionLost(self, reason=None):
         "If the connection is lost, notify all my requests"
         __pychecker__ = 'unusednames=reason'
-        for req in self.requests:
-            req.connectionLost()
-        log.debug("Client connection closed")
+        log.debug("Client connection closed", 'Channel')
+        http.HTTPChannel.connectionLost(self, reason)
         if log.isEnabled('memleak'):
             memleak.print_top_10()
         #reactor.stop()   # use for shutting down apt-proxy when a client disconnects
 
+    #def requestDone(self, request):
+        #log.debug("========Request Done=========", 'Channel')
+        #http.HTTPChannel.requestDone(self, request)
+        
 class Factory(protocol.ServerFactory):
     """
     This is the center of apt-proxy, it holds all configuration and global data
@@ -1691,81 +394,55 @@
     self.packages: all versions of a certain package name.
     
     """
-    databases=('update_times', 'access_times', 'packages')
+
+
+
+    def __init__ (self, config):
+        self.runningFetchers = {}
+        self.backends = {}
+        self.config = config
+        self.periodicCallback = None
+        self.databases = databaseManager(self)
+        self.recycler = None
+
+    def __del__(self):
+        pass
+        #self.closeDatabases()
 
     def periodic(self):
         "Called periodically as configured mainly to do mirror maintanace."
         log.debug("Doing periodic cleaning up")
+        self.periodicCallback = None
         self.clean_old_files()
         self.recycler.start()
         log.debug("Periodic cleaning done")
-        if (self.config.cleanup_freq != None):
-            reactor.callLater(self.config.cleanup_freq, self.periodic)
-    def __del__(self):
-        for f in self.databases:
-            try:
-                if hasattr(self, f): 
-                    getattr(self, f).close()
-            except Exception:
-                pass
-    def __init__ (self, config):
-        self.runningFetchers = {}
-        self.backends = []
-        self.config = config
+        self.startPeriodic()
+
+    def startPeriodic(self):
+        if (self.config.cleanup_freq != None and self.periodicCallback is None):
+            log.debug("Will do periodic cleaup in %s sec" % (self.config.cleanup_freq))
+            self.periodicCallback = reactor.callLater(self.config.cleanup_freq, self.periodic)
+
+    def stopPeriodic(self):
+        if self.periodicCallback is not None:
+            self.periodicCallback.cancel()
+            self.periodicCallback = None
 
     def __getattr__ (self, name):
-        def open_shelve(dbname):
-            from bsddb3 import db,dbshelve
- 
-            shelve = dbshelve.DBShelf()
-            db_dir = self.config.cache_dir+'/'+status_dir+'/db'
-            if not os.path.exists(db_dir):
-                os.makedirs(db_dir)
-
-            filename = db_dir + '/' + dbname + '.db'
-            if os.path.exists(filename):
-                 try:
-                     log.debug('Verifying database: ' + filename)
-                     shelve.verify(filename)
-                 except:
-                     os.rename(filename, filename+'.error')
-                     log.msg(filename+' could not be opened, moved to '+filename+'.error','db', 1)
-                     log.msg('Recreating '+ filename,'db', 1)
-            try:
-               log.debug('Opening database ' + filename)
-               shelve = dbshelve.open(filename)
-
-            # Handle upgrade to new format included on 1.9.20.
-            except db.DBInvalidArgError:
-                log.msg('Upgrading from previous database format: %s' % filename + '.previous')
-                import bsddb.dbshelve
-                os.rename(filename, filename + '.previous')
-                previous_shelve = bsddb.dbshelve.open(filename + '.previous')
-                shelve = dbshelve.open(filename)
-
-                for k in previous_shelve.keys():
-                    shelve[k] = previous_shelve[k]
-                log.msg('Upgrade complete')
-                    
-            return shelve
-
-        if name == 'update_times':
-            self.update_times = open_shelve('update')
-            return self.update_times
-        elif name == 'access_times':
-            self.access_times = open_shelve('access')
-            return self.access_times
-        elif name == 'packages':
-            self.packages = open_shelve('packages')
-            return self.packages
+        # Auto open database if requested
+        if name in self.databases.table_names:
+            db = self.databases.get(name)
+            setattr(self, name, db)
+            return db
         else:
             raise AttributeError(name)
 
     def startFactory(self):
         #start periodic updates
         self.configurationChanged()
-        self.recycler = misc.MirrorRecycler(self, 1)
-        self.recycler.start()
+        self.dumpdbs()
+        self.recycler = MirrorRecycler(self, 1)
+        #self.recycler.start()
 
     def configurationChanged(self, oldconfig = None):
         """
@@ -1779,7 +456,7 @@
                     setattr(self.config, param, getattr(oldconfig, param))
 
         if self.config.cleanup_freq != None and (oldconfig is None or oldconfig.cleanup_freq == None):
-            reactor.callLater(self.config.cleanup_freq, self.periodic)
+            self.startPeriodic()
         self.createBackends()
 
     def createBackends(self):
@@ -1796,7 +473,18 @@
         """
         if self.backends.has_key(name):
             return self.backends[name]
-        return None
+
+        if not self.config.dynamic_backends:
+            return None
+
+        # We are using dynamic backends so we will use the name as
+        # the hostname to get the files.
+        backendServer = "http://" + name
+        log.debug("Adding dynamic backend:" + name)
+        backendConfig = self.config.addBackend(None, name, (backendServer,))
+        backend = Backend(self, backendConfig)
+        self.backends[name] = backend
+        return backend
 
     def clean_versions(self, packages):
         """
@@ -1825,6 +513,8 @@
         from packages import AptDpkgInfo, get_mirror_versions
         for uri in packages[:]:
             if not os.path.exists(cache_dir +'/'+ uri):
+                log.debug("clean_versions: file %s no longer exists"%(uri),
+                        'versions')
                 packages.remove(uri)
             else:
                 try:
@@ -1833,17 +523,17 @@
                     package_name = info['Package']
                 except SystemError:
                     log.msg("Found problems with %s, aborted cleaning"%(uri),
-                            'max_versions')
+                            'versions')
                     return
 
-        if len(info):
+        if len(cached_packages) > 0:
             import apt_pkg
             cached_packages.sort(reverse_compare)
-            log.debug(str(cached_packages), 'max_versions')
+            log.debug(str(cached_packages), 'versions')
 
             current_packages = get_mirror_versions(self, package_name)
             current_packages.sort(reverse_compare)
-            log.debug("Current Versions: " + str(current_packages), 'max_versions')
+            log.debug("Current Versions: " + str(current_packages), 'versions')
 
             version_count = 0
 
@@ -1865,6 +555,7 @@
                     if version_count > self.config.max_versions:
                         log.msg("Deleting " + cache_dir +'/'+ cached_packages[0][1], 'max_versions')
                         os.unlink(cache_dir +'/'+ cached_packages[0][1])
+                        packages.remove(cached_packages[0][1])
                     del cached_packages[0]
 
     def clean_old_files(self):
@@ -1896,30 +587,52 @@
                 log.debug("old_file: non-existent "+file)
                 del self.update_times[file]
 
-    def file_served(self, uri):
-        "Update the databases, this file has just been served."
-        self.access_times[uri]=time.time()
-        if re.search("\.deb$", uri):
-            package = re.sub("^.*/", "", uri)
+    def file_served(self, cache_path):
+        """
+        Update the databases, this file has just been served.
+        @param cache_path: path of file within cache e.g. debian/dists/stable/Release.gpg
+        """
+        log.debug("File served: %s" % (cache_path))
+        path = os.sep + cache_path # Backwards compat
+        #path = cache_path
+        self.access_times[path]=time.time()
+        if re.search("\.deb$", path):
+            package = re.sub("^.*/", "", path)
             package = re.sub("_.*$", "", package)
             if not self.packages.has_key(package):
-                packages = [uri]
-                self.packages[package] = packages
+                packages = [path]
             else:
                 packages = self.packages[package]
-                if not uri in packages:
-                    packages.append(uri)
+                if not path in packages:
+                    packages.append(path)
                 self.clean_versions(packages)
-                self.packages[package] = packages
+            self.packages[package] = packages
         self.dumpdbs()
 
+    def closeDatabases(self):
+        for db in self.databases.table_names:
+            if getattr(self.databases, db) is not None:
+                log.debug("closing " + db, 'db')
+                getattr(self,db).close()
+                delattr(self,db)
+                setattr(self.databases, db, None)
+
     def stopFactory(self):
+        log.debug('Main factory stop', 'factory')
         import packages
-        self.dumpdbs()
-        self.update_times.close()
-        self.access_times.close()
-        self.packages.close()
+        # self.dumpdbs()
+        
+        # Stop all DownloadQueues and their fetchers
+        for b in self.backends.values():
+            b.queue.stop()
+            b.queue = None
+        self.backends = {}
         packages.cleanup(self)
+        if self.recycler is not None:
+            self.recycler.stop()
+            self.recycler = None
+        self.stopPeriodic()
+        #self.closeDatabases()
 
     def dumpdbs (self):
         def dump_update(key, value):
@@ -1960,3 +673,56 @@
 
     def debug(self, message):
         log.debug(message)
+
+class databaseManager:
+    update_times = None
+    access_times = None
+    packages = None
+    table_names=['update_times', 'access_times', 'packages']
+    database_files=['update', 'access', 'packages']
+
+    def __init__(self, factory):
+        self.factory = factory
+
+    def get(self, name):
+        idx = self.table_names.index(name)
+        db = getattr(self,name)
+        if db is None:
+            db = self.open_shelve(self.database_files[idx])
+            setattr(self, name, db)
+        return db
+
+    def open_shelve(self, dbname):
+        from bsddb import db,dbshelve
+
+        shelve = dbshelve.DBShelf()
+        db_dir = self.factory.config.cache_dir+'/'+status_dir+'/db'
+        if not os.path.exists(db_dir):
+            os.makedirs(db_dir)
+
+        filename = db_dir + '/' + dbname + '.db'
+        if os.path.exists(filename):
+                try:
+                    log.debug('Verifying database: ' + filename)
+                    shelve.verify(filename)
+                except:
+                    os.rename(filename, filename+'.error')
+                    log.msg(filename+' could not be opened, moved to '+filename+'.error','db', 1)
+                    log.msg('Recreating '+ filename,'db', 1)
+        try:
+            log.debug('Opening database ' + filename)
+            shelve = dbshelve.open(filename)
+
+        # Handle upgrade to new format included on 1.9.20.
+        except db.DBInvalidArgError:
+            log.msg('Upgrading from previous database format: %s' % filename + '.previous')
+            import bsddb.dbshelve
+            os.rename(filename, filename + '.previous')
+            previous_shelve = bsddb.dbshelve.open(filename + '.previous')
+            shelve = dbshelve.open(filename)
+
+            for k in previous_shelve.keys():
+                shelve[k] = previous_shelve[k]
+            log.msg('Upgrade complete')
+
+        return shelve

Modified: trunk/apt_proxy/apt_proxy_conf.py
==============================================================================
--- trunk/apt_proxy/apt_proxy_conf.py	(original)
+++ trunk/apt_proxy/apt_proxy_conf.py	Thu Aug  3 23:54:46 2006
@@ -49,6 +49,8 @@
     def gettime(self, section, option):
         mult = 1
         value = self.get(section, option)
+        if len(value) == 0:
+            raise ConfigError("Configuration parse error: [%s] %s" % (section, option))
         suffix = value[-1].lower()
         if suffix in self.time_multipliers.keys():
             mult = self.time_multipliers[suffix]
@@ -58,6 +60,13 @@
         return self.get(section,option)
     def getstringlist(self, section, option):
         return self.get(section,option).split()
+    def getproxyspec(self, section, option):
+        "Get http proxy info from string"
+        p = ProxyConfig(self.get(section,option))
+        if p.host is not None:
+            return p
+        else:
+            return None
 
 class apConfig:
     """
@@ -77,7 +86,7 @@
         ['address', '', 'string'],
         ['port', 9999, 'int'],
         ['min_refresh_delay', 30, 'time'],
-        ['complete_clientless_downloads', '0', 'boolean'],
+        ['complete_clientless_downloads', False, 'boolean'],
         ['telnet_port', 0, 'int'],
         ['telnet_user', '', 'string'],
         ['telnet_pass', '', 'string'],
@@ -87,11 +96,11 @@
         ['max_versions', 3, '*int'],
         ['max_age', 10, '*time'],
         ['import_dir', '/var/cache/apt-proxy/import', 'string'],
-        ['disable_pipelining', '1', 'boolean'],
         ['passive_ftp', 'on', 'boolean'],
         ['dynamic_backends', 'on', 'boolean'],
-        ['http_proxy', '' , 'string'],
-        ['username', 'aptproxy', 'string']
+        ['http_proxy', None , 'proxyspec'],
+        ['username', 'aptproxy', 'string'],
+        ['bandwidth_limit', None, '*int']
         ]
 
     """
@@ -104,7 +113,9 @@
     BACKEND_CONFIG_ITEMS = [
         ['timeout', None, 'time'],
         ['passive_ftp', None, 'boolean'],
-        ['backends', '', 'stringlist']
+        ['backends', '', 'stringlist'],
+        ['http_proxy', None , 'proxyspec'],
+        ['bandwidth_limit', None, '*int']
         ]
 
     DEFAULT_CONFIG_FILE = ['/etc/apt-proxy/apt-proxy-v2.conf',
@@ -160,9 +171,8 @@
         filehandle.close()
         return conf
     
-    def setDebug(self, levels):
+    def setDebug(self):
         "Set logger debug level"
-        self.debug = levels
         for domain in self.debug.split():
             #print "domain:",domain
             if domain.find(':') != -1:
@@ -180,7 +190,7 @@
             self.debug=config.get(DEFAULTSECT, 'debug')
         else:
             self.debug='all:3'
-        self.setDebug(self.debug)
+        self.setDebug()
 
         # read default values
         for name,default,getmethod in self.CONFIG_ITEMS:
@@ -262,3 +272,23 @@
     name = "UNKNOWN"
     def __init__(self, name):
         self.name = name
+
+class ProxyConfig:
+    """
+    Configuration information for backend server proxies
+    """
+    host = None
+    port = None
+    user = None
+    password = None
+
+    def __init__(self, proxyspec):
+        if proxyspec=='':
+            return
+        m = re.match('^((?P<user>.*):(?P<password>.*)@)?(?P<host>[a-zA-Z0-9_.+=-]+):(?P<port>[0-9]+)',
+                     proxyspec)
+        if m:
+            self.host = m.group('host')
+            self.port = m.group('port')
+            self.user = m.group('user')
+            self.password = m.group('password')

Added: trunk/apt_proxy/cache.py
==============================================================================
--- (empty file)
+++ trunk/apt_proxy/cache.py	Thu Aug  3 23:54:46 2006
@@ -0,0 +1,595 @@
+#
+# Copyright (C) 2005 Chris Halls <halls at debian.org>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of version 2.1 of the GNU Lesser General Public
+# License as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+# -*- test-case-name: apt_proxy.test.test_cache -*-
+
+"""
+Cache management for apt-proxy
+
+These classes implement functionality for managing apt-proxy's cache.  The most
+important of these is CacheEntry, which manages the lifecycle of a file in ap's cache
+"""
+
+from twisted.internet import protocol, defer, reactor
+from twisted.web import http
+from twisted.protocols import basic
+import os, re, stat, time, sys
+from misc import log
+
+class CacheEntry:
+    """
+    This class manages operations on a file in the cache.  Each physical
+    file on the disk corresponds to one CacheEntry.  Normally a CacheEntry
+    is created when the first Request for this file is received
+
+    Active CacheEntries are managed in their corresponding Backend
+    """
+
+    # Define lifecycle of cache entry
+    STATE_NEW = 1 # Entry is not yet being sent
+    STATE_CONNECTING = 2 # Waiting for connection to download file
+    STATE_DOWNLOAD = 3 # File is in process of downloading
+    STATE_SENDFILE = 4 # File is being sent from cache
+    STATE_SENT = 5 # Post download processing / waiting for clients to complete
+    STATE_FAILED = 6 # Download failed
+    
+
+    bytesDownloaded = 0
+
+    def __init__(self, backend, path):
+        """
+        Create a new cache entry
+        @param backend Backend where this entry belongs
+        @param path Path to file within backend directory
+        """
+        self.backend = backend
+        self.factory = backend.factory
+        self.requests = [] # Active client requests for this cache entry
+        self.streamfile = None
+        self.state = self.STATE_NEW
+
+        # Path of file within backend e.g. 'dists/stable/Release.gpg'
+        self.path = path 
+
+        # Path of file within cache e.g. 'debian/dists/stable/Release.gpg'
+        self.cache_path = backend.base + os.sep + path
+
+        # File in cache '/var/cache/apt-proxy/debian/dists/stable/Release.gpg'
+        self.file_path = (self.factory.config.cache_dir + os.sep + 
+                          self.cache_path)
+
+        # Directory of cache file '/var/cache/apt-proxy/debian/dists/stable'
+        self.filedir = os.path.dirname(self.file_path)
+
+        self.filetype = findFileType(path)
+        self.filename = os.path.basename(path) # 'Release.gpg'
+
+        # filebase='Release' fileext='gpg'
+        (self.filebase, self.fileext) = os.path.splitext(self.filename)
+
+        # self.create_directory()
+        self.file_mtime = None
+        self.file_size = None
+
+        self.fetcher = None
+
+    def add_request(self, request):
+        """
+        A new request has been received for this file
+        """
+        if request in self.requests:
+            raise RuntimeError, \
+                  'this request is already assigned to this CacheEntry'
+        self.requests.append(request)
+        if(len(self.requests)==1):
+            # First request
+            self.get()
+        else:
+            # Subsequent request - client must be brought up to date
+            if self.state == self.STATE_DOWNLOAD:
+                raise RuntimeError, \
+                      'TODO: multiple clients not implemented yet'
+
+    def remove_request(self,request):
+        """
+        Remove request, either because streaming is complete or
+        the client has disconnected
+
+        If parameter request is None, downloading has been aborted early
+        """
+        if request is not None and request in self.requests:
+            self.requests.remove(request)
+        if len(self.requests) != 0:
+            return
+
+        log.debug("Last request removed",'cacheEntry')
+        self.backend.entry_done(self)
+
+        # TODO - fixme
+        #if (self.factory.config.complete_clientless_downloads == False
+             #and self.state == self.STATE_DOWNLOAD
+             #and self.fetcher is not None):
+            ## Cancel download in progress
+            #log.debug("cancelling download (set complete_clientless_downloads to continue)",'cacheEntry')
+            #self.fetcher.cancel_download()
+
+        if self.streamfile is not None:
+            # File was streamed to clients
+            self.streamfile.close()
+            self.streamfile = None
+
+    def start_request_stream(self, request):
+        """
+        Prepare a request for streaming
+        """
+        log.msg("start_request_stream:" + self.file_path, "CacheEntry")
+        request.startStreaming(self.size, self.mtime)
+
+        if self.streamfile.size() != 0:
+            request.write(self.streamfile.read_from(start=0)) # TODO - is this efficient?
+
+
+    def get(self):
+        """
+        Update current version of file in cache
+        """
+        if self.state == self.STATE_NEW:
+            if os.path.exists(self.file_path):
+                self.stat_file()
+                if self.check_age():
+                    self.verify()
+                    return
+
+        self.start_download()
+
+    def verify(self):
+        """
+        Ask a FileVerifier to check the integrity of the cached file: send it
+        from the cache on success, or start a new download if verification fails.
+        """
+        log.debug("check_cached: "+self.path, 'CacheEntry')
+        verifier = FileVerifier(self.file_path, self.factory.config)
+        d = verifier.verify()
+        d.addCallback(self.send_cached_file)
+        d.addErrback(self.verify_failed)
+
+    def verify_failed(self, parm=None):
+        self.file_mtime = None
+        self.file_size = None
+        self.start_download()
+
+    def stat_file(self):
+        """
+        Read file age
+        """
+        stat_tuple = os.stat(self.file_path)
+
+        self.file_mtime = stat_tuple[stat.ST_MTIME]
+        self.file_size = stat_tuple[stat.ST_SIZE]
+        log.debug("Modification time:" + 
+                  time.asctime(time.localtime(self.file_mtime)), 
+                  "CacheEntry")
+
+    def check_age(self):
+        """
+        Read file age and check if file should be updated / refreshed
+
+        @return True if file is still valid, False if file is out of date
+        """
+
+        update_times = self.factory.update_times
+
+        if update_times.has_key(self.cache_path): 
+            last_access = update_times[self.cache_path]
+            log.debug("last_access from db: " + 
+                      time.asctime(time.localtime(last_access)), 
+                      "CacheEntry")
+        else:
+            last_access = self.file_mtime
+
+
+        cur_time = time.time()
+        min_time = cur_time - self.factory.config.min_refresh_delay
+
+        if not self.filetype.mutable:
+            log.debug("file is immutable: "+self.file_path, 'CacheEntry')
+            return True
+        elif last_access < min_time:
+            log.debug("file is too old: "+self.file_path, 'CacheEntry')
+            return False
+        else:
+            log.debug("file is ok: "+self.file_path, 'CacheEntry')
+            return True
+
+    def send_cached_file(self, unused=None):
+        """
+        File is up to date - send complete file from cache to clients
+        """
+        log.msg("sending file from cache:" + self.file_path, "CacheEntry")
+        self.transfer_file(self.file_path)
+
+    def end_send_cached(self):
+        """
+        Processing continues here when the file has been sent from the cache
+        """
+        self.file_sent()
+
+    def transfer_file(self, filename):
+        """
+        Send given file to clients
+        """
+        log.msg("transfer_file:" + filename, "CacheEntry")
+        try:
+            stat_tuple = os.stat(filename)
+            mtime = stat_tuple[stat.ST_MTIME]
+            size = stat_tuple[stat.ST_SIZE]
+    
+            self.state = self.STATE_SENDFILE
+            if size > 0:
+                log.debug("Sending file to clients:%s size:%s" % (filename, size), 'CacheEntry')
+                self.streamfile = open(filename,'rb')
+                #fcntl.lockf(file.fileno(), fcntl.LOCK_SH)
+
+                for request in self.requests:
+                    if request.start_streaming(size, mtime):
+                        basic.FileSender().beginFileTransfer(self.streamfile, request) \
+                                        .addBoth(self.file_transfer_complete, request, filename)
+            else:
+                log.debug("Sending empty file to clients:%s" % (filename), 'CacheEntry')
+                for request in self.requests:
+                    if request.start_streaming(size, mtime):
+                        request.finish()
+        except Exception, e:
+            log.debug("Unexpected error: %s" % (e), 'CacheEntry')
+            raise
+
+    def file_transfer_complete(self, result, request, filename):
+        log.debug("transfer complete: " + filename, 'CacheEntry')
+        request.finish()
+        if len(self.requests)==0:
+            # Last file was sent
+            self.file_sent()
+
+    def create_directory(self):
+        """
+        Create directory for cache entry's file
+        """
+        if(not os.path.exists(self.filedir)):
+            os.makedirs(self.filedir)
+
+    def start_download(self):
+        """
+        Start file transfer from backend server
+        """
+        log.msg("start download:" + self.path, "CacheEntry")
+        self.backend.start_download(self)
+
+    def download_started(self, fetcher, size, mtime):
+        """
+        Callback from Fetcher
+        A fetcher has begun streaming this file
+        """
+        log.msg("download started:" + self.file_path, "CacheEntry")
+        self.state = self.STATE_DOWNLOAD
+        self.create_directory()
+        self.fetcher = fetcher
+        self.file_mtime = mtime
+
+        """
+        Use post_convert and gzip_convert regular expressions of the Fetcher
+        to gzip/gunzip file before and after download.
+        """
+
+        if self.filename == 'Packages.gz':
+            log.msg('TODO postconvert Packages.gz',CacheEntry)
+#             if (fetcher and fetcher.post_convert.search(req.uri)
+#                 and not running.has_key(req.uri[:-3])):
+#                 log.debug("post converting: "+req.uri,'convert')
+#                 loop = LoopbackRequest(req)
+#                 loop.uri = req.uri[:-3]
+#                 loop.local_file = req.local_file[:-3]
+#                 loop.process()
+#                 loop.serve_if_cached=0
+#                 #FetcherGzip will attach as a request of the
+#                 #original Fetcher, effectively waiting for the
+#                 #original file if needed
+#                 gzip = FetcherGzip()
+#                 gzip.activate(loop, postconverting=1)
+
+
+        for req in self.requests:
+            req.start_streaming(size, mtime)
+
+
+    def download_data_received(self, data):
+        """
+        Callback from Fetcher
+        A block of data has been received from the streaming backend server
+        """
+        #log.msg("download_data_received:" + self.file_path, "CacheEntry")
+        for req in self.requests:
+            req.write(data)
+
+        if self.streamfile:
+            # save to tempfile (if it is in use)
+            self.streamfile.append(data)
+
+    def download_data_end(self):
+        """
+        Callback from Fetcher
+        File streaming is complete
+        """
+        log.msg("download_data_end:" + self.file_path, "CacheEntry")
+        self.state = self.STATE_SENT
+
+        if self.streamfile is not None:
+            # File was streamed to clients
+            self.streamfile.close_and_rename(self.file_path)
+            self.streamfile = None
+
+            if self.file_mtime != None:
+                os.utime(self.file_path, (time.time(), self.file_mtime))
+            else:
+                log.debug("no local time: "+self.file_path,'Fetcher')
+                os.utime(self.file_path, (time.time(), 0))
+
+        for req in self.requests:
+            req.finish()
+
+        self.file_sent()
+
+    def download_failure(self, http_code, reason):
+        """
+        Download is not possible
+        """
+        log.msg("download_failure %s: (%s) %s"% (self.file_path, http_code, reason), "CacheEntry")
+
+        for request in self.requests:
+            request.finishCode(http_code, reason)
+        self.state = self.STATE_FAILED
+        ## Remove directory if file was not created
+        #if not os.path.exists(self.file_path):
+            #try:
+                #os.removedirs(self.factory.config.cache_dir + os.sep + self.backend.base)
+            #except:
+                #pass
+
+
+    def file_sent(self):
+        """
+        File has been sent successfully to at least one client
+        Update databases with statistics for this file
+        """
+        log.msg("file_sent:" + self.file_path, "CacheEntry")
+
+        self.state = self.STATE_SENT
+        self.fetcher = None
+        self.backend.file_served(self)
+        self.factory.file_served(self.cache_path)
+        self.factory.update_times[self.cache_path] = time.time()
+        self.state = self.STATE_NEW
+
+    def init_tempfile(self):
+        #log.msg("init_tempfile:" + self.file_path, "CacheEntry")
+        self.create_directory()
+        self.streamFilename = self.file_path + ".apDownload"
+        self.streamfile = StreamFile(self.streamFilename)
+
+class FileType:
+    """
+    This is just a way to distinguish between different filetypes.
+
+    self.regex: regular expression that files of this type should
+    match. It could probably be replaced with something simpler,
+    but... o well, it works.
+    
+    self.contype: mime string for the content-type http header.
+    
+    mutable: do the contents of this file ever change?  Files such as
+    .deb and .dsc are never changed once they are created.
+    
+    """
+    def __init__ (self, regex, contype, mutable):
+        self.regex = regex
+        self.contype = contype
+        self.mutable = mutable
+
+    def check (self, name):
+        "Returns true if name is of this filetype"
+        if self.regex.search(name):
+            return 1
+        else:
+            return 0
+
+# Set up the list of filetypes that we are prepared to deal with.
+# If it is not in this list, then we will ignore the file and
+# return an error.
+#
+# Each entry is FileType(regex, contype, mutable).  findFileType() returns
+# the first entry whose regex matches, so the order matters: specific
+# patterns such as .tar.gz and .diff.gz must stay ahead of the catch-all
+# .gz entry at the end of the table.
+#
+# NOTE(review): "x-gzip" is not in type/subtype form like the other
+# content types here - confirm whether "application/x-gzip" was intended.
+filetypes = (
+    # Files matched by extension
+    FileType(re.compile(r"\.u?deb$"), "application/dpkg", 0),
+    FileType(re.compile(r"\.tar\.gz$"), "application/x-gtar", 0),
+    FileType(re.compile(r"\.dsc$"),"text/plain", 0),
+    FileType(re.compile(r"\.diff\.gz$"), "x-gzip", 0),
+    FileType(re.compile(r"\.bin$"), "application/octet-stream", 0),
+    FileType(re.compile(r"\.tgz$"), "application/x-gtar", 0),
+    FileType(re.compile(r"\.txt$"), "text/plain", 1),
+    FileType(re.compile(r"\.html$"), "text/html", 1),
+
+    # Index files matched by name (Packages, Release, Sources, Contents-*,
+    # Translation-*), optionally compressed with gz or bz2
+    FileType(re.compile(r"(?:^|/)(?:Packages|Release(?:\.gpg)?|Sources|(?:Contents|Translation)-[a-z0-9]+)"
+                        r"(?:\.(?:gz|bz2))?$"),
+             "text/plain", 1),
+    # The Index file inside a .diff directory is marked mutable ...
+    FileType(re.compile(r"(?:^|/)(?:Packages|Sources|Contents-[a-z0-9]+)\.diff/Index$"),
+             "text/plain", 1),
+    # ... while the other files inside a .diff directory are not
+    FileType(re.compile(r"(?:^|/)(?:Packages|Sources|Contents-[a-z0-9]+)\.diff/[a-z0-9.-]+"
+                        r"(?:\.(?:gz|bz2))?$"),
+             "text/plain", 0),
+
+    FileType(re.compile(r"\.rpm$"), "application/rpm", 0),
+
+    # pkglist / release / srclist files, with an optional suffix and
+    # optional gz or bz2 compression
+    FileType(re.compile(r"(?:^|/)(?:pkglist|release|srclist)(?:\.(?:\w|-)+)?"
+                        r"(?:\.(?:gz|bz2))?$"), 
+             "text/plain", 1),
+    # Catch-all for any other .gz file; must remain last
+    FileType(re.compile(r"\.gz$"), "x-gzip", 1)
+    )
+
+
+def findFileType(name):
+    """
+    Return the first entry of 'filetypes' whose check() accepts 'name',
+    or None when no entry matches
+    """
+    for candidate in filetypes:
+        if not candidate.check(name):
+            continue
+        return candidate
+    return None
+
+class StreamFile:
+    """
+    A temporary file used to stream to during download
+    """
+    CHUNKSIZE = 16384
+
+    def __init__(self, name, mode='w+b'):
+        """
+        @param name: path of the file to open
+        @param mode: mode passed to file(); CHUNKSIZE is used as buffer size
+        """
+        log.debug("Creating file: " + name, 'cache')
+        self.file = file(name, mode, self.CHUNKSIZE)
+        self.name = name
+
+    def append(self, data):
+        "Write data at the current file position"
+        self.file.write(data)
+
+    def size(self):
+        "Return the current file position"
+        return self.file.tell()
+
+    def read_from(self, size=-1, start=None):
+        """
+        Read data back from the file, then seek to the end of the file again
+        @param size: maximum number of bytes to read (-1 reads to end of file)
+        @param start: offset to start reading from, or None to read from
+                      the current position
+        @return: the data read
+        """
+        if start is not None:
+            self.file.seek(start, SEEK_SET)
+        # file.read() takes only the size; passing self as an extra
+        # argument raised TypeError on every call
+        data = self.file.read(size)
+        self.file.seek(0, SEEK_END)
+        return data
+
+    def close(self):
+        "Close the file and drop the reference to it"
+        log.debug("Closing file: " + self.name, 'cache')
+        self.file.close()
+        self.file = None
+
+    def close_and_rename(self, new_name):
+        """
+        File was successfully downloaded - close and rename to final destination
+        @param new_name: final path for the file; nothing is renamed when it
+                         equals the current name
+        """
+        self.close()
+        if self.name == new_name:
+            return
+        log.debug("renaming file: %s->%s " % (self.name, new_name), 'cache')
+        os.rename(self.name, new_name)
+        self.name = new_name
+
+class FileVerifier:
+    """
+    Verifies the integrity of a cached file
+
+    self.deferred: a deferred that will be triggered when verification
+    completes, or if a timeout occurs.
+
+    Sample:
+
+    verifier = FileVerifier(path, config)
+    d = verifier.verify()
+    d.addCallbacks(callback_if_ok, callback_if_fail)
+
+    then either callback_if_ok or callback_if_fail will be called
+    when verification finishes.
+
+    Checkout twisted.internet.defer.Deferred on how to use self.deferred
+
+    """
+    def __init__(self, path, config):
+        """
+        Initialise verification
+        @param path: filename to be verified (absolute path)
+        @param config: apConfig configuration (timeout parameter defines max time)
+        """
+        self.path = path
+        self.timeout = config.timeout
+        self.deferred = defer.Deferred() # Deferred that passes status back
+
+    def verify(self):
+        """
+        Start verification, choosing the check from the file extension
+
+        .deb, .gz and .bz2 files are tested by an external command run
+        through FileVerifierProcess; any other file only has to be non-empty.
+        @return: self.deferred
+        """
+        if re.search(r"\.deb$", self.path):
+            self.worker = FileVerifierProcess(self, '/usr/bin/dpkg', '--fsys-tarfile', self.path)
+        elif re.search(r"\.gz$", self.path):
+            self.worker = FileVerifierProcess(self, '/bin/gunzip', '-t', '-v', self.path)
+        elif re.search(r"\.bz2$", self.path):
+            self.worker = FileVerifierProcess(self, '/usr/bin/bunzip2', '--test', self.path)
+        else:
+            # Unknown file, just check it is not 0 size
+            try:
+                filesize = os.stat(self.path)[stat.ST_SIZE]
+            except OSError:
+                # A file that cannot be examined is treated like an empty one
+                filesize = 0
+
+            # Use the guarded result; calling os.stat() a second time here
+            # would raise for exactly the files the except clause handles
+            if filesize < 1:
+                self.failed("Zero length file")
+            else:
+                log.debug('Verification skipped for ' + self.path)
+                self.deferred.callback(None)
+        return self.deferred
+
+    class VerificationFailure:
+        "Value passed to the errback: the file path and the failure reason"
+        def __init__(self, path, reason):
+            self.path = path
+            self.reason = reason
+
+    def failed(self, reason):
+        """
+        Verification failed: remove the file and trigger the errback
+        @param reason: text describing why verification failed
+        """
+        log.msg("cache file verification FAILED for %s: %s"%(self.path, reason), 'verify')
+        try:
+            os.unlink(self.path)
+        except OSError:
+            # The file may already be missing; the failure must still be
+            # reported through the deferred
+            log.debug("could not remove " + self.path, 'verify')
+        self.deferred.errback(self.VerificationFailure(self.path, reason))
+
+    def passed(self):
+        "Verification succeeded: trigger the callback"
+        log.debug("cache file verification passed: %s"%(self.path), 'verify')
+        # The deferred belongs to this object; there is no self.parent here
+        self.deferred.callback(None)
+
+class FileVerifierProcess(protocol.ProcessProtocol):
+    """
+    Verifies the integrity of a file by running an external command.
+
+    The result is reported through the parent FileVerifier: its deferred
+    is called back when the command exits with status 0, otherwise
+    parent.failed() is called.
+    """
+    def __init__(self, verifier, *args):
+        """
+        Spawn the command and arm the timeout
+        @param verifier: FileVerifier that receives the result; its timeout
+                         attribute gives the maximum run time
+        @param args: command line; args[0] is the executable
+        """
+        self.parent = verifier
+
+        self.exe = args[0]
+        log.debug("starting verification: " + self.exe + " " + str(args),'FileVerifierProcess',8)
+        nullhandle = open("/dev/null", "w")
+        try:
+            # stdout of the child is sent to /dev/null, only stderr is read
+            self.process = reactor.spawnProcess(self, self.exe, args, childFDs = { 0:"w", 1:nullhandle.fileno(), 2:"r" })
+        finally:
+            # The child holds its own copy of the descriptor once spawned,
+            # so do not leak one handle per verification in this process
+            nullhandle.close()
+        self.laterID = reactor.callLater(self.parent.timeout, self.timedout)
+
+    def connectionMade(self):
+        "Start with an empty buffer for the command's error output"
+        self.data = ''
+
+    def outReceived(self, data):
+        #we only care about errors
+        pass
+
+    def errReceived(self, data):
+        "Collect error output of the command"
+        self.data = self.data + data
+
+    def timedout(self):
+        """
+        this should not happen, but if we timeout, we pretend that the
+        operation failed.
+        """
+        # Clearing laterID makes a later processEnded() do nothing
+        self.laterID=None
+        self.parent.failed("Verification process timed out")
+
+    def processEnded(self, reason=None):
+        """
+        This gets automatically called when the process finishes, we check
+        the status and report through the Deferred.
+        """
+        __pychecker__ = 'unusednames=reason'
+        #log.debug("Process Status: %d" %(self.process.status),'verify')
+        #log.debug(self.data, 'verify')
+        if self.laterID:
+            # Still within the time limit: stop the timer and report
+            self.laterID.cancel()
+            if self.process.status == 0:
+                self.parent.deferred.callback(None)
+            else:
+                self.parent.failed(os.path.basename(self.exe)+ " failed")

Added: trunk/apt_proxy/fetchers.py
==============================================================================
--- (empty file)
+++ trunk/apt_proxy/fetchers.py	Thu Aug  3 23:54:46 2006
@@ -0,0 +1,1103 @@
+#
+# Copyright (C) 2005 Chris Halls <halls at debian.org>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of version 2.1 of the GNU Lesser General Public
+# License as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+"""
+Fetchers for apt-proxy
+
+These classes implement the network code for fetching files from apt-proxy
+network backends
+"""
+
+import re, os, string, time, glob, signal, stat
+from twisted.web import static, http
+from twisted.internet import protocol, reactor, defer, error, abstract
+from twisted.python import failure
+from twisted.protocols import policies, ftp
+
+from misc import log
+
+
+class Fetcher:
+    """
+    This class manages the selection of a BackendServer and downloading from
+    that backend
+
+    start() returns a Deferred that is called back with a
+    (success, message) tuple: (True, "") when the download completed or the
+    cached file was up to date, (False, reason_msg) when it failed.
+    """
+    cacheEntry = None
+    fetcher = None   # connection-specific fetcher
+
+    def __init__(self):
+        self.backendServer = None
+        self.size = None # Size of file notified by fetcher's server
+        self.mtime = None # Mtime of file notified by fetcher's server
+
+    def start(self, cacheEntry):
+        """
+        Begin fetching the file for cacheEntry
+        @param cacheEntry: cache entry to download; its backend is used
+        @return: Deferred that receives the (success, message) result
+        """
+        self.cacheEntry = cacheEntry
+        log.debug("fetcher start:" + self.cacheEntry.filename, "fetcher")
+        self.backend = cacheEntry.backend
+        self.len_received = 0
+        self.deferred = defer.Deferred()
+        self.start_download()
+        return self.deferred
+
+    def activateNextBackendServer(self, fetcher):
+        """
+        Select the first (or the next) BackendServer and connect to it
+        @param fetcher: not used
+        @return: True if another BackendServer was found, otherwise False
+        """
+        if self.backendServer is None:
+            self.backendServer = self.backend.get_first_server()
+            if self.backendServer is None:
+                log.err("No backend server found for backend " + self.backend.name, "fetcher")
+                return False
+        else:
+            # Look for the next possible BackendServer
+            self.backendServer = self.backend.get_next_server(self.backendServer)
+
+            if self.backendServer is None:
+                # The attempt to retrieve a file from the BackendServer failed.
+                log.debug("no more Backends", "fetcher")
+                return False
+        # Pass the result on: falling off the end returned None, which made
+        # fail_over() report failure although a connection was being made
+        return self.connectToBackend()
+
+    def connectToBackend(self):
+        """
+        Create the connection-specific fetcher for the current BackendServer
+        and start connecting; connected() or connection_failed() follows
+        @return: True
+        """
+        log.debug('Connecting to backend server %s' % (self.backendServer), 'fetcher')
+        self.fetcher = self.backendServer.fetcher(self.backendServer)
+        d = self.fetcher.connect()
+        d.addCallback(self.connected)
+        d.addErrback(self.connection_failed)
+
+        return True
+
+    def __str__(self):
+        return 'Fetcher server=%s file=%s' % (str(self.backendServer), self.cacheEntry.path)
+
+    def start_download(self):
+        """
+        Begin streaming file
+
+        Selects a BackendServer if none is active, connects if there is no
+        connection-specific fetcher yet, otherwise requests the download
+        straight away.
+        """
+        log.debug("Downloading: " + self.cacheEntry.file_path, 'Fetcher')
+        if self.backendServer is None:
+            self.activateNextBackendServer(self.fetcher)
+        elif self.fetcher is None:
+            self.connectToBackend()
+        else:
+            self.download()
+
+    def download_complete(self):
+        """
+        Download was successful
+        """
+        log.debug("download complete. Sent:%s bytes" % (self.len_received), "Fetcher")
+        if self.fetcher is not None and not self.fetcher.pipelining:
+            self.connection_closed(self.fetcher)
+        if self.len_received==0:
+            self.download_started() # Send status code to clients
+        self.cacheEntry.download_data_end()
+        self.deferred.callback((True, ""))
+
+    def fail_over(self, reason_code, reason_msg):
+        """
+        A non-fatal download error has occurred. Attempt download from next
+        backend, and report failure when there is none left
+        """
+        if not self.activateNextBackendServer(self.fetcher):
+            self.download_failed(reason_code, reason_msg)
+
+    def download_failed(self, reason_code, reason_msg):
+        """
+        Report failure to the cache entry and fire the deferred with
+        (False, reason_msg)
+        """
+        log.debug("download_failed: (%s) %s " %(reason_code, reason_msg), "Fetcher")
+        if self.fetcher is not None and not self.fetcher.pipelining:
+            self.connection_closed(self.fetcher)
+        self.cacheEntry.download_failure(reason_code, reason_msg)
+        self.deferred.callback((False, reason_msg))
+
+    def cancel_download(self):
+        "Disconnect the active fetcher, if any, and report the download as failed"
+        if self.fetcher:
+            log.debug(
+                "telling fetchers to disconnect",'Fetcher')
+            self.fetcher.disconnect()
+        self.download_failed(None, "Download canceled")
+
+    def data_received(self, data, save=True):
+        """
+        File Data has been received from the backend server
+        @param data: raw data received from server
+        @param save: if true, save to disk (rsync saves file itself)
+        """
+        if not self.len_received:
+            # First data of this download
+            self.download_started(save)
+        self.len_received = self.len_received + len(data)
+        self.cacheEntry.download_data_received(data)
+
+    def download_started(self, save=True):
+        """
+        Tell the cache entry that the download has started
+        @param save: if true, the cache entry's temporary file is set up first
+        """
+        if save:
+            self.cacheEntry.init_tempfile()
+        self.cacheEntry.download_started(self, self.size, self.mtime)
+
+
+    def server_size(self, len):
+        """
+        The server has sent the expected length of the file
+        """
+        self.size = len
+        log.debug("File size: " + str(len), 'Fetcher')
+
+    def server_mtime(self, mtime):
+        """
+        The server has sent the modification time of the file
+        """
+        self.mtime = mtime
+        log.debug("File mtime: " + str(mtime), 'Fetcher')
+
+    def transfer_complete(self):
+        """
+        All data has been transferred
+        """
+        log.debug("Finished receiving data: " + self.cacheEntry.filename, 'Fetcher')
+        self.download_complete()
+
+    def connection_failed(self, reason = None):
+        """
+        A fetcher has failed to connect to the backend server
+        @param reason: failure describing the problem, if known
+        """
+        msg = '[%s] Connection Failed: %s/%s'%(
+            self.backend.name,
+            self.backendServer.path, self.cacheEntry.path)
+
+        if reason:
+            msg = '%s (%s)'%(msg, reason.getErrorMessage())
+            log.debug("Connection Failed: "+str(reason), 'Fetcher')
+        log.err(msg)
+        # NOTE(review): the raw reason, not msg, is passed on as the
+        # failure message - confirm that this is intended
+        self.fail_over(http.SERVICE_UNAVAILABLE, reason)
+
+    def connected(self, result):
+        "The connection to the backend server is up: request the file"
+        log.debug("Connected to "+ self.backendServer.uri, 'Fetcher')
+        self.download()
+
+    def download(self):
+        "Ask the connection-specific fetcher for the cache entry's file"
+        log.debug('downloading:%s mtime:%s' % (self.cacheEntry.path, self.cacheEntry.file_mtime), 'Fetcher')
+        self.fetcher.download(self, self.cacheEntry.path, self.cacheEntry.file_mtime)
+
+    def disconnect(self):
+        "Disconnect and forget the connection-specific fetcher, if any"
+        if self.fetcher is not None:
+            log.debug('disconnect %s' % (self.cacheEntry.path), 'Fetcher')
+            self.fetcher.disconnect()
+            self.fetcher = None
+
+    def connection_closed(self, fetcher):
+        """
+        A protocol fetcher's connection has closed - we must reopen the connection
+        next time
+        """
+        log.debug("Connection closed for %s, state=%s" %(self.cacheEntry.path, self.cacheEntry.state), 'Fetcher')
+        if fetcher == self.fetcher:
+            self.fetcher = None
+
+    def file_not_found(self):
+        "The backend server does not have the file"
+        log.msg("(%s) file not found: %s" % (self.backendServer.path, self.cacheEntry.path), 'fetcher')
+        # TODO - failover?
+        self.download_failed(http.NOT_FOUND, "file not found on backend")
+
+    def fetcher_internal_error(self, reason):
+        "The connection-specific fetcher hit an error it cannot recover from"
+        log.msg("(%s) internal error: %s" % (self.backendServer.path, reason), 'fetcher')
+        self.download_failed(http.INTERNAL_SERVER_ERROR, reason)
+
+    def send_complete_file(self, filename):
+        """
+        Send a complete file (used by FileFetcher)
+        """
+        self.cacheEntry.transfer_file(filename)
+
+    def up_to_date(self):
+        """
+        Fetcher has determined that our cached file is up to date
+        so the file is sent from our cache
+        """
+        log.msg("(%s) up_to_date" % (self.cacheEntry.path), 'fetcher')
+        self.cacheEntry.send_cached_file()
+        # Same guard as download_complete() and download_failed(): the
+        # connection-specific fetcher may already be gone
+        if self.fetcher is not None and not self.fetcher.pipelining:
+            self.connection_closed(self.fetcher)
+        self.deferred.callback((True, ""))
+
+class FileFetcher:
+    """
+    A Fetcher for file:// backends: files are simply copied from disk
+    """
+    pipelining = True
+
+    def __init__(self, backendServer):
+        self.isConnected = True # Always connected
+        self.backendServer = backendServer
+
+    def connect(self):
+        "Nothing to connect to - report success straight away"
+        return defer.succeed(True)
+
+    def download(self, fetcher, uri, mtime):
+        """
+        Request download
+        @param fetcher: Fetcher class to receive callbacks
+        @param uri: URI of file to be downloaded within backend
+        @param mtime: Modification time of current file in cache
+        """
+        self.parent = fetcher
+        self.cache_mtime = mtime
+        self.request_uri = uri
+
+        # Strip the scheme from the backend uri to get the local directory
+        backend_dir = self.backendServer.uri[len("file://"):]
+        self.local_file = backend_dir + '/' + uri
+        if os.path.exists(self.local_file):
+            # start the transfer
+            self.parent.send_complete_file(self.local_file)
+        else:
+            self.parent.file_not_found()
+
+    def disconnect(self):
+        "There is no connection to close"
+        pass
+
+class FetcherHttpClient(http.HTTPClient):
+    """
+    This class represents an Http connection to a backend
+    server. It is generated by the HttpFetcher class when
+    a connection is made to an http server
+    """
+    def __init__(self, parent):
+        """
+        @param parent: the HttpFetcher that owns this connection; its proxy
+                       setting is copied
+        """
+        self.parent = parent # HttpFetcher
+        self.proxy = parent.proxy
+        # Set by download(): the object that receives download callbacks
+        self.fetcher = None
+
+    def connectionMade(self):
+        """
+        The Http connection is up: hand this client to the parent by
+        calling its connected() method
+        """
+        owner = self.parent
+        owner.connected(self)
+
+    def download(self, fetcher, uri, mtime):
+        """
+        Request file from backend
+        @param fetcher: object that receives the download callbacks
+        @param uri: path of the file within the backend
+        @param mtime: modification time of the cached copy; when not None
+                      it is sent as an if-modified-since header
+        """
+        self.log_headers = None
+        self.close_on_completion = True
+        self.server_mtime = None
+        self.server_size = None
+        self.fetcher = fetcher
+        self.uri = uri
+        self.finished = False
+        backendServer = self.parent.backendServer
+        if self.proxy is None:
+            serverpath = backendServer.path
+        else:
+            # Through a proxy the request line carries the absolute URL
+            serverpath = "http://" + backendServer.host
+            if backendServer.port != 80:
+                serverpath = serverpath + ":" + str(backendServer.port)
+            serverpath = serverpath + "/" + backendServer.path 
+
+        self.sendCommand("GET", serverpath + "/" + uri)
+
+        self.sendHeader('host', backendServer.host)
+        if self.proxy is not None and self.proxy.user is not None:
+            # encodestring is not imported at the top of this module, and it
+            # would append a newline to the header value; b64encode does not
+            import base64
+            credentials = self.proxy.user + ":" + self.proxy.password
+            self.sendHeader('Proxy-Authorization', "Basic " +
+                            base64.b64encode(credentials))
+
+        if mtime is not None:
+            datetime = http.datetimeToString(mtime)
+            self.sendHeader('if-modified-since', datetime)
+
+        self.endHeaders()
+
+    def download_complete(self):
+        """
+        Tell the fetcher that the transfer is complete. Only the first call
+        per download has any effect.
+        """
+        if not self.finished:
+            log.debug("File transfer complete",'http_client')
+            self.finished = True
+            self.fetcher.download_complete()
+
+    def handleStatus(self, version, code, message):
+        """
+        Store the status code of the response as an integer in
+        self.http_status
+        """
+        __pychecker__ = 'unusednames=version,message'
+        log.debug('handleStatus %s - %s' % (code, message), 'http_client')
+        status = int(code)
+        self.http_status = status
+
+    def handleResponse(self, buffer):
+        """
+        Act on the stored status code: OK completes the download,
+        NOT_MODIFIED and NOT_FOUND are passed on to the fetcher, and any
+        other status is reported to it as an internal error
+        """
+        status = self.http_status
+        log.debug('handleResponse status=%s' % (status), 'http_client')
+        if status == http.OK:
+            self.download_complete()
+            return
+        if status == http.NOT_MODIFIED:
+            log.debug("Backend server reported file is not modified: " + self.uri,'http_client')
+            self.fetcher.up_to_date()
+            return
+        if status == http.NOT_FOUND:
+            log.debug("Not found on backend server",'http_client')
+            self.fetcher.file_not_found()
+            return
+        error_text = "Unknown status code: %s" % (status)
+        log.debug(error_text,'http_client')
+        self.fetcher.fetcher_internal_error(error_text)
+
+    def handleHeader(self, key, value):
+        """
+        Process one response header
+
+        last-modified and content-length are passed on to the fetcher;
+        connection decides whether close_on_completion is set.
+        @param key: header name (compared case-insensitively)
+        @param value: header value
+        """
+        log.debug("Received: " + key + " " + str(value), 'http_client')
+        key = key.lower()
+
+        if key == 'last-modified':
+            self.server_mtime = http.stringToDatetime(value)
+            self.fetcher.server_mtime(self.server_mtime)
+        elif key == 'content-length':
+            self.server_size = int(value)
+            self.fetcher.server_size(self.server_size)
+        elif key == 'connection':
+            # Connection tokens are case-insensitive, and servers commonly
+            # send "Keep-Alive", which an exact comparison would miss
+            token = value.strip().lower()
+            if token == "close":
+                log.debug('will close on completion', 'http_client')
+                self.close_on_completion = True
+            elif token == "keep-alive":
+                log.debug('will not close on completion', 'http_client')
+                self.close_on_completion = False
+
+    #def handleEndHeaders(self):
+        #if self.http_status == http.NOT_MODIFIED:
+            #log.debug("Backend server reported file is not modified: " + self.uri,'http_client')
+            #self.fetcher.up_to_date()
+        #elif self.http_status == http.NOT_FOUND:
+            #log.debug("Not found on backend server",'http_client')
+            #self.fetcher.file_not_found()
+        #else:
+            #log.debug("Unknown status code: %s" % (self.http_status),'http_client')
+
+    def rawDataReceived(self, data):
+        """
+        Body data has arrived; it is passed to the fetcher only when the
+        response status is OK
+
+        Receiving more data than the announced content-length is reported
+        to the fetcher as an internal error.
+        @param data: raw data received from the server
+        """
+        if self.http_status != http.OK:
+            return
+        self.fetcher.data_received(data)
+        if self.server_size is None:
+            return
+        if self.fetcher.len_received > self.server_size:
+            log.err("File transfer overrun! Expected size:%s Received size:%s" % 
+                    (self.server_size, self.fetcher.len_received), 'http_client')
+            # Report through the fetcher like every other error in this
+            # class; fetcher_internal_error is a method of Fetcher
+            self.fetcher.fetcher_internal_error("Data overrun")
+
+#     def handleResponse(self, buffer):
+#         if self.length == 0:
+#             self.setResponseCode(http.NOT_FOUND)
+#         # print "length: " + str(self.length), "response:", self.status_code
+#         if self.http_status == http.NOT_MODIFIED:
+#             self.apDataEnd(self.transfered, False)
+#         else:
+#             self.apDataEnd(self.transfered, True)
+
+    def lineReceived(self, line):
+        """