Blob Blame History Raw
From 1cb0255252aa5ef22483041188567f0915c98f49 Mon Sep 17 00:00:00 2001
From: Ralph Bean <rbean@redhat.com>
Date: Thu, 20 Sep 2018 15:15:20 -0400
Subject: [PATCH] Manage stomp heartbeat in a separate thread.

Hopefully, this will resolve all kinds of issues we've been seeing with stomp
heartbeat management.  With the call to `callLater`, the same thread is used
but unfortunately the call is often made *after* the `interval` elapses, even
when the queue is not that busy.

By putting heartbeat events in a separate thread, hopefully the hub can keep up
with the promise it made to the broker.
---
 moksha.hub/moksha/hub/stomp/stomp.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/moksha.hub/moksha/hub/stomp/stomp.py b/moksha.hub/moksha/hub/stomp/stomp.py
index 8e469ffc..2ed485af 100644
--- a/moksha.hub/moksha/hub/stomp/stomp.py
+++ b/moksha.hub/moksha/hub/stomp/stomp.py
@@ -27,6 +27,7 @@ except ImportError:
         pass
 
 import logging
+import time
 
 import six
 from twisted.internet.protocol import ClientFactory
@@ -110,7 +111,7 @@ class StompHubExtension(MessagingHubExtension, ClientFactory):
             # that often.  Here, we'll send them twice as often to give plenty
             # of room for latency.
             # https://stomp.github.io/stomp-specification-1.2.html#Heart-beating
-            fudge_factor = 0.5
+            fudge_factor = 0.75
             self.start_heartbeat(interval=(interval * fudge_factor))
         else:
             log.debug("Skipping heartbeat initialization")
@@ -155,12 +156,13 @@ class StompHubExtension(MessagingHubExtension, ClientFactory):
 
     def start_heartbeat(self, interval):
         self._heartbeat_enabled = True
-        reactor.callLater(interval / 1000.0, self.heartbeat, interval)
+        reactor.callInThread(self.heartbeat, interval)
 
     def heartbeat(self, interval):
+        time.sleep(interval / 1000.0)
         if self._heartbeat_enabled:
             self.proto.transport.write(chr(0x0A).encode('utf-8'))  # Lub-dub
-            reactor.callLater(interval / 1000.0, self.heartbeat, interval)
+            reactor.callInThread(self.heartbeat, interval)
         else:
             log.debug("(heartbeat stopped)")
 
-- 
2.17.1