/**
 * This test is used to generate oplog entries that are used by the ReshardingAggTest unit tests to
 * validate the pipeline generated by the createOplogFetchingPipelineForResharding function.
 *
 * @tags: [multiversion_incompatible, uses_transactions, uses_prepare_transaction]
 */
import {
    withAbortAndRetryOnTransientTxnError,
    withTxnAndAutoRetryOnMongos,
} from "jstests/libs/auto_retry_transaction_in_sharding.js";
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {extractUUIDFromObject, getUUIDFromListCollections} from "jstests/libs/uuid_util.js";

// Cap each transaction oplog entry at two operations so the "large" transactions
// below are forced to span multiple applyOps oplog entries.
const st = new ShardingTest({
    mongos: 1,
    shards: 2,
    rsOptions: {setParameter: {maxNumberOfTransactionOperationsInSingleOplogEntry: 2}},
});

const dbName = "test";
const collName = "foo";
const ns = `${dbName}.${collName}`;
const mongos = st.s0;

let testDB = mongos.getDB(dbName);
// Filled in by simulateResharding() once the source collection's UUID is known.
let tempReshardingColl = "";
// Puts the cluster into a state that appears to be the middle of a resharding
// operation. The resharding key is y with a chunk split at 5, so every test case
// below gets a destinedRecipient field based on which side of 5 its y value is.
// Example, for documents inserted in a transaction:
//   {_id: 2, x: -2, y: 4}  => destinedRecipient = shard0 (because y < 5)
//   {_id: 3, x: -3, y: 11} => destinedRecipient = shard1 (because y >= 5)
function simulateResharding() {
    let uuid = getUUIDFromListCollections(testDB, collName);

    // Create the temporary resharding collection, shard it on the new key {y: 1},
    // split at y = 5, and place the upper chunk on shard1.
    tempReshardingColl = "system.resharding." + extractUUIDFromObject(uuid);
    const tempReshardingNss = dbName + "." + tempReshardingColl;
    assert.commandWorked(testDB.createCollection(tempReshardingColl));
    assert.commandWorked(mongos.adminCommand({shardCollection: tempReshardingNss, key: {y: 1}}));
    assert.commandWorked(mongos.adminCommand({split: tempReshardingNss, middle: {y: 5}}));
    assert.commandWorked(
        mongos.adminCommand({moveChunk: tempReshardingNss, find: {y: 5}, to: st.shard1.shardName}));

    // Spread the source collection's chunks across both shards as well.
    assert.commandWorked(mongos.adminCommand({moveChunk: ns, find: {x: 5}, to: st.shard1.shardName}));

    jsTestLog("Updating resharding fields");
    let donorReshardingFields = {
        "uuid": uuid,
        "state": "preparing-to-donate",
        "startTime": new Date(),
        "donorFields": {
            "tempNs": tempReshardingNss,
            "reshardingKey": {y: 1},
            "recipientShardIds": [st.shard0.shardName, st.shard1.shardName],
        },
    };
    // Write the donor-side resharding metadata directly into config.collections.
    assert.commandWorked(
        st.configRS.getPrimary().getDB("config").collections.update(
            {_id: ns}, {"$set": {"reshardingFields": donorReshardingFields}}));

    jsTestLog("Flushing routing table updates");

    const shards = [st.shard0, st.shard1];
    for (const shard of shards) {
        if (!FeatureFlagUtil.isPresentAndEnabled(shard, "ShardAuthoritativeDbMetadataCRUD")) {
            assert.commandWorked(shard.adminCommand({_flushDatabaseCacheUpdates: dbName}));
        }
    }

    for (const shard of shards) {
        assert.commandWorked(
            shard.adminCommand({_flushRoutingTableCacheUpdates: ns, syncFromConfig: true}));
    }

    for (const shard of shards) {
        assert.commandWorked(shard.adminCommand(
            {_flushRoutingTableCacheUpdates: tempReshardingNss, syncFromConfig: true}));
    }

    st.refreshCatalogCacheForNs(mongos, ns);
}
// Shard the source collection on {x: 1} with a split at x = 5, then fake the
// in-progress resharding state (new key {y: 1}).
assert.commandWorked(mongos.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
assert.commandWorked(mongos.adminCommand({shardCollection: ns, key: {x: 1}}));
assert.commandWorked(mongos.adminCommand({split: ns, middle: {x: 5}}));

simulateResharding();

// Oplog entries are read back from this shard at the end of the test.
let primary = st.shard0;
(() => {
    jsTestLog("Inserting docs in applyOps");

    // applyOps generates insert oplog entries outside of any transaction.
    const docs = [
        {_id: 0, x: 2, y: 2},
        {_id: 1, x: 3, y: 5},
    ];
    assert.commandWorked(primary.getDB(dbName).runCommand({
        applyOps: docs.map((doc) => ({op: "i", ns: ns, o: doc})),
    }));
})();
(() => {
    jsTestLog("Inserting docs in a small transaction");

    const session = testDB.getMongo().startSession();

    // Two operations fit within a single transaction oplog entry
    // (maxNumberOfTransactionOperationsInSingleOplogEntry is 2).
    withTxnAndAutoRetryOnMongos(session, () => {
        const sessionDB = session.getDatabase(dbName);
        for (const doc of [{_id: 2, x: -2, y: 4}, {_id: 3, x: -3, y: 11}]) {
            assert.commandWorked(sessionDB.foo.insert(doc));
        }
    });

    session.endSession();
})();
(() => {
    jsTestLog("Inserting docs in a large transaction");

    const session = testDB.getMongo().startSession();

    // maxNumberOfTransactionOperationsInSingleOplogEntry has been set to 2 to force the following
    // three-insert transaction to be broken up into two oplog entries.
    withTxnAndAutoRetryOnMongos(session, () => {
        const sessionDB = session.getDatabase(dbName);
        const docs = [
            {_id: 4, x: -20, y: 4},
            {_id: 5, x: -30, y: 11},
            {_id: 6, x: -40, y: 11},
        ];
        for (const doc of docs) {
            assert.commandWorked(sessionDB.foo.insert(doc));
        }
    });

    session.endSession();
})();
(() => {
    jsTestLog("Abort insert of docs in a large transaction");

    const session = testDB.getMongo().startSession();
    const sessionDB = session.getDatabase(dbName);

    // Start a multi-entry transaction and abort it instead of committing, so the
    // fetching pipeline can be validated against aborted-transaction oplog entries.
    withAbortAndRetryOnTransientTxnError(session, () => {
        session.startTransaction();
        const docs = [
            {_id: 7, x: -2, y: 4},
            {_id: 8, x: -3, y: 11},
            {_id: 9, x: -40, y: 11},
        ];
        for (const doc of docs) {
            assert.commandWorked(sessionDB.foo.insert(doc));
        }
        session.abortTransaction();
    });

    session.endSession();
})();
(() => {
    jsTestLog("Inserting docs in a prepared transaction");

    const session = testDB.getMongo().startSession();

    // The TransactionCoordinator will internally use a prepared transaction if the writes target
    // multiple shards (y: 4 routes to shard0, y: 11 to shard1 under the resharding key).
    withTxnAndAutoRetryOnMongos(session, () => {
        const sessionDB = session.getDatabase(dbName);
        for (const doc of [{_id: 10, x: -4, y: 4}, {_id: 11, x: 10, y: 11}]) {
            assert.commandWorked(sessionDB.foo.insert(doc));
        }
    });

    session.endSession();
})();
(() => {
    jsTestLog("Abort inserting docs in a prepared transaction");

    const session = testDB.getMongo().startSession();
    const sessionDB = session.getDatabase(dbName);

    // The TransactionCoordinator will internally use a prepared transaction if the writes target
    // multiple shards.
    withAbortAndRetryOnTransientTxnError(session, () => {
        session.startTransaction();
        for (const doc of [{_id: 12, x: -4, y: 10}, {_id: 13, x: 10, y: 11}]) {
            assert.commandWorked(sessionDB.foo.insert(doc));
        }

        // Send prepareTransaction directly to all shards to force oplog entries to be written.
        // NOTE(review): command results are intentionally not asserted here — the prepare is
        // best-effort and the transaction is aborted immediately afterwards.
        for (const rs of [st.rs0, st.rs1]) {
            rs.getPrimary().adminCommand({
                prepareTransaction: 1,
                lsid: session.getSessionId(),
                txnNumber: NumberLong(0),
                autocommit: false,
            });
        }

        session.abortTransaction();
    });

    session.endSession();
})();
(() => {
    jsTestLog("Inserting docs in a large prepared transaction");

    const session = testDB.getMongo().startSession();
    const sessionDB = session.getDatabase(dbName);

    // The TransactionCoordinator will internally use a prepared transaction if the writes target
    // multiple shards. Five inserts span multiple oplog entries since each entry
    // holds at most two operations.
    withTxnAndAutoRetryOnMongos(session, () => {
        const txnDB = session.getDatabase(dbName);
        const docs = [
            {_id: 14, x: -4, y: 11},
            {_id: 15, x: 10, y: 4},
            {_id: 16, x: -3, y: 12},
            {_id: 17, x: 11, y: 3},
            {_id: 18, x: 2, y: 3},
        ];
        for (const doc of docs) {
            assert.commandWorked(txnDB.foo.insert(doc));
        }
    });

    session.endSession();
})();
(() => {
    jsTestLog("Abort inserting docs in a large prepared transaction");

    const session = testDB.getMongo().startSession();
    const sessionDB = session.getDatabase(dbName);

    // The TransactionCoordinator will internally use a prepared transaction if the writes target
    // multiple shards.
    withAbortAndRetryOnTransientTxnError(session, () => {
        session.startTransaction();
        const docs = [
            {_id: 19, x: -4, y: 4},
            {_id: 20, x: 10, y: 11},
            {_id: 21, x: -3, y: 3},
            {_id: 22, x: -2, y: 12},
            {_id: 23, x: 12, y: 12},
            {_id: 24, x: 13, y: 12},
        ];
        for (const doc of docs) {
            assert.commandWorked(sessionDB.foo.insert(doc));
        }

        // Send prepareTransaction directly to all shards to force oplog entries to be written.
        // NOTE(review): results deliberately unchecked — the transaction is aborted next.
        for (const rs of [st.rs0, st.rs1]) {
            rs.getPrimary().adminCommand({
                prepareTransaction: 1,
                lsid: session.getSessionId(),
                txnNumber: NumberLong(0),
                autocommit: false,
            });
        }

        session.abortTransaction();
    });

    session.endSession();
})();
// Dump shard0's oplog so its entries can be copied into the ReshardingAggTest
// unit-test fixtures, in both relaxed and strict extended JSON forms.
let localDB = primary.getDB("local");

let oplogEntries = localDB.oplog.rs.find().toArray();

jsTestLog("oplog: " + tojson(oplogEntries));
jsTestLog("oplog strict: " + tostrictjson(oplogEntries));

// The temporary reshard collection must be dropped before checking metadata integrity.
assert(testDB[tempReshardingColl].drop());

// Drop the donor/recipient documents created when simulating the resharding.
for (const shard of [st.shard0, st.shard1]) {
    const configDB = shard.rs.getPrimary().getDB("config");
    assert(configDB.localReshardingOperations.donor.drop());
    assert(configDB.localReshardingOperations.recipient.drop());
}

st.stop();