SERVER-106702: Avoid unbounded buffering of recording events [reapply] (#44980)

GitOrigin-RevId: a834ffc98dcede190584be5efceb3b57e93b14d0
This commit is contained in:
James H 2025-12-16 16:05:55 +00:00 committed by MongoDB Bot
parent 9a8289c145
commit 8041ad4686
14 changed files with 487 additions and 517 deletions

View File

@ -128,6 +128,10 @@ public:
_state->stop();
}
/**
 * Returns true only when a shared state is attached and that state reports it has been stopped.
 */
bool stop_requested() const {
    if (!_state) {
        return false;
    }
    return _state->stopped();
}
private:
std::shared_ptr<stop_state> _state = std::make_shared<stop_state>();
};

View File

@ -57,7 +57,6 @@ mongo_cc_unit_test(
"replay_command_executor_test.cpp",
"replay_command_test.cpp",
"replay_test_server.cpp",
"session_handler_test.cpp",
"session_scheduler_test.cpp",
"session_simulator_test.cpp",
"test_packet.cpp",

View File

@ -31,12 +31,15 @@
#include "mongo/base/error_codes.h"
#include "mongo/util/assert_util.h"
#include <chrono>
#include <filesystem>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <absl/time/time.h>
#include <boost/filesystem/operations.hpp>
#include <boost/optional/optional.hpp>
#include <boost/program_options.hpp>
#include <boost/program_options/errors.hpp>
#include <boost/program_options/options_description.hpp>
@ -54,6 +57,9 @@ std::vector<ReplayConfig> ConfigHandler::parse(int argc, char** argv) {
auto mongodbTarget = "URI of the shadow mongod/s";
auto configFilePath = "Path to the config file";
auto enablePerfRecording = "Enable/Disable perf recording specifying file name";
auto graceTime =
"Delay replay to allow time for session initialization before the first event for that "
"session";
namespace po = boost::program_options;
@ -65,7 +71,8 @@ std::vector<ReplayConfig> ConfigHandler::parse(int argc, char** argv) {
("input,i", po::value<std::string>(), recordingPath)
("target,t", po::value<std::string>(), mongodbTarget)
("config,c", po::value<std::string>(), configFilePath)
("perf,p", po::value<std::string>()->default_value(""), enablePerfRecording);
("perf,p", po::value<std::string>()->default_value(""), enablePerfRecording)
("graceTime,a", po::value<std::string>()->default_value("5s"), graceTime);
// clang-format on
// Parse the program options
@ -130,6 +137,15 @@ std::vector<ReplayConfig> ConfigHandler::parse(int argc, char** argv) {
uassert(ErrorCodes::ReplayClientConfigurationError, "target URI is empty", !uri.empty());
config.mongoURI = uri;
config.enablePerformanceRecording = vm["perf"].as<std::string>();
absl::Duration dur;
uassert(ErrorCodes::ReplayClientConfigurationError,
"Invalid graceTime",
absl::ParseDuration(vm["graceTime"].as<std::string>(), &dur));
uassert(ErrorCodes::ReplayClientConfigurationError,
"Invalid graceTime",
dur >= absl::ZeroDuration());
config.sessionPreInitTime = absl::ToChronoSeconds(dur);
return {config};
}
@ -160,6 +176,20 @@ std::vector<ReplayConfig> ConfigHandler::parseMultipleInstanceConfig(const std::
json config;
configFile >> config;
boost::optional<std::chrono::seconds> graceTime;
if (config.contains("graceTime")) {
absl::Duration dur;
uassert(ErrorCodes::ReplayClientConfigurationError,
"Invalid graceTime",
absl::ParseDuration(config["graceTime"].get<std::string>(), &dur));
uassert(ErrorCodes::ReplayClientConfigurationError,
"Invalid graceTime",
dur >= absl::ZeroDuration());
graceTime = absl::ToChronoSeconds(dur);
}
uassert(ErrorCodes::ReplayClientConfigurationError,
"'recordings' key is missing",
config.contains("recordings"));
@ -174,6 +204,9 @@ std::vector<ReplayConfig> ConfigHandler::parseMultipleInstanceConfig(const std::
std::string filePath = recording["path"].get<std::string>();
std::string targetUri = recording["uri"].get<std::string>();
ReplayConfig replayConfig = {filePath, targetUri};
if (graceTime) {
replayConfig.sessionPreInitTime = *graceTime;
}
configurations.push_back(std::move(replayConfig));
}

View File

@ -30,25 +30,23 @@
#include "mongo/replay/replay_client.h"
#include "mongo/db/query/util/stop_token.h"
#include "mongo/db/service_context.h"
#include "mongo/db/wire_version.h"
#include "mongo/logv2/log.h"
#include "mongo/replay/replay_command.h"
#include "mongo/replay/replay_config.h"
#include "mongo/replay/session_handler.h"
#include "mongo/replay/traffic_recording_iterator.h"
#include "mongo/transport/asio/asio_session_manager.h"
#include "mongo/transport/asio/asio_transport_layer.h"
#include "mongo/transport/transport_layer_manager.h"
#include "mongo/transport/transport_layer_manager_impl.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/version.h"
#include "mongo/util/duration.h"
#include <chrono>
#include <condition_variable>
#include <exception>
#include <memory>
#include <mutex>
#include <string>
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
namespace mongo {
/**
@ -166,26 +164,6 @@ private:
std::exception_ptr exception = nullptr;
};
// We don't want to replay all the commands we find in the recording file. Mainly we want to skip:
// 1. legacy commands. Everything that is marked legacy, won't be replayable.
// 2. Responses (cursor). These will be the result of some query like find and aggregate.
// 3. n/ok commands. These are just responses.
// 4. isWritablePrimary/isMaster. These are mostly diagnostic commands that we don't want.
// NOLINTNEXTLINE needs audit
static std::unordered_set<std::string> forbiddenKeywords{
    "legacy",
    "cursor",
    "endSessions",
    "ok",
    "isWritablePrimary",
    "n",
    "isMaster",
    "ismaster",
    "stopTrafficRecording",
};

/**
 * Decides whether a recorded operation should be replayed.
 *
 * An empty op type is never replayable; otherwise the op is replayable unless its (case-sensitive)
 * type is listed in forbiddenKeywords.
 */
bool isReplayable(const std::string& commandType) {
    if (commandType.empty()) {
        return false;
    }
    const auto match = forbiddenKeywords.find(commandType);
    return match == forbiddenKeywords.end();
}
/**
* Consumes a collection of recording files from a _single_ node.
*
@ -200,41 +178,77 @@ void recordingDispatcher(mongo::stop_token stop, const ReplayConfig& replayConfi
}
try {
/**
* Begin reading the provided recording, searching for session starts.
*
* Upon finding a session start, spawn a new thread to manage that session.
* That thread will replay further events for that session at the appropriate time.
*
*/
auto iter = RecordingSetIterator(files);
if (iter == end(iter)) {
// There are no events in the recording.
LOGV2_INFO(10893009, "Empty or invalid recording - exiting");
return;
}
// Create a new session handler for managing the recording.
SessionHandler sessionHandler{replayConfig.mongoURI,
replayConfig.enablePerformanceRecording};
// State for sessions will be constructed a little "earlier" than the time
// at which the session needs to start, to avoid initial delays from e.g.,
// spawning the thread.
const auto timePadding = replayConfig.sessionPreInitTime;
for (const auto& packet : iter) {
if (stop.stop_requested()) {
return;
}
ReplayCommand command{packet};
if (!isReplayable(command.parseOpType())) {
continue;
// Plan the replay to start a small time into the future, so sessions can be
// constructed ready to replay at the "correct" time.
const auto replayStartTime = std::chrono::steady_clock::now() + timePadding;
// create a new session handler for managing the recording.
SessionHandler sessionHandler{
replayConfig.mongoURI, replayStartTime, replayConfig.enablePerformanceRecording};
mongo::stop_callback sc(stop, [&] { sessionHandler.stopAllSessions(); });
LOGV2_INFO(10893005, "Replay starting");
for (; iter != end(iter); ++iter) {
// Read ahead by a small time window to find session starts, to initialize session
// state with a small grace period before the first event for that session needs to be
// processed.
// Reading too far (or unlimited) ahead would needlessly create session state before
// it is needed, wasting resources.
auto nextEventTS = replayStartTime + iter->offset.toSystemDuration() - timePadding;
if (!sessionHandler.waitUntil(nextEventTS)) {
// Didn't reach the expected time; a session failed or stop was requested by the
// caller.
break;
}
ReplayCommand command{*iter};
if (command.isSessionStart()) {
// Associates the URI with a session task that runs all the commands recorded
// for this session id.
const auto& [offset, sessionId] = extractOffsetAndSessionFromCommand(command);
sessionHandler.onSessionStart(offset, sessionId);
} else if (command.isSessionEnd()) {
// The stop command completes the simulation and resets the connection.
sessionHandler.onSessionStop(command);
} else {
// must be a runnable command.
sessionHandler.onBsonCommand(command);
sessionHandler.createSession(command.fetchRequestSessionId(), iter);
}
}
// All sessions seen in the recording have been created, and are independently replaying
// in dedicated threads.
LOGV2_INFO(10893006, "All sessions initialized");
// Wait for all the sessions to complete.
sessionHandler.waitForRunningSessions();
sessionHandler.rethrowIfSessionFailed();
} catch (DBException& e) {
LOGV2_INFO(10893010, "Replay failed", "exception"_attr = e.what());
e.addContext("Session replay failed");
throw;
} catch (const std::exception& e) {
tasserted(ErrorCodes::ReplayClientInternalError, e.what());
LOGV2_INFO(10893007, "Replay failed", "exception"_attr = e.what());
throw;
}
LOGV2_INFO(10893008, "Replay completed");
}
void ReplayClient::replayRecording(const ReplayConfigs& configs) {

View File

@ -123,9 +123,9 @@ bool ReplayCommand::isSessionEnd() const {
return _packet.eventType == EventType::kSessionEnd;
}
std::pair<Microseconds, int64_t> extractOffsetAndSessionFromCommand(const ReplayCommand& command) {
std::pair<Microseconds, uint64_t> extractOffsetAndSessionFromCommand(const ReplayCommand& command) {
const Microseconds offset = command.fetchRequestOffset();
const int64_t sessionId = command.fetchRequestSessionId();
const uint64_t sessionId = command.fetchRequestSessionId();
return {offset, sessionId};
}
} // namespace mongo

View File

@ -93,6 +93,6 @@ private:
TrafficReaderPacket _packet;
};
std::pair<Microseconds, int64_t> extractOffsetAndSessionFromCommand(const ReplayCommand& command);
std::pair<Microseconds, uint64_t> extractOffsetAndSessionFromCommand(const ReplayCommand& command);
} // namespace mongo

View File

@ -30,6 +30,7 @@
#include "mongo/util/modules.h"
#include <chrono>
#include <string>
#include <vector>
@ -38,6 +39,7 @@ struct ReplayConfig {
std::string recordingPath;
std::string mongoURI;
std::string enablePerformanceRecording;
std::chrono::seconds sessionPreInitTime = std::chrono::seconds(5);
explicit operator bool() const {
return !recordingPath.empty() && !mongoURI.empty();

View File

@ -29,87 +29,78 @@
#include "mongo/replay/session_handler.h"
#include "mongo/db/query/util/stop_token.h"
#include "mongo/logv2/log.h"
#include "mongo/replay/performance_reporter.h"
#include "mongo/replay/rawop_document.h"
#include "mongo/replay/replay_command.h"
#include "mongo/util/duration.h"
#include "mongo/util/time_support.h"
#include <chrono>
#include <exception>
#include <mutex>
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
namespace mongo {
/**
 * Registers a new session under `sessionId` and starts it, handing the simulator the target URI,
 * the shared replay start time and the recorded offset of the session-start event.
 */
void SessionHandler::onSessionStart(Microseconds offset, int64_t sessionId) {
    createSession(sessionId).start(_uri, _replayStartTime, offset);
}
/**
 * Convenience overload: pulls the offset and session id out of the recorded command and forwards
 * them to the (offset, sessionId) overload.
 */
void SessionHandler::onSessionStart(const ReplayCommand& command) {
    const auto offsetAndSession = extractOffsetAndSessionFromCommand(command);
    onSessionStart(offsetAndSession.first, offsetAndSession.second);
}
/**
 * Handles a recorded session-end event: asks the owning simulator to stop at the recorded offset
 * and then removes the session from the handler.
 *
 * Throws ReplayClientSessionSimulationError (via uassert) when the command is not a session-end
 * event.
 */
void SessionHandler::onSessionStop(const ReplayCommand& stopCommand) {
    uassert(ErrorCodes::ReplayClientSessionSimulationError,
            "Error, failed the command does not represent a stop recording event.",
            stopCommand.isSessionEnd());

    const auto endInfo = extractOffsetAndSessionFromCommand(stopCommand);
    const auto& endOffset = endInfo.first;
    const auto& endingSession = endInfo.second;

    getSessionSimulator(endingSession).stop(endOffset);

    // Destroying the session right away is intended: the scheduler waits for the queued stop to
    // run, and a session that failed has to be removed anyway.
    destroySession(endingSession);
}
/**
 * Forwards a recorded command to the simulator that owns its session.
 *
 * Throws ReplayClientSessionSimulationError (via uassert) when no session with the command's
 * session id is currently registered.
 */
void SessionHandler::onBsonCommand(const ReplayCommand& command) {
    const auto target = extractOffsetAndSessionFromCommand(command);
    const auto& commandOffset = target.first;
    const auto& owningSession = target.second;

    uassert(ErrorCodes::ReplayClientSessionSimulationError,
            "Error, the session should be active",
            isSessionActive(owningSession));

    // The simulator takes care of running the command at the recorded offset.
    getSessionSimulator(owningSession).run(command, commandOffset);
}
// Removes every entry from _runningSessions in one go.
void SessionHandler::clear() {
_runningSessions.clear();
}
SessionSimulator& SessionHandler::createSession(key_t key) {
uassert(ErrorCodes::ReplayClientSessionSimulationError,
"Error, running session cannot contain the same key",
!isSessionActive(key));
void SessionHandler::createSession(key_t key, PacketSource source) {
auto commandExecutor = std::make_unique<ReplayCommandExecutor>();
auto sessionScheduler = std::make_unique<SessionScheduler>();
auto perfReporter = std::make_unique<PerformanceReporter>(_uri, _perfFileName);
auto session = std::make_unique<SessionSimulator>(
std::move(commandExecutor), std::move(sessionScheduler), std::move(perfReporter));
return *_runningSessions.insert({key, std::move(session)}).first->second;
auto session = std::make_unique<SessionSimulator>(std::move(source),
key,
_replayStartTime,
_uri,
std::move(commandExecutor),
std::move(perfReporter));
std::thread([session = std::move(session), this] {
++_runningSessionCount;
try {
session->run(_allSessionStop.get_token());
} catch (...) {
auto recordedException = _sessionException.synchronize();
if (!*recordedException) {
*recordedException = std::current_exception();
}
stopAllSessions();
}
--_runningSessionCount;
notify();
}).detach();
LOGV2_DEBUG(10893000, 1, "New Session", "sessionID"_attr = key);
}
void SessionHandler::destroySession(key_t key) {
uassert(ErrorCodes::ReplayClientSessionSimulationError,
"Error, running session must contain the key passed",
isSessionActive(key));
_runningSessions.erase(key);
// Requests stop on the shared _allSessionStop source. Session threads are handed a token obtained
// from this source when they are started.
void SessionHandler::stopAllSessions() {
_allSessionStop.request_stop();
}
SessionSimulator& SessionHandler::getSessionSimulator(SessionHandler::key_t key) {
uassert(ErrorCodes::ReplayClientSessionSimulationError,
"Error, running session must contain the key passed",
isSessionActive(key));
return *(_runningSessions.at(key));
/**
 * Rethrows the exception stored in _sessionException, if one is set; otherwise does nothing.
 */
void SessionHandler::rethrowIfSessionFailed() {
    if (auto captured = _sessionException.get()) {
        std::rethrow_exception(captured);
    }
}
const SessionSimulator& SessionHandler::getSessionSimulator(SessionHandler::key_t key) const {
uassert(ErrorCodes::ReplayClientSessionSimulationError,
"Error, running session must contain the key passed",
isSessionActive(key));
return *(_runningSessions.at(key));
/**
 * Blocks the caller on _cv until _runningSessionCount reads zero.
 */
void SessionHandler::waitForRunningSessions() {
    std::unique_lock lock(_notificationMutex);
    // Re-check after every wakeup: wakeups may be spurious, or other sessions may still be running.
    while (!(_runningSessionCount == 0)) {
        _cv.wait(lock);
    }
}
bool SessionHandler::isSessionActive(key_t key) const {
return _runningSessions.contains(key);
bool SessionHandler::waitUntil(std::chrono::steady_clock::time_point tp) {
std::unique_lock ul(_notificationMutex);
// std::stop_token not supported on all toolchains, cannot use
// condition_variable_any::wait* overloads which take std::stop_token.
// Reproduce behaviour with a mongo::stop_callback.
mongo::stop_callback sc(_allSessionStop.get_token(), [&] { _cv.notify_all(); });
_cv.wait_until(ul, tp, [&] {
return _sessionException.get() != nullptr || _allSessionStop.stop_requested();
});
// Return true if the requested time was reached, without an exception or stop request.
return !_allSessionStop.stop_requested() && _sessionException.get() == nullptr;
}
void SessionHandler::notify() {
_cv.notify_all();
}
} // namespace mongo

View File

@ -28,16 +28,13 @@
*/
#pragma once
#include "mongo/base/string_data.h"
#include "mongo/db/traffic_reader.h"
#include "mongo/replay/session_simulator.h"
#include "mongo/stdx/unordered_map.h"
#include "mongo/stdx/unordered_set.h"
#include "mongo/util/modules.h"
#include "mongo/util/time_support.h"
#include "mongo/util/synchronized_value.h"
#include <chrono>
#include <memory>
#include <condition_variable>
#include <exception>
namespace mongo {
class ReplayCommand;
@ -51,51 +48,56 @@ public:
* response. By default enable perf recording is disabled (useful for testing). But for real
* simulations the recording will always be enabled.
*/
explicit SessionHandler(std::string uri, std::string perfFileName = "")
: _replayStartTime(std::chrono::steady_clock::now()),
explicit SessionHandler(
std::string uri,
std::chrono::steady_clock::time_point startTime = std::chrono::steady_clock::now(),
std::string perfFileName = "")
: _replayStartTime(startTime),
_uri(std::move(uri)),
_perfFileName(std::move(perfFileName)) {}
/* Global start time shared with all the sessions*/
void setStartTime(Date_t recordStartTime);
/**
* Start a new session given uri and start session recorded command. Returns the key for the
* session just started
*/
void onSessionStart(Microseconds offset, int64_t sessionId);
void onSessionStart(const ReplayCommand& command);
/**
* Stop the session started with the key provided as argument and use the stop command received
*/
void onSessionStop(const ReplayCommand&);
/**
* Just replay the command, read from the recording file
*/
void onBsonCommand(const ReplayCommand&);
/**
* To use carefully, basically destroys all the sessions and reset the session cache
*/
void clear();
void createSession(key_t sid, PacketSource source);
/**
* Return number of sessions running
*/
size_t fetchTotalRunningSessions() const {
return _runningSessions.size();
return _runningSessionCount.load();
}
void stopAllSessions();
/**
* Re-throw exception captured from a failed session.
*/
void rethrowIfSessionFailed();
/**
* Wait for any remaining session replays to end, or for stop to be requested.
*
*/
void waitForRunningSessions();
/**
* Wait until the provided timepoint, stop is requested by the parent, or an exception is thrown
* by a session replay.
*/
bool waitUntil(std::chrono::steady_clock::time_point tp);
void notify();
private:
stdx::unordered_map<key_t, std::unique_ptr<SessionSimulator>> _runningSessions;
mongo::stop_source _allSessionStop;
std::mutex _notificationMutex; // NOLINT
std::condition_variable _cv; // NOLINT
std::atomic<int64_t> _runningSessionCount = 0; // NOLINT
mongo::synchronized_value<std::exception_ptr> _sessionException;
std::chrono::steady_clock::time_point _replayStartTime; // when the replay started
std::string _uri; // uri of the mongo shadow instance
std::string _perfFileName; // perf recording file name if specified
SessionSimulator& createSession(key_t);
void destroySession(key_t);
bool isSessionActive(key_t) const;
SessionSimulator& getSessionSimulator(key_t);
const SessionSimulator& getSessionSimulator(key_t) const;
};
} // namespace mongo

View File

@ -1,170 +0,0 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/replay/session_handler.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/json.h"
#include "mongo/replay/rawop_document.h"
#include "mongo/replay/replay_command.h"
#include "mongo/replay/replay_command_executor.h"
#include "mongo/replay/replay_test_server.h"
#include "mongo/replay/test_packet.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/duration.h"
#include "mongo/util/time_support.h"
#include <ratio>
namespace mongo {
// Starting a session registers exactly one running session; stopping it removes that session.
TEST(SessionHandlerTest, StartAndStopSession) {
ReplayTestServer server;
auto startRecording = cmds::start({.offset = Milliseconds(0)});
auto stopRecording = cmds::stop({.offset = Milliseconds(5)});
{
const auto uri = server.getConnectionString();
SessionHandler sessionHandler{uri};
sessionHandler.onSessionStart(startRecording);
ASSERT_TRUE(sessionHandler.fetchTotalRunningSessions() == 1);
sessionHandler.onSessionStop(stopRecording);
ASSERT_TRUE(sessionHandler.fetchTotalRunningSessions() == 0);
// The stop above already removed the session; clear() on an empty handler must leave the
// count at zero.
sessionHandler.clear();
ASSERT_TRUE(sessionHandler.fetchTotalRunningSessions() == 0);
}
}
// A session id can only be registered once at a time; after the first session is stopped the same
// id can be started again.
TEST(SessionHandlerTest, StartSessionSameSessionIDError) {
ReplayTestServer server;
const auto uri = server.getConnectionString();
auto commandStart1 = cmds::start({.offset = Microseconds(0)});
{
SessionHandler sessionHandler{uri};
sessionHandler.onSessionStart(commandStart1);
// Neither start command sets an id explicitly - presumably both get the same default id
// (confirm against cmds::start).
auto commandStart2 = cmds::start({.offset = Milliseconds(100)});
// This must throw: two running sessions cannot share the same session id.
ASSERT_THROWS_CODE(sessionHandler.onSessionStart(commandStart2),
DBException,
ErrorCodes::ReplayClientSessionSimulationError);
// Closing the first session and starting again with the same session id will work.
auto commandStop1 = cmds::stop({.offset = Microseconds(0)});
sessionHandler.onSessionStop(commandStop1);
// The stop removed the only session, so nothing is running.
ASSERT_TRUE(sessionHandler.fetchTotalRunningSessions() == 0);
sessionHandler.onSessionStart(commandStart2);
// Now there should be exactly 1 running session.
ASSERT_TRUE(sessionHandler.fetchTotalRunningSessions() == 1);
sessionHandler.clear();
ASSERT_TRUE(sessionHandler.fetchTotalRunningSessions() == 0);
}
}
// After a session has been stopped, commands for it are rejected, while a session with a different
// id can still be started.
TEST(SessionHandlerTest, StartTwoSessionsDifferentSessionIDSameKey) {
ReplayTestServer server;
const auto uri = server.getConnectionString();
// start command from session 1
auto commandStart1 = cmds::start({.id = 1, .offset = Microseconds(0)});
// start command from session 2
auto commandStart2 = cmds::start({.id = 2, .offset = Microseconds(50)});
// stop command for session 1 - no id is given, so this relies on the default id of cmds::stop
// matching session 1 (TODO confirm).
auto commandStop1 = cmds::stop({.offset = Microseconds(100)});
{
SessionHandler sessionHandler{uri};
sessionHandler.onSessionStart(commandStart1);
sessionHandler.onSessionStop(commandStop1);
ASSERT_TRUE(sessionHandler.fetchTotalRunningSessions() == 0);
// Session 1 is gone, so submitting a command carrying its id must be rejected.
ASSERT_THROWS_CODE(sessionHandler.onBsonCommand(commandStart1),
DBException,
ErrorCodes::ReplayClientSessionSimulationError);
sessionHandler.onSessionStart(commandStart2);
ASSERT_TRUE(sessionHandler.fetchTotalRunningSessions() == 1);
sessionHandler.clear();
ASSERT_TRUE(sessionHandler.fetchTotalRunningSessions() == 0);
}
}
// Runs a find command inside a started session, then verifies that the same command is rejected
// once the session has been stopped.
TEST(SessionHandlerTest, ExecuteCommand) {
// start
auto startRecording = cmds::start({.offset = Microseconds(0)});
// stop
auto stopRecording = cmds::stop({.offset = Milliseconds(20)});
// find
BSONObj filterBSON = BSON("name" << "Alice");
auto findCommand = cmds::find({.offset = Milliseconds(10)}, filterBSON);
// Document set handed to the test server together with the "find" command name.
std::string jsonStr = R"([{
"_id": "681cb423980b72695075137f",
"name": "Alice",
"age": 30,
"city": "New York"}])";
// server
ReplayTestServer server{{"find"}, {jsonStr}};
const auto uri = server.getConnectionString();
{
SessionHandler sessionHandler{uri};
sessionHandler.onSessionStart(startRecording);
ASSERT_TRUE(sessionHandler.fetchTotalRunningSessions() == 1);
sessionHandler.onBsonCommand(findCommand);
sessionHandler.onSessionStop(stopRecording);
ASSERT_TRUE(sessionHandler.fetchTotalRunningSessions() == 0);
// The session has been stopped and removed, so submitting the same command again must
// throw.
ASSERT_THROWS_CODE(sessionHandler.onBsonCommand(findCommand),
DBException,
ErrorCodes::ReplayClientSessionSimulationError);
// clear the state
sessionHandler.clear();
ASSERT_TRUE(sessionHandler.fetchTotalRunningSessions() == 0);
}
}
} // namespace mongo

View File

@ -29,15 +29,17 @@
#include "mongo/replay/session_simulator.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/db/query/util/stop_token.h"
#include "mongo/logv2/log.h"
#include "mongo/replay/replay_command.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/duration.h"
#include "mongo/util/time_support.h"
#include "mongo/util/scopeguard.h"
#include <chrono>
#include <exception>
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
namespace mongo {
template <typename Callable>
@ -58,73 +60,116 @@ void handleErrors(Callable&& callable) {
static constexpr size_t MAX_SIMULATION_PROCESSING_TASKS = 1;
SessionSimulator::SessionSimulator(std::unique_ptr<ReplayCommandExecutor> replayCommandExecutor,
std::unique_ptr<SessionScheduler> sessionScheduler,
SessionSimulator::SessionSimulator(PacketSource source,
uint64_t sessionID,
std::chrono::steady_clock::time_point globalStartTime,
std::string uri,
std::unique_ptr<ReplayCommandExecutor> replayCommandExecutor,
std::unique_ptr<PerformanceReporter> perfReporter)
: _commandExecutor(std::move(replayCommandExecutor)),
_sessionScheduler(std::move(sessionScheduler)),
: _replayStartTime(globalStartTime),
_uri(uri),
_source(std::move(source)),
_sessionID(sessionID),
_commandExecutor(std::move(replayCommandExecutor)),
_perfReporter(std::move(perfReporter)) {}
SessionSimulator::~SessionSimulator() {
shutdown();
// We don't want to replay all the commands we find in the recording file. Mainly we want to skip:
// 1. legacy commands. Everything that is marked legacy, won't be replayable.
// 2. Responses (cursor). These will be the result of some query like find and aggregate.
// 3. n/ok commands. These are just responses.
// 4. isWritablePrimary/isMaster. These are mostly diagnostic commands that we don't want.
// NOLINTNEXTLINE needs audit
static const std::unordered_set<std::string> forbiddenKeywords{
    "legacy",
    "cursor",
    "endSessions",
    "ok",
    "isWritablePrimary",
    "n",
    "isMaster",
    "ismaster",
    "stopTrafficRecording",
};

/**
 * Tells whether an operation of the given type should be replayed: the type must be non-empty and
 * must not appear (exact, case-sensitive match) in forbiddenKeywords.
 */
bool isReplayable(const std::string& commandType) {
    const bool skipped = commandType.empty() || forbiddenKeywords.count(commandType) != 0;
    return !skipped;
}
void SessionSimulator::shutdown() {
_sessionScheduler->join();
}
void SessionSimulator::run(mongo::stop_token stopToken) {
LOGV2_DEBUG(10893001, 1, "Session execution started");
auto onFail = ScopeGuard([] { LOGV2_ERROR(10893003, "Session execution failed"); });
bool stopEventSeen = false;
for (const auto& packet : _source) {
if (stopToken.stop_requested()) {
LOGV2_WARNING(10893004, "Session execution halted");
onFail.dismiss();
return;
}
ReplayCommand command{packet};
if (!isReplayable(command.parseOpType())) {
continue;
}
const auto& [offset, sessionId] = extractOffsetAndSessionFromCommand(command);
if (sessionId != _sessionID) {
continue;
}
void SessionSimulator::start(StringData uri,
std::chrono::steady_clock::time_point replayStartTime,
const Microseconds& offset) {
// It is safe to pass this (because it will be kept alive by the SessionHandler) and to write or
// read member variables, because there is only one thread. Beware about spawning multiple
// threads. Order of commands can be different than the ones recorded and a mutex must be used
// for supporting multiple threads.
auto f = [this, uri = std::string(uri), replayStartTime, offset]() {
_replayStartTime = replayStartTime;
// wait if simulation and recording start time have diverged.
waitIfNeeded(offset);
if (command.isSessionStart()) {
start();
continue;
}
// TODO SERVER-105627: Until session start events are recorded, treat the first observed
// command as starting the session.
if (!_running) {
start();
}
if (command.isSessionEnd()) {
stop();
stopEventSeen = true;
break;
}
// must be a runnable command.
runCommand(command);
}
if (!stopEventSeen) {
// TODO: SERVER-111903 strengthen this to a uassert once session end events
// are guaranteed to be observed at recording end.
LOGV2_WARNING(10893011,
"Recording exhausted without observing session end",
"sessionID"_attr = _sessionID);
}
onFail.dismiss();
LOGV2_DEBUG(10893002, 1, "Session execution completed");
}
void SessionSimulator::start() {
// connect
_commandExecutor->connect(uri);
_commandExecutor->connect(_uri);
// set running flag.
_running = true;
};
_sessionScheduler->submit([f]() { handleErrors(f); });
}
void SessionSimulator::stop(const Microseconds& sessionEndOffset) {
void SessionSimulator::stop() {
// It is safe to pass this (because it will be kept alive by the SessionHandler) and to write or
// read member variables, because there is only one thread. Beware about spawning multiple
// threads. Order of commands can be different than the ones recorded and a mutex must be used
// for supporting multiple threads.
auto f = [this, sessionEndOffset]() {
uassert(ErrorCodes::ReplayClientSessionSimulationError,
"SessionSimulator is not connected to a valid mongod/s instance.",
_running);
waitIfNeeded(sessionEndOffset);
_commandExecutor->reset();
};
_sessionScheduler->submit([f]() { handleErrors(f); });
}
void SessionSimulator::run(const ReplayCommand& command, const Microseconds& commandOffset) {
// It is safe to pass this (because it will be kept alive by the SessionHandler) and to write or
// read member variables, because there is only one thread. Beware about spawning multiple
// threads. Order of commands can be different than the ones recorded and a mutex must be used
// for supporting multiple threads.
auto f = [this, command, commandOffset]() {
void SessionSimulator::runCommand(const ReplayCommand& command) const {
uassert(ErrorCodes::ReplayClientSessionSimulationError,
"SessionSimulator is not connected to a valid mongod/s instance.",
_running);
waitIfNeeded(commandOffset);
_perfReporter->executeAndRecordPerf(
[this](const ReplayCommand& command) { return _commandExecutor->runCommand(command); },
command);
};
_sessionScheduler->submit([f]() { handleErrors(f); });
}
std::chrono::steady_clock::time_point SessionSimulator::now() const {
@ -137,6 +182,8 @@ void SessionSimulator::sleepFor(std::chrono::steady_clock::duration duration) co
void SessionSimulator::waitIfNeeded(Microseconds recordingOffset) const {
LOGV2_DEBUG(
1232304, 2, "Session waiting until offset", "offset"_attr = recordingOffset.toString());
auto targetTime = _replayStartTime + recordingOffset.toSystemDuration();
auto requiredDelay = targetTime - now();
// wait if needed

View File

@ -33,19 +33,16 @@
* dispatches requests to the contained ReplayCommandExecutor.
*/
#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/db/query/util/stop_token.h"
#include "mongo/replay/performance_reporter.h"
#include "mongo/replay/replay_command.h"
#include "mongo/replay/replay_command_executor.h"
#include "mongo/replay/session_scheduler.h"
#include "mongo/replay/traffic_recording_iterator.h"
#include "mongo/util/duration.h"
#include "mongo/util/modules.h"
#include "mongo/util/time_support.h"
#include <chrono>
#include <memory>
namespace mongo {
/**
@ -65,29 +62,26 @@ namespace mongo {
* design as simple as possible. Performance is not very important in this case, however
* SessionSimulator is not supposed to be thrown away and reconstructed constantly.
*/
class SessionSimulator {
public:
SessionSimulator(std::unique_ptr<ReplayCommandExecutor>,
std::unique_ptr<SessionScheduler>,
std::unique_ptr<PerformanceReporter>);
virtual ~SessionSimulator();
void start(StringData uri,
std::chrono::steady_clock::time_point replayStartTime,
const Microseconds& eventOffset);
void stop(const Microseconds& sessionEndOffset);
void run(const ReplayCommand&, const Microseconds& commandOffset);
using PacketSource = RecordingSetIterator;
class SessionSimulator : public std::enable_shared_from_this<SessionSimulator> {
public:
SessionSimulator(PacketSource source,
uint64_t sessionID,
std::chrono::steady_clock::time_point globalStartTime,
std::string uri,
std::unique_ptr<ReplayCommandExecutor>,
std::unique_ptr<PerformanceReporter>);
virtual ~SessionSimulator() = default;
void run(mongo::stop_token stopToken = {});
protected:
/**
* Halt all work, and join any spawned threads.
*
* Optional, only required if simulator must be halted before destruction.
* (e.g., subclass needs to halt threads before destruction).
*/
void shutdown();
void start();
void stop();
void runCommand(const ReplayCommand&) const;
private:
virtual std::chrono::steady_clock::time_point now() const;
virtual void sleepFor(std::chrono::steady_clock::duration duration) const;
void waitIfNeeded(Microseconds) const;
@ -97,8 +91,11 @@ private:
// Derived from steady_clock not system_clock as the replay should
// not be affected by clock manipulation (e.g., by NTP).
std::chrono::steady_clock::time_point _replayStartTime;
std::string _uri;
PacketSource _source;
uint64_t _sessionID;
std::unique_ptr<ReplayCommandExecutor> _commandExecutor;
std::unique_ptr<SessionScheduler> _sessionScheduler;
std::unique_ptr<PerformanceReporter> _perfReporter;
};

View File

@ -29,35 +29,47 @@
#include "mongo/replay/session_simulator.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/json.h"
#include "mongo/db/traffic_reader.h"
#include "mongo/db/traffic_recorder.h"
#include "mongo/replay/mini_mock.h"
#include "mongo/replay/performance_reporter.h"
#include "mongo/replay/rawop_document.h"
#include "mongo/replay/replay_command.h"
#include "mongo/replay/replay_command_executor.h"
#include "mongo/replay/replay_test_server.h"
#include "mongo/replay/session_scheduler.h"
#include "mongo/replay/test_packet.h"
#include "mongo/replay/traffic_recording_iterator.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/duration.h"
#include "mongo/util/synchronized_value.h"
#include "mongo/util/time_support.h"
#include "mongo/util/uuid.h"
#include <chrono>
#include <memory>
#include <ratio>
#include <boost/filesystem/path.hpp>
namespace mongo {
using namespace std::chrono_literals;
static const std::string fakeResponse = R"([{
"_id": "681cb423980b72695075137f",
"name": "Alice",
"age": 30,
"city": "New York"}])";
class TestSessionSimulator : public SessionSimulator {
public:
TestSessionSimulator()
: SessionSimulator(std::make_unique<ReplayCommandExecutor>(),
std::make_unique<SessionScheduler>(),
std::make_unique<PerformanceReporter>("test.bin")) {}
using SessionSimulator::SessionSimulator;
TestSessionSimulator(PacketSource source,
std::chrono::steady_clock::time_point startTime,
StringData uri)
: SessionSimulator(std::move(source),
0 /* sessionID */,
startTime,
std::string(uri),
std::make_unique<ReplayCommandExecutor>(),
std::make_unique<PerformanceReporter>(uri, "test.bin")) {}
std::chrono::steady_clock::time_point now() const override {
auto handle = nowHook.synchronize();
@ -69,11 +81,6 @@ public:
(*handle)(duration);
}
~TestSessionSimulator() override {
// Halt all worker threads, before mock functions are destroyed.
shutdown();
}
using NowMockFunction = MiniMockFunction<std::chrono::steady_clock::time_point>;
using SleepMockFunction = MiniMockFunction<void, std::chrono::steady_clock::duration>;
mutable synchronized_value<NowMockFunction> nowHook{NowMockFunction{"now"}};
@ -90,133 +97,177 @@ auto operator+(const MongoDur& mongoDuration,
const std::chrono::duration<Rep, Period>& nonMongoDuration) {
return mongoDuration + mongo::duration_cast<MongoDur>(nonMongoDuration);
}
class TestPackets {
public:
struct Args {
std::chrono::seconds offset;
TestReaderPacket packet;
};
TestPackets& operator+=(Args&& args) {
args.packet.offset = mongo::duration_cast<Microseconds>(args.offset);
packets.push_back(std::move(args.packet));
return *this;
}
operator PacketSource() const;
const auto& operator[](size_t idx) {
return packets[idx];
}
Date_t recordingStartTime = Date_t::now();
std::vector<TestReaderPacket> packets;
};
/**
 * Builds a TrafficRecordingPacket from a TrafficReaderPacket.
 *
 * A fresh Message is populated through setData() with the network op, data pointer and
 * data length of the reader packet's message; every other field is carried over as-is
 * (the session is converted to a std::string).
 */
TrafficRecordingPacket toOwned(const TrafficReaderPacket& packet) {
    const auto& borrowed = packet.message;

    Message copy;
    copy.setData(borrowed.getNetworkOp(), borrowed.data(), borrowed.dataLen());

    return TrafficRecordingPacket{
        .eventType = packet.eventType,
        .id = packet.id,
        .session = std::string(packet.session),
        .offset = packet.offset,
        .order = packet.order,
        .message = std::move(copy),
    };
}
/**
 * FileSet implementation for tests, backed by a single file generated from in-memory
 * packets.
 */
class TestFiles : public FileSet {
public:
    /**
     * Writes every packet in `packets` (converted with toOwned) to a file named
     * "<UUID>.bin" via PacketWriter, then memory maps that file as the only entry of
     * `files`.
     *
     * The constructor is explicit and borrows the vector: it only reads the packets, so
     * neither an implicit conversion nor a copy of the vector is wanted.
     *
     * NOTE(review): the file name is a relative path and nothing in this class removes
     * the file afterwards - confirm the test environment cleans up its working directory.
     */
    explicit TestFiles(const std::vector<TestReaderPacket>& packets) {
        PacketWriter writer;
        const boost::filesystem::path filename = UUID::gen().toString() + ".bin";
        writer.open(filename);
        for (const auto& packet : packets) {
            writer.writePacket(toOwned(packet));
        }
        // Close the writer before the file is mapped for reading.
        writer.close();
        files = {std::make_shared<boost::iostreams::mapped_file_source>(filename.string())};
    }

    /**
     * Acquire (read only) memory mapped access to the file at `index`.
     *
     * The mapping created at construction is shared with the caller; the file is not
     * re-mapped. Returns nullptr when `index` is out of range.
     */
    std::shared_ptr<boost::iostreams::mapped_file_source> get(size_t index) override {
        if (index >= files.size()) {
            return nullptr;
        }
        return files[index];
    }

    /**
     * Check if this FileSet contains any files.
     */
    bool empty() const override {
        return files.empty();
    }

    // Mapped files held by this set.
    std::vector<std::shared_ptr<boost::iostreams::mapped_file_source>> files;
};
/**
 * Wraps a TestFiles instance built from the accumulated packets in a PacketSource.
 */
TestPackets::operator PacketSource() const {
    auto backingFiles = std::make_shared<TestFiles>(packets);
    return PacketSource(std::move(backingFiles));
}
// Replays a single recorded find whose offset is 1s while the mocked clock already reports
// replayStartTime + 1s. No sleepFor expectation is registered, and the test then checks the
// server's response for "find".
TEST(SessionSimulatorTest, TestSimpleCommandNoWait) {
ReplayTestServer server{{"find"}, {fakeResponse}};
auto packet = TestReaderPacket::find(BSON("name" << "Alice"));
std::string jsonStr = R"([{
"_id": "681cb423980b72695075137f",
"name": "Alice",
"age": 30,
"city": "New York"}])";
ReplayTestServer server{{"find"}, {jsonStr}};
auto replayStartTime = std::chrono::steady_clock::now();
TestPackets packets;
// Simulate a find command occurring 1 second into the recording.
packets += {1s, TestReaderPacket::find(BSON("name" << "Alice"))};
// The simulator is scoped so that all of its tasks complete before the response is checked.
{
TestSessionSimulator sessionSimulator;
TestSessionSimulator sessionSimulator{
packets, replayStartTime, server.getConnectionString()};
// connect to server with time
const auto uri = server.getConnectionString();
auto begin = std::chrono::steady_clock::now();
auto eventOffset = Microseconds(0);
// For the next call to now(), report the timestamp the replay started at.
sessionSimulator.nowHook->ret(begin);
// TODO SERVER-105627: First command will start session, and will call now() to
// delay until the correct time. SessionStart events will explicitly do this.
sessionSimulator.nowHook->ret(replayStartTime + 1s);
// Recording and session both start "now".
sessionSimulator.start(uri, begin, eventOffset);
// Initially report "now" as the exact time the find request needs to be issued.
sessionSimulator.nowHook->ret(replayStartTime + 1s);
// Don't expect any call to sleepFor.
using namespace std::chrono_literals;
eventOffset += Duration<std::milli>(1000);
packet.offset = eventOffset;
ReplayCommand command{packet};
// For the next call to now(), report the replay is 1s in - the same time the find should be
// issued at.
sessionSimulator.nowHook->ret(begin + 1s);
sessionSimulator.run(command, eventOffset);
sessionSimulator.run();
}
BSONObj response = fromjson(jsonStr);
ASSERT_TRUE(server.checkResponse("find", response));
}
// Replays two recorded finds at offsets of 2s and 5s. The mocked clock first reports the replay
// start time and then start + 2s, so the simulator is expected to request a 2s sleep followed by
// a 3s sleep. The test then checks the server's response for "find".
TEST(SessionSimulatorTest, TestSimpleCommandWait) {
auto packet = TestReaderPacket::find(BSON("name" << "Alice"));
ReplayTestServer server{{"find"}, {fakeResponse}};
std::string jsonStr = R"([{
"_id": "681cb423980b72695075137f",
"name": "Alice",
"age": 30,
"city": "New York"}])";
ReplayTestServer server{{"find"}, {jsonStr}};
auto replayStartTime = std::chrono::steady_clock::now();
TestPackets packets;
// Simulate a find command occurring 2 seconds into the recording.
packets += {2s, TestReaderPacket::find(BSON("name" << "Alice"))};
// Simulate another command, 3 seconds later (total offset of 5s into the recording).
packets += {5s, TestReaderPacket::find(BSON("name" << "Alice"))};
// The simulator is scoped so that all of its tasks complete before the response is checked.
{
TestSessionSimulator sessionSimulator;
TestSessionSimulator sessionSimulator{
packets, replayStartTime, server.getConnectionString()};
// connect to server with time
const auto uri = server.getConnectionString();
auto begin = std::chrono::steady_clock::now();
// First find
using namespace std::chrono_literals;
// The session start occurred two seconds into the recording.
auto eventOffset = Duration<std::milli>(2000); // 2 seconds into the recording
// For the first call to now() return the same timepoint the replay started at.
sessionSimulator.nowHook->ret(begin);
// Expect the simulator to try to sleep for 2 seconds.
// Initially report "now" as the recording start time.
sessionSimulator.nowHook->ret(replayStartTime);
// Expect the simulator to try to sleep for 2s.
sessionSimulator.sleepHook->expect(2s);
sessionSimulator.start(uri, begin, eventOffset);
// Second find
// Issue a find request at 5s into the recording
eventOffset = Duration<std::milli>(5000);
packet.offset = eventOffset;
ReplayCommand command{packet};
// Report "now" as if time has advanced to when the session started.
sessionSimulator.nowHook->ret(begin + 2s);
// Simulator should attempt to sleep the remaining time to when the
// find request was issued.
// Report "now" as if immediately after sleeping for the previous command.
sessionSimulator.nowHook->ret(replayStartTime + 2s);
// Expect the simulator to try to sleep for the remaining 3s to reach the target offset time.
sessionSimulator.sleepHook->expect(3s);
sessionSimulator.run(command, eventOffset);
sessionSimulator.run();
}
BSONObj response = fromjson(jsonStr);
ASSERT_TRUE(server.checkResponse("find", response));
}
// Replays a find recorded at a 1s offset while the mocked clock reports start + 10s, i.e. the
// replay is already later than the command's target time. No sleepFor expectation is registered,
// and the test then checks the server's response for "find".
TEST(SessionSimulatorTest, TestSimpleCommandNoWaitTimeInThePast) {
// Simulate a real scenario where time is in the past. No wait should happen.
auto packet = TestReaderPacket::find(BSON("name" << "Alice"));
ReplayTestServer server{{"find"}, {fakeResponse}};
auto replayStartTime = std::chrono::steady_clock::now();
TestPackets packets;
// Simulate a find command occurring 1 second into the recording.
packets += {1s, TestReaderPacket::find(BSON("name" << "Alice"))};
std::string jsonStr = R"([{
"_id": "681cb423980b72695075137f",
"name": "Alice",
"age": 30,
"city": "New York"}])";
ReplayTestServer server{{"find"}, {jsonStr}};
// The simulator is scoped so that all of its tasks complete before the response is checked.
{
TestSessionSimulator sessionSimulator;
TestSessionSimulator sessionSimulator{
packets, replayStartTime, server.getConnectionString()};
// connect to server with time
const auto uri = server.getConnectionString();
auto begin = stdx::chrono::steady_clock::now();
using namespace std::chrono_literals;
auto eventOffset =
Duration<std::milli>(1000); // A session started one second into the recording
// TODO SERVER-105627: First command will start session, and will call now() to
// delay until the correct time. SessionStart events will explicitly do this.
sessionSimulator.nowHook->ret(replayStartTime + 10s);
// Pretend the replay is actually *10* seconds into the replay.
// That means it is now "late" starting this session, so should not sleep.
sessionSimulator.nowHook->ret(begin + 10s);
// Initially report "now" as _later than_ the command should have run.
sessionSimulator.nowHook->ret(replayStartTime + 10s);
// Don't expect any call to sleepFor.
sessionSimulator.start(uri, begin, eventOffset);
eventOffset = Duration<std::milli>(2000);
packet.offset = eventOffset;
ReplayCommand command{packet};
// Replay is also "late" trying to replay this find, so should not sleep.
sessionSimulator.nowHook->ret(begin + 10s);
sessionSimulator.run(command, eventOffset);
sessionSimulator.run();
}
BSONObj response = fromjson(jsonStr);
ASSERT_TRUE(server.checkResponse("find", response));
}
} // namespace mongo

View File

@ -110,7 +110,7 @@ public:
// Currently files remain mapped for the lifetime of the LocalFileSet.
// This is required as replayThread dispatches ReplayCommand objects to other threads;
// this is not an owning type, and does not extend the life of the mmapped data.
// TODO SERVER-108930: Change this to weak ownership here, and have worker threads maintain an
// TODO SERVER-106702: Change this to weak ownership here, and have worker threads maintain an
// iterator,
// so files remain mapped while in use, but can be unmapped when no longer referenced by
// any thread.