diff --git a/buildscripts/evergreen_resmoke_job_count.py b/buildscripts/evergreen_resmoke_job_count.py old mode 100644 new mode 100755 diff --git a/jstests/core/max_doc_size.js b/jstests/core/max_doc_size.js index d47c8eebc5f..e7d2e6b6de5 100644 --- a/jstests/core/max_doc_size.js +++ b/jstests/core/max_doc_size.js @@ -1,5 +1,8 @@ // The {$set: {x: maxStr}}} update takes multiple seconds to execute. -// @tags: [operations_longer_than_stepdown_interval] +// @tags: [ +// operations_longer_than_stepdown_interval, +// multiversion_incompatible, +// ] /** * Confirms that: @@ -51,7 +54,7 @@ coll.drop(); assert.commandFailedWithCode( db.runCommand( {insert: coll.getName(), documents: [{_id: new ObjectId(), x: largerThanMaxString}]}), - 2); + ErrorCodes.BSONObjectTooLarge); coll.drop(); assert.commandFailedWithCode(db.runCommand({ @@ -68,5 +71,5 @@ assert.commandFailedWithCode(db.runCommand({ ordered: true, updates: [{q: {_id: objectId}, u: {$set: {x: largerThanMaxString}}}] }), - 17419); + [17419, ErrorCodes.BSONObjectTooLarge]); })(); diff --git a/jstests/core/query/mr/mr_bigobject_replace.js b/jstests/core/query/mr/mr_bigobject_replace.js index db7d0bc3712..e0fedb4a264 100644 --- a/jstests/core/query/mr/mr_bigobject_replace.js +++ b/jstests/core/query/mr/mr_bigobject_replace.js @@ -9,6 +9,7 @@ // does_not_support_stepdowns, // uses_map_reduce_with_temp_collections, // requires_scripting, +// multiversion_incompatible, // ] /** @@ -63,7 +64,7 @@ function runTest(testOptions) { // In some cases we may see the javascript execution interrupted because it takes longer than // our default time limit, so we allow that possibility. assert.commandFailedWithCode(res, - [ErrorCodes.BadValue, ErrorCodes.Interrupted], + [ErrorCodes.BSONObjectTooLarge, ErrorCodes.Interrupted], "creating a document larger than 16MB didn't fail"); if (res.code != ErrorCodes.Interrupted) { assert.lte( diff --git a/jstests/core/timeseries/timeseries_insert_respect_max_bson_size_too_big.js b/jstests/core/timeseries/timeseries_insert_respect_max_bson_size_too_big.js new file mode 100644 index 00000000000..2248be6ede8 --- /dev/null +++ b/jstests/core/timeseries/timeseries_insert_respect_max_bson_size_too_big.js @@ -0,0 +1,55 @@ +/** + * Tests that time-series inserts do not cause any underlying bucket documents to exceed the max + * user BSON size. + * + * Bucket Insert: Measurements that are uninsertable due to exceeding the BSON size limit when a + * bucket insert is generated to accommodate one measurement. 
+ * + * @tags: [ + * requires_timeseries, + * multiversion_incompatible, + * ] + */ + +(function() { +"use strict"; + +load("jstests/core/timeseries/libs/timeseries.js"); + +let counter = 0; +TimeseriesTest.run(insert => { + const testDB = db.getSiblingDB(jsTestName()); + const coll = testDB["coll_" + counter++]; + const timestamp = ISODate("2025-01-01T12:00:00Z"); + const timeField = "t"; + const metaField = "m"; + + coll.drop(); + + assert.commandWorked(testDB.createCollection( + coll.getName(), {timeseries: {"timeField": timeField, "metaField": metaField}})); + + const largeMeta = "a".repeat(16 * 1024 * 1024 + 1); + const measurement1 = {}; + measurement1[timeField] = timestamp; + measurement1[metaField] = largeMeta; + measurement1["a"] = 1; + + const smallMeta = "5"; + const bigStr = "a".repeat(8000); + const measurement2 = {}; + for (let i = 0; i < 1000; ++i) { + measurement2[i.toString()] = bigStr; + } + measurement2[timeField] = timestamp; + measurement2[metaField] = smallMeta; + + // Insert Measurements + + // This measurement is always too big due to meta. + assert.commandFailedWithCode(insert(coll, measurement1), ErrorCodes.BSONObjectTooLarge); + + // This measurement is always too big due to total metric size being copied into control block. + assert.commandFailedWithCode(insert(coll, measurement2), ErrorCodes.BSONObjectTooLarge); +}); +})(); diff --git a/jstests/noPassthrough/bucket_insert_uninsertable_measurements.js b/jstests/noPassthrough/bucket_insert_uninsertable_measurements.js new file mode 100644 index 00000000000..e7b49b770fc --- /dev/null +++ b/jstests/noPassthrough/bucket_insert_uninsertable_measurements.js @@ -0,0 +1,95 @@ +/** + * Bucket Insert: Measurements that are uninsertable due to exceeding the BSON size limit when a + * bucket insert is generated to accommodate one measurement. + * + * Importantly, this controlled test checks collStats. + * + * @tags: [ + * requires_replication, + * ] + */ + +const rst = new ReplSetTest({nodes: 1}); +const nodes = rst.startSet(); +rst.initiate(); +const primary = rst.getPrimary(); +const db = primary.getDB("test"); + +const testDB = db.getSiblingDB(jsTestName()); +const coll = testDB.coll; +const bucketsColl = testDB.system.buckets[coll.getName()]; +const timeField = "t"; +const metaField = "m"; + +function runTest(isOrderedWrite) { + jsTestLog("runTest(ordered=" + isOrderedWrite.toString() + ")"); + + // Setup + + assert.commandWorked(testDB.createCollection( + coll.getName(), {timeseries: {timeField: timeField, metaField: metaField}})); + + const largeMeta = "a".repeat(16 * 1024 * 1024 + 1); + const timestamp = ISODate("2025-01-01T12:00:00Z"); + const measurement1 = {}; + measurement1[timeField] = timestamp; + measurement1[metaField] = largeMeta; + measurement1["a"] = 1; + + const smallMeta = "5"; + const bigStr = "a".repeat(60000); + const measurement2 = {}; + for (let i = 0; i < 100; ++i) { + measurement2[i.toString()] = bigStr; + } + measurement2[timeField] = timestamp; + measurement2[metaField] = smallMeta; + + // Insert Measurement + + jsTestLog("insert1"); + assert.commandFailedWithCode(coll.insert(measurement1, {ordered: isOrderedWrite}), + ErrorCodes.BSONObjectTooLarge); + + let stats = coll.stats().timeseries; + assert.eq(0, stats.numBucketInserts, tojson(stats)); + assert.eq(0, stats.numBucketUpdates, tojson(stats)); + assert.eq(isOrderedWrite ? 
2 : 5, stats.numBucketsOpenedDueToMetadata, tojson(stats)); + assert.eq(0, stats.numBucketsClosedDueToSize, tojson(stats)); + // The failed ordered write retries as unordered and thus makes 2 + // unsuccessful attempts. + assert.eq(isOrderedWrite ? 2 : 5, stats.numBucketDocumentsTooLargeInsert, tojson(stats)); + assert.eq(0, stats.numBucketDocumentsTooLargeUpdate, tojson(stats)); + + jsTestLog("insert2"); + assert.commandFailedWithCode(coll.insert(measurement2, {ordered: isOrderedWrite}), + ErrorCodes.BSONObjectTooLarge); + + stats = coll.stats().timeseries; + assert.eq(0, stats.numBucketInserts, tojson(stats)); + assert.eq(0, stats.numBucketUpdates, tojson(stats)); + assert.eq(isOrderedWrite ? 4 : 6, stats.numBucketsOpenedDueToMetadata, tojson(stats)); + assert.eq(0, stats.numBucketsClosedDueToSize, tojson(stats)); + // The failed ordered write retries as unordered and thus makes 2 + // unsuccessful attempts. + assert.eq(isOrderedWrite ? 4 : 6, stats.numBucketDocumentsTooLargeInsert, tojson(stats)); + assert.eq(0, stats.numBucketDocumentsTooLargeUpdate, tojson(stats)); + + // Check Results + // TODO(SERVER-108699): Remove this check. + + let buckets = bucketsColl.find().toArray(); + for (let i = 0; i < buckets.length; i++) { + let bucketDocSize = Object.bsonsize(buckets[i]); + assert.lte(bucketDocSize, 16 * 1024 * 1024); + } + + coll.drop(); + // Stats do not reset on v7.0 when a collection drops. Thus, many checks are path-dependent + // without a reboot. +} + +runTest(/*isOrderedWrite=*/ true); +runTest(/*isOrderedWrite=*/ false); + +rst.stopSet(); diff --git a/jstests/noPassthrough/bucket_update_retryable.js b/jstests/noPassthrough/bucket_update_retryable.js new file mode 100644 index 00000000000..d0fb1ca23a6 --- /dev/null +++ b/jstests/noPassthrough/bucket_update_retryable.js @@ -0,0 +1,98 @@ +/** + * Bucket Update: exploits field size <32B estimated at 0B. + * An ordered write that results in a failed bucket update due to exceeding the BSON size limit, + * should be successfully retryable as a bucket insert. + * + * This test does not exercise new code paths added in SERVER-108565, only on v7.0. It exists to + * verify that this branch is not vulnerable to these issues. + * + * @tags: [ + * requires_replication, + * ] + */ + +const rst = new ReplSetTest({nodes: 1}); +const nodes = rst.startSet(); +rst.initiate(); +const primary = rst.getPrimary(); +const db = primary.getDB("test"); + +const testDB = db.getSiblingDB(jsTestName()); +const coll = testDB.coll; +const bucketsColl = testDB.system.buckets[coll.getName()]; +const timeField = "t"; +const metaField = "m"; + +function runTest(isOrderedWrite) { + jsTestLog("runTest(ordered=" + isOrderedWrite.toString() + ")"); + + // Setup + + assert.commandWorked(testDB.createCollection( + coll.getName(), {timeseries: {timeField: timeField, metaField: metaField}})); + + const timestamp = ISODate("2025-01-01T12:00:00Z"); + // This value size is chosen to maximize the size of an object that is estimated by the + // time-series write path of being treated as 0. There is a fixed-size 8 bytes of bson and + // estimation overhead. + const str = "a".repeat(24); + const measurement1 = {a: 1, [timeField]: timestamp}; + const measurement2 = {}; + + // The number of fields chosen below combined with the unestimated size yields a measurement + // size of around 5MB. When tripled to account for the 3 literals per time-series bucket, this + // is just underneath the BSON limit for a bucket document. 
+ for (let i = 0; i < 140000; ++i) { + measurement2[i.toString()] = str; + } + measurement2[timeField] = timestamp; + + // Insert Measurements + + jsTestLog("insert1"); + assert.commandWorked(coll.insert(measurement1, {ordered: isOrderedWrite})); + + let stats = coll.stats().timeseries; + assert.eq(isOrderedWrite ? 1 : 3, stats.numBucketInserts, tojson(stats)); + assert.eq(0, stats.numBucketUpdates, tojson(stats)); + assert.eq(isOrderedWrite ? 1 : 2, stats.numBucketsOpenedDueToMetadata, tojson(stats)); + assert.eq(isOrderedWrite ? 0 : 1, stats.numBucketsClosedDueToSize, tojson(stats)); + assert.eq( + 0, stats.numBucketDocumentsTooLargeInsert, tojson(stats)); // See comment at top of file. + assert.eq( + 0, stats.numBucketDocumentsTooLargeUpdate, tojson(stats)); // See comment at top of file. + + jsTestLog("insert2"); + assert.commandWorked(coll.insert(measurement2, {ordered: isOrderedWrite})); + + stats = coll.stats().timeseries; + assert.eq(isOrderedWrite ? 2 : 4, stats.numBucketInserts, tojson(stats)); + assert.eq(0, stats.numBucketUpdates, tojson(stats)); + // The first bucket gets cleared during the retry logic of an ordered write, thus when the + // second bucket gets allocated, the write path doesn't see an associated open bucket for the + // same meta value. + assert.eq(isOrderedWrite ? 1 : 2, stats.numBucketsOpenedDueToMetadata, tojson(stats)); + assert.eq(isOrderedWrite ? 1 : 2, stats.numBucketsClosedDueToSize, tojson(stats)); + assert.eq( + 0, stats.numBucketDocumentsTooLargeInsert, tojson(stats)); // See comment at top of file. + assert.eq( + 0, stats.numBucketDocumentsTooLargeUpdate, tojson(stats)); // See comment at top of file. + + // Check Results + // TODO(SERVER-108699): Remove this check. + + let buckets = bucketsColl.find().toArray(); + for (let i = 0; i < buckets.length; i++) { + let bucketDocSize = Object.bsonsize(buckets[i]); + assert.lte(bucketDocSize, 16 * 1024 * 1024); + } + + coll.drop(); + // Stats do not reset on v7.0 when a collection drops. Thus, many checks are path-dependent + // without a reboot. +} + +runTest(/*isOrderedWrite=*/ true); +runTest(/*isOrderedWrite=*/ false); + +rst.stopSet(); diff --git a/jstests/noPassthrough/bucket_update_single_measurement.js b/jstests/noPassthrough/bucket_update_single_measurement.js new file mode 100644 index 00000000000..703a69c5cd4 --- /dev/null +++ b/jstests/noPassthrough/bucket_update_single_measurement.js @@ -0,0 +1,100 @@ +/** + * Bucket Update: Large meta near the BSON limit, allows only one measurement due to lower + * timeseries bucket size limits - Bucket::kLargeMeasurementsMaxBucketSize. 
+ * + * @tags: [ + * requires_replication, + * ] + */ + +const rst = new ReplSetTest({nodes: 1}); +const nodes = rst.startSet(); +rst.initiate(); +const primary = rst.getPrimary(); +const db = primary.getDB("test"); + +const testDB = db.getSiblingDB(jsTestName()); +const coll = testDB.coll; +const bucketsColl = testDB.system.buckets[coll.getName()]; +const timeField = "t"; +const metaField = "m"; + +function runTest(isOrderedWrite) { + jsTestLog("runTest(ordered=" + isOrderedWrite.toString() + ")"); + + // Setup + + assert.commandWorked(testDB.createCollection( + coll.getName(), {timeseries: {timeField: timeField, metaField: metaField}})); + + const timestamp = ISODate("2025-01-01T12:00:00Z"); + const meta = "a".repeat(1024 * 1024 * 15); + const measurement1 = {a: 1, [timeField]: timestamp, [metaField]: meta}; + const measurement2 = { + a: 2, + [timeField]: ISODate("2025-01-01T12:00:00Z"), + [metaField]: meta, + b: "b".repeat(1024 * 1024 * 0.335), + }; + + // Insert Measurements + + jsTestLog("insert1"); + assert.commandWorked(coll.insert(measurement1, {ordered: isOrderedWrite})); + + let stats = coll.stats().timeseries; + assert.eq(isOrderedWrite ? 1 : 3, stats.numBucketInserts, tojson(stats)); + assert.eq(0, stats.numBucketUpdates, tojson(stats)); + assert.eq(isOrderedWrite ? 1 : 3, stats.numBucketsOpenedDueToMetadata, tojson(stats)); + assert.eq(isOrderedWrite ? 0 : 2, stats.numBucketsClosedDueToSize, tojson(stats)); + assert.eq(isOrderedWrite ? 0 : 2, stats.numBucketDocumentsTooLargeInsert, tojson(stats)); + assert.eq(0, stats.numBucketDocumentsTooLargeUpdate, tojson(stats)); + + // This insert will land in a new bucket due to Bucket::kLargeMeasurementsMaxBucketSize being + // exceeded by the first measurement. + jsTestLog("insert2"); + assert.commandWorked(coll.insert(measurement1, {ordered: isOrderedWrite})); + + stats = coll.stats().timeseries; + assert.eq(isOrderedWrite ? 2 : 4, stats.numBucketInserts, tojson(stats)); + assert.eq(0, stats.numBucketUpdates, tojson(stats)); + assert.eq(isOrderedWrite ? 1 : 3, stats.numBucketsOpenedDueToMetadata, tojson(stats)); + assert.eq(isOrderedWrite ? 1 : 3, stats.numBucketsClosedDueToSize, tojson(stats)); + assert.eq(isOrderedWrite ? 0 : 2, stats.numBucketDocumentsTooLargeInsert, tojson(stats)); + assert.eq(0, stats.numBucketDocumentsTooLargeUpdate, tojson(stats)); + + // This insert is not insertable, due to the 3x inflation of metric size + large meta, + // this measurement is too large for bucket update and bucket insert. + jsTestLog("insert3"); + assert.commandFailedWithCode(coll.insert(measurement2, {ordered: isOrderedWrite}), + ErrorCodes.BSONObjectTooLarge); + + stats = coll.stats().timeseries; + assert.eq( + isOrderedWrite ? 2 : 4, stats.numBucketInserts, tojson(stats)); // TODO: address this break + assert.eq(0, stats.numBucketUpdates, tojson(stats)); + assert.eq(isOrderedWrite ? 2 : 3, stats.numBucketsOpenedDueToMetadata, tojson(stats)); + assert.eq(isOrderedWrite ? 2 : 4, stats.numBucketsClosedDueToSize, tojson(stats)); + // The failed ordered write retries as unordered and thus makes 2 + // unsuccessful attempts. + assert.eq(isOrderedWrite ? 2 : 3, stats.numBucketDocumentsTooLargeInsert, tojson(stats)); + assert.eq(0, stats.numBucketDocumentsTooLargeUpdate, tojson(stats)); + + // Check Results + // TODO(SERVER-108699): Remove this check. 
+ + let buckets = bucketsColl.find().toArray(); + for (let i = 0; i < buckets.length; i++) { + let bucketDocSize = Object.bsonsize(buckets[i]); + assert.lte(bucketDocSize, 16 * 1024 * 1024); + } + + coll.drop(); + // Stats do not reset on v7.0 when a collection drops. Thus, many checks are path-dependent + // without a reboot. +} + +runTest(/*isOrderedWrite=*/ true); +runTest(/*isOrderedWrite=*/ false); + +rst.stopSet(); diff --git a/jstests/noPassthrough/document_too_large_to_delete_with_batched_delete.js b/jstests/noPassthrough/document_too_large_to_delete_with_batched_delete.js index b58ac9d5aff..83aec6b9950 100644 --- a/jstests/noPassthrough/document_too_large_to_delete_with_batched_delete.js +++ b/jstests/noPassthrough/document_too_large_to_delete_with_batched_delete.js @@ -67,7 +67,7 @@ function tests(conn, isStandalone) { assert.commandWorked(testDb.c.insert({_id: "X".repeat(r)})); } if (idLen >= minIdErrorLen) { - assert.commandFailedWithCode(res, ErrorCodes.BadValue); + assert.commandFailedWithCode(res, ErrorCodes.BSONObjectTooLarge); } else if (isStandalone) { assert.commandWorked(res); } else { diff --git a/jstests/noPassthrough/insert_docs_larger_than_max_user_size_standalone.js b/jstests/noPassthrough/insert_docs_larger_than_max_user_size_standalone.js index a9c39fdd732..ec5f474c56d 100644 --- a/jstests/noPassthrough/insert_docs_larger_than_max_user_size_standalone.js +++ b/jstests/noPassthrough/insert_docs_larger_than_max_user_size_standalone.js @@ -20,7 +20,8 @@ const bsonMaxInternalSize = bsonMaxUserSize + (16 * 1024); // Trying to insert an object that is the maximum size will fail. let obj = {x: 'x'.repeat(bsonMaxUserSize)}; -assert.commandFailedWithCode(coll.insert(obj), ErrorCodes.BadValue, "object to insert too large"); +assert.commandFailedWithCode( + coll.insert(obj), ErrorCodes.BSONObjectTooLarge, "object to insert too large"); // The string value in the field is a number of bytes smaller than the max, to account for other // data in the BSON object. This value below will create an object very close to the maximum user @@ -66,7 +67,7 @@ conn = rst.start(0, { }); oplog = conn.getDB("local").getCollection('oplog.rs'); assert.commandFailedWithCode( - oplog.insert(lastOplogEntry), ErrorCodes.BadValue, "object to insert too large"); + oplog.insert(lastOplogEntry), ErrorCodes.BSONObjectTooLarge, "object to insert too large"); rst.stop(0, undefined /* signal */, undefined /* opts */, {forRestart: true}); // Restart as standalone with the 'allowDocumentsGreaterThanMaxUserSize' server parameter enabled to diff --git a/jstests/noPassthrough/oplog_document_key.js b/jstests/noPassthrough/oplog_document_key.js index f4b01b0ad40..76e73944587 100644 --- a/jstests/noPassthrough/oplog_document_key.js +++ b/jstests/noPassthrough/oplog_document_key.js @@ -85,7 +85,7 @@ const testWriteOplogDocumentKey = ({sharded, inTransaction}) => { performWrites(function largeInsert(coll) { const largeDoc = {_id: 'x'.repeat(16 * 1024 * 1024), a: 0}; - assert.commandFailedWithCode(coll.insert(largeDoc), ErrorCodes.BadValue); + assert.commandFailedWithCode(coll.insert(largeDoc), ErrorCodes.BSONObjectTooLarge); }); }; diff --git a/src/mongo/db/ops/insert.cpp b/src/mongo/db/ops/insert.cpp index f29e6dbc255..a2cdf396df8 100644 --- a/src/mongo/db/ops/insert.cpp +++ b/src/mongo/db/ops/insert.cpp @@ -96,7 +96,7 @@ StatusWith fixDocumentForInsert(OperationContext* opCtx, // already been validated for size on the source cluster, and were successfully inserted // into the source oplog. 
if (doc.objsize() > BSONObjMaxUserSize && !gAllowDocumentsGreaterThanMaxUserSize) - return StatusWith(ErrorCodes::BadValue, + return StatusWith(ErrorCodes::BSONObjectTooLarge, str::stream() << "object to insert too large" << ". size in bytes: " << doc.objsize() << ", max size: " << BSONObjMaxUserSize); diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 8add210b166..ef783840351 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -1780,9 +1780,25 @@ Status performAtomicTimeseriesWrites( for (auto& op : insertOps) { invariant(op.getDocuments().size() == 1); + auto doc = op.getDocuments().front(); + + // Since this bypasses the usual write path, size validation is needed. + if (MONGO_unlikely(doc.objsize() > BSONObjMaxUserSize)) { + LOGV2_WARNING(10856504, + "Ordered time-series bucket insert is too large.", + "bucketSize"_attr = doc.objsize(), + "ns"_attr = ns); + timeseries::bucket_catalog::markBucketInsertTooLarge( + timeseries::bucket_catalog::BucketCatalog::get(opCtx), + ns.getTimeseriesViewNamespace()); + + return {ErrorCodes::BSONObjectTooLarge, + "Ordered time-series bucket insert is too large"}; + } + inserts.emplace_back(op.getStmtIds() ? *op.getStmtIds() : std::vector{kUninitializedStmtId}, - op.getDocuments().front(), + doc, slot ? *(*slot)++ : OplogSlot{}); } @@ -1838,6 +1854,25 @@ Status performAtomicTimeseriesWrites( invariant(false, "Unexpected update type"); } + // Since this bypasses the usual write path, size validation is needed. + if (MONGO_unlikely(updated.objsize() > BSONObjMaxUserSize)) { + invariant(false); + // This block isn't expected to be hit on v7.0 because the object + // would have failed BSON construction via exception earlier in the write + // path. Keeping this here for completeness. + LOGV2_WARNING( + 10856505, + "Ordered time-series bucket update is too large. Will internally retry write on " + "a new bucket.", + "bucketSize"_attr = updated.objsize()); + timeseries::bucket_catalog::markBucketUpdateTooLarge( + timeseries::bucket_catalog::BucketCatalog::get(opCtx), + ns.getTimeseriesViewNamespace()); + + return {ErrorCodes::BSONObjectTooLarge, + "Ordered time-series bucket update is too large"}; + } + if (slot) { args.oplogSlots = {**slot}; fassert(5481600, @@ -2422,26 +2457,55 @@ bool commitTimeseriesBucket(OperationContext* opCtx, const auto docId = batch->bucketId.oid; const bool performInsert = batch->numPreviouslyCommittedMeasurements == 0; if (performInsert) { - const auto output = - performTimeseriesInsert(opCtx, batch, metadata, std::move(stmtIds), request); - if (auto error = write_ops_exec::generateError( - opCtx, output.result.getStatus(), start + index, errors->size())) { - bool canContinue = output.canContinue; - // Automatically attempts to retry on DuplicateKey error. - if (error->getStatus().code() == ErrorCodes::DuplicateKey && - retryAttemptsForDup[index]++ < gTimeseriesInsertMaxRetriesOnDuplicates.load()) { - docsToRetry->push_back(index); - canContinue = true; - } else { - errors->emplace_back(std::move(*error)); + try { + const auto output = + performTimeseriesInsert(opCtx, batch, metadata, std::move(stmtIds), request); + if (auto error = write_ops_exec::generateError( + opCtx, output.result.getStatus(), start + index, errors->size())) { + bool canContinue = output.canContinue; + // Automatically attempts to retry on DuplicateKey error. 
+ if (error->getStatus().code() == ErrorCodes::DuplicateKey && + retryAttemptsForDup[index]++ < gTimeseriesInsertMaxRetriesOnDuplicates.load()) { + docsToRetry->push_back(index); + canContinue = true; + } else { + if (output.result.getStatus() == ErrorCodes::BSONObjectTooLarge) { + LOGV2_WARNING(10856506, + "Unordered time-series bucket insert is too large.", + "statusMsg"_attr = output.result.getStatus().reason(), + "canContinue"_attr = canContinue, + "ns"_attr = request.getNamespace(), + "batchNs"_attr = batch->bucketId.ns); + timeseries::bucket_catalog::markBucketInsertTooLarge(bucketCatalog, + batch->bucketId.ns); + } + errors->emplace_back(std::move(*error)); + } + abort(bucketCatalog, batch, output.result.getStatus()); + return canContinue; } - abort(bucketCatalog, batch, output.result.getStatus()); - return canContinue; - } - invariant(output.result.getValue().getN() == 1, - str::stream() << "Expected 1 insertion of document with _id '" << docId - << "', but found " << output.result.getValue().getN() << "."); + invariant(output.result.getValue().getN() == 1, + str::stream() << "Expected 1 insertion of document with _id '" << docId + << "', but found " << output.result.getValue().getN() << "."); + } catch (const DBException& ex) { + if (ex.toStatus() == ErrorCodes::BSONObjectTooLarge) { + LOGV2_WARNING(10856502, + "Unordered time-series bucket insert is too large.", + "statusMsg"_attr = ex.toStatus().reason(), + "ns"_attr = batch->bucketId.ns); + auto& bucketCatalog = timeseries::bucket_catalog::BucketCatalog::get(opCtx); + timeseries::bucket_catalog::markBucketInsertTooLarge(bucketCatalog, + batch->bucketId.ns); + auto error = write_ops_exec::generateError( + opCtx, ex.toStatus(), start + index, errors->size()); + errors->emplace_back(std::move(*error)); + abort(bucketCatalog, batch, ex.toStatus()); + + return false; + } + throw; + } } else { auto op = batch->decompressed.has_value() ? makeTimeseriesDecompressAndUpdateOp( @@ -2461,6 +2525,18 @@ bool commitTimeseriesBucket(OperationContext* opCtx, return true; } else if (auto error = write_ops_exec::generateError( opCtx, output.result.getStatus(), start + index, errors->size())) { + if (output.result.getStatus() == ErrorCodes::BSONObjectTooLarge) { + invariant(false); + LOGV2_WARNING(10856507, + "Unordered time-series bucket update is too large.", + "statusMsg"_attr = output.result.getStatus().reason()); + timeseries::bucket_catalog::markBucketUpdateTooLarge(bucketCatalog, + request.getNamespace()); + errors->emplace_back(std::move(*error)); + abort(bucketCatalog, batch, output.result.getStatus()); + + return false; + } errors->emplace_back(std::move(*error)); abort(bucketCatalog, batch, output.result.getStatus()); return output.canContinue; @@ -2484,13 +2560,13 @@ bool commitTimeseriesBucket(OperationContext* opCtx, } // Returns true if commit was successful, false otherwise. May also throw. 
-bool commitTimeseriesBucketsAtomically(OperationContext* opCtx, - TimeseriesBatches* batches, - TimeseriesStmtIds&& stmtIds, - std::vector* errors, - boost::optional* opTime, - boost::optional* electionId, - const write_ops::InsertCommandRequest& request) { +Status commitTimeseriesBucketsAtomically(OperationContext* opCtx, + TimeseriesBatches* batches, + TimeseriesStmtIds&& stmtIds, + std::vector* errors, + boost::optional* opTime, + boost::optional* electionId, + const write_ops::InsertCommandRequest& request) { auto& bucketCatalog = timeseries::bucket_catalog::BucketCatalog::get(opCtx); std::vector>> @@ -2503,7 +2579,7 @@ bool commitTimeseriesBucketsAtomically(OperationContext* opCtx, } if (batchesToCommit.empty()) { - return true; + return Status::OK(); } // Sort by bucket so that preparing the commit for each batch cannot deadlock. @@ -2529,19 +2605,58 @@ bool commitTimeseriesBucketsAtomically(OperationContext* opCtx, auto prepareCommitStatus = prepareCommit(bucketCatalog, batch); if (!prepareCommitStatus.isOK()) { abortStatus = prepareCommitStatus; - return false; + return prepareCommitStatus; } if (batch.get()->numPreviouslyCommittedMeasurements == 0) { - insertOps.push_back(makeTimeseriesInsertOp( - batch, metadata, std::move(stmtIds[batch.get().get()]), request)); + try { + insertOps.push_back(makeTimeseriesInsertOp( + batch, metadata, std::move(stmtIds[batch.get().get()]), request)); + } catch (const DBException& ex) { + if (ex.toStatus() == ErrorCodes::BSONObjectTooLarge) { + LOGV2_WARNING(10856500, + "Ordered time-series bucket insert is too large.", + "statusMsg"_attr = ex.toStatus().reason(), + "ns"_attr = batch.get()->bucketId.ns); + auto& bucketCatalog = timeseries::bucket_catalog::BucketCatalog::get(opCtx); + timeseries::bucket_catalog::markBucketInsertTooLarge( + bucketCatalog, batch.get()->bucketId.ns); + abortStatus = ex.toStatus(); + return ex.toStatus(); + } + throw; + } } else { - if (batch.get()->decompressed.has_value()) { - updateOps.push_back(makeTimeseriesDecompressAndUpdateOp( - opCtx, batch, metadata, std::move(stmtIds[batch.get().get()]), request)); - } else { - updateOps.push_back(makeTimeseriesUpdateOp( - opCtx, batch, metadata, std::move(stmtIds[batch.get().get()]), request)); + try { + if (batch.get()->decompressed.has_value()) { + updateOps.push_back(makeTimeseriesDecompressAndUpdateOp( + opCtx, + batch, + metadata, + std::move(stmtIds[batch.get().get()]), + request)); + } else { + updateOps.push_back( + makeTimeseriesUpdateOp(opCtx, + batch, + metadata, + std::move(stmtIds[batch.get().get()]), + request)); + } + } catch (const DBException& ex) { + if (ex.toStatus() == ErrorCodes::BSONObjectTooLarge) { + LOGV2_WARNING(10856501, + "Ordered time-series bucket update is too large.", + "statusMsg"_attr = ex.toStatus().reason(), + "compressed"_attr = batch.get()->decompressed.has_value(), + "ns"_attr = batch.get()->bucketId.ns); + auto& bucketCatalog = timeseries::bucket_catalog::BucketCatalog::get(opCtx); + timeseries::bucket_catalog::markBucketUpdateTooLarge( + bucketCatalog, batch.get()->bucketId.ns); + abortStatus = ex.toStatus(); + return ex.toStatus(); + } + throw; } } } @@ -2554,7 +2669,7 @@ bool commitTimeseriesBucketsAtomically(OperationContext* opCtx, timeseries::bucket_catalog::resetBucketOIDCounter(); } abortStatus = result; - return false; + return result; } getOpTimeAndElectionId(opCtx, opTime, electionId); @@ -2579,7 +2694,7 @@ bool commitTimeseriesBucketsAtomically(OperationContext* opCtx, } batchGuard.dismiss(); - return true; + return 
Status::OK(); } // For sharded time-series collections, we need to use the granularity from the config @@ -2759,6 +2874,7 @@ std::tuple inser size_t start, size_t numDocs, const std::vector& indices, + timeseries::BucketReopeningPermittance allowQueryBasedReopening, std::vector* errors, bool* containsRetry, const write_ops::InsertCommandRequest& request) { @@ -2837,6 +2953,10 @@ std::tuple inser StatusWith swResult = Status{ErrorCodes::BadValue, "Uninitialized InsertResult"}; do { + timeseries::bucket_catalog::AllowQueryBasedReopening reopening = + allowQueryBasedReopening == timeseries::BucketReopeningPermittance::kAllowed + ? timeseries::bucket_catalog::AllowQueryBasedReopening::kAllow + : timeseries::bucket_catalog::AllowQueryBasedReopening::kDisallow; if (feature_flags::gTimeseriesScalabilityImprovements.isEnabled( serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) { swResult = timeseries::bucket_catalog::tryInsert( @@ -2846,7 +2966,8 @@ std::tuple inser bucketsColl->getDefaultCollator(), timeSeriesOptions, measurementDoc, - canCombineTimeseriesInsertWithOtherClients(opCtx, request)); + canCombineTimeseriesInsertWithOtherClients(opCtx, request), + reopening); if (swResult.isOK()) { auto& insertResult = swResult.getValue(); @@ -2914,6 +3035,7 @@ std::tuple inser timeSeriesOptions, measurementDoc, canCombineTimeseriesInsertWithOtherClients(opCtx, request), + reopening, reopeningContext); } else if (auto* waiter = stdx::get_if( @@ -2935,7 +3057,8 @@ std::tuple inser bucketsColl->getDefaultCollator(), timeSeriesOptions, measurementDoc, - canCombineTimeseriesInsertWithOtherClients(opCtx, request)); + canCombineTimeseriesInsertWithOtherClients(opCtx, request), + reopening); } // If there is an era offset (between the bucket we want to reopen and the @@ -3002,25 +3125,33 @@ std::tuple inser return {std::move(batches), std::move(stmtIds), request.getDocuments().size()}; } -bool performOrderedTimeseriesWritesAtomically(OperationContext* opCtx, - std::vector* errors, - boost::optional* opTime, - boost::optional* electionId, - bool* containsRetry, - const write_ops::InsertCommandRequest& request) { - auto [batches, stmtIds, numInserted] = insertIntoBucketCatalog( - opCtx, 0, request.getDocuments().size(), {}, errors, containsRetry, request); +Status performOrderedTimeseriesWritesAtomically(OperationContext* opCtx, + std::vector* errors, + boost::optional* opTime, + boost::optional* electionId, + bool* containsRetry, + const write_ops::InsertCommandRequest& request) { + auto [batches, stmtIds, numInserted] = + insertIntoBucketCatalog(opCtx, + 0, + request.getDocuments().size(), + {}, + timeseries::BucketReopeningPermittance::kAllowed, + errors, + containsRetry, + request); hangTimeseriesInsertBeforeCommit.pauseWhileSet(); - if (!commitTimeseriesBucketsAtomically( - opCtx, &batches, std::move(stmtIds), errors, opTime, electionId, request)) { - return false; + auto commitResult = commitTimeseriesBucketsAtomically( + opCtx, &batches, std::move(stmtIds), errors, opTime, electionId, request); + if (!commitResult.isOK()) { + return commitResult; } getTimeseriesBatchResults(opCtx, batches, 0, batches.size(), true, errors, opTime, electionId); - return true; + return Status::OK(); } /** @@ -3036,14 +3167,15 @@ std::vector performUnorderedTimeseriesWrites( size_t start, size_t numDocs, const std::vector& indices, + const timeseries::BucketReopeningPermittance bucketReopening, std::vector* errors, boost::optional* opTime, boost::optional* electionId, bool* containsRetry, const 
write_ops::InsertCommandRequest& request, absl::flat_hash_map& retryAttemptsForDup) { - auto [batches, bucketStmtIds, _] = - insertIntoBucketCatalog(opCtx, start, numDocs, indices, errors, containsRetry, request); + auto [batches, bucketStmtIds, _] = insertIntoBucketCatalog( + opCtx, start, numDocs, indices, bucketReopening, errors, containsRetry, request); hangTimeseriesInsertBeforeCommit.pauseWhileSet(); @@ -3112,14 +3244,16 @@ std::vector performUnorderedTimeseriesWrites( return docsToRetry; } -void performUnorderedTimeseriesWritesWithRetries(OperationContext* opCtx, - size_t start, - size_t numDocs, - std::vector* errors, - boost::optional* opTime, - boost::optional* electionId, - bool* containsRetry, - const write_ops::InsertCommandRequest& request) { +void performUnorderedTimeseriesWritesWithRetries( + OperationContext* opCtx, + size_t start, + size_t numDocs, + timeseries::BucketReopeningPermittance bucketReopening, + std::vector* errors, + boost::optional* opTime, + boost::optional* electionId, + bool* containsRetry, + const write_ops::InsertCommandRequest& request) { std::vector docsToRetry; absl::flat_hash_map retryAttemptsForDup; do { @@ -3127,6 +3261,7 @@ void performUnorderedTimeseriesWritesWithRetries(OperationContext* opCtx, start, numDocs, docsToRetry, + bucketReopening, errors, opTime, electionId, @@ -3148,17 +3283,22 @@ size_t performOrderedTimeseriesWrites(OperationContext* opCtx, boost::optional* electionId, bool* containsRetry, const write_ops::InsertCommandRequest& request) { - if (performOrderedTimeseriesWritesAtomically( - opCtx, errors, opTime, electionId, containsRetry, request)) { + auto result = performOrderedTimeseriesWritesAtomically( + opCtx, errors, opTime, electionId, containsRetry, request); + if (result.isOK()) { return request.getDocuments().size(); } // The atomic commit failed and might have populated 'errors'. To retry inserting each // measurement one by one, first clear 'errors' so the retry starts with a clean state. errors->clear(); + timeseries::BucketReopeningPermittance bucketReopening = + result.code() == ErrorCodes::BSONObjectTooLarge + ? 
timeseries::BucketReopeningPermittance::kDisallowed + : timeseries::BucketReopeningPermittance::kAllowed; for (size_t i = 0; i < request.getDocuments().size(); ++i) { performUnorderedTimeseriesWritesWithRetries( - opCtx, i, 1, errors, opTime, electionId, containsRetry, request); + opCtx, i, 1, bucketReopening, errors, opTime, electionId, containsRetry, request); if (!errors->empty()) { return i; } @@ -3217,14 +3357,16 @@ write_ops::InsertCommandReply performTimeseriesWrites( baseReply.setN(performOrderedTimeseriesWrites( opCtx, &errors, &opTime, &electionId, &containsRetry, request)); } else { - performUnorderedTimeseriesWritesWithRetries(opCtx, - 0, - request.getDocuments().size(), - &errors, - &opTime, - &electionId, - &containsRetry, - request); + performUnorderedTimeseriesWritesWithRetries( + opCtx, + 0, + request.getDocuments().size(), + timeseries::BucketReopeningPermittance::kAllowed, + &errors, + &opTime, + &electionId, + &containsRetry, + request); baseReply.setN(request.getDocuments().size() - errors.size()); } diff --git a/src/mongo/db/ops/write_ops_retryability_test.cpp b/src/mongo/db/ops/write_ops_retryability_test.cpp index 7557afe0eda..ed9c6b8bc62 100644 --- a/src/mongo/db/ops/write_ops_retryability_test.cpp +++ b/src/mongo/db/ops/write_ops_retryability_test.cpp @@ -328,7 +328,7 @@ TEST_F(WriteOpsRetryability, PerformOrderedInsertsStopsAtBadDoc) { ASSERT_EQ(2, result.results.size()); ASSERT_TRUE(result.results[0].isOK()); ASSERT_FALSE(result.results[1].isOK()); - ASSERT_EQ(ErrorCodes::BadValue, result.results[1].getStatus()); + ASSERT_EQ(ErrorCodes::BSONObjectTooLarge, result.results[1].getStatus()); } TEST_F(WriteOpsRetryability, PerformUnorderedInsertsContinuesAtBadDoc) { @@ -362,7 +362,7 @@ TEST_F(WriteOpsRetryability, PerformUnorderedInsertsContinuesAtBadDoc) { ASSERT_TRUE(result.results[0].isOK()); ASSERT_FALSE(result.results[1].isOK()); ASSERT_TRUE(result.results[2].isOK()); - ASSERT_EQ(ErrorCodes::BadValue, result.results[1].getStatus()); + ASSERT_EQ(ErrorCodes::BSONObjectTooLarge, result.results[1].getStatus()); } using FindAndModifyRetryability = MockReplCoordServerFixture; diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket.cpp index b767a5c597b..692fca7cbfd 100644 --- a/src/mongo/db/timeseries/bucket_catalog/bucket.cpp +++ b/src/mongo/db/timeseries/bucket_catalog/bucket.cpp @@ -28,6 +28,7 @@ */ #include "mongo/db/timeseries/bucket_catalog/bucket.h" +#include "mongo/db/timeseries/timeseries_constants.h" namespace mongo::timeseries::bucket_catalog { @@ -84,7 +85,11 @@ void calculateBucketFieldsAndSizeChange(const Bucket& bucket, for (const auto& elem : doc) { auto fieldName = elem.fieldNameStringData(); if (fieldName == metaField) { - // Ignore the metadata field since it will not be inserted. + // Only account for the meta field size once, on bucket insert, since it is stored + // uncompressed at the top-level of the bucket. 
+ if (bucket.size == 0) { + sizeToBeAdded += kBucketMetaFieldName.size() + elem.size() - elem.fieldNameSize(); + } continue; } diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp index 5c55c6c444e..d1ff46b3c88 100644 --- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp @@ -150,9 +150,17 @@ StatusWith tryInsert(OperationContext* opCtx, const StringData::ComparatorInterface* comparator, const TimeseriesOptions& options, const BSONObj& doc, - CombineWithInsertsFromOtherClients combine) { - return internal::insert( - opCtx, catalog, ns, comparator, options, doc, combine, internal::AllowBucketCreation::kNo); + CombineWithInsertsFromOtherClients combine, + const AllowQueryBasedReopening reopening) { + return internal::insert(opCtx, + catalog, + ns, + comparator, + options, + doc, + combine, + internal::AllowBucketCreation::kNo, + reopening); } StatusWith insert(OperationContext* opCtx, @@ -162,6 +170,7 @@ StatusWith insert(OperationContext* opCtx, const TimeseriesOptions& options, const BSONObj& doc, CombineWithInsertsFromOtherClients combine, + const AllowQueryBasedReopening reopening, ReopeningContext* reopeningContext) { return internal::insert(opCtx, catalog, @@ -171,6 +180,7 @@ StatusWith insert(OperationContext* opCtx, doc, combine, internal::AllowBucketCreation::kYes, + reopening, reopeningContext); } @@ -381,6 +391,14 @@ void clear(BucketCatalog& catalog, StringData dbName) { }); } +void markBucketInsertTooLarge(BucketCatalog& catalog, const NamespaceString& ns) { + internal::getOrInitializeExecutionStats(catalog, ns).incNumBucketDocumentsTooLargeInsert(); +} + +void markBucketUpdateTooLarge(BucketCatalog& catalog, const NamespaceString& ns) { + internal::getOrInitializeExecutionStats(catalog, ns).incNumBucketDocumentsTooLargeUpdate(); +} + BucketId extractBucketId(BucketCatalog& bucketCatalog, const TimeseriesOptions& options, const StringData::ComparatorInterface* comparator, diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.h index 434ff8454d9..1de7293a774 100644 --- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.h +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.h @@ -59,6 +59,12 @@ namespace mongo::timeseries::bucket_catalog { using StripeNumber = std::uint8_t; using ShouldClearFn = std::function; +/** + * Mode enum to control whether getReopeningCandidate() will allow query-based + * reopening of buckets when attempting to accommodate a new measurement. + */ +enum class AllowQueryBasedReopening { kAllow, kDisallow }; + /** * Whether to allow inserts to be batched together with those from other clients. 
*/ @@ -213,7 +219,8 @@ StatusWith tryInsert(OperationContext* opCtx, const StringData::ComparatorInterface* comparator, const TimeseriesOptions& options, const BSONObj& doc, - CombineWithInsertsFromOtherClients combine); + CombineWithInsertsFromOtherClients combine, + AllowQueryBasedReopening allowQueryBasedReopening); /** * Returns the WriteBatch into which the document was inserted and a list of any buckets that were @@ -231,6 +238,7 @@ StatusWith insert(OperationContext* opCtx, const TimeseriesOptions& options, const BSONObj& doc, CombineWithInsertsFromOtherClients combine, + AllowQueryBasedReopening allowQueryBasedReopening, ReopeningContext* reopeningContext = nullptr); /** @@ -300,6 +308,20 @@ void clear(BucketCatalog& catalog, const NamespaceString& ns); */ void clear(BucketCatalog& catalog, StringData dbName); +/** + * Increments an FTDC counter. + * Denotes an event where a generated time-series bucket document for insert exceeded the BSON + * size limit. + */ +void markBucketInsertTooLarge(BucketCatalog& catalog, const NamespaceString& ns); + +/** + * Increments an FTDC counter. + * Denotes an event where a generated time-series bucket document for update exceeded the BSON + * size limit. + */ +void markBucketUpdateTooLarge(BucketCatalog& catalog, const NamespaceString& ns); + /** * Extracts the BucketId from a bucket document. */ diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.cpp index cd55fe8e800..205e8007037 100644 --- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.cpp +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.cpp @@ -651,6 +651,7 @@ StatusWith insert(OperationContext* opCtx, const BSONObj& doc, CombineWithInsertsFromOtherClients combine, AllowBucketCreation mode, + AllowQueryBasedReopening allowQueryBasedReopening, ReopeningContext* reopeningContext) { invariant(!ns.isTimeseriesBucketsCollection()); @@ -662,7 +663,7 @@ StatusWith insert(OperationContext* opCtx, auto time = res.getValue().second; ExecutionStatsController stats = getOrInitializeExecutionStats(catalog, ns); - if (reopeningContext) { + if (reopeningContext && allowQueryBasedReopening == AllowQueryBasedReopening::kAllow) { updateBucketFetchAndQueryStats(*reopeningContext, stats); } @@ -752,7 +753,7 @@ StatusWith insert(OperationContext* opCtx, if (!bucket) { invariant(mode == AllowBucketCreation::kNo); return getReopeningContext( - opCtx, catalog, stripe, stripeLock, info, catalogEra, AllowQueryBasedReopening::kAllow); + opCtx, catalog, stripe, stripeLock, info, catalogEra, allowQueryBasedReopening); } auto insertionResult = insertIntoBucket( @@ -797,7 +798,8 @@ StatusWith insert(OperationContext* opCtx, stripeLock, info, catalogEra, - (*reason == RolloverReason::kTimeBackward) + ((allowQueryBasedReopening == AllowQueryBasedReopening::kAllow) && + (*reason == RolloverReason::kTimeBackward)) ? 
AllowQueryBasedReopening::kAllow : AllowQueryBasedReopening::kDisallow); } diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.h b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.h index 551296024dd..65ddc089c14 100644 --- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.h +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.h @@ -74,12 +74,6 @@ enum class IgnoreBucketState { kYes, kNo }; */ enum class BucketPrepareAction { kPrepare, kUnprepare }; -/** - * Mode enum to control whether getReopeningCandidate() will allow query-based - * reopening of buckets when attempting to accommodate a new measurement. - */ -enum class AllowQueryBasedReopening { kAllow, kDisallow }; - /** * Maps bucket identifier to the stripe that is responsible for it. */ @@ -208,6 +202,7 @@ StatusWith insert(OperationContext* opCtx, const BSONObj& doc, CombineWithInsertsFromOtherClients combine, AllowBucketCreation mode, + AllowQueryBasedReopening allowQueryBasedReopening, ReopeningContext* reopeningContext = nullptr); /** diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp index d4b16a4e0b2..4580be5b14e 100644 --- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp @@ -185,7 +185,8 @@ void BucketCatalogTest::_insertOneAndCommit(const NamespaceString& ns, _getCollator(ns), _getTimeseriesOptions(ns), BSON(_timeField << Date_t::now()), - CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow, + AllowQueryBasedReopening::kAllow); auto& batch = stdx::get(result.getValue()).batch; _commit(batch, numPreviouslyCommittedMeasurements); } @@ -217,7 +218,8 @@ void BucketCatalogTest::_testMeasurementSchema( _getCollator(_ns1), _getTimeseriesOptions(_ns1), timestampedDoc.obj(), - CombineWithInsertsFromOtherClients::kAllow) + CombineWithInsertsFromOtherClients::kAllow, + AllowQueryBasedReopening::kAllow) .isOK()); auto post = _getExecutionStat(_ns1, kNumSchemaChanges); @@ -301,7 +303,8 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) { _getCollator(_ns1), _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), - CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow, + AllowQueryBasedReopening::kAllow); auto batch1 = stdx::get(result1.getValue()).batch; ASSERT(claimWriteBatchCommitRights(*batch1)); @@ -313,7 +316,8 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) { _getCollator(_ns1), _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), - CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow, + AllowQueryBasedReopening::kAllow); auto batch2 = stdx::get(result2.getValue()).batch; ASSERT_EQ(batch1, batch2); ASSERT(!claimWriteBatchCommitRights(*batch2)); @@ -345,7 +349,8 @@ TEST_F(BucketCatalogTest, GetMetadataReturnsEmptyDocOnMissingBucket) { _getCollator(_ns1), _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), - CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow, + AllowQueryBasedReopening::kAllow); auto batch = stdx::get(result.getValue()).batch; ASSERT(claimWriteBatchCommitRights(*batch)); auto bucketId = batch->bucketId; @@ -360,21 +365,24 @@ TEST_F(BucketCatalogTest, InsertIntoDifferentBuckets) { _getCollator(_ns1), _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now() << _metaField << "123"), - 
CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow, + AllowQueryBasedReopening::kAllow); auto result2 = insert(_opCtx, *_bucketCatalog, _ns1, _getCollator(_ns1), _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now() << _metaField << BSONObj()), - CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow, + AllowQueryBasedReopening::kAllow); auto result3 = insert(_opCtx, *_bucketCatalog, _ns2, _getCollator(_ns2), _getTimeseriesOptions(_ns2), BSON(_timeField << Date_t::now()), - CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow, + AllowQueryBasedReopening::kAllow); // Inserts should all be into three distinct buckets (and therefore batches). ASSERT_NE(stdx::get(result1.getValue()).batch, @@ -414,7 +422,8 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketArray) { _getCollator(_ns1), _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now() << _metaField << BSON_ARRAY(BSON("a" << 0 << "b" << 1))), - CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow, + AllowQueryBasedReopening::kAllow); auto result2 = insert( _opCtx, *_bucketCatalog, @@ -422,7 +431,8 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketArray) { _getCollator(_ns1), _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now() << _metaField << BSON_ARRAY(BSON("b" << 1 << "a" << 0))), - CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow, + AllowQueryBasedReopening::kAllow); ASSERT_EQ(stdx::get(result1.getValue()).batch, stdx::get(result2.getValue()).batch); @@ -448,7 +458,8 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketObjArray) { BSON(_timeField << Date_t::now() << _metaField << BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON("f" << 1 << "g" << 0))))), - CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow, + AllowQueryBasedReopening::kAllow); auto result2 = insert(_opCtx, *_bucketCatalog, @@ -458,7 +469,8 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketObjArray) { BSON(_timeField << Date_t::now() << _metaField << BSONObj(BSON("c" << BSON_ARRAY(BSON("b" << 1 << "a" << 0) << BSON("g" << 0 << "f" << 1))))), - CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow, + AllowQueryBasedReopening::kAllow); ASSERT_EQ(stdx::get(result1.getValue()).batch, stdx::get(result2.getValue()).batch); @@ -488,7 +500,8 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketNestedArray) { << BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON_ARRAY("123" << "456"))))), - CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow, + AllowQueryBasedReopening::kAllow); auto result2 = insert(_opCtx, *_bucketCatalog, @@ -499,7 +512,8 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketNestedArray) { << BSONObj(BSON("c" << BSON_ARRAY(BSON("b" << 1 << "a" << 0) << BSON_ARRAY("123" << "456"))))), - CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow, + AllowQueryBasedReopening::kAllow); ASSERT_EQ(stdx::get(result1.getValue()).batch, stdx::get(result2.getValue()).batch); @@ -526,14 +540,16 @@ TEST_F(BucketCatalogTest, InsertNullAndMissingMetaFieldIntoDifferentBuckets) { _getCollator(_ns1), _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now() << _metaField << BSONNULL), - CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow, + 
AllowQueryBasedReopening::kAllow); auto result2 = insert(_opCtx, *_bucketCatalog, _ns1, _getCollator(_ns1), _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), - CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow, + AllowQueryBasedReopening::kAllow); // Inserts should all be into three distinct buckets (and therefore batches). ASSERT_NE(stdx::get(result1.getValue()).batch, @@ -592,7 +608,8 @@ TEST_F(BucketCatalogTest, InsertBetweenPrepareAndFinish) { _getCollator(_ns1), _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), - CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow, + AllowQueryBasedReopening::kAllow); auto batch1 = stdx::get(result1.getValue()).batch; ASSERT(claimWriteBatchCommitRights(*batch1)); ASSERT_OK(prepareCommit(*_bucketCatalog, batch1)); @@ -606,7 +623,8 @@ TEST_F(BucketCatalogTest, InsertBetweenPrepareAndFinish) { _getCollator(_ns1), _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), - CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow, + AllowQueryBasedReopening::kAllow); auto batch2 = stdx::get(result2.getValue()).batch; ASSERT_NE(batch1, batch2); @@ -624,7 +642,8 @@ DEATH_TEST_F(BucketCatalogTest, CannotCommitWithoutRights, "invariant") { _getCollator(_ns1), _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), - CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow, + AllowQueryBasedReopening::kAllow); auto& batch = stdx::get(result.getValue()).batch; ASSERT_OK(prepareCommit(*_bucketCatalog, batch)); @@ -640,7 +659,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, GetMetadataReturnsEmptyDoc) { _getCollator(_ns1), _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), - CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow, + AllowQueryBasedReopening::kAllow); auto& batch = stdx::get(result.getValue()).batch; ASSERT_BSONOBJ_EQ(BSONObj(), getMetadata(*_bucketCatalog, batch->bucketId)); @@ -655,7 +675,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { _getCollator(_ns1), _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now() << "a" << 0), - CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow, + AllowQueryBasedReopening::kAllow); ASSERT(result.isOK()); auto batch = stdx::get(result.getValue()).batch; auto oldId = batch->bucketId; @@ -672,7 +693,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { _getCollator(_ns1), _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now() << "a" << 1), - CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow, + AllowQueryBasedReopening::kAllow); ASSERT(result.isOK()); batch = stdx::get(result.getValue()).batch; _commit(batch, 1); @@ -685,7 +707,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { _getCollator(_ns1), _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now() << "a" << 2 << "b" << 2), - CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow, + AllowQueryBasedReopening::kAllow); ASSERT(result.isOK()); batch = stdx::get(result.getValue()).batch; _commit(batch, 2); @@ -700,7 +723,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { _getCollator(_ns1), _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now() << "a" << i), - CombineWithInsertsFromOtherClients::kAllow); + 
@@ -700,7 +723,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now() << "a" << i),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT(result.isOK());
     batch = stdx::get(result.getValue()).batch;
     _commit(batch, i);
@@ -715,7 +739,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now() << "a" << gTimeseriesBucketMaxCount),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     auto& batch2 = stdx::get(result2.getValue()).batch;
     ASSERT_NE(oldId, batch2->bucketId);
     _commit(batch2, 0);
@@ -731,7 +756,8 @@ TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     auto batch1 = stdx::get(result1.getValue()).batch;
     ASSERT(claimWriteBatchCommitRights(*batch1));
     ASSERT_OK(prepareCommit(*_bucketCatalog, batch1));
@@ -745,7 +771,8 @@ TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     auto batch2 = stdx::get(result2.getValue()).batch;
     ASSERT_NE(batch1, batch2);
@@ -766,7 +793,8 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     auto batch = stdx::get(result.getValue()).batch;
     ASSERT(claimWriteBatchCommitRights(*batch));
@@ -782,7 +810,8 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     batch = stdx::get(result.getValue()).batch;
     ASSERT(claimWriteBatchCommitRights(*batch));
     ASSERT_OK(prepareCommit(*_bucketCatalog, batch));
@@ -809,7 +838,8 @@ TEST_F(BucketCatalogTest, ClearBucketWithPreparedBatchThrowsConflict) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     auto batch = stdx::get(result.getValue()).batch;
     ASSERT(claimWriteBatchCommitRights(*batch));
     ASSERT_OK(prepareCommit(*_bucketCatalog, batch));
@@ -831,7 +861,8 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     auto batch1 = stdx::get(result1.getValue()).batch;
     ASSERT(claimWriteBatchCommitRights(*batch1));
     ASSERT_OK(prepareCommit(*_bucketCatalog, batch1));
@@ -845,7 +876,8 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     auto batch2 = stdx::get(result2.getValue()).batch;
     ASSERT_NE(batch1, batch2);
     ASSERT_EQ(batch1->bucketId, batch2->bucketId);
@@ -870,7 +902,8 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     auto batch3 = stdx::get(result3.getValue()).batch;
     ASSERT_NE(batch1, batch3);
     ASSERT_NE(batch2, batch3);
@@ -892,7 +925,8 @@ TEST_F(BucketCatalogTest, PrepareCommitOnAlreadyAbortedBatch) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     auto batch = stdx::get(result.getValue()).batch;
     ASSERT(claimWriteBatchCommitRights(*batch));
@@ -912,7 +946,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kDisallow);
+        CombineWithInsertsFromOtherClients::kDisallow,
+        AllowQueryBasedReopening::kAllow);
     auto batch1 = stdx::get(result1.getValue()).batch;
 
     auto result2 = insert(_makeOperationContext().second.get(),
@@ -921,7 +956,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kDisallow);
+        CombineWithInsertsFromOtherClients::kDisallow,
+        AllowQueryBasedReopening::kAllow);
     auto batch2 = stdx::get(result2.getValue()).batch;
 
     auto result3 = insert(_makeOperationContext().second.get(),
@@ -930,7 +966,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     auto batch3 = stdx::get(result3.getValue()).batch;
 
     auto result4 = insert(_makeOperationContext().second.get(),
@@ -939,7 +976,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     auto batch4 = stdx::get(result4.getValue()).batch;
 
     ASSERT_NE(batch1, batch2);
@@ -959,7 +997,8 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kDisallow);
+        CombineWithInsertsFromOtherClients::kDisallow,
+        AllowQueryBasedReopening::kAllow);
     auto batch1 = stdx::get(result1.getValue()).batch;
 
     auto result2 = insert(_makeOperationContext().second.get(),
@@ -968,7 +1007,8 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kDisallow);
+        CombineWithInsertsFromOtherClients::kDisallow,
+        AllowQueryBasedReopening::kAllow);
     auto batch2 = stdx::get(result2.getValue()).batch;
 
     ASSERT(claimWriteBatchCommitRights(*batch1));
@@ -999,7 +1039,8 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kDisallow);
+        CombineWithInsertsFromOtherClients::kDisallow,
+        AllowQueryBasedReopening::kAllow);
     auto batch1 = stdx::get(result1.getValue()).batch;
 
     auto result2 = insert(_makeOperationContext().second.get(),
@@ -1008,7 +1049,8 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kDisallow);
+        CombineWithInsertsFromOtherClients::kDisallow,
+        AllowQueryBasedReopening::kAllow);
     auto batch2 = stdx::get(result2.getValue()).batch;
 
     auto result3 = insert(_makeOperationContext().second.get(),
@@ -1017,7 +1059,8 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kDisallow);
+        CombineWithInsertsFromOtherClients::kDisallow,
+        AllowQueryBasedReopening::kAllow);
     auto batch3 = stdx::get(result3.getValue()).batch;
 
     ASSERT_EQ(batch1->bucketId, batch2->bucketId);
@@ -1057,7 +1100,8 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kDisallow);
+        CombineWithInsertsFromOtherClients::kDisallow,
+        AllowQueryBasedReopening::kAllow);
     auto batch4 = stdx::get(result4.getValue()).batch;
     ASSERT_NE(batch2->bucketId, batch4->bucketId);
 }
@@ -1069,7 +1113,8 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresNewInsertsGoToNewBucket) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kDisallow);
+        CombineWithInsertsFromOtherClients::kDisallow,
+        AllowQueryBasedReopening::kAllow);
     auto batch1 = stdx::get(result1.getValue()).batch;
 
     auto result2 = insert(_makeOperationContext().second.get(),
@@ -1078,7 +1123,8 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresNewInsertsGoToNewBucket) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kDisallow);
+        CombineWithInsertsFromOtherClients::kDisallow,
+        AllowQueryBasedReopening::kAllow);
     auto batch2 = stdx::get(result2.getValue()).batch;
 
     // Batch 1 and 2 use the same bucket.
@@ -1102,7 +1148,8 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresNewInsertsGoToNewBucket) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kDisallow);
+        CombineWithInsertsFromOtherClients::kDisallow,
+        AllowQueryBasedReopening::kAllow);
     auto batch3 = stdx::get(result3.getValue()).batch;
     ASSERT_NE(batch1->bucketId, batch3->bucketId);
 }
@@ -1114,7 +1161,8 @@ TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kDisallow);
+        CombineWithInsertsFromOtherClients::kDisallow,
+        AllowQueryBasedReopening::kAllow);
     auto batch1 = stdx::get(result1.getValue()).batch;
 
     auto result2 = insert(_makeOperationContext().second.get(),
@@ -1123,7 +1171,8 @@ TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now()),
-        CombineWithInsertsFromOtherClients::kDisallow);
+        CombineWithInsertsFromOtherClients::kDisallow,
+        AllowQueryBasedReopening::kAllow);
     auto batch2 = stdx::get(result2.getValue()).batch;
 
     // Batch 2 is the first batch to commit the time field.
@@ -1348,7 +1397,8 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurement
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"},
                               "a":-100,"b":100})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
 
     // No buckets are closed.
     ASSERT(stdx::get(result.getValue()).closedBuckets.empty());
@@ -1398,7 +1448,8 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurement
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"},"tag":42,
                               "a":-100,"b":100})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
 
     // No buckets are closed.
     ASSERT(stdx::get(result.getValue()).closedBuckets.empty());
@@ -1454,7 +1505,8 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertIncompatibleMeasureme
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"},
                               "a":{},"b":{}})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
 
     // The reopened bucket gets closed as the schema is incompatible.
     ASSERT_EQ(1, stdx::get(result.getValue()).closedBuckets.size());
@@ -1508,7 +1560,8 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertCompatibleMeasurement)
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"},
                               "a":-100,"b":100})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
 
     // No buckets are closed.
     ASSERT(stdx::get(result.getValue()).closedBuckets.empty());
@@ -1568,7 +1621,8 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertIncompatibleMeasurement
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"},
                               "a":{},"b":{}})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
 
     // The reopened bucket gets closed as the schema is incompatible.
     ASSERT_EQ(1, stdx::get(result.getValue()).closedBuckets.size());
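The TryInsert* and *Reopening* tests below assert on a variant-shaped result: tryInsert() either hands back a write batch for an open bucket, or a reopening candidate that the caller must resolve (by query or by archived bucket id) before retrying. A rough, self-contained sketch of that control flow using std::variant follows; the type and member names are placeholders inferred from the identifiers in this patch, not the actual bucket-catalog API.

// Rough sketch only -- models the "write batch vs. reopening candidate" split that the
// reopening tests below exercise. Real code returns a StatusWith over catalog-owned types.
#include <iostream>
#include <string>
#include <variant>

struct WriteBatch {
    std::string bucketId;  // placeholder for the real batch handle
};
struct ReopeningContext {
    std::string candidate;  // e.g. a query-based candidate or an archived bucket id
};

using TryInsertResult = std::variant<WriteBatch, ReopeningContext>;

void handleTryInsert(const TryInsertResult& result) {
    if (std::holds_alternative<WriteBatch>(result)) {
        // An open bucket accepted the measurement: stage it and commit the batch as usual.
        std::cout << "stage into " << std::get<WriteBatch>(result).bucketId << '\n';
    } else {
        // No suitable open bucket: fetch the candidate, reopen it, then retry the insert.
        std::cout << "reopen via " << std::get<ReopeningContext>(result).candidate << '\n';
    }
}

int main() {
    handleTryInsert(WriteBatch{"bucket-1"});
    handleTryInsert(ReopeningContext{"query-based candidate"});
}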
@@ -1601,7 +1655,8 @@ TEST_F(BucketCatalogTest, ArchivingUnderMemoryPressure) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         BSON(_timeField << Date_t::now() << _metaField << meta++),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result.getStatus());
     auto batch = stdx::get(result.getValue()).batch;
     ASSERT(claimWriteBatchCommitRights(*batch));
@@ -1677,7 +1732,8 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result.getStatus());
     ASSERT(stdx::holds_alternative(result.getValue()));
     ASSERT_TRUE(stdx::holds_alternative>(
@@ -1694,7 +1750,8 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result.getStatus());
     auto batch = stdx::get(result.getValue()).batch;
     ASSERT(batch);
@@ -1714,7 +1771,8 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:40.000Z"}})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result.getStatus());
     ASSERT(stdx::holds_alternative(result.getValue()));
     ASSERT_TRUE(stdx::holds_alternative>(
@@ -1730,7 +1788,8 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-07T15:34:40.000Z"}})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result.getStatus());
     ASSERT(stdx::holds_alternative(result.getValue()));
     ASSERT_TRUE(stdx::holds_alternative(
@@ -1747,7 +1806,8 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-07T15:34:40.000Z"}, "tag": "foo"})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result.getStatus());
     ASSERT_EQ(1, _getExecutionStat(_ns1, kNumArchivedDueToMemoryThreshold));
     ASSERT_EQ(0, _getExecutionStat(_ns1, kNumClosedDueToMemoryThreshold));
@@ -1770,7 +1830,8 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result.getStatus());
     ASSERT(stdx::holds_alternative(result.getValue()));
     ASSERT_TRUE(
@@ -1793,7 +1854,8 @@ TEST_F(BucketCatalogTest, TryInsertWillCreateBucketIfWeWouldCloseExistingBucket)
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}, "a": true})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result.getStatus());
     auto batch = stdx::get(result.getValue()).batch;
     ASSERT(batch);
@@ -1812,7 +1874,8 @@ TEST_F(BucketCatalogTest, TryInsertWillCreateBucketIfWeWouldCloseExistingBucket)
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}, "a": {}})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result.getStatus());
     batch = stdx::get(result.getValue()).batch;
     ASSERT(batch);
@@ -1836,7 +1899,8 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:40.000Z"}})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result.getStatus());
     auto batch = stdx::get(result.getValue()).batch;
     ASSERT(batch);
@@ -1872,6 +1936,7 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) {
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}})"),
         CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow,
         &reopeningContext);
     ASSERT_OK(result.getStatus());
     ASSERT_TRUE(stdx::holds_alternative(result.getValue()));
@@ -1887,15 +1952,16 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) {
     ASSERT_EQ(1, _getExecutionStat(_ns1, kNumBucketsReopened));
     ASSERT_FALSE(stdx::get(result.getValue()).closedBuckets.empty());
 
-    // Verify that if we try another insert for the soft-closed bucket, we get a query-based
-    // reopening candidate.
+    // Verify that if we try another insert for the soft-closed bucket, we get a
+    // query-based reopening candidate.
     result = tryInsert(_opCtx,
         *_bucketCatalog,
         _ns1,
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:35:40.000Z"}})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result.getStatus());
     ASSERT_TRUE(stdx::holds_alternative(result.getValue()));
     ASSERT_TRUE(stdx::holds_alternative>(
@@ -1915,7 +1981,8 @@ TEST_F(BucketCatalogTest, CannotInsertIntoOutdatedBucket) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:40.000Z"}})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result.getStatus());
     auto batch = stdx::get(result.getValue()).batch;
     ASSERT(batch);
@@ -1958,6 +2025,7 @@ TEST_F(BucketCatalogTest, CannotInsertIntoOutdatedBucket) {
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}})"),
         CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow,
         &reopeningContext);
     ASSERT_NOT_OK(result.getStatus());
     ASSERT_EQ(result.getStatus().code(), ErrorCodes::WriteConflict);
@@ -1975,7 +2043,8 @@ TEST_F(BucketCatalogTest, QueryBasedReopeningConflictsWithQueryBasedReopening) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:40.000Z"},"tag":"a"})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result1.getStatus());
     auto* context = stdx::get_if(&result1.getValue());
     ASSERT(context);
@@ -1990,7 +2059,8 @@ TEST_F(BucketCatalogTest, QueryBasedReopeningConflictsWithQueryBasedReopening) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:50.000Z"},"tag":"a"})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result2.getStatus());
     ASSERT(stdx::holds_alternative(result2.getValue()));
 }
@@ -2006,7 +2076,8 @@ TEST_F(BucketCatalogTest, ReopeningConflictsWithPreparedBatch) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:40.000Z"},"tag":"b"})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result1.getStatus());
     auto batch1 = stdx::get(result1.getValue()).batch;
     ASSERT(batch1);
@@ -2023,7 +2094,8 @@ TEST_F(BucketCatalogTest, ReopeningConflictsWithPreparedBatch) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:45.000Z"},"tag":"b"})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result2.getStatus());
     auto batch2 = stdx::get(result2.getValue()).batch;
     ASSERT(batch2);
@@ -2038,7 +2110,8 @@ TEST_F(BucketCatalogTest, ReopeningConflictsWithPreparedBatch) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:50.000Z"},"tag":"b"})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result3.getStatus());
     ASSERT(stdx::holds_alternative(result3.getValue()));
 }
@@ -2055,7 +2128,8 @@ TEST_F(BucketCatalogTest, PreparingBatchConflictsWithQueryBasedReopening) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:40.000Z"},"tag":"c"})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result1->getStatus());
     auto* context = stdx::get_if(&result1->getValue());
     ASSERT(context);
@@ -2069,7 +2143,8 @@ TEST_F(BucketCatalogTest, PreparingBatchConflictsWithQueryBasedReopening) {
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
         ::mongo::fromjson(R"({"time":{"$date":"2022-07-05T15:34:40.000Z"},"tag":"c"})"),
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result2.getStatus());
     auto batch = stdx::get(result2.getValue()).batch;
     ASSERT(batch);
@@ -2114,7 +2189,8 @@ TEST_F(BucketCatalogTest, ArchiveBasedReopeningConflictsWithArchiveBasedReopenin
         _getCollator(_ns1),
         options,
         doc,
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result1->getStatus());
     auto* context = stdx::get_if(&result1->getValue());
     ASSERT(context);
@@ -2130,7 +2206,8 @@ TEST_F(BucketCatalogTest, ArchiveBasedReopeningConflictsWithArchiveBasedReopenin
         _getCollator(_ns1),
         options,
         doc,
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result2->getStatus());
     ASSERT(stdx::holds_alternative(result2->getValue()));
 }
@@ -2167,7 +2244,8 @@ TEST_F(BucketCatalogTest,
         _getCollator(_ns1),
         options,
         doc1,
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result1->getStatus());
     auto* context1 = stdx::get_if(&result1->getValue());
     ASSERT(context1);
@@ -2191,7 +2269,8 @@ TEST_F(BucketCatalogTest,
         _getCollator(_ns1),
         options,
         doc2,
-        CombineWithInsertsFromOtherClients::kAllow);
+        CombineWithInsertsFromOtherClients::kAllow,
+        AllowQueryBasedReopening::kAllow);
     ASSERT_OK(result2->getStatus());
     auto* context2 = stdx::get_if(&result2->getValue());
     ASSERT(context2);
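The execution-stats changes below add two counters, numBucketDocumentsTooLargeInsert and numBucketDocumentsTooLargeUpdate, and report them through the time-series collStats output. A hedged sketch of the intended call pattern follows; the size check and the helper shown here are assumptions for illustration, since the patch itself only defines the counters and their increment accessors.

// Illustrative only -- not MongoDB source. Shows where a time-series write path might bump
// the two counters added below: "Insert" when even a fresh bucket document for a single
// measurement would exceed the user BSON limit, "Update" when appending to an existing
// bucket would. The 16MB constant mirrors the max user BSON size used in the jstests above.
#include <cstddef>
#include <iostream>

constexpr std::size_t kMaxUserBsonSize = 16 * 1024 * 1024;

struct ExecutionStatsController {  // stand-in for the real controller
    long long tooLargeInsert = 0;
    long long tooLargeUpdate = 0;
    void incNumBucketDocumentsTooLargeInsert(long long n = 1) { tooLargeInsert += n; }
    void incNumBucketDocumentsTooLargeUpdate(long long n = 1) { tooLargeUpdate += n; }
};

// Hypothetical helper: reject a staged bucket write that no longer fits, recording why.
bool bucketWriteFits(ExecutionStatsController& stats, std::size_t newBucketDocSize, bool isInsert) {
    if (newBucketDocSize <= kMaxUserBsonSize) {
        return true;
    }
    if (isInsert) {
        stats.incNumBucketDocumentsTooLargeInsert();
    } else {
        stats.incNumBucketDocumentsTooLargeUpdate();
    }
    return false;  // caller surfaces ErrorCodes::BSONObjectTooLarge to the user
}

int main() {
    ExecutionStatsController stats;
    bucketWriteFits(stats, std::size_t{20} * 1024 * 1024, /*isInsert=*/true);
    std::cout << "tooLargeInsert=" << stats.tooLargeInsert << '\n';  // prints 1
}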
diff --git a/src/mongo/db/timeseries/bucket_catalog/execution_stats.cpp b/src/mongo/db/timeseries/bucket_catalog/execution_stats.cpp
index fe1f30bdc6b..32c61b06128 100644
--- a/src/mongo/db/timeseries/bucket_catalog/execution_stats.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog/execution_stats.cpp
@@ -158,6 +158,16 @@ void ExecutionStatsController::incNumDuplicateBucketsReopened(long long incremen
     _globalStats.numDuplicateBucketsReopened.fetchAndAddRelaxed(increment);
 }
 
+void ExecutionStatsController::incNumBucketDocumentsTooLargeInsert(long long increment) {
+    _collectionStats->numBucketDocumentsTooLargeInsert.fetchAndAddRelaxed(increment);
+    _globalStats.numBucketDocumentsTooLargeInsert.fetchAndAddRelaxed(increment);
+}
+
+void ExecutionStatsController::incNumBucketDocumentsTooLargeUpdate(long long increment) {
+    _collectionStats->numBucketDocumentsTooLargeUpdate.fetchAndAddRelaxed(increment);
+    _globalStats.numBucketDocumentsTooLargeUpdate.fetchAndAddRelaxed(increment);
+}
+
 void appendExecutionStatsToBuilder(const ExecutionStats& stats, BSONObjBuilder& builder) {
     builder.appendNumber("numBucketInserts", stats.numBucketInserts.load());
     builder.appendNumber("numBucketUpdates", stats.numBucketUpdates.load());
@@ -206,6 +216,11 @@ void appendExecutionStatsToBuilder(const ExecutionStats& stats, BSONObjBuilder&
         builder.appendNumber("numDuplicateBucketsReopened",
                              stats.numDuplicateBucketsReopened.load());
     }
+
+    builder.appendNumber("numBucketDocumentsTooLargeInsert",
+                         stats.numBucketDocumentsTooLargeInsert.load());
+    builder.appendNumber("numBucketDocumentsTooLargeUpdate",
+                         stats.numBucketDocumentsTooLargeUpdate.load());
 }
 
 }  // namespace mongo::timeseries::bucket_catalog
diff --git a/src/mongo/db/timeseries/bucket_catalog/execution_stats.h b/src/mongo/db/timeseries/bucket_catalog/execution_stats.h
index d62da07571d..cfa709e558e 100644
--- a/src/mongo/db/timeseries/bucket_catalog/execution_stats.h
+++ b/src/mongo/db/timeseries/bucket_catalog/execution_stats.h
@@ -62,6 +62,8 @@ struct ExecutionStats {
     AtomicWord numBucketQueriesFailed;
     AtomicWord numBucketReopeningsFailed;
     AtomicWord numDuplicateBucketsReopened;
+    AtomicWord numBucketDocumentsTooLargeInsert;
+    AtomicWord numBucketDocumentsTooLargeUpdate;
 };
 
 class ExecutionStatsController {
@@ -97,6 +99,8 @@ public:
     void incNumBucketQueriesFailed(long long increment = 1);
     void incNumBucketReopeningsFailed(long long increment = 1);
     void incNumDuplicateBucketsReopened(long long increment = 1);
+    void incNumBucketDocumentsTooLargeInsert(long long increment = 1);
+    void incNumBucketDocumentsTooLargeUpdate(long long increment = 1);
 
 private:
     std::shared_ptr _collectionStats;
diff --git a/src/mongo/db/timeseries/timeseries_write_util.h b/src/mongo/db/timeseries/timeseries_write_util.h
index 58d74da7ba4..2ddff7f5074 100644
--- a/src/mongo/db/timeseries/timeseries_write_util.h
+++ b/src/mongo/db/timeseries/timeseries_write_util.h
@@ -55,6 +55,11 @@ BSONObj makeNewDocumentForWrite(
     const boost::optional& comparator,
     boost::optional currentMinTime);
 
+enum class BucketReopeningPermittance {
+    kAllowed,
+    kDisallowed,
+};
+
 /**
  * Performs modifications atomically for a user command on a time-series collection.
  * Replaces the bucket document for a partial bucket modification and removes the bucket for a full