Extracted shlax from podctl

This commit is contained in:
jpic 2020-02-14 20:14:56 +01:00
parent c78874a7e0
commit 2e94928aef
51 changed files with 750 additions and 940 deletions

View File

@ -1,24 +1,10 @@
build:
stage: test
image: yourlabs/podctl
script: pip install . && CACHE_DIR=$(pwd)/.cache podctl build -d
cache:
paths:
- .cache
key: cache
py-test:
stage: test
image: yourlabs/python
script: pip install -e . && py.test -s
pod-test:
stage: test
image: yourlabs/podctl
script: pip install -e . && cd examples/simple && podctl test -d .
pypi:
stage: deploy
image: yourlabs/python
only:
- tags
script: pypi-release
only: [tags]
stage: deploy
test:
image: yourlabs/python
script: pip install -U . && py.test -svv tests
stage: test

9
examples/001_hello.py Normal file
View File

@ -0,0 +1,9 @@
#!/usr/bin/env python
import asyncio
from shlax import *
bash = Script(
Packages('bash'),
)

View File

@ -1,24 +0,0 @@
from podctl import *
ex = Container(
Base('docker.io/alpine'),
Packages('bash'),
DumbInit('sleep 55'),
Commit('test'),
)
podctl2 = Container(
Base('docker.io/alpine'),
Packages('bash python-dev'),
Commit('test2'),
)
async def test_pod_story2(pod):
await pod.script('down')()
async def test_pod_story(pod):
await pod.script('down')()
await pod.script('up')()
await pod.script('down')()

View File

@ -1,5 +0,0 @@
from .build import Build # noqa
from .container import Container # noqa
from .pod import Pod # noqa
from .script import Script # noqa
from .visitors import * # noqa

View File

@ -1,54 +0,0 @@
from .build import Build
from .exceptions import WrongResult
from .proc import output
from .visitable import Visitable
class Container(Visitable):
default_scripts = dict(
build=Build(),
)
@property
def container_name(self):
return '-'.join([self.pod.name, self.name])
@property
def image_name(self):
return self.pod.visitor(self.name).variable('repotags')[0]
async def down(self, script):
try:
await script.exec('podman', 'inspect', self.container_name)
except WrongResult:
pass
else:
try:
from podctl.console_script import console_script
argv = console_script.parser.nonoptions
except AttributeError:
argv = []
argv = argv + [self.container_name]
await script.exec('podman', 'rm', '-f', *argv)
async def run(self, script):
await script.exec(
'podman', 'run',
'--name', self.container_name,
':'.join((self.variable('repo'), self.variable('tags')[0])),
)
async def up(self, script):
try:
await script.exec('podman', 'inspect', self.container_name)
except WrongResult as ee:
output('Container creating', self.name)
await script.exec(
'podman', 'run', '-d', '--name', self.container_name,
self.image_name,
)
output('Container created', self.name)
else:
output('Container starting', self.name)
await script.exec('podman', 'start', self.container_name)
output('Container started', self.name)

View File

@ -1,17 +0,0 @@
class PodctlException(Exception):
pass
class Mistake(PodctlException):
pass
class WrongResult(PodctlException):
def __init__(self, proc):
self.proc = proc
super().__init__('\n'.join([i for i in [
f'Command failed ! Exit with {proc.rc}'
'+ ' + proc.cmd,
proc.out,
proc.err,
]]))

View File

@ -1,58 +0,0 @@
import os
from .build import Build
from .container import Container
from .scripts import *
from .visitable import Visitable
class Pod(Visitable):
default_scripts = dict(
build=Build(),
up=Up('up', 'Start the stack'),
down=Down('down', 'Destroy the stack'),
run=Run('run', 'Run a command in container(s)'),
name=Name(
'name',
'Output the pod name for usage with podman',
),
)
def script(self, name):
async def cb(*args, **kwargs):
asyncio.events.get_event_loop()
kwargs['pod'] = self
return await self.scripts[name].run(*args, **kwargs)
return cb
async def down(self, script):
try:
await script.exec('podman', 'pod', 'inspect', self.name)
except WrongResult:
pass
else:
await script.exec('podman', 'pod', 'rm', self.name)
async def up(self, script):
try:
await script.exec(
'podman', 'pod', 'inspect', self.name
)
print(f'{self.name} | Pod ready')
except WrongResult:
print(f'{self.name} | Pod creating')
await script.exec(
'podman', 'pod', 'create', '--name', self.name,
)
print(f'{self.name} | Pod created')
@property
def name(self):
return os.getenv('POD', os.getcwd().split('/')[-1])
@property
def containers(self):
return [i for i in self.visitors if type(i) == Container]
def __repr__(self):
return self.name

View File

@ -1,44 +0,0 @@
import importlib
import os
from .container import Container
from .pod import Pod
class Podfile:
def __init__(self, pods, containers, path, tests):
self.pods = pods
self.containers = containers
self.path = path
self.tests = tests
if not self.pods:
self.pods['pod'] = Pod(*containers.values())
for pod in self.pods.values():
for container in pod.containers:
container.pod = pod
@property
def pod(self):
return self.pods['pod']
@classmethod
def factory(cls, path):
containers = dict()
pods = dict()
tests = dict()
spec = importlib.util.spec_from_file_location('pod', path)
pod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(pod)
for name, value in pod.__dict__.items():
if isinstance(value, Container):
containers[name] = value
value.name = name
elif isinstance(value, Pod):
pods[name] = value
value.name = name
elif callable(value) and value.__name__.startswith('test_'):
tests[value.__name__] = value
return cls(pods, containers, path, tests)

View File

@ -1,129 +0,0 @@
import asyncio
import copy
import cli2
import os
import textwrap
import subprocess
import sys
from .proc import output, Proc
class Script:
options = [
cli2.Option(
'debug',
alias='d',
color=cli2.GREEN,
default='visit',
help='''
Display debug output. Supports values (combinable): cmd,out,visit
'''.strip(),
immediate=True,
),
]
unshare = False
def __init__(self, name=None, doc=None):
self.name = name or type(self).__name__.lower()
self.doc = doc or 'Custom script'
async def exec(self, *args, **kwargs):
"""Execute a command on the host."""
if getattr(self, 'container', None) and getattr(self.container, 'name', None):
kwargs.setdefault('prefix', self.container.name)
proc = await Proc(*args, **kwargs)()
if kwargs.get('wait', True):
await proc.wait()
return proc
async def __call__(self, visitable, *args, **kwargs):
from .console_script import console_script
debug = console_script.options.get('debug', False)
self.args = args
for key, value in kwargs.items():
setattr(self, key, value)
visitors = visitable.visitors
def val(k, v):
if isinstance(v, list) and len(v) > 1:
return '[' + str(v[-1]) + '...]'
if k == 'scripts':
return dict()
return v
results = []
async def clean():
for visitor in visitable.visitors:
if hasattr(visitor, 'clean_' + self.name):
method = 'clean_' + self.name
result = getattr(visitor, method)(self)
if debug is True or 'visit' in str(debug):
output(
''.join([
'.'.join([type(visitor).__name__, method]),
'(',
', '.join(f'{k}={val(k, v)}' for k, v in visitor.__dict__.items()),
')'
]),
getattr(visitor, 'name',
getattr(visitable, 'name', None)),
)
if result:
await result
for prefix in ('init_', 'pre_', '', 'post_', 'clean_'):
method = prefix + self.name
for visitor in visitable.visitors:
if not hasattr(visitor, method):
continue
if debug is True or 'visit' in str(debug):
output(
''.join([
'.'.join([type(visitor).__name__, method]),
'(',
', '.join(f'{k}={val(k, v)}' for k, v in visitor.__dict__.items()),
')'
]),
getattr(visitor, 'name',
getattr(visitable, 'name', None))
)
result = getattr(visitor, method)(self)
if result:
try:
await result
except Exception as e:
await clean()
raise
async def run(self, *args, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
if args:
containers = [c for c in self.pod.containers if c.name in args]
else:
containers = self.pod.containers
procs = [
copy.deepcopy(self)(
self.pod,
*args,
**kwargs,
)
]
procs += [
copy.deepcopy(self)(
container,
*args,
container=container,
**kwargs,
)
for container in containers
]
await asyncio.gather(*procs)

View File

@ -1,50 +0,0 @@
import asyncio
import cli2
import copy
import os
import sys
from .build import Build
from .exceptions import WrongResult
from .proc import output, Proc
from .script import Script
class Name(Script):
color = cli2.GREEN
async def run(self, *args, **kwargs):
print(kwargs.get('pod').name)
class Down(Script):
color = cli2.RED
class Up(Script):
pass
class Run(Script):
async def run(self, *args, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
try:
await self.exec(
'podman', 'pod', 'inspect', self.pod.name
)
print(f'{self.pod.name} | Pod ready')
except WrongResult:
print(f'{self.pod.name} | Pod creating')
await self.exec(
'podman', 'pod', 'create', '--name', self.pod.name,
)
print(f'{self.pod.name} | Pod created')
return await asyncio.gather(*[
copy.deepcopy(self)(
self.pod,
)
for container in self.pod.containers
])

View File

@ -1,5 +0,0 @@
class Service:
def __init__(self, name, container, restart=None):
self.name = name
self.container = container
self.restart = restart

View File

@ -1,24 +0,0 @@
import asyncio
from copy import copy, deepcopy
class Visitable:
default_scripts = dict()
def __init__(self, *visitors, **scripts):
self.visitors = list(visitors)
self.scripts = deepcopy(self.default_scripts)
self.scripts.update(scripts)
def visitor(self, name):
for visitor in self.visitors:
if name.lower() in (
type(visitor).__name__.lower(),
getattr(visitor, 'name', '')
):
return visitor
def variable(self, name):
for visitor in self.visitors:
if getattr(visitor, name, None) is not None:
return getattr(visitor, name)

View File

@ -1,13 +0,0 @@
from .base import Base # noqa
from .cmd import Cmd # noqa
from .commit import Commit # noqa
from .config import Config # noqa
from .copy import Copy # noqa
from .dumbinit import DumbInit # noqa
from .npm import Npm # noqa
from .mount import Mount # noqa
from .packages import Packages # noqa
from .pip import Pip # noqa
from .run import Run # noqa
from .template import Append, Template # noqa
from .user import User # noqa

View File

@ -1,19 +0,0 @@
from pathlib import Path
class Base:
def __init__(self, base):
self.base = base
async def init_build(self, script):
script.ctr = Path((await script.exec('buildah', 'from', self.base)).out)
script.mnt = Path((await script.exec('buildah', 'mount', script.ctr)).out)
async def clean_build(self, script):
await script.umounts()
await script.umount()
if script.ctr:
proc = await script.exec('buildah', 'rm', script.ctr, raises=False)
def __repr__(self):
return f'Base({self.base})'

View File

@ -1,7 +0,0 @@
class Cmd:
def __init__(self, cmd):
self.cmd = cmd
async def build(self, script):
# script._run() does not really support sudo code
await script.run(self.cmd)

View File

@ -1,7 +0,0 @@
class Config:
def __init__(self, **values):
self.values = values
def post_build(self, script):
for key, value in self.values.items():
script.config(f'--{key} "{value}"')

View File

@ -1,25 +0,0 @@
class Copy:
def __init__(self, *args):
self.src = args[:-1]
self.dst = args[-1]
def init_build(self, script):
count = self.dst.count(':')
self.mode = None
self.owner = None
if count == 2:
self.dst, self.mode, self.owner = self.dst.split(':')
elif count == 1:
self.dst, self.mode = self.dst.split(':')
self.owner = script.variable('user')
async def build(self, script):
await script.crexec(f'mkdir -p {self.dst}')
for item in self.src:
await script.exec(f'cp -a {item} {script.mnt}{self.dst}')
if self.mode:
await script.crexec(f'chmod {self.mode} {script.mnt}{self.dst}')
if self.owner:
await script.crexec(f'chown -R {self.owner} {script.mnt}{self.dst}')

View File

@ -1,18 +0,0 @@
import re
import shlex
from .packages import Packages
class DumbInit:
packages = ['dumb-init']
def __init__(self, cmd):
self.cmd = cmd
async def post_build(self, script):
cmd = '--cmd "dumb-init bash -euxc \'%s\'"' % self.cmd
await script.config(cmd)
def __repr__(self):
return f'DumbInit({self.cmd})'

View File

@ -1,7 +0,0 @@
class Mount:
def __init__(self, src, dst):
self.src = src
self.dst = dst
async def build(self, script):
await script.mount(self.src, self.dst)

View File

@ -1,10 +0,0 @@
class Npm:
def __init__(self, install=None):
self.npm_install = install
async def build(self, script):
await script.run('sudo npm update -g npm')
await script.run(f'''
cd {self.npm_install}
npm install
''')

View File

@ -1,49 +0,0 @@
from glob import glob
import os
class Pip:
packages = dict(
apt=['python3-pip'],
)
def __init__(self, *pip_packages, pip=None, requirements=None):
self.pip_packages = pip_packages
#self.pip = pip
self.requirements = requirements
async def build(self, script):
self.pip = await script.which(('pip3', 'pip', 'pip2'))
if not self.pip:
raise Exception('Could not find pip command')
if 'CACHE_DIR' in os.environ:
cache = os.path.join(os.getenv('CACHE_DIR'), 'pip')
else:
cache = os.path.join(os.getenv('HOME'), '.cache', 'pip')
await script.mount(cache, '/root/.cache/pip')
await script.crexec(f'{self.pip} install --upgrade pip')
# https://github.com/pypa/pip/issues/5599
self.pip = 'python3 -m pip'
pip_packages = []
for visitor in script.container.visitors:
pp = getattr(visitor, 'pip_packages', None)
if not pp:
continue
pip_packages += pip_packages
source = [p for p in pip_packages if p.startswith('/')]
if source:
await script.crexec(
f'{self.pip} install --upgrade --editable {" ".join(source)}'
)
nonsource = [p for p in pip_packages if not p.startswith('/')]
if nonsource:
await script.crexec(f'{self.pip} install --upgrade {" ".join(nonsource)}')
if self.requirements:
await script.crexec(f'{self.pip} install --upgrade -r {self.requirements}')

View File

@ -1,7 +0,0 @@
class Run:
def __init__(self, *commands):
self.commands = commands
async def build(self, script):
for command in self.commands:
await script.run(command)

View File

@ -1,30 +0,0 @@
from textwrap import dedent
class Template:
CMD = dedent(
'''cat <<EOF > {target}
{script}
EOF'''
)
def __init__(self, target, *lines, **variables):
self.target = target
self.lines = lines
self.variables = variables
async def build(self, script):
self.script = '\n'.join([
dedent(l).strip() for l in self.lines
]).format(**self.variables)
await script.cexec(self.CMD.strip().format(**self.__dict__))
if self.script.startswith('#!'):
await script.cexec('chmod +x ' + self.target, user='root')
class Append(Template):
CMD = dedent(
'''cat <<EOF >> {target}
{script}
EOF'''
)

View File

@ -1,28 +0,0 @@
from .packages import Packages
class User:
"""Secure the image with a user"""
packages = dict(
apk=['shadow'],
)
def __init__(self, username, uid, home, directories=None):
self.username = username
self.uid = uid
self.home = home
self.user_created = False
self.directories = directories
async def build(self, script):
try:
await script.cexec('id', self.uid)
except:
await script.cexec('useradd', '-d', self.home, '-u', self.uid, ' ',
self.username)
else:
await script.cexec('id', '-gn', self.uid)
self.user_created = True
def post_build(self, script):
script.config(f'--user {self.username}')

View File

@ -2,27 +2,26 @@ from setuptools import setup
setup(
name='podctl',
name='shlax',
versioning='dev',
setup_requires='setupmeta',
install_requires=['cli2', 'pygments'],
install_requires=['cli2'],
extras_require=dict(
test=[
'freezegun',
'pytest',
'pytest-cov',
],
),
author='James Pic',
author_email='jamespic@gmail.com',
url='https://yourlabs.io/oss/podctl',
url='https://yourlabs.io/oss/shlax',
include_package_data=True,
license='MIT',
keywords='cli',
keywords='cli automation ansible',
python_requires='>=3',
entry_points={
'console_scripts': [
'podctl = podctl.console_script:console_script',
'shlax = shlax.cli:cli',
],
},
)

6
shlax/__init__.py Normal file
View File

@ -0,0 +1,6 @@
from .actions import *
from .image import Image
from .strategies import *
from .proc import output, Proc
from .targets import *
from .shlaxfile import Shlaxfile

View File

@ -0,0 +1,5 @@
from .commit import Commit
from .packages import Packages # noqa
from .base import Action # noqa
from .run import Run # noqa
from .service import Service

81
shlax/actions/base.py Normal file
View File

@ -0,0 +1,81 @@
import inspect
import sys
class Action:
parent = None
contextualize = []
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
@property
def context(self):
if not self.parent:
if '_context' not in self.__dict__:
self._context = dict()
return self._context
else:
return self.parent.context
def actions_filter(self, results, f=None, **filters):
if f:
def ff(a):
try:
return f(a)
except:
return False
results = [*filter(ff, results)]
for k, v in filters.items():
if k == 'type':
results = [*filter(
lambda s: type(s).__name__.lower() == str(v).lower(),
results
)]
else:
results = [*filter(
lambda s: getattr(s, k, None) == v,
results
)]
return results
def sibblings(self, f=None, **filters):
return self.actions_filter(
[a for a in self.parent.actions if a is not self],
f, **filters
)
def parents(self, f=None, **filters):
if self.parent:
return self.actions_filter(
[self.parent] + self.parent.parents(),
f, **filters
)
return []
def children(self, f=None, **filters):
children = []
def add(parent):
if parent != self:
children.append(parent)
if 'actions' not in parent.__dict__:
return
for action in parent.actions:
add(action)
add(self)
return self.actions_filter(children, f, **filters)
def __getattr__(self, name):
for a in self.parents() + self.sibblings() + self.children():
if name in a.contextualize:
return getattr(a, name)
raise AttributeError(name)
async def __call__(self, *args, **kwargs):
print(f'{self}.__call__(*args, **kwargs) not implemented')
sys.exit(1)

View File

@ -1,6 +1,8 @@
import os
import subprocess
from .base import Action
from ..exceptions import WrongResult
CI_VARS = (
@ -15,29 +17,25 @@ CI_VARS = (
)
class Commit:
class Commit(Action):
def __init__(self, repo, tags=None, format=None, push=None, registry=None):
self.repo = repo
self.registry = registry or 'localhost'
self.push = push or os.getenv('CI')
self.tags = tags or []
# figure out registry host
if '/' in self.repo and not registry:
first = self.repo.split('/')[0]
if '.' in first or ':' in first:
self.registry = self.repo.split('/')[0]
self.repo = '/'.join(self.repo.split('/')[1:])
if ':' in self.repo and not tags:
self.tags = [self.repo.split(':')[1]]
self.repo = self.repo.split(':')[0]
# docker.io currently has issues with oci format
self.format = format or 'oci'
if self.registry == 'docker.io':
self.format = 'docker'
self.tags = tags or []
# figure tags from CI vars
if not self.tags:
for name in CI_VARS:
@ -52,23 +50,20 @@ class Commit:
if not self.tags:
self.tags = ['latest']
# default tag for master too
if 'master' in self.tags:
self.tags.append('latest')
self.repotags = [f'{self.registry}/{self.repo}:{tag}' for tag in self.tags]
async def post_build(self, script):
self.sha = (await script.exec(
async def __call__(self, *args, ctr=None, **kwargs):
self.sha = (await self.parent.parent.exec(
'buildah',
'commit',
'--format=' + self.format,
script.ctr,
ctr,
)).out
if 'master' in self.tags:
self.tags.append('latest')
if self.tags:
for tag in self.repotags:
await script.exec('buildah', 'tag', self.sha, self.repo, tag)
tags = ' '.join([f'{self.repo}:{tag}' for tag in self.tags])
await script.exec('buildah', 'tag', self.sha, self.repo, tags)
if self.push:
user = os.getenv('DOCKER_USER')
@ -84,8 +79,8 @@ class Commit:
self.registry,
)
for tag in self.repotags:
await script.exec('podman', 'push', tag)
for tag in self.tags:
await script.exec('podman', 'push', f'{self.repo}:{tag}')
await script.umount()
def __repr__(self):

View File

@ -7,8 +7,10 @@ import os
import subprocess
from textwrap import dedent
from .base import Action
class Packages:
class Packages(Action):
"""
The Packages visitor wraps around the container's package manager.
@ -17,6 +19,8 @@ class Packages:
visitor will declare ``self.packages = dict(apt=['python3-pip'])``, and the
Packages visitor will pick it up.
"""
contextualize = ['mgr']
mgrs = dict(
apk=dict(
update='apk update',
@ -28,6 +32,11 @@ class Packages:
upgrade='apt-get -y upgrade',
install='apt-get -y --no-install-recommends install',
),
pacman=dict(
update='pacman -Sy',
upgrade='pacman -Su --noconfirm',
install='pacman -S --noconfirm',
),
dnf=dict(
update='dnf makecache --assumeyes',
upgrade='dnf upgrade --best --assumeyes --skip-broken', # noqa
@ -58,24 +67,9 @@ class Packages:
else:
return os.path.join(os.getenv('HOME'), '.cache')
async def init_build(self, script):
cached = script.container.variable('mgr')
if cached:
self.mgr = cached
else:
for mgr, cmds in self.mgrs.items():
if await script.which(mgr):
self.mgr = mgr
break
if not self.mgr:
raise Exception('Packages does not yet support this distro')
self.cmds = self.mgrs[self.mgr]
async def update(self, script):
async def update(self):
# run pkgmgr_setup functions ie. apk_setup
cachedir = await getattr(self, self.mgr + '_setup')(script)
cachedir = await getattr(self, self.mgr + '_setup')()
lastupdate = None
if os.path.exists(cachedir + '/lastupdate'):
@ -85,6 +79,9 @@ class Packages:
except:
pass
if not os.path.exists(cachedir):
os.makedirs(cachedir)
now = int(datetime.now().strftime('%s'))
# cache for a week
if not lastupdate or now - lastupdate > 604800:
@ -96,7 +93,7 @@ class Packages:
f.write(str(os.getpid()))
try:
await script.cexec(self.cmds['update'])
await self.rexec(self.cmds['update'])
finally:
os.unlink(lockfile)
@ -104,54 +101,69 @@ class Packages:
f.write(str(now))
else:
while os.path.exists(lockfile):
print(f'{script.container.name} | Waiting for update ...')
print(f'{self.container.name} | Waiting for update ...')
await asyncio.sleep(1)
async def build(self, script):
if not getattr(script.container, '_packages_upgraded', None):
await self.update(script)
await script.cexec(self.cmds['upgrade'])
async def __call__(self, *args, **kwargs):
cached = getattr(self, '_pagkages_mgr', None)
if cached:
self.mgr = cached
else:
mgr = await self.which(*self.mgrs.values())
if mgr:
self.mgr = mgr.split('/')[-1]
if not self.mgr:
raise Exception('Packages does not yet support this distro')
self.cmds = self.mgrs[self.mgr]
if not getattr(self, '_packages_upgraded', None):
await self.update()
await self.rexec(self.cmds['upgrade'])
# first run on container means inject visitor packages
packages = []
for visitor in script.container.visitors:
pp = getattr(visitor, 'packages', None)
for sibbling in self.sibblings:
pp = getattr(sibbling, 'packages', None)
if pp:
if isinstance(pp, list):
packages += pp
elif self.mgr in pp:
packages += pp[self.mgr]
script.container._packages_upgraded = True
self._packages_upgraded = True
else:
packages = self.packages
await script.crexec(*self.cmds['install'].split(' ') + packages)
await self.rexec(*self.cmds['install'].split(' ') + packages)
async def apk_setup(self, script):
async def apk_setup(self):
cachedir = os.path.join(self.cache_root, self.mgr)
await script.mount(cachedir, '/var/cache/apk')
await self.mount(cachedir, '/var/cache/apk')
# special step to enable apk cache
await script.cexec('ln -s /var/cache/apk /etc/apk/cache')
await self.rexec('ln -s /var/cache/apk /etc/apk/cache')
return cachedir
async def dnf_setup(self, script):
async def dnf_setup(self):
cachedir = os.path.join(self.cache_root, self.mgr)
await script.mount(cachedir, f'/var/cache/{self.mgr}')
await script.run('echo keepcache=True >> /etc/dnf/dnf.conf')
await self.mount(cachedir, f'/var/cache/{self.mgr}')
await self.run('echo keepcache=True >> /etc/dnf/dnf.conf')
return cachedir
async def apt_setup(self, script):
codename = (await script.exec(
f'source {script.mnt}/etc/os-release; echo $VERSION_CODENAME'
async def apt_setup(self):
codename = (await self.rexec(
f'source {self.mnt}/etc/os-release; echo $VERSION_CODENAME'
)).out
cachedir = os.path.join(self.cache_root, self.mgr, codename)
await script.cexec('rm /etc/apt/apt.conf.d/docker-clean')
await self.rexec('rm /etc/apt/apt.conf.d/docker-clean')
cache_archives = os.path.join(cachedir, 'archives')
await script.mount(cache_archives, f'/var/cache/apt/archives')
await self.mount(cache_archives, f'/var/cache/apt/archives')
cache_lists = os.path.join(cachedir, 'lists')
await script.mount(cache_lists, f'/var/lib/apt/lists')
await self.mount(cache_lists, f'/var/lib/apt/lists')
return cachedir
async def pacman_setup(self):
return self.cache_root + '/pacman'
def __repr__(self):
return f'Packages({self.packages})'

6
shlax/actions/run.py Normal file
View File

@ -0,0 +1,6 @@
from .base import Action
class Run(Action):
async def __call__(self, *args, **kwargs):
return (await self.exec(*self.args, **self.kwargs))

16
shlax/actions/service.py Normal file
View File

@ -0,0 +1,16 @@
import asyncio
from .base import Action
class Service(Action):
def __init__(self, *names, state=None):
self.state = state or 'started'
self.names = names
super().__init__()
async def __call__(self, *args, **kwargs):
return asyncio.gather(*[
self.exec('systemctl', 'start', name, user='root')
for name in self.names
])

View File

@ -1,5 +1,9 @@
'''
docker & docker-compose frustrated me, podctl unfrustrates me.
shlax is a micro-framework to orchestrate commands.
shlax yourfile.py: to list actions you have declared.
shlax yourfile.py <action>: to execute a given action
#!/usr/bin/env shlax: when making yourfile.py an executable.
'''
import asyncio
@ -8,16 +12,19 @@ import inspect
import os
import sys
from .container import Container
from .exceptions import Mistake, WrongResult
from .pod import Pod
from .podfile import Podfile
from .proc import output
from .service import Service
from .exceptions import *
from .shlaxfile import Shlaxfile
from .targets import Localhost
async def runall(*args, **kwargs):
for name, action in cli.shlaxfile.actions.items():
await Localhost(action)(*args, **kwargs)
@cli2.option('debug', alias='d', help='Display debug output.')
async def test(*args, **kwargs):
breakpoint()
"""Run podctl test over a bunch of paths."""
report = []
@ -105,54 +112,37 @@ async def test(*args, **kwargs):
class ConsoleScript(cli2.ConsoleScript):
class Parser(cli2.Parser):
def parse(self):
super().parse()
if str(self.command) == 'help':
return
self.forward_args = []
found_dash = False
for arg in self.argv:
if arg == '--':
found_dash = True
if not found_dash:
continue
self.forward_args.append(arg)
self.funckwargs['cmd'] = self.forward_args
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.options = dict()
def __call__(self, *args, **kwargs):
podfile = os.getenv('PODFILE', 'pod.py')
if os.path.exists(podfile):
self.podfile = Podfile.factory(podfile)
for name, script in self.podfile.pod.scripts.items():
cb = self.podfile.pod.script(name)
cb.__doc__ = inspect.getdoc(script) or script.doc
self.shlaxfile = None
shlaxfile = sys.argv.pop(1) if len(sys.argv) > 1 else ''
if os.path.exists(shlaxfile.split('::')[0]):
self.shlaxfile = Shlaxfile()
self.shlaxfile.parse(shlaxfile)
for name, action in self.shlaxfile.actions.items():
async def cb(*args, **kwargs):
return await Localhost(action)(*args, **kwargs)
self[name] = cli2.Callable(
name,
cb,
options={o.name: o for o in script.options},
color=getattr(script, 'color', cli2.YELLOW),
color=getattr(action, 'color', cli2.YELLOW),
)
return super().__call__(*args, **kwargs)
def call(self, command):
self.options = self.parser.options
args = self.parser.funcargs
kwargs = self.parser.funckwargs
breakpoint()
return command(*args, **kwargs)
def call(self, command):
try:
return super().call(command)
except Mistake as e:
print(e)
self.exit_code = 1
except WrongResult as e:
print(e)
self.exit_code = e.proc.rc
except ShlaxException as e:
print(e)
self.exit_code = 1
console_script = ConsoleScript(__doc__).add_module('podctl.console_script')
cli = ConsoleScript(__doc__).add_module('shlax.cli')

21
shlax/contrib/gitlab.py Normal file
View File

@ -0,0 +1,21 @@
import yaml
from shlax import *
class GitLabCIConfig(Script):
async def __call__(self, *args, write=True, **kwargs):
await super().__call__(*args, **kwargs)
self.kwargs = kwargs
for name, definition in self.context.items():
self.kwargs[name] = definition
output = yaml.dump(self.kwargs)
print(output)
if write:
with open('.gitlab-ci.yml', 'w+') as f:
f.write(output)
class Job(Action):
async def __call__(self, *args, **kwargs):
self.context[self.args[0]] = self.kwargs

22
shlax/exceptions.py Normal file
View File

@ -0,0 +1,22 @@
class ShlaxException(Exception):
pass
class Mistake(ShlaxException):
pass
class WrongResult(ShlaxException):
def __init__(self, proc):
self.proc = proc
msg = f'FAIL exit with {proc.rc} ' + proc.args[0]
if not proc.debug or 'cmd' not in str(proc.debug):
msg += '\n' + proc.cmd
if not proc.debug or 'out' not in str(proc.debug):
msg += '\n' + proc.out
msg += '\n' + proc.err
super().__init__(msg)

57
shlax/image.py Normal file
View File

@ -0,0 +1,57 @@
import os
import re
class Image:
ENV_TAGS = (
# gitlab
'CI_COMMIT_SHORT_SHA',
'CI_COMMIT_REF_NAME',
'CI_COMMIT_TAG',
# CircleCI
'CIRCLE_SHA1',
'CIRCLE_TAG',
'CIRCLE_BRANCH',
# contributions welcome here
)
PATTERN = re.compile(
'^((?P<backend>[a-z]*)://)?((?P<registry>[^/]*[.][^/]*)/)?((?P<repository>[^:]+))?(:(?P<tags>.*))?$' # noqa
, re.I
)
def __init__(self, arg=None, format=None, backend=None, registry=None, repository=None, tags=None):
self.arg = arg
self.format = format
self.backend = backend
self.registry = registry
self.repository = repository
self.tags = tags or []
match = re.match(self.PATTERN, arg)
if match:
for k, v in match.groupdict().items():
if getattr(self, k):
continue
if not v:
continue
if k == 'tags':
v = v.split(',')
setattr(self, k, v)
# docker.io currently has issues with oci format
self.format = format or 'oci'
if self.registry == 'docker.io':
self.format = 'docker'
# figure tags from CI vars
for name in self.ENV_TAGS:
value = os.getenv(name)
if value:
self.tags.append(value)
# filter out tags which resolved to None
self.tags = [t for t in self.tags if t]
# default tag by default ...
if not self.tags:
self.tags = ['latest']

View File

@ -79,7 +79,12 @@ class Output:
)
def highlight(self, line, highlight=True):
if not highlight:
line = line.decode('utf8') if isinstance(line, bytes) else line
if not highlight or (
'\x1b[' in line
or '\033[' in line
or '\\e[' in line
):
return line
elif isinstance(highlight, str):
lexer = lexers.get_lexer_by_name(highlight)
@ -100,18 +105,13 @@ class PrefixStreamProtocol(asyncio.subprocess.SubprocessStreamProtocol):
"""
def __init__(self, prefix, *args, **kwargs):
self.debug = kwargs.get('debug', True)
self.prefix = prefix
super().__init__(*args, **kwargs)
def pipe_data_received(self, fd, data):
from .console_script import console_script
debug = console_script.options.get('debug', False)
if (debug is True or 'out' in str(debug)) and fd in (1, 2):
for line in data.split(b'\n'):
if not line:
continue
output(line, self.prefix, flush=False)
if (self.debug is True or 'out' in str(self.debug)) and fd in (1, 2):
output(data, self.prefix, flush=False)
sys.stdout.flush()
super().pipe_data_received(fd, data)
@ -140,30 +140,45 @@ class Proc:
print(proc.err) # stderr
print(proc.rc) # return code
"""
test = False
def __init__(self, *args, prefix=None, raises=True):
args = [str(a) for a in args]
def __init__(self, *args, prefix=None, raises=True, debug=True):
self.debug = debug if not self.test else False
self.cmd = ' '.join(args)
if len(args) == 1:
if isinstance(args[0], (list, tuple)):
args = self.cmd = args[0]
else:
args = ['sh', '-euc', ' '.join(args)]
self.args = args
self.prefix = prefix
self.raises = raises
self.called = False
self.communicated = False
self.out_raw = b''
self.err_raw = b''
self.out = ''
self.err = ''
self.rc = None
@staticmethod
def split(*args):
args = [str(a) for a in args]
if len(args) == 1:
if isinstance(args[0], (list, tuple)):
args = args[0]
else:
args = ['sh', '-euc', ' '.join(args)]
return args
async def __call__(self, wait=True):
if self.called:
raise Exception('Already called: ' + self.cmd)
from .console_script import console_script
debug = console_script.options.get('debug', False)
if debug is True or 'cmd' in str(debug):
if self.debug is True or 'cmd' in str(self.debug):
output.cmd(self.cmd, self.prefix)
if self.test:
if self.test is True:
type(self).test = []
self.test.append(self.args)
return self
loop = asyncio.events.get_event_loop()
transport, protocol = await loop.subprocess_exec(
protocol_factory(self.prefix), *self.args)
@ -184,6 +199,8 @@ class Proc:
return self
async def wait(self):
if self.test:
return self
if not self.called:
await self()
if not self.communicated:
@ -196,3 +213,13 @@ class Proc:
def json(self):
import json
return json.loads(self.out)
def mock():
"""Context manager for testing purpose."""
cls = Proc
class Mock:
def __enter__(_):
cls.test = True
def __exit__(_, exc_type, exc_value, traceback):
cls.test = False
return Mock()

27
shlax/shlaxfile.py Normal file
View File

@ -0,0 +1,27 @@
import importlib
import os
from .actions.base import Action
class Shlaxfile:
def __init__(self, actions=None, tests=None):
self.actions = actions or {}
self.tests = tests or {}
self.paths = []
def parse(self, path):
spec = importlib.util.spec_from_file_location('shlaxfile', path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
for name, value in mod.__dict__.items():
if isinstance(value, Action):
value.name = name
self.actions[name] = value
elif callable(value) and getattr(value, '__name__', '').startswith('test_'):
self.tests[value.__name__] = value
self.paths.append(path)
@property
def path(self):
return self.paths[0]

View File

@ -0,0 +1,2 @@
from .asyn import Async
from .script import Script

11
shlax/strategies/asyn.py Normal file
View File

@ -0,0 +1,11 @@
import asyncio
from .script import Script
class Async(Script):
async def __call__(self, *args, **kwargs):
return asyncio.gather(*[
procs.append(action(*args, **kwargs))
for action in self.actions
])

View File

@ -0,0 +1,87 @@
import copy
import os
from ..exceptions import WrongResult
from ..actions.base import Action
from ..proc import Proc
class Actions(list):
def __init__(self, owner, actions):
self.owner = owner
super().__init__()
for action in actions:
self.append(action)
def append(self, value):
value.parent = self.owner
value.status = 'pending'
super().append(value)
class Script(Action):
root = '/'
contextualize = ['shargs', 'exec', 'rexec', 'env', 'which']
def __init__(self, *actions, **kwargs):
super().__init__(**kwargs)
self.actions = Actions(self, actions)
async def __call__(self, *args, **kwargs):
for action in self.actions:
try:
await action(*args, **kwargs)
except WrongResult as e:
print(e)
action.status = 'fail'
break
else:
if action.status == 'running':
action.status = 'success'
def shargs(self, *args, **kwargs):
user = kwargs.pop('user', None)
kwargs['debug'] = True
args = [str(arg) for arg in args if args is not None]
if args and ' ' in args[0]:
if len(args) == 1:
args = ['sh', '-euc', args[0]]
else:
args = ['sh', '-euc'] + list(args)
if user == 'root':
args = ['sudo'] + args
elif user:
args = ['sudo', '-u', user] + args
if self.parent:
return self.parent.shargs(*args, **kwargs)
else:
return args, kwargs
async def exec(self, *args, **kwargs):
args, kwargs = self.shargs(*args, **kwargs)
proc = await Proc(*args, **kwargs)()
if kwargs.get('wait', True):
await proc.wait()
return proc
async def rexec(self, *args, **kwargs):
kwargs['user'] = 'root'
return await self.exec(*args, **kwargs)
async def env(self, name):
return (await self.exec('echo $' + name)).out
async def which(self, *cmd):
"""
Return the first path to the cmd in the container.
If cmd argument is a list then it will try all commands.
"""
for path in (await self.env('PATH')).split(':'):
for c in cmd:
p = os.path.join(self.root, path[1:], c)
if os.path.exists(p):
return p[len(str(self.root)):]

View File

@ -0,0 +1,3 @@
from .buildah import Buildah
from .localhost import Localhost
from .ssh import Ssh

View File

@ -1,28 +1,51 @@
import asyncio
import os
import asyncio
from pathlib import Path
import signal
import shlex
import subprocess
import sys
import textwrap
from .proc import Proc, output
from .script import Script
from ..proc import Proc, output
from ..image import Image
from .localhost import Localhost
class Build(Script):
class Buildah(Localhost):
"""
The build script iterates over visitors and runs the build functions, it
also provides wrappers around the buildah command.
"""
unshare = True
contextualize = Localhost.contextualize + ['mnt', 'ctr']
def __init__(self):
super().__init__()
def __init__(self, base, *args, commit=None, **kwargs):
super().__init__(*args, **kwargs)
self.base = base
self.mounts = dict()
self.ctr = None
self.mnt = None
self.commit = commit
def shargs(self, *args, user=None, buildah=True, **kwargs):
if not buildah:
return super().shargs(*args, user=user, **kwargs)
_args = ['buildah', 'run']
if user:
_args += ['--user', user]
_args += [self.ctr, '--', 'sh', '-euc']
return super().shargs(
*(
_args
+ [' '.join([str(a) for a in args])]
),
**kwargs
)
def __repr__(self):
return f'Base({self.base})'
async def config(self, line):
"""Run buildah config."""
@ -32,50 +55,31 @@ class Build(Script):
"""Run buildah copy to copy a file from host into container."""
return await self.exec(f'buildah copy {self.ctr} {src} {dst}')
async def cexec(self, *args, user=None, **kwargs):
"""Execute a command in the container."""
_args = ['buildah', 'run']
if user:
_args += ['--user', user]
_args += [self.ctr, '--', 'sh', '-euc']
return await self.exec(*(_args + [' '.join([str(a) for a in args])]))
async def crexec(self, *args, **kwargs):
"""Execute a command in the container as root."""
kwargs['user'] = 'root'
return await self.cexec(*args, **kwargs)
async def mount(self, src, dst):
"""Mount a host directory into the container."""
target = self.mnt / str(dst)[1:]
await self.exec(f'mkdir -p {src} {target}')
await self.exec(f'mount -o bind {src} {target}')
await super().exec(f'mkdir -p {src} {target}')
await super().exec(f'mount -o bind {src} {target}')
self.mounts[src] = dst
async def umounts(self):
"""Unmount all mounted directories from the container."""
for src, dst in self.mounts.items():
await self.exec('umount', self.mnt / str(dst)[1:])
await super().exec('umount', self.mnt / str(dst)[1:])
async def umount(self):
"""Unmount the buildah container with buildah unmount."""
if self.ctr:
await self.exec(f'buildah unmount {self.ctr}')
await super().exec(f'buildah unmount {self.ctr}')
async def paths(self):
"""Return the list of $PATH directories"""
return (await self.cexec('echo $PATH')).out.split(':')
async def which(self, cmd):
async def which(self, *cmd):
"""
Return the first path to the cmd in the container.
If cmd argument is a list then it will try all commands.
"""
if not isinstance(cmd, (list, tuple)):
cmd = [cmd]
for path in await self.paths():
paths = (await self.env('PATH')).split(':')
for path in paths:
for c in cmd:
p = os.path.join(self.mnt, path[1:], c)
if os.path.exists(p):
@ -84,22 +88,30 @@ class Build(Script):
def __repr__(self):
return f'Build'
async def run(self, *args, **kwargs):
if os.getuid() == 0:
return await super().run(*args, **kwargs)
async def __call__(self, *args, debug=False, **kwargs):
if Proc.test or os.getuid() == 0 or self.parent.parent:
self.ctr = (await self.exec('buildah', 'from', self.base, buildah=False)).out
self.mnt = Path((await self.exec('buildah', 'mount', self.ctr, buildah=False)).out)
from podctl.console_script import console_script
# restart under buildah unshare environment !
result = await super().__call__(*args, **kwargs)
#await self.umounts()
#await self.umount()
await self.exec('buildah', 'rm', self.ctr, raises=False, buildah=False)
return result
from shlax.cli import cli
# restart under buildah unshare environment
argv = [
'buildah', 'unshare',
sys.argv[0], # current podctl location
sys.argv[0], # current script location
]
if console_script.options.get('debug') is True:
if debug is True:
argv.append('-d')
elif isinstance(console_script.options.get('debug'), str):
argv.append('-d=' + console_script.options.get('debug'))
elif isinstance(debug, str):
argv.append('-d=' + debug)
argv += [
type(self).__name__.lower(), # script name ?
cli.shlaxfile.path,
cli.parser.command.name, # script name ?
]
output(' '.join(argv), 'EXECUTION', flush=True)
@ -110,13 +122,4 @@ class Build(Script):
stdout=sys.stdout,
)
await proc.communicate()
console_script.exit_code = proc.returncode
return
pp = subprocess.Popen(
argv,
stderr=sys.stderr,
stdin=sys.stdin,
stdout=sys.stdout,
)
pp.communicate()
console_script.exit_code = pp.returncode
cli.exit_code = await proc.wait()

View File

@ -0,0 +1,9 @@
import os
from shlax.proc import Proc
from ..strategies.script import Script
class Localhost(Script):
root = '/'

17
shlax/targets/ssh.py Normal file
View File

@ -0,0 +1,17 @@
import os
from shlax.proc import Proc
from .localhost import Localhost
class Ssh(Localhost):
root = '/'
def __init__(self, host, *args, **kwargs):
self.host = host
super().__init__(*args, **kwargs)
def shargs(self, *args, **kwargs):
args, kwargs = super().shargs(*args, **kwargs)
return (['ssh', self.host] + list(args)), kwargs

16
shlaxfile.py Executable file
View File

@ -0,0 +1,16 @@
#!/usr/bin/env shlax
from shlax.contrib.gitlab import *
gitlabci = GitLabCIConfig(
Job('test',
stage='test',
image='yourlabs/python',
script='pip install -U . && py.test -svv tests',
),
Job('pypi',
stage='deploy',
image='yourlabs/python',
script='pypi-release',
only=['tags']
),
)

View File

@ -0,0 +1,50 @@
from shlax import *
inner = Run()
other = Run('ls')
middle = Buildah('alpine', inner, other)
outer = Localhost(middle)
def test_action_init_args():
assert other.args == ('ls',)
def test_action_parent_autoset():
assert list(outer.actions) == [middle]
assert middle.parent == outer
assert inner.parent == middle
assert other.parent == middle
def test_action_context():
assert outer.context is inner.context
assert middle.context is inner.context
assert middle.context is outer.context
assert other.context is outer.context
def test_action_sibblings():
assert inner.sibblings() == [other]
assert inner.sibblings(lambda s: s.args[0] == 'ls') == [other]
assert inner.sibblings(lambda s: s.args[0] == 'foo') == []
assert inner.sibblings(type='run') == [other]
assert inner.sibblings(args=('ls',)) == [other]
def test_actions_parents():
assert other.parents() == [middle, outer]
assert other.parents(lambda p: p.base == 'alpine') == [middle]
assert inner.parents(type='localhost') == [outer]
assert inner.parents(type='buildah') == [middle]
def test_action_childrens():
assert middle.children() == [inner, other]
assert middle.children(lambda a: a.args[0] == 'ls') == [other]
assert outer.children() == [middle, inner, other]
def test_action_getattr():
assert other.exec == middle.exec
assert other.shargs == middle.shargs

View File

@ -1,19 +0,0 @@
from podctl.visitors.commit import Commit
def test_name_parse():
commit = Commit('foo.ee/bar/test:y')
assert commit.registry == 'foo.ee'
assert commit.repo == 'bar/test'
assert commit.tags == ['y']
commit = Commit('foo.ee/bar/test')
assert commit.registry == 'foo.ee'
assert commit.repo == 'bar/test'
commit = Commit('bar/test')
assert commit.repo == 'bar/test'
commit = Commit('bar/test:y')
assert commit.repo == 'bar/test'
assert commit.tags == ['y']

31
tests/test_image.py Normal file
View File

@ -0,0 +1,31 @@
import pytest
from shlax import Image
tests = {
'docker://a.b:1337/re/po:x,y': ('docker', 'a.b:1337', 're/po', 'x,y'),
'docker://a.b/re/po:x,y': ('docker', 'a.b', 're/po', 'x,y'),
'a.b:1337/re/po:x,y': (None, 'a.b:1337', 're/po', 'x,y'),
'a.b/re/po:x,y': (None, 'a.b', 're/po', 'x,y'),
're/po:x,y': (None, None, 're/po', 'x,y'),
're/po': (None, None, 're/po', 'latest'),
'docker://re/po': ('docker', None, 're/po', 'latest'),
'docker://re/po:x,y': ('docker', None, 're/po', 'x,y'),
}
@pytest.mark.parametrize(
'arg,expected', [(k, dict(
backend=v[0], registry=v[1], repository=v[2], tags=v[3].split(',')
)) for k, v in tests.items()]
)
def test_args(arg, expected):
im = Image(arg)
for k, v in expected.items():
assert getattr(im, k) == v
def test_args_env():
import os
os.environ['CIRCLE_TAG'] = 'foo'
im = Image('re/po:x,y')
assert im.tags == ['x', 'y', 'foo']

62
tests/test_proc.py Normal file
View File

@ -0,0 +1,62 @@
import pytest
from unittest.mock import patch
from shlax import *
from shlax import proc
test_args_params = [
(
Localhost(Run('echo hi')),
[('sh', '-euc', 'echo hi')]
),
(
Localhost(Run('echo hi', user='jimi')),
[('sudo', '-u', 'jimi', 'sh', '-euc', 'echo hi')]
),
(
Localhost(Run('echo hi', user='root')),
[('sudo', 'sh', '-euc', 'echo hi')]
),
(
Ssh('host', Run('echo hi', user='root')),
[('ssh', 'host', 'sudo', 'sh', '-euc', 'echo hi')]
),
(
Buildah('alpine', Run('echo hi')),
[
('buildah', 'from', 'alpine'),
('buildah', 'mount', ''),
('buildah', 'run', '', '--', 'sh', '-euc', 'echo hi'),
('buildah', 'rm', ''),
]
),
(
Buildah('alpine', Run('echo hi', user='root')),
[
('buildah', 'from', 'alpine'),
('buildah', 'mount', ''),
('buildah', 'run', '--user', 'root', '', '--', 'sh', '-euc', 'echo hi'),
('buildah', 'rm', ''),
]
),
(
Ssh('host', Buildah('alpine', Run('echo hi', user='root'))),
[
('ssh', 'host', 'buildah', 'from', 'alpine'),
('ssh', 'host', 'buildah', 'mount', ''),
('ssh', 'host', 'buildah', 'run', '--user', 'root', '', '--', 'sh', '-euc', 'echo hi'),
('ssh', 'host', 'buildah', 'rm', ''),
]
),
]
@pytest.mark.parametrize(
'script,commands',
test_args_params
)
@pytest.mark.asyncio
async def test_args(script, commands):
with Proc.mock():
await script()
assert commands == Proc.test

View File

@ -1,91 +0,0 @@
from unittest.mock import MagicMock
from podctl.script import Script
from podctl.visitable import Visitable
class Visitor0:
def __init__(self, name=None):
self.name = name or 'visit0'
class Visitor1:
def pre_build(self, script):
script.append('pre_build')
def build(self, script):
script.append('build')
def post_build(self, script):
script.append('post_build')
def test_visitable_visitor():
visitable = Visitable(Visitor0(), Visitor1(), build=Script())
script = visitable.script('build')
assert 'pre_build' in script
assert 'build' in script
assert 'post_build' in script
def test_visitable_visitor():
x = Visitor0()
assert Visitable(x).visitor('visitor0') is x
def test_visitable_variable():
assert Visitable(Visitor0('foo')).variable('name') == 'foo'
#
#
#def test_visitable_configuration():
# '''Attributes should be passable to constructor or as class attributes'''
# assert Container(a='b')['a'] == 'b'
# class Test(Container):
# cfg = dict(a='b')
# assert Test()['a'] == 'b'
#
#
#def test_switch_simple():
# assert Container(a=switch(default='expected'))['a'] == 'expected'
# assert Container(a=switch(noise='noise'))['a'] == None
# fixture = Container(
# 'test',
# a=switch(default='noise', test='expected')
# )
# assert fixture['a'] == 'expected'
# assert [*fixture.values()][0] == 'expected'
# assert [*fixture.items()][0][1] == 'expected'
#
#
#def test_switch_iterable():
# class TContainer(Container):
# cfg = dict(
# a=switch(dev='test')
# )
# assert TContainer()['a'] is None
# assert TContainer('dev')['a'] == 'test'
# assert TContainer('dev', a=[switch(dev='y')])['a'] == ['y']
# assert TContainer('dev', a=[switch(default='y')])['a'] == ['y']
#
#
#def test_switch_value_list():
# assert Container('test').switch_value(
# [switch(default='noise', test=False)]
# ) == [False]
#
# assert Container('none').switch_value(
# [switch(noise='noise')]
# ) == []
#
#
#def test_switch_value_dict():
# assert Container('foo').switch_value(
# dict(i=switch(default='expected', noise='noise'))
# ) == dict(i='expected')
#
# assert Container('test').switch_value(
# dict(i=switch(default='noise', test='expected'))
# ) == dict(i='expected')
#
# assert Container('none').switch_value(
# dict(i=switch(noise='noise'), j=dict(e=switch(none=1)))
# ) == dict(j=dict(e=1))