Compare commits

..

No commits in common. "rewriteagain" and "master" have entirely different histories.

24 changed files with 615 additions and 512 deletions

10
.gitignore vendored
View File

@ -1,10 +0,0 @@
*.pyc
__pycache__
.cache/
.coverage
.eggs/
.podctl_build_django.sh
.podctl_build_podctl.sh
.setupmeta.version
.testmondata
*.egg-info

View File

@ -1,15 +1,20 @@
build:
cache:
key: cache
paths: [.cache]
image: yourlabs/shlax
script: pip install -U --user -e . && CACHE_DIR=$(pwd)/.cache ./shlaxfile.py -d
shlax build push
stage: build
image: yourlabs/python-arch
qa:
stage: test
script: flake8
pytest:
stage: test
script:
- pip install --user -e .
- pytest -vv --cov shlax --cov-report=xml:coverage.xml --junitxml=report.xml --cov-report=term-missing --strict tests
- CI_PROJECT_PATH=yourlabs/shlax CI_BUILD_REPO=https://github.com/yourlabs/cli2 codecov-bash -f coverage.xml
artifacts:
reports:
junit: report.xml
pypi:
image: yourlabs/python
only: [tags]
script: pypi-release
stage: deploy
test: {image: yourlabs/python, script: 'pip install -U --user -e .[test] && py.test
-svv tests', stage: build}
script: pypi-release
only: [tags]

View File

@ -1,14 +0,0 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.4.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- repo: https://yourlabs.io/oss/shlax
rev: master
hooks:
- id: shlaxfile-gitlabci

View File

@ -1,5 +0,0 @@
- id: shlaxfile-gitlabci
name: Regenerate .gitlab-ci.yml
description: Regenerate gitlabci
entry: ./shlaxfile.py gitlabci
language: python

153
README.md
View File

@ -1,153 +0,0 @@
# Shlax: Pythonic automation tool
Shlax is a Python framework for system automation, initially with the purpose
of replacing docker, docker-compose and ansible with a single tool, with the
purpose of code-reuse. It may be viewed as "async fabric rewrite by a
megalomanic Django fanboy".
The pattern resolves around two moving parts: Actions and Targets.
## Action
An action is a function that takes a target argument, it may execute nested
actions by passing over the target argument which collects the results.
Example:
```python
async def hello_world(target):
"""Bunch of silly commands to demonstrate action programming."""
await target.mkdir('foo')
python = await target.which('python3', 'python')
await target.exec(f'{python} --version > foo/test')
version = target.exec('cat foo/test').output
print('version')
```
### Recursion
An action may call other actions recursively. There are two ways:
```python
async def something(target):
# just run the other action code
hello_world(target)
# or delegate the call to target
target(hello_world)
```
In the first case, the resulting count of ran actions will remain 1:
"something" action.
In the second case, the resulting count of ran actions will be 2: "something"
and "hello_world".
### Callable classes
Actually in practice, Actions are basic callable Python classes, here's a basic
example to run a command:
```python
class Run:
def __init__(self, cmd):
self.cmd = cmd
async def __call__(self, target):
return await target.exec(self.cmd)
```
This allows to create callable objects which may be called just like functions
and as such be appropriate actions, instead of:
```python
async def one(target):
target.exec('one')
async def two(target):
target.exec('two')
```
You can do:
```python
one = Run('one')
two = Run('two')
```
### Parallel execution
Actions may be executed in parallel with an action named ... Parallel. This
defines an action that will execute three actions in parallel:
```python
action = Parallel(
hello_world,
something,
Run('echo hi'),
)
```
In this case, all actions must succeed for the parallel action to be considered
a success.
### Methods
An action may also be a method, as long as it just takes a target argument, for
example:
```python
class Thing:
def start(self, target):
"""Starts thing"""
def stop(self, target):
"""Stops thing"""
action = Thing().start
```
### Cleaning
If an action defines a `clean` method, it will always be called wether or not
the action succeeded. Example:
```python
class Thing:
def __call__(self, target):
"""Do some thing"""
def clean(self, target):
"""Clean-up target after __call__"""
```
## Target
A Target is mainly an object providing an abstraction layer over the system we
want to automate with actions. It defines functions to execute a command, mount
a directory, copy a file, manage environment variables and so on.
### Pre-configuration
A Target can be pre-configured with a list of Actions in which case calling the
target without argument will execute its Actions until one fails by raising an
Exception:
```python
say_hello = Localhost(
hello_world,
Run('echo hi'),
)
await say_hello()
```
### Results
Every time a target execute an action, it will set the "status" attribute on it
to "success" or "failure", and add it to the "results" attribute:
```
say_hello = Localhost(Run('echo hi'))
await say_hello()
say_hello.results # contains the action with status="success"
```

110
README.rst Normal file
View File

@ -0,0 +1,110 @@
Shlax: Beautiful Async Subprocess executor
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Why?
====
In Python we now have async subprocesses which allows to execute several
subprocesses at the same time. The purpose of this library is to:
- stream stderr and stdout in real time while capturing it,
- real time output must be prefixed for when you execute several commands at
the time so that you know which line is for which process, like with
docker-compose logs,
- output coloration in real time with regexps to make even more readable.
This code was copy/pasted between projects and finally extracted on its own.
Demo
====
.. image:: https://yourlabs.io/oss/shlax/-/raw/master/demo.png
You will find the demo script in demo.py in this repository.
Usage
=====
Basics
------
Basic example, this will both stream output and capture it:
.. code-block:: python
from shlax import Subprocess
proc = await Subprocess('echo hi').wait()
print(proc.rc, proc.out, proc.err, proc.out_raw, proc.err_raw)
Longer
------
If you want to start the command and wait for completion elsewhere then call
any of ``start()`` and ``wait()``, or both, explicitely:
.. code-block:: python
proc = Subprocess('echo hi')
await proc.start() # start the process
await proc.wait() # wait for completion
Proc alias
----------
Note that shlax defines an alias ``Proc`` to ``Subprocess`` so this also works:
.. code-block:: python
from shlax import Proc
proc = await Proc('echo hi').wait()
Quiet
-----
To disable real time output streaming use the ``quiet`` argument:
.. code-block:: python
proc = await Subprocess('echo hi', quiet=True).wait()
Prefix
------
Using prefixes, you can have real time outputs of parallel commands and at the
same time know which output belongs to which process:
.. code-block:: python
proc0 = Subprocess('find /', prefix='first')
proc1 = Subprocess('find /', prefix='second')
await asyncio.gather(proc0.wait(), proc1.wait())
Coloration and output patching
------------------------------
You can add coloration or patch real time output with regexps, note that it
will be applied line by line:
.. code-block:: python
import sys
regexps = {
'^(.*).py$': '{cyan}\\1',
}
await asyncio.gather(*[
Subprocess(
f'find {path}',
regexps=regexps,
).wait()
for path in sys.path
])
Where is the rest?
==================
Shlax used to be the name of a much more ambitious poc-project, that you can
still find in the ``OLD`` branch of this repository. It has been extracted in
two projects with clear boundaries, namely `sysplan
<https://yourlabs.io/oss/sysplan>`_ and `podplan
<https://yourlabs.io/oss/podplan>`_ which are still in alpha state, although
Shlax as it now, is feature complete and stable.

BIN
demo.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 990 KiB

24
demo.py Normal file
View File

@ -0,0 +1,24 @@
import asyncio
from shlax import Subprocess
async def main():
colors = {
'^(.*).txt$': '{green}\\1.txt',
'^(.*).py$': '{bred}\\1.py',
}
await asyncio.gather(
Subprocess(
'for i in $(find .. | head); do echo $i; sleep .2; done',
regexps=colors,
prefix='parent',
).wait(),
Subprocess(
'for i in $(find . | head); do echo $i; sleep .3; done',
regexps=colors,
prefix='cwd',
).wait(),
)
asyncio.run(main())

View File

@ -5,11 +5,7 @@ setup(
name='shlax',
versioning='dev',
setup_requires='setupmeta',
install_requires=['cli2'],
extras_require=dict(
full=[
'pyyaml',
],
test=[
'pytest',
'pytest-cov',
@ -21,11 +17,6 @@ setup(
url='https://yourlabs.io/oss/shlax',
include_package_data=True,
license='MIT',
keywords='cli automation ansible',
keywords='async subprocess',
python_requires='>=3',
entry_points={
'console_scripts': [
'shlax = shlax.cli:cli',
],
},
)

3
shlax/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from .subprocess import Subprocess # noqa
Proc = Subprocess # noqa
from .colors import colors, c # noqa

View File

@ -1,2 +0,0 @@
class Action:
pass

View File

@ -1,11 +0,0 @@
import asyncio
class Parallel:
def __init__(self, *actions):
self.actions = actions
async def __call__(self, target):
return await asyncio.gather(*[
target(action) for action in self.actions
])

View File

@ -1,8 +0,0 @@
class Run:
def __init__(self, cmd):
self.cmd = cmd
async def __call__(self, target):
target.exec(self.cmd)

73
shlax/colors.py Normal file
View File

@ -0,0 +1,73 @@
theme = dict(
cyan='\033[38;5;51m',
cyan1='\033[38;5;87m',
cyan2='\033[38;5;123m',
cyan3='\033[38;5;159m',
blue='\033[38;5;33m',
blue1='\033[38;5;69m',
blue2='\033[38;5;75m',
blue3='\033[38;5;81m',
blue4='\033[38;5;111m',
blue5='\033[38;5;27m',
green='\033[38;5;10m',
green1='\033[38;5;2m',
green2='\033[38;5;46m',
green3='\033[38;5;47m',
green4='\033[38;5;48m',
green5='\033[38;5;118m',
green6='\033[38;5;119m',
green7='\033[38;5;120m',
purple='\033[38;5;5m',
purple1='\033[38;5;6m',
purple2='\033[38;5;13m',
purple3='\033[38;5;164m',
purple4='\033[38;5;165m',
purple5='\033[38;5;176m',
purple6='\033[38;5;145m',
purple7='\033[38;5;213m',
purple8='\033[38;5;201m',
red='\033[38;5;1m',
red1='\033[38;5;9m',
red2='\033[38;5;196m',
red3='\033[38;5;160m',
red4='\033[38;5;197m',
red5='\033[38;5;198m',
red6='\033[38;5;199m',
yellow='\033[38;5;226m',
yellow1='\033[38;5;227m',
yellow2='\033[38;5;226m',
yellow3='\033[38;5;229m',
yellow4='\033[38;5;220m',
yellow5='\033[38;5;230m',
gray='\033[38;5;250m',
gray1='\033[38;5;251m',
gray2='\033[38;5;252m',
gray3='\033[38;5;253m',
gray4='\033[38;5;254m',
gray5='\033[38;5;255m',
gray6='\033[38;5;249m',
pink='\033[38;5;197m',
pink1='\033[38;5;198m',
pink2='\033[38;5;199m',
pink3='\033[38;5;200m',
pink4='\033[38;5;201m',
pink5='\033[38;5;207m',
pink6='\033[38;5;213m',
orange='\033[38;5;202m',
orange1='\033[38;5;208m',
orange2='\033[38;5;214m',
orange3='\033[38;5;220m',
orange4='\033[38;5;172m',
orange5='\033[38;5;166m',
reset='\033[0m',
)
class Colors:
def __init__(self, **theme):
for name, value in theme.items():
setattr(self, name, value)
setattr(self, f'b{name}', value.replace('[', '[1;'))
c = colors = Colors(**theme)

View File

@ -1,156 +0,0 @@
"""
Asynchronous process execution wrapper.
"""
import asyncio
import os
import shlex
import sys
from .output import Output
class ProcFailure(Exception):
def __init__(self, proc):
self.proc = proc
msg = f'FAIL exit with {proc.rc} ' + proc.args[0]
if not proc.output.debug or 'cmd' not in str(proc.output.debug):
msg += '\n' + proc.cmd
if not proc.output.debug or 'out' not in str(proc.output.debug):
msg += '\n' + proc.out
msg += '\n' + proc.err
super().__init__(msg)
class PrefixStreamProtocol(asyncio.subprocess.SubprocessStreamProtocol):
"""
Internal subprocess stream protocol to add a prefix in front of output to
make asynchronous output readable.
"""
def __init__(self, output, *args, **kwargs):
self.output = output
super().__init__(*args, **kwargs)
def pipe_data_received(self, fd, data):
if self.output.debug is True or 'out' in str(self.output.debug):
if fd in (1, 2):
self.output(data)
super().pipe_data_received(fd, data)
def protocol_factory(output):
def _p():
return PrefixStreamProtocol(
output,
limit=asyncio.streams._DEFAULT_LIMIT,
loop=asyncio.events.get_event_loop()
)
return _p
class Proc:
"""
Subprocess wrapper.
Example usage::
proc = Proc('find', '/', prefix='containername')
await proc() # execute
print(proc.out) # stdout
print(proc.err) # stderr
print(proc.rc) # return code
"""
test = False
def __init__(self, *args, prefix=None, raises=True, output=None):
self.output = output or Output()
self.cmd = ' '.join(args)
self.args = args
self.prefix = prefix
self.raises = raises
self.called = False
self.communicated = False
self.out_raw = b''
self.err_raw = b''
self.out = ''
self.err = ''
self.rc = None
@staticmethod
def split(*args):
args = [str(a) for a in args]
if len(args) == 1:
if isinstance(args[0], (list, tuple)):
args = args[0]
else:
args = ['sh', '-euc', ' '.join(args)]
return args
def output_factory(self, *args, **kwargs):
args = tuple(self.prefix) + args
return Output(*args, kwargs)
async def __call__(self, wait=True):
if self.called:
raise Exception('Already called: ' + self.cmd)
if 'cmd' in str(self.output.debug):
self.output.cmd(self.cmd)
if self.test:
if self.test is True:
type(self).test = []
self.test.append(self.args)
return self
loop = asyncio.events.get_event_loop()
transport, protocol = await loop.subprocess_exec(
protocol_factory(self.output), *self.args)
self.proc = asyncio.subprocess.Process(transport, protocol, loop)
self.called = True
if wait:
await self.wait()
return self
async def communicate(self):
self.out_raw, self.err_raw = await self.proc.communicate()
self.out = self.out_raw.decode('utf8').strip()
self.err = self.err_raw.decode('utf8').strip()
self.rc = self.proc.returncode
self.communicated = True
return self
async def wait(self):
if self.test:
return self
if not self.called:
await self()
if not self.communicated:
await self.communicate()
if self.raises and self.proc.returncode:
raise ProcFailure(self)
return self
@property
def json(self):
import json
return json.loads(self.out)
def mock():
"""Context manager for testing purpose."""
cls = Proc
class Mock:
def __enter__(_):
cls.test = True
def __exit__(_, exc_type, exc_value, traceback):
cls.test = False
return Mock()

View File

@ -1,12 +0,0 @@
class Result:
def __init__(self, target, action):
self.target = target
self.action = action
self.status = 'pending'
class Results(list):
def new(self, target, action):
result = Result(target, action)
self.append(result)
return result

View File

@ -1,4 +0,0 @@
from .targets.base import Target
from .targets.buildah import Buildah
from .actions.run import Run

182
shlax/subprocess.py Normal file
View File

@ -0,0 +1,182 @@
import asyncio
import functools
import re
import shlex
import sys
from .colors import colors
class SubprocessProtocol(asyncio.SubprocessProtocol):
def __init__(self, proc):
self.proc = proc
self.output = bytearray()
def pipe_data_received(self, fd, data):
if fd == 1:
self.proc.stdout(data)
elif fd == 2:
self.proc.stderr(data)
def process_exited(self):
self.proc.exit_future.set_result(True)
class Subprocess:
colors = colors
# arbitrary list of colors
prefix_colors = (
colors.cyan,
colors.blue,
colors.green,
colors.purple,
colors.red,
colors.yellow,
colors.gray,
colors.pink,
colors.orange,
)
# class variables, meant to grow as new prefixes are discovered to ensure
# output alignment
prefixes = dict()
prefix_length = 0
def __init__(
self,
*args,
quiet=None,
prefix=None,
regexps=None,
write=None,
flush=None,
):
if len(args) == 1 and ' ' in args[0]:
args = ['sh', '-euc', args[0]]
self.args = args
self.quiet = quiet if quiet is not None else False
self.prefix = prefix
self.write = write or sys.stdout.buffer.write
self.flush = flush or sys.stdout.flush
self.started = False
self.waited = False
self.out_raw = bytearray()
self.err_raw = bytearray()
self.regexps = dict()
if regexps:
for search, replace in regexps.items():
if isinstance(search, str):
search = search.encode()
search = re.compile(search)
replace = replace.format(**self.colors.__dict__).encode()
self.regexps[search] = replace
async def start(self, wait=True):
if not self.quiet:
self.output(
self.colors.bgray.encode()
+ b'+ '
+ shlex.join([
arg.replace('\n', '\\n')
for arg in self.args
]).encode()
+ self.colors.reset.encode(),
highlight=False
)
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
self.exit_future = asyncio.Future(loop=loop)
# Create the subprocess controlled by DateProtocol;
# redirect the standard output into a pipe.
self.transport, self.protocol = await loop.subprocess_exec(
lambda: SubprocessProtocol(self),
*self.args,
stdin=None,
)
self.started = True
async def wait(self, *args, **kwargs):
if not self.started:
await self.start()
if not self.waited:
# Wait for the subprocess exit using the process_exited()
# method of the protocol.
await self.exit_future
# Close the stdout pipe.
self.transport.close()
self.waited = True
return self
def stdout(self, data):
self.out_raw.extend(data)
if not self.quiet:
self.output(data)
def stderr(self, data):
self.err_raw.extend(data)
if not self.quiet:
self.output(data)
@functools.cached_property
def out(self):
return self.out_raw.decode().strip()
@functools.cached_property
def err(self):
return self.err_raw.decode().strip()
@functools.cached_property
def rc(self):
return self.transport.get_returncode()
def output(self, data, highlight=True, flush=True):
for line in data.strip().split(b'\n'):
line = [self.highlight(line) if highlight else line]
if self.prefix:
line = self.prefix_line() + line
line.append(b'\n')
line = b''.join(line)
self.write(line)
if flush:
self.flush()
def highlight(self, line, highlight=True):
if not highlight or (
b'\x1b[' in line
or b'\033[' in line
or b'\\e[' in line
):
return line
for search, replace in self.regexps.items():
line = re.sub(search, replace, line)
line = line + self.colors.reset.encode()
return line
def prefix_line(self):
if self.prefix not in self.prefixes:
self.prefixes[self.prefix] = self.prefix_colors[len(self.prefixes)]
if len(self.prefix) > self.prefix_length:
type(self).prefix_length = len(self.prefix)
return [
self.prefixes[self.prefix].encode(),
b' ' * (self.prefix_length - len(self.prefix)),
self.prefix.encode(),
b' ',
self.colors.reset.encode(),
b'| '
]

View File

@ -1,27 +0,0 @@
import copy
from ..result import Result, Results
class Target:
def __init__(self, *actions, **options):
self.actions = actions
self.options = options
self.results = []
async def __call__(self, *actions):
for action in actions or self.actions:
try:
await action(self)
except Exception as e:
action.status = 'failure'
action.exception = e
if actions:
# nested call, re-raise
raise
else:
break
else:
action.status = 'success'
finally:
self.results.append(action)

View File

@ -1,5 +0,0 @@
from .base import Target
class Buildah(Target):
pass

View File

@ -1,15 +0,0 @@
#!/usr/bin/env shlax
"""
Shlaxfile for shlax itself.
"""
from shlax.shortcuts import *
build = Buildah(
'quay.io/podman/stable',
Run('echo hi'),
commit='docker.io/yourlabs/shlax',
workdir='/app',
)
build()

7
tests/test_colors.py Normal file
View File

@ -0,0 +1,7 @@
import shlax
def test_colors():
assert shlax.colors.cyan == '\u001b[38;5;51m'
assert shlax.colors.bcyan == '\u001b[1;38;5;51m'
assert shlax.colors.reset == '\u001b[0m'

197
tests/test_proc.py Normal file
View File

@ -0,0 +1,197 @@
import pytest
from unittest.mock import Mock, call
from shlax import Proc
@pytest.mark.asyncio
@pytest.mark.parametrize(
'args',
(
['sh', '-c', 'echo hi'],
['echo hi'],
['sh -c "echo hi"'],
)
)
async def test_proc(args):
proc = Proc(*args, quiet=True)
assert not proc.waited
assert not proc.started
await proc.wait()
assert proc.waited
assert proc.started
assert proc.out == 'hi'
assert proc.err == ''
assert proc.out_raw == b'hi\n'
assert proc.err_raw == b''
assert proc.rc == 0
@pytest.mark.asyncio
async def test_wait_unbound():
proc = await Proc('echo hi', quiet=True).wait()
assert proc.out == 'hi'
@pytest.mark.asyncio
async def test_rc_1():
proc = await Proc(
'NON EXISTING COMMAND',
quiet=True,
).wait()
assert proc.rc != 0
assert proc.err == 'sh: line 1: NON: command not found'
@pytest.mark.asyncio
async def test_prefix():
"""
Test output prefixes for when executing multiple commands in parallel.
"""
Proc.prefix_length = 0 # reset
write = Mock()
await Proc(
'echo hi',
write=write,
prefix='test_prefix',
).wait()
await Proc(
'echo hi',
write=write,
prefix='test_prefix_1'
).wait()
await Proc(
'echo hi',
write=write,
prefix='test_prefix',
).wait()
assert write.mock_calls == [
call(
Proc.prefix_colors[0].encode()
+ b'test_prefix '
+ Proc.colors.reset.encode()
+ b'| '
+ Proc.colors.bgray.encode()
+ b'+ sh -euc \'echo hi\''
+ Proc.colors.reset.encode()
+ b'\n'
),
call(
Proc.prefix_colors[0].encode()
+ b'test_prefix '
+ Proc.colors.reset.encode()
+ b'| hi'
+ Proc.colors.reset.encode()
+ b'\n'
),
call(
Proc.prefix_colors[1].encode()
+ b'test_prefix_1 '
+ Proc.colors.reset.encode()
+ b'| '
+ Proc.colors.bgray.encode()
+ b'+ sh -euc \'echo hi\''
+ Proc.colors.reset.encode()
+ b'\n'
),
call(
Proc.prefix_colors[1].encode()
# padding has been added because of output1
+ b'test_prefix_1 '
+ Proc.colors.reset.encode()
+ b'| hi'
+ Proc.colors.reset.encode()
+ b'\n'
),
call(
Proc.prefix_colors[0].encode()
# padding has been added because of output1
+ b' test_prefix '
+ Proc.colors.reset.encode()
+ b'| '
+ Proc.colors.bgray.encode()
+ b'+ sh -euc \'echo hi\''
+ Proc.colors.reset.encode()
+ b'\n'
),
call(
Proc.prefix_colors[0].encode()
# padding has been added because of output1
+ b' test_prefix '
+ Proc.colors.reset.encode()
+ b'| hi'
+ Proc.colors.reset.encode()
+ b'\n'
)
]
@pytest.mark.asyncio
async def test_prefix_multiline():
Proc.prefix_length = 0 # reset
proc = await Proc(
'echo -e "a\nb"',
write=Mock(),
prefix='test_prefix',
).wait()
assert proc.write.mock_calls == [
call(
Proc.prefix_colors[0].encode()
+ b'test_prefix '
+ Proc.colors.reset.encode()
+ b'| '
+ Proc.colors.bgray.encode()
+ b'+ sh -euc \'echo -e "a\\nb"\''
+ Proc.colors.reset.encode()
+ b'\n'
),
call(
Proc.prefix_colors[0].encode()
+ b'test_prefix '
+ Proc.colors.reset.encode()
+ b'| a'
+ Proc.colors.reset.encode()
+ b'\n'
),
call(
Proc.prefix_colors[0].encode()
# padding has been added because of output1
+ b'test_prefix '
+ Proc.colors.reset.encode()
+ b'| b'
+ Proc.colors.reset.encode()
+ b'\n'
),
]
@pytest.mark.asyncio
async def test_highlight():
"""
Test that we can color output with regexps.
"""
proc = await Proc(
'echo hi',
write=Mock(),
regexps={
r'h([\w\d-]+)': 'h{cyan}\\1',
}
).wait()
proc.write.assert_called_with(b'h\x1b[38;5;51mi\x1b[0m\n')
@pytest.mark.asyncio
async def test_highlight_if_not_colored():
"""
Test that coloration does not apply on output that is already colored.
"""
proc = await Proc(
'echo -e h"\\e[31m"i',
write=Mock(),
regexps={
r'h([\w\d-]+)': 'h{cyan}\\1',
}
).wait()
proc.write.assert_called_with(b'h\x1b[31mi\n')

View File

@ -1,67 +0,0 @@
import pytest
from shlax.targets.base import Target
from shlax.actions.run import Run
from shlax.actions.parallel import Parallel
from shlax.result import Result
class Error:
async def __call__(self, target):
raise Exception('lol')
class Target(Target):
def exec(self, *args):
print(*args)
@pytest.mark.asyncio
async def test_success():
action = Run('echo hi')
target = Target(action)
await target()
assert action.status == 'success'
@pytest.mark.asyncio
async def test_error():
action = Error()
target = Target(action)
await target()
assert action.status == 'failure'
@pytest.mark.asyncio
async def test_nested():
nested = Error()
class Nesting:
async def __call__(self, target):
await target(nested)
nesting = Nesting()
target = Target(nesting)
await target()
assert len(target.results) == 2
assert target.results == [nested, nesting]
assert target.results[0].status == 'failure'
assert target.results[1].status == 'failure'
@pytest.mark.asyncio
async def test_parallel():
winner = Run('echo hi')
looser = Error()
parallel = Parallel(winner, looser)
target = Target(parallel)
await target()
assert len(target.results) == 3
assert target.results[0].status == 'success'
assert target.results[0] == winner
assert target.results[1].status == 'failure'
assert target.results[1] == looser
assert target.results[2].status == 'failure'
assert target.results[2] == parallel