Compare commits

...

4 Commits

Author SHA1 Message Date
jpic
daba65d455 Refactor Subprocess, document expects 2021-11-12 13:27:24 +01:00
jpic
2a0bb424fd Restore arg list support, use stdin.drain instead of flush 2021-11-12 11:37:46 +01:00
jpic
93f40dc7dd Add expect feature 2021-11-12 03:56:48 +01:00
jpic
217777f60e Remove sh -euc from output 2021-11-12 02:44:46 +01:00
4 changed files with 151 additions and 88 deletions

View File

@ -36,6 +36,8 @@ Basic example, this will both stream output and capture it:
proc = await Subprocess('echo hi').wait()
print(proc.rc, proc.out, proc.err, proc.out_raw, proc.err_raw)
Arguments may be a string command or a list of arguments.
Longer
------
@ -44,7 +46,7 @@ any of ``start()`` and ``wait()``, or both, explicitely:
.. code-block:: python
proc = Subprocess('echo hi')
proc = Subprocess('echo', 'hi')
await proc.start() # start the process
await proc.wait() # wait for completion
@ -93,18 +95,36 @@ will be applied line by line:
}
await asyncio.gather(*[
Subprocess(
f'find {path}',
'find',
path,
regexps=regexps,
shell=True,
).wait()
for path in sys.path
])
Automating input
----------------
You can pass a list of tuples of two bytestrings ``(regexp, characters_to_send)``:
.. code-block:: python
proc = Proc(
'sh',
'-euc',
'echo "x?"; read x; echo x=$x; echo "z?"; read z; echo z=$z',
expects=[
(b'x?', b'y\n'),
(b'z?', b'w\n'),
],
)
await proc.wait()
assert proc.out == 'x?\nx=y\nz?\nz=w'
Where is the rest?
==================
Shlax used to be the name of a much more ambitious poc-project, that you can
still find in the ``OLD`` branch of this repository. It has been extracted in
two projects with clear boundaries, namely `sysplan
<https://yourlabs.io/oss/sysplan>`_ and `podplan
<https://yourlabs.io/oss/podplan>`_ which are still in alpha state, although
Shlax as it now, is feature complete and stable.
still find in the ``OLD`` branch of this repository. Parts of it have been
extracted into smaller repositories.

View File

@ -8,17 +8,19 @@ async def main():
'^(.*).txt$': '{green}\\1.txt',
'^(.*).py$': '{bred}\\1.py',
}
await asyncio.gather(
Subprocess(
'sh', '-euc',
'for i in $(find .. | head); do echo $i; sleep .2; done',
regexps=colors,
prefix='parent',
).wait(),
Subprocess(
'for i in $(find . | head); do echo $i; sleep .3; done',
'sh -euc "for i in $(find . | head); do echo $i; sleep .3; done"',
regexps=colors,
prefix='cwd',
).wait(),
).wait()
)
asyncio.run(main())

View File

@ -1,5 +1,6 @@
import asyncio
import functools
import os
import re
import shlex
import sys
@ -7,19 +8,31 @@ import sys
from .colors import colors
class SubprocessProtocol(asyncio.SubprocessProtocol):
def __init__(self, proc):
class SubprocessProtocol(asyncio.subprocess.SubprocessStreamProtocol):
def __init__(self, proc, *args, **kwargs):
self.proc = proc
self.output = bytearray()
super().__init__(*args, **kwargs)
def receive(self, data, raw, target):
raw.extend(data)
if not self.proc.quiet:
for line in self.proc.lines(data):
target.buffer.write(line)
target.flush()
def pipe_data_received(self, fd, data):
if fd == 1:
self.proc.stdout(data)
self.receive(data, self.proc.out_raw, self.proc.stdout)
elif fd == 2:
self.proc.stderr(data)
self.receive(data, self.proc.err_raw, self.proc.stderr)
def process_exited(self):
self.proc.exit_future.set_result(True)
if self.proc.expect_index < len(self.proc.expects):
expected = self.proc.expects[self.proc.expect_index]
if re.match(expected[0], data):
self.stdin.write(expected[1])
event_loop = asyncio.get_event_loop()
asyncio.create_task(self.stdin.drain())
self.proc.expect_index += 1
class Subprocess:
@ -49,17 +62,19 @@ class Subprocess:
quiet=None,
prefix=None,
regexps=None,
expects=None,
write=None,
flush=None,
stdout=None,
stderr=None,
):
if len(args) == 1 and ' ' in args[0]:
args = ['sh', '-euc', args[0]]
self.args = args
self.quiet = quiet if quiet is not None else False
self.prefix = prefix
self.write = write or sys.stdout.buffer.write
self.flush = flush or sys.stdout.flush
self.stdout = stdout or sys.stdout
self.stderr = stderr or sys.stderr
self.expects = expects or []
self.expect_index = 0
self.started = False
self.waited = False
self.out_raw = bytearray()
@ -75,31 +90,41 @@ class Subprocess:
self.regexps[search] = replace
async def start(self, wait=True):
if not self.quiet:
self.output(
self.colors.bgray.encode()
+ b'+ '
+ shlex.join([
arg.replace('\n', '\\n')
for arg in self.args
]).encode()
+ self.colors.reset.encode(),
highlight=False
)
if len(self.args) == 1 and not os.path.exists(self.args[0]):
args = shlex.split(self.args[0])
else:
args = self.args
# Get a reference to the event loop as we plan to use
# low-level APIs.
if not self.quiet:
message = b''.join([
self.colors.bgray.encode(),
b'+ ',
shlex.join(args).replace('\n', '\\n').encode(),
self.colors.reset.encode(),
])
for line in self.lines(message, highlight=False):
self.stdout.buffer.write(line)
self.stdout.flush()
# The following is a copy of what asyncio.subprocess_exec and
# asyncio.create_subprocess_exec do except we inject our own
# SubprocessStreamProtocol subclass: it might need an update as new
# python releases come out.
loop = asyncio.get_running_loop()
self.exit_future = asyncio.Future(loop=loop)
# Create the subprocess controlled by DateProtocol;
# redirect the standard output into a pipe.
self.transport, self.protocol = await loop.subprocess_exec(
lambda: SubprocessProtocol(self),
*self.args,
stdin=None,
lambda: SubprocessProtocol(
self,
limit=asyncio.subprocess.streams._DEFAULT_LIMIT,
loop=loop,
),
*args,
stdin=asyncio.subprocess.PIPE if self.expects else sys.stdin,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
self.proc = asyncio.subprocess.Process(self.transport, self.protocol, loop)
self.started = True
async def wait(self, *args, **kwargs):
@ -107,50 +132,35 @@ class Subprocess:
await self.start()
if not self.waited:
# Wait for the subprocess exit using the process_exited()
# method of the protocol.
await self.exit_future
# Close the stdout pipe.
self.transport.close()
await self.proc.communicate()
self.rc = self.transport.get_returncode()
self.waited = True
return self
def stdout(self, data):
self.out_raw.extend(data)
if not self.quiet:
self.output(data)
def stderr(self, data):
self.err_raw.extend(data)
if not self.quiet:
self.output(data)
@functools.cached_property
@property
def out(self):
if self.waited:
if '_out_cached' not in self.__dict__:
self._out_cached = self.out_raw.decode().strip()
return self._out_cached
return self.out_raw.decode().strip()
@functools.cached_property
@property
def err(self):
if self.waited:
if '_err_cached' not in self.__dict__:
self._err_cached = self.err_raw.decode().strip()
return self._err_cached
return self.err_raw.decode().strip()
@functools.cached_property
def rc(self):
return self.transport.get_returncode()
def output(self, data, highlight=True, flush=True):
def lines(self, data, highlight=True):
for line in data.strip().split(b'\n'):
line = [self.highlight(line) if highlight else line]
if self.prefix:
line = self.prefix_line() + line
line.append(b'\n')
line = b''.join(line)
self.write(line)
if flush:
self.flush()
yield b''.join(line)
def highlight(self, line, highlight=True):
if not highlight or (

View File

@ -8,9 +8,9 @@ from shlax import Proc
@pytest.mark.parametrize(
'args',
(
['sh', '-c', 'echo hi'],
['echo hi'],
['sh -c "echo hi"'],
['sh', '-c', 'echo hi'],
)
)
async def test_proc(args):
@ -36,7 +36,7 @@ async def test_wait_unbound():
@pytest.mark.asyncio
async def test_rc_1():
proc = await Proc(
'NON EXISTING COMMAND',
'sh', '-euc', 'NON EXISTING COMMAND',
quiet=True,
).wait()
assert proc.rc != 0
@ -50,31 +50,31 @@ async def test_prefix():
"""
Proc.prefix_length = 0 # reset
write = Mock()
stdout = Mock()
await Proc(
'echo hi',
write=write,
stdout=stdout,
prefix='test_prefix',
).wait()
await Proc(
'echo hi',
write=write,
stdout=stdout,
prefix='test_prefix_1'
).wait()
await Proc(
'echo hi',
write=write,
stdout=stdout,
prefix='test_prefix',
).wait()
assert write.mock_calls == [
assert stdout.buffer.write.mock_calls == [
call(
Proc.prefix_colors[0].encode()
+ b'test_prefix '
+ Proc.colors.reset.encode()
+ b'| '
+ Proc.colors.bgray.encode()
+ b'+ sh -euc \'echo hi\''
+ b'+ echo hi'
+ Proc.colors.reset.encode()
+ b'\n'
),
@ -92,7 +92,7 @@ async def test_prefix():
+ Proc.colors.reset.encode()
+ b'| '
+ Proc.colors.bgray.encode()
+ b'+ sh -euc \'echo hi\''
+ b'+ echo hi'
+ Proc.colors.reset.encode()
+ b'\n'
),
@ -112,7 +112,7 @@ async def test_prefix():
+ Proc.colors.reset.encode()
+ b'| '
+ Proc.colors.bgray.encode()
+ b'+ sh -euc \'echo hi\''
+ b'+ echo hi'
+ Proc.colors.reset.encode()
+ b'\n'
),
@ -133,17 +133,17 @@ async def test_prefix_multiline():
Proc.prefix_length = 0 # reset
proc = await Proc(
'echo -e "a\nb"',
write=Mock(),
stdout=Mock(),
prefix='test_prefix',
).wait()
assert proc.write.mock_calls == [
assert proc.stdout.buffer.write.mock_calls == [
call(
Proc.prefix_colors[0].encode()
+ b'test_prefix '
+ Proc.colors.reset.encode()
+ b'| '
+ Proc.colors.bgray.encode()
+ b'+ sh -euc \'echo -e "a\\nb"\''
+ b"+ echo -e 'a\\nb'"
+ Proc.colors.reset.encode()
+ b'\n'
),
@ -174,12 +174,12 @@ async def test_highlight():
"""
proc = await Proc(
'echo hi',
write=Mock(),
stdout=Mock(),
regexps={
r'h([\w\d-]+)': 'h{cyan}\\1',
}
).wait()
proc.write.assert_called_with(b'h\x1b[38;5;51mi\x1b[0m\n')
proc.stdout.buffer.write.assert_called_with(b'h\x1b[38;5;51mi\x1b[0m\n')
@pytest.mark.asyncio
@ -189,9 +189,40 @@ async def test_highlight_if_not_colored():
"""
proc = await Proc(
'echo -e h"\\e[31m"i',
write=Mock(),
stdout=Mock(),
regexps={
r'h([\w\d-]+)': 'h{cyan}\\1',
}
).wait()
proc.write.assert_called_with(b'h\x1b[31mi\n')
proc.stdout.buffer.write.assert_called_with(b'h\x1b[31mi\n')
@pytest.mark.asyncio
async def test_expect():
proc = Proc(
'sh', '-euc',
'echo "x?"; read x; echo x=$x; echo "z?"; read z; echo z=$z',
expects=[
(b'x?', b'y\n'),
(b'z?', b'w\n'),
],
quiet=True,
)
await proc.wait()
assert proc.out == 'x?\nx=y\nz?\nz=w'
@pytest.mark.asyncio
async def test_stderr():
proc = await Proc(
'sh',
'-euc',
'echo hi >&2',
stdout=Mock(),
stderr=Mock()
).wait()
assert proc.err_raw == bytearray(b'hi\n')
assert proc.err == 'hi'
proc.stderr.buffer.write.assert_called_once_with(
f'hi{proc.colors.reset}\n'.encode()
)