mirror of https://github.com/mongodb/mongo
SERVER-103781 Add support for WriteWithoutShardKeyWithId to unified write executor (#44092)
GitOrigin-RevId: 0710dd96d363f88fd764fdb354553d6b41dfa69f
This commit is contained in:
parent 0f33522dcb
commit fa03bbb796
@@ -6,60 +6,67 @@ test_kind: js_test
selector:
  roots:
    - jstests/sharding/internal_txns/**/*.js
    - jstests/sharding/txn*.js
    - jstests/noPassthrough/txns_retryable_writes_sessions/**/*.js
    - jstests/noPassthrough/query/out_merge/**/*.js
    - jstests/sharding/query/out_merge/**/*.js

    # Sharding tests.
    - jstests/sharding/query/bulk_write_basic.js
    - jstests/sharding/analyze_shard_key/list_sampled_queries.js
    - jstests/sharding/analyze_shard_key/read_and_write_distribution.js
    - jstests/sharding/analyze_shard_key/sample_rates_bulk_write_multi_namespaces.js
    # - jstests/sharding/applyOps_multiple_namespaces.js
    - jstests/sharding/batched_writes_with_id_without_shard_key_basic.js
    - jstests/sharding/batched_writes_with_id_without_shard_key_stale_config.js
    - jstests/sharding/chunk_migration_with_bulk_write.js
    # - jstests/sharding/database_versioning_all_commands.js
    - jstests/sharding/deleteOne_with_id_without_shard_key_basic.js
    - jstests/sharding/deleteOne_with_id_without_shard_key_stale_config.js
    - jstests/sharding/internal_txns/**/*.js
    - jstests/sharding/mongos_writes_wait_for_write_concern_sharded.js
    - jstests/sharding/mongos_writes_wait_for_write_concern_sharded_addl_crud_ops.js
    - jstests/sharding/timeseries/**/*.js
    - jstests/sharding/mongos_writes_wait_for_write_concern_timeseries.js
    - jstests/sharding/mongos_writes_wait_for_write_concern_unsharded.js
    - jstests/sharding/migration_blocking_operation/implicit_create_from_upsert_with_paused_migrations.js
    - jstests/sharding/migration_blocking_operation/mongos_calls_shardsvr_coordinate_multi_update_command.js
    - jstests/sharding/migration_blocking_operation/pause_during_multi_updates_cluster_parameter_requires_feature_flag.js
    - jstests/sharding/multi_collection_transaction_placement_conflict_workaround.js
    # TODO: SERVER-103781 try enabling when write without shard key with id is supported.
    # - jstests/sharding/retryable_update_one_by_id_chunk_migration.js
    - jstests/sharding/query/bulk_write_basic.js
    - jstests/sharding/query/bulk_write_size_limit.js
    - jstests/sharding/query/collation/*.js
    - jstests/sharding/query/find_and_modify/*.js
    - jstests/sharding/query/let_rand.js
    - jstests/sharding/query/out_merge/**/*.js
    # - jstests/sharding/query/update/update_shard_key_bulk_write.js
    # - jstests/sharding/query/update_delete_many_metrics.js
    # - jstests/sharding/read_write_concern_defaults_application.js
    - jstests/sharding/updateOne_without_shard_key/deleteOne_without_shard_key_basic.js
    - jstests/sharding/updateOne_without_shard_key/dropping_collection_during_clusterQueryWithoutShardKey_errors.js
    - jstests/sharding/updateOne_without_shard_key/updateOne_without_shard_key_basic.js
    - jstests/sharding/updateOne_without_shard_key/updateOne_without_shard_key_sort.js
    - jstests/sharding/updateOne_without_shard_key/errors.js
    - jstests/sharding/updateOne_without_shard_key/find_and_modify_without_shard_key_sort.js
    - jstests/sharding/analyze_shard_key/read_and_write_distribution.js
    - jstests/noPassthrough/crud/bulk_write_max_replies_size_mongos.js
    - jstests/noPassthrough/crud/bulk_write_metrics.js
    - jstests/sharding/updateOne_without_shard_key/write_without_shard_key_single_shard_data_placement_change.js
    - jstests/sharding/retryable_writes.js
    - jstests/sharding/retryable_writes_nested_shard_key.js
    - jstests/sharding/retryable_update_one_by_id_chunk_migration.js
    - jstests/sharding/retryable_upsert_single_write_shard.js
    - jstests/sharding/retryable_write_error_labels.js
    - jstests/sharding/query/find_and_modify/*.js
    - jstests/sharding/query/collation/*.js
    - jstests/sharding/retryable_writes.js
    - jstests/sharding/retryable_writes_nested_shard_key.js
    - jstests/sharding/server_status_crud_metrics_write_without_shard_key_with_id.js
    - jstests/sharding/swallow_unnecessary_uuid_mismatch_error.js
    - jstests/sharding/timeseries/**/*.js
    - jstests/sharding/txn*.js
    - jstests/sharding/updateOne_with_id_without_shard_key_basic.js
    - jstests/sharding/updateOne_with_id_without_shard_key_stale_config.js
    - jstests/sharding/updateOne_without_shard_key/deleteOne_without_shard_key_basic.js
    - jstests/sharding/updateOne_without_shard_key/dropping_collection_during_clusterQueryWithoutShardKey_errors.js
    - jstests/sharding/updateOne_without_shard_key/errors.js
    - jstests/sharding/updateOne_without_shard_key/find_and_modify_without_shard_key_sort.js
    - jstests/sharding/updateOne_without_shard_key/updateOne_without_shard_key_basic.js
    - jstests/sharding/updateOne_without_shard_key/updateOne_without_shard_key_sort.js
    - jstests/sharding/updateOne_without_shard_key/write_without_shard_key_single_shard_data_placement_change.js

    # No passthrough tests.
    - jstests/noPassthrough/timeseries/**/*.js
    # - jstests/noPassthrough/crud/bulk_write_currentop.js
    - jstests/noPassthrough/crud/bulk_write_currentop.js
    - jstests/noPassthrough/crud/bulk_write_max_replies_size_mongos.js
    - jstests/noPassthrough/crud/bulk_write_metrics.js
    - jstests/noPassthrough/crud/bulk_write_metrics_single_shard.js
    - jstests/noPassthrough/crud/bulk_write_currentop.js
    - jstests/noPassthrough/query/current_op/currentop_query.js

    # - jstests/noPassthrough/query/command_diagnostics_sharded.js
    - jstests/noPassthrough/query/current_op/currentop_query.js
    - jstests/noPassthrough/query/out_merge/**/*.js
    # - jstests/noPassthrough/query/profile/profile_planning_time_stats.js
    # - jstests/noPassthrough/server_parameters/cluster_commands_require_cluster_node.js
    - jstests/noPassthrough/timeseries/**/*.js
    - jstests/noPassthrough/txns_retryable_writes_sessions/**/*.js

    - src/mongo/db/modules/enterprise/jstests/no_passthrough/fle2_bulkwrite_metrics.js
    - src/mongo/db/modules/enterprise/jstests/no_passthrough/fle2_log_omit.js
    - src/mongo/db/modules/enterprise/jstests/no_passthrough/fle2_log_omit_bulk_write.js
@@ -68,12 +75,6 @@ selector:
    - src/mongo/db/modules/enterprise/jstests/no_passthrough/fle2_write_concern_errors_unsharded.js
    - src/mongo/db/modules/enterprise/jstests/no_passthrough/fle_tassert_log_omit.js

    - jstests/sharding/migration_blocking_operation/mongos_calls_shardsvr_coordinate_multi_update_command.js
    - jstests/sharding/migration_blocking_operation/implicit_create_from_upsert_with_paused_migrations.js
    - jstests/sharding/migration_blocking_operation/pause_during_multi_updates_cluster_parameter_requires_feature_flag.js

    - jstests/sharding/swallow_unnecessary_uuid_mismatch_error.js

  exclude_files:
    # TODO SERVER-114138: Handle concurrent collection drops with $merge in UWE.
    - jstests/sharding/query/out_merge/merge_stale_on_fields.js

@@ -1,17 +1,13 @@
/**
 * This tests that mongos correctly reports 'n'/'nModified' for retried retryable updates/deletes
 * with _id without shard key after combining responses from multiple shards post session migration
 * in the following cases:
 * 1) If they are sent with batch size of 1 with ordered:true or ordered:false.
 * 2) If they are sent with batch size > 1 with ordered: true.
 *
 * The case of batch size > 1 with ordered: false will be taken care of by PM-3673.
 * This test checks the 'n'/'nModified' values reported by mongos when retrying updates/deletes
 * by _id without a shard key after a chunk migration.
 *
 * @tags: [requires_fcv_80]
 */

import {ShardingTest} from "jstests/libs/shardingtest.js";
import {CreateShardedCollectionUtil} from "jstests/sharding/libs/create_sharded_collection_util.js";
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";

(function () {
    "use strict";
@@ -20,6 +16,8 @@ import {CreateShardedCollectionUtil} from "jstests/sharding/libs/create_sharded_
    const db = st.s.getDB("test");
    const collection = db.getCollection("mycoll");
    const uweEnabled = isUweEnabled(st.s);

    CreateShardedCollectionUtil.shardCollectionWithChunks(collection, {x: 1}, [
        {min: {x: MinKey}, max: {x: 0}, shard: st.shard0.shardName},
        {min: {x: 0}, max: {x: 10}, shard: st.shard0.shardName},
@@ -30,89 +28,120 @@ import {CreateShardedCollectionUtil} from "jstests/sharding/libs/create_sharded_
        assert.commandWorked(collection.insert({_id: i, x: 5, counter: 0}));
    }

    const sessionCollection = st.s
    const sessionColl = st.s
        .startSession({causalConsistency: false, retryWrites: false})
        .getDatabase(db.getName())
        .getCollection(collection.getName());

    // Updates by _id are broadcasted to all shards which own chunks for the collection. After the
    // session information is migrated to shard1 from the moveChunk command, both shard0 and shard1
    // will report {n: 1, nModified: 1} for the retried stmt ids.
    const updateCmd = {
    const updateCmdSingleOp = {
        updates: [{q: {_id: 0}, u: {$inc: {counter: 1}}}],
        ordered: true,
        txnNumber: NumberLong(0),
    };

    const deleteCmd = {
    const deleteCmdSingleOp = {
        deletes: [{q: {_id: 0}, limit: 1}],
        ordered: true,
        txnNumber: NumberLong(1),
    };

    const updateCmdUnordered = {
        updates: [{q: {_id: 1}, u: {$inc: {counter: 1}}}],
        ordered: false,
    const updateCmdOrdered = {
        updates: [
            {q: {_id: 1}, u: {$inc: {counter: 1}}},
            {q: {_id: 2}, u: {$inc: {counter: 1}}},
        ],
        ordered: true,
        txnNumber: NumberLong(2),
    };

    const deleteCmdUnordered = {
        deletes: [{q: {_id: 1}, limit: 1}],
        ordered: false,
    const deleteCmdOrdered = {
        deletes: [
            {q: {_id: 1}, limit: 1},
            {q: {_id: 2}, limit: 1},
        ],
        ordered: true,
        txnNumber: NumberLong(3),
    };

    const updateCmdWithMultipleUpdatesOrdered = {
    const updateCmdUnordered = {
        updates: [
            {q: {_id: 2}, u: {$inc: {counter: 1}}},
            {q: {_id: 3}, u: {$inc: {counter: 1}}},
            {q: {_id: 4}, u: {$inc: {counter: 1}}},
        ],
        ordered: true,
        ordered: false,
        txnNumber: NumberLong(4),
    };

    const deleteCmdWithMultipleDeletesOrdered = {
    const deleteCmdUnordered = {
        deletes: [
            {q: {_id: 2}, limit: 1},
            {q: {_id: 3}, limit: 1},
            {q: {_id: 4}, limit: 1},
        ],
        ordered: true,
        ordered: false,
        txnNumber: NumberLong(5),
    };

    function runUpdateAndMoveChunk(cmdObj, coll, toShard, expected) {
    function runUpdateTwice(cmdObj, coll, shard0, shard1, firstExp, secondExp) {
        const firstRes = assert.commandWorked(coll.runCommand("update", cmdObj));
        assert.eq({n: firstRes.n, nModified: firstRes.nModified}, expected);
        assert.eq(firstRes.n, firstExp);
        assert.eq(firstRes.nModified, firstExp);

        assert.commandWorked(db.adminCommand({moveChunk: coll.getFullName(), find: {x: 5}, to: toShard}));
        assert.commandWorked(db.adminCommand({moveChunk: coll.getFullName(), find: {x: 5}, to: shard1}));

        const secondRes = assert.commandWorked(coll.runCommand("update", cmdObj));
        assert.eq({n: secondRes.n, nModified: secondRes.nModified}, expected);
        assert.eq(secondRes.n, secondExp);
        assert.eq(secondRes.nModified, secondExp);

        assert.commandWorked(db.adminCommand({moveChunk: coll.getFullName(), find: {x: 5}, to: shard0}));
    }

    function runDeleteAndMoveChunk(cmdObj, coll, toShard, expected) {
    function runDeleteTwice(cmdObj, coll, shard0, shard1, firstExp, secondExp) {
        const firstRes = assert.commandWorked(coll.runCommand("delete", cmdObj));
        assert.eq(firstRes.n, expected);
        assert.eq(firstRes.n, firstExp);

        assert.commandWorked(db.adminCommand({moveChunk: coll.getFullName(), find: {x: 5}, to: toShard}));
        assert.commandWorked(db.adminCommand({moveChunk: coll.getFullName(), find: {x: 5}, to: shard1}));

        const secondRes = assert.commandWorked(coll.runCommand("delete", cmdObj));
        assert.eq(secondRes.n, expected);
        assert.eq(secondRes.n, secondExp);

        assert.commandWorked(db.adminCommand({moveChunk: coll.getFullName(), find: {x: 5}, to: shard0}));
    }

    runUpdateAndMoveChunk(updateCmd, sessionCollection, st.shard1.shardName, {n: 1, nModified: 1});
    runDeleteAndMoveChunk(deleteCmd, sessionCollection, st.shard0.shardName, 1);
    const shardName0 = st.shard0.shardName;
    const shardName1 = st.shard1.shardName;

    runUpdateAndMoveChunk(updateCmdUnordered, sessionCollection, st.shard1.shardName, {n: 1, nModified: 1});
    runDeleteAndMoveChunk(deleteCmdUnordered, sessionCollection, st.shard0.shardName, 1);
    // Updates/deletes by _id without a shard key are broadcasted to all shards which own chunks for
    // the collection. After the session information is migrated to shard1 from the moveChunk
    // command, both shard0 and shard1 will report {n: 1, nModified: 1} for the retried stmt ids.
    //
    // What happens when this command is retried?
    //
    // If BatchWriteExec or bulk_write_exec is used -AND- if each update/delete by _id executes in a
    // batch by itself (either because ordered=true or due to other circumstances), there is special
    // case logic (see WriteOp::_noteWriteWithoutShardKeyWithIdBatchResponseWithSingleWrite()) that
    // ensures that each op will only count as "1" when generating the response to send to the
    // client.
    //
    // If UWE is used -OR- if ordered=false and multiple updates/deletes by _id are executed in a
    // batch together, then the responses from all the shards get summed together, which in this
    // test example results in each op being counted twice.
    //
    // TODO SERVER-54019 Avoid over-counting 'n' and 'nModified' values when retrying updates by _id
    // or deletes by _id after chunk migration.
    let firstExp = 1;
    let secondExp = uweEnabled ? 2 * firstExp : firstExp;
    runUpdateTwice(updateCmdSingleOp, sessionColl, shardName0, shardName1, firstExp, secondExp);
    runDeleteTwice(deleteCmdSingleOp, sessionColl, shardName0, shardName1, firstExp, secondExp);

    runUpdateAndMoveChunk(updateCmdWithMultipleUpdatesOrdered, sessionCollection, st.shard1.shardName, {
        n: 3,
        nModified: 3,
    });
    runDeleteAndMoveChunk(deleteCmdWithMultipleDeletesOrdered, sessionCollection, st.shard0.shardName, 3);
    firstExp = 2;
    secondExp = uweEnabled ? 2 * firstExp : firstExp;
    runUpdateTwice(updateCmdOrdered, sessionColl, shardName0, shardName1, firstExp, secondExp);
    runDeleteTwice(deleteCmdOrdered, sessionColl, shardName0, shardName1, firstExp, secondExp);

    firstExp = 2;
    secondExp = 2 * firstExp;
    runUpdateTwice(updateCmdUnordered, sessionColl, shardName0, shardName1, firstExp, secondExp);
    runDeleteTwice(deleteCmdUnordered, sessionColl, shardName0, shardName1, firstExp, secondExp);

    st.stop();
})();

@@ -558,7 +558,7 @@ NSTargeter::TargetingResult CollectionRoutingInfoTargeter::targetUpdate(
    // TODO SPM-3673: Implement a similar approach for non-retryable or sessionless multi:false
    // non-upsert updates with an "_id" equality that involve multiple shards.
    result.isNonTargetedRetryableWriteWithId =
        multipleEndpoints && isExactId && !isUpsert && isRetryableWrite(opCtx);
        multipleEndpoints && isExactId && !isUpsert && !isFindAndModify && isRetryableWrite(opCtx);

    // Increment query counters as appropriate.
    if (!multipleEndpoints) {
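The one-line change above adds !isFindAndModify to the predicate that routes an update onto the retryable-write-with-_id path. Below is a hypothetical, self-contained restatement of that predicate for illustration only; the real check lives in CollectionRoutingInfoTargeter::targetUpdate and uses the targeter's own state rather than plain booleans.

#include <cassert>

// Illustrative stand-in for the targeting predicate: a write takes the
// "retryable write with _id" path only when it is broadcast to multiple endpoints,
// matches on an exact _id, is not an upsert, is not a findAndModify, and is part of
// a retryable write (a write with a txnNumber outside of a transaction).
bool isNonTargetedRetryableWriteWithId(bool multipleEndpoints,
                                       bool isExactId,
                                       bool isUpsert,
                                       bool isFindAndModify,
                                       bool isRetryableWrite) {
    return multipleEndpoints && isExactId && !isUpsert && !isFindAndModify && isRetryableWrite;
}

int main() {
    // An exact-_id, non-upsert update in a retryable write that targets several shards qualifies.
    assert(isNonTargetedRetryableWriteWithId(true, true, false, false, true));
    // The same write issued through findAndModify no longer qualifies after this change.
    assert(!isNonTargetedRetryableWriteWithId(true, true, false, true, true));
    return 0;
}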
@@ -642,7 +642,7 @@ WriteBatchResponse WriteBatchExecutor::_execute(OperationContext* opCtx,
        routingCtx.onRequestSentForNss(nss);
    }

    SimpleWriteBatchResponse resp;
    auto resp = SimpleWriteBatchResponse::makeEmpty(batch.isRetryableWriteWithId);
    bool stopParsingResponses = false;

    while (!stopParsingResponses && !sender.done()) {

@@ -162,6 +162,11 @@ struct EmptyBatchResponse {};

struct SimpleWriteBatchResponse {
    std::vector<std::pair<ShardId, ShardResponse>> shardResponses;
    bool isRetryableWriteWithId = false;

    static SimpleWriteBatchResponse makeEmpty(bool isRetryableWriteWithId) {
        return SimpleWriteBatchResponse{{}, isRetryableWriteWithId};
    }
};

class NoRetryWriteBatchResponse : public BasicResponse {

@@ -34,10 +34,12 @@
#include "mongo/db/query/client_cursor/cursor_server_params_gen.h"
#include "mongo/db/router_role/collection_uuid_mismatch.h"
#include "mongo/db/shard_role/shard_catalog/collection_uuid_mismatch_info.h"
#include "mongo/db/stats/counters.h"
#include "mongo/s/commands/query_cmd/populate_cursor.h"
#include "mongo/s/transaction_router.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/s/write_ops/unified_write_executor/write_batch_executor.h"
#include "mongo/s/write_ops/write_op.h"
#include "mongo/s/write_ops/write_op_helper.h"
#include "mongo/util/assert_util.h"

@@ -52,10 +54,24 @@ using Unexecuted = WriteBatchResponseProcessor::Unexecuted;
using SucceededWithoutItem = WriteBatchResponseProcessor::SucceededWithoutItem;
using FindAndModifyReplyItem = WriteBatchResponseProcessor::FindAndModifyReplyItem;
using GroupItemsResult = WriteBatchResponseProcessor::GroupItemsResult;
using ItemsByOpMap = WriteBatchResponseProcessor::ItemsByOpMap;
using ShardResult = WriteBatchResponseProcessor::ShardResult;

namespace {
ErrorCodes::Error getErrorCodeForItem(const ItemVariant& itemVar) {
// This function returns the Status for a given ItemVariant ('itemVar'). If 'itemVar' is
// Unexecuted, this function will return an OK status.
Status getItemStatus(const ItemVariant& itemVar) {
    return visit(
        OverloadedVisitor([&](const Unexecuted&) { return Status::OK(); },
                          [&](const SucceededWithoutItem&) { return Status::OK(); },
                          [&](const BulkWriteReplyItem& item) { return item.getStatus(); },
                          [&](const FindAndModifyReplyItem& item) { return item.getStatus(); }),
        itemVar);
}

// This function returns the error code for a given ItemVariant ('itemVar'). If 'itemVar' is
// Unexecuted, this function will return ErrorCodes::OK.
ErrorCodes::Error getErrorCode(const ItemVariant& itemVar) {
    return visit(OverloadedVisitor(
                     [&](const Unexecuted&) { return ErrorCodes::OK; },
                     [&](const SucceededWithoutItem&) { return ErrorCodes::OK; },
@@ -64,8 +80,13 @@ ErrorCodes::Error getErrorCodeForItem(const ItemVariant& itemVar) {
                 itemVar);
}

// Like getErrorCode(), but takes a 'std::pair<ShardId,ItemVariant>' as its input.
ErrorCodes::Error getErrorCodeForShardItemPair(const std::pair<ShardId, ItemVariant>& p) {
    return getErrorCodeForItem(p.second);
    return getErrorCode(p.second);
}

bool isRetryableError(const ItemVariant& itemVar) {
    return write_op_helpers::isRetryErrCode(getErrorCode(itemVar));
}

template <typename ResultT>
@@ -87,6 +108,46 @@ void handleTransientTxnError(OperationContext* opCtx,
                                  << " during a transaction"));
    }
}

std::shared_ptr<const CannotImplicitlyCreateCollectionInfo> getCannotImplicitlyCreateCollectionInfo(
    const Status& status) {
    if (status == ErrorCodes::CannotImplicitlyCreateCollection) {
        auto info = status.extraInfo<CannotImplicitlyCreateCollectionInfo>();
        tassert(11182204, "Expected to find CannotImplicitlyCreateCollectionInfo", info != nullptr);

        return info;
    }

    return {};
}

std::shared_ptr<const CollectionUUIDMismatchInfo> getCollectionUUIDMismatchInfo(
    const Status& status) {
    if (status == ErrorCodes::CollectionUUIDMismatch) {
        auto info = status.extraInfo<CollectionUUIDMismatchInfo>();
        tassert(11273500, "Expected to find CollectionUUIDMismatchInfo", info != nullptr);

        return info;
    }

    return {};
}

// Helper function that prints the contents of 'opsToRetry' to the log if appropriate.
void logOpsToRetry(const std::vector<WriteOp>& opsToRetry) {
    if (!opsToRetry.empty() &&
        shouldLog(MONGO_LOGV2_DEFAULT_COMPONENT, logv2::LogSeverity::Debug(4))) {
        std::stringstream opsStream;
        size_t numOpsInStream = 0;

        for (const auto& op : opsToRetry) {
            opsStream << (numOpsInStream++ > 0 ? ", " : "") << op.getId();
        }

        LOGV2_DEBUG(
            10411404, 4, "re-enqueuing ops that didn't complete", "ops"_attr = opsStream.str());
    }
}
} // namespace

ProcessorResult WriteBatchResponseProcessor::onWriteBatchResponse(
@@ -119,7 +180,47 @@ ProcessorResult WriteBatchResponseProcessor::_onWriteBatchResponse(

    // Organize the items from the shard responses by op, and check if any of the items is an
    // unrecoverable error.
    auto [itemsByOp, unrecoverable] = groupItemsByOp(opCtx, shardResults);
    auto [itemsByOp, unrecoverable, hasRetryableError] = groupItemsByOp(opCtx, shardResults);

    // For "RetryableWriteWithId" simple write batches, a different retry strategy is used. If the
    // batch has any retryable errors (and 'inTransaction' and 'unrecoverable' are false), then all
    // of the ops in the batch are retried regardless of what their execution status was.
    //
    // This is the key difference between "RetryableWriteWithId" simple write batches and regular
    // simple write batches.
    if (response.isRetryableWriteWithId && hasRetryableError && !inTransaction && !unrecoverable) {
        std::set<WriteOp> toRetry;
        CollectionsToCreate collsToCreate;

        // For each 'op', queue 'op' for retry and also increment counters as appropriate.
        for (auto& [op, items] : itemsByOp) {
            for (const auto& [shardId, itemVar] : items) {
                if (isRetryableError(itemVar)) {
                    // If 'itemVar' has a Status that is a retryable error, we pass that in when
                    // calling queueOpForRetry().
                    queueOpForRetry(op, getItemStatus(itemVar), toRetry, collsToCreate);
                } else {
                    // Otherwise, call queueOpForRetry() without a Status.
                    queueOpForRetry(op, toRetry);
                }
            }

            if (op.getType() == kUpdate) {
                getQueryCounters(opCtx).updateOneWithoutShardKeyWithIdRetryCount.increment(1);
            } else if (op.getType() == kDelete) {
                getQueryCounters(opCtx).deleteOneWithoutShardKeyWithIdRetryCount.increment(1);
            }
        }

        ProcessorResult result;
        result.opsToRetry.insert(result.opsToRetry.end(), toRetry.begin(), toRetry.end());
        result.collsToCreate = std::move(collsToCreate);

        // Print the contents of 'opsToRetry' to the log if appropriate.
        logOpsToRetry(result.opsToRetry);

        return result;
    }

    // Update the counters (excluding _nErrors), update the list of retried stmtIds, and process
    // the write concern error (if any).
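To make the branch above easier to follow, here is a self-contained restatement of the condition that selects the all-or-nothing retry strategy for these batches. It is an illustration only, not the server's implementation; the names mirror the variables in the diff.

#include <cassert>

// Hypothetical distillation of the branch above: a "RetryableWriteWithId" batch is
// retried in full (every op re-enqueued regardless of its individual outcome) only
// when at least one shard returned a retryable error, the command is not running in
// a transaction, and nothing unrecoverable was reported.
bool shouldRetryWholeBatch(bool isRetryableWriteWithId,
                           bool hasRetryableError,
                           bool inTransaction,
                           bool unrecoverable) {
    return isRetryableWriteWithId && hasRetryableError && !inTransaction && !unrecoverable;
}

int main() {
    // A retryable (e.g. StaleConfig-like) error from one shard re-enqueues the whole batch...
    assert(shouldRetryWholeBatch(true, true, false, false));
    // ...but not when the batch runs inside a transaction or also hit an unrecoverable error.
    assert(!shouldRetryWholeBatch(true, true, true, false));
    assert(!shouldRetryWholeBatch(true, true, false, true));
    return 0;
}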
@@ -154,7 +255,7 @@ ProcessorResult WriteBatchResponseProcessor::_onWriteBatchResponse(
                // If 'shouldRetryOnUnexecutedOrRetryableError' is true, then queue 'op' to
                // be retried.
                if (shouldRetryOnUnexecutedOrRetryableError) {
                    queueOpForRetry(op, /*status*/ boost::none, toRetry, collsToCreate);
                    queueOpForRetry(op, toRetry);
                }
                return true;
            },
@@ -221,19 +322,17 @@ ProcessorResult WriteBatchResponseProcessor::_onWriteBatchResponse(
    result.collsToCreate = std::move(collsToCreate);
    result.successfulShardSet = std::move(successfulShardSet);

    if (!result.opsToRetry.empty()) {
        // Print the contents of 'opsToRetry' to the log.
        if (shouldLog(MONGO_LOGV2_DEFAULT_COMPONENT, logv2::LogSeverity::Debug(4))) {
            std::stringstream opsStream;
            size_t numOpsInStream = 0;
            for (const auto& op : result.opsToRetry) {
                opsStream << (numOpsInStream++ > 0 ? ", " : "") << op.getId();
    // For "RetryableWriteWithId" batches, if the "hangAfterCompletingWriteWithoutShardKeyWithId"
    // failpoint is set, call pauseWhileSet().
    if (response.isRetryableWriteWithId) {
        auto& fp = getHangAfterCompletingWriteWithoutShardKeyWithIdFailPoint();
        if (MONGO_unlikely(fp.shouldFail())) {
            fp.pauseWhileSet();
        }
    }

            LOGV2_DEBUG(
                10411404, 4, "re-enqueuing ops that didn't complete", "ops"_attr = opsStream.str());
        }
    }
    // Print the contents of 'opsToRetry' to the log if appropriate.
    logOpsToRetry(result.opsToRetry);

    return result;
}
@@ -344,15 +443,17 @@ void WriteBatchResponseProcessor::noteRetryableError(OperationContext* opCtx,
}

void WriteBatchResponseProcessor::queueOpForRetry(const WriteOp& op,
                                                  boost::optional<const Status&> status,
                                                  std::set<WriteOp>& toRetry) const {
    toRetry.emplace(op);
}

void WriteBatchResponseProcessor::queueOpForRetry(const WriteOp& op,
                                                  const Status& status,
                                                  std::set<WriteOp>& toRetry,
                                                  CollectionsToCreate& collsToCreate) const {
    toRetry.emplace(op);

    if (status && *status == ErrorCodes::CannotImplicitlyCreateCollection) {
        auto info = status->extraInfo<CannotImplicitlyCreateCollectionInfo>();
        tassert(11182204, "Expected to find CannotImplicitlyCreateCollectionInfo", info != nullptr);

    if (auto info = getCannotImplicitlyCreateCollectionInfo(status)) {
        auto nss = info->getNss();
        collsToCreate.emplace(std::move(nss), std::move(info));
    }
@@ -533,19 +634,21 @@ GroupItemsResult WriteBatchResponseProcessor::groupItemsByOp(
    const bool inTransaction = static_cast<bool>(TransactionRouter::get(opCtx));

    auto isUnrecoverable = [&](const ItemVariant& itemVar) {
        const auto code = getErrorCodeForItem(itemVar);
        const auto code = getErrorCode(itemVar);
        return (code != ErrorCodes::OK &&
                (inTransaction || (ordered && !write_op_helpers::isRetryErrCode(code))));
    };

    // Organize the items from the shard responses by op, and check if any of the items is an
    // unrecoverable error.
    std::map<WriteOp, std::vector<std::pair<ShardId, ItemVariant>>> itemsByOp;
    ItemsByOpMap itemsByOp;
    bool unrecoverable = false;
    bool hasRetryableError = false;

    for (const auto& [shardId, shardResult] : shardResults) {
        for (auto& [op, itemVar] : shardResult.items) {
            unrecoverable |= isUnrecoverable(itemVar);
            hasRetryableError |= isRetryableError(itemVar);

            auto it = itemsByOp.find(op);
            if (it != itemsByOp.end()) {
@@ -558,7 +661,7 @@ GroupItemsResult WriteBatchResponseProcessor::groupItemsByOp(
        }
    }

    return GroupItemsResult{std::move(itemsByOp), unrecoverable};
    return GroupItemsResult{std::move(itemsByOp), unrecoverable, hasRetryableError};
}

void WriteBatchResponseProcessor::processCountersAndRetriedStmtIds(
@@ -588,7 +691,8 @@ void WriteBatchResponseProcessor::retrieveBulkWriteReplyItems(OperationContext*
    tassert(11182209, "Expected BulkWriteCommandReply", result.bulkWriteReply.has_value());
    const BulkWriteCommandReply& parsedReply = *result.bulkWriteReply;

    const bool orderedOrInTxn = _cmdRef.getOrdered() || TransactionRouter::get(opCtx);
    const bool ordered = _cmdRef.getOrdered();
    const bool inTransaction = static_cast<bool>(TransactionRouter::get(opCtx));

    // Put all the reply items into a vector and then validate the reply items. Note that if the
    // BulkWriteCommandReply object contains a non-zero cursor ID, exhaustCursorForReplyItems()
@@ -607,7 +711,7 @@ void WriteBatchResponseProcessor::retrieveBulkWriteReplyItems(OperationContext*

    std::vector<WriteOp> opsAfterFinalRetryableError;
    const bool logOpsAfterFinalRetryableError =
        shouldLog(MONGO_LOGV2_DEFAULT_COMPONENT, logv2::LogSeverity::Debug(4));
        !inTransaction && shouldLog(MONGO_LOGV2_DEFAULT_COMPONENT, logv2::LogSeverity::Debug(4));

    for (size_t shardOpId = 0; shardOpId < ops.size(); ++shardOpId) {
        const auto& op = ops[shardOpId];
@@ -631,10 +735,10 @@ void WriteBatchResponseProcessor::retrieveBulkWriteReplyItems(OperationContext*
                noteRetryableError(opCtx, routingCtx, status);
            }

            // If 'item' is an error and 'orderedOrInTxn' is true, -OR- if 'item' is a
            // retryable error and it's the last reply item, then for remaining ops without
            // If 'item' is an error and 'ordered || inTransaction' is true, -OR- if 'item' is
            // a retryable error and it's the last reply item, then for remaining ops without
            // reply items we will assume the ops did not execute.
            if (orderedOrInTxn || (isRetryableErr && itemIndex >= items.size())) {
            if (ordered || inTransaction || (isRetryableErr && itemIndex >= items.size())) {
                finalErrorForBatch = status;
                finalErrorForBatchIsRetryable = isRetryableErr;
            }
@@ -722,18 +826,6 @@ void WriteBatchResponseProcessor::validateBulkWriteReplyItems(
}

namespace {
std::shared_ptr<const CollectionUUIDMismatchInfo> getCollectionUUIDMismatchInfo(
    const Status& status) {
    if (status == ErrorCodes::CollectionUUIDMismatch) {
        auto info = status.extraInfo<CollectionUUIDMismatchInfo>();
        tassert(11273500, "Expected to find CollectionUUIDMismatchInfo", info != nullptr);

        return info;
    }

    return {};
}

BulkWriteReplyItem getFirstError(const std::vector<BulkWriteReplyItem>& items) {
    tassert(11182216, "Expected vector to contain at least one item", !items.empty());

@@ -106,8 +106,13 @@ public:

    using CollectionsToCreate = ProcessorResult::CollectionsToCreate;

    using GroupItemsResult =
        std::pair<std::map<WriteOp, std::vector<std::pair<ShardId, ItemVariant>>>, bool>;
    using ItemsByOpMap = std::map<WriteOp, std::vector<std::pair<ShardId, ItemVariant>>>;

    struct GroupItemsResult {
        ItemsByOpMap itemsByOp;
        bool unrecoverable = false;
        bool hasRetryableError = false;
    };

    struct ShardResult {
        boost::optional<BulkWriteCommandReply> bulkWriteReply;
@@ -264,11 +269,16 @@ private:
                            const Status& status);

    /**
     * Helper method that adds 'op' to 'toRetry' and also copies CannotImplicitlyCreateCollection
     * errors into 'collsToCreate'.
     * Helper method that adds 'op' to 'toRetry'.
     */
    void queueOpForRetry(const WriteOp& op, std::set<WriteOp>& toRetry) const;

    /**
     * Helper method that adds 'op' to 'toRetry', and extracts CannotImplicitlyCreateCollectionInfo
     * from 'status' (if it exists) and stores it into 'collsToCreate'.
     */
    void queueOpForRetry(const WriteOp& op,
                         boost::optional<const Status&> status,
                         const Status& status,
                         std::set<WriteOp>& toRetry,
                         CollectionsToCreate& collsToCreate) const;

@@ -279,7 +289,8 @@ private:

    /**
     * This method scans the items from 'shardResults' and groups the items together by op. This
     * method also checks if 'shardResults' contains an unrecoverable error.
     * method also returns a flag indicating if 'shardResults' contains a retryable error, and
     * another flag indicating if an error occurred that is "unrecoverable".
     */
    GroupItemsResult groupItemsByOp(
        OperationContext* opCtx, std::vector<std::pair<ShardId, ShardResult>>& shardResults) const;

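GroupItemsResult changes here from a std::pair<..., bool> alias into a named struct so the new hasRetryableError flag does not force an opaque tuple. The following is a small self-contained analogue of why that reads better at call sites; the types are hypothetical stand-ins (only the field names mirror the header above), not the server's.

#include <iostream>
#include <map>
#include <string>
#include <vector>

// Stand-in for the server's ItemsByOpMap: op id -> per-shard reply items.
using ItemsByOpMap = std::map<int, std::vector<std::string>>;

// Stand-in for the new GroupItemsResult struct with its three named fields.
struct GroupItemsResult {
    ItemsByOpMap itemsByOp;
    bool unrecoverable = false;
    bool hasRetryableError = false;
};

GroupItemsResult groupItemsByOp() {
    // One op (id 0) with one OK item and one retryable (StaleConfig-like) item.
    return {{{0, {"shardA: ok", "shardB: StaleConfig"}}},
            /*unrecoverable=*/false,
            /*hasRetryableError=*/true};
}

int main() {
    // Structured bindings give the two flags meaningful names at the call site,
    // which the old pair-based alias (with its anonymous bool) could not do.
    auto [itemsByOp, unrecoverable, hasRetryableError] = groupItemsByOp();
    std::cout << "unrecoverable=" << unrecoverable
              << ", hasRetryableError=" << hasRetryableError << "\n";
    for (const auto& [opId, items] : itemsByOp) {
        for (const auto& item : items) {
            std::cout << "op " << opId << " <- " << item << "\n";
        }
    }
    return 0;
}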
@@ -68,6 +68,7 @@ public:
    const NamespaceString nss1 = NamespaceString::createNamespaceString_forTest("test", "coll");
    const NamespaceString nss2 = NamespaceString::createNamespaceString_forTest("test", "coll2");
    const NamespaceString nss3 = NamespaceString::createNamespaceString_forTest("test", "coll3");
    const NamespaceString cursorNss = NamespaceString::makeBulkWriteNSS(boost::none);
    const HostAndPort host1 = HostAndPort("host1", 0);
    const HostAndPort host2 = HostAndPort("host2", 0);
    const ShardId shard1Name = ShardId("shard1");
@@ -685,6 +686,90 @@ TEST_F(WriteBatchResponseProcessorTest, MixedStalenessErrorsAndOk) {
    ASSERT_EQ(result.opsToRetry[0].getNss(), nss1);
    ASSERT_EQ(result.opsToRetry[1].getId(), op2.getId());
    ASSERT_EQ(result.opsToRetry[1].getNss(), nss1);

    // Assert that 2 OK items were processed.
    ASSERT_EQ(processor.getNumOkItemsProcessed(), 2);

    // Assert that no errors were recorded.
    ASSERT_EQ(processor.getNumErrorsRecorded(), 0);
}

TEST_F(WriteBatchResponseProcessorTest, RetryableWriteWithIdMixedStalenessErrorsAndOk) {
    auto request = BulkWriteCommandRequest(
        {BulkWriteUpdateOp(0, BSON("_id" << 1), BSON("$set" << BSON("y" << 1))),
         BulkWriteUpdateOp(0, BSON("_id" << -1), BSON("$set" << BSON("y" << 1))),
         BulkWriteUpdateOp(1, BSON("_id" << 1), BSON("$set" << BSON("y" << 1))),
         BulkWriteUpdateOp(1, BSON("_id" << -1), BSON("$set" << BSON("y" << 1)))},
        {NamespaceInfoEntry(nss1), NamespaceInfoEntry(nss2)});

    request.setOrdered(false);

    const bool isRetryableWriteWithId = true;

    WriteOp op1(request, 0);
    WriteOp op2(request, 1);
    WriteOp op3(request, 2);
    WriteOp op4(request, 3);

    Status badValueStatus(ErrorCodes::BadValue, "Bad Value");
    Status staleCollStatus1(
        StaleConfigInfo(nss1, *shard1Endpoint.shardVersion, newShardVersion, shard1Name), "");
    Status staleCollStatus2(
        StaleConfigInfo(nss2, *shard2Endpoint.shardVersion, newShardVersion, shard2Name), "");

    auto reply = makeReply();
    reply.setNErrors(2);
    reply.setCursor(BulkWriteCommandResponseCursor(0,
                                                   {BulkWriteReplyItem{0, Status::OK()},
                                                    BulkWriteReplyItem{1, staleCollStatus1},
                                                    BulkWriteReplyItem{2, Status::OK()},
                                                    BulkWriteReplyItem{3, badValueStatus}},
                                                   cursorNss));
    RemoteCommandResponse rcr1(host1, setTopLevelOK(reply.toBSON()), Microseconds{0}, false);

    auto reply2 = makeReply();
    reply2.setNErrors(2);
    reply2.setCursor(BulkWriteCommandResponseCursor(0,
                                                    {BulkWriteReplyItem{0, Status::OK()},
                                                     BulkWriteReplyItem{1, Status::OK()},
                                                     BulkWriteReplyItem{2, staleCollStatus2},
                                                     BulkWriteReplyItem{3, badValueStatus}},
                                                    cursorNss));
    RemoteCommandResponse rcr2(host2, setTopLevelOK(reply2.toBSON()), Microseconds{0}, false);

    WriteCommandRef cmdRef(request);
    Stats stats;
    WriteBatchResponseProcessor processor(cmdRef, stats);
    auto result = processor.onWriteBatchResponse(
        opCtx,
        routingCtx,
        SimpleWriteBatchResponse{{{shard1Name, ShardResponse::make(rcr1, {op1, op2, op3, op4})},
                                  {shard2Name, ShardResponse::make(rcr2, {op1, op2, op3, op4})}},
                                 isRetryableWriteWithId});

    ASSERT_EQ(routingCtx.errors.size(), 2);
    ASSERT_EQ(routingCtx.errors[0].code(), ErrorCodes::StaleConfig);
    ASSERT_EQ(routingCtx.errors[0].extraInfo<StaleConfigInfo>()->getNss(), nss1);
    ASSERT_EQ(routingCtx.errors[1].code(), ErrorCodes::StaleConfig);
    ASSERT_EQ(routingCtx.errors[1].extraInfo<StaleConfigInfo>()->getNss(), nss2);
    ASSERT_EQ(result.collsToCreate.size(), 0);

    // Assert that all ops were returned for retry (regardless of whether they succeeded or not).
    ASSERT_EQ(result.opsToRetry.size(), 4);
    ASSERT_EQ(result.opsToRetry[0].getId(), op1.getId());
    ASSERT_EQ(result.opsToRetry[0].getNss(), nss1);
    ASSERT_EQ(result.opsToRetry[1].getId(), op2.getId());
    ASSERT_EQ(result.opsToRetry[1].getNss(), nss1);
    ASSERT_EQ(result.opsToRetry[2].getId(), op3.getId());
    ASSERT_EQ(result.opsToRetry[2].getNss(), nss2);
    ASSERT_EQ(result.opsToRetry[3].getId(), op4.getId());
    ASSERT_EQ(result.opsToRetry[3].getNss(), nss2);

    // Assert that no OK items were processed.
    ASSERT_EQ(processor.getNumOkItemsProcessed(), 0);

    // Assert that no errors were recorded.
    ASSERT_EQ(processor.getNumErrorsRecorded(), 0);
}

TEST_F(WriteBatchResponseProcessorTest, RetryShardsCannotRefreshDueToLocksHeldError) {

@@ -65,10 +65,7 @@ StatusWith<Analysis> WriteOpAnalyzerImpl::analyze(OperationContext* opCtx,
    // TODO SERVER-103782 Don't use CRITargeter.
    auto nss = op.getNss();
    CollectionRoutingInfoTargeter targeter(nss, routingCtx);

    // TODO SERVER-103781 Add support for kPartialKeyWithId.
    // TODO SERVER-103146 Add kChangesOwnership.
    // TODO SERVER-103781 Add support for "WriteWithoutShardKeyWithId" writes.
    NSTargeter::TargetingResult tr;
    switch (op.getType()) {
        case WriteType::kInsert: {
@@ -118,7 +115,13 @@ StatusWith<Analysis> WriteOpAnalyzerImpl::analyze(OperationContext* opCtx,
    auto targetedSampleId = analyze_shard_key::tryGenerateTargetedSampleId(
        opCtx, targeter.getNS(), op.getItemRef().getOpType(), tr.endpoints);

    if (tr.useTwoPhaseWriteProtocol || tr.isNonTargetedRetryableWriteWithId) {
    if (tr.isNonTargetedRetryableWriteWithId) {
        recordTargetingStats(opCtx, targeter, tr, op);
        return Analysis{AnalysisType::kRetryableWriteWithId,
                        std::move(tr.endpoints),
                        isViewfulTimeseries,
                        std::move(targetedSampleId)};
    } else if (tr.useTwoPhaseWriteProtocol) {
        recordTargetingStats(opCtx, targeter, tr, op);
        return Analysis{AnalysisType::kTwoPhaseWrite,
                        std::move(tr.endpoints),

@@ -49,6 +49,7 @@ enum AnalysisType {
    kSingleShard,
    kMultiShard,
    kTwoPhaseWrite,
    kRetryableWriteWithId,
    kInternalTransaction,
    kMultiWriteBlockingMigrations,
};

@@ -39,11 +39,9 @@
namespace mongo {
namespace unified_write_executor {

namespace {
bool analysisTypeSupportsGrouping(AnalysisType writeType) {
    return writeType == kSingleShard || writeType == kMultiShard;
bool analysisTypeSupportsGrouping(AnalysisType type) {
    return type == kSingleShard || type == kMultiShard || type == kRetryableWriteWithId;
}
} // namespace

template <bool Ordered>
class SimpleBatchBuilderBase {
@@ -80,16 +78,25 @@ public:
     */
    bool isCompatibleWithBatch(NamespaceString nss, Analysis& analysis) const {
        const auto& endpoints = analysis.shardsAffected;
        // If the op's type is not compatible with SimpleBatch, return false.
        if (!analysisTypeSupportsGrouping(analysis.type)) {
        const AnalysisType& analysisType = analysis.type;
        const bool opIsRetryableWriteWithId = analysisType == kRetryableWriteWithId;

        // If op's analysis type doesn't support grouping, return false.
        if (!analysisTypeSupportsGrouping(analysisType)) {
            return false;
        }

        // If op isn't compatible with the current batch's type, return false.
        if (_batch && (opIsRetryableWriteWithId != _batch->isRetryableWriteWithId)) {
            return false;
        }

        // Verify that there is at least one endpoint. Also, if this op is kSingleShard, verify
        // that there's exactly one endpoint.
        tassert(10896511, "Expected at least one affected shard", !endpoints.empty());
        tassert(10896512,
                "Single shard write type should only target a single shard",
                analysis.type != kSingleShard || endpoints.size() == 1);
                "Single shard op should only target a single shard",
                analysisType != kSingleShard || endpoints.size() == 1);
        // If the current batch is empty, return true.
        if (!_batch || _batch->requestByShardId.empty()) {
            return true;
@@ -98,7 +105,7 @@ public:
        // current batch targets multiple shards, or the op and the current batch target different
        // shards, then return false.
        if (Ordered &&
            (analysis.type != kSingleShard || _batch->requestByShardId.size() > 1 ||
            (endpoints.size() > 1 || _batch->requestByShardId.size() > 1 ||
             endpoints.front().shardName != _batch->requestByShardId.begin()->first)) {
            return false;
        }
@@ -136,18 +143,24 @@ public:
    }

    /**
     * Adds 'writeOp' to the current batch. This method will fail with a tassert if writeOp's type
     * is not compatible with SimpleWriteBatch.
     * Adds 'writeOp' to the current batch. This method will fail with a tassert if the op's
     * analysis type is not compatible with the current batch.
     */
    void addOp(WriteOp& writeOp, Analysis& analysis) {
        const AnalysisType& analysisType = analysis.type;
        const bool opIsRetryableWriteWithId = analysisType == kRetryableWriteWithId;
        tassert(10896513,
                "Expected op to be compatible with SimpleWriteBatch",
                analysisTypeSupportsGrouping(analysis.type));
                "Expected op's analysis type to support grouping",
                analysisTypeSupportsGrouping(analysisType));

        if (!_batch) {
            _batch.emplace();
            _batch = SimpleWriteBatch::makeEmpty(opIsRetryableWriteWithId);
        }

        tassert(10378100,
                "Expected op's type to be compatible with batch",
                opIsRetryableWriteWithId == _batch->isRetryableWriteWithId);

        for (const auto& shard : analysis.shardsAffected) {
            auto nss = writeOp.getNss();
            int estSizeBytesForWrite =
@@ -212,6 +225,10 @@ protected:
     */
    bool wasShardAlreadyTargetedWithDifferentShardVersion(NamespaceString nss,
                                                          Analysis& analysis) const {
        if (!_batch) {
            return false;
        }

        for (const auto& shard : analysis.shardsAffected) {
            auto it = _batch->requestByShardId.find(shard.shardName);
            if (it != _batch->requestByShardId.end()) {
@@ -241,12 +258,14 @@ protected:

class OrderedSimpleBatchBuilder : public SimpleBatchBuilderBase<true> {
public:
    using SimpleBatchBuilderBase::SimpleBatchBuilderBase;
    using BaseT = SimpleBatchBuilderBase<true>;
    using BaseT::BaseT;
};

class UnorderedSimpleBatchBuilder : public SimpleBatchBuilderBase<false> {
public:
    using SimpleBatchBuilderBase::SimpleBatchBuilderBase;
    using BaseT = SimpleBatchBuilderBase<false>;
    using BaseT::BaseT;
};

void WriteOpBatcher::markBatchReprocess(WriteBatch batch) {
@@ -365,14 +384,14 @@ BatcherResult OrderedWriteOpBatcher::getNextBatch(OperationContext* opCtx,
                                    *writeOp, std::move(sampleId), analysis.isViewfulTimeseries}},
                                std::move(opsWithErrors)};
            }
            // If the first WriteOp is kMultiShard, then add the op to a SimpleBatch and then break
            // and return the batch.
            if (analysis.type == kMultiShard) {
            // If the first WriteOp is kMultiShard or kRetryableWriteWithId, then start a new
            // SimpleWriteBatch, add the op to the batch, and break and return the batch.
            if (analysis.type != kSingleShard) {
                builder.addOp(*writeOp, analysis);
                break;
            }
            // If the op is kSingleShard, then add the op to a SimpleBatch and keep looping to see
            // if more ops can be added to the batch.
            // If the op is kSingleShard, then start a new SimpleWriteBatch and add the op to the
            // batch, and keep looping to see if more ops can be added to the batch.
            builder.addOp(*writeOp, analysis);
        }
    }
@@ -505,8 +524,9 @@ BatcherResult UnorderedWriteOpBatcher::getNextBatch(OperationContext* opCtx,
                                writeOp, std::move(sampleId), analysis.isViewfulTimeseries}},
                            std::move(opsWithErrors)};
            }
            // If the op is kSingleShard or kMultiShard, then add the op to a SimpleBatch and keep
            // looping to see if more ops can be added to the batch.
            // If the op is kSingleShard, kMultiShard, or kRetryableWriteWithId, then start a new
            // SimpleWriteBatch and add the op to the batch, and keep looping to see if more ops can
            // be added to the batch.
            builder.addOp(writeOp, analysis);
        }
    }

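The batcher changes above reduce to one grouping rule: ops analyzed as kRetryableWriteWithId can be grouped into a SimpleWriteBatch, but only with other ops of the same kind, never mixed with ordinary kSingleShard/kMultiShard ops in the same batch. The sketch below is a hypothetical, self-contained restatement of that rule; the enum values and helper name mirror the diff, everything else is illustrative.

#include <cassert>

// Stand-in for the server's AnalysisType enum.
enum AnalysisType { kSingleShard, kMultiShard, kTwoPhaseWrite, kRetryableWriteWithId };

// Mirrors the updated helper: retryable-writes-with-_id now support grouping too.
bool analysisTypeSupportsGrouping(AnalysisType type) {
    return type == kSingleShard || type == kMultiShard || type == kRetryableWriteWithId;
}

// Illustrative version of the compatibility check: the batch is "tagged" by whether it
// holds retryable-write-with-_id ops, and an op whose tag differs cannot be added.
bool compatibleWithBatch(AnalysisType opType, bool batchIsRetryableWriteWithId) {
    if (!analysisTypeSupportsGrouping(opType)) {
        return false;
    }
    return (opType == kRetryableWriteWithId) == batchIsRetryableWriteWithId;
}

int main() {
    assert(compatibleWithBatch(kRetryableWriteWithId, /*batchIsRetryableWriteWithId=*/true));
    assert(!compatibleWithBatch(kMultiShard, /*batchIsRetryableWriteWithId=*/true));
    assert(!compatibleWithBatch(kRetryableWriteWithId, /*batchIsRetryableWriteWithId=*/false));
    assert(!compatibleWithBatch(kTwoPhaseWrite, /*batchIsRetryableWriteWithId=*/false));
    return 0;
}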
@@ -39,6 +39,8 @@
namespace mongo {
namespace unified_write_executor {

bool analysisTypeSupportsGrouping(AnalysisType type);

struct EmptyBatch {
    std::vector<WriteOp> getWriteOps() const {
        return std::vector<WriteOp>{};
@@ -54,9 +56,8 @@ struct EmptyBatch {
};

struct SimpleWriteBatch {
    // Given that a write command can target multiple collections,
    // we store one shard version per namespace to support batching ops which target the same shard,
    // but target different namespaces.
    // Given that a write command can target multiple collections, we store one shard version per
    // namespace to support batching ops which target different namespaces on the same shard.
    struct ShardRequest {
        std::map<NamespaceString, ShardEndpoint> versionByNss;
        std::set<NamespaceString> nssIsViewfulTimeseries;
@@ -67,6 +68,12 @@ struct SimpleWriteBatch {

    std::map<ShardId, ShardRequest> requestByShardId;

    bool isRetryableWriteWithId = false;

    static SimpleWriteBatch makeEmpty(bool isRetryableWriteWithId) {
        return SimpleWriteBatch{{}, isRetryableWriteWithId};
    }

    std::vector<WriteOp> getWriteOps() const {
        std::vector<WriteOp> result;
        absl::flat_hash_set<WriteOpId> dedup;

@@ -434,9 +434,11 @@ TEST_F(OrderedUnifiedWriteExecutorBatcherTest, OrderedBatcherBatchesQuaruntineOp
        {0, Analysis{kSingleShard, {nss0Shard0}, nss0IsViewfulTimeseries}},
        {1, Analysis{kTwoPhaseWrite, {nss0Shard0}, nss0IsViewfulTimeseries}},
        {2, Analysis{kSingleShard, {nss0Shard0}, nss0IsViewfulTimeseries}},
        {3, Analysis{kInternalTransaction, {nss0Shard0}}},
        {3, Analysis{kInternalTransaction, {nss0Shard0}, nss0IsViewfulTimeseries}},
        {4, Analysis{kSingleShard, {nss0Shard0}, nss0IsViewfulTimeseries}},
        {5, Analysis{kMultiWriteBlockingMigrations, {nss0Shard0, nss0Shard1}}},
        {5,
         Analysis{
             kMultiWriteBlockingMigrations, {nss0Shard0, nss0Shard1}, nss0IsViewfulTimeseries}},
    });

    auto routingCtx = RoutingContext::createSynthetic({});
@@ -925,9 +927,11 @@ TEST_F(UnorderedUnifiedWriteExecutorBatcherTest, UnorderedBatcherBatchesQuarunti
        {3, Analysis{kTwoPhaseWrite, {nss0Shard0, nss0Shard1}, nss0IsViewfulTimeseries}},
        {4, Analysis{kTwoPhaseWrite, {nss0Shard0, nss0Shard1}, nss0IsViewfulTimeseries}},
        {5, Analysis{kSingleShard, {nss0Shard0}, nss0IsViewfulTimeseries}},
        {6, Analysis{kInternalTransaction, {nss0Shard0}}},
        {6, Analysis{kInternalTransaction, {nss0Shard0}, nss0IsViewfulTimeseries}},
        {7, Analysis{kSingleShard, {nss0Shard0}, nss0IsViewfulTimeseries}},
        {8, Analysis{kMultiWriteBlockingMigrations, {nss0Shard0, nss0Shard1}}},
        {8,
         Analysis{
             kMultiWriteBlockingMigrations, {nss0Shard0, nss0Shard1}, nss0IsViewfulTimeseries}},
    });

    auto routingCtx = RoutingContext::createSynthetic({});

@@ -58,10 +58,14 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding

namespace mongo {
namespace {

MONGO_FAIL_POINT_DEFINE(hangAfterCompletingWriteWithoutShardKeyWithId);

FailPoint& getHangAfterCompletingWriteWithoutShardKeyWithIdFailPoint() {
    return hangAfterCompletingWriteWithoutShardKeyWithId;
}

namespace {
// Aggregate a bunch of errors for a single op together
write_ops::WriteError combineOpErrors(const std::vector<ChildWriteOp const*>& errOps) {
    auto getStatusCode = [](ChildWriteOp const* item) {
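For context, the failpoint defined above is typically driven from a test. The sketch below is illustrative only and is not part of the commit: it shows one common server-side pattern for toggling a failpoint by name through the global registry, assuming the usual fail_point utilities; the thread that actually issues the retryable write by _id is elided.

#include "mongo/util/fail_point.h"

// Illustrative sketch: bracket a concurrent retryable update/delete by _id with the
// "hangAfterCompletingWriteWithoutShardKeyWithId" failpoint. The write itself would run
// on another thread; once its batch completes, the response processor pauses in
// pauseWhileSet() until the failpoint is turned off again.
void bracketWriteWithHangFailPoint() {
    auto* fp = mongo::globalFailPointRegistry().find(
        "hangAfterCompletingWriteWithoutShardKeyWithId");
    if (!fp) {
        return;  // Failpoint not registered in this build.
    }

    fp->setMode(mongo::FailPoint::alwaysOn);
    // ... start the write on another thread and perform any checks while it is paused ...
    fp->setMode(mongo::FailPoint::off);  // Releases the paused thread.
}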
@@ -100,6 +100,8 @@ inline bool writeTypeSupportsGrouping(WriteType t) {
    return t == WriteType::Ordinary || t == WriteType::WithoutShardKeyWithId;
}

FailPoint& getHangAfterCompletingWriteWithoutShardKeyWithIdFailPoint();

/**
 * State of a write in-progress (to a single shard) which is one part of a larger write
 * operation.