rewritewip

This commit is contained in:
jpic 2020-04-21 00:14:59 +02:00
parent 6b15838059
commit 94a2a414b3
43 changed files with 301 additions and 1969 deletions

153
README.md Normal file
View File

@ -0,0 +1,153 @@
# Shlax: Pythonic automation tool
Shlax is a Python framework for system automation, initially with the purpose
of replacing docker, docker-compose and ansible with a single tool, with the
purpose of code-reuse. It may be viewed as "async fabric rewrite by a
megalomanic Django fanboy".
The pattern resolves around two moving parts: Actions and Targets.
## Action
An action is a function that takes a target argument, it may execute nested
actions by passing over the target argument which collects the results.
Example:
```python
async def hello_world(target):
"""Bunch of silly commands to demonstrate action programming."""
await target.mkdir('foo')
python = await target.which('python3', 'python')
await target.exec(f'{python} --version > foo/test')
version = target.exec('cat foo/test').output
print('version')
```
### Recursion
An action may call other actions recursively. There are two ways:
```python
async def something(target):
# just run the other action code
hello_world(target)
# or delegate the call to target
target(hello_world)
```
In the first case, the resulting count of ran actions will remain 1:
"something" action.
In the second case, the resulting count of ran actions will be 2: "something"
and "hello_world".
### Callable classes
Actually in practice, Actions are basic callable Python classes, here's a basic
example to run a command:
```python
class Run:
def __init__(self, cmd):
self.cmd = cmd
async def __call__(self, target):
return await target.exec(self.cmd)
```
This allows to create callable objects which may be called just like functions
and as such be appropriate actions, instead of:
```python
async def one(target):
target.exec('one')
async def two(target):
target.exec('two')
```
You can do:
```python
one = Run('one')
two = Run('two')
```
### Parallel execution
Actions may be executed in parallel with an action named ... Parallel. This
defines an action that will execute three actions in parallel:
```python
action = Parallel(
hello_world,
something,
Run('echo hi'),
)
```
In this case, all actions must succeed for the parallel action to be considered
a success.
### Methods
An action may also be a method, as long as it just takes a target argument, for
example:
```python
class Thing:
def start(self, target):
"""Starts thing"""
def stop(self, target):
"""Stops thing"""
action = Thing().start
```
### Cleaning
If an action defines a `clean` method, it will always be called wether or not
the action succeeded. Example:
```python
class Thing:
def __call__(self, target):
"""Do some thing"""
def clean(self, target):
"""Clean-up target after __call__"""
```
## Target
A Target is mainly an object providing an abstraction layer over the system we
want to automate with actions. It defines functions to execute a command, mount
a directory, copy a file, manage environment variables and so on.
### Pre-configuration
A Target can be pre-configured with a list of Actions in which case calling the
target without argument will execute its Actions until one fails by raising an
Exception:
```python
say_hello = Localhost(
hello_world,
Run('echo hi'),
)
await say_hello()
```
### Results
Every time a target execute an action, it will set the "status" attribute on it
to "success" or "failure", and add it to the "results" attribute:
```
say_hello = Localhost(Run('echo hi'))
await say_hello()
say_hello.results # contains the action with status="success"
```

View File

@ -1,247 +1,2 @@
from copy import deepcopy
import functools
import inspect
import importlib
import sys
from ..output import Output
from ..exceptions import WrongResult
from ..result import Result
class class_or_instance_method:
def __init__(self, f):
self.f = f
def __get__(self, instance, owner):
def newfunc(*args, **kwargs):
return self.f(
instance if instance is not None else owner,
*args,
**kwargs
)
return newfunc
class Action:
display_variables = []
hide_variables = ['output']
default_steps = ['apply']
parent = None
contextualize = []
regexps = {
r'([\w]+):': '{cyan}\\1{gray}:{reset}',
r'(^|\n)( *)\- ': '\\1\\2{red}-{reset} ',
}
options = dict(
debug=dict(
alias='d',
default='visit',
help='''
Display debug output. Supports values (combinable): cmd,out,visit
'''.strip(),
immediate=True,
),
verbose=dict(
alias='v',
default=False,
help='Verbose, like -d=visit,cmd,out',
immediate=True,
),
)
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
for key, value in kwargs.items():
setattr(self, key, value)
if isinstance(value, Action):
getattr(self, key).shlaxstep = True
def actions_filter(self, results, f=None, **filters):
if f:
def ff(a):
try:
return f(a)
except:
return False
results = [*filter(ff, results)]
for k, v in filters.items():
if k == 'type':
results = [*filter(
lambda s: type(s).__name__.lower() == str(v).lower(),
results
)]
else:
results = [*filter(
lambda s: getattr(s, k, None) == v,
results
)]
return results
def sibblings(self, f=None, **filters):
if not self.parent:
return []
return self.actions_filter(
[a for a in self.parent.actions if a is not self],
f, **filters
)
def parents(self, f=None, **filters):
if self.parent:
return self.actions_filter(
[self.parent] + self.parent.parents(),
f, **filters
)
return []
def children(self, f=None, **filters):
children = []
def add(parent):
if parent != self:
children.append(parent)
if 'actions' not in parent.__dict__:
return
for action in parent.actions:
add(action)
add(self)
return self.actions_filter(children, f, **filters)
async def __call__(self, *targets, **options):
if not targets:
from ..targets.localhost import Localhost
targets = [Localhost()]
output = Output(
regexp=self.regexps,
debug='cmd,visit,out' if options['verbose'] else options['debug'],
)
results = []
for target in targets:
target.output = output
if len(targets) > 1:
output.prefix = target
from copy import deepcopy
action = deepcopy(self)
action.target = target
result = Result(action, target)
results.append(result)
action.result = result
action.output = output
for step in options.get('steps', None) or self.default_steps:
if step not in action.steps():
print(f'Failed to find {type(action).__name__}.{step}')
continue
action.step = step
output.start(action)
try:
if isinstance(getattr(action, step), Action):
await getattr(action, step)(**options)
else:
await getattr(action, step)()
except Exception as e:
output.fail(action, e)
action.result.status = 'fail'
proc = getattr(e, 'proc', None)
if proc:
result = proc.rc
else:
raise
else:
output.success(action)
result.status = 'success'
finally:
clean = getattr(action, 'clean', None)
if clean:
output.clean(action)
await clean(target)
return results
def __repr__(self):
return ' '.join([type(self).__name__] + [
f'{k}={v}'
for k, v in self.__dict__.items()
if (k in self.display_variables or not self.display_variables)
and (k not in self.hide_variables)
])
def colorized(self, colors):
return ' '.join([
colors['pink1']
+ type(self).__name__
+ '.'
+ self.step
+ colors['yellow']
] + [
f'{colors["blue"]}{k}{colors["gray"]}={colors["green2"]}{v}'
for k, v in self.__dict__.items()
if (k in self.display_variables or not self.display_variables)
and (k not in self.hide_variables)
] + [colors['reset']])
def callable(self):
from ..targets import Localhost
async def cb(*a, **k):
from shlax.cli import cli
script = Localhost(self, quiet=True)
result = await script(*a, **k)
success = functools.reduce(
lambda a, b: a + b,
[1 for c in script.children() if c.status == 'success'] or [0])
if success:
script.output.success(f'{success} PASS')
failures = functools.reduce(
lambda a, b: a + b,
[1 for c in script.children() if c.status == 'fail'] or [0])
if failures:
script.output.fail(f'{failures} FAIL')
cli.exit_code = failures
return result
return cb
def kwargs_output(self):
return self.kwargs
def action(self, action, *args, **kwargs):
if isinstance(action, str):
# import dotted module path string to action
import cli2
a = cli2.Callable.factory(action).target
if not a:
a = cli2.Callable.factory(
'.'.join(['shlax', action])
).target
if a:
action = a
p = action(*args, **kwargs)
p.parent = self
for parent in self.parents():
if hasattr(parent, 'actions'):
p.parent = parent
break
if 'actions' not in self.__dict__:
# "mutate" to Strategy
from ..strategies.script import Actions
self.actions = Actions(self, [p])
return p
@class_or_instance_method
def steps(self):
return {
key: getattr(self, key)
for key in dir(self)
if key != 'steps' # avoid recursion
and (
key in self.default_steps
or getattr(getattr(self, key), 'shlaxstep', False)
)
}
pass

View File

@ -1,7 +0,0 @@
from .base import Action
class Copy(Action):
"""Copy files or directories to target."""
async def call(self, *args, **kwargs):
await self.copy(*self.args)

View File

@ -1,37 +0,0 @@
import hashlib
import secrets
import string
from .base import Action
class Htpasswd(Action):
"""Ensure a user is present in an htpasswd file."""
display_variables = ('user', 'path')
regexps = {
r'(.*)': '{red}\\1{gray}:${blue}\\2${blue}',
r'([^:]*):\\$([^$]*)\\$(.*)$': '{red}\\1{gray}:${blue}\\2${blue}\\3',
}
def __init__(self, user, path, **kwargs):
self.user = user
self.path = path
super().__init__(**kwargs)
async def apply(self):
found = False
htpasswd = await self.target.exec(
'cat', self.path, raises=False)
if htpasswd.rc == 0:
for line in htpasswd.out.split('\n'):
if line.startswith(self.user + ':'):
found = True
break
if not found:
self.password = ''.join(secrets.choice(
string.ascii_letters + string.digits
) for i in range(20))
hashed = hashlib.sha1(self.password.encode('utf8'))
line = f'{self.user}:\\$sha1\\${hashed.hexdigest()}'
await self.target.exec(f'echo {line} >> {self.path}')

View File

@ -1,169 +0,0 @@
import asyncio
import copy
from datetime import datetime
from glob import glob
import os
import subprocess
from textwrap import dedent
from .base import Action
class Packages(Action):
"""
Package manager abstract layer with caching.
It's a central piece of the build process, and does iterate over other
container visitors in order to pick up packages. For example, the Pip
visitor will declare ``self.packages = dict(apt=['python3-pip'])``, and the
Packages visitor will pick it up.
"""
regexps = {
#r'Installing ([\w\d-]+)': '{cyan}\\1',
r'Installing': '{cyan}lol',
}
mgrs = dict(
apk=dict(
update='apk update',
upgrade='apk upgrade',
install='apk add',
),
apt=dict(
update='apt-get -y update',
upgrade='apt-get -y upgrade',
install='apt-get -y --no-install-recommends install',
),
pacman=dict(
update='pacman -Sy',
upgrade='pacman -Su --noconfirm',
install='pacman -S --noconfirm',
),
dnf=dict(
update='dnf makecache --assumeyes',
upgrade='dnf upgrade --best --assumeyes --skip-broken', # noqa
install='dnf install --setopt=install_weak_deps=False --best --assumeyes', # noqa
),
yum=dict(
update='yum update',
upgrade='yum upgrade',
install='yum install',
),
)
installed = []
def __init__(self, *packages, **kwargs):
self.packages = []
for package in packages:
line = dedent(package).strip().replace('\n', ' ')
self.packages += line.split(' ')
super().__init__(*packages, **kwargs)
@property
def cache_root(self):
if 'CACHE_DIR' in os.environ:
return os.path.join(os.getenv('CACHE_DIR'))
else:
return os.path.join(os.getenv('HOME'), '.cache')
async def update(self):
# run pkgmgr_setup functions ie. apk_setup
cachedir = await getattr(self, self.mgr + '_setup')()
lastupdate = None
if os.path.exists(cachedir + '/lastupdate'):
with open(cachedir + '/lastupdate', 'r') as f:
try:
lastupdate = int(f.read().strip())
except:
pass
if not os.path.exists(cachedir):
os.makedirs(cachedir)
now = int(datetime.now().strftime('%s'))
# cache for a week
if not lastupdate or now - lastupdate > 604800:
# crude lockfile implementation, should work against *most*
# race-conditions ...
lockfile = cachedir + '/update.lock'
if not os.path.exists(lockfile):
with open(lockfile, 'w+') as f:
f.write(str(os.getpid()))
try:
await self.target.rexec(self.cmds['update'])
finally:
os.unlink(lockfile)
with open(cachedir + '/lastupdate', 'w+') as f:
f.write(str(now))
else:
while os.path.exists(lockfile):
print(f'{self.target} | Waiting for {lockfile} ...')
await asyncio.sleep(1)
async def apply(self):
cached = getattr(self.target, 'pkgmgr', None)
if cached:
self.mgr = cached
else:
mgr = await self.target.which(*self.mgrs.keys())
if mgr:
self.mgr = mgr[0].split('/')[-1]
if not self.mgr:
raise Exception('Packages does not yet support this distro')
self.cmds = self.mgrs[self.mgr]
if not getattr(self, '_packages_upgraded', None):
await self.update()
if self.kwargs.get('upgrade', True):
await self.target.exec(self.cmds['upgrade'], user='root')
self._packages_upgraded = True
packages = []
for package in self.packages:
if ',' in package:
parts = package.split(',')
package = parts[0]
if self.mgr in parts[1:]:
# include apt on apt
packages.append(package)
else:
packages.append(package)
await self.target.exec(*self.cmds['install'].split(' ') + packages, user='root')
async def apk_setup(self):
cachedir = os.path.join(self.cache_root, self.mgr)
await self.mount(cachedir, '/var/cache/apk')
# special step to enable apk cache
await self.rexec('ln -sf /var/cache/apk /etc/apk/cache')
return cachedir
async def dnf_setup(self):
cachedir = os.path.join(self.cache_root, self.mgr)
await self.mount(cachedir, f'/var/cache/{self.mgr}')
await self.rexec('echo keepcache=True >> /etc/dnf/dnf.conf')
return cachedir
async def apt_setup(self):
codename = (await self.rexec(
f'source {self.mnt}/etc/os-release; echo $VERSION_CODENAME'
)).out
cachedir = os.path.join(self.cache_root, self.mgr, codename)
await self.rexec('rm /etc/apt/apt.conf.d/docker-clean')
cache_archives = os.path.join(cachedir, 'archives')
await self.mount(cache_archives, f'/var/cache/apt/archives')
cache_lists = os.path.join(cachedir, 'lists')
await self.mount(cache_lists, f'/var/lib/apt/lists')
return cachedir
async def pacman_setup(self):
return self.cache_root + '/pacman'
def __repr__(self):
return f'Packages({self.packages})'

11
shlax/actions/parallel.py Normal file
View File

@ -0,0 +1,11 @@
import asyncio
class Parallel:
def __init__(self, *actions):
self.actions = actions
async def __call__(self, target):
return await asyncio.gather(*[
target(action) for action in self.actions
])

View File

@ -1,59 +0,0 @@
from glob import glob
import os
from .base import Action
class Pip(Action):
"""Pip abstraction layer."""
def __init__(self, *pip_packages, pip=None, requirements=None):
self.requirements = requirements
super().__init__(*pip_packages, pip=pip, requirements=requirements)
async def call(self, *args, **kwargs):
pip = self.kwargs.get('pip', None)
if not pip:
pip = await self.which('pip3', 'pip', 'pip2')
if pip:
pip = pip[0]
else:
from .packages import Packages
action = self.action(
Packages,
'python3,apk', 'python3-pip,apt',
args=args, kwargs=kwargs
)
await action(*args, **kwargs)
pip = await self.which('pip3', 'pip', 'pip2')
if not pip:
raise Exception('Could not install a pip command')
else:
pip = pip[0]
if 'CACHE_DIR' in os.environ:
cache = os.path.join(os.getenv('CACHE_DIR'), 'pip')
else:
cache = os.path.join(os.getenv('HOME'), '.cache', 'pip')
if getattr(self, 'mount', None):
# we are in a target which shares a mount command
await self.mount(cache, '/root/.cache/pip')
await self.exec(f'{pip} install --upgrade pip')
# https://github.com/pypa/pip/issues/5599
if 'pip' not in self.kwargs:
pip = 'python3 -m pip'
source = [p for p in self.args if p.startswith('/') or p.startswith('.')]
if source:
await self.exec(
f'{pip} install --upgrade --editable {" ".join(source)}'
)
nonsource = [p for p in self.args if not p.startswith('/')]
if nonsource:
await self.exec(f'{pip} install --upgrade {" ".join(nonsource)}')
if self.requirements:
await self.exec(f'{pip} install --upgrade -r {self.requirements}')

View File

@ -1,24 +1,8 @@
from .base import Action
class Run(Action):
"""Run a script or command on a target."""
def __init__(self, *args, image=None, **kwargs):
super().__init__(**kwargs)
self.args = args
self.kwargs = kwargs
self.image = image
class Run:
def __init__(self, cmd):
self.cmd = cmd
async def apply(self):
if not self.image:
return await self.target.exec(*self.args, **self.kwargs)
from ..targets.buildah import Buildah
from ..targets.docker import Docker
if isinstance(image, Buildah):
result = await self.action(image, *args, **kwargs)
return await Docker(
image=image,
).exec(*args, **kwargs)
async def __call__(self, target):
target.exec(self.cmd)

View File

@ -1,19 +0,0 @@
import asyncio
from .base import Action
class Service(Action):
"""
Manage a systemd service.
"""
def __init__(self, *names, state=None):
self.state = state or 'started'
self.names = names
super().__init__()
async def call(self, *args, **kwargs):
return asyncio.gather(*[
self.exec('systemctl', 'start', name, user='root')
for name in self.names
])

View File

@ -1,246 +0,0 @@
"""
Shlax automation tool manual
Shlax is built mostly around 3 moving pieces:
- Target: a target host and protocol
- Action: execute a shlax action
- Strategy: defines how to apply actions on targets (scripted only)
Shlax executes mostly in 3 ways:
- Execute actions on targets with the command line
- With your shlaxfile as first argument: offer defined Actions
- With the name of a module in shlax.repo: a community maintained shlaxfile
"""
import copy
import cli2
import inspect
import importlib
import glob
import os
import sys
from .actions.base import Action
from .exceptions import ShlaxException, WrongResult
from .strategies import Script
class ConsoleScript(cli2.ConsoleScript):
class Parser(cli2.Parser):
def __init__(self, *args, **kwargs):
self.targets = dict()
super().__init__(*args, **kwargs)
def append(self, arg):
if '=' not in arg and '@' in arg:
if '://' in arg:
kind, spec = arg.split('://')
else:
kind = 'ssh'
spec = arg
mod = importlib.import_module('shlax.targets.' + kind)
target = getattr(mod, kind.capitalize())(spec)
self.targets[str(target)] = target
else:
super().append(arg)
def __call__(self):
if len(self.argv) > 1 and os.path.exists(self.argv[1]):
self.argv = sys.argv[1:]
spec = importlib.util.spec_from_file_location('shlaxfile', sys.argv[1])
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
self.doc = (inspect.getdoc(mod) or '').split("\n")[0]
for name, value in mod.__dict__.items():
if isinstance(value, Action):
self[name] = cli2.Callable(
name,
self.action(value),
doc=type(value).__doc__,
options={
option: cli2.Option(option, **cfg)
for option, cfg in value.options.items()
}
)
#self[name] = value
#elif callable(value) and getattr(value, '__name__', '').startswith('test_'):
# self.tests[value.__name__] = value
#modname = sys.argv[1].split('/')[-1].replace('.py', '')
#mod = importlib.import_module('shlax.actions.' + modname)
else:
scripts = glob.glob(os.path.join(
os.path.dirname(__file__), 'actions', '*.py'))
for script in scripts:
modname = script.split('/')[-1].replace('.py', '')
if modname == '__init__':
continue
mod = importlib.import_module('shlax.actions.' + modname)
for key, value in mod.__dict__.items():
if key == '__builtins__':
continue
if key.lower() != modname:
continue
break
self[modname] = cli2.Callable(
modname,
self.action_class(value),
options={
option: cli2.Option(option, **cfg)
for option, cfg in value.options.items()
}
)
scripts = glob.glob(os.path.join(
os.path.dirname(__file__), 'repo', '*.py'))
for script in scripts:
modname = script.split('/')[-1].replace('.py', '')
mod = importlib.import_module('shlax.repo.' + modname)
self[modname] = cli2.Group(key, doc=inspect.getdoc(mod))
for key, value in mod.__dict__.items():
if not isinstance(value, Action):
continue
doc = (inspect.getdoc(mod) or '').split("\n")[0]
if key == 'main':
if len(value.steps()) == 1:
self[modname] = cli2.Callable(
modname,
self.action(value),
doc=doc,
options={
option: cli2.Option(option, **cfg)
for option, cfg in value.options.items()
}
)
else:
for name, step in value.steps().items():
if isinstance(step, Action):
self[modname][name] = cli2.Callable(
modname,
self.action(step),
doc=inspect.getdoc(step),
options={
option: cli2.Option(option, **cfg)
for option, cfg in value.options.items()
}
)
else:
# should be a method, just clone the
# original action and replace default_steps
action = copy.deepcopy(value)
action.default_steps = [name]
self[modname][name] = cli2.Callable(
modname,
self.action(action),
doc=inspect.getdoc(step),
options={
option: cli2.Option(option, **cfg)
for option, cfg in value.options.items()
}
)
else:
if len(value.steps()) == 1:
self[modname][key] = cli2.Callable(
modname,
self.action(value),
doc=doc,
options={
option: cli2.Option(option, **cfg)
for option, cfg in value.options.items()
}
)
else:
self[modname][key] = cli2.Group('steps')
for step in value.steps():
self[modname][key][step] = cli2.Callable(
modname,
self.action(value),
doc='lol',
options={
option: cli2.Option(option, **cfg)
for option, cfg in value.options.items()
}
)
return super().__call__()
def action(self, action):
async def cb(*args, **kwargs):
options = dict(steps=args)
options.update(self.parser.options)
# UnboundLocalError: local variable 'action' referenced before assignment
# ??? gotta be missing something, commenting meanwhile
# action = copy.deepcopy(action)
return await action(*self.parser.targets, **options)
cb.__name__ = type(action).__name__
return cb
def action_class(self, action_class):
async def cb(*args, **kwargs):
argspec = inspect.getfullargspec(action_class)
required = argspec.args[1:]
missing = []
for i, name in enumerate(required):
if len(args) - 1 <= i:
continue
if name in kwargs:
continue
missing.append(name)
if missing:
if not args:
print('No args provided after action name ' + action_class.__name__.lower())
print('Required arguments: ' + ', '.join(argspec.args[1:]))
if args:
print('Provided: ' + ', '.join(args))
print('Missing arguments: ' + ', '.join(missing))
print('Try to just add args on the command line separated with a space')
print(inspect.getdoc(action_class))
example = 'Example: shlax action '
example += action_class.__name__.lower()
if args:
example += ' ' + ' '.join(args)
example += ' ' + ' '.join(missing)
print(example)
return
_args = []
steps = []
for arg in args:
if arg in action_class.steps():
steps.append(arg)
else:
_args.append(arg)
options = dict(steps=steps)
'''
varargs = argspec.varargs
if varargs:
extra = args[len(argspec.args) - 1:]
args = args[:len(argspec.args) - 1]
options = dict(steps=extra)
else:
extra = args[len(argspec.args) - 1:]
args = args[:len(argspec.args) - 1]
options = dict(steps=extra)
'''
options.update(self.parser.options)
return await action_class(*_args, **kwargs)(*self.parser.targets, **options)
cb.__doc__ = (inspect.getdoc(action_class) or '').split("\n")[0]
cb.__name__ = action_class.__name__
return cb
def call(self, command):
try:
return super().call(command)
except WrongResult as e:
print(e)
self.exit_code = e.proc.rc
except ShlaxException as e:
print(e)
self.exit_code = 1
cli = ConsoleScript(__doc__).add_module('shlax.cli')

View File

@ -1,68 +0,0 @@
colors = dict(
cyan='\u001b[38;5;51m',
cyan1='\u001b[38;5;87m',
cyan2='\u001b[38;5;123m',
cyan3='\u001b[38;5;159m',
blue='\u001b[38;5;33m',
blue1='\u001b[38;5;69m',
blue2='\u001b[38;5;75m',
blue3='\u001b[38;5;81m',
blue4='\u001b[38;5;111m',
blue5='\u001b[38;5;27m',
green='\u001b[38;5;10m',
green1='\u001b[38;5;2m',
green2='\u001b[38;5;46m',
green3='\u001b[38;5;47m',
green4='\u001b[38;5;48m',
green5='\u001b[38;5;118m',
green6='\u001b[38;5;119m',
green7='\u001b[38;5;120m',
purple='\u001b[38;5;5m',
purple1='\u001b[38;5;6m',
purple2='\u001b[38;5;13m',
purple3='\u001b[38;5;164m',
purple4='\u001b[38;5;165m',
purple5='\u001b[38;5;176m',
purple6='\u001b[38;5;145m',
purple7='\u001b[38;5;213m',
purple8='\u001b[38;5;201m',
red='\u001b[38;5;1m',
red1='\u001b[38;5;9m',
red2='\u001b[38;5;196m',
red3='\u001b[38;5;160m',
red4='\u001b[38;5;197m',
red5='\u001b[38;5;198m',
red6='\u001b[38;5;199m',
yellow='\u001b[38;5;226m',
yellow1='\u001b[38;5;227m',
yellow2='\u001b[38;5;226m',
yellow3='\u001b[38;5;229m',
yellow4='\u001b[38;5;220m',
yellow5='\u001b[38;5;230m',
gray='\u001b[38;5;250m',
gray1='\u001b[38;5;251m',
gray2='\u001b[38;5;252m',
gray3='\u001b[38;5;253m',
gray4='\u001b[38;5;254m',
gray5='\u001b[38;5;255m',
gray6='\u001b[38;5;249m',
pink='\u001b[38;5;197m',
pink1='\u001b[38;5;198m',
pink2='\u001b[38;5;199m',
pink3='\u001b[38;5;200m',
pink4='\u001b[38;5;201m',
pink5='\u001b[38;5;207m',
pink6='\u001b[38;5;213m',
orange='\u001b[38;5;202m',
orange1='\u001b[38;5;208m',
orange2='\u001b[38;5;214m',
orange3='\u001b[38;5;220m',
orange4='\u001b[38;5;172m',
orange5='\u001b[38;5;166m',
reset='\u001b[0m',
)
colors.update({
k + 'bold': v.replace('[', '[1;')
for k, v in colors.items()
})

View File

@ -1,22 +0,0 @@
class ShlaxException(Exception):
pass
class Mistake(ShlaxException):
pass
class WrongResult(ShlaxException):
def __init__(self, proc):
self.proc = proc
msg = f'FAIL exit with {proc.rc} ' + proc.args[0]
if not proc.output.debug or 'cmd' not in str(proc.output.debug):
msg += '\n' + proc.cmd
if not proc.output.debug or 'out' not in str(proc.output.debug):
msg += '\n' + proc.out
msg += '\n' + proc.err
super().__init__(msg)

View File

@ -1,75 +0,0 @@
import os
import re
class Image:
ENV_TAGS = (
# gitlab
'CI_COMMIT_SHORT_SHA',
'CI_COMMIT_REF_NAME',
'CI_COMMIT_TAG',
# CircleCI
'CIRCLE_SHA1',
'CIRCLE_TAG',
'CIRCLE_BRANCH',
# contributions welcome here
)
PATTERN = re.compile(
'^((?P<backend>[a-z]*)://)?((?P<registry>[^/]*[.][^/]*)/)?((?P<repository>[^:]+))?(:(?P<tags>.*))?$' # noqa
, re.I
)
def __init__(self, arg=None, format=None, backend=None, registry=None, repository=None, tags=None):
self.arg = arg
self.format = format
self.backend = backend
self.registry = registry
self.repository = repository
self.tags = tags or []
match = re.match(self.PATTERN, arg)
if match:
for k, v in match.groupdict().items():
if getattr(self, k):
continue
if not v:
continue
if k == 'tags':
v = v.split(',')
setattr(self, k, v)
# docker.io currently has issues with oci format
self.format = format or 'oci'
if self.registry == 'docker.io':
self.format = 'docker'
# figure tags from CI vars
for name in self.ENV_TAGS:
value = os.getenv(name)
if value:
self.tags.append(value)
# filter out tags which resolved to None
self.tags = [t for t in self.tags if t]
# default tag by default ...
if not self.tags:
self.tags = ['latest']
async def __call__(self, action, *args, **kwargs):
args = list(args)
return await action.exec(*args, **self.kwargs)
def __str__(self):
return f'{self.repository}:{self.tags[-1]}'
async def push(self, *args, **kwargs):
user = os.getenv('DOCKER_USER')
passwd = os.getenv('DOCKER_PASS')
action = kwargs.get('action', self)
if user and passwd:
action.output.cmd('buildah login -u ... -p ...' + self.registry)
await action.exec('buildah', 'login', '-u', user, '-p', passwd, self.registry or 'docker.io', debug=False)
for tag in self.tags:
await action.exec('buildah', 'push', f'{self.repository}:{tag}')

View File

@ -1,149 +0,0 @@
import re
import sys
from .colors import colors
class Output:
prefixes = dict()
colors = colors
prefix_colors = (
'\x1b[1;36;45m',
'\x1b[1;36;41m',
'\x1b[1;36;40m',
'\x1b[1;37;45m',
'\x1b[1;32m',
'\x1b[1;37;44m',
)
def color(self, code=None):
if not code:
return '\u001b[0m'
code = str(code)
return u"\u001b[38;5;" + code + "m"
def colorize(self, code, content):
return self.color(code) + content + self.color()
def __init__(self, prefix=None, regexps=None, debug=False, write=None, flush=None, **kwargs):
self.prefix = prefix
self.debug = debug
self.prefix_length = 0
self.regexps = regexps or dict()
self.write = write or sys.stdout.buffer.write
self.flush = flush or sys.stdout.flush
self.kwargs = kwargs
def prefix_line(self):
if self.prefix not in self.prefixes:
self.prefixes[self.prefix] = self.prefix_colors[len(self.prefixes)]
if len(self.prefix) > self.prefix_length:
self.prefix_length = len(self.prefix)
prefix_color = self.prefixes[self.prefix] if self.prefix else ''
prefix_padding = '.' * (self.prefix_length - len(self.prefix) - 2) if self.prefix else ''
if prefix_padding:
prefix_padding = ' ' + prefix_padding + ' '
return [
prefix_color,
prefix_padding,
self.prefix,
' ',
self.colors['reset'],
'| '
]
def __call__(self, line, highlight=True, flush=True):
line = [self.highlight(line) if highlight else line]
if self.prefix:
line = self.prefix_line() + line
line = ''.join(line)
self.write(line.encode('utf8'))
if flush:
self.flush()
def cmd(self, line):
self(
self.colorize(251, '+')
+ '\x1b[1;38;5;15m'
+ ' '
+ self.highlight(line, 'bash')
+ self.colors['reset']
+ '\n',
highlight=False
)
def print(self, content):
self(
content,
prefix=None,
highlight=False
)
def highlight(self, line, highlight=True):
line = line.decode('utf8') if isinstance(line, bytes) else line
if not highlight or (
'\x1b[' in line
or '\033[' in line
or '\\e[' in line
):
return line
for regexp, colors in self.regexps.items():
line = re.sub(regexp, colors.format(**self.colors), line)
line = line + self.colors['reset']
return line
def test(self, action):
if self.debug is True:
self(''.join([
self.colors['purplebold'],
'! TEST ',
self.colors['reset'],
action.colorized(self.colors),
'\n',
]))
def clean(self, action):
if self.debug is True:
self(''.join([
self.colors['bluebold'],
'+ CLEAN ',
self.colors['reset'],
action.colorized(self.colors),
'\n',
]))
def start(self, action):
if self.debug is True or 'visit' in str(self.debug):
self(''.join([
self.colors['orangebold'],
'⚠ START ',
self.colors['reset'],
action.colorized(self.colors),
'\n',
]))
def success(self, action):
if self.debug is True or 'visit' in str(self.debug):
self(''.join([
self.colors['greenbold'],
'✔ SUCCESS ',
self.colors['reset'],
action.colorized(self.colors) if hasattr(action, 'colorized') else str(action),
'\n',
]))
def fail(self, action, exception=None):
if self.debug is True or 'visit' in str(self.debug):
self(''.join([
self.colors['redbold'],
'✘ FAIL ',
self.colors['reset'],
action.colorized(self.colors) if hasattr(action, 'colorized') else str(action),
'\n',
]))

View File

@ -1,9 +0,0 @@
from .targets import Localhost
class Play:
def __init__(self, *actions, targets=None, options=None):
self.options = options or {}
self.targets = targets or dict(localhost=Localhost())
self.actions =

View File

@ -7,10 +7,25 @@ import os
import shlex
import sys
from .exceptions import WrongResult
from .output import Output
class ProcFailure(Exception):
def __init__(self, proc):
self.proc = proc
msg = f'FAIL exit with {proc.rc} ' + proc.args[0]
if not proc.output.debug or 'cmd' not in str(proc.output.debug):
msg += '\n' + proc.cmd
if not proc.output.debug or 'out' not in str(proc.output.debug):
msg += '\n' + proc.out
msg += '\n' + proc.err
super().__init__(msg)
class PrefixStreamProtocol(asyncio.subprocess.SubprocessStreamProtocol):
"""
Internal subprocess stream protocol to add a prefix in front of output to
@ -122,7 +137,7 @@ class Proc:
if not self.communicated:
await self.communicate()
if self.raises and self.proc.returncode:
raise WrongResult(self)
raise ProcFailure(self)
return self
@property

View File

@ -1,35 +0,0 @@
#!/usr/bin/env shlax
"""
Manage a traefik container maintained by Shlax community.
"""
from shlax.shortcuts import *
main = Docker(
name='traefik',
image='traefik:v2.0.0',
install=Htpasswd(
'./htpasswd', 'root', doc='Install root user in ./htpasswd'
),
networks=['web'],
command=[
'--entrypoints.web.address=:80',
'--providers.docker',
'--api',
],
ports=[
'80:80',
'443:443',
],
volumes=[
'/var/run/docker.sock:/var/run/docker.sock:ro',
'/etc/traefik/acme/:/etc/traefik/acme/',
'/etc/traefik/htpasswd:/htpasswd:ro',
],
labels=[
'traefik.http.routers.traefik.rule=Host(`{{ url.split("/")[2] }}`)',
'traefik.http.routers.traefik.service=api@internal',
'traefik.http.routers.traefik.entrypoints=web',
],
)

View File

@ -1,7 +1,12 @@
class Result:
def __init__(self, action, target):
self.action = action
def __init__(self, target, action):
self.target = target
self.action = action
self.status = 'pending'
class Results(list):
def new(self, target, action):
result = Result(target, action)
self.append(result)
return result

View File

@ -1,28 +0,0 @@
import importlib
import os
from .actions.base import Action
class Shlaxfile:
def __init__(self, actions=None, tests=None):
self.actions = actions or {}
self.tests = tests or {}
self.paths = []
def parse(self, path):
spec = importlib.util.spec_from_file_location('shlaxfile', path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
for name, value in mod.__dict__.items():
if isinstance(value, Action):
value.__name__ = name
self.actions[name] = value
elif callable(value) and getattr(value, '__name__', '').startswith('test_'):
self.tests[value.__name__] = value
self.paths.append(path)
@property
def path(self):
return self.paths[0]

View File

@ -1,14 +1,4 @@
from .actions.copy import Copy
from .actions.packages import Packages # noqa
from .actions.base import Action # noqa
from .actions.htpasswd import Htpasswd
from .actions.run import Run # noqa
from .actions.pip import Pip
from .actions.service import Service
from .targets.base import Target
from .targets.buildah import Buildah
from .targets.docker import Docker
from .targets.localhost import Localhost
from .targets.ssh import Ssh
from .actions.run import Run

View File

@ -1,3 +0,0 @@
from .asyn import Async
from .script import Script
from .pod import Pod, Container

View File

@ -1,11 +0,0 @@
import asyncio
from .script import Script
class Async(Script):
async def call(self, *args, **kwargs):
return await asyncio.gather(*[
action(*args, **kwargs)
for action in self.actions
])

View File

@ -1,44 +0,0 @@
import os
from .script import Script
from ..image import Image
class Container(Script):
"""
Wolcome to crazy container control cli
Such wow
"""
def __init__(self, *args, **kwargs):
kwargs.setdefault('start', dict())
super().__init__(*args, **kwargs)
async def call(self, *args, **kwargs):
if step('build'):
await self.kwargs['build'](**kwargs)
self.image = self.kwargs['build'].image
else:
self.image = kwargs.get('image', 'alpine')
if isinstance(self.image, str):
self.image = Image(self.image)
if step('install'):
await self.install(*args, **kwargs)
if step('test'):
self.output.test(self)
await self.action('Docker',
*self.kwargs['test'].actions,
image=self.image,
mount={'.': '/app'},
workdir='/app',
)(**kwargs)
if step('push'):
await self.image.push(action=self)
#name = kwargs.get('name', os.getcwd()).split('/')[-1]
class Pod(Script):
pass

View File

@ -1,41 +0,0 @@
import copy
import os
from ..exceptions import WrongResult
from ..actions.base import Action
from ..proc import Proc
class Actions(list):
def __init__(self, owner, actions):
self.owner = owner
super().__init__()
for action in actions:
self.append(action)
def append(self, value):
action = copy.deepcopy(value)
action.parent = self.owner
action.status = 'pending'
super().append(action)
class Script(Action):
contextualize = ['shargs', 'exec', 'rexec', 'env', 'which', 'copy']
def __init__(self, *actions, **kwargs):
self.home = kwargs.pop('home', os.getcwd())
super().__init__(**kwargs)
self.actions = Actions(self, actions)
async def call(self, *args, **kwargs):
for action in self.actions:
result = await action(*args, **kwargs)
if action.status != 'success':
break
def pollute(self, gbls):
for name, script in self.kwargs.items():
if not isinstance(script, Script):
continue
gbls[name] = script

View File

@ -1,7 +0,0 @@
from .script import Script
class Test(Script):
async def call(self, *args, backend=None, **kwargs):
backend = backend or 'Docker'
breakpoint()
return await self.action(backend, self.actions, **kwargs)

27
shlax/targets/base.py Normal file
View File

@ -0,0 +1,27 @@
import copy
from ..result import Result, Results
class Target:
def __init__(self, *actions, **options):
self.actions = actions
self.options = options
self.results = []
async def __call__(self, *actions):
for action in actions or self.actions:
try:
await action(self)
except Exception as e:
action.status = 'failure'
action.exception = e
if actions:
# nested call, re-raise
raise
else:
break
else:
action.status = 'success'
finally:
self.results.append(action)

View File

@ -1,156 +1,5 @@
import asyncio
import os
import asyncio
from pathlib import Path
import signal
import shlex
import subprocess
import sys
import textwrap
from ..actions.base import Action
from ..exceptions import Mistake
from ..proc import Proc
from ..image import Image
from .localhost import Localhost
from .base import Target
class Buildah(Localhost):
"""
The build script iterates over visitors and runs the build functions, it
also provides wrappers around the buildah command.
"""
contextualize = Localhost.contextualize + ['mnt', 'ctr', 'mount', 'image']
def __init__(self, base, *args, commit=None, push=False, cmd=None, **kwargs):
if isinstance(base, Action):
args = [base] + list(args)
base = 'alpine' # default selection in case of mistake
super().__init__(*args, **kwargs)
self.base = base
self.mounts = dict()
self.ctr = None
self.mnt = None
self.image = Image(commit) if commit else None
self.config= dict(
cmd=cmd or 'sh',
)
def shargs(self, *args, user=None, buildah=True, **kwargs):
if not buildah or args[0].startswith('buildah'):
return super().shargs(*args, user=user, **kwargs)
_args = ['buildah', 'run']
if user:
_args += ['--user', user]
_args += [self.ctr, '--', 'sh', '-euc']
return super().shargs(
*(
_args
+ [' '.join([str(a) for a in args])]
),
**kwargs
)
def __repr__(self):
return f'Base({self.base})'
async def config(self, line):
"""Run buildah config."""
return await self.exec(f'buildah config {line} {self.ctr}', buildah=False)
async def copy(self, *args):
"""Run buildah copy to copy a file from host into container."""
src = args[:-1]
dst = args[-1]
await self.mkdir(dst)
procs = []
for s in src:
if Path(s).is_dir():
target = self.mnt / s
if not target.exists():
await self.mkdir(target)
args = ['buildah', 'copy', self.ctr, s, Path(dst) / s]
else:
args = ['buildah', 'copy', self.ctr, s, dst]
procs.append(self.exec(*args, buildah=False))
return await asyncio.gather(*procs)
async def mount(self, src, dst):
"""Mount a host directory into the container."""
target = self.mnt / str(dst)[1:]
await self.exec(f'mkdir -p {src} {target}', buildah=False)
await self.exec(f'mount -o bind {src} {target}', buildah=False)
self.mounts[src] = dst
def is_runnable(self):
return (
Proc.test
or os.getuid() == 0
)
async def call(self, *args, **kwargs):
if self.is_runnable():
self.ctr = (await self.exec('buildah', 'from', self.base, buildah=False)).out
self.mnt = Path((await self.exec('buildah', 'mount', self.ctr, buildah=False)).out)
result = await super().call(*args, **kwargs)
return result
from shlax.cli import cli
debug = kwargs.get('debug', False)
# restart under buildah unshare environment
argv = [
'buildah', 'unshare',
sys.argv[0], # current script location
cli.shlaxfile.path, # current shlaxfile location
]
if debug is True:
argv.append('-d')
elif isinstance(debug, str) and debug:
argv.append('-d=' + debug)
argv += [
cli.parser.command.name, # script name ?
]
await self.exec(*argv)
async def commit(self):
if not self.image:
return
for key, value in self.config.items():
await self.exec(f'buildah config --{key} "{value}" {self.ctr}')
self.sha = (await self.exec(
'buildah',
'commit',
'--format=' + self.image.format,
self.ctr,
buildah=False,
)).out
if self.image.tags:
tags = [f'{self.image.repository}:{tag}' for tag in self.image.tags]
else:
tags = [self.image.repository]
for tag in tags:
await self.exec('buildah', 'tag', self.sha, tag, buildah=False)
async def clean(self, *args, **kwargs):
if self.is_runnable():
for src, dst in self.mounts.items():
await self.exec('umount', self.mnt / str(dst)[1:], buildah=False)
if self.status == 'success':
await self.commit()
if 'push' in args:
await self.image.push(action=self)
if self.mnt is not None:
await self.exec('buildah', 'umount', self.ctr, buildah=False)
if self.ctr is not None:
await self.exec('buildah', 'rm', self.ctr, buildah=False)
class Buildah(Target):
pass

View File

@ -1,111 +0,0 @@
import asyncio
from pathlib import Path
import os
from ..image import Image
from .localhost import Localhost
class Docker(Localhost):
"""Manage a docker container."""
default_steps = ['install', 'up']
contextualize = ['image', 'home']
def __init__(self, *args, **kwargs):
self.image = kwargs.get('image', 'alpine')
self.name = kwargs.get('name', os.getcwd().split('/')[-1])
if not isinstance(self.image, Image):
self.image = Image(self.image)
super().__init__(*args, **kwargs)
def shargs(self, *args, daemon=False, **kwargs):
if args[0] == 'docker':
return args, kwargs
extra = []
if 'user' in kwargs:
extra += ['--user', kwargs.pop('user')]
args, kwargs = super().shargs(*args, **kwargs)
if self.name:
executor = 'exec'
extra = [self.name]
return [self.kwargs.get('docker', 'docker'), executor, '-t'] + extra + list(args), kwargs
executor = 'run'
cwd = os.getcwd()
if daemon:
extra += ['-d']
extra = extra + ['-v', f'{cwd}:{cwd}', '-w', f'{cwd}']
return [self.kwargs.get('docker', 'docker'), executor, '-t'] + extra + [str(self.image)] + list(args), kwargs
async def call(self, *args, **kwargs):
def step(step):
return not args or step in args
# self.name = (
# await self.exec(
# 'docker', 'ps', '-aq', '--filter',
# 'name=' + self.name,
# raises=False
# )
# ).out.split('\n')[0]
if step('install') and 'install' in self.kwargs:
await self.action(self.kwargs['install'], *args, **kwargs)
if step('rm') and await self.exists():
await self.exec('docker', 'rm', '-f', self.name)
if step('up'):
if await self.exists():
self.name = (await self.exec('docker', 'start', self.name)).out
else:
self.id = (await self.exec(
'docker', 'run', '-d', '--name', self.name, str(self.image))
).out
return await super().call(*args, **kwargs)
async def exists(self):
proc = await self.exec(
'docker', 'ps', '-aq', '--filter',
'name=' + self.name,
raises=False
)
return bool(proc.out.strip())
async def copy(self, *args):
src = args[:-1]
dst = args[-1]
await self.mkdir(dst)
procs = []
for s in src:
'''
if Path(s).is_dir():
await self.mkdir(s)
args = ['docker', 'copy', self.ctr, s, Path(dst) / s]
else:
args = ['docker', 'copy', self.ctr, s, dst]
'''
args = ['docker', 'cp', s, self.name + ':' + dst]
procs.append(self.exec(*args))
return await asyncio.gather(*procs)
async def up(self):
"""Ensure container is up and running."""
if await self.exists():
self.name = (await self.exec('docker', 'start', self.name)).out
else:
self.id = (await self.exec(
'docker', 'run', '-d', '--name', self.name, str(self.image))
).out
up.shlaxstep = True
async def rm(self):
"""Remove container."""
await self.exec('docker', 'rm', '-f', self.name)
rm.shlaxstep = True

View File

@ -1,80 +0,0 @@
import os
import re
from shlax.proc import Proc
from ..strategies.script import Script
class Localhost(Script):
root = '/'
contextualize = Script.contextualize + ['home']
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.home = kwargs.pop('home', os.getcwd())
def shargs(self, *args, **kwargs):
user = kwargs.pop('user', None)
args = [str(arg) for arg in args if args is not None]
if args and ' ' in args[0]:
if len(args) == 1:
args = ['sh', '-euc', args[0]]
else:
args = ['sh', '-euc'] + list(args)
if user == 'root':
args = ['sudo'] + args
elif user:
args = ['sudo', '-u', user] + args
if self.parent:
return self.parent.shargs(*args, **kwargs)
else:
return args, kwargs
async def exec(self, *args, **kwargs):
kwargs['output'] = self.output
args, kwargs = self.shargs(*args, **kwargs)
proc = await Proc(*args, **kwargs)()
if kwargs.get('wait', True):
await proc.wait()
return proc
async def rexec(self, *args, **kwargs):
kwargs['user'] = 'root'
return await self.exec(*args, **kwargs)
async def env(self, name):
return (await self.exec('echo $' + name)).out
async def exists(self, *paths):
proc = await self.exec('type ' + ' '.join(cmd), raises=False)
async def which(self, *cmd):
"""
Return the first path to the cmd in the container.
If cmd argument is a list then it will try all commands.
"""
proc = await self.exec('type ' + ' '.join(cmd), raises=False)
result = []
for res in proc.out.split('\n'):
match = re.match('([^ ]+) is ([^ ]+)$', res.strip())
if match:
result.append(match.group(1))
return result
async def copy(self, *args):
if args[-1].startswith('./'):
args = list(args)
args[-1] = self.home + '/' + args[-1][2:]
args = ['cp', '-rua'] + list(args)
return await self.exec(*args)
async def mount(self, *dirs):
pass
async def mkdir(self, *dirs):
return await self.exec(*['mkdir', '-p'] + list(dirs))

View File

@ -1,17 +0,0 @@
import os
from shlax.proc import Proc
from .localhost import Localhost
class Ssh(Localhost):
root = '/'
def __init__(self, host, *args, **kwargs):
self.host = host
super().__init__(*args, **kwargs)
def shargs(self, *args, **kwargs):
args, kwargs = super().shargs(*args, **kwargs)
return (['ssh', self.host] + list(args)), kwargs

View File

@ -1,55 +1,15 @@
#!/usr/bin/env shlax
from shlax.contrib.gitlab import *
"""
Shlaxfile for shlax itself.
"""
PYTEST = 'py.test -svv tests'
test = Script(
Pip('.[test]'),
Run(PYTEST),
)
from shlax.shortcuts import *
build = Buildah(
'quay.io/podman/stable',
Packages('python38', 'buildah', 'unzip', 'findutils', 'python3-yaml', upgrade=False),
Async(
# dancing for pip on centos python3.8
Run('''
curl -o setuptools.zip https://files.pythonhosted.org/packages/42/3e/2464120172859e5d103e5500315fb5555b1e908c0dacc73d80d35a9480ca/setuptools-45.1.0.zip
unzip setuptools.zip
mkdir -p /usr/local/lib/python3.8/site-packages/
sh -c "cd setuptools-* && python3.8 setup.py install"
easy_install-3.8 pip
echo python3.8 -m pip > /usr/bin/pip
chmod +x /usr/bin/pip
'''),
Copy('shlax/', 'setup.py', '/app'),
),
Pip('/app[full]'),
Run('echo hi'),
commit='docker.io/yourlabs/shlax',
workdir='/app',
)
shlax = Container(
build=build,
test=Script(Run('./shlaxfile.py -d test')),
)
gitlabci = GitLabCI(
test=dict(
stage='build',
script='pip install -U --user -e .[test] && ' + PYTEST,
image='yourlabs/python',
),
build=dict(
stage='build',
image='yourlabs/shlax',
script='pip install -U --user -e . && CACHE_DIR=$(pwd)/.cache ./shlaxfile.py -d shlax build push',
cache=dict(paths=['.cache'], key='cache'),
),
pypi=dict(
stage='deploy',
only=['tags'],
image='yourlabs/python',
script='pypi-release',
),
)
build()

View File

@ -1,7 +0,0 @@
from shlax.cli import ConsoleScript
def test_parser():
parser = ConsoleScript.Parser(['@host'])
parser.parse()
assert parser.targets['host'] == Ssh('host')

View File

@ -1,53 +0,0 @@
from shlax import *
inner = Run()
other = Run('ls')
middle = Buildah('alpine', inner, other)
outer = Localhost(middle)
middle = outer.actions[0]
other = middle.actions[1]
inner = middle.actions[0]
def test_action_init_args():
assert other.args == ('ls',)
def test_action_parent_autoset():
assert list(outer.actions) == [middle]
assert middle.parent == outer
assert inner.parent == middle
assert other.parent == middle
def test_action_context():
assert outer.context is inner.context
assert middle.context is inner.context
assert middle.context is outer.context
assert other.context is outer.context
def test_action_sibblings():
assert inner.sibblings() == [other]
assert inner.sibblings(lambda s: s.args[0] == 'ls') == [other]
assert inner.sibblings(lambda s: s.args[0] == 'foo') == []
assert inner.sibblings(type='run') == [other]
assert inner.sibblings(args=('ls',)) == [other]
def test_actions_parents():
assert other.parents() == [middle, outer]
assert other.parents(lambda p: p.base == 'alpine') == [middle]
assert inner.parents(type='localhost') == [outer]
assert inner.parents(type='buildah') == [middle]
def test_action_childrens():
assert middle.children() == [inner, other]
assert middle.children(lambda a: a.args[0] == 'ls') == [other]
assert outer.children() == [middle, inner, other]
def test_action_getattr():
assert other.exec == middle.exec
assert other.shargs == middle.shargs

View File

@ -1,7 +0,0 @@
from shlax.cli import ConsoleScript
def test_parser():
parser = ConsoleScript.Parser(['@host'])
parser.parse()
assert parser.targets['host'] == Ssh('host')

View File

@ -1,33 +0,0 @@
import pytest
import os
from shlax import Image
tests = {
'docker://a.b:1337/re/po:x,y': ('docker', 'a.b:1337', 're/po', 'x,y'),
'docker://a.b/re/po:x,y': ('docker', 'a.b', 're/po', 'x,y'),
'a.b:1337/re/po:x,y': (None, 'a.b:1337', 're/po', 'x,y'),
'a.b/re/po:x,y': (None, 'a.b', 're/po', 'x,y'),
're/po:x,y': (None, None, 're/po', 'x,y'),
're/po': (None, None, 're/po', 'latest'),
'docker://re/po': ('docker', None, 're/po', 'latest'),
'docker://re/po:x,y': ('docker', None, 're/po', 'x,y'),
}
@pytest.mark.parametrize(
'arg,expected', [(k, dict(
backend=v[0], registry=v[1], repository=v[2], tags=v[3].split(',')
)) for k, v in tests.items()]
)
def test_args(arg, expected):
Image.ENV_TAGS = []
im = Image(arg)
for k, v in expected.items():
assert getattr(im, k) == v
def test_args_env():
os.environ['IMAGE_TEST_ARGS_ENV'] = 'foo'
Image.ENV_TAGS = ['IMAGE_TEST_ARGS_ENV']
im = Image('re/po:x,y')
assert im.tags == ['x', 'y', 'foo']

View File

@ -1,24 +0,0 @@
import pytest
from shlax import Output
class Write:
def __init__(self):
self.output = ''
def __call__(self, out):
self.output += out.decode('utf8')
@pytest.fixture
def write():
return Write()
def test_output_regexps(write):
output = Output(
regexps={'^(.*)$': '{red}\\1'},
write=write,
flush=lambda: None,
)
output('foo')
assert write.output.strip() == output.colors['red'] + 'foo' + output.colors['reset']

View File

@ -1,5 +0,0 @@
from shlax.play import Play
def test_play_call():

View File

@ -1,65 +0,0 @@
import pytest
from unittest.mock import patch
from shlax import *
from shlax import proc
test_args_params = [
(
Localhost(Run('echo hi')),
[('sh', '-euc', 'echo hi')]
),
(
Localhost(Run('echo hi', user='jimi')),
[('sudo', '-u', 'jimi', 'sh', '-euc', 'echo hi')]
),
(
Localhost(Run('echo hi', user='root')),
[('sudo', 'sh', '-euc', 'echo hi')]
),
(
Ssh('host', Run('echo hi', user='root')),
[('ssh', 'host', 'sudo', 'sh', '-euc', 'echo hi')]
),
(
Buildah('alpine', Run('echo hi')),
[
('buildah', 'from', 'alpine'),
('buildah', 'mount', ''),
('buildah', 'run', '', '--', 'sh', '-euc', 'echo hi'),
('buildah', 'umount', ''),
('buildah', 'rm', ''),
]
),
(
Buildah('alpine', Run('echo hi', user='root')),
[
('buildah', 'from', 'alpine'),
('buildah', 'mount', ''),
('buildah', 'run', '--user', 'root', '', '--', 'sh', '-euc', 'echo hi'),
('buildah', 'umount', ''),
('buildah', 'rm', ''),
]
),
(
Ssh('host', Buildah('alpine', Run('echo hi', user='root'))),
[
('ssh', 'host', 'buildah', 'from', 'alpine'),
('ssh', 'host', 'buildah', 'mount', ''),
('ssh', 'host', 'buildah', 'run', '--user', 'root', '', '--', 'sh', '-euc', 'echo hi'),
('ssh', 'host', 'buildah', 'umount', ''),
('ssh', 'host', 'buildah', 'rm', ''),
]
),
]
@pytest.mark.parametrize(
'script,commands',
test_args_params
)
@pytest.mark.asyncio
async def test_args(script, commands):
with Proc.mock():
await script()
assert commands == Proc.test

View File

@ -1,47 +0,0 @@
import copy
class Action:
args = dict(
step=None,
)
class
user=dict(
doc='Username',
required=True,
),
steps=dict(
up='Started',
down='Stopped',
),
)
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def __call__(self, *args, **kwargs):
pass
class Target(Action):
def __call__(self, action):
action = copy.deepcopy(action)
action.target = self
class FakeAction(Action):
def __init__(self, user, path, *steps, **kwargs)
self.user = user
self.path = path
self.steps = steps
self.kwargs = kwargs
action = Action('root', '/test', 'up', 'rm')
target = Target()

View File

@ -1,6 +0,0 @@
import os
import sys
import pytest
if not os.getenv('CI'):
pytest.skip('Please run with ./shlaxfile.py test', allow_module_level=True)

67
tests/test_target.py Normal file
View File

@ -0,0 +1,67 @@
import pytest
from shlax.targets.base import Target
from shlax.actions.run import Run
from shlax.actions.parallel import Parallel
from shlax.result import Result
class Error:
async def __call__(self, target):
raise Exception('lol')
class Target(Target):
def exec(self, *args):
print(*args)
@pytest.mark.asyncio
async def test_success():
action = Run('echo hi')
target = Target(action)
await target()
assert action.status == 'success'
@pytest.mark.asyncio
async def test_error():
action = Error()
target = Target(action)
await target()
assert action.status == 'failure'
@pytest.mark.asyncio
async def test_nested():
nested = Error()
class Nesting:
async def __call__(self, target):
await target(nested)
nesting = Nesting()
target = Target(nesting)
await target()
assert len(target.results) == 2
assert target.results == [nested, nesting]
assert target.results[0].status == 'failure'
assert target.results[1].status == 'failure'
@pytest.mark.asyncio
async def test_parallel():
winner = Run('echo hi')
looser = Error()
parallel = Parallel(winner, looser)
target = Target(parallel)
await target()
assert len(target.results) == 3
assert target.results[0].status == 'success'
assert target.results[0] == winner
assert target.results[1].status == 'failure'
assert target.results[1] == looser
assert target.results[2].status == 'failure'
assert target.results[2] == parallel