Source code for jacinle.comm.distrib.pipe

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : pipe.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 01/22/2018
#
# This file is part of Jacinle.
# Distributed under terms of the MIT license.

import queue
import uuid

from jacinle.utils.meta import notnone_property
from .controller import BroadcastMessage

__all__ = ['DistribInputPipe', 'DistribOutputPipe']


[docs] class PipeBase(object):
[docs] def __init__(self, direction, name, bufsize, identifier=None): self._direction = direction self._name = name self._controller = None self._queue = queue.Queue(maxsize=bufsize) self._identifier = identifier or uuid.uuid4().hex
@property def direction(self): return self._direction @property def name(self): return self._name @property def identifier(self): return self._identifier @notnone_property def controller(self): return self._controller @property def raw_queue(self): return self._queue
[docs] def set_controller(self, controller): self._controller = controller
[docs] def put(self, data): self._queue.put(self._wrap_send_message(data))
[docs] def put_nowait(self, data): try: self._queue.put_nowait(self._wrap_send_message(data)) return True except queue.Full: return False
[docs] def get(self): return self._unwrap_recv_message(self._queue.get())
[docs] def get_nowait(self): try: return self._unwrap_recv_message(self._queue.get_nowait()) except queue.Empty: return None
[docs] def empty(self): return self._queue.empty()
[docs] def full(self): return self._queue.full()
def _wrap_send_message(self, data): raise NotImplementedError() def _unwrap_recv_message(self, data): raise NotImplementedError()
[docs] class DistribBroadcastPipeBase(PipeBase):
[docs] def __init__(self, direction, name, bufsize=10): super().__init__(direction, name, bufsize)
def _unwrap_recv_message(self, data): return data.payload def _wrap_send_message(self, data): return BroadcastMessage(self.identifier, data)
[docs] class DistribInputPipe(DistribBroadcastPipeBase):
[docs] def __init__(self, name, bufsize=10): super().__init__('IN', name, bufsize)
[docs] class DistribOutputPipe(DistribBroadcastPipeBase):
[docs] def __init__(self, name, bufsize=10): super().__init__('OUT', name, bufsize)