mirror of https://github.com/mongodb/mongo
SERVER-99621 SERVER-106706 Fix lock ordering & modes for rename across DB (#44231)
GitOrigin-RevId: 1dcee2fc7d7a2aaad10d82c32ca91454dcd13ddd
Parent: 15b1099b77
Commit: 2f88bba258

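At its core, the fix replaces the old asymmetric locking scheme (source DB MODE_IX plus source collection MODE_S, target DB MODE_X) with both databases locked in MODE_IX, always acquired in increasing ResourceId order. Below is a minimal, self-contained sketch of that ordering pattern; std::mutex and name comparison stand in for the server's Lock API and ResourceId ordering, so all names here are illustrative only:

#include <iostream>
#include <mutex>
#include <string>

// Two databases that may be locked together by a cross-DB rename.
struct Database {
    std::string name;
    std::mutex mutex;
};

// Always acquire the pair in a globally consistent order (the server compares
// ResourceIds; this sketch compares names). Two renames running in opposite
// directions then contend on the same first lock instead of deadlocking.
void lockPair(Database& a, Database& b,
              std::unique_lock<std::mutex>& first,
              std::unique_lock<std::mutex>& second) {
    if (a.name < b.name) {
        first = std::unique_lock<std::mutex>(a.mutex);
        second = std::unique_lock<std::mutex>(b.mutex);
    } else {
        first = std::unique_lock<std::mutex>(b.mutex);
        second = std::unique_lock<std::mutex>(a.mutex);
    }
}

int main() {
    Database db1{"db1"}, db2{"db2"};
    std::unique_lock<std::mutex> l1, l2;
    lockPair(db1, db2, l1, l2);  // rename db1.c -> db2.e
    l2.unlock();
    l1.unlock();
    lockPair(db2, db1, l1, l2);  // rename db2.d -> db1.e: same lock order
    std::cout << "both directions acquired the locks in the same order\n";
}
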
etc/backports_required_for_multiversion_tests.yml

@@ -73,6 +73,8 @@ last-continuous:
     ticket: SERVER-111017
   - test_file: jstests/core/index/geo/geo_polygon3.js
     ticket: SERVER-106239
+  - test_file: jstests/core/txns/rename_collection_not_blocked_by_txn.js
+    ticket: SERVER-106706
   - test_file: jstests/core/timeseries/geo/timeseries_geonear_measurements.js
     ticket: SERVER-112520
   - test_file: jstests/core/catalog/list_collections_config_debug_dump.js
@@ -662,6 +664,8 @@ last-lts:
     ticket: SERVER-111017
   - test_file: jstests/core/index/geo/geo_polygon3.js
     ticket: SERVER-106239
+  - test_file: jstests/core/txns/rename_collection_not_blocked_by_txn.js
+    ticket: SERVER-106706
   - test_file: jstests/core/timeseries/geo/timeseries_geonear_measurements.js
     ticket: SERVER-112520
   - test_file: jstests/aggregation/optimization/elemmatch_deps.js

jstests/core/txns/rename_collection_not_blocked_by_txn.js
@@ -4,13 +4,19 @@
  * @tags: [uses_transactions, requires_db_locking, assumes_unsharded_collection]
  */
 
+import {FixtureHelpers} from "jstests/libs/fixture_helpers.js";
+
 let dbName = "rename_collection_not_blocked_by_txn";
+let otherDbName = dbName + "_other";
 let mydb = db.getSiblingDB(dbName);
+const otherDb = db.getSiblingDB(otherDbName);
 
 mydb.t.drop({writeConcern: {w: "majority"}});
 mydb.a.drop({writeConcern: {w: "majority"}});
 mydb.b.drop({writeConcern: {w: "majority"}});
 mydb.c.drop({writeConcern: {w: "majority"}});
+otherDb.d.drop({writeConcern: {w: "majority"}});
+mydb.e.drop({writeConcern: {w: "majority"}});
 
 assert.commandWorked(mydb.runCommand({insert: "t", documents: [{x: 1}]}));
 assert.commandWorked(mydb.runCommand({insert: "a", documents: [{x: 1}]}));
@@ -26,5 +32,16 @@ sessionDb.t.insert({y: 1});
 // This only requires database IX lock.
 assert.commandWorked(mydb.adminCommand({renameCollection: dbName + ".a", to: dbName + ".b", dropTarget: true}));
 assert.commandWorked(mydb.adminCommand({renameCollection: dbName + ".b", to: dbName + ".c"}));
+try {
+    assert.commandWorked(mydb.adminCommand({renameCollection: dbName + ".c", to: otherDbName + ".d"}));
+    assert.commandWorked(mydb.adminCommand({renameCollection: otherDbName + ".d", to: dbName + ".e"}));
+} catch (e) {
+    if (e.code == ErrorCodes.CommandFailed && FixtureHelpers.isMongos(mydb)) {
+        // Rename across databases fails if the source and target DBs have different primary shards.
+        assert(mydb.getDatabasePrimaryShardId() != otherDb.getDatabasePrimaryShardId());
+    } else {
+        throw e;
+    }
+}
 
 assert.commandWorked(session.commitTransaction_forTesting());
@@ -2252,12 +2252,6 @@ TEST_F(StorageTimestampTest, TimestampMultiIndexBuildsDuringRename) {
         ASSERT(client.runCommand(nss.dbName(), createIndexesCmdObj, result)) << result;
     }
 
-    auto collAcq = acquireCollection(
-        _opCtx,
-        CollectionAcquisitionRequest::fromOpCtx(_opCtx, nss, AcquisitionPrerequisites::kWrite),
-        MODE_X);
-    ;
-
     NamespaceString renamedNss = NamespaceString::createNamespaceString_forTest(
         "unittestsRename.timestampMultiIndexBuildsDuringRename");
     create(renamedNss);
@@ -624,145 +624,22 @@ Status renameCollectionWithinDBForApplyOps(OperationContext* opCtx,
         });
     }
 
-Status renameCollectionAcrossDatabases(OperationContext* opCtx,
-                                       const NamespaceString& source,
-                                       const NamespaceString& target,
-                                       const RenameCollectionOptions& options) {
-    invariant(
-        !source.isEqualDb(target),
-        str::stream()
-            << "cannot rename within same database (use renameCollectionWithinDB instead): source: "
-            << source.toStringForErrorMsg() << "; target: " << target.toStringForErrorMsg());
-
-    // Refer to txnCmdAllowlist in commands.cpp.
-    invariant(!opCtx->inMultiDocumentTransaction(),
-              str::stream() << "renameCollectionAcrossDatabases not supported in multi-document "
-                               "transaction: source: "
-                            << source.toStringForErrorMsg()
-                            << "; target: " << target.toStringForErrorMsg());
-
-    uassert(ErrorCodes::InvalidOptions,
-            "Cannot provide an expected collection UUID when renaming across databases",
-            !options.expectedSourceUUID && !options.expectedTargetUUID);
-
-    boost::optional<Lock::DBLock> sourceDbLock;
-    boost::optional<Lock::CollectionLock> sourceCollLock;
-    if (!shard_role_details::getLocker(opCtx)->isCollectionLockedForMode(source, MODE_S)) {
-        // Lock the DB using MODE_IX to ensure we have the global lock in that mode, so as to
-        // prevent an upgrade from MODE_IS to MODE_IX, which caused deadlocks on systems not
-        // supporting database locking and should be avoided in general.
-        sourceDbLock.emplace(opCtx, source.dbName(), MODE_IX);
-        sourceCollLock.emplace(opCtx, source, MODE_S);
-    }
-
-    boost::optional<Lock::DBLock> targetDBLock;
-    if (!shard_role_details::getLocker(opCtx)->isDbLockedForMode(target.dbName(), MODE_X)) {
-        targetDBLock.emplace(opCtx, target.dbName(), MODE_X);
-    }
-
-    DatabaseShardingState::acquire(opCtx, source.dbName())->checkDbVersionOrThrow(opCtx);
-
-    DisableDocumentValidation validationDisabler(opCtx);
-
-    auto sourceDB = DatabaseHolder::get(opCtx)->getDb(opCtx, source.dbName());
-    if (!sourceDB)
-        return Status(ErrorCodes::NamespaceNotFound, "source namespace does not exist");
-
-    boost::optional<AutoStatsTracker> statsTracker(
-        boost::in_place_init,
-        opCtx,
-        source,
-        Top::LockType::NotLocked,
-        AutoStatsTracker::LogMode::kUpdateCurOp,
-        DatabaseProfileSettings::get(opCtx->getServiceContext())
-            .getDatabaseProfileLevel(source.dbName()));
-
-    auto catalog = CollectionCatalog::get(opCtx);
-    const auto sourceColl = catalog->lookupCollectionByNamespace(opCtx, source);
-    if (!sourceColl) {
-        if (CollectionCatalog::get(opCtx)->lookupView(opCtx, source))
-            return Status(ErrorCodes::CommandNotSupportedOnView,
-                          str::stream() << "cannot rename view: " << source.toStringForErrorMsg());
-        return Status(ErrorCodes::NamespaceNotFound, "source namespace does not exist");
-    }
-
-    // TODO SERVER-89089 move this check up in the rename stack execution once we have the
-    // guarantee that all bucket collections have timeseries options.
-    if (source.isTimeseriesBucketsCollection() && sourceColl->getTimeseriesOptions() &&
-        !target.isTimeseriesBucketsCollection()) {
-        return Status(ErrorCodes::IllegalOperation,
-                      str::stream() << "Cannot rename timeseries buckets collection '"
-                                    << source.toStringForErrorMsg()
-                                    << "' to a namespace that is not timeseries buckets '"
-                                    << target.toStringForErrorMsg() << "'.");
-    }
-
-    if (isReplicatedChanged(opCtx, source, target))
-        return {ErrorCodes::IllegalOperation,
-                "Cannot rename collections across a replicated and an unreplicated database"};
-
-    IndexBuildsCoordinator::get(opCtx)->assertNoIndexBuildInProgForCollection(sourceColl->uuid());
-
-    auto targetDB = DatabaseHolder::get(opCtx)->getDb(opCtx, target.dbName());
-
-    // Check if the target namespace exists and if dropTarget is true.
-    // Return a non-OK status if target exists and dropTarget is not true or if the collection
-    // is sharded.
-    const auto targetColl =
-        targetDB ? catalog->lookupCollectionByNamespace(opCtx, target) : nullptr;
-    if (targetColl) {
-        if (sourceColl->uuid() == targetColl->uuid()) {
-            invariant(source == target);
-            return Status::OK();
-        }
-
-        if (!options.dropTarget) {
-            return Status(ErrorCodes::NamespaceExists, "target namespace exists");
-        }
-
-    } else if (CollectionCatalog::get(opCtx)->lookupView(opCtx, target)) {
-        return Status(ErrorCodes::NamespaceExists,
-                      str::stream() << "a view already exists with that name: "
-                                    << target.toStringForErrorMsg());
-    }
-
+Status copySourceToTemporaryCollectionOnTargetDB(
+    OperationContext* opCtx,
+    const CollectionPtr& sourceColl,
+    const Database* targetDB,
+    const NamespaceString& tmpName,
+    const boost::optional<UUID>& newTargetCollectionUuid) {
     // Create a temporary collection in the target database. It will be removed if we fail to
     // copy the collection, or on restart, so there is no need to replicate these writes.
     if (!targetDB) {
         targetDB = DatabaseHolder::get(opCtx)->openDb(opCtx, target.dbName());
     }
 
     // The generated unique collection name is only guaranteed to exist if the database is
     // exclusively locked.
     invariant(shard_role_details::getLocker(opCtx)->isDbLockedForMode(targetDB->name(),
                                                                       LockMode::MODE_X));
 
-    // Note that this temporary collection name is used by MongoMirror and thus must not be changed
-    // without consultation.
-    auto tmpNameResult = [&]() {
-        std::string collectionNameModel = "tmp%%%%%.renameCollection";
-        if (source.isTimeseriesBucketsCollection()) {
-            collectionNameModel =
-                NamespaceString::kTimeseriesBucketsCollectionPrefix + collectionNameModel;
-        }
-        return makeUniqueCollectionName(opCtx, target.dbName(), collectionNameModel);
-    }();
-    if (!tmpNameResult.isOK()) {
-        return tmpNameResult.getStatus().withContext(
-            str::stream() << "Cannot generate temporary collection name to rename "
-                          << source.toStringForErrorMsg() << " to "
-                          << target.toStringForErrorMsg());
-    }
-    const auto& tmpName = tmpNameResult.getValue();
-
     LOGV2(705520,
           "Attempting to create temporary collection",
           "temporaryCollection"_attr = tmpName,
-          "sourceCollection"_attr = source);
+          "sourceCollection"_attr = sourceColl->ns());
 
     // Renaming across databases will result in a new UUID.
     NamespaceStringOrUUID tmpCollUUID{tmpName.dbName(),
-                                      options.newTargetCollectionUuid.get_value_or(UUID::gen())};
+                                      newTargetCollectionUuid.get_value_or(UUID::gen())};
 
     {
         OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection(
@@ -777,30 +654,6 @@ Status renameCollectionAcrossDatabases(OperationContext* opCtx,
         });
     }
 
-    // Dismissed on success
-    ScopeGuard tmpCollectionDropper([&] {
-        Status status = Status::OK();
-        try {
-            status = dropCollectionForApplyOps(
-                opCtx,
-                tmpName,
-                {},
-                DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops);
-        } catch (...) {
-            status = exceptionToStatus();
-        }
-        if (!status.isOK()) {
-            // Ignoring failure case when dropping the temporary collection during cleanup because
-            // the rename operation has already failed for another reason.
-            LOGV2(705521,
-                  "Unable to drop temporary collection while renaming",
-                  "tempCollection"_attr = tmpName,
-                  "source"_attr = source,
-                  "target"_attr = target,
-                  "error"_attr = status);
-        }
-    });
-
     // Copy the index descriptions from the source collection.
     std::vector<BSONObj> indexesToCopy;
     for (auto sourceIndIt =
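Across these hunks the overall cross-database flow is unchanged: copy the source into a temporary collection on the target database, rename the temporary collection over the target name within that database, then drop the source. A toy model of that flow, with in-memory maps standing in for per-database catalogs (all names and types here are illustrative, not the server's API):

#include <iostream>
#include <map>
#include <string>
#include <vector>

using Collection = std::vector<std::string>;  // documents, simplified
using Db = std::map<std::string, Collection>;

bool renameAcrossDatabases(Db& sourceDb, const std::string& source,
                           Db& targetDb, const std::string& target) {
    auto srcIt = sourceDb.find(source);
    if (srcIt == sourceDb.end()) {
        return false;  // "source namespace does not exist"
    }

    // 1. Copy into a temporary name on the target DB (batched in the server).
    const std::string tmpName = "tmp12345.renameCollection";
    targetDb[tmpName] = srcIt->second;

    // 2. Rename the temporary collection over the target; within one database
    //    this is the step the server can perform atomically.
    auto tmpIt = targetDb.find(tmpName);
    targetDb[target] = std::move(tmpIt->second);
    targetDb.erase(tmpIt);

    // 3. Drop the source collection.
    sourceDb.erase(srcIt);
    return true;
}

int main() {
    Db db1{{"c", {"doc1", "doc2"}}};
    Db db2;
    renameAcrossDatabases(db1, "c", db2, "e");
    std::cout << "db2.e has " << db2["e"].size() << " documents\n";  // prints 2
}
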
@@ -834,112 +687,293 @@ Status renameCollectionAcrossDatabases(OperationContext* opCtx,
         }
     }
 
     {
-        statsTracker.reset();
-
-        // Copy over all the data from source collection to temporary collection. For this we can
-        // drop the exclusive database lock on the target and grab an intent lock on the temporary
-        // collection.
-        targetDBLock.reset();
-
-        AutoGetCollection autoTmpColl(opCtx, tmpCollUUID, MODE_IX);
-        if (!autoTmpColl) {
-            return Status(ErrorCodes::NamespaceNotFound,
-                          str::stream() << "Temporary collection '" << tmpName.toStringForErrorMsg()
-                                        << "' was removed while renaming collection across DBs");
-        }
+        // Copy over all the data from source collection to temporary collection. Note we already
+        // have an X lock over the temporary collection so this just retrieves its latest metadata.
+        invariant(shard_role_details::getLocker(opCtx)->isCollectionLockedForMode(tmpName, MODE_X));
+        AutoGetCollection autoTmpColl(opCtx, tmpCollUUID, MODE_X);
+        invariant(autoTmpColl && autoTmpColl->ns() == tmpName);
+
+        auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+        auto isOplogDisabledForTmpColl = replCoord->isOplogDisabledFor(opCtx, tmpName);
+        bool canBeBatched =
+            !(autoTmpColl->isCapped() && autoTmpColl->getIndexCatalog()->haveAnyIndexes());
+
         auto cursor = sourceColl->getCursor(opCtx);
         auto record = cursor->next();
         auto batchedWriteMaxSizeBytes = gMaxSizeOfBatchedInsertsForRenameAcrossDatabasesBytes.load();
         auto batchedWriteMaxNumberOfInserts =
             gMaxNumberOfInsertsBatchInsertsForRenameAcrossDatabases.load();
         while (record) {
             opCtx->checkForInterrupt();
             // Cursor is left one past the end of the batch inside writeConflictRetry.
             auto beginBatchId = record->id;
             Status status = writeConflictRetry(opCtx, "renameCollection", tmpName, [&] {
                 // Always reposition cursor in case it gets a WCE midway through.
                 record = cursor->seekExact(beginBatchId);
 
-                auto replCoord = repl::ReplicationCoordinator::get(opCtx);
-                auto isOplogDisabledForTmpColl = replCoord->isOplogDisabledFor(opCtx, tmpName);
-                bool canBeBatched =
-                    !(autoTmpColl->isCapped() && autoTmpColl->getIndexCatalog()->haveAnyIndexes());
                 std::vector<InsertStatement> stmts;
                 // Inserts to indexed capped collections cannot be batched.
                 // Otherwise, CollectionImpl::_insertDocuments() will fail with
                 // OperationCannotBeBatched. See SERVER-21512.
                 buildBatchedWritesWithPolicy(
                     batchedWriteMaxSizeBytes,
                     batchedWriteMaxNumberOfInserts,
                     [&cursor]() { return cursor->next(); },
                     record,
                     stmts,
                     canBeBatched);
 
                 bool isGroupedOplogEntries = stmts.size() > 1U;
                 WriteUnitOfWork wunit(opCtx,
                                       isGroupedOplogEntries ? WriteUnitOfWork::kGroupForTransaction
                                                             : WriteUnitOfWork::kDontGroup);
 
                 if (!isOplogDisabledForTmpColl && !isGroupedOplogEntries) {
                     if (autoTmpColl->needsCappedLock()) {
                         // We do not expect any concurrency here; we acquire this lock to be
                         // consistent with behavior in other inserts into non-clustered capped
                         // collections, where we acquire the lock before reserving oplog slots and
                         // to satisfy a tassert. TODO SERVER-106004: Revisit this when cleaning up
                         // code around reserving oplog slots for inserts into capped collections.
                         Lock::ResourceLock heldUntilEndOfWUOW{
                             opCtx, ResourceId(RESOURCE_METADATA, autoTmpColl->ns()), MODE_X};
                     }
                     auto oplogInfo = LocalOplogInfo::get(opCtx);
                     auto slots = oplogInfo->getNextOpTimes(opCtx, 1U);
                     stmts[0].oplogSlot = slots[0];
                 }
 
                 OpDebug* const opDebug = nullptr;
-                auto status = collection_internal::insertDocuments(opCtx,
-                                                                   *autoTmpColl,
-                                                                   stmts.begin(),
-                                                                   stmts.end(),
-                                                                   opDebug,
-                                                                   false /* fromMigrate */);
+                auto status = collection_internal::insertDocuments(
+                    opCtx, *autoTmpColl, stmts.begin(), stmts.end(), opDebug, false /* fromMigrate */);
                 if (!status.isOK()) {
                     return status;
                 }
 
                 // Used to make sure that a WCE can be handled by this logic without data loss.
                 if (MONGO_unlikely(writeConflictInRenameCollCopyToTmp.shouldFail())) {
                     throwWriteConflictException(
                         str::stream() << "Hit failpoint '"
                                       << writeConflictInRenameCollCopyToTmp.getName() << "'.");
                 }
 
                 wunit.commit();
 
                 // Time to yield; make a safe copy of the current record before releasing our
                 // cursor.
                 if (record)
                     record->data.makeOwned();
 
                 cursor->save();
                 // When this exits via success or WCE, we need to restore the cursor.
                 ON_BLOCK_EXIT([opCtx, ns = tmpName, &cursor]() {
                     writeConflictRetry(opCtx, "retryRestoreCursor", ns, [opCtx, &cursor] {
                         cursor->restore(*shard_role_details::getRecoveryUnit(opCtx));
                     });
                 });
                 return Status::OK();
             });
             if (!status.isOK())
                 return status;
         }
     }
 
+    return Status::OK();
+}
+
+Status renameCollectionAcrossDatabases(OperationContext* opCtx,
+                                       const NamespaceString& source,
+                                       const NamespaceString& target,
+                                       const RenameCollectionOptions& options) {
+    invariant(
+        !source.isEqualDb(target),
+        str::stream()
+            << "cannot rename within same database (use renameCollectionWithinDB instead): source: "
+            << source.toStringForErrorMsg() << "; target: " << target.toStringForErrorMsg());
+
+    // Refer to txnCmdAllowlist in commands.cpp.
+    invariant(!opCtx->inMultiDocumentTransaction(),
+              str::stream() << "renameCollectionAcrossDatabases not supported in multi-document "
+                               "transaction: source: "
+                            << source.toStringForErrorMsg()
+                            << "; target: " << target.toStringForErrorMsg());
+
+    uassert(ErrorCodes::InvalidOptions,
+            "Cannot provide an expected collection UUID when renaming across databases",
+            !options.expectedSourceUUID && !options.expectedTargetUUID);
+
+    // Acquire database locks, which are held for the entire duration (data copy, then
+    // rename+drop).
+    auto [sourceDB, targetDB] = [&]() -> std::pair<AutoGetDb, AutoGetDb> {
+        // Take the locks in increasing ResourceId order to prevent deadlocks.
+        if (ResourceId{RESOURCE_DATABASE, source.dbName()} <
+            ResourceId{RESOURCE_DATABASE, target.dbName()}) {
+            AutoGetDb sourceDB{opCtx, source.dbName(), MODE_IX};
+            AutoGetDb targetDB{opCtx, target.dbName(), MODE_IX};
+            return {std::move(sourceDB), std::move(targetDB)};
+        } else {
+            AutoGetDb targetDB{opCtx, target.dbName(), MODE_IX};
+            AutoGetDb sourceDB{opCtx, source.dbName(), MODE_IX};
+            return {std::move(sourceDB), std::move(targetDB)};
+        }
+    }();
+
+    // Acquire the locks for the copy of the collection from the source to the target DB.
+    struct RenameAcrossDatabasesCollectionLocks {
+        // Source collection, locked in MODE_S.
+        boost::optional<AutoGetCollection> sourceColl;
+        // NSS of a temporary collection used to copy the data from the source to the target DB
+        // before renaming it over the target namespace, guaranteed to not exist after acquiring it.
+        NamespaceString tmpName;
+        // MODE_X lock on the temporary collection.
+        boost::optional<Lock::CollectionLock> tempCollLock;
+    };
+
+    auto acqStatus = [&]() -> StatusWith<RenameAcrossDatabasesCollectionLocks> {
+        while (true) {
+            auto tmpNameResult = [&]() {
+                std::string collectionNameModel = "tmp%%%%%.renameCollection";
+                if (source.isTimeseriesBucketsCollection()) {
+                    collectionNameModel =
+                        NamespaceString::kTimeseriesBucketsCollectionPrefix + collectionNameModel;
+                }
+                return makeUniqueCollectionName(opCtx, target.dbName(), collectionNameModel);
+            }();
+            if (!tmpNameResult.isOK()) {
+                return tmpNameResult.getStatus().withContext(
+                    str::stream() << "Cannot generate temporary collection name to rename "
+                                  << source.toStringForErrorMsg() << " to "
+                                  << target.toStringForErrorMsg());
+            }
+            const auto& tmpName = tmpNameResult.getValue();
+
+            auto locks = [&]() -> RenameAcrossDatabasesCollectionLocks {
+                // Take the locks in increasing ResourceId order to prevent deadlocks.
+                if (ResourceId{RESOURCE_COLLECTION, source} <
+                    ResourceId{RESOURCE_COLLECTION, tmpName}) {
+                    AutoGetCollection sourceColl{opCtx, source, MODE_S};
+                    Lock::CollectionLock tempCollLock{opCtx, tmpName, MODE_X};
+                    return {std::move(sourceColl), tmpName, std::move(tempCollLock)};
+                } else {
+                    Lock::CollectionLock tempCollLock{opCtx, tmpName, MODE_X};
+                    AutoGetCollection sourceColl{opCtx, source, MODE_S};
+                    return {std::move(sourceColl), tmpName, std::move(tempCollLock)};
+                }
+            }();
+
+            if (!CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, locks.tmpName)) {
+                return locks;
+            }
+
+            // Conflicting on generating a unique temp collection name. Try again.
+            LOGV2(9962100,
+                  "Rename conflicted on generating a unique temp collection name, retrying.",
+                  "source"_attr = source,
+                  "target"_attr = target,
+                  "tmpName"_attr = tmpName);
+        }
+    }();
+    if (!acqStatus.isOK())
+        return acqStatus.getStatus();
+    auto& [sourceColl, tmpName, tempCollLock] = acqStatus.getValue();
+
+    DisableDocumentValidation validationDisabler(opCtx);
+
+    if (!sourceDB.getDb())
+        return Status(ErrorCodes::NamespaceNotFound, "source namespace does not exist");
+
+    AutoStatsTracker statsTracker(opCtx,
+                                  source,
+                                  Top::LockType::NotLocked,
+                                  AutoStatsTracker::LogMode::kUpdateCurOp,
+                                  DatabaseProfileSettings::get(opCtx->getServiceContext())
+                                      .getDatabaseProfileLevel(source.dbName()));
+
+    if (!sourceColl.get()) {
+        return Status(ErrorCodes::NamespaceNotFound, "source namespace does not exist");
+    }
+
+    // TODO SERVER-89089 move this check up in the rename stack execution once we have the
+    // guarantee that all bucket collections have timeseries options.
+    if (source.isTimeseriesBucketsCollection() && sourceColl.get()->getTimeseriesOptions() &&
+        !target.isTimeseriesBucketsCollection()) {
+        return Status(ErrorCodes::IllegalOperation,
+                      str::stream() << "Cannot rename timeseries buckets collection '"
+                                    << source.toStringForErrorMsg()
+                                    << "' to a namespace that is not timeseries buckets '"
+                                    << target.toStringForErrorMsg() << "'.");
+    }
+
+    if (isReplicatedChanged(opCtx, source, target))
+        return {ErrorCodes::IllegalOperation,
+                "Cannot rename collections across a replicated and an unreplicated database"};
+
+    IndexBuildsCoordinator::get(opCtx)->assertNoIndexBuildInProgForCollection(
+        sourceColl.get()->uuid());
+
+    // Check if the target namespace exists and if dropTarget is true.
+    // Return a non-OK status if target exists and dropTarget is not true or if the collection
+    // is sharded.
+    auto catalog = CollectionCatalog::get(opCtx);
+    const auto targetColl =
+        targetDB.getDb() ? catalog->lookupCollectionByNamespace(opCtx, target) : nullptr;
+    if (targetColl) {
+        if (sourceColl.get()->uuid() == targetColl->uuid()) {
+            invariant(source == target);
+            return Status::OK();
+        }
+
+        if (!options.dropTarget) {
+            return Status(ErrorCodes::NamespaceExists, "target namespace exists");
+        }
+
+    } else if (CollectionCatalog::get(opCtx)->lookupView(opCtx, target)) {
+        return Status(ErrorCodes::NamespaceExists,
+                      str::stream() << "a view already exists with that name: "
+                                    << target.toStringForErrorMsg());
+    }
+
+    // Dismissed on success.
+    ScopeGuard tmpCollectionDropper([&] {
+        Status status = Status::OK();
+        try {
+            status = dropCollectionForApplyOps(
+                opCtx,
+                tmpName,
+                {},
+                DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops);
+        } catch (...) {
+            status = exceptionToStatus();
+        }
+        if (!status.isOK()) {
+            // Ignoring failure case when dropping the temporary collection during cleanup because
+            // the rename operation has already failed for another reason.
+            LOGV2(705521,
+                  "Unable to drop temporary collection while renaming",
+                  "tempCollection"_attr = tmpName,
+                  "source"_attr = source,
+                  "target"_attr = target,
+                  "error"_attr = status);
+        }
+    });
+
+    Status copyStatus = copySourceToTemporaryCollectionOnTargetDB(opCtx,
+                                                                  **sourceColl,
+                                                                  targetDB.ensureDbExists(opCtx),
+                                                                  tmpName,
+                                                                  options.newTargetCollectionUuid);
+    if (!copyStatus.isOK())
+        return copyStatus;
+
+    // Drop the locks on the source and temporary collections.
+    sourceColl.reset();
+    tempCollLock.reset();
+
+    // We shouldn't be holding any collection locks at this point.
+    for (auto& nss : {tmpName, source, target}) {
+        tassert(9962101,
+                str::stream() << "Rename unexpectedly holding lock on "
+                              << nss.toStringForErrorMsg(),
+                shard_role_details::getLocker(opCtx)->getLockMode(
+                    ResourceId{RESOURCE_COLLECTION, nss}) == MODE_NONE);
+    }
-    sourceCollLock.reset();
-    sourceDbLock.reset();
 
     // Getting here means we successfully built the target copy. We now do the final
     // in-place rename and remove the source collection.
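The acquisition loop above resolves temporary-name races optimistically: generate a candidate name, take the MODE_X collection lock, and re-check the catalog; if another operation claimed the name first, log and retry with a fresh candidate. A compact sketch of that reservation loop against a toy catalog (the registry and all names below are illustrative, not the server's API):

#include <iostream>
#include <mutex>
#include <random>
#include <set>
#include <string>

// Toy catalog: collection names that already exist on the target database.
// Stands in for the CollectionCatalog lookup in the patch.
std::mutex catalogMutex;
std::set<std::string> existingCollections;

std::string makeCandidateName() {
    static std::mt19937 gen{std::random_device{}()};
    return "tmp" + std::to_string(gen() % 100000) + ".renameCollection";
}

// Generate a candidate, "lock" it, and re-check that nothing claimed the name
// in the meantime; on a collision, retry. This mirrors the while (true) loop
// around makeUniqueCollectionName and the 9962100 retry log above.
std::string reserveTempCollectionName() {
    while (true) {
        const std::string candidate = makeCandidateName();
        std::lock_guard<std::mutex> guard(catalogMutex);  // stands in for the MODE_X lock
        if (existingCollections.insert(candidate).second) {
            return candidate;  // name was still free; reservation succeeded
        }
        // Conflicting on generating a unique temp collection name. Try again.
    }
}

int main() {
    std::cout << "reserved " << reserveTempCollectionName() << "\n";
}
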
@@ -1131,8 +1165,6 @@ Status renameCollection(OperationContext* opCtx,
     if (source.isEqualDb(target))
         return renameCollectionWithinDB(opCtx, source, target, options);
     else {
-        // TODO SERVER-99621: Remove this line once renames take locks in proper order.
-        DisableLockerRuntimeOrderingChecks disableChecks{opCtx};
         return renameCollectionAcrossDatabases(opCtx, source, target, options);
     }
 }
@@ -1238,8 +1270,6 @@ Status renameCollectionForApplyOps(OperationContext* opCtx,
         return renameCollectionWithinDBForApplyOps(
             opCtx, sourceNss, targetNss, uuidToDrop, renameOpTime, options);
     } else {
-        // TODO SERVER-99621: Remove this line once renames take locks in proper order.
-        DisableLockerRuntimeOrderingChecks disableChecks{opCtx};
         return renameCollectionAcrossDatabases(opCtx, sourceNss, targetNss, options);
     }
 }