mirror of https://github.com/mongodb/mongo
348 lines
14 KiB
Python
348 lines
14 KiB
Python
"""A more reliable way to create and destroy processes.
|
|
|
|
Uses job objects when running on Windows to ensure that all created
|
|
processes are terminated.
|
|
"""
|
|
|
|
import atexit
|
|
import logging
|
|
import os
|
|
import os.path
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
from shlex import quote
|
|
|
|
import psutil
|
|
|
|
from buildscripts.resmokelib import config as _config
|
|
from buildscripts.resmokelib import errors, utils
|
|
from buildscripts.resmokelib.core import pipe
|
|
from buildscripts.resmokelib.testing.fixtures import interface as fixture_interface
|
|
|
|
# Attempt to avoid race conditions (e.g. hangs caused by a file descriptor being left open) when
|
|
# starting subprocesses concurrently from multiple threads by guarding calls to subprocess.Popen()
|
|
# with a lock. See https://bugs.python.org/issue2320 and https://bugs.python.org/issue12739 as
|
|
# reports of such hangs.
|
|
#
|
|
# This lock probably isn't necessary when both the subprocess32 module and its _posixsubprocess C
|
|
# extension module are available because either
|
|
# (a) the pipe2() syscall is available on the platform we're using, so pipes are atomically
|
|
# created with the FD_CLOEXEC flag set on them, or
|
|
# (b) the pipe2() syscall isn't available, but the GIL isn't released during the
|
|
# _posixsubprocess.fork_exec() call or the _posixsubprocess.cloexec_pipe() call.
|
|
# See https://bugs.python.org/issue7213 for more details.
|
|
# Serializes calls to subprocess.Popen() across threads.
_POPEN_LOCK = threading.Lock()

# PIDs of processes that we abort on purpose are appended to BORING_CORE_DUMP_PIDS_FILE (one per
# line, guarded by core_dump_file_pid_lock). Their core dumps are low priority, which in turn
# tells us which of the remaining core dumps are high priority to analyze.
core_dump_file_pid_lock = threading.Lock()
BORING_CORE_DUMP_PIDS_FILE = "boring_core_dumps.txt"
|
|
|
|
# Job objects are the only reliable way to ensure that processes are terminated on Windows.
|
|
if sys.platform == "win32":
    import win32api
    import win32con
    import win32event
    import win32job
    import win32process
    import winerror

    def _init_job_object():
        """Create and return a job object that kills its processes when it is closed.

        The job object is created unnamed, and its extended limit information is updated so
        that JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE is set in addition to whatever limit flags the
        freshly created job object already reports.
        """
        limit_class = win32job.JobObjectExtendedLimitInformation
        job = win32job.CreateJobObject(None, "")

        # Start from the limits the new job object already has so that only one flag changes.
        limits = win32job.QueryInformationJobObject(job, limit_class)

        # Closing the last handle to the job object will now terminate every associated
        # process and destroy the job object itself.
        basic_limits = limits["BasicLimitInformation"]
        basic_limits["LimitFlags"] |= win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE

        # Write the modified limits back to the job object.
        win32job.SetInformationJobObject(job, limit_class, limits)
        return job

    # Don't create a job object if the current process is already inside one; in that case
    # _JOB_OBJECT stays None.
    _JOB_OBJECT = None
    if not win32job.IsProcessInJob(win32process.GetCurrentProcess(), None):
        _JOB_OBJECT = _init_job_object()
        # Release our handle to the job object when the interpreter exits.
        atexit.register(win32api.CloseHandle, _JOB_OBJECT)
|
|
|
|
|
|
class Process(object):
    """Wrapper around subprocess.Popen class.

    The wrapper forwards the child's stdout and stderr to a logger through LoggerPipe
    instances and, on Windows, assigns the child to the module-level job object (when one was
    created) so that it is terminated together with the job.

    When _config.NOOP_MONGO_D_S_PROCESSES is set and the executable is named "mongod" or
    "mongos", the instance is a NOOP: start(), stop(), poll(), wait(), pause() and resume()
    return None without touching any local process.
    """

    def __init__(self, logger, args, env=None, env_vars=None, cwd=None, start_new_session=False):
        """Initialize the process with the specified logger, arguments, and environment.

        :param logger: logger that receives the process output (stdout at INFO, stderr at ERROR).
        :param args: command line as a list; args[0] is the executable. NOTE: on Windows the
            list is modified in place when ".exe" has to be appended to args[0].
        :param env: environment mapping for the child; a copy of os.environ when None.
        :param env_vars: optional extra variables merged into the environment last.
        :param cwd: working directory handed to subprocess.Popen().
        :param start_new_session: handed to subprocess.Popen() unchanged.
        """

        # Ensure that executable files that don't already have an
        # extension on Windows have a ".exe" extension.
        if sys.platform == "win32" and not os.path.splitext(args[0])[1]:
            args[0] += ".exe"

        self.logger = logger
        self.args = args

        self.env = utils.default_if_none(env, os.environ.copy())
        # Record the parent resmoke process (PID and creation time) in the child's environment
        # unless the supplied environment already carries a non-empty value.
        if not self.env.get("RESMOKE_PARENT_PROCESS"):
            self.env["RESMOKE_PARENT_PROCESS"] = os.environ.get(
                "RESMOKE_PARENT_PROCESS", str(os.getpid())
            )
        if not self.env.get("RESMOKE_PARENT_CTIME"):
            self.env["RESMOKE_PARENT_CTIME"] = os.environ.get(
                "RESMOKE_PARENT_CTIME", str(psutil.Process().create_time())
            )
        if env_vars is not None:
            self.env.update(env_vars)

        # If we are running against an External System Under Test & this is a `mongo{d,s}`
        # process, we make this process a NOOP. `mongo{d,s}` processes are not running locally
        # for an External System Under Test.
        self.NOOP = _config.NOOP_MONGO_D_S_PROCESSES and os.path.basename(self.args[0]) in [
            "mongod",
            "mongos",
        ]

        # The `pid` attribute is assigned after the local process is started. If this process
        # is a NOOP, we assign it a dummy value.
        self.pid = 1 if self.NOOP else None

        self._process = None
        self._stdout_pipe = None
        self._stderr_pipe = None
        self._cwd = cwd
        self._start_new_session = start_new_session

        if sys.platform == "win32":
            # Set once the clean-shutdown event has been signalled; wait() then gives the
            # process time to exit before terminating it.
            self._windows_shutdown_event_set = False

    def start(self):
        """Start the process and the logger pipes for its stdout and stderr.

        Sets self.pid to the PID of the spawned process. Does nothing for a NOOP process.
        """
        if self.NOOP:
            return None

        creation_flags = 0
        if sys.platform == "win32" and _JOB_OBJECT is not None:
            creation_flags |= win32process.CREATE_BREAKAWAY_FROM_JOB

        # Tests fail if a process takes too long to startup and listen to a socket. Use buffered
        # I/O pipes to give the process some leeway.
        buffer_size = 1024 * 1024

        # Close file descriptors in the child process before executing the program. This prevents
        # file descriptors that were inherited due to multiple calls to fork() -- either within one
        # thread, or concurrently from multiple threads -- from causing another subprocess to wait
        # for the completion of the newly spawned child process. Closing other file descriptors
        # isn't supported on Windows when stdout and stderr are redirected.
        close_fds = sys.platform != "win32"

        with _POPEN_LOCK:
            self._process = subprocess.Popen(
                self.args,
                bufsize=buffer_size,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                close_fds=close_fds,
                env=self.env,
                creationflags=creation_flags,
                cwd=self._cwd,
                start_new_session=self._start_new_session,
            )
        self.pid = self._process.pid

        self._stdout_pipe = pipe.LoggerPipe(self.logger, logging.INFO, self._process.stdout)
        self._stderr_pipe = pipe.LoggerPipe(self.logger, logging.ERROR, self._process.stderr)

        self._stdout_pipe.wait_until_started()
        self._stderr_pipe.wait_until_started()

        if sys.platform == "win32" and _JOB_OBJECT is not None:
            try:
                win32job.AssignProcessToJobObject(_JOB_OBJECT, self._process._handle)
            except win32job.error as err:
                # ERROR_ACCESS_DENIED (winerror=5) is received when the process has already died.
                if err.winerror != winerror.ERROR_ACCESS_DENIED:
                    raise
                # Access was denied although the process is still running: a genuine failure.
                return_code = win32process.GetExitCodeProcess(self._process._handle)
                if return_code == win32con.STILL_ACTIVE:
                    raise

    def stop(self, mode=None):
        """Terminate the process.

        :param mode: a fixture_interface.TeardownMode member; TERMINATE when None.
        :raises errors.ProcessError: on non-Windows platforms when `mode` is not one of
            KILL, TERMINATE or ABORT.

        On Windows a process whose executable path contains "mongod" is asked to shut down
        cleanly unless the mode is KILL; every other process is terminated outright.
        """
        if self.NOOP:
            return None

        if mode is None:
            mode = fixture_interface.TeardownMode.TERMINATE

        if sys.platform == "win32":
            wants_clean_shutdown = (
                mode != fixture_interface.TeardownMode.KILL
                and self.args
                and "mongod" in self.args[0]
            )
            if wants_clean_shutdown:
                self._request_clean_shutdown_on_windows()
            else:
                self._terminate_on_windows()
            return None

        try:
            if mode == fixture_interface.TeardownMode.KILL:
                self._process.kill()
            elif mode == fixture_interface.TeardownMode.TERMINATE:
                self._process.terminate()
            elif mode == fixture_interface.TeardownMode.ABORT:
                # The core dumps taken when we abort are low priority to get analyzed.
                # The data files are more useful most of the time.
                # We poll the process just to make sure we don't count processes that
                # ended prematurely as boring.
                if _config.EVERGREEN_TASK_ID and self._process.poll() is None:
                    with core_dump_file_pid_lock:
                        pid = str(self._process.pid)
                        with open(BORING_CORE_DUMP_PIDS_FILE, "a") as file:
                            file.write(f"{pid}\n")
                self._process.send_signal(mode.value)
            else:
                # The mode value is not necessarily a string (it is passed to send_signal()
                # above), so format it rather than concatenating it onto the message.
                mode_value = getattr(mode, "value", mode)
                raise errors.ProcessError(
                    f"Process wrapper given unrecognized teardown mode: {mode_value}"
                )
        except OSError as err:
            # ESRCH (errno=3) is received when the process has already died.
            if err.errno != 3:
                raise
        return None

    def poll(self):
        """Return the result of polling the underlying process (None for a NOOP process)."""
        if self.NOOP:
            return None
        return self._process.poll()

    def wait(self, timeout=None):
        """Wait until process has terminated and all output has been consumed by the logger pipes.

        :param timeout: passed to Popen.wait().
        :return: the process's return code, or None for a NOOP process.
        """
        if self.NOOP:
            return None

        if sys.platform == "win32" and self._windows_shutdown_event_set:
            status = None
            try:
                # Wait 60 seconds for the program to exit.
                status = win32event.WaitForSingleObject(self._process._handle, 60 * 1000)
            except win32process.error as err:
                # ERROR_FILE_NOT_FOUND (winerror=2)
                # ERROR_ACCESS_DENIED (winerror=5)
                # ERROR_INVALID_HANDLE (winerror=6)
                # One of the above errors is received if the process has
                # already died.
                if err.winerror not in (2, 5, 6):
                    raise

            # The clean shutdown did not complete in time; fall back to terminating.
            if status is not None and status != win32event.WAIT_OBJECT_0:
                self.logger.info(
                    f"Failed to cleanly exit the program, calling TerminateProcess() on PID:"
                    f" {str(self._process.pid)}"
                )
                self._terminate_on_windows()

        return_code = self._process.wait(timeout)

        # Drain both logger pipes so no output is lost after the process exits.
        for logger_pipe in (self._stdout_pipe, self._stderr_pipe):
            if logger_pipe:
                logger_pipe.wait_until_finished()

        return return_code

    def as_command(self):
        """Return an equivalent command line invocation of the process.

        Environment variables of this process that differ from (or are absent in) os.environ
        are emitted first as shell-quoted NAME=value words, followed by the quoted arguments.
        """
        env_diff = self.env.copy()

        # Remove environment variables that appear in both 'os.environ' and 'self.env'
        # with the same value.
        for name, value in os.environ.items():
            if name in env_diff and env_diff[name] == value:
                del env_diff[name]

        words = [quote(f"{name}={value}") for name, value in env_diff.items()]
        words.extend(quote(arg) for arg in self.args)
        return " ".join(words)

    def pause(self):
        """Send the SIGSTOP signal to the process and wait for it to be stopped.

        SIGSTOP is re-sent once per second until psutil reports the process as stopped.
        """
        if self.NOOP:
            return None
        while True:
            self._process.send_signal(signal.SIGSTOP)
            mongod_process = psutil.Process(self.pid)
            process_status = mongod_process.status()
            if process_status == psutil.STATUS_STOPPED:
                break
            self.logger.info("Process status: {}".format(process_status))
            time.sleep(1)
        return None

    def resume(self):
        """Send the SIGCONT signal to the process."""
        if self.NOOP:
            return None
        self._process.send_signal(signal.SIGCONT)
        return None

    def __str__(self):
        """Return the command line, followed by the PID in parentheses once one is assigned."""
        if self.pid is None:
            return self.as_command()
        return "%s (%d)" % (self.as_command(), self.pid)

    if sys.platform == "win32":
        # ERROR_FILE_NOT_FOUND (winerror=2)
        # ERROR_ACCESS_DENIED (winerror=5)
        # ERROR_INVALID_HANDLE (winerror=6)
        # One of these errors is received if the process has already died.
        _ALREADY_DEAD_WINERRORS = (2, 5, 6)

        def _request_clean_shutdown_on_windows(self):
            """Request clean shutdown on Windows.

            Signals the "Global\\Mongo_<pid>" event of the process and records that it was
            signalled in self._windows_shutdown_event_set. Returns quietly when the event
            cannot be opened because the process has already died.
            """
            shutdown_event = None
            try:
                shutdown_event = win32event.OpenEvent(
                    win32event.EVENT_MODIFY_STATE, False, "Global\\Mongo_" + str(self._process.pid)
                )

                if not shutdown_event:
                    # The process has already died.
                    return
                win32event.SetEvent(shutdown_event)
                self._windows_shutdown_event_set = True

            except win32process.error as err:
                if err.winerror not in self._ALREADY_DEAD_WINERRORS:
                    raise

            finally:
                # Only close a handle that was actually opened. OpenEvent() may have raised or
                # returned a null handle, and closing that would raise a new error on a path
                # that is meant to be handled quietly.
                if shutdown_event:
                    win32api.CloseHandle(shutdown_event)

        def _terminate_on_windows(self):
            """Terminate the process on Windows."""
            # Adapted from implementation of Popen.terminate() in subprocess.py of Python 2.7
            # because earlier versions do not catch exceptions.
            try:
                # Have the process exit with code 0 if it is terminated by us to simplify the
                # success-checking logic later on.
                win32process.TerminateProcess(self._process._handle, 0)
            except win32process.error as err:
                # ERROR_ACCESS_DENIED (winerror=5) is received when the process
                # has already died.
                if err.winerror != winerror.ERROR_ACCESS_DENIED:
                    raise
                # Access was denied although the process is still running: a genuine failure.
                return_code = win32process.GetExitCodeProcess(self._process._handle)
                if return_code == win32con.STILL_ACTIVE:
                    raise
|