#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : service_name_server.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 01/16/2025
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
import time
import threading
from jacinle.utils.env import jac_getenv
from jacinle.comm.service import Service, SocketClient
# Names exported by `from ... import *`. Note that `get_default_name_server_client`
# is defined in this module but is not listed here.
__all__ = ['SimpleNameServer', 'SimpleNameServerClient', 'sns_register', 'sns_get', 'sns_has']
[docs]
class SimpleNameServer(Service):
    """A minimal in-memory name server.

    The server maps names to arbitrary objects. An entry is created or
    refreshed by :meth:`heartbeat` and is dropped by the background
    :meth:`cleaner` loop once no heartbeat has been received for more than
    ``HEARTBEAT_TIMEOUT`` seconds.

    All accesses to the two internal dictionaries are guarded by ``self.mutex``.
    """

    #: Default TCP port used by :meth:`serve_socket`.
    DEFAULT_PORT = 11103
    #: Seconds the cleaner sleeps between two sweeps over the registry.
    CLEANER_INTERVAL = 30
    #: Seconds without a heartbeat after which an entry is removed.
    HEARTBEAT_TIMEOUT = 60

    def __init__(self):
        """Create an empty registry together with the lock that protects it."""
        super().__init__()
        self.mutex = threading.Lock()
        # name -> registered object
        self.name2objects = dict()
        # name -> wall-clock time (``time.time()``) of the most recent heartbeat
        self.name2last_heartbeat = dict()

    def heartbeat(self, name, obj):
        """Register ``obj`` under ``name`` (or refresh it) and record the current time.

        Args:
            name: the key to register.
            obj: the object associated with ``name``; replaces any previous value.
        """
        print('Heartbeat from {}.'.format(name))
        with self.mutex:
            self.name2objects[name] = obj
            self.name2last_heartbeat[name] = time.time()

    def get(self, name):
        """Return the object registered under ``name``, or ``None`` if there is none."""
        with self.mutex:
            return self.name2objects.get(name, None)

    def has(self, name):
        """Return ``True`` iff ``name`` is currently registered."""
        with self.mutex:
            return name in self.name2objects

    def cleaner(self):
        """Run forever, periodically evicting entries whose heartbeat has timed out.

        Every ``CLEANER_INTERVAL`` seconds the registry is scanned under the
        lock, and every name whose last heartbeat is older than
        ``HEARTBEAT_TIMEOUT`` seconds is removed from both dictionaries.
        This method never returns; :meth:`serve_socket` runs it in a daemon thread.
        """
        while True:
            time.sleep(self.CLEANER_INTERVAL)
            with self.mutex:
                now = time.time()
                # Collect first so the dictionaries are not mutated while being iterated.
                expired = [
                    name
                    for name, last_heartbeat in self.name2last_heartbeat.items()
                    if now - last_heartbeat > self.HEARTBEAT_TIMEOUT
                ]
                for name in expired:
                    print('Removing {} due to timeout.'.format(name))
                    # ``pop`` tolerates a missing object entry instead of killing the cleaner thread.
                    self.name2objects.pop(name, None)
                    del self.name2last_heartbeat[name]

    def call(self, func_name, *args, **kwargs):
        """Dispatch a request to :meth:`heartbeat`, :meth:`get` or :meth:`has`.

        Args:
            func_name: one of ``'heartbeat'``, ``'get'`` or ``'has'``.
            *args: positional arguments forwarded to the selected method.
            **kwargs: keyword arguments forwarded to the selected method.

        Returns:
            Whatever the selected method returns.

        Raises:
            NotImplementedError: if ``func_name`` is not one of the supported names.
        """
        # Only an explicit whitelist of methods may be invoked by name.
        if func_name not in ('heartbeat', 'get', 'has'):
            raise NotImplementedError('Unknown function name: {}.'.format(func_name))
        return getattr(self, func_name)(*args, **kwargs)

    def serve_socket(self, name='jacinle/nameserver', tcp_port=DEFAULT_PORT, use_simple=True):
        """Start the cleaner thread, then serve requests through the base class.

        Args:
            name: the service name passed to ``Service.serve_socket``.
            tcp_port: the TCP port passed to ``Service.serve_socket``.
            use_simple: accepted but not forwarded; the base-class call always
                receives ``use_simple=True``.
        """
        threading.Thread(target=self.cleaner, args=tuple(), daemon=True).start()
        # NOTE(review): `use_simple` is deliberately left hard-coded to True here;
        # `SimpleNameServerClient` in this module also always connects with
        # use_simple=True -- confirm before forwarding the argument.
        super().serve_socket(name, tcp_port, use_simple=True)
[docs]
class SimpleNameServerClient(SocketClient):
    """Client for a :class:`SimpleNameServer` reachable over TCP."""

    def __init__(self, host='localhost', port=11103, verbose=False):
        """Build the ``tcp://host:port`` address and initialize the connection.

        Args:
            host: host name of the name server.
            port: TCP port of the name server.
            verbose: when true, print a message before and after connecting.
        """
        conn_info = f'tcp://{host}:{port}'
        if verbose:
            print(f'Connecting to the name server at {conn_info}... Waiting...')

        # The underlying socket client is always created non-verbose;
        # the `verbose` flag only controls the messages printed by this class.
        super().__init__('jacinle/nameserver::client', conn_info, use_simple=True, verbose=False)
        self.initialize(auto_close=True)

        if verbose:
            print('Connected to the name server.')

    def heartbeat(self, name, obj):
        """Send a single ``heartbeat`` request for ``name`` / ``obj``; returns ``None``."""
        self.call('heartbeat', name, obj)

    def register(self, name, obj):
        """Keep ``name`` registered by sending a heartbeat every 10 seconds.

        The heartbeats are sent from a daemon thread started by this call;
        the call itself returns immediately.
        """
        def thread():
            # Heartbeat first, then sleep, so the initial registration is sent right away.
            while True:
                self.heartbeat(name, obj)
                time.sleep(10)

        worker = threading.Thread(target=thread, args=tuple(), daemon=True)
        worker.start()

    def get(self, name):
        """Return the result of the server's ``get`` request for ``name``."""
        result = self.call('get', name)
        return result

    def has(self, name):
        """Return the result of the server's ``has`` request for ``name``."""
        result = self.call('has', name)
        return result
# Lazily created, process-wide client shared by the `sns_*` helpers.
_default_name_server_client = None


def get_default_name_server_client(verbose: bool = False):
    """Return the shared :class:`SimpleNameServerClient`, creating it on first use.

    On the first call the client is built from the ``SNS_HOST`` and ``SNS_PORT``
    settings read through ``jac_getenv`` (defaults: ``'localhost'`` and
    ``SimpleNameServer.DEFAULT_PORT``). Later calls return the cached instance.

    Args:
        verbose: forwarded to the client constructor; only has an effect on
            the call that actually creates the client.
    """
    global _default_name_server_client

    # Fast path: the client has already been created.
    if _default_name_server_client is not None:
        return _default_name_server_client

    _default_name_server_client = SimpleNameServerClient(
        jac_getenv('SNS_HOST', 'localhost'),
        jac_getenv('SNS_PORT', SimpleNameServer.DEFAULT_PORT),
        verbose=verbose,
    )
    return _default_name_server_client
[docs]
def sns_register(name, obj, verbose: bool = False):
    """Register ``obj`` under ``name`` using the default name server client.

    Args:
        name: the name to register.
        obj: the object to associate with ``name``.
        verbose: forwarded to the default-client lookup; when true, also
            print a confirmation line after ``register`` has been called.
    """
    client = get_default_name_server_client(verbose=verbose)
    client.register(name, obj)
    if not verbose:
        return
    print(f'Endpoint {name} registered at {obj}.')
[docs]
def sns_get(name, verbose: bool = False):
    """Look up ``name`` through the default name server client and return the result.

    Args:
        name: the name to look up.
        verbose: forwarded to the default-client lookup.
    """
    client = get_default_name_server_client(verbose=verbose)
    return client.get(name)
[docs]
def sns_has(name, verbose: bool = False):
    """Ask the default name server client whether ``name`` is registered.

    Args:
        name: the name to check.
        verbose: forwarded to the default-client lookup.
    """
    client = get_default_name_server_client(verbose=verbose)
    return client.has(name)