Compare commits

..

No commits in common. "refactor" and "master" have entirely different histories.

4 changed files with 88 additions and 151 deletions

View File

@ -36,8 +36,6 @@ Basic example, this will both stream output and capture it:
proc = await Subprocess('echo hi').wait()
print(proc.rc, proc.out, proc.err, proc.out_raw, proc.err_raw)
Arguments may be a string command or a list of arguments.
Longer
------
@ -46,7 +44,7 @@ any of ``start()`` and ``wait()``, or both, explicitely:
.. code-block:: python
proc = Subprocess('echo', 'hi')
proc = Subprocess('echo hi')
await proc.start() # start the process
await proc.wait() # wait for completion
@ -95,36 +93,18 @@ will be applied line by line:
}
await asyncio.gather(*[
Subprocess(
'find',
path,
f'find {path}',
regexps=regexps,
shell=True,
).wait()
for path in sys.path
])
Automating input
----------------
You can pass a list of tuples of two bytestrings ``(regexp, characters_to_send)``:
.. code-block:: python
proc = Proc(
'sh',
'-euc',
'echo "x?"; read x; echo x=$x; echo "z?"; read z; echo z=$z',
expects=[
(b'x?', b'y\n'),
(b'z?', b'w\n'),
],
)
await proc.wait()
assert proc.out == 'x?\nx=y\nz?\nz=w'
Where is the rest?
==================
Shlax used to be the name of a much more ambitious poc-project, that you can
still find in the ``OLD`` branch of this repository. Parts of it have been
extracted into smaller repositories.
still find in the ``OLD`` branch of this repository. It has been extracted in
two projects with clear boundaries, namely `sysplan
<https://yourlabs.io/oss/sysplan>`_ and `podplan
<https://yourlabs.io/oss/podplan>`_ which are still in alpha state, although
Shlax as it now, is feature complete and stable.

View File

@ -8,19 +8,17 @@ async def main():
'^(.*).txt$': '{green}\\1.txt',
'^(.*).py$': '{bred}\\1.py',
}
await asyncio.gather(
Subprocess(
'sh', '-euc',
Subprocess(
'for i in $(find .. | head); do echo $i; sleep .2; done',
regexps=colors,
prefix='parent',
).wait(),
Subprocess(
'sh -euc "for i in $(find . | head); do echo $i; sleep .3; done"',
'for i in $(find . | head); do echo $i; sleep .3; done',
regexps=colors,
prefix='cwd',
).wait()
).wait(),
)
asyncio.run(main())

View File

@ -1,6 +1,5 @@
import asyncio
import functools
import os
import re
import shlex
import sys
@ -8,31 +7,19 @@ import sys
from .colors import colors
class SubprocessProtocol(asyncio.subprocess.SubprocessStreamProtocol):
def __init__(self, proc, *args, **kwargs):
class SubprocessProtocol(asyncio.SubprocessProtocol):
def __init__(self, proc):
self.proc = proc
super().__init__(*args, **kwargs)
def receive(self, data, raw, target):
raw.extend(data)
if not self.proc.quiet:
for line in self.proc.lines(data):
target.buffer.write(line)
target.flush()
self.output = bytearray()
def pipe_data_received(self, fd, data):
if fd == 1:
self.receive(data, self.proc.out_raw, self.proc.stdout)
self.proc.stdout(data)
elif fd == 2:
self.receive(data, self.proc.err_raw, self.proc.stderr)
self.proc.stderr(data)
if self.proc.expect_index < len(self.proc.expects):
expected = self.proc.expects[self.proc.expect_index]
if re.match(expected[0], data):
self.stdin.write(expected[1])
event_loop = asyncio.get_event_loop()
asyncio.create_task(self.stdin.drain())
self.proc.expect_index += 1
def process_exited(self):
self.proc.exit_future.set_result(True)
class Subprocess:
@ -62,19 +49,17 @@ class Subprocess:
quiet=None,
prefix=None,
regexps=None,
expects=None,
write=None,
flush=None,
stdout=None,
stderr=None,
):
if len(args) == 1 and ' ' in args[0]:
args = ['sh', '-euc', args[0]]
self.args = args
self.quiet = quiet if quiet is not None else False
self.prefix = prefix
self.stdout = stdout or sys.stdout
self.stderr = stderr or sys.stderr
self.expects = expects or []
self.expect_index = 0
self.write = write or sys.stdout.buffer.write
self.flush = flush or sys.stdout.flush
self.started = False
self.waited = False
self.out_raw = bytearray()
@ -90,41 +75,31 @@ class Subprocess:
self.regexps[search] = replace
async def start(self, wait=True):
if len(self.args) == 1 and not os.path.exists(self.args[0]):
args = shlex.split(self.args[0])
else:
args = self.args
if not self.quiet:
message = b''.join([
self.colors.bgray.encode(),
b'+ ',
shlex.join(args).replace('\n', '\\n').encode(),
self.colors.reset.encode(),
])
for line in self.lines(message, highlight=False):
self.stdout.buffer.write(line)
self.stdout.flush()
self.output(
self.colors.bgray.encode()
+ b'+ '
+ shlex.join([
arg.replace('\n', '\\n')
for arg in self.args
]).encode()
+ self.colors.reset.encode(),
highlight=False
)
# The following is a copy of what asyncio.subprocess_exec and
# asyncio.create_subprocess_exec do except we inject our own
# SubprocessStreamProtocol subclass: it might need an update as new
# python releases come out.
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
self.transport, self.protocol = await loop.subprocess_exec(
lambda: SubprocessProtocol(
self,
limit=asyncio.subprocess.streams._DEFAULT_LIMIT,
loop=loop,
),
*args,
stdin=asyncio.subprocess.PIPE if self.expects else sys.stdin,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
self.exit_future = asyncio.Future(loop=loop)
self.proc = asyncio.subprocess.Process(self.transport, self.protocol, loop)
# Create the subprocess controlled by DateProtocol;
# redirect the standard output into a pipe.
self.transport, self.protocol = await loop.subprocess_exec(
lambda: SubprocessProtocol(self),
*self.args,
stdin=None,
)
self.started = True
async def wait(self, *args, **kwargs):
@ -132,35 +107,50 @@ class Subprocess:
await self.start()
if not self.waited:
await self.proc.communicate()
self.rc = self.transport.get_returncode()
# Wait for the subprocess exit using the process_exited()
# method of the protocol.
await self.exit_future
# Close the stdout pipe.
self.transport.close()
self.waited = True
return self
@property
def stdout(self, data):
self.out_raw.extend(data)
if not self.quiet:
self.output(data)
def stderr(self, data):
self.err_raw.extend(data)
if not self.quiet:
self.output(data)
@functools.cached_property
def out(self):
if self.waited:
if '_out_cached' not in self.__dict__:
self._out_cached = self.out_raw.decode().strip()
return self._out_cached
return self.out_raw.decode().strip()
@property
@functools.cached_property
def err(self):
if self.waited:
if '_err_cached' not in self.__dict__:
self._err_cached = self.err_raw.decode().strip()
return self._err_cached
return self.err_raw.decode().strip()
def lines(self, data, highlight=True):
@functools.cached_property
def rc(self):
return self.transport.get_returncode()
def output(self, data, highlight=True, flush=True):
for line in data.strip().split(b'\n'):
line = [self.highlight(line) if highlight else line]
if self.prefix:
line = self.prefix_line() + line
line.append(b'\n')
yield b''.join(line)
line = b''.join(line)
self.write(line)
if flush:
self.flush()
def highlight(self, line, highlight=True):
if not highlight or (

View File

@ -8,9 +8,9 @@ from shlax import Proc
@pytest.mark.parametrize(
'args',
(
['sh', '-c', 'echo hi'],
['echo hi'],
['sh -c "echo hi"'],
['sh', '-c', 'echo hi'],
)
)
async def test_proc(args):
@ -36,7 +36,7 @@ async def test_wait_unbound():
@pytest.mark.asyncio
async def test_rc_1():
proc = await Proc(
'sh', '-euc', 'NON EXISTING COMMAND',
'NON EXISTING COMMAND',
quiet=True,
).wait()
assert proc.rc != 0
@ -50,31 +50,31 @@ async def test_prefix():
"""
Proc.prefix_length = 0 # reset
stdout = Mock()
write = Mock()
await Proc(
'echo hi',
stdout=stdout,
write=write,
prefix='test_prefix',
).wait()
await Proc(
'echo hi',
stdout=stdout,
write=write,
prefix='test_prefix_1'
).wait()
await Proc(
'echo hi',
stdout=stdout,
write=write,
prefix='test_prefix',
).wait()
assert stdout.buffer.write.mock_calls == [
assert write.mock_calls == [
call(
Proc.prefix_colors[0].encode()
+ b'test_prefix '
+ Proc.colors.reset.encode()
+ b'| '
+ Proc.colors.bgray.encode()
+ b'+ echo hi'
+ b'+ sh -euc \'echo hi\''
+ Proc.colors.reset.encode()
+ b'\n'
),
@ -92,7 +92,7 @@ async def test_prefix():
+ Proc.colors.reset.encode()
+ b'| '
+ Proc.colors.bgray.encode()
+ b'+ echo hi'
+ b'+ sh -euc \'echo hi\''
+ Proc.colors.reset.encode()
+ b'\n'
),
@ -112,7 +112,7 @@ async def test_prefix():
+ Proc.colors.reset.encode()
+ b'| '
+ Proc.colors.bgray.encode()
+ b'+ echo hi'
+ b'+ sh -euc \'echo hi\''
+ Proc.colors.reset.encode()
+ b'\n'
),
@ -133,17 +133,17 @@ async def test_prefix_multiline():
Proc.prefix_length = 0 # reset
proc = await Proc(
'echo -e "a\nb"',
stdout=Mock(),
write=Mock(),
prefix='test_prefix',
).wait()
assert proc.stdout.buffer.write.mock_calls == [
assert proc.write.mock_calls == [
call(
Proc.prefix_colors[0].encode()
+ b'test_prefix '
+ Proc.colors.reset.encode()
+ b'| '
+ Proc.colors.bgray.encode()
+ b"+ echo -e 'a\\nb'"
+ b'+ sh -euc \'echo -e "a\\nb"\''
+ Proc.colors.reset.encode()
+ b'\n'
),
@ -174,12 +174,12 @@ async def test_highlight():
"""
proc = await Proc(
'echo hi',
stdout=Mock(),
write=Mock(),
regexps={
r'h([\w\d-]+)': 'h{cyan}\\1',
}
).wait()
proc.stdout.buffer.write.assert_called_with(b'h\x1b[38;5;51mi\x1b[0m\n')
proc.write.assert_called_with(b'h\x1b[38;5;51mi\x1b[0m\n')
@pytest.mark.asyncio
@ -189,40 +189,9 @@ async def test_highlight_if_not_colored():
"""
proc = await Proc(
'echo -e h"\\e[31m"i',
stdout=Mock(),
write=Mock(),
regexps={
r'h([\w\d-]+)': 'h{cyan}\\1',
}
).wait()
proc.stdout.buffer.write.assert_called_with(b'h\x1b[31mi\n')
@pytest.mark.asyncio
async def test_expect():
proc = Proc(
'sh', '-euc',
'echo "x?"; read x; echo x=$x; echo "z?"; read z; echo z=$z',
expects=[
(b'x?', b'y\n'),
(b'z?', b'w\n'),
],
quiet=True,
)
await proc.wait()
assert proc.out == 'x?\nx=y\nz?\nz=w'
@pytest.mark.asyncio
async def test_stderr():
proc = await Proc(
'sh',
'-euc',
'echo hi >&2',
stdout=Mock(),
stderr=Mock()
).wait()
assert proc.err_raw == bytearray(b'hi\n')
assert proc.err == 'hi'
proc.stderr.buffer.write.assert_called_once_with(
f'hi{proc.colors.reset}\n'.encode()
)
proc.write.assert_called_with(b'h\x1b[31mi\n')