From 0b18dd5a7c999670fd80000da6024543552adb0d Mon Sep 17 00:00:00 2001 From: Ivan Fefer Date: Wed, 10 Dec 2025 11:17:52 +0100 Subject: [PATCH] SERVER-115094: In MongoProcessInterface::insert return all error statuses instead of only the first one (#44979) GitOrigin-RevId: 226536b1d72069e5bfe71b33ca43b6a9ed31fd64 --- src/mongo/db/exec/agg/out_stage.cpp | 12 +++-- src/mongo/db/pipeline/merge_processor.cpp | 6 ++- src/mongo/db/pipeline/merge_processor.h | 3 -- .../mongo_process_interface.h | 31 ++++++----- .../mongos_process_interface.h | 20 ++++---- .../non_shardsvr_process_interface.cpp | 45 ++++++++-------- .../non_shardsvr_process_interface.h | 20 ++++---- .../replica_set_node_process_interface.cpp | 51 ++++++++++++++++--- .../replica_set_node_process_interface.h | 28 ++++++---- .../shardsvr_process_interface.cpp | 19 +++++-- .../shardsvr_process_interface.h | 20 ++++---- .../stub_mongo_process_interface.h | 20 ++++---- 12 files changed, 171 insertions(+), 104 deletions(-) diff --git a/src/mongo/db/exec/agg/out_stage.cpp b/src/mongo/db/exec/agg/out_stage.cpp index c5a52f66d84..b98a443345a 100644 --- a/src/mongo/db/exec/agg/out_stage.cpp +++ b/src/mongo/db/exec/agg/out_stage.cpp @@ -289,8 +289,10 @@ void OutStage::flush(BatchedCommandRequest bcr, BatchedObjects batch) { auto targetEpoch = boost::none; if (_timeseries) { - uassertStatusOK(pExpCtx->getMongoProcessInterface()->insertTimeseries( - pExpCtx, _tempNs, std::move(insertCommand), _writeConcern, targetEpoch)); + for (const auto& insertStatus : pExpCtx->getMongoProcessInterface()->insertTimeseries( + pExpCtx, _tempNs, std::move(insertCommand), _writeConcern, targetEpoch)) { + uassertStatusOK(insertStatus); + } } else { // Use the UUID to catch a mismatch if the temp collection was dropped and recreated. // Timeseries will detect this as inserts don't implicitly @@ -301,8 +303,10 @@ void OutStage::flush(BatchedCommandRequest bcr, BatchedObjects batch) { insertCommand->getWriteCommandRequestBase().setCollectionUUID(_tempNsUUID); } try { - uassertStatusOK(pExpCtx->getMongoProcessInterface()->insert( - pExpCtx, _tempNs, std::move(insertCommand), _writeConcern, targetEpoch)); + for (const auto& insertStatus : pExpCtx->getMongoProcessInterface()->insert( + pExpCtx, _tempNs, std::move(insertCommand), _writeConcern, targetEpoch)) { + uassertStatusOK(insertStatus); + } } catch (ExceptionFor& ex) { ex.addContext( diff --git a/src/mongo/db/pipeline/merge_processor.cpp b/src/mongo/db/pipeline/merge_processor.cpp index 064e49f6586..3523d4364a5 100644 --- a/src/mongo/db/pipeline/merge_processor.cpp +++ b/src/mongo/db/pipeline/merge_processor.cpp @@ -163,8 +163,10 @@ MergeStrategy makeInsertStrategy() { }); auto insertCommand = bcr.extractInsertRequest(); insertCommand->setDocuments(std::move(objectsToInsert)); - uassertStatusOK(expCtx->getMongoProcessInterface()->insert( - expCtx, ns, std::move(insertCommand), wc, epoch)); + for (const auto& insertStatus : expCtx->getMongoProcessInterface()->insert( + expCtx, ns, std::move(insertCommand), wc, epoch)) { + uassertStatusOK(insertStatus); + } }; } diff --git a/src/mongo/db/pipeline/merge_processor.h b/src/mongo/db/pipeline/merge_processor.h index e1fa2b1417b..e35ed9b0e36 100644 --- a/src/mongo/db/pipeline/merge_processor.h +++ b/src/mongo/db/pipeline/merge_processor.h @@ -39,11 +39,8 @@ #include "mongo/db/versioning_protocol/chunk_version.h" #include "mongo/db/write_concern_options.h" #include "mongo/s/write_ops/batched_command_request.h" -#include "mongo/stdx/unordered_map.h" #include "mongo/util/modules.h" -#include -#include #include namespace MONGO_MOD_PUBLIC mongo { diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h index 838ae44b1dd..8865e915d85 100644 --- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h @@ -239,21 +239,26 @@ public: virtual void updateClientOperationTime(OperationContext* opCtx) const = 0; /** - * Executes 'insertCommand' against 'ns' and returns an error Status if the insert fails. If - * 'targetEpoch' is set, throws ErrorCodes::StaleEpoch if the targeted collection does not have - * the same epoch or the epoch changes during the course of the insert. + * Executes 'insertCommand' against 'ns'. Returns a vector of statuses. Will contain at least + * one error status if insert failed to not swallow any errors. If 'targetEpoch' is set, throws + * ErrorCodes::StaleEpoch if the targeted collection does not have the same epoch or the epoch + * changes during the course of the insert. */ - virtual Status insert(const boost::intrusive_ptr& expCtx, - const NamespaceString& ns, - std::unique_ptr insertCommand, - const WriteConcernOptions& wc, - boost::optional targetEpoch) = 0; + using InsertResult = absl::InlinedVector; + + virtual InsertResult insert(const boost::intrusive_ptr& expCtx, + const NamespaceString& ns, + std::unique_ptr insertCommand, + const WriteConcernOptions& wc, + boost::optional targetEpoch) = 0; + + virtual InsertResult insertTimeseries( + const boost::intrusive_ptr& expCtx, + const NamespaceString& ns, + std::unique_ptr insertCommand, + const WriteConcernOptions& wc, + boost::optional targetEpoch) = 0; - virtual Status insertTimeseries(const boost::intrusive_ptr& expCtx, - const NamespaceString& ns, - std::unique_ptr insertCommand, - const WriteConcernOptions& wc, - boost::optional targetEpoch) = 0; /** * Executes the updates described by 'updateCommand'. Returns an error Status if any of the * updates fail, otherwise returns an 'UpdateResult' objects with the details of the update diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h index 86efce63e84..f7baa9f773c 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h @@ -129,19 +129,19 @@ public: return CommonProcessInterface::findOwningShard(opCtx, ns); } - Status insert(const boost::intrusive_ptr& expCtx, - const NamespaceString& ns, - std::unique_ptr insertCommand, - const WriteConcernOptions& wc, - boost::optional) final { + InsertResult insert(const boost::intrusive_ptr& expCtx, + const NamespaceString& ns, + std::unique_ptr insertCommand, + const WriteConcernOptions& wc, + boost::optional) final { MONGO_UNREACHABLE; } - Status insertTimeseries(const boost::intrusive_ptr& expCtx, - const NamespaceString& ns, - std::unique_ptr insertCommand, - const WriteConcernOptions& wc, - boost::optional targetEpoch) final { + InsertResult insertTimeseries(const boost::intrusive_ptr& expCtx, + const NamespaceString& ns, + std::unique_ptr insertCommand, + const WriteConcernOptions& wc, + boost::optional targetEpoch) final { MONGO_UNREACHABLE; } diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp index a3f6be1f3a0..5d82ab90bf8 100644 --- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp @@ -197,7 +197,7 @@ boost::optional NonShardServerProcessInterface::lookupSingleDocument( return lookedUpDocument; } -Status NonShardServerProcessInterface::insert( +MongoProcessInterface::InsertResult NonShardServerProcessInterface::insert( const boost::intrusive_ptr& expCtx, const NamespaceString& ns, std::unique_ptr insertCommand, @@ -206,37 +206,40 @@ Status NonShardServerProcessInterface::insert( auto writeResults = write_ops_exec::performInserts(expCtx->getOperationContext(), *insertCommand); - // Need to check each result in the batch since the writes are unordered. - for (const auto& result : writeResults.results) { - if (result.getStatus() != Status::OK()) { - return result.getStatus(); + InsertResult results; + for (const auto& writeResult : writeResults.results) { + if (writeResult.getStatus() != Status::OK()) { + results.push_back(writeResult.getStatus()); } } - return Status::OK(); + if (results.empty()) { + results.push_back(Status::OK()); + } + return results; } -Status NonShardServerProcessInterface::insertTimeseries( +MongoProcessInterface::InsertResult NonShardServerProcessInterface::insertTimeseries( const boost::intrusive_ptr& expCtx, const NamespaceString& ns, std::unique_ptr insertCommand, const WriteConcernOptions& wc, boost::optional targetEpoch) { - try { - auto [preConditions, _] = - timeseries::getCollectionPreConditionsAndIsTimeseriesLogicalRequest( - expCtx->getOperationContext(), - ns, - *insertCommand, - insertCommand->getCollectionUUID()); - auto insertReply = timeseries::write_ops::performTimeseriesWrites( - expCtx->getOperationContext(), *insertCommand, preConditions); + InsertResult result; + auto [preConditions, _] = timeseries::getCollectionPreConditionsAndIsTimeseriesLogicalRequest( + expCtx->getOperationContext(), ns, *insertCommand, insertCommand->getCollectionUUID()); + auto insertReply = timeseries::write_ops::performTimeseriesWrites( + expCtx->getOperationContext(), *insertCommand, preConditions); - checkWriteErrors(insertReply.getWriteCommandReplyBase()); - } catch (DBException& ex) { - ex.addContext(str::stream() << "time-series insert failed: " << ns.toStringForErrorMsg()); - throw; + if (insertReply.getWriteErrors().has_value()) { + for (const auto& writeError : *insertReply.getWriteErrors()) { + result.push_back(writeError.getStatus()); + } + uassert(10903400, "Write errors must not be empty", !result.empty()); + } else { + result.push_back(Status::OK()); } - return Status::OK(); + + return result; } StatusWith NonShardServerProcessInterface::update( diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h index b2d33786c1a..cdc404a4232 100644 --- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h @@ -135,17 +135,17 @@ public: const Document& documentKey, boost::optional readConcern) final; - Status insert(const boost::intrusive_ptr& expCtx, - const NamespaceString& ns, - std::unique_ptr insertCommand, - const WriteConcernOptions& wc, - boost::optional targetEpoch) override; + InsertResult insert(const boost::intrusive_ptr& expCtx, + const NamespaceString& ns, + std::unique_ptr insertCommand, + const WriteConcernOptions& wc, + boost::optional targetEpoch) override; - Status insertTimeseries(const boost::intrusive_ptr& expCtx, - const NamespaceString& ns, - std::unique_ptr insertCommand, - const WriteConcernOptions& wc, - boost::optional targetEpoch) override; + InsertResult insertTimeseries(const boost::intrusive_ptr& expCtx, + const NamespaceString& ns, + std::unique_ptr insertCommand, + const WriteConcernOptions& wc, + boost::optional targetEpoch) override; StatusWith update(const boost::intrusive_ptr& expCtx, const NamespaceString& ns, diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp index ea081263301..59ac5805c11 100644 --- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp @@ -87,7 +87,7 @@ void ReplicaSetNodeProcessInterface::setReplicaSetNodeExecutor( replicaSetNodeExecutor(service) = std::move(executor); } -Status ReplicaSetNodeProcessInterface::insert( +MongoProcessInterface::InsertResult ReplicaSetNodeProcessInterface::insert( const boost::intrusive_ptr& expCtx, const NamespaceString& ns, std::unique_ptr insertCommand, @@ -101,7 +101,29 @@ Status ReplicaSetNodeProcessInterface::insert( BatchedCommandRequest batchInsertCommand(std::move(insertCommand)); - return _executeCommandOnPrimary(opCtx, ns, batchInsertCommand.toBSON()).getStatus(); + auto statusWithReply = _executeCommandOnPrimaryRaw(opCtx, ns, batchInsertCommand.toBSON()); + if (!statusWithReply.isOK()) { + return {statusWithReply.getStatus()}; + } + + BatchedCommandResponse response; + std::string errMsg; + InsertResult result; + if (!response.parseBSON(statusWithReply.getValue(), &errMsg)) { + result.emplace_back(ErrorCodes::FailedToParse, errMsg); + } else if (!response.getOk()) { + result.push_back(response.getTopLevelStatus()); + } else if (response.isErrDetailsSet()) { + result.reserve(response.getErrDetails().size()); + for (const auto& error : response.getErrDetails()) { + result.push_back(error.getStatus()); + } + } else if (response.isWriteConcernErrorSet()) { + result.push_back(response.getWriteConcernError()->toStatus()); + } else { + result.push_back(Status::OK()); + } + return result; } StatusWith ReplicaSetNodeProcessInterface::update( @@ -158,7 +180,7 @@ void ReplicaSetNodeProcessInterface::createTimeseriesView(OperationContext* opCt } } -Status ReplicaSetNodeProcessInterface::insertTimeseries( +MongoProcessInterface::InsertResult ReplicaSetNodeProcessInterface::insertTimeseries( const boost::intrusive_ptr& expCtx, const NamespaceString& ns, std::unique_ptr insertCommand, @@ -235,7 +257,7 @@ UUID ReplicaSetNodeProcessInterface::fetchCollectionUUIDFromPrimary(OperationCon return uassertStatusOK(UUID::parse(uuid)); } -StatusWith ReplicaSetNodeProcessInterface::_executeCommandOnPrimary( +StatusWith ReplicaSetNodeProcessInterface::_executeCommandOnPrimaryRaw( OperationContext* opCtx, const NamespaceString& ns, const BSONObj& cmdObj, @@ -284,23 +306,36 @@ StatusWith ReplicaSetNodeProcessInterface::_executeCommandOnPrimary( if (!rcr.response.status.isOK()) { return rcr.response.status; } + return std::move(rcr.response.data); +} - auto commandStatus = getStatusFromCommandResult(rcr.response.data); +StatusWith ReplicaSetNodeProcessInterface::_executeCommandOnPrimary( + OperationContext* opCtx, + const NamespaceString& ns, + const BSONObj& cmdObj, + bool attachWriteConcern) const { + auto statusWithData = _executeCommandOnPrimaryRaw(opCtx, ns, cmdObj, attachWriteConcern); + if (!statusWithData.isOK()) { + return statusWithData.getStatus(); + } + auto data = statusWithData.getValue(); + + auto commandStatus = getStatusFromCommandResult(data); if (!commandStatus.isOK()) { return commandStatus; } - auto writeConcernStatus = getWriteConcernStatusFromCommandResult(rcr.response.data); + auto writeConcernStatus = getWriteConcernStatusFromCommandResult(data); if (!writeConcernStatus.isOK()) { return writeConcernStatus; } - auto writeStatus = getFirstWriteErrorStatusFromCommandResult(rcr.response.data); + auto writeStatus = getFirstWriteErrorStatusFromCommandResult(data); if (!writeStatus.isOK()) { return writeStatus; } - return rcr.response.data; + return data; } void ReplicaSetNodeProcessInterface::_attachGenericCommandArgs(OperationContext* opCtx, diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h index 7038d93d0ec..d2853f6adfc 100644 --- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h @@ -96,11 +96,11 @@ public: ~ReplicaSetNodeProcessInterface() override = default; - Status insert(const boost::intrusive_ptr& expCtx, - const NamespaceString& ns, - std::unique_ptr insertCommand, - const WriteConcernOptions& wc, - boost::optional targetEpoch) final; + InsertResult insert(const boost::intrusive_ptr& expCtx, + const NamespaceString& ns, + std::unique_ptr insertCommand, + const WriteConcernOptions& wc, + boost::optional targetEpoch) final; StatusWith update(const boost::intrusive_ptr& expCtx, const NamespaceString& ns, @@ -130,16 +130,24 @@ public: const BSONObj& cmdObj, const TimeseriesOptions& userOpts) override; - Status insertTimeseries(const boost::intrusive_ptr& expCtx, - const NamespaceString& ns, - std::unique_ptr insertCommand, - const WriteConcernOptions& wc, - boost::optional targetEpoch) override; + InsertResult insertTimeseries(const boost::intrusive_ptr& expCtx, + const NamespaceString& ns, + std::unique_ptr insertCommand, + const WriteConcernOptions& wc, + boost::optional targetEpoch) override; UUID fetchCollectionUUIDFromPrimary(OperationContext* opCtx, const NamespaceString& nss) override; private: + /** + * Attemps to execute the specified command on the primary. Returns the command response without + * parsing the result. May return non-OK status in case of network issues. + */ + StatusWith _executeCommandOnPrimaryRaw(OperationContext* opCtx, + const NamespaceString& ns, + const BSONObj& cmdObj, + bool attachWriteConcern = true) const; /** * Attemps to execute the specified command on the primary. Returns the command response upon * success or a non-OK status upon a failed command response, a writeConcernError, or any diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index 8a2ae52f91c..f7e70b95d1b 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -204,7 +204,7 @@ boost::optional ShardServerProcessInterface::lookupSingleDocument( expCtx, nss, std::move(collectionUUID), documentKey, std::move(opts)); } -Status ShardServerProcessInterface::insert( +MongoProcessInterface::InsertResult ShardServerProcessInterface::insert( const boost::intrusive_ptr& expCtx, const NamespaceString& ns, std::unique_ptr insertCommand, @@ -226,7 +226,20 @@ Status ShardServerProcessInterface::insert( &response, targetEpoch); - return response.toStatus(); + InsertResult result; + if (!response.getOk()) { + result.push_back(response.getTopLevelStatus()); + } else if (response.isErrDetailsSet()) { + result.reserve(response.getErrDetails().size()); + for (const auto& error : response.getErrDetails()) { + result.push_back(error.getStatus()); + } + } else if (response.isWriteConcernErrorSet()) { + result.push_back(response.getWriteConcernError()->toStatus()); + } else { + result.push_back(Status::OK()); + } + return result; } StatusWith ShardServerProcessInterface::update( @@ -631,7 +644,7 @@ boost::optional ShardServerProcessInterface::_getTimeseriesOp IDLParserContext("TimeseriesOptions")); } -Status ShardServerProcessInterface::insertTimeseries( +MongoProcessInterface::InsertResult ShardServerProcessInterface::insertTimeseries( const boost::intrusive_ptr& expCtx, const NamespaceString& ns, std::unique_ptr insertCommand, diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h index 0f038e7dc38..7ffd9225362 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h @@ -115,11 +115,11 @@ public: const Document& documentKey, boost::optional readConcern) final; - Status insert(const boost::intrusive_ptr& expCtx, - const NamespaceString& ns, - std::unique_ptr insertCommand, - const WriteConcernOptions& wc, - boost::optional targetEpoch) final; + InsertResult insert(const boost::intrusive_ptr& expCtx, + const NamespaceString& ns, + std::unique_ptr insertCommand, + const WriteConcernOptions& wc, + boost::optional targetEpoch) final; StatusWith update(const boost::intrusive_ptr& expCtx, const NamespaceString& ns, @@ -204,11 +204,11 @@ public: const BSONObj& cmdObj, const TimeseriesOptions& userOpts) final; - Status insertTimeseries(const boost::intrusive_ptr& expCtx, - const NamespaceString& ns, - std::unique_ptr insertCommand, - const WriteConcernOptions& wc, - boost::optional targetEpoch) final; + InsertResult insertTimeseries(const boost::intrusive_ptr& expCtx, + const NamespaceString& ns, + std::unique_ptr insertCommand, + const WriteConcernOptions& wc, + boost::optional targetEpoch) final; std::vector getAllDatabases(OperationContext* opCtx, boost::optional tenantId) final; diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h index 904ac26c649..b46ffce78b8 100644 --- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h @@ -96,19 +96,19 @@ public: void updateClientOperationTime(OperationContext* opCtx) const override {} - Status insert(const boost::intrusive_ptr& expCtx, - const NamespaceString& ns, - std::unique_ptr insertCommand, - const WriteConcernOptions& wc, - boost::optional) override { + InsertResult insert(const boost::intrusive_ptr& expCtx, + const NamespaceString& ns, + std::unique_ptr insertCommand, + const WriteConcernOptions& wc, + boost::optional) override { MONGO_UNREACHABLE; } - Status insertTimeseries(const boost::intrusive_ptr& expCtx, - const NamespaceString& ns, - std::unique_ptr insertCommand, - const WriteConcernOptions& wc, - boost::optional targetEpoch) override { + InsertResult insertTimeseries(const boost::intrusive_ptr& expCtx, + const NamespaceString& ns, + std::unique_ptr insertCommand, + const WriteConcernOptions& wc, + boost::optional targetEpoch) override { MONGO_UNREACHABLE; }