diff --git a/src/xpra/net/fake_jitter.py b/src/xpra/net/fake_jitter.py new file mode 100755 index 0000000000..adaaabed12 --- /dev/null +++ b/src/xpra/net/fake_jitter.py @@ -0,0 +1,52 @@ +# This file is part of Xpra. +# Copyright (C) 2011-2014 Antoine Martin +# Xpra is released under the terms of the GNU GPL v2, or, at your option, any +# later version. See the file COPYING for details. + +import time +from threading import Lock + +from xpra.log import Logger +log = Logger("network", "protocol") + + +class FakeJitter(object): + + def __init__(self, timeout_add, process_packet_cb, delay): + self.timeout_add = timeout_add + self.real_process_packet_cb = process_packet_cb + self.delay = delay + self.ok_delay = 10*1000 + self.switch_time = time.time() + self.delaying = False + self.pending = [] + self.lock = Lock() + self.flush() + + def start_buffering(self): + log.info("FakeJitter.start_buffering() will buffer for %s ms", self.delay) + self.delaying = True + self.timeout_add(self.delay, self.flush) + + def flush(self): + log.info("FakeJitter.flush() processing %s delayed packets", len(self.pending)) + try: + self.lock.acquire() + for proto, packet in self.pending: + self.real_process_packet_cb(proto, packet) + self.pending = [] + self.delaying = False + finally: + self.lock.release() + self.timeout_add(self.ok_delay, self.start_buffering) + log.info("FakeJitter.flush() will start buffering again in %s ms", self.ok_delay) + + def process_packet_cb(self, proto, packet): + try: + self.lock.acquire() + if self.delaying: + self.pending.append((proto, packet)) + else: + self.real_process_packet_cb(proto, packet) + finally: + self.lock.release() diff --git a/src/xpra/net/protocol.py b/src/xpra/net/protocol.py index d8c2f6cd32..a43d76bfd8 100644 --- a/src/xpra/net/protocol.py +++ b/src/xpra/net/protocol.py @@ -8,7 +8,6 @@ # but it works on win32, for whatever that's worth. -import time import sys from socket import error as socket_error import struct @@ -262,6 +261,7 @@ def __init__(self, scheduler, conn, process_packet_cb, get_packet_cb=None): self.idle_add = scheduler.idle_add self._conn = conn if FAKE_JITTER>0: + from xpra.net.fake_jitter import FakeJitter fj = FakeJitter(self.timeout_add, process_packet_cb) self._process_packet_cb = fj.process_packet_cb else: @@ -1034,45 +1034,3 @@ def terminate_queue_threads(self): self._read_queue.put_nowait(None) except: pass - - -class FakeJitter(object): - - def __init__(self, timeout_add, process_packet_cb): - self.timeout_add = timeout_add - self.real_process_packet_cb = process_packet_cb - self.delay = FAKE_JITTER - self.ok_delay = 10*1000 - self.switch_time = time.time() - self.delaying = False - self.pending = [] - self.lock = Lock() - self.flush() - - def start_buffering(self): - log.info("FakeJitter.start_buffering() will buffer for %s ms", FAKE_JITTER) - self.delaying = True - self.timeout_add(FAKE_JITTER, self.flush) - - def flush(self): - log.info("FakeJitter.flush() processing %s delayed packets", len(self.pending)) - try: - self.lock.acquire() - for proto, packet in self.pending: - self.real_process_packet_cb(proto, packet) - self.pending = [] - self.delaying = False - finally: - self.lock.release() - self.timeout_add(self.ok_delay, self.start_buffering) - log.info("FakeJitter.flush() will start buffering again in %s ms", self.ok_delay) - - def process_packet_cb(self, proto, packet): - try: - self.lock.acquire() - if self.delaying: - self.pending.append((proto, packet)) - else: - self.real_process_packet_cb(proto, packet) - finally: - self.lock.release()