226 lines
6.5 KiB
Python
226 lines
6.5 KiB
Python
from copy import deepcopy
|
|
import functools
|
|
import inspect
|
|
import importlib
|
|
import sys
|
|
|
|
from ..output import Output
|
|
from ..exceptions import WrongResult
|
|
|
|
|
|
class Action:
|
|
parent = None
|
|
contextualize = []
|
|
regexps = {
|
|
r'([\w]+):': '{cyan}\\1{gray}:{reset}',
|
|
r'(^|\n)( *)\- ': '\\1\\2{red}-{reset} ',
|
|
}
|
|
options = dict(
|
|
debug=dict(
|
|
alias='d',
|
|
default='visit',
|
|
help='''
|
|
Display debug output. Supports values (combinable): cmd,out,visit
|
|
'''.strip(),
|
|
immediate=True,
|
|
),
|
|
)
|
|
|
|
def __init__(self, *args, doc=None, **kwargs):
|
|
self.args = args
|
|
self.kwargs = kwargs
|
|
self.call_args = []
|
|
self.call_kwargs = {}
|
|
self._doc = doc
|
|
self.menu = {
|
|
name: value
|
|
for name, value in kwargs.items()
|
|
if isinstance(value, Action)
|
|
}
|
|
|
|
@property
|
|
def context(self):
|
|
if not self.parent:
|
|
if '_context' not in self.__dict__:
|
|
self._context = dict()
|
|
return self._context
|
|
else:
|
|
return self.parent.context
|
|
|
|
def actions_filter(self, results, f=None, **filters):
|
|
if f:
|
|
def ff(a):
|
|
try:
|
|
return f(a)
|
|
except:
|
|
return False
|
|
results = [*filter(ff, results)]
|
|
|
|
for k, v in filters.items():
|
|
if k == 'type':
|
|
results = [*filter(
|
|
lambda s: type(s).__name__.lower() == str(v).lower(),
|
|
results
|
|
)]
|
|
else:
|
|
results = [*filter(
|
|
lambda s: getattr(s, k, None) == v,
|
|
results
|
|
)]
|
|
|
|
return results
|
|
|
|
|
|
def sibblings(self, f=None, **filters):
|
|
if not self.parent:
|
|
return []
|
|
return self.actions_filter(
|
|
[a for a in self.parent.actions if a is not self],
|
|
f, **filters
|
|
)
|
|
|
|
def parents(self, f=None, **filters):
|
|
if self.parent:
|
|
return self.actions_filter(
|
|
[self.parent] + self.parent.parents(),
|
|
f, **filters
|
|
)
|
|
return []
|
|
|
|
def children(self, f=None, **filters):
|
|
children = []
|
|
def add(parent):
|
|
if parent != self:
|
|
children.append(parent)
|
|
if 'actions' not in parent.__dict__:
|
|
return
|
|
|
|
for action in parent.actions:
|
|
add(action)
|
|
add(self)
|
|
return self.actions_filter(children, f, **filters)
|
|
|
|
def __getattr__(self, name):
|
|
for a in self.parents() + self.sibblings() + self.children():
|
|
if name in a.contextualize:
|
|
return getattr(a, name)
|
|
raise AttributeError(f'{type(self).__name__} has no {name}')
|
|
|
|
async def call(self, *args, **kwargs):
|
|
print(f'{self}.call(*args, **kwargs) not implemented')
|
|
sys.exit(1)
|
|
|
|
def output_factory(self, *args, **kwargs):
|
|
kwargs.setdefault('regexps', self.regexps)
|
|
return Output(**kwargs)
|
|
|
|
async def __call__(self, *args, **kwargs):
|
|
self.call_args = list(self.call_args) + list(args)
|
|
self.call_kwargs.update(kwargs)
|
|
self.output = self.output_factory(*args, **kwargs)
|
|
self.output_start()
|
|
self.status = 'running'
|
|
try:
|
|
result = await self.call(*args, **kwargs)
|
|
except Exception as e:
|
|
self.output_fail(e)
|
|
self.status = 'fail'
|
|
proc = getattr(e, 'proc', None)
|
|
if proc:
|
|
result = proc.rc
|
|
else:
|
|
raise
|
|
else:
|
|
self.output_success()
|
|
if self.status == 'running':
|
|
self.status = 'success'
|
|
finally:
|
|
clean = getattr(self, 'clean', None)
|
|
if clean:
|
|
self.output.clean(self)
|
|
await clean(*args, **kwargs)
|
|
return result
|
|
|
|
def output_start(self):
|
|
if self.kwargs.get('quiet', False):
|
|
return
|
|
self.output.start(self)
|
|
|
|
def output_fail(self, exception=None):
|
|
if self.kwargs.get('quiet', False):
|
|
return
|
|
self.output.fail(self, exception)
|
|
|
|
def output_success(self):
|
|
if self.kwargs.get('quiet', False):
|
|
return
|
|
self.output.success(self)
|
|
|
|
def __repr__(self):
|
|
return ' '.join([type(self).__name__] + list(self.args) + [
|
|
f'{k}={v}'
|
|
for k, v in self.kwargs.items()
|
|
])
|
|
|
|
def colorized(self):
|
|
return ' '.join([
|
|
self.output.colors['pink1']
|
|
+ type(self).__name__
|
|
+ self.output.colors['yellow']
|
|
] + list(self.args) + [
|
|
f'{self.output.colors["blue"]}{k}{self.output.colors["gray"]}={self.output.colors["green2"]}{v}'
|
|
for k, v in self.kwargs_output().items()
|
|
] + [self.output.colors['reset']])
|
|
|
|
def callable(self):
|
|
from ..targets import Localhost
|
|
async def cb(*a, **k):
|
|
from shlax.cli import cli
|
|
script = Localhost(self, quiet=True)
|
|
result = await script(*a, **k)
|
|
|
|
success = functools.reduce(
|
|
lambda a, b: a + b,
|
|
[1 for c in script.children() if c.status == 'success'] or [0])
|
|
if success:
|
|
script.output.success(f'{success} PASS')
|
|
|
|
failures = functools.reduce(
|
|
lambda a, b: a + b,
|
|
[1 for c in script.children() if c.status == 'fail'] or [0])
|
|
if failures:
|
|
script.output.fail(f'{failures} FAIL')
|
|
cli.exit_code = failures
|
|
return result
|
|
return cb
|
|
|
|
def kwargs_output(self):
|
|
return self.kwargs
|
|
|
|
def action(self, action, *args, **kwargs):
|
|
if isinstance(action, str):
|
|
import cli2
|
|
a = cli2.Callable.factory(action).target
|
|
if not a:
|
|
a = cli2.Callable.factory(
|
|
'.'.join(['shlax', action])
|
|
).target
|
|
if a:
|
|
action = a
|
|
|
|
p = action(*args, **kwargs)
|
|
for parent in self.parents():
|
|
if hasattr(parent, 'actions'):
|
|
break
|
|
p.parent = parent
|
|
if 'actions' not in self.__dict__:
|
|
# "mutate" to Strategy
|
|
from ..strategies.script import Actions
|
|
self.actions = Actions(self, [p])
|
|
return p
|
|
|
|
def bind(self, *args):
|
|
clone = deepcopy(self)
|
|
clone.call_args = args
|
|
return clone
|