Source code for jacinle.comm.broadcast
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : broadcast.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 01/22/2018
#
# This file is part of Jacinle.
# Distributed under terms of the MIT license.
import contextlib
import queue
import threading
import zmq
from jacinle.concurrency.packing import loadb, dumpb
from jacinle.concurrency.zmq_utils import get_addr, bind_to_random_ipc, graceful_close
from jacinle.utils.meta import notnone_property
# Public names exported by ``from jacinle.comm.broadcast import *``.
__all__ = ['BroadcastOutputPipe', 'BroadcastInputPipe', 'make_broadcast_pair']
# High-water mark handed to ``set_hwm`` on the PUB socket created by
# BroadcastOutputPipe.
BROADCAST_HWM = 2
[docs]
class BroadcastOutputPipe(object):
    """Publishing end of a broadcast channel built on a ZeroMQ ``PUB`` socket.

    Payloads passed to :meth:`send` are put on a bounded queue; a background
    daemon thread started by :meth:`initialize` takes them off the queue,
    encodes them with ``dumpb`` and writes them to the socket.

    Typical usage::

        pipe = BroadcastOutputPipe('name')
        with pipe.activate():
            pipe.send(payload)
    """

    # Sentinel queued by finalize() to ask the sender thread to exit. It is
    # compared by identity, so it can never collide with a user payload.
    _STOP = object()

    def __init__(self, name, send_qsize=10, mode='tcp'):
        """Create the ZeroMQ context and PUB socket; nothing is bound yet.

        Args:
            name: pipe name; used when binding in ``'ipc'`` mode.
            send_qsize: ``maxsize`` of the queue between :meth:`send` and the
                sender thread.
            mode: transport to bind with, either ``'tcp'`` or ``'ipc'``.
        """
        self._name = name
        self._send_qsize = send_qsize
        self._mode = mode
        self._conn_info = None

        self._context = zmq.Context()
        self._sock = self._context.socket(zmq.PUB)
        self._sock.set_hwm(BROADCAST_HWM)

        # Both are created by initialize().
        self._send_queue = None
        self._send_thread = None

    @notnone_property
    def conn_info(self):
        """Address the socket was bound to by :meth:`initialize`.

        NOTE(review): ``notnone_property`` presumably rejects access while the
        value is still ``None`` (i.e. before ``initialize``) -- confirm against
        ``jacinle.utils.meta``.
        """
        return self._conn_info

    def initialize(self):
        """Bind the socket and start the sender thread.

        Calling this again after a successful initialization is a no-op.

        Raises:
            ValueError: if ``mode`` is neither ``'tcp'`` nor ``'ipc'``.
        """
        if self._conn_info is not None:
            return

        if self._mode == 'tcp':
            port = self._sock.bind_to_random_port('tcp://*')
            self._conn_info = 'tcp://{}:{}'.format(get_addr(), port)
        elif self._mode == 'ipc':
            self._conn_info = bind_to_random_ipc(self._sock, self._name)
        else:
            # Fail loudly instead of starting a sender thread on a socket
            # that was never bound.
            raise ValueError(
                'Unknown broadcast mode: {!r} (expected "tcp" or "ipc").'.format(self._mode)
            )

        self._send_queue = queue.Queue(maxsize=self._send_qsize)
        self._send_thread = threading.Thread(target=self.mainloop_send, daemon=True)
        self._send_thread.start()

    def finalize(self):
        """Stop the sender thread, then close the socket and the context.

        Payloads that are already queued are sent before the thread exits.
        """
        # The sender thread is the only user of the socket once it is running;
        # it has to be gone before the socket is closed from this thread.
        self._stop_send_thread()
        graceful_close(self._sock)
        self._context.term()

    def _stop_send_thread(self):
        """Ask the sender thread to exit and wait until it has done so."""
        thread, self._send_thread = self._send_thread, None
        if thread is None:
            # Never initialized, or already finalized.
            return

        # The queue is bounded, so a plain put() could block forever if the
        # thread has died with a full queue; retry only while it is alive.
        while thread.is_alive():
            try:
                self._send_queue.put(self._STOP, timeout=0.1)
            except queue.Full:
                continue
            break
        thread.join()

    @contextlib.contextmanager
    def activate(self):
        """Context manager: :meth:`initialize` on entry, :meth:`finalize` on exit."""
        self.initialize()
        try:
            yield
        finally:
            self.finalize()

    def mainloop_send(self):
        """Body of the sender thread: forward queued payloads to the socket.

        Runs until the stop sentinel is dequeued or the ZeroMQ context is
        terminated.
        """
        try:
            while True:
                job = self._send_queue.get()
                if job is self._STOP:
                    break
                self._sock.send(dumpb(job), copy=False)
        except zmq.ContextTerminated:
            pass

    def send(self, payload):
        """Queue ``payload`` for broadcasting; blocks while the queue is full.

        :meth:`initialize` must have been called first, because the queue is
        created there.

        Returns:
            This pipe, so calls can be chained.
        """
        self._send_queue.put(payload)
        return self
[docs]
def make_broadcast_pair(name, nr_workers=None, mode='tcp', send_qsize=10):
    """Create an initialized output pipe together with its input pipe(s).

    Args:
        name: name given to the :class:`BroadcastOutputPipe`.
        nr_workers: number of input pipes to create. ``None`` (the default)
            requests a single input pipe, returned on its own rather than in
            a list.
        mode: transport passed to the output pipe (``'tcp'`` or ``'ipc'``).
        send_qsize: send-queue size passed to the output pipe.

    Returns:
        ``(push, pull)`` when ``nr_workers`` is ``None``; otherwise
        ``(push, pulls)`` where ``pulls`` is a list of ``nr_workers`` input
        pipes, all constructed from ``push.conn_info``.
    """
    push = BroadcastOutputPipe(name, send_qsize=send_qsize, mode=mode)
    push.initialize()

    # Only a missing count falls back to one pipe; an explicit 0 must produce
    # an empty list rather than being treated like ``None``.
    nr_pulls = 1 if nr_workers is None else nr_workers
    pulls = [BroadcastInputPipe(push.conn_info) for _ in range(nr_pulls)]

    if nr_workers is None:
        return push, pulls[0]
    return push, pulls