diff --git a/buildscripts/resmokeconfig/suites/unified_write_executor_sharding.yml b/buildscripts/resmokeconfig/suites/unified_write_executor_sharding.yml index 92c0431478b..46c921a2971 100644 --- a/buildscripts/resmokeconfig/suites/unified_write_executor_sharding.yml +++ b/buildscripts/resmokeconfig/suites/unified_write_executor_sharding.yml @@ -6,60 +6,67 @@ test_kind: js_test selector: roots: - - jstests/sharding/internal_txns/**/*.js - - jstests/sharding/txn*.js - - jstests/noPassthrough/txns_retryable_writes_sessions/**/*.js - - jstests/noPassthrough/query/out_merge/**/*.js - - jstests/sharding/query/out_merge/**/*.js - # Sharding tests. - - jstests/sharding/query/bulk_write_basic.js - jstests/sharding/analyze_shard_key/list_sampled_queries.js + - jstests/sharding/analyze_shard_key/read_and_write_distribution.js - jstests/sharding/analyze_shard_key/sample_rates_bulk_write_multi_namespaces.js # - jstests/sharding/applyOps_multiple_namespaces.js + - jstests/sharding/batched_writes_with_id_without_shard_key_basic.js + - jstests/sharding/batched_writes_with_id_without_shard_key_stale_config.js - jstests/sharding/chunk_migration_with_bulk_write.js # - jstests/sharding/database_versioning_all_commands.js + - jstests/sharding/deleteOne_with_id_without_shard_key_basic.js + - jstests/sharding/deleteOne_with_id_without_shard_key_stale_config.js + - jstests/sharding/internal_txns/**/*.js - jstests/sharding/mongos_writes_wait_for_write_concern_sharded.js - jstests/sharding/mongos_writes_wait_for_write_concern_sharded_addl_crud_ops.js - - jstests/sharding/timeseries/**/*.js - jstests/sharding/mongos_writes_wait_for_write_concern_timeseries.js - jstests/sharding/mongos_writes_wait_for_write_concern_unsharded.js + - jstests/sharding/migration_blocking_operation/implicit_create_from_upsert_with_paused_migrations.js + - jstests/sharding/migration_blocking_operation/mongos_calls_shardsvr_coordinate_multi_update_command.js + - jstests/sharding/migration_blocking_operation/pause_during_multi_updates_cluster_parameter_requires_feature_flag.js - jstests/sharding/multi_collection_transaction_placement_conflict_workaround.js - # TODO: SERVER-103781 try enabling when write without shard key with id supported. 
- # - jstests/sharding/retryable_update_one_by_id_chunk_migration.js + - jstests/sharding/query/bulk_write_basic.js - jstests/sharding/query/bulk_write_size_limit.js + - jstests/sharding/query/collation/*.js + - jstests/sharding/query/find_and_modify/*.js - jstests/sharding/query/let_rand.js + - jstests/sharding/query/out_merge/**/*.js # - jstests/sharding/query/update/update_shard_key_bulk_write.js # - jstests/sharding/query/update_delete_many_metrics.js # - jstests/sharding/read_write_concern_defaults_application.js - - jstests/sharding/updateOne_without_shard_key/deleteOne_without_shard_key_basic.js - - jstests/sharding/updateOne_without_shard_key/dropping_collection_during_clusterQueryWithoutShardKey_errors.js - - jstests/sharding/updateOne_without_shard_key/updateOne_without_shard_key_basic.js - - jstests/sharding/updateOne_without_shard_key/updateOne_without_shard_key_sort.js - - jstests/sharding/updateOne_without_shard_key/errors.js - - jstests/sharding/updateOne_without_shard_key/find_and_modify_without_shard_key_sort.js - - jstests/sharding/analyze_shard_key/read_and_write_distribution.js - - jstests/noPassthrough/crud/bulk_write_max_replies_size_mongos.js - - jstests/noPassthrough/crud/bulk_write_metrics.js - - jstests/sharding/updateOne_without_shard_key/write_without_shard_key_single_shard_data_placement_change.js - - jstests/sharding/retryable_writes.js - - jstests/sharding/retryable_writes_nested_shard_key.js + - jstests/sharding/retryable_update_one_by_id_chunk_migration.js - jstests/sharding/retryable_upsert_single_write_shard.js - jstests/sharding/retryable_write_error_labels.js - - jstests/sharding/query/find_and_modify/*.js - - jstests/sharding/query/collation/*.js + - jstests/sharding/retryable_writes.js + - jstests/sharding/retryable_writes_nested_shard_key.js + - jstests/sharding/server_status_crud_metrics_write_without_shard_key_with_id.js + - jstests/sharding/swallow_unnecessary_uuid_mismatch_error.js + - jstests/sharding/timeseries/**/*.js + - jstests/sharding/txn*.js + - jstests/sharding/updateOne_with_id_without_shard_key_basic.js + - jstests/sharding/updateOne_with_id_without_shard_key_stale_config.js + - jstests/sharding/updateOne_without_shard_key/deleteOne_without_shard_key_basic.js + - jstests/sharding/updateOne_without_shard_key/dropping_collection_during_clusterQueryWithoutShardKey_errors.js + - jstests/sharding/updateOne_without_shard_key/errors.js + - jstests/sharding/updateOne_without_shard_key/find_and_modify_without_shard_key_sort.js + - jstests/sharding/updateOne_without_shard_key/updateOne_without_shard_key_basic.js + - jstests/sharding/updateOne_without_shard_key/updateOne_without_shard_key_sort.js + - jstests/sharding/updateOne_without_shard_key/write_without_shard_key_single_shard_data_placement_change.js # No passthrough tests. 
- - jstests/noPassthrough/timeseries/**/*.js - # - jstests/noPassthrough/crud/bulk_write_currentop.js + - jstests/noPassthrough/crud/bulk_write_currentop.js + - jstests/noPassthrough/crud/bulk_write_max_replies_size_mongos.js - jstests/noPassthrough/crud/bulk_write_metrics.js - jstests/noPassthrough/crud/bulk_write_metrics_single_shard.js - - jstests/noPassthrough/crud/bulk_write_currentop.js - - jstests/noPassthrough/query/current_op/currentop_query.js - # - jstests/noPassthrough/query/command_diagnostics_sharded.js + - jstests/noPassthrough/query/current_op/currentop_query.js + - jstests/noPassthrough/query/out_merge/**/*.js # - jstests/noPassthrough/query/profile/profile_planning_time_stats.js # - jstests/noPassthrough/server_parameters/cluster_commands_require_cluster_node.js + - jstests/noPassthrough/timeseries/**/*.js + - jstests/noPassthrough/txns_retryable_writes_sessions/**/*.js + - src/mongo/db/modules/enterprise/jstests/no_passthrough/fle2_bulkwrite_metrics.js - src/mongo/db/modules/enterprise/jstests/no_passthrough/fle2_log_omit.js - src/mongo/db/modules/enterprise/jstests/no_passthrough/fle2_log_omit_bulk_write.js @@ -68,12 +75,6 @@ selector: - src/mongo/db/modules/enterprise/jstests/no_passthrough/fle2_write_concern_errors_unsharded.js - src/mongo/db/modules/enterprise/jstests/no_passthrough/fle_tassert_log_omit.js - - jstests/sharding/migration_blocking_operation/mongos_calls_shardsvr_coordinate_multi_update_command.js - - jstests/sharding/migration_blocking_operation/implicit_create_from_upsert_with_paused_migrations.js - - jstests/sharding/migration_blocking_operation/pause_during_multi_updates_cluster_parameter_requires_feature_flag.js - - - jstests/sharding/swallow_unnecessary_uuid_mismatch_error.js - exclude_files: # TODO SERVER-114138: Handle concurrent collection drops with $merge in UWE. - jstests/sharding/query/out_merge/merge_stale_on_fields.js diff --git a/jstests/sharding/retryable_update_one_by_id_chunk_migration.js b/jstests/sharding/retryable_update_one_by_id_chunk_migration.js index 25cde212ff1..279bf553427 100644 --- a/jstests/sharding/retryable_update_one_by_id_chunk_migration.js +++ b/jstests/sharding/retryable_update_one_by_id_chunk_migration.js @@ -1,17 +1,13 @@ /** - * This tests that mongos correctly reports 'n'/'nModified' for retried retryable updates/deletes - * with _id without shard key after combining responses from multiple shards post session migration - * in the following cases: - * 1) If they are sent with batch size of 1 with ordered:true or ordered:false. - * 2) If they are sent with batch size > 1 with ordered: true. - * - * The case of batch size > 1 with ordered: false will be taken care by PM-3673. + * This test checks the 'n'/'nModified' values reported by mongos when retrying updates/deletes + * by _id without a shard key after a chunk migration. 
* * @tags: [requires_fcv_80] */ import {ShardingTest} from "jstests/libs/shardingtest.js"; import {CreateShardedCollectionUtil} from "jstests/sharding/libs/create_sharded_collection_util.js"; +import {isUweEnabled} from "jstests/libs/query/uwe_utils.js"; (function () { "use strict"; @@ -20,6 +16,8 @@ import {CreateShardedCollectionUtil} from "jstests/sharding/libs/create_sharded_ const db = st.s.getDB("test"); const collection = db.getCollection("mycoll"); + const uweEnabled = isUweEnabled(st.s); + CreateShardedCollectionUtil.shardCollectionWithChunks(collection, {x: 1}, [ {min: {x: MinKey}, max: {x: 0}, shard: st.shard0.shardName}, {min: {x: 0}, max: {x: 10}, shard: st.shard0.shardName}, @@ -30,89 +28,120 @@ import {CreateShardedCollectionUtil} from "jstests/sharding/libs/create_sharded_ assert.commandWorked(collection.insert({_id: i, x: 5, counter: 0})); } - const sessionCollection = st.s + const sessionColl = st.s .startSession({causalConsistency: false, retryWrites: false}) .getDatabase(db.getName()) .getCollection(collection.getName()); - // Updates by _id are broadcasted to all shards which own chunks for the collection. After the - // session information is migrated to shard1 from the moveChunk command, both shard0 and shard1 - // will report {n: 1, nModified: 1} for the retried stmt ids. - const updateCmd = { + const updateCmdSingleOp = { updates: [{q: {_id: 0}, u: {$inc: {counter: 1}}}], ordered: true, txnNumber: NumberLong(0), }; - const deleteCmd = { + const deleteCmdSingleOp = { deletes: [{q: {_id: 0}, limit: 1}], ordered: true, txnNumber: NumberLong(1), }; - const updateCmdUnordered = { - updates: [{q: {_id: 1}, u: {$inc: {counter: 1}}}], - ordered: false, + const updateCmdOrdered = { + updates: [ + {q: {_id: 1}, u: {$inc: {counter: 1}}}, + {q: {_id: 2}, u: {$inc: {counter: 1}}}, + ], + ordered: true, txnNumber: NumberLong(2), }; - const deleteCmdUnordered = { - deletes: [{q: {_id: 1}, limit: 1}], - ordered: false, + const deleteCmdOrdered = { + deletes: [ + {q: {_id: 1}, limit: 1}, + {q: {_id: 2}, limit: 1}, + ], + ordered: true, txnNumber: NumberLong(3), }; - const updateCmdWithMultipleUpdatesOrdered = { + const updateCmdUnordered = { updates: [ - {q: {_id: 2}, u: {$inc: {counter: 1}}}, {q: {_id: 3}, u: {$inc: {counter: 1}}}, {q: {_id: 4}, u: {$inc: {counter: 1}}}, ], - ordered: true, + ordered: false, txnNumber: NumberLong(4), }; - const deleteCmdWithMultipleDeletesOrdered = { + const deleteCmdUnordered = { deletes: [ - {q: {_id: 2}, limit: 1}, {q: {_id: 3}, limit: 1}, {q: {_id: 4}, limit: 1}, ], - ordered: true, + ordered: false, txnNumber: NumberLong(5), }; - function runUpdateAndMoveChunk(cmdObj, coll, toShard, expected) { + function runUpdateTwice(cmdObj, coll, shard0, shard1, firstExp, secondExp) { const firstRes = assert.commandWorked(coll.runCommand("update", cmdObj)); - assert.eq({n: firstRes.n, nModified: firstRes.nModified}, expected); + assert.eq(firstRes.n, firstExp); + assert.eq(firstRes.nModified, firstExp); - assert.commandWorked(db.adminCommand({moveChunk: coll.getFullName(), find: {x: 5}, to: toShard})); + assert.commandWorked(db.adminCommand({moveChunk: coll.getFullName(), find: {x: 5}, to: shard1})); const secondRes = assert.commandWorked(coll.runCommand("update", cmdObj)); - assert.eq({n: secondRes.n, nModified: secondRes.nModified}, expected); + assert.eq(secondRes.n, secondExp); + assert.eq(secondRes.nModified, secondExp); + + assert.commandWorked(db.adminCommand({moveChunk: coll.getFullName(), find: {x: 5}, to: shard0})); } - function 
runDeleteAndMoveChunk(cmdObj, coll, toShard, expected) {
+    function runDeleteTwice(cmdObj, coll, shard0, shard1, firstExp, secondExp) {
         const firstRes = assert.commandWorked(coll.runCommand("delete", cmdObj));
-        assert.eq(firstRes.n, expected);
+        assert.eq(firstRes.n, firstExp);
 
-        assert.commandWorked(db.adminCommand({moveChunk: coll.getFullName(), find: {x: 5}, to: toShard}));
+        assert.commandWorked(db.adminCommand({moveChunk: coll.getFullName(), find: {x: 5}, to: shard1}));
 
         const secondRes = assert.commandWorked(coll.runCommand("delete", cmdObj));
-        assert.eq(secondRes.n, expected);
+        assert.eq(secondRes.n, secondExp);
+
+        assert.commandWorked(db.adminCommand({moveChunk: coll.getFullName(), find: {x: 5}, to: shard0}));
     }
 
-    runUpdateAndMoveChunk(updateCmd, sessionCollection, st.shard1.shardName, {n: 1, nModified: 1});
-    runDeleteAndMoveChunk(deleteCmd, sessionCollection, st.shard0.shardName, 1);
+    const shardName0 = st.shard0.shardName;
+    const shardName1 = st.shard1.shardName;
 
-    runUpdateAndMoveChunk(updateCmdUnordered, sessionCollection, st.shard1.shardName, {n: 1, nModified: 1});
-    runDeleteAndMoveChunk(deleteCmdUnordered, sessionCollection, st.shard0.shardName, 1);
+    // Updates/deletes by _id without a shard key are broadcast to all shards that own chunks for
+    // the collection. After the session information is migrated to shard1 by the moveChunk
+    // command, both shard0 and shard1 will report {n: 1, nModified: 1} for the retried stmt ids.
+    //
+    // What happens when this command is retried?
+    //
+    // If BatchWriteExec or bulk_write_exec is used -AND- if each update/delete by _id executes in a
+    // batch by itself (either because ordered=true or due to other circumstances), there is special
+    // case logic (see WriteOp::_noteWriteWithoutShardKeyWithIdBatchResponseWithSingleWrite()) that
+    // ensures that each op will only count as "1" when generating the response to send to the
+    // client.
+    //
+    // If UWE is used -OR- if ordered=false and multiple updates/deletes by _id are executed in a
+    // batch together, then the responses from all the shards get summed together, which in this
+    // test example results in each op being counted twice.
+    //
+    // TODO SERVER-54019 Avoid over-counting 'n' and 'nModified' values when retrying updates by _id
+    // or deletes by _id after chunk migration.
+    let firstExp = 1;
+    let secondExp = uweEnabled ? 2 * firstExp : firstExp;
+    runUpdateTwice(updateCmdSingleOp, sessionColl, shardName0, shardName1, firstExp, secondExp);
+    runDeleteTwice(deleteCmdSingleOp, sessionColl, shardName0, shardName1, firstExp, secondExp);
 
-    runUpdateAndMoveChunk(updateCmdWithMultipleUpdatesOrdered, sessionCollection, st.shard1.shardName, {
-        n: 3,
-        nModified: 3,
-    });
-    runDeleteAndMoveChunk(deleteCmdWithMultipleDeletesOrdered, sessionCollection, st.shard0.shardName, 3);
+    firstExp = 2;
+    secondExp = uweEnabled ?
2 * firstExp : firstExp; + runUpdateTwice(updateCmdOrdered, sessionColl, shardName0, shardName1, firstExp, secondExp); + runDeleteTwice(deleteCmdOrdered, sessionColl, shardName0, shardName1, firstExp, secondExp); + + firstExp = 2; + secondExp = 2 * firstExp; + runUpdateTwice(updateCmdUnordered, sessionColl, shardName0, shardName1, firstExp, secondExp); + runDeleteTwice(deleteCmdUnordered, sessionColl, shardName0, shardName1, firstExp, secondExp); st.stop(); })(); diff --git a/src/mongo/db/router_role/collection_routing_info_targeter.cpp b/src/mongo/db/router_role/collection_routing_info_targeter.cpp index 34527b94bcd..3a0e787a6b6 100644 --- a/src/mongo/db/router_role/collection_routing_info_targeter.cpp +++ b/src/mongo/db/router_role/collection_routing_info_targeter.cpp @@ -558,7 +558,7 @@ NSTargeter::TargetingResult CollectionRoutingInfoTargeter::targetUpdate( // TODO SPM-3673: Implement a similar approach for non-retryable or sessionless multi:false // non-upsert updates with an "_id" equality that involve multiple shards. result.isNonTargetedRetryableWriteWithId = - multipleEndpoints && isExactId && !isUpsert && isRetryableWrite(opCtx); + multipleEndpoints && isExactId && !isUpsert && !isFindAndModify && isRetryableWrite(opCtx); // Increment query counters as appropriate. if (!multipleEndpoints) { diff --git a/src/mongo/s/write_ops/unified_write_executor/write_batch_executor.cpp b/src/mongo/s/write_ops/unified_write_executor/write_batch_executor.cpp index 7fe0231a34b..21e27d85f06 100644 --- a/src/mongo/s/write_ops/unified_write_executor/write_batch_executor.cpp +++ b/src/mongo/s/write_ops/unified_write_executor/write_batch_executor.cpp @@ -642,7 +642,7 @@ WriteBatchResponse WriteBatchExecutor::_execute(OperationContext* opCtx, routingCtx.onRequestSentForNss(nss); } - SimpleWriteBatchResponse resp; + auto resp = SimpleWriteBatchResponse::makeEmpty(batch.isRetryableWriteWithId); bool stopParsingResponses = false; while (!stopParsingResponses && !sender.done()) { diff --git a/src/mongo/s/write_ops/unified_write_executor/write_batch_executor.h b/src/mongo/s/write_ops/unified_write_executor/write_batch_executor.h index b74a855aad8..87a9aa944de 100644 --- a/src/mongo/s/write_ops/unified_write_executor/write_batch_executor.h +++ b/src/mongo/s/write_ops/unified_write_executor/write_batch_executor.h @@ -162,6 +162,11 @@ struct EmptyBatchResponse {}; struct SimpleWriteBatchResponse { std::vector> shardResponses; + bool isRetryableWriteWithId = false; + + static SimpleWriteBatchResponse makeEmpty(bool isRetryableWriteWithId) { + return SimpleWriteBatchResponse{{}, isRetryableWriteWithId}; + } }; class NoRetryWriteBatchResponse : public BasicResponse { diff --git a/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor.cpp b/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor.cpp index f0a9466904f..73f2433f515 100644 --- a/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor.cpp +++ b/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor.cpp @@ -34,10 +34,12 @@ #include "mongo/db/query/client_cursor/cursor_server_params_gen.h" #include "mongo/db/router_role/collection_uuid_mismatch.h" #include "mongo/db/shard_role/shard_catalog/collection_uuid_mismatch_info.h" +#include "mongo/db/stats/counters.h" #include "mongo/s/commands/query_cmd/populate_cursor.h" #include "mongo/s/transaction_router.h" #include "mongo/s/write_ops/batched_command_response.h" #include 
"mongo/s/write_ops/unified_write_executor/write_batch_executor.h" +#include "mongo/s/write_ops/write_op.h" #include "mongo/s/write_ops/write_op_helper.h" #include "mongo/util/assert_util.h" @@ -52,10 +54,24 @@ using Unexecuted = WriteBatchResponseProcessor::Unexecuted; using SucceededWithoutItem = WriteBatchResponseProcessor::SucceededWithoutItem; using FindAndModifyReplyItem = WriteBatchResponseProcessor::FindAndModifyReplyItem; using GroupItemsResult = WriteBatchResponseProcessor::GroupItemsResult; +using ItemsByOpMap = WriteBatchResponseProcessor::ItemsByOpMap; using ShardResult = WriteBatchResponseProcessor::ShardResult; namespace { -ErrorCodes::Error getErrorCodeForItem(const ItemVariant& itemVar) { +// This function returns the Status for a given ItemVariant ('itemVar'). If 'itemVar' is Unexecuted, +// this function will return an OK status. +Status getItemStatus(const ItemVariant& itemVar) { + return visit( + OverloadedVisitor([&](const Unexecuted&) { return Status::OK(); }, + [&](const SucceededWithoutItem&) { return Status::OK(); }, + [&](const BulkWriteReplyItem& item) { return item.getStatus(); }, + [&](const FindAndModifyReplyItem& item) { return item.getStatus(); }), + itemVar); +} + +// This function returns the error code for a given ItemVariant ('itemVar'). If 'itemVar' is +// Unexecuted, this function will return ErrorCodes::OK. +ErrorCodes::Error getErrorCode(const ItemVariant& itemVar) { return visit(OverloadedVisitor( [&](const Unexecuted&) { return ErrorCodes::OK; }, [&](const SucceededWithoutItem&) { return ErrorCodes::OK; }, @@ -64,8 +80,13 @@ ErrorCodes::Error getErrorCodeForItem(const ItemVariant& itemVar) { itemVar); } +// Like getErrorCode(), but takes a 'std::pair' as its input. ErrorCodes::Error getErrorCodeForShardItemPair(const std::pair& p) { - return getErrorCodeForItem(p.second); + return getErrorCode(p.second); +} + +bool isRetryableError(const ItemVariant& itemVar) { + return write_op_helpers::isRetryErrCode(getErrorCode(itemVar)); } template @@ -87,6 +108,46 @@ void handleTransientTxnError(OperationContext* opCtx, << " during a transaction")); } } + +std::shared_ptr getCannotImplicitlyCreateCollectionInfo( + const Status& status) { + if (status == ErrorCodes::CannotImplicitlyCreateCollection) { + auto info = status.extraInfo(); + tassert(11182204, "Expected to find CannotImplicitlyCreateCollectionInfo", info != nullptr); + + return info; + } + + return {}; +} + +std::shared_ptr getCollectionUUIDMismatchInfo( + const Status& status) { + if (status == ErrorCodes::CollectionUUIDMismatch) { + auto info = status.extraInfo(); + tassert(11273500, "Expected to find CollectionUUIDMismatchInfo", info != nullptr); + + return info; + } + + return {}; +} + +// Helper function that prints the contents of 'opsToRetry' to the log if appropriate. +void logOpsToRetry(const std::vector& opsToRetry) { + if (opsToRetry.empty() && + shouldLog(MONGO_LOGV2_DEFAULT_COMPONENT, logv2::LogSeverity::Debug(4))) { + std::stringstream opsStream; + size_t numOpsInStream = 0; + + for (const auto& op : opsToRetry) { + opsStream << (numOpsInStream++ > 0 ? ", " : "") << op.getId(); + } + + LOGV2_DEBUG( + 10411404, 4, "re-enqueuing ops that didn't complete", "ops"_attr = opsStream.str()); + } +} } // namespace ProcessorResult WriteBatchResponseProcessor::onWriteBatchResponse( @@ -119,7 +180,47 @@ ProcessorResult WriteBatchResponseProcessor::_onWriteBatchResponse( // Organize the items from the shard responses by op, and check if any of the items is an // unrecoverable error. 
-    auto [itemsByOp, unrecoverable] = groupItemsByOp(opCtx, shardResults);
+    auto [itemsByOp, unrecoverable, hasRetryableError] = groupItemsByOp(opCtx, shardResults);
+
+    // For "RetryableWriteWithId" simple write batches, a different retry strategy is used. If the
+    // batch has any retryable errors (and 'inTransaction' and 'unrecoverable' are false), then all
+    // of the ops in the batch are retried regardless of what their execution status was.
+    //
+    // This is the key difference between "RetryableWriteWithId" simple write batches and regular
+    // simple write batches.
+    if (response.isRetryableWriteWithId && hasRetryableError && !inTransaction && !unrecoverable) {
+        std::set<WriteOp> toRetry;
+        CollectionsToCreate collsToCreate;
+
+        // For each 'op', queue 'op' for retry and also increment counters as appropriate.
+        for (auto& [op, items] : itemsByOp) {
+            for (const auto& [shardId, itemVar] : items) {
+                if (isRetryableError(itemVar)) {
+                    // If 'itemVar' has a Status that is a retryable error, we pass that in when
+                    // calling queueOpForRetry().
+                    queueOpForRetry(op, getItemStatus(itemVar), toRetry, collsToCreate);
+                } else {
+                    // Otherwise, call queueOpForRetry() without a Status.
+                    queueOpForRetry(op, toRetry);
+                }
+            }
+
+            if (op.getType() == kUpdate) {
+                getQueryCounters(opCtx).updateOneWithoutShardKeyWithIdRetryCount.increment(1);
+            } else if (op.getType() == kDelete) {
+                getQueryCounters(opCtx).deleteOneWithoutShardKeyWithIdRetryCount.increment(1);
+            }
+        }
+
+        ProcessorResult result;
+        result.opsToRetry.insert(result.opsToRetry.end(), toRetry.begin(), toRetry.end());
+        result.collsToCreate = std::move(collsToCreate);
+
+        // Print the contents of 'opsToRetry' to the log if appropriate.
+        logOpsToRetry(result.opsToRetry);
+
+        return result;
+    }
 
     // Update the counters (excluding _nErrors), update the list of retried stmtIds, and process
     // the write concern error (if any).
@@ -154,7 +255,7 @@ ProcessorResult WriteBatchResponseProcessor::_onWriteBatchResponse(
                 // If 'shouldRetryOnUnexecutedOrRetryableError' is true, then queue 'op' to
                 // be retried.
                 if (shouldRetryOnUnexecutedOrRetryableError) {
-                    queueOpForRetry(op, /*status*/ boost::none, toRetry, collsToCreate);
+                    queueOpForRetry(op, toRetry);
                 }
                 return true;
             },
@@ -221,20 +322,18 @@ ProcessorResult WriteBatchResponseProcessor::_onWriteBatchResponse(
     result.collsToCreate = std::move(collsToCreate);
     result.successfulShardSet = std::move(successfulShardSet);
 
-    if (!result.opsToRetry.empty()) {
-        // Print the contents of 'opsToRetry' to the log.
-        if (shouldLog(MONGO_LOGV2_DEFAULT_COMPONENT, logv2::LogSeverity::Debug(4))) {
-            std::stringstream opsStream;
-            size_t numOpsInStream = 0;
-            for (const auto& op : result.opsToRetry) {
-                opsStream << (numOpsInStream++ > 0 ? ", " : "") << op.getId();
-            }
-
-            LOGV2_DEBUG(
-                10411404, 4, "re-enqueuing ops that didn't complete", "ops"_attr = opsStream.str());
+    // For "RetryableWriteWithId" batches, if the "hangAfterCompletingWriteWithoutShardKeyWithId"
+    // failpoint is set, call pauseWhileSet().
+    if (response.isRetryableWriteWithId) {
+        auto& fp = getHangAfterCompletingWriteWithoutShardKeyWithIdFailPoint();
+        if (MONGO_unlikely(fp.shouldFail())) {
+            fp.pauseWhileSet();
         }
     }
+    // Print the contents of 'opsToRetry' to the log if appropriate.
+ logOpsToRetry(result.opsToRetry); + return result; } @@ -344,15 +443,17 @@ void WriteBatchResponseProcessor::noteRetryableError(OperationContext* opCtx, } void WriteBatchResponseProcessor::queueOpForRetry(const WriteOp& op, - boost::optional status, + std::set& toRetry) const { + toRetry.emplace(op); +} + +void WriteBatchResponseProcessor::queueOpForRetry(const WriteOp& op, + const Status& status, std::set& toRetry, CollectionsToCreate& collsToCreate) const { toRetry.emplace(op); - if (status && *status == ErrorCodes::CannotImplicitlyCreateCollection) { - auto info = status->extraInfo(); - tassert(11182204, "Expected to find CannotImplicitlyCreateCollectionInfo", info != nullptr); - + if (auto info = getCannotImplicitlyCreateCollectionInfo(status)) { auto nss = info->getNss(); collsToCreate.emplace(std::move(nss), std::move(info)); } @@ -533,19 +634,21 @@ GroupItemsResult WriteBatchResponseProcessor::groupItemsByOp( const bool inTransaction = static_cast(TransactionRouter::get(opCtx)); auto isUnrecoverable = [&](const ItemVariant& itemVar) { - const auto code = getErrorCodeForItem(itemVar); + const auto code = getErrorCode(itemVar); return (code != ErrorCodes::OK && (inTransaction || (ordered && !write_op_helpers::isRetryErrCode(code)))); }; // Organize the items from the shard responses by op, and check if any of the items is an // unrecoverable error. - std::map>> itemsByOp; + ItemsByOpMap itemsByOp; bool unrecoverable = false; + bool hasRetryableError = false; for (const auto& [shardId, shardResult] : shardResults) { for (auto& [op, itemVar] : shardResult.items) { unrecoverable |= isUnrecoverable(itemVar); + hasRetryableError |= isRetryableError(itemVar); auto it = itemsByOp.find(op); if (it != itemsByOp.end()) { @@ -558,7 +661,7 @@ GroupItemsResult WriteBatchResponseProcessor::groupItemsByOp( } } - return GroupItemsResult{std::move(itemsByOp), unrecoverable}; + return GroupItemsResult{std::move(itemsByOp), unrecoverable, hasRetryableError}; } void WriteBatchResponseProcessor::processCountersAndRetriedStmtIds( @@ -588,7 +691,8 @@ void WriteBatchResponseProcessor::retrieveBulkWriteReplyItems(OperationContext* tassert(11182209, "Expected BulkWriteCommandReply", result.bulkWriteReply.has_value()); const BulkWriteCommandReply& parsedReply = *result.bulkWriteReply; - const bool orderedOrInTxn = _cmdRef.getOrdered() || TransactionRouter::get(opCtx); + const bool ordered = _cmdRef.getOrdered(); + const bool inTransaction = static_cast(TransactionRouter::get(opCtx)); // Put all the reply items into a vector and then validate the reply items. 
Note that if the // BulkWriteCommandReply object contains a non-zero cursor ID, exhaustCursorForReplyItems() @@ -607,7 +711,7 @@ void WriteBatchResponseProcessor::retrieveBulkWriteReplyItems(OperationContext* std::vector opsAfterFinalRetryableError; const bool logOpsAfterFinalRetryableError = - shouldLog(MONGO_LOGV2_DEFAULT_COMPONENT, logv2::LogSeverity::Debug(4)); + !inTransaction && shouldLog(MONGO_LOGV2_DEFAULT_COMPONENT, logv2::LogSeverity::Debug(4)); for (size_t shardOpId = 0; shardOpId < ops.size(); ++shardOpId) { const auto& op = ops[shardOpId]; @@ -631,10 +735,10 @@ void WriteBatchResponseProcessor::retrieveBulkWriteReplyItems(OperationContext* noteRetryableError(opCtx, routingCtx, status); } - // If 'item' is an error and 'orderedOrInTxn' is true, -OR- if 'item' is a - // retryable error and it's the last reply item, then for remaining ops without + // If 'item' is an error and 'ordered || inTransaction' is true, -OR- if 'item' is + // a retryable error and it's the last reply item, then for remaining ops without // reply items we will assume the ops did not execute. - if (orderedOrInTxn || (isRetryableErr && itemIndex >= items.size())) { + if (ordered || inTransaction || (isRetryableErr && itemIndex >= items.size())) { finalErrorForBatch = status; finalErrorForBatchIsRetryable = isRetryableErr; } @@ -722,18 +826,6 @@ void WriteBatchResponseProcessor::validateBulkWriteReplyItems( } namespace { -std::shared_ptr getCollectionUUIDMismatchInfo( - const Status& status) { - if (status == ErrorCodes::CollectionUUIDMismatch) { - auto info = status.extraInfo(); - tassert(11273500, "Expected to find CollectionUUIDMismatchInfo", info != nullptr); - - return info; - } - - return {}; -} - BulkWriteReplyItem getFirstError(const std::vector& items) { tassert(11182216, "Expected vector to contain at least one item", !items.empty()); diff --git a/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor.h b/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor.h index 03d8be46dfb..df0397d83d8 100644 --- a/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor.h +++ b/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor.h @@ -106,8 +106,13 @@ public: using CollectionsToCreate = ProcessorResult::CollectionsToCreate; - using GroupItemsResult = - std::pair>>, bool>; + using ItemsByOpMap = std::map>>; + + struct GroupItemsResult { + ItemsByOpMap itemsByOp; + bool unrecoverable = false; + bool hasRetryableError = false; + }; struct ShardResult { boost::optional bulkWriteReply; @@ -264,11 +269,16 @@ private: const Status& status); /** - * Helper method that adds 'op' to 'toRetry' and also copies CannotImplicityCreateCollection - * errors into 'collsToCreate'. + * Helper method that adds 'op' to 'toRetry'. + */ + void queueOpForRetry(const WriteOp& op, std::set& toRetry) const; + + /** + * Helper method that adds 'op' to 'toRetry', and extracts CannotImplicityCreateCollectionInfo + * from 'status' (if it exists) and stores it into 'collsToCreate'. */ void queueOpForRetry(const WriteOp& op, - boost::optional status, + const Status& status, std::set& toRetry, CollectionsToCreate& collsToCreate) const; @@ -279,7 +289,8 @@ private: /** * This method scans the items from 'shardResults' and groups the items together by op. This - * method also checks if 'shardResults' contains an unrecoverable error. 
+ * method also returns a flag indicating if 'shardResults' contains a retryable error, and + * another flag indicating if an error occurred that is "unrecoverable". */ GroupItemsResult groupItemsByOp( OperationContext* opCtx, std::vector>& shardResults) const; diff --git a/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor_test.cpp b/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor_test.cpp index 0018aeee882..dc04683fb57 100644 --- a/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor_test.cpp +++ b/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor_test.cpp @@ -68,6 +68,7 @@ public: const NamespaceString nss1 = NamespaceString::createNamespaceString_forTest("test", "coll"); const NamespaceString nss2 = NamespaceString::createNamespaceString_forTest("test", "coll2"); const NamespaceString nss3 = NamespaceString::createNamespaceString_forTest("test", "coll3"); + const NamespaceString cursorNss = NamespaceString::makeBulkWriteNSS(boost::none); const HostAndPort host1 = HostAndPort("host1", 0); const HostAndPort host2 = HostAndPort("host2", 0); const ShardId shard1Name = ShardId("shard1"); @@ -685,6 +686,90 @@ TEST_F(WriteBatchResponseProcessorTest, MixedStalenessErrorsAndOk) { ASSERT_EQ(result.opsToRetry[0].getNss(), nss1); ASSERT_EQ(result.opsToRetry[1].getId(), op2.getId()); ASSERT_EQ(result.opsToRetry[1].getNss(), nss1); + + // Assert that 2 OK items were processed. + ASSERT_EQ(processor.getNumOkItemsProcessed(), 2); + + // Assert that no errors were recorded. + ASSERT_EQ(processor.getNumErrorsRecorded(), 0); +} + +TEST_F(WriteBatchResponseProcessorTest, RetryableWriteWithIdMixedStalenessErrorsAndOk) { + auto request = BulkWriteCommandRequest( + {BulkWriteUpdateOp(0, BSON("_id" << 1), BSON("$set" << BSON("y" << 1))), + BulkWriteUpdateOp(0, BSON("_id" << -1), BSON("$set" << BSON("y" << 1))), + BulkWriteUpdateOp(1, BSON("_id" << 1), BSON("$set" << BSON("y" << 1))), + BulkWriteUpdateOp(1, BSON("_id" << -1), BSON("$set" << BSON("y" << 1)))}, + {NamespaceInfoEntry(nss1), NamespaceInfoEntry(nss2)}); + + request.setOrdered(false); + + const bool isRetryableWriteWithId = true; + + WriteOp op1(request, 0); + WriteOp op2(request, 1); + WriteOp op3(request, 2); + WriteOp op4(request, 3); + + Status badValueStatus(ErrorCodes::BadValue, "Bad Value"); + Status staleCollStatus1( + StaleConfigInfo(nss1, *shard1Endpoint.shardVersion, newShardVersion, shard1Name), ""); + Status staleCollStatus2( + StaleConfigInfo(nss2, *shard2Endpoint.shardVersion, newShardVersion, shard2Name), ""); + + auto reply = makeReply(); + reply.setNErrors(2); + reply.setCursor(BulkWriteCommandResponseCursor(0, + {BulkWriteReplyItem{0, Status::OK()}, + BulkWriteReplyItem{1, staleCollStatus1}, + BulkWriteReplyItem{2, Status::OK()}, + BulkWriteReplyItem{3, badValueStatus}}, + cursorNss)); + RemoteCommandResponse rcr1(host1, setTopLevelOK(reply.toBSON()), Microseconds{0}, false); + + auto reply2 = makeReply(); + reply2.setNErrors(2); + reply2.setCursor(BulkWriteCommandResponseCursor(0, + {BulkWriteReplyItem{0, Status::OK()}, + BulkWriteReplyItem{1, Status::OK()}, + BulkWriteReplyItem{2, staleCollStatus2}, + BulkWriteReplyItem{3, badValueStatus}}, + cursorNss)); + RemoteCommandResponse rcr2(host2, setTopLevelOK(reply2.toBSON()), Microseconds{0}, false); + + WriteCommandRef cmdRef(request); + Stats stats; + WriteBatchResponseProcessor processor(cmdRef, stats); + auto result = processor.onWriteBatchResponse( + opCtx, + routingCtx, + 
SimpleWriteBatchResponse{{{shard1Name, ShardResponse::make(rcr1, {op1, op2, op3, op4})}, + {shard2Name, ShardResponse::make(rcr2, {op1, op2, op3, op4})}}, + isRetryableWriteWithId}); + + ASSERT_EQ(routingCtx.errors.size(), 2); + ASSERT_EQ(routingCtx.errors[0].code(), ErrorCodes::StaleConfig); + ASSERT_EQ(routingCtx.errors[0].extraInfo()->getNss(), nss1); + ASSERT_EQ(routingCtx.errors[1].code(), ErrorCodes::StaleConfig); + ASSERT_EQ(routingCtx.errors[1].extraInfo()->getNss(), nss2); + ASSERT_EQ(result.collsToCreate.size(), 0); + + // Assert that all ops were returned for retry (regardless of whether they succeeded or not). + ASSERT_EQ(result.opsToRetry.size(), 4); + ASSERT_EQ(result.opsToRetry[0].getId(), op1.getId()); + ASSERT_EQ(result.opsToRetry[0].getNss(), nss1); + ASSERT_EQ(result.opsToRetry[1].getId(), op2.getId()); + ASSERT_EQ(result.opsToRetry[1].getNss(), nss1); + ASSERT_EQ(result.opsToRetry[2].getId(), op3.getId()); + ASSERT_EQ(result.opsToRetry[2].getNss(), nss2); + ASSERT_EQ(result.opsToRetry[3].getId(), op4.getId()); + ASSERT_EQ(result.opsToRetry[3].getNss(), nss2); + + // Assert that no OK items were processed. + ASSERT_EQ(processor.getNumOkItemsProcessed(), 0); + + // Assert that no errors were recorded. + ASSERT_EQ(processor.getNumErrorsRecorded(), 0); } TEST_F(WriteBatchResponseProcessorTest, RetryShardsCannotRefreshDueToLocksHeldError) { diff --git a/src/mongo/s/write_ops/unified_write_executor/write_op_analyzer.cpp b/src/mongo/s/write_ops/unified_write_executor/write_op_analyzer.cpp index 38d4fe41043..d9040881934 100644 --- a/src/mongo/s/write_ops/unified_write_executor/write_op_analyzer.cpp +++ b/src/mongo/s/write_ops/unified_write_executor/write_op_analyzer.cpp @@ -65,10 +65,7 @@ StatusWith WriteOpAnalyzerImpl::analyze(OperationContext* opCtx, // TODO SERVER-103782 Don't use CRITargeter. auto nss = op.getNss(); CollectionRoutingInfoTargeter targeter(nss, routingCtx); - - // TODO SERVER-103781 Add support for kPartialKeyWithId. // TODO SERVER-103146 Add kChangesOwnership. - // TODO SERVER-103781 Add support for "WriteWithoutShardKeyWithId" writes. 
NSTargeter::TargetingResult tr; switch (op.getType()) { case WriteType::kInsert: { @@ -118,7 +115,13 @@ StatusWith WriteOpAnalyzerImpl::analyze(OperationContext* opCtx, auto targetedSampleId = analyze_shard_key::tryGenerateTargetedSampleId( opCtx, targeter.getNS(), op.getItemRef().getOpType(), tr.endpoints); - if (tr.useTwoPhaseWriteProtocol || tr.isNonTargetedRetryableWriteWithId) { + if (tr.isNonTargetedRetryableWriteWithId) { + recordTargetingStats(opCtx, targeter, tr, op); + return Analysis{AnalysisType::kRetryableWriteWithId, + std::move(tr.endpoints), + isViewfulTimeseries, + std::move(targetedSampleId)}; + } else if (tr.useTwoPhaseWriteProtocol) { recordTargetingStats(opCtx, targeter, tr, op); return Analysis{AnalysisType::kTwoPhaseWrite, std::move(tr.endpoints), diff --git a/src/mongo/s/write_ops/unified_write_executor/write_op_analyzer.h b/src/mongo/s/write_ops/unified_write_executor/write_op_analyzer.h index 15b1c9cdcfb..a7fc4a024e6 100644 --- a/src/mongo/s/write_ops/unified_write_executor/write_op_analyzer.h +++ b/src/mongo/s/write_ops/unified_write_executor/write_op_analyzer.h @@ -49,6 +49,7 @@ enum AnalysisType { kSingleShard, kMultiShard, kTwoPhaseWrite, + kRetryableWriteWithId, kInternalTransaction, kMultiWriteBlockingMigrations, }; diff --git a/src/mongo/s/write_ops/unified_write_executor/write_op_batcher.cpp b/src/mongo/s/write_ops/unified_write_executor/write_op_batcher.cpp index ed05aa555b0..0c6d1e0998d 100644 --- a/src/mongo/s/write_ops/unified_write_executor/write_op_batcher.cpp +++ b/src/mongo/s/write_ops/unified_write_executor/write_op_batcher.cpp @@ -39,11 +39,9 @@ namespace mongo { namespace unified_write_executor { -namespace { -bool analysisTypeSupportsGrouping(AnalysisType writeType) { - return writeType == kSingleShard || writeType == kMultiShard; +bool analysisTypeSupportsGrouping(AnalysisType type) { + return type == kSingleShard || type == kMultiShard || type == kRetryableWriteWithId; } -} // namespace template class SimpleBatchBuilderBase { @@ -80,16 +78,25 @@ public: */ bool isCompatibleWithBatch(NamespaceString nss, Analysis& analysis) const { const auto& endpoints = analysis.shardsAffected; - // If the op's type is not compatible with SimpleBatch, return false. - if (!analysisTypeSupportsGrouping(analysis.type)) { + const AnalysisType& analysisType = analysis.type; + const bool opIsRetryableWriteWithId = analysisType == kRetryableWriteWithId; + + // If op's analysis type doesn't support grouping, return false. + if (!analysisTypeSupportsGrouping(analysisType)) { return false; } + + // If op isn't compatible with the current batch's type, return false. + if (_batch && (opIsRetryableWriteWithId != _batch->isRetryableWriteWithId)) { + return false; + } + // Verify that there is at least one endpoint. Also, if this op is kSingleShard, verify // that there's exactly one endpoint. tassert(10896511, "Expected at least one affected shard", !endpoints.empty()); tassert(10896512, - "Single shard write type should only target a single shard", - analysis.type != kSingleShard || endpoints.size() == 1); + "Single shard op should only target a single shard", + analysisType != kSingleShard || endpoints.size() == 1); // If the current batch is empty, return true. if (!_batch || _batch->requestByShardId.empty()) { return true; @@ -98,7 +105,7 @@ public: // current batch targets multiple shards, or the op and the current batch target different // shards, then return false. 
if (Ordered && - (analysis.type != kSingleShard || _batch->requestByShardId.size() > 1 || + (endpoints.size() > 1 || _batch->requestByShardId.size() > 1 || endpoints.front().shardName != _batch->requestByShardId.begin()->first)) { return false; } @@ -136,18 +143,24 @@ public: } /** - * Adds 'writeOp' to the current batch. This method will fail with a tassert if writeOp's type - * is not compatible with SimpleWriteBatch. + * Adds 'writeOp' to the current batch. This method will fail with a tassert if the op's + * analysis type is not compatible with the current batch. */ void addOp(WriteOp& writeOp, Analysis& analysis) { + const AnalysisType& analysisType = analysis.type; + const bool opIsRetryableWriteWithId = analysisType == kRetryableWriteWithId; tassert(10896513, - "Expected op to be compatible with SimpleWriteBatch", - analysisTypeSupportsGrouping(analysis.type)); + "Expected op's analysis type to support grouping", + analysisTypeSupportsGrouping(analysisType)); if (!_batch) { - _batch.emplace(); + _batch = SimpleWriteBatch::makeEmpty(opIsRetryableWriteWithId); } + tassert(10378100, + "Expected op's type to be compatible with batch", + opIsRetryableWriteWithId == _batch->isRetryableWriteWithId); + for (const auto& shard : analysis.shardsAffected) { auto nss = writeOp.getNss(); int estSizeBytesForWrite = @@ -212,6 +225,10 @@ protected: */ bool wasShardAlreadyTargetedWithDifferentShardVersion(NamespaceString nss, Analysis& analysis) const { + if (!_batch) { + return false; + } + for (const auto& shard : analysis.shardsAffected) { auto it = _batch->requestByShardId.find(shard.shardName); if (it != _batch->requestByShardId.end()) { @@ -241,12 +258,14 @@ protected: class OrderedSimpleBatchBuilder : public SimpleBatchBuilderBase { public: - using SimpleBatchBuilderBase::SimpleBatchBuilderBase; + using BaseT = SimpleBatchBuilderBase; + using BaseT::BaseT; }; class UnorderedSimpleBatchBuilder : public SimpleBatchBuilderBase { public: - using SimpleBatchBuilderBase::SimpleBatchBuilderBase; + using BaseT = SimpleBatchBuilderBase; + using BaseT::BaseT; }; void WriteOpBatcher::markBatchReprocess(WriteBatch batch) { @@ -365,14 +384,14 @@ BatcherResult OrderedWriteOpBatcher::getNextBatch(OperationContext* opCtx, *writeOp, std::move(sampleId), analysis.isViewfulTimeseries}}, std::move(opsWithErrors)}; } - // If the first WriteOp is kMultiShard, then add the op to a SimpleBatch and then break - // and return the batch. - if (analysis.type == kMultiShard) { + // If the first WriteOp is kMultiShard or kRetryableWriteWithId, then start a new + // SimpleWriteBatch, add the op to the batch, and break and return the batch. + if (analysis.type != kSingleShard) { builder.addOp(*writeOp, analysis); break; } - // If the op is kSingleShard, then add the op to a SimpleBatch and keep looping to see - // if more ops can be added to the batch. + // If the op is kSingleShard, then start a new SimpleWriteBatch and add the op to the + // batch, and keep looping to see if more ops can be added to the batch. builder.addOp(*writeOp, analysis); } } @@ -505,8 +524,9 @@ BatcherResult UnorderedWriteOpBatcher::getNextBatch(OperationContext* opCtx, writeOp, std::move(sampleId), analysis.isViewfulTimeseries}}, std::move(opsWithErrors)}; } - // If the op is kSingleShard or kMultiShard, then add the op to a SimpleBatch and keep - // looping to see if more ops can be added to the batch. 
+ // If the op is kSingleShard or kMultiShard or kRetryableWriteWithId, then start a new + // SimpleWriteBatch and add the op to the batch, and keep looping to see if more ops can + // be added to the batch. builder.addOp(writeOp, analysis); } } diff --git a/src/mongo/s/write_ops/unified_write_executor/write_op_batcher.h b/src/mongo/s/write_ops/unified_write_executor/write_op_batcher.h index 93be93a46c2..09fb0b7d5cd 100644 --- a/src/mongo/s/write_ops/unified_write_executor/write_op_batcher.h +++ b/src/mongo/s/write_ops/unified_write_executor/write_op_batcher.h @@ -39,6 +39,8 @@ namespace mongo { namespace unified_write_executor { +bool analysisTypeSupportsGrouping(AnalysisType type); + struct EmptyBatch { std::vector getWriteOps() const { return std::vector{}; @@ -54,9 +56,8 @@ struct EmptyBatch { }; struct SimpleWriteBatch { - // Given that a write command can target multiple collections, - // we store one shard version per namespace to support batching ops which target the same shard, - // but target different namespaces. + // Given that a write command can target multiple collections, we store one shard version per + // namespace to support batching ops which target different namespaces on the same shard. struct ShardRequest { std::map versionByNss; std::set nssIsViewfulTimeseries; @@ -67,6 +68,12 @@ struct SimpleWriteBatch { std::map requestByShardId; + bool isRetryableWriteWithId = false; + + static SimpleWriteBatch makeEmpty(bool isRetryableWriteWithId) { + return SimpleWriteBatch{{}, isRetryableWriteWithId}; + } + std::vector getWriteOps() const { std::vector result; absl::flat_hash_set dedup; diff --git a/src/mongo/s/write_ops/unified_write_executor/write_op_batcher_test.cpp b/src/mongo/s/write_ops/unified_write_executor/write_op_batcher_test.cpp index eb000d4760b..7a6670a136b 100644 --- a/src/mongo/s/write_ops/unified_write_executor/write_op_batcher_test.cpp +++ b/src/mongo/s/write_ops/unified_write_executor/write_op_batcher_test.cpp @@ -434,9 +434,11 @@ TEST_F(OrderedUnifiedWriteExecutorBatcherTest, OrderedBatcherBatchesQuaruntineOp {0, Analysis{kSingleShard, {nss0Shard0}, nss0IsViewfulTimeseries}}, {1, Analysis{kTwoPhaseWrite, {nss0Shard0}, nss0IsViewfulTimeseries}}, {2, Analysis{kSingleShard, {nss0Shard0}, nss0IsViewfulTimeseries}}, - {3, Analysis{kInternalTransaction, {nss0Shard0}}}, + {3, Analysis{kInternalTransaction, {nss0Shard0}, nss0IsViewfulTimeseries}}, {4, Analysis{kSingleShard, {nss0Shard0}, nss0IsViewfulTimeseries}}, - {5, Analysis{kMultiWriteBlockingMigrations, {nss0Shard0, nss0Shard1}}}, + {5, + Analysis{ + kMultiWriteBlockingMigrations, {nss0Shard0, nss0Shard1}, nss0IsViewfulTimeseries}}, }); auto routingCtx = RoutingContext::createSynthetic({}); @@ -925,9 +927,11 @@ TEST_F(UnorderedUnifiedWriteExecutorBatcherTest, UnorderedBatcherBatchesQuarunti {3, Analysis{kTwoPhaseWrite, {nss0Shard0, nss0Shard1}, nss0IsViewfulTimeseries}}, {4, Analysis{kTwoPhaseWrite, {nss0Shard0, nss0Shard1}, nss0IsViewfulTimeseries}}, {5, Analysis{kSingleShard, {nss0Shard0}, nss0IsViewfulTimeseries}}, - {6, Analysis{kInternalTransaction, {nss0Shard0}}}, + {6, Analysis{kInternalTransaction, {nss0Shard0}, nss0IsViewfulTimeseries}}, {7, Analysis{kSingleShard, {nss0Shard0}, nss0IsViewfulTimeseries}}, - {8, Analysis{kMultiWriteBlockingMigrations, {nss0Shard0, nss0Shard1}}}, + {8, + Analysis{ + kMultiWriteBlockingMigrations, {nss0Shard0, nss0Shard1}, nss0IsViewfulTimeseries}}, }); auto routingCtx = RoutingContext::createSynthetic({}); diff --git a/src/mongo/s/write_ops/write_op.cpp 
b/src/mongo/s/write_ops/write_op.cpp index 2b2adda25c0..cb88f0078e6 100644 --- a/src/mongo/s/write_ops/write_op.cpp +++ b/src/mongo/s/write_ops/write_op.cpp @@ -58,10 +58,14 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding namespace mongo { -namespace { MONGO_FAIL_POINT_DEFINE(hangAfterCompletingWriteWithoutShardKeyWithId); +FailPoint& getHangAfterCompletingWriteWithoutShardKeyWithIdFailPoint() { + return hangAfterCompletingWriteWithoutShardKeyWithId; +} + +namespace { // Aggregate a bunch of errors for a single op together write_ops::WriteError combineOpErrors(const std::vector& errOps) { auto getStatusCode = [](ChildWriteOp const* item) { diff --git a/src/mongo/s/write_ops/write_op.h b/src/mongo/s/write_ops/write_op.h index 4c7f90d0bab..6fe6a81c676 100644 --- a/src/mongo/s/write_ops/write_op.h +++ b/src/mongo/s/write_ops/write_op.h @@ -100,6 +100,8 @@ inline bool writeTypeSupportsGrouping(WriteType t) { return t == WriteType::Ordinary || t == WriteType::WithoutShardKeyWithId; } +FailPoint& getHangAfterCompletingWriteWithoutShardKeyWithIdFailPoint(); + /** * State of a write in-progress (to a single shard) which is one part of a larger write * operation.