Got crazy on output coloration

This commit is contained in:
jpic 2020-02-13 01:26:33 +01:00
parent 40335a88a7
commit e7b059ab44
2 changed files with 104 additions and 20 deletions

View File

@ -3,10 +3,87 @@ Asynchronous process execution wrapper.
"""
import asyncio
from colorama import Fore, Back, Style
import os
import shlex
import sys
from .exceptions import WrongResult
import pygments
from pygments import lexers
from pygments.formatters import TerminalFormatter
from pygments.formatters import Terminal256Formatter
class Output:
colors = (
'\x1b[1;36;45m',
'\x1b[1;36;41m',
'\x1b[1;36;40m',
'\x1b[1;37;45m',
'\x1b[1;32m',
'\x1b[1;37;44m',
)
def __init__(self):
self.prefixes = dict()
self.prefix_length = 0
def __call__(self, line, prefix, highlight=True):
if prefix not in self.prefixes:
self.prefixes[prefix] = (
self.colors[len([*self.prefixes.keys()]) - 1]
)
if len(prefix) > self.prefix_length:
self.prefix_length = len(prefix)
prefix_color = self.prefixes[prefix]
prefix_padding = '.' * (self.prefix_length - len(prefix) - 2)
if prefix_padding:
prefix_padding = ' ' + prefix_padding + ' '
if not prefix:
breakpoint()
#breakpoint()
sys.stdout.buffer.write((
(
prefix_color
+ prefix_padding
+ prefix
+ ' '
+ Back.RESET
+ Style.RESET_ALL
+ Fore.LIGHTBLACK_EX
+ '| '
+ Style.RESET_ALL
if prefix
else ''
)
+ self.highlight(line, highlight)
).encode('utf8'))
def cmd(self, line, prefix):
self(
Fore.LIGHTBLACK_EX
+ '+ '
+ Style.RESET_ALL
+ self.highlight(line, 'bash'),
prefix,
highlight=False
)
def highlight(self, line, highlight=True):
if not highlight:
return line
elif isinstance(highlight, str):
lexer = lexers.get_lexer_by_name(highlight)
else:
lexer = lexers.get_lexer_by_name('python')
formatter = Terminal256Formatter(
style=os.getenv('PODCTL_STYLE', 'fruity'))
return pygments.highlight(line, lexer, formatter)
output = Output()
class PrefixStreamProtocol(asyncio.subprocess.SubprocessStreamProtocol):
@ -27,10 +104,7 @@ class PrefixStreamProtocol(asyncio.subprocess.SubprocessStreamProtocol):
for line in data.split(b'\n'):
if not line:
continue
sys.stdout.buffer.write(
self.prefix.encode('utf8') + b' | ' + line + b'\n'
if self.prefix else line + b'\n'
)
output(line, self.prefix)
sys.stdout.flush()
super().pipe_data_received(fd, data)
@ -61,13 +135,14 @@ class Proc:
"""
def __init__(self, *args, prefix=None, raises=True):
args = [str(a) for a in args]
self.cmd = ' '.join(args)
if len(args) == 1:
if isinstance(args[0], (list, tuple)):
args = args[0]
args = self.cmd = args[0]
else:
args = ['sh', '-euc', ' '.join(args)]
self.args = [str(a) for a in args]
self.cmd = shlex.join(self.args)
self.args = args
self.prefix = prefix
self.raises = raises
self.called = False
@ -80,10 +155,7 @@ class Proc:
from .console_script import console_script
debug = console_script.options.get('debug', False)
if debug is True or 'cmd' in str(debug):
if self.prefix:
print(f'{self.prefix} | + {self.cmd}')
else:
print(f'+ {self.cmd}')
output.cmd(self.cmd, self.prefix)
loop = asyncio.events.get_event_loop()
transport, protocol = await loop.subprocess_exec(

View File

@ -3,8 +3,9 @@ import copy
import cli2
import os
import textwrap
import sys
from .proc import Proc
from .proc import output, Proc
class Script:
@ -52,13 +53,18 @@ class Script:
method = 'clean_' + self.name
result = getattr(visitor, method)(self)
if debug is True or 'visit' in str(debug):
print(
getattr(visitable, 'name', '') + ' | ',
'.'.join([type(visitor).__name__, method]),
' '.join(f'{k}={v}' for k, v in visitor.__dict__.items())
output(
''.join([
'.'.join([type(visitor).__name__, method]),
'(',
', '.join(f'{k}={v}' for k, v in visitor.__dict__.items()),
')'
]),
getattr(visitable, 'name', None)
)
if result:
await result
sys.stdout.flush()
for prefix in ('init_', 'pre_', '', 'post_', 'clean_'):
method = prefix + self.name
@ -67,11 +73,16 @@ class Script:
continue
if debug is True or 'visit' in str(debug):
print(
getattr(visitable, 'name', '') + ' | ',
'.'.join([type(visitor).__name__, method]),
' '.join(f'{k}={v}' for k, v in visitor.__dict__.items())
output(
''.join([
'.'.join([type(visitor).__name__, method]),
'(',
', '.join(f'{k}={v}' for k, v in visitor.__dict__.items()),
')'
]),
getattr(visitable, 'name', None)
)
result = getattr(visitor, method)(self)
if result:
try:
@ -79,6 +90,7 @@ class Script:
except Exception as e:
await clean()
raise
sys.stdout.flush()
async def run(self, *args, **kwargs):
if self.unshare and os.getuid() != 0: