# 2024-04-22 16:43:21 +08:00
#
# 153 lines
# 4.6 KiB
# Python

import ast
import inspect
import sys
import rich
from inspect import getfullargspec
from functools import wraps
from . import handlers
from . import formatters
from . import flags
from .utils import sign, to_source, comment_to_file, isinstance_noexcept
class Commentor(object):
    """Decorator object that walks a function's AST and collects output lines.

    Calling an instance on a function parses that function's source into an
    AST and returns a wrapper.  Invoking the wrapper binds the call arguments
    into ``self._locals``, dispatches the AST to the matching callables in the
    ``handlers`` module, and finally hands the collected lines to
    ``comment_to_file``.
    """

    def __init__(self, output="<stderr>", _globals=dict(), fmt=[], check=True, _exit=True) -> None:
        """Configure the commentor.

        Args:
            output: Destination forwarded to ``comment_to_file`` as ``file``.
            _globals: Mapping used as the global namespace for ``eval`` and
                ``exec``.  It is copied, never mutated.
            fmt: Extra ``(type_or_predicate, formatter)`` pairs that take
                precedence over ``formatters.LIST``.  Never mutated.
            check: When true, ``check_support`` runs at decoration time.
            _exit: When true, the wrapper exits the process after the output
                has been written.
        """
        # ``dict().update(...)`` returns None, which silently discarded the
        # caller's globals; build the copy directly instead.
        self._globals = dict(_globals)
        self._formatters = fmt + formatters.LIST
        self.indent = 0
        self.file = output
        self._exit = _exit
        self._check = check
        self._reset_run_state()

    def _reset_run_state(self):
        """Restore every per-invocation attribute to its initial value."""
        self._locals = dict()
        self._return = None
        self._lines = []
        self._lines_category = []
        self.state = flags.SOURCE
        self._stack_event = flags.NORMAL

    @staticmethod
    def _bind_arguments(spec, args, kwargs):
        """Map call arguments onto parameter names.

        Args:
            spec: Result of ``inspect.getfullargspec`` for the function.
            args: Positional arguments of the call.
            kwargs: Keyword arguments of the call.

        Returns:
            A new dict usable as the function's initial local namespace.
            No arity validation is performed; surplus positional arguments
            without ``*varargs`` are ignored.
        """
        # Positional arguments given explicitly.
        bound = dict(zip(spec.args, args))
        # Defaults of trailing positional parameters that were not supplied.
        if spec.defaults is not None:
            for name, value in zip(spec.args[-len(spec.defaults):], spec.defaults):
                bound.setdefault(name, value)
        # Surplus positional arguments.
        if spec.varargs is not None:
            bound[spec.varargs] = args[len(spec.args):]
        # Keyword arguments given explicitly.
        if spec.varkw is None:
            bound.update(kwargs)
        else:
            # Keywords that match no declared parameter belong inside the
            # ``**varkw`` dict rather than being spilled in as bare locals.
            declared = set(spec.args) | set(spec.kwonlyargs)
            extras = {}
            for name, value in kwargs.items():
                if name in declared:
                    bound[name] = value
                else:
                    extras[name] = value
            bound[spec.varkw] = extras
        # Defaults of keyword-only parameters that were not supplied.
        if spec.kwonlydefaults is not None:
            for name, value in spec.kwonlydefaults.items():
                bound.setdefault(name, value)
        return bound

    def __call__(self, func):
        """Decorate ``func`` and return the wrapper that processes its AST.

        The source of ``func`` is read with ``inspect.getsourcelines``, the
        leading indentation of its first line is cut from every line, and the
        first statement of the parsed module becomes ``self.root``.
        """
        raw_lines, _start_lineno = inspect.getsourcelines(func)
        base_indent = len(raw_lines[0]) - len(raw_lines[0].lstrip())
        unindented_source = "".join(line[base_indent:] for line in raw_lines)
        root = ast.parse(unindented_source).body[0]
        self.indent = base_indent
        self.root = root
        if self._check:
            self.check_support()
        spec = getfullargspec(func)
        if flags.DEBUG:
            with open("debug.log", "wt", encoding="utf-8") as f:
                print(ast.dump(root, indent=4), file=f)

        @wraps(func)
        def proxy_func(*args, **kwargs):
            # Start from a clean slate so a second call does not re-emit the
            # lines (or inherit the state) left behind by the previous one.
            self._reset_run_state()
            # ``root``/``base_indent`` are captured per decorated function so
            # one Commentor reused on several functions keeps them apart.
            self.root = root
            self.indent = base_indent
            self._locals = self._bind_arguments(spec, args, kwargs)

            self.process(root)

            comments = "\n".join(self._lines)
            if comment_to_file(comments, file=self.file):
                if self._exit:
                    # ``sys.exit`` rather than the ``site`` builtin ``exit``,
                    # which is missing under ``python -S`` / frozen apps.
                    sys.exit(0)
                return self._return
            return comments

        return proxy_func

    def process(self, node: ast.AST, *args, **kwargs):
        """Dispatch ``node`` to the ``handlers`` attribute named after its class.

        Raises:
            NotImplementedError: If ``handlers`` has no such attribute.
        """
        node_type = node.__class__.__name__
        handler = getattr(handlers, node_type, None)
        if handler is None:
            raise NotImplementedError(f"Unknown how to handle {node_type} node.")
        return handler(node, self, *args, **kwargs)

    def eval(self, node: ast.Expr, format=True):
        """Evaluate ``node`` in the tracked namespaces.

        Args:
            node: An AST node (converted with ``to_source``) or a source string.
            format: When false, return the raw value.

        Returns:
            The raw value when ``format`` is false; otherwise the string
            ``"<formatted> : <source>"``, or ``None`` when the selected
            formatter returns ``None``.
        """
        src = node if isinstance(node, str) else to_source(node)
        # NOTE: this evaluates source text taken from the decorated function;
        # only decorate code you trust.
        obj = eval(src, self._globals, self._locals)
        if not format:
            return obj
        # Format once and reuse the result instead of calling the formatter
        # a second time inside the f-string.
        fmt_obj = self.get_formatter(obj)(obj)
        if fmt_obj is None:
            return None
        return f"{fmt_obj} : {src}"

    def exec(self, node: ast.stmt):
        """Execute the statement ``node`` in the tracked namespaces."""
        src = to_source(node)
        exec(src, self._globals, self._locals)

    def get_formatter(self, obj):
        """Return the first registered formatter matching ``obj``.

        An entry matches when ``isinstance_noexcept(obj, typ)`` is true or,
        failing that, when ``typ`` is callable and ``typ(obj)`` is truthy.
        ``repr`` is returned when nothing matches.
        """
        for typ, fmt in self._formatters:
            if isinstance_noexcept(obj, typ):
                return fmt
            if callable(typ) and typ(obj):
                return fmt
        return repr

    def next_line(self) -> int:
        """Return the index the next appended line will receive."""
        return len(self._lines)

    def __append(self, line):
        """Store ``line`` at the current indent and return its index.

        The current ``(state, indent)`` pair is recorded alongside it in
        ``_lines_category``.
        """
        self._lines.append(" " * self.indent + str(line))
        self._lines_category.append((self.state, self.indent))
        return len(self._lines) - 1

    def append_source(self, line=None):
        """Switch to the source state and optionally append ``line``.

        When leaving the comment state a ``\"\"\"`` delimiter line is appended
        first (while the state is still the comment state).

        Returns:
            The index of the appended line, or ``None`` when ``line`` is None.
        """
        if self.state == flags.COMMENT:
            self.__append('"""')
            self.state = flags.SOURCE
        if line is None:
            return None
        # ``sign`` comes from .utils; its exact formatting is not visible here.
        return self.__append(sign(line, 2))

    def append_comment(self, line=None):
        """Switch to the comment state and optionally append ``line``.

        When leaving the source state the state is changed first and a
        ``\"\"\"`` delimiter line is appended afterwards.

        Returns:
            The index of the appended line, or ``None`` when ``line`` is None.
        """
        if self.state == flags.SOURCE:
            self.state = flags.COMMENT
            self.__append('"""')
        if line is None:
            return None
        # ``sign`` comes from .utils; its exact formatting is not visible here.
        return self.__append(sign(line, 2))

    def check_support(self):
        """Exit the process if ``self.root`` contains node types without a handler.

        Node classes listed in ``flags.HANDLER_FREE_NODES`` are skipped.  Each
        unsupported node type is reported once, in first-seen order.
        """
        unimpl = []
        for node in ast.walk(self.root):
            if node.__class__ in flags.HANDLER_FREE_NODES:
                continue
            node_type = node.__class__.__name__
            if getattr(handlers, node_type, None) is None and node_type not in unimpl:
                unimpl.append(node_type)
        if unimpl:
            print("Unsupported nodes: ", ", ".join(unimpl))
            # NOTE(review): exit status 0 is kept from the original even
            # though this is a failure path -- confirm whether callers rely on it.
            sys.exit(0)