mirror of https://github.com/mongodb/mongo
324 lines
14 KiB
Python
324 lines
14 KiB
Python
"""A more reliable way to create and destroy processes.
|
|
|
|
Uses job objects when running on Windows to ensure that all created
|
|
processes are terminated.
|
|
"""
|
|
|
|
import atexit
|
|
import logging
|
|
import os
|
|
import os.path
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
from datetime import datetime
|
|
from shlex import quote
|
|
|
|
import psutil
|
|
from buildscripts.resmokelib import config as _config
|
|
from buildscripts.resmokelib import errors
|
|
from buildscripts.resmokelib import utils
|
|
from buildscripts.resmokelib.core import pipe
|
|
from buildscripts.resmokelib.testing.fixtures import interface as fixture_interface
|
|
|
|
# Attempt to avoid race conditions (e.g. hangs caused by a file descriptor being left open) when
# starting subprocesses concurrently from multiple threads by guarding calls to subprocess.Popen()
# with a lock. See https://bugs.python.org/issue2320 and https://bugs.python.org/issue12739 as
# reports of such hangs.
#
# This guard dates from Python 2 and the subprocess32 backport. On Python 3.4+ the standard
# subprocess module spawns children through the _posixsubprocess C extension, and (per PEP 446)
# newly created file descriptors -- including the pipes Popen() creates -- are non-inheritable by
# default. The lock is therefore probably redundant on POSIX. It is kept as a cheap safeguard,
# particularly for Windows, where Process.start() cannot pass close_fds=True while stdout and
# stderr are redirected. See https://bugs.python.org/issue7213 for the original discussion.
_POPEN_LOCK = threading.Lock()
|
|
|
|
# Job objects are the only reliable way to ensure that processes are terminated on Windows.
if sys.platform == "win32":
    import win32api
    import win32con
    import win32event
    import win32job
    import win32process
    import winerror

    def _init_job_object():
        """Create a job object whose member processes are killed along with its last handle.

        Returns the handle from win32job.CreateJobObject() after JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
        has been turned on in the job's extended limit information.
        """
        handle = win32job.CreateJobObject(None, "")

        # Read the job's current extended limits, enable kill-on-close in the nested basic limits,
        # and write the whole structure back. Once the final handle to the job is closed, every
        # process still assigned to it is terminated and the job object itself is destroyed.
        extended_limits = win32job.QueryInformationJobObject(
            handle, win32job.JobObjectExtendedLimitInformation)
        basic_limits = extended_limits["BasicLimitInformation"]
        basic_limits["LimitFlags"] |= win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
        win32job.SetInformationJobObject(handle, win32job.JobObjectExtendedLimitInformation,
                                         extended_limits)

        return handle

    # Only build our own job object when this process is not already a member of one. Otherwise
    # leave _JOB_OBJECT as None, which makes Process.start() skip job assignment entirely.
    if not win32job.IsProcessInJob(win32process.GetCurrentProcess(), None):
        _JOB_OBJECT = _init_job_object()
        # Closing the handle at interpreter exit kills any children still in the job.
        atexit.register(win32api.CloseHandle, _JOB_OBJECT)
    else:
        _JOB_OBJECT = None
|
|
|
|
|
|
class Process(object):
    """Wrapper around subprocess.Popen class.

    Beyond spawning the child, the wrapper does the following:
      * forwards the child's stdout (INFO) and stderr (ERROR) to a logger;
      * on Windows, assigns the child to the module's job object when one exists;
      * can attach the UndoDB recorder when ``_config.UNDO_RECORDER_PATH`` is set;
      * becomes a no-op for ``mongod``/``mongos`` when testing against an External System
        Under Test.
    """

    # pylint: disable=protected-access

    def __init__(self, logger, args, env=None, env_vars=None, cwd=None):
        """Initialize the process with the specified logger, arguments, and environment.

        :param logger: logger that receives the child's output. Its ``name`` is also used in
            UndoDB recording file names.
        :param args: command line as a list, where ``args[0]`` is the executable. On Windows,
            ``.exe`` is appended to ``args[0]`` in place when it has no extension.
        :param env: environment for the child. Defaults to a copy of ``os.environ``.
            NOTE(review): a caller-supplied mapping is stored and updated as-is, not copied.
        :param env_vars: optional extra variables merged on top of ``env``.
        :param cwd: optional working directory for the child.
        """

        # Ensure that executable files that don't already have an
        # extension on Windows have a ".exe" extension.
        if sys.platform == "win32" and not os.path.splitext(args[0])[1]:
            args[0] += ".exe"

        self.logger = logger
        self.args = args

        self.env = utils.default_if_none(env, os.environ.copy())
        # Record the PID and creation time of the resmoke process that launched this child.
        # Values already present in os.environ win, so nested launches keep the original parent.
        if not self.env.get('RESMOKE_PARENT_PROCESS'):
            self.env['RESMOKE_PARENT_PROCESS'] = os.environ.get('RESMOKE_PARENT_PROCESS',
                                                                str(os.getpid()))
        if not self.env.get('RESMOKE_PARENT_CTIME'):
            self.env['RESMOKE_PARENT_CTIME'] = os.environ.get('RESMOKE_PARENT_CTIME',
                                                              str(psutil.Process().create_time()))
        if env_vars is not None:
            self.env.update(env_vars)

        # If we are running against an External System Under Test & this is a `mongo{d,s}`
        # process, we make this process a NOOP. `mongo{d,s}` processes are not running locally
        # for an External System Under Test.
        self.NOOP = _config.EXTERNAL_SUT and os.path.basename(self.args[0]) in ["mongod", "mongos"]

        # The `pid` attribute is assigned after the local process is started. If this process is
        # a NOOP, we assign it a dummy value.
        self.pid = 1 if self.NOOP else None

        self._process = None  # subprocess.Popen for the child; set by start().
        self._recorder = None  # UndoDB live-record Popen attached by PID, if any.
        self._stdout_pipe = None
        self._stderr_pipe = None
        self._cwd = cwd

    def start(self):
        """Start the process and the logger pipes for its stdout and stderr.

        Does nothing for a NOOP process. The following all happen while holding _POPEN_LOCK:
        spawning the child, optionally attaching the UndoDB recorder, starting the logger pipes,
        and (on Windows) assigning the child to the job object.
        """
        if self.NOOP:
            return None

        creation_flags = 0
        if sys.platform == "win32" and _JOB_OBJECT is not None:
            creation_flags |= win32process.CREATE_BREAKAWAY_FROM_JOB

        # Tests fail if a process takes too long to startup and listen to a socket. Use buffered
        # I/O pipes to give the process some leeway.
        buffer_size = 1024 * 1024

        # Close file descriptors in the child process before executing the program. This prevents
        # file descriptors that were inherited due to multiple calls to fork() -- either within one
        # thread, or concurrently from multiple threads -- from causing another subprocess to wait
        # for the completion of the newly spawned child process. Closing other file descriptors
        # isn't supported on Windows when stdout and stderr are redirected.
        close_fds = (sys.platform != "win32")

        with _POPEN_LOCK:

            # Record unittests directly since resmoke doesn't interact with them and they can
            # finish too quickly for the recorder to have a chance at attaching.
            recorder_args = []
            if _config.UNDO_RECORDER_PATH is not None and self.args[0].endswith("_test"):
                now_str = datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
                # Only use the process name since we have to be able to correlate the recording
                # name with the binary name easily.
                recorder_output_file = "{process}-{t}.undo".format(
                    process=os.path.basename(self.args[0]), t=now_str)
                recorder_args = [_config.UNDO_RECORDER_PATH, "-o", recorder_output_file]

            self._process = subprocess.Popen(recorder_args + self.args, bufsize=buffer_size,
                                             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                             close_fds=close_fds, env=self.env,
                                             creationflags=creation_flags, cwd=self._cwd)
            self.pid = self._process.pid

            # mongod/mongos are recorded by attaching live-record to the already-running PID.
            if _config.UNDO_RECORDER_PATH is not None and (not self.args[0].endswith("_test")) and (
                    "mongod" in self.args[0] or "mongos" in self.args[0]):
                now_str = datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
                recorder_output_file = "{logger}-{process}-{pid}-{t}.undo".format(
                    logger=self.logger.name.replace('/', '-'),
                    process=os.path.basename(self.args[0]), pid=self.pid, t=now_str)
                recorder_args = [
                    _config.UNDO_RECORDER_PATH, "-p",
                    str(self.pid), "-o", recorder_output_file
                ]
                self._recorder = subprocess.Popen(recorder_args, bufsize=buffer_size,
                                                  env=self.env, creationflags=creation_flags)

            self._stdout_pipe = pipe.LoggerPipe(self.logger, logging.INFO, self._process.stdout)
            self._stderr_pipe = pipe.LoggerPipe(self.logger, logging.ERROR, self._process.stderr)

            self._stdout_pipe.wait_until_started()
            self._stderr_pipe.wait_until_started()

            if sys.platform == "win32" and _JOB_OBJECT is not None:
                try:
                    win32job.AssignProcessToJobObject(_JOB_OBJECT, self._process._handle)
                except win32job.error as err:
                    # ERROR_ACCESS_DENIED (winerror=5) is received when the process has already
                    # died. Re-raise for any other error, or if the process is in fact still alive.
                    if err.winerror != winerror.ERROR_ACCESS_DENIED:
                        raise
                    return_code = win32process.GetExitCodeProcess(self._process._handle)
                    if return_code == win32con.STILL_ACTIVE:
                        raise

    def stop(self, mode=None):
        """Terminate the process.

        :param mode: a fixture_interface.TeardownMode; defaults to TERMINATE.

        On Windows, a non-KILL stop of a mongod first signals its "Global\\Mongo_<pid>" event and
        waits up to 60 seconds for a clean exit before calling TerminateProcess() (exit code 0).
        On other platforms, KILL/TERMINATE/ABORT map to Popen.kill(), Popen.terminate() and
        Popen.send_signal(mode.value) respectively.

        :raises errors.ProcessError: for an unrecognized teardown mode (non-Windows only).
        """
        if self.NOOP:
            return None

        if mode is None:
            mode = fixture_interface.TeardownMode.TERMINATE

        if sys.platform == "win32":

            # Attempt to cleanly shutdown mongod.
            if mode != fixture_interface.TeardownMode.KILL and self.args and self.args[0].find(
                    "mongod") != -1:
                mongo_signal_handle = None
                try:
                    mongo_signal_handle = win32event.OpenEvent(
                        win32event.EVENT_MODIFY_STATE, False,
                        "Global\\Mongo_" + str(self._process.pid))

                    if not mongo_signal_handle:
                        # The process has already died.
                        return
                    win32event.SetEvent(mongo_signal_handle)
                    # Wait 60 seconds for the program to exit.
                    status = win32event.WaitForSingleObject(self._process._handle, 60 * 1000)
                    if status == win32event.WAIT_OBJECT_0:
                        return
                except win32process.error as err:
                    # ERROR_FILE_NOT_FOUND (winerror=2)
                    # ERROR_ACCESS_DENIED (winerror=5)
                    # ERROR_INVALID_HANDLE (winerror=6)
                    # One of the above errors is received if the process has
                    # already died.
                    if err.winerror not in (2, 5, 6):
                        raise
                finally:
                    # Only close a handle we actually obtained. When OpenEvent() raised or
                    # returned an empty handle, closing it would raise from this finally block
                    # and defeat the "already died" handling above.
                    if mongo_signal_handle:
                        win32api.CloseHandle(mongo_signal_handle)

                print("Failed to cleanly exit the program, calling TerminateProcess() on PID: " +\
                    str(self._process.pid))

            # Adapted from implementation of Popen.terminate() in subprocess.py of Python 2.7
            # because earlier versions do not catch exceptions.
            try:
                # Have the process exit with code 0 if it is terminated by us to simplify the
                # success-checking logic later on.
                win32process.TerminateProcess(self._process._handle, 0)
            except win32process.error as err:
                # ERROR_ACCESS_DENIED (winerror=5) is received when the process
                # has already died.
                if err.winerror != winerror.ERROR_ACCESS_DENIED:
                    raise
                return_code = win32process.GetExitCodeProcess(self._process._handle)
                if return_code == win32con.STILL_ACTIVE:
                    raise
        else:
            try:
                if mode == fixture_interface.TeardownMode.KILL:
                    self._process.kill()
                elif mode == fixture_interface.TeardownMode.TERMINATE:
                    self._process.terminate()
                elif mode == fixture_interface.TeardownMode.ABORT:
                    self._process.send_signal(mode.value)
                else:
                    # mode.value may be a non-string (e.g. a signal number), so convert it
                    # explicitly; plain concatenation would raise TypeError instead.
                    raise errors.ProcessError("Process wrapper given unrecognized teardown mode: " +
                                              str(mode.value))

            except OSError as err:
                # ESRCH (errno=3) is received when the process has already died.
                if err.errno != 3:
                    raise

    def poll(self):
        """Return the child's exit code if it has terminated, else None (always None for NOOP)."""
        if self.NOOP:
            return None
        return self._process.poll()

    def wait(self, timeout=None):
        """Wait until process has terminated and all output has been consumed by the logger pipes.

        :param timeout: forwarded to Popen.wait() for the child and for the UndoDB recorder.
        :return: the child's exit code, or None for a NOOP process.
        :raises errors.ServerFailure: if the UndoDB recorder exits with a non-zero code.
        """
        if self.NOOP:
            return None

        return_code = self._process.wait(timeout)

        if self._recorder is not None:
            self.logger.info('Saving the UndoDB recording; it may take a few minutes...')
            recorder_return = self._recorder.wait(timeout)
            if recorder_return != 0:
                raise errors.ServerFailure(
                    "UndoDB live-record did not terminate correctly. This is likely a bug with UndoDB. "
                    "Please record the logs and notify the #server-testing Slack channel")

        if self._stdout_pipe:
            self._stdout_pipe.wait_until_finished()
        if self._stderr_pipe:
            self._stderr_pipe.wait_until_finished()

        return return_code

    def as_command(self):
        """Return an equivalent command line invocation of the process.

        Only environment variables that are absent from, or differ from, os.environ are emitted
        (as shell-quoted NAME=value prefixes), followed by the shell-quoted arguments.
        """

        default_env = os.environ
        env_diff = self.env.copy()

        # Remove environment variables that appear in both 'os.environ' and 'self.env'.
        for env_var in default_env:
            if env_var in env_diff and env_diff[env_var] == default_env[env_var]:
                del env_diff[env_var]

        sb = []  # String builder.
        for env_var in env_diff:
            sb.append(quote("%s=%s" % (env_var, env_diff[env_var])))
        sb.extend(map(quote, self.args))

        return " ".join(sb)

    def pause(self):
        """Send the SIGSTOP signal to the process and wait for it to be stopped.

        SIGSTOP is re-sent once per second until psutil reports the process as stopped.
        """
        if self.NOOP:
            return None
        # Look the process up once rather than on every polling iteration.
        stopped_target = psutil.Process(self.pid)
        while True:
            self._process.send_signal(signal.SIGSTOP)
            process_status = stopped_target.status()
            if process_status == psutil.STATUS_STOPPED:
                break
            self.logger.info("Process status: {}".format(process_status))
            time.sleep(1)

    def resume(self):
        """Send the SIGCONT signal to the process."""
        if self.NOOP:
            return None
        self._process.send_signal(signal.SIGCONT)

    def __str__(self):
        """Return the equivalent command line, followed by " (<pid>)" once a pid is known."""
        if self.pid is None:
            return self.as_command()
        return "%s (%d)" % (self.as_command(), self.pid)
|