Source code for jacinle.comm.distrib.pipe
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : pipe.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 01/22/2018
#
# This file is part of Jacinle.
# Distributed under terms of the MIT license.
import queue
import uuid
from jacinle.utils.meta import notnone_property
from .controller import BroadcastMessage
__all__ = ['DistribInputPipe', 'DistribOutputPipe']
[docs]
class PipeBase(object):
    """Base class for a named pipe backed by a ``queue.Queue``.

    Outgoing data passes through :meth:`_wrap_send_message` before it is
    enqueued, and dequeued items pass through :meth:`_unwrap_recv_message`
    before they are returned. Subclasses must implement both hooks; the base
    versions raise ``NotImplementedError``.
    """

    def __init__(self, direction, name, bufsize, identifier=None):
        """Create the pipe and its underlying buffer.

        Args:
            direction: direction label stored on the pipe (subclasses in this
                module pass strings such as ``'OUT'``).
            name: name of the pipe.
            bufsize: ``maxsize`` handed to ``queue.Queue``; per the standard
                library, a value <= 0 means the queue is unbounded.
            identifier: identifier for this pipe. Any falsy value (including
                the default ``None``) is replaced by a random UUID4 hex string.
        """
        if not identifier:
            identifier = uuid.uuid4().hex

        self._direction = direction
        self._name = name
        # No controller is attached until `set_controller` is called.
        self._controller = None
        self._queue = queue.Queue(maxsize=bufsize)
        self._identifier = identifier

    @property
    def direction(self):
        """The direction label given at construction time."""
        return self._direction

    @property
    def name(self):
        """The pipe name given at construction time."""
        return self._name

    @property
    def identifier(self):
        """The explicit identifier, or the UUID4 hex generated in its place."""
        return self._identifier

    # NOTE(review): `notnone_property` comes from `jacinle.utils.meta`;
    # presumably it rejects access while the value is still None -- confirm.
    @notnone_property
    def controller(self):
        """The controller attached through :meth:`set_controller`."""
        return self._controller

    @property
    def raw_queue(self):
        """The underlying ``queue.Queue`` holding the wrapped messages."""
        return self._queue

    def set_controller(self, controller):
        """Attach ``controller`` to this pipe."""
        self._controller = controller

    def put(self, data):
        """Wrap ``data`` and enqueue it, blocking while the queue is full."""
        message = self._wrap_send_message(data)
        self._queue.put(message)

    def put_nowait(self, data):
        """Wrap ``data`` and try to enqueue it without blocking.

        Returns:
            ``True`` if the message was enqueued, ``False`` if ``queue.Full``
            was raised.
        """
        try:
            # Wrapping stays inside the `try` so the set of statements covered
            # by the handler is unchanged.
            self._queue.put_nowait(self._wrap_send_message(data))
        except queue.Full:
            return False
        return True

    def get(self):
        """Dequeue one message, blocking until available, and unwrap it."""
        raw_message = self._queue.get()
        return self._unwrap_recv_message(raw_message)

    def get_nowait(self):
        """Dequeue and unwrap one message without blocking.

        Returns:
            The unwrapped message, or ``None`` if ``queue.Empty`` was raised.
        """
        try:
            # Unwrapping stays inside the `try` so the set of statements
            # covered by the handler is unchanged.
            unwrapped = self._unwrap_recv_message(self._queue.get_nowait())
        except queue.Empty:
            return None
        return unwrapped

    def empty(self):
        """Return whether the underlying queue reports itself as empty."""
        buffer = self._queue
        return buffer.empty()

    def full(self):
        """Return whether the underlying queue reports itself as full."""
        buffer = self._queue
        return buffer.full()

    def _wrap_send_message(self, data):
        """Hook: convert ``data`` into the object stored in the queue."""
        raise NotImplementedError()

    def _unwrap_recv_message(self, data):
        """Hook: convert a queued object back into the value returned."""
        raise NotImplementedError()
[docs]
class DistribBroadcastPipeBase(PipeBase):
    """Pipe whose queued items are ``BroadcastMessage`` objects.

    Outgoing data is wrapped in a ``BroadcastMessage`` built from this pipe's
    identifier and the data; on receive, the ``payload`` attribute of the
    queued object is returned.
    """

    def __init__(self, direction, name, bufsize=10):
        """Create the pipe.

        Args:
            direction: direction label forwarded to the base class.
            name: name of the pipe.
            bufsize: queue capacity forwarded to the base class (default 10).

        No identifier is forwarded, so the base class generates one.
        """
        super().__init__(direction=direction, name=name, bufsize=bufsize)

    def _unwrap_recv_message(self, data):
        """Return the ``payload`` attribute of the queued message ``data``."""
        payload = data.payload
        return payload

    def _wrap_send_message(self, data):
        """Wrap ``data`` in a ``BroadcastMessage`` tagged with this pipe's identifier."""
        sender = self.identifier
        return BroadcastMessage(sender, data)
[docs]
class DistribOutputPipe(DistribBroadcastPipeBase):
    """Broadcast pipe constructed with the fixed direction label ``'OUT'``."""

    def __init__(self, name, bufsize=10):
        """Create the output pipe.

        Args:
            name: name of the pipe.
            bufsize: queue capacity forwarded to the parent class (default 10).
        """
        direction = 'OUT'
        super().__init__(direction, name, bufsize)