SERVER-107535 Set up TransactionRouter state if sub-router in RouterRole::route (#40055) (#43114)

GitOrigin-RevId: b3aa2a362e4c8b1cc5ff43d2dbb88a54cbe10bbe
This commit is contained in:
Janna Golden 2025-10-23 23:11:58 -04:00 committed by MongoDB Bot
parent c16da7c1b7
commit e7eb58d8ae
9 changed files with 277 additions and 2 deletions

View File

@ -871,6 +871,8 @@ last-continuous:
ticket: SERVER-103960
- test_file: jstests/sharding/resharding_commit_monitor_repl_lag.js
ticket: SERVER-103932
- test_file: jstests/sharding/txn_lookup_hits_migration_conflict_all_colls_local.js
ticket: SERVER-107535
suites: null
last-lts:
all:
@ -1804,4 +1806,6 @@ last-lts:
ticket: SERVER-103960
- test_file: jstests/sharding/resharding_commit_monitor_repl_lag.js
ticket: SERVER-103932
- test_file: jstests/sharding/txn_lookup_hits_migration_conflict_all_colls_local.js
ticket: SERVER-107535
suites: null

View File

@ -0,0 +1,75 @@
/**
* Tests that a concurrent migration will cause an aggregation running in a txn to fail even when
* the routing shard can take advantage of the query optimization to skip routing over the network,
* and instead push down to sbe (which occurs when the secondary sharded collection is fully local).
*
* @tags: [
* assumes_balancer_off,
* requires_sharding
* ]
*/
const st = new ShardingTest({mongos: 1, shards: 2});
const dbName = 'test_txn_with_chunk_migration';
const collName1 = 'coll1';
const collName2 = 'coll2';
const ns1 = dbName + '.' + collName1;
const ns2 = dbName + '.' + collName2;
st.s.getDB(dbName).dropDatabase();
let coll1 = st.s.getDB(dbName)[collName1];
let coll2 = st.s.getDB(dbName)[collName2];
st.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName});
st.adminCommand({shardCollection: ns1, key: {a: 1}});
st.adminCommand({shardCollection: ns2, key: {x: 1}});
assert.commandWorked(st.splitAt(ns2, {x: 0}));
assert.commandWorked(st.moveChunk(ns2, {x: -1}, st.shard0.shardName));
assert.commandWorked(st.moveChunk(ns2, {x: 1}, st.shard1.shardName));
assert.commandWorked(coll1.insert({a: 1}));
assert.commandWorked(coll2.insert({x: -1}));
assert.commandWorked(coll2.insert({x: 1}));
{
const session = st.s.startSession();
const sessionDB = session.getDatabase(dbName);
const sessionColl1 = sessionDB.getCollection(collName1);
session.startTransaction();
// Opens snapshot at time T on both shards
sessionColl1.find({a: 1}).itcount();
// Migration at time T+1 - move chunk with x:1 from shard1 to shard0
/*
* Distribution at T:
* shard0: ns2: {x: -1}, ns1: {a: 1}
* shard1: ns2 {x: 1}
* Distribution at T+1 after migration:
* shard0: ns2: {x: -1, x: 1}, ns1: {a: 1}
* shard1: {}
*/
assert.commandWorked(st.moveChunk(ns2, {x: 1}, st.shard0.shardName));
// A non-transactional agg should find: a:1 matches x:1
const lookupPipeline =
[{$lookup: {from: collName2, localField: "a", foreignField: "x", as: "result"}}];
let firstResult =
st.s.getDB(dbName).getCollection(collName1).aggregate(lookupPipeline).toArray();
jsTest.log("First aggregation result: " + tojson(firstResult));
assert.eq(1, firstResult[0].result.length, "First lookup should find match for a:1 with x:1");
// Run the same agg in the open transaction, and assert it fails with MigrationConflict.
assert.commandFailedWithCode(assert.throws(() => sessionColl1.aggregate(lookupPipeline)),
ErrorCodes.MigrationConflict);
// Cleanup
for (let db of [st.shard0.getDB('config'), st.shard1.getDB('config')]) {
assert.commandWorked(db.runCommand({killSessions: [session.id]}));
}
}
st.stop();

View File

@ -58,7 +58,9 @@ std::vector<ScopedSetShardRole> createScopedShardRoles(
auto shardVersion = [&] {
auto sv =
isTracked ? nssCri->second.getShardVersion(myShardId) : ShardVersion::UNSHARDED();
if (auto txnRouter = TransactionRouter::get(opCtx)) {
if (auto txnRouter = TransactionRouter::get(opCtx);
txnRouter && opCtx->inMultiDocumentTransaction()) {
if (auto optOriginalPlacementConflictTime = txnRouter.getPlacementConflictTime()) {
sv.setPlacementConflictTime(*optOriginalPlacementConflictTime);
}

View File

@ -902,7 +902,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> tryAttachCursorSourceForLocalRead(
auto shardVersion = [&] {
auto sv = cm.hasRoutingTable() ? targetingCri.getShardVersion(localShardId)
: ShardVersion::UNSHARDED();
if (auto txnRouter = TransactionRouter::get(opCtx)) {
if (auto txnRouter = TransactionRouter::get(opCtx);
txnRouter && opCtx->inMultiDocumentTransaction()) {
if (auto optOriginalPlacementConflictTime = txnRouter.getPlacementConflictTime()) {
sv.setPlacementConflictTime(*optOriginalPlacementConflictTime);
}

View File

@ -501,6 +501,7 @@ mongo_cc_unit_test(
"//src/mongo/s:mongos_topology_coordinator_test.cpp",
"//src/mongo/s:query_analysis_sample_tracker_test.cpp",
"//src/mongo/s:query_analysis_sampler_test.cpp",
"//src/mongo/s:router_role_test.cpp",
"//src/mongo/s:routing_table_cache_gossip_metadata_hook_test.cpp",
"//src/mongo/s:routing_table_history_test.cpp",
"//src/mongo/s:sessions_collection_sharded_test.cpp",

View File

@ -45,6 +45,7 @@
#include "mongo/s/shard_version.h"
#include "mongo/s/sharding_state.h"
#include "mongo/s/stale_exception.h"
#include "mongo/s/transaction_router.h"
#include "mongo/util/str.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
@ -55,6 +56,19 @@ namespace router {
RouterBase::RouterBase(ServiceContext* service) : _service(service) {}
void RouterBase::_initTxnRouterIfNeeded(OperationContext* opCtx) {
bool activeTxnParticipantAddParticipants =
opCtx->isActiveTransactionParticipant() && opCtx->inMultiDocumentTransaction();
auto txnRouter = TransactionRouter::get(opCtx);
if (txnRouter && activeTxnParticipantAddParticipants) {
auto opCtxTxnNum = opCtx->getTxnNumber();
invariant(opCtxTxnNum);
txnRouter.beginOrContinueTxn(
opCtx, *opCtxTxnNum, TransactionRouter::TransactionActions::kStartOrContinue);
}
}
DBPrimaryRouter::DBPrimaryRouter(ServiceContext* service, const DatabaseName& db)
: RouterBase(service), _dbName(db) {}

View File

@ -56,6 +56,8 @@ protected:
int numAttempts{0};
};
void _initTxnRouterIfNeeded(OperationContext* opCtx);
ServiceContext* const _service;
};
@ -74,6 +76,8 @@ public:
template <typename F>
auto route(OperationContext* opCtx, StringData comment, F&& callbackFn) {
RouteContext context{comment.toString()};
_initTxnRouterIfNeeded(opCtx);
while (true) {
auto cdb = _getRoutingInfo(opCtx);
try {
@ -138,6 +142,7 @@ public:
template <typename F>
auto route(OperationContext* opCtx, StringData comment, F&& callbackFn) {
RouteContext context{comment.toString()};
_initTxnRouterIfNeeded(opCtx);
while (true) {
auto cri = _getRoutingInfo(opCtx, _targetedNamespaces.front());
try {
@ -167,6 +172,7 @@ public:
template <typename F>
auto route(OperationContext* opCtx, StringData comment, F&& callbackFn) {
RouteContext context{comment.toString()};
_initTxnRouterIfNeeded(opCtx);
while (true) {
stdx::unordered_map<NamespaceString, CollectionRoutingInfo> criMap;
for (const auto& nss : _targetedNamespaces) {

View File

@ -0,0 +1,171 @@
/**
* Copyright (C) 2024-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/s/router_role.h"
#include "mongo/base/status.h"
#include "mongo/db/commands.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/service_context_test_fixture.h"
#include "mongo/db/vector_clock.h"
#include "mongo/idl/server_parameter_test_util.h"
#include "mongo/s/catalog_cache_test_fixture.h"
#include "mongo/s/grid.h"
#include "mongo/s/session_catalog_router.h"
#include "mongo/s/shard_version_factory.h"
#include "mongo/s/sharding_mongos_test_fixture.h"
#include "mongo/s/transaction_participant_failed_unyield_exception.h"
#include "mongo/s/transaction_router.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
namespace mongo {
namespace {
/**
* This sub-class does not initialize a session, so the OperationContext will not contain
* a TransactionRouter instance. This allows testing cases when not operating within a transaction.
*/
class RouterRoleTest : public RouterCatalogCacheTestFixture {
public:
void setUp() override {
RouterCatalogCacheTestFixture::setUp();
_nss = NamespaceString::createNamespaceString_forTest("test.foo");
setupNShards(2);
loadRoutingTableWithTwoChunksAndTwoShards(_nss);
}
void tearDown() override {
RouterCatalogCacheTestFixture::tearDown();
}
protected:
NamespaceString _nss;
};
/**
* This sub-class initializes a session within the OperationContext.
*/
class RouterRoleTestTxn : public RouterRoleTest {
public:
void setUp() override {
RouterRoleTest::setUp();
const auto opCtx = operationContext();
opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
_routerOpCtxSession.emplace(opCtx);
// The service needs to be set to the RouterServer service so that the "insert" command can
// be found when attaching txn request fields for the tests in this file.
auto targetService =
operationContext()->getServiceContext()->getService(ClusterRole::RouterServer);
operationContext()->getClient()->setService(targetService);
}
void tearDown() override {
_routerOpCtxSession.reset();
RouterRoleTest::tearDown();
}
void actAsSubRouter(LogicalTime afterClusterTime = LogicalTime(Timestamp(3, 1))) {
// Attach txn request fields for a request to shard0 so that the shard acts as a sub-router.
TxnNumber txnNum{3};
operationContext()->setTxnNumber(txnNum);
operationContext()->setActiveTransactionParticipant();
operationContext()->setInMultiDocumentTransaction();
repl::ReadConcernArgs readConcernArgs;
ASSERT_OK(readConcernArgs.initialize(
BSON("insert"
<< "test" << repl::ReadConcernArgs::kReadConcernFieldName
<< BSON(repl::ReadConcernArgs::kAfterClusterTimeFieldName
<< afterClusterTime.asTimestamp() << repl::ReadConcernArgs::kLevelFieldName
<< "majority"))));
repl::ReadConcernArgs::get(operationContext()) = readConcernArgs;
}
private:
boost::optional<RouterOperationContextSession> _routerOpCtxSession;
};
TEST_F(RouterRoleTestTxn, DBPrimaryRouterSetsPlacementConflictTimeIfSubRouter) {
auto clusterTime = Timestamp(1, 1);
actAsSubRouter(LogicalTime(clusterTime));
sharding::router::DBPrimaryRouter router(getServiceContext(), _nss.dbName());
router.route(
operationContext(), "test", [&](OperationContext* opCtx, const CachedDatabaseInfo& cdb) {
auto txnRouter = TransactionRouter::get(opCtx);
ASSERT(txnRouter);
auto placementConflictTime = txnRouter.getPlacementConflictTime();
ASSERT(placementConflictTime);
ASSERT_EQ(placementConflictTime->asTimestamp(), clusterTime);
});
}
TEST_F(RouterRoleTestTxn, CollectionRouterSetsPlacementConflictTimeIfSubRouter) {
auto clusterTime = Timestamp(1, 1);
actAsSubRouter(LogicalTime(clusterTime));
sharding::router::CollectionRouter router(getServiceContext(), _nss);
router.route(
operationContext(), "test", [&](OperationContext* opCtx, const CollectionRoutingInfo& cri) {
auto txnRouter = TransactionRouter::get(opCtx);
ASSERT(txnRouter);
auto placementConflictTime = txnRouter.getPlacementConflictTime();
ASSERT(placementConflictTime);
ASSERT_EQ(placementConflictTime->asTimestamp(), clusterTime);
});
}
TEST_F(RouterRoleTestTxn, MultiCollectionRouterSetsPlacementConflictTimeIfSubRouter) {
auto clusterTime = Timestamp(1, 1);
actAsSubRouter(LogicalTime(clusterTime));
sharding::router::MultiCollectionRouter router(getServiceContext(), {_nss});
router.route(operationContext(),
"test",
[&](OperationContext* opCtx,
stdx::unordered_map<NamespaceString, CollectionRoutingInfo> criMap) {
auto txnRouter = TransactionRouter::get(opCtx);
ASSERT(txnRouter);
auto placementConflictTime = txnRouter.getPlacementConflictTime();
ASSERT(placementConflictTime);
ASSERT_EQ(placementConflictTime->asTimestamp(), clusterTime);
});
}
} // namespace
} // namespace mongo

View File

@ -838,6 +838,7 @@ boost::optional<LogicalTime> TransactionRouter::Router::getSelectedAtClusterTime
}
boost::optional<LogicalTime> TransactionRouter::Router::getPlacementConflictTime() const {
invariant(isInitialized());
return o().placementConflictTimeForNonSnapshotReadConcern.map(
[](const auto& x) { return x.getTime(); });
}