[apt-proxy-devel] r613 - in trunk: . apt_proxy apt_proxy/test debian/po doc doc/po
Chris Halls
halls at costa.debian.org
Thu Aug 3 23:54:49 UTC 2006
Author: halls
Date: Thu Aug 3 23:54:46 2006
New Revision: 613
Added:
trunk/apt_proxy/cache.py
trunk/apt_proxy/fetchers.py
trunk/apt_proxy/test/test_cache.py
trunk/apt_proxy/test/test_fetchers.py
trunk/apt_proxy/test/test_requests.py
Removed:
trunk/debian/TODO
Modified:
trunk/apt_proxy/apt_proxy.py
trunk/apt_proxy/apt_proxy_conf.py
trunk/apt_proxy/misc.py
trunk/apt_proxy/packages.py
trunk/apt_proxy/test/test_apt_proxy.py
trunk/apt_proxy/test/test_config.py
trunk/apt_proxy/test/test_packages.py
trunk/debian/changelog
trunk/debian/control
trunk/debian/po/cs.po
trunk/debian/po/da.po
trunk/debian/po/fr.po
trunk/debian/po/nl.po
trunk/debian/po/vi.po
trunk/debian/postinst
trunk/debian/rules
trunk/doc/TODO
trunk/doc/apt-proxy-import.8.inc
trunk/doc/apt-proxy.8
trunk/doc/apt-proxy.conf
trunk/doc/apt-proxy.conf.5
trunk/doc/po/apt-proxy.pot
trunk/doc/po/fr.po
trunk/doc/po4a.cfg
trunk/runtests
Log:
Merge branch people/halls/rework to trunk
Modified: trunk/apt_proxy/apt_proxy.py
==============================================================================
--- trunk/apt_proxy/apt_proxy.py (original)
+++ trunk/apt_proxy/apt_proxy.py Thu Aug 3 23:54:46 2006
@@ -14,21 +14,19 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-from twisted.internet import reactor, defer, abstract, protocol
-from twisted.protocols import ftp, basic
-
import os, stat, signal, fcntl, exceptions
from os.path import dirname, basename
-import tempfile
-import glob
-import re
-import urlparse
-import time
-import string
-import packages
+import tempfile, glob, re, urlparse, time
+from twisted.internet import reactor
from twisted.python.failure import Failure
+from twisted.internet import error, protocol
+from twisted.web import http
+
import memleak
-from twisted.internet import error
+import fetchers, cache, packages
+from misc import log, MirrorRecycler
+import twisted_compat
+
#from posixfile import SEEK_SET, SEEK_CUR, SEEK_END
#since posixfile is considered obsolete I'll define the SEEK_* constants
#myself.
@@ -38,1173 +36,8 @@
from types import *
-#sibling imports
-import misc
-log = misc.log
-
-from twisted_compat import compat
-from twisted_compat import http
-
status_dir = '.apt-proxy'
-class FileType:
- """
- This is just a way to distinguish between different filetypes.
-
- self.regex: regular expression that files of this type should
- match. It could probably be replaced with something simpler,
- but... o well, it works.
-
- self.contype: mime string for the content-type http header.
-
- mutable: do the contents of this file ever change? Files such as
- .deb and .dsc are never changed once they are created.
-
- """
- def __init__ (self, regex, contype, mutable):
- self.regex = regex
- self.contype = contype
- self.mutable = mutable
-
- def check (self, name):
- "Returns true if name is of this filetype"
- if self.regex.search(name):
- return 1
- else:
- return 0
-
-# Set up the list of filetypes that we are prepared to deal with.
-# If it is not in this list, then we will ignore the file and
-# return an error.
-filetypes = (
- FileType(re.compile(r"\.deb$"), "application/dpkg", 0),
- FileType(re.compile(r"\.udeb$"), "application/dpkg", 0),
- FileType(re.compile(r"\.tar\.gz$"), "application/x-gtar", 0),
- FileType(re.compile(r"\.dsc$"),"text/plain", 0),
- FileType(re.compile(r"\.diff\.gz$"), "application/x-gzip", 0),
- FileType(re.compile(r"\.gz$"), "application/x-gzip", 1),
- FileType(re.compile(r"\.bin$"), "application/octet-stream", 0),
- FileType(re.compile(r"\.tgz$"), "application/x-gtar", 0),
- FileType(re.compile(r"\.txt$"), "application/plain-text", 1),
- FileType(re.compile(r"\.html$"), "application/text-html", 1),
-
- FileType(re.compile(r"/(Packages|Release(\.gpg)?|Sources|Contents-.*)"
- r"(\.(gz|bz2))?$"),
- "text/plain", 1),
-
- FileType(re.compile(r"\.rpm$"), "application/rpm", 0),
-
- FileType(re.compile(r"/(pkglist|release|srclist)(\.(\w|-)+)?"
- r"(\.(gz|bz2))?$"),
- "text/plain", 1),
- )
-
-class FileVerifier(protocol.ProcessProtocol):
- """
- Verifies the integrity of a file by running an external
- command.
-
- self.deferred: a deferred that will be triggered when the command
- completes, or if a timeout occurs.
-
- Sample:
-
- verifier = FileVerifier(self)
- verifier.deferred.addCallbacks(callback_if_ok, callback_if_fail)
-
- then either callback_if_ok or callback_if_fail will be called
- when the subprocess finishes execution.
-
- Checkout twisted.internet.defer.Deferred on how to use self.deferred
-
- """
- def __init__(self, request):
- self.factory = request.factory
- self.deferred = defer.Deferred() # Deferred that passes status back
- self.path = request.local_file
-
- if re.search(r"\.deb$", self.path):
- exe = '/usr/bin/dpkg'
- args = (exe, '--fsys-tarfile', self.path)
- elif re.search(r"\.gz$", self.path):
- exe = '/bin/gunzip'
- args = (exe, '-t', '-v', self.path)
- elif re.search(r"\.bz2$", self.path):
- exe = '/usr/bin/bunzip2'
- args = (exe, '--test', self.path)
- else:
- # Unknown file, just check it is not 0 size
- try:
- filesize = os.stat(self.path)[stat.ST_SIZE]
- except:
- filesize = 0
-
- if(os.stat(self.path)[stat.ST_SIZE]) < 1:
- log.debug('Verification failed for ' + self.path)
- self.failed()
- else:
- log.debug('Verification skipped for ' + self.path)
- self.deferred.callback(None)
- return
-
- log.debug("starting verification: " + exe + " " + str(args))
- self.nullhandle = open("/dev/null", "w")
- self.process = reactor.spawnProcess(self, exe, args, childFDs = { 0:"w", 1:self.nullhandle.fileno(), 2:"r" })
- self.laterID = reactor.callLater(self.factory.config.timeout, self.timedout)
-
- def connectionMade(self):
- self.data = ''
-
- def outReceived(self, data):
- #we only care about errors
- pass
-
- def errReceived(self, data):
- self.data = self.data + data
-
- def failed(self):
- log.debug("verification failed: %s"%(self.path), 'verify', 1)
- os.unlink(self.path)
- self.deferred.errback(None)
-
- def timedout(self):
- """
- this should not happen, but if we timeout, we pretend that the
- operation failed.
- """
- self.laterID=None
- log.debug("Process Timedout:",'verify')
- self.failed()
-
- def processEnded(self, reason=None):
- """
- This get's automatically called when the process finishes, we check
- the status and report through the Deferred.
- """
- __pychecker__ = 'unusednames=reason'
- #log.debug("Process Status: %d" %(self.process.status),'verify')
- #log.debug(self.data, 'verify')
- if self.laterID:
- self.laterID.cancel()
- if self.process.status == 0:
- self.deferred.callback(None)
- else:
- self.failed()
-
-def findFileType(name):
- "Look for the FileType of 'name'"
- for type in filetypes:
- if type.check(name):
- return type
- return None
-
-class TempFile (file):
- def __init__(self, mode='w+b', bufsize=-1):
- (fd, name) = tempfile.mkstemp('.apt-proxy')
- os.close(fd)
- file.__init__(self, name, mode, bufsize)
- os.unlink(name)
- def append(self, data):
- self.seek(0, SEEK_END)
- self.write(data)
- def size(self):
- return self.tell()
- def read_from(self, size=-1, start=None):
- if start != None:
- self.seek(start, SEEK_SET)
- data = file.read(self, size)
- return data
-
-
-class Fetcher:
- """
- This is the base class for all Fetcher*, it tries to hold as much
- common code as posible.
-
- Subclasses of this class are the ones responsible for contacting
- the backend servers and fetching the actual data.
- """
- gzip_convert = re.compile(r"/Packages$")
- post_convert = re.compile(r"/Packages.gz$")
- status_code = http.OK
- status_message = None
- requests = None
- request = None
- length = None
- transport = None
-
- def insert_request(self, request):
- """
- Request should be served through this Fetcher because it asked for
- the same uri that we are waiting for.
-
- We also have to get it up to date, give it all received data, send it
- the appropriate headers and set the response code.
- """
- if request in self.requests:
- raise RuntimeError, \
- 'this request is already assigned to this Fetcher'
- self.requests.append(request)
- request.apFetcher = self
- if (self.request):
- self.update_request(request)
-
- def update_request(self, request):
- """
- get a new request up to date
- """
- request.local_mtime = self.request.local_mtime
- request.local_size = self.request.local_size
- if(self.status_code != None):
- request.setResponseCode(self.status_code, self.status_message)
- for name, value in self.request.headers.items():
- request.setHeader(name, value)
- if self.transfered.size() != 0:
- request.write(self.transfered.read_from(start=0))
-
- def remove_request(self, request):
- """
- Request should NOT be served through this Fetcher, the client
- probably closed the connection.
-
- If this is our last request, we may also close the connection with the
- server depending on the configuration.
-
- We keep the last request for reference even if the client closed the
- connection.
- """
- self.requests.remove(request)
- if len(self.requests) == 0:
- log.debug("Last request removed",'Fetcher')
- if not self.factory.config.complete_clientless_downloads:
- if self.transport:
- log.debug(
- "telling the transport to loseConnection",'Fetcher')
- try:
- self.transport.loseConnection()
- except KeyError:
- # Rsync fetcher already loses conneciton for us
- pass
- if hasattr(self, 'loseConnection'):
- self.loseConnection()
- else:
- self.request = self.requests[0]
- request.apFetcher = None
-
- def transfer_requests(self, fetcher):
- "Transfer all requests from self to fetcher"
- for req in self.requests:
- self.remove_request(req)
- fetcher.insert_request(req)
-
- def setResponseCode(self, code, message=None):
- "Set response code for all requests"
- #log.debug('Response code: %d - %s' % (code, message),'Fetcher')
- self.status_code = code
- self.status_message = message
- for req in self.requests:
- req.setResponseCode(code, message)
-
- def setResponseHeader(self, name, value):
- "set 'value' for header 'name' on all requests"
- for req in self.requests:
- req.setHeader(name, value)
-
- def __init__(self, request=None):
- self.requests = []
- self.transfered = TempFile()
- if(request):
- self.activate(request)
-
- def activate(self, request):
- log.debug(str(request.backend) + request.uri, 'Fetcher.activate')
- self.local_file = request.local_file
- self.local_mtime = request.local_mtime
- self.factory = request.factory
- self.request = request
- request.content.read()
-
- for req in self.requests:
- self.update_request(req)
- self.requests.append(request)
-
- request.apFetcher = self
- if self.factory.runningFetchers.has_key(request.uri):
- raise RuntimeError, 'There already is a running fetcher'
- self.factory.runningFetchers[request.uri]=self
-
- def apDataReceived(self, data):
- """
- Should be called from the subclasses when data is available for
- streaming.
-
- Keeps all transfered data in 'self.transfered' for requests which arrive
- later and to write it in the cache at the end.
-
- Note: self.length if != None is the amount of data pending to be
- received.
- """
- if self.length != None:
- self.transfered.append(data[:self.length])
- for req in self.requests:
- req.write(data[:self.length])
- else:
- self.transfered.append(data)
- for req in self.requests:
- req.write(data)
-
- def apDataEnd(self, data, saveData=True):
- """
- Called by subclasses when the data transfer is over.
-
- -caches the received data if everyting went well (if saveData=True)
- -takes care of mtime and atime
- -finishes connection with server and the requests
-
- """
- import shutil
- log.debug("Finished receiving data, status:%d saveData:%d" %(self.status_code, saveData), 'Fetcher');
- if (self.status_code == http.OK):
- if saveData:
- dir = dirname(self.local_file)
- if(not os.path.exists(dir)):
- os.makedirs(dir)
- f = open(self.local_file, "w")
- fcntl.lockf(f.fileno(), fcntl.LOCK_EX)
- f.truncate(0)
- if type(data) is StringType:
- f.write(data)
- else:
- data.seek(0, SEEK_SET)
- shutil.copyfileobj(data, f)
- f.close()
- if self.local_mtime != None:
- os.utime(self.local_file, (time.time(), self.local_mtime))
- else:
- log.debug("no local time: "+self.local_file,'Fetcher')
- os.utime(self.local_file, (time.time(), 0))
-
- self.factory.file_served(self.request.uri)
-
- #self.request.backend.get_packages_db().packages_file(self.request.uri)
-
- if self.transport:
- try:
- self.transport.loseConnection()
- except exceptions.KeyError:
- # Couldn't close connection - already closed?
- log.debug("transport.loseConnection() - "
- "connection already closed", 'Fetcher')
- pass
-
- for req in self.requests:
- req.finish()
-
- self.transfered.close()
- self.apEnd()
-
- def apEnd(self):
- """
- Called by subclasses when apDataEnd does too many things.
-
- Let's everyone know that we are not the active Fetcher for our uri.
- """
- try:
- del self.factory.runningFetchers[self.request.uri]
- except exceptions.KeyError:
- log.debug("We are not on runningFetchers!!!",'Fetcher')
- log.debug("Class is not in runningFetchers: "+str(self.__class__),
- 'Fetcher')
- if self.request:
- log.debug(' URI:' + self.request.uri, 'Fetcher')
- log.debug('Running fetchers: '
- +str(self.factory.runningFetchers),'Fetcher')
- #raise exceptions.KeyError
- for req in self.requests[:]:
- self.remove_request(req)
-
- import gc
- #Cleanup circular references
- reactor.callLater(5, gc.collect)
-
- def apEndCached(self):
- """
- A backend has indicated that this file has not changed,
- so serve the file from the disk cache
- """
- self.setResponseCode(http.OK)
- self.apEndTransfer(FetcherCachedFile)
-
- def apEndTransfer(self, fetcher_class):
- """
- Remove this Fetcher and transfer all it's requests to a new instance of
- 'fetcher_class'.
- """
- #Consider something like this:
- #req = dummyFetcher.fix_ref_request()
- #fetcher = fetcher_class()
- #dummyFetcher.transfer_requests(fetcher)
- #dummyFetcher.apEnd()
- #fetcher.activate(req)
-
- #self.setResponseCode(http.OK)
- requests = self.requests[:]
- self.apEnd() # Remove requests from this fetcher
- fetcher = None
- for req in requests:
- if (fetcher_class != FetcherCachedFile or req.serve_if_cached):
- running = req.factory.runningFetchers
- if (running.has_key(req.uri)):
- #If we have an active Fetcher just use that
- log.debug("have active Fetcher",'Fetcher')
- running[req.uri].insert_request(req)
- fetcher = running[req.uri]
- else:
- fetcher = fetcher_class(req)
- else:
- req.finish()
- return fetcher
-
- def connectionFailed(self, reason=None):
- """
- Tell our requests that the connection with the server failed.
- """
- msg = '[%s] Connection Failed: %s/%s'%(
- self.request.backend.base,
- self.request.backendServer.path, self.request.backend_uri)
-
- if reason:
- msg = '%s (%s)'%(msg, reason.getErrorMessage())
- log.debug("Connection Failed: "+str(reason), 'Fetcher')
- log.err(msg)
-
- # Look for alternative fetchers
- if not self.request.activateNextBackendServer(self):
- # No more backends, send error response back to client
- if reason.check(error.ConnectError):
- self.setResponseCode(http.SERVICE_UNAVAILABLE, "Connect Error")
- else:
- self.setResponseCode(http.SERVICE_UNAVAILABLE)
- self.apDataReceived("")
- self.apDataEnd(self.transfered, False)
- #Because of a bug in tcp.Client we may be called twice,
- #Make sure that next time nothing will happen
- #FIXME: This hack is probably not anymore pertinent.
- self.connectionFailed = lambda : log.debug('connectionFailed(2)',
- 'Fetcher','9')
-
-
-class FetcherDummy(Fetcher):
- """
- """
- gzip_convert = re.compile(r"^Nothing should match this$")
- post_convert = re.compile(r"^Nothing should match this$")
- status_code = http.INTERNAL_SERVER_ERROR
- status_message = None
-
- def insert_request(self, request):
- """
- """
- if request in self.requests:
- raise RuntimeError, \
- 'this request is already assigned to this Fetcher'
- self.requests.append(request)
- request.apFetcher = self
-
- def remove_request(self, request):
- """
- """
- #make sure that it has updated values, since the requests
- #may be cached and we need them to serve it.
- request.local_mtime = self.request.local_mtime
- request.local_size = self.request.local_size
-
- self.requests.remove(request)
- request.apFetcher = None
-
- def fix_ref_request(self):
- if self.requests != []:
- if self.request not in self.requests:
- request = self.requests[0]
- request.local_mtime = self.request.local_mtime
- request.local_size = self.request.local_size
- self.request = request
- self.remove_request(self.request)
- else:
- self.request = None
-
- return self.request
-
-class FetcherFile(Fetcher):
-
- def activate(self, request):
- Fetcher.activate(self, request)
- log.debug("FetcherFile.activate(): uri='%s' server='%s'" % (request.uri, request.backendServer.uri))
- if not request.apFetcher:
- log.debug("no request.apFetcher")
- return
-
- self.factory.file_served(request.uri)
-
- # start the transfer
- self.local_file = request.backendServer.uri[len("file:"):]+ request.uri
- if not os.path.exists(self.local_file):
- log.debug("not found: %s" % self.local_file)
- request.setResponseCode(http.NOT_FOUND)
- request.write("")
- request.finish()
- self.remove_request(request)
- Fetcher.apEnd(self)
- return
- self.local_size = os.stat(self.local_file)[stat.ST_SIZE]
-
- log.debug("Serving local file: " + self.local_file + " size:" + str(self.local_size), 'FetcherCachedFile')
- file = open(self.local_file,'rb')
- fcntl.lockf(file.fileno(), fcntl.LOCK_SH)
-
- request.setHeader("Content-Length", self.local_size)
- #request.setHeader("Last-modified",
- # http.datetimeToString(request.local_mtime))
- basic.FileSender().beginFileTransfer(file, request) \
- .addBoth(self.file_transfer_complete, request) \
- .addBoth(lambda r: file.close())
-
- # A file transfer has completed
- def file_transfer_complete(self, result, request):
- log.debug("transfer complete", 'FetcherCachedFile')
- request.finish()
- # Remove this client from request list
- self.remove_request(request)
- if len(self.requests) == 0:
- Fetcher.apEnd(self)
-
-class FetcherHttp(Fetcher, http.HTTPClient):
-
- forward_headers = [
- 'last-modified',
- 'content-length'
- ]
- log_headers = None
-
- proxy_host = None
- proxy_port = None
-
- def activate(self, request):
- Fetcher.activate(self, request)
-
- if not self.factory.config.http_proxy is '':
- (self.proxy_host, self.proxy_port) = request.factory.config.http_proxy.split(':')
-
- if not request.apFetcher:
- return
-
- class ClientFactory(protocol.ClientFactory):
- "Dummy ClientFactory to comply with current twisted API"
- #FIXME: Double check this, haggai thinks it is to blame for the
- #hangs.
- def __init__(self, instance):
- self.instance = instance
- def buildProtocol(self, addr):
- return self.instance
- def clientConnectionFailed(self, connector, reason):
- self.instance.connectionFailed(reason)
- def clientConnectionLost(self, connector, reason):
- log.debug("XXX clientConnectionLost", "http-client")
-
- if not self.proxy_host:
- reactor.connectTCP(request.backendServer.host, request.backendServer.port,
- ClientFactory(self), request.backend.config.timeout)
- else:
- reactor.connectTCP(self.proxy_host, int(self.proxy_port),
- ClientFactory(self), request.backend.config.timeout)
- def connectionMade(self):
- if not self.proxy_host:
- self.sendCommand(self.request.method, self.request.backendServer.path
- + "/" + self.request.backend_uri)
- else:
- self.sendCommand(self.request.method, "http://"
- + self.request.backendServer.host + ":" + str(self.request.backendServer.port)
- + "/" + self.request.backendServer.path
- + "/" + self.request.backend_uri)
-
- self.sendHeader('host', self.request.backendServer.host)
-
- if self.local_mtime != None:
- datetime = http.datetimeToString(self.local_mtime)
- self.sendHeader('if-modified-since', datetime)
-
- self.endHeaders()
-
- def handleStatus(self, version, code, message):
- __pychecker__ = 'unusednames=version,message'
- log.debug('handleStatus %s - %s' % (code, message), 'http_client')
- self.status_code = int(code)
-
- # Keep a record of server response even if overriden later by setReponseCode
- self.http_status = self.status_code
-
- self.setResponseCode(self.status_code)
-
- def handleHeader(self, key, value):
-
- log.debug("Received: " + key + " " + str(value))
- key = string.lower(key)
-
- if key == 'last-modified':
- self.local_mtime = http.stringToDatetime(value)
-
- if key in self.forward_headers:
- self.setResponseHeader(key, value)
-
- def handleEndHeaders(self):
- if self.http_status == http.NOT_MODIFIED:
- log.debug("NOT_MODIFIED " + str(self.status_code),'http_client')
- self.apEndCached()
-
- def rawDataReceived(self, data):
- self.apDataReceived(data)
-
- def handleResponse(self, buffer):
- if self.length == 0:
- self.setResponseCode(http.NOT_FOUND)
- # print "length: " + str(self.length), "response:", self.status_code
- if self.http_status == http.NOT_MODIFIED:
- self.apDataEnd(self.transfered, False)
- else:
- self.apDataEnd(self.transfered, True)
-
- def lineReceived(self, line):
- """
- log the line and handle it to the appropriate the base classe.
-
- The location header gave me trouble at some point, so I filter it just
- in case.
-
- Note: when running a class method directly and not from an object you
- have to give the 'self' parameter manualy.
- """
- #log.debug(line,'http_client')
- if self.log_headers == None:
- self.log_headers = line
- else:
- self.log_headers += ", " + line;
- if not re.search('^Location:', line):
- http.HTTPClient.lineReceived(self, line)
-
- def sendCommand(self, command, path):
- "log the line and handle it to the base class."
- log.debug(command + ":" + path,'http_client')
- http.HTTPClient.sendCommand(self, command, path)
-
- def endHeaders(self):
- "log and handle to the base class."
- if self.log_headers != None:
- log.debug(" Headers: " + self.log_headers, 'http_client')
- self.log_headers = None;
- http.HTTPClient.endHeaders(self)
-
- def sendHeader(self, name, value):
- "log and handle to the base class."
- log.debug(name + ":" + value,'http_client')
- http.HTTPClient.sendHeader(self, name, value)
-
-class FetcherFtp(Fetcher, protocol.Protocol):
- """
- This is the secuence here:
-
- -Start and connect the FTPClient
- -Ask for mtime
- -Ask for size
- -if couldn't get the size
- -try to get it by listing
- -get all that juicy data
-
- NOTE: Twisted's FTPClient code uses it's own timeouts here and there,
- so the timeout specified for the backend may not always be used
- """
- def activate (self, request):
- Fetcher.activate(self, request)
- if not request.apFetcher:
- return
-
- self.passive_ftp = self.request.backend.config.passive_ftp
-
- self.remote_file = (self.request.backendServer.path + "/"
- + self.request.backend_uri)
-
- from twisted.internet.protocol import ClientCreator
-
- if not request.backendServer.username:
- creator = ClientCreator(reactor, ftp.FTPClient, passive=0)
- else:
- creator = ClientCreator(reactor, ftp.FTPClient, request.backendServer.username,
- request.backendServer.password, passive=0)
- d = creator.connectTCP(request.backendServer.host, request.backendServer.port,
- request.backend.config.timeout)
- d.addCallback(self.controlConnectionMade)
- d.addErrback(self.connectionFailed)
-
- def controlConnectionMade(self, ftpclient):
- self.ftpclient = ftpclient
-
- if(self.passive_ftp):
- log.debug('Got control connection, using passive ftp', 'ftp_client')
- self.ftpclient.passive = 1
- else:
- log.debug('Got control connection, using active ftp', 'ftp_client')
- self.ftpclient.passive = 0
-
- if log.isEnabled('ftp_client'):
- self.ftpclient.debug = 1
-
- self.ftpFetchMtime()
-
- def ftpFinish(self, code, message=None):
- "Finish the transfer with code 'code'"
- self.ftpclient.quit()
- self.setResponseCode(code, message)
- self.apDataReceived("")
- self.apDataEnd(self.transfered)
-
- def ftpFinishCached(self):
- "Finish the transfer giving the requests the cached file."
- self.ftpclient.quit()
- self.apEndCached()
-
- def ftpFetchMtime(self):
- "Get the modification time from the server."
- def apFtpMtimeFinish(msgs, fetcher, fail):
- """
- Got an answer to the mtime request.
-
- Someone should check that this is timezone independent.
- """
- code = None
- if not fail:
- code, msg = msgs[0].split()
- mtime = None
- if code == '213':
- time_tuple=time.strptime(msg[:14], "%Y%m%d%H%M%S")
- #replace day light savings with -1 (current)
- time_tuple = time_tuple[:8] + (-1,)
- #correct the result to GMT
- mtime = time.mktime(time_tuple) - time.altzone
- if (fetcher.local_mtime and mtime
- and fetcher.local_mtime >= mtime):
- fetcher.ftpFinishCached()
- else:
- fetcher.local_mtime = mtime
- fetcher.ftpFetchSize()
-
- d = self.ftpclient.queueStringCommand('MDTM ' + self.remote_file)
- d.addCallbacks(apFtpMtimeFinish, apFtpMtimeFinish,
- (self, 0), None, (self, 1), None)
-
- def ftpFetchSize(self):
- "Get the size of the file from the server"
- def apFtpSizeFinish(msgs, fetcher, fail):
- code = None
- if not fail:
- code, msg = msgs[0].split()
- if code != '213':
- log.debug("SIZE FAILED",'ftp_client')
- fetcher.ftpFetchList()
- else:
- fetcher.setResponseHeader('content-length', msg)
- fetcher.ftpFetchFile()
-
- d = self.ftpclient.queueStringCommand('SIZE ' + self.remote_file)
- d.addCallbacks(apFtpSizeFinish, apFtpSizeFinish,
- (self, 0), None, (self, 1), None)
-
- def ftpFetchList(self):
- "If ftpFetchSize didn't work try to get the size with a list command."
- def apFtpListFinish(msg, filelist, fetcher, fail):
- __pychecker__ = 'unusednames=msg'
- if fail:
- fetcher.ftpFinish(http.INTERNAL_SERVER_ERROR)
- return
- if len(filelist.files)== 0:
- fetcher.ftpFinish(http.NOT_FOUND)
- return
- file = filelist.files[0]
- fetcher.setResponseHeader('content-length', file['size'])
- fetcher.ftpFetchFile()
- filelist = ftp.FTPFileListProtocol()
- d = self.ftpclient.list(self.remote_file, filelist)
- d.addCallbacks(apFtpListFinish, apFtpListFinish,
- (filelist, self, 0), None,
- (filelist, self, 1), None)
-
- def ftpFetchFile(self):
- "And finally, we ask for the file."
- def apFtpFetchFinish(msg, code, status, fetcher):
- __pychecker__ = 'unusednames=msg,status'
- fetcher.ftpFinish(code)
- log.debug('ftpFetchFile: ' + self.remote_file, 'ftp_client')
- d = self.ftpclient.retrieveFile(self.remote_file, self)
- d.addCallbacks(apFtpFetchFinish, apFtpFetchFinish,
- (http.OK, "good", self), None,
- (http.NOT_FOUND, "fail", self), None)
-
- def dataReceived(self, data):
- self.setResponseCode(http.OK)
- self.apDataReceived(data)
-
- def connectionLost(self, reason=None):
- """
- Maybe we should do some recovery here, I don't know, but the Deferred
- should be enough.
- """
- log.debug("lost connection: %s"%(reason),'ftp_client')
-
-class FetcherGzip(Fetcher, protocol.ProcessProtocol):
- """
- This is a fake Fetcher, it uses the real Fetcher from the request's
- backend via LoopbackRequest to get the data and gzip's or gunzip's as
- needed.
-
- NOTE: We use the serve_cached=0 parameter to Request.fetch so if
- it is cached it doesn't get uselessly read, we just get it from the cache.
- """
- post_convert = re.compile(r"^Should not match anything$")
- gzip_convert = post_convert
-
- exe = '/bin/gzip'
- def activate(self, request, postconverting=0):
- log.debug("FetcherGzip request:" + str(request.uri) + " postconvert:" + str(postconverting), 'gzip')
- Fetcher.activate(self, request)
- if not request.apFetcher:
- return
-
- self.args = (self.exe, '-c', '-9', '-n')
- if(log.isEnabled('gzip',9)):
- self.args += ('-v',)
-
- if request.uri[-3:] == '.gz':
- host_uri = request.uri[:-3]
- else:
- host_uri = request.uri+'.gz'
- self.args += ('-d',)
- self.host_file = self.factory.config.cache_dir + host_uri
- self.args += (self.host_file,)
-
- running = self.factory.runningFetchers
- if not postconverting or running.has_key(host_uri):
- #Make sure that the file is there
- loop = LoopbackRequest(request, self.host_transfer_done)
- loop.uri = host_uri
- loop.local_file = self.host_file
- loop.process()
- self.loop_req = loop
- loop.serve_if_cached=0
- if running.has_key(host_uri):
- #the file is on it's way, wait for it.
- running[host_uri].insert_request(loop)
- else:
- #we are not postconverting, so we need to fetch the host file.
- loop.fetch(serve_cached=0)
- else:
- #The file should be there already.
- self.loop_req = None
- self.host_transfer_done()
-
- def host_transfer_done(self):
- """
- Called by our LoopbackRequest when the real Fetcher calls
- finish() on it.
-
- If everything went well, check mtimes and only do the work if needed.
-
- If posible arrange things so the target file gets the same mtime as
- the host file.
- """
- log.debug('transfer done', 'gzip')
- if self.loop_req and self.loop_req.code != http.OK:
- self.setResponseCode(self.loop_req.code,
- self.loop_req.code_message)
- self.apDataReceived("")
- self.apDataEnd("")
- return
-
- if os.path.exists(self.host_file):
- self.local_mtime = os.stat(self.host_file)[stat.ST_MTIME]
- old_mtime = None
- if os.path.exists(self.local_file):
- old_mtime = os.stat(self.local_file)[stat.ST_MTIME]
- if self.local_mtime == old_mtime:
- self.apEndCached()
- else:
- log.debug("Starting process: " + self.exe + " " + str(self.args), 'gzip')
- self.process = reactor.spawnProcess(self, self.exe, self.args)
-
- def outReceived(self, data):
- self.setResponseCode(http.OK)
- self.apDataReceived(data)
-
- def errReceived(self, data):
- log.debug('gzip: ' + data,'gzip')
-
- def loseConnection(self):
- """
- This is a bad workaround Process.loseConnection not doing it's
- job right.
- The problem only happends when we try to finish the process
- while decompresing.
- """
- if hasattr(self, 'process') and self.process.pid:
- try:
- os.kill(self.process.pid, signal.SIGTERM)
- self.process.connectionLost()
- except exceptions.OSError, Error:
- import errno
- (Errno, Errstr) = Error
- if Errno != errno.ESRCH:
- log.debug('Passing OSError exception '+Errstr)
- raise
- else:
- log.debug('Threw away exception OSError no such process')
-
- def processEnded(self, reason=None):
- __pychecker__ = 'unusednames=reason'
- log.debug("Status: %d" %(self.process.status),'gzip')
- if self.process.status != 0:
- self.setResponseCode(http.NOT_FOUND)
-
- self.apDataReceived("")
- self.apDataEnd(self.transfered)
-
-class FetcherRsync(Fetcher, protocol.ProcessProtocol):
- """
- I frequently am not called directly, Request.fetch makes the
- arrangement for FetcherGzip to use us and gzip the result if needed.
- """
- post_convert = re.compile(r"^Should not match anything$")
- gzip_convert = re.compile(r"/Packages.gz$")
-
- "Temporary filename that rsync streams to"
- rsyncTempFile = None
-
- "Number of bytes sent to client already"
- bytes_sent = 0
-
- def activate (self, request):
- Fetcher.activate(self, request)
- if not request.apFetcher:
- return
-
- # Change /path/to/FILE -> /path/to/.FILE.* to match rsync tempfile
- self.globpattern = re.sub(r'/([^/]*)$', r'/.\1.*', self.local_file)
-
- for file in glob.glob(self.globpattern):
- log.msg('Deleting stale tempfile:' + file)
- unlink(file)
-
- uri = 'rsync://'+request.backendServer.host\
- +request.backendServer.path+'/'+request.backend_uri
- self.local_dir=re.sub(r"/[^/]*$", "", self.local_file)+'/'
-
- exe = '/usr/bin/rsync'
- if(log.isEnabled('rsync',9)):
- args = (exe, '--partial', '--progress', '--verbose', '--times',
- '--timeout', "%d"%(request.backend.config.timeout),
- uri, '.',)
- else:
- args = (exe, '--quiet', '--times', uri, '.',
- '--timeout', "%d"%(request.backend.config.timeout),
- )
- if(not os.path.exists(self.local_dir)):
- os.makedirs(self.local_dir)
- self.process = reactor.spawnProcess(self, exe, args, None,
- self.local_dir)
-
- def findRsyncTempFile(self):
- """
- Look for temporary file created by rsync during streaming
- """
- files = glob.glob(self.globpattern)
-
- if len(files)==1:
- self.rsyncTempFile = files[0]
- log.debug('tempfile: ' + self.rsyncTempFile, 'rsync_client')
- elif not files:
- # No file created yet
- pass
- else:
- log.err('found more than one tempfile, abort rsync')
- self.transport.loseConnection()
-
- def connectionMade(self):
- pass
-
- "Data received from rsync process to stdout"
- def outReceived(self, data):
- for s in string.split(data, '\n'):
- if len(s):
- log.debug('rsync: ' + s, 'rsync_client')
- #self.apDataReceived(data)
- if not self.rsyncTempFile:
- self.findRsyncTempFile()
- # Got tempfile?
- if self.rsyncTempFile:
- self.setResponseCode(http.OK)
- if self.rsyncTempFile:
- self.sendData()
-
-
- "Data received from rsync process to stderr"
- def errReceived(self, data):
- for s in string.split(data, '\n'):
- if len(s):
- log.err('rsync error: ' + s, 'rsync_client')
-
- def sendData(self):
- f = None
- if self.rsyncTempFile:
- try:
- f = open(self.rsyncTempFile, 'rb')
- except IOError:
- return
- else:
- # Tempfile has gone, stream main file
- #log.debug("sendData open dest " + str(self.bytes_sent))
- f = open(self.local_file, 'rb')
-
- if f:
- f.seek(self.bytes_sent)
- data = f.read(abstract.FileDescriptor.bufferSize)
- #log.debug("sendData got " + str(len(data)))
- f.close()
- if data:
- self.apDataReceived(data)
- self.bytes_sent = self.bytes_sent + len(data)
- reactor.callLater(0, self.sendData)
- elif not self.rsyncTempFile:
- # Finished reading final file
- #self.transport = None
- log.debug("sendData complete")
- # Tell clients, but data is already saved by rsync so don't
- # write file again
- self.apDataEnd(self.transfered, False)
-
-
- def processEnded(self, status_object):
- __pychecker__ = 'unusednames=reason'
- log.debug("Status: %d" %(status_object.value.exitCode)
- ,'rsync_client')
- self.rsyncTempFile = None
-
- # Success?
- exitcode = status_object.value.exitCode
-
- if exitcode == 0:
- # File received. Send to clients.
- self.local_mtime = os.stat(self.local_file)[stat.ST_MTIME]
- reactor.callLater(0, self.sendData)
- else:
- if exitcode == 10:
- # Host not found
- self.setResponseCode(http.INTERNAL_SERVER_ERROR)
- else:
- self.setResponseCode(http.NOT_FOUND)
-
- if not os.path.exists(self.local_file):
- try:
- os.removedirs(self.local_dir)
- except:
- pass
- self.apDataReceived("")
- self.apDataEnd(self.transfered)
-
- def loseConnection(self):
- "Kill rsync process"
- if self.transport:
- if self.transport.pid:
- log.debug("killing rsync child" +
- str(self.transport.pid), 'rsync_client')
- os.kill(self.transport.pid, signal.SIGTERM)
- #self.transport.loseConnection()
-
-
-
-class FetcherCachedFile(Fetcher):
- """
- Sends the cached file or tells the client that the file was not
- 'modified-since' if appropriate.
- """
- post_convert = re.compile(r"/Packages.gz$")
- gzip_convert = re.compile(r"^Should not match anything$")
-
- request = None
- def if_modified(self, request):
- """
- Check if the file was 'modified-since' and tell the client if it
- wasn't.
- """
- if_modified_since = request.getHeader('if-modified-since')
- if if_modified_since != None:
- if_modified_since = http.stringToDatetime(
- if_modified_since)
-
- if request.local_mtime <= if_modified_since:
- request.setResponseCode(http.NOT_MODIFIED)
- request.setHeader("Content-Length", 0)
- request.write("")
- request.finish()
- self.remove_request(request)
-
- def insert_request(self, request):
- if not request.serve_if_cached:
- request.finish()
- return
- Fetcher.insert_request(self, request)
-
- log.debug("Serving from cache for additional client: " + self.local_file + " size:" + str(self.size))
- self.start_transfer(request)
-
- def activate(self, request):
- Fetcher.activate(self, request)
- if not request.apFetcher:
- return
- self.factory.file_served(request.uri)
- self.size = request.local_size
-
- self.start_transfer(request)
-
- def start_transfer(self, request):
- self.if_modified(request)
-
- if len(self.requests) == 0:
- #we had a single request and didn't have to send it
- self.apEnd()
- return
-
- if self.size:
- log.debug("Serving from cache: " + self.local_file + " size:" + str(self.size), 'FetcherCachedFile')
- file = open(self.local_file,'rb')
- fcntl.lockf(file.fileno(), fcntl.LOCK_SH)
-
- request.setHeader("Content-Length", request.local_size)
- request.setHeader("Last-modified",
- http.datetimeToString(request.local_mtime))
- basic.FileSender().beginFileTransfer(file, request) \
- .addBoth(self.file_transfer_complete, request) \
- .addBoth(lambda r: file.close())
-# .addBoth(lambda r: request.transport.loseConnection())
- else:
- log.debug("Zero length file! " + self.local_file, 'FetcherCachedFile')
- self.file_transfer_complete(None, request)
- request.finish()
-
- # A file transfer has completed
- def file_transfer_complete(self, result, request):
- log.debug("transfer complete", 'FetcherCachedFile')
- request.finish()
- # Remove this client from request list
- self.remove_request(request)
- if len(self.requests) == 0:
- Fetcher.apEnd(self)
-
class Backend:
"""
A backend repository. There is one Backend for each [...] section
@@ -1214,25 +47,40 @@
"Sequence of BackendServers, in order of preference"
uris = []
+ "Hash of active cache entries"
+ entries = {}
+
"Packages database for this backend"
packages = None
- base = None
+ name = None
+
+ downloadQueuePerClient = True # Set to true if a download queue should be created per client
def __init__(self, factory, config):
+ log.debug("Creating Backend: " + config.name)
self.factory = factory
self.config = config # apBackendConfig configuration information
self.base = config.name # Name of backend
- self.uris=[]
+ self.uris = [] # Sequence of BackendServers, in order of preference
+
+ if self.downloadQueuePerClient:
+ self.queue = fetchers.DownloadQueuePerClient()
+ else:
+ self.queue = fetchers.DownloadQueue()
+
+ self.entries = {} # Hash of active cache entries
+ self.packages = None # Packages database for this backend
for uri in config.backends:
self.addURI(uri)
+
#self.get_packages_db().load()
def addURI(self, uri):
newBackend = BackendServer(self, uri)
self.uris.append(newBackend)
- def get_first_server(self):
+ def get_first_server(self):
"Provide first BackendServer for this Backend"
return self.uris[0]
@@ -1246,6 +94,25 @@
def __str__(self):
return '('+self.base+')'+' servers:'+str(len(self.uris))
+ def get_cache_entry(self, path):
+ """
+ Return CacheEntry for given path
+ a new object is created if it does not already exist
+ """
+ if self.entries.has_key(path):
+ log.debug("Cache entry exists: %s, %s entries" %(path,len(self.entries)))
+ return self.entries[path]
+ else:
+ log.debug("New Cache entry: "+path)
+ e = cache.CacheEntry(self, path)
+ self.entries[path] = e
+ return e
+ def entry_done(self, entry):
+ "A cache entry is finished and clients are disconnected"
+ #if self.entries.has_key(entry.path):
+ log.debug("entry_done: %s" %(entry.path), 'Backend')
+ del self.entries[entry.path]
+
def get_packages_db(self):
"Return packages parser object for the backend, creating one if necessary"
if self.packages == None:
@@ -1255,12 +122,22 @@
def get_path(self, path):
"""
'path' is the original uri of the request.
-
+
We return the path to be appended to the backend path to
request the file from the backend server
"""
return path[len(self.base)+2:]
-
+
+ def file_served(self, entry):
+ "A cache entry has served a file in this backend"
+ self.get_packages_db().file_updated(entry)
+
+ def start_download(self, entry):
+ """
+ A CacheEntry has requested that a file should be downloaded from the backend
+ """
+ self.queue.addFile(entry)
+
class BackendServer:
"""
A repository server. A BackendServer is created for each URI defined in 'backends'
@@ -1271,10 +148,10 @@
uri = None # URI of server
fetchers = {
- 'http' : FetcherHttp,
- 'ftp' : FetcherFtp,
- 'rsync': FetcherRsync,
- 'file' : FetcherFile,
+ 'http' : fetchers.HttpFetcher,
+ 'ftp' : fetchers.FtpFetcher,
+ 'rsync': fetchers.RsyncFetcher,
+ 'file' : fetchers.FileFetcher,
}
ports = {
'http' : 80,
@@ -1289,7 +166,7 @@
log.debug("Created new BackendServer: " + uri)
# hack because urlparse doesn't support rsync
- if uri[0:5] == 'rsync':
+ if uri[0:6] == 'rsync:':
uri = 'http'+uri[5:]
is_rsync=1
else:
@@ -1297,6 +174,8 @@
self.scheme, netloc, self.path, parameters, \
query, fragment = urlparse.urlparse(uri)
+ if is_rsync:
+ self.scheme = 'rsync'
if '@' in netloc:
auth = netloc[:netloc.rindex('@')]
@@ -1304,13 +183,12 @@
self.username, self.password = auth.split(':')
else:
self.username = None
+ self.password = None
if ':' in netloc:
self.host, self.port = netloc.split(':')
else:
self.host = netloc
self.port = self.ports[self.scheme]
- if is_rsync:
- self.scheme = 'rsync'
self.fetcher = self.fetchers[self.scheme]
try:
self.port = int(self.port)
@@ -1320,21 +198,23 @@
def __str__(self):
return ('(' + self.backend.base + ') ' + self.scheme + '://' +
self.host + ':' + str(self.port))
-
+
class Request(http.Request):
"""
Each new request from connected clients generates a new instance of this
class, and process() is called.
"""
- local_mtime = None
+ if_modified_since = None
local_size = None
serve_if_cached = 1
apFetcher = None
uriIndex = 0 # Index of backend URI
backend = None # Backend for this request
backendServer = None # Current server to be tried
+ cacheEntry = None # Cache entry for file requested
def __init__(self, channel, queued):
+ log.debug("New Request, queued=%s" % (queued),'Request');
self.factory=channel.factory
http.Request.__init__(self, channel, queued)
@@ -1342,298 +222,118 @@
"""
Each new request begins processing here
"""
- log.debug("Request: " + self.method + " " + self.uri);
- # Clean up URL
- self.uri = self.simplify_path(self.uri)
+ self.uri = self.clean_path(self.uri)
- self.local_file = self.factory.config.cache_dir + self.uri
- backendName = self.uri[1:].split('/')[0]
- log.debug("Request: %s %s backend=%s local_file=%s"%(self.method, self.uri, backendName, self.local_file))
+ if_modified_since = self.getHeader('if-modified-since')
+ if if_modified_since != None:
+ self.if_modified_since = http.stringToDatetime(
+ if_modified_since)
+
+ if self.uri[0] != '/':
+ log.debug("Request must include at least one '/'")
+ self.finishCode(http.FORBIDDEN, "Request must include at least one '/'")
+ return
- if self.factory.config.disable_pipelining:
- self.setHeader('Connection','close')
- self.channel.persistent = 0
+ backendName = self.uri[1:].split('/')[0]
+ log.debug("Request: %s %s backend=%s uri=%s"
+ % (self.method, self.uri, backendName, self.uri),'Request')
if self.method != 'GET':
#we currently only support GET
- log.debug("abort - method not implemented")
+ log.debug("abort - method not implemented", 'Request')
self.finishCode(http.NOT_IMPLEMENTED)
return
if re.search('/\.\./', self.uri):
- log.debug("/../ in simplified uri ("+self.uri+")")
+ log.debug("/../ in simplified uri ("+self.uri+")", 'Request')
self.finishCode(http.FORBIDDEN)
return
self.backend = self.factory.getBackend(backendName)
if self.backend is None:
- if not self.factory.config.dynamic_backends:
- log.debug("abort - non existent Backend")
- self.finishCode(http.NOT_FOUND, "NON-EXISTENT BACKEND")
- return
-
- # We are using dynamic backends so we will use the name as
- # the hostname to get the files.
- backendName = self.uri[1:].split('/')[0]
- backendServer = "http://" + backendName
- log.debug("Adding " + backendName + " backend dynamicaly")
- backendConfig = self.factory.config.addBackend(None, backendName, (backendServer,))
- self.backend = Backend(self.factory, backendConfig)
- self.backend_uri = self.backend.get_path(self.uri)
+ self.finishCode(http.NOT_FOUND, "NON-EXISTENT BACKEND")
+ return None
log.debug("backend: %s %s" % (self.backend.base, self.backend.uris))
- self.backendServer = self.backend.get_first_server()
- self.filetype = findFileType(self.uri)
- if not self.filetype:
- log.debug("abort - unknown extension")
- self.finishCode(http.NOT_FOUND)
+ backend_path = self.uri.split('/',2)[2]
+ self.cacheEntry = self.backend.get_cache_entry(backend_path)
+
+ if not self.cacheEntry.filetype:
+ log.debug("abort - unknown extension for file %s" % (backend_path), 'Request')
+ self.finishCode(http.FORBIDDEN, 'File not found - unknown extension')
return
- self.setHeader('content-type', self.filetype.contype)
+ self.setHeader('content-type', self.cacheEntry.filetype.contype)
- if os.path.isdir(self.local_file):
- log.debug("abort - Directory listing not allowed")
- self.finishCode(http.FORBIDDEN)
+ if os.path.isdir(self.cacheEntry.file_path):
+ log.debug("abort - Directory listing not allowed", 'Request')
+ self.finishCode(http.FORBIDDEN, 'Directory listing not permitted')
return
- self.fetch()
+ self.cacheEntry.add_request(self)
+
+ def clean_path(self, uri):
+ # Clean up URL given
+ scheme, netloc, path, params, query, fragment = urlparse.urlparse(uri)
+ return os.path.normpath(path)
- def fetch(self, serve_cached=1):
+
+ def start_streaming(self, size, mtime):
"""
- Serve 'self' from cache or through the appropriate Fetcher
- depending on the asociated backend.
-
- Use post_convert and gzip_convert regular expresions of the Fetcher
- to gzip/gunzip file before and after download.
-
- 'serve_cached': this is somewhat of a hack only useful for
- LoopbackRequests (See LoopbackRequest class for more information).
+ Prepare client to stream file
+ Return false if streaming is not necessary (i.e. cache hit)
"""
- def fetch_real(result, dummyFetcher, cached, running):
- """
- This is called after verifying if the file is properly cached.
-
- If 'cached' the requested file is properly cached.
- If not 'cached' the requested file was not there, didn't pass the
- integrity check or may be outdated.
- """
- __pychecker__ = 'unusednames=result'
-
- if len(dummyFetcher.requests)==0:
- #The request's are gone, the clients probably closed the
- #conection
- log.debug("THE REQUESTS ARE GONE (Clients closed conection)",
- 'fetch')
- dummyFetcher.apEnd()
- return
-
-
- req = dummyFetcher.request
-
- log.debug("cached: %s" % cached)
-
- if cached:
- msg = ("Using cached copy of %s"
- %(dummyFetcher.request.local_file))
- fetcher_class = FetcherCachedFile
- else:
- msg = ("Consulting server about %s"
- %(dummyFetcher.request.local_file))
- fetcher_class = req.backendServer.fetcher
-
- if fetcher_class.gzip_convert.search(req.uri):
- msg = ("Using gzip/gunzip to get %s"
- %(dummyFetcher.request.local_file))
- fetcher_class = FetcherGzip
-
- log.debug(msg, 'fetch_real')
- fetcher = dummyFetcher.apEndTransfer(fetcher_class)
- if (fetcher and fetcher.post_convert.search(req.uri)
- and not running.has_key(req.uri[:-3])):
- log.debug("post converting: "+req.uri,'convert')
- loop = LoopbackRequest(req)
- loop.uri = req.uri[:-3]
- loop.local_file = req.local_file[:-3]
- loop.process()
- loop.serve_if_cached=0
- #FetcherGzip will attach as a request of the
- #original Fetcher, efectively waiting for the
- #original file if needed
- gzip = FetcherGzip()
- gzip.activate(loop, postconverting=1)
-
- self.serve_if_cached = serve_cached
- running = self.factory.runningFetchers
- if (running.has_key(self.uri)):
- #If we have an active fetcher just use that
- log.debug("have active fetcher: "+self.uri,'client')
- running[self.uri].insert_request(self)
- return running[self.uri]
+ if self.if_modified_since is None or self.if_modified_since < mtime:
+ log.debug("start_streaming size=%s mtime=%s if_modified_since=%s" % (size, mtime, self.if_modified_since) , 'Request')
+ self.setResponseCode(http.OK, 'Streaming file')
+ if mtime is not None:
+ self.setHeader('last-modified', http.datetimeToString(mtime))
+ if size is not None:
+ self.setHeader('content-length', size)
+ return True
else:
- #we make a FetcherDummy instance to hold other requests for the
- #same file while the check is in process. We will transfer all
- #the requests to a real fetcher when the check is done.
- dummyFetcher = FetcherDummy(self)
- #Standard Deferred practice
- d = self.check_cached()
- d.addCallbacks(fetch_real, fetch_real,
- (dummyFetcher, 1, running,), None,
- (dummyFetcher, 0, running,), None)
- return None
-
- def simplify_path(self, old_path):
- """
- change //+ with /
- change /directory/../ with /
- More than three ocurrences of /../ together will not be
- properly handled
-
- NOTE: os.path.normpath could probably be used here.
- """
- path = re.sub(r"//+", "/", old_path)
- path = re.sub(r"/\./+", "/", path)
- new_path = re.sub(r"/[^/]+/\.\./", "/", path)
- while (new_path != path):
- path = new_path
- new_path = re.sub(r"/[^/]+/\.\./", "/", path)
- if (new_path != old_path):
- log.debug("simplified path from " + old_path +
- " to " + new_path,'simplify_path')
- return path
+ log.debug("file not modified: mtime=%s if_modified_since=%s" % (mtime, self.if_modified_since) , 'Request')
+ self.setHeader("content-length", 0)
+ self.finishCode(http.NOT_MODIFIED, 'File is up to date')
+ return False
def finishCode(self, responseCode, message=None):
- "Finish the request with an status code"
+ "Finish the request with a status code and no streamed data"
+ log.debug("finishCode: %s, %s" % (responseCode, message), 'Request')
self.setResponseCode(responseCode, message)
self.write("")
self.finish()
def finish(self):
+ "Finish request after streaming"
+ log.debug("finish. Queued: %s" % (self.queued) , 'Request')
http.Request.finish(self)
- if self.factory.config.disable_pipelining:
- if hasattr(self.transport, 'loseConnection'):
- self.transport.loseConnection()
-
- def check_cached(self):
- """
- check the existence and ask for the integrity of the requested file and
- return a Deferred to be trigered when we find out.
- """
- def file_ok(result, deferred, self):
- """
- called if FileVerifier has determined that the file is cached and
- in good shape.
-
- Now we check NOTE: The file may still be too old or not fresh
- enough.
- """
- __pychecker__ = 'unusednames=result'
- stat_tuple = os.stat(self.local_file)
-
- self.local_mtime = stat_tuple[stat.ST_MTIME]
- self.local_size = stat_tuple[stat.ST_SIZE]
- log.debug("Modification time:" +
- time.asctime(time.localtime(self.local_mtime)),
- "file_ok")
- update_times = self.factory.update_times
-
- if update_times.has_key(self.uri):
- last_access = update_times[self.uri]
- log.debug("last_access from db: " +
- time.asctime(time.localtime(last_access)),
- "file_ok")
- else:
- last_access = self.local_mtime
-
- cur_time = time.time()
- min_time = cur_time - self.factory.config.min_refresh_delay
+ if self.cacheEntry:
+ self.cacheEntry.remove_request(self)
+ self.cacheEntry = None
- if not self.filetype.mutable:
- log.debug("file is immutable: "+self.local_file, 'file_ok')
- deferred.callback(None)
- elif last_access < min_time:
- log.debug("file is too old: "+self.local_file, 'file_ok')
- update_times[self.uri] = cur_time
- deferred.errback()
- else:
- log.debug("file is ok: "+self.local_file, 'file_ok')
- deferred.callback(None)
-
- log.debug("check_cached: "+self.local_file, 'file_ok')
- deferred = defer.Deferred()
- if os.path.exists(self.local_file):
- verifier = FileVerifier(self)
- verifier.deferred.addCallbacks(file_ok, deferred.errback,
- (deferred, self), None,
- None, None)
- else:
- deferred.errback()
- return deferred
-
def connectionLost(self, reason=None):
"""
The connection with the client was lost, remove this request from its
Fetcher.
"""
- __pychecker__ = 'unusednames=reason'
- #If it is waiting for a file verification it may not have an
- #apFetcher assigned
- if self.apFetcher:
- self.apFetcher.remove_request(self)
- self.finish()
+ log.debug("connectionLost" , 'Request')
+ if self.cacheEntry:
+ self.cacheEntry.remove_request(self)
+ #self.finish()
- def activateNextBackendServer(self, fetcher):
+ def getFileno(self):
"""
- The attempt to retrieve a file from the BackendServer failed.
- Look for the next possible BackendServer and transfer requests to that
- Returns true if another BackendServer was found
- """
- self.backendServer = self.backend.get_next_server(self.backendServer)
- if(self.backendServer == None):
- log.debug("no more Backends", "fetcher")
- return False
-
- fetcher_class = self.backendServer.fetcher
- log.debug('Trying next backendServer', 'fetcher')
- fetcher.apEndTransfer(fetcher_class)
-
- return True
-
-
-class LoopbackRequest(Request):
- """
- This is just a fake Request so a Fetcher can attach to another
- Fetcher and be notified when then transaction is completed.
-
- Look at FetcherGzip for a sample.
- """
- __pychecker__ = 'no-callinit'
- import cStringIO
- local_mtime = None
- headers = {}
- content = cStringIO.StringIO()
-
- def __init__(self, other_req, finish=None):
-
- self.finish_cb = finish
- http.Request.__init__(self, None, 1)
- self.backend = other_req.backend
- self.factory = other_req.factory
- self.filetype = other_req.filetype
- self.method = other_req.method
- self.clientproto = other_req.clientproto
- def process(self):
- self.backend_uri = self.backend.get_path(self.uri)
- def write(self, data):
- "We don't care for the data, just want to know then it is served."
- pass
- def finish(self):
- "If he wanted to know, tell daddy that we are served."
- if self.finish_cb:
- self.finish_cb()
- self.transport = None
- pass
+ Get identifier which is unique per apt client
+ """
+ try:
+ fileno = self.channel.transport.fileno()
+ except:
+ fileno = -1
+ log.msg("could not get transport's file descriptor", 'Request')
+ return fileno
class Channel(http.HTTPChannel):
"""
@@ -1647,7 +347,7 @@
def headerReceived(self, line):
"log and pass over to the base class"
- #log.debug("Header: " + line)
+ log.debug("Header: " + line)
if self.log_headers == None:
self.log_headers = line
else:
@@ -1663,13 +363,16 @@
def connectionLost(self, reason=None):
"If the connection is lost, notify all my requests"
__pychecker__ = 'unusednames=reason'
- for req in self.requests:
- req.connectionLost()
- log.debug("Client connection closed")
+ log.debug("Client connection closed", 'Channel')
+ http.HTTPChannel.connectionLost(self, reason)
if log.isEnabled('memleak'):
memleak.print_top_10()
#reactor.stop() # use for shutting down apt-proxy when a client disconnects
+ #def requestDone(self, request):
+ #log.debug("========Request Done=========", 'Channel')
+ #http.HTTPChannel.requestDone(self, request)
+
class Factory(protocol.ServerFactory):
"""
This is the center of apt-proxy, it holds all configuration and global data
@@ -1691,81 +394,55 @@
self.packages: all versions of a certain package name.
"""
- databases=('update_times', 'access_times', 'packages')
+
+
+
+ def __init__ (self, config):
+ self.runningFetchers = {}
+ self.backends = {}
+ self.config = config
+ self.periodicCallback = None
+ self.databases = databaseManager(self)
+ self.recycler = None
+
+ def __del__(self):
+ pass
+ #self.closeDatabases()
def periodic(self):
"Called periodically as configured mainly to do mirror maintanace."
log.debug("Doing periodic cleaning up")
+ self.periodicCallback = None
self.clean_old_files()
self.recycler.start()
log.debug("Periodic cleaning done")
- if (self.config.cleanup_freq != None):
- reactor.callLater(self.config.cleanup_freq, self.periodic)
- def __del__(self):
- for f in self.databases:
- try:
- if hasattr(self, f):
- getattr(self, f).close()
- except Exception:
- pass
- def __init__ (self, config):
- self.runningFetchers = {}
- self.backends = []
- self.config = config
+ self.startPeriodic()
+
+ def startPeriodic(self):
+ if (self.config.cleanup_freq != None and self.periodicCallback is None):
+ log.debug("Will do periodic cleaup in %s sec" % (self.config.cleanup_freq))
+ self.periodicCallback = reactor.callLater(self.config.cleanup_freq, self.periodic)
+
+ def stopPeriodic(self):
+ if self.periodicCallback is not None:
+ self.periodicCallback.cancel()
+ self.periodicCallback = None
def __getattr__ (self, name):
- def open_shelve(dbname):
- from bsddb3 import db,dbshelve
-
- shelve = dbshelve.DBShelf()
- db_dir = self.config.cache_dir+'/'+status_dir+'/db'
- if not os.path.exists(db_dir):
- os.makedirs(db_dir)
-
- filename = db_dir + '/' + dbname + '.db'
- if os.path.exists(filename):
- try:
- log.debug('Verifying database: ' + filename)
- shelve.verify(filename)
- except:
- os.rename(filename, filename+'.error')
- log.msg(filename+' could not be opened, moved to '+filename+'.error','db', 1)
- log.msg('Recreating '+ filename,'db', 1)
- try:
- log.debug('Opening database ' + filename)
- shelve = dbshelve.open(filename)
-
- # Handle upgrade to new format included on 1.9.20.
- except db.DBInvalidArgError:
- log.msg('Upgrading from previous database format: %s' % filename + '.previous')
- import bsddb.dbshelve
- os.rename(filename, filename + '.previous')
- previous_shelve = bsddb.dbshelve.open(filename + '.previous')
- shelve = dbshelve.open(filename)
-
- for k in previous_shelve.keys():
- shelve[k] = previous_shelve[k]
- log.msg('Upgrade complete')
-
- return shelve
-
- if name == 'update_times':
- self.update_times = open_shelve('update')
- return self.update_times
- elif name == 'access_times':
- self.access_times = open_shelve('access')
- return self.access_times
- elif name == 'packages':
- self.packages = open_shelve('packages')
- return self.packages
+ # Auto open database if requested
+ if name in self.databases.table_names:
+ db = self.databases.get(name)
+ setattr(self, name, db)
+ return db
else:
raise AttributeError(name)
def startFactory(self):
#start periodic updates
self.configurationChanged()
- self.recycler = misc.MirrorRecycler(self, 1)
- self.recycler.start()
+ self.dumpdbs()
+ self.recycler = MirrorRecycler(self, 1)
+ #self.recycler.start()
def configurationChanged(self, oldconfig = None):
"""
@@ -1779,7 +456,7 @@
setattr(self.config, param, getattr(oldconfig, param))
if self.config.cleanup_freq != None and (oldconfig is None or oldconfig.cleanup_freq == None):
- reactor.callLater(self.config.cleanup_freq, self.periodic)
+ self.startPeriodic()
self.createBackends()
def createBackends(self):
@@ -1796,7 +473,18 @@
"""
if self.backends.has_key(name):
return self.backends[name]
- return None
+
+ if not self.config.dynamic_backends:
+ return None
+
+ # We are using dynamic backends so we will use the name as
+ # the hostname to get the files.
+ backendServer = "http://" + name
+ log.debug("Adding dynamic backend:" + name)
+ backendConfig = self.config.addBackend(None, name, (backendServer,))
+ backend = Backend(self, backendConfig)
+ self.backends[name] = backend
+ return backend
def clean_versions(self, packages):
"""
@@ -1825,6 +513,8 @@
from packages import AptDpkgInfo, get_mirror_versions
for uri in packages[:]:
if not os.path.exists(cache_dir +'/'+ uri):
+ log.debug("clean_versions: file %s no longer exists"%(uri),
+ 'versions')
packages.remove(uri)
else:
try:
@@ -1833,17 +523,17 @@
package_name = info['Package']
except SystemError:
log.msg("Found problems with %s, aborted cleaning"%(uri),
- 'max_versions')
+ 'versions')
return
- if len(info):
+ if len(cached_packages) > 0:
import apt_pkg
cached_packages.sort(reverse_compare)
- log.debug(str(cached_packages), 'max_versions')
+ log.debug(str(cached_packages), 'versions')
current_packages = get_mirror_versions(self, package_name)
current_packages.sort(reverse_compare)
- log.debug("Current Versions: " + str(current_packages), 'max_versions')
+ log.debug("Current Versions: " + str(current_packages), 'versions')
version_count = 0
@@ -1865,6 +555,7 @@
if version_count > self.config.max_versions:
log.msg("Deleting " + cache_dir +'/'+ cached_packages[0][1], 'max_versions')
os.unlink(cache_dir +'/'+ cached_packages[0][1])
+ packages.remove(cached_packages[0][1])
del cached_packages[0]
def clean_old_files(self):
@@ -1896,30 +587,52 @@
log.debug("old_file: non-existent "+file)
del self.update_times[file]
- def file_served(self, uri):
- "Update the databases, this file has just been served."
- self.access_times[uri]=time.time()
- if re.search("\.deb$", uri):
- package = re.sub("^.*/", "", uri)
+ def file_served(self, cache_path):
+ """
+ Update the databases, this file has just been served.
+ @param cache_path: path of file within cache e.g. debian/dists/stable/Release.gpg
+ """
+ log.debug("File served: %s" % (cache_path))
+ path = os.sep + cache_path # Backwards compat
+ #path = cache_path
+ self.access_times[path]=time.time()
+ if re.search("\.deb$", path):
+ package = re.sub("^.*/", "", path)
package = re.sub("_.*$", "", package)
if not self.packages.has_key(package):
- packages = [uri]
- self.packages[package] = packages
+ packages = [path]
else:
packages = self.packages[package]
- if not uri in packages:
- packages.append(uri)
+ if not path in packages:
+ packages.append(path)
self.clean_versions(packages)
- self.packages[package] = packages
+ self.packages[package] = packages
self.dumpdbs()
+ def closeDatabases(self):
+ for db in self.databases.table_names:
+ if getattr(self.databases, db) is not None:
+ log.debug("closing " + db, 'db')
+ getattr(self,db).close()
+ delattr(self,db)
+ setattr(self.databases, db, None)
+
def stopFactory(self):
+ log.debug('Main factory stop', 'factory')
import packages
- self.dumpdbs()
- self.update_times.close()
- self.access_times.close()
- self.packages.close()
+ # self.dumpdbs()
+
+ # Stop all DownloadQueues and their fetchers
+ for b in self.backends.values():
+ b.queue.stop()
+ b.queue = None
+ self.backends = {}
packages.cleanup(self)
+ if self.recycler is not None:
+ self.recycler.stop()
+ self.recycler = None
+ self.stopPeriodic()
+ #self.closeDatabases()
def dumpdbs (self):
def dump_update(key, value):
@@ -1960,3 +673,56 @@
def debug(self, message):
log.debug(message)
+
+class databaseManager:
+ update_times = None
+ access_times = None
+ packages = None
+ table_names=['update_times', 'access_times', 'packages']
+ database_files=['update', 'access', 'packages']
+
+ def __init__(self, factory):
+ self.factory = factory
+
+ def get(self, name):
+ idx = self.table_names.index(name)
+ db = getattr(self,name)
+ if db is None:
+ db = self.open_shelve(self.database_files[idx])
+ setattr(self, name, db)
+ return db
+
+ def open_shelve(self, dbname):
+ from bsddb import db,dbshelve
+
+ shelve = dbshelve.DBShelf()
+ db_dir = self.factory.config.cache_dir+'/'+status_dir+'/db'
+ if not os.path.exists(db_dir):
+ os.makedirs(db_dir)
+
+ filename = db_dir + '/' + dbname + '.db'
+ if os.path.exists(filename):
+ try:
+ log.debug('Verifying database: ' + filename)
+ shelve.verify(filename)
+ except:
+ os.rename(filename, filename+'.error')
+ log.msg(filename+' could not be opened, moved to '+filename+'.error','db', 1)
+ log.msg('Recreating '+ filename,'db', 1)
+ try:
+ log.debug('Opening database ' + filename)
+ shelve = dbshelve.open(filename)
+
+ # Handle upgrade to new format included on 1.9.20.
+ except db.DBInvalidArgError:
+ log.msg('Upgrading from previous database format: %s' % filename + '.previous')
+ import bsddb.dbshelve
+ os.rename(filename, filename + '.previous')
+ previous_shelve = bsddb.dbshelve.open(filename + '.previous')
+ shelve = dbshelve.open(filename)
+
+ for k in previous_shelve.keys():
+ shelve[k] = previous_shelve[k]
+ log.msg('Upgrade complete')
+
+ return shelve
Modified: trunk/apt_proxy/apt_proxy_conf.py
==============================================================================
--- trunk/apt_proxy/apt_proxy_conf.py (original)
+++ trunk/apt_proxy/apt_proxy_conf.py Thu Aug 3 23:54:46 2006
@@ -49,6 +49,8 @@
def gettime(self, section, option):
mult = 1
value = self.get(section, option)
+ if len(value) == 0:
+ raise ConfigError("Configuration parse error: [%s] %s" % (section, option))
suffix = value[-1].lower()
if suffix in self.time_multipliers.keys():
mult = self.time_multipliers[suffix]
@@ -58,6 +60,13 @@
return self.get(section,option)
def getstringlist(self, section, option):
return self.get(section,option).split()
+ def getproxyspec(self, section, option):
+ "Get http proxy info from string"
+ p = ProxyConfig(self.get(section,option))
+ if p.host is not None:
+ return p
+ else:
+ return None
class apConfig:
"""
@@ -77,7 +86,7 @@
['address', '', 'string'],
['port', 9999, 'int'],
['min_refresh_delay', 30, 'time'],
- ['complete_clientless_downloads', '0', 'boolean'],
+ ['complete_clientless_downloads', False, 'boolean'],
['telnet_port', 0, 'int'],
['telnet_user', '', 'string'],
['telnet_pass', '', 'string'],
@@ -87,11 +96,11 @@
['max_versions', 3, '*int'],
['max_age', 10, '*time'],
['import_dir', '/var/cache/apt-proxy/import', 'string'],
- ['disable_pipelining', '1', 'boolean'],
['passive_ftp', 'on', 'boolean'],
['dynamic_backends', 'on', 'boolean'],
- ['http_proxy', '' , 'string'],
- ['username', 'aptproxy', 'string']
+ ['http_proxy', None , 'proxyspec'],
+ ['username', 'aptproxy', 'string'],
+ ['bandwidth_limit', None, '*int']
]
"""
@@ -104,7 +113,9 @@
BACKEND_CONFIG_ITEMS = [
['timeout', None, 'time'],
['passive_ftp', None, 'boolean'],
- ['backends', '', 'stringlist']
+ ['backends', '', 'stringlist'],
+ ['http_proxy', None , 'proxyspec'],
+ ['bandwidth_limit', None, '*int']
]
DEFAULT_CONFIG_FILE = ['/etc/apt-proxy/apt-proxy-v2.conf',
@@ -160,9 +171,8 @@
filehandle.close()
return conf
- def setDebug(self, levels):
+ def setDebug(self):
"Set logger debug level"
- self.debug = levels
for domain in self.debug.split():
#print "domain:",domain
if domain.find(':') != -1:
@@ -180,7 +190,7 @@
self.debug=config.get(DEFAULTSECT, 'debug')
else:
self.debug='all:3'
- self.setDebug(self.debug)
+ self.setDebug()
# read default values
for name,default,getmethod in self.CONFIG_ITEMS:
@@ -262,3 +272,23 @@
name = "UNKNOWN"
def __init__(self, name):
self.name = name
+
+class ProxyConfig:
+    """
+    Proxy settings parsed from a '[user:password@]host:port' string.
+
+    All attributes stay None when the string is empty or does not
+    match; otherwise they hold the matched text (port is kept as a
+    string, user and password are None when no credentials are given).
+    """
+    host = None
+    port = None
+    user = None
+    password = None
+
+    def __init__(self, proxyspec):
+        if proxyspec != '':
+            parsed = re.match('^((?P<user>.*):(?P<password>.*)@)?(?P<host>[a-zA-Z0-9_.+=-]+):(?P<port>[0-9]+)',
+                              proxyspec)
+            if parsed is not None:
+                for field in ('host', 'port', 'user', 'password'):
+                    setattr(self, field, parsed.group(field))
Added: trunk/apt_proxy/cache.py
==============================================================================
--- (empty file)
+++ trunk/apt_proxy/cache.py Thu Aug 3 23:54:46 2006
@@ -0,0 +1,595 @@
+#
+# Copyright (C) 2005 Chris Halls <halls at debian.org>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of version 2.1 of the GNU Lesser General Public
+# License as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+# -*- test-case-name: apt_proxy.test.test_cache -*-
+
+"""
+Cache management for apt-proxy
+
+These classes implement functionality for managing apt-proxy's cache. The most
+important of these is CacheEntry, which manages the lifecycle of a file in ap's cache
+"""
+
+from twisted.internet import protocol, defer, reactor
+from twisted.web import http
+from twisted.protocols import basic
+import os, re, stat, time, sys
+from misc import log
+
+class CacheEntry:
+    """
+    This class manages operations on a file in the cache. Each physical
+    file on the disk corresponds to one CacheEntry. Normally a CacheEntry
+    is created when the first Request for this file is received
+
+    Active CacheEntries are managed in their corresponding Backend
+    """
+
+    # Define lifecycle of cache entry
+    STATE_NEW = 1 # Entry is not yet being sent
+    STATE_CONNECTING = 2 # Waiting for connection to download file
+    STATE_DOWNLOAD = 3 # File is in process of downloading
+    STATE_SENDFILE = 4 # File is being sent from cache
+    STATE_SENT = 5 # Post download processing / waiting for clients to complete
+    STATE_FAILED = 6 # Download failed
+
+    bytesDownloaded = 0
+
+    def __init__(self, backend, path):
+        """
+        Create a new cache entry
+        @param backend Backend where this entry belongs
+        @param path Path to file within backend directory
+        """
+        self.backend = backend
+        self.factory = backend.factory
+        self.requests = [] # Active client requests for this cache entry
+        self.streamfile = None
+        self.state = self.STATE_NEW
+
+        # Path of file within backend e.g. 'dists/stable/Release.gpg'
+        self.path = path
+
+        # Path of file within cache e.g. 'debian/dists/stable/Release.gpg'
+        self.cache_path = backend.base + os.sep + path
+
+        # File in cache '/var/cache/apt-proxy/debian/dists/stable/Release.gpg'
+        self.file_path = (self.factory.config.cache_dir + os.sep +
+                          self.cache_path)
+
+        # Directory of cache file '/var/cache/apt-proxy/debian/dists/stable'
+        self.filedir = os.path.dirname(self.file_path)
+
+        self.filetype = findFileType(path)
+        self.filename = os.path.basename(path) # 'Release.gpg'
+
+        # filebase='Release' fileext='gpg'
+        (self.filebase, self.fileext) = os.path.splitext(self.filename)
+
+        # The directory is created lazily (see create_directory)
+        self.file_mtime = None
+        self.file_size = None
+
+        self.fetcher = None
+
+    def add_request(self, request):
+        """
+        A new request has been received for this file
+
+        The first request triggers get(). Raises RuntimeError if the
+        request is already registered, or if another request arrives
+        while a download is in progress (not implemented yet).
+        """
+        if request in self.requests:
+            raise RuntimeError(
+                'this request is already assigned to this CacheEntry')
+        self.requests.append(request)
+        if len(self.requests) == 1:
+            # First request
+            self.get()
+        elif self.state == self.STATE_DOWNLOAD:
+            # Subsequent request - client must be brought up to date
+            raise RuntimeError(
+                'TODO: multiple clients not implemented yet')
+
+    def remove_request(self,request):
+        """
+        Remove request, either because streaming is complete or
+        the client has disconnected
+
+        If parameter request is None, downloading has been aborted early
+        """
+        if request is not None and request in self.requests:
+            self.requests.remove(request)
+        if len(self.requests) != 0:
+            return
+
+        log.debug("Last request removed",'cacheEntry')
+        self.backend.entry_done(self)
+
+        # TODO - fixme: when config.complete_clientless_downloads is False
+        # and a download is in progress (STATE_DOWNLOAD with a fetcher),
+        # the download should be cancelled via self.fetcher.cancel_download()
+
+        if self.streamfile is not None:
+            # File was streamed to clients
+            self.streamfile.close()
+            self.streamfile = None
+
+    def start_request_stream(self, request):
+        """
+        Prepare a request for streaming
+
+        Uses the same request method (start_streaming) and the same size
+        and mtime attributes (file_size, file_mtime) as the rest of this
+        class; the names used previously were not defined anywhere here.
+        NOTE(review): no caller is visible in this file - confirm intent.
+        """
+        log.msg("start_request_stream:" + self.file_path, "CacheEntry")
+        request.start_streaming(self.file_size, self.file_mtime)
+
+        if self.streamfile.size() != 0:
+            request.write(self.streamfile.read_from(start=0)) # TODO - is this efficient?
+
+    def get(self):
+        """
+        Update current version of file in cache
+
+        A new entry whose file exists and is recent enough is verified
+        and served from the cache; in every other case a download starts.
+        """
+        if self.state == self.STATE_NEW:
+            if os.path.exists(self.file_path):
+                self.stat_file()
+                if self.check_age():
+                    self.verify()
+                    return
+
+        self.start_download()
+
+    def verify(self):
+        """
+        Ask for the integrity of the cached file to be checked. The file
+        is sent from the cache on success and downloaded again on failure.
+        """
+        log.debug("check_cached: "+self.path, 'CacheEntry')
+        verifier = FileVerifier(self.file_path, self.factory.config)
+        d = verifier.verify()
+        d.addCallback(self.send_cached_file)
+        d.addErrback(self.verify_failed)
+
+    def verify_failed(self, parm=None):
+        "Verification failed: forget the file's details and download it"
+        self.file_mtime = None
+        self.file_size = None
+        self.start_download()
+
+    def stat_file(self):
+        """
+        Read modification time and size of the cached file
+        """
+        stat_tuple = os.stat(self.file_path)
+
+        self.file_mtime = stat_tuple[stat.ST_MTIME]
+        self.file_size = stat_tuple[stat.ST_SIZE]
+        log.debug("Modification time:" +
+                  time.asctime(time.localtime(self.file_mtime)),
+                  "CacheEntry")
+
+    def check_age(self):
+        """
+        Read file age and check if file should be updated / refreshed
+
+        @return True if file is still valid, False if file is out of date
+        """
+        update_times = self.factory.update_times
+
+        if update_times.has_key(self.cache_path):
+            last_access = update_times[self.cache_path]
+            log.debug("last_access from db: " +
+                      time.asctime(time.localtime(last_access)),
+                      "CacheEntry")
+        else:
+            last_access = self.file_mtime
+
+        cur_time = time.time()
+        min_time = cur_time - self.factory.config.min_refresh_delay
+
+        # NOTE(review): self.filetype is None when findFileType() did not
+        # recognise the path - presumably such paths never get here; confirm
+        if not self.filetype.mutable:
+            log.debug("file is immutable: "+self.file_path, 'CacheEntry')
+            return True
+        elif last_access < min_time:
+            log.debug("file is too old: "+self.file_path, 'CacheEntry')
+            return False
+        else:
+            log.debug("file is ok: "+self.file_path, 'CacheEntry')
+            return True
+
+    def send_cached_file(self, unused=None):
+        """
+        File is up to date - send complete file from cache to clients
+        """
+        log.msg("sending file from cache:" + self.file_path, "CacheEntry")
+        self.transfer_file(self.file_path)
+
+    def end_send_cached(self):
+        """
+        Processing continues here when the file has been sent from the cache
+        """
+        self.file_sent()
+
+    def transfer_file(self, filename):
+        """
+        Send given file to clients
+
+        Any error is logged and re-raised.
+        """
+        log.msg("transfer_file:" + filename, "CacheEntry")
+        try:
+            stat_tuple = os.stat(filename)
+            mtime = stat_tuple[stat.ST_MTIME]
+            size = stat_tuple[stat.ST_SIZE]
+
+            self.state = self.STATE_SENDFILE
+            if size > 0:
+                log.debug("Sending file to clients:%s size:%s" % (filename, size), 'CacheEntry')
+                self.streamfile = open(filename,'rb')
+                #fcntl.lockf(file.fileno(), fcntl.LOCK_SH)
+
+                # Iterate over a copy: a request may be removed from
+                # self.requests while it is being processed
+                for request in self.requests[:]:
+                    if request.start_streaming(size, mtime):
+                        basic.FileSender().beginFileTransfer(self.streamfile, request) \
+                            .addBoth(self.file_transfer_complete, request, filename)
+            else:
+                log.debug("Sending empty file to clients:%s" % (filename), 'CacheEntry')
+                for request in self.requests[:]:
+                    if request.start_streaming(size, mtime):
+                        request.finish()
+        except Exception:
+            log.debug("Unexpected error: %s" % (sys.exc_info()[1]), 'CacheEntry')
+            raise
+
+    def file_transfer_complete(self, result, request, filename):
+        "One request has received the whole file from the cache"
+        log.debug("transfer complete: " + filename, 'CacheEntry')
+        request.finish()
+        if len(self.requests)==0:
+            # Last file was sent
+            self.file_sent()
+
+    def create_directory(self):
+        """
+        Create directory for cache entry's file
+        """
+        if not os.path.exists(self.filedir):
+            os.makedirs(self.filedir)
+
+    def start_download(self):
+        """
+        Start file transfer from backend server
+        """
+        log.msg("start download:" + self.path, "CacheEntry")
+        self.backend.start_download(self)
+
+    def download_started(self, fetcher, size, mtime):
+        """
+        Callback from Fetcher
+        A fetcher has begun streaming this file
+        """
+        log.msg("download started:" + self.file_path, "CacheEntry")
+        self.state = self.STATE_DOWNLOAD
+        self.create_directory()
+        self.fetcher = fetcher
+        self.file_mtime = mtime
+
+        # TODO: use post_convert and gzip_convert regular expressions of
+        # the Fetcher to gzip/gunzip file before and after download.
+        if self.filename == 'Packages.gz':
+            # The log domain is a string, as in every other call here
+            log.msg('TODO postconvert Packages.gz', 'CacheEntry')
+
+        # Copy: start_streaming may finish and remove a request
+        for req in self.requests[:]:
+            req.start_streaming(size, mtime)
+
+    def download_data_received(self, data):
+        """
+        Callback from Fetcher
+        A block of data has been received from the streaming backend server
+        """
+        #log.msg("download_data_received:" + self.file_path, "CacheEntry")
+        for req in self.requests:
+            req.write(data)
+
+        if self.streamfile:
+            # save to tempfile (if it in use)
+            self.streamfile.append(data)
+
+    def download_data_end(self):
+        """
+        Callback from Fetcher
+        File streaming is complete
+        """
+        log.msg("download_data_end:" + self.file_path, "CacheEntry")
+        self.state = self.STATE_SENT
+
+        if self.streamfile is not None:
+            # File was streamed to clients
+            self.streamfile.close_and_rename(self.file_path)
+            self.streamfile = None
+
+            if self.file_mtime != None:
+                os.utime(self.file_path, (time.time(), self.file_mtime))
+            else:
+                log.debug("no local time: "+self.file_path,'Fetcher')
+                os.utime(self.file_path, (time.time(), 0))
+
+        # Copy: finishing a request may remove it from self.requests
+        for req in self.requests[:]:
+            req.finish()
+
+        self.file_sent()
+
+    def download_failure(self, http_code, reason):
+        """
+        Download is not possible
+        """
+        log.msg("download_failure %s: (%s) %s"% (self.file_path, http_code, reason), "CacheEntry")
+
+        # Copy: finishing a request may remove it from self.requests
+        for request in self.requests[:]:
+            request.finishCode(http_code, reason)
+        self.state = self.STATE_FAILED
+
+    def file_sent(self):
+        """
+        File has been sent successfully to at least one client
+        Update databases with statistics for this file
+        """
+        log.msg("file_sent:" + self.file_path, "CacheEntry")
+
+        self.state = self.STATE_SENT
+        self.fetcher = None
+        self.backend.file_served(self)
+        self.factory.file_served(self.cache_path)
+        self.factory.update_times[self.cache_path] = time.time()
+        self.state = self.STATE_NEW
+
+    def init_tempfile(self):
+        "Create the temporary '<file>.apDownload' file used while downloading"
+        #log.msg("init_tempfile:" + self.file_path, "CacheEntry")
+        self.create_directory()
+        self.streamFilename = self.file_path + ".apDownload"
+        self.streamfile = StreamFile(self.streamFilename)
+
+class FileType:
+    """
+    Describes one kind of file that can be served.
+
+    self.regex: compiled regular expression that names of files of
+    this type match.
+
+    self.contype: mime string for the content-type http header.
+
+    self.mutable: whether the contents of such a file can change after
+    it has been created (files such as .deb and .dsc never do).
+    """
+    def __init__ (self, regex, contype, mutable):
+        self.mutable = mutable
+        self.contype = contype
+        self.regex = regex
+
+    def check (self, name):
+        "Return 1 if name matches this filetype's regex, otherwise 0"
+        return int(self.regex.search(name) is not None)
+
+# Set up the list of filetypes that we are prepared to deal with.
+# If it is not in this list, then we will ignore the file and
+# return an error.
+#
+# Each entry is FileType(regex, content-type, mutable).
+# findFileType() returns the first entry whose regex matches, so the
+# order is significant: specific patterns such as \.tar\.gz$ and
+# \.diff\.gz$ (immutable) must come before the generic \.gz$ entry
+# (mutable) at the end.
+filetypes = (
+ FileType(re.compile(r"\.u?deb$"), "application/dpkg", 0),
+ FileType(re.compile(r"\.tar\.gz$"), "application/x-gtar", 0),
+ FileType(re.compile(r"\.dsc$"),"text/plain", 0),
+ FileType(re.compile(r"\.diff\.gz$"), "x-gzip", 0),
+ FileType(re.compile(r"\.bin$"), "application/octet-stream", 0),
+ FileType(re.compile(r"\.tgz$"), "application/x-gtar", 0),
+ FileType(re.compile(r"\.txt$"), "text/plain", 1),
+ FileType(re.compile(r"\.html$"), "text/html", 1),
+
+ FileType(re.compile(r"(?:^|/)(?:Packages|Release(?:\.gpg)?|Sources|(?:Contents|Translation)-[a-z0-9]+)"
+ r"(?:\.(?:gz|bz2))?$"),
+ "text/plain", 1),
+ FileType(re.compile(r"(?:^|/)(?:Packages|Sources|Contents-[a-z0-9]+)\.diff/Index$"),
+ "text/plain", 1),
+ FileType(re.compile(r"(?:^|/)(?:Packages|Sources|Contents-[a-z0-9]+)\.diff/[a-z0-9.-]+"
+ r"(?:\.(?:gz|bz2))?$"),
+ "text/plain", 0),
+
+ FileType(re.compile(r"\.rpm$"), "application/rpm", 0),
+
+ FileType(re.compile(r"(?:^|/)(?:pkglist|release|srclist)(?:\.(?:\w|-)+)?"
+ r"(?:\.(?:gz|bz2))?$"),
+ "text/plain", 1),
+ FileType(re.compile(r"\.gz$"), "x-gzip", 1)
+ )
+
+
+def findFileType(name):
+    "Return the first entry of filetypes that matches 'name', or None"
+    for candidate in filetypes:
+        if candidate.check(name):
+            return candidate
+    return None
+
+class StreamFile:
+    """
+    A temporary file used to stream to during download
+
+    Data is appended as it arrives. The file position is kept at the
+    end of the file between calls, which is what size() relies on.
+    """
+    CHUNKSIZE = 16384
+
+    def __init__(self, name, mode='w+b'):
+        """
+        @param name: path of the file to create
+        @param mode: mode passed to open()
+        """
+        log.debug("Creating file: " + name, 'cache')
+        self.file = open(name, mode, self.CHUNKSIZE)
+        self.name = name
+
+    def append(self, data):
+        "Write data at the current (end of file) position"
+        self.file.write(data)
+
+    def size(self):
+        "Return the current file position, i.e. the number of bytes stored"
+        return self.file.tell()
+
+    def read_from(self, size=-1, start=None):
+        """
+        Read back data that was already written.
+
+        @param size: maximum number of bytes to read (-1: up to the end)
+        @param start: absolute offset to read from, or None to read
+                      from the current position
+        @return: the data read
+        """
+        if start is not None:
+            # whence 0: offset is relative to the start of the file
+            self.file.seek(start, 0)
+        data = self.file.read(size)
+        # whence 2: go back to the end so that append() and size() still work
+        self.file.seek(0, 2)
+        return data
+
+    def close(self):
+        "Close the underlying file"
+        log.debug("Closing file: " + self.name, 'cache')
+        self.file.close()
+        self.file = None
+
+    def close_and_rename(self, new_name):
+        """
+        File was successfully downloaded - close and rename to final destination
+        """
+        self.close()
+        if self.name == new_name:
+            return
+        log.debug("renaming file: %s->%s " % (self.name, new_name), 'cache')
+        os.rename(self.name, new_name)
+        self.name = new_name
+
+class FileVerifier:
+    """
+    Verifies the integrity of a cached file
+
+    self.deferred: a deferred that will be triggered when verification
+    completes, or if a timeout occurs.
+
+    Sample:
+
+        verifier = FileVerifier(path, config)
+        d = verifier.verify()
+        d.addCallbacks(callback_if_ok, callback_if_fail)
+
+    then either callback_if_ok or callback_if_fail will be called
+    when verification has finished.
+
+    Checkout twisted.internet.defer.Deferred on how to use self.deferred
+    """
+    def __init__(self, path, config):
+        """
+        Initialise verification
+        @param path: filename to be verified (absolute path)
+        @param config apConfig configuration (timeout parameter defines max time)
+        """
+        self.path = path
+        self.timeout = config.timeout
+        self.deferred = defer.Deferred() # Deferred that passes status back
+
+    def verify(self):
+        """
+        Start verification and return the deferred reporting the result.
+
+        .deb, .gz and .bz2 files are checked by an external command;
+        any other file only has to exist and be non-empty.
+        """
+        if re.search(r"\.deb$", self.path):
+            self.worker = FileVerifierProcess(self, '/usr/bin/dpkg', '--fsys-tarfile', self.path)
+        elif re.search(r"\.gz$", self.path):
+            self.worker = FileVerifierProcess(self, '/bin/gunzip', '-t', '-v', self.path)
+        elif re.search(r"\.bz2$", self.path):
+            self.worker = FileVerifierProcess(self, '/usr/bin/bunzip2', '--test', self.path)
+        else:
+            # Unknown file, just check it is not 0 size
+            try:
+                filesize = os.stat(self.path)[stat.ST_SIZE]
+            except OSError:
+                # A file that cannot be examined counts as empty
+                filesize = 0
+
+            if filesize < 1:
+                self.failed("Zero length file")
+            else:
+                log.debug('Verification skipped for ' + self.path)
+                self.deferred.callback(None)
+        return self.deferred
+
+    class VerificationFailure:
+        "Value passed to the errback: the file and why it was rejected"
+        def __init__(self, path, reason):
+            self.path = path
+            self.reason = reason
+
+    def failed(self, reason):
+        "Verification failed: delete the file and fire the errback"
+        log.msg("cache file verification FAILED for %s: %s"%(self.path, reason), 'verify')
+        try:
+            os.unlink(self.path)
+        except OSError:
+            # The file may already be gone; the failure is still reported
+            pass
+        self.deferred.errback(self.VerificationFailure(self.path, reason))
+
+    def passed(self):
+        "Verification succeeded: fire the callback"
+        log.debug("cache file verification passed: %s"%(self.path), 'verify')
+        self.deferred.callback(None)
+
+class FileVerifierProcess(protocol.ProcessProtocol):
+    """
+    Verifies the integrity of a file by running an external command.
+
+    The result is reported to the FileVerifier given as parent: its
+    deferred is fired on exit status 0, otherwise parent.failed() is
+    called. A timeout of parent.timeout seconds also counts as failure.
+    """
+    def __init__(self, verifier, *args):
+        """
+        @param verifier: FileVerifier that receives the result
+        @param args: command to run; args[0] is the executable
+        """
+        self.parent = verifier
+        self.data = ''
+        self.laterID = None
+        self.timed_out = False
+
+        self.exe = args[0]
+        log.debug("starting verification: " + self.exe + " " + str(args),'FileVerifierProcess',8)
+        nullhandle = open("/dev/null", "w")
+        try:
+            self.process = reactor.spawnProcess(self, self.exe, args, childFDs = { 0:"w", 1:nullhandle.fileno(), 2:"r" })
+        finally:
+            # Only the child needs the descriptor; do not leak ours
+            nullhandle.close()
+        self.laterID = reactor.callLater(self.parent.timeout, self.timedout)
+
+    def connectionMade(self):
+        self.data = ''
+
+    def outReceived(self, data):
+        #we only care about errors
+        pass
+
+    def errReceived(self, data):
+        self.data = self.data + data
+
+    def timedout(self):
+        """
+        this should not happen, but if we timeout, we pretend that the
+        operation failed.
+        """
+        self.laterID = None
+        self.timed_out = True
+        self.parent.failed("Verification process timed out")
+
+    def processEnded(self, reason=None):
+        """
+        This gets automatically called when the process finishes, we check
+        the status and report through the Deferred.
+        """
+        __pychecker__ = 'unusednames=reason'
+        if self.laterID:
+            self.laterID.cancel()
+            self.laterID = None
+        if self.timed_out:
+            # Failure was already reported; a deferred can only fire once
+            return
+        if self.process.status == 0:
+            self.parent.deferred.callback(None)
+        else:
+            self.parent.failed(os.path.basename(self.exe)+ " failed")
Added: trunk/apt_proxy/fetchers.py
==============================================================================
--- (empty file)
+++ trunk/apt_proxy/fetchers.py Thu Aug 3 23:54:46 2006
@@ -0,0 +1,1103 @@
+#
+# Copyright (C) 2005 Chris Halls <halls at debian.org>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of version 2.1 of the GNU Lesser General Public
+# License as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Fetchers for apt-proxy
+
+These classes implement the network code for fetching files from apt-proxy
+network backends
+"""
+
+import re, os, string, time, glob, signal, stat
+from twisted.web import static, http
+from twisted.internet import protocol, reactor, defer, error, abstract
+from twisted.python import failure
+from twisted.protocols import policies, ftp
+
+from misc import log
+
+
+class Fetcher:
+    """
+    This class manages the selection of a BackendServer and downloading from
+    that backend
+
+    The outcome is reported through the deferred returned by start(),
+    which fires with a (success, message) tuple.
+    """
+    cacheEntry = None
+    fetcher = None # connection-specific fetcher
+
+    def __init__(self):
+        self.backendServer = None
+        self.size = None # Size of file notified by fetcher's server
+        self.mtime = None # Mtime of file notified by fetcher's server
+
+    def start(self, cacheEntry):
+        """
+        Begin fetching the file of the given cache entry
+        @return: deferred fired with (success, message) when done
+        """
+        self.cacheEntry = cacheEntry
+        log.debug("fetcher start:" + self.cacheEntry.filename, "fetcher")
+        self.backend = cacheEntry.backend
+        self.len_received = 0
+        self.deferred = defer.Deferred()
+        self.start_download()
+        return self.deferred
+
+    def activateNextBackendServer(self, fetcher):
+        """
+        Select the first (or next) BackendServer and connect to it
+
+        Returns true if another BackendServer was found
+        """
+        if self.backendServer is None:
+            self.backendServer = self.backend.get_first_server()
+            if self.backendServer is None:
+                log.err("No backend server found for backend " + self.backend.name, "fetcher")
+                return False
+        else:
+            # Look for the next possible BackendServer
+            self.backendServer = self.backend.get_next_server(self.backendServer)
+
+            if self.backendServer is None:
+                # The attempt to retrieve a file from the BackendServer failed.
+                log.debug("no more Backends", "fetcher")
+                return False
+        # Pass the success value on: fail_over() relies on it
+        return self.connectToBackend()
+
+    def connectToBackend(self):
+        "Create the protocol fetcher for the current server and connect it"
+        log.debug('Connecting to backend server %s' % (self.backendServer), 'fetcher')
+        self.fetcher = self.backendServer.fetcher(self.backendServer)
+        d = self.fetcher.connect()
+        d.addCallback(self.connected)
+        d.addErrback(self.connection_failed)
+        return True
+
+    def __str__(self):
+        return 'Fetcher server=%s file=%s' % (str(self.backendServer), self.cacheEntry.path)
+
+    def start_download(self):
+        """
+        Begin streaming file
+
+        Selects a backend server if none is active, connects if there is
+        no connection, otherwise requests the file straight away.
+        """
+        log.debug("Downloading: " + self.cacheEntry.file_path, 'Fetcher')
+        if self.backendServer is None:
+            self.activateNextBackendServer(self.fetcher)
+        elif self.fetcher is None:
+            self.connectToBackend()
+        else:
+            self.download()
+
+    def download_complete(self):
+        """
+        Download was successful
+        """
+        log.debug("download complete. Sent:%s bytes" % (self.len_received), "Fetcher")
+        if self.fetcher is not None and not self.fetcher.pipelining:
+            self.connection_closed(self.fetcher)
+        if self.len_received==0:
+            self.download_started() # Send status code to clients
+        self.cacheEntry.download_data_end()
+        self.deferred.callback((True, ""))
+
+    def fail_over(self, reason_code, reason_msg):
+        """
+        A non-fatal download error has occurred. Attempt download from
+        next backend; report failure if there is none
+        """
+        if not self.activateNextBackendServer(self.fetcher):
+            self.download_failed(reason_code, reason_msg)
+
+    def download_failed(self, reason_code, reason_msg):
+        "Report a failed download to the cache entry and fire the deferred"
+        log.debug("download_failed: (%s) %s " %(reason_code, reason_msg), "Fetcher")
+        if self.fetcher is not None and not self.fetcher.pipelining:
+            self.connection_closed(self.fetcher)
+        self.cacheEntry.download_failure(reason_code, reason_msg)
+        self.deferred.callback((False, reason_msg))
+
+    def cancel_download(self):
+        "Disconnect from the backend and report the download as failed"
+        if self.fetcher:
+            log.debug(
+                "telling fetchers to disconnect",'Fetcher')
+            self.fetcher.disconnect()
+        self.download_failed(None, "Download canceled")
+
+    def data_received(self, data, save=True):
+        """
+        File Data has been received from the backend server
+        @param data: raw data received from server
+        @param save: if true, save to disk (rsync saves file itself)
+        """
+        if not self.len_received:
+            self.download_started(save)
+        self.len_received = self.len_received + len(data)
+        self.cacheEntry.download_data_received(data)
+
+    def download_started(self, save=True):
+        "Tell the cache entry that data is about to arrive"
+        if save:
+            self.cacheEntry.init_tempfile()
+        self.cacheEntry.download_started(self, self.size, self.mtime)
+
+    def server_size(self, len):
+        """
+        The server has sent the expected length of the file
+        """
+        self.size = len
+        log.debug("File size: " + str(len), 'Fetcher')
+
+    def server_mtime(self, mtime):
+        """
+        The server has sent the modification time of the file
+        """
+        self.mtime = mtime
+        log.debug("File mtime: " + str(mtime), 'Fetcher')
+
+    def transfer_complete(self):
+        """
+        All data has been transferred
+        """
+        log.debug("Finished receiving data: " + self.cacheEntry.filename, 'Fetcher')
+        self.download_complete()
+
+    def connection_failed(self, reason = None):
+        """
+        A fetcher has failed to connect to the backend server
+        @param reason: Failure describing the error, if available
+        """
+        msg = '[%s] Connection Failed: %s/%s'%(
+            self.backend.name,
+            self.backendServer.path, self.cacheEntry.path)
+
+        if reason:
+            msg = '%s (%s)'%(msg, reason.getErrorMessage())
+            log.debug("Connection Failed: "+str(reason), 'Fetcher')
+        log.err(msg)
+        # Hand on the text, not the Failure object: it ends up as the
+        # reason message of the failed download
+        self.fail_over(http.SERVICE_UNAVAILABLE, msg)
+
+    def connected(self, result):
+        "The connection to the backend server is up: request the file"
+        log.debug("Connected to "+ self.backendServer.uri, 'Fetcher')
+        self.download()
+
+    def download(self):
+        "Ask the protocol fetcher for the file of the cache entry"
+        log.debug('downloading:%s mtime:%s' % (self.cacheEntry.path, self.cacheEntry.file_mtime), 'Fetcher')
+        self.fetcher.download(self, self.cacheEntry.path, self.cacheEntry.file_mtime)
+
+    def disconnect(self):
+        "Close the connection to the backend server, if any"
+        if self.fetcher is not None:
+            log.debug('disconnect %s' % (self.cacheEntry.path), 'Fetcher')
+            self.fetcher.disconnect()
+            self.fetcher = None
+
+    def connection_closed(self, fetcher):
+        """
+        A protocol fetcher's connection has closed - we must reopen the connection
+        next time
+        """
+        log.debug("Connection closed for %s, state=%s" %(self.cacheEntry.path, self.cacheEntry.state), 'Fetcher')
+        if fetcher == self.fetcher:
+            self.fetcher = None
+
+    def file_not_found(self):
+        "The backend server does not have the file"
+        log.msg("(%s) file not found: %s" % (self.backendServer.path, self.cacheEntry.path), 'fetcher')
+        # TODO - failover?
+        self.download_failed(http.NOT_FOUND, "file not found on backend")
+
+    def fetcher_internal_error(self, reason):
+        "The protocol fetcher hit an unexpected condition"
+        log.msg("(%s) internal error: %s" % (self.backendServer.path, reason), 'fetcher')
+        self.download_failed(http.INTERNAL_SERVER_ERROR, reason)
+
+    def send_complete_file(self, filename):
+        """
+        Send a complete file (used by FileFetcher)
+        """
+        self.cacheEntry.transfer_file(filename)
+
+    def up_to_date(self):
+        """
+        Fetcher has determined that our cached file is up to date
+        so the file is sent from our cache
+        """
+        log.msg("(%s) up_to_date" % (self.cacheEntry.path), 'fetcher')
+        self.cacheEntry.send_cached_file()
+        # Same guard as download_complete() and download_failed()
+        if self.fetcher is not None and not self.fetcher.pipelining:
+            self.connection_closed(self.fetcher)
+        self.deferred.callback((True, ""))
+
+class FileFetcher:
+    """
+    Fetcher for 'file://' backends: the file is taken from local disk
+    """
+    pipelining = True
+
+    def __init__(self, backendServer):
+        self.isConnected = True # Always connected
+        self.backendServer = backendServer
+
+    def connect(self):
+        # Nothing to connect to, so this succeeds immediately
+        return defer.succeed(True)
+
+    def download(self, fetcher, uri, mtime):
+        """
+        Request download
+        %param fetcher: Fetcher class to receive callbacks
+        %param uri: URI of file to be downloaded within backend
+        %param mtime: Modification time of current file in cache
+        """
+        self.parent = fetcher
+        self.cache_mtime = mtime
+        self.request_uri = uri
+
+        # Strip the scheme from the backend uri to get the directory
+        scheme_len = len("file://")
+        self.local_file = self.backendServer.uri[scheme_len:] + '/' + uri
+        if os.path.exists(self.local_file):
+            # start the transfer
+            self.parent.send_complete_file(self.local_file)
+        else:
+            self.parent.file_not_found()
+
+    def disconnect(self):
+        pass
+
+class FetcherHttpClient(http.HTTPClient):
+ """
+ This class represents an Http conncetion to a backend
+ server. It is generated by the HttpFetcher class when
+ a connection is made to an http server
+ """
+    def __init__(self, parent):
+        "Remember the owning HttpFetcher and take over its proxy setting"
+        self.fetcher = None
+        self.parent = parent # HttpFetcher
+        self.proxy = parent.proxy
+
+    def connectionMade(self):
+        """
+        The http connection is established: hand this client to the
+        parent, which will trigger its callbacks
+        """
+        owner = self.parent
+        owner.connected(self)
+
+    def download(self, fetcher, uri, mtime):
+        """
+        Request file from backend
+
+        @param fetcher: Fetcher that receives the callbacks
+        @param uri: path of the file within the backend
+        @param mtime: modification time of the cached copy, sent as
+                      if-modified-since; None to request unconditionally
+        """
+        # Local import: the module does not import base64 at the top
+        import base64
+
+        self.log_headers = None
+        self.close_on_completion = True
+        self.server_mtime = None
+        self.server_size = None
+        self.fetcher = fetcher
+        self.uri = uri
+        self.finished = False
+        backendServer = self.parent.backendServer
+        if self.proxy is None:
+            serverpath = backendServer.path
+        else:
+            # A proxy needs the absolute URL of the file
+            serverpath = "http://" + backendServer.host
+            if backendServer.port != 80:
+                serverpath = serverpath + ":" + str(backendServer.port)
+            serverpath = serverpath + "/" + backendServer.path
+
+        self.sendCommand("GET", serverpath + "/" + uri)
+
+        self.sendHeader('host', backendServer.host)
+        if self.proxy is not None and self.proxy.user is not None:
+            # b64encode adds no line breaks, which must not appear
+            # inside a header value
+            credentials = self.proxy.user + ":" + self.proxy.password
+            self.sendHeader('Proxy-Authorization',
+                            "Basic " + base64.b64encode(credentials))
+
+        if mtime is not None:
+            datetime = http.datetimeToString(mtime)
+            self.sendHeader('if-modified-since', datetime)
+
+        self.endHeaders()
+
+    def download_complete(self):
+        "Report completion to the fetcher; only the first call has an effect"
+        if not self.finished:
+            log.debug("File transfer complete",'http_client')
+            self.finished = True
+            #if self.close_on_completion:
+                #self.fetcher.disconnect()
+                #self.parent.connection_closed() # We don't have a persistent connection
+                #self.fetcher.disconnect()
+                #self.transport.loseConnection()
+            self.fetcher.download_complete()
+
+    def handleStatus(self, version, code, message):
+        "Log the status line and store the status code as an integer"
+        __pychecker__ = 'unusednames=version,message'
+        status_text = 'handleStatus %s - %s' % (code, message)
+        log.debug(status_text, 'http_client')
+        self.http_status = int(code)
+
+        #self.setResponseCode(self.http_status)
+
+    def handleResponse(self, buffer):
+        "The response is complete: notify the fetcher according to the status"
+        #log.debug('handleResponse, %s bytes' % (len(buffer)), 'http_client')
+        status = self.http_status
+        log.debug('handleResponse status=%s' % (status), 'http_client')
+        if status == http.NOT_MODIFIED:
+            log.debug("Backend server reported file is not modified: " + self.uri,'http_client')
+            self.fetcher.up_to_date()
+            return
+        if status == http.NOT_FOUND:
+            log.debug("Not found on backend server",'http_client')
+            self.fetcher.file_not_found()
+            return
+        if status == http.OK:
+            self.download_complete()
+            return
+        log.debug("Unknown status code: %s" % (self.http_status),'http_client')
+        self.fetcher.fetcher_internal_error("Unknown status code: %s" % (self.http_status))
+
+    def handleHeader(self, key, value):
+        """
+        Process a single response header.
+
+        key   -- header name; matched case-insensitively
+        value -- header value
+
+        Headers acted upon:
+          last-modified  -- parsed with http.stringToDatetime, stored in
+                            self.server_mtime and passed to the fetcher
+          content-length -- converted with int(), stored in
+                            self.server_size and passed to the fetcher
+                            (a non-numeric value raises ValueError)
+          connection     -- sets self.close_on_completion from the
+                            'close' / 'keep-alive' options
+        All other headers are only logged.
+        """
+        log.debug("Received: " + key + " " + str(value), 'http_client')
+        key = key.lower()
+
+        if key == 'last-modified':
+            self.server_mtime = http.stringToDatetime(value)
+            self.fetcher.server_mtime(self.server_mtime)
+        elif key == 'content-length':
+            self.server_size = int(value)
+            self.fetcher.server_size(self.server_size)
+        elif key == 'connection':
+            # Connection options are case-insensitive and may arrive as a
+            # comma separated list (e.g. "Keep-Alive" or "close, TE"), so
+            # normalise before comparing.
+            options = [option.strip().lower()
+                       for option in str(value).split(',')]
+            if 'close' in options:
+                log.debug('will close on completion', 'http_client')
+                self.close_on_completion = True
+            elif 'keep-alive' in options:
+                log.debug('will not close on completion', 'http_client')
+                self.close_on_completion = False
+
+ #def handleEndHeaders(self):
+ #if self.http_status == http.NOT_MODIFIED:
+ #log.debug("Backend server reported file is not modified: " + self.uri,'http_client')
+ #self.fetcher.up_to_date()
+ #elif self.http_status == http.NOT_FOUND:
+ #log.debug("Not found on backend server",'http_client')
+ #self.fetcher.file_not_found()
+ #else:
+ #log.debug("Unknown status code: %s" % (self.http_status),'http_client')
+
+    def rawDataReceived(self, data):
+        """
+        Hand a chunk of response body to the fetcher and check for overrun.
+
+        data -- chunk of body data as received
+
+        Data is only forwarded while self.http_status is http.OK.  When
+        self.server_size is known (not None) and the fetcher's
+        len_received exceeds it, the overrun is logged and reported to
+        the fetcher as an internal error.  Reaching exactly the expected
+        size is not treated as completion here.
+        """
+        if self.http_status != http.OK:
+            return
+
+        self.fetcher.data_received(data)
+
+        if self.server_size is None:
+            # No expected size recorded, so there is nothing to compare against
+            return
+
+        received = self.fetcher.len_received
+        if received > self.server_size:
+            log.err("File transfer overrun! Expected size:%s Received size:%s" %
+                    (self.server_size, received), 'http_client')
+            # Report through self.fetcher, the same object that receives
+            # the data and the other status notifications of this class.
+            self.fetcher.fetcher_internal_error("Data overrun")
+
+# def handleResponse(self, buffer):
+# if self.length == 0:
+# self.setResponseCode(http.NOT_FOUND)
+# # print "length: " + str(self.length), "response:", self.status_code
+# if self.http_status == http.NOT_MODIFIED:
+# self.apDataEnd(self.transfered, False)
+# else:
+# self.apDataEnd(self.transfered, True)
+
+ def lineReceived(self, line):
+ """