Compare commits

..

No commits in common. "push" and "master" have entirely different histories.
push ... master

39 changed files with 608 additions and 2331 deletions

10
.gitignore vendored
View File

@ -1,10 +0,0 @@
*.pyc
__pycache__
.cache/
.coverage
.eggs/
.podctl_build_django.sh
.podctl_build_podctl.sh
.setupmeta.version
.testmondata
*.egg-info

View File

@ -1,30 +1,20 @@
build:
cache:
key: cache
paths: [.cache]
image: yourlabs/buildah
script:
- pip3 install -U --user -e .[cli]
- CACHE_DIR=$(pwd)/.cache python3 ./shlaxfile.py build push
stage: build
image: yourlabs/python-arch
build-itself:
cache:
key: cache
paths: [.cache]
image: quay.io/yourlabs/shlax:$CI_COMMIT_SHORT_SHA
script: python3 ./shlaxfile.py build
qa:
stage: test
script: flake8
test:
image: yourlabs/python
stage: build
pytest:
stage: test
script:
- pip install -U --user -e .[test]
- py.test -sv tests
- pip install --user -e .
- pytest -vv --cov shlax --cov-report=xml:coverage.xml --junitxml=report.xml --cov-report=term-missing --strict tests
- CI_PROJECT_PATH=yourlabs/shlax CI_BUILD_REPO=https://github.com/yourlabs/cli2 codecov-bash -f coverage.xml
artifacts:
reports:
junit: report.xml
pypi:
image: yourlabs/python
only: [tags]
script: pypi-release
stage: deploy
script: pypi-release
only: [tags]

View File

@ -1,14 +0,0 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.4.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- repo: https://yourlabs.io/oss/shlax
rev: master
hooks:
- id: shlaxfile-gitlabci

View File

@ -1,5 +0,0 @@
- id: shlaxfile-gitlabci
name: Regenerate .gitlab-ci.yml
description: Regenerate gitlabci
entry: ./shlaxfile.py gitlabci
language: python

282
README.md
View File

@ -1,282 +0,0 @@
# Shlax: Pythonic automation tool
Shlax is a Python framework for system automation, initially with the purpose
of replacing docker, docker-compose and ansible with a single tool with the
purpose of code-reuse made possible by target abstraction.
## Development status: Design state
I got the thing to work with an ugly PoC that I basically brute-forced, I'm
currently rewriting the codebase with a proper design.
The stories are in development in this order:
- replacing docker build, that's in the state of polishing
- replacing docker-compose, not in use but the PoC works so far
- replacing ansible, also working in working PoC state, the shlax command line
demonstrates
This project is supposed to unblock me from adding the CI feature to the
Sentry/GitLab/Portainer implementation I'm doing in pure python on top of
Django, CRUDLFA+ and Ryzom (isomorphic components in Python to replace
templates). So, as you can see, I'm really deep in it with a strong
determination.
Shlax builds its container itself, so check the shlaxfile.py of this repository
to see what it currently looks like, and check the build job of the CI pipeline
to see the output.
# Design
The pattern resolves around two moving parts: Actions and Targets.
## Action
An action is a function that takes a target argument, it may execute nested
actions by passing over the target argument which collects the results.
Example:
```python
async def hello_world(target):
"""Bunch of silly commands to demonstrate action programming."""
await target.mkdir('foo')
python = await target.which('python3', 'python')
await target.exec(f'{python} --version > foo/test')
version = target.exec('cat foo/test').output
print('version')
```
### Recursion
An action may call other actions recursively. There are two ways:
```python
async def something(target):
# just run the other action code
hello_world(target)
# or delegate the call to target
target(hello_world)
```
In the first case, the resulting count of ran actions will remain 1:
"something" action.
In the second case, the resulting count of ran actions will be 2: "something"
and "hello_world".
### Callable classes
Actually in practice, Actions are basic callable Python classes, here's a basic
example to run a command:
```python
class Run:
def __init__(self, cmd):
self.cmd = cmd
async def __call__(self, target):
return await target.exec(self.cmd)
```
This allows to create callable objects which may be called just like functions
and as such be appropriate actions, instead of:
```python
async def one(target):
target.exec('one')
async def two(target):
target.exec('two')
```
You can do:
```python
one = Run('one')
two = Run('two')
```
### Parallel execution
Actions may be executed in parallel with an action named ... Parallel. This
defines an action that will execute three actions in parallel:
```python
action = Parallel(
hello_world,
something,
Run('echo hi'),
)
```
In this case, all actions must succeed for the parallel action to be considered
a success.
### Methods
An action may also be a method, as long as it just takes a target argument, for
example:
```python
class Thing:
def start(self, target):
"""Starts thing"""
def stop(self, target):
"""Stops thing"""
action = Thing().start
```
### Cleaning
If an action defines a `clean` method, it will always be called wether or not
the action succeeded. Example:
```python
class Thing:
def __call__(self, target):
"""Do some thing"""
def clean(self, target):
"""Clean-up target after __call__"""
```
### Colorful actions
If an action defines a `colorize` method, it will be called with the colorset
as argument for every output, this allows to code custom output rendering.
## Target
A Target is mainly an object providing an abstraction layer over the system we
want to automate with actions. It defines functions to execute a command, mount
a directory, copy a file, manage environment variables and so on.
### Pre-configuration
A Target can be pre-configured with a list of Actions in which case calling the
target without argument will execute its Actions until one fails by raising an
Exception:
```python
say_hello = Localhost(
hello_world,
Run('echo hi'),
)
await say_hello()
```
### Results
Every time a target execute an action, it will set the "status" attribute on it
to "success" or "failure", and add it to the "results" attribute:
```python
say_hello = Localhost(Run('echo hi'))
await say_hello()
say_hello.results # contains the action with status="success"
```
## Targets as Actions: the nesting story
We've seen that any callable taking a target argument is good to be considered
an action, and that targets are callables.
To make a Target runnable like any action, all we had to do is add the target
keyword argument to `Target.__call__`.
But `target()` fills `self.results`, so nested action results would not
propagate to the parent target.
That's why if Target receives a non-None target argument, it will has to set
`self.parent` with it.
This allows nested targets to traverse parents and get to the root Target
with `target.caller`, where it can then attach results to.
This opens the nice side effect that a target implementation may call the
parent target if any, you could write a Docker target as such:
```python
class Docker(Target):
def __init__(self, *actions, name):
self.name = name
super().__init__(*actions)
async def exec(self, *args):
return await self.parent.exec(*['docker', 'exec', self.name] + args)
```
This also means that you always need a parent with an exec implementation,
there are two:
- Localhost, executes on localhost
- Stub, for testing
The result of that design is that the following use cases are available:
```python
# This action installs my favorite package on any distro
action = Packages('python3')
# Run it right here: apt install python3
Localhost()(action)
# Or remotely: ssh yourhost apt install python3
Ssh(host='yourhost')(action)
# Let's make a container build receipe with that action
build = Buildah(package)
# Run it locally: buildah exec apt install python3
Localhost()(build)
# Or on a server: ssh yourhost build exec apt install python3
Ssh(host='yourhost')(build)
# Or on a server behingh a bastion:
# ssh yourbastion ssh yourhost build exec apt install python3
Localhost()(Ssh(host='bastion')(Ssh(host='yourhost')(build))
# That's going to do the same
Localhost(Ssh(
Ssh(
build,
host='yourhost'
),
host='bastion'
))()
```
## CLI
You can execute Shlax actions directly on the command line with the `shlax` CLI
command.
For your own Shlaxfiles, you can build your CLI with your favorite CLI
framework. If you decide to use `cli2`, then Shlax provides a thin layer on top
of it: Group and Command objects made for Shlax objects.
For example:
```python
yourcontainer = Container(
build=Buildah(
User('app', '/app', 1000),
Packages('python', 'unzip', 'findutils'),
Copy('setup.py', 'yourdir', '/app'),
base='archlinux',
commit='yourimage',
),
)
if __name__ == '__main__':
print(Group(doc=__doc__).load(yourcontainer).entry_point())
```
The above will execute a cli2 command with each method of yourcontainer as a
sub-command.

110
README.rst Normal file
View File

@ -0,0 +1,110 @@
Shlax: Beautiful Async Subprocess executor
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Why?
====
In Python we now have async subprocesses which allows to execute several
subprocesses at the same time. The purpose of this library is to:
- stream stderr and stdout in real time while capturing it,
- real time output must be prefixed for when you execute several commands at
the time so that you know which line is for which process, like with
docker-compose logs,
- output coloration in real time with regexps to make even more readable.
This code was copy/pasted between projects and finally extracted on its own.
Demo
====
.. image:: https://yourlabs.io/oss/shlax/-/raw/master/demo.png
You will find the demo script in demo.py in this repository.
Usage
=====
Basics
------
Basic example, this will both stream output and capture it:
.. code-block:: python
from shlax import Subprocess
proc = await Subprocess('echo hi').wait()
print(proc.rc, proc.out, proc.err, proc.out_raw, proc.err_raw)
Longer
------
If you want to start the command and wait for completion elsewhere then call
any of ``start()`` and ``wait()``, or both, explicitely:
.. code-block:: python
proc = Subprocess('echo hi')
await proc.start() # start the process
await proc.wait() # wait for completion
Proc alias
----------
Note that shlax defines an alias ``Proc`` to ``Subprocess`` so this also works:
.. code-block:: python
from shlax import Proc
proc = await Proc('echo hi').wait()
Quiet
-----
To disable real time output streaming use the ``quiet`` argument:
.. code-block:: python
proc = await Subprocess('echo hi', quiet=True).wait()
Prefix
------
Using prefixes, you can have real time outputs of parallel commands and at the
same time know which output belongs to which process:
.. code-block:: python
proc0 = Subprocess('find /', prefix='first')
proc1 = Subprocess('find /', prefix='second')
await asyncio.gather(proc0.wait(), proc1.wait())
Coloration and output patching
------------------------------
You can add coloration or patch real time output with regexps, note that it
will be applied line by line:
.. code-block:: python
import sys
regexps = {
'^(.*).py$': '{cyan}\\1',
}
await asyncio.gather(*[
Subprocess(
f'find {path}',
regexps=regexps,
).wait()
for path in sys.path
])
Where is the rest?
==================
Shlax used to be the name of a much more ambitious poc-project, that you can
still find in the ``OLD`` branch of this repository. It has been extracted in
two projects with clear boundaries, namely `sysplan
<https://yourlabs.io/oss/sysplan>`_ and `podplan
<https://yourlabs.io/oss/podplan>`_ which are still in alpha state, although
Shlax as it now, is feature complete and stable.

BIN
demo.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 990 KiB

24
demo.py Normal file
View File

@ -0,0 +1,24 @@
import asyncio
from shlax import Subprocess
async def main():
colors = {
'^(.*).txt$': '{green}\\1.txt',
'^(.*).py$': '{bred}\\1.py',
}
await asyncio.gather(
Subprocess(
'for i in $(find .. | head); do echo $i; sleep .2; done',
regexps=colors,
prefix='parent',
).wait(),
Subprocess(
'for i in $(find . | head); do echo $i; sleep .3; done',
regexps=colors,
prefix='cwd',
).wait(),
)
asyncio.run(main())

View File

@ -6,9 +6,6 @@ setup(
versioning='dev',
setup_requires='setupmeta',
extras_require=dict(
cli=[
'cli2>=2.3.0',
],
test=[
'pytest',
'pytest-cov',
@ -20,11 +17,6 @@ setup(
url='https://yourlabs.io/oss/shlax',
include_package_data=True,
license='MIT',
keywords='cli automation ansible',
keywords='async subprocess',
python_requires='>=3',
entry_points={
'console_scripts': [
'shlax = shlax.cli:cli.entry_point',
],
},
)

3
shlax/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from .subprocess import Subprocess # noqa
Proc = Subprocess # noqa
from .colors import colors, c # noqa

View File

@ -1,2 +0,0 @@
class Action:
pass

View File

@ -1,56 +0,0 @@
import asyncio
import binascii
import os
class Copy:
def __init__(self, *args):
self.src = args[:-1]
self.dst = args[-1]
def listfiles(self):
if getattr(self, '_listfiles', None):
return self._listfiles
result = []
for src in self.src:
if os.path.isfile(src):
result.append(src)
continue
for root, dirs, files in os.walk(src):
if '__pycache__' in root:
continue
result += [
os.path.join(root, f)
for f in files
if not f.endswith('.pyc')
]
self._listfiles = result
return result
async def __call__(self, target):
await target.mkdir(self.dst)
for path in self.listfiles():
if os.path.isdir(path):
await target.mkdir(os.path.join(self.dst, path))
elif '/' in path:
dirname = os.path.join(
self.dst,
'/'.join(path.split('/')[:-1])
)
await target.mkdir(dirname)
await target.copy(path, dirname)
else:
await target.copy(path, self.dst)
def __str__(self):
return f'Copy({", ".join(self.src)}, {self.dst})'
async def cachekey(self):
async def chksum(path):
with open(path, 'rb') as f:
return (path, str(binascii.crc32(f.read())))
results = await asyncio.gather(*[chksum(f) for f in self.listfiles()])
return {path: chks for path, chks in results}

View File

@ -1,176 +0,0 @@
import asyncio
import copy
from datetime import datetime
from glob import glob
import os
import subprocess
from textwrap import dedent
class Packages:
"""
Package manager abstract layer with caching.
It's a central piece of the build process, and does iterate over other
container visitors in order to pick up packages. For example, the Pip
visitor will declare ``self.packages = dict(apt=['python3-pip'])``, and the
Packages visitor will pick it up.
"""
regexps = {
#r'Installing ([\w\d-]+)': '{cyan}\\1',
r'Installing': '{cyan}lol',
}
mgrs = dict(
apk=dict(
update='apk update',
upgrade='apk upgrade',
install='apk add',
),
apt=dict(
update='apt-get -y update',
upgrade='apt-get -y upgrade',
install='apt-get -y --no-install-recommends install',
),
pacman=dict(
update='pacman -Sy',
upgrade='pacman -Su --noconfirm',
install='pacman -S --noconfirm',
lastupdate='stat -c %Y /var/lib/pacman/sync/core.db',
),
dnf=dict(
update='dnf makecache --assumeyes',
upgrade='dnf upgrade --best --assumeyes --skip-broken', # noqa
install='dnf install --setopt=install_weak_deps=False --best --assumeyes', # noqa
lastupdate='stat -c %Y /var/cache/dnf/* | head -n1',
),
yum=dict(
update='yum update',
upgrade='yum upgrade',
install='yum install',
),
)
installed = []
def __init__(self, *packages, upgrade=False):
self.packages = []
self.upgrade = upgrade
for package in packages:
line = dedent(package).strip().replace('\n', ' ')
self.packages += line.split(' ')
async def cache_setup(self, target):
if 'CACHE_DIR' in os.environ:
self.cache_root = os.path.join(os.getenv('CACHE_DIR'))
else:
self.cache_root = os.path.join(await target.parent.getenv('HOME'), '.cache')
# run pkgmgr_setup functions ie. apk_setup
await getattr(self, self.mgr + '_setup')(target)
async def update(self, target):
# lastupdate = await target.exec(self.cmds['lastupdate'], raises=False)
# lastupdate = int(lastupdate.out) if lastupdate.rc == 0 else None
lastupdate = None
now = int(datetime.now().strftime('%s'))
if not lastupdate or now - lastupdate > 604800:
await target.rexec(self.cmds['update'])
return
# disabling with the above return call until needed again
# might have to rewrite this to not have our own lockfile
# or find a better place on the filesystem
# also make sure the lockfile is actually needed when running on
# targets that don't have isguest=True
if not lastupdate or now - lastupdate > 604800:
# crude lockfile implementation, should work against *most*
# race-conditions ...
lockfile = cachedir + '/update.lock'
if not await target.parent.exists(lockfile):
await target.parent.write(lockfile, str(os.getpid()))
try:
await target.rexec(self.cmds['update'])
finally:
await target.parent.rm(lockfile)
await target.parent.write(cachedir + '/lastupdate', str(now))
else:
while await target.parent.exists(lockfile):
print(f'{self.target} | Waiting for {lockfile} ...')
await asyncio.sleep(1)
async def __call__(self, target):
cached = getattr(target, 'pkgmgr', None)
if cached:
self.mgr = cached
else:
mgr = await target.which(*self.mgrs.keys())
if mgr:
self.mgr = mgr[0].split('/')[-1]
if not self.mgr:
raise Exception('Packages does not yet support this distro')
self.cmds = self.mgrs[self.mgr]
if target.isguest:
# we're going to mount
await self.cache_setup(target)
await self.update(target)
if self.upgrade:
await target.rexec(self.cmds['upgrade'])
packages = []
for package in self.packages:
if ',' in package:
parts = package.split(',')
package = parts[0]
if self.mgr in parts[1:]:
# include apt on apt
packages.append(package)
else:
packages.append(package)
await target.rexec(*self.cmds['install'].split(' ') + packages)
async def apk_setup(self, target):
cachedir = os.path.join(self.cache_root, self.mgr)
await target.mount(cachedir, '/var/cache/apk')
# special step to enable apk cache
await target.rexec('ln -sf /var/cache/apk /etc/apk/cache')
return cachedir
async def dnf_setup(self, target):
cachedir = os.path.join(self.cache_root, self.mgr)
await target.mount(cachedir, f'/var/cache/{self.mgr}')
await target.rexec('echo keepcache=True >> /etc/dnf/dnf.conf')
return cachedir
async def apt_setup(self, target):
codename = (await target.rexec(
f'source /etc/os-release; echo $VERSION_CODENAME'
)).out
cachedir = os.path.join(self.cache_root, self.mgr, codename)
await self.rexec('rm /etc/apt/apt.conf.d/docker-clean')
cache_archives = os.path.join(cachedir, 'archives')
await target.mount(cache_archives, f'/var/cache/apt/archives')
cache_lists = os.path.join(cachedir, 'lists')
await target.mount(cache_lists, f'/var/lib/apt/lists')
return cachedir
async def pacman_setup(self, target):
cachedir = os.path.join(self.cache_root, self.mgr)
await target.mkdir(cachedir + '/cache', cachedir + '/sync')
await target.mount(cachedir + '/sync', '/var/lib/pacman/sync')
await target.mount(cachedir + '/cache', '/var/cache/pacman')
if await target.host.exists('/etc/pacman.d/mirrorlist'):
await target.copy('/etc/pacman.d/mirrorlist', '/etc/pacman.d/mirrorlist')
def __str__(self):
return f'Packages({self.packages}, upgrade={self.upgrade})'

View File

@ -1,14 +0,0 @@
import asyncio
class Parallel:
def __init__(self, *actions):
self.actions = actions
async def __call__(self, target):
return await asyncio.gather(*[
target(action) for action in self.actions
])
def __str__(self):
return 'Parallel executor'

View File

@ -1,72 +0,0 @@
from glob import glob
import os
from urllib import request
from .base import Action
class Pip(Action):
"""Pip abstraction layer."""
def __init__(self, *pip_packages):
self.pip_packages = pip_packages
async def __call__(self, target):
# ensure python presence
results = await target.which('python3', 'python')
if results:
python = results[0]
else:
raise Exception('Could not find pip nor python')
# ensure pip module presence
result = await target.exec(
python, '-m', 'pip',
raises=False, quiet=True
)
if result.rc != 0:
if not os.path.exists('get-pip.py'):
req = request.urlopen(
'https://bootstrap.pypa.io/get-pip.py'
)
content = req.read()
with open('get-pip.py', 'wb+') as f:
f.write(content)
await target.copy('get-pip.py', '.')
await target.exec(python, 'get-pip.py')
# choose a cache directory
if 'CACHE_DIR' in os.environ:
cache = os.path.join(os.getenv('CACHE_DIR'), 'pip')
else:
cache = os.path.join(os.getenv('HOME'), '.cache', 'pip')
# and mount it
if getattr(target, 'mount', None):
# we are in a target which shares a mount command
await target.mount(cache, '/root/.cache/pip')
source = []
nonsource = []
for package in self.pip_packages:
if os.path.exists(package):
source.append(package)
else:
nonsource.append(package)
if nonsource:
await target.exec(
python, '-m', 'pip',
'install', '--upgrade',
*nonsource
)
if source:
await target.exec(
python, '-m', 'pip',
'install', '--upgrade', '--editable',
*source
)
def __str__(self):
return f'Pip({", ".join(self.pip_packages)})'

View File

@ -1,15 +0,0 @@
class Run:
def __init__(self, cmd, root=False):
self.cmd = cmd
self.root = root
async def __call__(self, target):
if self.root:
self.proc = await target.rexec(self.cmd)
else:
self.proc = await target.exec(self.cmd)
def __str__(self):
return f'Run({self.cmd})'

View File

@ -1,44 +0,0 @@
import os
import re
from .packages import Packages
class User:
"""
Create a user.
Example:
User('app', '/app', getenv('_CONTAINERS_ROOTLESS_UID', 1000)),
_CONTAINERS_ROOTLESS_UID allows to get your UID during build, which happens
in buildah unshare.
"""
def __init__(self, username, home, uid):
self.username = username
self.home = home
self.uid = uid
def __str__(self):
return f'User({self.username}, {self.home}, {self.uid})'
async def __call__(self, target):
result = await target.rexec('id', self.uid, raises=False)
if result.rc == 0:
old = re.match('.*\(([^)]*)\).*', result.out).group(1)
await target.rexec(
'usermod',
'-d', self.home,
'-l', self.username,
old
)
else:
await target.rexec(
'useradd',
'-d', self.home,
'-u', self.uid,
self.username
)
await target.mkdir(self.home)
await target.rexec('chown', self.uid, self.home)

View File

@ -1,120 +0,0 @@
"""
Shlax executes mostly in 3 ways:
- Execute actions on targets with the command line
- With your shlaxfile as first argument: offer defined Actions
- With the name of a module in shlax.repo: a community maintained shlaxfile
"""
import ast
import asyncio
import cli2
import glob
import inspect
import importlib
import os
import sys
from .proc import ProcFailure
class Group(cli2.Group):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cmdclass = Command
class TargetArgument(cli2.Argument):
"""
Target to execute on: localhost by default, target=@ssh_host for ssh.
"""
def __init__(self, cmd, param, doc=None, color=None, default=None):
from shlax.targets.base import Target
super().__init__(cmd, param, doc=self.__doc__, default=Target())
self.alias = ['target', 't']
def cast(self, value):
from shlax.targets.ssh import Ssh
user, host = value.split('@')
return Ssh(host=host, user=user)
def match(self, arg):
return arg if isinstance(arg, str) and '@' in arg else None
class Command(cli2.Command):
def setargs(self):
super().setargs()
if 'target' in self.sig.parameters:
self['target'] = TargetArgument(
self,
self.sig.parameters['target'],
)
if 'actions' in self:
del self['actions']
def __call__(self, *argv):
result = None
try:
result = super().__call__(*argv)
except ProcFailure:
# just output the failure without TB, as command was already
# printed anyway
pass
self['target'].value.output.results(self['target'].value)
return result
class ActionCommand(cli2.Command):
def setargs(self):
super().setargs()
self['target'] = TargetArgument(
self,
inspect.Parameter('target', inspect.Parameter.KEYWORD_ONLY),
)
def call(self, *args, **kwargs):
self.target = self.target(*args, **kwargs)
return super().call(self['target'].value)
class ConsoleScript(Group):
def __call__(self, *argv):
self.load_actions()
#self.load_shlaxfiles() # wip
return super().__call__(*argv)
def load_shlaxfiles(self):
filesdir = os.path.dirname(__file__) + '/shlaxfiles/'
for filename in os.listdir(filesdir):
filepath = filesdir + filename
if not os.path.isfile(filepath):
continue
with open(filepath, 'r') as f:
tree = ast.parse(f.read())
group = self.group(filename[:-3])
main = Group(doc=__doc__).load(shlax)
def load_actions(self):
actionsdir = os.path.dirname(__file__) + '/actions/'
for filename in os.listdir(actionsdir):
filepath = actionsdir + filename
if not os.path.isfile(filepath):
continue
with open(filepath, 'r') as f:
tree = ast.parse(f.read())
cls = [
node
for node in tree.body
if isinstance(node, ast.ClassDef)
]
if not cls:
continue
mod = importlib.import_module('shlax.actions.' + filename[:-3])
cls = getattr(mod, cls[0].name)
self.add(cls, name=filename[:-3], cmdclass=ActionCommand)
cli = ConsoleScript(doc=__doc__)

View File

@ -1,68 +1,73 @@
colors = dict(
cyan='\u001b[38;5;51m',
cyan1='\u001b[38;5;87m',
cyan2='\u001b[38;5;123m',
cyan3='\u001b[38;5;159m',
blue='\u001b[38;5;33m',
blue1='\u001b[38;5;69m',
blue2='\u001b[38;5;75m',
blue3='\u001b[38;5;81m',
blue4='\u001b[38;5;111m',
blue5='\u001b[38;5;27m',
green='\u001b[38;5;10m',
green1='\u001b[38;5;2m',
green2='\u001b[38;5;46m',
green3='\u001b[38;5;47m',
green4='\u001b[38;5;48m',
green5='\u001b[38;5;118m',
green6='\u001b[38;5;119m',
green7='\u001b[38;5;120m',
purple='\u001b[38;5;5m',
purple1='\u001b[38;5;6m',
purple2='\u001b[38;5;13m',
purple3='\u001b[38;5;164m',
purple4='\u001b[38;5;165m',
purple5='\u001b[38;5;176m',
purple6='\u001b[38;5;145m',
purple7='\u001b[38;5;213m',
purple8='\u001b[38;5;201m',
red='\u001b[38;5;1m',
red1='\u001b[38;5;9m',
red2='\u001b[38;5;196m',
red3='\u001b[38;5;160m',
red4='\u001b[38;5;197m',
red5='\u001b[38;5;198m',
red6='\u001b[38;5;199m',
yellow='\u001b[38;5;226m',
yellow1='\u001b[38;5;227m',
yellow2='\u001b[38;5;226m',
yellow3='\u001b[38;5;229m',
yellow4='\u001b[38;5;220m',
yellow5='\u001b[38;5;230m',
gray='\u001b[38;5;250m',
gray1='\u001b[38;5;251m',
gray2='\u001b[38;5;252m',
gray3='\u001b[38;5;253m',
gray4='\u001b[38;5;254m',
gray5='\u001b[38;5;255m',
gray6='\u001b[38;5;249m',
pink='\u001b[38;5;197m',
pink1='\u001b[38;5;198m',
pink2='\u001b[38;5;199m',
pink3='\u001b[38;5;200m',
pink4='\u001b[38;5;201m',
pink5='\u001b[38;5;207m',
pink6='\u001b[38;5;213m',
orange='\u001b[38;5;202m',
orange1='\u001b[38;5;208m',
orange2='\u001b[38;5;214m',
orange3='\u001b[38;5;220m',
orange4='\u001b[38;5;172m',
orange5='\u001b[38;5;166m',
reset='\u001b[0m',
theme = dict(
cyan='\033[38;5;51m',
cyan1='\033[38;5;87m',
cyan2='\033[38;5;123m',
cyan3='\033[38;5;159m',
blue='\033[38;5;33m',
blue1='\033[38;5;69m',
blue2='\033[38;5;75m',
blue3='\033[38;5;81m',
blue4='\033[38;5;111m',
blue5='\033[38;5;27m',
green='\033[38;5;10m',
green1='\033[38;5;2m',
green2='\033[38;5;46m',
green3='\033[38;5;47m',
green4='\033[38;5;48m',
green5='\033[38;5;118m',
green6='\033[38;5;119m',
green7='\033[38;5;120m',
purple='\033[38;5;5m',
purple1='\033[38;5;6m',
purple2='\033[38;5;13m',
purple3='\033[38;5;164m',
purple4='\033[38;5;165m',
purple5='\033[38;5;176m',
purple6='\033[38;5;145m',
purple7='\033[38;5;213m',
purple8='\033[38;5;201m',
red='\033[38;5;1m',
red1='\033[38;5;9m',
red2='\033[38;5;196m',
red3='\033[38;5;160m',
red4='\033[38;5;197m',
red5='\033[38;5;198m',
red6='\033[38;5;199m',
yellow='\033[38;5;226m',
yellow1='\033[38;5;227m',
yellow2='\033[38;5;226m',
yellow3='\033[38;5;229m',
yellow4='\033[38;5;220m',
yellow5='\033[38;5;230m',
gray='\033[38;5;250m',
gray1='\033[38;5;251m',
gray2='\033[38;5;252m',
gray3='\033[38;5;253m',
gray4='\033[38;5;254m',
gray5='\033[38;5;255m',
gray6='\033[38;5;249m',
pink='\033[38;5;197m',
pink1='\033[38;5;198m',
pink2='\033[38;5;199m',
pink3='\033[38;5;200m',
pink4='\033[38;5;201m',
pink5='\033[38;5;207m',
pink6='\033[38;5;213m',
orange='\033[38;5;202m',
orange1='\033[38;5;208m',
orange2='\033[38;5;214m',
orange3='\033[38;5;220m',
orange4='\033[38;5;172m',
orange5='\033[38;5;166m',
reset='\033[0m',
)
colors.update({
k + 'bold': v.replace('[', '[1;')
for k, v in colors.items()
})
class Colors:
def __init__(self, **theme):
for name, value in theme.items():
setattr(self, name, value)
setattr(self, f'b{name}', value.replace('[', '[1;'))
c = colors = Colors(**theme)

View File

@ -1,129 +0,0 @@
import copy
import os
from .podman import Podman
from .image import Image
class Container:
def __init__(self, build=None, image=None, env=None, volumes=None):
self.build = build
self.image = image or self.build.image
if isinstance(self.image, str):
self.image = Image(self.image)
self.volumes = volumes or {}
self.env = env or {}
prefix = os.getcwd().split('/')[-1]
repo = self.image.repository.replace('/', '-')
if prefix == repo:
self.name = repo
else:
self.name = '-'.join([prefix, repo])
self.pod = None
@property
def full_name(self):
if self.pod:
return '-'.join([self.pod.name, self.name])
return self.name
async def up(self, target, *args):
"""Start the container foreground"""
podman = Podman(target)
if self.pod:
pod = None
for _ in await podman.pod.ps():
if _['Name'] == self.pod.name:
pod = _
break
if not pod:
await podman.pod.create('--name', self.pod.name)
args = list(args) + ['--pod', self.pod.name]
# skip if already up
for result in await podman.ps('-a'):
for name in result['Names']:
if name == self.full_name:
if result['State'] == 'running':
target.output.info(f'{self.full_name} already running')
return
elif result['State'] in ('exited', 'configured'):
target.output.info(f'{self.full_name} starting')
startargs = ['podman', 'start']
if '-d' not in args:
startargs.append('--attach')
startargs.append(self.full_name)
await target.exec(*startargs)
return
cmd = [
'podman',
'run',
] + list(args)
for src, dest in self.volumes.items():
cmd += ['--volume', ':'.join([src, dest])]
for src, dest in self.env.items():
cmd += ['--env', '='.join([src, str(dest)])]
cmd += [
'--name',
self.full_name,
str(self.image),
]
await target.exec(*cmd)
async def start(self, target):
"""Start the container background"""
await self.up(target, '-d')
async def stop(self, target):
"""Start the container"""
await target.exec('podman', 'stop', self.full_name)
async def inspect(self, target):
"""Inspect container"""
await target.exec('podman', 'inspect', self.full_name)
async def logs(self, target):
"""Show container logs"""
await target.exec('podman', 'logs', self.full_name)
async def exec(self, target, cmd=None):
"""Execute a command in the container"""
cmd = cmd or 'bash'
if cmd.endswith('sh'):
import os
os.execvp(
'/usr/bin/podman',
[
'podman',
'exec',
'-it',
self.full_name,
cmd,
]
)
result = await target.exec(
'podman',
'exec',
self.full_name,
cmd,
)
async def down(self, target):
"""Start the container"""
await target.exec('podman', 'rm', '-f', self.full_name, raises=False)
async def apply(self, target):
"""Start the container"""
if self.build:
await target(self.build)
await target(self.down)
await target(self.start)
def __str__(self):
return f'Container(name={self.name}, image={self.image}, volumes={self.volumes})'

View File

@ -1,92 +0,0 @@
import json
import os
import re
class Layers(set):
def __init__(self, image):
self.image = image
async def ls(self, target):
"""Fetch layers from localhost"""
ret = set()
results = await target.parent.exec(
'buildah images --json',
quiet=True,
)
results = json.loads(results.out)
prefix = 'localhost/' + self.image.repository + ':layer-'
for result in results:
if not result.get('names', None):
continue
for name in result['names']:
if name.startswith(prefix):
self.add(name)
return self
async def rm(self, target, tags=None):
"""Drop layers for this image"""
if tags is None:
tags = [layer for layer in await self.ls(target)]
await target.exec('podman', 'rmi', *tags, raises=False)
class Image:
PATTERN = re.compile(
'^((?P<backend>[a-z]*)://)?((?P<registry>[^/]*[.][^/]*)/)?((?P<repository>[^:]+))?(:(?P<tags>.*))?$' # noqa
, re.I
)
def __init__(self, arg=None, format=None, backend=None, registry=None,
repository=None, tags=None):
self.arg = arg
self.format = format
self.backend = backend
self.registry = registry
self.repository = repository
self.tags = tags or []
self.layers = Layers(self)
match = re.match(self.PATTERN, arg)
if match:
for k, v in match.groupdict().items():
if getattr(self, k):
continue
if not v:
continue
if k == 'tags':
v = v.split(',')
setattr(self, k, v)
# docker.io currently has issues with oci format
self.format = format or 'oci'
if self.registry == 'docker.io':
self.format = 'docker'
# filter out tags which resolved to None
self.tags = [t for t in self.tags if t]
# default tag by default ...
if not self.tags:
self.tags = ['latest']
def __str__(self):
return f'{self.repository}:{self.tags[-1]}'
async def push(self, target):
user = os.getenv('IMAGES_USER')
passwd = os.getenv('IMAGES_PASS')
if user and passwd:
target.output.cmd('buildah login -u ... -p ...' + self.registry)
await target.parent.exec(
'buildah', 'login', '-u', user, '-p', passwd,
self.registry or 'docker.io', debug=False)
for tag in self.tags:
await target.parent.exec(
'buildah',
'push',
self.repository + ':final',
f'{self.registry}/{self.repository}:{tag}'
)

View File

@ -1,213 +0,0 @@
import re
import sys
import types
from .colors import colors
class Output:
prefixes = dict()
colors = colors
prefix_colors = (
'\x1b[1;36;45m',
'\x1b[1;36;41m',
'\x1b[1;36;40m',
'\x1b[1;37;45m',
'\x1b[1;32m',
'\x1b[1;37;44m',
)
def color(self, code=None):
if not code:
return '\u001b[0m'
code = str(code)
return u"\u001b[38;5;" + code + "m"
def colorize(self, code, content):
return self.color(code) + content + self.color()
def colorized(self, action):
if hasattr(action, 'colorized'):
return action.colorized(self.colors)
elif isinstance(action, types.MethodType):
return f'{action.__self__}.{action.__name__}'
else:
return str(action)
def __init__(
self,
prefix=None,
regexps=None,
debug='cmd,visit,out',
write=None,
flush=None,
**kwargs
):
self.prefix = prefix
self.debug = debug
self.prefix_length = 0
self.regexps = regexps or dict()
self.write = write or sys.stdout.buffer.write
self.flush = flush or sys.stdout.flush
self.kwargs = kwargs
def prefix_line(self):
if self.prefix not in self.prefixes:
self.prefixes[self.prefix] = self.prefix_colors[len(self.prefixes)]
if len(self.prefix) > self.prefix_length:
self.prefix_length = len(self.prefix)
prefix_color = self.prefixes[self.prefix] if self.prefix else ''
prefix_padding = '.' * (self.prefix_length - len(self.prefix) - 2) if self.prefix else ''
if prefix_padding:
prefix_padding = ' ' + prefix_padding + ' '
return [
prefix_color,
prefix_padding,
self.prefix,
' ',
self.colors['reset'],
'| '
]
def __call__(self, line, highlight=True, flush=True):
line = [self.highlight(line) if highlight else line]
if self.prefix:
line = self.prefix_line() + line
line = ''.join(line)
self.write(line.encode('utf8'))
if flush:
self.flush()
def cmd(self, line):
self(
self.colorize(251, '+')
+ '\x1b[1;38;5;15m'
+ ' '
+ self.highlight(line, 'bash')
+ self.colors['reset']
+ '\n',
highlight=False
)
def print(self, content):
self(
content,
prefix=None,
highlight=False
)
def highlight(self, line, highlight=True):
line = line.decode('utf8') if isinstance(line, bytes) else line
if not highlight or (
'\x1b[' in line
or '\033[' in line
or '\\e[' in line
):
return line
for regexp, colors in self.regexps.items():
line = re.sub(regexp, colors.format(**self.colors), line)
line = line + self.colors['reset']
return line
def test(self, action):
self(''.join([
self.colors['purplebold'],
'! TEST ',
self.colors['reset'],
self.colorized(action),
'\n',
]))
def clean(self, action):
if self.debug:
self(''.join([
self.colors['bluebold'],
'+ CLEAN ',
self.colors['reset'],
self.colorized(action),
'\n',
]))
def start(self, action):
if self.debug is True or 'visit' in str(self.debug):
self(''.join([
self.colors['orangebold'],
'⚠ START ',
self.colors['reset'],
self.colorized(action),
'\n',
]))
def info(self, text):
if self.debug is True or 'visit' in str(self.debug):
self(''.join([
self.colors['cyanbold'],
'➤ INFO ',
self.colors['reset'],
text,
'\n',
]))
def skip(self, action):
if self.debug is True or 'visit' in str(self.debug):
self(''.join([
self.colors['yellowbold'],
'↪️ SKIP ',
self.colors['reset'],
self.colorized(action),
'\n',
]))
def success(self, action):
if self.debug is True or 'visit' in str(self.debug):
self(''.join([
self.colors['greenbold'],
'✔ SUCCESS ',
self.colors['reset'],
self.colorized(action),
'\n',
]))
def fail(self, action, exception=None):
if self.debug is True or 'visit' in str(self.debug):
self(''.join([
self.colors['redbold'],
'✘ FAIL ',
self.colors['reset'],
self.colorized(action),
'\n',
]))
def results(self, action):
if len(action.results) < 2:
return
success = 0
fail = 0
for result in action.results:
if result.status == 'success':
success += 1
if result.status == 'failure':
fail += 1
self(''.join([
self.colors['greenbold'],
'✔ SUCCESS REPORT: ',
self.colors['reset'],
str(success),
'\n',
]))
if fail:
self(''.join([
self.colors['redbold'],
'✘ FAIL REPORT: ',
self.colors['reset'],
str(fail),
'\n',
]))

View File

@ -1,78 +0,0 @@
import cli2
import json
import os
import sys
from shlax.targets.base import Target
from shlax.actions.parallel import Parallel
from shlax.proc import Proc
from .podman import Podman
class Pod:
"""Help text"""
def __init__(self, **containers):
self.containers = containers
for name, container in self.containers.items():
container.pod = self
container.name = name
self.name = os.getcwd().split('/')[-1]
async def _call(self, target, method, *names):
methods = [
getattr(container, method)
for name, container in self.containers.items()
if not names or name in names
]
await target(Parallel(*methods))
async def build(self, target, *names):
"""Build container images"""
if not (Proc.test or os.getuid() == 0):
os.execvp('buildah', ['buildah', 'unshare'] + sys.argv)
else:
await self._call(target, 'build', *names)
async def down(self, target, *names):
"""Delete container images"""
await self._call(target, 'down', *names)
async def start(self, target, *names):
"""Start container images"""
await self._call(target, 'start', *names)
async def logs(self, target, *names):
"""Start container images"""
await self._call(target, 'logs', *names)
async def ps(self, target):
"""Show containers and volumes"""
containers = []
names = []
for container in await Podman(target).ps('-a'):
for name in container['Names']:
if name.startswith(self.name + '-'):
container['Name'] = name
containers.append(container)
names.append(name)
for name, container in self.containers.items():
full_name = '-'.join([self.name, container.name])
if full_name in names:
continue
containers.append(dict(
Name=full_name,
State='not created',
))
cli2.Table(
['Name', 'State'],
*[
(container['Name'], container['State'])
for container in containers
]
).print()
def __str__(self):
return f'Pod({self.name})'

View File

@ -1,20 +0,0 @@
import json
class Podman(list):
def __init__(self, target, *args):
self.target = target
super().__init__(args or ['podman'])
def __getattr__(self, command):
if command.startswith('_'):
return super().__getattr__(command)
return Podman(self.target, *self + [command])
async def __call__(self, *args, **kwargs):
cmd = self + list(args) + [
f'--{k}={v}' for k, v in kwargs.items()
]
if 'ps' in cmd:
cmd += ['--format=json']
return (await self.target.exec(*cmd, quiet=True)).json

View File

@ -1,159 +0,0 @@
"""
Asynchronous process execution wrapper.
"""
import asyncio
import os
import shlex
import sys
from .output import Output
class ProcFailure(Exception):
def __init__(self, proc):
self.proc = proc
msg = f'FAIL exit with {proc.rc} ' + proc.args[0]
if not proc.output.debug or 'cmd' not in str(proc.output.debug):
msg += '\n' + proc.cmd
if not proc.output.debug or 'out' not in str(proc.output.debug):
msg += '\n' + proc.out
msg += '\n' + proc.err
super().__init__(msg)
class PrefixStreamProtocol(asyncio.subprocess.SubprocessStreamProtocol):
"""
Internal subprocess stream protocol to add a prefix in front of output to
make asynchronous output readable.
"""
def __init__(self, proc, *args, **kwargs):
self.proc = proc
super().__init__(*args, **kwargs)
def pipe_data_received(self, fd, data):
if self.proc.output.debug is True or 'out' in str(self.proc.output.debug):
if fd in (1, 2):
self.proc.output(data)
super().pipe_data_received(fd, data)
def protocol_factory(proc):
def _p():
return PrefixStreamProtocol(
proc,
limit=asyncio.streams._DEFAULT_LIMIT,
loop=asyncio.events.get_event_loop()
)
return _p
class Proc:
"""
Subprocess wrapper.
Example usage::
proc = Proc('find', '/', prefix='containername')
await proc() # execute
print(proc.out) # stdout
print(proc.err) # stderr
print(proc.rc) # return code
"""
test = False
def __init__(self, *args, prefix=None, raises=True, output=None, quiet=False):
if quiet:
self.output = Output(debug=False)
else:
self.output = output or Output()
self.cmd = ' '.join(args)
self.args = args
self.prefix = prefix
self.raises = raises
self.called = False
self.communicated = False
self.out_raw = b''
self.err_raw = b''
self.out = ''
self.err = ''
self.rc = None
@staticmethod
def split(*args):
args = [str(a) for a in args]
if len(args) == 1:
if isinstance(args[0], (list, tuple)):
args = args[0]
else:
args = ['sh', '-euc', ' '.join(args)]
return args
def output_factory(self, *args, **kwargs):
args = tuple(self.prefix) + args
return Output(*args, kwargs)
async def __call__(self, wait=True):
if self.called:
raise Exception('Already called: ' + self.cmd)
if 'cmd' in str(self.output.debug):
self.output.cmd(self.cmd)
if self.test:
if self.test is True:
type(self).test = []
self.test.append(self.args)
return self
loop = asyncio.events.get_event_loop()
transport, protocol = await loop.subprocess_exec(
protocol_factory(self), *self.args)
self.proc = asyncio.subprocess.Process(transport, protocol, loop)
self.called = True
if wait:
await self.wait()
return self
async def communicate(self):
self.out_raw, self.err_raw = await self.proc.communicate()
self.out = self.out_raw.decode('utf8').strip()
self.err = self.err_raw.decode('utf8').strip()
self.rc = self.proc.returncode
self.communicated = True
return self
async def wait(self):
if self.test:
return self
if not self.called:
await self()
if not self.communicated:
await self.communicate()
if self.raises and self.proc.returncode:
raise ProcFailure(self)
return self
@property
def json(self):
import json
return json.loads(self.out)
def mock():
"""Context manager for testing purpose."""
cls = Proc
class Mock:
def __enter__(_):
cls.test = True
def __exit__(_, exc_type, exc_value, traceback):
cls.test = False
return Mock()

View File

@ -1,13 +0,0 @@
class Result:
def __init__(self, target, action):
self.target = target
self.action = action
self.status = 'pending'
self.exception = None
class Results(list):
def new(self, target, action):
result = Result(target, action)
self.append(result)
return result

View File

@ -1,18 +0,0 @@
from .targets.base import Target
from .targets.buildah import Buildah
from .targets.localhost import Localhost
from .targets.stub import Stub
from .actions.copy import Copy
from .actions.packages import Packages
from .actions.run import Run
from .actions.pip import Pip
from .actions.parallel import Parallel
from .actions.user import User
from .cli import Command, Group
from .container import Container
from .pod import Pod
from os import getenv, environ

182
shlax/subprocess.py Normal file
View File

@ -0,0 +1,182 @@
import asyncio
import functools
import re
import shlex
import sys
from .colors import colors
class SubprocessProtocol(asyncio.SubprocessProtocol):
def __init__(self, proc):
self.proc = proc
self.output = bytearray()
def pipe_data_received(self, fd, data):
if fd == 1:
self.proc.stdout(data)
elif fd == 2:
self.proc.stderr(data)
def process_exited(self):
self.proc.exit_future.set_result(True)
class Subprocess:
colors = colors
# arbitrary list of colors
prefix_colors = (
colors.cyan,
colors.blue,
colors.green,
colors.purple,
colors.red,
colors.yellow,
colors.gray,
colors.pink,
colors.orange,
)
# class variables, meant to grow as new prefixes are discovered to ensure
# output alignment
prefixes = dict()
prefix_length = 0
def __init__(
self,
*args,
quiet=None,
prefix=None,
regexps=None,
write=None,
flush=None,
):
if len(args) == 1 and ' ' in args[0]:
args = ['sh', '-euc', args[0]]
self.args = args
self.quiet = quiet if quiet is not None else False
self.prefix = prefix
self.write = write or sys.stdout.buffer.write
self.flush = flush or sys.stdout.flush
self.started = False
self.waited = False
self.out_raw = bytearray()
self.err_raw = bytearray()
self.regexps = dict()
if regexps:
for search, replace in regexps.items():
if isinstance(search, str):
search = search.encode()
search = re.compile(search)
replace = replace.format(**self.colors.__dict__).encode()
self.regexps[search] = replace
async def start(self, wait=True):
if not self.quiet:
self.output(
self.colors.bgray.encode()
+ b'+ '
+ shlex.join([
arg.replace('\n', '\\n')
for arg in self.args
]).encode()
+ self.colors.reset.encode(),
highlight=False
)
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
self.exit_future = asyncio.Future(loop=loop)
# Create the subprocess controlled by DateProtocol;
# redirect the standard output into a pipe.
self.transport, self.protocol = await loop.subprocess_exec(
lambda: SubprocessProtocol(self),
*self.args,
stdin=None,
)
self.started = True
async def wait(self, *args, **kwargs):
if not self.started:
await self.start()
if not self.waited:
# Wait for the subprocess exit using the process_exited()
# method of the protocol.
await self.exit_future
# Close the stdout pipe.
self.transport.close()
self.waited = True
return self
def stdout(self, data):
self.out_raw.extend(data)
if not self.quiet:
self.output(data)
def stderr(self, data):
self.err_raw.extend(data)
if not self.quiet:
self.output(data)
@functools.cached_property
def out(self):
return self.out_raw.decode().strip()
@functools.cached_property
def err(self):
return self.err_raw.decode().strip()
@functools.cached_property
def rc(self):
return self.transport.get_returncode()
def output(self, data, highlight=True, flush=True):
for line in data.strip().split(b'\n'):
line = [self.highlight(line) if highlight else line]
if self.prefix:
line = self.prefix_line() + line
line.append(b'\n')
line = b''.join(line)
self.write(line)
if flush:
self.flush()
def highlight(self, line, highlight=True):
if not highlight or (
b'\x1b[' in line
or b'\033[' in line
or b'\\e[' in line
):
return line
for search, replace in self.regexps.items():
line = re.sub(search, replace, line)
line = line + self.colors.reset.encode()
return line
def prefix_line(self):
if self.prefix not in self.prefixes:
self.prefixes[self.prefix] = self.prefix_colors[len(self.prefixes)]
if len(self.prefix) > self.prefix_length:
type(self).prefix_length = len(self.prefix)
return [
self.prefixes[self.prefix].encode(),
b' ' * (self.prefix_length - len(self.prefix)),
self.prefix.encode(),
b' ',
self.colors.reset.encode(),
b'| '
]

View File

@ -1,195 +0,0 @@
import asyncio
import copy
from pathlib import Path
import os
import re
import sys
from ..output import Output
from ..proc import Proc
from ..result import Result, Results
class Target:
isguest = False
def __init__(self, *actions, root=None):
self.actions = actions
self.results = []
self.output = Output()
self.parent = None
self.root = root or ''
def __str__(self):
return 'localhost'
@property
def parent(self):
return self._parent or Target()
@parent.setter
def parent(self, value):
self._parent = value
@property
def caller(self):
"""Traverse parents and return the top-levels Target."""
if not self._parent:
return self
caller = self._parent
while caller._parent:
caller = caller._parent
return caller
async def __call__(self, *actions, target=None):
if target:
# that's going to be used by other target methods, to access
# the calling target
self.parent = target
result = Result(self, self)
result.status = 'success'
for action in actions or self.actions:
if await self.action(action, reraise=bool(actions)):
result.status = 'failure'
break
if getattr(self, 'clean', None):
self.output.clean(self)
await self.clean(self, result)
async def action(self, action, reraise=False):
result = Result(self, action)
self.output.start(action)
try:
await action(target=self)
except Exception as e:
self.output.fail(action, e)
result.status = 'failure'
result.exception = e
if reraise:
# nested call, re-raise
raise
else:
import traceback
traceback.print_exception(type(e), e, sys.exc_info()[2])
return True
else:
if getattr(action, 'skipped', False):
self.output.skip(action)
else:
self.output.success(action)
result.status = 'success'
finally:
self.caller.results.append(result)
clean = getattr(action, 'clean', None)
if clean:
self.output.clean(action)
await clean(self, result)
async def rexec(self, *args, **kwargs):
kwargs['user'] = 'root'
return await self.exec(*args, **kwargs)
async def which(self, *cmd):
"""
Return the first path to the cmd in the container.
If cmd argument is a list then it will try all commands.
"""
proc = await self.exec('type ' + ' '.join(cmd), raises=False)
result = []
for res in proc.out.split('\n'):
match = re.match('([^ ]+) is ([^ ]+)$', res.strip())
if match:
result.append(match.group(1))
return result
def shargs(self, *args, **kwargs):
user = kwargs.pop('user', None)
args = [str(arg) for arg in args if args is not None]
if args and ' ' in args[0]:
if len(args) == 1:
args = ['sh', '-euc', args[0]]
else:
args = ['sh', '-euc'] + list(args)
if user == 'root':
args = ['sudo'] + args
elif user:
args = ['sudo', '-u', user] + args
return args, kwargs
if self.parent:
return self.parent.shargs(*args, **kwargs)
else:
return args, kwargs
async def exec(self, *args, **kwargs):
kwargs['output'] = self.output
args, kwargs = self.shargs(*args, **kwargs)
proc = await Proc(*args, **kwargs)()
if kwargs.get('wait', True):
await proc.wait()
return proc
@property
def root(self):
return self._root
@root.setter
def root(self, value):
self._root = Path(value) if value else ''
@property
def host(self):
current = self
while current.isguest:
current = self.parent
return current
def path(self, path):
if not self.root:
return path
if str(path).startswith('/'):
path = str(path)[1:]
return str(self.root / path)
async def mkdir(self, *paths):
if '_mkdir' not in self.__dict__:
self._mkdir = []
make = [str(path) for path in paths if str(path) not in self._mkdir]
if make:
await self.exec('mkdir', '-p', *make)
self._mkdir += make
async def copy(self, *args):
return await self.exec('cp', '-a', *args)
async def exists(self, path):
return (await self.exec('ls ' + self.path(path), raises=False)).rc == 0
async def read(self, path):
return (await self.exec('cat', self.path(path))).out
async def write(self, path, content, **kwargs):
return await self.exec(
f'cat > {self.path(path)} <<EOF\n'
+ content
+ '\nEOF',
**kwargs
)
async def rm(self, path):
return await self.exec('rm', self.path(path))
async def getenv(self, key):
return (await self.exec('echo $' + key)).out
async def getcwd(self):
return (await self.exec('pwd')).out

View File

@ -1,225 +0,0 @@
import asyncio
import copy
import hashlib
import json
import os
import sys
from pathlib import Path
from .base import Target
from ..image import Image
from ..proc import Proc
class Buildah(Target):
"""Build container image with buildah"""
isguest = True
def __init__(self, *actions, base=None, commit=None):
self.base = base or 'alpine'
self.image = Image(commit) if commit else None
self.ctr = None
self.root = None
self.mounts = dict()
# Always consider localhost as parent for now
self.parent = Target()
super().__init__(*actions)
def is_runnable(self):
return Proc.test or os.getuid() == 0
def __str__(self):
if not self.is_runnable():
return 'Replacing with: buildah unshare ' + ' '.join(sys.argv)
return f'Buildah({self.image})'
async def __call__(self, *actions, target=None, push: bool=False):
if target:
self.parent = target
self.push = push
if not self.is_runnable():
os.execvp('buildah', ['buildah', 'unshare'] + sys.argv)
return # process has been replaced
layers = await self.image.layers.ls(self)
keep = await self.cache_setup(self.image.layers, *actions)
keepnames = [*map(lambda x: 'localhost/' + str(x), keep)]
self.invalidate = [name for name in self.image.layers if name not in keepnames]
if self.invalidate:
self.output.info('Invalidating old layers')
await self.image.layers.rm(self.parent, self.invalidate)
if actions:
actions = actions[len(keep):]
if not actions:
return self.output.success('Image up to date')
else:
self.actions = self.actions[len(keep):]
if not self.actions:
return self.output.success('Image up to date')
self.ctr = (await self.parent.exec('buildah', 'from', self.base)).out
self.root = Path((await self.parent.exec('buildah', 'mount', self.ctr)).out)
return await super().__call__(*actions)
async def cache_setup(self, layers, *actions):
keep = []
self.image_previous = Image(self.base)
for action in actions or self.actions:
action_image = await self.action_image(action)
name = 'localhost/' + str(action_image)
if name in layers:
self.base = self.image_previous = action_image
keep.append(action_image)
self.output.skip(
f'Found layer for {action}: {action_image.tags[0]}'
)
else:
break
return keep
async def action_image(self, action):
prefix = str(self.image_previous)
for tag in self.image_previous.tags:
if tag.startswith('layer-'):
prefix = tag
break
if hasattr(action, 'cachekey'):
action_key = action.cachekey()
if asyncio.iscoroutine(action_key):
action_key = str(await action_key)
else:
action_key = str(action)
key = prefix + action_key
sha1 = hashlib.sha1(key.encode('ascii'))
action_image = copy.deepcopy(self.image)
action_image.tags = ['layer-' + sha1.hexdigest()]
return action_image
async def action(self, action, reraise=False):
stop = await super().action(action, reraise)
if not stop:
action_image = await self.action_image(action)
self.output.info(f'Commiting {action_image} for {action}')
await self.parent.exec(
'buildah',
'commit',
'--format=' + action_image.format,
self.ctr,
action_image,
)
self.image_previous = action_image
return stop
async def clean(self, target, result):
if self.ctr is not None:
for src, dst in self.mounts.items():
await self.parent.exec('umount', self.root / str(dst)[1:])
await self.parent.exec('buildah', 'umount', self.ctr)
if result.status == 'success' and self.ctr:
await self.commit()
if self.push:
await self.image.push(target)
if self.ctr is not None:
await self.parent.exec('buildah', 'rm', self.ctr)
async def mount(self, src, dst):
"""Mount a host directory into the container."""
target = self.root / str(dst)[1:]
await self.parent.exec(f'mkdir -p {src} {target}')
await self.parent.exec(f'mount -o bind {src} {target}')
self.mounts[src] = dst
async def exec(self, *args, user=None, **kwargs):
_args = ['buildah', 'run']
if user:
_args += ['--user', user]
_args += [self.ctr, '--', 'sh', '-euc']
_args += [' '.join([str(a) for a in args])]
return await self.parent.exec(*_args, **kwargs)
async def commit(self):
await self.parent.exec(
f'buildah commit {self.ctr} {self.image.repository}:final'
)
ENV_TAGS = (
# gitlab
'CI_COMMIT_SHORT_SHA',
'CI_COMMIT_REF_NAME',
'CI_COMMIT_TAG',
# CircleCI
'CIRCLE_SHA1',
'CIRCLE_TAG',
'CIRCLE_BRANCH',
# contributions welcome here
)
# figure tags from CI vars
for name in ENV_TAGS:
value = os.getenv(name)
if value:
self.image.tags.append(value)
if self.image.tags:
tags = [f'{self.image.repository}:{tag}' for tag in self.image.tags]
else:
tags = [self.image.repository]
await self.parent.exec('buildah', 'tag', self.image.repository + ':final', *tags)
async def mkdir(self, *paths):
return await self.parent.mkdir(*[self.path(path) for path in paths])
async def copy(self, *args):
return await self.parent.copy(*args[:-1], self.path(args[-1]))
async def write(self, path, content):
return await self.write(path, content)
async def write(self, path, content, **kwargs):
return await self.exec(
f'cat > {path} <<EOF\n'
+ content
+ '\nEOF',
**kwargs
)
class Config:
def __init__(self, **config):
self.config = config
async def __call__(self, target):
for key, value in self.config.items():
await target.parent.exec(
f'buildah config --{key} "{value}" {target.ctr}'
)
def __str__(self):
return f'Buildah.Config({self.config})'
class Env:
def __init__(self, **env):
self.env = env
async def __call__(self, target):
for key, value in self.env.items():
await target.parent.exec(
'buildah',
'config',
'--env',
f'{key}={value}',
target.ctr,
)
def __str__(self):
return f'Buildah.Env({self.env})'

View File

@ -1,40 +0,0 @@
import copy
import re
from ..output import Output
from ..proc import Proc
from ..result import Result, Results
from .base import Target
class Localhost(Target):
def shargs(self, *args, **kwargs):
user = kwargs.pop('user', None)
args = [str(arg) for arg in args if args is not None]
if args and ' ' in args[0]:
if len(args) == 1:
args = ['sh', '-euc', args[0]]
else:
args = ['sh', '-euc'] + list(args)
if user == 'root':
args = ['sudo'] + args
elif user:
args = ['sudo', '-u', user] + args
return args, kwargs
if self.parent:
return self.parent.shargs(*args, **kwargs)
else:
return args, kwargs
async def exec(self, *args, **kwargs):
kwargs['output'] = self.output
args, kwargs = self.shargs(*args, **kwargs)
proc = await Proc(*args, **kwargs)()
if kwargs.get('wait', True):
await proc.wait()
return proc

View File

@ -1,15 +0,0 @@
from .base import Target
class Ssh(Target):
def __init__(self, *actions, host, user=None):
self.host = host
self.user = user
super().__init__(*actions)
async def exec(self, *args, user=None, **kwargs):
_args = ['ssh', self.host]
if user == 'root':
_args += ['sudo']
_args += [' '.join([str(a) for a in args])]
return await self.parent.exec(*_args, **kwargs)

View File

@ -1,23 +0,0 @@
from ..proc import Proc
from .base import Target
class ProcStub(Proc):
async def __call__(self, wait=True):
return self
async def communicate(self):
self.communicated = True
return self
async def wait(self):
return self
class Stub(Target):
async def exec(self, *args, **kwargs):
proc = await ProcStub(*args, **kwargs)()
if kwargs.get('wait', True):
await proc.wait()
return proc

View File

@ -1,20 +0,0 @@
#!/usr/bin/env python
"""
Shlaxfile for shlax itself.
"""
from shlax.shortcuts import *
shlax = Container(
build=Buildah(
Packages('python38', 'buildah', 'unzip', 'findutils', upgrade=False),
Copy('setup.py', 'shlax', '/app'),
Pip('/app'),
base='quay.io/podman/stable',
commit='quay.io/yourlabs/shlax',
),
)
if __name__ == '__main__':
print(Group(doc=__doc__).load(shlax).entry_point())

7
tests/test_colors.py Normal file
View File

@ -0,0 +1,7 @@
import shlax
def test_colors():
assert shlax.colors.cyan == '\u001b[38;5;51m'
assert shlax.colors.bcyan == '\u001b[1;38;5;51m'
assert shlax.colors.reset == '\u001b[0m'

View File

@ -1,27 +0,0 @@
import pytest
import os
from shlax.image import Image
tests = {
'docker://a.b:1337/re/po:x,y': ('docker', 'a.b:1337', 're/po', 'x,y'),
'docker://a.b/re/po:x,y': ('docker', 'a.b', 're/po', 'x,y'),
'a.b:1337/re/po:x,y': (None, 'a.b:1337', 're/po', 'x,y'),
'a.b/re/po:x,y': (None, 'a.b', 're/po', 'x,y'),
're/po:x,y': (None, None, 're/po', 'x,y'),
're/po': (None, None, 're/po', 'latest'),
'docker://re/po': ('docker', None, 're/po', 'latest'),
'docker://re/po:x,y': ('docker', None, 're/po', 'x,y'),
}
@pytest.mark.parametrize(
'arg,expected', [(k, dict(
backend=v[0], registry=v[1], repository=v[2], tags=v[3].split(',')
)) for k, v in tests.items()]
)
def test_args(arg, expected):
Image.ENV_TAGS = []
im = Image(arg)
for k, v in expected.items():
assert getattr(im, k) == v

View File

@ -1,24 +0,0 @@
import pytest
from shlax.output import Output
class Write:
def __init__(self):
self.output = ''
def __call__(self, out):
self.output += out.decode('utf8')
@pytest.fixture
def write():
return Write()
def test_output_regexps(write):
output = Output(
regexps={'^(.*)$': '{red}\\1'},
write=write,
flush=lambda: None,
)
output('foo')
assert write.output.strip() == output.colors['red'] + 'foo' + output.colors['reset']

197
tests/test_proc.py Normal file
View File

@ -0,0 +1,197 @@
import pytest
from unittest.mock import Mock, call
from shlax import Proc
@pytest.mark.asyncio
@pytest.mark.parametrize(
'args',
(
['sh', '-c', 'echo hi'],
['echo hi'],
['sh -c "echo hi"'],
)
)
async def test_proc(args):
proc = Proc(*args, quiet=True)
assert not proc.waited
assert not proc.started
await proc.wait()
assert proc.waited
assert proc.started
assert proc.out == 'hi'
assert proc.err == ''
assert proc.out_raw == b'hi\n'
assert proc.err_raw == b''
assert proc.rc == 0
@pytest.mark.asyncio
async def test_wait_unbound():
proc = await Proc('echo hi', quiet=True).wait()
assert proc.out == 'hi'
@pytest.mark.asyncio
async def test_rc_1():
proc = await Proc(
'NON EXISTING COMMAND',
quiet=True,
).wait()
assert proc.rc != 0
assert proc.err == 'sh: line 1: NON: command not found'
@pytest.mark.asyncio
async def test_prefix():
"""
Test output prefixes for when executing multiple commands in parallel.
"""
Proc.prefix_length = 0 # reset
write = Mock()
await Proc(
'echo hi',
write=write,
prefix='test_prefix',
).wait()
await Proc(
'echo hi',
write=write,
prefix='test_prefix_1'
).wait()
await Proc(
'echo hi',
write=write,
prefix='test_prefix',
).wait()
assert write.mock_calls == [
call(
Proc.prefix_colors[0].encode()
+ b'test_prefix '
+ Proc.colors.reset.encode()
+ b'| '
+ Proc.colors.bgray.encode()
+ b'+ sh -euc \'echo hi\''
+ Proc.colors.reset.encode()
+ b'\n'
),
call(
Proc.prefix_colors[0].encode()
+ b'test_prefix '
+ Proc.colors.reset.encode()
+ b'| hi'
+ Proc.colors.reset.encode()
+ b'\n'
),
call(
Proc.prefix_colors[1].encode()
+ b'test_prefix_1 '
+ Proc.colors.reset.encode()
+ b'| '
+ Proc.colors.bgray.encode()
+ b'+ sh -euc \'echo hi\''
+ Proc.colors.reset.encode()
+ b'\n'
),
call(
Proc.prefix_colors[1].encode()
# padding has been added because of output1
+ b'test_prefix_1 '
+ Proc.colors.reset.encode()
+ b'| hi'
+ Proc.colors.reset.encode()
+ b'\n'
),
call(
Proc.prefix_colors[0].encode()
# padding has been added because of output1
+ b' test_prefix '
+ Proc.colors.reset.encode()
+ b'| '
+ Proc.colors.bgray.encode()
+ b'+ sh -euc \'echo hi\''
+ Proc.colors.reset.encode()
+ b'\n'
),
call(
Proc.prefix_colors[0].encode()
# padding has been added because of output1
+ b' test_prefix '
+ Proc.colors.reset.encode()
+ b'| hi'
+ Proc.colors.reset.encode()
+ b'\n'
)
]
@pytest.mark.asyncio
async def test_prefix_multiline():
Proc.prefix_length = 0 # reset
proc = await Proc(
'echo -e "a\nb"',
write=Mock(),
prefix='test_prefix',
).wait()
assert proc.write.mock_calls == [
call(
Proc.prefix_colors[0].encode()
+ b'test_prefix '
+ Proc.colors.reset.encode()
+ b'| '
+ Proc.colors.bgray.encode()
+ b'+ sh -euc \'echo -e "a\\nb"\''
+ Proc.colors.reset.encode()
+ b'\n'
),
call(
Proc.prefix_colors[0].encode()
+ b'test_prefix '
+ Proc.colors.reset.encode()
+ b'| a'
+ Proc.colors.reset.encode()
+ b'\n'
),
call(
Proc.prefix_colors[0].encode()
# padding has been added because of output1
+ b'test_prefix '
+ Proc.colors.reset.encode()
+ b'| b'
+ Proc.colors.reset.encode()
+ b'\n'
),
]
@pytest.mark.asyncio
async def test_highlight():
"""
Test that we can color output with regexps.
"""
proc = await Proc(
'echo hi',
write=Mock(),
regexps={
r'h([\w\d-]+)': 'h{cyan}\\1',
}
).wait()
proc.write.assert_called_with(b'h\x1b[38;5;51mi\x1b[0m\n')
@pytest.mark.asyncio
async def test_highlight_if_not_colored():
"""
Test that coloration does not apply on output that is already colored.
"""
proc = await Proc(
'echo -e h"\\e[31m"i',
write=Mock(),
regexps={
r'h([\w\d-]+)': 'h{cyan}\\1',
}
).wait()
proc.write.assert_called_with(b'h\x1b[31mi\n')

View File

@ -1,132 +0,0 @@
import pytest
from shlax.targets.stub import Stub
from shlax.actions.run import Run
from shlax.actions.parallel import Parallel
from shlax.result import Result
class Error:
async def __call__(self, target):
raise Exception('lol')
@pytest.mark.asyncio
async def test_success():
action = Run('echo hi')
target = Stub(action)
await target()
assert target.results[0].action == action
assert target.results[0].status == 'success'
@pytest.mark.asyncio
async def test_error():
action = Error()
target = Stub(action)
await target()
assert target.results[0].action == action
assert target.results[0].status == 'failure'
@pytest.mark.asyncio
async def test_nested():
nested = Error()
class Nesting:
async def __call__(self, target):
await target(nested)
nesting = Nesting()
target = Stub(nesting)
await target()
assert len(target.results) == 2
assert target.results[0].status == 'failure'
assert target.results[0].action == nested
assert target.results[1].status == 'failure'
assert target.results[1].action == nesting
@pytest.mark.asyncio
async def test_parallel():
winner = Run('echo hi')
looser = Error()
parallel = Parallel(winner, looser)
target = Stub(parallel)
await target()
assert len(target.results) == 3
assert target.results[0].status == 'success'
assert target.results[0].action == winner
assert target.results[1].status == 'failure'
assert target.results[1].action == looser
assert target.results[2].status == 'failure'
assert target.results[2].action == parallel
@pytest.mark.asyncio
async def test_function():
async def hello(target):
await target.exec('hello')
await Stub()(hello)
@pytest.mark.asyncio
async def test_action_clean():
class Example:
def __init__(self):
self.was_called = False
async def clean(self, target, result):
self.was_called = True
async def __call__(self, target):
raise Exception('lol')
action = Example()
target = Stub()
with pytest.raises(Exception):
await target(action)
assert action.was_called
@pytest.mark.asyncio
async def test_target_clean():
class Example(Stub):
def __init__(self, action):
self.was_called = False
super().__init__(action)
async def clean(self, target, result):
self.was_called = True
target = Example(Error())
await target()
assert target.was_called
@pytest.mark.asyncio
async def test_method():
class Example:
def __init__(self):
self.was_called = False
async def test(self, target):
self.was_called = True
example = Example()
action = example.test
target = Stub()
await target(action)
assert example.was_called
@pytest.mark.asyncio
async def test_target_action():
child = Stub(Run('echo hi'))
parent = Stub(child)
grandpa = Stub()
await grandpa(parent)
assert len(grandpa.results) == 3
grandpa = Stub(parent)
await grandpa()
assert len(grandpa.results) == 3