SERVER-84723 Ensure sharding catalog is valid when acquiring collections in multi-document transactions (#18193)

Jordi Serra Torrens 2024-02-21 21:34:57 +00:00 committed by Evergreen Agent
parent d5504a260e
commit 864ace1cd2
10 changed files with 384 additions and 19 deletions

View File

@@ -392,7 +392,7 @@ last-continuous:
- test_file: jstests/sharding/query/sharded_lookup_execution.js
ticket: SERVER-77427
- test_file: jstests/sharding/multi_collection_transaction_placement_conflict_workaround.js
ticket: SERVER-82353
ticket: SERVER-84723
- test_file: jstests/sharding/fsync_lock_unlock.js
ticket: SERVER-73685
- test_file: jstests/sharding/fsync_lock_ddl_lock.js
@@ -1001,7 +1001,7 @@ last-lts:
- test_file: jstests/sharding/query/sharded_lookup_execution.js
ticket: SERVER-77427
- test_file: jstests/sharding/multi_collection_transaction_placement_conflict_workaround.js
ticket: SERVER-82353
ticket: SERVER-84723
- test_file: jstests/sharding/fsync_lock_unlock.js
ticket: SERVER-73685
- test_file: jstests/sharding/fsync_lock_ddl_lock.js

View File

@@ -15,6 +15,7 @@ import {
createIndexAndCRUDInTxn,
indexSpecs
} from "jstests/libs/create_index_txn_helpers.js";
import {FixtureHelpers} from "jstests/libs/fixture_helpers.js";
let doParallelCreateIndexesTest = function(explicitCollectionCreate, multikeyIndex) {
const dbName = 'test_txns_create_indexes_parallel';
@@ -95,14 +96,25 @@ let doParallelCreateIndexesTest = function(explicitCollectionCreate, multikeyInd
assert.eq(secondSessionColl.find({}).itcount(), 1);
assert.eq(secondSessionColl.getIndexes().length, 2);
// createIndexes cannot observe the index created in the other transaction so the command will
// succeed and we will instead throw WCE when trying to commit the transaction.
retryOnceOnTransientAndRestartTxnOnMongos(session, () => {
assert.commandWorked(
sessionColl.runCommand({createIndexes: collName, indexes: [conflictingIndexSpecs]}));
}, {writeConcern: {w: "majority"}});
if (FixtureHelpers.isMongos(db) || TestData.testingReplicaSetEndpoint) {
// createIndexes takes minimum visible snapshots of new collections into consideration when
// checking for existing indexes.
assert.commandFailedWithCode(
sessionColl.runCommand({createIndexes: collName, indexes: [conflictingIndexSpecs]}),
ErrorCodes.SnapshotUnavailable);
assert.commandFailedWithCode(session.abortTransaction_forTesting(),
ErrorCodes.NoSuchTransaction);
} else {
// createIndexes cannot observe the index created in the other transaction so the command
// will succeed and we will instead throw WCE when trying to commit the transaction.
retryOnceOnTransientAndRestartTxnOnMongos(session, () => {
assert.commandWorked(sessionColl.runCommand(
{createIndexes: collName, indexes: [conflictingIndexSpecs]}));
}, {writeConcern: {w: "majority"}});
assert.commandFailedWithCode(session.commitTransaction_forTesting(), ErrorCodes.WriteConflict);
assert.commandFailedWithCode(session.commitTransaction_forTesting(),
ErrorCodes.WriteConflict);
}
assert.eq(sessionColl.find({}).itcount(), 1);
assert.eq(sessionColl.getIndexes().length, 2);
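
Condensed view of the behavioral split introduced above (an editorial sketch, not part of the diff): on a sharded cluster, or via the replica-set endpoint, the conflicting createIndexes now fails immediately with SnapshotUnavailable and the transaction is already aborted server-side, whereas on a plain replica set the command still succeeds inside the transaction and the conflict only surfaces as a WriteConflict at commit. The sketch assumes session, sessionColl, collName and conflictingIndexSpecs are set up exactly as in the test, and it omits the retryOnceOnTransientAndRestartTxnOnMongos wrapper for brevity.

if (FixtureHelpers.isMongos(db) || TestData.testingReplicaSetEndpoint) {
    // Sharded (or replica-set endpoint): the conflict is detected while the command runs,
    // so the server has already aborted the transaction by the time we abort it ourselves.
    assert.commandFailedWithCode(
        sessionColl.runCommand({createIndexes: collName, indexes: [conflictingIndexSpecs]}),
        ErrorCodes.SnapshotUnavailable);
    assert.commandFailedWithCode(session.abortTransaction_forTesting(),
                                 ErrorCodes.NoSuchTransaction);
} else {
    // Plain replica set: createIndexes cannot observe the index committed by the other
    // transaction, so it succeeds here and the conflict is reported at commit time.
    assert.commandWorked(
        sessionColl.runCommand({createIndexes: collName, indexes: [conflictingIndexSpecs]}));
    assert.commandFailedWithCode(session.commitTransaction_forTesting(),
                                 ErrorCodes.WriteConflict);
}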

View File

@@ -1,7 +1,7 @@
/*
* Tests that multi-document transactions fail with MigrationConflict (and TransientTransactionError
* label) if a collection or database placement changes have occurred later than the transaction
* data snapshot timestamp.
* Tests that multi-document transactions fail with MigrationConflict/SnapshotUnavailable (and a
* TransientTransactionError label) if collection or database placement changes have occurred
* later than the transaction's data snapshot timestamp.
*/
const st = new ShardingTest({mongos: 1, shards: 2});
@@ -101,4 +101,149 @@ const st = new ShardingTest({mongos: 1, shards: 2});
runTest('snapshot');
}
// Tests transactions with concurrent DDL operations.
{
const dbName = 'test';
const collName1 = 'foo';
const collName2 = 'bar';
const collName3 = 'foo2';
const ns1 = dbName + '.' + collName1;
let coll1 = st.s.getDB(dbName)[collName1];
let coll2 = st.s.getDB(dbName)[collName2];
let coll3 = st.s.getDB(dbName)[collName3];
const readConcerns = ['local', 'snapshot'];
const commands = ['find', 'aggregate', 'update'];
// Test transaction involving sharded collection with concurrent rename, where the transaction
// attempts to read the renamed-to collection.
{
function runTest(readConcernLevel, command) {
jsTest.log("Running transaction + rename test with read concern " + readConcernLevel +
" and command " + command);
// 1. Initial state:
//    - ns1: sharded collection with chunks on both shard0 and shard1, with documents
//      {x: -1} and {x: 1}, one document on each shard.
//    - ns2: unsharded collection on shard0, with document {a: 1}.
//    - ns3: does not exist.
// 2. Start txn, hit shard0 for ns2 [shard0's snapshot has: ns1 and ns2].
// 3. Rename ns1 -> ns3.
// 4. Target ns3. On shard0, ns3 does not exist in the txn snapshot; on shard1 it does.
//    The transaction must conflict, otherwise the txn would see only half the collection.
// Setup initial state:
st.getDB(dbName).dropDatabase();
st.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName});
st.adminCommand({shardCollection: ns1, key: {x: 1}});
assert.commandWorked(st.splitAt(ns1, {x: 0}));
assert.commandWorked(st.moveChunk(ns1, {x: -1}, st.shard0.shardName));
assert.commandWorked(st.moveChunk(ns1, {x: 0}, st.shard1.shardName));
assert.commandWorked(coll1.insert({x: -1}));
assert.commandWorked(coll1.insert({x: 1}));
assert.commandWorked(coll2.insert({a: 1}));
// Start a multi-document transaction and make one read on shard0
const session = st.s.startSession();
const sessionDB = session.getDatabase(dbName);
const sessionColl2 = sessionDB.getCollection(collName2);
const sessionColl3 = sessionDB.getCollection(collName3);
session.startTransaction({readConcern: {level: readConcernLevel}});
assert.eq(1, sessionColl2.find().itcount()); // Targets shard0.
// While the transaction is still open, rename coll1 to coll3.
assert.commandWorked(coll1.renameCollection(collName3));
// Refresh the router so that it doesn't send a stale SV to the shard, which would cause
// the txn to be aborted.
assert.eq(2, coll3.find().itcount());
// Now read coll3 within the transaction and expect to get a conflict.
let err = assert.throwsWithCode(() => {
if (command === 'find') {
sessionColl3.find().itcount();
} else if (command === 'aggregate') {
sessionColl3.aggregate().itcount();
} else if (command === 'update') {
assert.commandWorked(sessionColl3.update({x: 1}, {$set: {c: 1}}));
}
}, [ErrorCodes.WriteConflict, ErrorCodes.SnapshotUnavailable]);
assert.contains("TransientTransactionError", err.errorLabels, tojson(err));
}
readConcerns.forEach((readConcern) => commands.forEach((command) => {
runTest(readConcern, command);
}));
}
// Test transaction involving sharded collection with concurrent drop, where the transaction
// attempts to read the dropped collection.
{
function runTest(readConcernLevel, command) {
// Initial state:
// shard0 (dbPrimary): collA(sharded) and collB(unsharded)
// shard1: collA(sharded)
//
// 1. Start txn, hit shard0 for collB
// 2. Drop collA
// 3. Read collA. The router, believing collA is no longer sharded, targets only shard0,
//    whose transaction snapshot still holds the sharded collection, so without a check
//    the txn would read just half of it. Therefore, a conflict must be raised.
jsTest.log("Running transaction + drop test with read concern " + readConcernLevel +
" and command " + command);
assert(command === 'find' || command === 'aggregate' || command === 'update');
// Setup initial state:
assert.commandWorked(st.s.getDB(dbName).dropDatabase());
st.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName});
st.adminCommand({shardCollection: ns1, key: {x: 1}});
assert.commandWorked(st.splitAt(ns1, {x: 0}));
assert.commandWorked(st.moveChunk(ns1, {x: -1}, st.shard0.shardName));
assert.commandWorked(st.moveChunk(ns1, {x: 0}, st.shard1.shardName));
assert.commandWorked(coll1.insert({x: -1}));
assert.commandWorked(coll1.insert({x: 1}));
assert.commandWorked(coll2.insert({a: 1}));
// Start a multi-document transaction and make one read on shard0 for ns2.
const session = st.s.startSession();
const sessionDB = session.getDatabase(dbName);
const sessionColl1 = sessionDB.getCollection(collName1);
const sessionColl2 = sessionDB.getCollection(collName2);
session.startTransaction({readConcern: {level: readConcernLevel}});
assert.eq(1, sessionColl2.find().itcount()); // Targets shard0.
// While the transaction is still open, drop coll1.
assert(coll1.drop());
// Refresh the router so that it doesn't send a stale SV to the shard, which would cause
// the txn to be aborted.
assert.eq(0, coll1.find().itcount());
// Now read coll1 within the transaction and expect to get a conflict.
let isWriteCommand = command === 'update';
let err = assert.throwsWithCode(() => {
if (command === 'find') {
sessionColl1.find().itcount();
} else if (command === 'aggregate') {
sessionColl1.aggregate().itcount();
} else if (command === 'update') {
assert.commandWorked(sessionColl1.update({x: 1}, {$set: {c: 1}}));
}
}, isWriteCommand ? ErrorCodes.WriteConflict : ErrorCodes.SnapshotUnavailable);
assert.contains("TransientTransactionError", err.errorLabels, tojson(err));
}
readConcerns.forEach((readConcern) => commands.forEach((command) => {
runTest(readConcern, command);
}));
}
}
st.stop();
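
Both the MigrationConflict and SnapshotUnavailable aborts exercised above carry the TransientTransactionError label, so an application is expected to retry the whole transaction rather than treat the failure as fatal. A minimal editorial sketch of such a retry loop using standard shell session APIs; `mongos`, the `test.foo` namespace and the document contents are placeholders, not part of this commit.

// Generic retry-on-transient-error loop (illustrative only).
function withTxnRetry(session, txnFn) {
    while (true) {
        session.startTransaction({readConcern: {level: "snapshot"}});
        try {
            txnFn(session);
            session.commitTransaction();
            return;
        } catch (e) {
            // Best-effort abort; ignore the result (the server may have aborted already).
            session.abortTransaction_forTesting();
            // A transient error means the snapshot is no longer usable (e.g. a chunk
            // migration, rename or drop committed after it was taken); restart from scratch.
            if (e.hasOwnProperty("errorLabels") &&
                e.errorLabels.includes("TransientTransactionError")) {
                continue;
            }
            throw e;
        }
    }
}

const appSession = mongos.startSession();
withTxnRetry(appSession, (s) => {
    const coll = s.getDatabase("test").foo;
    coll.find().itcount();  // Read inside the transaction.
    assert.commandWorked(coll.update({x: 1}, {$set: {touched: true}}));
});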

View File

@@ -351,6 +351,9 @@ AutoGetCollection::AutoGetCollection(OperationContext* opCtx,
verifyWriteEligible);
}
const auto receivedShardVersion{
OperationShardingState::get(opCtx).getShardVersion(_resolvedNss)};
if (_coll) {
// Fetch and store the sharding collection description data needed for use during the
// operation. The shardVersion will be checked later if the shard filtering metadata is
@@ -369,11 +372,18 @@ AutoGetCollection::AutoGetCollection(OperationContext* opCtx,
checkCollectionUUIDMismatch(opCtx, *catalog, _resolvedNss, _coll, options._expectedUUID);
if (receivedShardVersion && *receivedShardVersion == ShardVersion::UNSHARDED()) {
shard_role_details::checkLocalCatalogIsValidForUnshardedShardVersion(
opCtx, *catalog, _coll, _resolvedNss);
}
return;
}
const auto receivedShardVersion{
OperationShardingState::get(opCtx).getShardVersion(_resolvedNss)};
if (receivedShardVersion && *receivedShardVersion == ShardVersion::UNSHARDED()) {
shard_role_details::checkLocalCatalogIsValidForUnshardedShardVersion(
opCtx, *catalog, _coll, _resolvedNss);
}
if (!options._expectedUUID) {
// We only need to look up a view if an expected collection UUID was not provided. If this

View File

@@ -411,6 +411,18 @@ AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* opCtx,
}
}
const auto receivedShardVersion{
OperationShardingState::get(opCtx).getShardVersion(_resolvedNss)};
if (receivedShardVersion) {
auto scopedCss = CollectionShardingState::acquire(opCtx, _resolvedNss);
scopedCss->checkShardVersionOrThrow(opCtx);
if (receivedShardVersion == ShardVersion::UNSHARDED()) {
shard_role_details::checkLocalCatalogIsValidForUnshardedShardVersion(
opCtx, *catalog, _coll, _resolvedNss);
}
}
if (_coll) {
// Fetch and store the sharding collection description data needed for use during the
// operation. The shardVersion will be checked later if the shard filtering metadata is
@@ -419,8 +431,6 @@ AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* opCtx,
//
// Note: sharding versioning for an operation has no concept of multiple collections.
auto scopedCss = CollectionShardingState::acquire(opCtx, _resolvedNss);
scopedCss->checkShardVersionOrThrow(opCtx);
auto collDesc = scopedCss->getCollectionDescription(opCtx);
if (collDesc.isSharded()) {
_coll.setShardKeyPattern(collDesc.getKeyPattern());
@@ -443,9 +453,6 @@
}
// No Collection found, try and lookup view.
const auto receivedShardVersion{
OperationShardingState::get(opCtx).getShardVersion(_resolvedNss)};
if (!options._expectedUUID) {
// We only need to look up a view if an expected collection UUID was not provided. If this
// namespace were a view, the collection UUID mismatch check would have failed above.

View File

@@ -118,6 +118,43 @@ ShardVersion ShardVersionPlacementIgnoredNoIndexes() {
boost::optional<CollectionIndexes>(boost::none));
}
// Checks that the overall collection 'timestamp' is valid for the current transaction, i.e.
// that the collection 'timestamp' is not greater than the transaction's atClusterTime or
// placementConflictTime.
void checkShardingMetadataWasValidAtTxnClusterTime(
OperationContext* opCtx,
const NamespaceString& nss,
const boost::optional<LogicalTime>& placementConflictTime,
const CollectionMetadata& collectionMetadata) {
const auto& atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime();
if (atClusterTime &&
atClusterTime->asTimestamp() <
collectionMetadata.getCollPlacementVersion().getTimestamp()) {
uasserted(ErrorCodes::SnapshotUnavailable,
str::stream() << "Collection " << nss.toStringForErrorMsg()
<< " has undergone a catalog change operation at time "
<< collectionMetadata.getCollPlacementVersion().getTimestamp()
<< " and no longer satisfies the requirements for the current "
"transaction which requires "
<< atClusterTime->asTimestamp()
<< ". Transaction will be aborted.");
}
if (placementConflictTime &&
placementConflictTime->asTimestamp() <
collectionMetadata.getCollPlacementVersion().getTimestamp()) {
uasserted(ErrorCodes::SnapshotUnavailable,
str::stream() << "Collection " << nss.toStringForErrorMsg()
<< " has undergone a catalog change operation at time "
<< collectionMetadata.getCollPlacementVersion().getTimestamp()
<< " and no longer satisfies the requirements for the current "
"transaction which requires "
<< placementConflictTime->asTimestamp()
<< ". Transaction will be aborted.");
}
}
} // namespace
CollectionShardingRuntime::ScopedSharedCollectionShardingRuntime::
@@ -540,6 +577,9 @@ CollectionShardingRuntime::_getMetadataWithVersionCheckAt(
<< placementConflictTime->asTimestamp()
<< ". Transaction will be aborted.");
}
checkShardingMetadataWasValidAtTxnClusterTime(
opCtx, _nss, placementConflictTime, currentMetadata);
return optCurrentMetadata;
}
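
The rule the new helper enforces is a plain timestamp comparison: the collection's placement timestamp (bumped by create, rename, drop/recreate, shard, etc.) must not be newer than the timestamp the transaction reads at. A minimal editorial sketch of that rule, in shell-style JavaScript with numbers standing in for cluster timestamps; the function name is illustrative and not part of the server API.

// Returns true when the sharding metadata is still valid for the transaction's snapshot.
function shardingMetadataValidForTxn(collPlacementTs, atClusterTimeTs, placementConflictTs) {
    // atClusterTime: the snapshot read concern timestamp (if any).
    if (atClusterTimeTs !== undefined && atClusterTimeTs < collPlacementTs) {
        return false;  // The collection changed after the transaction's snapshot.
    }
    // placementConflictTime: the placement timestamp chosen when the transaction started.
    if (placementConflictTs !== undefined && placementConflictTs < collPlacementTs) {
        return false;
    }
    return true;
}

// Collection renamed at ts=105, transaction snapshot at ts=100 -> SnapshotUnavailable.
assert(!shardingMetadataValidForTxn(105, 100, undefined));
// Snapshot taken after the rename -> no conflict.
assert(shardingMetadataValidForTxn(105, 110, undefined));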

View File

@@ -303,6 +303,64 @@ TEST_F(CollectionShardingRuntimeTest, ReturnUnshardedMetadataInServerlessMode) {
setGlobalReplSettings(originalRs);
}
TEST_F(CollectionShardingRuntimeTest, ShardVersionCheckDetectsClusterTimeConflicts) {
OperationContext* opCtx = operationContext();
CollectionShardingRuntime csr(getServiceContext(), kTestNss);
const auto metadata = makeShardedMetadata(opCtx);
csr.setFilteringMetadata(opCtx, metadata);
const auto collectionTimestamp = metadata.getShardPlacementVersion().getTimestamp();
auto receivedShardVersion =
ShardVersionFactory::make(metadata, boost::optional<CollectionIndexes>(boost::none));
// Test that a conflict is thrown when the transaction 'atClusterTime' is not valid for the
// current shard version.
{
const auto previousReadConcern = repl::ReadConcernArgs::get(operationContext());
repl::ReadConcernArgs::get(operationContext()) =
repl::ReadConcernArgs(repl::ReadConcernLevel::kSnapshotReadConcern);
// Valid atClusterTime (equal or later than collection timestamp).
{
repl::ReadConcernArgs::get(operationContext())
.setArgsAtClusterTimeForSnapshot(collectionTimestamp + 1);
ScopedSetShardRole scopedSetShardRole{
opCtx, kTestNss, receivedShardVersion, boost::none /* databaseVersion */};
ASSERT_DOES_NOT_THROW(csr.checkShardVersionOrThrow(opCtx));
}
// Conflicting atClusterTime (earlier than collection timestamp).
repl::ReadConcernArgs::get(operationContext())
.setArgsAtClusterTimeForSnapshot(collectionTimestamp - 1);
ScopedSetShardRole scopedSetShardRole{
opCtx, kTestNss, receivedShardVersion, boost::none /* databaseVersion */};
ASSERT_THROWS_CODE(
csr.checkShardVersionOrThrow(opCtx), DBException, ErrorCodes::SnapshotUnavailable);
repl::ReadConcernArgs::get(operationContext()) = previousReadConcern;
}
// Test that a conflict is thrown when the transaction 'placementConflictTime' is not valid
// for the current shard version.
{
// Valid placementConflictTime (equal or later than collection timestamp).
{
receivedShardVersion.setPlacementConflictTime(LogicalTime(collectionTimestamp + 1));
ScopedSetShardRole scopedSetShardRole{
opCtx, kTestNss, receivedShardVersion, boost::none /* databaseVersion */};
ASSERT_DOES_NOT_THROW(csr.checkShardVersionOrThrow(opCtx));
}
// Conflicting placementConflictTime (earlier than collection timestamp).
receivedShardVersion.setPlacementConflictTime(LogicalTime(collectionTimestamp - 1));
ScopedSetShardRole scopedSetShardRole{
opCtx, kTestNss, receivedShardVersion, boost::none /* databaseVersion */};
ASSERT_THROWS_CODE(
csr.checkShardVersionOrThrow(opCtx), DBException, ErrorCodes::SnapshotUnavailable);
}
}
class CollectionShardingRuntimeTestWithMockedLoader
: public ShardServerTestFixtureWithCatalogCacheLoaderMock {
public:

View File

@@ -353,6 +353,19 @@ CollectionOrViewAcquisitions acquireResolvedCollectionsOrViewsWithoutTakingLocks
const bool isCollection =
std::holds_alternative<CollectionPtr>(snapshotedServices.collectionPtrOrView);
if (holds_alternative<PlacementConcern>(prerequisites.placementConcern)) {
const auto& placementConcern = get<PlacementConcern>(prerequisites.placementConcern);
if (placementConcern.shardVersion == ShardVersion::UNSHARDED()) {
shard_role_details::checkLocalCatalogIsValidForUnshardedShardVersion(
opCtx,
catalog,
isCollection ? get<CollectionPtr>(snapshotedServices.collectionPtrOrView)
: CollectionPtr::null,
prerequisites.nss);
}
}
if (isCollection) {
const auto& collectionPtr =
std::get<CollectionPtr>(snapshotedServices.collectionPtrOrView);
@@ -1602,4 +1615,43 @@ void HandleTransactionResourcesFromStasher::dismissRestoredResources() {
txnResources.state = shard_role_details::TransactionResources::State::FAILED;
_stasher = nullptr;
}
void shard_role_details::checkLocalCatalogIsValidForUnshardedShardVersion(
OperationContext* opCtx,
const CollectionCatalog& stashedCatalog,
const CollectionPtr& collectionPtr,
const NamespaceString& nss) {
if (opCtx->inMultiDocumentTransaction()) {
// The latest catalog.
const auto latestCatalog = CollectionCatalog::latest(opCtx);
const auto makeErrorMessage = [&nss]() {
std::string errmsg = str::stream()
<< "Collection " << nss.toStringForErrorMsg()
<< " has undergone a catalog change and no longer satisfies the "
"requirements for the current transaction.";
return errmsg;
};
if (collectionPtr) {
// The transaction sees a collection exists.
uassert(ErrorCodes::SnapshotUnavailable,
makeErrorMessage(),
latestCatalog->isLatestCollection(opCtx, collectionPtr.get()));
} else if (const auto currentView = stashedCatalog.lookupView(opCtx, nss)) {
// The transaction sees a view exists.
uassert(ErrorCodes::SnapshotUnavailable,
makeErrorMessage(),
currentView == latestCatalog->lookupView(opCtx, nss));
} else {
// The transaction sees neither a collection nor a view exist. Make sure that the latest
// catalog looks the same.
uassert(ErrorCodes::SnapshotUnavailable,
makeErrorMessage(),
!latestCatalog->lookupCollectionByNamespace(opCtx, nss) &&
!latestCatalog->lookupView(opCtx, nss));
}
}
}
} // namespace mongo
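
Conceptually, checkLocalCatalogIsValidForUnshardedShardVersion accepts the acquisition only when the transaction's stashed view of the namespace still matches the latest catalog: same collection instance, same view, or still nonexistent. An editorial sketch of that three-way comparison in JavaScript with plain stand-in objects; catalog.collection(), catalog.view() and isLatestCollection() are illustrative stand-ins, not the server's CollectionCatalog API.

// Returns true when the stashed (transaction) catalog is still consistent with the latest one.
function stashedCatalogStillValid(stashedCatalog, latestCatalog, ns) {
    const stashedColl = stashedCatalog.collection(ns);
    if (stashedColl) {
        // The transaction sees a collection: it must still be the latest instance of that
        // collection (i.e. it was not dropped, renamed over or recreated since).
        return latestCatalog.isLatestCollection(stashedColl);
    }
    const stashedView = stashedCatalog.view(ns);
    if (stashedView) {
        // The transaction sees a view: the latest catalog must expose the same view.
        return latestCatalog.view(ns) === stashedView;
    }
    // The transaction sees neither: the namespace must still not exist in the latest catalog.
    return !latestCatalog.collection(ns) && !latestCatalog.view(ns);
}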

View File

@@ -586,5 +586,15 @@ private:
boost::optional<std::shared_ptr<const CollectionCatalog>> _catalogBeforeSnapshot;
boost::optional<bool> _shouldReadAtLastApplied;
};
/*
 * Checks that, when in a multi-document transaction, the local catalog stashed by the transaction
 * and the CollectionPtr it obtained are valid to be used by a request that attached
 * ShardVersion::UNSHARDED.
 */
void checkLocalCatalogIsValidForUnshardedShardVersion(OperationContext* opCtx,
const CollectionCatalog& stashedCatalog,
const CollectionPtr& collectionPtr,
const NamespaceString& nss);
} // namespace shard_role_details
} // namespace mongo

View File

@@ -923,6 +923,37 @@ TEST_F(ShardRoleTest, WritesOnMultiDocTransactionsUseLatestCatalog) {
ErrorCodes::WriteConflict);
}
TEST_F(ShardRoleTest, ConflictIsThrownWhenShardVersionUnshardedButStashedCatalogDiffersFromLatest) {
opCtx()->setInMultiDocumentTransaction();
opCtx()->recoveryUnit()->preallocateSnapshot();
CollectionCatalog::stash(opCtx(), CollectionCatalog::get(opCtx()));
// Drop a collection
{
auto newClient =
opCtx()->getServiceContext()->getService()->makeClient("AlternativeClient");
AlternativeClientRegion acr(newClient);
auto newOpCtx = cc().makeOperationContext();
DBDirectClient directClient(newOpCtx.get());
ASSERT_TRUE(directClient.dropCollection(nssUnshardedCollection1));
}
// Try to acquire the now-dropped collection, with declared placement concern
// ShardVersion::UNSHARDED. Expect a conflict to be detected.
{
ScopedSetShardRole setShardRole(
opCtx(), nssUnshardedCollection1, ShardVersion::UNSHARDED(), boost::none);
ASSERT_THROWS_CODE(
acquireCollectionOrView(
opCtx(),
CollectionOrViewAcquisitionRequest::fromOpCtx(
opCtx(), nssUnshardedCollection1, AcquisitionPrerequisites::kRead),
MODE_IX),
DBException,
ErrorCodes::SnapshotUnavailable);
}
}
// ---------------------------------------------------------------------------
// MaybeLockFree
TEST_F(ShardRoleTest, AcquireCollectionMaybeLockFreeTakesLocksWhenInMultiDocTransaction) {