diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index 10399c44be9..fdd0da206e0 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -871,6 +871,8 @@ last-continuous:
     ticket: SERVER-103960
   - test_file: jstests/sharding/resharding_commit_monitor_repl_lag.js
     ticket: SERVER-103932
+  - test_file: jstests/sharding/txn_lookup_hits_migration_conflict_all_colls_local.js
+    ticket: SERVER-107535
   suites: null
 last-lts:
   all:
@@ -1804,4 +1806,6 @@ last-lts:
     ticket: SERVER-103960
   - test_file: jstests/sharding/resharding_commit_monitor_repl_lag.js
     ticket: SERVER-103932
+  - test_file: jstests/sharding/txn_lookup_hits_migration_conflict_all_colls_local.js
+    ticket: SERVER-107535
   suites: null
diff --git a/jstests/sharding/txn_lookup_hits_migration_conflict_all_colls_local.js b/jstests/sharding/txn_lookup_hits_migration_conflict_all_colls_local.js
new file mode 100644
index 00000000000..0489d4843d4
--- /dev/null
+++ b/jstests/sharding/txn_lookup_hits_migration_conflict_all_colls_local.js
@@ -0,0 +1,75 @@
+/**
+ * Tests that a concurrent migration will cause an aggregation running in a txn to fail even when
+ * the routing shard can take advantage of the query optimization to skip routing over the network
+ * and instead push down to SBE (which occurs when the secondary sharded collection is fully local).
+ *
+ * @tags: [
+ *   assumes_balancer_off,
+ *   requires_sharding
+ * ]
+ */
+
+const st = new ShardingTest({mongos: 1, shards: 2});
+
+const dbName = 'test_txn_with_chunk_migration';
+const collName1 = 'coll1';
+const collName2 = 'coll2';
+const ns1 = dbName + '.' + collName1;
+const ns2 = dbName + '.' + collName2;
+
+st.s.getDB(dbName).dropDatabase();
+
+let coll1 = st.s.getDB(dbName)[collName1];
+let coll2 = st.s.getDB(dbName)[collName2];
+
+st.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName});
+st.adminCommand({shardCollection: ns1, key: {a: 1}});
+st.adminCommand({shardCollection: ns2, key: {x: 1}});
+assert.commandWorked(st.splitAt(ns2, {x: 0}));
+assert.commandWorked(st.moveChunk(ns2, {x: -1}, st.shard0.shardName));
+assert.commandWorked(st.moveChunk(ns2, {x: 1}, st.shard1.shardName));
+
+assert.commandWorked(coll1.insert({a: 1}));
+assert.commandWorked(coll2.insert({x: -1}));
+assert.commandWorked(coll2.insert({x: 1}));
+
+{
+    const session = st.s.startSession();
+    const sessionDB = session.getDatabase(dbName);
+    const sessionColl1 = sessionDB.getCollection(collName1);
+
+    session.startTransaction();
+
+    // Opens a snapshot at time T on both shards.
+    sessionColl1.find({a: 1}).itcount();
+
+    // Migration at time T+1: move the chunk containing {x: 1} from shard1 to shard0.
+    /*
+     * Distribution at T:
+     *   shard0: ns2: {x: -1}, ns1: {a: 1}
+     *   shard1: ns2: {x: 1}
+     * Distribution at T+1 after migration:
+     *   shard0: ns2: {x: -1, x: 1}, ns1: {a: 1}
+     *   shard1: {}
+     */
+    assert.commandWorked(st.moveChunk(ns2, {x: 1}, st.shard0.shardName));
+
+    // A non-transactional agg should find the match: a:1 joins with x:1.
+    const lookupPipeline =
+        [{$lookup: {from: collName2, localField: "a", foreignField: "x", as: "result"}}];
+    let firstResult =
+        st.s.getDB(dbName).getCollection(collName1).aggregate(lookupPipeline).toArray();
+    jsTest.log("First aggregation result: " + tojson(firstResult));
+    assert.eq(1, firstResult[0].result.length, "First lookup should find match for a:1 with x:1");
+
+    // Run the same agg in the open transaction, and assert it fails with MigrationConflict.
+    assert.commandFailedWithCode(assert.throws(() => sessionColl1.aggregate(lookupPipeline)),
+                                 ErrorCodes.MigrationConflict);
+
+    // Cleanup
+    for (let db of [st.shard0.getDB('config'), st.shard1.getDB('config')]) {
+        assert.commandWorked(db.runCommand({killSessions: [session.id]}));
+    }
+}
+
+st.stop();
\ No newline at end of file
diff --git a/src/mongo/db/pipeline/initialize_auto_get_helper.h b/src/mongo/db/pipeline/initialize_auto_get_helper.h
index 71d045ca8d5..d074ba853ef 100644
--- a/src/mongo/db/pipeline/initialize_auto_get_helper.h
+++ b/src/mongo/db/pipeline/initialize_auto_get_helper.h
@@ -58,7 +58,9 @@ std::vector createScopedShardRoles(
     auto shardVersion = [&] {
         auto sv = isTracked ? nssCri->second.getShardVersion(myShardId)
                             : ShardVersion::UNSHARDED();
-        if (auto txnRouter = TransactionRouter::get(opCtx)) {
+
+        if (auto txnRouter = TransactionRouter::get(opCtx);
+            txnRouter && opCtx->inMultiDocumentTransaction()) {
             if (auto optOriginalPlacementConflictTime = txnRouter.getPlacementConflictTime()) {
                 sv.setPlacementConflictTime(*optOriginalPlacementConflictTime);
             }
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 555cbfbd435..b709c101332 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -902,7 +902,8 @@ std::unique_ptr tryAttachCursorSourceForLocalRead(
     auto shardVersion = [&] {
         auto sv = cm.hasRoutingTable() ? targetingCri.getShardVersion(localShardId)
                                        : ShardVersion::UNSHARDED();
-        if (auto txnRouter = TransactionRouter::get(opCtx)) {
+        if (auto txnRouter = TransactionRouter::get(opCtx);
+            txnRouter && opCtx->inMultiDocumentTransaction()) {
             if (auto optOriginalPlacementConflictTime = txnRouter.getPlacementConflictTime()) {
                 sv.setPlacementConflictTime(*optOriginalPlacementConflictTime);
             }
diff --git a/src/mongo/s/BUILD.bazel b/src/mongo/s/BUILD.bazel
index 4f22683c727..e16dc183110 100644
--- a/src/mongo/s/BUILD.bazel
+++ b/src/mongo/s/BUILD.bazel
@@ -501,6 +501,7 @@ mongo_cc_unit_test(
         "//src/mongo/s:mongos_topology_coordinator_test.cpp",
         "//src/mongo/s:query_analysis_sample_tracker_test.cpp",
         "//src/mongo/s:query_analysis_sampler_test.cpp",
+        "//src/mongo/s:router_role_test.cpp",
         "//src/mongo/s:routing_table_cache_gossip_metadata_hook_test.cpp",
         "//src/mongo/s:routing_table_history_test.cpp",
         "//src/mongo/s:sessions_collection_sharded_test.cpp",
diff --git a/src/mongo/s/router_role.cpp b/src/mongo/s/router_role.cpp
index 69f3b67250d..84d7fe98e30 100644
--- a/src/mongo/s/router_role.cpp
+++ b/src/mongo/s/router_role.cpp
@@ -45,6 +45,7 @@
 #include "mongo/s/shard_version.h"
 #include "mongo/s/sharding_state.h"
 #include "mongo/s/stale_exception.h"
+#include "mongo/s/transaction_router.h"
 #include "mongo/util/str.h"
 
 #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
@@ -55,6 +56,19 @@ namespace router {
 
 RouterBase::RouterBase(ServiceContext* service) : _service(service) {}
 
+void RouterBase::_initTxnRouterIfNeeded(OperationContext* opCtx) {
+    bool activeTxnParticipantAddParticipants =
+        opCtx->isActiveTransactionParticipant() && opCtx->inMultiDocumentTransaction();
+
+    auto txnRouter = TransactionRouter::get(opCtx);
+    if (txnRouter && activeTxnParticipantAddParticipants) {
+        auto opCtxTxnNum = opCtx->getTxnNumber();
+        invariant(opCtxTxnNum);
+        txnRouter.beginOrContinueTxn(
+            opCtx, *opCtxTxnNum, TransactionRouter::TransactionActions::kStartOrContinue);
+    }
+}
+
 DBPrimaryRouter::DBPrimaryRouter(ServiceContext* service, const DatabaseName& db)
     : RouterBase(service), _dbName(db) {}
 
diff --git a/src/mongo/s/router_role.h b/src/mongo/s/router_role.h
index 254bec3e0f8..115f1b604e0 100644
--- a/src/mongo/s/router_role.h
+++ b/src/mongo/s/router_role.h
@@ -56,6 +56,8 @@ protected:
         int numAttempts{0};
     };
 
+    void _initTxnRouterIfNeeded(OperationContext* opCtx);
+
     ServiceContext* const _service;
 };
 
@@ -74,6 +76,8 @@ public:
    template <typename F>
    auto route(OperationContext* opCtx, StringData comment, F&& callbackFn) {
        RouteContext context{comment.toString()};
+        _initTxnRouterIfNeeded(opCtx);
+
        while (true) {
            auto cdb = _getRoutingInfo(opCtx);
            try {
@@ -138,6 +142,7 @@ public:
    template <typename F>
    auto route(OperationContext* opCtx, StringData comment, F&& callbackFn) {
        RouteContext context{comment.toString()};
+        _initTxnRouterIfNeeded(opCtx);
        while (true) {
            auto cri = _getRoutingInfo(opCtx, _targetedNamespaces.front());
            try {
@@ -167,6 +172,7 @@ public:
    template <typename F>
    auto route(OperationContext* opCtx, StringData comment, F&& callbackFn) {
        RouteContext context{comment.toString()};
+        _initTxnRouterIfNeeded(opCtx);
        while (true) {
            stdx::unordered_map<NamespaceString, CollectionRoutingInfo> criMap;
            for (const auto& nss : _targetedNamespaces) {
diff --git a/src/mongo/s/router_role_test.cpp b/src/mongo/s/router_role_test.cpp
new file mode 100644
index 00000000000..167329e5859
--- /dev/null
+++ b/src/mongo/s/router_role_test.cpp
@@ -0,0 +1,171 @@
+/**
+ *    Copyright (C) 2024-present MongoDB, Inc.
+ *
+ *    This program is free software: you can redistribute it and/or modify
+ *    it under the terms of the Server Side Public License, version 1,
+ *    as published by MongoDB, Inc.
+ *
+ *    This program is distributed in the hope that it will be useful,
+ *    but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ *    Server Side Public License for more details.
+ *
+ *    You should have received a copy of the Server Side Public License
+ *    along with this program. If not, see
+ *    <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ *    As a special exception, the copyright holders give permission to link the
+ *    code of portions of this program with the OpenSSL library under certain
+ *    conditions as described in each individual source file and distribute
+ *    linked combinations including the program with the OpenSSL library. You
+ *    must comply with the Server Side Public License in all respects for
+ *    all of the code used other than as permitted herein. If you modify file(s)
+ *    with this exception, you may extend this exception to your version of the
+ *    file(s), but you are not obligated to do so. If you do not wish to do so,
+ *    delete this exception statement from your version. If you delete this
+ *    exception statement from all source files in the program, then also delete
+ *    it in the license file.
+ */
+
+#include "mongo/s/router_role.h"
+
+#include "mongo/base/status.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/query/cursor_response.h"
+#include "mongo/db/service_context_test_fixture.h"
+#include "mongo/db/vector_clock.h"
+#include "mongo/idl/server_parameter_test_util.h"
+#include "mongo/s/catalog_cache_test_fixture.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/session_catalog_router.h"
+#include "mongo/s/shard_version_factory.h"
+#include "mongo/s/sharding_mongos_test_fixture.h"
+#include "mongo/s/transaction_participant_failed_unyield_exception.h"
+#include "mongo/s/transaction_router.h"
+#include "mongo/unittest/death_test.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+
+namespace {
+
+/**
+ * This fixture does not initialize a session, so the OperationContext will not have a
+ * TransactionRouter instance. This allows testing cases where the operation runs outside a transaction.
+ */
+class RouterRoleTest : public RouterCatalogCacheTestFixture {
+public:
+    void setUp() override {
+        RouterCatalogCacheTestFixture::setUp();
+
+        _nss = NamespaceString::createNamespaceString_forTest("test.foo");
+        setupNShards(2);
+        loadRoutingTableWithTwoChunksAndTwoShards(_nss);
+    }
+
+    void tearDown() override {
+        RouterCatalogCacheTestFixture::tearDown();
+    }
+
+protected:
+    NamespaceString _nss;
+};
+
+/**
+ * This fixture initializes a session within the OperationContext.
+ */
+class RouterRoleTestTxn : public RouterRoleTest {
+public:
+    void setUp() override {
+        RouterRoleTest::setUp();
+
+        const auto opCtx = operationContext();
+        opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
+        _routerOpCtxSession.emplace(opCtx);
+
+        // The service needs to be set to the RouterServer service so that the "insert" command can
+        // be found when attaching txn request fields for the tests in this file.
+        auto targetService =
+            operationContext()->getServiceContext()->getService(ClusterRole::RouterServer);
+        operationContext()->getClient()->setService(targetService);
+    }
+
+    void tearDown() override {
+        _routerOpCtxSession.reset();
+        RouterRoleTest::tearDown();
+    }
+
+    void actAsSubRouter(LogicalTime afterClusterTime = LogicalTime(Timestamp(3, 1))) {
+        // Attach txn request fields for a request to shard0 so that the shard acts as a sub-router.
+        TxnNumber txnNum{3};
+        operationContext()->setTxnNumber(txnNum);
+        operationContext()->setActiveTransactionParticipant();
+        operationContext()->setInMultiDocumentTransaction();
+        repl::ReadConcernArgs readConcernArgs;
+        ASSERT_OK(readConcernArgs.initialize(
+            BSON("insert"
+                 << "test" << repl::ReadConcernArgs::kReadConcernFieldName
+                 << BSON(repl::ReadConcernArgs::kAfterClusterTimeFieldName
+                         << afterClusterTime.asTimestamp() << repl::ReadConcernArgs::kLevelFieldName
+                         << "majority"))));
+        repl::ReadConcernArgs::get(operationContext()) = readConcernArgs;
+    }
+
+private:
+    boost::optional<RouterOperationContextSession> _routerOpCtxSession;
+};
+
+TEST_F(RouterRoleTestTxn, DBPrimaryRouterSetsPlacementConflictTimeIfSubRouter) {
+    auto clusterTime = Timestamp(1, 1);
+    actAsSubRouter(LogicalTime(clusterTime));
+
+    sharding::router::DBPrimaryRouter router(getServiceContext(), _nss.dbName());
+    router.route(
+        operationContext(), "test", [&](OperationContext* opCtx, const CachedDatabaseInfo& cdb) {
+            auto txnRouter = TransactionRouter::get(opCtx);
+            ASSERT(txnRouter);
+
+            auto placementConflictTime = txnRouter.getPlacementConflictTime();
+            ASSERT(placementConflictTime);
+            ASSERT_EQ(placementConflictTime->asTimestamp(), clusterTime);
+        });
+}
+
+TEST_F(RouterRoleTestTxn, CollectionRouterSetsPlacementConflictTimeIfSubRouter) {
+    auto clusterTime = Timestamp(1, 1);
+    actAsSubRouter(LogicalTime(clusterTime));
+
+    sharding::router::CollectionRouter router(getServiceContext(), _nss);
+    router.route(
+        operationContext(), "test", [&](OperationContext* opCtx, const CollectionRoutingInfo& cri) {
+            auto txnRouter = TransactionRouter::get(opCtx);
+            ASSERT(txnRouter);
+
+            auto placementConflictTime = txnRouter.getPlacementConflictTime();
+            ASSERT(placementConflictTime);
+            ASSERT_EQ(placementConflictTime->asTimestamp(), clusterTime);
+        });
+}
+
+TEST_F(RouterRoleTestTxn, MultiCollectionRouterSetsPlacementConflictTimeIfSubRouter) {
+    auto clusterTime = Timestamp(1, 1);
+    actAsSubRouter(LogicalTime(clusterTime));
+
+    sharding::router::MultiCollectionRouter router(getServiceContext(), {_nss});
+    router.route(operationContext(),
+                 "test",
+                 [&](OperationContext* opCtx,
+                     stdx::unordered_map<NamespaceString, CollectionRoutingInfo> criMap) {
+                     auto txnRouter = TransactionRouter::get(opCtx);
+                     ASSERT(txnRouter);
+
+                     auto placementConflictTime = txnRouter.getPlacementConflictTime();
+                     ASSERT(placementConflictTime);
+                     ASSERT_EQ(placementConflictTime->asTimestamp(), clusterTime);
+                 });
+}
+
+}  // namespace
+}  // namespace mongo
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 6838811c804..44e83518837 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -838,6 +838,7 @@ boost::optional TransactionRouter::Router::getSelectedAtClusterTime
 }
 
 boost::optional<LogicalTime> TransactionRouter::Router::getPlacementConflictTime() const {
+    invariant(isInitialized());
     return o().placementConflictTimeForNonSnapshotReadConcern.map(
         [](const auto& x) { return x.getTime(); });
 }