Complete core rewrite, with documentation

Still missing documentation about Output core component

And actual Action/Targets etc ... in the process of migrating to the new
engine
This commit is contained in:
jpic 2020-02-16 20:14:20 +01:00
parent 87ac000e87
commit b279760374
36 changed files with 693 additions and 1084 deletions

228
README.md Normal file
View File

@ -0,0 +1,228 @@
# Shlax: Pythonic automation tool
Shlax is a Python framework for system automation, initially with the purpose
of replacing docker, docker-compose and ansible with a single tool, with the
purpose of code-reuse. It may be viewed as "async fabric rewrite by a
megalomanic Django fanboy".
The pattern resolves around two moving parts: Actions and Targets.
## Action
An action is a function that takes a target argument, it may execute nested
actions by passing over the target argument which collects the results.
Example:
```python
async def hello_world(target):
"""Bunch of silly commands to demonstrate action programming."""
await target.mkdir('foo')
python = await target.which('python3', 'python')
await target.exec(f'{python} --version > foo/test')
version = target.exec('cat foo/test').output
print('version')
```
### Recursion
An action may call other actions recursively. There are two ways:
```python
async def something(target):
# just run the other action code
hello_world(target)
# or delegate the call to target
target(hello_world)
```
In the first case, the resulting count of ran actions will remain 1:
"something" action.
In the second case, the resulting count of ran actions will be 2: "something"
and "hello_world".
### Callable classes
Actually in practice, Actions are basic callable Python classes, here's a basic
example to run a command:
```python
class Run:
def __init__(self, cmd):
self.cmd = cmd
async def __call__(self, target):
return await target.exec(self.cmd)
```
This allows to create callable objects which may be called just like functions
and as such be appropriate actions, instead of:
```python
async def one(target):
target.exec('one')
async def two(target):
target.exec('two')
```
You can do:
```python
one = Run('one')
two = Run('two')
```
### Parallel execution
Actions may be executed in parallel with an action named ... Parallel. This
defines an action that will execute three actions in parallel:
```python
action = Parallel(
hello_world,
something,
Run('echo hi'),
)
```
In this case, all actions must succeed for the parallel action to be considered
a success.
### Methods
An action may also be a method, as long as it just takes a target argument, for
example:
```python
class Thing:
def start(self, target):
"""Starts thing"""
def stop(self, target):
"""Stops thing"""
action = Thing().start
```
### Cleaning
If an action defines a `clean` method, it will always be called wether or not
the action succeeded. Example:
```python
class Thing:
def __call__(self, target):
"""Do some thing"""
def clean(self, target):
"""Clean-up target after __call__"""
```
### Colorful actions
If an action defines a `colorize` method, it will be called with the colorset
as argument for every output, this allows to code custom output rendering.
## Target
A Target is mainly an object providing an abstraction layer over the system we
want to automate with actions. It defines functions to execute a command, mount
a directory, copy a file, manage environment variables and so on.
### Pre-configuration
A Target can be pre-configured with a list of Actions in which case calling the
target without argument will execute its Actions until one fails by raising an
Exception:
```python
say_hello = Localhost(
hello_world,
Run('echo hi'),
)
await say_hello()
```
### Results
Every time a target execute an action, it will set the "status" attribute on it
to "success" or "failure", and add it to the "results" attribute:
```python
say_hello = Localhost(Run('echo hi'))
await say_hello()
say_hello.results # contains the action with status="success"
```
## Targets as Actions: the nesting story
We've seen that any callable taking a target argument is good to be considered
an action, and that targets are callables.
To make a Target runnable like any action, all we had to do is add the target
keyword argument to `Target.__call__`.
But `target()` fills `self.results`, so nested action results would not
propagate to the parent target.
That's why if Target receives a non-None target argument, it will has to set
`self.parent` with it.
This allows nested targets to traverse parents and get to the root Target
with `target.caller`, where it can then attach results to.
This opens the nice side effect that a target implementation may call the
parent target if any, you could write a Docker target as such:
```python
class Docker(Target):
def __init__(self, *actions, name):
self.name = name
super().__init__(*actions)
async def exec(self, *args):
return await self.parent.exec(*['docker', 'exec', self.name] + args)
```
Don't worry about `self.parent` being set, it is enforced to `Localhost` if
unset so that we always have something that actually spawns a process in the
chain ;)
The result of that design is that the following use cases are open for
business:
```python
# This action installs my favorite package on any distro
action = Packages('python3')
# Run it right here: apt install python3
Localhost()(action)
# Or remotely: ssh yourhost apt install python3
Ssh(host='yourhost')(action)
# Let's make a container build receipe with that action
build = Buildah(package)
# Run it locally: buildah exec apt install python3
Localhost()(build)
# Or on a server: ssh yourhost build exec apt install python3
Ssh(host='yourhost')(build)
# Or on a server behingh a bastion:
# ssh yourbastion ssh yourhost build exec apt install python3
Ssh(host='bastion')(Ssh(host='yourhost')(build))
# That's going to do the same
Ssh(
Ssh(
build,
host='yourhost'
),
host='bastion'
)()
```

View File

@ -1,7 +0,0 @@
from .actions import *
from .image import Image
from .strategies import *
from .output import Output
from .proc import Proc
from .targets import *
from .shlaxfile import Shlaxfile

View File

@ -1,6 +0,0 @@
from .copy import Copy
from .packages import Packages # noqa
from .base import Action # noqa
from .run import Run # noqa
from .pip import Pip
from .service import Service

View File

@ -1,211 +1,2 @@
import functools
import inspect
import importlib
import sys
from ..output import Output
from ..exceptions import WrongResult
class Action: class Action:
parent = None pass
contextualize = []
regexps = {
r'([\w]+):': '{cyan}\\1{gray}:{reset}',
r'(^|\n)( *)\- ': '\\1\\2{red}-{reset} ',
}
options = dict(
debug=dict(
alias='d',
default='visit',
help='''
Display debug output. Supports values (combinable): cmd,out,visit
'''.strip(),
immediate=True,
),
)
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
@property
def context(self):
if not self.parent:
if '_context' not in self.__dict__:
self._context = dict()
return self._context
else:
return self.parent.context
def actions_filter(self, results, f=None, **filters):
if f:
def ff(a):
try:
return f(a)
except:
return False
results = [*filter(ff, results)]
for k, v in filters.items():
if k == 'type':
results = [*filter(
lambda s: type(s).__name__.lower() == str(v).lower(),
results
)]
else:
results = [*filter(
lambda s: getattr(s, k, None) == v,
results
)]
return results
def sibblings(self, f=None, **filters):
if not self.parent:
return []
return self.actions_filter(
[a for a in self.parent.actions if a is not self],
f, **filters
)
def parents(self, f=None, **filters):
if self.parent:
return self.actions_filter(
[self.parent] + self.parent.parents(),
f, **filters
)
return []
def children(self, f=None, **filters):
children = []
def add(parent):
if parent != self:
children.append(parent)
if 'actions' not in parent.__dict__:
return
for action in parent.actions:
add(action)
add(self)
return self.actions_filter(children, f, **filters)
def __getattr__(self, name):
for a in self.parents() + self.sibblings() + self.children():
if name in a.contextualize:
return getattr(a, name)
raise AttributeError(f'{type(self).__name__} has no {name}')
async def call(self, *args, **kwargs):
print(f'{self}.call(*args, **kwargs) not implemented')
sys.exit(1)
def output_factory(self, *args, **kwargs):
kwargs.setdefault('regexps', self.regexps)
return Output(**kwargs)
async def __call__(self, *args, **kwargs):
self.call_args = args
self.call_kwargs = kwargs
self.output = self.output_factory(*args, **kwargs)
self.output_start()
self.status = 'running'
try:
result = await self.call(*args, **kwargs)
except Exception as e:
self.output_fail(e)
self.status = 'fail'
proc = getattr(e, 'proc', None)
if proc:
result = proc.rc
else:
raise
else:
self.output_success()
if self.status == 'running':
self.status = 'success'
finally:
clean = getattr(self, 'clean', None)
if clean:
self.output.clean(self)
await clean(*args, **kwargs)
return result
def output_start(self):
if self.kwargs.get('quiet', False):
return
self.output.start(self)
def output_fail(self, exception=None):
if self.kwargs.get('quiet', False):
return
self.output.fail(self, exception)
def output_success(self):
if self.kwargs.get('quiet', False):
return
self.output.success(self)
def __repr__(self):
return ' '.join([type(self).__name__] + list(self.args) + [
f'{k}={v}'
for k, v in self.kwargs.items()
])
def colorized(self):
return ' '.join([
self.output.colors['pink1']
+ type(self).__name__
+ self.output.colors['yellow']
] + list(self.args) + [
f'{self.output.colors["blue"]}{k}{self.output.colors["gray"]}={self.output.colors["green2"]}{v}'
for k, v in self.kwargs_output().items()
] + [self.output.colors['reset']])
def callable(self):
from ..targets import Localhost
async def cb(*a, **k):
from shlax.cli import cli
script = Localhost(self, quiet=True)
result = await script(*a, **k)
success = functools.reduce(
lambda a, b: a + b,
[1 for c in script.children() if c.status == 'success'] or [0])
if success:
script.output.success(f'{success} PASS')
failures = functools.reduce(
lambda a, b: a + b,
[1 for c in script.children() if c.status == 'fail'] or [0])
if failures:
script.output.fail(f'{failures} FAIL')
cli.exit_code = failures
return result
return cb
def kwargs_output(self):
return self.kwargs
def action(self, action, *args, **kwargs):
if isinstance(action, str):
import cli2
a = cli2.Callable.factory(action).target
if not a:
a = cli2.Callable.factory(
'.'.join(['shlax', action])
).target
if a:
action = a
p = action(*args, **kwargs)
for parent in self.parents():
if hasattr(parent, 'actions'):
break
p.parent = parent
if 'actions' not in self.__dict__:
# "mutate" to Strategy
from ..strategies.script import Actions
self.actions = Actions(self, [p])
return p

View File

@ -1,6 +0,0 @@
from .base import Action
class Copy(Action):
async def call(self, *args, **kwargs):
await self.copy(*self.args)

View File

@ -7,19 +7,16 @@ import os
import subprocess import subprocess
from textwrap import dedent from textwrap import dedent
from .base import Action
class Packages:
class Packages(Action):
""" """
The Packages visitor wraps around the container's package manager. Package manager abstract layer with caching.
It's a central piece of the build process, and does iterate over other It's a central piece of the build process, and does iterate over other
container visitors in order to pick up packages. For example, the Pip container visitors in order to pick up packages. For example, the Pip
visitor will declare ``self.packages = dict(apt=['python3-pip'])``, and the visitor will declare ``self.packages = dict(apt=['python3-pip'])``, and the
Packages visitor will pick it up. Packages visitor will pick it up.
""" """
contextualize = ['mgr']
regexps = { regexps = {
#r'Installing ([\w\d-]+)': '{cyan}\\1', #r'Installing ([\w\d-]+)': '{cyan}\\1',
r'Installing': '{cyan}lol', r'Installing': '{cyan}lol',
@ -55,12 +52,11 @@ class Packages(Action):
installed = [] installed = []
def __init__(self, *packages, **kwargs): def __init__(self, *packages):
self.packages = [] self.packages = []
for package in packages: for package in packages:
line = dedent(package).strip().replace('\n', ' ') line = dedent(package).strip().replace('\n', ' ')
self.packages += line.split(' ') self.packages += line.split(' ')
super().__init__(*packages, **kwargs)
@property @property
def cache_root(self): def cache_root(self):
@ -69,9 +65,9 @@ class Packages(Action):
else: else:
return os.path.join(os.getenv('HOME'), '.cache') return os.path.join(os.getenv('HOME'), '.cache')
async def update(self): async def update(self, target):
# run pkgmgr_setup functions ie. apk_setup # run pkgmgr_setup functions ie. apk_setup
cachedir = await getattr(self, self.mgr + '_setup')() cachedir = await getattr(self, self.mgr + '_setup')(target)
lastupdate = None lastupdate = None
if os.path.exists(cachedir + '/lastupdate'): if os.path.exists(cachedir + '/lastupdate'):
@ -95,7 +91,7 @@ class Packages(Action):
f.write(str(os.getpid())) f.write(str(os.getpid()))
try: try:
await self.rexec(self.cmds['update']) await target.rexec(self.cmds['update'])
finally: finally:
os.unlink(lockfile) os.unlink(lockfile)
@ -103,15 +99,15 @@ class Packages(Action):
f.write(str(now)) f.write(str(now))
else: else:
while os.path.exists(lockfile): while os.path.exists(lockfile):
print(f'{self.container.name} | Waiting for update ...') print(f'{self.target} | Waiting for {lockfile} ...')
await asyncio.sleep(1) await asyncio.sleep(1)
async def call(self, *args, **kwargs): async def __call__(self, target):
cached = getattr(self, '_pagkages_mgr', None) cached = getattr(target, 'pkgmgr', None)
if cached: if cached:
self.mgr = cached self.mgr = cached
else: else:
mgr = await self.which(*self.mgrs.keys()) mgr = await target.which(*self.mgrs.keys())
if mgr: if mgr:
self.mgr = mgr[0].split('/')[-1] self.mgr = mgr[0].split('/')[-1]
@ -119,11 +115,8 @@ class Packages(Action):
raise Exception('Packages does not yet support this distro') raise Exception('Packages does not yet support this distro')
self.cmds = self.mgrs[self.mgr] self.cmds = self.mgrs[self.mgr]
if not getattr(self, '_packages_upgraded', None): await self.update(target)
await self.update() await target.rexec(self.cmds['upgrade'])
if self.kwargs.get('upgrade', True):
await self.rexec(self.cmds['upgrade'])
self._packages_upgraded = True
packages = [] packages = []
for package in self.packages: for package in self.packages:
@ -136,22 +129,22 @@ class Packages(Action):
else: else:
packages.append(package) packages.append(package)
await self.rexec(*self.cmds['install'].split(' ') + packages) await target.rexec(*self.cmds['install'].split(' ') + packages)
async def apk_setup(self): async def apk_setup(self, target):
cachedir = os.path.join(self.cache_root, self.mgr) cachedir = os.path.join(self.cache_root, self.mgr)
await self.mount(cachedir, '/var/cache/apk') await target.mount(cachedir, '/var/cache/apk')
# special step to enable apk cache # special step to enable apk cache
await self.rexec('ln -sf /var/cache/apk /etc/apk/cache') await target.rexec('ln -sf /var/cache/apk /etc/apk/cache')
return cachedir return cachedir
async def dnf_setup(self): async def dnf_setup(self, target):
cachedir = os.path.join(self.cache_root, self.mgr) cachedir = os.path.join(self.cache_root, self.mgr)
await self.mount(cachedir, f'/var/cache/{self.mgr}') await target.mount(cachedir, f'/var/cache/{self.mgr}')
await self.rexec('echo keepcache=True >> /etc/dnf/dnf.conf') await target.rexec('echo keepcache=True >> /etc/dnf/dnf.conf')
return cachedir return cachedir
async def apt_setup(self): async def apt_setup(self, target):
codename = (await self.rexec( codename = (await self.rexec(
f'source {self.mnt}/etc/os-release; echo $VERSION_CODENAME' f'source {self.mnt}/etc/os-release; echo $VERSION_CODENAME'
)).out )).out
@ -163,7 +156,7 @@ class Packages(Action):
await self.mount(cache_lists, f'/var/lib/apt/lists') await self.mount(cache_lists, f'/var/lib/apt/lists')
return cachedir return cachedir
async def pacman_setup(self): async def pacman_setup(self, target):
return self.cache_root + '/pacman' return self.cache_root + '/pacman'
def __repr__(self): def __repr__(self):

11
shlax/actions/parallel.py Normal file
View File

@ -0,0 +1,11 @@
import asyncio
class Parallel:
def __init__(self, *actions):
self.actions = actions
async def __call__(self, target):
return await asyncio.gather(*[
target(action) for action in self.actions
])

View File

@ -1,57 +0,0 @@
from glob import glob
import os
from .base import Action
class Pip(Action):
def __init__(self, *pip_packages, pip=None, requirements=None):
self.requirements = requirements
super().__init__(*pip_packages, pip=pip, requirements=requirements)
async def call(self, *args, **kwargs):
pip = self.kwargs.get('pip', None)
if not pip:
pip = await self.which('pip3', 'pip', 'pip2')
if pip:
pip = pip[0]
else:
from .packages import Packages
action = self.action(
Packages,
'python3,apk', 'python3-pip,apt',
args=args, kwargs=kwargs
)
await action(*args, **kwargs)
pip = await self.which('pip3', 'pip', 'pip2')
if not pip:
raise Exception('Could not install a pip command')
else:
pip = pip[0]
if 'CACHE_DIR' in os.environ:
cache = os.path.join(os.getenv('CACHE_DIR'), 'pip')
else:
cache = os.path.join(os.getenv('HOME'), '.cache', 'pip')
if getattr(self, 'mount', None):
# we are in a target which shares a mount command
await self.mount(cache, '/root/.cache/pip')
await self.exec(f'{pip} install --upgrade pip')
# https://github.com/pypa/pip/issues/5599
if 'pip' not in self.kwargs:
pip = 'python3 -m pip'
source = [p for p in self.args if p.startswith('/') or p.startswith('.')]
if source:
await self.exec(
f'{pip} install --upgrade --editable {" ".join(source)}'
)
nonsource = [p for p in self.args if not p.startswith('/')]
if nonsource:
await self.exec(f'{pip} install --upgrade {" ".join(nonsource)}')
if self.requirements:
await self.exec(f'{pip} install --upgrade -r {self.requirements}')

View File

@ -1,18 +1,11 @@
from ..targets.buildah import Buildah
from ..targets.docker import Docker
from .base import Action
class Run(Action): class Run:
async def call(self, *args, **kwargs): def __init__(self, cmd):
image = self.kwargs.get('image', None) self.cmd = cmd
if not image:
return await self.exec(*self.args, **self.kwargs)
if isinstance(image, Buildah):
breakpoint()
result = await self.action(image, *args, **kwargs)
return await Docker( async def __call__(self, target):
image=image, self.proc = await target.exec(self.cmd)
).exec(*args, **kwargs)
def __str__(self):
return f'Run({self.cmd})'

View File

@ -1,16 +0,0 @@
import asyncio
from .base import Action
class Service(Action):
def __init__(self, *names, state=None):
self.state = state or 'started'
self.names = names
super().__init__()
async def call(self, *args, **kwargs):
return asyncio.gather(*[
self.exec('systemctl', 'start', name, user='root')
for name in self.names
])

View File

@ -1,146 +1,55 @@
''' """
shlax is a micro-framework to orchestrate commands. Shlax executes mostly in 3 ways:
- Execute actions on targets with the command line
shlax yourfile.py: to list actions you have declared. - With your shlaxfile as first argument: offer defined Actions
shlax yourfile.py <action>: to execute a given action - With the name of a module in shlax.repo: a community maintained shlaxfile
#!/usr/bin/env shlax: when making yourfile.py an executable. """
''' import ast
import asyncio
import cli2 import cli2
import copy import glob
import inspect import inspect
import importlib
import os import os
import sys import sys
from .exceptions import *
from .shlaxfile import Shlaxfile
from .targets import Localhost
async def runall(*args, **kwargs):
for name, action in cli.shlaxfile.actions.items():
await Localhost(action)(*args, **kwargs)
@cli2.option('debug', alias='d', help='Display debug output.')
async def test(*args, **kwargs):
"""Run podctl test over a bunch of paths."""
report = []
for arg in args:
candidates = [
os.path.join(os.getcwd(), arg, 'pod.py'),
os.path.join(os.getcwd(), arg, 'pod_test.py'),
]
for candidate in candidates:
if not os.path.exists(candidate):
continue
podfile = Podfile.factory(candidate)
# disable push
for name, container in podfile.containers.items():
commit = container.visitor('commit')
if commit:
commit.push = False
output.print(
'\n\x1b[1;38;5;160;48;5;118m BUILD START \x1b[0m'
+ ' ' + podfile.path + '\n'
)
old_exit_code = console_script.exit_code
console_script.exit_code = 0
try:
await podfile.pod.script('build')()
except Exception as e:
report.append(('build ' + candidate, False))
continue
if console_script.exit_code != 0:
report.append(('build ' + candidate, False))
continue
console_script.exit_code = old_exit_code
for name, test in podfile.tests.items():
name = '::'.join([podfile.path, name])
output.print(
'\n\x1b[1;38;5;160;48;5;118m TEST START \x1b[0m'
+ ' ' + name + '\n'
)
try:
await test(podfile.pod)
except Exception as e:
report.append((name, False))
output.print('\x1b[1;38;5;15;48;5;196m TEST FAIL \x1b[0m' + name)
else:
report.append((name, True))
output.print('\x1b[1;38;5;200;48;5;44m TEST SUCCESS \x1b[0m' + name)
output.print('\n')
print('\n')
for name, success in report:
if success:
output.print('\n\x1b[1;38;5;200;48;5;44m TEST SUCCESS \x1b[0m' + name)
else:
output.print('\n\x1b[1;38;5;15;48;5;196m TEST FAIL \x1b[0m' + name)
print('\n')
success = [*filter(lambda i: i[1], report)]
failures = [*filter(lambda i: not i[1], report)]
output.print(
'\n\x1b[1;38;5;200;48;5;44m TEST TOTAL: \x1b[0m'
+ str(len(report))
)
if success:
output.print(
'\n\x1b[1;38;5;200;48;5;44m TEST SUCCESS: \x1b[0m'
+ str(len(success))
)
if failures:
output.print(
'\n\x1b[1;38;5;15;48;5;196m TEST FAIL: \x1b[0m'
+ str(len(failures))
)
if failures:
console_script.exit_code = 1
class ConsoleScript(cli2.ConsoleScript): class ConsoleScript(cli2.ConsoleScript):
def __call__(self, *args, **kwargs): def __call__(self):
self.shlaxfile = None repo = os.path.join(os.path.dirname(__file__), 'repo')
shlaxfile = sys.argv.pop(1) if len(sys.argv) > 1 else ''
if os.path.exists(shlaxfile.split('::')[0]):
self.shlaxfile = Shlaxfile()
self.shlaxfile.parse(shlaxfile)
for name, action in self.shlaxfile.actions.items():
self[name] = cli2.Callable(
name,
action.callable(),
options={
k: cli2.Option(name=k, **v)
for k, v in action.options.items()
},
color=getattr(action, 'color', cli2.YELLOW),
)
return super().__call__(*args, **kwargs)
def call(self, command): if len(self.argv) > 1:
kwargs = copy.copy(self.parser.funckwargs) repofile = os.path.join(repo, sys.argv[1] + '.py')
kwargs.update(self.parser.options) if os.path.isfile(self.argv[1]):
try: self.argv = sys.argv[1:]
return command(*self.parser.funcargs, **kwargs) self.load_shlaxfile(sys.argv[1])
except WrongResult as e: elif os.path.isfile(repofile):
print(e) self.argv = sys.argv[1:]
self.exit_code = e.proc.rc self.load_shlaxfile(repofile)
except ShlaxException as e: else:
print(e) raise Exception('File not found ' + sys.argv[1])
self.exit_code = 1 else:
available = glob.glob(os.path.join(repo, '*.py'))
return super().__call__()
cli = ConsoleScript(__doc__).add_module('shlax.cli') def load_shlaxfile(self, path):
with open(path) as f:
src = f.read()
tree = ast.parse(src)
members = []
for node in tree.body:
if not isinstance(node, ast.Assign):
continue
if not isinstance(node.value, ast.Call):
continue
members.append(node.targets[0].id)
spec = importlib.util.spec_from_file_location('shlaxfile', sys.argv[1])
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
for member in members:
from shlax.targets.localhost import Localhost
self[member] = cli2.Callable(member, Localhost(getattr(mod, member)))
cli = ConsoleScript(__doc__)

View File

@ -1,37 +0,0 @@
from copy import deepcopy
import yaml
from shlax import *
class GitLabCI(Script):
async def call(self, *args, write=True, **kwargs):
output = dict()
for key, value in self.kwargs.items():
if isinstance(value, dict):
output[key] = deepcopy(value)
image = output[key].get('image', 'alpine')
if hasattr(image, 'image'):
output[key]['image'] = image.image.repository + ':$CI_COMMIT_SHORT_SHA'
else:
output[key] = value
output = yaml.dump(output)
if kwargs['debug'] is True:
self.output(output)
if write:
with open('.gitlab-ci.yml', 'w+') as f:
f.write(output)
from shlax.cli import cli
for arg in args:
job = self.kwargs[arg]
_args = []
if not isinstance(job['image'], str):
image = str(job['image'].image)
else:
image = job['image']
await self.action('Docker', Run(job['script']), image=image)(*_args, **kwargs)
def colorized(self):
return type(self).__name__

View File

@ -1,22 +0,0 @@
class ShlaxException(Exception):
pass
class Mistake(ShlaxException):
pass
class WrongResult(ShlaxException):
def __init__(self, proc):
self.proc = proc
msg = f'FAIL exit with {proc.rc} ' + proc.args[0]
if not proc.debug or 'cmd' not in str(proc.debug):
msg += '\n' + proc.cmd
if not proc.debug or 'out' not in str(proc.debug):
msg += '\n' + proc.out
msg += '\n' + proc.err
super().__init__(msg)

View File

@ -1,6 +1,7 @@
import os import os
import re import re
class Image: class Image:
ENV_TAGS = ( ENV_TAGS = (
# gitlab # gitlab

View File

@ -25,7 +25,23 @@ class Output:
def colorize(self, code, content): def colorize(self, code, content):
return self.color(code) + content + self.color() return self.color(code) + content + self.color()
def __init__(self, prefix=None, regexps=None, debug=False, write=None, flush=None, **kwargs): def colorized(self):
if hasattr(self.subject, 'colorized'):
return self.subject.colorized(self.colors)
else:
return str(self.subject)
def __init__(
self,
subject=None,
prefix=None,
regexps=None,
debug='cmd,visit,out',
write=None,
flush=None,
**kwargs
):
self.subject = subject
self.prefix = prefix self.prefix = prefix
self.debug = debug self.debug = debug
self.prefix_length = 0 self.prefix_length = 0
@ -98,52 +114,77 @@ class Output:
return line return line
def test(self, action): def test(self):
if self.debug is True:
self(''.join([ self(''.join([
self.colors['purplebold'], self.colors['purplebold'],
'! TEST ', '! TEST ',
self.colors['reset'], self.colors['reset'],
action.colorized(), self.colorized(),
'\n', '\n',
])) ]))
def clean(self, action): def clean(self):
if self.debug is True: if self.debug:
self(''.join([ self(''.join([
self.colors['bluebold'], self.colors['bluebold'],
'+ CLEAN ', '+ CLEAN ',
self.colors['reset'], self.colors['reset'],
action.colorized(), self.colorized(),
'\n', '\n',
])) ]))
def start(self, action): def start(self):
if self.debug is True or 'visit' in str(self.debug): if self.debug is True or 'visit' in str(self.debug):
self(''.join([ self(''.join([
self.colors['orangebold'], self.colors['orangebold'],
'⚠ START ', '⚠ START ',
self.colors['reset'], self.colors['reset'],
action.colorized(), self.colorized(),
'\n', '\n',
])) ]))
def success(self, action): def success(self):
if self.debug is True or 'visit' in str(self.debug): if self.debug is True or 'visit' in str(self.debug):
self(''.join([ self(''.join([
self.colors['greenbold'], self.colors['greenbold'],
'✔ SUCCESS ', '✔ SUCCESS ',
self.colors['reset'], self.colors['reset'],
action.colorized() if hasattr(action, 'colorized') else str(action), self.colorized(),
'\n', '\n',
])) ]))
def fail(self, action, exception=None): def fail(self, exception=None):
if self.debug is True or 'visit' in str(self.debug): if self.debug is True or 'visit' in str(self.debug):
self(''.join([ self(''.join([
self.colors['redbold'], self.colors['redbold'],
'✘ FAIL ', '✘ FAIL ',
self.colors['reset'], self.colors['reset'],
action.colorized() if hasattr(action, 'colorized') else str(action), self.colorized(),
'\n',
]))
def results(self):
success = 0
fail = 0
for result in self.subject.results:
if result.status == 'success':
success += 1
if result.status == 'failure':
fail += 1
self(''.join([
self.colors['greenbold'],
'✔ SUCCESS REPORT: ',
self.colors['reset'],
str(success),
'\n',
]))
if fail:
self(''.join([
self.colors['redbold'],
'✘ FAIL REPORT: ',
self.colors['reset'],
str(fail),
'\n', '\n',
])) ]))

View File

@ -7,10 +7,25 @@ import os
import shlex import shlex
import sys import sys
from .exceptions import WrongResult
from .output import Output from .output import Output
class ProcFailure(Exception):
def __init__(self, proc):
self.proc = proc
msg = f'FAIL exit with {proc.rc} ' + proc.args[0]
if not proc.output.debug or 'cmd' not in str(proc.output.debug):
msg += '\n' + proc.cmd
if not proc.output.debug or 'out' not in str(proc.output.debug):
msg += '\n' + proc.out
msg += '\n' + proc.err
super().__init__(msg)
class PrefixStreamProtocol(asyncio.subprocess.SubprocessStreamProtocol): class PrefixStreamProtocol(asyncio.subprocess.SubprocessStreamProtocol):
""" """
Internal subprocess stream protocol to add a prefix in front of output to Internal subprocess stream protocol to add a prefix in front of output to
@ -54,8 +69,7 @@ class Proc:
""" """
test = False test = False
def __init__(self, *args, prefix=None, raises=True, debug=None, output=None): def __init__(self, *args, prefix=None, raises=True, output=None):
self.debug = debug if not self.test else False
self.output = output or Output() self.output = output or Output()
self.cmd = ' '.join(args) self.cmd = ' '.join(args)
self.args = args self.args = args
@ -87,7 +101,7 @@ class Proc:
if self.called: if self.called:
raise Exception('Already called: ' + self.cmd) raise Exception('Already called: ' + self.cmd)
if self.debug is True or 'cmd' in str(self.debug): if 'cmd' in str(self.output.debug):
self.output.cmd(self.cmd) self.output.cmd(self.cmd)
if self.test: if self.test:
@ -123,7 +137,7 @@ class Proc:
if not self.communicated: if not self.communicated:
await self.communicate() await self.communicate()
if self.raises and self.proc.returncode: if self.raises and self.proc.returncode:
raise WrongResult(self) raise ProcFailure(self)
return self return self
@property @property

13
shlax/result.py Normal file
View File

@ -0,0 +1,13 @@
class Result:
def __init__(self, target, action):
self.target = target
self.action = action
self.status = 'pending'
self.exception = None
class Results(list):
def new(self, target, action):
result = Result(target, action)
self.append(result)
return result

View File

@ -1,27 +0,0 @@
import importlib
import os
from .actions.base import Action
class Shlaxfile:
def __init__(self, actions=None, tests=None):
self.actions = actions or {}
self.tests = tests or {}
self.paths = []
def parse(self, path):
spec = importlib.util.spec_from_file_location('shlaxfile', path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
for name, value in mod.__dict__.items():
if isinstance(value, Action):
value.name = name
self.actions[name] = value
elif callable(value) and getattr(value, '__name__', '').startswith('test_'):
self.tests[value.__name__] = value
self.paths.append(path)
@property
def path(self):
return self.paths[0]

7
shlax/shortcuts.py Normal file
View File

@ -0,0 +1,7 @@
from .targets.base import Target
from .targets.buildah import Buildah
from .targets.localhost import Localhost
from .targets.stub import Stub
from .actions.packages import Packages
from .actions.run import Run

View File

@ -1,3 +0,0 @@
from .asyn import Async
from .script import Script
from .pod import Pod, Container

View File

@ -1,11 +0,0 @@
import asyncio
from .script import Script
class Async(Script):
async def call(self, *args, **kwargs):
return await asyncio.gather(*[
action(*args, **kwargs)
for action in self.actions
])

View File

@ -1,27 +0,0 @@
import os
from .script import Script
class Container(Script):
async def call(self, *args, **kwargs):
if not args or 'build' in args:
await self.kwargs['build'](**kwargs)
self.image = self.kwargs['build'].image
if not args or 'test' in args:
self.output.test(self)
await self.action('Docker',
*self.kwargs['test'].actions,
image=self.image,
mount={'.': '/app'},
workdir='/app',
)(**kwargs)
if not args or 'push' in args:
await self.image.push(action=self)
#name = kwargs.get('name', os.getcwd()).split('/')[-1]
class Pod(Script):
pass

View File

@ -1,34 +0,0 @@
import copy
import os
from ..exceptions import WrongResult
from ..actions.base import Action
from ..proc import Proc
class Actions(list):
def __init__(self, owner, actions):
self.owner = owner
super().__init__()
for action in actions:
self.append(action)
def append(self, value):
value = copy.deepcopy(value)
value.parent = self.owner
value.status = 'pending'
super().append(value)
class Script(Action):
contextualize = ['shargs', 'exec', 'rexec', 'env', 'which', 'copy']
def __init__(self, *actions, **kwargs):
super().__init__(**kwargs)
self.actions = Actions(self, actions)
async def call(self, *args, **kwargs):
for action in self.actions:
result = await action(*args, **kwargs)
if action.status != 'success':
break

View File

@ -1,4 +0,0 @@
from .buildah import Buildah
from .docker import Docker
from .localhost import Localhost
from .ssh import Ssh

80
shlax/targets/base.py Normal file
View File

@ -0,0 +1,80 @@
import copy
import re
from ..output import Output
from ..proc import Proc
from ..result import Result, Results
class Target:
def __init__(self, *actions, **options):
self.actions = actions
self.options = options
self.results = []
self.output = Output(self, **self.options)
self.parent = None
@property
def caller(self):
"""Traverse parents and return the top-levels Target."""
if not self.parent:
return self
caller = self.parent
while caller.parent:
caller = caller.parent
return caller
async def __call__(self, *actions, target=None):
if target:
# that's going to be used by other target methods, to access
# the calling target
self.parent = target
for action in actions or self.actions:
result = Result(self, action)
self.output = Output(action, **self.options)
self.output.start()
try:
await action(target=self)
except Exception as e:
self.output.fail(e)
result.status = 'failure'
result.exception = e
if actions:
# nested call, re-raise
raise
else:
break
else:
self.output.success()
result.status = 'success'
finally:
self.caller.results.append(result)
clean = getattr(action, 'clean', None)
if clean:
action.result = result
self.output.clean()
await clean(self)
async def rexec(self, *args, **kwargs):
kwargs['user'] = 'root'
return await self.exec(*args, **kwargs)
async def which(self, *cmd):
"""
Return the first path to the cmd in the container.
If cmd argument is a list then it will try all commands.
"""
proc = await self.exec('type ' + ' '.join(cmd), raises=False)
result = []
for res in proc.out.split('\n'):
match = re.match('([^ ]+) is ([^ ]+)$', res.strip())
if match:
result.append(match.group(1))
return result
async def exec(self, *args, **kwargs):
raise NotImplemented()

View File

@ -1,127 +1,89 @@
import asyncio
import os import os
import asyncio
from pathlib import Path
import signal
import shlex
import subprocess
import sys import sys
import textwrap from pathlib import Path
from .base import Target
from ..actions.base import Action
from ..exceptions import Mistake
from ..proc import Proc
from ..image import Image from ..image import Image
from .localhost import Localhost from ..proc import Proc
class Buildah(Localhost): class Buildah(Target):
""" def __init__(self,
The build script iterates over visitors and runs the build functions, it *actions,
also provides wrappers around the buildah command. base=None, commit=None,
""" cmd=None,
contextualize = Localhost.contextualize + ['mnt', 'ctr', 'mount', 'image'] **options):
self.base = base or 'alpine'
self.image = Image(commit) if commit else None
def __init__(self, base, *args, commit=None, push=False, cmd=None, **kwargs):
if isinstance(base, Action):
args = [base] + list(args)
base = 'alpine' # default selection in case of mistake
super().__init__(*args, **kwargs)
self.base = base
self.mounts = dict()
self.ctr = None self.ctr = None
self.mnt = None self.mnt = None
self.image = Image(commit) if commit else None self.mounts = dict()
self.config= dict(
self.config = dict(
cmd=cmd or 'sh', cmd=cmd or 'sh',
) )
def shargs(self, *args, user=None, buildah=True, **kwargs): # Always consider localhost as parent for now
if not buildah or args[0].startswith('buildah'): self.parent = Target()
return super().shargs(*args, user=user, **kwargs)
_args = ['buildah', 'run'] super().__init__(*actions, **options)
if user:
_args += ['--user', user]
_args += [self.ctr, '--', 'sh', '-euc']
return super().shargs(
*(
_args
+ [' '.join([str(a) for a in args])]
),
**kwargs
)
def __repr__(self): def is_runnable(self):
return f'Base({self.base})' return Proc.test or os.getuid() == 0
async def config(self, line): def __str__(self):
"""Run buildah config.""" if not self.is_runnable():
return await self.exec(f'buildah config {line} {self.ctr}', buildah=False) return 'Replacing with: buildah unshare ' + ' '.join(sys.argv)
return 'Buildah image builder'
async def copy(self, *args): async def __call__(self, *actions, target=None):
"""Run buildah copy to copy a file from host into container.""" self.parent = target
src = args[:-1] if not self.is_runnable():
dst = args[-1] os.execvp('buildah', ['buildah', 'unshare'] + sys.argv)
await self.mkdir(dst) # program has been replaced
procs = [] self.ctr = (await self.parent.exec('buildah', 'from', self.base)).out
for s in src: self.mnt = Path((await self.parent.exec('buildah', 'mount', self.ctr)).out)
if Path(s).is_dir(): await super().__call__()
target = self.mnt / s
if not target.exists():
await self.mkdir(target)
args = ['buildah', 'copy', self.ctr, s, Path(dst) / s]
else:
args = ['buildah', 'copy', self.ctr, s, dst]
procs.append(self.exec(*args, buildah=False))
return await asyncio.gather(*procs) async def clean(self, target):
for src, dst in self.mounts.items():
await self.parent.exec('umount', self.mnt / str(dst)[1:])
if self.result.status == 'success':
await self.commit()
if os.getenv('BUILDAH_PUSH'):
await self.image.push(target)
if self.mnt is not None:
await self.parent.exec('buildah', 'umount', self.ctr)
if self.ctr is not None:
await self.parent.exec('buildah', 'rm', self.ctr)
async def mount(self, src, dst): async def mount(self, src, dst):
"""Mount a host directory into the container.""" """Mount a host directory into the container."""
target = self.mnt / str(dst)[1:] target = self.mnt / str(dst)[1:]
await self.exec(f'mkdir -p {src} {target}', buildah=False) await self.parent.exec(f'mkdir -p {src} {target}')
await self.exec(f'mount -o bind {src} {target}', buildah=False) await self.parent.exec(f'mount -o bind {src} {target}')
self.mounts[src] = dst self.mounts[src] = dst
def is_runnable(self): async def exec(self, *args, user=None, **kwargs):
return ( _args = ['buildah', 'run']
Proc.test if user:
or os.getuid() == 0 _args += ['--user', user]
) _args += [self.ctr, '--', 'sh', '-euc']
_args += [' '.join([str(a) for a in args])]
async def call(self, *args, **kwargs): return await self.parent.exec(*_args, **kwargs)
if self.is_runnable():
self.ctr = (await self.exec('buildah', 'from', self.base, buildah=False)).out
self.mnt = Path((await self.exec('buildah', 'mount', self.ctr, buildah=False)).out)
result = await super().call(*args, **kwargs)
return result
from shlax.cli import cli
debug = kwargs.get('debug', False)
# restart under buildah unshare environment
argv = [
'buildah', 'unshare',
sys.argv[0], # current script location
cli.shlaxfile.path, # current shlaxfile location
]
if debug is True:
argv.append('-d')
elif isinstance(debug, str) and debug:
argv.append('-d=' + debug)
argv += [
cli.parser.command.name, # script name ?
]
await self.exec(*argv)
async def commit(self): async def commit(self):
if not self.image: if not self.image:
return return
for key, value in self.config.items(): for key, value in self.config.items():
await self.exec(f'buildah config --{key} "{value}" {self.ctr}') await self.parent.exec(f'buildah config --{key} "{value}" {self.ctr}')
self.sha = (await self.exec( self.sha = (await self.exec(
'buildah', 'buildah',
@ -137,20 +99,4 @@ class Buildah(Localhost):
tags = [self.image.repository] tags = [self.image.repository]
for tag in tags: for tag in tags:
await self.exec('buildah', 'tag', self.sha, tag, buildah=False) await self.parent.exec('buildah', 'tag', self.sha, tag)
async def clean(self, *args, **kwargs):
if self.is_runnable():
for src, dst in self.mounts.items():
await self.exec('umount', self.mnt / str(dst)[1:], buildah=False)
if self.status == 'success':
await self.commit()
if 'push' in args:
await self.image.push(action=self)
if self.mnt is not None:
await self.exec('buildah', 'umount', self.ctr, buildah=False)
if self.ctr is not None:
await self.exec('buildah', 'rm', self.ctr, buildah=False)

View File

@ -1,76 +0,0 @@
import asyncio
from pathlib import Path
import os
from ..image import Image
from .localhost import Localhost
class Docker(Localhost):
contextualize = Localhost.contextualize + ['mnt', 'ctr', 'mount']
def __init__(self, *args, **kwargs):
self.image = kwargs.get('image', 'alpine')
if not isinstance(self.image, Image):
self.image = Image(self.image)
super().__init__(*args, **kwargs)
self.context['ctr'] = None
def shargs(self, *args, daemon=False, **kwargs):
if args[0] == 'docker':
return args, kwargs
extra = []
if 'user' in kwargs:
extra += ['--user', kwargs.pop('user')]
args, kwargs = super().shargs(*args, **kwargs)
if self.context['ctr']:
executor = 'exec'
extra = [self.context['ctr']]
return [self.kwargs.get('docker', 'docker'), executor, '-t'] + extra + list(args), kwargs
executor = 'run'
cwd = os.getcwd()
if daemon:
extra += ['-d']
extra = extra + ['-v', f'{cwd}:{cwd}', '-w', f'{cwd}']
return [self.kwargs.get('docker', 'docker'), executor, '-t'] + extra + [str(self.image)] + list(args), kwargs
async def call(self, *args, **kwargs):
name = kwargs.get('name', os.getcwd()).split('/')[-1]
self.context['ctr'] = (
await self.exec(
'docker', 'ps', '-aq', '--filter',
'name=' + name,
raises=False
)
).out.split('\n')[0]
if 'recreate' in args and self.context['ctr']:
await self.exec('docker', 'rm', '-f', self.context['ctr'])
self.context['ctr'] = None
if self.context['ctr']:
self.context['ctr'] = (await self.exec('docker', 'start', name)).out
return await super().call(*args, **kwargs)
async def copy(self, *args):
src = args[:-1]
dst = args[-1]
await self.mkdir(dst)
procs = []
for s in src:
'''
if Path(s).is_dir():
await self.mkdir(s)
args = ['docker', 'copy', self.ctr, s, Path(dst) / s]
else:
args = ['docker', 'copy', self.ctr, s, dst]
'''
args = ['docker', 'cp', s, self.context['ctr'] + ':' + dst]
procs.append(self.exec(*args))
return await asyncio.gather(*procs)

View File

@ -1,14 +1,14 @@
import os import copy
import re import re
from shlax.proc import Proc from ..output import Output
from ..proc import Proc
from ..result import Result, Results
from ..strategies.script import Script from .base import Target
class Localhost(Script): class Localhost(Target):
root = '/'
def shargs(self, *args, **kwargs): def shargs(self, *args, **kwargs):
user = kwargs.pop('user', None) user = kwargs.pop('user', None)
args = [str(arg) for arg in args if args is not None] args = [str(arg) for arg in args if args is not None]
@ -24,48 +24,17 @@ class Localhost(Script):
elif user: elif user:
args = ['sudo', '-u', user] + args args = ['sudo', '-u', user] + args
return args, kwargs
if self.parent: if self.parent:
return self.parent.shargs(*args, **kwargs) return self.parent.shargs(*args, **kwargs)
else: else:
return args, kwargs return args, kwargs
async def exec(self, *args, **kwargs): async def exec(self, *args, **kwargs):
if 'debug' not in kwargs: kwargs['output'] = self.output
kwargs['debug'] = getattr(self, 'call_kwargs', {}).get('debug', False)
kwargs.setdefault('output', self.output)
args, kwargs = self.shargs(*args, **kwargs) args, kwargs = self.shargs(*args, **kwargs)
proc = await Proc(*args, **kwargs)() proc = await Proc(*args, **kwargs)()
if kwargs.get('wait', True): if kwargs.get('wait', True):
await proc.wait() await proc.wait()
return proc return proc
async def rexec(self, *args, **kwargs):
kwargs['user'] = 'root'
return await self.exec(*args, **kwargs)
async def env(self, name):
return (await self.exec('echo $' + name)).out
async def which(self, *cmd):
"""
Return the first path to the cmd in the container.
If cmd argument is a list then it will try all commands.
"""
proc = await self.exec('type ' + ' '.join(cmd), raises=False)
result = []
for res in proc.out.split('\n'):
match = re.match('([^ ]+) is ([^ ]+)$', res.strip())
if match:
result.append(match.group(1))
return result
async def copy(self, *args):
args = ['cp', '-ra'] + list(args)
return await self.exec(*args)
async def mount(self, *dirs):
pass
async def mkdir(self, *dirs):
return await self.exec(*['mkdir', '-p'] + list(dirs))

View File

@ -1,17 +0,0 @@
import os
from shlax.proc import Proc
from .localhost import Localhost
class Ssh(Localhost):
root = '/'
def __init__(self, host, *args, **kwargs):
self.host = host
super().__init__(*args, **kwargs)
def shargs(self, *args, **kwargs):
args, kwargs = super().shargs(*args, **kwargs)
return (['ssh', self.host] + list(args)), kwargs

23
shlax/targets/stub.py Normal file
View File

@ -0,0 +1,23 @@
from ..proc import Proc
from .base import Target
class ProcStub(Proc):
async def __call__(self, wait=True):
return self
async def communicate(self):
self.communicated = True
return self
async def wait(self):
return self
class Stub(Target):
async def exec(self, *args, **kwargs):
proc = await ProcStub(*args, **kwargs)()
if kwargs.get('wait', True):
await proc.wait()
return proc

View File

@ -1,55 +1,12 @@
#!/usr/bin/env shlax #!/usr/bin/env shlax
from shlax.contrib.gitlab import * """
Shlaxfile for shlax itself.
"""
PYTEST = 'py.test -svv tests' from shlax.shortcuts import *
test = Script(
Pip('.[test]'),
Run(PYTEST),
)
build = Buildah( build = Buildah(
'quay.io/podman/stable', Run('echo hi'),
Packages('python38', 'buildah', 'unzip', 'findutils', 'python3-yaml', upgrade=False), Packages('python38'),
Async( base='quay.io/podman/stable',
# python3.8 on centos with pip dance ...
Run('''
curl -o setuptools.zip https://files.pythonhosted.org/packages/42/3e/2464120172859e5d103e5500315fb5555b1e908c0dacc73d80d35a9480ca/setuptools-45.1.0.zip
unzip setuptools.zip
mkdir -p /usr/local/lib/python3.8/site-packages/
sh -c "cd setuptools-* && python3.8 setup.py install"
easy_install-3.8 pip
echo python3.8 -m pip > /usr/bin/pip
chmod +x /usr/bin/pip
'''),
Copy('shlax/', 'setup.py', '/app'),
),
Pip('/app[full]'),
commit='docker.io/yourlabs/shlax',
workdir='/app',
)
shlax = Container(
build=build,
test=Script(Run('./shlaxfile.py -d test')),
)
gitlabci = GitLabCI(
test=dict(
stage='build',
script='pip install -U --user -e .[test] && ' + PYTEST,
image='yourlabs/python',
),
build=dict(
stage='build',
image='yourlabs/shlax',
script='pip install -U --user -e . && CACHE_DIR=$(pwd)/.cache ./shlaxfile.py -d shlax build push',
cache=dict(paths=['.cache'], key='cache'),
),
pypi=dict(
stage='deploy',
only=['tags'],
image='yourlabs/python',
script='pypi-release',
),
) )

View File

@ -1,53 +0,0 @@
from shlax import *
inner = Run()
other = Run('ls')
middle = Buildah('alpine', inner, other)
outer = Localhost(middle)
middle = outer.actions[0]
other = middle.actions[1]
inner = middle.actions[0]
def test_action_init_args():
assert other.args == ('ls',)
def test_action_parent_autoset():
assert list(outer.actions) == [middle]
assert middle.parent == outer
assert inner.parent == middle
assert other.parent == middle
def test_action_context():
assert outer.context is inner.context
assert middle.context is inner.context
assert middle.context is outer.context
assert other.context is outer.context
def test_action_sibblings():
assert inner.sibblings() == [other]
assert inner.sibblings(lambda s: s.args[0] == 'ls') == [other]
assert inner.sibblings(lambda s: s.args[0] == 'foo') == []
assert inner.sibblings(type='run') == [other]
assert inner.sibblings(args=('ls',)) == [other]
def test_actions_parents():
assert other.parents() == [middle, outer]
assert other.parents(lambda p: p.base == 'alpine') == [middle]
assert inner.parents(type='localhost') == [outer]
assert inner.parents(type='buildah') == [middle]
def test_action_childrens():
assert middle.children() == [inner, other]
assert middle.children(lambda a: a.args[0] == 'ls') == [other]
assert outer.children() == [middle, inner, other]
def test_action_getattr():
assert other.exec == middle.exec
assert other.shargs == middle.shargs

View File

@ -1,7 +1,7 @@
import pytest import pytest
import os import os
from shlax import Image from shlax.image import Image
tests = { tests = {

View File

@ -1,5 +1,5 @@
import pytest import pytest
from shlax import Output from shlax.output import Output
class Write: class Write:

View File

@ -1,65 +0,0 @@
import pytest
from unittest.mock import patch
from shlax import *
from shlax import proc
test_args_params = [
(
Localhost(Run('echo hi')),
[('sh', '-euc', 'echo hi')]
),
(
Localhost(Run('echo hi', user='jimi')),
[('sudo', '-u', 'jimi', 'sh', '-euc', 'echo hi')]
),
(
Localhost(Run('echo hi', user='root')),
[('sudo', 'sh', '-euc', 'echo hi')]
),
(
Ssh('host', Run('echo hi', user='root')),
[('ssh', 'host', 'sudo', 'sh', '-euc', 'echo hi')]
),
(
Buildah('alpine', Run('echo hi')),
[
('buildah', 'from', 'alpine'),
('buildah', 'mount', ''),
('buildah', 'run', '', '--', 'sh', '-euc', 'echo hi'),
('buildah', 'umount', ''),
('buildah', 'rm', ''),
]
),
(
Buildah('alpine', Run('echo hi', user='root')),
[
('buildah', 'from', 'alpine'),
('buildah', 'mount', ''),
('buildah', 'run', '--user', 'root', '', '--', 'sh', '-euc', 'echo hi'),
('buildah', 'umount', ''),
('buildah', 'rm', ''),
]
),
(
Ssh('host', Buildah('alpine', Run('echo hi', user='root'))),
[
('ssh', 'host', 'buildah', 'from', 'alpine'),
('ssh', 'host', 'buildah', 'mount', ''),
('ssh', 'host', 'buildah', 'run', '--user', 'root', '', '--', 'sh', '-euc', 'echo hi'),
('ssh', 'host', 'buildah', 'umount', ''),
('ssh', 'host', 'buildah', 'rm', ''),
]
),
]
@pytest.mark.parametrize(
'script,commands',
test_args_params
)
@pytest.mark.asyncio
async def test_args(script, commands):
with Proc.mock():
await script()
assert commands == Proc.test

101
tests/test_target.py Normal file
View File

@ -0,0 +1,101 @@
import pytest
from shlax.targets.stub import Stub
from shlax.actions.run import Run
from shlax.actions.parallel import Parallel
from shlax.result import Result
class Error:
async def __call__(self, target):
raise Exception('lol')
@pytest.mark.asyncio
async def test_success():
action = Run('echo hi')
target = Stub(action)
await target()
assert target.results[0].action == action
assert target.results[0].status == 'success'
@pytest.mark.asyncio
async def test_error():
action = Error()
target = Stub(action)
await target()
assert target.results[0].action == action
assert target.results[0].status == 'failure'
@pytest.mark.asyncio
async def test_nested():
nested = Error()
class Nesting:
async def __call__(self, target):
await target(nested)
nesting = Nesting()
target = Stub(nesting)
await target()
assert len(target.results) == 2
assert target.results[0].status == 'failure'
assert target.results[0].action == nested
assert target.results[1].status == 'failure'
assert target.results[1].action == nesting
@pytest.mark.asyncio
async def test_parallel():
winner = Run('echo hi')
looser = Error()
parallel = Parallel(winner, looser)
target = Stub(parallel)
await target()
assert len(target.results) == 3
assert target.results[0].status == 'success'
assert target.results[0].action == winner
assert target.results[1].status == 'failure'
assert target.results[1].action == looser
assert target.results[2].status == 'failure'
assert target.results[2].action == parallel
@pytest.mark.asyncio
async def test_function():
async def hello(target):
target.exec('hello')
await Stub()(hello)
@pytest.mark.asyncio
async def test_method():
class Example:
def __init__(self):
self.was_called = False
async def test(self, target):
self.was_called = True
example = Example()
action = example.test
target = Stub()
await target(action)
assert example.was_called
@pytest.mark.asyncio
async def test_target_action():
child = Stub(Run('echo hi'))
parent = Stub(child)
grandpa = Stub()
await grandpa(parent)
assert len(grandpa.results) == 3
grandpa = Stub(parent)
await grandpa()
assert len(grandpa.results) == 3