# Source code for jacinle.utils.debug

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : debug.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 08/26/2019
#
# This file is part of Jacinle.
# Distributed under terms of the MIT license.

import sys
import functools
import threading
import contextlib
import cProfile
import pstats
from typing import Optional, Callable

from .inspect import func_name
from .printing import indent_text

# Names exported by ``from <this module> import *``.
__all__ = [
    # ipdb exception hooks
    'hook_exception_ipdb',
    'unhook_exception_ipdb',
    'exception_hook',
    'decorate_exception_hook',
    'timeout_ipdb',
    # indented call logging
    'log_function',
    'indent_log',
    'indent_print',
    # profiling and timing
    'profile',
    'time',
]


def _custom_exception_hook(type, value, tb):
    if hasattr(sys, 'ps1') or not sys.stderr.isatty():
        # we are in interactive mode or we don't have a tty-like
        # device, so we call the default hook
        sys.__excepthook__(type, value, tb)
    else:
        import traceback, ipdb
        # we are NOT in interactive mode, print the exception...
        traceback.print_exception(type, value, tb)
        # ...then start the debugger in post-mortem mode.
        ipdb.post_mortem(tb)


def hook_exception_ipdb():
    """Add a hook to ipdb when an exception is raised.

    The hook that is active on the *first* call is remembered on
    ``_custom_exception_hook.origin_hook`` so that
    :func:`unhook_exception_ipdb` can restore it later. The ipdb hook itself
    is (re-)installed on every call, so hooking again after an unhook works
    even though ``origin_hook`` is still set from the earlier call.
    """
    if not hasattr(_custom_exception_hook, 'origin_hook'):
        # Save the original only once; later calls must not overwrite it
        # with whatever happens to be installed at that time.
        _custom_exception_hook.origin_hook = sys.excepthook
    sys.excepthook = _custom_exception_hook
def unhook_exception_ipdb():
    """Remove the hook to ipdb when an exception is raised.

    Restores ``sys.excepthook`` to the hook saved on
    ``_custom_exception_hook.origin_hook``. The saved hook is kept, so calling
    this function repeatedly keeps restoring the same original.

    Raises:
        AssertionError: if no original hook has been saved yet.
    """
    ipdb_hook = _custom_exception_hook
    assert hasattr(ipdb_hook, 'origin_hook')
    sys.excepthook = ipdb_hook.origin_hook
@contextlib.contextmanager
def exception_hook(enable: bool = True):
    """A context manager to temporarily enable the ipdb exception hook.

    Args:
        enable: when False the context manager does nothing.
    """
    if not enable:
        yield
        return

    hook_exception_ipdb()
    # NOTE: there is no try/finally here, so the hook stays installed when the
    # body raises. Presumably this is intentional: the interpreter only calls
    # ``sys.excepthook`` after the exception has left this context.
    yield
    unhook_exception_ipdb()
def decorate_exception_hook(func: Callable) -> Callable:
    """A decorator to decorate the exception hook when calling a function.

    Args:
        func: the function to run with the ipdb exception hook enabled.

    Returns:
        A wrapper with the same metadata as ``func`` that runs every call
        inside :func:`exception_hook`.
    """

    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        with exception_hook():
            result = func(*args, **kwargs)
        return result

    return wrapped
def _timeout_enter_ipdb_thread(locals_, done, timeout):
    """Watchdog thread body: break into ipdb unless signalled in time.

    Args:
        locals_: the ``locals()`` mapping passed to :func:`timeout_ipdb`; it is
            not used by this function.
        done: a :class:`threading.Event` that is set once the guarded block
            has finished.
        timeout: how many seconds to wait for ``done`` before entering ipdb.
    """
    # Event.wait returns False only when the timeout expired without the flag
    # being set, i.e. the guarded block is still running.
    if not done.wait(timeout):
        import ipdb
        ipdb.set_trace()


@contextlib.contextmanager
def timeout_ipdb(locals_, timeout: float = 3):
    """A context manager that enters ipdb when timeout.

    This is useful when you want to debug a function that is stuck in a loop.

    A watchdog thread is started on entry. If the body has not finished after
    ``timeout`` seconds, the watchdog calls ``ipdb.set_trace()``. When the body
    finishes, normally or by raising, the watchdog is told to stop.

    Example:
        .. code-block:: python

            with timeout_ipdb(locals(), timeout=3):
                while True:
                    pass

    Args:
        locals_: the locals() of the function.
        timeout: the timeout in seconds.
    """
    # An Event (rather than a Condition) keeps the "finished" state, so the
    # watchdog cannot miss the signal when the body completes before the
    # watchdog thread has started waiting.
    done = threading.Event()
    thread = threading.Thread(target=_timeout_enter_ipdb_thread, args=(locals_, done, timeout))
    thread.start()
    try:
        yield
    finally:
        # Also signal on exceptions: the body is no longer stuck in that case.
        done.set()
def log_function(init_function: Optional[Callable] = None, *, verbose: bool = False, is_generator: bool = False):
    """A decorator to log the function call.

    This is useful to debug the function call stack. While a decorated function
    is running, ``log_function.indent_level`` is increased by one, so messages
    written with :func:`indent_log` / :func:`indent_print` are indented by call
    depth. Entering/exiting messages are printed only when ``verbose`` is True.

    Args:
        init_function: the function to be wrapped. If not specified, this function will return a decorator.
        verbose: whether to print the log lines (function name, arguments, and return value or exception
            message). When False, only the indent level is maintained.
        is_generator: whether the function is a generator. If True, the indent level is raised only while
            the wrapped generator is producing its next item, and restored while the consumer runs.

    Returns:
        The wrapped function when ``init_function`` is given, otherwise a decorator.

    Example:
        .. code-block:: python

            @log_function(verbose=True)
            def foo():
                bar()

            @log_function(verbose=True)
            def bar():
                pass

            foo()

        Output (illustrative; the exact layout depends on ``func_name`` and ``indent_text``):

        .. code-block:: none

            Entering: foo
            Args:
            kwargs: {}
            | Entering: bar
            | Args:
            | kwargs: {}
            | Exiting: bar
            | Returns: None
            Exiting: foo
            Returns: None
    """

    def emit(message):
        # Every log line uses the same indentation scheme.
        print(indent_text(message, log_function.indent_level, indent_format='| '))

    def wrapper(function: Callable) -> Callable:
        qualname = function.__qualname__
        # A dotted qualified name that is not a local function is treated as a
        # method whose first positional argument is ``self``.
        print_self = '.' in qualname and '<locals>' not in qualname

        def describe_self(args):
            # ``args`` may be empty (e.g. a static method), so guard the index.
            if print_self and verbose and args:
                return '(self={})'.format(args[0])
            return ''

        def log_enter(self_info, args, kwargs):
            if not verbose:
                return
            emit(f'Entering: {func_name(function)}{self_info}')
            arguments = ', '.join([str(arg) for arg in args])
            emit(f'Args: {arguments}')
            emit(f'kwargs: {kwargs}')

        def log_exit(self_info, rv):
            if not verbose:
                return
            emit(f'Exiting: {func_name(function)}{self_info}')
            emit(f'Returns: {rv}')

        if is_generator:
            @functools.wraps(function)
            def wrapped(*args, **kwargs):
                self_info = describe_self(args)
                log_enter(self_info, args, kwargs)

                # Reported as-is if the generator is closed before finishing.
                rv = 'exception'
                iterator = None
                try:
                    while True:
                        # Indent only while the wrapped code runs; the level
                        # is restored before control returns to the consumer,
                        # so an early close() cannot unbalance it.
                        log_function.indent_level += 1
                        try:
                            if iterator is None:
                                iterator = iter(function(*args, **kwargs))
                            item = next(iterator)
                        except StopIteration as stop:
                            rv = stop.value
                            break
                        finally:
                            log_function.indent_level -= 1
                        yield item
                except Exception as e:
                    rv = str(e)
                    raise
                finally:
                    log_exit(self_info, rv)
                return rv
        else:
            @functools.wraps(function)
            def wrapped(*args, **kwargs):
                self_info = describe_self(args)
                log_enter(self_info, args, kwargs)

                log_function.indent_level += 1
                # Placeholder shown if a non-``Exception`` error escapes.
                rv = 'exception'
                try:
                    rv = function(*args, **kwargs)
                    return rv
                except Exception as e:
                    rv = str(e)
                    raise
                finally:
                    log_function.indent_level -= 1
                    log_exit(self_info, rv)

        return wrapped

    if init_function is None:
        return wrapper
    return wrapper(init_function)


# Current nesting depth of decorated calls; shared by every decorated function.
log_function.indent_level = 0
def indent_log(string):
    """Print a log message with the current indent level. The indent level is managed by log_function.

    Args:
        string: the message; it is converted with ``str()`` before printing.
    """
    text = str(string)
    depth = log_function.indent_level
    print(indent_text(text, depth, indent_format='| '))
def indent_print(*args, sep=' ', end='\n'):
    """Print a message with the current indent level. The indent level is managed by log_function.

    Args:
        *args: the values to print; each is converted with ``str()``.
        sep: the separator placed between the values.
        end: the text appended after the indented message.
    """
    message = sep.join(map(str, args))
    indented = indent_text(str(message), log_function.indent_level, indent_format='| ')
    # Trailing whitespace of the indented text is stripped before ``end`` is
    # appended, and ``end`` is emitted explicitly rather than by print().
    print(indented.rstrip() + end, end='')
# Expose the indent-aware printers as attributes of the decorator.
log_function.log = indent_log
log_function.print = indent_print
@contextlib.contextmanager
def profile(field='tottime', top_k=10):
    """A context manager to profile the code in the context.

    After the body finishes normally, the top k functions are printed. If the
    body raises, the profiler is still disabled but no report is printed.

    Args:
        field: the field to sort the profile result by: ``'tottime'``, ``'cumtime'``, or ``None`` to leave
            the report unsorted. See `pstats.Stats.sort_stats` for more details.
        top_k: the number of top functions to print.

    Yields:
        The active ``cProfile.Profile`` object.

    Raises:
        AssertionError: if ``field`` is not one of the supported values.

    Example:
        .. code-block:: python

            with profile() as profiler:
                foo()
                print(profiler)  # If you need to print additional information inside the context
    """
    FIELDS = ('tottime', 'cumtime', None)
    assert field in FIELDS, f'field must be one of {FIELDS}, got {field!r}'

    profiler = cProfile.Profile()
    profiler.enable()
    try:
        yield profiler
    finally:
        # Always stop collecting, otherwise an exception in the body would
        # leave the profiler enabled.
        profiler.disable()

    stats = pstats.Stats(profiler)
    if field is not None:
        # sort_stats() does not accept None as a key, so skip sorting then.
        stats.sort_stats(field)
    stats.print_stats(top_k)
@contextlib.contextmanager
def time(name=None):
    """A context manager to time the code in the context.

    A start message is printed on entry. After the body finishes, the elapsed
    wall-clock time in seconds is printed.

    Args:
        name: the name of the context. If None, the default name ``'DEFAULT'`` will be used.

    Example:
        .. code-block:: python

            with time():
                foo()
    """
    # Imported under another name because this function shadows the module.
    import time as time_module

    label = 'DEFAULT' if name is None else name
    print(f'[Timer::{label}] Start...')
    started_at = time_module.time()
    yield
    print(f'[Timer::{label}] End. Time elapsed = {time_module.time() - started_at}')