mirror of https://github.com/mongodb/mongo
618 lines
20 KiB
JavaScript
618 lines
20 KiB
JavaScript
/**
|
|
* Tests that bulk writes during a shard split correctly report write errors and
|
|
* retries writes that returned TenantMigrationCommitted.
|
|
*
|
|
* Shard split are not expected to be run on servers with ephemeralForTest.
|
|
*
|
|
* @tags: [
|
|
* incompatible_with_eft,
|
|
* incompatible_with_macos,
|
|
* incompatible_with_windows_tls,
|
|
* requires_majority_read_concern,
|
|
* requires_persistence,
|
|
* serverless,
|
|
* requires_fcv_63
|
|
* ]
|
|
*/
|
|
|
|
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
|
|
import {Thread} from "jstests/libs/parallelTester.js";
|
|
import {assertMigrationState, ShardSplitTest} from "jstests/serverless/libs/shard_split_test.js";
|
|
|
|
const kMaxBatchSize = 2;
|
|
const kCollName = "testColl";
|
|
const kTenantDefinedDbName = "0";
|
|
const kNumWriteOps = 6; // num of writes to run in bulk.
|
|
const kNumWriteBatchesWithoutMigrationConflict =
|
|
2; // num of write batches we allow to complete before split blocks writes.
|
|
const kNumUpdatesWithoutMigrationConflict = 2;
|
|
const kMaxSleepTimeMS = 1000;
|
|
|
|
function setup() {
|
|
const test = new ShardSplitTest({
|
|
recipientTagName: "recipientTag",
|
|
recipientSetName: "recipientSet",
|
|
quickGarbageCollection: true,
|
|
nodeOptions: {
|
|
setParameter: {
|
|
internalInsertMaxBatchSize:
|
|
kMaxBatchSize, /* Decrease internal max batch size so we can still show writes
|
|
are batched without inserting hundreds of documents. */
|
|
// Allow non-timestamped reads on donor after split completes for testing.
|
|
'failpoint.tenantMigrationDonorAllowsNonTimestampedReads':
|
|
tojson({mode: 'alwaysOn'}),
|
|
}
|
|
}
|
|
});
|
|
test.addRecipientNodes();
|
|
|
|
return test;
|
|
}
|
|
|
|
function bulkInsertDocsOrdered(primaryHost, dbName, collName, numDocs) {
|
|
const primary = new Mongo(primaryHost);
|
|
let primaryDB = primary.getDB(dbName);
|
|
let bulk = primaryDB[collName].initializeOrderedBulkOp();
|
|
for (let i = 0; i < numDocs; ++i) {
|
|
bulk.insert({x: i});
|
|
}
|
|
|
|
let res;
|
|
try {
|
|
res = bulk.execute();
|
|
} catch (e) {
|
|
res = e;
|
|
}
|
|
return {res: res.getRawResponse(), ops: bulk.getOperations()};
|
|
}
|
|
|
|
function bulkInsertDocsUnordered(primaryHost, dbName, collName, numDocs) {
|
|
const primary = new Mongo(primaryHost);
|
|
let primaryDB = primary.getDB(dbName);
|
|
let bulk = primaryDB[collName].initializeUnorderedBulkOp();
|
|
for (let i = 0; i < numDocs; ++i) {
|
|
bulk.insert({x: i});
|
|
}
|
|
|
|
let res;
|
|
try {
|
|
res = bulk.execute();
|
|
} catch (e) {
|
|
res = e;
|
|
}
|
|
return {res: res.getRawResponse(), ops: bulk.getOperations()};
|
|
}
|
|
|
|
function bulkMultiUpdateDocsOrdered(primaryHost, dbName, collName, numDocs) {
|
|
const primary = new Mongo(primaryHost);
|
|
let primaryDB = primary.getDB(dbName);
|
|
|
|
// Insert initial docs to be updated.
|
|
let insertBulk = primaryDB[collName].initializeOrderedBulkOp();
|
|
for (let i = 0; i < numDocs; ++i) {
|
|
insertBulk.insert({x: i});
|
|
}
|
|
insertBulk.execute();
|
|
|
|
let updateBulk = primaryDB[collName].initializeOrderedBulkOp();
|
|
for (let i = 0; i < numDocs; ++i) {
|
|
updateBulk.find({x: i}).update({$set: {ordered_update: true}});
|
|
}
|
|
|
|
let res;
|
|
try {
|
|
res = updateBulk.execute();
|
|
} catch (e) {
|
|
res = e;
|
|
}
|
|
return {res: res.getRawResponse ? res.getRawResponse() : res, ops: updateBulk.getOperations()};
|
|
}
|
|
|
|
function bulkMultiUpdateDocsUnordered(primaryHost, dbName, collName, numDocs) {
|
|
const primary = new Mongo(primaryHost);
|
|
let primaryDB = primary.getDB(dbName);
|
|
|
|
// Insert initial docs to be updated.
|
|
let insertBulk = primaryDB[collName].initializeOrderedBulkOp();
|
|
for (let i = 0; i < numDocs; ++i) {
|
|
insertBulk.insert({x: i});
|
|
}
|
|
insertBulk.execute();
|
|
|
|
let updateBulk = primaryDB[collName].initializeUnorderedBulkOp();
|
|
for (let i = 0; i < numDocs; ++i) {
|
|
updateBulk.find({x: i}).update({$set: {unordered_update: true}});
|
|
}
|
|
|
|
let res;
|
|
try {
|
|
res = updateBulk.execute();
|
|
} catch (e) {
|
|
res = e;
|
|
}
|
|
return {res: res.getRawResponse ? res.getRawResponse() : res, ops: updateBulk.getOperations()};
|
|
}
|
|
|
|
(() => {
|
|
jsTestLog("Testing unordered bulk insert against a shard split that commits.");
|
|
|
|
const test = setup();
|
|
|
|
const tenantId = ObjectId();
|
|
|
|
const dbName = test.tenantDB(tenantId.str, kTenantDefinedDbName);
|
|
const primary = test.getDonorPrimary();
|
|
const primaryDB = primary.getDB(dbName);
|
|
|
|
assert.commandWorked(primaryDB.runCommand({create: kCollName}));
|
|
|
|
const writeFp = configureFailPoint(
|
|
primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict});
|
|
const bulkWriteThread =
|
|
new Thread(bulkInsertDocsUnordered, primary.host, dbName, kCollName, kNumWriteOps);
|
|
|
|
bulkWriteThread.start();
|
|
writeFp.wait();
|
|
|
|
assert.commandWorked(test.createSplitOperation([tenantId]).commit());
|
|
|
|
writeFp.off();
|
|
bulkWriteThread.join();
|
|
|
|
const bulkWriteRes = bulkWriteThread.returnData();
|
|
const writeErrors = bulkWriteRes.res.writeErrors;
|
|
|
|
assert.eq(primaryDB[kCollName].count(), bulkWriteRes.res.nInserted);
|
|
assert.eq(writeErrors.length,
|
|
(kNumWriteOps - (kMaxBatchSize * kNumWriteBatchesWithoutMigrationConflict)));
|
|
|
|
let expectedErrorIndex = kMaxBatchSize * kNumWriteBatchesWithoutMigrationConflict;
|
|
writeErrors.forEach((err, arrIndex) => {
|
|
assert.eq(err.code, ErrorCodes.TenantMigrationCommitted);
|
|
assert.eq(err.index, expectedErrorIndex++);
|
|
if (arrIndex == 0) {
|
|
assert(err.errmsg);
|
|
} else {
|
|
assert(!err.errmsg);
|
|
}
|
|
});
|
|
test.stop();
|
|
})();
|
|
|
|
(() => {
|
|
jsTestLog(
|
|
"Testing unordered bulk insert against a shard split that blocks a few inserts and commits.");
|
|
|
|
const test = setup();
|
|
|
|
const tenantId = ObjectId();
|
|
|
|
const dbName = test.tenantDB(tenantId.str, kTenantDefinedDbName);
|
|
const primary = test.getDonorPrimary();
|
|
const primaryDB = primary.getDB(dbName);
|
|
|
|
assert.commandWorked(primaryDB.runCommand({create: kCollName}));
|
|
|
|
const writeFp = configureFailPoint(
|
|
primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict});
|
|
const bulkWriteThread =
|
|
new Thread(bulkInsertDocsUnordered, primary.host, dbName, kCollName, kNumWriteOps);
|
|
|
|
const blockFp = configureFailPoint(primaryDB, "pauseShardSplitAfterBlocking");
|
|
|
|
const operation = test.createSplitOperation([tenantId]);
|
|
|
|
bulkWriteThread.start();
|
|
writeFp.wait();
|
|
|
|
const splitThread = operation.commitAsync();
|
|
blockFp.wait();
|
|
|
|
writeFp.off();
|
|
sleep(Math.random() * kMaxSleepTimeMS);
|
|
blockFp.off();
|
|
|
|
bulkWriteThread.join();
|
|
splitThread.join();
|
|
|
|
assert.commandWorked(splitThread.returnData());
|
|
|
|
let bulkWriteRes = bulkWriteThread.returnData();
|
|
let writeErrors = bulkWriteRes.res.writeErrors;
|
|
|
|
assert.eq(primaryDB[kCollName].count(), bulkWriteRes.res.nInserted);
|
|
assert.eq(writeErrors.length,
|
|
(kNumWriteOps - (kMaxBatchSize * kNumWriteBatchesWithoutMigrationConflict)));
|
|
|
|
let expectedErrorIndex = kMaxBatchSize * kNumWriteBatchesWithoutMigrationConflict;
|
|
writeErrors.forEach((err, index) => {
|
|
assert.eq(err.code, ErrorCodes.TenantMigrationCommitted);
|
|
assert.eq(err.index, expectedErrorIndex++);
|
|
if (index == 0) {
|
|
assert.eq(err.errmsg,
|
|
"Write or read must be re-routed to the new owner of this tenant");
|
|
} else {
|
|
assert.eq(err.errmsg, "");
|
|
}
|
|
});
|
|
test.stop();
|
|
})();
|
|
|
|
(() => {
|
|
jsTestLog("Testing unordered bulk insert against a shard split that aborts.");
|
|
|
|
const test = setup();
|
|
|
|
const tenantId = ObjectId();
|
|
|
|
const dbName = test.tenantDB(tenantId.str, kTenantDefinedDbName);
|
|
const primary = test.getDonorPrimary();
|
|
const primaryDB = primary.getDB(dbName);
|
|
|
|
assert.commandWorked(primaryDB.runCommand({create: kCollName}));
|
|
|
|
const writeFp = configureFailPoint(
|
|
primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict});
|
|
const bulkWriteThread =
|
|
new Thread(bulkInsertDocsUnordered, primary.host, dbName, kCollName, kNumWriteOps);
|
|
|
|
// The failpoint below is used to ensure that a write to throw
|
|
// TenantMigrationConflict in the op observer. Without this failpoint, the split
|
|
// could have already aborted by the time the write gets to the op observer.
|
|
const blockFp = configureFailPoint(primaryDB, "pauseShardSplitAfterBlocking");
|
|
const operation = test.createSplitOperation([tenantId]);
|
|
|
|
bulkWriteThread.start();
|
|
writeFp.wait();
|
|
|
|
const splitThread = operation.commitAsync();
|
|
blockFp.wait();
|
|
|
|
writeFp.off();
|
|
sleep(Math.random() * kMaxSleepTimeMS);
|
|
|
|
operation.abort();
|
|
blockFp.off();
|
|
|
|
bulkWriteThread.join();
|
|
splitThread.join();
|
|
|
|
assert.commandFailed(splitThread.returnData());
|
|
assertMigrationState(primary, operation.migrationId, "aborted");
|
|
|
|
const bulkWriteRes = bulkWriteThread.returnData();
|
|
const writeErrors = bulkWriteRes.res.writeErrors;
|
|
|
|
assert.eq(primaryDB[kCollName].count(), bulkWriteRes.res.nInserted);
|
|
assert.eq(writeErrors.length,
|
|
(kNumWriteOps - (kMaxBatchSize * kNumWriteBatchesWithoutMigrationConflict)));
|
|
|
|
let expectedErrorIndex = kMaxBatchSize * kNumWriteBatchesWithoutMigrationConflict;
|
|
writeErrors.forEach((err, arrIndex) => {
|
|
assert.eq(err.code, ErrorCodes.TenantMigrationAborted);
|
|
assert.eq(err.index, expectedErrorIndex++);
|
|
if (arrIndex == 0) {
|
|
assert(err.errmsg);
|
|
} else {
|
|
assert(!err.errmsg);
|
|
}
|
|
});
|
|
test.stop();
|
|
})();
|
|
|
|
(() => {
|
|
jsTestLog("Testing ordered bulk inserts against a shard split that commits.");
|
|
|
|
const test = setup();
|
|
|
|
const tenantId = ObjectId();
|
|
|
|
const dbName = test.tenantDB(tenantId.str, kTenantDefinedDbName);
|
|
const primary = test.getDonorPrimary();
|
|
const primaryDB = primary.getDB(dbName);
|
|
|
|
assert.commandWorked(primaryDB.runCommand({create: kCollName}));
|
|
|
|
const writeFp = configureFailPoint(
|
|
primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict});
|
|
const bulkWriteThread =
|
|
new Thread(bulkInsertDocsOrdered, primary.host, dbName, kCollName, kNumWriteOps);
|
|
|
|
bulkWriteThread.start();
|
|
writeFp.wait();
|
|
|
|
assert.commandWorked(test.createSplitOperation([tenantId]).commit());
|
|
|
|
writeFp.off();
|
|
bulkWriteThread.join();
|
|
|
|
const bulkWriteRes = bulkWriteThread.returnData();
|
|
const writeErrors = bulkWriteRes.res.writeErrors;
|
|
|
|
assert.eq(primaryDB[kCollName].count(), bulkWriteRes.res.nInserted);
|
|
assert.eq(writeErrors.length, 1);
|
|
assert(writeErrors[0].errmsg);
|
|
|
|
// The single write error should correspond to the first write after the split
|
|
// started blocking writes.
|
|
assert.eq(writeErrors[0].index, kNumWriteBatchesWithoutMigrationConflict * kMaxBatchSize);
|
|
assert.eq(writeErrors[0].code, ErrorCodes.TenantMigrationCommitted);
|
|
test.stop();
|
|
})();
|
|
|
|
(() => {
|
|
jsTestLog(
|
|
"Testing ordered bulk insert against a shard split that blocks a few inserts and commits.");
|
|
|
|
const test = setup();
|
|
|
|
const tenantId = ObjectId();
|
|
|
|
const dbName = test.tenantDB(tenantId.str, kTenantDefinedDbName);
|
|
const primary = test.getDonorPrimary();
|
|
const primaryDB = primary.getDB(dbName);
|
|
|
|
assert.commandWorked(primaryDB.runCommand({create: kCollName}));
|
|
|
|
const writeFp = configureFailPoint(
|
|
primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict});
|
|
const bulkWriteThread =
|
|
new Thread(bulkInsertDocsOrdered, primary.host, dbName, kCollName, kNumWriteOps);
|
|
|
|
const blockFp = configureFailPoint(primaryDB, "pauseShardSplitAfterBlocking");
|
|
|
|
const operation = test.createSplitOperation([tenantId]);
|
|
|
|
bulkWriteThread.start();
|
|
writeFp.wait();
|
|
|
|
const splitThread = operation.commitAsync();
|
|
|
|
blockFp.wait();
|
|
|
|
writeFp.off();
|
|
sleep(Math.random() * kMaxSleepTimeMS);
|
|
blockFp.off();
|
|
|
|
bulkWriteThread.join();
|
|
splitThread.join();
|
|
|
|
assert.commandWorked(splitThread.returnData());
|
|
|
|
const bulkWriteRes = bulkWriteThread.returnData();
|
|
const writeErrors = bulkWriteRes.res.writeErrors;
|
|
|
|
assert.eq(primaryDB[kCollName].count(), bulkWriteRes.res.nInserted);
|
|
assert.eq(writeErrors.length, 1);
|
|
assert(writeErrors[0].errmsg);
|
|
|
|
// The single write error should correspond to the first write after the split
|
|
// started blocking writes.
|
|
assert.eq(writeErrors[0].index, kNumWriteBatchesWithoutMigrationConflict * kMaxBatchSize);
|
|
assert.eq(writeErrors[0].code, ErrorCodes.TenantMigrationCommitted);
|
|
test.stop();
|
|
})();
|
|
|
|
(() => {
|
|
jsTestLog("Testing ordered bulk write against a shard split that aborts.");
|
|
|
|
const test = setup();
|
|
|
|
const tenantId = ObjectId();
|
|
|
|
const dbName = test.tenantDB(tenantId.str, kTenantDefinedDbName);
|
|
const primary = test.getDonorPrimary();
|
|
const primaryDB = primary.getDB(dbName);
|
|
|
|
assert.commandWorked(primaryDB.runCommand({create: kCollName}));
|
|
|
|
const writeFp = configureFailPoint(
|
|
primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict});
|
|
const bulkWriteThread =
|
|
new Thread(bulkInsertDocsOrdered, primary.host, dbName, kCollName, kNumWriteOps);
|
|
|
|
// The failpoint below is used to ensure that a write to throw
|
|
// TenantMigrationConflict in the op observer. Without this failpoint, the split
|
|
// could have already aborted by the time the write gets to the op observer.
|
|
const blockFp = configureFailPoint(primaryDB, "pauseShardSplitAfterBlocking");
|
|
|
|
const operation = test.createSplitOperation([tenantId]);
|
|
|
|
bulkWriteThread.start();
|
|
writeFp.wait();
|
|
|
|
const splitThread = operation.commitAsync();
|
|
|
|
blockFp.wait();
|
|
|
|
writeFp.off();
|
|
sleep(Math.random() * kMaxSleepTimeMS);
|
|
|
|
operation.abort();
|
|
|
|
blockFp.off();
|
|
|
|
bulkWriteThread.join();
|
|
splitThread.join();
|
|
|
|
assert.commandFailed(splitThread.returnData());
|
|
assertMigrationState(primary, operation.migrationId, "aborted");
|
|
|
|
const bulkWriteRes = bulkWriteThread.returnData();
|
|
const writeErrors = bulkWriteRes.res.writeErrors;
|
|
|
|
assert.eq(primaryDB[kCollName].count(), bulkWriteRes.res.nInserted);
|
|
assert.eq(writeErrors.length, 1);
|
|
assert(writeErrors[0].errmsg);
|
|
|
|
// The single write error should correspond to the first write after the split
|
|
// started blocking writes.
|
|
assert.eq(writeErrors[0].index, kNumWriteBatchesWithoutMigrationConflict * kMaxBatchSize);
|
|
assert.eq(writeErrors[0].code, ErrorCodes.TenantMigrationAborted);
|
|
test.stop();
|
|
})();
|
|
|
|
(() => {
|
|
jsTestLog("Testing unordered bulk multi update that blocks.");
|
|
|
|
const test = setup();
|
|
|
|
const tenantId = ObjectId();
|
|
|
|
const dbName = test.tenantDB(tenantId.str, kTenantDefinedDbName);
|
|
const primary = test.getDonorPrimary();
|
|
const primaryDB = primary.getDB(dbName);
|
|
|
|
assert.commandWorked(primaryDB.runCommand({create: kCollName}));
|
|
|
|
const writeFp = configureFailPoint(
|
|
primaryDB, "hangDuringBatchUpdate", {}, {skip: kNumUpdatesWithoutMigrationConflict});
|
|
const bulkWriteThread =
|
|
new Thread(bulkMultiUpdateDocsUnordered, primary.host, dbName, kCollName, kNumWriteOps);
|
|
|
|
const blockFp = configureFailPoint(primaryDB, "pauseShardSplitAfterBlocking");
|
|
|
|
const operation = test.createSplitOperation([tenantId]);
|
|
|
|
bulkWriteThread.start();
|
|
writeFp.wait();
|
|
|
|
const splitThread = operation.commitAsync();
|
|
|
|
blockFp.wait();
|
|
|
|
writeFp.off();
|
|
sleep(Math.random() * kMaxSleepTimeMS);
|
|
blockFp.off();
|
|
|
|
bulkWriteThread.join();
|
|
splitThread.join();
|
|
|
|
assert.commandWorked(splitThread.returnData());
|
|
|
|
let bulkWriteRes = bulkWriteThread.returnData();
|
|
assert.eq(bulkWriteRes.res.code, ErrorCodes.Interrupted, tojson(bulkWriteRes));
|
|
assert.eq(
|
|
bulkWriteRes.res.errmsg,
|
|
"Operation interrupted by an internal data migration and could not be automatically retried",
|
|
tojson(bulkWriteRes));
|
|
test.stop();
|
|
})();
|
|
|
|
(() => {
|
|
jsTestLog("Testing ordered bulk multi update that blocks.");
|
|
|
|
const test = setup();
|
|
|
|
const tenantId = ObjectId();
|
|
|
|
const dbName = test.tenantDB(tenantId.str, kTenantDefinedDbName);
|
|
const primary = test.getDonorPrimary();
|
|
const primaryDB = primary.getDB(dbName);
|
|
|
|
assert.commandWorked(primaryDB.runCommand({create: kCollName}));
|
|
|
|
const writeFp = configureFailPoint(
|
|
primaryDB, "hangDuringBatchUpdate", {}, {skip: kNumUpdatesWithoutMigrationConflict});
|
|
const bulkWriteThread =
|
|
new Thread(bulkMultiUpdateDocsOrdered, primary.host, dbName, kCollName, kNumWriteOps);
|
|
|
|
const blockFp = configureFailPoint(primaryDB, "pauseShardSplitAfterBlocking");
|
|
|
|
const operation = test.createSplitOperation([tenantId]);
|
|
|
|
bulkWriteThread.start();
|
|
writeFp.wait();
|
|
|
|
const splitThread = operation.commitAsync();
|
|
|
|
blockFp.wait();
|
|
|
|
writeFp.off();
|
|
sleep(Math.random() * kMaxSleepTimeMS);
|
|
blockFp.off();
|
|
|
|
bulkWriteThread.join();
|
|
splitThread.join();
|
|
|
|
assert.commandWorked(splitThread.returnData());
|
|
|
|
const bulkWriteRes = bulkWriteThread.returnData();
|
|
assert.eq(bulkWriteRes.res.code, ErrorCodes.Interrupted, tojson(bulkWriteRes));
|
|
assert.eq(
|
|
bulkWriteRes.res.errmsg,
|
|
"Operation interrupted by an internal data migration and could not be automatically retried",
|
|
tojson(bulkWriteRes));
|
|
test.stop();
|
|
})();
|
|
|
|
(() => {
|
|
jsTestLog("Testing unordered multi updates against a shard split that has completed.");
|
|
|
|
const test = setup();
|
|
const tenantId = ObjectId();
|
|
|
|
const dbName = test.tenantDB(tenantId.str, kTenantDefinedDbName);
|
|
const primary = test.getDonorPrimary();
|
|
const primaryDB = primary.getDB(dbName);
|
|
|
|
assert.commandWorked(primaryDB.runCommand({create: kCollName}));
|
|
|
|
const writeFp = configureFailPoint(
|
|
primaryDB, "hangDuringBatchUpdate", {}, {skip: kNumUpdatesWithoutMigrationConflict});
|
|
const bulkWriteThread =
|
|
new Thread(bulkMultiUpdateDocsUnordered, primary.host, dbName, kCollName, kNumWriteOps);
|
|
|
|
bulkWriteThread.start();
|
|
writeFp.wait();
|
|
|
|
assert.commandWorked(test.createSplitOperation([tenantId]).commit());
|
|
|
|
writeFp.off();
|
|
bulkWriteThread.join();
|
|
|
|
const bulkWriteRes = bulkWriteThread.returnData();
|
|
assert.eq(bulkWriteRes.res.code, ErrorCodes.Interrupted, tojson(bulkWriteRes));
|
|
assert.eq(
|
|
bulkWriteRes.res.errmsg,
|
|
"Operation interrupted by an internal data migration and could not be automatically retried",
|
|
tojson(bulkWriteRes));
|
|
test.stop();
|
|
})();
|
|
|
|
(() => {
|
|
jsTestLog("Testing ordered multi updates against a shard split that has completed.");
|
|
|
|
const test = setup();
|
|
|
|
const tenantId = ObjectId();
|
|
|
|
const dbName = test.tenantDB(tenantId.str, kTenantDefinedDbName);
|
|
const primary = test.getDonorPrimary();
|
|
const primaryDB = primary.getDB(dbName);
|
|
|
|
assert.commandWorked(primaryDB.runCommand({create: kCollName}));
|
|
|
|
const writeFp = configureFailPoint(
|
|
primaryDB, "hangDuringBatchUpdate", {}, {skip: kNumUpdatesWithoutMigrationConflict});
|
|
const bulkWriteThread =
|
|
new Thread(bulkMultiUpdateDocsOrdered, primary.host, dbName, kCollName, kNumWriteOps);
|
|
|
|
bulkWriteThread.start();
|
|
writeFp.wait();
|
|
|
|
assert.commandWorked(test.createSplitOperation([tenantId]).commit());
|
|
|
|
writeFp.off();
|
|
bulkWriteThread.join();
|
|
|
|
const bulkWriteRes = bulkWriteThread.returnData();
|
|
jsTestLog("Returned data " + bulkWriteRes + " " + tojson(bulkWriteRes));
|
|
assert.eq(bulkWriteRes.res.code, ErrorCodes.Interrupted, tojson(bulkWriteRes));
|
|
assert.eq(
|
|
bulkWriteRes.res.errmsg,
|
|
"Operation interrupted by an internal data migration and could not be automatically retried",
|
|
tojson(bulkWriteRes));
|
|
test.stop();
|
|
})();
|