Source code for jacinle.comm.service

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : service.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 12/19/2018
#
# This file is part of Jacinle.
# Distributed under terms of the MIT license.

import sys
import uuid
import inspect
import contextlib

from jacinle.logging import get_logger
from jacinle.utils.printing import kvformat
from jacinle.utils.exception import format_exc

from .cs import ServerPipe, ClientPipe
from .echo import EchoToPipe, echo_from_pipe

logger = get_logger(__file__)

__all__ = ['Service', 'SocketServer', 'SocketClient']


[docs] class Service(object):
[docs] def __init__(self, configs=None, spec=None): self.configs = configs self.spec = spec
[docs] def serve_socket(self, name=None, tcp_port=None): if name is None: name = self.__class__.__name__ return SocketServer(self, name, tcp_port=tcp_port).serve()
[docs] def initialize(self): pass
[docs] def call(self, *args, **kwargs): raise NotImplementedError()
[docs] def finalize(self): pass
[docs] class ServiceException(object):
[docs] def __init__(self, remote_message): self.remote_message = remote_message
def __repr__(self): return 'Service exception: ' + self.remote_message
[docs] class SocketServer(object):
[docs] def __init__(self, service, name, tcp_port=None, ipc_port=None): self.service = service self.name = name self.tcp_port = tcp_port self.ipc_port = ipc_port self.mode = 'tcp' if self.ipc_port is not None: self.mode = 'ipc' self.identifier = self.name + '-server-' + uuid.uuid4().hex self.server = ServerPipe(self.identifier, mode=self.mode) self.server.dispatcher.register('get_name', self.call_get_name) self.server.dispatcher.register('get_identifier', self.call_get_identifier) self.server.dispatcher.register('get_conn_info', self.call_get_conn_info) self.server.dispatcher.register('get_spec', self.call_get_spec) self.server.dispatcher.register('get_configs', self.call_get_configs) self.server.dispatcher.register('get_signature', self.call_get_signature) self.server.dispatcher.register('query', self.call_query)
[docs] def serve(self): with self.server.activate(tcp_port=self.tcp_port, ipc_port=self.ipc_port): logger.info('Server started.') logger.info(' Name: {}'.format(self.name)) logger.info(' Identifier: {}'.format(self.identifier)) logger.info(' Conn info: {} {}'.format(*self.conn_info)) while True: import time; time.sleep(1)
[docs] @contextlib.contextmanager def activate(self): with self.server.activate(tcp_port=self.tcp_port, ipc_port=self.ipc_port): logger.info('Server started.') logger.info(' Name: {}'.format(self.name)) logger.info(' Identifier: {}'.format(self.identifier)) logger.info(' Conn info: {} {}'.format(*self.conn_info)) yield
@property def conn_info(self): return self.server.conn_info
[docs] def call_get_name(self, pipe, identifier, inp): pipe.send(identifier, self.name)
[docs] def call_get_identifier(self, pipe, identifier, inp): pipe.send(identifier, self.identifier)
[docs] def call_get_conn_info(self, pipe, identifier, inp): pipe.send(identifier, self.conn_info)
[docs] def call_get_spec(self, pipe, identifier, inp): pipe.send(identifier, self.service.spec)
[docs] def call_get_configs(self, pipe, identifier, inp): pipe.send(identifier, self.service.configs)
[docs] def call_get_signature(self, pipe, identifier, inp): pipe.send(identifier, repr(inspect.getfullargspec(self.service.call)))
[docs] def call_query(self, pipe, identifier, feed_dict): logger.info('Received query from: {}.'.format(identifier)) try: if feed_dict['echo']: with EchoToPipe(pipe, identifier).activate(): output_dict = self.service.call(*feed_dict['args'], **feed_dict['kwargs']) else: output_dict = self.service.call(*feed_dict['args'], **feed_dict['kwargs']) except: output_dict = ServiceException(format_exc(sys.exc_info())) pipe.send(identifier, output_dict)
[docs] class SocketClient(object):
[docs] def __init__(self, name, conn_info, echo=True): self.name = name self.identifier = self.name + '-client-' + uuid.uuid4().hex self.conn_info = conn_info self.client = ClientPipe(self.identifier, conn_info=self.conn_info) self.echo = echo self._initialized = False
[docs] def initialize(self): self.client.initialize() logger.info('Client started.') logger.info(' Name: {}'.format(self.name)) logger.info(' Identifier: {}'.format(self.identifier)) logger.info(' Conn info: {}'.format(self.conn_info)) logger.info(' Server name: {}'.format(self.get_server_name())) logger.info(' Server identifier: {}'.format(self.get_server_identifier())) logger.info(' Server signaature: {}'.format(self.get_signature())) configs = self.get_configs() if configs is not None: logger.info(' Server configs: {}'.format(configs)) self._initialized = True
[docs] def finalize(self): self.client.finalize() self._initialized = False
@property def initialized(self): return self._initialized
[docs] @contextlib.contextmanager def activate(self): try: self.initialize() yield finally: self.finalize()
[docs] def get_server_name(self): return self.client.query('get_name')
[docs] def get_server_identifier(self): return self.client.query('get_identifier')
[docs] def get_client_identifier(self): return self.identifier
[docs] def get_server_conn_info(self): return self.client.query('get_conn_info')
[docs] def get_spec(self): return self.client.query('get_spec')
[docs] def get_configs(self): return self.client.query('get_configs')
[docs] def get_signature(self): return self.client.query('get_signature')
[docs] def call(self, *args, echo=None, **kwargs): if echo is None: echo = self.echo self.client.query('query', {'args': args, 'kwargs': kwargs, 'echo': echo}, do_recv=False) if echo: echo_from_pipe(self.client) output = self.client.recv() if isinstance(output, ServiceException): raise RuntimeError(repr(output)) return output
def __getattr__(self, name): def _call(*args, **kwargs): return self.call(name, *args, **kwargs) return _call