From 547a06478d51b150257578304aabf2ee7b585d01 Mon Sep 17 00:00:00 2001 From: Ralph Bean Date: Thu, 22 Jan 2015 09:40:11 -0500 Subject: [PATCH 1/8] Implement and test our own multiprocessing Pool. --- fmn/lib/multiproc.py | 70 +++++++++++++++++++++++++++++++++++++++++ fmn/lib/tests/test_multiproc.py | 68 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 138 insertions(+) create mode 100644 fmn/lib/multiproc.py create mode 100644 fmn/lib/tests/test_multiproc.py diff --git a/fmn/lib/multiproc.py b/fmn/lib/multiproc.py new file mode 100644 index 0000000..ae233f0 --- /dev/null +++ b/fmn/lib/multiproc.py @@ -0,0 +1,70 @@ +import logging + +from multiprocessing import Queue, Process + +__all__ = ['FixedPool'] + +log = logging.getLogger('fedmsg') + + +class FixedPool(object): + """ Our own multiprocessing pool. + + This avoids the 'importable' requirement of multiprocessing.Pool. + """ + def __init__(self, N): + log.debug('Initializing fmn multiproc pool, size %i' % N) + self.incoming = Queue(1) + self.outgoing = Queue() + self.processes = [] + self.N = N + + @property + def targeted(self): + # If I have processes, then I am targetted on some func. + return bool(self.processes) + + def target(self, fn): + log.info('Multiprocessing pool targeting %r' % fn) + if self.targeted: + self.close() + + args = (fn, self.incoming, self.outgoing) + self.processes = [ + Process(target=work, args=args) for i in range(self.N)] + + for p in self.processes: + p.daemon = True + p.start() + + log.info('Multiprocessing pool targeting done.') + + def apply(self, items): + """ Items are not guaranteed to come back in the same order. """ + if not self.targeted: + raise ValueError('.target(fn) must be called before .apply(items)') + for item in items: + if not isinstance(item, tuple): + item = item, + self.incoming.put(item) + return set(self.outgoing.get() for i in range(len(items))) + + def close(self): + if not self.targeted: + log.warning('No need to close pool. Not yet targeted.') + return + log.info('Closing fmn multiproc pool.') + for _ in self.processes: + self.incoming.put(StopIteration) + log.debug('Waiting on workers to die.') + for p in self.processes: + p.join() + self.processes = [] + log.info('Multiproc pool closed.') + +def work(fn, incoming, outgoing): + while True: + args = incoming.get() + if args is StopIteration: + break + outgoing.put(fn(*args)) diff --git a/fmn/lib/tests/test_multiproc.py b/fmn/lib/tests/test_multiproc.py new file mode 100644 index 0000000..3155cd7 --- /dev/null +++ b/fmn/lib/tests/test_multiproc.py @@ -0,0 +1,68 @@ +from nose.tools import eq_, raises + +import fmn.lib.tests +import fmn.lib.multiproc + + +class TestMultiproc(fmn.lib.tests.Base): + def test_single_map(self): + pool = fmn.lib.multiproc.FixedPool(1) + try: + def fn(x): + return x + 2 + pool.target(fn) + results = pool.apply(range(2)) + eq_(results, set([2, 3])) + finally: + pool.close() + + def test_multi_map(self): + pool = fmn.lib.multiproc.FixedPool(5) + try: + def fn(x): + return x + 2 + pool.target(fn) + results = pool.apply(range(10)) + eq_(results, set([2, 3, 4, 5, 6, 7, 8, 9, 10, 11])) + finally: + pool.close() + + def test_is_targeted(self): + pool = fmn.lib.multiproc.FixedPool(5) + eq_(pool.targeted, False) + try: + def fn(x): + return x + 2 + pool.target(fn) + eq_(pool.targeted, True) + finally: + pool.close() + + @raises(ValueError) + def test_target_before_apply(self): + pool = fmn.lib.multiproc.FixedPool(5) + pool.apply([1, 2, 3]) + + + def test_reuse(self): + pool = fmn.lib.multiproc.FixedPool(5) + + def fn1(x): + return x + 1 + + def fn2(x): + return x + 2 + + try: + pool.target(fn1) + results = pool.apply([1, 2]) + eq_(results, set([2, 3])) + finally: + pool.close() + + try: + pool.target(fn2) + results = pool.apply([1, 2]) + eq_(results, set([3, 4])) + finally: + pool.close() From 00927b3a950fbdd042b1916a482d7b8a1947ab09 Mon Sep 17 00:00:00 2001 From: Ralph Bean Date: Thu, 22 Jan 2015 09:52:35 -0500 Subject: [PATCH 2/8] Sets run into problems with unhashable types, so avoid that. --- fmn/lib/multiproc.py | 2 +- fmn/lib/tests/test_multiproc.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/fmn/lib/multiproc.py b/fmn/lib/multiproc.py index ae233f0..f260356 100644 --- a/fmn/lib/multiproc.py +++ b/fmn/lib/multiproc.py @@ -47,7 +47,7 @@ def apply(self, items): if not isinstance(item, tuple): item = item, self.incoming.put(item) - return set(self.outgoing.get() for i in range(len(items))) + return [self.outgoing.get() for i in range(len(items))] def close(self): if not self.targeted: diff --git a/fmn/lib/tests/test_multiproc.py b/fmn/lib/tests/test_multiproc.py index 3155cd7..02f5951 100644 --- a/fmn/lib/tests/test_multiproc.py +++ b/fmn/lib/tests/test_multiproc.py @@ -12,7 +12,7 @@ def fn(x): return x + 2 pool.target(fn) results = pool.apply(range(2)) - eq_(results, set([2, 3])) + eq_(set(results), set([2, 3])) finally: pool.close() @@ -23,7 +23,7 @@ def fn(x): return x + 2 pool.target(fn) results = pool.apply(range(10)) - eq_(results, set([2, 3, 4, 5, 6, 7, 8, 9, 10, 11])) + eq_(set(results), set([2, 3, 4, 5, 6, 7, 8, 9, 10, 11])) finally: pool.close() @@ -56,13 +56,13 @@ def fn2(x): try: pool.target(fn1) results = pool.apply([1, 2]) - eq_(results, set([2, 3])) + eq_(set(results), set([2, 3])) finally: pool.close() try: pool.target(fn2) results = pool.apply([1, 2]) - eq_(results, set([3, 4])) + eq_(set(results), set([3, 4])) finally: pool.close() From 47606f69d98afaeda9f15eb692f213fe00400534 Mon Sep 17 00:00:00 2001 From: Ralph Bean Date: Thu, 22 Jan 2015 10:00:23 -0500 Subject: [PATCH 3/8] Rework the recipients function to use multiproc if provided. --- fmn/lib/__init__.py | 64 +++++++++++++++++++++++++++++++++++------------------ 1 file changed, 42 insertions(+), 22 deletions(-) diff --git a/fmn/lib/__init__.py b/fmn/lib/__init__.py index e693d44..f444fc8 100644 --- a/fmn/lib/__init__.py +++ b/fmn/lib/__init__.py @@ -25,7 +25,7 @@ gcm_regex = r'^[\w-]+$' -def recipients(preferences, message, valid_paths, config): +def recipients(preferences, message, valid_paths, config, pool=None): """ The main API function. Accepts a fedmsg message as an argument. @@ -33,33 +33,53 @@ def recipients(preferences, message, valid_paths, config): Returns a dict mapping context names to lists of recipients. """ - rule_cache = dict() results = defaultdict(list) - notified = set() - for preference in preferences: - user = preference['user'] - context = preference['context'] - if (user['openid'], context['name']) in notified: - continue + arguments = [(p, message, valid_paths, config) for p in preferences] + + if not pool: + fn = lambda args: _recipient(*args) + result_list = map(fn, arguments) + else: + if not pool.targeted: + pool.target(_recipient) + result_list = pool.apply(arguments) + + # Flatten the nested list + result_list = sum(result_list, []) - for filter in preference['filters']: - if matches(filter, message, valid_paths, rule_cache, config): - for detail_value in preference['detail_values']: - results[context['name']].append({ - 'user': user['openid'], - context['detail_name']: detail_value, - 'filter_name': filter['name'], - 'filter_id': filter['id'], - 'markup_messages': preference['markup_messages'], - 'triggered_by_links': preference['triggered_by_links'], - 'shorten_links': preference['shorten_links'], - }) - notified.add((user['openid'], context['name'])) - break + # Transform our list of tuples (context, dict) into a + # dict like {context: [dict, dict, ...]} + for context, match in result_list: + results[context].append(match) return results +def _recipient(preference, message, valid_paths, config): + recipient = [] + + # TODO -- turn this into a dogpile cache some day + rule_cache = dict() + + user = preference['user'] + context = preference['context'] + + for filter in preference['filters']: + if matches(filter, message, valid_paths, rule_cache, config): + for detail_value in preference['detail_values']: + recipient.append([context['name'], { + 'user': user['openid'], + context['detail_name']: detail_value, + 'filter_name': filter['name'], + 'filter_id': filter['id'], + 'markup_messages': preference['markup_messages'], + 'triggered_by_links': preference['triggered_by_links'], + 'shorten_links': preference['shorten_links'], + }]) + break + + return recipient + def matches(filter, message, valid_paths, rule_cache, config): """ Returns True if the given filter matches the given message. """ From c6a4350a505c8d93d062e480b0598d1ead6dea4b Mon Sep 17 00:00:00 2001 From: Ralph Bean Date: Thu, 22 Jan 2015 10:35:16 -0500 Subject: [PATCH 4/8] Handle and test exceptions from inside subprocesses. --- fmn/lib/multiproc.py | 17 +++++++++++++++-- fmn/lib/tests/test_multiproc.py | 17 +++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/fmn/lib/multiproc.py b/fmn/lib/multiproc.py index f260356..40120a0 100644 --- a/fmn/lib/multiproc.py +++ b/fmn/lib/multiproc.py @@ -1,4 +1,6 @@ import logging +import sys +import traceback as tb from multiprocessing import Queue, Process @@ -47,7 +49,11 @@ def apply(self, items): if not isinstance(item, tuple): item = item, self.incoming.put(item) - return [self.outgoing.get() for i in range(len(items))] + results = [self.outgoing.get() for i in range(len(items))] + for result in results: + if isinstance(result, Exception): + raise result + return results def close(self): if not self.targeted: @@ -67,4 +73,11 @@ def work(fn, incoming, outgoing): args = incoming.get() if args is StopIteration: break - outgoing.put(fn(*args)) + try: + result = fn(*args) + except Exception as e: + result = type(e)( + "... which was originally caused by:\n" + + "".join(tb.format_exception(*sys.exc_info()))) + finally: + outgoing.put(result) diff --git a/fmn/lib/tests/test_multiproc.py b/fmn/lib/tests/test_multiproc.py index 02f5951..7a7080f 100644 --- a/fmn/lib/tests/test_multiproc.py +++ b/fmn/lib/tests/test_multiproc.py @@ -66,3 +66,20 @@ def fn2(x): eq_(set(results), set([3, 4])) finally: pool.close() + + def test_inner_exception(self): + pool = fmn.lib.multiproc.FixedPool(5) + + error_message = "oh no!" + + def fn(x): + raise ValueError(error_message) + + try: + pool.target(fn) + pool.apply(['whatever']) + assert False + except ValueError as e: + assert error_message in e.message + finally: + pool.close() From 3644046140dcb53c31e48bddf2176b6f5b354faf Mon Sep 17 00:00:00 2001 From: Ralph Bean Date: Thu, 22 Jan 2015 11:32:58 -0500 Subject: [PATCH 5/8] Log thread name. --- fmn/lib/multiproc.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fmn/lib/multiproc.py b/fmn/lib/multiproc.py index 40120a0..dc6b03a 100644 --- a/fmn/lib/multiproc.py +++ b/fmn/lib/multiproc.py @@ -1,5 +1,6 @@ import logging import sys +import threading import traceback as tb from multiprocessing import Queue, Process @@ -15,7 +16,8 @@ class FixedPool(object): This avoids the 'importable' requirement of multiprocessing.Pool. """ def __init__(self, N): - log.debug('Initializing fmn multiproc pool, size %i' % N) + log.info('Initializing fmn multiproc pool, size %i for thread %s' % ( + N, threading.current_thread().name)) self.incoming = Queue(1) self.outgoing = Queue() self.processes = [] From 0dfe8decaf90b32f2ad96437063aaae386f83c91 Mon Sep 17 00:00:00 2001 From: Ralph Bean Date: Thu, 22 Jan 2015 13:45:50 -0500 Subject: [PATCH 6/8] More pythonic. --- fmn/lib/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/fmn/lib/__init__.py b/fmn/lib/__init__.py index f444fc8..8f1225b 100644 --- a/fmn/lib/__init__.py +++ b/fmn/lib/__init__.py @@ -38,8 +38,7 @@ def recipients(preferences, message, valid_paths, config, pool=None): arguments = [(p, message, valid_paths, config) for p in preferences] if not pool: - fn = lambda args: _recipient(*args) - result_list = map(fn, arguments) + result_list = [_recipient(*a) for a in arguments] else: if not pool.targeted: pool.target(_recipient) From a2ed68ac011e08973e8d81a8ce78c44bcb5ba155 Mon Sep 17 00:00:00 2001 From: Ralph Bean Date: Thu, 22 Jan 2015 14:02:59 -0500 Subject: [PATCH 7/8] Typo. Do not limit the size of the incoming queue. --- fmn/lib/multiproc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fmn/lib/multiproc.py b/fmn/lib/multiproc.py index dc6b03a..a25effc 100644 --- a/fmn/lib/multiproc.py +++ b/fmn/lib/multiproc.py @@ -18,7 +18,7 @@ class FixedPool(object): def __init__(self, N): log.info('Initializing fmn multiproc pool, size %i for thread %s' % ( N, threading.current_thread().name)) - self.incoming = Queue(1) + self.incoming = Queue() self.outgoing = Queue() self.processes = [] self.N = N From 5cf0944e19e75992870437481eb28168b5f3d089 Mon Sep 17 00:00:00 2001 From: Ralph Bean Date: Thu, 22 Jan 2015 14:06:58 -0500 Subject: [PATCH 8/8] Omit .join() on the subprocesses due to twisted wildness. --- fmn/lib/multiproc.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/fmn/lib/multiproc.py b/fmn/lib/multiproc.py index a25effc..1667b93 100644 --- a/fmn/lib/multiproc.py +++ b/fmn/lib/multiproc.py @@ -61,12 +61,17 @@ def close(self): if not self.targeted: log.warning('No need to close pool. Not yet targeted.') return + log.info('Closing fmn multiproc pool.') for _ in self.processes: self.incoming.put(StopIteration) - log.debug('Waiting on workers to die.') - for p in self.processes: - p.join() + + # XXX - we could call process.join() on all of our child processes, but + # that only works if *this* process is the same PID as the process that + # created them, and that isn't always the case when under the care of + # Twisted. So, we'll just omit that. Twisted cleans itself up nicely + # without it. + self.processes = [] log.info('Multiproc pool closed.')