mirror of https://github.com/mongodb/mongo
SERVER-113247 Fix commit sequence protocol for change stream readers in reshardCollection (#44494)
GitOrigin-RevId: 0faabf49a74baa755cd49cd023aa382b7703089b
This commit is contained in:
parent
f2bdea4145
commit
a80e10c5c1
|
|
@ -13,8 +13,7 @@ selector:
|
||||||
# This test exercises the internal behavior of $changeStream v1 and is not compatible with v2.
|
# This test exercises the internal behavior of $changeStream v1 and is not compatible with v2.
|
||||||
- jstests/change_streams/create_event_from_chunk_migration.js
|
- jstests/change_streams/create_event_from_chunk_migration.js
|
||||||
|
|
||||||
# TODO: SERVER-113247 Ensure placementHistory entries are recorded with the right timestamp.
|
# TODO: SERVER-114511 re-enable this test.
|
||||||
- jstests/change_streams/reshard_collection_event.js
|
|
||||||
- jstests/change_streams/migrate_last_chunk_from_shard_event.js
|
- jstests/change_streams/migrate_last_chunk_from_shard_event.js
|
||||||
|
|
||||||
# TODO:SERVER-113140 Report resumeToken for change streams when batchSize is 0.
|
# TODO:SERVER-113140 Report resumeToken for change streams when batchSize is 0.
|
||||||
|
|
|
||||||
|
|
@ -10,8 +10,7 @@ selector:
|
||||||
# This test exercises the internal behavior of $changeStream v1 and is not compatible with v2.
|
# This test exercises the internal behavior of $changeStream v1 and is not compatible with v2.
|
||||||
- jstests/change_streams/create_event_from_chunk_migration.js
|
- jstests/change_streams/create_event_from_chunk_migration.js
|
||||||
|
|
||||||
# TODO: SERVER-113247 Ensure placementHistory entries are recorded with the right timestamp.
|
# TODO: SERVER-114511 re-enable this test.
|
||||||
- jstests/change_streams/reshard_collection_event.js
|
|
||||||
- jstests/change_streams/migrate_last_chunk_from_shard_event.js
|
- jstests/change_streams/migrate_last_chunk_from_shard_event.js
|
||||||
|
|
||||||
# TODO:SERVER-113140 Report resumeToken for change streams when batchSize is 0.
|
# TODO:SERVER-113140 Report resumeToken for change streams when batchSize is 0.
|
||||||
|
|
|
||||||
|
|
@ -12,8 +12,7 @@ selector:
|
||||||
# This test exercises the internal behavior of $changeStream v1 and is not compatible with v2.
|
# This test exercises the internal behavior of $changeStream v1 and is not compatible with v2.
|
||||||
- jstests/change_streams/create_event_from_chunk_migration.js
|
- jstests/change_streams/create_event_from_chunk_migration.js
|
||||||
|
|
||||||
# TODO: SERVER-113247 Ensure placementHistory entries are recorded with the right timestamp.
|
# TODO: SERVER-114511 re-enable this test.
|
||||||
- jstests/change_streams/reshard_collection_event.js
|
|
||||||
- jstests/change_streams/migrate_last_chunk_from_shard_event.js
|
- jstests/change_streams/migrate_last_chunk_from_shard_event.js
|
||||||
|
|
||||||
# TODO:SERVER-113140 Report resumeToken for change streams when batchSize is 0.
|
# TODO:SERVER-113140 Report resumeToken for change streams when batchSize is 0.
|
||||||
|
|
|
||||||
|
|
@ -1139,7 +1139,7 @@ function testReshardCollection() {
|
||||||
|
|
||||||
// Timestamps in the config.placementHistory doc and the op entries produceed by the shard
|
// Timestamps in the config.placementHistory doc and the op entries produceed by the shard
|
||||||
// match the expected ordering.
|
// match the expected ordering.
|
||||||
assert(timestampCmp(reshardCommitOpEntry.ts, finalCollPlacementInfo.timestamp) <= 0);
|
assert(timestampCmp(reshardCommitOpEntry.ts, finalCollPlacementInfo.timestamp) < 0);
|
||||||
assert(timestampCmp(finalCollPlacementInfo.timestamp, collPlacementChangeEntry.ts) <= 0);
|
assert(timestampCmp(finalCollPlacementInfo.timestamp, collPlacementChangeEntry.ts) <= 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,61 +0,0 @@
|
||||||
/**
|
|
||||||
* Verifies that successful commits of Sharding DDL operations generate the expected op entry types
|
|
||||||
* (following the format and rules defined in the design doc of PM-1939).
|
|
||||||
* TODO SERVER-81138 remove multiversion_incompatible and fix comparison with 7.0 binaries
|
|
||||||
* @tags: [
|
|
||||||
* does_not_support_stepdowns,
|
|
||||||
* ]
|
|
||||||
*/
|
|
||||||
import {ShardingTest} from "jstests/libs/shardingtest.js";
|
|
||||||
|
|
||||||
const st = new ShardingTest({shards: 3, chunkSize: 1});
|
|
||||||
|
|
||||||
function verifyOpEntriesOnNodes(expectedOpEntryTemplates, nodes) {
|
|
||||||
const namespaces = [...new Set(expectedOpEntryTemplates.map((t) => t.ns))];
|
|
||||||
for (const node of nodes) {
|
|
||||||
const foundOpEntries = node
|
|
||||||
.getCollection("local.oplog.rs")
|
|
||||||
.find({ns: {$in: namespaces}, op: {$in: ["c", "n"]}})
|
|
||||||
.sort({ts: -1})
|
|
||||||
.limit(expectedOpEntryTemplates.length)
|
|
||||||
.toArray()
|
|
||||||
.reverse();
|
|
||||||
|
|
||||||
assert.eq(expectedOpEntryTemplates.length, foundOpEntries.length);
|
|
||||||
for (let i = 0; i < foundOpEntries.length; ++i) {
|
|
||||||
assert.eq(expectedOpEntryTemplates[i].op, foundOpEntries[i].op);
|
|
||||||
assert.eq(expectedOpEntryTemplates[i].ns, foundOpEntries[i].ns);
|
|
||||||
assert.docEq(expectedOpEntryTemplates[i].o, foundOpEntries[i].o);
|
|
||||||
assert.docEq(expectedOpEntryTemplates[i].o2, foundOpEntries[i].o2);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
function testMovePrimary() {
|
|
||||||
jsTest.log("Testing placement entries added by movePrimary() over a new sharding-enabled DB with no data");
|
|
||||||
|
|
||||||
// Set the initial state
|
|
||||||
const dbName = "movePrimaryTestDB";
|
|
||||||
const fromPrimaryShard = st.shard0;
|
|
||||||
const fromReplicaSet = st.rs0;
|
|
||||||
assert.commandWorked(st.s.adminCommand({enableSharding: dbName, primaryShard: fromPrimaryShard.shardName}));
|
|
||||||
|
|
||||||
// Move the primary shard.
|
|
||||||
const toPrimaryShard = st.shard1;
|
|
||||||
assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: toPrimaryShard.shardName}));
|
|
||||||
|
|
||||||
// Verify that the old shard generated the expected event.
|
|
||||||
const expectedEntriesForPrimaryMoved = [
|
|
||||||
{
|
|
||||||
op: "n",
|
|
||||||
ns: dbName,
|
|
||||||
o: {msg: {movePrimary: dbName}},
|
|
||||||
o2: {movePrimary: dbName, from: fromPrimaryShard.shardName, to: toPrimaryShard.shardName},
|
|
||||||
},
|
|
||||||
];
|
|
||||||
|
|
||||||
verifyOpEntriesOnNodes(expectedEntriesForPrimaryMoved, [fromReplicaSet.getPrimary()]);
|
|
||||||
}
|
|
||||||
|
|
||||||
testMovePrimary();
|
|
||||||
|
|
||||||
st.stop();
|
|
||||||
|
|
@ -48,6 +48,7 @@
|
||||||
#include "mongo/db/sharding_environment/sharding_feature_flags_gen.h"
|
#include "mongo/db/sharding_environment/sharding_feature_flags_gen.h"
|
||||||
#include "mongo/db/sharding_environment/sharding_logging.h"
|
#include "mongo/db/sharding_environment/sharding_logging.h"
|
||||||
#include "mongo/db/topology/vector_clock/vector_clock.h"
|
#include "mongo/db/topology/vector_clock/vector_clock.h"
|
||||||
|
#include "mongo/db/topology/vector_clock/vector_clock_mutable.h"
|
||||||
#include "mongo/otel/traces/telemetry_context_serialization.h"
|
#include "mongo/otel/traces/telemetry_context_serialization.h"
|
||||||
#include "mongo/s/request_types/abort_reshard_collection_gen.h"
|
#include "mongo/s/request_types/abort_reshard_collection_gen.h"
|
||||||
#include "mongo/s/request_types/commit_reshard_collection_gen.h"
|
#include "mongo/s/request_types/commit_reshard_collection_gen.h"
|
||||||
|
|
@ -510,12 +511,18 @@ ExecutorFuture<void> ReshardingCoordinator::_commitAndFinishReshardOperation(
|
||||||
VersionContext::getDecoration(opCtx.get()),
|
VersionContext::getDecoration(opCtx.get()),
|
||||||
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
|
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
|
||||||
// V2 change stream readers expect to see an op entry concerning the
|
// V2 change stream readers expect to see an op entry concerning the
|
||||||
// commit before this materializes into the global catalog. (Multiple
|
// commit before this materializes into the global catalog (multiple
|
||||||
// copies of this event notification are acceptable)
|
// copies of this event notification are acceptable).
|
||||||
_generateCommitNotificationForChangeStreams(
|
_generateCommitNotificationForChangeStreams(
|
||||||
opCtx.get(),
|
opCtx.get(),
|
||||||
executor,
|
executor,
|
||||||
ChangeStreamCommitNotificationMode::BeforeWriteOnCatalog);
|
ChangeStreamCommitNotificationMode::BeforeWriteOnCatalog);
|
||||||
|
// Change stream readers also require that the metadata about the
|
||||||
|
// resharded collection gets later persisted on the global catalog
|
||||||
|
// with a timestamp that is strictly bigger than the cluster time
|
||||||
|
// of this notification.
|
||||||
|
// Bump the related vector clock element to enforce the constraint.
|
||||||
|
VectorClockMutable::get(opCtx.get())->tickClusterTime(1);
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.then(
|
.then(
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue