Source code for jacinle.concurrency.event

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : event.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 01/22/2018
#
# This file is part of Jacinle.
# Distributed under terms of the MIT license.

import queue
import threading
import multiprocessing
import functools

# Names exported by a star-import of this module.
__all__ = [
    'MPLibExtension', 'instantiate_mplib_ext',
    'MTBooleanEvent', 'MPBooleanEvent',
    'MTOrEvent', 'MPOrEvent',
    'MTCoordinatorEvent'
]


class MPLibExtension(object):
    """Base class for primitives that are parameterised by a concurrency module.

    Subclasses read ``type(self).__mplib__`` to obtain the module that provides
    their synchronisation objects. The default is :mod:`threading`.
    """

    # Module supplying the synchronisation primitives; overridden per flavour.
    __mplib__ = threading
def instantiate_mplib_ext(base_class):
    """Create the threading and multiprocessing flavours of ``base_class``.

    Args:
        base_class: the class to specialise. It is expected to look up its
            concurrency module through the ``__mplib__`` class attribute.

    Returns:
        A ``(multi_threading_impl, multi_processing_impl)`` tuple of subclasses
        of ``base_class``. Their ``__mplib__`` is :mod:`threading` and
        :mod:`multiprocessing` respectively, and they are named
        ``'MT' + base_class.__name__`` and ``'MP' + base_class.__name__``.
    """

    def _make_impl(prefix, mplib):
        # Build one subclass bound to `mplib` and named `prefix + base name`.
        impl_name = prefix + base_class.__name__

        class Impl(base_class):
            # Kept in the class namespace so that *instances* keep exposing
            # ``__name__`` exactly as they did before.
            __name__ = impl_name
            __mplib__ = mplib

        # A ``__name__`` entry in the class body does not rename the class:
        # ``type.__name__`` is a data descriptor that shadows the class dict.
        # Assign through the descriptor so the class itself (repr, tracebacks,
        # introspection) carries the intended name instead of the local one.
        Impl.__name__ = impl_name
        Impl.__qualname__ = impl_name
        return Impl

    return _make_impl('MT', threading), _make_impl('MP', multiprocessing)
class BooleanEvent(MPLibExtension):
    """A boolean flag on which callers can wait for either value.

    Two complementary events back the flag: ``_t`` is set while the value is
    true and ``_f`` is set while it is false. Both events and the lock come
    from the module stored in ``__mplib__`` of the concrete class.
    """

    def __init__(self):
        """Create the flag in the *false* state."""
        mplib = type(self).__mplib__
        self._t = mplib.Event()
        self._f = mplib.Event()
        # Initial state: "true" event cleared, "false" event set.
        self._t.clear()
        self._f.set()
        self._lock = mplib.Lock()

    def is_true(self):
        """Return whether the "true" event is currently set."""
        with self._lock:
            state = self._t.is_set()
        return state

    def is_false(self):
        """Return whether the "false" event is currently set."""
        with self._lock:
            state = self._f.is_set()
        return state

    def set(self):
        """Switch the flag to true."""
        with self._lock:
            # Raise the "true" side first, then lower the "false" side.
            self._t.set()
            self._f.clear()

    def clear(self):
        """Switch the flag to false."""
        with self._lock:
            self._t.clear()
            self._f.set()

    def wait(self, predicate=True, timeout=None):
        """Block until the flag matches ``predicate``.

        Args:
            predicate: truthy to wait for the true state, falsy to wait for
                the false state.
            timeout: forwarded unchanged to the underlying event's ``wait``.

        Returns:
            Whatever the underlying event's ``wait`` returns.
        """
        if predicate:
            target = self._t
        else:
            target = self._f
        # The wait itself is performed without holding the lock.
        return target.wait(timeout)

    def wait_true(self, timeout=None):
        """Shorthand for ``wait(True, timeout=timeout)``."""
        return self.wait(True, timeout=timeout)

    def wait_false(self, timeout=None):
        """Shorthand for ``wait(False, timeout=timeout)``."""
        return self.wait(False, timeout=timeout)

    def set_true(self):
        """Alias of :meth:`set`."""
        self.set()

    def set_false(self):
        """Alias of :meth:`clear`."""
        self.clear()

    def value(self):
        """Return the current state; same result as :meth:`is_true`."""
        return self.is_true()
MTBooleanEvent, MPBooleanEvent = instantiate_mplib_ext(BooleanEvent)


def _or_event_set(self):
    """Set the event through its original ``set`` and then notify listeners."""
    self._set()
    self.changed()


def _or_event_clear(self):
    """Clear the event through its original ``clear`` and then notify listeners."""
    self._clear()
    self.changed()


def _or_event_notify(e):
    """Invoke every change callback registered on ``e``."""
    # Iterate over a snapshot so a registration made meanwhile cannot disturb
    # the loop.
    for callback in list(e._or_callbacks):
        callback()


def _orify(e, changed_callback):
    """Make ``e`` call ``changed_callback`` after every ``set`` / ``clear``.

    The event instance is patched in place: its original ``set`` and ``clear``
    are kept as ``e._set`` and ``e._clear``, and ``e.set`` / ``e.clear`` are
    replaced by wrappers that call the original and then ``e.changed()``.

    The patching happens only once per event. Re-wrapping an already wrapped
    event would store the wrapper itself as ``e._set``, so ``e.set()`` would
    recurse without end, and the previously registered callback would be lost.
    Further calls therefore only append another callback.

    Args:
        e: an event-like object exposing ``set`` and ``clear`` and accepting
            new instance attributes.
        changed_callback: zero-argument callable run after each state change.
    """
    callbacks = getattr(e, '_or_callbacks', None)
    if callbacks is None:
        # First registration on this event: install the wrappers.
        callbacks = []
        e._or_callbacks = callbacks
        e._set = e.set
        e._clear = e.clear
        e.changed = lambda: _or_event_notify(e)
        e.set = lambda: _or_event_set(e)
        e.clear = lambda: _or_event_clear(e)
    callbacks.append(changed_callback)
def OrEvent(*events, mplib=threading):
    """Waiting on several events together.

    Builds a new event that is set whenever at least one of ``events`` is set
    and cleared when none of them is. Every given event is patched in place
    (via ``_orify``) so that its ``set`` / ``clear`` refresh the combined event.

    http://stackoverflow.com/questions/12317940/python-threading-can-i-sleep-on-two-threading-events-simultaneously

    Args:
        *events: event-like objects exposing ``set``, ``clear`` and ``is_set``.
        mplib: module providing ``Event`` and ``Lock`` for the combined event
            (``threading`` by default).

    Returns:
        The combined event created from ``mplib.Event()``.
    """
    or_event = mplib.Event()
    # Serialises the read-then-update below. Without it, one thread can read
    # "nothing set", another thread can set a member event and the combined
    # event, and the first thread can then clear the combined event, leaving
    # it cleared although a member is set.
    state_lock = mplib.Lock()

    def changed():
        with state_lock:
            if any(e.is_set() for e in events):
                or_event.set()
            else:
                or_event.clear()

    for e in events:
        _orify(e, changed)
    # Bring the combined event in line with the members' current state.
    changed()
    return or_event
# Convenience constructors that pin ``mplib``: MTOrEvent builds the combined
# event with ``threading``, MPOrEvent builds it with ``multiprocessing``.
MTOrEvent = functools.partial(OrEvent, mplib=threading)
MPOrEvent = functools.partial(OrEvent, mplib=multiprocessing)
class MTCoordinatorEvent(object):
    """Thread-based broadcast signal that collects one acknowledgement per worker.

    The coordinator calls :meth:`broadcast`; workers call :meth:`wait` or
    :meth:`check`, each of which puts an acknowledgement on an internal queue
    once the signal is observed.
    """

    def __init__(self, nr_workers):
        """Store the number of acknowledgements that ``broadcast`` collects.

        Args:
            nr_workers: how many queue items :meth:`broadcast` takes before it
                clears the signal.
        """
        self._nr_workers = nr_workers
        self._queue = queue.Queue()
        self._event = threading.Event()

    def broadcast(self):
        """Raise the signal, take ``nr_workers`` acknowledgements, then lower it."""
        self._event.set()
        for _ in range(self._nr_workers):
            # Blocks until an acknowledgement is available.
            self._queue.get()
        self._event.clear()

    def wait(self):
        """Block until the signal is raised, then acknowledge it."""
        self._event.wait()
        self._queue.put(1)

    def check(self):
        """Acknowledge the signal if it is raised, without blocking on it.

        Returns:
            ``True`` if the signal was raised (an acknowledgement has been
            queued), ``False`` otherwise.
        """
        if not self._event.is_set():
            return False
        self._queue.put(1)
        return True