#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : pretty.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 02/15/2018
#
# This file is part of Jacinle.
# Distributed under terms of the MIT license.
"""Functions to dump Python objects into human-readable formats."""
import io as _io
import json
import functools
import collections
import xml.etree.ElementTree as et
import yaml
import inspect
from typing import Optional, Any, Iterable, Sequence, List, Dict
from jacinle.utils.meta import dict_deep_kv
from jacinle.utils.printing import stformat, kvformat
from .fs import as_file_descriptor, io_function_registry
__all__ = [
'iter_txt',
'pretty_dump', 'pretty_load',
'dumps_json', 'dump_json', 'loads_json', 'load_json', 'pretty_dumps_json', 'pretty_dump_json',
'dumps_jsonc', 'dump_jsonc', 'loads_jsonc', 'load_jsonc',
'dumps_xml', 'dump_xml', 'loads_xml', 'load_xml',
'dumps_yaml', 'dump_yaml', 'loads_yaml', 'load_yaml',
'dumps_txt', 'dump_txt',
'dumps_struct', 'dump_struct',
'dumps_kv', 'dump_kv',
'dumps_env', 'dump_env'
]
[docs]
def iter_txt(fd: _io.IOBase, strip: bool = True) -> Iterable[str]:
"""Iterate over lines in a text file. This function will ignore empty lines.
Args:
fd: a file descriptor.
strip: whether to strip the line.
"""
for line in as_file_descriptor(fd):
line_strip = line.strip()
if line_strip == '':
continue
yield line_strip if strip else line
[docs]
def loads_json(value: str) -> Any:
"""Load a JSON object from a string."""
return json.loads(value)
[docs]
def loads_jsonc(value: str) -> List[Dict]:
"""Load a list of JSON dictionaries from a string. This function supports multiple JSON objects in a single string, separated by newlines.
Note that this function only support a list of plain dictionaries. Do not use this function to load JSON objects with
nested lists or dictionaries.
Args:
value: a string.
Returns:
a list of dictionaries.
"""
strings = value.split('}\n{')
ret = []
for i, s in enumerate(strings):
if i > 0:
s = '{' + s
if i < len(strings) - 1:
s += '}'
ret.append(json.loads(s))
return ret
[docs]
def loads_xml(value: str, name_key: str = '__name__', attribute_key: Optional[str] = '__attribute__') -> Dict:
"""Load an XML object from a string. It will return a dictionary as the root node.
For each node, it will have a key named "__name__" as the tag name, and a key "__attributes__" as a dictionary of attributes.
Each child node will be a nested dictionary under the root node. If there are multiple child nodes with the same tag name,
they will be stored in a list.
Args:
value: a string.
name_key: the key name for the tag name.
attribute_key: the key name for the attributes. If set to None, the attributes will be stored in the root node.
Returns:
a dictionary.
"""
return _xml2dict(et.fromstring(value), name_key=name_key, attribute_key=attribute_key)
[docs]
def loads_yaml(value: str) -> Any:
"""Load a YAML object from a string."""
return yaml.safe_load(value)
[docs]
def dumps_txt(value: Iterable[str]) -> str:
"""Dump a list of strings into a string, separated by newlines.
Args:
value: a list of strings.
Returns:
a string.
"""
assert isinstance(value, collections.Iterable), 'dump(s) txt supports only list as input.'
with _io.StringIO() as buf:
for v in value:
v = str(v)
buf.write(v)
if v[-1] != '\n':
buf.write('\n')
return buf.getvalue()
[docs]
def dumps_json(value: Any, compressed: bool = True) -> str:
"""Dump a JSON object into a string. In addition to the standard JSON format, this function also supports
- ``__jsonify__``: an instance method for objects that returns a JSON-serializable object.
- For classes, it will store ``__dict__`` as the JSON object.
Note that both features can not be preserved when loading the JSON object back.
Args:
value: the object to dump.
compressed: whether to use compressed format. If set to False, the dumped string will be pretty-printed.
Returns:
a string.
"""
if compressed:
return json.dumps(value, cls=_JsonObjectEncoder)
return json.dumps(value, cls=_JsonObjectEncoder, sort_keys=True, indent=4, separators=(',', ': '))
[docs]
def pretty_dumps_json(value: Any, compressed: bool = False) -> str:
"""Dump a JSON object into a string, with pretty-printing."""
return dumps_json(value, compressed=compressed)
[docs]
def dumps_jsonc(value: Iterable[Dict]) -> str:
"""Dump a list of dictionary into a JSON string, separated by new lines, with compressed format."""
assert isinstance(value, Sequence)
ret = ''
for v in value:
ret += pretty_dumps_json(v) + '\n'
return ret
[docs]
def dumps_xml(value: Dict, **kwargs) -> str:
"""Dump an XML object into a string."""
return _dict2xml(value, **kwargs)
[docs]
def dumps_yaml(value: Any) -> str:
"""Dump a YAML object into a string."""
return yaml.dump(value, width=80, indent=4)
[docs]
def dumps_struct(value):
"""Dump a structured object into a string, using :meth:`jacinle.utils.printing.stformat`."""
return stformat(value)
[docs]
def dumps_kv(value):
"""Dump a structured object into a string, using :meth:`jacinle.utils.printing.kvformat`."""
return kvformat(value)
[docs]
def dumps_env(value):
"""Dump a structured object into a string, similar to :meth:`os.environ`."""
return '\n'.join(['{} = {}'.format(k, v) for k, v in dict_deep_kv(value)])
def _wrap_load(loads_func):
@functools.wraps(loads_func)
def load(file, **kwargs):
with as_file_descriptor(file, 'r') as f:
return loads_func(f.read(), **kwargs)
load.__name__ = loads_func.__name__[:-1]
load.__qualname__ = loads_func.__qualname__[:-1]
return load
def _wrap_dump(dumps_func):
@functools.wraps(dumps_func)
def dump(file, obj, **kwargs):
with as_file_descriptor(file, 'w') as f:
return f.write(dumps_func(obj, **kwargs))
dump.__name__ = dumps_func.__name__[:-1]
dump.__qualname__ = dumps_func.__qualname__[:-1]
dump.__doc__ = dumps_func.__doc__.replace('a string', 'a file')
return dump
load_json = _wrap_load(loads_json)
load_jsonc = _wrap_load(loads_jsonc)
load_xml = _wrap_load(loads_xml)
load_yaml = _wrap_load(loads_yaml)
dump_txt = _wrap_dump(dumps_txt)
dump_json = _wrap_dump(dumps_json)
dump_jsonc = _wrap_dump(dumps_jsonc)
dump_xml = _wrap_dump(dumps_xml)
dump_yaml = _wrap_dump(dumps_yaml)
dump_struct = _wrap_dump(dumps_struct)
dump_kv = _wrap_dump(dumps_kv)
dump_env = _wrap_dump(dumps_env)
pretty_dump_json = _wrap_dump(pretty_dumps_json)
for registry in ['load', 'pretty_load']:
io_function_registry.register(registry, '.json', load_json)
io_function_registry.register(registry, '.jsonc', load_jsonc)
io_function_registry.register(registry, '.xml', load_xml)
io_function_registry.register(registry, '.yaml', load_yaml)
io_function_registry.register(registry, '.yml', load_yaml)
for registry in ['dump', 'pretty_dump']:
io_function_registry.register(registry, '.txt', dump_txt)
if registry == 'pretty_dump':
io_function_registry.register(registry, '.json', pretty_dump_json)
else:
io_function_registry.register(registry, '.json', dump_json)
io_function_registry.register(registry, '.jsonc', dump_jsonc)
io_function_registry.register(registry, '.xml', dump_xml)
io_function_registry.register(registry, '.yaml', dump_yaml)
io_function_registry.register(registry, '.yml', dump_yaml)
io_function_registry.register(registry, '.struct', dump_struct)
io_function_registry.register(registry, '.kv', dump_kv)
io_function_registry.register(registry, '.env', dump_env)
[docs]
def pretty_load(file, **kwargs):
"""Load a file with pretty-printing."""
return io_function_registry.dispatch('pretty_load', file, **kwargs)
[docs]
def pretty_dump(file, obj, **kwargs):
"""Dump a file with pretty-printing."""
return io_function_registry.dispatch('pretty_dump', file, obj, **kwargs)
def _dict2xml(d, indent=4, *, root_node=None, root_indent=0, name_key='__name__', attribute_key='__attribute__'):
"""Adapted from: https://gist.github.com/reimund/5435343/"""
indent_str = '\n' + ' ' * (indent * root_indent)
if root_node is None and name_key is not None:
root_node = d[name_key]
wrap = False if root_node is None or isinstance(d, list) else True
root = 'data' if root_node is None else root_node
root_singular = root[:-1] if 's' == root[-1] and root_node is None else root
xml = ''
children = []
if isinstance(d, dict):
for key, value in d.items():
if attribute_key is not None and key == attribute_key:
continue
if name_key is not None and key == name_key:
continue
if isinstance(value, dict):
children.append(_dict2xml(value, indent=indent, root_node=key, root_indent=root_indent+1))
elif isinstance(value, list):
children.append(_dict2xml(value, indent=indent, root_node=key, root_indent=root_indent))
else:
children.append(indent_str + ' ' * indent + '<' + key + '>' + str(value) + '</' + key + '>')
else:
for value in d:
children.append(_dict2xml(value, indent=indent, root_node=root_singular, root_indent=root_indent+1))
end_tag = '>' if len(children) > 0 else '/>'
if attribute_key is not None and attribute_key in d:
for key, value in d[attribute_key].items():
xml += f' {key}="{value}"'
if wrap or isinstance(d, dict):
xml = indent_str + '<' + root + xml + end_tag
if len(children) > 0:
for child in children:
xml = xml + child
if wrap or isinstance(d, dict):
xml = xml + indent_str + '</' + root + '>'
return xml
def _xml2dict(element, name_key='__name__', attribute_key='__attribute__'):
output_dict = {}
if name_key is not None:
output_dict[name_key] = element.tag
if attribute_key is None:
output_dict.update(element.attrib)
else:
output_dict[attribute_key] = element.attrib
if len(output_dict) == 0 and len(element) == 0:
return element.text # is a leaf node
list_elements = set()
for c in element:
if c.tag in output_dict and c.tag not in list_elements:
output_dict[c.tag] = [output_dict[c.tag]]
list_elements.add(c.tag)
if c.tag in output_dict:
output_dict[c.tag].append(_xml2dict(c))
else:
output_dict[c.tag] = _xml2dict(c)
return output_dict
class _JsonObjectEncoder(json.JSONEncoder):
"""Adapted from https://stackoverflow.com/a/35483750"""
def default(self, obj):
if hasattr(obj, '__jsonify__'):
json_object = obj.__jsonify__()
if isinstance(json_object, (str, bytes)):
return json_object
return self.encode(json_object)
else:
raise TypeError("Object of type '%s' is not JSON serializable." % obj.__class__.__name__)
if hasattr(obj, '__dict__'):
d = dict(
(key, value)
for key, value in inspect.getmembers(obj)
if not key.startswith("__")
and not inspect.isabstract(value)
and not inspect.isbuiltin(value)
and not inspect.isfunction(value)
and not inspect.isgenerator(value)
and not inspect.isgeneratorfunction(value)
and not inspect.ismethod(value)
and not inspect.ismethoddescriptor(value)
and not inspect.isroutine(value)
)
return self.default(d)
return obj