From 1cb0255252aa5ef22483041188567f0915c98f49 Mon Sep 17 00:00:00 2001
From: Ralph Bean <rbean@redhat.com>
Date: Thu, 20 Sep 2018 15:15:20 -0400
Subject: [PATCH] Manage stomp heartbeat in a separate thread.
Hopefully, this will resolve all kinds of issues we've been seeing with stomp
heartbeat management. With the call to `callLater`, the same thread is used
but unfortunately the call is often made *after* the `interval` elapses, even
when the queue is not that busy.
By putting heartbeat events in a separate thread, hopefully the hub can keep up
with the promise it made to the broker.
---
moksha.hub/moksha/hub/stomp/stomp.py | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/moksha.hub/moksha/hub/stomp/stomp.py b/moksha.hub/moksha/hub/stomp/stomp.py
index 8e469ffc..2ed485af 100644
--- a/moksha.hub/moksha/hub/stomp/stomp.py
+++ b/moksha.hub/moksha/hub/stomp/stomp.py
@@ -27,6 +27,7 @@ except ImportError:
pass
import logging
+import time
import six
from twisted.internet.protocol import ClientFactory
@@ -110,7 +111,7 @@ class StompHubExtension(MessagingHubExtension, ClientFactory):
# that often. Here, we'll send them twice as often to give plenty
# of room for latency.
# https://stomp.github.io/stomp-specification-1.2.html#Heart-beating
- fudge_factor = 0.5
+ fudge_factor = 0.75
self.start_heartbeat(interval=(interval * fudge_factor))
else:
log.debug("Skipping heartbeat initialization")
@@ -155,12 +156,13 @@ class StompHubExtension(MessagingHubExtension, ClientFactory):
def start_heartbeat(self, interval):
self._heartbeat_enabled = True
- reactor.callLater(interval / 1000.0, self.heartbeat, interval)
+ reactor.callInThread(self.heartbeat, interval)
def heartbeat(self, interval):
+ time.sleep(interval / 1000.0)
if self._heartbeat_enabled:
self.proto.transport.write(chr(0x0A).encode('utf-8')) # Lub-dub
- reactor.callLater(interval / 1000.0, self.heartbeat, interval)
+ reactor.callInThread(self.heartbeat, interval)
else:
log.debug("(heartbeat stopped)")
--
2.17.1