mirror of https://github.com/mongodb/mongo
SERVER-114346: Make validate command support a cluster time (#44470)
GitOrigin-RevId: 9a4f836e090034846592d304280fccd76efe9c71
This commit is contained in:
parent
99e7660882
commit
c66464891f
|
|
@ -1168,6 +1168,14 @@ class ReplicaSetFixture(interface.ReplFixture, interface._DockerComposeInterface
|
||||||
coll.insert_one({"a": 1})
|
coll.insert_one({"a": 1})
|
||||||
coll.drop()
|
coll.drop()
|
||||||
|
|
||||||
|
res = primary_client.admin.command({"replSetGetStatus": 1})
|
||||||
|
|
||||||
|
if "appliedOpTime" not in res["optimes"]:
|
||||||
|
# This can be null when the node is starting up.
|
||||||
|
return False
|
||||||
|
|
||||||
|
clusterTime = res["optimes"]["appliedOpTime"]["ts"]
|
||||||
|
|
||||||
self.logger.info("Performing Internode Validation")
|
self.logger.info("Performing Internode Validation")
|
||||||
|
|
||||||
# Collections we exclude from the hash comparisons. This is because these collections can contain different document contents for valid reasons (i.e. implicitly replicated, TTL indexes, updated by background threads, etc)
|
# Collections we exclude from the hash comparisons. This is because these collections can contain different document contents for valid reasons (i.e. implicitly replicated, TTL indexes, updated by background threads, etc)
|
||||||
|
|
@ -1224,7 +1232,11 @@ class ReplicaSetFixture(interface.ReplFixture, interface._DockerComposeInterface
|
||||||
if "expireAfterSeconds" in coll["options"]:
|
if "expireAfterSeconds" in coll["options"]:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
validate_cmd = {"validate": coll_name, "collHash": True}
|
validate_cmd = {
|
||||||
|
"validate": coll_name,
|
||||||
|
"collHash": True,
|
||||||
|
"atClusterTime": clusterTime,
|
||||||
|
}
|
||||||
ret = db.command(validate_cmd, check=False)
|
ret = db.command(validate_cmd, check=False)
|
||||||
if "all" in ret and "metadata" in ret:
|
if "all" in ret and "metadata" in ret:
|
||||||
something_set = True
|
something_set = True
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,141 @@
|
||||||
|
/**
|
||||||
|
* Verifying that setting atClusterTime on extended validate performs timestamped reads.
|
||||||
|
*
|
||||||
|
* @tags: [requires_wiredtiger, requires_persistence]
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
|
||||||
|
import {ReplSetTest} from "jstests/libs/replsettest.js";
|
||||||
|
|
||||||
|
// Skip DB hash check in stopSet() since we expect it to fail in this test along with fastcount.
|
||||||
|
TestData.skipCheckDBHashes = true;
|
||||||
|
TestData.skipEnforceFastCountOnValidate = true;
|
||||||
|
|
||||||
|
const rst = new ReplSetTest({nodes: 3, settings: {chainingAllowed: false}});
|
||||||
|
rst.startSet();
|
||||||
|
rst.initiate();
|
||||||
|
let primary = rst.getPrimary();
|
||||||
|
let secondary = rst.getSecondary();
|
||||||
|
|
||||||
|
let db = primary.getDB("test");
|
||||||
|
|
||||||
|
assert(db.coll.drop());
|
||||||
|
assert.commandWorked(db.createCollection("coll"));
|
||||||
|
|
||||||
|
let res1 = assert.commandWorked(db.runCommand({insert: "coll", "documents": [{_id: 1}]}));
|
||||||
|
res1 = assert.commandWorked(db.runCommand({insert: "coll", "documents": [{_id: 2}]}));
|
||||||
|
|
||||||
|
// Save the opTime we want to validate at
|
||||||
|
let opTime = res1.operationTime;
|
||||||
|
|
||||||
|
// Make sure the writes make it to all nodes
|
||||||
|
rst.awaitLastOpCommitted();
|
||||||
|
|
||||||
|
jsTest.log.info("Validate with background: true and atClusterTime should fail");
|
||||||
|
assert.commandFailed(db.runCommand({validate: "coll", background: true, collHash: true, atClusterTime: opTime}));
|
||||||
|
|
||||||
|
jsTest.log.info("Validate with atClusterTime on an unreplicated collection should fail");
|
||||||
|
assert.commandFailed(
|
||||||
|
primary.getDB("local").runCommand({validate: "oplog.rs", background: true, collHash: true, atClusterTime: opTime}),
|
||||||
|
);
|
||||||
|
|
||||||
|
const fp = configureFailPoint(secondary, "stopReplProducer");
|
||||||
|
|
||||||
|
// Insert a write on the primary that does not replicate to the secondary.
|
||||||
|
res1 = assert.commandWorked(db.runCommand({insert: "coll", "documents": [{_id: 3}]}));
|
||||||
|
|
||||||
|
// Validate with no readConcern should not be equal since extra document on the secondary
|
||||||
|
jsTest.log.info("Validate with no readConcern should not be equal since extra document on the secondary");
|
||||||
|
res1 = assert.commandWorked(db.runCommand({validate: "coll", background: false, collHash: true}));
|
||||||
|
let res2 = assert.commandWorked(
|
||||||
|
secondary.getDB("test").runCommand({validate: "coll", background: false, collHash: true}),
|
||||||
|
);
|
||||||
|
|
||||||
|
assert.eq(false, res1.all == res2.all, `res1: ${tojson(res1)}, res2: ${tojson(res2)}`);
|
||||||
|
|
||||||
|
// Validate with readConcern should be equal
|
||||||
|
jsTest.log.info("Validate with readConcern should be equal");
|
||||||
|
res1 = assert.commandWorked(
|
||||||
|
db.runCommand({validate: "coll", background: false, collHash: true, atClusterTime: opTime}),
|
||||||
|
);
|
||||||
|
res2 = assert.commandWorked(
|
||||||
|
secondary.getDB("test").runCommand({validate: "coll", background: false, collHash: true, atClusterTime: opTime}),
|
||||||
|
);
|
||||||
|
|
||||||
|
assert.eq(true, res1.all == res2.all, `res1: ${tojson(res1)}, res2: ${tojson(res2)}`);
|
||||||
|
|
||||||
|
let primaryPath = primary.dbpath;
|
||||||
|
let secondaryPath = secondary.dbpath;
|
||||||
|
|
||||||
|
rst.stopSet(null /* signal */, true /* forRestart */);
|
||||||
|
|
||||||
|
function runValidate(path, opts) {
|
||||||
|
MongoRunner.runMongod({
|
||||||
|
dbpath: path,
|
||||||
|
validate: "",
|
||||||
|
setParameter: {
|
||||||
|
validateDbName: "test",
|
||||||
|
validateCollectionName: "coll",
|
||||||
|
collectionValidateOptions: {options: opts},
|
||||||
|
},
|
||||||
|
noCleanData: true,
|
||||||
|
});
|
||||||
|
const validateResults = rawMongoProgramOutput("(9437301)")
|
||||||
|
.split("\n")
|
||||||
|
.filter((line) => line.trim() !== "")
|
||||||
|
.map((line) => JSON.parse(line.split("|").slice(1).join("|")));
|
||||||
|
assert.eq(validateResults.length, 1);
|
||||||
|
// jsTest.log.info(`Validate result \n${tojson(validateResults[0])}`);
|
||||||
|
clearRawMongoProgramOutput();
|
||||||
|
return validateResults[0].attr.results;
|
||||||
|
}
|
||||||
|
|
||||||
|
jsTest.log.info("Modal Validate Tests");
|
||||||
|
|
||||||
|
// Validate with no readConcern should not be equal due to extra document on the secondary
|
||||||
|
jsTest.log.info("Validate with no readConcern should not be equal due to extra document on the secondary");
|
||||||
|
res1 = runValidate(primaryPath, {collHash: true});
|
||||||
|
res2 = runValidate(secondaryPath, {collHash: true});
|
||||||
|
|
||||||
|
assert.eq(false, res1.all == res2.all, `res1: ${tojson(res1)}, res2: ${tojson(res2)}`);
|
||||||
|
|
||||||
|
// Validate drill down should be different due to extra document on the secondary
|
||||||
|
jsTest.log.info("Validate drill down should be different due to extra document on the secondary");
|
||||||
|
res1 = runValidate(primaryPath, {collHash: true, hashPrefixes: []});
|
||||||
|
res2 = runValidate(secondaryPath, {collHash: true, hashPrefixes: []});
|
||||||
|
|
||||||
|
assert.eq(true, res1.all == res2.all, `res1: ${tojson(res1)}, res2: ${tojson(res2)}`);
|
||||||
|
|
||||||
|
let keys1 = Object.keys(res1.partial);
|
||||||
|
let keys2 = Object.keys(res2.partial);
|
||||||
|
|
||||||
|
let inconsistency = keys1.length === keys2.length;
|
||||||
|
for (const key of keys1) {
|
||||||
|
inconsistency = inconsistency || !keys2.includes(key) || res2.partial[key].hash !== res1.partial[key].hash;
|
||||||
|
}
|
||||||
|
assert(inconsistency, `res1: ${tojson(res1)}, res2: ${tojson(res2)}`);
|
||||||
|
|
||||||
|
// Validate should be equal at cluster time
|
||||||
|
jsTest.log.info("Validate should be equal at cluster time");
|
||||||
|
res1 = runValidate(primaryPath, {collHash: true, atClusterTime: opTime});
|
||||||
|
res2 = runValidate(secondaryPath, {collHash: true, atClusterTime: opTime});
|
||||||
|
|
||||||
|
assert.eq(true, res1.all == res2.all, `res1: ${tojson(res1)}, res2: ${tojson(res2)}`);
|
||||||
|
|
||||||
|
// Validate drill down should be equal at cluster time
|
||||||
|
jsTest.log.info("Validate drill down should be equal at cluster time");
|
||||||
|
res1 = runValidate(primaryPath, {collHash: true, hashPrefixes: [], atClusterTime: opTime});
|
||||||
|
res2 = runValidate(secondaryPath, {collHash: true, hashPrefixes: [], atClusterTime: opTime});
|
||||||
|
|
||||||
|
assert.eq(true, res1.all == res2.all, `res1: ${tojson(res1)}, res2: ${tojson(res2)}`);
|
||||||
|
|
||||||
|
keys1 = Object.keys(res1.partial);
|
||||||
|
keys2 = Object.keys(res2.partial);
|
||||||
|
|
||||||
|
assert.eq(keys1.length, keys2.length, `res1: ${tojson(res1)}, res2: ${tojson(res2)}`);
|
||||||
|
for (const key of keys1) {
|
||||||
|
assert(
|
||||||
|
keys2.includes(key) && res2.partial[key].hash === res1.partial[key].hash,
|
||||||
|
`res1: ${tojson(res1)}, res2: ${tojson(res2)}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
@ -740,6 +740,32 @@ ValidationOptions parseValidateOptions(OperationContext* opCtx,
|
||||||
CollectionValidation::validateHashes(*revealHashedIds, /*equalLength=*/false);
|
CollectionValidation::validateHashes(*revealHashedIds, /*equalLength=*/false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boost::optional<Timestamp> timestamp = boost::none;
|
||||||
|
if (cmdObj["atClusterTime"]) {
|
||||||
|
if (background) {
|
||||||
|
uasserted(ErrorCodes::InvalidOptions,
|
||||||
|
str::stream() << "Running the validate command with { background: true } "
|
||||||
|
"cannot be done with {atClusterTime: ...} because "
|
||||||
|
"background already sets a read timestamp.");
|
||||||
|
}
|
||||||
|
if (!nss.isReplicated()) {
|
||||||
|
uasserted(ErrorCodes::CommandNotSupported,
|
||||||
|
str::stream() << "Running the validate command with { atClusterTime: ... } "
|
||||||
|
<< "is not supported on unreplicated collections");
|
||||||
|
}
|
||||||
|
timestamp = cmdObj["atClusterTime"].timestamp();
|
||||||
|
}
|
||||||
|
if (background) {
|
||||||
|
// Background validation reads data from the last stable checkpoint.
|
||||||
|
timestamp =
|
||||||
|
opCtx->getServiceContext()->getStorageEngine()->getLastStableRecoveryTimestamp();
|
||||||
|
if (!timestamp) {
|
||||||
|
uasserted(ErrorCodes::NamespaceNotFound,
|
||||||
|
fmt::format("Cannot run background validation on collection because there "
|
||||||
|
"is no checkpoint yet"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const auto validateMode = [&] {
|
const auto validateMode = [&] {
|
||||||
if (metadata) {
|
if (metadata) {
|
||||||
return CollectionValidation::ValidateMode::kMetadata;
|
return CollectionValidation::ValidateMode::kMetadata;
|
||||||
|
|
@ -808,6 +834,7 @@ ValidationOptions parseValidateOptions(OperationContext* opCtx,
|
||||||
getTestCommandsEnabled() ? (ValidationVersion)bsonTestValidationVersion
|
getTestCommandsEnabled() ? (ValidationVersion)bsonTestValidationVersion
|
||||||
: currentValidationVersion,
|
: currentValidationVersion,
|
||||||
getConfigOverrideOrThrow(rawConfigOverride),
|
getConfigOverrideOrThrow(rawConfigOverride),
|
||||||
|
timestamp,
|
||||||
hashPrefixes,
|
hashPrefixes,
|
||||||
revealHashedIds};
|
revealHashedIds};
|
||||||
}
|
}
|
||||||
|
|
@ -879,7 +906,7 @@ Status validate(OperationContext* opCtx,
|
||||||
|
|
||||||
results->setNamespaceString(validateState.nss());
|
results->setNamespaceString(validateState.nss());
|
||||||
results->setUUID(validateState.uuid());
|
results->setUUID(validateState.uuid());
|
||||||
results->setReadTimestamp(validateState.getValidateTimestamp());
|
results->setReadTimestamp(validateState.getReadTimestamp());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
invariant(!validateState.isFullIndexValidation() ||
|
invariant(!validateState.isFullIndexValidation() ||
|
||||||
|
|
|
||||||
|
|
@ -36,6 +36,7 @@ ValidationOptions::ValidationOptions(ValidateMode validateMode,
|
||||||
bool logDiagnostics,
|
bool logDiagnostics,
|
||||||
ValidationVersion validationVersion,
|
ValidationVersion validationVersion,
|
||||||
boost::optional<std::string> verifyConfigurationOverride,
|
boost::optional<std::string> verifyConfigurationOverride,
|
||||||
|
boost::optional<Timestamp> readTimestamp,
|
||||||
boost::optional<std::vector<std::string>> hashPrefixes,
|
boost::optional<std::vector<std::string>> hashPrefixes,
|
||||||
boost::optional<std::vector<std::string>> revealHashedIds)
|
boost::optional<std::vector<std::string>> revealHashedIds)
|
||||||
: _validateMode(validateMode),
|
: _validateMode(validateMode),
|
||||||
|
|
@ -43,6 +44,7 @@ ValidationOptions::ValidationOptions(ValidateMode validateMode,
|
||||||
_logDiagnostics(logDiagnostics),
|
_logDiagnostics(logDiagnostics),
|
||||||
_validationVersion(validationVersion),
|
_validationVersion(validationVersion),
|
||||||
_verifyConfigurationOverride(std::move(verifyConfigurationOverride)),
|
_verifyConfigurationOverride(std::move(verifyConfigurationOverride)),
|
||||||
|
_readTimestamp(readTimestamp),
|
||||||
_hashPrefixes(std::move(hashPrefixes)),
|
_hashPrefixes(std::move(hashPrefixes)),
|
||||||
_revealHashedIds(std::move(revealHashedIds)) {}
|
_revealHashedIds(std::move(revealHashedIds)) {}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -108,6 +108,7 @@ public:
|
||||||
bool logDiagnostics,
|
bool logDiagnostics,
|
||||||
ValidationVersion validationVersion = currentValidationVersion,
|
ValidationVersion validationVersion = currentValidationVersion,
|
||||||
boost::optional<std::string> verifyConfigurationOverride = boost::none,
|
boost::optional<std::string> verifyConfigurationOverride = boost::none,
|
||||||
|
boost::optional<Timestamp> readTimestamp = boost::none,
|
||||||
boost::optional<std::vector<std::string>> hashPrefixes = boost::none,
|
boost::optional<std::vector<std::string>> hashPrefixes = boost::none,
|
||||||
boost::optional<std::vector<std::string>> revealHashedIds = boost::none);
|
boost::optional<std::vector<std::string>> revealHashedIds = boost::none);
|
||||||
|
|
||||||
|
|
@ -161,6 +162,10 @@ public:
|
||||||
return _validateMode == ValidateMode::kForegroundFullEnforceFastCount;
|
return _validateMode == ValidateMode::kForegroundFullEnforceFastCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const boost::optional<Timestamp>& getReadTimestamp() const {
|
||||||
|
return _readTimestamp;
|
||||||
|
}
|
||||||
|
|
||||||
const boost::optional<std::vector<std::string>>& getHashPrefixes() const {
|
const boost::optional<std::vector<std::string>>& getHashPrefixes() const {
|
||||||
return _hashPrefixes;
|
return _hashPrefixes;
|
||||||
}
|
}
|
||||||
|
|
@ -208,6 +213,8 @@ private:
|
||||||
|
|
||||||
const boost::optional<std::string> _verifyConfigurationOverride;
|
const boost::optional<std::string> _verifyConfigurationOverride;
|
||||||
|
|
||||||
|
const boost::optional<Timestamp> _readTimestamp;
|
||||||
|
|
||||||
const boost::optional<std::vector<std::string>> _hashPrefixes;
|
const boost::optional<std::vector<std::string>> _hashPrefixes;
|
||||||
|
|
||||||
const boost::optional<std::vector<std::string>> _revealHashedIds;
|
const boost::optional<std::vector<std::string>> _revealHashedIds;
|
||||||
|
|
|
||||||
|
|
@ -174,19 +174,9 @@ void ValidateState::yieldCursors(OperationContext* opCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
Status ValidateState::initializeCollection(OperationContext* opCtx) {
|
Status ValidateState::initializeCollection(OperationContext* opCtx) {
|
||||||
if (isBackground()) {
|
if (getReadTimestamp()) {
|
||||||
// Background validation reads data from the last stable checkpoint.
|
|
||||||
_validateTs =
|
|
||||||
opCtx->getServiceContext()->getStorageEngine()->getLastStableRecoveryTimestamp();
|
|
||||||
if (!_validateTs) {
|
|
||||||
return Status(
|
|
||||||
ErrorCodes::NamespaceNotFound,
|
|
||||||
fmt::format("Cannot run background validation on collection {} because there "
|
|
||||||
"is no checkpoint yet",
|
|
||||||
_nss.toStringForErrorMsg()));
|
|
||||||
}
|
|
||||||
shard_role_details::getRecoveryUnit(opCtx)->setTimestampReadSource(
|
shard_role_details::getRecoveryUnit(opCtx)->setTimestampReadSource(
|
||||||
RecoveryUnit::ReadSource::kProvided, *_validateTs);
|
RecoveryUnit::ReadSource::kProvided, *getReadTimestamp());
|
||||||
|
|
||||||
invariant(!shard_role_details::getRecoveryUnit(opCtx)->isActive());
|
invariant(!shard_role_details::getRecoveryUnit(opCtx)->isActive());
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -148,10 +148,6 @@ public:
|
||||||
*/
|
*/
|
||||||
void initializeCursors(OperationContext* opCtx);
|
void initializeCursors(OperationContext* opCtx);
|
||||||
|
|
||||||
boost::optional<Timestamp> getValidateTimestamp() {
|
|
||||||
return _validateTs;
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
ValidateState() = delete;
|
ValidateState() = delete;
|
||||||
|
|
||||||
|
|
@ -182,8 +178,6 @@ private:
|
||||||
RecordId _firstRecordId;
|
RecordId _firstRecordId;
|
||||||
|
|
||||||
DataThrottle _dataThrottle;
|
DataThrottle _dataThrottle;
|
||||||
|
|
||||||
boost::optional<Timestamp> _validateTs = boost::none;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace CollectionValidation
|
} // namespace CollectionValidation
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue