[Pkg-bitcoin-commits] [python-quamash] 256/269: bug fix: lock to prevent event race conditions
Jonas Smedegaard
dr at jones.dk
Fri Nov 24 11:26:42 UTC 2017
This is an automated email from the git hooks/post-receive script.
js pushed a commit to branch master
in repository python-quamash.
commit 04e795af8ea6163dc80600605bbb5a5066ac457a
Author: Peter Azmanov <peter.azmanov at transas.com>
Date: Wed Nov 18 17:58:33 2015 +0300
bug fix: lock to prevent event race conditions
---
quamash/_windows.py | 65 ++++++++++++++++++++++++++++++++++++-----------------
1 file changed, 44 insertions(+), 21 deletions(-)
diff --git a/quamash/_windows.py b/quamash/_windows.py
index c6e6721..510e0da 100644
--- a/quamash/_windows.py
+++ b/quamash/_windows.py
@@ -5,6 +5,7 @@
"""Windows specific Quamash functionality."""
import asyncio
+import threading
try:
import _winapi
@@ -57,6 +58,7 @@ class _IocpProactor(windows_events.IocpProactor):
def __init__(self):
self.__events = []
super(_IocpProactor, self).__init__()
+ self._lock = threading.Lock()
def select(self, timeout=None):
"""Override in order to handle events in a threadsafe manner."""
@@ -70,6 +72,14 @@ class _IocpProactor(windows_events.IocpProactor):
self._logger.debug('Closing')
super(_IocpProactor, self).close()
+ def recv(self, conn, nbytes, flags=0):
+ with self._lock:
+ return super(_IocpProactor, self).recv(conn, nbytes, flags)
+
+ def send(self, conn, buf, flags=0):
+ with self._lock:
+ return super(_IocpProactor, self).send(conn, buf, flags)
+
def _poll(self, timeout=None):
"""Override in order to handle events in a threadsafe manner."""
if timeout is None:
@@ -83,31 +93,44 @@ class _IocpProactor(windows_events.IocpProactor):
if ms >= UINT32_MAX:
raise ValueError("timeout too big")
- while True:
- # self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format(
- # ms, threading.get_ident()))
- status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
- if status is None:
- break
+ with self._lock:
+ while True:
+ # self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format(
+ # ms, threading.get_ident()))
+ status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
+ if status is None:
+ break
+
+ err, transferred, key, address = status
+ try:
+ f, ov, obj, callback = self._cache.pop(address)
+ except KeyError:
+ # key is either zero, or it is used to return a pipe
+ # handle which should be closed to avoid a leak.
+ if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
+ _winapi.CloseHandle(key)
+ ms = 0
+ continue
+
+ if obj in self._stopped_serving:
+ f.cancel()
+ # Futures might already be resolved or cancelled
+ elif not f.done():
+ self.__events.append((f, callback, transferred, key, ov))
- err, transferred, key, address = status
- try:
- f, ov, obj, callback = self._cache.pop(address)
- except KeyError:
- # key is either zero, or it is used to return a pipe
- # handle which should be closed to avoid a leak.
- if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
- _winapi.CloseHandle(key)
ms = 0
- continue
- if obj in self._stopped_serving:
- f.cancel()
- # Futures might already be resolved or cancelled
- elif not f.done():
- self.__events.append((f, callback, transferred, key, ov))
+ def _wait_for_handle(self, handle, timeout, _is_cancel):
+ with self._lock:
+ return super(_IocpProactor, self)._wait_for_handle(handle, timeout, _is_cancel)
+
+ def accept(self, listener):
+ with self._lock:
+ return super(_IocpProactor, self).accept(listener)
- ms = 0
+ def connect(self, conn, address):
+ with self._lock:
+ return super(_IocpProactor, self).connect(conn, address)
@with_logger
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-bitcoin/python-quamash.git
More information about the Pkg-bitcoin-commits mailing list