mongo/buildscripts/resmokelib/testing/hooks/lifecycle.py

217 lines
8.3 KiB
Python

"""Thread lifecycle management for periodic hooks such as ContinuousStepdown."""
import collections
import os.path
import threading
import buildscripts.resmokelib.utils.filesystem as fs
ActionFiles = collections.namedtuple("ActionFiles", ["permitted", "idle_request", "idle_ack"])
class FlagBasedThreadLifecycle(object):
"""Class for managing the various states of a continuously running hook thread.
The job thread alternates between calling mark_test_started() and mark_test_finished(). The
hook thread is allowed to perform actions at any point between these two calls. Note that
the job thread synchronizes with the hook thread outside the context of this object to know
it isn't in the process of running an action.
"""
_TEST_STARTED_STATE = "start"
_TEST_FINISHED_STATE = "finished"
def __init__(self):
"""Initialize the FlagBasedThreadLifecycle instance."""
self.__lock = threading.Lock()
self.__cond = threading.Condition(self.__lock)
self.__test_state = self._TEST_FINISHED_STATE
self.__should_stop = False
def mark_test_started(self):
"""Signal to the hook thread that a new test has started.
This function should be called during before_test(). Calling it causes the
wait_for_action_permitted() function to no longer block and to instead return true.
"""
with self.__lock:
self.__test_state = self._TEST_STARTED_STATE
self.__cond.notify_all()
def mark_test_finished(self):
"""Signal to the hook thread that the current test has finished.
This function should be called during after_test(). Calling it causes the
wait_for_action_permitted() function to block until mark_test_started() is called again.
"""
with self.__lock:
self.__test_state = self._TEST_FINISHED_STATE
self.__cond.notify_all()
def stop(self):
"""Signal to the hook thread that it should exit.
This function should be called during after_suite(). Calling it causes the
wait_for_action_permitted() function to no longer block and to instead return false.
"""
with self.__lock:
self.__should_stop = True
self.__cond.notify_all()
def wait_for_action_permitted(self):
"""Block until actions are permitted, or until stop() is called.
:return: true if actions are permitted, and false if steps are not permitted.
"""
with self.__lock:
while not self.__should_stop:
if self.__test_state == self._TEST_STARTED_STATE:
return True
self.__cond.wait()
return False
def wait_for_action_interval(self, timeout):
"""Block for 'timeout' seconds, or until stop() is called."""
with self.__lock:
self.__cond.wait(timeout)
def poll_for_idle_request(self): # noqa: D205,D400
"""Return true if the hook thread should continue running actions, or false if it
should temporarily stop running actions.
"""
with self.__lock:
return self.__test_state == self._TEST_FINISHED_STATE
def send_idle_acknowledgement(self):
"""No-op.
This method exists so this class has the same interface as FileBasedThreadLifecycle.
"""
pass
class FileBasedThreadLifecycle(object):
"""Class for managing the various states of the hook thread using files.
Unlike in the FlagBasedThreadLifecycle class, the job thread alternating between calls to
mark_test_started() and mark_test_finished() doesn't automatically grant permission for the
hook thread to perform actions. Instead, the test will part-way through allow actions to
be performed and then will part-way through disallow actions from continuing to be performed.
See jstests/concurrency/fsm_libs/resmoke_runner.js for the other half of the file-base protocol.
Python inside of resmoke.py JavaScript inside of the mongo shell
--------------------------- ------------------------------------
FSM workload starts.
Call $config.setup() function.
Create "permitted" file.
Wait for "permitted" file to be created.
Action runs.
Check if "idle_request" file exists.
Action runs.
Check if "idle_request" file exists.
FSM workload threads all finish.
Create "idle_request" file.
Action runs.
Check if "idle_request" file exists.
Create "idle_ack" file.
(No more actions run.)
Wait for "idle_ack" file.
Call $config.teardown() function.
FSM workload finishes.
Note that the job thread still synchronizes with the hook thread outside the context of this
object to know it isn't in the process of running an action.
"""
def __init__(self, action_files):
"""Initialize the FileBasedThreadLifecycle instance."""
self.__action_files = action_files
self.__lock = threading.Lock()
self.__cond = threading.Condition(self.__lock)
self.__should_stop = False
def mark_test_started(self):
"""Signal to the hook thread that a new test has started.
This function should be called during before_test(). Calling it does nothing because
permission for running actions is given by the test itself writing the "permitted" file.
"""
pass
def mark_test_finished(self):
"""Signal to the hook thread that the current test has finished.
This function should be called during after_test(). Calling it causes the
wait_for_action_permitted() function to block until the next test gives permission for
running actions.
"""
# It is possible something went wrong during the test's execution and prevented the
# "permitted" and "idle_request" files from being created. We therefore don't consider it an
# error if they don't exist after the test has finished.
fs.remove_if_exists(self.__action_files.permitted)
fs.remove_if_exists(self.__action_files.idle_request)
fs.remove_if_exists(self.__action_files.idle_ack)
def stop(self):
"""Signal to the hook thread that it should exit.
This function should be called during after_suite(). Calling it causes the
wait_for_action_permitted() function to no longer block and to instead return false.
"""
with self.__lock:
self.__should_stop = True
self.__cond.notify_all()
def wait_for_action_permitted(self):
"""Block until actions are permitted, or until stop() is called.
:return: true if actions are permitted, and false if steps are not permitted.
"""
with self.__lock:
while not self.__should_stop:
if os.path.isfile(self.__action_files.permitted):
return True
# Wait a little bit before checking for the "permitted" file again.
self.__cond.wait(0.1)
return False
def wait_for_action_interval(self, timeout):
"""Block for 'timeout' seconds, or until stop() is called."""
with self.__lock:
self.__cond.wait(timeout)
def poll_for_idle_request(self): # noqa: D205,D400
"""Return true if the hook thread should continue running actions, or false if it
should temporarily stop running actions.
"""
if os.path.isfile(self.__action_files.idle_request):
with self.__lock:
return True
return False
def send_idle_acknowledgement(self):
"""Signal to the running test that the hook thread won't continue to run actions."""
with open(self.__action_files.idle_ack, "w"):
pass
# We remove the "permitted" file to revoke permission for the hook thread to continue
# performing actions.
fs.remove_if_exists(self.__action_files.permitted)