Blob Blame History Raw
From 547a06478d51b150257578304aabf2ee7b585d01 Mon Sep 17 00:00:00 2001
From: Ralph Bean <rbean@redhat.com>
Date: Thu, 22 Jan 2015 09:40:11 -0500
Subject: [PATCH 1/8] Implement and test our own multiprocessing Pool.

---
 fmn/lib/multiproc.py            | 70 +++++++++++++++++++++++++++++++++++++++++
 fmn/lib/tests/test_multiproc.py | 68 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 138 insertions(+)
 create mode 100644 fmn/lib/multiproc.py
 create mode 100644 fmn/lib/tests/test_multiproc.py

diff --git a/fmn/lib/multiproc.py b/fmn/lib/multiproc.py
new file mode 100644
index 0000000..ae233f0
--- /dev/null
+++ b/fmn/lib/multiproc.py
@@ -0,0 +1,70 @@
+import logging
+
+from multiprocessing import Queue, Process
+
+__all__ = ['FixedPool']
+
+log = logging.getLogger('fedmsg')
+
+
+class FixedPool(object):
+    """ Our own multiprocessing pool.
+
+    This avoids the 'importable' requirement of multiprocessing.Pool.
+    """
+    def __init__(self, N):
+        log.debug('Initializing fmn multiproc pool, size %i' % N)
+        self.incoming = Queue(1)
+        self.outgoing = Queue()
+        self.processes = []
+        self.N = N
+
+    @property
+    def targeted(self):
+        # If I have processes, then I am targetted on some func.
+        return bool(self.processes)
+
+    def target(self, fn):
+        log.info('Multiprocessing pool targeting %r' % fn)
+        if self.targeted:
+            self.close()
+
+        args = (fn, self.incoming, self.outgoing)
+        self.processes = [
+            Process(target=work, args=args) for i in range(self.N)]
+
+        for p in self.processes:
+            p.daemon = True
+            p.start()
+
+        log.info('Multiprocessing pool targeting done.')
+
+    def apply(self, items):
+        """ Items are not guaranteed to come back in the same order. """
+        if not self.targeted:
+            raise ValueError('.target(fn) must be called before .apply(items)')
+        for item in items:
+            if not isinstance(item, tuple):
+                item = item,
+            self.incoming.put(item)
+        return set(self.outgoing.get() for i in range(len(items)))
+
+    def close(self):
+        if not self.targeted:
+            log.warning('No need to close pool.  Not yet targeted.')
+            return
+        log.info('Closing fmn multiproc pool.')
+        for _ in self.processes:
+            self.incoming.put(StopIteration)
+        log.debug('Waiting on workers to die.')
+        for p in self.processes:
+            p.join()
+        self.processes = []
+        log.info('Multiproc pool closed.')
+
+def work(fn, incoming, outgoing):
+    while True:
+        args = incoming.get()
+        if args is StopIteration:
+            break
+        outgoing.put(fn(*args))
diff --git a/fmn/lib/tests/test_multiproc.py b/fmn/lib/tests/test_multiproc.py
new file mode 100644
index 0000000..3155cd7
--- /dev/null
+++ b/fmn/lib/tests/test_multiproc.py
@@ -0,0 +1,68 @@
+from nose.tools import eq_, raises
+
+import fmn.lib.tests
+import fmn.lib.multiproc
+
+
+class TestMultiproc(fmn.lib.tests.Base):
+    def test_single_map(self):
+        pool = fmn.lib.multiproc.FixedPool(1)
+        try:
+            def fn(x):
+                return x + 2
+            pool.target(fn)
+            results = pool.apply(range(2))
+            eq_(results, set([2, 3]))
+        finally:
+            pool.close()
+
+    def test_multi_map(self):
+        pool = fmn.lib.multiproc.FixedPool(5)
+        try:
+            def fn(x):
+                return x + 2
+            pool.target(fn)
+            results = pool.apply(range(10))
+            eq_(results, set([2, 3, 4, 5, 6, 7, 8, 9, 10, 11]))
+        finally:
+            pool.close()
+
+    def test_is_targeted(self):
+        pool = fmn.lib.multiproc.FixedPool(5)
+        eq_(pool.targeted, False)
+        try:
+            def fn(x):
+                return x + 2
+            pool.target(fn)
+            eq_(pool.targeted, True)
+        finally:
+            pool.close()
+
+    @raises(ValueError)
+    def test_target_before_apply(self):
+        pool = fmn.lib.multiproc.FixedPool(5)
+        pool.apply([1, 2, 3])
+
+
+    def test_reuse(self):
+        pool = fmn.lib.multiproc.FixedPool(5)
+
+        def fn1(x):
+            return x + 1
+
+        def fn2(x):
+            return x + 2
+
+        try:
+            pool.target(fn1)
+            results = pool.apply([1, 2])
+            eq_(results, set([2, 3]))
+        finally:
+            pool.close()
+
+        try:
+            pool.target(fn2)
+            results = pool.apply([1, 2])
+            eq_(results, set([3, 4]))
+        finally:
+            pool.close()

From 00927b3a950fbdd042b1916a482d7b8a1947ab09 Mon Sep 17 00:00:00 2001
From: Ralph Bean <rbean@redhat.com>
Date: Thu, 22 Jan 2015 09:52:35 -0500
Subject: [PATCH 2/8] Sets run into problems with unhashable types, so avoid
 that.

---
 fmn/lib/multiproc.py            | 2 +-
 fmn/lib/tests/test_multiproc.py | 8 ++++----
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/fmn/lib/multiproc.py b/fmn/lib/multiproc.py
index ae233f0..f260356 100644
--- a/fmn/lib/multiproc.py
+++ b/fmn/lib/multiproc.py
@@ -47,7 +47,7 @@ def apply(self, items):
             if not isinstance(item, tuple):
                 item = item,
             self.incoming.put(item)
-        return set(self.outgoing.get() for i in range(len(items)))
+        return [self.outgoing.get() for i in range(len(items))]
 
     def close(self):
         if not self.targeted:
diff --git a/fmn/lib/tests/test_multiproc.py b/fmn/lib/tests/test_multiproc.py
index 3155cd7..02f5951 100644
--- a/fmn/lib/tests/test_multiproc.py
+++ b/fmn/lib/tests/test_multiproc.py
@@ -12,7 +12,7 @@ def fn(x):
                 return x + 2
             pool.target(fn)
             results = pool.apply(range(2))
-            eq_(results, set([2, 3]))
+            eq_(set(results), set([2, 3]))
         finally:
             pool.close()
 
@@ -23,7 +23,7 @@ def fn(x):
                 return x + 2
             pool.target(fn)
             results = pool.apply(range(10))
-            eq_(results, set([2, 3, 4, 5, 6, 7, 8, 9, 10, 11]))
+            eq_(set(results), set([2, 3, 4, 5, 6, 7, 8, 9, 10, 11]))
         finally:
             pool.close()
 
@@ -56,13 +56,13 @@ def fn2(x):
         try:
             pool.target(fn1)
             results = pool.apply([1, 2])
-            eq_(results, set([2, 3]))
+            eq_(set(results), set([2, 3]))
         finally:
             pool.close()
 
         try:
             pool.target(fn2)
             results = pool.apply([1, 2])
-            eq_(results, set([3, 4]))
+            eq_(set(results), set([3, 4]))
         finally:
             pool.close()

From 47606f69d98afaeda9f15eb692f213fe00400534 Mon Sep 17 00:00:00 2001
From: Ralph Bean <rbean@redhat.com>
Date: Thu, 22 Jan 2015 10:00:23 -0500
Subject: [PATCH 3/8] Rework the recipients function to use multiproc if
 provided.

---
 fmn/lib/__init__.py | 64 +++++++++++++++++++++++++++++++++++------------------
 1 file changed, 42 insertions(+), 22 deletions(-)

diff --git a/fmn/lib/__init__.py b/fmn/lib/__init__.py
index e693d44..f444fc8 100644
--- a/fmn/lib/__init__.py
+++ b/fmn/lib/__init__.py
@@ -25,7 +25,7 @@
 gcm_regex = r'^[\w-]+$'
 
 
-def recipients(preferences, message, valid_paths, config):
+def recipients(preferences, message, valid_paths, config, pool=None):
     """ The main API function.
 
     Accepts a fedmsg message as an argument.
@@ -33,33 +33,53 @@ def recipients(preferences, message, valid_paths, config):
     Returns a dict mapping context names to lists of recipients.
     """
 
-    rule_cache = dict()
     results = defaultdict(list)
-    notified = set()
 
-    for preference in preferences:
-        user = preference['user']
-        context = preference['context']
-        if (user['openid'], context['name']) in notified:
-            continue
+    arguments = [(p, message, valid_paths, config) for p in preferences]
+
+    if not pool:
+        fn = lambda args: _recipient(*args)
+        result_list = map(fn, arguments)
+    else:
+        if not pool.targeted:
+            pool.target(_recipient)
+        result_list = pool.apply(arguments)
+
+    # Flatten the nested list
+    result_list = sum(result_list, [])
 
-        for filter in preference['filters']:
-            if matches(filter, message, valid_paths, rule_cache, config):
-                for detail_value in preference['detail_values']:
-                    results[context['name']].append({
-                        'user': user['openid'],
-                        context['detail_name']: detail_value,
-                        'filter_name': filter['name'],
-                        'filter_id': filter['id'],
-                        'markup_messages': preference['markup_messages'],
-                        'triggered_by_links': preference['triggered_by_links'],
-                        'shorten_links': preference['shorten_links'],
-                    })
-                notified.add((user['openid'], context['name']))
-                break
+    # Transform our list of tuples (context, dict) into a
+    # dict like {context: [dict, dict, ...]}
+    for context, match in result_list:
+        results[context].append(match)
 
     return results
 
+def _recipient(preference, message, valid_paths, config):
+    recipient = []
+
+    # TODO -- turn this into a dogpile cache some day
+    rule_cache = dict()
+
+    user = preference['user']
+    context = preference['context']
+
+    for filter in preference['filters']:
+        if matches(filter, message, valid_paths, rule_cache, config):
+            for detail_value in preference['detail_values']:
+                recipient.append([context['name'], {
+                    'user': user['openid'],
+                    context['detail_name']: detail_value,
+                    'filter_name': filter['name'],
+                    'filter_id': filter['id'],
+                    'markup_messages': preference['markup_messages'],
+                    'triggered_by_links': preference['triggered_by_links'],
+                    'shorten_links': preference['shorten_links'],
+                }])
+            break
+
+    return recipient
+
 
 def matches(filter, message, valid_paths, rule_cache, config):
     """ Returns True if the given filter matches the given message. """

From c6a4350a505c8d93d062e480b0598d1ead6dea4b Mon Sep 17 00:00:00 2001
From: Ralph Bean <rbean@redhat.com>
Date: Thu, 22 Jan 2015 10:35:16 -0500
Subject: [PATCH 4/8] Handle and test exceptions from inside subprocesses.

---
 fmn/lib/multiproc.py            | 17 +++++++++++++++--
 fmn/lib/tests/test_multiproc.py | 17 +++++++++++++++++
 2 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/fmn/lib/multiproc.py b/fmn/lib/multiproc.py
index f260356..40120a0 100644
--- a/fmn/lib/multiproc.py
+++ b/fmn/lib/multiproc.py
@@ -1,4 +1,6 @@
 import logging
+import sys
+import traceback as tb
 
 from multiprocessing import Queue, Process
 
@@ -47,7 +49,11 @@ def apply(self, items):
             if not isinstance(item, tuple):
                 item = item,
             self.incoming.put(item)
-        return [self.outgoing.get() for i in range(len(items))]
+        results = [self.outgoing.get() for i in range(len(items))]
+        for result in results:
+            if isinstance(result, Exception):
+                raise result
+        return results
 
     def close(self):
         if not self.targeted:
@@ -67,4 +73,11 @@ def work(fn, incoming, outgoing):
         args = incoming.get()
         if args is StopIteration:
             break
-        outgoing.put(fn(*args))
+        try:
+            result = fn(*args)
+        except Exception as e:
+            result = type(e)(
+                "... which was originally caused by:\n" +
+                "".join(tb.format_exception(*sys.exc_info())))
+        finally:
+            outgoing.put(result)
diff --git a/fmn/lib/tests/test_multiproc.py b/fmn/lib/tests/test_multiproc.py
index 02f5951..7a7080f 100644
--- a/fmn/lib/tests/test_multiproc.py
+++ b/fmn/lib/tests/test_multiproc.py
@@ -66,3 +66,20 @@ def fn2(x):
             eq_(set(results), set([3, 4]))
         finally:
             pool.close()
+
+    def test_inner_exception(self):
+        pool = fmn.lib.multiproc.FixedPool(5)
+
+        error_message = "oh no!"
+
+        def fn(x):
+            raise ValueError(error_message)
+
+        try:
+            pool.target(fn)
+            pool.apply(['whatever'])
+            assert False
+        except ValueError as e:
+            assert error_message in e.message
+        finally:
+            pool.close()

From 3644046140dcb53c31e48bddf2176b6f5b354faf Mon Sep 17 00:00:00 2001
From: Ralph Bean <rbean@redhat.com>
Date: Thu, 22 Jan 2015 11:32:58 -0500
Subject: [PATCH 5/8] Log thread name.

---
 fmn/lib/multiproc.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/fmn/lib/multiproc.py b/fmn/lib/multiproc.py
index 40120a0..dc6b03a 100644
--- a/fmn/lib/multiproc.py
+++ b/fmn/lib/multiproc.py
@@ -1,5 +1,6 @@
 import logging
 import sys
+import threading
 import traceback as tb
 
 from multiprocessing import Queue, Process
@@ -15,7 +16,8 @@ class FixedPool(object):
     This avoids the 'importable' requirement of multiprocessing.Pool.
     """
     def __init__(self, N):
-        log.debug('Initializing fmn multiproc pool, size %i' % N)
+        log.info('Initializing fmn multiproc pool, size %i for thread %s' % (
+            N, threading.current_thread().name))
         self.incoming = Queue(1)
         self.outgoing = Queue()
         self.processes = []

From 0dfe8decaf90b32f2ad96437063aaae386f83c91 Mon Sep 17 00:00:00 2001
From: Ralph Bean <rbean@redhat.com>
Date: Thu, 22 Jan 2015 13:45:50 -0500
Subject: [PATCH 6/8] More pythonic.

---
 fmn/lib/__init__.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/fmn/lib/__init__.py b/fmn/lib/__init__.py
index f444fc8..8f1225b 100644
--- a/fmn/lib/__init__.py
+++ b/fmn/lib/__init__.py
@@ -38,8 +38,7 @@ def recipients(preferences, message, valid_paths, config, pool=None):
     arguments = [(p, message, valid_paths, config) for p in preferences]
 
     if not pool:
-        fn = lambda args: _recipient(*args)
-        result_list = map(fn, arguments)
+        result_list = [_recipient(*a) for a in arguments]
     else:
         if not pool.targeted:
             pool.target(_recipient)

From a2ed68ac011e08973e8d81a8ce78c44bcb5ba155 Mon Sep 17 00:00:00 2001
From: Ralph Bean <rbean@redhat.com>
Date: Thu, 22 Jan 2015 14:02:59 -0500
Subject: [PATCH 7/8] Typo.  Do not limit the size of the incoming queue.

---
 fmn/lib/multiproc.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fmn/lib/multiproc.py b/fmn/lib/multiproc.py
index dc6b03a..a25effc 100644
--- a/fmn/lib/multiproc.py
+++ b/fmn/lib/multiproc.py
@@ -18,7 +18,7 @@ class FixedPool(object):
     def __init__(self, N):
         log.info('Initializing fmn multiproc pool, size %i for thread %s' % (
             N, threading.current_thread().name))
-        self.incoming = Queue(1)
+        self.incoming = Queue()
         self.outgoing = Queue()
         self.processes = []
         self.N = N

From 5cf0944e19e75992870437481eb28168b5f3d089 Mon Sep 17 00:00:00 2001
From: Ralph Bean <rbean@redhat.com>
Date: Thu, 22 Jan 2015 14:06:58 -0500
Subject: [PATCH 8/8] Omit .join() on the subprocesses due to twisted wildness.

---
 fmn/lib/multiproc.py | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/fmn/lib/multiproc.py b/fmn/lib/multiproc.py
index a25effc..1667b93 100644
--- a/fmn/lib/multiproc.py
+++ b/fmn/lib/multiproc.py
@@ -61,12 +61,17 @@ def close(self):
         if not self.targeted:
             log.warning('No need to close pool.  Not yet targeted.')
             return
+
         log.info('Closing fmn multiproc pool.')
         for _ in self.processes:
             self.incoming.put(StopIteration)
-        log.debug('Waiting on workers to die.')
-        for p in self.processes:
-            p.join()
+
+        # XXX - we could call process.join() on all of our child processes, but
+        # that only works if *this* process is the same PID as the process that
+        # created them, and that isn't always the case when under the care of
+        # Twisted.  So, we'll just omit that.  Twisted cleans itself up nicely
+        # without it.
+
         self.processes = []
         log.info('Multiproc pool closed.')