SERVER-104535 Support cursor creation in UWE (#41291)

GitOrigin-RevId: 791809c21e6be5395772fc34c3a9dd24dd1162c2
This commit is contained in:
Mihai Andrei 2025-09-22 13:45:48 -04:00 committed by MongoDB Bot
parent 8a9bfcc463
commit b8cd2e439f
23 changed files with 303 additions and 208 deletions

View File

@ -66,8 +66,6 @@ selector:
- src/mongo/db/modules/enterprise/jstests/no_passthrough/fle_tassert_log_omit.js
exclude_files:
# TODO: SERVER-104535 support cursor with bulk write.
- jstests/sharding/internal_txns/transaction_api_distributed_from_shard.js
# TODO SERVER-110635: enable
- jstests/sharding/timeseries/timeseries_retry_delete_and_update_unsharded.js

View File

@ -235,12 +235,8 @@ selector:
- jstests/core/**/query/function_prototype_bson_type.js
# Exclusions because of unimplemented functionalities in the unified write executor.
# TODO: SERVER-104535 support cursor with bulk write.
- jstests/core/query/bulk/bulk_write_update_cursor.js
- jstests/core/query/bulk/bulk_write_delete_cursor.js
- jstests/core/query/bulk/bulk_write_non_retryable_cursor.js
# TODO SERVER-105762: add support for errorsOnly to UWE.
- jstests/core/write/bulk/bulk_write_insert_cursor.js
- jstests/core/query/bulk/bulk_write.js
exclude_with_any_tags:
# "Cowardly refusing to override read concern of command: ..."

View File

@ -231,12 +231,8 @@ selector:
- jstests/core/**/upsert_shell.js
# Exclusions because of unimplemented functionalities in the unified write executor.
#TODO: SERVER-104535 support cursor with bulk write.
- jstests/core/query/bulk/bulk_write_update_cursor.js
- jstests/core/query/bulk/bulk_write_delete_cursor.js
- jstests/core/query/bulk/bulk_write_non_retryable_cursor.js
# TODO SERVER-105762: add support for errorsOnly to UWE.
- jstests/core/write/bulk/bulk_write_insert_cursor.js
- jstests/core/query/bulk/bulk_write.js
exclude_with_any_tags:
##

View File

@ -37,9 +37,7 @@ selector:
- jstests/core/**/benchrun_pipeline_updates.js
# Exclusions because of unimplemented functionalities in the unified write executor.
# TODO: SERVER-104535 support cursor with bulk write.
- jstests/core/txns/bulk_write_getMore.js
# TODO SERVER-105762: add support for errorsOnly to UWE.
- jstests/core/write/bulk/bulk_write_insert_cursor.js
# TODO: SERVER-104139 Support write commands against timeseries collections in UWE.

View File

@ -23,10 +23,6 @@ selector:
# attached to statements in a transaction beyond the first one.
- jstests/core/txns/non_transactional_operations_on_session_with_transaction.js
# Exclusions because of unimplemented functionalities in the unified write executor.
# TODO: SERVER-104535 support cursor with bulk write.
- jstests/core/txns/bulk_write_getMore.js
exclude_with_any_tags:
- assumes_against_mongod_not_mongos
- does_not_support_causal_consistency

View File

@ -38,8 +38,7 @@ selector:
# Exclusions because of unimplemented functionalities in the unified write executor.
# TODO: SERVER-104535 support cursor with bulk write.
- jstests/core/txns/bulk_write_getMore.js
# TODO SERVER-105762: add support for errorsOnly to UWE.
- jstests/core/write/bulk/bulk_write_insert_cursor.js
# TODO: SERVER-104139 Support write commands against timeseries collections in UWE.

View File

@ -52,7 +52,7 @@ selector:
# Disabled because some features covered in these tests are not yet supported in the UWE.
# TODO SERVER-104139: Add timeseries support.
- jstests/core/timeseries/**/*.js
# TODO: SERVER-104535 support cursor with bulk write.
# TODO SERVER-105762: add support for errorsOnly to UWE.
- jstests/core/write/bulk/bulk_write_insert_cursor.js
exclude_with_any_tags:
- assumes_standalone_mongod

View File

@ -19,10 +19,6 @@ selector:
# Uses hangAfterCollectionInserts failpoint not available on mongos.
- jstests/core/txns/speculative_snapshot_includes_all_writes.js
# Exclusions because of unimplemented functionalities in the unified write executor.
# TODO: SERVER-104535 support cursor with bulk write.
- jstests/core/txns/bulk_write_getMore.js
exclude_with_any_tags:
- assumes_against_mongod_not_mongos
# Transactions are not allowed to operate on capped collections.

View File

@ -25,8 +25,6 @@ selector:
- jstests/core/txns/commands_in_txns_read_concern.js
# Exclusions because of unimplemented functionalities in the unified write executor.
# TODO: SERVER-104535 support cursor with bulk write.
- jstests/core/txns/bulk_write_getMore.js
# TODO: SERVER-104139 timeseries support.
- jstests/core/txns/timeseries/timeseries_reads_in_txns.js
- jstests/core/txns/txn_ops_allowed_on_buckets_coll.js

View File

@ -37,7 +37,7 @@ selector:
- jstests/core_sharding/zones/zone_timeseries_basic.js
- jstests/core_sharding/chunk_migration/move_range_timeseries.js
- jstests/core_sharding/global_catalog/sharded_data_distribution.js
# TODO: SERVER-104535 support cursor with bulk write.
# TODO SERVER-105762: add support for errorsOnly to UWE.
- jstests/core/write/bulk/bulk_write_insert_cursor.js
exclude_with_any_tags:
- assumes_standalone_mongod

View File

@ -57,7 +57,7 @@ selector:
- jstests/core/timeseries/**/*.js
- jstests/core_sharding/resharding/reshard_collection_timeseries.js
- jstests/core_sharding/global_catalog/sharded_data_distribution.js
# TODO: SERVER-104535 support cursor with bulk write.
# TODO SERVER-105762: add support for errorsOnly to UWE.
- jstests/core/write/bulk/bulk_write_insert_cursor.js
exclude_with_any_tags:
- assumes_standalone_mongod

View File

@ -1217,6 +1217,7 @@ mongo_cc_library(
"//src/mongo/s/commands/query_cmd:cluster_find_cmd.h",
"//src/mongo/s/commands/query_cmd:cluster_getmore_cmd.h",
"//src/mongo/s/commands/query_cmd:cluster_pipeline_cmd.h",
"//src/mongo/s/commands/query_cmd:populate_cursor.h",
],
deps = [
":analyze_shard_key_util",
@ -1271,6 +1272,7 @@ mongo_cc_library(
"//src/mongo/s:startup_initialization",
"//src/mongo/s/commands:cluster_commands_common",
"//src/mongo/s/commands:sharded_cluster_sharding_commands",
"//src/mongo/s/commands/query_cmd:populate_cursor",
],
)

View File

@ -263,6 +263,7 @@ mongo_cc_library(
"//src/mongo/s:mongos_topology_coordinator",
"//src/mongo/s:sharding_api",
"//src/mongo/s/commands/query_cmd:cluster_replicate_search_index_command_idl",
"//src/mongo/s/commands/query_cmd:populate_cursor",
"//src/mongo/s/query/exec:cluster_cursor",
"//src/mongo/s/query/planner:cluster_aggregate",
"//src/mongo/transport:message_compressor",

View File

@ -8,7 +8,10 @@ exports_files(
"*.h",
"*.cpp",
],
exclude = ["cluster_replicate_search_index_command.cpp"],
exclude = [
"cluster_replicate_search_index_command.cpp",
"populate_cursor.cpp",
],
),
)
@ -34,3 +37,34 @@ mongo_cc_library(
"//src/mongo/db/query/search:search_index_command_testing_helper",
],
)
mongo_cc_library(
name = "populate_cursor",
srcs = [
"populate_cursor.cpp",
],
hdrs = [
"populate_cursor.h",
"//src/mongo/db/commands/query_cmd:bulk_write_common.h",
"//src/mongo/db/commands/query_cmd:bulk_write_gen",
"//src/mongo/db/query:find.h",
"//src/mongo/db/query:find_common.h",
"//src/mongo/db/sharding_environment:grid.h",
"//src/mongo/s/query/exec:cluster_client_cursor.h",
"//src/mongo/s/query/exec:cluster_client_cursor_guard.h",
"//src/mongo/s/query/exec:cluster_client_cursor_impl.h",
"//src/mongo/s/query/exec:cluster_client_cursor_params.h",
"//src/mongo/s/query/exec:cluster_cursor_manager.h",
"//src/mongo/s/query/exec:cluster_query_result.h",
"//src/mongo/s/query/exec:router_exec_stage.h",
"//src/mongo/s/query/exec:router_stage_queued_data.h",
"//src/mongo/s/write_ops:bulk_write_reply_info.h",
],
deps = [
"//src/mongo:base",
"//src/mongo/db/commands/query_cmd:bulk_write_common",
"//src/mongo/db/query:query_common",
"//src/mongo/s/query/exec:cluster_cursor",
"//src/mongo/s/query/exec:router_exec_stage",
],
)

View File

@ -63,6 +63,7 @@
#include "mongo/s/commands/document_shard_key_update_util.h"
#include "mongo/s/commands/query_cmd/cluster_explain.h"
#include "mongo/s/commands/query_cmd/cluster_write_cmd.h"
#include "mongo/s/commands/query_cmd/populate_cursor.h"
#include "mongo/s/query/exec/cluster_client_cursor.h"
#include "mongo/s/query/exec/cluster_client_cursor_guard.h"
#include "mongo/s/query/exec/cluster_client_cursor_impl.h"
@ -229,120 +230,6 @@ public:
return static_cast<const ClusterBulkWriteCmd*>(definition());
}
BulkWriteCommandReply _populateCursorReply(
OperationContext* opCtx,
BulkWriteCommandRequest& bulkRequest,
const OpMsgRequest& unparsedRequest,
bulk_write_exec::BulkWriteReplyInfo replyInfo) const {
const auto& req = bulkRequest;
auto reqObj = unparsedRequest.body;
auto& [replyItems, summaryFields, wcErrors, retriedStmtIds] = replyInfo;
const NamespaceString cursorNss =
NamespaceString::makeBulkWriteNSS(req.getDbName().tenantId());
if (bulk_write_common::isUnacknowledgedBulkWrite(opCtx)) {
// Skip cursor creation and return the simplest reply.
return BulkWriteCommandReply(BulkWriteCommandResponseCursor(
0 /* cursorId */, {} /* firstBatch */, cursorNss),
summaryFields.nErrors,
summaryFields.nInserted,
summaryFields.nMatched,
summaryFields.nModified,
summaryFields.nUpserted,
summaryFields.nDeleted);
}
ClusterClientCursorParams params(
cursorNss,
APIParameters::get(opCtx),
ReadPreferenceSetting::get(opCtx),
repl::ReadConcernArgs::get(opCtx),
[&] {
if (!opCtx->getLogicalSessionId())
return OperationSessionInfoFromClient();
// TODO (SERVER-80525): This code path does not
// clear the setAutocommit field on the presence of
// TransactionRouter::get
return OperationSessionInfoFromClient(
*opCtx->getLogicalSessionId(),
// Retryable writes will have a txnNumber we do not want to associate with
// the cursor. We only want to set this field for transactions.
opCtx->inMultiDocumentTransaction() ? opCtx->getTxnNumber() : boost::none);
}());
long long batchSize = std::numeric_limits<long long>::max();
if (req.getCursor() && req.getCursor()->getBatchSize()) {
params.batchSize = req.getCursor()->getBatchSize();
batchSize = *req.getCursor()->getBatchSize();
}
params.originatingCommandObj = reqObj.getOwned();
params.originatingPrivileges = bulk_write_common::getPrivileges(req);
auto queuedDataStage = std::make_unique<RouterStageQueuedData>(opCtx);
BulkWriteCommandReply reply;
reply.setNErrors(summaryFields.nErrors);
reply.setNInserted(summaryFields.nInserted);
reply.setNDeleted(summaryFields.nDeleted);
reply.setNMatched(summaryFields.nMatched);
reply.setNModified(summaryFields.nModified);
reply.setNUpserted(summaryFields.nUpserted);
reply.setWriteConcernError(wcErrors);
reply.setRetriedStmtIds(retriedStmtIds);
for (auto& replyItem : replyItems) {
queuedDataStage->queueResult(replyItem.toBSON());
}
auto ccc =
ClusterClientCursorImpl::make(opCtx, std::move(queuedDataStage), std::move(params));
size_t numRepliesInFirstBatch = 0;
FindCommon::BSONArrayResponseSizeTracker responseSizeTracker;
for (long long objCount = 0; objCount < batchSize; objCount++) {
auto next = uassertStatusOK(ccc->next());
if (next.isEOF()) {
break;
}
auto nextObj = *next.getResult();
if (!responseSizeTracker.haveSpaceForNext(nextObj)) {
ccc->queueResult(nextObj);
break;
}
numRepliesInFirstBatch++;
responseSizeTracker.add(nextObj);
}
if (numRepliesInFirstBatch == replyItems.size()) {
replyItems.resize(numRepliesInFirstBatch);
reply.setCursor(BulkWriteCommandResponseCursor(
0, std::vector<BulkWriteReplyItem>(std::move(replyItems)), cursorNss));
return reply;
}
ccc->detachFromOperationContext();
ccc->incNBatches();
auto authUser =
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserName();
auto cursorId = uassertStatusOK(Grid::get(opCtx)->getCursorManager()->registerCursor(
opCtx,
ccc.releaseCursor(),
cursorNss,
ClusterCursorManager::CursorType::QueuedData,
ClusterCursorManager::CursorLifetime::Mortal,
authUser));
// Record the cursorID in CurOp.
CurOp::get(opCtx)->debug().cursorid = cursorId;
replyItems.resize(numRepliesInFirstBatch);
reply.setCursor(BulkWriteCommandResponseCursor(
cursorId, std::vector<BulkWriteReplyItem>(std::move(replyItems)), cursorNss));
return reply;
}
bool runImpl(OperationContext* opCtx,
const OpMsgRequest& request,
BulkWriteCommandRequest& bulkRequest,
@ -396,7 +283,7 @@ public:
// - To ensure that possible writeErrors are properly managed, a "fire and forget"
// request needs to be temporarily upgraded to 'w:1'(unless the request belongs to
// a transaction, where per-operation WC settings are not supported);
// - Once done, The original WC is re-established to allow _populateCursorReply
// - Once done, The original WC is re-established to allow populateCursorReply
// evaluating whether a reply needs to be returned to the external client.
bulk_write_exec::BulkWriteExecStats execStats;
auto bulkWriteReply = [&] {
@ -417,8 +304,8 @@ public:
// updates that modify a documents owning shard.
execStats.updateMetrics(opCtx, targeters, updatedShardKey);
response =
_populateCursorReply(opCtx, bulkRequest, request, std::move(bulkWriteReply));
response = populateCursorReply(
opCtx, bulkRequest, request.body, std::move(bulkWriteReply));
}
result.appendElements(response.toBSON());
return true;

View File

@ -0,0 +1,156 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/s/commands/query_cmd/populate_cursor.h"
#include "mongo/db/commands/query_cmd/bulk_write_common.h"
#include "mongo/db/query/find_common.h"
#include "mongo/db/sharding_environment/grid.h"
#include "mongo/s/query/exec/cluster_client_cursor.h"
#include "mongo/s/query/exec/cluster_client_cursor_guard.h"
#include "mongo/s/query/exec/cluster_client_cursor_impl.h"
#include "mongo/s/query/exec/cluster_client_cursor_params.h"
#include "mongo/s/query/exec/cluster_cursor_manager.h"
#include "mongo/s/query/exec/cluster_query_result.h"
#include "mongo/s/query/exec/router_exec_stage.h"
#include "mongo/s/query/exec/router_stage_queued_data.h"
namespace mongo {
BulkWriteCommandReply populateCursorReply(OperationContext* opCtx,
const BulkWriteCommandRequest& req,
const BSONObj& reqObj,
bulk_write_exec::BulkWriteReplyInfo replyInfo) {
auto& [replyItems, summaryFields, wcErrors, retriedStmtIds] = replyInfo;
const NamespaceString cursorNss = NamespaceString::makeBulkWriteNSS(req.getDbName().tenantId());
if (bulk_write_common::isUnacknowledgedBulkWrite(opCtx)) {
// Skip cursor creation and return the simplest reply.
return BulkWriteCommandReply(
BulkWriteCommandResponseCursor(0 /* cursorId */, {} /* firstBatch */, cursorNss),
summaryFields.nErrors,
summaryFields.nInserted,
summaryFields.nMatched,
summaryFields.nModified,
summaryFields.nUpserted,
summaryFields.nDeleted);
}
ClusterClientCursorParams params(
cursorNss,
APIParameters::get(opCtx),
ReadPreferenceSetting::get(opCtx),
repl::ReadConcernArgs::get(opCtx),
[&] {
if (!opCtx->getLogicalSessionId())
return OperationSessionInfoFromClient();
// TODO (SERVER-80525): This code path does not
// clear the setAutocommit field on the presence of
// TransactionRouter::get
return OperationSessionInfoFromClient(
*opCtx->getLogicalSessionId(),
// Retryable writes will have a txnNumber we do not want to associate with
// the cursor. We only want to set this field for transactions.
opCtx->inMultiDocumentTransaction() ? opCtx->getTxnNumber() : boost::none);
}());
long long batchSize = std::numeric_limits<long long>::max();
if (req.getCursor() && req.getCursor()->getBatchSize()) {
params.batchSize = req.getCursor()->getBatchSize();
batchSize = *req.getCursor()->getBatchSize();
}
if (!reqObj.isEmpty()) {
params.originatingCommandObj = reqObj.getOwned();
params.originatingPrivileges = bulk_write_common::getPrivileges(req);
}
auto queuedDataStage = std::make_unique<RouterStageQueuedData>(opCtx);
BulkWriteCommandReply reply;
reply.setNErrors(summaryFields.nErrors);
reply.setNInserted(summaryFields.nInserted);
reply.setNDeleted(summaryFields.nDeleted);
reply.setNMatched(summaryFields.nMatched);
reply.setNModified(summaryFields.nModified);
reply.setNUpserted(summaryFields.nUpserted);
reply.setWriteConcernError(wcErrors);
reply.setRetriedStmtIds(retriedStmtIds);
for (auto& replyItem : replyItems) {
queuedDataStage->queueResult(replyItem.toBSON());
}
auto ccc = ClusterClientCursorImpl::make(opCtx, std::move(queuedDataStage), std::move(params));
size_t numRepliesInFirstBatch = 0;
FindCommon::BSONArrayResponseSizeTracker responseSizeTracker;
for (long long objCount = 0; objCount < batchSize; objCount++) {
auto next = uassertStatusOK(ccc->next());
if (next.isEOF()) {
break;
}
auto nextObj = *next.getResult();
if (!responseSizeTracker.haveSpaceForNext(nextObj)) {
ccc->queueResult(nextObj);
break;
}
numRepliesInFirstBatch++;
responseSizeTracker.add(nextObj);
}
if (numRepliesInFirstBatch == replyItems.size()) {
replyItems.resize(numRepliesInFirstBatch);
reply.setCursor(BulkWriteCommandResponseCursor(
0, std::vector<BulkWriteReplyItem>(std::move(replyItems)), cursorNss));
return reply;
}
ccc->detachFromOperationContext();
ccc->incNBatches();
auto authUser = AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserName();
auto cursorId = uassertStatusOK(Grid::get(opCtx)->getCursorManager()->registerCursor(
opCtx,
ccc.releaseCursor(),
cursorNss,
ClusterCursorManager::CursorType::QueuedData,
ClusterCursorManager::CursorLifetime::Mortal,
authUser));
// Record the cursorID in CurOp.
CurOp::get(opCtx)->debug().cursorid = cursorId;
replyItems.resize(numRepliesInFirstBatch);
reply.setCursor(BulkWriteCommandResponseCursor(
cursorId, std::vector<BulkWriteReplyItem>(std::move(replyItems)), cursorNss));
return reply;
}
} // namespace mongo

View File

@ -0,0 +1,47 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/db/commands/query_cmd/bulk_write_gen.h"
#include "mongo/s/write_ops/bulk_write_reply_info.h"
#include "mongo/util/modules.h"
namespace mongo {
/**
* Constructs a BulkWriteCommandReply for the given 'replyInfo'. This function will also create a
* cursor if needed.
*/
BulkWriteCommandReply populateCursorReply(OperationContext* opCtx,
const BulkWriteCommandRequest& req,
const BSONObj& reqObj,
bulk_write_exec::BulkWriteReplyInfo replyInfo);
} // namespace mongo

View File

@ -37,6 +37,7 @@ mongo_cc_library(
"//src/mongo/db/query/write_ops:parsed_update",
"//src/mongo/s:sharding_router_api",
"//src/mongo/s:sharding_write_without_shard_key_api",
"//src/mongo/s/commands/query_cmd:populate_cursor",
"//src/mongo/s/write_ops:batch_write_types",
"//src/mongo/s/write_ops:fle",
"//src/mongo/s/write_ops:wc_error",

View File

@ -31,6 +31,7 @@
#include "mongo/db/fle_crud.h"
#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/s/commands/query_cmd/populate_cursor.h"
#include "mongo/s/write_ops/fle.h"
#include "mongo/s/write_ops/unified_write_executor/stats.h"
#include "mongo/s/write_ops/unified_write_executor/write_batch_executor.h"
@ -63,7 +64,9 @@ bool isNonVerboseWriteCommand(OperationContext* opCtx, WriteCommandRef cmdRef) {
}
} // namespace
WriteCommandResponse executeWriteCommand(OperationContext* opCtx, WriteCommandRef cmdRef) {
WriteCommandResponse executeWriteCommand(OperationContext* opCtx,
WriteCommandRef cmdRef,
BSONObj originalCommand) {
const bool isNonVerbose = isNonVerboseWriteCommand(opCtx, cmdRef);
Stats stats;
@ -80,7 +83,7 @@ WriteCommandResponse executeWriteCommand(OperationContext* opCtx, WriteCommandRe
}
WriteBatchExecutor executor(cmdRef);
WriteBatchResponseProcessor processor(cmdRef, stats, isNonVerbose);
WriteBatchResponseProcessor processor(cmdRef, stats, isNonVerbose, originalCommand);
WriteBatchScheduler scheduler(cmdRef, *batcher, executor, processor);
scheduler.run(opCtx);
@ -102,18 +105,20 @@ BatchedCommandResponse write(OperationContext* opCtx, const BatchedCommandReques
return std::get<BatchedCommandResponse>(executeWriteCommand(opCtx, WriteCommandRef{request}));
}
BulkWriteCommandReply bulkWrite(OperationContext* opCtx, const BulkWriteCommandRequest& request) {
BulkWriteCommandReply bulkWrite(OperationContext* opCtx,
const BulkWriteCommandRequest& request,
BSONObj originalCommand) {
if (request.getNsInfo()[0].getEncryptionInformation().has_value()) {
auto [result, replyInfo] = attemptExecuteFLE(opCtx, request);
if (result == FLEBatchResult::kProcessed) {
return WriteBatchResponseProcessor::generateClientResponseForBulkWriteCommand(
std::move(replyInfo));
return populateCursorReply(opCtx, request, originalCommand, std::move(replyInfo));
}
// When FLE logic determines there is no need of processing, we fall through to the normal
// case.
}
return std::get<BulkWriteCommandReply>(executeWriteCommand(opCtx, WriteCommandRef{request}));
return std::get<BulkWriteCommandReply>(
executeWriteCommand(opCtx, WriteCommandRef{request}, originalCommand));
}
bool isEnabled(OperationContext* opCtx) {

View File

@ -45,7 +45,9 @@ using WriteCommandResponse = std::variant<BatchedCommandResponse, BulkWriteComma
/**
* This function will execute the specified write command and return a response.
*/
WriteCommandResponse executeWriteCommand(OperationContext* opCtx, WriteCommandRef cmdRef);
WriteCommandResponse executeWriteCommand(OperationContext* opCtx,
WriteCommandRef cmdRef,
BSONObj originalCommand = BSONObj());
/**
* Helper function for executing insert/update/delete commands.
@ -55,7 +57,9 @@ BatchedCommandResponse write(OperationContext* opCtx, const BatchedCommandReques
/**
* Helper function for executing bulk commands.
*/
BulkWriteCommandReply bulkWrite(OperationContext* opCtx, const BulkWriteCommandRequest& request);
BulkWriteCommandReply bulkWrite(OperationContext* opCtx,
const BulkWriteCommandRequest& request,
BSONObj originalCommand = BSONObj());
/**
* Unified write executor feature flag check. Also ensures we only have viewless timeseries

View File

@ -33,6 +33,7 @@
#include "mongo/db/error_labels.h"
#include "mongo/db/global_catalog/router_role_api/collection_uuid_mismatch.h"
#include "mongo/db/operation_context.h"
#include "mongo/s/commands/query_cmd/populate_cursor.h"
#include "mongo/s/transaction_router.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/s/write_ops/unified_write_executor/write_batch_executor.h"
@ -94,7 +95,6 @@ Result WriteBatchResponseProcessor::_onWriteBatchResponse(
OperationContext* opCtx,
RoutingContext& routingCtx,
const NoRetryWriteBatchResponse& response) {
// TODO SERVER-104535 cursor support for UnifiedWriteExec.
// TODO SERVER-104122 Support for 'WouldChangeOwningShard' writes.
// TODO SERVER-105762 Add support for errorsOnly: true.
const auto& swRes = response.swResponse;
@ -301,7 +301,6 @@ Result WriteBatchResponseProcessor::onShardResponse(OperationContext* opCtx,
}
}
// TODO SERVER-104535 cursor support for UnifiedWriteExec.
const auto& replyItems = parsedReply.getCursor().getFirstBatch();
auto result = processOpsInReplyItems(opCtx, routingCtx, ops, replyItems);
@ -552,46 +551,19 @@ BulkWriteCommandReply WriteBatchResponseProcessor::generateClientResponseForBulk
_stats.incrementOpCounters(opCtx, _cmdRef.getOp(id));
}
// Construct a BulkWriteCommandReply object. We always store the values of the top-level
// counters for the command (nInserted, nMatched, etc) into the BulkWriteCommandReply object,
// regardless of whether '_isNonVerbose' is true or false.
auto reply = BulkWriteCommandReply(
// TODO SERVER-104535 cursor support for UnifiedWriteExec.
BulkWriteCommandResponseCursor(
0, std::move(results), NamespaceString::makeBulkWriteNSS(boost::none)),
_nErrors,
_nInserted,
_nMatched,
_nModified,
_nUpserted,
_nDeleted);
reply.setRetriedStmtIds(getRetriedStmtIds());
bulk_write_exec::SummaryFields fields(
_nErrors, _nInserted, _nMatched, _nModified, _nUpserted, _nDeleted);
bulk_write_exec::BulkWriteReplyInfo info(
std::move(results), std::move(fields), boost::none, getRetriedStmtIds());
// Aggregate all the write concern errors from the shards.
if (auto totalWcError = mergeWriteConcernErrors(_wcErrors); totalWcError) {
reply.setWriteConcernError(BulkWriteWriteConcernError{totalWcError->toStatus().code(),
totalWcError->toStatus().reason()});
info.wcErrors = BulkWriteWriteConcernError{totalWcError->toStatus().code(),
totalWcError->toStatus().reason()};
}
return reply;
}
BulkWriteCommandReply WriteBatchResponseProcessor::generateClientResponseForBulkWriteCommand(
bulk_write_exec::BulkWriteReplyInfo replyInfo) {
// TODO SERVER-104535 cursor support for UnifiedWriteExec. Is it needed given this function is
// only used for FLE case?
auto reply = BulkWriteCommandReply(
BulkWriteCommandResponseCursor(
0, std::move(replyInfo.replyItems), NamespaceString::makeBulkWriteNSS(boost::none)),
replyInfo.summaryFields.nErrors,
replyInfo.summaryFields.nInserted,
replyInfo.summaryFields.nMatched,
replyInfo.summaryFields.nModified,
replyInfo.summaryFields.nUpserted,
replyInfo.summaryFields.nDeleted);
reply.setWriteConcernError(std::move(replyInfo.wcErrors));
reply.setRetriedStmtIds(std::move(replyInfo.retriedStmtIds));
return reply;
return populateCursorReply(
opCtx, _cmdRef.getBulkWriteCommandRequest(), _originalCommand, std::move(info));
}
BatchedCommandResponse WriteBatchResponseProcessor::generateClientResponseForBatchedCommand() {
@ -636,7 +608,12 @@ BatchedCommandResponse WriteBatchResponseProcessor::generateClientResponseForBat
const int nValue = _nInserted + _nUpserted + _nMatched + _nDeleted;
resp.setN(nValue);
resp.setNModified(_nModified);
if (_cmdRef.isBatchWriteCommand() &&
_cmdRef.getBatchedCommandRequest().getBatchType() ==
BatchedCommandRequest::BatchType_Update &&
_nModified >= 0) {
resp.setNModified(_nModified);
}
resp.setRetriedStmtIds(getRetriedStmtIds());
// Aggregate all the write concern errors from the shards.

View File

@ -72,18 +72,26 @@ public:
explicit WriteBatchResponseProcessor(WriteCommandRef cmdRef,
Stats& stats,
bool isNonVerbose = false)
: _cmdRef(cmdRef), _stats(stats), _isNonVerbose(isNonVerbose) {}
bool isNonVerbose = false,
BSONObj originalCommand = BSONObj())
: _cmdRef(cmdRef),
_stats(stats),
_isNonVerbose(isNonVerbose),
_originalCommand(originalCommand) {}
explicit WriteBatchResponseProcessor(const BatchedCommandRequest& request,
Stats& stats,
bool isNonVerbose = false)
: WriteBatchResponseProcessor(WriteCommandRef{request}, stats, isNonVerbose) {}
bool isNonVerbose = false,
BSONObj originalCommand = BSONObj())
: WriteBatchResponseProcessor(
WriteCommandRef{request}, stats, isNonVerbose, originalCommand) {}
explicit WriteBatchResponseProcessor(const BulkWriteCommandRequest& request,
Stats& stats,
bool isNonVerbose = false)
: WriteBatchResponseProcessor(WriteCommandRef{request}, stats, isNonVerbose) {}
bool isNonVerbose = false,
BSONObj originalCommand = BSONObj())
: WriteBatchResponseProcessor(
WriteCommandRef{request}, stats, isNonVerbose, originalCommand) {}
/**
* Process a response from each shard, handle errors, and collect statistics. Returns an
@ -106,12 +114,6 @@ public:
BulkWriteCommandReply generateClientResponseForBulkWriteCommand(OperationContext* opCtx);
/**
* Generate bulkWrite client response from BulkWriteReplyInfo object.
*/
static BulkWriteCommandReply generateClientResponseForBulkWriteCommand(
bulk_write_exec::BulkWriteReplyInfo replyInfo);
/**
* This method is called by the scheduler to record a target error that occurred during batch
* creation.
@ -222,6 +224,7 @@ private:
WriteCommandRef _cmdRef;
Stats& _stats;
const bool _isNonVerbose;
BSONObj _originalCommand;
size_t _nErrors{0};
size_t _nInserted{0};
size_t _nMatched{0};

View File

@ -118,10 +118,11 @@ TEST_F(WriteBatchResponseProcessorTest, OKReplies) {
ASSERT_EQ(clientReply.getNModified(), 6);
// Generating a 'BatchedCommandResponse' should output the same statistics, save for 'n', which
// is the combination of 'nInserted' and 'nMatched'.
// is the combination of 'nInserted' and 'nMatched', and 'nModified', which is only set on
// updates.
auto batchedCommandReply = processor.generateClientResponseForBatchedCommand();
ASSERT_EQ(batchedCommandReply.getN(), 8);
ASSERT_EQ(batchedCommandReply.getNModified(), 6);
ASSERT_EQ(batchedCommandReply.getNModified(), 0);
}
TEST_F(WriteBatchResponseProcessorTest, AllStatisticsCopied) {