mirror of https://github.com/mongodb/mongo
217 lines
8.3 KiB
Python
217 lines
8.3 KiB
Python
"""Thread lifecycle management for periodic hooks such as ContinuousStepdown."""
|
|
|
|
import collections
|
|
import os.path
|
|
import threading
|
|
|
|
import buildscripts.resmokelib.utils.filesystem as fs
|
|
|
|
ActionFiles = collections.namedtuple("ActionFiles", ["permitted", "idle_request", "idle_ack"])
|
|
|
|
|
|
class FlagBasedThreadLifecycle(object):
|
|
"""Class for managing the various states of a continuously running hook thread.
|
|
|
|
The job thread alternates between calling mark_test_started() and mark_test_finished(). The
|
|
hook thread is allowed to perform actions at any point between these two calls. Note that
|
|
the job thread synchronizes with the hook thread outside the context of this object to know
|
|
it isn't in the process of running an action.
|
|
"""
|
|
|
|
_TEST_STARTED_STATE = "start"
|
|
_TEST_FINISHED_STATE = "finished"
|
|
|
|
def __init__(self):
|
|
"""Initialize the FlagBasedThreadLifecycle instance."""
|
|
self.__lock = threading.Lock()
|
|
self.__cond = threading.Condition(self.__lock)
|
|
|
|
self.__test_state = self._TEST_FINISHED_STATE
|
|
self.__should_stop = False
|
|
|
|
def mark_test_started(self):
|
|
"""Signal to the hook thread that a new test has started.
|
|
|
|
This function should be called during before_test(). Calling it causes the
|
|
wait_for_action_permitted() function to no longer block and to instead return true.
|
|
"""
|
|
with self.__lock:
|
|
self.__test_state = self._TEST_STARTED_STATE
|
|
self.__cond.notify_all()
|
|
|
|
def mark_test_finished(self):
|
|
"""Signal to the hook thread that the current test has finished.
|
|
|
|
This function should be called during after_test(). Calling it causes the
|
|
wait_for_action_permitted() function to block until mark_test_started() is called again.
|
|
"""
|
|
with self.__lock:
|
|
self.__test_state = self._TEST_FINISHED_STATE
|
|
self.__cond.notify_all()
|
|
|
|
def stop(self):
|
|
"""Signal to the hook thread that it should exit.
|
|
|
|
This function should be called during after_suite(). Calling it causes the
|
|
wait_for_action_permitted() function to no longer block and to instead return false.
|
|
"""
|
|
with self.__lock:
|
|
self.__should_stop = True
|
|
self.__cond.notify_all()
|
|
|
|
def wait_for_action_permitted(self):
|
|
"""Block until actions are permitted, or until stop() is called.
|
|
|
|
:return: true if actions are permitted, and false if steps are not permitted.
|
|
"""
|
|
with self.__lock:
|
|
while not self.__should_stop:
|
|
if self.__test_state == self._TEST_STARTED_STATE:
|
|
return True
|
|
|
|
self.__cond.wait()
|
|
|
|
return False
|
|
|
|
def wait_for_action_interval(self, timeout):
|
|
"""Block for 'timeout' seconds, or until stop() is called."""
|
|
with self.__lock:
|
|
self.__cond.wait(timeout)
|
|
|
|
def poll_for_idle_request(self): # noqa: D205,D400
|
|
"""Return true if the hook thread should continue running actions, or false if it
|
|
should temporarily stop running actions.
|
|
"""
|
|
with self.__lock:
|
|
return self.__test_state == self._TEST_FINISHED_STATE
|
|
|
|
def send_idle_acknowledgement(self):
|
|
"""No-op.
|
|
|
|
This method exists so this class has the same interface as FileBasedThreadLifecycle.
|
|
"""
|
|
pass
|
|
|
|
|
|
class FileBasedThreadLifecycle(object):
|
|
"""Class for managing the various states of the hook thread using files.
|
|
|
|
Unlike in the FlagBasedThreadLifecycle class, the job thread alternating between calls to
|
|
mark_test_started() and mark_test_finished() doesn't automatically grant permission for the
|
|
hook thread to perform actions. Instead, the test will part-way through allow actions to
|
|
be performed and then will part-way through disallow actions from continuing to be performed.
|
|
|
|
See jstests/concurrency/fsm_libs/resmoke_runner.js for the other half of the file-base protocol.
|
|
|
|
Python inside of resmoke.py JavaScript inside of the mongo shell
|
|
--------------------------- ------------------------------------
|
|
|
|
FSM workload starts.
|
|
Call $config.setup() function.
|
|
Create "permitted" file.
|
|
|
|
Wait for "permitted" file to be created.
|
|
|
|
Action runs.
|
|
Check if "idle_request" file exists.
|
|
|
|
Action runs.
|
|
Check if "idle_request" file exists.
|
|
|
|
FSM workload threads all finish.
|
|
Create "idle_request" file.
|
|
|
|
Action runs.
|
|
Check if "idle_request" file exists.
|
|
Create "idle_ack" file.
|
|
(No more actions run.)
|
|
|
|
Wait for "idle_ack" file.
|
|
Call $config.teardown() function.
|
|
FSM workload finishes.
|
|
|
|
Note that the job thread still synchronizes with the hook thread outside the context of this
|
|
object to know it isn't in the process of running an action.
|
|
"""
|
|
|
|
def __init__(self, action_files):
|
|
"""Initialize the FileBasedThreadLifecycle instance."""
|
|
self.__action_files = action_files
|
|
|
|
self.__lock = threading.Lock()
|
|
self.__cond = threading.Condition(self.__lock)
|
|
|
|
self.__should_stop = False
|
|
|
|
def mark_test_started(self):
|
|
"""Signal to the hook thread that a new test has started.
|
|
|
|
This function should be called during before_test(). Calling it does nothing because
|
|
permission for running actions is given by the test itself writing the "permitted" file.
|
|
"""
|
|
pass
|
|
|
|
def mark_test_finished(self):
|
|
"""Signal to the hook thread that the current test has finished.
|
|
|
|
This function should be called during after_test(). Calling it causes the
|
|
wait_for_action_permitted() function to block until the next test gives permission for
|
|
running actions.
|
|
"""
|
|
# It is possible something went wrong during the test's execution and prevented the
|
|
# "permitted" and "idle_request" files from being created. We therefore don't consider it an
|
|
# error if they don't exist after the test has finished.
|
|
fs.remove_if_exists(self.__action_files.permitted)
|
|
fs.remove_if_exists(self.__action_files.idle_request)
|
|
fs.remove_if_exists(self.__action_files.idle_ack)
|
|
|
|
def stop(self):
|
|
"""Signal to the hook thread that it should exit.
|
|
|
|
This function should be called during after_suite(). Calling it causes the
|
|
wait_for_action_permitted() function to no longer block and to instead return false.
|
|
"""
|
|
with self.__lock:
|
|
self.__should_stop = True
|
|
self.__cond.notify_all()
|
|
|
|
def wait_for_action_permitted(self):
|
|
"""Block until actions are permitted, or until stop() is called.
|
|
|
|
:return: true if actions are permitted, and false if steps are not permitted.
|
|
"""
|
|
with self.__lock:
|
|
while not self.__should_stop:
|
|
if os.path.isfile(self.__action_files.permitted):
|
|
return True
|
|
|
|
# Wait a little bit before checking for the "permitted" file again.
|
|
self.__cond.wait(0.1)
|
|
|
|
return False
|
|
|
|
def wait_for_action_interval(self, timeout):
|
|
"""Block for 'timeout' seconds, or until stop() is called."""
|
|
with self.__lock:
|
|
self.__cond.wait(timeout)
|
|
|
|
def poll_for_idle_request(self): # noqa: D205,D400
|
|
"""Return true if the hook thread should continue running actions, or false if it
|
|
should temporarily stop running actions.
|
|
"""
|
|
if os.path.isfile(self.__action_files.idle_request):
|
|
with self.__lock:
|
|
return True
|
|
|
|
return False
|
|
|
|
def send_idle_acknowledgement(self):
|
|
"""Signal to the running test that the hook thread won't continue to run actions."""
|
|
|
|
with open(self.__action_files.idle_ack, "w"):
|
|
pass
|
|
|
|
# We remove the "permitted" file to revoke permission for the hook thread to continue
|
|
# performing actions.
|
|
fs.remove_if_exists(self.__action_files.permitted)
|