SERVER-108565 Check bucket size before writing for ordered time-series inserts (#42779)

GitOrigin-RevId: 8be5bbf7b90d998a08e272903477f45dc8019aba
Matt Kneiser 2025-10-22 08:00:06 -07:00 committed by MongoDB Bot
parent 822a3ce873
commit 549c2f9ca8
22 changed files with 817 additions and 177 deletions

buildscripts/evergreen_resmoke_job_count.py Normal file → Executable file
View File

View File

@ -1,5 +1,8 @@
// The {$set: {x: maxStr}} update takes multiple seconds to execute.
// @tags: [operations_longer_than_stepdown_interval]
// @tags: [
// operations_longer_than_stepdown_interval,
// multiversion_incompatible,
// ]
/**
* Confirms that:
@ -51,7 +54,7 @@ coll.drop();
assert.commandFailedWithCode(
db.runCommand(
{insert: coll.getName(), documents: [{_id: new ObjectId(), x: largerThanMaxString}]}),
2);
ErrorCodes.BSONObjectTooLarge);
coll.drop();
assert.commandFailedWithCode(db.runCommand({
@ -68,5 +71,5 @@ assert.commandFailedWithCode(db.runCommand({
ordered: true,
updates: [{q: {_id: objectId}, u: {$set: {x: largerThanMaxString}}}]
}),
17419);
[17419, ErrorCodes.BSONObjectTooLarge]);
})();
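
The assertions above rely on the shell helper assert.commandFailedWithCode accepting either a single error code or an array of acceptable codes. A minimal sketch of both forms, using an arbitrary collection name:

const res = db.c.insert({x: "x".repeat(16 * 1024 * 1024)});
// A single expected code:
assert.commandFailedWithCode(res, ErrorCodes.BSONObjectTooLarge);
// Any of several acceptable codes, as in the update assertion above:
assert.commandFailedWithCode(res, [17419, ErrorCodes.BSONObjectTooLarge]);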

View File

@ -9,6 +9,7 @@
// does_not_support_stepdowns,
// uses_map_reduce_with_temp_collections,
// requires_scripting,
// multiversion_incompatible,
// ]
/**
@ -63,7 +64,7 @@ function runTest(testOptions) {
// In some cases we may see the javascript execution interrupted because it takes longer than
// our default time limit, so we allow that possibility.
assert.commandFailedWithCode(res,
[ErrorCodes.BadValue, ErrorCodes.Interrupted],
[ErrorCodes.BSONObjectTooLarge, ErrorCodes.Interrupted],
"creating a document larger than 16MB didn't fail");
if (res.code != ErrorCodes.Interrupted) {
assert.lte(

View File

@ -0,0 +1,55 @@
/**
* Tests that time-series inserts do not cause any underlying bucket documents to exceed the max
* user BSON size.
*
* Bucket Insert: measurements that are uninsertable because the bucket document generated to
* accommodate a single measurement would exceed the BSON size limit.
*
* @tags: [
* requires_timeseries,
* multiversion_incompatible,
* ]
*/
(function() {
"use strict";
load("jstests/core/timeseries/libs/timeseries.js");
let counter = 0;
TimeseriesTest.run(insert => {
const testDB = db.getSiblingDB(jsTestName());
const coll = testDB["coll_" + counter++];
const timestamp = ISODate("2025-01-01T12:00:00Z");
const timeField = "t";
const metaField = "m";
coll.drop();
assert.commandWorked(testDB.createCollection(
coll.getName(), {timeseries: {"timeField": timeField, "metaField": metaField}}));
const largeMeta = "a".repeat(16 * 1024 * 1024 + 1);
const measurement1 = {};
measurement1[timeField] = timestamp;
measurement1[metaField] = largeMeta;
measurement1["a"] = 1;
const smallMeta = "5";
const bigStr = "a".repeat(8000);
const measurement2 = {};
for (let i = 0; i < 1000; ++i) {
measurement2[i.toString()] = bigStr;
}
measurement2[timeField] = timestamp;
measurement2[metaField] = smallMeta;
// Insert Measurements
// This measurement is always too big due to its meta field alone.
assert.commandFailedWithCode(insert(coll, measurement1), ErrorCodes.BSONObjectTooLarge);
// This measurement is always too big due to the total metric size being copied into the control block.
assert.commandFailedWithCode(insert(coll, measurement2), ErrorCodes.BSONObjectTooLarge);
});
})();
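
For measurement2 above, the failure follows from how the generated bucket document stores metric values: each value is written to the bucket's data section and also copied into the control.min and control.max summaries, so roughly three times the raw metric size must fit under the 16 MB limit. A rough back-of-envelope check, not the server's exact estimator:

const approxMetricBytes = 1000 * 8000;            // ~8 MB of string values in one measurement
const approxBucketBytes = 3 * approxMetricBytes;  // data + control.min + control.max copies
assert.gt(approxBucketBytes, 16 * 1024 * 1024);   // ~24 MB, well over the max user BSON size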

View File

@ -0,0 +1,95 @@
/**
* Bucket Insert: measurements that are uninsertable because the bucket document generated to
* accommodate a single measurement would exceed the BSON size limit.
*
* Importantly, this controlled test checks collStats.
*
* @tags: [
* requires_replication,
* ]
*/
const rst = new ReplSetTest({nodes: 1});
const nodes = rst.startSet();
rst.initiate();
const primary = rst.getPrimary();
const db = primary.getDB("test");
const testDB = db.getSiblingDB(jsTestName());
const coll = testDB.coll;
const bucketsColl = testDB.system.buckets[coll.getName()];
const timeField = "t";
const metaField = "m";
function runTest(isOrderedWrite) {
jsTestLog("runTest(ordered=" + isOrderedWrite.toString() + ")");
// Setup
assert.commandWorked(testDB.createCollection(
coll.getName(), {timeseries: {timeField: timeField, metaField: metaField}}));
const largeMeta = "a".repeat(16 * 1024 * 1024 + 1);
const timestamp = ISODate("2025-01-01T12:00:00Z");
const measurement1 = {};
measurement1[timeField] = timestamp;
measurement1[metaField] = largeMeta;
measurement1["a"] = 1;
const smallMeta = "5";
const bigStr = "a".repeat(60000);
const measurement2 = {};
for (let i = 0; i < 100; ++i) {
measurement2[i.toString()] = bigStr;
}
measurement2[timeField] = timestamp;
measurement2[metaField] = smallMeta;
// Insert Measurement
jsTestLog("insert1");
assert.commandFailedWithCode(coll.insert(measurement1, {ordered: isOrderedWrite}),
ErrorCodes.BSONObjectTooLarge);
let stats = coll.stats().timeseries;
assert.eq(0, stats.numBucketInserts, tojson(stats));
assert.eq(0, stats.numBucketUpdates, tojson(stats));
assert.eq(isOrderedWrite ? 2 : 5, stats.numBucketsOpenedDueToMetadata, tojson(stats));
assert.eq(0, stats.numBucketsClosedDueToSize, tojson(stats));
// The failed ordered write retries as unordered and thus makes 2
// unsuccessful attempts.
assert.eq(isOrderedWrite ? 2 : 5, stats.numBucketDocumentsTooLargeInsert, tojson(stats));
assert.eq(0, stats.numBucketDocumentsTooLargeUpdate, tojson(stats));
jsTestLog("insert2");
assert.commandFailedWithCode(coll.insert(measurement2, {ordered: isOrderedWrite}),
ErrorCodes.BSONObjectTooLarge);
stats = coll.stats().timeseries;
assert.eq(0, stats.numBucketInserts, tojson(stats));
assert.eq(0, stats.numBucketUpdates, tojson(stats));
assert.eq(isOrderedWrite ? 4 : 6, stats.numBucketsOpenedDueToMetadata, tojson(stats));
assert.eq(0, stats.numBucketsClosedDueToSize, tojson(stats));
// The failed ordered write retries as unordered and thus makes 2
// unsuccessful attempts.
assert.eq(isOrderedWrite ? 4 : 6, stats.numBucketDocumentsTooLargeInsert, tojson(stats));
assert.eq(0, stats.numBucketDocumentsTooLargeUpdate, tojson(stats));
// Check Results
// TODO(SERVER-108699): Remove this check.
let buckets = bucketsColl.find().toArray();
for (let i = 0; i < buckets.length; i++) {
let bucketDocSize = Object.bsonsize(buckets[i]);
assert.lte(bucketDocSize, 16 * 1024 * 1024);
}
coll.drop();
// Stats do not reset on v7.0 when a collection drops. Thus, many checks are path-dependent
// without a reboot.
}
runTest(/*isOrderedWrite=*/ true);
runTest(/*isOrderedWrite=*/ false);
rst.stopSet();
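
The counters asserted on above are the ones this change adds to the time-series section of collStats. A minimal sketch of reading them outside the test (database and collection names are arbitrary):

const ts = db.getSiblingDB("test").coll.stats().timeseries;
printjson({
    tooLargeInserts: ts.numBucketDocumentsTooLargeInsert,
    tooLargeUpdates: ts.numBucketDocumentsTooLargeUpdate
});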

View File

@ -0,0 +1,98 @@
/**
* Bucket Update: exploits the fact that fields smaller than 32 bytes are estimated as 0 bytes.
* An ordered write that fails a bucket update due to exceeding the BSON size limit should be
* successfully retryable as a bucket insert.
*
* On v7.0 this test does not exercise the new code paths added in SERVER-108565; it exists to
* verify that this branch is not vulnerable to these issues.
*
* @tags: [
* requires_replication,
* ]
*/
const rst = new ReplSetTest({nodes: 1});
const nodes = rst.startSet();
rst.initiate();
const primary = rst.getPrimary();
const db = primary.getDB("test");
const testDB = db.getSiblingDB(jsTestName());
const coll = testDB.coll;
const bucketsColl = testDB.system.buckets[coll.getName()];
const timeField = "t";
const metaField = "m";
function runTest(isOrderedWrite) {
jsTestLog("runTest(ordered=" + isOrderedWrite.toString() + ")");
// Setup
assert.commandWorked(testDB.createCollection(
coll.getName(), {timeseries: {timeField: timeField, metaField: metaField}}));
const timestamp = ISODate("2025-01-01T12:00:00Z");
// This value size is chosen to maximize the size of an object that the time-series write path
// estimates as 0 bytes. There is a fixed 8 bytes of BSON and estimation overhead.
const str = "a".repeat(24);
const measurement1 = {a: 1, [timeField]: timestamp};
const measurement2 = {};
// The number of fields chosen below combined with the unestimated size yields a measurement
// size of around 5MB. When tripled to account for the 3 literals per time-series bucket, this
// is just underneath the BSON limit for a bucket document.
for (let i = 0; i < 140000; ++i) {
measurement2[i.toString()] = str;
}
measurement2[timeField] = timestamp;
// Insert Measurements
jsTestLog("insert1");
assert.commandWorked(coll.insert(measurement1, {ordered: isOrderedWrite}));
let stats = coll.stats().timeseries;
assert.eq(isOrderedWrite ? 1 : 3, stats.numBucketInserts, tojson(stats));
assert.eq(0, stats.numBucketUpdates, tojson(stats));
assert.eq(isOrderedWrite ? 1 : 2, stats.numBucketsOpenedDueToMetadata, tojson(stats));
assert.eq(isOrderedWrite ? 0 : 1, stats.numBucketsClosedDueToSize, tojson(stats));
assert.eq(
0, stats.numBucketDocumentsTooLargeInsert, tojson(stats)); // See comment at top of file.
assert.eq(
0, stats.numBucketDocumentsTooLargeUpdate, tojson(stats)); // See comment at top of file.
jsTestLog("insert2");
assert.commandWorked(coll.insert(measurement2, {ordered: isOrderedWrite}));
stats = coll.stats().timeseries;
assert.eq(isOrderedWrite ? 2 : 4, stats.numBucketInserts, tojson(stats));
assert.eq(0, stats.numBucketUpdates, tojson(stats));
// The first bucket gets cleared during the retry logic of an ordered write, thus when the
// second bucket gets allocated, the write path doesn't see an associated open bucket for the
// same meta value.
assert.eq(isOrderedWrite ? 1 : 2, stats.numBucketsOpenedDueToMetadata, tojson(stats));
assert.eq(isOrderedWrite ? 1 : 2, stats.numBucketsClosedDueToSize, tojson(stats));
assert.eq(
0, stats.numBucketDocumentsTooLargeInsert, tojson(stats)); // See comment at top of file.
assert.eq(
0, stats.numBucketDocumentsTooLargeUpdate, tojson(stats)); // See comment at top of file.
// Check Results
// TODO(SERVER-108699): Remove this check.
let buckets = bucketsColl.find().toArray();
for (let i = 0; i < buckets.length; i++) {
let bucketDocSize = Object.bsonsize(buckets[i]);
assert.lte(bucketDocSize, 16 * 1024 * 1024);
}
coll.drop();
// Stats do not reset on v7.0 when a collection drops. Thus, many checks are path-dependent
// without a reboot.
}
runTest(/*isOrderedWrite=*/ true);
runTest(/*isOrderedWrite=*/ false);
rst.stopSet();
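
A rough sketch of the sizing the comments above describe, assuming each field is a short-named BSON string element of roughly 37 bytes (type byte, field name, length prefix, 24-byte value, terminator); the write path estimates these fields as 0 bytes, but the real bucket document still has to fit:

const approxBytesPerField = 37;                               // assumption: small numeric field names
const approxMeasurementBytes = 140000 * approxBytesPerField;  // ~5.2 MB
const approxBucketBytes = 3 * approxMeasurementBytes;         // data + control.min + control.max
assert.lt(approxBucketBytes, 16 * 1024 * 1024);               // ~15.5 MB, just under the BSON limit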

View File

@ -0,0 +1,100 @@
/**
* Bucket Update: a large meta near the BSON limit allows only one measurement per bucket due to
* the lower time-series bucket size limit, Bucket::kLargeMeasurementsMaxBucketSize.
*
* @tags: [
* requires_replication,
* ]
*/
const rst = new ReplSetTest({nodes: 1});
const nodes = rst.startSet();
rst.initiate();
const primary = rst.getPrimary();
const db = primary.getDB("test");
const testDB = db.getSiblingDB(jsTestName());
const coll = testDB.coll;
const bucketsColl = testDB.system.buckets[coll.getName()];
const timeField = "t";
const metaField = "m";
function runTest(isOrderedWrite) {
jsTestLog("runTest(ordered=" + isOrderedWrite.toString() + ")");
// Setup
assert.commandWorked(testDB.createCollection(
coll.getName(), {timeseries: {timeField: timeField, metaField: metaField}}));
const timestamp = ISODate("2025-01-01T12:00:00Z");
const meta = "a".repeat(1024 * 1024 * 15);
const measurement1 = {a: 1, [timeField]: timestamp, [metaField]: meta};
const measurement2 = {
a: 2,
[timeField]: ISODate("2025-01-01T12:00:00Z"),
[metaField]: meta,
b: "b".repeat(1024 * 1024 * 0.335),
};
// Insert Measurements
jsTestLog("insert1");
assert.commandWorked(coll.insert(measurement1, {ordered: isOrderedWrite}));
let stats = coll.stats().timeseries;
assert.eq(isOrderedWrite ? 1 : 3, stats.numBucketInserts, tojson(stats));
assert.eq(0, stats.numBucketUpdates, tojson(stats));
assert.eq(isOrderedWrite ? 1 : 3, stats.numBucketsOpenedDueToMetadata, tojson(stats));
assert.eq(isOrderedWrite ? 0 : 2, stats.numBucketsClosedDueToSize, tojson(stats));
assert.eq(isOrderedWrite ? 0 : 2, stats.numBucketDocumentsTooLargeInsert, tojson(stats));
assert.eq(0, stats.numBucketDocumentsTooLargeUpdate, tojson(stats));
// This insert will land in a new bucket due to Bucket::kLargeMeasurementsMaxBucketSize being
// exceeded by the first measurement.
jsTestLog("insert2");
assert.commandWorked(coll.insert(measurement1, {ordered: isOrderedWrite}));
stats = coll.stats().timeseries;
assert.eq(isOrderedWrite ? 2 : 4, stats.numBucketInserts, tojson(stats));
assert.eq(0, stats.numBucketUpdates, tojson(stats));
assert.eq(isOrderedWrite ? 1 : 3, stats.numBucketsOpenedDueToMetadata, tojson(stats));
assert.eq(isOrderedWrite ? 1 : 3, stats.numBucketsClosedDueToSize, tojson(stats));
assert.eq(isOrderedWrite ? 0 : 2, stats.numBucketDocumentsTooLargeInsert, tojson(stats));
assert.eq(0, stats.numBucketDocumentsTooLargeUpdate, tojson(stats));
// This measurement is not insertable: due to the 3x inflation of the metric size plus the large
// meta, it is too large for both a bucket update and a bucket insert.
jsTestLog("insert3");
assert.commandFailedWithCode(coll.insert(measurement2, {ordered: isOrderedWrite}),
ErrorCodes.BSONObjectTooLarge);
stats = coll.stats().timeseries;
assert.eq(
isOrderedWrite ? 2 : 4, stats.numBucketInserts, tojson(stats)); // TODO: address this break
assert.eq(0, stats.numBucketUpdates, tojson(stats));
assert.eq(isOrderedWrite ? 2 : 3, stats.numBucketsOpenedDueToMetadata, tojson(stats));
assert.eq(isOrderedWrite ? 2 : 4, stats.numBucketsClosedDueToSize, tojson(stats));
// The failed ordered write retries as unordered and thus makes 2
// unsuccessful attempts.
assert.eq(isOrderedWrite ? 2 : 3, stats.numBucketDocumentsTooLargeInsert, tojson(stats));
assert.eq(0, stats.numBucketDocumentsTooLargeUpdate, tojson(stats));
// Check Results
// TODO(SERVER-108699): Remove this check.
let buckets = bucketsColl.find().toArray();
for (let i = 0; i < buckets.length; i++) {
let bucketDocSize = Object.bsonsize(buckets[i]);
assert.lte(bucketDocSize, 16 * 1024 * 1024);
}
coll.drop();
// Stats do not reset on v7.0 when a collection drops. Thus, many checks are path-dependent
// without a reboot.
}
runTest(/*isOrderedWrite=*/ true);
runTest(/*isOrderedWrite=*/ false);
rst.stopSet();
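
A rough check of why insert3 cannot fit in any bucket document, ignoring per-field BSON overhead: the large meta is stored once at the top level of the bucket, while the metric value in field b is written to data and copied into control.min and control.max:

const metaBytes = 15 * 1024 * 1024;
const metricBytes = Math.floor(0.335 * 1024 * 1024);
assert.gt(metaBytes + 3 * metricBytes, 16 * 1024 * 1024);  // exceeds the max user BSON size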

View File

@ -67,7 +67,7 @@ function tests(conn, isStandalone) {
assert.commandWorked(testDb.c.insert({_id: "X".repeat(r)}));
}
if (idLen >= minIdErrorLen) {
assert.commandFailedWithCode(res, ErrorCodes.BadValue);
assert.commandFailedWithCode(res, ErrorCodes.BSONObjectTooLarge);
} else if (isStandalone) {
assert.commandWorked(res);
} else {

View File

@ -20,7 +20,8 @@ const bsonMaxInternalSize = bsonMaxUserSize + (16 * 1024);
// Trying to insert an object that is the maximum size will fail.
let obj = {x: 'x'.repeat(bsonMaxUserSize)};
assert.commandFailedWithCode(coll.insert(obj), ErrorCodes.BadValue, "object to insert too large");
assert.commandFailedWithCode(
coll.insert(obj), ErrorCodes.BSONObjectTooLarge, "object to insert too large");
// The string value in the field is a number of bytes smaller than the max, to account for other
// data in the BSON object. This value below will create an object very close to the maximum user
@ -66,7 +67,7 @@ conn = rst.start(0, {
});
oplog = conn.getDB("local").getCollection('oplog.rs');
assert.commandFailedWithCode(
oplog.insert(lastOplogEntry), ErrorCodes.BadValue, "object to insert too large");
oplog.insert(lastOplogEntry), ErrorCodes.BSONObjectTooLarge, "object to insert too large");
rst.stop(0, undefined /* signal */, undefined /* opts */, {forRestart: true});
// Restart as standalone with the 'allowDocumentsGreaterThanMaxUserSize' server parameter enabled to

View File

@ -85,7 +85,7 @@ const testWriteOplogDocumentKey = ({sharded, inTransaction}) => {
performWrites(function largeInsert(coll) {
const largeDoc = {_id: 'x'.repeat(16 * 1024 * 1024), a: 0};
assert.commandFailedWithCode(coll.insert(largeDoc), ErrorCodes.BadValue);
assert.commandFailedWithCode(coll.insert(largeDoc), ErrorCodes.BSONObjectTooLarge);
});
};

View File

@ -96,7 +96,7 @@ StatusWith<BSONObj> fixDocumentForInsert(OperationContext* opCtx,
// already been validated for size on the source cluster, and were successfully inserted
// into the source oplog.
if (doc.objsize() > BSONObjMaxUserSize && !gAllowDocumentsGreaterThanMaxUserSize)
return StatusWith<BSONObj>(ErrorCodes::BadValue,
return StatusWith<BSONObj>(ErrorCodes::BSONObjectTooLarge,
str::stream() << "object to insert too large"
<< ". size in bytes: " << doc.objsize()
<< ", max size: " << BSONObjMaxUserSize);

View File

@ -1780,9 +1780,25 @@ Status performAtomicTimeseriesWrites(
for (auto& op : insertOps) {
invariant(op.getDocuments().size() == 1);
auto doc = op.getDocuments().front();
// Since this bypasses the usual write path, size validation is needed.
if (MONGO_unlikely(doc.objsize() > BSONObjMaxUserSize)) {
LOGV2_WARNING(10856504,
"Ordered time-series bucket insert is too large.",
"bucketSize"_attr = doc.objsize(),
"ns"_attr = ns);
timeseries::bucket_catalog::markBucketInsertTooLarge(
timeseries::bucket_catalog::BucketCatalog::get(opCtx),
ns.getTimeseriesViewNamespace());
return {ErrorCodes::BSONObjectTooLarge,
"Ordered time-series bucket insert is too large"};
}
inserts.emplace_back(op.getStmtIds() ? *op.getStmtIds()
: std::vector<StmtId>{kUninitializedStmtId},
op.getDocuments().front(),
doc,
slot ? *(*slot)++ : OplogSlot{});
}
@ -1838,6 +1854,25 @@ Status performAtomicTimeseriesWrites(
invariant(false, "Unexpected update type");
}
// Since this bypasses the usual write path, size validation is needed.
if (MONGO_unlikely(updated.objsize() > BSONObjMaxUserSize)) {
invariant(false);
// This block isn't expected to be hit on v7.0 because the object
// would have failed BSON construction via exception earlier in the write
// path. Keeping this here for completeness.
LOGV2_WARNING(
10856505,
"Ordered time-series bucket update is too large. Will internally retry write on "
"a new bucket.",
"bucketSize"_attr = updated.objsize());
timeseries::bucket_catalog::markBucketUpdateTooLarge(
timeseries::bucket_catalog::BucketCatalog::get(opCtx),
ns.getTimeseriesViewNamespace());
return {ErrorCodes::BSONObjectTooLarge,
"Ordered time-series bucket update is too large"};
}
if (slot) {
args.oplogSlots = {**slot};
fassert(5481600,
@ -2422,26 +2457,55 @@ bool commitTimeseriesBucket(OperationContext* opCtx,
const auto docId = batch->bucketId.oid;
const bool performInsert = batch->numPreviouslyCommittedMeasurements == 0;
if (performInsert) {
const auto output =
performTimeseriesInsert(opCtx, batch, metadata, std::move(stmtIds), request);
if (auto error = write_ops_exec::generateError(
opCtx, output.result.getStatus(), start + index, errors->size())) {
bool canContinue = output.canContinue;
// Automatically attempts to retry on DuplicateKey error.
if (error->getStatus().code() == ErrorCodes::DuplicateKey &&
retryAttemptsForDup[index]++ < gTimeseriesInsertMaxRetriesOnDuplicates.load()) {
docsToRetry->push_back(index);
canContinue = true;
} else {
errors->emplace_back(std::move(*error));
try {
const auto output =
performTimeseriesInsert(opCtx, batch, metadata, std::move(stmtIds), request);
if (auto error = write_ops_exec::generateError(
opCtx, output.result.getStatus(), start + index, errors->size())) {
bool canContinue = output.canContinue;
// Automatically attempts to retry on DuplicateKey error.
if (error->getStatus().code() == ErrorCodes::DuplicateKey &&
retryAttemptsForDup[index]++ < gTimeseriesInsertMaxRetriesOnDuplicates.load()) {
docsToRetry->push_back(index);
canContinue = true;
} else {
if (output.result.getStatus() == ErrorCodes::BSONObjectTooLarge) {
LOGV2_WARNING(10856506,
"Unordered time-series bucket insert is too large.",
"statusMsg"_attr = output.result.getStatus().reason(),
"canContinue"_attr = canContinue,
"ns"_attr = request.getNamespace(),
"batchNs"_attr = batch->bucketId.ns);
timeseries::bucket_catalog::markBucketInsertTooLarge(bucketCatalog,
batch->bucketId.ns);
}
errors->emplace_back(std::move(*error));
}
abort(bucketCatalog, batch, output.result.getStatus());
return canContinue;
}
abort(bucketCatalog, batch, output.result.getStatus());
return canContinue;
}
invariant(output.result.getValue().getN() == 1,
str::stream() << "Expected 1 insertion of document with _id '" << docId
<< "', but found " << output.result.getValue().getN() << ".");
invariant(output.result.getValue().getN() == 1,
str::stream() << "Expected 1 insertion of document with _id '" << docId
<< "', but found " << output.result.getValue().getN() << ".");
} catch (const DBException& ex) {
if (ex.toStatus() == ErrorCodes::BSONObjectTooLarge) {
LOGV2_WARNING(10856502,
"Unordered time-series bucket insert is too large.",
"statusMsg"_attr = ex.toStatus().reason(),
"ns"_attr = batch->bucketId.ns);
auto& bucketCatalog = timeseries::bucket_catalog::BucketCatalog::get(opCtx);
timeseries::bucket_catalog::markBucketInsertTooLarge(bucketCatalog,
batch->bucketId.ns);
auto error = write_ops_exec::generateError(
opCtx, ex.toStatus(), start + index, errors->size());
errors->emplace_back(std::move(*error));
abort(bucketCatalog, batch, ex.toStatus());
return false;
}
throw;
}
} else {
auto op = batch->decompressed.has_value()
? makeTimeseriesDecompressAndUpdateOp(
@ -2461,6 +2525,18 @@ bool commitTimeseriesBucket(OperationContext* opCtx,
return true;
} else if (auto error = write_ops_exec::generateError(
opCtx, output.result.getStatus(), start + index, errors->size())) {
if (output.result.getStatus() == ErrorCodes::BSONObjectTooLarge) {
invariant(false);
LOGV2_WARNING(10856507,
"Unordered time-series bucket update is too large.",
"statusMsg"_attr = output.result.getStatus().reason());
timeseries::bucket_catalog::markBucketUpdateTooLarge(bucketCatalog,
request.getNamespace());
errors->emplace_back(std::move(*error));
abort(bucketCatalog, batch, output.result.getStatus());
return false;
}
errors->emplace_back(std::move(*error));
abort(bucketCatalog, batch, output.result.getStatus());
return output.canContinue;
@ -2484,13 +2560,13 @@ bool commitTimeseriesBucket(OperationContext* opCtx,
}
// Returns true if commit was successful, false otherwise. May also throw.
bool commitTimeseriesBucketsAtomically(OperationContext* opCtx,
TimeseriesBatches* batches,
TimeseriesStmtIds&& stmtIds,
std::vector<write_ops::WriteError>* errors,
boost::optional<repl::OpTime>* opTime,
boost::optional<OID>* electionId,
const write_ops::InsertCommandRequest& request) {
Status commitTimeseriesBucketsAtomically(OperationContext* opCtx,
TimeseriesBatches* batches,
TimeseriesStmtIds&& stmtIds,
std::vector<write_ops::WriteError>* errors,
boost::optional<repl::OpTime>* opTime,
boost::optional<OID>* electionId,
const write_ops::InsertCommandRequest& request) {
auto& bucketCatalog = timeseries::bucket_catalog::BucketCatalog::get(opCtx);
std::vector<std::reference_wrapper<std::shared_ptr<timeseries::bucket_catalog::WriteBatch>>>
@ -2503,7 +2579,7 @@ bool commitTimeseriesBucketsAtomically(OperationContext* opCtx,
}
if (batchesToCommit.empty()) {
return true;
return Status::OK();
}
// Sort by bucket so that preparing the commit for each batch cannot deadlock.
@ -2529,19 +2605,58 @@ bool commitTimeseriesBucketsAtomically(OperationContext* opCtx,
auto prepareCommitStatus = prepareCommit(bucketCatalog, batch);
if (!prepareCommitStatus.isOK()) {
abortStatus = prepareCommitStatus;
return false;
return prepareCommitStatus;
}
if (batch.get()->numPreviouslyCommittedMeasurements == 0) {
insertOps.push_back(makeTimeseriesInsertOp(
batch, metadata, std::move(stmtIds[batch.get().get()]), request));
try {
insertOps.push_back(makeTimeseriesInsertOp(
batch, metadata, std::move(stmtIds[batch.get().get()]), request));
} catch (const DBException& ex) {
if (ex.toStatus() == ErrorCodes::BSONObjectTooLarge) {
LOGV2_WARNING(10856500,
"Ordered time-series bucket insert is too large.",
"statusMsg"_attr = ex.toStatus().reason(),
"ns"_attr = batch.get()->bucketId.ns);
auto& bucketCatalog = timeseries::bucket_catalog::BucketCatalog::get(opCtx);
timeseries::bucket_catalog::markBucketInsertTooLarge(
bucketCatalog, batch.get()->bucketId.ns);
abortStatus = ex.toStatus();
return ex.toStatus();
}
throw;
}
} else {
if (batch.get()->decompressed.has_value()) {
updateOps.push_back(makeTimeseriesDecompressAndUpdateOp(
opCtx, batch, metadata, std::move(stmtIds[batch.get().get()]), request));
} else {
updateOps.push_back(makeTimeseriesUpdateOp(
opCtx, batch, metadata, std::move(stmtIds[batch.get().get()]), request));
try {
if (batch.get()->decompressed.has_value()) {
updateOps.push_back(makeTimeseriesDecompressAndUpdateOp(
opCtx,
batch,
metadata,
std::move(stmtIds[batch.get().get()]),
request));
} else {
updateOps.push_back(
makeTimeseriesUpdateOp(opCtx,
batch,
metadata,
std::move(stmtIds[batch.get().get()]),
request));
}
} catch (const DBException& ex) {
if (ex.toStatus() == ErrorCodes::BSONObjectTooLarge) {
LOGV2_WARNING(10856501,
"Ordered time-series bucket update is too large.",
"statusMsg"_attr = ex.toStatus().reason(),
"compressed"_attr = batch.get()->decompressed.has_value(),
"ns"_attr = batch.get()->bucketId.ns);
auto& bucketCatalog = timeseries::bucket_catalog::BucketCatalog::get(opCtx);
timeseries::bucket_catalog::markBucketUpdateTooLarge(
bucketCatalog, batch.get()->bucketId.ns);
abortStatus = ex.toStatus();
return ex.toStatus();
}
throw;
}
}
}
@ -2554,7 +2669,7 @@ bool commitTimeseriesBucketsAtomically(OperationContext* opCtx,
timeseries::bucket_catalog::resetBucketOIDCounter();
}
abortStatus = result;
return false;
return result;
}
getOpTimeAndElectionId(opCtx, opTime, electionId);
@ -2579,7 +2694,7 @@ bool commitTimeseriesBucketsAtomically(OperationContext* opCtx,
}
batchGuard.dismiss();
return true;
return Status::OK();
}
// For sharded time-series collections, we need to use the granularity from the config
@ -2759,6 +2874,7 @@ std::tuple<TimeseriesBatches, TimeseriesStmtIds, size_t /* numInserted */> inser
size_t start,
size_t numDocs,
const std::vector<size_t>& indices,
timeseries::BucketReopeningPermittance allowQueryBasedReopening,
std::vector<write_ops::WriteError>* errors,
bool* containsRetry,
const write_ops::InsertCommandRequest& request) {
@ -2837,6 +2953,10 @@ std::tuple<TimeseriesBatches, TimeseriesStmtIds, size_t /* numInserted */> inser
StatusWith<timeseries::bucket_catalog::InsertResult> swResult =
Status{ErrorCodes::BadValue, "Uninitialized InsertResult"};
do {
timeseries::bucket_catalog::AllowQueryBasedReopening reopening =
allowQueryBasedReopening == timeseries::BucketReopeningPermittance::kAllowed
? timeseries::bucket_catalog::AllowQueryBasedReopening::kAllow
: timeseries::bucket_catalog::AllowQueryBasedReopening::kDisallow;
if (feature_flags::gTimeseriesScalabilityImprovements.isEnabled(
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
swResult = timeseries::bucket_catalog::tryInsert(
@ -2846,7 +2966,8 @@ std::tuple<TimeseriesBatches, TimeseriesStmtIds, size_t /* numInserted */> inser
bucketsColl->getDefaultCollator(),
timeSeriesOptions,
measurementDoc,
canCombineTimeseriesInsertWithOtherClients(opCtx, request));
canCombineTimeseriesInsertWithOtherClients(opCtx, request),
reopening);
if (swResult.isOK()) {
auto& insertResult = swResult.getValue();
@ -2914,6 +3035,7 @@ std::tuple<TimeseriesBatches, TimeseriesStmtIds, size_t /* numInserted */> inser
timeSeriesOptions,
measurementDoc,
canCombineTimeseriesInsertWithOtherClients(opCtx, request),
reopening,
reopeningContext);
} else if (auto* waiter =
stdx::get_if<timeseries::bucket_catalog::InsertWaiter>(
@ -2935,7 +3057,8 @@ std::tuple<TimeseriesBatches, TimeseriesStmtIds, size_t /* numInserted */> inser
bucketsColl->getDefaultCollator(),
timeSeriesOptions,
measurementDoc,
canCombineTimeseriesInsertWithOtherClients(opCtx, request));
canCombineTimeseriesInsertWithOtherClients(opCtx, request),
reopening);
}
// If there is an era offset (between the bucket we want to reopen and the
@ -3002,25 +3125,33 @@ std::tuple<TimeseriesBatches, TimeseriesStmtIds, size_t /* numInserted */> inser
return {std::move(batches), std::move(stmtIds), request.getDocuments().size()};
}
bool performOrderedTimeseriesWritesAtomically(OperationContext* opCtx,
std::vector<write_ops::WriteError>* errors,
boost::optional<repl::OpTime>* opTime,
boost::optional<OID>* electionId,
bool* containsRetry,
const write_ops::InsertCommandRequest& request) {
auto [batches, stmtIds, numInserted] = insertIntoBucketCatalog(
opCtx, 0, request.getDocuments().size(), {}, errors, containsRetry, request);
Status performOrderedTimeseriesWritesAtomically(OperationContext* opCtx,
std::vector<write_ops::WriteError>* errors,
boost::optional<repl::OpTime>* opTime,
boost::optional<OID>* electionId,
bool* containsRetry,
const write_ops::InsertCommandRequest& request) {
auto [batches, stmtIds, numInserted] =
insertIntoBucketCatalog(opCtx,
0,
request.getDocuments().size(),
{},
timeseries::BucketReopeningPermittance::kAllowed,
errors,
containsRetry,
request);
hangTimeseriesInsertBeforeCommit.pauseWhileSet();
if (!commitTimeseriesBucketsAtomically(
opCtx, &batches, std::move(stmtIds), errors, opTime, electionId, request)) {
return false;
auto commitResult = commitTimeseriesBucketsAtomically(
opCtx, &batches, std::move(stmtIds), errors, opTime, electionId, request);
if (!commitResult.isOK()) {
return commitResult;
}
getTimeseriesBatchResults(opCtx, batches, 0, batches.size(), true, errors, opTime, electionId);
return true;
return Status::OK();
}
/**
@ -3036,14 +3167,15 @@ std::vector<size_t> performUnorderedTimeseriesWrites(
size_t start,
size_t numDocs,
const std::vector<size_t>& indices,
const timeseries::BucketReopeningPermittance bucketReopening,
std::vector<write_ops::WriteError>* errors,
boost::optional<repl::OpTime>* opTime,
boost::optional<OID>* electionId,
bool* containsRetry,
const write_ops::InsertCommandRequest& request,
absl::flat_hash_map<int, int>& retryAttemptsForDup) {
auto [batches, bucketStmtIds, _] =
insertIntoBucketCatalog(opCtx, start, numDocs, indices, errors, containsRetry, request);
auto [batches, bucketStmtIds, _] = insertIntoBucketCatalog(
opCtx, start, numDocs, indices, bucketReopening, errors, containsRetry, request);
hangTimeseriesInsertBeforeCommit.pauseWhileSet();
@ -3112,14 +3244,16 @@ std::vector<size_t> performUnorderedTimeseriesWrites(
return docsToRetry;
}
void performUnorderedTimeseriesWritesWithRetries(OperationContext* opCtx,
size_t start,
size_t numDocs,
std::vector<write_ops::WriteError>* errors,
boost::optional<repl::OpTime>* opTime,
boost::optional<OID>* electionId,
bool* containsRetry,
const write_ops::InsertCommandRequest& request) {
void performUnorderedTimeseriesWritesWithRetries(
OperationContext* opCtx,
size_t start,
size_t numDocs,
timeseries::BucketReopeningPermittance bucketReopening,
std::vector<write_ops::WriteError>* errors,
boost::optional<repl::OpTime>* opTime,
boost::optional<OID>* electionId,
bool* containsRetry,
const write_ops::InsertCommandRequest& request) {
std::vector<size_t> docsToRetry;
absl::flat_hash_map<int, int> retryAttemptsForDup;
do {
@ -3127,6 +3261,7 @@ void performUnorderedTimeseriesWritesWithRetries(OperationContext* opCtx,
start,
numDocs,
docsToRetry,
bucketReopening,
errors,
opTime,
electionId,
@ -3148,17 +3283,22 @@ size_t performOrderedTimeseriesWrites(OperationContext* opCtx,
boost::optional<OID>* electionId,
bool* containsRetry,
const write_ops::InsertCommandRequest& request) {
if (performOrderedTimeseriesWritesAtomically(
opCtx, errors, opTime, electionId, containsRetry, request)) {
auto result = performOrderedTimeseriesWritesAtomically(
opCtx, errors, opTime, electionId, containsRetry, request);
if (result.isOK()) {
return request.getDocuments().size();
}
// The atomic commit failed and might have populated 'errors'. To retry inserting each
// measurement one by one, first clear 'errors' so the retry starts with a clean state.
errors->clear();
timeseries::BucketReopeningPermittance bucketReopening =
result.code() == ErrorCodes::BSONObjectTooLarge
? timeseries::BucketReopeningPermittance::kDisallowed
: timeseries::BucketReopeningPermittance::kAllowed;
for (size_t i = 0; i < request.getDocuments().size(); ++i) {
performUnorderedTimeseriesWritesWithRetries(
opCtx, i, 1, errors, opTime, electionId, containsRetry, request);
opCtx, i, 1, bucketReopening, errors, opTime, electionId, containsRetry, request);
if (!errors->empty()) {
return i;
}
@ -3217,14 +3357,16 @@ write_ops::InsertCommandReply performTimeseriesWrites(
baseReply.setN(performOrderedTimeseriesWrites(
opCtx, &errors, &opTime, &electionId, &containsRetry, request));
} else {
performUnorderedTimeseriesWritesWithRetries(opCtx,
0,
request.getDocuments().size(),
&errors,
&opTime,
&electionId,
&containsRetry,
request);
performUnorderedTimeseriesWritesWithRetries(
opCtx,
0,
request.getDocuments().size(),
timeseries::BucketReopeningPermittance::kAllowed,
&errors,
&opTime,
&electionId,
&containsRetry,
request);
baseReply.setN(request.getDocuments().size() - errors.size());
}

View File

@ -328,7 +328,7 @@ TEST_F(WriteOpsRetryability, PerformOrderedInsertsStopsAtBadDoc) {
ASSERT_EQ(2, result.results.size());
ASSERT_TRUE(result.results[0].isOK());
ASSERT_FALSE(result.results[1].isOK());
ASSERT_EQ(ErrorCodes::BadValue, result.results[1].getStatus());
ASSERT_EQ(ErrorCodes::BSONObjectTooLarge, result.results[1].getStatus());
}
TEST_F(WriteOpsRetryability, PerformUnorderedInsertsContinuesAtBadDoc) {
@ -362,7 +362,7 @@ TEST_F(WriteOpsRetryability, PerformUnorderedInsertsContinuesAtBadDoc) {
ASSERT_TRUE(result.results[0].isOK());
ASSERT_FALSE(result.results[1].isOK());
ASSERT_TRUE(result.results[2].isOK());
ASSERT_EQ(ErrorCodes::BadValue, result.results[1].getStatus());
ASSERT_EQ(ErrorCodes::BSONObjectTooLarge, result.results[1].getStatus());
}
using FindAndModifyRetryability = MockReplCoordServerFixture;

View File

@ -28,6 +28,7 @@
*/
#include "mongo/db/timeseries/bucket_catalog/bucket.h"
#include "mongo/db/timeseries/timeseries_constants.h"
namespace mongo::timeseries::bucket_catalog {
@ -84,7 +85,11 @@ void calculateBucketFieldsAndSizeChange(const Bucket& bucket,
for (const auto& elem : doc) {
auto fieldName = elem.fieldNameStringData();
if (fieldName == metaField) {
// Ignore the metadata field since it will not be inserted.
// Only account for the meta field size once, on bucket insert, since it is stored
// uncompressed at the top-level of the bucket.
if (bucket.size == 0) {
sizeToBeAdded += kBucketMetaFieldName.size() + elem.size() - elem.fieldNameSize();
}
continue;
}

View File

@ -150,9 +150,17 @@ StatusWith<InsertResult> tryInsert(OperationContext* opCtx,
const StringData::ComparatorInterface* comparator,
const TimeseriesOptions& options,
const BSONObj& doc,
CombineWithInsertsFromOtherClients combine) {
return internal::insert(
opCtx, catalog, ns, comparator, options, doc, combine, internal::AllowBucketCreation::kNo);
CombineWithInsertsFromOtherClients combine,
const AllowQueryBasedReopening reopening) {
return internal::insert(opCtx,
catalog,
ns,
comparator,
options,
doc,
combine,
internal::AllowBucketCreation::kNo,
reopening);
}
StatusWith<InsertResult> insert(OperationContext* opCtx,
@ -162,6 +170,7 @@ StatusWith<InsertResult> insert(OperationContext* opCtx,
const TimeseriesOptions& options,
const BSONObj& doc,
CombineWithInsertsFromOtherClients combine,
const AllowQueryBasedReopening reopening,
ReopeningContext* reopeningContext) {
return internal::insert(opCtx,
catalog,
@ -171,6 +180,7 @@ StatusWith<InsertResult> insert(OperationContext* opCtx,
doc,
combine,
internal::AllowBucketCreation::kYes,
reopening,
reopeningContext);
}
@ -381,6 +391,14 @@ void clear(BucketCatalog& catalog, StringData dbName) {
});
}
void markBucketInsertTooLarge(BucketCatalog& catalog, const NamespaceString& ns) {
internal::getOrInitializeExecutionStats(catalog, ns).incNumBucketDocumentsTooLargeInsert();
}
void markBucketUpdateTooLarge(BucketCatalog& catalog, const NamespaceString& ns) {
internal::getOrInitializeExecutionStats(catalog, ns).incNumBucketDocumentsTooLargeUpdate();
}
BucketId extractBucketId(BucketCatalog& bucketCatalog,
const TimeseriesOptions& options,
const StringData::ComparatorInterface* comparator,

View File

@ -59,6 +59,12 @@ namespace mongo::timeseries::bucket_catalog {
using StripeNumber = std::uint8_t;
using ShouldClearFn = std::function<bool(const NamespaceString&)>;
/**
* Mode enum to control whether getReopeningCandidate() will allow query-based
* reopening of buckets when attempting to accommodate a new measurement.
*/
enum class AllowQueryBasedReopening { kAllow, kDisallow };
/**
* Whether to allow inserts to be batched together with those from other clients.
*/
@ -213,7 +219,8 @@ StatusWith<InsertResult> tryInsert(OperationContext* opCtx,
const StringData::ComparatorInterface* comparator,
const TimeseriesOptions& options,
const BSONObj& doc,
CombineWithInsertsFromOtherClients combine);
CombineWithInsertsFromOtherClients combine,
AllowQueryBasedReopening allowQueryBasedReopening);
/**
* Returns the WriteBatch into which the document was inserted and a list of any buckets that were
@ -231,6 +238,7 @@ StatusWith<InsertResult> insert(OperationContext* opCtx,
const TimeseriesOptions& options,
const BSONObj& doc,
CombineWithInsertsFromOtherClients combine,
AllowQueryBasedReopening allowQueryBasedReopening,
ReopeningContext* reopeningContext = nullptr);
/**
@ -300,6 +308,20 @@ void clear(BucketCatalog& catalog, const NamespaceString& ns);
*/
void clear(BucketCatalog& catalog, StringData dbName);
/**
* Increments an FTDC counter.
* Denotes an event where a generated time-series bucket document for insert exceeded the BSON
* size limit.
*/
void markBucketInsertTooLarge(BucketCatalog& catalog, const NamespaceString& ns);
/**
* Increments an FTDC counter.
* Denotes an event where a generated time-series bucket document for update exceeded the BSON
* size limit.
*/
void markBucketUpdateTooLarge(BucketCatalog& catalog, const NamespaceString& ns);
/**
* Extracts the BucketId from a bucket document.
*/

View File

@ -651,6 +651,7 @@ StatusWith<InsertResult> insert(OperationContext* opCtx,
const BSONObj& doc,
CombineWithInsertsFromOtherClients combine,
AllowBucketCreation mode,
AllowQueryBasedReopening allowQueryBasedReopening,
ReopeningContext* reopeningContext) {
invariant(!ns.isTimeseriesBucketsCollection());
@ -662,7 +663,7 @@ StatusWith<InsertResult> insert(OperationContext* opCtx,
auto time = res.getValue().second;
ExecutionStatsController stats = getOrInitializeExecutionStats(catalog, ns);
if (reopeningContext) {
if (reopeningContext && allowQueryBasedReopening == AllowQueryBasedReopening::kAllow) {
updateBucketFetchAndQueryStats(*reopeningContext, stats);
}
@ -752,7 +753,7 @@ StatusWith<InsertResult> insert(OperationContext* opCtx,
if (!bucket) {
invariant(mode == AllowBucketCreation::kNo);
return getReopeningContext(
opCtx, catalog, stripe, stripeLock, info, catalogEra, AllowQueryBasedReopening::kAllow);
opCtx, catalog, stripe, stripeLock, info, catalogEra, allowQueryBasedReopening);
}
auto insertionResult = insertIntoBucket(
@ -797,7 +798,8 @@ StatusWith<InsertResult> insert(OperationContext* opCtx,
stripeLock,
info,
catalogEra,
(*reason == RolloverReason::kTimeBackward)
((allowQueryBasedReopening == AllowQueryBasedReopening::kAllow) &&
(*reason == RolloverReason::kTimeBackward))
? AllowQueryBasedReopening::kAllow
: AllowQueryBasedReopening::kDisallow);
}

View File

@ -74,12 +74,6 @@ enum class IgnoreBucketState { kYes, kNo };
*/
enum class BucketPrepareAction { kPrepare, kUnprepare };
/**
* Mode enum to control whether getReopeningCandidate() will allow query-based
* reopening of buckets when attempting to accommodate a new measurement.
*/
enum class AllowQueryBasedReopening { kAllow, kDisallow };
/**
* Maps bucket identifier to the stripe that is responsible for it.
*/
@ -208,6 +202,7 @@ StatusWith<InsertResult> insert(OperationContext* opCtx,
const BSONObj& doc,
CombineWithInsertsFromOtherClients combine,
AllowBucketCreation mode,
AllowQueryBasedReopening allowQueryBasedReopening,
ReopeningContext* reopeningContext = nullptr);
/**

View File

@ -185,7 +185,8 @@ void BucketCatalogTest::_insertOneAndCommit(const NamespaceString& ns,
_getCollator(ns),
_getTimeseriesOptions(ns),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto& batch = stdx::get<SuccessfulInsertion>(result.getValue()).batch;
_commit(batch, numPreviouslyCommittedMeasurements);
}
@ -217,7 +218,8 @@ void BucketCatalogTest::_testMeasurementSchema(
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
timestampedDoc.obj(),
CombineWithInsertsFromOtherClients::kAllow)
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow)
.isOK());
auto post = _getExecutionStat(_ns1, kNumSchemaChanges);
@ -301,7 +303,8 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto batch1 = stdx::get<SuccessfulInsertion>(result1.getValue()).batch;
ASSERT(claimWriteBatchCommitRights(*batch1));
@ -313,7 +316,8 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto batch2 = stdx::get<SuccessfulInsertion>(result2.getValue()).batch;
ASSERT_EQ(batch1, batch2);
ASSERT(!claimWriteBatchCommitRights(*batch2));
@ -345,7 +349,8 @@ TEST_F(BucketCatalogTest, GetMetadataReturnsEmptyDocOnMissingBucket) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto batch = stdx::get<SuccessfulInsertion>(result.getValue()).batch;
ASSERT(claimWriteBatchCommitRights(*batch));
auto bucketId = batch->bucketId;
@ -360,21 +365,24 @@ TEST_F(BucketCatalogTest, InsertIntoDifferentBuckets) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now() << _metaField << "123"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto result2 = insert(_opCtx,
*_bucketCatalog,
_ns1,
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now() << _metaField << BSONObj()),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto result3 = insert(_opCtx,
*_bucketCatalog,
_ns2,
_getCollator(_ns2),
_getTimeseriesOptions(_ns2),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
// Inserts should all be into three distinct buckets (and therefore batches).
ASSERT_NE(stdx::get<SuccessfulInsertion>(result1.getValue()).batch,
@ -414,7 +422,8 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketArray) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now() << _metaField << BSON_ARRAY(BSON("a" << 0 << "b" << 1))),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto result2 = insert(
_opCtx,
*_bucketCatalog,
@ -422,7 +431,8 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketArray) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now() << _metaField << BSON_ARRAY(BSON("b" << 1 << "a" << 0))),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_EQ(stdx::get<SuccessfulInsertion>(result1.getValue()).batch,
stdx::get<SuccessfulInsertion>(result2.getValue()).batch);
@ -448,7 +458,8 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketObjArray) {
BSON(_timeField << Date_t::now() << _metaField
<< BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1)
<< BSON("f" << 1 << "g" << 0))))),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto result2 =
insert(_opCtx,
*_bucketCatalog,
@ -458,7 +469,8 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketObjArray) {
BSON(_timeField << Date_t::now() << _metaField
<< BSONObj(BSON("c" << BSON_ARRAY(BSON("b" << 1 << "a" << 0)
<< BSON("g" << 0 << "f" << 1))))),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_EQ(stdx::get<SuccessfulInsertion>(result1.getValue()).batch,
stdx::get<SuccessfulInsertion>(result2.getValue()).batch);
@ -488,7 +500,8 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketNestedArray) {
<< BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1)
<< BSON_ARRAY("123"
<< "456"))))),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto result2 =
insert(_opCtx,
*_bucketCatalog,
@ -499,7 +512,8 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketNestedArray) {
<< BSONObj(BSON("c" << BSON_ARRAY(BSON("b" << 1 << "a" << 0)
<< BSON_ARRAY("123"
<< "456"))))),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_EQ(stdx::get<SuccessfulInsertion>(result1.getValue()).batch,
stdx::get<SuccessfulInsertion>(result2.getValue()).batch);
@ -526,14 +540,16 @@ TEST_F(BucketCatalogTest, InsertNullAndMissingMetaFieldIntoDifferentBuckets) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now() << _metaField << BSONNULL),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto result2 = insert(_opCtx,
*_bucketCatalog,
_ns1,
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
// Inserts should all be into three distinct buckets (and therefore batches).
ASSERT_NE(stdx::get<SuccessfulInsertion>(result1.getValue()).batch,
@ -592,7 +608,8 @@ TEST_F(BucketCatalogTest, InsertBetweenPrepareAndFinish) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto batch1 = stdx::get<SuccessfulInsertion>(result1.getValue()).batch;
ASSERT(claimWriteBatchCommitRights(*batch1));
ASSERT_OK(prepareCommit(*_bucketCatalog, batch1));
@ -606,7 +623,8 @@ TEST_F(BucketCatalogTest, InsertBetweenPrepareAndFinish) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto batch2 = stdx::get<SuccessfulInsertion>(result2.getValue()).batch;
ASSERT_NE(batch1, batch2);
@ -624,7 +642,8 @@ DEATH_TEST_F(BucketCatalogTest, CannotCommitWithoutRights, "invariant") {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto& batch = stdx::get<SuccessfulInsertion>(result.getValue()).batch;
ASSERT_OK(prepareCommit(*_bucketCatalog, batch));
@ -640,7 +659,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, GetMetadataReturnsEmptyDoc) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto& batch = stdx::get<SuccessfulInsertion>(result.getValue()).batch;
ASSERT_BSONOBJ_EQ(BSONObj(), getMetadata(*_bucketCatalog, batch->bucketId));
@ -655,7 +675,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now() << "a" << 0),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT(result.isOK());
auto batch = stdx::get<SuccessfulInsertion>(result.getValue()).batch;
auto oldId = batch->bucketId;
@ -672,7 +693,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now() << "a" << 1),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT(result.isOK());
batch = stdx::get<SuccessfulInsertion>(result.getValue()).batch;
_commit(batch, 1);
@ -685,7 +707,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now() << "a" << 2 << "b" << 2),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT(result.isOK());
batch = stdx::get<SuccessfulInsertion>(result.getValue()).batch;
_commit(batch, 2);
@ -700,7 +723,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now() << "a" << i),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT(result.isOK());
batch = stdx::get<SuccessfulInsertion>(result.getValue()).batch;
_commit(batch, i);
@ -715,7 +739,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now() << "a" << gTimeseriesBucketMaxCount),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto& batch2 = stdx::get<SuccessfulInsertion>(result2.getValue()).batch;
ASSERT_NE(oldId, batch2->bucketId);
_commit(batch2, 0);
@ -731,7 +756,8 @@ TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto batch1 = stdx::get<SuccessfulInsertion>(result1.getValue()).batch;
ASSERT(claimWriteBatchCommitRights(*batch1));
ASSERT_OK(prepareCommit(*_bucketCatalog, batch1));
@ -745,7 +771,8 @@ TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto batch2 = stdx::get<SuccessfulInsertion>(result2.getValue()).batch;
ASSERT_NE(batch1, batch2);
@ -766,7 +793,8 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto batch = stdx::get<SuccessfulInsertion>(result.getValue()).batch;
ASSERT(claimWriteBatchCommitRights(*batch));
@ -782,7 +810,8 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
batch = stdx::get<SuccessfulInsertion>(result.getValue()).batch;
ASSERT(claimWriteBatchCommitRights(*batch));
ASSERT_OK(prepareCommit(*_bucketCatalog, batch));
@ -809,7 +838,8 @@ TEST_F(BucketCatalogTest, ClearBucketWithPreparedBatchThrowsConflict) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto batch = stdx::get<SuccessfulInsertion>(result.getValue()).batch;
ASSERT(claimWriteBatchCommitRights(*batch));
ASSERT_OK(prepareCommit(*_bucketCatalog, batch));
@ -831,7 +861,8 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto batch1 = stdx::get<SuccessfulInsertion>(result1.getValue()).batch;
ASSERT(claimWriteBatchCommitRights(*batch1));
ASSERT_OK(prepareCommit(*_bucketCatalog, batch1));
@ -845,7 +876,8 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto batch2 = stdx::get<SuccessfulInsertion>(result2.getValue()).batch;
ASSERT_NE(batch1, batch2);
ASSERT_EQ(batch1->bucketId, batch2->bucketId);
@ -870,7 +902,8 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto batch3 = stdx::get<SuccessfulInsertion>(result3.getValue()).batch;
ASSERT_NE(batch1, batch3);
ASSERT_NE(batch2, batch3);
@ -892,7 +925,8 @@ TEST_F(BucketCatalogTest, PrepareCommitOnAlreadyAbortedBatch) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto batch = stdx::get<SuccessfulInsertion>(result.getValue()).batch;
ASSERT(claimWriteBatchCommitRights(*batch));
@ -912,7 +946,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kDisallow);
CombineWithInsertsFromOtherClients::kDisallow,
AllowQueryBasedReopening::kAllow);
auto batch1 = stdx::get<SuccessfulInsertion>(result1.getValue()).batch;
auto result2 = insert(_makeOperationContext().second.get(),
@ -921,7 +956,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kDisallow);
CombineWithInsertsFromOtherClients::kDisallow,
AllowQueryBasedReopening::kAllow);
auto batch2 = stdx::get<SuccessfulInsertion>(result2.getValue()).batch;
auto result3 = insert(_makeOperationContext().second.get(),
@ -930,7 +966,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto batch3 = stdx::get<SuccessfulInsertion>(result3.getValue()).batch;
auto result4 = insert(_makeOperationContext().second.get(),
@ -939,7 +976,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
auto batch4 = stdx::get<SuccessfulInsertion>(result4.getValue()).batch;
ASSERT_NE(batch1, batch2);
@ -959,7 +997,8 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kDisallow);
CombineWithInsertsFromOtherClients::kDisallow,
AllowQueryBasedReopening::kAllow);
auto batch1 = stdx::get<SuccessfulInsertion>(result1.getValue()).batch;
auto result2 = insert(_makeOperationContext().second.get(),
@ -968,7 +1007,8 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kDisallow);
CombineWithInsertsFromOtherClients::kDisallow,
AllowQueryBasedReopening::kAllow);
auto batch2 = stdx::get<SuccessfulInsertion>(result2.getValue()).batch;
ASSERT(claimWriteBatchCommitRights(*batch1));
@ -999,7 +1039,8 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kDisallow);
CombineWithInsertsFromOtherClients::kDisallow,
AllowQueryBasedReopening::kAllow);
auto batch1 = stdx::get<SuccessfulInsertion>(result1.getValue()).batch;
auto result2 = insert(_makeOperationContext().second.get(),
@ -1008,7 +1049,8 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kDisallow);
CombineWithInsertsFromOtherClients::kDisallow,
AllowQueryBasedReopening::kAllow);
auto batch2 = stdx::get<SuccessfulInsertion>(result2.getValue()).batch;
auto result3 = insert(_makeOperationContext().second.get(),
@ -1017,7 +1059,8 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kDisallow);
CombineWithInsertsFromOtherClients::kDisallow,
AllowQueryBasedReopening::kAllow);
auto batch3 = stdx::get<SuccessfulInsertion>(result3.getValue()).batch;
ASSERT_EQ(batch1->bucketId, batch2->bucketId);
@ -1057,7 +1100,8 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kDisallow);
CombineWithInsertsFromOtherClients::kDisallow,
AllowQueryBasedReopening::kAllow);
auto batch4 = stdx::get<SuccessfulInsertion>(result4.getValue()).batch;
ASSERT_NE(batch2->bucketId, batch4->bucketId);
}
@ -1069,7 +1113,8 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresNewInsertsGoToNewBucket) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kDisallow);
CombineWithInsertsFromOtherClients::kDisallow,
AllowQueryBasedReopening::kAllow);
auto batch1 = stdx::get<SuccessfulInsertion>(result1.getValue()).batch;
auto result2 = insert(_makeOperationContext().second.get(),
@ -1078,7 +1123,8 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresNewInsertsGoToNewBucket) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kDisallow);
CombineWithInsertsFromOtherClients::kDisallow,
AllowQueryBasedReopening::kAllow);
auto batch2 = stdx::get<SuccessfulInsertion>(result2.getValue()).batch;
// Batch 1 and 2 use the same bucket.
@ -1102,7 +1148,8 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresNewInsertsGoToNewBucket) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kDisallow);
CombineWithInsertsFromOtherClients::kDisallow,
AllowQueryBasedReopening::kAllow);
auto batch3 = stdx::get<SuccessfulInsertion>(result3.getValue()).batch;
ASSERT_NE(batch1->bucketId, batch3->bucketId);
}
@ -1114,7 +1161,8 @@ TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kDisallow);
CombineWithInsertsFromOtherClients::kDisallow,
AllowQueryBasedReopening::kAllow);
auto batch1 = stdx::get<SuccessfulInsertion>(result1.getValue()).batch;
auto result2 = insert(_makeOperationContext().second.get(),
@ -1123,7 +1171,8 @@ TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
CombineWithInsertsFromOtherClients::kDisallow);
CombineWithInsertsFromOtherClients::kDisallow,
AllowQueryBasedReopening::kAllow);
auto batch2 = stdx::get<SuccessfulInsertion>(result2.getValue()).batch;
// Batch 2 is the first batch to commit the time field.
@ -1348,7 +1397,8 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurement
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"},
"a":-100,"b":100})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
// No buckets are closed.
ASSERT(stdx::get<SuccessfulInsertion>(result.getValue()).closedBuckets.empty());
@ -1398,7 +1448,8 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurement
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"},"tag":42,
"a":-100,"b":100})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
// No buckets are closed.
ASSERT(stdx::get<SuccessfulInsertion>(result.getValue()).closedBuckets.empty());
@ -1454,7 +1505,8 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertIncompatibleMeasureme
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"},
"a":{},"b":{}})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
// The reopened bucket gets closed as the schema is incompatible.
ASSERT_EQ(1, stdx::get<SuccessfulInsertion>(result.getValue()).closedBuckets.size());
@ -1508,7 +1560,8 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertCompatibleMeasurement)
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"},
"a":-100,"b":100})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
// No buckets are closed.
ASSERT(stdx::get<SuccessfulInsertion>(result.getValue()).closedBuckets.empty());
@ -1568,7 +1621,8 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertIncompatibleMeasurement
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"},
"a":{},"b":{}})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
// The reopened bucket gets closed as the schema is incompatible.
ASSERT_EQ(1, stdx::get<SuccessfulInsertion>(result.getValue()).closedBuckets.size());
@ -1601,7 +1655,8 @@ TEST_F(BucketCatalogTest, ArchivingUnderMemoryPressure) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now() << _metaField << meta++),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result.getStatus());
auto batch = stdx::get<SuccessfulInsertion>(result.getValue()).batch;
ASSERT(claimWriteBatchCommitRights(*batch));
@ -1677,7 +1732,8 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result.getStatus());
ASSERT(stdx::holds_alternative<ReopeningContext>(result.getValue()));
ASSERT_TRUE(stdx::holds_alternative<std::vector<BSONObj>>(
@ -1694,7 +1750,8 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result.getStatus());
auto batch = stdx::get<SuccessfulInsertion>(result.getValue()).batch;
ASSERT(batch);
@ -1714,7 +1771,8 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:40.000Z"}})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result.getStatus());
ASSERT(stdx::holds_alternative<ReopeningContext>(result.getValue()));
ASSERT_TRUE(stdx::holds_alternative<std::vector<BSONObj>>(
@ -1730,7 +1788,8 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-07T15:34:40.000Z"}})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result.getStatus());
ASSERT(stdx::holds_alternative<ReopeningContext>(result.getValue()));
ASSERT_TRUE(stdx::holds_alternative<std::monostate>(
@ -1747,7 +1806,8 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-07T15:34:40.000Z"}, "tag": "foo"})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result.getStatus());
ASSERT_EQ(1, _getExecutionStat(_ns1, kNumArchivedDueToMemoryThreshold));
ASSERT_EQ(0, _getExecutionStat(_ns1, kNumClosedDueToMemoryThreshold));
@ -1770,7 +1830,8 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result.getStatus());
ASSERT(stdx::holds_alternative<ReopeningContext>(result.getValue()));
ASSERT_TRUE(
@ -1793,7 +1854,8 @@ TEST_F(BucketCatalogTest, TryInsertWillCreateBucketIfWeWouldCloseExistingBucket)
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}, "a": true})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result.getStatus());
auto batch = stdx::get<SuccessfulInsertion>(result.getValue()).batch;
ASSERT(batch);
@ -1812,7 +1874,8 @@ TEST_F(BucketCatalogTest, TryInsertWillCreateBucketIfWeWouldCloseExistingBucket)
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}, "a": {}})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result.getStatus());
batch = stdx::get<SuccessfulInsertion>(result.getValue()).batch;
ASSERT(batch);
@ -1836,7 +1899,8 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:40.000Z"}})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result.getStatus());
auto batch = stdx::get<SuccessfulInsertion>(result.getValue()).batch;
ASSERT(batch);
@ -1872,6 +1936,7 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) {
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}})"),
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow,
&reopeningContext);
ASSERT_OK(result.getStatus());
ASSERT_TRUE(stdx::holds_alternative<SuccessfulInsertion>(result.getValue()));
@ -1887,15 +1952,16 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) {
ASSERT_EQ(1, _getExecutionStat(_ns1, kNumBucketsReopened));
ASSERT_FALSE(stdx::get<SuccessfulInsertion>(result.getValue()).closedBuckets.empty());
// Verify that if we try another insert for the soft-closed bucket, we get a query-based
// reopening candidate.
// Verify that if we try another insert for the soft-closed bucket, we get a
// query-based reopening candidate.
result = tryInsert(_opCtx,
*_bucketCatalog,
_ns1,
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:35:40.000Z"}})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result.getStatus());
ASSERT_TRUE(stdx::holds_alternative<ReopeningContext>(result.getValue()));
ASSERT_TRUE(stdx::holds_alternative<std::vector<BSONObj>>(
@ -1915,7 +1981,8 @@ TEST_F(BucketCatalogTest, CannotInsertIntoOutdatedBucket) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:40.000Z"}})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result.getStatus());
auto batch = stdx::get<SuccessfulInsertion>(result.getValue()).batch;
ASSERT(batch);
@ -1958,6 +2025,7 @@ TEST_F(BucketCatalogTest, CannotInsertIntoOutdatedBucket) {
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}})"),
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow,
&reopeningContext);
ASSERT_NOT_OK(result.getStatus());
ASSERT_EQ(result.getStatus().code(), ErrorCodes::WriteConflict);
@ -1975,7 +2043,8 @@ TEST_F(BucketCatalogTest, QueryBasedReopeningConflictsWithQueryBasedReopening) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:40.000Z"},"tag":"a"})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result1.getStatus());
auto* context = stdx::get_if<ReopeningContext>(&result1.getValue());
ASSERT(context);
@ -1990,7 +2059,8 @@ TEST_F(BucketCatalogTest, QueryBasedReopeningConflictsWithQueryBasedReopening) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:50.000Z"},"tag":"a"})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result2.getStatus());
ASSERT(stdx::holds_alternative<InsertWaiter>(result2.getValue()));
}
@ -2006,7 +2076,8 @@ TEST_F(BucketCatalogTest, ReopeningConflictsWithPreparedBatch) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:40.000Z"},"tag":"b"})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result1.getStatus());
auto batch1 = stdx::get<SuccessfulInsertion>(result1.getValue()).batch;
ASSERT(batch1);
@ -2023,7 +2094,8 @@ TEST_F(BucketCatalogTest, ReopeningConflictsWithPreparedBatch) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:45.000Z"},"tag":"b"})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result2.getStatus());
auto batch2 = stdx::get<SuccessfulInsertion>(result2.getValue()).batch;
ASSERT(batch2);
@ -2038,7 +2110,8 @@ TEST_F(BucketCatalogTest, ReopeningConflictsWithPreparedBatch) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:50.000Z"},"tag":"b"})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result3.getStatus());
ASSERT(stdx::holds_alternative<InsertWaiter>(result3.getValue()));
}
@ -2055,7 +2128,8 @@ TEST_F(BucketCatalogTest, PreparingBatchConflictsWithQueryBasedReopening) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:40.000Z"},"tag":"c"})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result1->getStatus());
auto* context = stdx::get_if<ReopeningContext>(&result1->getValue());
ASSERT(context);
@ -2069,7 +2143,8 @@ TEST_F(BucketCatalogTest, PreparingBatchConflictsWithQueryBasedReopening) {
_getCollator(_ns1),
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-07-05T15:34:40.000Z"},"tag":"c"})"),
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result2.getStatus());
auto batch = stdx::get<SuccessfulInsertion>(result2.getValue()).batch;
ASSERT(batch);
@ -2114,7 +2189,8 @@ TEST_F(BucketCatalogTest, ArchiveBasedReopeningConflictsWithArchiveBasedReopenin
_getCollator(_ns1),
options,
doc,
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result1->getStatus());
auto* context = stdx::get_if<ReopeningContext>(&result1->getValue());
ASSERT(context);
@ -2130,7 +2206,8 @@ TEST_F(BucketCatalogTest, ArchiveBasedReopeningConflictsWithArchiveBasedReopenin
_getCollator(_ns1),
options,
doc,
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result2->getStatus());
ASSERT(stdx::holds_alternative<InsertWaiter>(result2->getValue()));
}
@ -2167,7 +2244,8 @@ TEST_F(BucketCatalogTest,
_getCollator(_ns1),
options,
doc1,
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result1->getStatus());
auto* context1 = stdx::get_if<ReopeningContext>(&result1->getValue());
ASSERT(context1);
@ -2191,7 +2269,8 @@ TEST_F(BucketCatalogTest,
_getCollator(_ns1),
options,
doc2,
CombineWithInsertsFromOtherClients::kAllow);
CombineWithInsertsFromOtherClients::kAllow,
AllowQueryBasedReopening::kAllow);
ASSERT_OK(result2->getStatus());
auto* context2 = stdx::get_if<ReopeningContext>(&result2->getValue());
ASSERT(context2);
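Every hunk above makes the same mechanical change: each `insert`/`tryInsert` call site in the bucket catalog tests now passes an explicit `AllowQueryBasedReopening` value immediately after `CombineWithInsertsFromOtherClients`. The following is a minimal stand-alone sketch of that call shape; the enums and the function are stand-ins that mirror the identifiers visible in the diff, not the server's real headers, so treat names and behavior as assumptions.

```cpp
#include <iostream>

// Stand-in enums mirroring the identifiers at the call sites above; these are
// illustrative models, not the declarations in bucket_catalog.h.
enum class CombineWithInsertsFromOtherClients { kAllow, kDisallow };
enum class AllowQueryBasedReopening { kAllow, kDisallow };

// Models the bucket catalog insert path: every caller now states explicitly
// whether a query-based reopening of an existing bucket may be attempted
// before a new bucket is allocated.
void insertMeasurement(CombineWithInsertsFromOtherClients combine,
                       AllowQueryBasedReopening reopening) {
    std::cout << (combine == CombineWithInsertsFromOtherClients::kAllow
                      ? "may share a batch with other clients, "
                      : "keeps its own batch, ")
              << (reopening == AllowQueryBasedReopening::kAllow
                      ? "query-based reopening permitted\n"
                      : "query-based reopening suppressed\n");
}

int main() {
    // The tests above exercise both values of each flag.
    insertMeasurement(CombineWithInsertsFromOtherClients::kDisallow,
                      AllowQueryBasedReopening::kAllow);
    return 0;
}
```

Making the flag explicit at every call site presumably lets the ordered-insert size precheck introduced by this change opt out of reopening when it has already decided a fresh bucket is required; that write-path logic is not shown in this part of the diff.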

View File

@ -158,6 +158,16 @@ void ExecutionStatsController::incNumDuplicateBucketsReopened(long long incremen
_globalStats.numDuplicateBucketsReopened.fetchAndAddRelaxed(increment);
}
void ExecutionStatsController::incNumBucketDocumentsTooLargeInsert(long long increment) {
_collectionStats->numBucketDocumentsTooLargeInsert.fetchAndAddRelaxed(increment);
_globalStats.numBucketDocumentsTooLargeInsert.fetchAndAddRelaxed(increment);
}
void ExecutionStatsController::incNumBucketDocumentsTooLargeUpdate(long long increment) {
_collectionStats->numBucketDocumentsTooLargeUpdate.fetchAndAddRelaxed(increment);
_globalStats.numBucketDocumentsTooLargeUpdate.fetchAndAddRelaxed(increment);
}
void appendExecutionStatsToBuilder(const ExecutionStats& stats, BSONObjBuilder& builder) {
builder.appendNumber("numBucketInserts", stats.numBucketInserts.load());
builder.appendNumber("numBucketUpdates", stats.numBucketUpdates.load());
@ -206,6 +216,11 @@ void appendExecutionStatsToBuilder(const ExecutionStats& stats, BSONObjBuilder&
builder.appendNumber("numDuplicateBucketsReopened",
stats.numDuplicateBucketsReopened.load());
}
builder.appendNumber("numBucketDocumentsTooLargeInsert",
stats.numBucketDocumentsTooLargeInsert.load());
builder.appendNumber("numBucketDocumentsTooLargeUpdate",
stats.numBucketDocumentsTooLargeUpdate.load());
}
} // namespace mongo::timeseries::bucket_catalog
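The two new counters follow the existing pattern in this file: each increment is applied to the per-collection stats object and to the process-wide aggregate using relaxed atomics. Below is a self-contained model of that double-increment pattern, using `std::atomic` as a stand-in for the server's `AtomicWord`; the surrounding mongo types are assumptions for illustration only.

```cpp
#include <atomic>
#include <iostream>
#include <memory>

// Stand-in for the relevant slice of ExecutionStats.
struct ExecutionStats {
    std::atomic<long long> numBucketDocumentsTooLargeInsert{0};
    std::atomic<long long> numBucketDocumentsTooLargeUpdate{0};
};

// Models ExecutionStatsController: every increment lands on the collection's
// stats object and on the global aggregate, both with relaxed ordering.
class ExecutionStatsController {
public:
    ExecutionStatsController(std::shared_ptr<ExecutionStats> collectionStats,
                             ExecutionStats& globalStats)
        : _collectionStats(std::move(collectionStats)), _globalStats(globalStats) {}

    void incNumBucketDocumentsTooLargeInsert(long long increment = 1) {
        _collectionStats->numBucketDocumentsTooLargeInsert.fetch_add(
            increment, std::memory_order_relaxed);
        _globalStats.numBucketDocumentsTooLargeInsert.fetch_add(
            increment, std::memory_order_relaxed);
    }

private:
    std::shared_ptr<ExecutionStats> _collectionStats;
    ExecutionStats& _globalStats;
};

int main() {
    ExecutionStats globalStats;
    auto collStats = std::make_shared<ExecutionStats>();
    ExecutionStatsController controller(collStats, globalStats);

    controller.incNumBucketDocumentsTooLargeInsert();  // e.g. a rejected oversized bucket insert
    std::cout << collStats->numBucketDocumentsTooLargeInsert.load() << " / "
              << globalStats.numBucketDocumentsTooLargeInsert.load() << "\n";
    return 0;
}
```

The matching additions to `appendExecutionStatsToBuilder` above are what surface `numBucketDocumentsTooLargeInsert` and `numBucketDocumentsTooLargeUpdate` in the stats document, presumably the path the collStats assertions in the new jstests read from.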

View File

@ -62,6 +62,8 @@ struct ExecutionStats {
AtomicWord<long long> numBucketQueriesFailed;
AtomicWord<long long> numBucketReopeningsFailed;
AtomicWord<long long> numDuplicateBucketsReopened;
AtomicWord<long long> numBucketDocumentsTooLargeInsert;
AtomicWord<long long> numBucketDocumentsTooLargeUpdate;
};
class ExecutionStatsController {
@ -97,6 +99,8 @@ public:
void incNumBucketQueriesFailed(long long increment = 1);
void incNumBucketReopeningsFailed(long long increment = 1);
void incNumDuplicateBucketsReopened(long long increment = 1);
void incNumBucketDocumentsTooLargeInsert(long long increment = 1);
void incNumBucketDocumentsTooLargeUpdate(long long increment = 1);
private:
std::shared_ptr<ExecutionStats> _collectionStats;

View File

@ -55,6 +55,11 @@ BSONObj makeNewDocumentForWrite(
const boost::optional<const StringData::ComparatorInterface*>& comparator,
boost::optional<Date_t> currentMinTime);
enum class BucketReopeningPermittance {
kAllowed,
kDisallowed,
};
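The new `BucketReopeningPermittance` enum gives the atomic time-series write path an explicit switch for whether bucket reopening may be attempted. A minimal stand-alone model of how such a flag is typically consumed is sketched below; the helper function and its mapping onto `AllowQueryBasedReopening` are hypothetical illustrations, not code from this commit.

```cpp
#include <iostream>

// Stand-in mirrors of the enums involved; illustrative only.
enum class BucketReopeningPermittance { kAllowed, kDisallowed };
enum class AllowQueryBasedReopening { kAllow, kDisallow };

// Hypothetical helper: translate the write-path permission into the bucket
// catalog's insert flag. The real call sites are not shown in this diff.
AllowQueryBasedReopening toReopeningFlag(BucketReopeningPermittance permittance) {
    return permittance == BucketReopeningPermittance::kAllowed
        ? AllowQueryBasedReopening::kAllow
        : AllowQueryBasedReopening::kDisallow;
}

int main() {
    std::cout << (toReopeningFlag(BucketReopeningPermittance::kDisallowed) ==
                          AllowQueryBasedReopening::kDisallow
                      ? "reopening suppressed\n"
                      : "reopening allowed\n");
    return 0;
}
```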
/**
* Performs modifications atomically for a user command on a time-series collection.
* Replaces the bucket document for a partial bucket modification and removes the bucket for a full