diff --git a/stomp/connect.py b/stomp/connect.py index 66d49c2d..452bb8f2 100644 --- a/stomp/connect.py +++ b/stomp/connect.py @@ -145,14 +145,15 @@ def __init__(self, keepalive=None, vhost=None, auto_decode=True, - auto_content_length=True): + auto_content_length=True, + heart_beat_receive_scale=1.5): transport = Transport(host_and_ports, prefer_localhost, try_loopback_connect, reconnect_sleep_initial, reconnect_sleep_increase, reconnect_sleep_jitter, reconnect_sleep_max, reconnect_attempts_max, use_ssl, ssl_key_file, ssl_cert_file, ssl_ca_certs, ssl_cert_validator, wait_on_receipt, ssl_version, timeout, keepalive, vhost, auto_decode) BaseConnection.__init__(self, transport) - Protocol11.__init__(self, transport, heartbeats, auto_content_length) + Protocol11.__init__(self, transport, heartbeats, auto_content_length, heart_beat_receive_scale=heart_beat_receive_scale) def connect(self, *args, **kwargs): self.transport.start() @@ -196,14 +197,15 @@ def __init__(self, keepalive=None, vhost=None, auto_decode=True, - auto_content_length=True): + auto_content_length=True, + heart_beat_receive_scale=1.5): transport = Transport(host_and_ports, prefer_localhost, try_loopback_connect, reconnect_sleep_initial, reconnect_sleep_increase, reconnect_sleep_jitter, reconnect_sleep_max, reconnect_attempts_max, use_ssl, ssl_key_file, ssl_cert_file, ssl_ca_certs, ssl_cert_validator, wait_on_receipt, ssl_version, timeout, keepalive, vhost, auto_decode) BaseConnection.__init__(self, transport) - Protocol12.__init__(self, transport, heartbeats, auto_content_length) + Protocol12.__init__(self, transport, heartbeats, auto_content_length, heart_beat_receive_scale=heart_beat_receive_scale) def connect(self, *args, **kwargs): self.transport.start() diff --git a/stomp/listener.py b/stomp/listener.py index 4743b458..fcce7433 100644 --- a/stomp/listener.py +++ b/stomp/listener.py @@ -172,8 +172,10 @@ def on_connected(self, headers, body): if self.heartbeats != (0, 0): self.send_sleep = self.heartbeats[0] / 1000 - # receive gets an additional grace of 50% - self.receive_sleep = (self.heartbeats[1] / 1000) * 1.5 + # by default, receive gets an additional grace of 50% + # set a different heart-beat-receive-scale when creating the connection to override that + self.receive_sleep = (self.heartbeats[1] / 1000) * self.heart_beat_receive_scale + log.debug("Setting receive_sleep to %s", self.receive_sleep) # Give grace of receiving the first heartbeat self.received_heartbeat = monotonic() + self.receive_sleep diff --git a/stomp/protocol.py b/stomp/protocol.py index 324e286b..3221472a 100644 --- a/stomp/protocol.py +++ b/stomp/protocol.py @@ -214,13 +214,15 @@ class Protocol11(HeartbeatListener, ConnectionListener): :param transport: :param (int,int) heartbeats: :param bool auto_content_length: Whether to calculate and send the content-length header automatically if it has not been set + :param float heart_beat_receive_scale: how long to wait for a heartbeat before timing out, as a scale factor of receive time """ - def __init__(self, transport, heartbeats=(0, 0), auto_content_length=True): + def __init__(self, transport, heartbeats=(0, 0), auto_content_length=True, heart_beat_receive_scale=1.5): HeartbeatListener.__init__(self, heartbeats) self.transport = transport self.auto_content_length = auto_content_length transport.set_listener('protocol-listener', self) self.version = '1.1' + self.heart_beat_receive_scale = heart_beat_receive_scale def _escape_headers(self, headers): """ @@ -439,9 +441,10 @@ class Protocol12(Protocol11): :param transport: :param (int,int) heartbeats: :param bool auto_content_length: Whether to calculate and send the content-length header automatically if it has not been set + :param float heart_beat_receive_scale: how long to wait for a heartbeat before timing out, as a scale factor of receive time """ - def __init__(self, transport, heartbeats=(0, 0), auto_content_length=True): - Protocol11.__init__(self, transport, heartbeats, auto_content_length) + def __init__(self, transport, heartbeats=(0, 0), auto_content_length=True, heart_beat_receive_scale=1.5): + Protocol11.__init__(self, transport, heartbeats, auto_content_length, heart_beat_receive_scale=heart_beat_receive_scale) self.version = '1.2' def _escape_headers(self, headers):