mongo/jstests/sharding/resharding_generate_oplog_e...

281 lines
10 KiB
JavaScript

/**
* This test is used to generate oplog entries that are used by the ReshardingAggTest unit tests to
* validate the pipeline generated by the createOplogFetchingPipelineForResharding function.
*
* @tags: [multiversion_incompatible, uses_transactions, uses_prepare_transaction]
*/
import {
withAbortAndRetryOnTransientTxnError,
withTxnAndAutoRetryOnMongos,
} from "jstests/libs/auto_retry_transaction_in_sharding.js";
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {extractUUIDFromObject, getUUIDFromListCollections} from "jstests/libs/uuid_util.js";
const st = new ShardingTest({
mongos: 1,
shards: 2,
rsOptions: {setParameter: {maxNumberOfTransactionOperationsInSingleOplogEntry: 2}},
});
const dbName = "test";
const collName = "foo";
const ns = dbName + "." + collName;
const mongos = st.s0;
let testDB = mongos.getDB(dbName);
let tempReshardingColl = "";
// This function sets the environment needed to appear that we are in the middle of a resharding
// operation. The resharding key is set as y and the split is set at 5. So, all test cases below
// will generate a destinedRecipient field based on which side of 5 the value for y is set to.
// Example:
// If the following documents are inserted in a transaction:
// {_id: 2, x: -2, y: 4} => destinedRecipient = shard0 (because y < 5)
// {_id: 3, x: -3, y: 11} => destinedRecipient = shard1 (because y >= 5)
function simulateResharding() {
let uuid = getUUIDFromListCollections(testDB, collName);
tempReshardingColl = "system.resharding." + extractUUIDFromObject(uuid);
const tempReshardingNss = dbName + "." + tempReshardingColl;
assert.commandWorked(testDB.createCollection(tempReshardingColl));
assert.commandWorked(mongos.adminCommand({shardCollection: tempReshardingNss, key: {y: 1}}));
assert.commandWorked(mongos.adminCommand({split: tempReshardingNss, middle: {y: 5}}));
assert.commandWorked(mongos.adminCommand({moveChunk: tempReshardingNss, find: {y: 5}, to: st.shard1.shardName}));
assert.commandWorked(mongos.adminCommand({moveChunk: ns, find: {x: 5}, to: st.shard1.shardName}));
jsTestLog("Updating resharding fields");
let donorReshardingFields = {
"uuid": uuid,
"state": "preparing-to-donate",
"startTime": new Date(),
"donorFields": {
"tempNs": tempReshardingNss,
"reshardingKey": {y: 1},
"recipientShardIds": [st.shard0.shardName, st.shard1.shardName],
},
};
assert.commandWorked(
st.configRS
.getPrimary()
.getDB("config")
.collections.update({_id: ns}, {"$set": {"reshardingFields": donorReshardingFields}}),
);
jsTestLog("Flushing routing table updates");
if (!FeatureFlagUtil.isPresentAndEnabled(st.shard0, "ShardAuthoritativeDbMetadataCRUD")) {
assert.commandWorked(st.shard0.adminCommand({_flushDatabaseCacheUpdates: dbName}));
}
if (!FeatureFlagUtil.isPresentAndEnabled(st.shard1, "ShardAuthoritativeDbMetadataCRUD")) {
assert.commandWorked(st.shard1.adminCommand({_flushDatabaseCacheUpdates: dbName}));
}
assert.commandWorked(st.shard0.adminCommand({_flushRoutingTableCacheUpdates: ns, syncFromConfig: true}));
assert.commandWorked(st.shard1.adminCommand({_flushRoutingTableCacheUpdates: ns, syncFromConfig: true}));
assert.commandWorked(
st.shard0.adminCommand({_flushRoutingTableCacheUpdates: tempReshardingNss, syncFromConfig: true}),
);
assert.commandWorked(
st.shard1.adminCommand({_flushRoutingTableCacheUpdates: tempReshardingNss, syncFromConfig: true}),
);
st.refreshCatalogCacheForNs(mongos, ns);
}
assert.commandWorked(mongos.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
assert.commandWorked(mongos.adminCommand({shardCollection: ns, key: {x: 1}}));
assert.commandWorked(mongos.adminCommand({split: ns, middle: {x: 5}}));
simulateResharding();
let primary = st.shard0;
(() => {
jsTestLog("Inserting docs in applyOps");
assert.commandWorked(
primary.getDB(dbName).runCommand({
applyOps: [
{op: "i", ns: ns, o: {_id: 0, x: 2, y: 2}},
{op: "i", ns: ns, o: {_id: 1, x: 3, y: 5}},
],
}),
);
})();
(() => {
jsTestLog("Inserting docs in a small transaction");
let session = testDB.getMongo().startSession();
withTxnAndAutoRetryOnMongos(session, () => {
let sessionDB = session.getDatabase(dbName);
assert.commandWorked(sessionDB.foo.insert({_id: 2, x: -2, y: 4}));
assert.commandWorked(sessionDB.foo.insert({_id: 3, x: -3, y: 11}));
});
session.endSession();
})();
(() => {
jsTestLog("Inserting docs in a large transaction");
let session = testDB.getMongo().startSession();
// maxNumberOfTransactionOperationsInSingleOplogEntry has been set to 2 to force the following
// transaction to be broken up into two oplog entries.
withTxnAndAutoRetryOnMongos(session, () => {
let sessionDB = session.getDatabase(dbName);
assert.commandWorked(sessionDB.foo.insert({_id: 4, x: -20, y: 4}));
assert.commandWorked(sessionDB.foo.insert({_id: 5, x: -30, y: 11}));
assert.commandWorked(sessionDB.foo.insert({_id: 6, x: -40, y: 11}));
});
session.endSession();
})();
(() => {
jsTestLog("Abort insert of docs in a large transaction");
let session = testDB.getMongo().startSession();
let sessionDB = session.getDatabase(dbName);
withAbortAndRetryOnTransientTxnError(session, () => {
session.startTransaction();
assert.commandWorked(sessionDB.foo.insert({_id: 7, x: -2, y: 4}));
assert.commandWorked(sessionDB.foo.insert({_id: 8, x: -3, y: 11}));
assert.commandWorked(sessionDB.foo.insert({_id: 9, x: -40, y: 11}));
session.abortTransaction();
});
session.endSession();
})();
(() => {
jsTestLog("Inserting docs in a prepared transaction");
let session = testDB.getMongo().startSession();
// The TransactionCoordinator will internally use a prepared transaction if the writes target
// multiple shards.
withTxnAndAutoRetryOnMongos(session, () => {
let sessionDB = session.getDatabase(dbName);
assert.commandWorked(sessionDB.foo.insert({_id: 10, x: -4, y: 4}));
assert.commandWorked(sessionDB.foo.insert({_id: 11, x: 10, y: 11}));
});
session.endSession();
})();
(() => {
jsTestLog("Abort inserting docs in a prepared transaction");
let session = testDB.getMongo().startSession();
let sessionDB = session.getDatabase(dbName);
// The TransactionCoordinator will internally use a prepared transaction if the writes target
// multiple shards.
withAbortAndRetryOnTransientTxnError(session, () => {
session.startTransaction();
assert.commandWorked(sessionDB.foo.insert({_id: 12, x: -4, y: 10}));
assert.commandWorked(sessionDB.foo.insert({_id: 13, x: 10, y: 11}));
// Send prepareTransaction directly to all shards to force oplog entries to be written.
st.rs0.getPrimary().adminCommand({
prepareTransaction: 1,
lsid: session.getSessionId(),
txnNumber: NumberLong(0),
autocommit: false,
});
st.rs1.getPrimary().adminCommand({
prepareTransaction: 1,
lsid: session.getSessionId(),
txnNumber: NumberLong(0),
autocommit: false,
});
session.abortTransaction();
});
session.endSession();
})();
(() => {
jsTestLog("Inserting docs in a large prepared transaction");
let session = testDB.getMongo().startSession();
let sessionDB = session.getDatabase(dbName);
// The TransactionCoordinator will internally use a prepared transaction if the writes target
// multiple shards.
withTxnAndAutoRetryOnMongos(session, () => {
let sessionDB = session.getDatabase(dbName);
assert.commandWorked(sessionDB.foo.insert({_id: 14, x: -4, y: 11}));
assert.commandWorked(sessionDB.foo.insert({_id: 15, x: 10, y: 4}));
assert.commandWorked(sessionDB.foo.insert({_id: 16, x: -3, y: 12}));
assert.commandWorked(sessionDB.foo.insert({_id: 17, x: 11, y: 3}));
assert.commandWorked(sessionDB.foo.insert({_id: 18, x: 2, y: 3}));
});
session.endSession();
})();
(() => {
jsTestLog("Abort inserting docs in a large prepared transaction");
let session = testDB.getMongo().startSession();
let sessionDB = session.getDatabase(dbName);
// The TransactionCoordinator will internally use a prepared transaction if the writes target
// multiple shards.
withAbortAndRetryOnTransientTxnError(session, () => {
session.startTransaction();
assert.commandWorked(sessionDB.foo.insert({_id: 19, x: -4, y: 4}));
assert.commandWorked(sessionDB.foo.insert({_id: 20, x: 10, y: 11}));
assert.commandWorked(sessionDB.foo.insert({_id: 21, x: -3, y: 3}));
assert.commandWorked(sessionDB.foo.insert({_id: 22, x: -2, y: 12}));
assert.commandWorked(sessionDB.foo.insert({_id: 23, x: 12, y: 12}));
assert.commandWorked(sessionDB.foo.insert({_id: 24, x: 13, y: 12}));
// Send prepareTransaction directly to all shards to force oplog entries to be written.
st.rs0.getPrimary().adminCommand({
prepareTransaction: 1,
lsid: session.getSessionId(),
txnNumber: NumberLong(0),
autocommit: false,
});
st.rs1.getPrimary().adminCommand({
prepareTransaction: 1,
lsid: session.getSessionId(),
txnNumber: NumberLong(0),
autocommit: false,
});
session.abortTransaction();
});
session.endSession();
})();
let localDB = primary.getDB("local");
let oplog = localDB.oplog.rs.find();
let oplogEntries = oplog.toArray();
jsTestLog("oplog: " + tojson(oplogEntries));
jsTestLog("oplog strict: " + tostrictjson(oplogEntries));
// The temporary reshard collection must be dropped before checking metadata integrity.
assert(testDB[tempReshardingColl].drop());
// Drop the donor/recipient documents created when simulating the resharding.
assert(st.shard0.rs.getPrimary().getDB("config").localReshardingOperations.donor.drop());
assert(st.shard0.rs.getPrimary().getDB("config").localReshardingOperations.recipient.drop());
assert(st.shard1.rs.getPrimary().getDB("config").localReshardingOperations.donor.drop());
assert(st.shard1.rs.getPrimary().getDB("config").localReshardingOperations.recipient.drop());
st.stop();