// Mirror of https://github.com/mongodb/mongo (JavaScript, 534 lines, 15 KiB).
/**
 * Tests mongos behavior when reading against views in a transaction.
 *
 * @tags: [
 *   requires_sharding,
 *   uses_multi_shard_transaction,
 *   uses_transactions,
 * ]
 */
|
|
import {arrayEq} from "jstests/aggregation/extras/utils.js";
|
|
import {withTxnAndAutoRetryOnMongos} from "jstests/libs/auto_retry_transaction_in_sharding.js";
|
|
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
|
|
import {ShardingTest} from "jstests/libs/shardingtest.js";
|
|
import {flushRoutersAndRefreshShardMetadata} from "jstests/sharding/libs/sharded_transactions_helpers.js";
|
|
|
|
// Database, collection, and view names for the sharded fixture.
const shardedDbName = "shardedDB";
const shardedCollName = "sharded";
const shardedViewName = "sharded_view";

// Database, collection, and view names for the unsharded fixture.
const unshardedDbName = "unshardedDB";
const unshardedCollName = "unsharded";
const unshardedViewName = "unsharded_view";

// Name of the view that is defined on top of the sharded view (a view on a view).
const viewOnShardedViewName = "sharded_view_view";
|
|
|
|
/**
 * Enables sharding on the unsharded test database with the given primary shard, inserts a
 * single document into its collection with majority write concern, and creates a view with an
 * empty pipeline over that collection.
 *
 * @param st - Sharding fixture; commands and the insert are sent through `st.s`.
 * @param session - Session whose database handle is used to create (and return) the view.
 * @param primaryShard - Value passed as `primaryShard` to the `enableSharding` command.
 * @returns The view's handle, obtained through `session`.
 */
function setUpUnshardedCollectionAndView(st, session, primaryShard) {
    assert.commandWorked(st.s.adminCommand({enableSharding: unshardedDbName, primaryShard}));

    const unshardedColl = st.s.getDB(unshardedDbName)[unshardedCollName];
    assert.commandWorked(unshardedColl.insert({_id: 1, x: "unsharded"}, {writeConcern: {w: "majority"}}));

    // The view is created through the session's handle so callers can read it in transactions.
    const unshardedView = session.getDatabase(unshardedDbName)[unshardedViewName];
    assert.commandWorked(
        unshardedView.runCommand("create", {viewOn: unshardedCollName, pipeline: [], writeConcern: {w: "majority"}}),
    );

    return unshardedView;
}
|
|
|
|
/**
 * Enables sharding on the sharded test database with the given primary shard, inserts the
 * documents {_id: -1} and {_id: 1}, shards the collection on {_id: 1}, splits it at {_id: 0},
 * and moves the chunk containing {_id: 1} to `st.shard1`. It then creates a view with an empty
 * pipeline over the collection and calls flushRoutersAndRefreshShardMetadata for the sharded
 * namespace and both test databases.
 *
 * @param st - Sharding fixture; commands and inserts are sent through `st.s`.
 * @param session - Session whose database handle is used to create (and return) the view.
 * @param primaryShard - Value passed as `primaryShard` to the `enableSharding` command.
 * @returns The view's handle, obtained through `session`.
 */
function setUpShardedCollectionAndView(st, session, primaryShard) {
    const ns = `${shardedDbName}.${shardedCollName}`;

    assert.commandWorked(st.s.adminCommand({enableSharding: shardedDbName, primaryShard}));

    // One document on each side of the split point chosen below.
    const shardedColl = st.s.getDB(shardedDbName)[shardedCollName];
    assert.commandWorked(shardedColl.insert({_id: -1, x: "sharded -1"}, {writeConcern: {w: "majority"}}));
    assert.commandWorked(shardedColl.insert({_id: 1, x: "sharded +1"}, {writeConcern: {w: "majority"}}));

    assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
    assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}}));
    assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {_id: 1}, to: st.shard1.shardName}));

    const shardedView = session.getDatabase(shardedDbName)[shardedViewName];
    assert.commandWorked(
        shardedView.runCommand("create", {viewOn: shardedCollName, pipeline: [], writeConcern: {w: "majority"}}),
    );

    flushRoutersAndRefreshShardMetadata(st, {ns, dbNames: [shardedDbName, unshardedDbName]});

    return shardedView;
}
|
|
|
|
// Cluster with two shards and one mongos. Every transaction in this test runs on `session`.
const st = new ShardingTest({shards: 2, mongos: 1});
const session = st.s.startSession();

// Set up an unsharded collection on shard0.
const unshardedView = setUpUnshardedCollectionAndView(st, session, st.shard0.shardName);

// Set up a sharded collection with one chunk on each shard in a database with shard0 as its
// primary shard.
const shardedView = setUpShardedCollectionAndView(st, session, st.shard0.shardName);

// Set up a view on the sharded view, in the same database.
const viewOnShardedView = session.getDatabase(shardedDbName)[viewOnShardedViewName];
assert.commandWorked(
    viewOnShardedView.runCommand("create", {viewOn: shardedViewName, pipeline: [], writeConcern: {w: "majority"}}),
);
|
|
|
|
//
// The first statement a participant shard receives reading from a view should succeed.
//

/**
 * Runs `viewFunc(view)` as the only statement inside the callback handed to
 * withTxnAndAutoRetryOnMongos and asserts that it returns `numDocsExpected`.
 *
 * @param session - Session passed to withTxnAndAutoRetryOnMongos.
 * @param view - View handle forwarded to `viewFunc`.
 * @param viewFunc - Callback that reads from the view and returns a document count.
 * @param numDocsExpected - Count that `viewFunc(view)` must return.
 */
function readFromViewOnFirstParticipantStatement(session, view, viewFunc, numDocsExpected) {
    withTxnAndAutoRetryOnMongos(session, () => {
        assert.eq(viewFunc(view), numDocsExpected);
    });
}
|
|
|
|
// Each view is read with aggregate, distinct, and find, one transaction per read.

// Unsharded view.
readFromViewOnFirstParticipantStatement(session, unshardedView, (view) => view.aggregate({$match: {}}).itcount(), 1);
readFromViewOnFirstParticipantStatement(session, unshardedView, (view) => view.distinct("_id").length, 1);
readFromViewOnFirstParticipantStatement(session, unshardedView, (view) => view.find().itcount(), 1);

// Sharded view.
readFromViewOnFirstParticipantStatement(session, shardedView, (view) => view.aggregate({$match: {}}).itcount(), 2);
readFromViewOnFirstParticipantStatement(session, shardedView, (view) => view.distinct("_id").length, 2);
readFromViewOnFirstParticipantStatement(session, shardedView, (view) => view.find().itcount(), 2);

// View on sharded view.
readFromViewOnFirstParticipantStatement(
    session,
    viewOnShardedView,
    (view) => view.aggregate({$match: {}}).itcount(),
    2,
);
readFromViewOnFirstParticipantStatement(session, viewOnShardedView, (view) => view.distinct("_id").length, 2);
readFromViewOnFirstParticipantStatement(session, viewOnShardedView, (view) => view.find().itcount(), 2);
|
|
|
|
//
// A later statement a participant shard receives reading from a view should succeed.
//

/**
 * Inside the callback handed to withTxnAndAutoRetryOnMongos, first reads the whole view with an
 * aggregate and then runs `viewFunc(view)`, so `viewFunc` is not the first statement. Both
 * reads must return `numDocsExpected`.
 *
 * @param session - Session passed to withTxnAndAutoRetryOnMongos.
 * @param view - View handle read by the aggregate and forwarded to `viewFunc`.
 * @param viewFunc - Callback that reads from the view and returns a document count.
 * @param numDocsExpected - Count expected from both the aggregate and `viewFunc(view)`.
 */
function readFromViewOnLaterParticipantStatement(session, view, viewFunc, numDocsExpected) {
    withTxnAndAutoRetryOnMongos(session, () => {
        // Earlier statement, issued before the read under test.
        assert.eq(view.aggregate({$match: {}}).itcount(), numDocsExpected);
        // The later statement under test.
        assert.eq(viewFunc(view), numDocsExpected);
    });
}
|
|
|
|
// Each view is read with aggregate, distinct, and find as a later statement of a transaction.

// Unsharded view.
readFromViewOnLaterParticipantStatement(session, unshardedView, (view) => view.aggregate({$match: {}}).itcount(), 1);
readFromViewOnLaterParticipantStatement(session, unshardedView, (view) => view.distinct("_id").length, 1);
readFromViewOnLaterParticipantStatement(session, unshardedView, (view) => view.find().itcount(), 1);

// Sharded view.
readFromViewOnLaterParticipantStatement(session, shardedView, (view) => view.aggregate({$match: {}}).itcount(), 2);
readFromViewOnLaterParticipantStatement(session, shardedView, (view) => view.distinct("_id").length, 2);
readFromViewOnLaterParticipantStatement(session, shardedView, (view) => view.find().itcount(), 2);

// View on sharded view.
readFromViewOnLaterParticipantStatement(
    session,
    viewOnShardedView,
    (view) => view.aggregate({$match: {}}).itcount(),
    2,
);
readFromViewOnLaterParticipantStatement(session, viewOnShardedView, (view) => view.distinct("_id").length, 2);
readFromViewOnLaterParticipantStatement(session, viewOnShardedView, (view) => view.find().itcount(), 2);
|
|
|
|
//
// Transactions on shards that return a view resolution error on the first statement remain
// aborted if the shard is not targeted by the retry on the resolved namespace.
//
// This may happen when reading from a sharded view, because mongos will target the primary
// shard first to resolve the view, but the retry on the underlying sharded collection is not
// guaranteed to target the primary again.
//

/**
 * Sends a find directly to the primary of `st.rs0` using the session's lsid and current
 * transaction number with autocommit: false, and asserts that it fails with NoSuchTransaction.
 *
 * @param session - Session whose id and transaction number are attached to the probe command.
 */
function assertNoTxnOnShard0Primary(session) {
    assert.commandFailedWithCode(
        st.rs0
            .getPrimary()
            .getDB("foo")
            .runCommand({
                find: "bar",
                lsid: session.getSessionId(),
                txnNumber: NumberLong(session.getTxnNumber_forTesting()),
                autocommit: false,
            }),
        ErrorCodes.NoSuchTransaction,
    );
}

/**
 * Starts a transaction on `session`, runs `viewFunc(view)` as its first statement and asserts
 * the returned count, then commits. Before and after the commit it checks that shard0's primary
 * has no transaction for this session.
 *
 * Assumes the request in viewFunc does not target the primary shard, Shard0.
 *
 * @param session - Session on which the transaction is started and committed.
 * @param view - View handle forwarded to `viewFunc`.
 * @param viewFunc - Callback that reads from the view and returns a document count.
 * @param numDocsExpected - Count that `viewFunc(view)` must return.
 */
function primaryShardNotReTargeted_FirstStatement(session, view, viewFunc, numDocsExpected) {
    session.startTransaction();
    assert.eq(viewFunc(view), numDocsExpected);

    // There should not be an in-progress transaction on the primary shard.
    assertNoTxnOnShard0Primary(session);

    assert.commandWorked(session.commitTransaction_forTesting());

    // The transaction should not have been committed on the primary shard.
    assertNoTxnOnShard0Primary(session);
}
|
|
|
|
// Every read below filters on _id so that it matches only the document {_id: 1}.

// This is only possible against sharded views.
primaryShardNotReTargeted_FirstStatement(
    session,
    shardedView,
    (view) => view.aggregate({$match: {_id: 1}}).itcount(),
    1,
);
primaryShardNotReTargeted_FirstStatement(
    session,
    shardedView,
    (view) => view.distinct("_id", {_id: {$gte: 1}}).length,
    1,
);
primaryShardNotReTargeted_FirstStatement(session, shardedView, (view) => view.find({_id: 1}).itcount(), 1);

// View on sharded view.
primaryShardNotReTargeted_FirstStatement(
    session,
    viewOnShardedView,
    (view) => view.aggregate({$match: {_id: 1}}).itcount(),
    1,
);
primaryShardNotReTargeted_FirstStatement(
    session,
    viewOnShardedView,
    (view) => view.distinct("_id", {_id: {$gte: 1}}).length,
    1,
);
primaryShardNotReTargeted_FirstStatement(session, viewOnShardedView, (view) => view.find({_id: 1}).itcount(), 1);
|
|
|
|
//
// Shards do not abort on a view resolution error if they have already completed a statement for
// a transaction.
//

/**
 * Inside the callback handed to withTxnAndAutoRetryOnMongos, first reads the document
 * {_id: -1} through the view (asserting exactly one result) and then runs `viewFunc(view)`,
 * asserting that it returns `numDocsExpected`.
 *
 * Assumes the primary shard for view is Shard0.
 *
 * @param session - Session passed to withTxnAndAutoRetryOnMongos.
 * @param view - View handle read by the first statement and forwarded to `viewFunc`.
 * @param viewFunc - Callback that reads from the view and returns a document count.
 * @param numDocsExpected - Count that `viewFunc(view)` must return.
 */
function primaryShardNotReTargeted_LaterStatement(session, view, viewFunc, numDocsExpected) {
    withTxnAndAutoRetryOnMongos(session, () => {
        // Complete a statement on the primary shard for the view.
        assert.eq(view.aggregate({$match: {_id: -1}}).itcount(), 1);
        // Targets the primary first, but the resolved retry only targets Shard1.
        assert.eq(viewFunc(view), numDocsExpected);
    });
}
|
|
|
|
// Every read below filters on _id so that it matches only the document {_id: 1}.

// This is only possible against sharded views.
primaryShardNotReTargeted_LaterStatement(
    session,
    shardedView,
    (view) => view.aggregate({$match: {_id: 1}}).itcount(),
    1,
);
primaryShardNotReTargeted_LaterStatement(
    session,
    shardedView,
    (view) => view.distinct("_id", {_id: {$gte: 1}}).length,
    1,
);
primaryShardNotReTargeted_LaterStatement(session, shardedView, (view) => view.find({_id: 1}).itcount(), 1);

// View on sharded view.
primaryShardNotReTargeted_LaterStatement(
    session,
    viewOnShardedView,
    (view) => view.aggregate({$match: {_id: 1}}).itcount(),
    1,
);
primaryShardNotReTargeted_LaterStatement(
    session,
    viewOnShardedView,
    (view) => view.distinct("_id", {_id: {$gte: 1}}).length,
    1,
);
primaryShardNotReTargeted_LaterStatement(session, viewOnShardedView, (view) => view.find({_id: 1}).itcount(), 1);
|
|
|
|
//
// Reading from a view using $lookup and $graphLookup should succeed.
//

/**
 * Inside the callback handed to withTxnAndAutoRetryOnMongos (using the file-level `session`),
 * runs `pipeline` against `coll` and asserts via arrayEq that the results equal `expected`.
 * On mismatch the failure message contains both arrays.
 *
 * @param coll - Collection handle to aggregate on.
 * @param pipeline - Aggregation pipeline to run.
 * @param expected - Array of documents the aggregation must return.
 */
function assertAggResultEqInTransaction(coll, pipeline, expected) {
    withTxnAndAutoRetryOnMongos(session, () => {
        const resArray = coll.aggregate(pipeline).toArray();
        assert(arrayEq(resArray, expected), tojson({got: resArray, expected}));
    });
}
|
|
|
|
// TODO SERVER-88936 Remove this check once last-lts has the feature flag enabled.
const areAdditionalParticipantsAllowed = FeatureFlagUtil.isPresentAndEnabled(
    st.s.getDB("admin"),
    "AllowAdditionalParticipants",
);
if (areAdditionalParticipantsAllowed) {
    const lookupCollName = "collForLookup";

    // Stage factories shared by the unsharded and sharded cases; `from` names the view to join.
    // Each call returns fresh objects so that no stage is shared between pipelines.
    const lookupStage = (from) => ({$lookup: {from, localField: "_id", foreignField: "_id", as: "matched"}});
    const graphLookupStage = (from) => ({
        $graphLookup: {
            from,
            startWith: "$_id",
            connectFromField: "_id",
            connectToField: "_id",
            as: "matched",
        },
    });
    // Flattens the joined array and keeps only the matched document's `x` value.
    const flattenMatchedStages = () => [{$unwind: "$matched"}, {$project: {_id: 1, matchedX: "$matched.x"}}];

    // Test unsharded collection lookup.
    assert.commandWorked(
        st.s.getDB(unshardedDbName)[lookupCollName].insert({_id: 1}, {writeConcern: {w: "majority"}}),
    );
    const unshardedLookupColl = session.getDatabase(unshardedDbName)[lookupCollName];

    // Lookup the document in the unsharded collection with _id: 1 through the unsharded view.
    assertAggResultEqInTransaction(
        unshardedLookupColl,
        [{$match: {_id: 1}}, lookupStage(unshardedViewName), ...flattenMatchedStages()],
        [{_id: 1, matchedX: "unsharded"}],
    );

    // Find the same document through the view using $graphLookup.
    assertAggResultEqInTransaction(
        unshardedLookupColl,
        [{$match: {_id: 1}}, graphLookupStage(unshardedViewName), ...flattenMatchedStages()],
        [{_id: 1, matchedX: "unsharded"}],
    );

    // We now proceed to test sharded collection lookup. We do this by creating a collection with
    // the same name in the sharded database and sharding it first.
    const shardedLookupColl = session.getDatabase(shardedDbName)[lookupCollName];
    assert.commandWorked(
        st.s.getDB(shardedDbName)[lookupCollName].insert({_id: 1}, {writeConcern: {w: "majority"}}),
    );
    assert.commandWorked(
        st.s.getDB(shardedDbName)[lookupCollName].insert({_id: -1}, {writeConcern: {w: "majority"}}),
    );

    const shardedLookupNs = shardedLookupColl.getFullName();
    assert.commandWorked(st.s.adminCommand({shardCollection: shardedLookupNs, key: {_id: 1}}));
    assert.commandWorked(st.s.adminCommand({split: shardedLookupNs, middle: {_id: 0}}));
    assert.commandWorked(
        st.s.adminCommand({moveChunk: shardedLookupNs, find: {_id: 1}, to: st.shard1.shardName}),
    );

    // Lookup the documents in the now sharded collection through the sharded view.
    assertAggResultEqInTransaction(
        shardedLookupColl,
        [lookupStage(shardedViewName), ...flattenMatchedStages()],
        [
            {_id: 1, matchedX: "sharded +1"},
            {_id: -1, matchedX: "sharded -1"},
        ],
    );

    // Find the same documents through the view using $graphLookup.
    assertAggResultEqInTransaction(
        shardedLookupColl,
        [graphLookupStage(shardedViewName), ...flattenMatchedStages()],
        [
            {_id: 1, matchedX: "sharded +1"},
            {_id: -1, matchedX: "sharded -1"},
        ],
    );
}

st.stop();
|