Source code for jacinle.concurrency.zmq_utils

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : zmq_utils.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 01/22/2018
#
# This file is part of Jacinle.
# Distributed under terms of the MIT license.

import zmq
import socket
import uuid
import json

from jacinle.utils.network import get_local_addr_v2
from jacinle.concurrency.packing import loadb, dumpb


json_dumpb = lambda x: json.dumps(x).encode('utf-8')
json_loadb = lambda x: json.loads(x.decode('utf-8'))

get_addr = get_local_addr_v2


[docs] def router_recv_json(sock, flag=zmq.NOBLOCK, loader=json_loadb): try: identifier, delim, *payload = sock.recv_multipart(flag) return [identifier] + list(map(lambda x: loader(x), payload)) except zmq.error.ZMQError: return None, None
[docs] def router_send_json(sock, identifier, *payloads, flag=0, dumper=json_dumpb): try: buf = [identifier, b''] buf.extend(map(lambda x: dumper(x), payloads)) sock.send_multipart(buf, flags=flag) except zmq.error.ZMQError: return False return True
[docs] def req_recv_json(sock, flag=0, loader=json_loadb): try: response = sock.recv_multipart(flag) response = list(map(lambda x: loader(x), response)) return response[0] if len(response) == 1 else response except zmq.error.ZMQError: return None
[docs] def req_send_json(sock, *payloads, flag=0, dumper=json_dumpb): buf = [] buf.extend(map(lambda x: dumper(x), payloads)) try: sock.send_multipart(buf, flag) except zmq.error.ZMQError: return False return True
[docs] def iter_recv(meth, sock): while True: res = meth(sock, flag=zmq.NOBLOCK) succ = res[0] is not None if isinstance(res, (tuple, list)) else res is not None if succ: yield res else: break
[docs] def req_send_and_recv(sock, *payloads): req_send_json(sock, *payloads) return req_recv_json(sock)
[docs] def push_pyobj(sock, data, flag=zmq.NOBLOCK): try: sock.send(dumpb(data), flag, copy=False) except zmq.error.ZMQError: return False return True
[docs] def pull_pyobj(sock, flag=zmq.NOBLOCK): try: response = loadb(sock.recv(flag, copy=False).bytes) return response except zmq.error.ZMQError: return None
[docs] def bind_to_random_ipc(sock, name): name = name + uuid.uuid4().hex[:8] conn = 'ipc:///tmp/{}'.format(name) sock.bind(conn) return conn
[docs] def uid(): return socket.gethostname() + '/' + uuid.uuid4().hex
[docs] def graceful_close(sock): if sock is None: return sock.setsockopt(zmq.LINGER, 0) sock.close()