SERVER-115094: In MongoProcessInterface::insert return all error statuses instead of only the first one (#44979)

GitOrigin-RevId: 226536b1d72069e5bfe71b33ca43b6a9ed31fd64
This commit is contained in:
Ivan Fefer 2025-12-10 11:17:52 +01:00 committed by MongoDB Bot
parent 4118948164
commit 0b18dd5a7c
12 changed files with 171 additions and 104 deletions

View File

@ -289,8 +289,10 @@ void OutStage::flush(BatchedCommandRequest bcr, BatchedObjects batch) {
auto targetEpoch = boost::none;
if (_timeseries) {
uassertStatusOK(pExpCtx->getMongoProcessInterface()->insertTimeseries(
pExpCtx, _tempNs, std::move(insertCommand), _writeConcern, targetEpoch));
for (const auto& insertStatus : pExpCtx->getMongoProcessInterface()->insertTimeseries(
pExpCtx, _tempNs, std::move(insertCommand), _writeConcern, targetEpoch)) {
uassertStatusOK(insertStatus);
}
} else {
// Use the UUID to catch a mismatch if the temp collection was dropped and recreated.
// Timeseries will detect this as inserts don't implicitly
@ -301,8 +303,10 @@ void OutStage::flush(BatchedCommandRequest bcr, BatchedObjects batch) {
insertCommand->getWriteCommandRequestBase().setCollectionUUID(_tempNsUUID);
}
try {
uassertStatusOK(pExpCtx->getMongoProcessInterface()->insert(
pExpCtx, _tempNs, std::move(insertCommand), _writeConcern, targetEpoch));
for (const auto& insertStatus : pExpCtx->getMongoProcessInterface()->insert(
pExpCtx, _tempNs, std::move(insertCommand), _writeConcern, targetEpoch)) {
uassertStatusOK(insertStatus);
}
} catch (ExceptionFor<ErrorCodes::CollectionUUIDMismatch>& ex) {
ex.addContext(

View File

@ -163,8 +163,10 @@ MergeStrategy makeInsertStrategy() {
});
auto insertCommand = bcr.extractInsertRequest();
insertCommand->setDocuments(std::move(objectsToInsert));
uassertStatusOK(expCtx->getMongoProcessInterface()->insert(
expCtx, ns, std::move(insertCommand), wc, epoch));
for (const auto& insertStatus : expCtx->getMongoProcessInterface()->insert(
expCtx, ns, std::move(insertCommand), wc, epoch)) {
uassertStatusOK(insertStatus);
}
};
}

View File

@ -39,11 +39,8 @@
#include "mongo/db/versioning_protocol/chunk_version.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/stdx/unordered_map.h"
#include "mongo/util/modules.h"
#include <memory>
#include <string>
#include <vector>
namespace MONGO_MOD_PUBLIC mongo {

View File

@ -239,21 +239,26 @@ public:
virtual void updateClientOperationTime(OperationContext* opCtx) const = 0;
/**
* Executes 'insertCommand' against 'ns' and returns an error Status if the insert fails. If
* 'targetEpoch' is set, throws ErrorCodes::StaleEpoch if the targeted collection does not have
* the same epoch or the epoch changes during the course of the insert.
* Executes 'insertCommand' against 'ns'. Returns a vector of statuses. Will contain at least
* one error status if insert failed to not swallow any errors. If 'targetEpoch' is set, throws
* ErrorCodes::StaleEpoch if the targeted collection does not have the same epoch or the epoch
* changes during the course of the insert.
*/
virtual Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) = 0;
using InsertResult = absl::InlinedVector<Status, 4>;
virtual InsertResult insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) = 0;
virtual InsertResult insertTimeseries(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) = 0;
virtual Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) = 0;
/**
* Executes the updates described by 'updateCommand'. Returns an error Status if any of the
* updates fail, otherwise returns an 'UpdateResult' objects with the details of the update

View File

@ -129,19 +129,19 @@ public:
return CommonProcessInterface::findOwningShard(opCtx, ns);
}
Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID>) final {
InsertResult insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID>) final {
MONGO_UNREACHABLE;
}
Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) final {
InsertResult insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) final {
MONGO_UNREACHABLE;
}

View File

@ -197,7 +197,7 @@ boost::optional<Document> NonShardServerProcessInterface::lookupSingleDocument(
return lookedUpDocument;
}
Status NonShardServerProcessInterface::insert(
MongoProcessInterface::InsertResult NonShardServerProcessInterface::insert(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
@ -206,37 +206,40 @@ Status NonShardServerProcessInterface::insert(
auto writeResults =
write_ops_exec::performInserts(expCtx->getOperationContext(), *insertCommand);
// Need to check each result in the batch since the writes are unordered.
for (const auto& result : writeResults.results) {
if (result.getStatus() != Status::OK()) {
return result.getStatus();
InsertResult results;
for (const auto& writeResult : writeResults.results) {
if (writeResult.getStatus() != Status::OK()) {
results.push_back(writeResult.getStatus());
}
}
return Status::OK();
if (results.empty()) {
results.push_back(Status::OK());
}
return results;
}
Status NonShardServerProcessInterface::insertTimeseries(
MongoProcessInterface::InsertResult NonShardServerProcessInterface::insertTimeseries(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) {
try {
auto [preConditions, _] =
timeseries::getCollectionPreConditionsAndIsTimeseriesLogicalRequest(
expCtx->getOperationContext(),
ns,
*insertCommand,
insertCommand->getCollectionUUID());
auto insertReply = timeseries::write_ops::performTimeseriesWrites(
expCtx->getOperationContext(), *insertCommand, preConditions);
InsertResult result;
auto [preConditions, _] = timeseries::getCollectionPreConditionsAndIsTimeseriesLogicalRequest(
expCtx->getOperationContext(), ns, *insertCommand, insertCommand->getCollectionUUID());
auto insertReply = timeseries::write_ops::performTimeseriesWrites(
expCtx->getOperationContext(), *insertCommand, preConditions);
checkWriteErrors(insertReply.getWriteCommandReplyBase());
} catch (DBException& ex) {
ex.addContext(str::stream() << "time-series insert failed: " << ns.toStringForErrorMsg());
throw;
if (insertReply.getWriteErrors().has_value()) {
for (const auto& writeError : *insertReply.getWriteErrors()) {
result.push_back(writeError.getStatus());
}
uassert(10903400, "Write errors must not be empty", !result.empty());
} else {
result.push_back(Status::OK());
}
return Status::OK();
return result;
}
StatusWith<MongoProcessInterface::UpdateResult> NonShardServerProcessInterface::update(

View File

@ -135,17 +135,17 @@ public:
const Document& documentKey,
boost::optional<BSONObj> readConcern) final;
Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) override;
InsertResult insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) override;
Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) override;
InsertResult insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) override;
StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,

View File

@ -87,7 +87,7 @@ void ReplicaSetNodeProcessInterface::setReplicaSetNodeExecutor(
replicaSetNodeExecutor(service) = std::move(executor);
}
Status ReplicaSetNodeProcessInterface::insert(
MongoProcessInterface::InsertResult ReplicaSetNodeProcessInterface::insert(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
@ -101,7 +101,29 @@ Status ReplicaSetNodeProcessInterface::insert(
BatchedCommandRequest batchInsertCommand(std::move(insertCommand));
return _executeCommandOnPrimary(opCtx, ns, batchInsertCommand.toBSON()).getStatus();
auto statusWithReply = _executeCommandOnPrimaryRaw(opCtx, ns, batchInsertCommand.toBSON());
if (!statusWithReply.isOK()) {
return {statusWithReply.getStatus()};
}
BatchedCommandResponse response;
std::string errMsg;
InsertResult result;
if (!response.parseBSON(statusWithReply.getValue(), &errMsg)) {
result.emplace_back(ErrorCodes::FailedToParse, errMsg);
} else if (!response.getOk()) {
result.push_back(response.getTopLevelStatus());
} else if (response.isErrDetailsSet()) {
result.reserve(response.getErrDetails().size());
for (const auto& error : response.getErrDetails()) {
result.push_back(error.getStatus());
}
} else if (response.isWriteConcernErrorSet()) {
result.push_back(response.getWriteConcernError()->toStatus());
} else {
result.push_back(Status::OK());
}
return result;
}
StatusWith<MongoProcessInterface::UpdateResult> ReplicaSetNodeProcessInterface::update(
@ -158,7 +180,7 @@ void ReplicaSetNodeProcessInterface::createTimeseriesView(OperationContext* opCt
}
}
Status ReplicaSetNodeProcessInterface::insertTimeseries(
MongoProcessInterface::InsertResult ReplicaSetNodeProcessInterface::insertTimeseries(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
@ -235,7 +257,7 @@ UUID ReplicaSetNodeProcessInterface::fetchCollectionUUIDFromPrimary(OperationCon
return uassertStatusOK(UUID::parse(uuid));
}
StatusWith<BSONObj> ReplicaSetNodeProcessInterface::_executeCommandOnPrimary(
StatusWith<BSONObj> ReplicaSetNodeProcessInterface::_executeCommandOnPrimaryRaw(
OperationContext* opCtx,
const NamespaceString& ns,
const BSONObj& cmdObj,
@ -284,23 +306,36 @@ StatusWith<BSONObj> ReplicaSetNodeProcessInterface::_executeCommandOnPrimary(
if (!rcr.response.status.isOK()) {
return rcr.response.status;
}
return std::move(rcr.response.data);
}
auto commandStatus = getStatusFromCommandResult(rcr.response.data);
StatusWith<BSONObj> ReplicaSetNodeProcessInterface::_executeCommandOnPrimary(
OperationContext* opCtx,
const NamespaceString& ns,
const BSONObj& cmdObj,
bool attachWriteConcern) const {
auto statusWithData = _executeCommandOnPrimaryRaw(opCtx, ns, cmdObj, attachWriteConcern);
if (!statusWithData.isOK()) {
return statusWithData.getStatus();
}
auto data = statusWithData.getValue();
auto commandStatus = getStatusFromCommandResult(data);
if (!commandStatus.isOK()) {
return commandStatus;
}
auto writeConcernStatus = getWriteConcernStatusFromCommandResult(rcr.response.data);
auto writeConcernStatus = getWriteConcernStatusFromCommandResult(data);
if (!writeConcernStatus.isOK()) {
return writeConcernStatus;
}
auto writeStatus = getFirstWriteErrorStatusFromCommandResult(rcr.response.data);
auto writeStatus = getFirstWriteErrorStatusFromCommandResult(data);
if (!writeStatus.isOK()) {
return writeStatus;
}
return rcr.response.data;
return data;
}
void ReplicaSetNodeProcessInterface::_attachGenericCommandArgs(OperationContext* opCtx,

View File

@ -96,11 +96,11 @@ public:
~ReplicaSetNodeProcessInterface() override = default;
Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) final;
InsertResult insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) final;
StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
@ -130,16 +130,24 @@ public:
const BSONObj& cmdObj,
const TimeseriesOptions& userOpts) override;
Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) override;
InsertResult insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) override;
UUID fetchCollectionUUIDFromPrimary(OperationContext* opCtx,
const NamespaceString& nss) override;
private:
/**
* Attemps to execute the specified command on the primary. Returns the command response without
* parsing the result. May return non-OK status in case of network issues.
*/
StatusWith<BSONObj> _executeCommandOnPrimaryRaw(OperationContext* opCtx,
const NamespaceString& ns,
const BSONObj& cmdObj,
bool attachWriteConcern = true) const;
/**
* Attemps to execute the specified command on the primary. Returns the command response upon
* success or a non-OK status upon a failed command response, a writeConcernError, or any

View File

@ -204,7 +204,7 @@ boost::optional<Document> ShardServerProcessInterface::lookupSingleDocument(
expCtx, nss, std::move(collectionUUID), documentKey, std::move(opts));
}
Status ShardServerProcessInterface::insert(
MongoProcessInterface::InsertResult ShardServerProcessInterface::insert(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
@ -226,7 +226,20 @@ Status ShardServerProcessInterface::insert(
&response,
targetEpoch);
return response.toStatus();
InsertResult result;
if (!response.getOk()) {
result.push_back(response.getTopLevelStatus());
} else if (response.isErrDetailsSet()) {
result.reserve(response.getErrDetails().size());
for (const auto& error : response.getErrDetails()) {
result.push_back(error.getStatus());
}
} else if (response.isWriteConcernErrorSet()) {
result.push_back(response.getWriteConcernError()->toStatus());
} else {
result.push_back(Status::OK());
}
return result;
}
StatusWith<MongoProcessInterface::UpdateResult> ShardServerProcessInterface::update(
@ -631,7 +644,7 @@ boost::optional<TimeseriesOptions> ShardServerProcessInterface::_getTimeseriesOp
IDLParserContext("TimeseriesOptions"));
}
Status ShardServerProcessInterface::insertTimeseries(
MongoProcessInterface::InsertResult ShardServerProcessInterface::insertTimeseries(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,

View File

@ -115,11 +115,11 @@ public:
const Document& documentKey,
boost::optional<BSONObj> readConcern) final;
Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) final;
InsertResult insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) final;
StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
@ -204,11 +204,11 @@ public:
const BSONObj& cmdObj,
const TimeseriesOptions& userOpts) final;
Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) final;
InsertResult insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) final;
std::vector<DatabaseName> getAllDatabases(OperationContext* opCtx,
boost::optional<TenantId> tenantId) final;

View File

@ -96,19 +96,19 @@ public:
void updateClientOperationTime(OperationContext* opCtx) const override {}
Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID>) override {
InsertResult insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID>) override {
MONGO_UNREACHABLE;
}
Status insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) override {
InsertResult insertTimeseries(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::unique_ptr<write_ops::InsertCommandRequest> insertCommand,
const WriteConcernOptions& wc,
boost::optional<OID> targetEpoch) override {
MONGO_UNREACHABLE;
}