Add execReadlines to executils.

Returns output in realtime instead of buffering it.
This commit is contained in:
Brian C. Lane 2015-07-29 15:41:55 -07:00
parent 11b8eb8b97
commit cfe4777042

View File

@ -20,8 +20,8 @@
import os
import subprocess
from subprocess import TimeoutExpired
import signal
from time import sleep
import logging
log = logging.getLogger("pylorax")
@ -76,6 +76,7 @@ def startProgram(argv, root='/', stdin=None, stdout=subprocess.PIPE, stderr=subp
:param reset_lang: whether to set the locale of the child process to C
:param kwargs: Additional parameters to pass to subprocess.Popen
:return: A Popen object for the running command.
:keyword preexec_fn: A function to run before execution starts.
"""
if env_prune is None:
env_prune = []
@ -136,6 +137,9 @@ def _run_program(argv, root='/', stdin=None, stdout=None, env_prune=None, log_ou
:param filter_stderr: whether to exclude the contents of stderr from the returned output
:param raise_err: whether to raise a CalledProcessError if the returncode is non-zero
:param callback: method to call while waiting for process to finish, passed Popen object
:param env_add: environment variables to add before execution
:param reset_handlers: whether to reset to SIG_DFL any signal handlers set to SIG_IGN
:param reset_lang: whether to set the locale of the child process to C
:return: The return code of the command and the output
:raises: OSError or CalledProcessError
"""
@ -151,9 +155,13 @@ def _run_program(argv, root='/', stdin=None, stdout=None, env_prune=None, log_ou
if callback:
while callback(proc) and proc.poll() is None:
sleep(1)
(output_string, err_string) = proc.communicate()
try:
(output_string, err_string) = proc.communicate(timeout=1)
break
except TimeoutExpired:
pass
else:
(output_string, err_string) = proc.communicate()
if output_string:
if binary_output:
output_lines = [output_string]
@ -206,6 +214,9 @@ def execWithRedirect(command, argv, stdin=None, stdout=None, root='/', env_prune
:param binary_output: whether to treat the output of command as binary data
:param raise_err: whether to raise a CalledProcessError if the returncode is non-zero
:param callback: method to call while waiting for process to finish, passed Popen object
:param env_add: environment variables to add before execution
:param reset_handlers: whether to reset to SIG_DFL any signal handlers set to SIG_IGN
:param reset_lang: whether to set the locale of the child process to C
:return: The return code of the command
"""
argv = [command] + list(argv)
@ -223,7 +234,10 @@ def execWithCapture(command, argv, stdin=None, root='/', log_output=True, filter
:param root: The directory to chroot to before running command.
:param log_output: Whether to log the output of command
:param filter_stderr: Whether stderr should be excluded from the returned output
:param raise_err: whether to raise a CalledProcessError if the returncode is non-zero
:param callback: method to call while waiting for process to finish, passed Popen object
:param env_add: environment variables to add before execution
:param reset_handlers: whether to reset to SIG_DFL any signal handlers set to SIG_IGN
:param reset_lang: whether to set the locale of the child process to C
:return: The output of the command
"""
argv = [command] + list(argv)
@ -231,6 +245,92 @@ def execWithCapture(command, argv, stdin=None, root='/', log_output=True, filter
raise_err=raise_err, callback=callback, env_add=env_add,
reset_handlers=reset_handlers, reset_lang=reset_lang)[1]
def execReadlines(command, argv, stdin=None, root='/', env_prune=None, filter_stderr=False,
callback=lambda x: True, env_add=None, reset_handlers=True, reset_lang=True):
""" Execute an external command and return the line output of the command
in real-time.
This method assumes that there is a reasonably low delay between the
end of output and the process exiting. If the child process closes
stdout and then keeps on truckin' there will be problems.
NOTE/WARNING: UnicodeDecodeError will be raised if the output of the
external command can't be decoded as UTF-8.
:param command: The command to run
:param argv: The argument list
:param stdin: The file object to read stdin from.
:param stdout: Optional file object to redirect stdout and stderr to.
:param root: The directory to chroot to before running command.
:param env_prune: environment variable to remove before execution
:param filter_stderr: Whether stderr should be excluded from the returned output
:param callback: method to call while waiting for process to finish, passed Popen object
:param env_add: environment variables to add before execution
:param reset_handlers: whether to reset to SIG_DFL any signal handlers set to SIG_IGN
:param reset_lang: whether to set the locale of the child process to C
:return: Iterator of the lines from the command
Output from the file is not logged to program.log
This returns an iterator with the lines from the command until it has finished
"""
class ExecLineReader(object):
"""Iterator class for returning lines from a process and cleaning
up the process when the output is no longer needed.
"""
def __init__(self, proc, argv, callback):
self._proc = proc
self._argv = argv
self._callback = callback
def __iter__(self):
return self
def __del__(self):
# See if the process is still running
if self._proc.poll() is None:
# Stop the process and ignore any problems that might arise
try:
self._proc.terminate()
except OSError:
pass
def __next__(self):
# Read the next line, blocking if a line is not yet available
line = self._proc.stdout.readline().decode("utf-8")
if line == '' or not self._callback(self._proc):
# Output finished, wait for the process to end
self._proc.communicate()
# Check for successful exit
if self._proc.returncode < 0:
raise OSError("process '%s' was killed by signal %s" %
(self._argv, -self._proc.returncode))
elif self._proc.returncode > 0:
raise OSError("process '%s' exited with status %s" %
(self._argv, self._proc.returncode))
raise StopIteration
return line.strip()
argv = [command] + argv
if filter_stderr:
stderr = subprocess.DEVNULL
else:
stderr = subprocess.STDOUT
try:
proc = startProgram(argv, root=root, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr, bufsize=1,
env_prune=env_prune, env_add=env_add, reset_handlers=reset_handlers, reset_lang=reset_lang)
except OSError as e:
with program_log_lock:
program_log.error("Error running %s: %s", argv[0], e.strerror)
raise
return ExecLineReader(proc, argv, callback)
def runcmd(cmd, **kwargs):
""" run execWithRedirect with raise_err=True
"""