Source code for jacinle.comm.echo

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : echo.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 12/19/2018
#
# This file is part of Jacinle.
# Distributed under terms of the MIT license.

import threading
import contextlib
import sys

from jacinle.utils.printing import PrintToStringContext

__all__ = ['EchoToPipe', 'echo_from_pipe']


class StringQueue(object):
    """A string FIFO with a minimal file-like interface (``write``/``flush``/``read``).

    Written chunks are collected in a list and merged lazily into a single
    string when a reader needs them. All access to the two internal buffers is
    serialized through ``self.lock`` so that one thread can write while
    another reads without losing data.

    Adapted from: http://code.activestate.com/recipes/426060-a-queue-for-string-data-which-looks-like-a-file-ob/
    """

    def __init__(self):
        # Chunks written since the last merge, in arrival order.
        self.l_buffer = []
        # Merged text that has not been consumed by ``read`` yet.
        self.s_buffer = ""
        # Guards both ``l_buffer`` and ``s_buffer``.
        self.lock = threading.Lock()

    def write(self, data):
        """Append the string ``data`` to the queue."""
        # The lock is required here: without it a chunk appended between the
        # join and the list reset performed by a concurrent merge would be
        # silently dropped.
        with self.lock:
            self.l_buffer.append(data)

    def flush(self):
        """Do nothing; provided so the queue can be used where a stream is expected."""
        pass

    def _merge_pending(self):
        """Move all pending chunks into ``s_buffer``. The caller must hold ``self.lock``."""
        new_string = ''.join(self.l_buffer)
        self.s_buffer = ''.join((self.s_buffer, new_string))
        self.l_buffer = []

    def _build_str(self):
        """Merge all pending chunks into ``s_buffer`` while holding the lock."""
        with self.lock:
            self._merge_pending()

    def __len__(self):
        """Return the total number of characters that are queued and unread."""
        with self.lock:
            return sum(len(i) for i in self.l_buffer) + len(self.s_buffer)

    def read(self, count=None):
        """Remove and return text from the front of the queue.

        Args:
            count: number of characters to read. ``None`` drains everything
                that is currently queued.

        Returns:
            The text read. When ``count`` is given and fewer than ``count``
            characters are available, an empty string is returned and nothing
            is consumed.
        """
        # ``threading.Lock`` is not reentrant, hence the lock-free helper
        # instead of calling ``_build_str`` from inside the critical section.
        with self.lock:
            if count is None or count > len(self.s_buffer):
                self._merge_pending()

            if count is None:
                result, self.s_buffer = self.s_buffer, ''
                return result

            if count > len(self.s_buffer):
                return ''

            result = self.s_buffer[:count]
            self.s_buffer = self.s_buffer[len(result):]
            return result
class EchoMessage(object):
    """A chunk of captured output together with the stream it came from.

    Attributes:
        source: stream tag; within this module ``1`` is used for stdout and
            ``2`` for stderr.
        message: the captured text.
    """

    def __init__(self, source, message):
        self.source, self.message = source, message
class EndEcho(object):
    """Marker object sent through the pipe to signal that no more echo messages follow."""
class EchoToPipe(object):
    """Forward text captured from stdout/stderr through a pipe from a background thread.

    Two :class:`StringQueue` buffers are handed to ``PrintToStringContext``
    (targets ``'STDOUT'`` and ``'STDERR'``) as their ``stream``. While the
    echo thread runs, it periodically drains both buffers and sends their
    content as :class:`EchoMessage` objects via ``pipe.send(identifier, msg)``;
    an :class:`EndEcho` is sent last.

    Typical usage::

        with EchoToPipe(pipe, identifier).activate():
            ...  # output produced here is forwarded

    Args:
        pipe: object exposing ``send(identifier, payload)``.
        identifier: value passed as the first argument of every ``pipe.send`` call.
    """

    def __init__(self, pipe, identifier):
        self.pipe = pipe
        self.identifier = identifier
        # Background thread running ``echo``; ``None`` while inactive.
        self.echo_thread = None
        # Set by ``finalize`` to ask the echo thread to do a last flush and exit.
        self.stop_event = threading.Event()

        self.out = StringQueue()
        self.err = StringQueue()
        # NOTE(review): presumably these contexts redirect sys.stdout / sys.stderr
        # into the given stream while entered -- confirm in jacinle.utils.printing.
        self.out_ctx = PrintToStringContext(target='STDOUT', stream=self.out)
        self.err_ctx = PrintToStringContext(target='STDERR', stream=self.err)

    def echo(self):
        """Thread body: drain both buffers roughly every 0.1s until asked to stop.

        After the stop event is observed, the buffers are drained one final
        time so that output written just before ``finalize`` is not lost, and
        then an :class:`EndEcho` is sent.
        """
        to_close = False
        while True:
            msg = self.out.read()
            if len(msg) > 0:
                self.pipe.send(self.identifier, EchoMessage(1, msg))
            msg = self.err.read()
            if len(msg) > 0:
                self.pipe.send(self.identifier, EchoMessage(2, msg))

            if to_close:
                self.pipe.send(self.identifier, EndEcho())
                break

            # ``wait`` doubles as the polling delay; it returns True as soon as
            # the stop event is set, which triggers one last pass of the loop.
            if self.stop_event.wait(0.1):
                to_close = True

    def initialize(self):
        """Start the background echo thread."""
        # Reset the stop flag so that an instance can be activated again after
        # a previous ``finalize``; otherwise the new thread would terminate on
        # its first iteration and later output would never be forwarded.
        self.stop_event.clear()
        thread = threading.Thread(target=self.echo)
        thread.start()
        # Only record the thread once it has actually started, so that
        # ``finalize`` never tries to join an unstarted thread.
        self.echo_thread = thread

    def finalize(self):
        """Ask the echo thread to stop and wait for it; a no-op if it is not running."""
        self.stop_event.set()
        thread, self.echo_thread = self.echo_thread, None
        if thread is not None:
            thread.join()

    @contextlib.contextmanager
    def activate(self):
        """Context manager that forwards output for the duration of the ``with`` body."""
        # Started outside the ``try`` so that a failure to start is reported
        # as-is instead of being masked by an error raised from ``finalize``.
        self.initialize()
        try:
            with self.out_ctx, self.err_ctx:
                yield
        finally:
            self.finalize()
def echo_from_pipe(pipe):
    """Receive echo messages from ``pipe`` and replay them on the local stdout/stderr.

    Blocks on ``pipe.recv()`` in a loop. :class:`EchoMessage` payloads are
    written to ``sys.stdout`` (``source == 1``) or ``sys.stderr``
    (``source == 2``); messages with any other source are counted but not
    written. The loop ends when an :class:`EndEcho` is received.

    Args:
        pipe: object exposing ``recv()`` that returns the next message.

    Raises:
        ValueError: if a received object is neither an :class:`EchoMessage`
            nor an :class:`EndEcho`.
    """
    count = 0
    while True:
        msg = pipe.recv()
        if isinstance(msg, EchoMessage):
            count += 1
            if msg.source == 1:
                sys.stdout.write(msg.message)
            elif msg.source == 2:
                sys.stderr.write(msg.message)
        elif isinstance(msg, EndEcho):
            # A carriage return is emitted only when something was echoed.
            if count:
                sys.stdout.write('\r')
            break
        else:
            raise ValueError('Unknown echo: {}.'.format(msg))