mirror of https://github.com/mongodb/mongo
SERVER-100516: Allow repeated start recording calls with same ID (#43192)
GitOrigin-RevId: c94bb50d3762337f767e282002e453d011d2f7c4
This commit is contained in:
parent
fc166220bb
commit
443e034134
|
|
@ -238,6 +238,52 @@ function runTest(client, restartCommand) {
|
|||
return new Date(now.getTime() + milliseconds);
|
||||
};
|
||||
|
||||
// Validate start response when providing recording ID.
|
||||
{
|
||||
let res = assert.commandWorked(
|
||||
db.runCommand({"startTrafficRecording": 1, "destination": "recordings", "recordingID": "foobar"}),
|
||||
);
|
||||
assert(res.created);
|
||||
assert.eq(res.status, "running");
|
||||
}
|
||||
|
||||
{
|
||||
// Starting again with the same ID is allowed, but doesn't create a new recording.
|
||||
let res = assert.commandWorked(
|
||||
db.runCommand({"startTrafficRecording": 1, "destination": "recordings", "recordingID": "foobar"}),
|
||||
);
|
||||
assert(!res.created);
|
||||
assert.eq(res.status, "running");
|
||||
}
|
||||
|
||||
assert.commandWorked(db.runCommand({"stopTrafficRecording": 1}));
|
||||
|
||||
// Validate start response when providing recording ID and scheduling for the future.
|
||||
{
|
||||
let res = assert.commandWorked(
|
||||
db.runCommand({
|
||||
"startTrafficRecording": 1,
|
||||
"destination": "recordings",
|
||||
"recordingID": "foobar",
|
||||
startTime: nowPlusDays(0.5),
|
||||
endTime: nowPlusDays(2),
|
||||
}),
|
||||
);
|
||||
assert(res.created);
|
||||
assert.eq(res.status, "scheduled");
|
||||
}
|
||||
|
||||
{
|
||||
// Starting again with the same ID is allowed, but doesn't create a new recording.
|
||||
let res = assert.commandWorked(
|
||||
db.runCommand({"startTrafficRecording": 1, "destination": "recordings", "recordingID": "foobar"}),
|
||||
);
|
||||
assert(!res.created);
|
||||
assert.eq(res.status, "scheduled");
|
||||
}
|
||||
|
||||
assert.commandWorked(db.runCommand({"stopTrafficRecording": 1}));
|
||||
|
||||
// No start or end time - allowed.
|
||||
assert.commandWorked(db.runCommand({"startTrafficRecording": 1, "destination": "recordings"}));
|
||||
assert.commandWorked(db.runCommand({"stopTrafficRecording": 1}));
|
||||
|
|
|
|||
|
|
@ -63,13 +63,14 @@ public:
|
|||
public:
|
||||
using InvocationBase::InvocationBase;
|
||||
|
||||
void typedRun(OperationContext* opCtx) {
|
||||
TrafficRecorder::get(opCtx->getServiceContext())
|
||||
StartReply typedRun(OperationContext* opCtx) {
|
||||
auto recordingID = TrafficRecorder::get(opCtx->getServiceContext())
|
||||
.start(request(), opCtx->getServiceContext());
|
||||
LOGV2(20506,
|
||||
"** Warning: The recording file contains unencrypted user traffic. We recommend "
|
||||
"that you limit retention of this file and store it on an encrypted filesystem "
|
||||
"volume.");
|
||||
return StartReply(recordingID);
|
||||
}
|
||||
|
||||
private:
|
||||
|
|
|
|||
|
|
@ -61,6 +61,7 @@
|
|||
#include "mongo/util/producer_consumer_queue.h"
|
||||
#include "mongo/util/tick_source.h"
|
||||
#include "mongo/util/time_support.h"
|
||||
#include "mongo/util/uuid.h"
|
||||
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
|
|
@ -79,6 +80,7 @@
|
|||
#include <absl/crc/crc32c.h>
|
||||
#include <boost/filesystem/fstream.hpp>
|
||||
#include <boost/filesystem/path.hpp>
|
||||
#include <boost/none.hpp>
|
||||
#include <fmt/ostream.h>
|
||||
|
||||
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kRecorder
|
||||
|
|
@ -102,6 +104,10 @@ void appendPacketHeader(DataBuilder& db, const TrafficRecordingPacket& packet) {
|
|||
db.getCursor().write<LittleEndian<uint32_t>>(fullSize);
|
||||
}
|
||||
|
||||
/**
 * Produces an identifier for a recording by generating a new UUID and returning its string
 * form. Used by the Recording constructor when the start request carries no recordingID.
 */
TrafficRecorder::RecordingID generateID() {
    const auto freshUuid = UUID::gen();
    return freshUuid.toString();
}
|
||||
|
||||
TrafficRecorder::Recording::Recording(const StartTrafficRecording& options,
|
||||
std::filesystem::path globalRecordingDirectory,
|
||||
TickSource* tickSource)
|
||||
|
|
@ -109,6 +115,12 @@ TrafficRecorder::Recording::Recording(const StartTrafficRecording& options,
|
|||
_maxLogSize(options.getMaxFileSize()),
|
||||
_tickSource(tickSource) {
|
||||
|
||||
if (auto id = options.getRecordingID()) {
|
||||
_id = RecordingID(*id);
|
||||
} else {
|
||||
_id = generateID();
|
||||
}
|
||||
|
||||
// Scheduled times can be provided to startTrafficRecording.
|
||||
auto start = options.getStartTime();
|
||||
auto end = options.getEndTime();
|
||||
|
|
@ -323,7 +335,7 @@ std::shared_ptr<TrafficRecorder::Recording> TrafficRecorder::_makeRecording(
|
|||
return std::make_shared<Recording>(options, globalRecordingDirectory, tickSource);
|
||||
}
|
||||
|
||||
void TrafficRecorder::start(const StartTrafficRecording& options, ServiceContext* svcCtx) {
|
||||
StartReply TrafficRecorder::start(const StartTrafficRecording& options, ServiceContext* svcCtx) {
|
||||
uassert(ErrorCodes::BadValue,
|
||||
"startTime and endTime should both be provided, or neither",
|
||||
options.getStartTime().has_value() == options.getEndTime().has_value());
|
||||
|
|
@ -344,11 +356,19 @@ void TrafficRecorder::start(const StartTrafficRecording& options, ServiceContext
|
|||
|
||||
// May throw if not in a valid state to create a new recording, i.e., if a pending or running
|
||||
// recording already exists and the request does not name that recording's ID.
|
||||
auto rec = _prepare(options, svcCtx);
|
||||
auto [rec, created] = _prepare(options, svcCtx);
|
||||
const auto recID = (*rec)->getID();
|
||||
if (!created) {
|
||||
// This request did not need to create a new recording - it is a repeated call with the same
|
||||
// recording ID. No further action needs to be taken.
|
||||
auto status =
|
||||
(*rec)->isStarted() ? RecordingStateEnum::Running : RecordingStateEnum::Scheduled;
|
||||
return {recID, false, status};
|
||||
}
|
||||
if (!options.getStartTime().has_value()) {
|
||||
// Start immediately.
|
||||
_start(std::move(rec), svcCtx);
|
||||
return;
|
||||
return {recID, true, RecordingStateEnum::Running};
|
||||
}
|
||||
|
||||
auto start = *options.getStartTime();
|
||||
|
|
@ -404,6 +424,7 @@ void TrafficRecorder::start(const StartTrafficRecording& options, ServiceContext
|
|||
}
|
||||
LOGV2_INFO(10849406, "Recording Stopped");
|
||||
});
|
||||
return {recID, true, RecordingStateEnum::Scheduled};
|
||||
}
|
||||
|
||||
void TrafficRecorder::stop(ServiceContext* svcCtx) {
|
||||
|
|
@ -479,7 +500,7 @@ void TrafficRecorder::_observe(Recording& recording,
|
|||
}
|
||||
|
||||
|
||||
TrafficRecorder::LockedRecordingHandle TrafficRecorder::_prepare(
|
||||
std::pair<TrafficRecorder::LockedRecordingHandle, bool> TrafficRecorder::_prepare(
|
||||
const StartTrafficRecording& options, ServiceContext* svcCtx) {
|
||||
auto globalRecordingDirectory = gTrafficRecordingDirectory;
|
||||
uassert(ErrorCodes::BadValue,
|
||||
|
|
@ -487,9 +508,28 @@ TrafficRecorder::LockedRecordingHandle TrafficRecorder::_prepare(
|
|||
!globalRecordingDirectory.empty());
|
||||
|
||||
auto rec = _recording.synchronize();
|
||||
uassert(ErrorCodes::BadValue, "Traffic recording already active", !*rec);
|
||||
auto recPtr = *rec;
|
||||
auto requestedID = options.getRecordingID();
|
||||
|
||||
if (recPtr) {
|
||||
// A recording already exists.
|
||||
if (requestedID.has_value()) {
|
||||
// A recording exists, and the request provided a recording ID.
|
||||
// If the current recording has the same ID, the start request should complete
|
||||
// successfully without taking further action, else it should fail.
|
||||
uassert(ErrorCodes::BadValue,
|
||||
"Traffic recording already active with a different recording ID",
|
||||
*requestedID == recPtr->getID());
|
||||
return {std::move(rec), false};
|
||||
} else {
|
||||
// If no recording ID was specified, any existing recording means the current start
|
||||
// command should fail.
|
||||
uasserted(ErrorCodes::BadValue, "Traffic recording already active");
|
||||
}
|
||||
}
|
||||
|
||||
*rec = _makeRecording(options, globalRecordingDirectory, svcCtx->getTickSource());
|
||||
return rec;
|
||||
return {std::move(rec), true};
|
||||
}
|
||||
|
||||
void TrafficRecorder::_start(LockedRecordingHandle rec, ServiceContext* svcCtx) {
|
||||
|
|
@ -498,7 +538,7 @@ void TrafficRecorder::_start(LockedRecordingHandle rec, ServiceContext* svcCtx)
|
|||
uassert(ErrorCodes::BadValue, "Non-existent traffic recording cannot be started", recording);
|
||||
uassert(ErrorCodes::BadValue,
|
||||
"Traffic recording instance cannot be started repeatedly",
|
||||
!recording->started());
|
||||
!recording->isStarted());
|
||||
|
||||
recording->start();
|
||||
_shouldRecord.store(true);
|
||||
|
|
|
|||
|
|
@ -84,6 +84,7 @@ void appendPacketHeader(DataBuilder& builder, const TrafficRecordingPacket& pack
|
|||
*/
|
||||
class TrafficRecorder {
|
||||
public:
|
||||
using RecordingID = std::string;
|
||||
// The Recorder may record some special events that are required by the replay client.
|
||||
|
||||
static TrafficRecorder& get(ServiceContext* svc);
|
||||
|
|
@ -92,7 +93,7 @@ public:
|
|||
|
||||
// Start and stop block until the associated operation has succeeded or failed
|
||||
// On failure these methods throw
|
||||
void start(const StartTrafficRecording& options, ServiceContext* svcCtx);
|
||||
StartReply start(const StartTrafficRecording& options, ServiceContext* svcCtx);
|
||||
void stop(ServiceContext* svcCtx);
|
||||
|
||||
void sessionStarted(const transport::Session& ts);
|
||||
|
|
@ -124,10 +125,14 @@ protected:
|
|||
virtual void start();
|
||||
virtual Status shutdown();
|
||||
|
||||
bool started() const {
|
||||
bool isStarted() const {
|
||||
return _started.loadRelaxed();
|
||||
}
|
||||
|
||||
/**
 * Returns the identifier of this recording (the value stored in _id).
 *
 * Const-qualified because it only reads _id; this lets it be called through const
 * references and pointers, in the same way as isStarted().
 */
const RecordingID& getID() const {
    return _id;
}
|
||||
|
||||
/**
|
||||
* pushRecord returns false if the queue was full. This is ultimately fatal to the
|
||||
* recording
|
||||
|
|
@ -181,14 +186,16 @@ protected:
|
|||
TrafficRecorderStats _trafficStats;
|
||||
int64_t _written = 0;
|
||||
Status _result = Status::OK();
|
||||
|
||||
RecordingID _id;
|
||||
};
|
||||
|
||||
using LockedRecordingHandle =
|
||||
decltype(std::declval<mongo::synchronized_value<std::shared_ptr<Recording>>>()
|
||||
.synchronize());
|
||||
|
||||
[[nodiscard]] LockedRecordingHandle _prepare(const StartTrafficRecording& options,
|
||||
ServiceContext* svcCtx);
|
||||
[[nodiscard]] std::pair<TrafficRecorder::LockedRecordingHandle, bool> _prepare(
|
||||
const StartTrafficRecording& options, ServiceContext* svcCtx);
|
||||
void _start(LockedRecordingHandle handle, ServiceContext* svcCtx);
|
||||
void _stop(LockedRecordingHandle handle, ServiceContext* svcCtx);
|
||||
void _fail();
|
||||
|
|
|
|||
|
|
@ -52,6 +52,16 @@ types:
|
|||
serializer: "mongo::memory_util::MemorySize::serializeToBSON"
|
||||
is_view: false
|
||||
|
||||
enums:
|
||||
RecordingState:
|
||||
description: State of a Recording instance
|
||||
type: string
|
||||
values:
|
||||
Scheduled: "scheduled"
|
||||
Running: "running"
|
||||
Failed: "failed"
|
||||
Finished: "finished"
|
||||
|
||||
structs:
|
||||
TrafficRecorderStats:
|
||||
description: "A struct representing the trafficRecording server status section"
|
||||
|
|
@ -69,6 +79,20 @@ structs:
|
|||
type: long
|
||||
currentFileSize:
|
||||
type: long
|
||||
StartReply:
|
||||
description: "Response for startTrafficRecording"
|
||||
strict: false
|
||||
is_command_reply: true
|
||||
fields:
|
||||
recordingID:
|
||||
description: "Unique identifier for the resulting recording"
|
||||
type: string
|
||||
created:
|
||||
description: "If a new recording was initialized by this request"
|
||||
type: bool
|
||||
status:
|
||||
description: "The current state of the recording"
|
||||
type: RecordingState
|
||||
|
||||
commands:
|
||||
startTrafficRecording:
|
||||
|
|
@ -100,6 +124,10 @@ commands:
|
|||
description: "Future time to halt the recording"
|
||||
optional: true
|
||||
type: date
|
||||
recordingID:
|
||||
description: "Recording ID; a call with the same ID as the current running recording will succeed with no changes, as the specified recording is already active"
|
||||
optional: true
|
||||
type: string
|
||||
|
||||
stopTrafficRecording:
|
||||
description: "stop recording Command"
|
||||
|
|
|
|||
Loading…
Reference in New Issue