SERVER-107320 Remove dependency on PlanExecutorPipeline in aggregation_request_helper (#38386)

GitOrigin-RevId: 704bee95a2de0a5c2a8398873adee8c2cd885559
This commit is contained in:
Gil Alon 2025-07-14 09:27:12 -04:00 committed by MongoDB Bot
parent 7970ed6735
commit af07ed86d3
22 changed files with 40 additions and 41 deletions

View File

@ -47,6 +47,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/write_ops/write_ops_parsers.h"
#include "mongo/db/shard_role.h"
#include "mongo/db/tenant_id.h"
#include "mongo/rpc/op_msg.h"
#include "mongo/util/assert_util.h"

View File

@ -35,6 +35,7 @@
#include "mongo/db/pipeline/aggregate_command_gen.h"
#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/collation/collator_interface.h"
#include "mongo/db/query/multiple_collection_accessor.h"
#include "mongo/db/read_concern.h"

View File

@ -186,6 +186,25 @@ bool checkRetryableWriteAlreadyApplied(const AggExState& aggExState,
return true;
}
PlanExecutorPipeline::ResumableScanType getResumableScanType(const AggregateCommandRequest& request,
bool isChangeStream) {
// $changeStream cannot be run on the oplog, and $_requestReshardingResumeToken can only be run
// on the oplog. An aggregation request with both should therefore never reach this point.
tassert(5353400,
"$changeStream can't be combined with _requestReshardingResumeToken: true",
!(isChangeStream && request.getRequestReshardingResumeToken()));
if (isChangeStream) {
return PlanExecutorPipeline::ResumableScanType::kChangeStream;
}
if (request.getRequestReshardingResumeToken()) {
return PlanExecutorPipeline::ResumableScanType::kOplogScan;
}
if (request.getRequestResumeToken()) {
return PlanExecutorPipeline::ResumableScanType::kNaturalOrderScan;
}
return PlanExecutorPipeline::ResumableScanType::kNone;
}
/**
* If a pipeline is empty (assuming that a $cursor stage hasn't been created yet), it could mean
* that we were able to absorb all pipeline stages and pull them into a single PlanExecutor. So,
@ -684,8 +703,7 @@ std::vector<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> prepareExecuto
execs.emplace_back(plan_executor_factory::make(
std::move(pipelineExpCtx),
std::move(pipelineIt),
aggregation_request_helper::getResumableScanType(aggExState.getRequest(),
aggExState.hasChangeStream())));
getResumableScanType(aggExState.getRequest(), aggExState.hasChangeStream())));
}
if (aggCatalogState.lockAcquired()) {

View File

@ -37,6 +37,7 @@
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context_for_test.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/summation.h"
#include <climits>
#include <cmath>

View File

@ -46,6 +46,7 @@
#include "mongo/db/index_names.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/shard_id.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/chunk_manager.h"

View File

@ -194,25 +194,6 @@ void validateRequestFromClusterQueryWithoutShardKey(const AggregateCommandReques
}
}
PlanExecutorPipeline::ResumableScanType getResumableScanType(const AggregateCommandRequest& request,
bool isChangeStream) {
// $changeStream cannot be run on the oplog, and $_requestReshardingResumeToken can only be run
// on the oplog. An aggregation request with both should therefore never reach this point.
tassert(5353400,
"$changeStream can't be combined with _requestReshardingResumeToken: true",
!(isChangeStream && request.getRequestReshardingResumeToken()));
if (isChangeStream) {
return PlanExecutorPipeline::ResumableScanType::kChangeStream;
}
if (request.getRequestReshardingResumeToken()) {
return PlanExecutorPipeline::ResumableScanType::kOplogScan;
}
if (request.getRequestResumeToken()) {
return PlanExecutorPipeline::ResumableScanType::kNaturalOrderScan;
}
return PlanExecutorPipeline::ResumableScanType::kNone;
}
const mongo::OptionalBool& getFromRouter(const AggregateCommandRequest& request) {
// Check both fields because we cannot rely on the feature flag checks. An aggregate command
// with 'fromRouter' field set could be sent directly to an initial sync node with uninitialized

View File

@ -43,8 +43,8 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/exchange_spec_gen.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/legacy_runtime_constants_gen.h"
#include "mongo/db/pipeline/plan_executor_pipeline.h"
#include "mongo/db/query/explain_options.h"
#include "mongo/db/version_context.h"
#include "mongo/db/write_concern_options.h"
@ -115,13 +115,6 @@ void validateRequestForAPIVersion(const OperationContext* opCtx,
*/
void validateRequestFromClusterQueryWithoutShardKey(const AggregateCommandRequest& request);
/**
* Returns the type of resumable scan required by this aggregation, if applicable. Otherwise returns
* ResumableScanType::kNone.
*/
PlanExecutorPipeline::ResumableScanType getResumableScanType(const AggregateCommandRequest& request,
bool isChangeStream);
// TODO SERVER-95358 remove once 9.0 becomes last LTS.
const mongo::OptionalBool& getFromRouter(const AggregateCommandRequest& request);

View File

@ -30,6 +30,7 @@
#pragma once
#include "mongo/base/status.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_rank_fusion_inputs_gen.h"
#include "mongo/db/pipeline/document_source_score_fusion_inputs_gen.h"

View File

@ -52,7 +52,9 @@
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/query/query_shape/serialization_options.h"
#include "mongo/db/service_context.h"
#include "mongo/db/shard_role.h"
#include "mongo/executor/task_executor.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/stdx/unordered_set.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point.h"
@ -70,10 +72,6 @@
namespace mongo {
class BSONObj;
// TODO SERVER-107320 remove 'CollectionOrViewAcquisition' and 'CollectionRoutingInfo'.
class CollectionOrViewAcquisition;
class CollectionRoutingInfo;
class OperationContext;
class Pipeline;
class PipelineDeleter;

View File

@ -30,6 +30,7 @@
#pragma once
#include "mongo/db/pipeline/aggregate_command_gen.h"
#include "mongo/db/query/count_command_gen.h"
#include "mongo/db/query/find_command.h"
namespace mongo {

View File

@ -39,6 +39,7 @@
#include "mongo/db/matcher/extensions_callback_noop.h"
#include "mongo/db/pipeline/aggregate_command_gen.h"
#include "mongo/db/pipeline/aggregation_request_helper.h"
#include "mongo/db/query/canonical_query.h"
#include "mongo/db/query/parsed_distinct_command.h"
#include "mongo/db/query/query_test_service_context.h"
#include "mongo/db/storage/storage_options.h"

View File

@ -27,6 +27,8 @@
* it in the license file.
*/
#include "mongo/db/query/canonical_query.h"
#include "mongo/db/matcher/extensions_callback_real.h"
#include "mongo/db/pipeline/expression_context_for_test.h"
#include "mongo/db/query/query_fcv_environment_for_test.h"

View File

@ -33,6 +33,7 @@
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/expression_context_for_test.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/canonical_query_encoder.h"
#include "mongo/db/query/plan_cache/plan_cache_bm_fixture.h"
#include "mongo/db/query/query_test_service_context.h"
#include "mongo/util/intrusive_counter.h"

View File

@ -33,6 +33,7 @@
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/expression_context_for_test.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/canonical_query_encoder.h"
#include "mongo/db/query/plan_cache/plan_cache_bm_fixture.h"
#include "mongo/db/query/query_test_service_context.h"
#include "mongo/util/intrusive_counter.h"

View File

@ -28,11 +28,10 @@
*/
#include "mongo/db/pipeline/expression_context_for_test.h"
#include "mongo/db/query/query_request_helper.h"
#include "mongo/db/query/query_settings/query_settings_service.h"
#include "mongo/db/query/query_shape/find_cmd_shape.h"
#include "mongo/db/query/query_shape/query_shape.h"
#include "mongo/db/query/query_test_service_context.h"
#include "mongo/idl/server_parameter_test_util.h"
#include "mongo/platform/random.h"
#include "mongo/util/assert_util.h"

View File

@ -28,6 +28,7 @@
*/
#include "mongo/bson/json.h"
#include "mongo/db/query/query_request_helper.h"
#include "mongo/db/query/query_settings/query_settings_backfill.h"
#include "mongo/db/query/query_shape/find_cmd_shape.h"
#include "mongo/db/query/query_shape/shape_helpers.h"

View File

@ -38,9 +38,6 @@ mongo_cc_library(
srcs = [
"query_shape.cpp",
"shape_helpers.cpp",
"//src/mongo/db/pipeline:aggregation_request_helper.h",
"//src/mongo/db/pipeline:plan_executor_pipeline.h",
"//src/mongo/db/pipeline:plan_explainer_pipeline.h",
"//src/mongo/db/query:count_request.h",
"//src/mongo/db/query:projection_ast_util.h",
],

View File

@ -54,6 +54,7 @@
#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/s/analyze_shard_key_util.h"
#include "mongo/db/s/ddl_lock_manager.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/db/write_concern_options.h"

View File

@ -41,6 +41,7 @@
#include "mongo/db/s/sharding_logging.h"
#include "mongo/db/s/sharding_recovery_service.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/shard_role.h"
#include "mongo/db/vector_clock_mutable.h"
#include "mongo/executor/async_rpc.h"
#include "mongo/s/request_types/sharded_ddl_commands_gen.h"

View File

@ -31,6 +31,7 @@
#include "mongo/base/error_codes.h"
#include "mongo/db/cancelable_operation_context.h"
#include "mongo/db/catalog/collection_catalog.h"
#include "mongo/db/catalog/drop_collection.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/client.h"

View File

@ -90,9 +90,6 @@ mongo_cc_library(
"//src/mongo/db/commands/query_cmd:bulk_write_parser.h",
"//src/mongo/db/pipeline:aggregate_command_gen",
"//src/mongo/db/pipeline:aggregation_request_helper.h",
"//src/mongo/db/pipeline:explain_util.h",
"//src/mongo/db/pipeline:plan_executor_pipeline.h",
"//src/mongo/db/pipeline:plan_explainer_pipeline.h",
"//src/mongo/db/query:count_request.h",
"//src/mongo/db/repl:optime_with.h",
"//src/mongo/rpc:write_concern_error_detail.h",

View File

@ -33,6 +33,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/auth/privilege.h"
#include "mongo/db/exec/agg/exec_pipeline.h"
#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"