Refactor Subprocess, document expects

This commit is contained in:
jpic 2021-11-12 13:26:19 +01:00
parent 2a0bb424fd
commit daba65d455
4 changed files with 105 additions and 76 deletions

View File

@ -36,6 +36,8 @@ Basic example, this will both stream output and capture it:
proc = await Subprocess('echo hi').wait() proc = await Subprocess('echo hi').wait()
print(proc.rc, proc.out, proc.err, proc.out_raw, proc.err_raw) print(proc.rc, proc.out, proc.err, proc.out_raw, proc.err_raw)
Arguments may be a string command or a list of arguments.
Longer Longer
------ ------
@ -44,7 +46,7 @@ any of ``start()`` and ``wait()``, or both, explicitely:
.. code-block:: python .. code-block:: python
proc = Subprocess('echo hi') proc = Subprocess('echo', 'hi')
await proc.start() # start the process await proc.start() # start the process
await proc.wait() # wait for completion await proc.wait() # wait for completion
@ -93,18 +95,36 @@ will be applied line by line:
} }
await asyncio.gather(*[ await asyncio.gather(*[
Subprocess( Subprocess(
f'find {path}', 'find',
path,
regexps=regexps, regexps=regexps,
shell=True,
).wait() ).wait()
for path in sys.path for path in sys.path
]) ])
Automating input
----------------
You can pass a list of tuples of two bytestrings ``(regexp, characters_to_send)``:
.. code-block:: python
proc = Proc(
'sh',
'-euc',
'echo "x?"; read x; echo x=$x; echo "z?"; read z; echo z=$z',
expects=[
(b'x?', b'y\n'),
(b'z?', b'w\n'),
],
)
await proc.wait()
assert proc.out == 'x?\nx=y\nz?\nz=w'
Where is the rest? Where is the rest?
================== ==================
Shlax used to be the name of a much more ambitious poc-project, that you can Shlax used to be the name of a much more ambitious poc-project, that you can
still find in the ``OLD`` branch of this repository. It has been extracted in still find in the ``OLD`` branch of this repository. Parts of it have been
two projects with clear boundaries, namely `sysplan extracted into smaller repositories.
<https://yourlabs.io/oss/sysplan>`_ and `podplan
<https://yourlabs.io/oss/podplan>`_ which are still in alpha state, although
Shlax as it now, is feature complete and stable.

View File

@ -8,17 +8,19 @@ async def main():
'^(.*).txt$': '{green}\\1.txt', '^(.*).txt$': '{green}\\1.txt',
'^(.*).py$': '{bred}\\1.py', '^(.*).py$': '{bred}\\1.py',
} }
await asyncio.gather( await asyncio.gather(
Subprocess( Subprocess(
'sh', '-euc',
'for i in $(find .. | head); do echo $i; sleep .2; done', 'for i in $(find .. | head); do echo $i; sleep .2; done',
regexps=colors, regexps=colors,
prefix='parent', prefix='parent',
).wait(), ).wait(),
Subprocess( Subprocess(
'for i in $(find . | head); do echo $i; sleep .3; done', 'sh -euc "for i in $(find . | head); do echo $i; sleep .3; done"',
regexps=colors, regexps=colors,
prefix='cwd', prefix='cwd',
).wait(), ).wait()
) )
asyncio.run(main()) asyncio.run(main())

View File

@ -1,5 +1,6 @@
import asyncio import asyncio
import functools import functools
import os
import re import re
import shlex import shlex
import sys import sys
@ -12,16 +13,23 @@ class SubprocessProtocol(asyncio.subprocess.SubprocessStreamProtocol):
self.proc = proc self.proc = proc
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def receive(self, data, raw, target):
raw.extend(data)
if not self.proc.quiet:
for line in self.proc.lines(data):
target.buffer.write(line)
target.flush()
def pipe_data_received(self, fd, data): def pipe_data_received(self, fd, data):
if fd == 1: if fd == 1:
self.proc.stdout(data) self.receive(data, self.proc.out_raw, self.proc.stdout)
elif fd == 2: elif fd == 2:
self.proc.stderr(data) self.receive(data, self.proc.err_raw, self.proc.stderr)
if self.proc.expect_index < len(self.proc.expects): if self.proc.expect_index < len(self.proc.expects):
expected = self.proc.expects[self.proc.expect_index] expected = self.proc.expects[self.proc.expect_index]
if re.match(expected['regexp'], data): if re.match(expected[0], data):
self.stdin.write(expected['sendline']) self.stdin.write(expected[1])
event_loop = asyncio.get_event_loop() event_loop = asyncio.get_event_loop()
asyncio.create_task(self.stdin.drain()) asyncio.create_task(self.stdin.drain())
self.proc.expect_index += 1 self.proc.expect_index += 1
@ -57,12 +65,14 @@ class Subprocess:
expects=None, expects=None,
write=None, write=None,
flush=None, flush=None,
stdout=None,
stderr=None,
): ):
self.args = args self.args = args
self.quiet = quiet if quiet is not None else False self.quiet = quiet if quiet is not None else False
self.prefix = prefix self.prefix = prefix
self.write = write or sys.stdout.buffer.write self.stdout = stdout or sys.stdout
self.flush = flush or sys.stdout.flush self.stderr = stderr or sys.stderr
self.expects = expects or [] self.expects = expects or []
self.expect_index = 0 self.expect_index = 0
self.started = False self.started = False
@ -80,26 +90,21 @@ class Subprocess:
self.regexps[search] = replace self.regexps[search] = replace
async def start(self, wait=True): async def start(self, wait=True):
if len(self.args) == 1 and ' ' in self.args[0]: if len(self.args) == 1 and not os.path.exists(self.args[0]):
# Bottom line is that the conversion of argument from one into args = shlex.split(self.args[0])
# another is done in Popen based on the shell argument calling the
# list2cmdline function that has been masked from the public API
# issue10838, workaround it by converting command line to shell
# arguments
cmd = self.args[0]
args = ['sh', '-euc', cmd]
else: else:
cmd = shlex.join(self.args)
args = self.args args = self.args
if not self.quiet: if not self.quiet:
self.output( message = b''.join([
self.colors.bgray.encode() self.colors.bgray.encode(),
+ b'+ ' b'+ ',
+ cmd.replace('\n', '\\n').encode() shlex.join(args).replace('\n', '\\n').encode(),
+ self.colors.reset.encode(), self.colors.reset.encode(),
highlight=False ])
) for line in self.lines(message, highlight=False):
self.stdout.buffer.write(line)
self.stdout.flush()
# The following is a copy of what asyncio.subprocess_exec and # The following is a copy of what asyncio.subprocess_exec and
# asyncio.create_subprocess_exec do except we inject our own # asyncio.create_subprocess_exec do except we inject our own
@ -118,8 +123,8 @@ class Subprocess:
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
) )
self.proc = asyncio.subprocess.Process(self.transport, self.protocol, loop)
self.proc = asyncio.subprocess.Process(self.transport, self.protocol, loop)
self.started = True self.started = True
async def wait(self, *args, **kwargs): async def wait(self, *args, **kwargs):
@ -128,43 +133,34 @@ class Subprocess:
if not self.waited: if not self.waited:
await self.proc.communicate() await self.proc.communicate()
self.rc = self.transport.get_returncode()
self.waited = True self.waited = True
return self return self
def stdout(self, data): @property
self.out_raw.extend(data)
if not self.quiet:
self.output(data)
def stderr(self, data):
self.err_raw.extend(data)
if not self.quiet:
self.output(data)
@functools.cached_property
def out(self): def out(self):
if self.waited:
if '_out_cached' not in self.__dict__:
self._out_cached = self.out_raw.decode().strip()
return self._out_cached
return self.out_raw.decode().strip() return self.out_raw.decode().strip()
@functools.cached_property @property
def err(self): def err(self):
if self.waited:
if '_err_cached' not in self.__dict__:
self._err_cached = self.err_raw.decode().strip()
return self._err_cached
return self.err_raw.decode().strip() return self.err_raw.decode().strip()
@functools.cached_property def lines(self, data, highlight=True):
def rc(self):
return self.transport.get_returncode()
def output(self, data, highlight=True, flush=True):
for line in data.strip().split(b'\n'): for line in data.strip().split(b'\n'):
line = [self.highlight(line) if highlight else line] line = [self.highlight(line) if highlight else line]
if self.prefix: if self.prefix:
line = self.prefix_line() + line line = self.prefix_line() + line
line.append(b'\n') line.append(b'\n')
line = b''.join(line) yield b''.join(line)
self.write(line)
if flush:
self.flush()
def highlight(self, line, highlight=True): def highlight(self, line, highlight=True):
if not highlight or ( if not highlight or (

View File

@ -36,7 +36,7 @@ async def test_wait_unbound():
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_rc_1(): async def test_rc_1():
proc = await Proc( proc = await Proc(
'NON EXISTING COMMAND', 'sh', '-euc', 'NON EXISTING COMMAND',
quiet=True, quiet=True,
).wait() ).wait()
assert proc.rc != 0 assert proc.rc != 0
@ -50,24 +50,24 @@ async def test_prefix():
""" """
Proc.prefix_length = 0 # reset Proc.prefix_length = 0 # reset
write = Mock() stdout = Mock()
await Proc( await Proc(
'echo hi', 'echo hi',
write=write, stdout=stdout,
prefix='test_prefix', prefix='test_prefix',
).wait() ).wait()
await Proc( await Proc(
'echo hi', 'echo hi',
write=write, stdout=stdout,
prefix='test_prefix_1' prefix='test_prefix_1'
).wait() ).wait()
await Proc( await Proc(
'echo hi', 'echo hi',
write=write, stdout=stdout,
prefix='test_prefix', prefix='test_prefix',
).wait() ).wait()
assert write.mock_calls == [ assert stdout.buffer.write.mock_calls == [
call( call(
Proc.prefix_colors[0].encode() Proc.prefix_colors[0].encode()
+ b'test_prefix ' + b'test_prefix '
@ -133,17 +133,17 @@ async def test_prefix_multiline():
Proc.prefix_length = 0 # reset Proc.prefix_length = 0 # reset
proc = await Proc( proc = await Proc(
'echo -e "a\nb"', 'echo -e "a\nb"',
write=Mock(), stdout=Mock(),
prefix='test_prefix', prefix='test_prefix',
).wait() ).wait()
assert proc.write.mock_calls == [ assert proc.stdout.buffer.write.mock_calls == [
call( call(
Proc.prefix_colors[0].encode() Proc.prefix_colors[0].encode()
+ b'test_prefix ' + b'test_prefix '
+ Proc.colors.reset.encode() + Proc.colors.reset.encode()
+ b'| ' + b'| '
+ Proc.colors.bgray.encode() + Proc.colors.bgray.encode()
+ b'+ echo -e "a\\nb"' + b"+ echo -e 'a\\nb'"
+ Proc.colors.reset.encode() + Proc.colors.reset.encode()
+ b'\n' + b'\n'
), ),
@ -174,12 +174,12 @@ async def test_highlight():
""" """
proc = await Proc( proc = await Proc(
'echo hi', 'echo hi',
write=Mock(), stdout=Mock(),
regexps={ regexps={
r'h([\w\d-]+)': 'h{cyan}\\1', r'h([\w\d-]+)': 'h{cyan}\\1',
} }
).wait() ).wait()
proc.write.assert_called_with(b'h\x1b[38;5;51mi\x1b[0m\n') proc.stdout.buffer.write.assert_called_with(b'h\x1b[38;5;51mi\x1b[0m\n')
@pytest.mark.asyncio @pytest.mark.asyncio
@ -189,29 +189,40 @@ async def test_highlight_if_not_colored():
""" """
proc = await Proc( proc = await Proc(
'echo -e h"\\e[31m"i', 'echo -e h"\\e[31m"i',
write=Mock(), stdout=Mock(),
regexps={ regexps={
r'h([\w\d-]+)': 'h{cyan}\\1', r'h([\w\d-]+)': 'h{cyan}\\1',
} }
).wait() ).wait()
proc.write.assert_called_with(b'h\x1b[31mi\n') proc.stdout.buffer.write.assert_called_with(b'h\x1b[31mi\n')
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_expect(): async def test_expect():
proc = Proc( proc = Proc(
'sh', '-euc',
'echo "x?"; read x; echo x=$x; echo "z?"; read z; echo z=$z', 'echo "x?"; read x; echo x=$x; echo "z?"; read z; echo z=$z',
expects=[ expects=[
dict( (b'x?', b'y\n'),
regexp=b'x?', (b'z?', b'w\n'),
sendline=b'y\n',
),
dict(
regexp=b'z?',
sendline=b'w\n',
)
], ],
quiet=True, quiet=True,
) )
await proc.wait() await proc.wait()
assert proc.out == 'x?\nx=y\nz?\nz=w' assert proc.out == 'x?\nx=y\nz?\nz=w'
@pytest.mark.asyncio
async def test_stderr():
proc = await Proc(
'sh',
'-euc',
'echo hi >&2',
stdout=Mock(),
stderr=Mock()
).wait()
assert proc.err_raw == bytearray(b'hi\n')
assert proc.err == 'hi'
proc.stderr.buffer.write.assert_called_once_with(
f'hi{proc.colors.reset}\n'.encode()
)