SERVER-97609 Time-series write path code movement pre-work (#29586)

GitOrigin-RevId: e1a55ab4fcaaf69a22f6667a6e4eed3d9dba5e8d
Matt Kneiser 2024-11-27 07:41:58 -08:00 committed by MongoDB Bot
parent 7614621116
commit 809d42b47a
14 changed files with 314 additions and 172 deletions

View File

@@ -35,7 +35,6 @@
#include <string>
#include <utility>
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/group_from_first_document_transformation.h"
#include "mongo/db/query/collation/collator_interface.h"
@@ -45,7 +44,6 @@ namespace mongo {
class BSONObj;
class ExtensionsCallback;
class NamespaceString;
class OperationContext;
/**

View File

@@ -197,6 +197,7 @@ mongo_cc_library(
"//src/mongo/db/timeseries:timeseries_write_util",
"//src/mongo/db/timeseries/bucket_catalog",
"//src/mongo/db/timeseries/write_ops:timeseries_write_ops",
"//src/mongo/db/timeseries/write_ops:timeseries_write_ops_utils",
"//src/mongo/db/transaction",
"//src/mongo/db/transaction:transaction_api",
"//src/mongo/executor:inline_executor",

View File

@@ -132,6 +132,7 @@
#include "mongo/db/timeseries/timeseries_update_delete_util.h"
#include "mongo/db/timeseries/timeseries_write_util.h"
#include "mongo/db/timeseries/write_ops/timeseries_write_ops.h"
#include "mongo/db/timeseries/write_ops/timeseries_write_ops_utils.h"
#include "mongo/db/transaction/retryable_writes_stats.h"
#include "mongo/db/transaction/transaction_api.h"
#include "mongo/db/transaction/transaction_participant.h"
@@ -1533,7 +1534,7 @@ void runTimeseriesRetryableUpdates(OperationContext* opCtx,
size_t nextOpIndex = 0;
for (auto&& singleOp : wholeOp.getUpdates()) {
auto singleUpdateOp = timeseries::buildSingleUpdateOp(wholeOp, nextOpIndex);
auto singleUpdateOp = timeseries::write_ops::buildSingleUpdateOp(wholeOp, nextOpIndex);
const auto stmtId = write_ops::getStmtIdForWriteAt(wholeOp, nextOpIndex++);
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();

View File

@@ -109,6 +109,7 @@ mongo_cc_library(
"//src/mongo/db/timeseries/bucket_catalog:rollover.h",
"//src/mongo/db/timeseries/bucket_catalog:tracking_contexts.h",
"//src/mongo/db/timeseries/bucket_catalog:write_batch.h",
"//src/mongo/db/timeseries/write_ops:measurement.h",
"//src/mongo/db/catalog:collection.h",
"//src/mongo/db/catalog:index_catalog.h",
"//src/mongo/db/catalog:index_catalog_entry.h",

View File

@@ -52,6 +52,7 @@
#include "mongo/bson/util/builder.h"
#include "mongo/db/timeseries/timeseries_constants.h"
#include "mongo/db/timeseries/timeseries_write_util.h"
#include "mongo/db/timeseries/write_ops/measurement.h"
#include "mongo/logv2/log.h"
#include "mongo/logv2/log_attr.h"
#include "mongo/logv2/log_truncation.h"
@@ -82,7 +83,7 @@ CompressionResult _compressBucket(const BSONObj& bucketDoc,
std::unique_ptr<char[]> tamperedData;
BSONObjBuilder builder; // builder to build the compressed bucket
std::vector<details::Measurement>
std::vector<write_ops_utils::details::Measurement>
measurements; // Extracted measurements from uncompressed bucket
boost::optional<BSONObjIterator> time; // Iterator to read time fields from uncompressed bucket
std::vector<std::pair<StringData, BSONObjIterator>>
@@ -135,7 +136,7 @@ CompressionResult _compressBucket(const BSONObj& bucketDoc,
auto timeElement = time->next();
// Get BSONElement's to all data elements. Missing data fields are represented as EOO.
details::Measurement measurement;
write_ops_utils::details::Measurement measurement;
measurement.timeField = timeElement;
measurement.dataFields.resize(columns.size());
@@ -182,7 +183,8 @@ CompressionResult _compressBucket(const BSONObj& bucketDoc,
// Sort all the measurements on time order.
std::sort(measurements.begin(),
measurements.end(),
[](const details::Measurement& lhs, const details::Measurement& rhs) {
[](const write_ops_utils::details::Measurement& lhs,
const write_ops_utils::details::Measurement& rhs) {
return lhs.timeField.date() < rhs.timeField.date();
});

View File

@@ -106,9 +106,10 @@ MONGO_FAIL_POINT_DEFINE(runPostCommitDebugChecks);
// Return a verifierFunction that is used to perform a data integrity check on inserts into
// a compressed column.
doc_diff::VerifierFunc makeVerifierFunction(std::vector<details::Measurement> sortedMeasurements,
std::shared_ptr<bucket_catalog::WriteBatch> batch,
OperationSource source) {
doc_diff::VerifierFunc makeVerifierFunction(
std::vector<write_ops_utils::details::Measurement> sortedMeasurements,
std::shared_ptr<bucket_catalog::WriteBatch> batch,
OperationSource source) {
return [sortedMeasurements = std::move(sortedMeasurements), batch, source](
const BSONObj& docToWrite, const BSONObj& pre) {
timeseriesDataIntegrityCheckFailureUpdate.executeIf(
@@ -123,43 +124,44 @@ doc_diff::VerifierFunc makeVerifierFunction(std::vector<details::Measurement> so
[&source](const BSONObj&) { return source == OperationSource::kTimeseriesUpdate; });
using AddAttrsFn = std::function<void(logv2::DynamicAttributes&)>;
auto failed =
[&sortedMeasurements, &batch, &docToWrite, &pre](
StringData reason, AddAttrsFn addAttrsWithoutData, AddAttrsFn addAttrsWithData) {
logv2::DynamicAttributes attrs;
attrs.add("reason", reason);
attrs.add("bucketId", batch->bucketId.oid);
attrs.add("collectionUUID", batch->bucketId.collectionUUID);
addAttrsWithoutData(attrs);
auto failed = [&sortedMeasurements, &batch, &docToWrite, &pre](
StringData reason,
AddAttrsFn addAttrsWithoutData,
AddAttrsFn addAttrsWithData) {
logv2::DynamicAttributes attrs;
attrs.add("reason", reason);
attrs.add("bucketId", batch->bucketId.oid);
attrs.add("collectionUUID", batch->bucketId.collectionUUID);
addAttrsWithoutData(attrs);
LOGV2_WARNING(
8807500, "Failed data verification inserting into compressed column", attrs);
LOGV2_WARNING(
8807500, "Failed data verification inserting into compressed column", attrs);
attrs = {};
auto seqLogDataFields = [](const details::Measurement& measurement) {
return logv2::seqLog(measurement.dataFields);
};
auto measurementsAttr = logv2::seqLog(
boost::make_transform_iterator(sortedMeasurements.begin(), seqLogDataFields),
boost::make_transform_iterator(sortedMeasurements.end(), seqLogDataFields));
attrs.add("measurements", measurementsAttr);
auto preAttr = base64::encode(pre.objdata(), pre.objsize());
attrs.add("pre", preAttr);
auto bucketAttr = base64::encode(docToWrite.objdata(), docToWrite.objsize());
attrs.add("bucket", bucketAttr);
addAttrsWithData(attrs);
LOGV2_WARNING_OPTIONS(8807501,
logv2::LogTruncation::Disabled,
"Failed data verification inserting into compressed column",
attrs);
invariant(!TestingProctor::instance().isEnabled());
tasserted(timeseries::BucketCompressionFailure(batch->bucketId.collectionUUID,
batch->bucketId.oid,
batch->bucketId.keySignature),
"Failed data verification inserting into compressed column");
attrs = {};
auto seqLogDataFields = [](const write_ops_utils::details::Measurement& measurement) {
return logv2::seqLog(measurement.dataFields);
};
auto measurementsAttr = logv2::seqLog(
boost::make_transform_iterator(sortedMeasurements.begin(), seqLogDataFields),
boost::make_transform_iterator(sortedMeasurements.end(), seqLogDataFields));
attrs.add("measurements", measurementsAttr);
auto preAttr = base64::encode(pre.objdata(), pre.objsize());
attrs.add("pre", preAttr);
auto bucketAttr = base64::encode(docToWrite.objdata(), docToWrite.objsize());
attrs.add("bucket", bucketAttr);
addAttrsWithData(attrs);
LOGV2_WARNING_OPTIONS(8807501,
logv2::LogTruncation::Disabled,
"Failed data verification inserting into compressed column",
attrs);
invariant(!TestingProctor::instance().isEnabled());
tasserted(timeseries::BucketCompressionFailure(batch->bucketId.collectionUUID,
batch->bucketId.oid,
batch->bucketId.keySignature),
"Failed data verification inserting into compressed column");
};
auto actualMeta = docToWrite.getField(kBucketMetaFieldName);
auto expectedMeta = batch->bucketKey.metadata.element();
@@ -393,7 +395,7 @@ write_ops::UpdateOpEntry makeTimeseriesCompressedDiffEntry(
OperationContext* opCtx,
std::shared_ptr<bucket_catalog::WriteBatch> batch,
bool changedToUnsorted,
const std::vector<details::Measurement>& sortedMeasurements) {
const std::vector<write_ops_utils::details::Measurement>& sortedMeasurements) {
// Verifier function that will be called when we apply the diff to our bucket and verify that
// the measurements we inserted appear correctly in the resulting bucket's BSONColumns.
@@ -799,14 +801,14 @@ BSONObj makeTimeseriesInsertCompressedBucketDocument(
} // namespace
namespace details {
std::vector<Measurement> sortMeasurementsOnTimeField(
std::vector<write_ops_utils::details::Measurement> sortMeasurementsOnTimeField(
std::shared_ptr<bucket_catalog::WriteBatch> batch) {
std::vector<Measurement> measurements;
std::vector<write_ops_utils::details::Measurement> measurements;
// Convert measurements in batch from BSONObj to vector of data fields.
// Store timefield separate to allow simple sort.
for (auto& measurementObj : batch->measurements) {
Measurement measurement;
write_ops_utils::details::Measurement measurement;
for (auto& dataField : measurementObj) {
StringData key = dataField.fieldNameStringData();
if (key == batch->bucketKey.metadata.getMetaField()) {
@@ -822,7 +824,8 @@ std::vector<Measurement> sortMeasurementsOnTimeField(
std::sort(measurements.begin(),
measurements.end(),
[](const Measurement& lhs, const Measurement& rhs) {
[](const write_ops_utils::details::Measurement& lhs,
const write_ops_utils::details::Measurement& rhs) {
return lhs.timeField.date() < rhs.timeField.date();
});
@@ -830,18 +833,6 @@ std::vector<Measurement> sortMeasurementsOnTimeField(
}
} // namespace details
write_ops::UpdateCommandRequest buildSingleUpdateOp(const write_ops::UpdateCommandRequest& wholeOp,
size_t opIndex) {
write_ops::UpdateCommandRequest singleUpdateOp(wholeOp.getNamespace(),
{wholeOp.getUpdates()[opIndex]});
auto& commandBase = singleUpdateOp.getWriteCommandRequestBase();
commandBase.setOrdered(wholeOp.getOrdered());
commandBase.setBypassDocumentValidation(wholeOp.getBypassDocumentValidation());
commandBase.setBypassEmptyTsReplacement(wholeOp.getBypassEmptyTsReplacement());
return singleUpdateOp;
}
void assertTimeseriesBucketsCollection(const Collection* bucketsColl) {
uassert(
8555700,
@@ -1013,7 +1004,7 @@ write_ops::InsertCommandRequest makeTimeseriesInsertOp(
BucketDocument bucketDoc;
if (feature_flags::gTimeseriesAlwaysUseCompressedBuckets.isEnabled(
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
std::vector<details::Measurement> sortedMeasurements =
std::vector<write_ops_utils::details::Measurement> sortedMeasurements =
details::sortMeasurementsOnTimeField(batch);
// Insert measurements, and appropriate skips, into all column builders.
@@ -1072,7 +1063,8 @@ write_ops::UpdateCommandRequest makeTimeseriesCompressedDiffUpdateOp(
bool changedToUnsorted = false;
std::vector<Measurement> sortedMeasurements = sortMeasurementsOnTimeField(batch);
std::vector<write_ops_utils::details::Measurement> sortedMeasurements =
sortMeasurementsOnTimeField(batch);
if (batch->bucketIsSortedByTime &&
sortedMeasurements.begin()->timeField.timestamp() <
batch->measurementMap.timeOfLastMeasurement(batch->timeField)) {
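The hunk ends mid-condition above; what it checks is that a bucket previously sorted by time becomes unsorted once the earliest incoming measurement predates the bucket's last stored one. A minimal sketch of that predicate, with plain timestamps standing in for the batch and measurement-map plumbing (the names here are illustrative, not from the patch):

#include "mongo/bson/timestamp.h"

namespace {

// Sketch only: `earliestNew` stands in for the first sorted measurement's
// time, `lastInBucket` for measurementMap.timeOfLastMeasurement().
bool bucketChangesToUnsorted(bool bucketIsSortedByTime,
                             mongo::Timestamp earliestNew,
                             mongo::Timestamp lastInBucket) {
    // A sorted bucket stops being sorted when a new measurement lands
    // before the last measurement already stored in it.
    return bucketIsSortedByTime && earliestNew < lastInBucket;
}

}  // namespace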

View File

@@ -52,17 +52,11 @@
#include "mongo/db/timeseries/bucket_catalog/write_batch.h"
#include "mongo/db/timeseries/timeseries_gen.h"
#include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h"
#include "mongo/db/timeseries/write_ops/measurement.h"
#include "mongo/stdx/unordered_map.h"
namespace mongo::timeseries {
/**
* Constructs an update request using a single update statement at position `opIndex`.
*/
mongo::write_ops::UpdateCommandRequest buildSingleUpdateOp(
const mongo::write_ops::UpdateCommandRequest& wholeOp, size_t opIndex);
/**
* Asserts the buckets collection exists and has valid time-series options.
*
@@ -345,21 +339,11 @@ std::function<void(const timeseries::bucket_catalog::WriteBatch&, StringData tim
getPostCommitDebugChecks(OperationContext*, const NamespaceString&);
namespace details {
/**
* Helper for measurement sorting.
* timeField: {"<timeField>": "2022-06-06T15:34:30.000Z"}
* dataFields: [{"<timefield>": 2022-06-06T15:34:30.000Z}, {"a": 1}, {"b": 2}]
*/
struct Measurement {
BSONElement timeField;
std::vector<BSONElement> dataFields;
};
/**
* Returns collection of measurements sorted on time field.
* Filters out meta field from input and does not include it in output.
*/
std::vector<Measurement> sortMeasurementsOnTimeField(
std::vector<write_ops_utils::details::Measurement> sortMeasurementsOnTimeField(
std::shared_ptr<bucket_catalog::WriteBatch> batch);
} // namespace details
} // namespace mongo::timeseries
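As a companion to the relocated declaration above, a minimal standalone sketch of the documented semantics, assuming illustrative field names and a plain vector in place of the WriteBatch plumbing: each measurement is split into a time field plus data fields, the meta field is filtered out, and the result is sorted on the time field.

#include <algorithm>
#include <vector>

#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/db/timeseries/write_ops/measurement.h"

namespace mongo::timeseries {

// Hypothetical stand-ins for this sketch only: the real code reads the meta
// and time field names from the batch's bucket key and options.
constexpr StringData kMetaFieldSketch = "tag"_sd;
constexpr StringData kTimeFieldSketch = "time"_sd;

// Sketch of the documented semantics. The returned BSONElements alias the
// input objects, which must outlive the result.
inline std::vector<write_ops_utils::details::Measurement> sortOnTimeFieldSketch(
    const std::vector<BSONObj>& measurementObjs) {
    std::vector<write_ops_utils::details::Measurement> measurements;
    for (const auto& obj : measurementObjs) {
        write_ops_utils::details::Measurement m;
        for (const auto& field : obj) {
            StringData key = field.fieldNameStringData();
            if (key == kMetaFieldSketch) {
                continue;  // The meta field is filtered out of the output.
            }
            if (key == kTimeFieldSketch) {
                m.timeField = field;  // Kept separately to allow a simple sort.
            }
            m.dataFields.push_back(field);  // The time field also stays in dataFields.
        }
        measurements.push_back(std::move(m));
    }
    std::sort(measurements.begin(),
              measurements.end(),
              [](const write_ops_utils::details::Measurement& lhs,
                 const write_ops_utils::details::Measurement& rhs) {
                  return lhs.timeField.date() < rhs.timeField.date();
              });
    return measurements;
}

}  // namespace mongo::timeseries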

View File

@@ -57,6 +57,8 @@
#include "mongo/db/timeseries/timeseries_constants.h"
#include "mongo/db/timeseries/timeseries_options.h"
#include "mongo/db/timeseries/timeseries_write_util.h"
#include "mongo/db/timeseries/write_ops/measurement.h"
#include "mongo/db/timeseries/write_ops/timeseries_write_ops_utils.h"
#include "mongo/idl/server_parameter_test_util.h"
#include "mongo/logv2/log.h"
#include "mongo/unittest/assert.h"
@@ -65,33 +67,6 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage
namespace mongo::timeseries {
namespace details {
inline bool operator==(const Measurement& lhs, const Measurement& rhs) {
bool timeFieldEqual = (lhs.timeField.woCompare(rhs.timeField) == 0);
if (!timeFieldEqual || (lhs.dataFields.size() != rhs.dataFields.size())) {
return false;
}
StringMap<BSONElement> rhsFields;
for (auto& field : rhs.dataFields) {
rhsFields.insert({field.fieldNameStringData().toString(), field});
}
for (size_t i = 0; i < lhs.dataFields.size(); ++i) {
auto& lhsField = lhs.dataFields[i];
auto it = rhsFields.find(lhsField.fieldNameStringData().toString());
if (it == rhsFields.end()) {
return false;
}
if (it->second.woCompare(lhsField) != 0) {
return false;
}
}
return true;
}
} // namespace details
namespace {
const std::string testDbName = "db_timeseries_write_util_test";
@@ -537,24 +512,26 @@ TEST_F(TimeseriesWriteUtilTest, PerformAtomicDelete) {
// Deletes the bucket document.
{
write_ops::DeleteOpEntry deleteEntry(BSON("_id" << bucketId), false);
write_ops::DeleteCommandRequest op(ns.makeTimeseriesBucketsNamespace(), {deleteEntry});
mongo::write_ops::DeleteOpEntry deleteEntry(BSON("_id" << bucketId), false);
mongo::write_ops::DeleteCommandRequest op(ns.makeTimeseriesBucketsNamespace(),
{deleteEntry});
write_ops::WriteCommandRequestBase base;
mongo::write_ops::WriteCommandRequestBase base;
base.setBypassDocumentValidation(true);
base.setStmtIds(std::vector<StmtId>{kUninitializedStmtId});
op.setWriteCommandRequestBase(std::move(base));
ASSERT_DOES_NOT_THROW(performAtomicWrites(
opCtx,
bucketsColl.getCollection(),
recordId,
std::variant<write_ops::UpdateCommandRequest, write_ops::DeleteCommandRequest>{op},
{},
{},
/*fromMigrate=*/false,
/*stmtId=*/kUninitializedStmtId));
ASSERT_DOES_NOT_THROW(
performAtomicWrites(opCtx,
bucketsColl.getCollection(),
recordId,
std::variant<mongo::write_ops::UpdateCommandRequest,
mongo::write_ops::DeleteCommandRequest>{op},
{},
{},
/*fromMigrate=*/false,
/*stmtId=*/kUninitializedStmtId));
}
// Checks the document is removed.
@@ -606,25 +583,26 @@ TEST_F(TimeseriesWriteUtilTest, PerformAtomicUpdate) {
"b":{"0":3}}})");
{
write_ops::UpdateModification u(replaceDoc);
write_ops::UpdateOpEntry update(BSON("_id" << bucketId), std::move(u));
write_ops::UpdateCommandRequest op(ns.makeTimeseriesBucketsNamespace(), {update});
mongo::write_ops::UpdateModification u(replaceDoc);
mongo::write_ops::UpdateOpEntry update(BSON("_id" << bucketId), std::move(u));
mongo::write_ops::UpdateCommandRequest op(ns.makeTimeseriesBucketsNamespace(), {update});
write_ops::WriteCommandRequestBase base;
mongo::write_ops::WriteCommandRequestBase base;
base.setBypassDocumentValidation(true);
base.setStmtIds(std::vector<StmtId>{kUninitializedStmtId});
op.setWriteCommandRequestBase(std::move(base));
ASSERT_DOES_NOT_THROW(performAtomicWrites(
opCtx,
bucketsColl.getCollection(),
recordId,
std::variant<write_ops::UpdateCommandRequest, write_ops::DeleteCommandRequest>{op},
{},
{},
/*fromMigrate=*/false,
/*stmtId=*/kUninitializedStmtId));
ASSERT_DOES_NOT_THROW(
performAtomicWrites(opCtx,
bucketsColl.getCollection(),
recordId,
std::variant<mongo::write_ops::UpdateCommandRequest,
mongo::write_ops::DeleteCommandRequest>{op},
{},
{},
/*fromMigrate=*/false,
/*stmtId=*/kUninitializedStmtId));
}
// Checks the document is updated.
@@ -686,27 +664,28 @@ TEST_F(TimeseriesWriteUtilTest, PerformAtomicDeleteAndInsert) {
OID bucketId2 = bucketDoc2["_id"].OID();
auto recordId2 = record_id_helpers::keyForOID(bucketId2);
{
write_ops::DeleteOpEntry deleteEntry(BSON("_id" << bucketId1), false);
write_ops::DeleteCommandRequest deleteOp(ns.makeTimeseriesBucketsNamespace(),
{deleteEntry});
write_ops::WriteCommandRequestBase base;
mongo::write_ops::DeleteOpEntry deleteEntry(BSON("_id" << bucketId1), false);
mongo::write_ops::DeleteCommandRequest deleteOp(ns.makeTimeseriesBucketsNamespace(),
{deleteEntry});
mongo::write_ops::WriteCommandRequestBase base;
base.setBypassDocumentValidation(true);
base.setStmtIds(std::vector<StmtId>{kUninitializedStmtId});
deleteOp.setWriteCommandRequestBase(base);
write_ops::InsertCommandRequest insertOp(ns.makeTimeseriesBucketsNamespace(), {bucketDoc2});
mongo::write_ops::InsertCommandRequest insertOp(ns.makeTimeseriesBucketsNamespace(),
{bucketDoc2});
insertOp.setWriteCommandRequestBase(base);
ASSERT_DOES_NOT_THROW(performAtomicWrites(
opCtx,
bucketsColl.getCollection(),
recordId1,
std::variant<write_ops::UpdateCommandRequest, write_ops::DeleteCommandRequest>{
deleteOp},
{insertOp},
{},
/*fromMigrate=*/false,
/*stmtId=*/kUninitializedStmtId));
ASSERT_DOES_NOT_THROW(
performAtomicWrites(opCtx,
bucketsColl.getCollection(),
recordId1,
std::variant<mongo::write_ops::UpdateCommandRequest,
mongo::write_ops::DeleteCommandRequest>{deleteOp},
{insertOp},
{},
/*fromMigrate=*/false,
/*stmtId=*/kUninitializedStmtId));
}
// Checks document1 is removed and document2 is added.
@@ -792,31 +771,32 @@ TEST_F(TimeseriesWriteUtilTest, PerformAtomicUpdateAndInserts) {
OID bucketId3 = bucketDoc3["_id"].OID();
auto recordId3 = record_id_helpers::keyForOID(bucketId3);
{
write_ops::UpdateModification u(replaceDoc);
write_ops::UpdateOpEntry update(BSON("_id" << bucketId1), std::move(u));
write_ops::UpdateCommandRequest updateOp(ns.makeTimeseriesBucketsNamespace(), {update});
write_ops::WriteCommandRequestBase base;
mongo::write_ops::UpdateModification u(replaceDoc);
mongo::write_ops::UpdateOpEntry update(BSON("_id" << bucketId1), std::move(u));
mongo::write_ops::UpdateCommandRequest updateOp(ns.makeTimeseriesBucketsNamespace(),
{update});
mongo::write_ops::WriteCommandRequestBase base;
base.setBypassDocumentValidation(true);
base.setStmtIds(std::vector<StmtId>{kUninitializedStmtId});
updateOp.setWriteCommandRequestBase(base);
write_ops::InsertCommandRequest insertOp1(ns.makeTimeseriesBucketsNamespace(),
{bucketDoc2});
mongo::write_ops::InsertCommandRequest insertOp1(ns.makeTimeseriesBucketsNamespace(),
{bucketDoc2});
insertOp1.setWriteCommandRequestBase(base);
write_ops::InsertCommandRequest insertOp2(ns.makeTimeseriesBucketsNamespace(),
{bucketDoc3});
mongo::write_ops::InsertCommandRequest insertOp2(ns.makeTimeseriesBucketsNamespace(),
{bucketDoc3});
insertOp2.setWriteCommandRequestBase(base);
ASSERT_DOES_NOT_THROW(performAtomicWrites(
opCtx,
bucketsColl.getCollection(),
recordId1,
std::variant<write_ops::UpdateCommandRequest, write_ops::DeleteCommandRequest>{
updateOp},
{insertOp1, insertOp2},
{},
/*fromMigrate=*/false,
/*stmtId=*/kUninitializedStmtId));
ASSERT_DOES_NOT_THROW(
performAtomicWrites(opCtx,
bucketsColl.getCollection(),
recordId1,
std::variant<mongo::write_ops::UpdateCommandRequest,
mongo::write_ops::DeleteCommandRequest>{updateOp},
{insertOp1, insertOp2},
{},
/*fromMigrate=*/false,
/*stmtId=*/kUninitializedStmtId));
}
// Checks document1 is updated and document2 and document3 are added.
@@ -1152,7 +1132,8 @@ TEST_F(TimeseriesWriteUtilTest, SortMeasurementsOnTimeField) {
batch->max = fromjson(R"({"time":{"$date":"2022-06-06T15:34:30.000Z"},"a":3,"b":3})");
batch->timeField = kTimeseriesOptions.getTimeField();
std::vector testMeasurements = details::sortMeasurementsOnTimeField(batch);
std::vector<timeseries::write_ops_utils::details::Measurement> testMeasurements =
details::sortMeasurementsOnTimeField(batch);
const std::vector<BSONObj> sortedMeasurements = {
fromjson(R"({"time":{"$date":"2022-06-06T15:34:30.000Z"},"a":3,"b":3})"),
@@ -1166,7 +1147,7 @@ TEST_F(TimeseriesWriteUtilTest, SortMeasurementsOnTimeField) {
ASSERT_EQ(testMeasurements.size(), sortedMeasurements.size());
for (size_t i = 0; i < sortedMeasurements.size(); ++i) {
details::Measurement m;
timeseries::write_ops_utils::details::Measurement m;
m.timeField = sortedTimeFields[i].getField("time");
m.dataFields.push_back(sortedMeasurements[i].getField("time"));
m.dataFields.push_back(sortedMeasurements[i].getField("a"));
@@ -1205,12 +1186,13 @@ TEST_F(TimeseriesWriteUtilTest, SortMeasurementsOnTimeFieldExtendedRange) {
batch->max = measurements[0];
batch->timeField = kTimeseriesOptions.getTimeField();
const std::vector testMeasurements = details::sortMeasurementsOnTimeField(batch);
std::vector<timeseries::write_ops_utils::details::Measurement> testMeasurements =
details::sortMeasurementsOnTimeField(batch);
ASSERT_EQ(testMeasurements.size(), measurements.size());
auto compare = [&](int inputIdx, int outputIdx) {
details::Measurement m;
timeseries::write_ops_utils::details::Measurement m;
m.timeField = measurements[inputIdx].getField("time");
m.dataFields.push_back(measurements[inputIdx].getField("time"));
m.dataFields.push_back(measurements[inputIdx].getField("a"));

View File

@@ -4,8 +4,11 @@ package(default_visibility = ["//visibility:public"])
exports_files(
[
"measurement.h",
"timeseries_write_ops.cpp",
"timeseries_write_ops.h",
"timeseries_write_ops_utils.cpp",
"timeseries_write_ops_utils.h",
],
)
@@ -37,3 +40,16 @@ mongo_cc_library(
"//src/mongo/db/update:update_common",
],
)
mongo_cc_library(
name = "timeseries_write_ops_utils",
srcs = [
"timeseries_write_ops_utils.cpp",
],
hdrs = [
"timeseries_write_ops_utils.h",
],
deps = [
"//src/mongo/db/query/write_ops:write_ops_parsers",
],
)

View File

@@ -0,0 +1,76 @@
/**
* Copyright (C) 2024-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include <vector>
#include "mongo/bson/bsonelement.h"
#include "mongo/util/string_map.h"
namespace mongo::timeseries::write_ops_utils::details {
/**
* Helper for measurement sorting.
* timeField: {"<timeField>": "2022-06-06T15:34:30.000Z"}
* dataFields: [{"<timefield>": 2022-06-06T15:34:30.000Z}, {"a": 1}, {"b": 2}]
*/
struct Measurement {
BSONElement timeField;
std::vector<BSONElement> dataFields;
};
inline bool operator==(const timeseries::write_ops_utils::details::Measurement& lhs,
const timeseries::write_ops_utils::details::Measurement& rhs) {
bool timeFieldEqual = (lhs.timeField.woCompare(rhs.timeField) == 0);
if (!timeFieldEqual || (lhs.dataFields.size() != rhs.dataFields.size())) {
return false;
}
StringMap<BSONElement> rhsFields;
for (auto& field : rhs.dataFields) {
rhsFields.insert({field.fieldNameStringData().toString(), field});
}
for (size_t i = 0; i < lhs.dataFields.size(); ++i) {
auto& lhsField = lhs.dataFields[i];
auto it = rhsFields.find(lhsField.fieldNameStringData().toString());
if (it == rhsFields.end()) {
return false;
}
if (it->second.woCompare(lhsField) != 0) {
return false;
}
}
return true;
}
} // namespace mongo::timeseries::write_ops_utils::details
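A hedged usage sketch of the struct and comparison above; the sample document is illustrative. Note that operator== matches data fields by name rather than position, and that the BSONElements alias the backing document, which must outlive the Measurement.

#include "mongo/bson/json.h"
#include "mongo/db/timeseries/write_ops/measurement.h"

namespace mongo::timeseries::write_ops_utils::details {

// Sketch: two Measurements over the same backing document compare equal
// even when their data fields were collected in different orders.
inline bool measurementEqualitySketch() {
    const BSONObj doc =
        fromjson(R"({"time":{"$date":"2022-06-06T15:34:30.000Z"},"a":1,"b":2})");

    Measurement lhs;
    lhs.timeField = doc["time"];
    lhs.dataFields = {doc["time"], doc["a"], doc["b"]};

    Measurement rhs;
    rhs.timeField = doc["time"];
    rhs.dataFields = {doc["b"], doc["a"], doc["time"]};  // Different order.

    return lhs == rhs;  // true: fields are matched by name, not position.
}

}  // namespace mongo::timeseries::write_ops_utils::details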

View File

@@ -0,0 +1,46 @@
/**
* Copyright (C) 2024-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/timeseries/write_ops/timeseries_write_ops_utils.h"
namespace mongo::timeseries::write_ops {
mongo::write_ops::UpdateCommandRequest buildSingleUpdateOp(
const mongo::write_ops::UpdateCommandRequest& wholeOp, size_t opIndex) {
mongo::write_ops::UpdateCommandRequest singleUpdateOp(wholeOp.getNamespace(),
{wholeOp.getUpdates()[opIndex]});
auto& commandBase = singleUpdateOp.getWriteCommandRequestBase();
commandBase.setOrdered(wholeOp.getOrdered());
commandBase.setBypassDocumentValidation(wholeOp.getBypassDocumentValidation());
commandBase.setBypassEmptyTsReplacement(wholeOp.getBypassEmptyTsReplacement());
return singleUpdateOp;
}
} // namespace mongo::timeseries::write_ops
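For context, a sketch of how callers in this commit consume the relocated helper, under the assumption of an already-populated command; execution and error handling are omitted.

#include "mongo/db/query/write_ops/write_ops_gen.h"
#include "mongo/db/timeseries/write_ops/timeseries_write_ops_utils.h"
#include "mongo/util/assert_util.h"

namespace {

// Sketch: peel a multi-statement update apart into single-statement
// requests, as the retryable time-series update paths do.
void splitIntoSingleUpdates(const mongo::write_ops::UpdateCommandRequest& wholeOp) {
    for (size_t opIndex = 0; opIndex < wholeOp.getUpdates().size(); ++opIndex) {
        auto singleUpdateOp =
            mongo::timeseries::write_ops::buildSingleUpdateOp(wholeOp, opIndex);
        // Each request carries exactly one update entry plus the ordered /
        // bypassDocumentValidation / bypassEmptyTsReplacement settings
        // copied from the original command.
        invariant(singleUpdateOp.getUpdates().size() == 1);
    }
}

}  // namespace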

View File

@@ -0,0 +1,43 @@
/**
* Copyright (C) 2024-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/db/query/write_ops/write_ops_gen.h"
namespace mongo::timeseries::write_ops {
/**
* Constructs an update request using a single update statement at position `opIndex`.
*/
mongo::write_ops::UpdateCommandRequest buildSingleUpdateOp(
const mongo::write_ops::UpdateCommandRequest& wholeOp, size_t opIndex);
} // namespace mongo::timeseries::write_ops

View File

@@ -759,7 +759,7 @@ mongo_cc_library(
"//src/mongo/db/timeseries:timeseries_conversion_util",
"//src/mongo/db/timeseries:timeseries_metadata",
"//src/mongo/db/timeseries:timeseries_options",
"//src/mongo/db/timeseries:timeseries_write_util",
"//src/mongo/db/timeseries/write_ops:timeseries_write_ops_utils",
"//src/mongo/db/transaction:transaction_api",
"//src/mongo/executor:inline_executor",
"//src/mongo/s/query/planner:cluster_find",

View File

@@ -57,7 +57,7 @@
#include "mongo/db/server_options.h"
#include "mongo/db/session/logical_session_id_helpers.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/timeseries/timeseries_write_util.h"
#include "mongo/db/timeseries/write_ops/timeseries_write_ops_utils.h"
#include "mongo/db/transaction/transaction_api.h"
#include "mongo/executor/inline_executor.h"
#include "mongo/executor/remote_command_response.h"
@@ -599,7 +599,7 @@ void executeRetryableTimeseriesUpdate(OperationContext* opCtx,
}
auto wholeOp = clientRequest.getUpdateRequest();
auto singleUpdateOp = timeseries::buildSingleUpdateOp(wholeOp, index);
auto singleUpdateOp = timeseries::write_ops::buildSingleUpdateOp(wholeOp, index);
BatchedCommandRequest singleUpdateRequest(singleUpdateOp);
const auto stmtId = write_ops::getStmtIdForWriteAt(wholeOp, index);
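Putting the call site above together, a sketch of the per-statement pattern: each update is split into its own request and paired with the statement id of the original write so a retry can be matched to its first execution. The executor, transaction, and response plumbing of the real function are omitted; this is a sketch, not the full implementation.

#include "mongo/db/query/write_ops/write_ops_gen.h"
#include "mongo/db/timeseries/write_ops/timeseries_write_ops_utils.h"

namespace {

// Sketch of the retry pattern visible above: one single-statement request
// per update entry, each recorded under the original statement id.
void runEachStatementSketch(const mongo::write_ops::UpdateCommandRequest& wholeOp) {
    for (size_t index = 0; index < wholeOp.getUpdates().size(); ++index) {
        auto singleUpdateOp =
            mongo::timeseries::write_ops::buildSingleUpdateOp(wholeOp, index);
        const auto stmtId = mongo::write_ops::getStmtIdForWriteAt(wholeOp, index);
        // ... execute singleUpdateOp as its own retryable write under stmtId ...
        (void)singleUpdateOp;
        (void)stmtId;
    }
}

}  // namespace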