Source code for jacinle.comm.service_name_server

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : service_name_server.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 01/16/2025
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

import time
import threading

from jacinle.utils.env import jac_getenv
from jacinle.comm.service import Service, SocketClient

__all__ = ['SimpleNameServer', 'SimpleNameServerClient', 'sns_register', 'sns_get', 'sns_has']


[docs] class SimpleNameServer(Service): DEFAULT_PORT = 11103
[docs] def __init__(self): super().__init__() self.mutex = threading.Lock() self.name2objects = dict() self.name2last_heartbeat = dict()
[docs] def heartbeat(self, name, obj): print('Heartbeat from {}.'.format(name)) with self.mutex: self.name2objects[name] = obj self.name2last_heartbeat[name] = time.time()
[docs] def get(self, name): with self.mutex: return self.name2objects.get(name, None)
[docs] def has(self, name): with self.mutex: return name in self.name2objects
[docs] def cleaner(self): while True: time.sleep(30) with self.mutex: now = time.time() for name, last_heartbeat in list(self.name2last_heartbeat.items()): if now - last_heartbeat > 60: print('Removing {} due to timeout.'.format(name)) del self.name2objects[name] del self.name2last_heartbeat[name]
[docs] def call(self, func_name, *args, **kwargs): if func_name == 'heartbeat': return self.heartbeat(*args, **kwargs) elif func_name == 'get': return self.get(*args, **kwargs) elif func_name == 'has': return self.has(*args, **kwargs) else: raise NotImplementedError('Unknown function name: {}.'.format(func_name))
[docs] def serve_socket(self, name='jacinle/nameserver', tcp_port=DEFAULT_PORT, use_simple=True): threading.Thread(target=self.cleaner, args=tuple(), daemon=True).start() super().serve_socket(name, tcp_port, use_simple=True)
[docs] class SimpleNameServerClient(SocketClient):
[docs] def __init__(self, host='localhost', port=11103, verbose=False): conn_info = 'tcp://{}:{}'.format(host, port) if verbose: print('Connecting to the name server at {}... Waiting...'.format(conn_info)) super().__init__('jacinle/nameserver::client', conn_info, use_simple=True, verbose=False) self.initialize(auto_close=True) if verbose: print('Connected to the name server.')
[docs] def heartbeat(self, name, obj): self.call('heartbeat', name, obj)
[docs] def register(self, name, obj): def thread(): while True: self.heartbeat(name, obj) time.sleep(10) threading.Thread(target=thread, args=tuple(), daemon=True).start()
[docs] def get(self, name): return self.call('get', name)
[docs] def has(self, name): return self.call('has', name)
_default_name_server_client = None
[docs] def get_default_name_server_client(verbose: bool = False): global _default_name_server_client if _default_name_server_client is None: host = jac_getenv('SNS_HOST', 'localhost') port = jac_getenv('SNS_PORT', SimpleNameServer.DEFAULT_PORT) _default_name_server_client = SimpleNameServerClient(host, port, verbose=verbose) return _default_name_server_client
[docs] def sns_register(name, obj, verbose: bool = False): get_default_name_server_client(verbose=verbose).register(name, obj) if verbose: print(f'Endpoint {name} registered at {obj}.')
[docs] def sns_get(name, verbose: bool = False): return get_default_name_server_client(verbose=verbose).get(name)
[docs] def sns_has(name, verbose: bool = False): return get_default_name_server_client(verbose=verbose).has(name)