r376 - in branches/rewrite: . src
Otavio Salvador
partial-mirror-devel@lists.alioth.debian.org
Sat, 27 Nov 2004 18:57:54 -0700
Author: otavio
Date: Sat Nov 27 18:57:49 2004
New Revision: 376
Modified:
branches/rewrite/ (props changed)
branches/rewrite/src/Dists.py
branches/rewrite/src/Download.py
Log:
r388@nurf: otavio | 2004-11-28T01:57:13.995425Z
Add support for async transfers. So we now use only *one* thread for all downloads.
Modified: branches/rewrite/src/Dists.py
==============================================================================
--- branches/rewrite/src/Dists.py (original)
+++ branches/rewrite/src/Dists.py Sat Nov 27 18:57:49 2004
@@ -137,13 +137,16 @@
def update (self):
""" Get only files that need updates """
self._fill_files()
+ download = Download()
+ print "Adding on queue..."
for server, filename in self._files:
self._fs.create(os.path.dirname(filename))
- Download().get(server, filename)
+ download.get(server, filename)
- for d in Download().fetchers:
- d.join()
+ print "Requesting join..."
+ download.join()
+ print "Handling files..."
for server, filename in self._files:
if os.path.basename(str(filename)) != "Release":
self._fs.uncompress(filename)
Modified: branches/rewrite/src/Download.py
==============================================================================
--- branches/rewrite/src/Download.py (original)
+++ branches/rewrite/src/Download.py Sat Nov 27 18:57:49 2004
@@ -18,6 +18,7 @@
import pycurl
import threading
+import pdb
from Queue import Queue, Empty
from os.path import getsize, exists, getmtime
@@ -31,98 +32,133 @@
if item not in self.queue:
self.queue.append(item)
-class DownloadThread(threading.Thread):
- """ Implement a Download Thread and use a DisplayStatus class to
- notify the user about what it's currently doing."""
- _Lock = threading.Lock()
+class Curl:
+ def __init__(self, DisplayStatus):
+ self._curl = pycurl.Curl()
+ self._curl.setopt(pycurl.FOLLOWLOCATION, 1)
+ self._curl.setopt(pycurl.MAXREDIRS, 5)
+ self._curl.setopt(pycurl.NOSIGNAL, 0)
+ self._curl.setopt(pycurl.CONNECTTIMEOUT, 30)
+ self._curl.setopt(pycurl.PROGRESSFUNCTION, self.progress)
+ self._curl.setopt(pycurl.NOPROGRESS, 0)
+ self._curl.setopt(pycurl.FAILONERROR, 1)
+ self._fp = None
+ self._curl.parent = self
- DisplayStatus = None
-
- def __init__(self, info = None):
+ self.DisplayStatus = DisplayStatus
+
+ def setUrl(self, url):
+ self._curl.setopt(pycurl.URL, url)
+ self._curl.url = url
+
+ def setFilename(self, filename):
+ if self._fp is not None:
+ try:
+ self._fp.close()
+ except IOError:
+ self._fp = None
+ self._fp = open(filename, "wb")
+ self._curl.setopt(pycurl.WRITEFUNCTION, self._fp.write)
+
+ def perform(self):
+ self._curl.perform()
+
+ def close(self):
+ self.http_code = self._curl.getinfo(pycurl.HTTP_CODE)
+ self._fp.close()
+ self._fp = None
+ self._curl.close()
+
+ def progress(self, download_t, download_d, upload_t, upload_d):
+        # If we don't know how much data we will receive, return.
+ if download_t == 0.0:
+ return
+
+ if self.DisplayStatus[self._curl.url] == None:
+ self.DisplayStatus.start(self._curl.url, download_t)
+ self.DisplayStatus.update(self._curl.url, download_d)
+
+class DownloadFetcher(threading.Thread):
+ def __init__(self, info = None, downloaders = 3):
+ # Add DisplayStatus object.
if info == None:
self.DisplayStatus = TextDisplayStatus()
else:
self.DisplayStatus = info
+ # Create the needed pycurl objects to manage the connections.
+ self._multi = pycurl.CurlMulti()
+ self._multi.handles = []
+ self._free = Queue()
+ for i in range(downloaders):
+ curl = Curl(self.DisplayStatus)
+ self._multi.handles.append(curl)
+
+ self._running = False
+ map(lambda x: self._free.put(x), self._multi.handles)
+
threading.Thread.__init__(self)
-
+
def run(self):
+ if self._running:
+            return # We are already running
+ self._running = True
while 1:
- try:
- url, filename = Download.queue.get_nowait()
- except Empty:
- # Doesn't have any other file to download so exit.
- return
-
- curl = pycurl.Curl()
- if exists(filename):
- # FIXME: Need check the size of file and if it doesn't
- # match download it (or resume).
- curl.setopt(pycurl.TIMECONDITION, pycurl.TIMECONDITION_IFMODSINCE)
- curl.setopt(pycurl.TIMEVALUE, getmtime(filename))
- #curl.setopt(pycurl.RESUME_FROM_LARGE, getsize(filename))
+ #pdb.set_trace()
+ while 1:
+ try:
+ fetcher = self._free.get_nowait()
+ except Empty:
+ break # No fetcher available, process the pending.
- curl.setopt(pycurl.FOLLOWLOCATION, 1)
- curl.setopt(pycurl.MAXREDIRS, 5)
- curl.setopt(pycurl.URL, url)
- curl.setopt(pycurl.NOSIGNAL, 1)
- curl.setopt(pycurl.CONNECTTIMEOUT, 30)
- curl.setopt(pycurl.PROGRESSFUNCTION, self.progress)
- curl.setopt(pycurl.NOPROGRESS, 0)
- curl.setopt(pycurl.FAILONERROR, 1)
-
- f = open(filename, "w+b")
- curl.setopt(pycurl.WRITEFUNCTION, f.write)
-
- self.url = url
-
- # Store counter information about it
- self._Lock.acquire()
- DownloadQueue.counter += 1
- self._counter = DownloadQueue.counter
- self._Lock.release()
-
- try:
- curl.perform()
- except Exception, e:
- # 416 is returned when we already have full file
- if e[1].split()[-1] != '416':
- self.DisplayStatus.errored(url, e)
-
- curl.close()
- try:
- f.close()
- except IOError:
- pass
-
- # Clear counter information about it
- self._Lock.acquire()
- DownloadQueue.counter -= 1
- self._counter = DownloadQueue.counter
- self._Lock.release()
-
- def progress(self, download_t, download_d, upload_t, upload_d):
- if self.DisplayStatus[self.url] == None:
- self.DisplayStatus.start(self.url, download_t)
- self.DisplayStatus.update(self.url, download_d)
+ try:
+ url, filename = Download.queue.get_nowait()
+ except Empty:
+ pass # Empty queue. Continue processing the others.
+
+ # Get a free fetcher
+ fetcher.setUrl(url)
+ fetcher.setFilename(filename)
+
+ self._multi.add_handle(fetcher._curl)
+
+ # Run the internal curl state machine for the multi stack
+ self._multi.select()
+ while 1:
+ ret, num_handles = self._multi.perform()
+ if ret != pycurl.E_CALL_MULTI_PERFORM:
+ break
+ if num_handles == 0:
+ self._running = False
+ return
+
+ # Check for curl objects which have terminated, and add them to the freelist
+ while 1:
+ num_q, ok_list, err_list = self._multi.info_read()
+ for c in ok_list:
+ self._multi.remove_handle(c)
+ self._free.put(c.parent)
+ for c, errno, errmsg in err_list:
+ self._multi.remove_handle(c)
+ self.DisplayStatus.errored(c.url, errmsg)
+ self._free.put(c.parent)
+ if num_q == 0:
+ break
class Download:
""" Download queue """
queue = DownloadQueue()
""" Fetcher to use """
- fetchers = []
+ fetcher = None
- def get(self, uri, destine, max_threads=3):
- self.queue.put((uri, destine))
+ def __init__(self, max=3):
+ # Create the needed fetcher.
+ Download.fetcher = DownloadFetcher(None, max)
+ Download.fetcher.start()
- # Alloc all needed threads.
- if len(self.fetchers) < max_threads:
- for i in range(max_threads - len(self.fetchers)):
- t = DownloadThread()
- self.fetchers.append(t)
- t.start()
-
+ def get(self, uri, destine):
+ self.queue.put((uri, destine))
+
def join(self):
- for t in self.fetchers:
- t.join()
+ self.fetcher.join()