r302 - in branches/rewrite: . src

Otavio Salvador partial-mirror-devel@lists.alioth.debian.org
Thu, 11 Nov 2004 09:44:19 -0700


Author: otavio
Date: Thu Nov 11 09:44:16 2004
New Revision: 302

Modified:
   branches/rewrite/   (props changed)
   branches/rewrite/src/Download.py
Log:
 r244@nurf:  otavio | 2004-11-11T16:43:15.151753Z
 Use a single DownloadFetcher thread and perform the downloads asynchronously.


Modified: branches/rewrite/src/Download.py
==============================================================================
--- branches/rewrite/src/Download.py	(original)
+++ branches/rewrite/src/Download.py	Thu Nov 11 09:44:16 2004
@@ -30,84 +30,125 @@
         if item not in self.queue:
             self.queue.append(item)
 
-class DownloadThread(threading.Thread):
-    """ Implement a Download Thread and use a DisplayStatus class to
-    notify the user about what it's currently doing."""
-    _Lock = threading.Lock()
+class Curl:
+    def __init__(self, DisplayStatus):
+        self._curl = pycurl.Curl()
+        self._curl.setopt(pycurl.FOLLOWLOCATION, 1)
+        self._curl.setopt(pycurl.MAXREDIRS, 5)
+        self._curl.setopt(pycurl.NOSIGNAL, 1)
+        self._curl.setopt(pycurl.CONNECTTIMEOUT, 30)
+        self._curl.setopt(pycurl.PROGRESSFUNCTION, self.progress)
+        self._curl.setopt(pycurl.NOPROGRESS, 0)
+        self._curl.setopt(pycurl.TIMEOUT, 300)
+        self._curl.setopt(pycurl.FAILONERROR, 1)
+        self._fp = None
+        self._curl.parent = self
 
-    DisplayStatus = None
+        self.DisplayStatus = DisplayStatus
+        
+    def setUrl(self, url):
+        self._curl.setopt(pycurl.URL, url)
+        self._curl.url = url
+
+    def setFilename(self, filename):
+        if self._fp is not None:
+            try:
+                self._fp.close()
+            except IOError:
+                self._fp = None
+        self._fp = open(filename, "wb")
+        self._curl.setopt(pycurl.WRITEFUNCTION, self._fp.write)
+
+    def perform(self):
+        self._curl.perform()
+
+    def close(self):
+        self.http_code = self._curl.getinfo(pycurl.HTTP_CODE)
+        self._fp.close()
+        self._fp = None
+        self._curl.close()
+
+    def progress(self, download_t, download_d, upload_t, upload_d):
+        if self.DisplayStatus[self._curl.url] == None:
+            self.DisplayStatus.start(self._curl.url, download_t)
+        self.DisplayStatus.update(self._curl.url, download_d)
     
-    def __init__(self, info = None):
+
+class DownloadFetcher(threading.Thread):
+    def __init__(self, info = None, downloaders = 3):
+        # Add DisplayStatus object.
         if info == None:
             self.DisplayStatus = TextDisplayStatus()
         else:
             self.DisplayStatus = info
 
+        # Create the needed pycurl objects to manage the connections.
+        self._multi = pycurl.CurlMulti()
+        self._multi.handles = []
+        self._free = Queue()
+        for i in range(downloaders):
+            curl = Curl(self.DisplayStatus)
+            self._multi.handles.append(curl)
+
+        self._running = False
+        map(lambda x: self._free.put(x), self._multi.handles)
+
         threading.Thread.__init__(self)
-        
+
     def run(self):
+        if self._running:
+            return # We already running
+
+        self._running = True
         while 1:
-            try:
-                url, filename = Download.queue.get_nowait()
-            except Empty:
-                # Doesn't have any other file to download so exit.
-                return 
+            while 1:
+                try:
+                    fetcher = self._free.get_nowait()
+                except Empty:
+                    self._multi.select()
+                    break # No fetcher available, process the pending.
                 
-            f = open(filename, "wb")
-            curl = pycurl.Curl()
-            curl.setopt(pycurl.FOLLOWLOCATION, 1)
-            curl.setopt(pycurl.MAXREDIRS, 5)
-            curl.setopt(pycurl.URL, url)
-            curl.setopt(pycurl.WRITEFUNCTION, f.write)
-            curl.setopt(pycurl.NOSIGNAL, 1)
-            curl.setopt(pycurl.CONNECTTIMEOUT, 30)
-            curl.setopt(pycurl.PROGRESSFUNCTION, self.progress)
-            curl.setopt(pycurl.NOPROGRESS, 0)
-            curl.setopt(pycurl.TIMEOUT, 300)
-            curl.setopt(pycurl.FAILONERROR, 1)
-
-            self.url = url
-
-            # Store counter information about it
-            self._Lock.acquire()
-            DownloadQueue.counter += 1
-            self._counter = DownloadQueue.counter
-            self._Lock.release()
+                try:
+                    url, filename = Download.queue.get_nowait()
+                except Empty:
+                    # Doesn't have any other file to download so exit.
+                    self._running = False
+                    return
+
+                # Get a free fetcher
+                fetcher.setUrl(url)
+                fetcher.setFilename(filename)
+
+                self._multi.add_handle(fetcher._curl)
+
+            # Run the internal curl state machine for the multi stack
+            while 1:
+                self._multi.select()
+                ret, num_handles = self._multi.perform()
+                if ret != pycurl.E_CALL_MULTI_PERFORM:
+                    break
             
-            try:
-                curl.perform()
-            except Exception, e:
-                self.DisplayStatus.errored(url, e)
-
-            curl.close()
-            try:
-                f.close()
-            except IOError:
-                pass
-
-            # Clear counter information about it
-            self._Lock.acquire()
-            DownloadQueue.counter -= 1
-            self._counter = DownloadQueue.counter
-            self._Lock.release()
-
-    def progress(self, download_t, download_d, upload_t, upload_d):
-        if self.DisplayStatus[self.url] == None:
-            self.DisplayStatus.start(self.url, download_t)
-        self.DisplayStatus.update(self.url, download_d)
+            # Check for curl objects which have terminated, and add them to the freelist
+            while 1:
+                num_q, ok_list, err_list = self._multi.info_read()
+                for c in ok_list:
+                    self._multi.remove_handle(c)
+                    self._free.put(c.parent)
+                for c, errno, errmsg in err_list:
+                    self._multi.remove_handle(c)
+                    self.DisplayStatus.errored(c.url, errmsg)
+                    self._free.put(c.parent)
+                if num_q == 0:
+                    break
 
 class Download:
     """ Download queue """
     queue = DownloadQueue()
     """ Fetcher to use """
-    fetchers = []
+    fetcher = None
 
-    def __init__(self, uri, destine, max_threads=3):
+    def __init__(self, uri, destine, max_concurrent=3):
         self.queue.put((uri, destine))
-
-        # Alloc all needed threads.
-        if len(self.fetchers) < max_threads:
-            for i in range(max_threads - len(self.fetchers)):
-                t = DownloadThread()
-                self.fetchers.append(t)
-                t.start()
+        if Download.fetcher is None:
+            Download.fetcher = DownloadFetcher(None, max_concurrent)
+            Download.fetcher.start()