From 547a06478d51b150257578304aabf2ee7b585d01 Mon Sep 17 00:00:00 2001
From: Ralph Bean <rbean@redhat.com>
Date: Thu, 22 Jan 2015 09:40:11 -0500
Subject: [PATCH 1/8] Implement and test our own multiprocessing Pool.
---
fmn/lib/multiproc.py | 70 +++++++++++++++++++++++++++++++++++++++++
fmn/lib/tests/test_multiproc.py | 68 +++++++++++++++++++++++++++++++++++++++
2 files changed, 138 insertions(+)
create mode 100644 fmn/lib/multiproc.py
create mode 100644 fmn/lib/tests/test_multiproc.py
diff --git a/fmn/lib/multiproc.py b/fmn/lib/multiproc.py
new file mode 100644
index 0000000..ae233f0
--- /dev/null
+++ b/fmn/lib/multiproc.py
@@ -0,0 +1,70 @@
+import logging
+
+from multiprocessing import Queue, Process
+
+__all__ = ['FixedPool']
+
+log = logging.getLogger('fedmsg')
+
+
+class FixedPool(object):
+ """ Our own multiprocessing pool.
+
+ This avoids the 'importable' requirement of multiprocessing.Pool.
+ """
+ def __init__(self, N):
+ log.debug('Initializing fmn multiproc pool, size %i' % N)
+ self.incoming = Queue(1)
+ self.outgoing = Queue()
+ self.processes = []
+ self.N = N
+
+ @property
+ def targeted(self):
+ # If I have processes, then I am targetted on some func.
+ return bool(self.processes)
+
+ def target(self, fn):
+ log.info('Multiprocessing pool targeting %r' % fn)
+ if self.targeted:
+ self.close()
+
+ args = (fn, self.incoming, self.outgoing)
+ self.processes = [
+ Process(target=work, args=args) for i in range(self.N)]
+
+ for p in self.processes:
+ p.daemon = True
+ p.start()
+
+ log.info('Multiprocessing pool targeting done.')
+
+ def apply(self, items):
+ """ Items are not guaranteed to come back in the same order. """
+ if not self.targeted:
+ raise ValueError('.target(fn) must be called before .apply(items)')
+ for item in items:
+ if not isinstance(item, tuple):
+ item = item,
+ self.incoming.put(item)
+ return set(self.outgoing.get() for i in range(len(items)))
+
+ def close(self):
+ if not self.targeted:
+ log.warning('No need to close pool. Not yet targeted.')
+ return
+ log.info('Closing fmn multiproc pool.')
+ for _ in self.processes:
+ self.incoming.put(StopIteration)
+ log.debug('Waiting on workers to die.')
+ for p in self.processes:
+ p.join()
+ self.processes = []
+ log.info('Multiproc pool closed.')
+
+def work(fn, incoming, outgoing):
+ while True:
+ args = incoming.get()
+ if args is StopIteration:
+ break
+ outgoing.put(fn(*args))
diff --git a/fmn/lib/tests/test_multiproc.py b/fmn/lib/tests/test_multiproc.py
new file mode 100644
index 0000000..3155cd7
--- /dev/null
+++ b/fmn/lib/tests/test_multiproc.py
@@ -0,0 +1,68 @@
+from nose.tools import eq_, raises
+
+import fmn.lib.tests
+import fmn.lib.multiproc
+
+
+class TestMultiproc(fmn.lib.tests.Base):
+ def test_single_map(self):
+ pool = fmn.lib.multiproc.FixedPool(1)
+ try:
+ def fn(x):
+ return x + 2
+ pool.target(fn)
+ results = pool.apply(range(2))
+ eq_(results, set([2, 3]))
+ finally:
+ pool.close()
+
+ def test_multi_map(self):
+ pool = fmn.lib.multiproc.FixedPool(5)
+ try:
+ def fn(x):
+ return x + 2
+ pool.target(fn)
+ results = pool.apply(range(10))
+ eq_(results, set([2, 3, 4, 5, 6, 7, 8, 9, 10, 11]))
+ finally:
+ pool.close()
+
+ def test_is_targeted(self):
+ pool = fmn.lib.multiproc.FixedPool(5)
+ eq_(pool.targeted, False)
+ try:
+ def fn(x):
+ return x + 2
+ pool.target(fn)
+ eq_(pool.targeted, True)
+ finally:
+ pool.close()
+
+ @raises(ValueError)
+ def test_target_before_apply(self):
+ pool = fmn.lib.multiproc.FixedPool(5)
+ pool.apply([1, 2, 3])
+
+
+ def test_reuse(self):
+ pool = fmn.lib.multiproc.FixedPool(5)
+
+ def fn1(x):
+ return x + 1
+
+ def fn2(x):
+ return x + 2
+
+ try:
+ pool.target(fn1)
+ results = pool.apply([1, 2])
+ eq_(results, set([2, 3]))
+ finally:
+ pool.close()
+
+ try:
+ pool.target(fn2)
+ results = pool.apply([1, 2])
+ eq_(results, set([3, 4]))
+ finally:
+ pool.close()
From 00927b3a950fbdd042b1916a482d7b8a1947ab09 Mon Sep 17 00:00:00 2001
From: Ralph Bean <rbean@redhat.com>
Date: Thu, 22 Jan 2015 09:52:35 -0500
Subject: [PATCH 2/8] Sets run into problems with unhashable types, so avoid
that.
---
fmn/lib/multiproc.py | 2 +-
fmn/lib/tests/test_multiproc.py | 8 ++++----
2 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/fmn/lib/multiproc.py b/fmn/lib/multiproc.py
index ae233f0..f260356 100644
--- a/fmn/lib/multiproc.py
+++ b/fmn/lib/multiproc.py
@@ -47,7 +47,7 @@ def apply(self, items):
if not isinstance(item, tuple):
item = item,
self.incoming.put(item)
- return set(self.outgoing.get() for i in range(len(items)))
+ return [self.outgoing.get() for i in range(len(items))]
def close(self):
if not self.targeted:
diff --git a/fmn/lib/tests/test_multiproc.py b/fmn/lib/tests/test_multiproc.py
index 3155cd7..02f5951 100644
--- a/fmn/lib/tests/test_multiproc.py
+++ b/fmn/lib/tests/test_multiproc.py
@@ -12,7 +12,7 @@ def fn(x):
return x + 2
pool.target(fn)
results = pool.apply(range(2))
- eq_(results, set([2, 3]))
+ eq_(set(results), set([2, 3]))
finally:
pool.close()
@@ -23,7 +23,7 @@ def fn(x):
return x + 2
pool.target(fn)
results = pool.apply(range(10))
- eq_(results, set([2, 3, 4, 5, 6, 7, 8, 9, 10, 11]))
+ eq_(set(results), set([2, 3, 4, 5, 6, 7, 8, 9, 10, 11]))
finally:
pool.close()
@@ -56,13 +56,13 @@ def fn2(x):
try:
pool.target(fn1)
results = pool.apply([1, 2])
- eq_(results, set([2, 3]))
+ eq_(set(results), set([2, 3]))
finally:
pool.close()
try:
pool.target(fn2)
results = pool.apply([1, 2])
- eq_(results, set([3, 4]))
+ eq_(set(results), set([3, 4]))
finally:
pool.close()
From 47606f69d98afaeda9f15eb692f213fe00400534 Mon Sep 17 00:00:00 2001
From: Ralph Bean <rbean@redhat.com>
Date: Thu, 22 Jan 2015 10:00:23 -0500
Subject: [PATCH 3/8] Rework the recipients function to use multiproc if
provided.
---
fmn/lib/__init__.py | 64 +++++++++++++++++++++++++++++++++++------------------
1 file changed, 42 insertions(+), 22 deletions(-)
diff --git a/fmn/lib/__init__.py b/fmn/lib/__init__.py
index e693d44..f444fc8 100644
--- a/fmn/lib/__init__.py
+++ b/fmn/lib/__init__.py
@@ -25,7 +25,7 @@
gcm_regex = r'^[\w-]+$'
-def recipients(preferences, message, valid_paths, config):
+def recipients(preferences, message, valid_paths, config, pool=None):
""" The main API function.
Accepts a fedmsg message as an argument.
@@ -33,33 +33,53 @@ def recipients(preferences, message, valid_paths, config):
Returns a dict mapping context names to lists of recipients.
"""
- rule_cache = dict()
results = defaultdict(list)
- notified = set()
- for preference in preferences:
- user = preference['user']
- context = preference['context']
- if (user['openid'], context['name']) in notified:
- continue
+ arguments = [(p, message, valid_paths, config) for p in preferences]
+
+ if not pool:
+ fn = lambda args: _recipient(*args)
+ result_list = map(fn, arguments)
+ else:
+ if not pool.targeted:
+ pool.target(_recipient)
+ result_list = pool.apply(arguments)
+
+ # Flatten the nested list
+ result_list = sum(result_list, [])
- for filter in preference['filters']:
- if matches(filter, message, valid_paths, rule_cache, config):
- for detail_value in preference['detail_values']:
- results[context['name']].append({
- 'user': user['openid'],
- context['detail_name']: detail_value,
- 'filter_name': filter['name'],
- 'filter_id': filter['id'],
- 'markup_messages': preference['markup_messages'],
- 'triggered_by_links': preference['triggered_by_links'],
- 'shorten_links': preference['shorten_links'],
- })
- notified.add((user['openid'], context['name']))
- break
+ # Transform our list of tuples (context, dict) into a
+ # dict like {context: [dict, dict, ...]}
+ for context, match in result_list:
+ results[context].append(match)
return results
+def _recipient(preference, message, valid_paths, config):
+ recipient = []
+
+ # TODO -- turn this into a dogpile cache some day
+ rule_cache = dict()
+
+ user = preference['user']
+ context = preference['context']
+
+ for filter in preference['filters']:
+ if matches(filter, message, valid_paths, rule_cache, config):
+ for detail_value in preference['detail_values']:
+ recipient.append([context['name'], {
+ 'user': user['openid'],
+ context['detail_name']: detail_value,
+ 'filter_name': filter['name'],
+ 'filter_id': filter['id'],
+ 'markup_messages': preference['markup_messages'],
+ 'triggered_by_links': preference['triggered_by_links'],
+ 'shorten_links': preference['shorten_links'],
+ }])
+ break
+
+ return recipient
+
def matches(filter, message, valid_paths, rule_cache, config):
""" Returns True if the given filter matches the given message. """
From c6a4350a505c8d93d062e480b0598d1ead6dea4b Mon Sep 17 00:00:00 2001
From: Ralph Bean <rbean@redhat.com>
Date: Thu, 22 Jan 2015 10:35:16 -0500
Subject: [PATCH 4/8] Handle and test exceptions from inside subprocesses.
---
fmn/lib/multiproc.py | 17 +++++++++++++++--
fmn/lib/tests/test_multiproc.py | 17 +++++++++++++++++
2 files changed, 32 insertions(+), 2 deletions(-)
diff --git a/fmn/lib/multiproc.py b/fmn/lib/multiproc.py
index f260356..40120a0 100644
--- a/fmn/lib/multiproc.py
+++ b/fmn/lib/multiproc.py
@@ -1,4 +1,6 @@
import logging
+import sys
+import traceback as tb
from multiprocessing import Queue, Process
@@ -47,7 +49,11 @@ def apply(self, items):
if not isinstance(item, tuple):
item = item,
self.incoming.put(item)
- return [self.outgoing.get() for i in range(len(items))]
+ results = [self.outgoing.get() for i in range(len(items))]
+ for result in results:
+ if isinstance(result, Exception):
+ raise result
+ return results
def close(self):
if not self.targeted:
@@ -67,4 +73,11 @@ def work(fn, incoming, outgoing):
args = incoming.get()
if args is StopIteration:
break
- outgoing.put(fn(*args))
+ try:
+ result = fn(*args)
+ except Exception as e:
+ result = type(e)(
+ "... which was originally caused by:\n" +
+ "".join(tb.format_exception(*sys.exc_info())))
+ finally:
+ outgoing.put(result)
diff --git a/fmn/lib/tests/test_multiproc.py b/fmn/lib/tests/test_multiproc.py
index 02f5951..7a7080f 100644
--- a/fmn/lib/tests/test_multiproc.py
+++ b/fmn/lib/tests/test_multiproc.py
@@ -66,3 +66,20 @@ def fn2(x):
eq_(set(results), set([3, 4]))
finally:
pool.close()
+
+ def test_inner_exception(self):
+ pool = fmn.lib.multiproc.FixedPool(5)
+
+ error_message = "oh no!"
+
+ def fn(x):
+ raise ValueError(error_message)
+
+ try:
+ pool.target(fn)
+ pool.apply(['whatever'])
+ assert False
+ except ValueError as e:
+ assert error_message in e.message
+ finally:
+ pool.close()
From 3644046140dcb53c31e48bddf2176b6f5b354faf Mon Sep 17 00:00:00 2001
From: Ralph Bean <rbean@redhat.com>
Date: Thu, 22 Jan 2015 11:32:58 -0500
Subject: [PATCH 5/8] Log thread name.
---
fmn/lib/multiproc.py | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/fmn/lib/multiproc.py b/fmn/lib/multiproc.py
index 40120a0..dc6b03a 100644
--- a/fmn/lib/multiproc.py
+++ b/fmn/lib/multiproc.py
@@ -1,5 +1,6 @@
import logging
import sys
+import threading
import traceback as tb
from multiprocessing import Queue, Process
@@ -15,7 +16,8 @@ class FixedPool(object):
This avoids the 'importable' requirement of multiprocessing.Pool.
"""
def __init__(self, N):
- log.debug('Initializing fmn multiproc pool, size %i' % N)
+ log.info('Initializing fmn multiproc pool, size %i for thread %s' % (
+ N, threading.current_thread().name))
self.incoming = Queue(1)
self.outgoing = Queue()
self.processes = []
From 0dfe8decaf90b32f2ad96437063aaae386f83c91 Mon Sep 17 00:00:00 2001
From: Ralph Bean <rbean@redhat.com>
Date: Thu, 22 Jan 2015 13:45:50 -0500
Subject: [PATCH 6/8] More pythonic.
---
fmn/lib/__init__.py | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/fmn/lib/__init__.py b/fmn/lib/__init__.py
index f444fc8..8f1225b 100644
--- a/fmn/lib/__init__.py
+++ b/fmn/lib/__init__.py
@@ -38,8 +38,7 @@ def recipients(preferences, message, valid_paths, config, pool=None):
arguments = [(p, message, valid_paths, config) for p in preferences]
if not pool:
- fn = lambda args: _recipient(*args)
- result_list = map(fn, arguments)
+ result_list = [_recipient(*a) for a in arguments]
else:
if not pool.targeted:
pool.target(_recipient)
From a2ed68ac011e08973e8d81a8ce78c44bcb5ba155 Mon Sep 17 00:00:00 2001
From: Ralph Bean <rbean@redhat.com>
Date: Thu, 22 Jan 2015 14:02:59 -0500
Subject: [PATCH 7/8] Typo. Do not limit the size of the incoming queue.
---
fmn/lib/multiproc.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/fmn/lib/multiproc.py b/fmn/lib/multiproc.py
index dc6b03a..a25effc 100644
--- a/fmn/lib/multiproc.py
+++ b/fmn/lib/multiproc.py
@@ -18,7 +18,7 @@ class FixedPool(object):
def __init__(self, N):
log.info('Initializing fmn multiproc pool, size %i for thread %s' % (
N, threading.current_thread().name))
- self.incoming = Queue(1)
+ self.incoming = Queue()
self.outgoing = Queue()
self.processes = []
self.N = N
From 5cf0944e19e75992870437481eb28168b5f3d089 Mon Sep 17 00:00:00 2001
From: Ralph Bean <rbean@redhat.com>
Date: Thu, 22 Jan 2015 14:06:58 -0500
Subject: [PATCH 8/8] Omit .join() on the subprocesses due to twisted wildness.
---
fmn/lib/multiproc.py | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)
diff --git a/fmn/lib/multiproc.py b/fmn/lib/multiproc.py
index a25effc..1667b93 100644
--- a/fmn/lib/multiproc.py
+++ b/fmn/lib/multiproc.py
@@ -61,12 +61,17 @@ def close(self):
if not self.targeted:
log.warning('No need to close pool. Not yet targeted.')
return
+
log.info('Closing fmn multiproc pool.')
for _ in self.processes:
self.incoming.put(StopIteration)
- log.debug('Waiting on workers to die.')
- for p in self.processes:
- p.join()
+
+ # XXX - we could call process.join() on all of our child processes, but
+ # that only works if *this* process is the same PID as the process that
+ # created them, and that isn't always the case when under the care of
+ # Twisted. So, we'll just omit that. Twisted cleans itself up nicely
+ # without it.
+
self.processes = []
log.info('Multiproc pool closed.')