Commit fff1f6e2 authored by Mike Hommey
Browse files

Bug 1906191 - Change how ProcessHandler handles output_timeout....

Bug 1906191 - Change how ProcessHandler handles output_timeout. r=releng-reviewers,gbrown a=test-only

First and foremost, this undoes the patch for bug 1845125, which causes
other problems (e.g. output being dropped past a certain point).

The rest of the patch addresses bug 1845125 at the root: Ultimately, this
is a similar problem to bug 1863675.

`wait()` is blocked on joining the stream reader thread, but the stream
reader thread is blocked on `readline()` because the process has finished
but hasn't been waited on yet.

So joining the stream reader thread after the process is known to have
been finished is expected to work more reliably... except when a child
has been spun up that keeps it alive. In this case, we just keep the
reader thread working in the background, which is a similar strategy to
what was done in bug 1863675.

Differential Revision: https://phabricator.services.mozilla.com/D218389
parent c26f87a5
Loading
Loading
Loading
Loading
+2 −5
Original line number Diff line number Diff line
@@ -170,13 +170,10 @@ class ProcessExecutionMixin(LoggingMixin):
            p.processOutput()
            status = None
            sig = None
            # XXX: p.wait() sometimes fails to detect the process exit and never returns a status code.
            # Time out and check if the pid still exists.
            # See bug 1845125 for example.
            while status is None and p.pid_exists(p.pid):
            while status is None:
                try:
                    if sig is None:
                        status = p.wait(5)
                        status = p.wait()
                    else:
                        status = p.kill(sig=sig)
                except KeyboardInterrupt:
+47 −61
Original line number Diff line number Diff line
@@ -12,6 +12,7 @@

import codecs
import errno
import io
import os
import signal
import subprocess
@@ -897,7 +898,7 @@ falling back to not using job objects for managing child processes""",
        # Ensure that we first check for the reader status. Otherwise
        # we might mark the process as finished while output is still getting
        # processed.
        elif self.reader.is_alive():
        elif not self._ignore_children and self.reader.is_alive():
            return None
        elif hasattr(self, "returncode"):
            return self.returncode
@@ -940,18 +941,17 @@ falling back to not using job objects for managing child processes""",
        - '0' if the process ended without failures

        """
        # Thread.join() blocks the main thread until the reader thread is finished
        # wake up once a second in case a keyboard interrupt is sent
        if self.reader.thread and self.reader.thread is not threading.current_thread():
            count = 0
            while self.reader.is_alive():
                if timeout is not None and count > timeout:
                    self.debug("wait timeout for reader thread")
                    return None
                self.reader.join(timeout=1)
                count += 1

        self.returncode = self.proc.wait(timeout)
        if (
            self.returncode is not None
            and self.reader.thread
            and self.reader.thread is not threading.current_thread()
            # If children are ignored and a child is still running because it's
            # been daemonized or something, the reader might still be attached
            # to that child's output... and joining will deadlock.
            and not self._ignore_children
        ):
            self.reader.join()
        return self.returncode

    @property
@@ -1072,85 +1072,71 @@ class ProcessReader(object):
        return thread

    def _read_stream(self, stream, queue, callback):
        while True:
            line = stream.readline()
            if not line:
                break
        sentinel = "" if isinstance(stream, io.TextIOBase) else b""
        for line in iter(stream.readline, sentinel):
            queue.put((line, callback))
        # Give a chance to the reading loop to exit without a timeout.
        queue.put((b"", None))
        stream.close()

    def start(self, proc):
        queue = Queue()
        stdout_reader = None
        readers = 0
        if proc.stdout:
            stdout_reader = self._create_stream_reader(
            self._create_stream_reader(
                "ProcessReaderStdout", proc.stdout, queue, self.stdout_callback
            )
        stderr_reader = None
            readers += 1
        if proc.stderr and proc.stderr != proc.stdout:
            stderr_reader = self._create_stream_reader(
            self._create_stream_reader(
                "ProcessReaderStderr", proc.stderr, queue, self.stderr_callback
            )
            readers += 1
        self.thread = threading.Thread(
            name="ProcessReader",
            target=self._read,
            args=(stdout_reader, stderr_reader, queue),
            args=(queue, readers),
        )
        self.thread.daemon = True
        self.thread.start()
        self.debug("ProcessReader started")

    def _read(self, stdout_reader, stderr_reader, queue):
    def _read(self, queue, readers):
        start_time = time.time()
        timed_out = False
        timeout = self.timeout
        if timeout is not None:
            timeout += start_time
        output_timeout = self.output_timeout
        if output_timeout is not None:
            output_timeout += start_time

        while (stdout_reader and stdout_reader.is_alive()) or (
            stderr_reader and stderr_reader.is_alive()
        ):
            has_line = True
            try:
                line, callback = queue.get(True, INTERVAL_PROCESS_ALIVE_CHECK)
            except Empty:
                has_line = False
            now = time.time()
            if not has_line:
                if output_timeout is not None and now > output_timeout:
                    timed_out = True
                    self.didOutputTimeout = True
                    break
        def get_line():
            queue_timeout = None
            if timeout:
                queue_timeout = timeout - time.time()
            if output_timeout:
                if queue_timeout:
                    queue_timeout = min(queue_timeout, output_timeout)
                else:
                if output_timeout is not None:
                    output_timeout = now + self.output_timeout
                callback(line.rstrip())
            if timeout is not None and now > timeout:
                timed_out = True
                break
        self.debug("_read loop exited")
        # process remaining lines to read
        while not queue.empty():
            line, callback = queue.get(False)
                    queue_timeout = output_timeout
            return queue.get(timeout=queue_timeout)

        try:
            # We need to wait for as many `(b"", None)` sentinels as there are
            # reader threads set up in `start`.
            for n in range(readers):
                for line, callback in iter(get_line, (b"", None)):
                    try:
                        callback(line.rstrip())
                    except Exception:
                        traceback.print_exc()
        if timed_out:
            try:
                self.timeout_callback()
                self.finished_callback()
            except Exception:
                traceback.print_exc()
        if stdout_reader:
            stdout_reader.join()
        if stderr_reader:
            stderr_reader.join()
        if not timed_out:
        except Empty:
            if timeout and time.time() < timeout:
                self.didOutputTimeout = True
            try:
                self.finished_callback()
                self.timeout_callback()
            except Exception:
                traceback.print_exc()
        self.debug("_read exited")