r302 - in branches/rewrite: . src
Otavio Salvador
partial-mirror-devel@lists.alioth.debian.org
Thu, 11 Nov 2004 09:44:19 -0700
Author: otavio
Date: Thu Nov 11 09:44:16 2004
New Revision: 302
Modified:
branches/rewrite/ (props changed)
branches/rewrite/src/Download.py
Log:
r244@nurf: otavio | 2004-11-11T16:43:15.151753Z
Use a single DownloadFetcher thread and perform the downloads asynchronously.
Modified: branches/rewrite/src/Download.py
==============================================================================
--- branches/rewrite/src/Download.py (original)
+++ branches/rewrite/src/Download.py Thu Nov 11 09:44:16 2004
@@ -30,84 +30,125 @@
if item not in self.queue:
self.queue.append(item)
-class DownloadThread(threading.Thread):
- """ Implement a Download Thread and use a DisplayStatus class to
- notify the user about what it's currently doing."""
- _Lock = threading.Lock()
+class Curl:
+ def __init__(self, DisplayStatus):
+ self._curl = pycurl.Curl()
+ self._curl.setopt(pycurl.FOLLOWLOCATION, 1)
+ self._curl.setopt(pycurl.MAXREDIRS, 5)
+ self._curl.setopt(pycurl.NOSIGNAL, 1)
+ self._curl.setopt(pycurl.CONNECTTIMEOUT, 30)
+ self._curl.setopt(pycurl.PROGRESSFUNCTION, self.progress)
+ self._curl.setopt(pycurl.NOPROGRESS, 0)
+ self._curl.setopt(pycurl.TIMEOUT, 300)
+ self._curl.setopt(pycurl.FAILONERROR, 1)
+ self._fp = None
+ self._curl.parent = self
- DisplayStatus = None
+ self.DisplayStatus = DisplayStatus
+
+ def setUrl(self, url):
+ self._curl.setopt(pycurl.URL, url)
+ self._curl.url = url
+
+ def setFilename(self, filename):
+ if self._fp is not None:
+ try:
+ self._fp.close()
+ except IOError:
+ self._fp = None
+ self._fp = open(filename, "wb")
+ self._curl.setopt(pycurl.WRITEFUNCTION, self._fp.write)
+
+ def perform(self):
+ self._curl.perform()
+
+ def close(self):
+ self.http_code = self._curl.getinfo(pycurl.HTTP_CODE)
+ self._fp.close()
+ self._fp = None
+ self._curl.close()
+
+ def progress(self, download_t, download_d, upload_t, upload_d):
+ if self.DisplayStatus[self._curl.url] == None:
+ self.DisplayStatus.start(self._curl.url, download_t)
+ self.DisplayStatus.update(self._curl.url, download_d)
- def __init__(self, info = None):
+
+class DownloadFetcher(threading.Thread):
+ def __init__(self, info = None, downloaders = 3):
+ # Add DisplayStatus object.
if info == None:
self.DisplayStatus = TextDisplayStatus()
else:
self.DisplayStatus = info
+ # Create the needed pycurl objects to manage the connections.
+ self._multi = pycurl.CurlMulti()
+ self._multi.handles = []
+ self._free = Queue()
+ for i in range(downloaders):
+ curl = Curl(self.DisplayStatus)
+ self._multi.handles.append(curl)
+
+ self._running = False
+ map(lambda x: self._free.put(x), self._multi.handles)
+
threading.Thread.__init__(self)
-
+
def run(self):
+ if self._running:
+ return # We already running
+
+ self._running = True
while 1:
- try:
- url, filename = Download.queue.get_nowait()
- except Empty:
- # Doesn't have any other file to download so exit.
- return
+ while 1:
+ try:
+ fetcher = self._free.get_nowait()
+ except Empty:
+ self._multi.select()
+ break # No fetcher available, process the pending.
- f = open(filename, "wb")
- curl = pycurl.Curl()
- curl.setopt(pycurl.FOLLOWLOCATION, 1)
- curl.setopt(pycurl.MAXREDIRS, 5)
- curl.setopt(pycurl.URL, url)
- curl.setopt(pycurl.WRITEFUNCTION, f.write)
- curl.setopt(pycurl.NOSIGNAL, 1)
- curl.setopt(pycurl.CONNECTTIMEOUT, 30)
- curl.setopt(pycurl.PROGRESSFUNCTION, self.progress)
- curl.setopt(pycurl.NOPROGRESS, 0)
- curl.setopt(pycurl.TIMEOUT, 300)
- curl.setopt(pycurl.FAILONERROR, 1)
-
- self.url = url
-
- # Store counter information about it
- self._Lock.acquire()
- DownloadQueue.counter += 1
- self._counter = DownloadQueue.counter
- self._Lock.release()
+ try:
+ url, filename = Download.queue.get_nowait()
+ except Empty:
+ # Doesn't have any other file to download so exit.
+ self._running = False
+ return
+
+ # Get a free fetcher
+ fetcher.setUrl(url)
+ fetcher.setFilename(filename)
+
+ self._multi.add_handle(fetcher._curl)
+
+ # Run the internal curl state machine for the multi stack
+ while 1:
+ self._multi.select()
+ ret, num_handles = self._multi.perform()
+ if ret != pycurl.E_CALL_MULTI_PERFORM:
+ break
- try:
- curl.perform()
- except Exception, e:
- self.DisplayStatus.errored(url, e)
-
- curl.close()
- try:
- f.close()
- except IOError:
- pass
-
- # Clear counter information about it
- self._Lock.acquire()
- DownloadQueue.counter -= 1
- self._counter = DownloadQueue.counter
- self._Lock.release()
-
- def progress(self, download_t, download_d, upload_t, upload_d):
- if self.DisplayStatus[self.url] == None:
- self.DisplayStatus.start(self.url, download_t)
- self.DisplayStatus.update(self.url, download_d)
+ # Check for curl objects which have terminated, and add them to the freelist
+ while 1:
+ num_q, ok_list, err_list = self._multi.info_read()
+ for c in ok_list:
+ self._multi.remove_handle(c)
+ self._free.put(c.parent)
+ for c, errno, errmsg in err_list:
+ self._multi.remove_handle(c)
+ self.DisplayStatus.errored(c.url, errmsg)
+ self._free.put(c.parent)
+ if num_q == 0:
+ break
class Download:
""" Download queue """
queue = DownloadQueue()
""" Fetcher to use """
- fetchers = []
+ fetcher = None
- def __init__(self, uri, destine, max_threads=3):
+ def __init__(self, uri, destine, max_concurrent=3):
self.queue.put((uri, destine))
-
- # Alloc all needed threads.
- if len(self.fetchers) < max_threads:
- for i in range(max_threads - len(self.fetchers)):
- t = DownloadThread()
- self.fetchers.append(t)
- t.start()
+ if Download.fetcher is None:
+ Download.fetcher = DownloadFetcher(None, max_concurrent)
+ Download.fetcher.start()