r376 - in branches/rewrite: . src

Otavio Salvador partial-mirror-devel@lists.alioth.debian.org
Sat, 27 Nov 2004 18:57:54 -0700


Author: otavio
Date: Sat Nov 27 18:57:49 2004
New Revision: 376

Modified:
   branches/rewrite/   (props changed)
   branches/rewrite/src/Dists.py
   branches/rewrite/src/Download.py
Log:
 r388@nurf:  otavio | 2004-11-28T01:57:13.995425Z
 Add support for async transfers, so we now use only *one* thread for all downloads.


Modified: branches/rewrite/src/Dists.py
==============================================================================
--- branches/rewrite/src/Dists.py	(original)
+++ branches/rewrite/src/Dists.py	Sat Nov 27 18:57:49 2004
@@ -137,13 +137,16 @@
     def update (self):
         """ Get only files that need updates """
         self._fill_files()
+        download = Download()
+        print "Adding on queue..."
         for server, filename in self._files:
             self._fs.create(os.path.dirname(filename))
-            Download().get(server, filename)
+            download.get(server, filename)
 
-        for d in Download().fetchers:
-            d.join()
+        print "Requesting join..."
+        download.join()
 
+        print "Handling files..."
         for server, filename in self._files:
             if os.path.basename(str(filename)) != "Release":
                 self._fs.uncompress(filename)

Modified: branches/rewrite/src/Download.py
==============================================================================
--- branches/rewrite/src/Download.py	(original)
+++ branches/rewrite/src/Download.py	Sat Nov 27 18:57:49 2004
@@ -18,6 +18,7 @@
 
 import pycurl
 import threading
+import pdb
 
 from Queue import Queue, Empty
 from os.path import getsize, exists, getmtime
@@ -31,98 +32,133 @@
         if item not in self.queue:
             self.queue.append(item)
 
-class DownloadThread(threading.Thread):
-    """ Implement a Download Thread and use a DisplayStatus class to
-    notify the user about what it's currently doing."""
-    _Lock = threading.Lock()
+class Curl:
+    def __init__(self, DisplayStatus):
+        self._curl = pycurl.Curl()
+        self._curl.setopt(pycurl.FOLLOWLOCATION, 1)
+        self._curl.setopt(pycurl.MAXREDIRS, 5)
+        self._curl.setopt(pycurl.NOSIGNAL, 0)
+        self._curl.setopt(pycurl.CONNECTTIMEOUT, 30)
+        self._curl.setopt(pycurl.PROGRESSFUNCTION, self.progress)
+        self._curl.setopt(pycurl.NOPROGRESS, 0)
+        self._curl.setopt(pycurl.FAILONERROR, 1)
+        self._fp = None
+        self._curl.parent = self
 
-    DisplayStatus = None
-    
-    def __init__(self, info = None):
+        self.DisplayStatus = DisplayStatus
+        
+    def setUrl(self, url):
+        self._curl.setopt(pycurl.URL, url)
+        self._curl.url = url
+
+    def setFilename(self, filename):
+        if self._fp is not None:
+            try:
+                self._fp.close()
+            except IOError:
+                self._fp = None
+        self._fp = open(filename, "wb")
+        self._curl.setopt(pycurl.WRITEFUNCTION, self._fp.write)
+
+    def perform(self):
+        self._curl.perform()
+
+    def close(self):
+        self.http_code = self._curl.getinfo(pycurl.HTTP_CODE)
+        self._fp.close()
+        self._fp = None
+        self._curl.close()
+
+    def progress(self, download_t, download_d, upload_t, upload_d):
+        # If we don't know how much data we will receive, return.
+        if download_t == 0.0:
+            return
+        
+        if self.DisplayStatus[self._curl.url] == None:
+            self.DisplayStatus.start(self._curl.url, download_t)
+        self.DisplayStatus.update(self._curl.url, download_d)
+
+class DownloadFetcher(threading.Thread):
+    def __init__(self, info = None, downloaders = 3):
+        # Add DisplayStatus object.
         if info == None:
             self.DisplayStatus = TextDisplayStatus()
         else:
             self.DisplayStatus = info
 
+        # Create the needed pycurl objects to manage the connections.
+        self._multi = pycurl.CurlMulti()
+        self._multi.handles = []
+        self._free = Queue()
+        for i in range(downloaders):
+            curl = Curl(self.DisplayStatus)
+            self._multi.handles.append(curl)
+
+        self._running = False
+        map(lambda x: self._free.put(x), self._multi.handles)
+
         threading.Thread.__init__(self)
-        
+
     def run(self):
+        if self._running:
+            return # We are already running
 
+        self._running = True
         while 1:
-            try:
-                url, filename = Download.queue.get_nowait()
-            except Empty:
-                # Doesn't have any other file to download so exit.
-                return 
-
-            curl = pycurl.Curl()
-            if exists(filename):
-                # FIXME: Need check the size of file and if it doesn't
-                # match download it (or resume).
-                curl.setopt(pycurl.TIMECONDITION, pycurl.TIMECONDITION_IFMODSINCE)
-                curl.setopt(pycurl.TIMEVALUE, getmtime(filename))
-                #curl.setopt(pycurl.RESUME_FROM_LARGE, getsize(filename))
+            #pdb.set_trace()
+            while 1:
+                try:
+                    fetcher = self._free.get_nowait()
+                except Empty:
+                    break # No fetcher available, process the pending.
                 
-            curl.setopt(pycurl.FOLLOWLOCATION, 1)
-            curl.setopt(pycurl.MAXREDIRS, 5)
-            curl.setopt(pycurl.URL, url)
-            curl.setopt(pycurl.NOSIGNAL, 1)
-            curl.setopt(pycurl.CONNECTTIMEOUT, 30)
-            curl.setopt(pycurl.PROGRESSFUNCTION, self.progress)
-            curl.setopt(pycurl.NOPROGRESS, 0)
-            curl.setopt(pycurl.FAILONERROR, 1)
-
-            f = open(filename, "w+b")
-            curl.setopt(pycurl.WRITEFUNCTION, f.write)
-
-            self.url = url
-
-            # Store counter information about it
-            self._Lock.acquire()
-            DownloadQueue.counter += 1
-            self._counter = DownloadQueue.counter
-            self._Lock.release()
-            
-            try:
-                curl.perform()
-            except Exception, e:
-                # 416 is returned when we already have full file
-                if e[1].split()[-1] != '416':
-                    self.DisplayStatus.errored(url, e)
-
-            curl.close()
-            try:
-                f.close()
-            except IOError:
-                pass
-
-            # Clear counter information about it
-            self._Lock.acquire()
-            DownloadQueue.counter -= 1
-            self._counter = DownloadQueue.counter
-            self._Lock.release()
-
-    def progress(self, download_t, download_d, upload_t, upload_d):
-        if self.DisplayStatus[self.url] == None:
-            self.DisplayStatus.start(self.url, download_t)
-        self.DisplayStatus.update(self.url, download_d)
+                try:
+                    url, filename = Download.queue.get_nowait()
+                except Empty:
+                    pass # Empty queue. Continue processing the others.
+
+                # Get a free fetcher
+                fetcher.setUrl(url)
+                fetcher.setFilename(filename)
+
+                self._multi.add_handle(fetcher._curl)
+
+            # Run the internal curl state machine for the multi stack
+            self._multi.select()
+            while 1:
+                ret, num_handles = self._multi.perform()
+                if ret != pycurl.E_CALL_MULTI_PERFORM:
+                    break
+                if num_handles == 0:
+                    self._running = False
+                    return
+
+            # Check for curl objects which have terminated, and add them to the freelist
+            while 1:
+                num_q, ok_list, err_list = self._multi.info_read()
+                for c in ok_list:
+                    self._multi.remove_handle(c)
+                    self._free.put(c.parent)
+                for c, errno, errmsg in err_list:
+                    self._multi.remove_handle(c)
+                    self.DisplayStatus.errored(c.url, errmsg)
+                    self._free.put(c.parent)
+                if num_q == 0:
+                    break
 
 class Download:
     """ Download queue """
     queue = DownloadQueue()
     """ Fetcher to use """
-    fetchers = []
+    fetcher = None
 
-    def get(self, uri, destine, max_threads=3):
-        self.queue.put((uri, destine))
+    def __init__(self, max=3):
+        # Create the needed fetcher.
+        Download.fetcher = DownloadFetcher(None, max)
+        Download.fetcher.start()
 
-        # Alloc all needed threads.
-        if len(self.fetchers) < max_threads:
-            for i in range(max_threads - len(self.fetchers)):
-                t = DownloadThread()
-                self.fetchers.append(t)
-                t.start()
-            
+    def get(self, uri, destine):
+        self.queue.put((uri, destine))
+                        
     def join(self):
-        for t in self.fetchers:
-            t.join()
+        self.fetcher.join()