Skip to content

Commit

Permalink
Merge pull request #180 from scottfweintraub/heartbeat-receive-scale
Browse files Browse the repository at this point in the history
Issue 179 Additional grace for received heartbeat timeouts should be alterable
  • Loading branch information
jasonrbriggs authored Jan 11, 2018
2 parents 77a7439 + 498613b commit f7b159b
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 9 deletions.
10 changes: 6 additions & 4 deletions stomp/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,15 @@ def __init__(self,
keepalive=None,
vhost=None,
auto_decode=True,
auto_content_length=True):
auto_content_length=True,
heart_beat_receive_scale=1.5):
transport = Transport(host_and_ports, prefer_localhost, try_loopback_connect,
reconnect_sleep_initial, reconnect_sleep_increase, reconnect_sleep_jitter,
reconnect_sleep_max, reconnect_attempts_max, use_ssl, ssl_key_file, ssl_cert_file,
ssl_ca_certs, ssl_cert_validator, wait_on_receipt, ssl_version, timeout,
keepalive, vhost, auto_decode)
BaseConnection.__init__(self, transport)
Protocol11.__init__(self, transport, heartbeats, auto_content_length)
Protocol11.__init__(self, transport, heartbeats, auto_content_length, heart_beat_receive_scale=heart_beat_receive_scale)

def connect(self, *args, **kwargs):
self.transport.start()
Expand Down Expand Up @@ -196,14 +197,15 @@ def __init__(self,
keepalive=None,
vhost=None,
auto_decode=True,
auto_content_length=True):
auto_content_length=True,
heart_beat_receive_scale=1.5):
transport = Transport(host_and_ports, prefer_localhost, try_loopback_connect,
reconnect_sleep_initial, reconnect_sleep_increase, reconnect_sleep_jitter,
reconnect_sleep_max, reconnect_attempts_max, use_ssl, ssl_key_file, ssl_cert_file,
ssl_ca_certs, ssl_cert_validator, wait_on_receipt, ssl_version, timeout,
keepalive, vhost, auto_decode)
BaseConnection.__init__(self, transport)
Protocol12.__init__(self, transport, heartbeats, auto_content_length)
Protocol12.__init__(self, transport, heartbeats, auto_content_length, heart_beat_receive_scale=heart_beat_receive_scale)

def connect(self, *args, **kwargs):
self.transport.start()
Expand Down
6 changes: 4 additions & 2 deletions stomp/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,10 @@ def on_connected(self, headers, body):
if self.heartbeats != (0, 0):
self.send_sleep = self.heartbeats[0] / 1000

# receive gets an additional grace of 50%
self.receive_sleep = (self.heartbeats[1] / 1000) * 1.5
# by default, receive gets an additional grace of 50%
# set a different heart-beat-receive-scale when creating the connection to override that
self.receive_sleep = (self.heartbeats[1] / 1000) * self.heart_beat_receive_scale
log.debug("Setting receive_sleep to %s", self.receive_sleep)

# Give grace of receiving the first heartbeat
self.received_heartbeat = monotonic() + self.receive_sleep
Expand Down
9 changes: 6 additions & 3 deletions stomp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,15 @@ class Protocol11(HeartbeatListener, ConnectionListener):
:param transport:
:param (int,int) heartbeats:
:param bool auto_content_length: Whether to calculate and send the content-length header automatically if it has not been set
:param float heart_beat_receive_scale: how long to wait for a heartbeat before timing out, as a scale factor of receive time
"""
def __init__(self, transport, heartbeats=(0, 0), auto_content_length=True):
def __init__(self, transport, heartbeats=(0, 0), auto_content_length=True, heart_beat_receive_scale=1.5):
HeartbeatListener.__init__(self, heartbeats)
self.transport = transport
self.auto_content_length = auto_content_length
transport.set_listener('protocol-listener', self)
self.version = '1.1'
self.heart_beat_receive_scale = heart_beat_receive_scale

def _escape_headers(self, headers):
"""
Expand Down Expand Up @@ -439,9 +441,10 @@ class Protocol12(Protocol11):
:param transport:
:param (int,int) heartbeats:
:param bool auto_content_length: Whether to calculate and send the content-length header automatically if it has not been set
:param float heart_beat_receive_scale: how long to wait for a heartbeat before timing out, as a scale factor of receive time
"""
def __init__(self, transport, heartbeats=(0, 0), auto_content_length=True):
Protocol11.__init__(self, transport, heartbeats, auto_content_length)
def __init__(self, transport, heartbeats=(0, 0), auto_content_length=True, heart_beat_receive_scale=1.5):
Protocol11.__init__(self, transport, heartbeats, auto_content_length, heart_beat_receive_scale=heart_beat_receive_scale)
self.version = '1.2'

def _escape_headers(self, headers):
Expand Down

0 comments on commit f7b159b

Please sign in to comment.