7.0.16 release maintenance (#28907)

GitOrigin-RevId: 6170c0d97374089f5cad91aa4a03b1a690ad6276
This commit is contained in:
Matt Broadstone 2024-11-07 13:55:22 -08:00 committed by MongoDB Bot
parent 5d57590ea6
commit 1b713aa512
153 changed files with 2390 additions and 2670 deletions

4
.github/CODEOWNERS vendored
View File

@ -2,8 +2,10 @@
* @10gen/server-release
# Exclude some test files and READMEs from the backport approvals
/jstests/**/*
/etc/backports_required_for_multiversion_tests.yml
/etc/*.suppressions
/README.md
**/README.md
# Exclude all the tests under "jstests" directories from backport approvals
**/jstests/**/*

View File

@ -2002,26 +2002,6 @@ if env.get('ENABLE_OOM_RETRY'):
else:
env['OOM_RETRY_ATTEMPTS'] = 10
env['OOM_RETRY_MAX_DELAY_SECONDS'] = 120
if env.ToolchainIs('clang', 'gcc'):
env['OOM_RETRY_MESSAGES'] = [
': out of memory',
'virtual memory exhausted: Cannot allocate memory',
': fatal error: Killed signal terminated program cc1',
# TODO: SERVER-77322 remove this non memory related ICE.
r'during IPA pass: cp.+g\+\+: internal compiler error',
'ld terminated with signal 9',
]
elif env.ToolchainIs('msvc'):
env['OOM_RETRY_MESSAGES'] = [
'LNK1102: out of memory',
'C1060: compiler is out of heap space',
'c1xx : fatal error C1063: INTERNAL COMPILER ERROR',
r'LNK1171: unable to load mspdbcore\.dll',
"LNK1201: error writing to program database ''",
]
env['OOM_RETRY_RETURNCODES'] = [1102]
env.Tool('oom_auto_retry')
if env.ToolchainIs('clang'):

View File

@ -90,13 +90,13 @@ OS_DOCKER_LOOKUP = {
'rhel55': None,
'rhel57': None,
'rhel62': None,
'rhel70': ('centos:7', "yum",
'rhel70': ('registry.access.redhat.com/ubi7/ubi', "yum",
frozenset(["rh-python38.x86_64", "wget", "pkgconfig", "systemd", "procps", "file"]),
"/opt/rh/rh-python38/root/usr/bin/python3"),
'rhel71': ('centos:7', "yum",
'rhel71': ('registry.access.redhat.com/ubi7/ubi', "yum",
frozenset(["rh-python38.x86_64", "wget", "pkgconfig", "systemd", "procps", "file"]),
"/opt/rh/rh-python38/root/usr/bin/python3"),
'rhel72': ('centos:7', "yum",
'rhel72': ('registry.access.redhat.com/ubi7/ubi', "yum",
frozenset(["rh-python38.x86_64", "wget", "pkgconfig", "systemd", "procps", "file"]),
"/opt/rh/rh-python38/root/usr/bin/python3"),
'rhel8': ('almalinux:8', "yum",
@ -227,12 +227,6 @@ def run_test(test: Test, client: DockerClient) -> Tuple[Test, Result]:
commands: List[str] = ["export PYTHONIOENCODING=UTF-8"]
if test.os_name.startswith('rhel'):
if test.os_name.startswith('rhel7'):
# RHEL 7 needs the SCL installed for Python 3
commands += [
"yum -y install centos-release-scl",
"yum-config-manager --enable centos-sclo-rh",
]
# RHEL distros need EPEL for Compass dependencies
commands += [
"yum -y install yum-utils epel-release",
@ -258,9 +252,15 @@ def run_test(test: Test, client: DockerClient) -> Tuple[Test, Result]:
container: Container | None = None
try:
image = get_image(test, client)
container = client.containers.run(image, command=f"bash -c \"{join_commands(commands)}\"",
auto_remove=True, detach=True,
volumes=[f'{test_external_root}:{test_docker_root}'])
container = client.containers.run(
image, command=f"bash -c \"{join_commands(commands)}\"", auto_remove=True, detach=True,
volumes=[
f'{test_external_root}:{test_docker_root}',
'/etc/pki/entitlement/:/run/secrets/etc-pki-entitlement',
'/etc/rhsm:/run/secrets/rhsm',
'/etc/yum.repos.d/redhat.repo:/run/secrets/redhat.repo',
'/etc/yum.repos.d/redhat-rhsm.repo:/run/secrets/redhat-rhsm.repo'
])
for log in container.logs(stream=True):
result["log_raw"] += log.decode('UTF-8')
# This is pretty verbose, lets run this way for a while and we can delete this if it ends up being too much

View File

@ -26,25 +26,35 @@ if [[ "${machine}" = "Cygwin" ]]; then
echo "----------------------"
echo -e "\n=> Setting _NT_SOURCE_PATH environment variable for debuggers to pick up source files."
SRC_DIR_HASH=$(readlink -f /cygdrive/z/data/mci/source-*)
SRC_DIR="${SRC_DIR_HASH}/src"
SRC_DIR_HASH=$(find /cygdrive/z/data/mci -name 'source-*' | head -n 1 | xargs -I% basename %)
SRC_DIR="Z:\data\mci\\${SRC_DIR_HASH}\src"
echo "Source Path: [${SRC_DIR}]"
set -x;
setx _NT_SOURCE_PATH "${SRC_DIR}"
{ set +x; } 2>/dev/null
echo -e "\n=> Setting _NT_SYMBOL_PATH environment variable for debuggers to pick up the symbols."
DBG_SYMBOLS_HASH=$(find /cygdrive/z/data/mci -name 'artifacts-*dist_test_debug' | head -n 1 | xargs -I% basename %)
DBG_SYMBOLS_WINPATH="\"Z:\data\mci\\${DBG_SYMBOLS_HASH}\extracted_symbols\dist-test\bin;srv*;\""
DBG_ARCHIVE_PARENT=$(readlink -f /cygdrive/z/data/mci/artifacts-*dist_test_debug)
DBG_ARCHIVE=$(readlink -f ${DBG_ARCHIVE_PARENT}/debugsymbols-*.zip)
DBG_ARCHIVE_TARGET_PARENT="${DBG_ARCHIVE_PARENT}/extracted_symbols"
DBG_ARCHIVE_TARGET="${DBG_ARCHIVE_TARGET_PARENT}/dist-test/bin"
echo "Symbols Dir: [${DBG_ARCHIVE_TARGET}]"
echo -e "\n=> Copying .natvis file to Visual Studio's expected directories (System for 2017/2019 and User)."
set -x;
cp "/cygdrive/z/data/mci/${SRC_DIR_HASH}/buildscripts/win/mongodb.natvis" /cygdrive/c/Program\ Files\ \(x86\)/Microsoft\ Visual\ Studio/2017/Professional/Common7/Packages/Debugger/Visualizers
cp "/cygdrive/z/data/mci/${SRC_DIR_HASH}/buildscripts/win/mongodb.natvis" /cygdrive/c/Program\ Files\ \(x86\)/Microsoft\ Visual\ Studio/2019/Professional/Common7/Packages/Debugger/Visualizers
mkdir "${USERPROFILE}/Documents/Visual Studio 2022/Visualizers"
cp "/cygdrive/z/data/mci/${SRC_DIR_HASH}/buildscripts/win/mongodb.natvis" "${USERPROFILE}/Documents/Visual Studio 2022/Visualizers"
{ set +x; } 2>/dev/null
echo -e "\n=> Extracting Symbol files."
set -x;
mkdir -p ${DBG_ARCHIVE_TARGET_PARENT}
unzip -n ${DBG_ARCHIVE} -d ${DBG_ARCHIVE_TARGET_PARENT}
setx _NT_SYMBOL_PATH "${DBG_ARCHIVE_TARGET};srv*;"
setx _NT_SYMBOL_PATH "${DBG_SYMBOLS_WINPATH}"
{ set +x; } 2>/dev/null
echo -e "\n=> Extracting Core Dumps to Desktop."
@ -93,6 +103,7 @@ if [[ "${machine}" = "Cygwin" ]]; then
if [[ -z $(ls ${COREDUMP_ARCHIVE_TARGET}/dump* 2>/dev/null) ]]; then
echo "No core dumps found."
fi
{ set +x; } 2>/dev/null
echo "Copied to Desktop."
} &> ${out_dir}

View File

@ -9,6 +9,8 @@
https://github.com/KindDragon/CPPDebuggerVisualizers
To load in Visual Studio, the natvis file must be in the Visual Studio project.
Note: with buildscripts/setup_spawnhost_coredump, this file should be automatically placed in the user-specific Natvis directory.
https://learn.microsoft.com/en-us/visualstudio/debugger/create-custom-views-of-native-objects?view=vs-2022#BKMK_natvis_location
To load in WinDBG, run ".nvload mongdb.natvis"
-->
<Type Name="mongo::Status">

View File

@ -553,6 +553,10 @@ last-continuous:
ticket: SERVER-93099
- test_file: jstests/core/timeseries/timeseries_update_mixed_schema_bucket.js
ticket: SERVER-93099
- test_file: jstests/sharding/refresh_sessions.js
ticket: SERVER-94635
- test_file: jstests/core/query/project/projection_with_hashed_index.js
ticket: SERVER-91757
suites: null
last-lts:
all:
@ -1158,4 +1162,8 @@ last-lts:
ticket: SERVER-93099
- test_file: jstests/core/timeseries/timeseries_update_mixed_schema_bucket.js
ticket: SERVER-93099
- test_file: jstests/sharding/refresh_sessions.js
ticket: SERVER-94635
- test_file: jstests/core/query/project/projection_with_hashed_index.js
ticket: SERVER-91757
suites: null

View File

@ -285,7 +285,7 @@ variables:
- name: unittest_shell_hang_analyzer_gen
- name: test_packages
distros:
- ubuntu2004-package
- rhel94-large-packagetest
- name: vector_search
- name: vector_search_auth
- name: vector_search_ssl
@ -794,7 +794,7 @@ buildvariants:
display_name: "* macOS DEBUG"
cron: "0 */4 * * *" # From the ${project_required_suggested_cron} parameter
run_on:
- macos-1100
- macos-11
expansions: &macos-debug-expansions
compile_variant: *macos-debug-suggested
test_flags: --excludeWithAnyTags=incompatible_with_macos --enableEnterpriseTests=off
@ -830,7 +830,7 @@ buildvariants:
display_name: "Enterprise macOS Via Rosetta 2"
cron: "0 4 * * *" # From the ${project_nightly_cron} parameter.
run_on:
- macos-1100-arm64
- macos-11-arm64
expansions:
compile_variant: *enterprise-macos-rosetta-2
test_flags: --excludeWithAnyTags=incompatible_with_macos,requires_gcm
@ -861,7 +861,7 @@ buildvariants:
display_name: "Enterprise macOS DEBUG"
cron: "0 4 * * *" # From the ${project_nightly_cron} parameter.
run_on:
- macos-1100
- macos-11
expansions:
compile_variant: *enterprise-macos
test_flags: --excludeWithAnyTags=incompatible_with_macos,requires_gcm
@ -892,7 +892,7 @@ buildvariants:
- name: &enterprise-macos-arm64 enterprise-macos-arm64
display_name: "~ Enterprise macOS arm64"
run_on:
- macos-1100-arm64
- macos-11-arm64
expansions:
compile_variant: *enterprise-macos-arm64
test_flags: --excludeWithAnyTags=incompatible_with_macos,requires_gcm
@ -1623,7 +1623,7 @@ buildvariants:
cron: "0 4 * * *" # From the ${project_nightly_cron} parameter.
stepback: false
run_on:
- macos-1100
- macos-11
expansions:
num_scons_link_jobs_available: 0.1
compile_env: DEVELOPER_DIR=/Applications/Xcode13.app

View File

@ -70,7 +70,7 @@ buildvariants:
- name: unittest_shell_hang_analyzer_gen
- name: test_packages
distros:
- ubuntu2004-package
- rhel94-large-packagetest
- name: vector_search
- name: vector_search_auth
- name: vector_search_ssl

View File

@ -1047,7 +1047,7 @@ buildvariants:
- name: .stitch
- name: test_packages
distros:
- ubuntu2004-package
- rhel94-large-packagetest
- name: selinux_rhel7_org
- name: .publish
- name: generate_buildid_to_debug_symbols_mapping
@ -2558,7 +2558,7 @@ buildvariants:
display_name: macOS
cron: "0 4 * * *" # From the ${project_nightly_cron} parameter.
run_on:
- macos-1100
- macos-11
expansions:
test_flags: --excludeWithAnyTags=incompatible_with_macos,requires_external_data_source,requires_latch_analyzer --enableEnterpriseTests=off
push_path: osx
@ -2594,7 +2594,7 @@ buildvariants:
display_name: macOS arm64
cron: "0 4 * * *" # From the ${project_nightly_cron} parameter.
run_on:
- macos-1100-arm64
- macos-11-arm64
expansions:
test_flags: --excludeWithAnyTags=incompatible_with_macos,requires_external_data_source,requires_latch_analyzer --enableEnterpriseTests=off
push_path: osx
@ -2630,7 +2630,7 @@ buildvariants:
display_name: Enterprise macOS
cron: "0 4 * * *" # From the ${project_nightly_cron} parameter.
run_on:
- macos-1100
- macos-11
expansions:
test_flags: --excludeWithAnyTags=incompatible_with_macos,requires_gcm,requires_external_data_source,requires_latch_analyzer
additional_package_targets: >-
@ -2668,7 +2668,7 @@ buildvariants:
display_name: Enterprise macOS arm64
cron: "0 4 * * *" # From the ${project_nightly_cron} parameter.
run_on:
- macos-1100-arm64
- macos-11-arm64
expansions:
test_flags: --excludeWithAnyTags=incompatible_with_macos,requires_gcm,requires_external_data_source,requires_latch_analyzer
additional_package_targets: >-

View File

@ -18,7 +18,7 @@ buildvariants:
display_name: "Ninja Build: macOS Enterprise"
cron: "0 4 * * *" # From the ${project_nightly_cron} parameter.
run_on:
- macos-1100
- macos-11
expansions:
compile_env: DEVELOPER_DIR=/Applications/Xcode13.app
compile_flags: --ssl -j$(sysctl -n hw.logicalcpu) --libc++ --variables-files=etc/scons/xcode_macosx.vars
@ -31,7 +31,7 @@ buildvariants:
display_name: "Ninja Build Profiles: macOS"
cron: "0 4 * * *" # From the ${project_nightly_cron} parameter.
run_on:
- macos-1100
- macos-11
expansions:
compile_env: DEVELOPER_DIR=/Applications/Xcode13.app
compile_flags: --ssl -j$(sysctl -n hw.logicalcpu) --libc++
@ -45,7 +45,7 @@ buildvariants:
display_name: "Ninja Build Profiles: macOS ARM"
cron: "0 4 * * *" # From the ${project_nightly_cron} parameter.
run_on:
- macos-1100-arm64
- macos-11-arm64
expansions:
compile_env: DEVELOPER_DIR=/Applications/Xcode13.app
compile_flags: --ssl -j$(sysctl -n hw.logicalcpu) --libc++

View File

@ -1,4 +1,8 @@
# Build variant to generate tasks for evergreen versions.
#
# Updates to this file may also need to appear in etc/system_perf_yml_components/variants/task_generation.yml,
# which is the same but excludes resmoke task generation tasks.
#
buildvariants:
- name: generate-tasks-for-version

View File

@ -2,7 +2,8 @@
pypiwin32>=223; sys_platform == "win32" and python_version > "3"
pywin32>=225; sys_platform == "win32" and python_version > "3"
cryptography == 2.3; platform_machine == "s390x" or platform_machine == "ppc64le" # Needed for oauthlib to use RSAAlgorithm # Version locked - see SERVER-36618
cryptography == 2.3; (platform_machine == "s390x" and platform_release < "5.14.0-362.8.1.el9_3.s390x") or (platform_machine == "ppc64le" and platform_release < "5.14.0-362.13.1.el9_3.ppc64le")
cryptography == 36.0.2; (platform_machine == "s390x" and platform_release >= "5.14.0-362.8.1.el9_3.s390x") or (platform_machine == "ppc64le" and platform_release >= "5.14.0-362.13.1.el9_3.ppc64le")
cryptography == 36.0.2; platform_machine != "s390x" and platform_machine != "ppc64le"
mongo-ninja-python == 1.11.1.5; (platform_machine == "x86_64" or platform_machine == "aarch64") and sys_platform == "linux"

File diff suppressed because it is too large Load Diff

View File

@ -1,252 +0,0 @@
functions:
f_dsi_run_workload: &dsi_run_func
- command: timeout.update
params:
exec_timeout_secs: ${exec_timeout_secs_override}
timeout_secs: ${timeout_secs_override}
- command: git.get_project
params:
directory: src/mongo
clone_depth: 1000
revisions:
dsi: ${dsi_rev}
genny: ${genny_rev}
linkbench: ${linkbench_rev}
linkbench2: ${linkbench2_rev}
tsbs: ${tsbs_rev}
workloads: ${workloads_rev}
YCSB: ${YCSB_rev}
flamegraph: ${flamegraph_rev}
PrivateWorkloads: ${PrivateWorkloads_rev}
- command: expansions.write
params:
file: ./expansions.yml
redacted: true
- command: shell.exec
params:
script: ./src/dsi/run-dsi run_workload
- command: shell.exec
type: system
params:
script: ./src/dsi/run-dsi determine_failure -m SYSTEM
- command: shell.exec
type: setup
params:
script: ./src/dsi/run-dsi determine_failure -m SETUP
- command: shell.exec
type: test
params:
script: ./src/dsi/run-dsi determine_failure -m TEST
f_run_dsi_workload: *dsi_run_func # Do not use this function. It is deprecated.
## DSI_SELFTEST ##
dsi_selftest_setup_tests:
- command: git.get_project
params:
directory: src/dsi
dsi_selftest_check_python_formatting:
- command: shell.exec
type: test
params:
script: ./src/dsi/run-dsi selftest testscripts/check-format-python.sh
dsi_selftest_lint_python_scripts:
- command: shell.exec
type: test
params:
script: ./src/dsi/run-dsi selftest testscripts/lint-python.sh
dsi_selftest_lint_yml:
- command: shell.exec
type: test
params:
script: ./src/dsi/run-dsi selftest testscripts/lint-yml.sh
dsi_selftest_pytest:
- command: shell.exec
type: test
params:
script: ./src/dsi/run-dsi selftest testscripts/pytest.sh
dsi_selftest_mypy:
- command: shell.exec
type: test
params:
script: ./src/dsi/run-dsi selftest testscripts/mypy.sh
dsi_selftest_shellcheck:
- command: shell.exec
type: test
params:
script: ./src/dsi/run-dsi selftest testscripts/lint-shell.sh
dsi_selftest_e2e:
- command: shell.exec
type: test
params:
script: ./src/dsi/run-dsi e2e_test
tasks:
###
# Same in every DSI project
- name: renew_ssl_cert
commands:
- command: git.get_project
params:
directory: src/mongo
revisions:
dsi: ${dsi_rev}
# Run the script to generate ssl cert files
- command: shell.exec
params:
script: AWS_ACCESS_KEY_ID=${terraform_key} AWS_SECRET_ACCESS_KEY=${terraform_secret} ./src/dsi/run-dsi generate_ssl_cert
# Upload files for further DSI usage
- command: s3.put
params:
aws_key: ${aws_key}
aws_secret: ${aws_secret}
local_file: member.pem
# path to the remote file is intended to be static
remote_file: dsi/ssl/member.pem
bucket: mciuploads
# the visibility has to be public for consumption by DSI
permissions: public-read
content_type: text/plain
display_name: member.pem
- command: s3.put
params:
aws_key: ${aws_key}
aws_secret: ${aws_secret}
local_file: root.crt
# path to the remote file is intended to be static
remote_file: dsi/ssl/root.crt
bucket: mciuploads
# the visibility has to be public for consumption by DSI
permissions: public-read
content_type: text/plain
display_name: root.crt
- name: ycsb.2023-09
priority: 5
commands:
- func: f_dsi_run_workload
vars:
test_control: "ycsb.2023-09"
- name: ycsb_60GB.2023-09
priority: 5
commands:
- func: f_dsi_run_workload
vars:
test_control: "ycsb-60GB.2023-09"
- name: ycsb_60GB.long.2023-09
priority: 5
commands:
- func: f_dsi_run_workload
vars:
test_control: "ycsb-60GB.long.2023-09"
- name: ycsb_secondary_reads.2023-09
priority: 5
commands:
- func: f_dsi_run_workload
vars:
test_control: "ycsb-secondary-reads.2023-09"
- name: ycsb_w1.2023-09
priority: 5
commands:
- func: f_dsi_run_workload
vars:
test_control: "ycsb-w1.2023-09"
- name: ycsb_stepdowns.2023-09
priority: 5
commands:
- func: f_dsi_run_workload
vars:
test_control: "ycsb_stepdowns.2023-09"
- name: ycsb_rolling_restarts.2023-09
priority: 5
commands:
- func: f_dsi_run_workload
vars:
test_control: "ycsb_rolling_restarts.2023-09"
- name: ycsb_non_retryable_writes_stepdowns.2023-09
priority: 5
commands:
- func: f_dsi_run_workload
vars:
test_control: "ycsb_non_retryable_writes_stepdowns.2023-09"
- name: ycsb_non_retryable_writes_rolling_restarts.2023-09
priority: 5
commands:
- func: f_dsi_run_workload
vars:
test_control: "ycsb_non_retryable_writes_rolling_restarts.2023-09"
- name: locust_bulk_insert
priority: 5
commands:
- func: f_dsi_run_workload
vars:
test_control: "locust_bulk_insert"
- name: tsbs_query_fixed_bucketing
priority: 5
commands:
- func: f_dsi_run_workload
vars:
test_control: "tsbs_query_fixed_bucketing"
- name: tsbs-expression-query
priority: 5
commands:
- func: f_dsi_run_workload
vars:
test_control: "tsbs_query_genny"
test_control_params: |
{task_name: tsbs_expression_query,
config_filename: ./src/genny/dist/etc/genny/workloads/query/TimeseriesTsbsExpressionQuery.yml}
- name: startup
priority: 5
commands:
- func: f_dsi_run_workload
vars:
test_control: "startup"
- name: dbcheck
priority: 5
exec_timeout_secs: 43200 # 12 hours
commands:
- func: f_dsi_run_workload
timeout_secs: 43200 # 12 hours
vars:
test_control: "dbcheck"
additional_tfvars: "tags: {expire-on-delta: 12}" # increase host expiration to 12 hours.
- name: genny_resharding_withIndexes
exec_timeout_secs: 172800 # 2 days
commands:
- func: f_dsi_run_workload
vars:
test_control: auto_genny_workload
auto_workload_path: ./src/genny/dist/etc/genny/workloads/sharding/ReshardCollectionWithIndexes.yml
- name: tie-breaking-heuristics
commands:
- func: f_dsi_run_workload
vars:
test_control: mongo-perf.2023-02
test_control_params: |
{include_filter_1: tie-breaking,
include_filter_2: core regression,
exclude_filter: single_threaded,
threads: "1 2 4 8",
read_cmd: 'true'}
- name: stream_workloads
commands:
- func: f_dsi_run_workload
vars:
test_control: streams.2023-10

View File

@ -1,407 +0,0 @@
variables:
- &amazon2_x86_compile_variant_dependency
depends_on:
- name: archive_dist_test
variant: amazon2-x86-compile
- name: package_supplementary_data
variant: amazon2-x86-compile
- &amazon_linux2_arm64_compile_variant_dependency
depends_on:
- name: archive_dist_test
variant: amazon2-arm64-compile
- name: package_supplementary_data
variant: amazon2-arm64-compile
- &amazon_linux2_arm64_mongocrypt_compile_variant_dependency
depends_on:
- name: archive_dist_test
variant: amazon2-x86-compile
- name: package_supplementary_data
variant: amazon2-arm64-compile
- name: crypt_create_lib
variant: amazon2-arm64-mongocrypt-shlib-compile
- &schedule_variant_auto_tasks_task
name: schedule_variant_auto_tasks
activate: false
depends_on:
- name: schedule_global_auto_tasks
variant: task_generation
- &schedule_patch_auto_tasks_task
name: schedule_patch_auto_tasks
activate: false
depends_on:
- name: schedule_global_auto_tasks
variant: task_generation
- modules: &perf_modules
- dsi
- genny
- workloads
- linkbench
- linkbench2
- tsbs
- mongo-perf
- YCSB
- PrivateWorkloads
- py-tpcc
- flamegraph
buildvariants:
- <<: *amazon_linux2_arm64_compile_variant_dependency
name: perf-atlas-M60-real.arm.aws.2023-11
display_name: PERF M60-Atlas ReplSet ARM AWS 2023-11
cron: "0 0 * * 0,4" # 00:00 on Sunday, Thursday
modules: *perf_modules
expansions:
mongodb_setup_release: 2022-11
mongodb_setup: atlas
canaries: none
atlas_setup: M60-repl
use_custom_build: true
infrastructure_provisioning: workload_client_arm.2023-04
infrastructure_provisioning_release: 2023-09
workload_setup: 2022-11
platform: linux
project_dir: &perf_project_dir dsi
storageEngine: wiredTiger
compile_variant: amazon2-arm64-compile
use_supplementary_data: true
run_on:
- "rhel94-perf-atlas-large"
tasks:
- *schedule_patch_auto_tasks_task
- *schedule_variant_auto_tasks_task
- name: ycsb.2023-09
- name: ycsb_60GB.2023-09
- name: tpcc
- name: tpcc_majority
- name: linkbench
- name: linkbench2
- <<: *amazon2_x86_compile_variant_dependency
name: perf-atlas-M60-real.intel.azure.2023-11
display_name: PERF M60-Atlas ReplSet Intel Azure 2023-11
cron: "0 0 * * 0,4" # 00:00 on Sunday, Thursday
modules: *perf_modules
expansions:
mongodb_setup_release: 2022-11
mongodb_setup: atlas
canaries: none
atlas_setup: M60-repl-azure
use_custom_build_azure: true
compile_variant: amazon2-x86-compile
use_supplementary_data: true
infrastructure_provisioning: workload_client_intel.2023-11
infrastructure_provisioning_release: 2023-09
workload_setup: 2022-11
platform: linux
project_dir: *perf_project_dir
storageEngine: wiredTiger
run_on:
- "rhel94-perf-atlas-large"
tasks: # Cannot use *3nodetasks because secondary_performance uses a special mongodb setup
- *schedule_patch_auto_tasks_task
- *schedule_variant_auto_tasks_task
- name: ycsb.2023-09
- name: ycsb_60GB.2023-09
- name: tpcc
- name: tpcc_majority
- name: linkbench
- name: linkbench2
- <<: *amazon_linux2_arm64_compile_variant_dependency
name: perf-3-shard.arm.aws.2023-11
display_name: PERF 3-Shard Cluster ARM AWS 2023-11
cron: "0 0 * * 4" # 00:00 on Thursday
modules: *perf_modules
expansions:
mongodb_setup_release: 2022-11
mongodb_setup: shard
infrastructure_provisioning_release: 2023-09
infrastructure_provisioning: shard
workload_setup: 2022-11
platform: linux
project_dir: *perf_project_dir
authentication: enabled
storageEngine: wiredTiger
compile_variant: amazon2-arm64-compile
use_supplementary_data: true
run_on:
- "rhel94-perf-shard"
tasks:
- *schedule_patch_auto_tasks_task
- *schedule_variant_auto_tasks_task
- name: ycsb.2023-09
- name: ycsb_w1.2023-09
- name: crud_workloads_majority
- name: crud_workloads_w1
- name: misc_workloads
- name: map_reduce_workloads
- name: smoke_test
- name: mongos_workloads
- name: mongos_large_catalog_workloads
- name: move_chunk_workloads
- name: change_streams_latency
- name: change_streams_listen_throughput
- name: change_streams_multi_mongos
- name: tsbs_query_sharded
- name: tsbs_query_finance_sharded
- name: tsbs_query_sharded_balancer
- name: tsbs_query_finance_sharded_balancer
- <<: *amazon_linux2_arm64_compile_variant_dependency
name: perf-3-node-replSet.arm.aws.2023-11
display_name: PERF 3-Node ReplSet ARM AWS 2023-11
cron: "0 0 * * 1,2,3,4,5,6" # Everyday except Sunday at 00:00
modules: *perf_modules
expansions:
mongodb_setup_release: 2022-11
mongodb_setup: replica
infrastructure_provisioning_release: 2023-09
infrastructure_provisioning: replica
workload_setup: 2022-11
platform: linux
project_dir: *perf_project_dir
authentication: enabled
storageEngine: wiredTiger
compile_variant: amazon2-arm64-compile
use_supplementary_data: true
run_on:
- "rhel94-perf-replset"
tasks:
- *schedule_patch_auto_tasks_task
- *schedule_variant_auto_tasks_task
- name: ycsb.2023-09
- name: ycsb_w1.2023-09
- name: ycsb_60GB.2023-09
- name: ycsb.load
- name: ycsb_60GB.long.2023-09
- name: ycsb_secondary_reads.2023-09
- name: crud_workloads_majority
- name: crud_workloads_w1
- name: misc_workloads
- name: map_reduce_workloads
- name: refine_shard_key_transaction_stress
- name: smoke_test
- name: secondary_performance # Uses a special 2 node mongodb setup
- name: non_sharded_workloads
- name: bestbuy_agg
- name: bestbuy_agg_merge_different_db
- name: bestbuy_agg_merge_same_db
- name: bestbuy_agg_merge_wordcount
- name: bestbuy_query
- name: change_streams_preimage_throughput
- name: change_streams_latency
- name: change_streams_preimage_latency
- name: change_streams_listen_throughput
- name: snapshot_reads
- name: secondary_reads
- name: tpcc
- name: tpcc_majority
- name: tpch_1_normalized
- name: tpch_1_denormalized
# TODO: Enable in SERVER-66572.
# - name: tpch_10_normalized
# - name: tpch_10_denormalized
- name: linkbench
- name: linkbench2
- name: tsbs_load
- name: tsbs_query
- name: tsbs_query_finance
- name: tsbs_query_manual_bucketing
- name: tsbs-query-genny
- name: tsbs-query-optimizations
- name: tsbs-expression-query
- name: big_update_10k
- name: mixed_workloads_genny_rate_limited_high_value
- name: load_test_high_value
- name: majority_reads10_k_threads_high_value
- name: large_indexed_ins_high_value
- name: expressive_queries_high_value
- name: time_series_sort_high_value
- <<: *amazon2_x86_compile_variant_dependency
name: perf-3-node-replSet-intel.intel.aws.2023-11
display_name: PERF 3-Node ReplSet Intel AWS 2023-11
cron: "0 0 * * 1,2,3,4,5,6" # Everyday except Sunday at 00:00
modules: *perf_modules
expansions:
mongodb_setup_release: 2022-11
mongodb_setup: replica
infrastructure_provisioning_release: 2023-09
infrastructure_provisioning: replica-intel.2023-11
workload_setup: 2022-11
platform: linux
project_dir: *perf_project_dir
authentication: enabled
storageEngine: wiredTiger
compile_variant: amazon2-x86-compile
use_supplementary_data: true
run_on:
- "rhel94-perf-replset"
tasks:
- *schedule_patch_auto_tasks_task
- *schedule_variant_auto_tasks_task
- name: ycsb.2023-09
- name: ycsb_60GB.2023-09
- name: crud_workloads_majority
- name: smoke_test
- name: linkbench
- name: linkbench2
- name: mixed_workloads_genny_rate_limited_high_value
- name: tsbs-expression-query
# On PERF-730 we changed the initial sync tests to use two nodes instead of three. To avoid
# losing history, the name remains unchanged, but the display_name reflects the change to 2-Node.
- <<: *amazon_linux2_arm64_compile_variant_dependency
name: perf-2-node-replSet-initialsync.arm.aws.2023-11
display_name: PERF 2-Node ReplSet Initial Sync ARM AWS 2023-11
cron: "0 0 * * 4" # 00:00 on Thursday
modules: *perf_modules
expansions:
mongodb_setup_release: 2022-11
mongodb_setup: replica-2node
infrastructure_provisioning_release: 2023-09
infrastructure_provisioning: replica-2node
workload_setup: 2022-11
platform: linux
authentication: disabled
storageEngine: wiredTiger
compile_variant: amazon2-arm64-compile
use_supplementary_data: true
project_dir: *perf_project_dir
run_on:
- "rhel94-perf-replset"
tasks:
- *schedule_patch_auto_tasks_task
- *schedule_variant_auto_tasks_task
- name: initialsync-large
- name: initialsync-large-fcbis
- <<: *amazon_linux2_arm64_mongocrypt_compile_variant_dependency
name: perf-shard-lite-fle.arm.aws.2023-11
display_name: PERF Shard Lite FLE ARM AWS 2023-11
cron: "0 0 * * 0,4" # 00:00 on Sunday,Thursday
modules: *perf_modules
expansions:
mongodb_setup_release: 2022-11
mongodb_setup: shard-lite-fle
infrastructure_provisioning_release: 2023-09
infrastructure_provisioning: shard-lite
workload_setup: 2022-11
platform: linux
project_dir: *perf_project_dir
authentication: enabled
storageEngine: wiredTiger
compile_variant: amazon2-arm64-compile
use_supplementary_data: true
shlib_compile_variant: amazon2-arm64-mongocrypt-shlib-compile
mongocrypt_shlib_required: true
run_on:
- "rhel94-perf-shard-lite"
tasks:
- *schedule_patch_auto_tasks_task
- *schedule_variant_auto_tasks_task
- name: medical_workload_diagnosis_50_50_high_value
- name: ycsb_like_queryable_encrypt1_cfdefault_high_value
- <<: *amazon_linux2_arm64_compile_variant_dependency
name: perf-mongo-perf-standalone.arm.aws.2023-11
display_name: PERF Mongo-Perf Standalone inMemory ARM AWS 2023-11
cron: &linux-microbench-cron "0 0 * * *" # Everyday at 00:00
modules: *perf_modules
expansions:
mongodb_setup_release: 2022-11
mongodb_setup: mongo-perf-standalone.2023-02
infrastructure_provisioning_release: 2023-09
infrastructure_provisioning: workload_client_mongod_combined.2023-01
workload_setup: 2022-11
use_scons_cache: true
platform: linux
canaries: none
storageEngine: inMemory
project_dir: *perf_project_dir
compile_variant: amazon2-arm64-compile
use_supplementary_data: true
run_on:
- "rhel94-perf-microbenchmarks"
tasks:
- name: big_collection
- name: genny_scale_InsertRemove
- name: genny_execution_UserAcquisition
- name: aggregation_read_commands
- name: aggregation_read_commands_large_dataset
- name: agg-query-comparison_read_commands
- name: query_read_commands
- name: query_read_commands_large_dataset
- name: views-aggregation
- name: views-query
- name: where_read_commands
- name: update_read_commands
- name: insert_read_commands
- name: wildcard-index-read_read_commands
- name: wildcard-index-write_read_commands
- name: geo_read_commands
- name: misc_read_commands
- name: misc_custom_filter_default_read_commands
- name: misc_custom_filter_slow_or_sample_read_commands
- name: misc_custom_filter_complex_read_commands
- name: misc_custom_filter_whole_doc_read_commands
- name: misc_slowms_everything_read_commands
- name: singleThreaded_read_commands
- name: pipeline-updates
- name: javascript
- name: compound_wildcard_index_write_commands
- name: compound_wildcard_index_read_commands
- <<: *amazon2_x86_compile_variant_dependency
name: perf-mongo-perf-standalone.intel.aws.2023-11
display_name: PERF Mongo-Perf Standalone inMemory Intel AWS 2023-11
cron: *linux-microbench-cron
modules: *perf_modules
expansions:
mongodb_setup_release: 2022-11
mongodb_setup: mongo-perf-standalone.2023-02
infrastructure_provisioning_release: 2023-09
infrastructure_provisioning: workload_client_mongod_combined_intel.2023-11
workload_setup: 2022-11
use_scons_cache: true
platform: linux
canaries: none
storageEngine: inMemory
project_dir: *perf_project_dir
compile_variant: amazon2-x86-compile
use_supplementary_data: true
run_on:
- "rhel94-perf-microbenchmarks"
tasks:
- name: big_collection
- name: genny_scale_InsertRemove
- name: genny_execution_UserAcquisition
- name: aggregation_read_commands
- name: aggregation_read_commands_large_dataset
- name: agg-query-comparison_read_commands
- name: query_read_commands
- name: query_read_commands_large_dataset
- name: views-aggregation
- name: views-query
- name: where_read_commands
- name: update_read_commands
- name: insert_read_commands
- name: wildcard-index-read_read_commands
- name: wildcard-index-write_read_commands
- name: geo_read_commands
- name: misc_read_commands
- name: misc_custom_filter_default_read_commands
- name: misc_custom_filter_slow_or_sample_read_commands
- name: misc_custom_filter_complex_read_commands
- name: misc_custom_filter_whole_doc_read_commands
- name: misc_slowms_everything_read_commands
- name: singleThreaded_read_commands
- name: pipeline-updates
- name: javascript
- name: compound_wildcard_index_write_commands
- name: compound_wildcard_index_read_commands

View File

@ -0,0 +1,15 @@
# Replacement for generate-tasks-for-version from evergreen_yml_components.
#
# This is similar to generate-tasks-for-version in evergreen_yml_components,
# but doesn't include actual task generation, which does nothing
# for sys-perf, and breaks when it tries to process the sys-perf project.
#
buildvariants:
- name: generate-tasks-for-version
display_name: "! Generate tasks for evergreen version"
activate: true
run_on:
- rhel80-medium
tasks:
- name: version_expansions_gen

View File

@ -1,7 +1,7 @@
set -o errexit
cd src
git clone --branch=v0.2.0-jepsen-mongodb-master --depth=1 git@github.com:10gen/jepsen.git jepsen-mongodb
git clone --branch=v0.3.0-jepsen-mongodb-master --depth=1 git@github.com:10gen/jepsen.git jepsen-mongodb
cd jepsen-mongodb
lein install

View File

@ -43,5 +43,5 @@ function activate_venv {
fi
export PIP_CACHE_DIR=${workdir}/pip_cache
echo "python set to $(which $python)"
echo "python set to $(which $python) and python version: $($python --version)"
}

View File

@ -0,0 +1,215 @@
// Tests the validation logic for combinations of "collectionless" stages like $documents with or
// around sub-pipelines. For the cases that should be legal, we mostly just care the the command
// succeeds. However, we will use 'resultsEq' to test correct semantics while we are here, gaining
// more coverage.
// TODO SERVER-94226 consider extending this test to cases like $currentOp and $queryStats as well.
// This test uses stages like $documents which are not permitted inside a $facet stage.
// @tags: [do_not_wrap_aggregations_in_facets]
(function() {
"use strict";
load("jstests/aggregation/extras/utils.js"); // For 'resultsEq.'
load("jstests/libs/fixture_helpers.js"); // For 'isMongos.'
const coll = db[jsTestName()];
coll.drop();
const targetCollForMerge = db["target_coll"];
targetCollForMerge.drop();
assert.commandWorked(coll.insert({_id: 0, arr: [{}, {}]}));
{
// Tests for an aggregation over a collection (i.e. {aggregate: "collName"} commands) with a
// $documents stage used in a sub-pipeline. Each of these cases should be legal, which is most
// of the value of the assertion. We will use 'resultsEq' to test correct semantics while we are
// here.
// $lookup.
assert(resultsEq(coll.aggregate([
{$lookup: {
let: {documents: "$arr"},
pipeline: [
{$documents: "$$documents"},
],
as: "duplicated"
}},
]).toArray(), [{_id: 0, arr: [{}, {}], duplicated: [{}, {}]}]));
// $unionWith.
assert(resultsEq(coll.aggregate([
{
$unionWith: {
pipeline: [
{$documents: [{_id: "gen"}]},
],
}
},
])
.toArray(),
[{_id: 0, arr: [{}, {}]}, {_id: "gen"}]));
// Both, and more nesting.
assert(resultsEq(coll.aggregate([{
$unionWith: {
coll: coll.getName(),
pipeline: [{
$lookup: {
pipeline: [
{$documents: []},
{$unionWith: {coll: coll.getName(), pipeline: []}}
],
as: "nest"
}
}]
}
}])
.toArray(),
[
{_id: 0, arr: [{}, {}]},
{_id: 0, arr: [{}, {}], nest: [{_id: 0, arr: [{}, {}]}]}
]));
}
{
// Tests for a db-level aggregate (i.e. {aggregate: 1} commands) with sub-pipelines on regular
// collections.
// $facet
assert(resultsEq(db.aggregate([
{
$documents: [
{x: 1, y: 1, val: 1},
{x: 2, y: 2, val: 1},
{x: 3, y: 1, val: 2},
{x: 2, y: 2, val: 1}
]
},
{
$facet: {
sumByX: [{$group: {_id: "$x", sum: {$sum: "$val"}}}],
sumByY: [{$group: {_id: "$y", sum: {$sum: "$val"}}}]
}
}
]).toArray(),
[{
sumByX: [{_id: 1, sum: 1}, {_id: 2, sum: 2}, {_id: 3, sum: 2}],
sumByY: [{_id: 1, sum: 3}, {_id: 2, sum: 2}]
}]));
if (!FixtureHelpers.isMongos(db)) {
// This doesn't work on mongos in v7.0 and earlier - waiting for SERVER-65534.
// $lookup.
assert(resultsEq(db.aggregate([
{$documents: [{x: 1, arr: [{x: 2}]}, {y: 1, arr: []}]},
{$lookup: {
let: {documents: "$arr"},
pipeline: [
{$documents: "$$documents"},
],
as: "duplicated"
}},
]).toArray(),
[
{x: 1, arr: [{x: 2}], duplicated: [{x: 2}]},
{y: 1, arr: [], duplicated: []}
]));
// $merge.
assert.doesNotThrow(() => db.aggregate([
{
$documents: [
{_id: 2, x: "foo"},
{_id: 4, x: "bar"},
]
},
{
$merge: {
into: targetCollForMerge.getName(),
on: "_id",
whenMatched: [{$set: {x: {$setUnion: ["$x", "$$new.x"]}}}]
}
}
]));
assert(resultsEq(targetCollForMerge.find({}, {_id: 1}).toArray(), [{_id: 2}, {_id: 4}]));
// $unionWith
assert(resultsEq(db.aggregate([
{$documents: [{_id: 2}, {_id: 4}]},
{$unionWith: {coll: coll.getName(), pipeline: []}}
]).toArray(),
[{_id: 2}, {_id: 4}, {_id: 0, arr: [{}, {}]}]));
// All of the above, plus nesting.
const results =
db.aggregate([
{$documents: [{_id: "first"}]},
{
$unionWith: {
pipeline: [
{$documents: [{_id: "uw"}]},
{$unionWith: {pipeline: [{$documents: [{_id: "uw_2"}]}]}},
{
$facet: {
allTogether:
[{$group: {_id: null, all: {$addToSet: "$_id"}}}],
countEach: [{$group: {_id: "$_id", count: {$sum: 1}}}],
}
},
{
$lookup:
{pipeline: [{$documents: [{x: "lu1"}, {x: "lu2"}]}], as: "xs"}
},
{$set: {xs: {$map: {input: "$xs", in : "$$this.x"}}}}
]
},
},
]).toArray();
assert(resultsEq(results,
[
{_id: "first"},
{
allTogether: [{_id: null, all: ["uw", "uw_2"]}],
countEach: [{_id: "uw", count: 1}, {_id: "uw_2", count: 1}],
xs: ["lu1", "lu2"]
}
]),
results);
}
}
// Test for invalid combinations.
// To use $documents inside a $lookup, there must not be a "from" argument.
// As of SERVER-94144, this does not throw on 7.0 and older branches.
assert.doesNotThrow(
() => coll.aggregate([{$lookup: {from: "foo", pipeline: [{$documents: []}], as: "lustuff"}}]));
assert.doesNotThrow(
() => coll.aggregate([
{$lookup: {
from: "foo",
let: {docs: "$arr"},
pipeline: [
{$documents: "$$docs"},
{$lookup: {
from: "foo",
let: {x: "$x", y: "$y"},
pipeline: [
{$match: {$expr: {$and: [
{$eq: ["$x", "$$x"]},
{$eq: ["$y", "$$y"]}
]}}}
],
as: "doesnt_matter"
}}
],
as: "lustuff"
}}]));
// To use $documents inside a $unionWith, there must not be a "coll" argument.
// As of SERVER-94144, this does not throw on 7.0 and older branches.
assert.doesNotThrow(
() => coll.aggregate([{$unionWith: {coll: "foo", pipeline: [{$documents: []}]}}]));
// Cannot use $documents inside of $facet.
assert.throwsWithCode(() => coll.aggregate([{$facet: {test: [{$documents: []}]}}]), 40600);
})();

View File

@ -27,8 +27,7 @@ function countAuthInLog(conn) {
}
} else if (entry.id === kAuthenticationFailedLogId) {
// Authentication can fail legitimately because the secondary abandons the connection
// during shutdown - if we do encounter an authentication failure in the log, make sure
// that it is only of this type, fail anything else
// during shutdown.
assert.eq(entry.attr.result, ErrorCodes.AuthenticationAbandoned);
} else {
// Irrelevant.

View File

@ -112,20 +112,6 @@ var $config = extendWorkload($config, function($config, $super) {
}
};
$config.teardown = function teardown(db, collName, cluster) {
$super.teardown.apply(this, arguments);
// If a shard node that is acting as a router for an internal transaction is
// killed/terminated/stepped down or the transaction's session is killed while running a
// non-retryable transaction, the transaction would be left in-progress since nothing
// would aborted it. Such dangling transactions can cause the CheckReplDBHash hook to hang
// as the fsyncLock command requires taking the global S lock and it cannot do that while
// there is an in-progress transaction.
if (TestData.runningWithShardStepdowns || this.retryOnKilledSession) {
this.killAllSessions(cluster);
}
};
$config.states.init = function init(db, collName, connCache) {
const retryableErrorMessages = [
"The server is in quiesce mode and will shut down",

View File

@ -66,6 +66,7 @@ var $config = extendWorkload($config, function($config, $super) {
// The refienCollectionCoordinator interrupt all migrations by setting `allowMigration`
// to false
ErrorCodes.Interrupted,
ErrorCodes.OrphanedRangeCleanUpFailed,
];
return (err.code && codes.includes(err.code)) ||
(err.message &&

View File

@ -34,7 +34,11 @@ if (hostinfo.os.type != "") {
assert.neq(hostinfo.system.currentTime, "" || null, "Missing Current Time");
assert.neq(hostinfo.system.cpuAddrSize, "" || null || 0, "Missing CPU Address Size");
assert.neq(hostinfo.system.memSizeMB, "" || null, "Missing Memory Size");
assert.neq(hostinfo.system.numCores, "" || null || 0, "Missing Number of Cores");
assert.neq(hostinfo.system.numCores, "" || null || 0, "Missing Number of Logical Cores");
// Check that numCoresAvailableToProcess != -1 as that indicates syscall failure.
assert.neq(hostinfo.system.numCoresAvailableToProcess,
"" || null || -1,
"Missing Number of Cores Available To Process");
assert.neq(
hostinfo.system.numPhysicalCores, "" || null || 0, "Missing Number of Physical Cores");
assert.neq(hostinfo.system.numCpuSockets, "" || null || 0, "Missing Number of CPU Sockets");

View File

@ -0,0 +1,48 @@
/**
* Confirm that a hashed index field does not prevent the index prefix field to be used for covered
* projection and to produce correct result.
* @tags: [
* # Explain may return incomplete results if interrupted by a stepdown.
* does_not_support_stepdowns,
* ]
*/
load("jstests/aggregation/extras/utils.js");
load("jstests/libs/analyze_plan.js");
const coll = db[jsTestName()];
coll.drop();
assert.commandWorked(
coll.insertMany([{_id: 1, a: {b: 5}}, {_id: 2, a: {b: 2, c: 1}}, {_id: 3, a: {b: 0}}]));
// Confirm that the hashed index scan produces the same results as the collection scan.
const resultsCollScan = coll.find({}, {_id: 0, 'a.b': 1}).sort({'a.b': 1});
assert.commandWorked(coll.createIndex({'a.b': 1, a: 'hashed'}));
const resultsIndexScan = coll.find({}, {_id: 0, 'a.b': 1}).sort({'a.b': 1});
assert(orderedArrayEq(resultsCollScan, resultsIndexScan));
// Check that the index with hashed field is used in the plan.
const explain = coll.find({}, {_id: 0, 'a.b': 1}).sort({'a.b': 1}).explain();
const plan = getWinningPlanFromExplain(explain);
const project = getPlanStage(plan, "PROJECTION_DEFAULT");
assert.neq(project, null, explain);
const ixScan = getPlanStage(plan, "IXSCAN");
assert.eq(ixScan.indexName, "a.b_1_a_hashed", explain);
const fetch = getPlanStage(plan, "FETCH");
const shards = getShardsFromExplain(explain);
if (shards) {
// In sharded environment if a sharding_filter stage is added, a FETCH stage is also added on
// top of the index scan. Otherwise covered projection is used without a fetch.
const shardingFilter = getPlanStage(plan, "SHARDING_FILTER");
if (shardingFilter) {
assert.neq(fetch, null, plan);
} else {
assert.eq(fetch, null, plan);
}
} else {
// In non-sharded environment covered projection is used without a FETCH stage.
assert.eq(fetch, null, plan);
}

View File

@ -41,6 +41,19 @@ function getRejectedPlan(rejectedPlan) {
return rejectedPlan.hasOwnProperty("queryPlan") ? rejectedPlan.queryPlan : rejectedPlan;
}
/**
* Help function to extract shards from explain in sharded environment. Returns null for
* non-sharded plans.
*/
function getShardsFromExplain(explain) {
if (explain.hasOwnProperty("queryPlanner") &&
explain.queryPlanner.hasOwnProperty("winningPlan")) {
return explain.queryPlanner.winningPlan.shards;
}
return null;
}
/**
* Returns a sub-element of the 'cachedPlan' explain output which represents a query plan.
*/

View File

@ -447,8 +447,8 @@ function checkChangeStreamEntry({queryStatsEntry, db, collectionName, numExecs,
assert.eq(collectionName, queryStatsEntry.key.queryShape.cmdNs.coll);
// Confirm entry is a change stream request.
let stringifiedPipeline = JSON.stringify(queryStatsEntry.key.queryShape.pipeline, null, 0);
assert(stringifiedPipeline.includes("_internalChangeStream"));
const pipelineShape = queryStatsEntry.key.queryShape.pipeline;
assert(pipelineShape[0].hasOwnProperty("$changeStream"), pipelineShape);
// TODO SERVER-76263 Support reporting 'collectionType' on a sharded cluster.
if (!FixtureHelpers.isMongos(db)) {

View File

@ -0,0 +1,38 @@
/**
* Test local catalog for commands like listDatabases, focusing on consistency with the durable
* storage snapshot.
*/
load('jstests/libs/fail_point_util.js'); // For configureFailPoint(),
// kDefaultWaitForFailPointTimeout()
let replTest = new ReplSetTest({
name: jsTestName(),
nodes: 1,
});
replTest.startSet();
replTest.initiate();
let mongod = replTest.getPrimary();
const slowPublishDb = "catalog_snapshot_consistency_slow_publish_db";
const slowPublishColl = "coll";
// List database should reflect an implicitly created database that has been committed but not
// published into the local catalog yet. Use a failpoint to hang before publishing the catalog,
// simulating a slow catalog publish.
const failPoint = configureFailPoint(mongod,
"hangBeforePublishingCatalogUpdates",
{collectionNS: slowPublishDb + '.' + slowPublishColl});
const waitDbCreate = startParallelShell(`{
db.getSiblingDB('${slowPublishDb}')['${slowPublishColl}'].createIndex({a:1});
}`, mongod.port);
failPoint.wait();
let cmdRes = assert.commandWorked(
mongod.adminCommand({listDatabases: 1, filter: {$expr: {$eq: ["$name", slowPublishDb]}}}));
assert.eq(1, cmdRes.databases.length, tojson(cmdRes));
assert.eq(slowPublishDb, cmdRes.databases[0].name, tojson(cmdRes));
failPoint.off();
waitDbCreate();
replTest.stopSet();

View File

@ -0,0 +1,22 @@
// Tests that lookups on local capped collections acquire a snapshot on the capped collection
// correctly. Tests the scenario fixed by SERVER-91203 no longer causes a crash.
let rst = new ReplSetTest({nodes: {n0: {profile: "0"}}});
rst.startSet();
rst.initiate();
const dbName = "test";
const collName = "foo";
let testDB = rst.getPrimary().getDB(dbName);
let testColl = testDB.getCollection(collName);
testColl.insert({a: 1});
testDB.setProfilingLevel(2);
const pipeline =
[{$lookup: {from: 'system.profile', localField: 'key', foreignField: 'key', as: 'results'}}];
testColl.aggregate(pipeline).toArray();
rst.stopSet();

View File

@ -1,64 +1,19 @@
/**
* This test confirms that query stats store key fields for an aggregate command are properly nested
* and none are missing when running a change stream query with $_passthroughToShard.
* and none are missing. It also validates the exact pipeline in the query shape.
* @tags: [
* requires_sharding,
* uses_change_streams,
* requires_replication,
* requires_sharding,
* requires_fcv_70
* ]
*/
load("jstests/libs/query_stats_utils.js");
load("jstests/libs/query_stats_utils.js"); // For runCommandAndValidateQueryStats.
load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
const dbName = jsTestName();
const collName = "coll";
// $_passthroughToShard is only possible on a sharded cluster.
const st = new ShardingTest({
shards: 2,
mongos: 1,
config: 1,
rs: {nodes: 1},
other: {
mongosOptions: {
setParameter: {
internalQueryStatsRateLimit: -1,
}
}
}
});
const sdb = st.s0.getDB(dbName);
assert.commandWorked(sdb.dropDatabase());
sdb.setProfilingLevel(0, -1);
st.shard0.getDB(dbName).setProfilingLevel(0, -1);
// Shard the relevant collections.
assert.commandWorked(st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.name}));
// Shard the collection on {_id: 1}, split at {_id: 0} and move the empty upper chunk to
// shard1.
st.shardColl(collName, {_id: 1}, {_id: 0}, {_id: 0}, dbName);
const shardId = st.shard0.shardName;
let coll = sdb[collName];
const aggregateCommandObj = {
aggregate: coll.getName(),
pipeline: [{"$changeStream": {}}],
allowDiskUse: false,
cursor: {batchSize: 2},
maxTimeMS: 50 * 1000,
bypassDocumentValidation: false,
readConcern: {level: "majority"},
collation: {locale: "en_US", strength: 2},
hint: {"v": 1},
comment: "",
let : {},
apiDeprecationErrors: false,
apiVersion: "1",
apiStrict: false,
$_passthroughToShard: {shard: shardId}
};
const queryShapeAggregateFields =
["cmdNs", "command", "pipeline", "allowDiskUse", "collation", "let"];
@ -77,17 +32,153 @@ const queryStatsAggregateKeyFields = [
"hint",
"readConcern",
"cursor.batchSize",
"$_passthroughToShard",
"$_passthroughToShard.shard"
];
assert.commandWorked(coll.createIndex({v: 1}));
runCommandAndValidateQueryStats({
coll: coll,
const testCases = [
// Default fields.
{
pipeline: [{"$changeStream": {}}],
expectedShapifiedPipeline: [{
"$changeStream": {
startAtOperationTime: "?timestamp",
fullDocument: "default",
fullDocumentBeforeChange: "off"
}
}]
},
// Non default field values.
{
pipeline: [{
"$changeStream": {
fullDocument: "updateLookup",
fullDocumentBeforeChange: "required",
showExpandedEvents: true,
}
}],
expectedShapifiedPipeline: [{
"$changeStream": {
startAtOperationTime: "?timestamp",
fullDocument: "updateLookup",
fullDocumentBeforeChange: "required",
showExpandedEvents: true,
}
}],
},
// $changeStream followed by a $match. $changeStream internally creates another $match stage
// which shouldn't appear in the query shape, but a $match in the user specified pipeline should
// appear in the query shape.
{
pipeline: [{$changeStream: {}}, {$match: {a: "field"}}],
expectedShapifiedPipeline: [
{
"$changeStream": {
startAtOperationTime: "?timestamp",
fullDocument: "default",
fullDocumentBeforeChange: "off"
}
},
{$match: {a: {$eq: "?string"}}}
]
}
];
function assertPipelineField(conn, expectedPipeline) {
const entry = getLatestQueryStatsEntry(conn, {collName: collName});
const statsPipeline = getValueAtPath(entry, "key.queryShape.pipeline");
assert.eq(statsPipeline, expectedPipeline);
}
function validateResumeTokenQueryShape(conn, coll) {
// Start a change stream.
const changeStream = coll.watch([]);
// Going to create an invalid event by checking a change stream on a dropped collection.
assert.commandWorked(coll.insert({_id: 1}));
assert(coll.drop());
assert.soon(() => changeStream.hasNext());
changeStream.next();
const invalidateResumeToken = changeStream.getResumeToken();
// Resume the change stream using 'startAfter' field.
coll.watch([], {startAfter: invalidateResumeToken});
assert.commandWorked(coll.insert({_id: 2}));
const expectedShapifiedPipeline = [{
"$changeStream": {
resumeAfter: {_data: "?string"},
fullDocument: "default",
fullDocumentBeforeChange: "off"
}
}];
assertPipelineField(conn, expectedShapifiedPipeline);
}
function validateChangeStreamAggKey(conn) {
const db = conn.getDB("test");
assertDropAndRecreateCollection(db, collName);
// Change streams with 'startAfter' or 'resumeAfter' are only executed after a certain event and
// require re-parsing a resume token. To validate the query shape of these pipelines, we have to
// execute the events to register the pipeline.
validateResumeTokenQueryShape(conn, db[collName]);
// Validate the key for the rest of the pipelines.
testCases.forEach(input => {
const pipeline = input.pipeline;
const aggCmdObj = {
aggregate: collName,
pipeline: pipeline,
allowDiskUse: false,
cursor: {batchSize: 2},
maxTimeMS: 50 * 1000,
bypassDocumentValidation: false,
readConcern: {level: "majority"},
collation: {locale: "en_US", strength: 2},
hint: {"v": 1},
comment: "",
let : {},
apiDeprecationErrors: false,
apiVersion: "1",
apiStrict: false,
};
runCommandAndValidateQueryStats({
coll: db[collName],
commandName: "aggregate",
commandObj: aggregateCommandObj,
commandObj: aggCmdObj,
shapeFields: queryShapeAggregateFields,
keyFields: queryStatsAggregateKeyFields
});
});
assertPipelineField(conn, input.expectedShapifiedPipeline);
});
}
st.stop();
{
// Test on a sharded cluster.
const st = new ShardingTest({
mongos: 1,
shards: 2,
config: 1,
rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}},
mongosOptions: {
setParameter: {
internalQueryStatsRateLimit: -1,
}
},
});
validateChangeStreamAggKey(st.s);
st.stop();
}
{
// Test the non-sharded case.
const rst = new ReplSetTest({nodes: 2});
rst.startSet({setParameter: {internalQueryStatsRateLimit: -1}});
rst.initiate();
rst.getPrimary().getDB("admin").setLogLevel(3, "queryStats");
// Only aggregations run on replica sets have the '$readPreference' field in the key.
queryStatsAggregateKeyFields.push("$readPreference");
validateChangeStreamAggKey(rst.getPrimary());
rst.stopSet();
}

View File

@ -0,0 +1,93 @@
/**
* This test confirms that query stats store key fields for an aggregate command are properly nested
* and none are missing when running a change stream query with $_passthroughToShard.
* @tags: [
* requires_sharding,
* uses_change_streams,
* ]
*/
load("jstests/libs/query_stats_utils.js");
const dbName = jsTestName();
const collName = "coll";
// $_passthroughToShard is only possible on a sharded cluster.
const st = new ShardingTest({
shards: 2,
mongos: 1,
config: 1,
rs: {nodes: 1},
other: {
mongosOptions: {
setParameter: {
internalQueryStatsRateLimit: -1,
}
}
}
});
const sdb = st.s0.getDB(dbName);
assert.commandWorked(sdb.dropDatabase());
sdb.setProfilingLevel(0, -1);
st.shard0.getDB(dbName).setProfilingLevel(0, -1);
// Shard the relevant collections.
assert.commandWorked(st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.name}));
// Shard the collection on {_id: 1}, split at {_id: 0} and move the empty upper chunk to
// shard1.
st.shardColl(collName, {_id: 1}, {_id: 0}, {_id: 0}, dbName);
const shardId = st.shard0.shardName;
let coll = sdb[collName];
const aggregateCommandObj = {
aggregate: coll.getName(),
pipeline: [{"$changeStream": {}}],
allowDiskUse: false,
cursor: {batchSize: 2},
maxTimeMS: 50 * 1000,
bypassDocumentValidation: false,
readConcern: {level: "majority"},
collation: {locale: "en_US", strength: 2},
hint: {"v": 1},
comment: "",
let : {},
apiDeprecationErrors: false,
apiVersion: "1",
apiStrict: false,
$_passthroughToShard: {shard: shardId}
};
const queryShapeAggregateFields =
["cmdNs", "command", "pipeline", "allowDiskUse", "collation", "let"];
// The outer fields not nested inside queryShape.
const queryStatsAggregateKeyFields = [
"queryShape",
"cursor",
"maxTimeMS",
"bypassDocumentValidation",
"comment",
"apiDeprecationErrors",
"apiVersion",
"apiStrict",
"collectionType",
"client",
"hint",
"readConcern",
"cursor.batchSize",
"$_passthroughToShard",
"$_passthroughToShard.shard"
];
assert.commandWorked(coll.createIndex({v: 1}));
runCommandAndValidateQueryStats({
coll: coll,
commandName: "aggregate",
commandObj: aggregateCommandObj,
shapeFields: queryShapeAggregateFields,
keyFields: queryStatsAggregateKeyFields
});
st.stop();

View File

@ -22,14 +22,6 @@ function getNumCursorsLessThan30Seconds() {
return db.serverStatus().metrics.cursor.lifespan.lessThan30Seconds;
}
function getNumCursorsLessThan1Minute() {
return db.serverStatus().metrics.cursor.lifespan.lessThan1Minute;
}
function getNumCursorsLessThan10Minutes() {
return db.serverStatus().metrics.cursor.lifespan.lessThan10Minutes;
}
for (let i = 0; i < 40; i++) {
coll.insert({a: i, b: "field b"});
}
@ -38,8 +30,6 @@ const initialNumCursorsLt1s = getNumCursorsLessThan1Second();
const initialNumCursorsLt5s = getNumCursorsLessThan5Seconds();
const initialNumCursorsLt15s = getNumCursorsLessThan15Seconds();
const initialNumCursorsLt30s = getNumCursorsLessThan30Seconds();
const initialNumCursorsLt1m = getNumCursorsLessThan1Minute();
const initialNumCursorsLt10m = getNumCursorsLessThan10Minutes();
// Since we aren't guaranteed perfect timings, the checks in this test have been relaxed to window
// sizes of 30s. For example, a cursor that is expected to die in under 5s may actually take longer
@ -65,21 +55,4 @@ for (let i = 0; i < 3; i++) {
}
assert.eq(cursorsDeadSinceStartLt30Seconds(), 4);
const cursorLt1Minute = coll.find().batchSize(2);
const cursorLt10Minutes = coll.aggregate([], {cursor: {batchSize: 2}});
cursorLt1Minute.next();
cursorLt10Minutes.next();
sleep(31000); // Sleep for 31 s.
while (cursorLt1Minute.hasNext()) {
cursorLt1Minute.next();
}
assert.eq(getNumCursorsLessThan1Minute() - initialNumCursorsLt1m, 1);
sleep(30000); // Sleep another 30s, so the total should be greater than 1m and less than 10m.
while (cursorLt10Minutes.hasNext()) {
cursorLt10Minutes.next();
}
assert.eq(getNumCursorsLessThan10Minutes() - initialNumCursorsLt10m, 1);
}());

View File

@ -0,0 +1,21 @@
/**
* Ensure that the shell correctly handles exhaust queries
*/
const coll = db.shell_exhaust_queries;
// Ensure that read concern is not allowed
db.getMongo().setReadConcern('majority');
assert.throws(() => coll.find().addOption(DBQuery.Option.exhaust).itcount());
db.getMongo().setReadConcern(null);
// Ensure that collation is not allowed
assert.throws(
() => coll.find().collation({locale: "simple"}).addOption(DBQuery.Option.exhaust).itcount());
// Ensure that "allowDiskUse" is not allowed
assert.throws(() => coll.find().allowDiskUse(true).addOption(DBQuery.Option.exhaust).itcount());
// Ensure that read preference is handled correctly
db.getMongo().setReadPref('secondary');
assert.eq(0, coll.find().addOption(DBQuery.Option.exhaust).itcount());

View File

@ -5,6 +5,10 @@
"use strict";
load('jstests/aggregation/extras/utils.js'); // For arrayEq()
load('jstests/libs/fail_point_util.js'); // For configureFailPoint(),
// kDefaultWaitForFailPointTimeout()
load('jstests/libs/parallel_shell_helpers.js'); // For funWithArgs()
load('jstests/replsets/libs/tenant_migration_util.js'); // For makeTenantDB()
// Given the output from the listDatabasesForAllTenants command, ensures that the total size
// reported is the sum of the individual db sizes.
@ -96,6 +100,65 @@ function runTestCheckMultitenantDatabases(primary, numDBs) {
return tenantIds;
}
// Check that a delay in publishing the database creation to the in-memory catalog doesn't prevent
// the database from being visible.
function runTestCheckSlowPublishMultitenantDb(primary) {
const adminDB = primary.getDB("admin");
const tokenConn = new Mongo(primary.host);
let kTenant = ObjectId();
// Create a user for kTenant and then set the security token on the connection.
assert.commandWorked(primary.getDB('$external').runCommand({
createUser: "slowPublishTenant",
'$tenant': kTenant,
roles: [{role: 'readWriteAnyDatabase', db: 'admin'}]
}));
let token = _createSecurityToken({user: "slowPublishTenant", db: '$external', tenant: kTenant});
tokenConn._setSecurityToken(token);
const slowPublishDb = "slow_publish_multitenant_db";
const slowPublishColl = "coll";
// List database should reflect an implicitly created database that has been committed but not
// published into the local catalog yet. Use a failpoint to hang before publishing the catalog,
// simulating a slow catalog publish.
assert.commandWorked(adminDB.runCommand({
configureFailPoint: "hangBeforePublishingCatalogUpdates",
mode: "alwaysOn",
data: {tenant: kTenant, collectionNS: slowPublishDb + '.' + slowPublishColl}
}));
const shellFn = (token, dbName, collName) => {
let shellConn = db.getSiblingDB("admin").getMongo();
shellConn._setSecurityToken(token);
db.getSiblingDB(dbName)[collName].createIndex({a: 1});
};
const waitDbCreate = startParallelShell(
funWithArgs(shellFn, token, slowPublishDb, slowPublishColl), primary.port);
assert.commandWorked(adminDB.runCommand({
waitForFailPoint: "hangBeforePublishingCatalogUpdates",
timesEntered: 1,
maxTimeMS: kDefaultWaitForFailPointTimeout
}));
// use to verify that the database entry is correct
const expectedDatabase = [{"name": slowPublishDb, "tenantId": kTenant, "empty": true}];
let cmdRes = assert.commandWorked(adminDB.runCommand(
{listDatabasesForAllTenants: 1, filter: {$expr: {$eq: ["$name", slowPublishDb]}}}));
assert.eq(1, cmdRes.databases.length);
verifySizeSum(cmdRes);
verifyDatabaseEntries(cmdRes, expectedDatabase);
assert.commandWorked(adminDB.runCommand(
{configureFailPoint: "hangBeforePublishingCatalogUpdates", mode: "off"}));
waitDbCreate();
// Reset token
tokenConn._setSecurityToken(undefined);
}
// Test correctness of filter and nameonly options
function runTestCheckCmdOptions(primary, tenantIds) {
const adminDB = primary.getDB("admin");
@ -253,6 +316,7 @@ function runTestsWithMultiTenancySupport() {
const numDBs = 5;
const tenantIds = runTestCheckMultitenantDatabases(primary, numDBs);
runTestCheckCmdOptions(primary, tenantIds);
runTestCheckSlowPublishMultitenantDb(primary);
runTestInvalidCommands(primary);
rst.stopSet();

View File

@ -12,10 +12,6 @@ var QuerySamplingUtil = (function() {
return listCollectionRes.cursor.firstBatch[0].info.uuid;
}
function generateRandomString(length = 5) {
return extractUUIDFromObject(UUID()).substring(0, length);
}
function generateRandomCollation() {
return {locale: "en_US", strength: AnalyzeShardKeyUtil.getRandInteger(1, 5)};
}
@ -434,7 +430,6 @@ var QuerySamplingUtil = (function() {
return {
getCollectionUuid,
generateRandomString,
generateRandomCollation,
makeCmdObjIgnoreSessionInfo,
waitForActiveSamplingReplicaSet,

View File

@ -9,6 +9,7 @@
"use strict";
load("jstests/libs/config_shard_util.js");
load("jstests/libs/uuid_util.js");
load("jstests/sharding/analyze_shard_key/libs/query_sampling_util.js");
const testCases = [];
@ -112,7 +113,7 @@ function testDiffs(rst, testCase, expectSampling) {
// If running on the config server, use "config" as the database name since it is illegal to
// create a user database on the config server.
const dbName = rst.isConfigRS ? "config" : "testDb";
const collName = "testColl-" + QuerySamplingUtil.generateRandomString();
const collName = "testColl-" + extractUUIDFromObject(UUID());
const ns = dbName + "." + collName;
const primary = rst.getPrimary();

View File

@ -8,6 +8,7 @@
"use strict";
load("jstests/libs/config_shard_util.js");
load("jstests/libs/uuid_util.js");
load("jstests/sharding/analyze_shard_key/libs/query_sampling_util.js");
const supportedTestCases = [
@ -38,7 +39,7 @@ function testReadCmd(rst, cmdOpts, testCase) {
// If running on the config server, use "config" as the database name since it is illegal to
// create a user database on the config server.
const dbName = rst.isConfigRS ? "config" : "testDb";
const collName = "testColl-" + cmdOpts.cmdName + "-" + QuerySamplingUtil.generateRandomString();
const collName = "testColl-" + cmdOpts.cmdName + "-" + extractUUIDFromObject(UUID());
const ns = dbName + "." + collName;
const primary = rst.getPrimary();

View File

@ -8,6 +8,7 @@
"use strict";
load("jstests/libs/config_shard_util.js");
load("jstests/libs/uuid_util.js");
load("jstests/sharding/analyze_shard_key/libs/query_sampling_util.js");
const supportedTestCases = [
@ -27,7 +28,7 @@ function testWriteCmd(rst, cmdOpts, testCase) {
// If running on the config server, use "config" as the database name since it is illegal to
// create a user database on the config server.
const dbName = rst.isConfigRS ? "config" : "testDb";
const collName = "testColl-" + cmdOpts.cmdName + "-" + QuerySamplingUtil.generateRandomString();
const collName = "testColl-" + cmdOpts.cmdName + "-" + extractUUIDFromObject(UUID());
const ns = dbName + "." + collName;
const primary = rst.getPrimary();
@ -199,7 +200,7 @@ function testFindAndModifyCmd(rst, testCases) {
function testInsertCmd(rst) {
const dbName = "testDb";
const collName = "testColl-insert-" + QuerySamplingUtil.generateRandomString();
const collName = "testColl-insert-" + extractUUIDFromObject(UUID());
const primary = rst.getPrimary();
const db = primary.getDB(dbName);
// Verify that no mongods support persisting sampled insert queries. Specifically, "sampleId"

View File

@ -1,10 +1,12 @@
/**
*
* Tests the rewrite of NetworkInterfaceExceededTimeLimit exception coming from
* `executor/connection_pool.cpp` into MaxTimeMSError when MaxTimeMS option is set for a given
* sharding command.
*
* @tags: [requires_fcv_61]
* @tags: [
* requires_fcv_61,
* does_not_support_stepdowns,
* ]
*/
(function() {

View File

@ -8,8 +8,15 @@ var sessionsDb = "config";
var refresh = {refreshLogicalSessionCacheNow: 1};
var startSession = {startSession: 1};
// Create a cluster with 1 shard.
var cluster = new ShardingTest({shards: 2});
var cluster = new ShardingTest({
mongos: [{setParameter: {sessionWriteConcernTimeoutSystemMillis: 0, sessionMaxBatchSize: 500}}],
shards: 2,
rs: {setParameter: {sessionWriteConcernTimeoutSystemMillis: 0, sessionMaxBatchSize: 500}},
other: {
configOptions:
{setParameter: {sessionWriteConcernTimeoutSystemMillis: 0, sessionMaxBatchSize: 500}}
}
});
// Test that we can refresh without any sessions, as a sanity check.
{

View File

@ -21,14 +21,12 @@ const testDB = st.s.getDB('test');
const coll = testDB[jsTest.name()];
const collName = coll.getFullName();
// Shard a collection on _id:1 so that the initial chunk will reside on the primary shard (shard0)
assert.commandWorked(
st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
assert.commandWorked(st.s.adminCommand({shardCollection: collName, key: {_id: 1}}));
// Initialize TTL index: delete documents with field `a: <current date>` after 20 seconds
assert.commandWorked(coll.createIndex({a: 1}, {expireAfterSeconds: 20}));
// Insert documents that are going to be deleted in 20 seconds
// Insert documents that are going to be deleted by the TTL index created later on
const currTime = new Date();
var bulk = coll.initializeUnorderedBulkOp();
const nDocs = 100;
@ -37,16 +35,20 @@ for (let i = 0; i < nDocs; i++) {
}
assert.commandWorked(bulk.execute());
// Move all documents on other shards
// Move all documents to the other shard (shard1) but keep a chunk on shard0 to create the TTL index
assert.commandWorked(st.s.adminCommand({split: collName, middle: {_id: -1}}));
assert.commandWorked(
st.s.adminCommand({moveChunk: collName, find: {_id: 0}, to: st.shard1.shardName}));
// Verify that TTL index worked properly on owned documents
// Initialize TTL index: delete documents with field `a: <current date>` older than 1 second
assert.commandWorked(coll.createIndex({a: 1}, {expireAfterSeconds: 1}));
// Verify that TTL index worked properly on owned documents on shard1
assert.soon(function() {
return coll.countDocuments({}) == 0;
}, "Failed to move all documents", 60000 /* 60 seconds */, 5000 /* 5 seconds */);
// Verify that TTL index did not delete orphaned documents
// Verify that TTL index did not delete orphaned documents on shard0
assert.eq(nDocs, st.rs0.getPrimary().getCollection(collName).countDocuments({}));
st.stop();

View File

@ -127,6 +127,10 @@ for (let i = 0; i < 3; i++) {
lastUseValues[j] = sessionsCollectionArray[j].lastUse;
}
}
// Date_t has the granularity of milliseconds, so we have to make sure we don't run this loop
// faster than that.
sleep(10);
}
// 3. Verify that letting sessions expire (simulated by manual deletion) will kill their

View File

@ -32,6 +32,10 @@ import re
from typing import Callable, List, Dict
# Note: The auto-retry settings are prefixed w/ "OOM", but since it's an unconditional retry,
# it's not really OOM-specific. We're keeping the OOM prefix to make the code change simpler.
# (This custom retry logic will go away once the build is fully Bazelified).
def command_spawn_func(sh: str, escape: Callable[[str], str], cmd: str, args: List, env: Dict,
target: List, source: List):
@ -39,11 +43,6 @@ def command_spawn_func(sh: str, escape: Callable[[str], str], cmd: str, args: Li
success = False
build_env = target[0].get_build_env()
oom_messages = [
re.compile(msg, re.MULTILINE | re.DOTALL)
for msg in build_env.get('OOM_RETRY_MESSAGES', [])
]
oom_returncodes = [int(returncode) for returncode in build_env.get('OOM_RETRY_RETURNCODES', [])]
max_retries = build_env.get('OOM_RETRY_ATTEMPTS', 10)
oom_max_retry_delay = build_env.get('OOM_RETRY_MAX_DELAY_SECONDS', 120)
@ -63,12 +62,9 @@ def command_spawn_func(sh: str, escape: Callable[[str], str], cmd: str, args: Li
except subprocess.CalledProcessError as exc:
print(f"{os.path.basename(__file__)} captured error:")
print(exc.stdout)
if any([re.findall(oom_message, exc.stdout) for oom_message in oom_messages]) or any(
[oom_returncode == exc.returncode for oom_returncode in oom_returncodes]):
retries += 1
retry_delay = int((time.time() - start_time) +
oom_max_retry_delay * random.random())
print(f"Ran out of memory while trying to build {target[0]}", )
retry_delay = int((time.time() - start_time) + oom_max_retry_delay * random.random())
print(f"Failed while trying to build {target[0]}", )
if retries <= max_retries:
print(f"trying again in {retry_delay} seconds with retry attempt {retries}")
time.sleep(retry_delay)

View File

@ -1023,7 +1023,7 @@ bool BSONObj::coerceVector(std::vector<T>* out) const {
/**
* Types used to represent BSONElement memory in the Visual Studio debugger
*/
#if defined(_MSC_VER) && defined(_DEBUG)
#if defined(_MSC_VER)
struct BSONElementData {
char type;
char name;
@ -1039,6 +1039,6 @@ struct BSONElementDBRefType {
} bsonElementDBPointerType;
struct BSONElementCodeWithScopeType {
} bsonElementCodeWithScopeType;
#endif // defined(_MSC_VER) && defined(_DEBUG)
#endif // defined(_MSC_VER)
} // namespace mongo

View File

@ -904,7 +904,7 @@ BSONArrayIteratorSorted::BSONArrayIteratorSorted(const BSONArray& array)
/**
* Types used to represent BSONObj and BSONArray memory in the Visual Studio debugger
*/
#if defined(_MSC_VER) && defined(_DEBUG)
#if defined(_MSC_VER)
struct BSONObjData {
int32_t size;
} bsonObjDataInstance;
@ -912,6 +912,6 @@ struct BSONObjData {
struct BSONArrayData {
int32_t size;
} bsonObjArrayInstance;
#endif // defined(_MSC_VER) && defined(_DEBUG)
#endif // defined(_MSC_VER)
} // namespace mongo

View File

@ -262,6 +262,7 @@ AuthorizationManagerImpl::AuthorizationManagerImpl(
AuthorizationManagerImpl::~AuthorizationManagerImpl() = default;
std::unique_ptr<AuthorizationSession> AuthorizationManagerImpl::makeAuthorizationSession() {
invariant(_externalState != nullptr);
return std::make_unique<AuthorizationSessionImpl>(
_externalState->makeAuthzSessionExternalState(this),
AuthorizationSessionImpl::InstallMockForTestingOrAuthImpl{});

View File

@ -51,6 +51,11 @@
namespace mongo {
// Failpoint which causes to hang after wuow commits, before publishing the catalog updates on a
// given namespace.
MONGO_FAIL_POINT_DEFINE(hangBeforePublishingCatalogUpdates);
namespace {
// Sentinel id for marking a catalogId mapping range as unknown. Must use an invalid RecordId.
static RecordId kUnknownRangeMarkerId = RecordId::minLong();
@ -300,6 +305,21 @@ public:
// Create catalog write jobs for all updates registered in this WriteUnitOfWork
auto entries = _uncommittedCatalogUpdates.releaseEntries();
for (auto&& entry : entries) {
hangBeforePublishingCatalogUpdates.executeIf(
[&](const BSONObj& data) {
LOGV2(
9089303, "hangBeforePublishingCatalogUpdates enabled", logAttrs(entry.nss));
hangBeforePublishingCatalogUpdates.pauseWhileSet();
},
[&](const BSONObj& data) {
const auto tenantField = data.getField("tenant");
const auto tenantId = tenantField.ok()
? boost::optional<TenantId>(TenantId::parseFromBSON(tenantField))
: boost::none;
const auto fpNss =
NamespaceStringUtil::deserialize(tenantId, data["collectionNS"].str());
return fpNss.isEmpty() || entry.nss == fpNss;
});
switch (entry.action) {
case UncommittedCatalogUpdates::Entry::Action::kWritableCollection: {
writeJobs.push_back([collection = std::move(entry.collection),
@ -2039,6 +2059,46 @@ std::set<TenantId> CollectionCatalog::getAllTenants() const {
return ret;
}
std::vector<DatabaseName> CollectionCatalog::getAllConsistentDbNames(
OperationContext* opCtx) const {
return getAllConsistentDbNamesForTenant(opCtx, boost::none);
}
std::vector<DatabaseName> CollectionCatalog::getAllConsistentDbNamesForTenant(
OperationContext* opCtx, boost::optional<TenantId> tenantId) const {
// The caller must have an active storage snapshot
tassert(9089300,
"cannot get database list consistent to a snapshot without an active snapshot",
opCtx->recoveryUnit()->isActive());
// First get the dbnames that are not pending commit
std::vector<DatabaseName> ret = getAllDbNamesForTenant(tenantId);
stdx::unordered_set<DatabaseName> visitedDBs(ret.begin(), ret.end());
auto insertSortedIfUnique = [&ret, &visitedDBs](DatabaseName dbname) {
auto [_, isNewDB] = visitedDBs.emplace(dbname);
if (isNewDB) {
ret.insert(std::lower_bound(ret.begin(), ret.end(), dbname), dbname);
}
};
// Now iterate over uncommitted list and validate against the storage snapshot.
// Only consider databases we have not seen so far.
auto readTimestamp = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx);
tassert(9089301,
"point in time catalog lookup for a database list is not supported",
RecoveryUnit::ReadSource::kNoTimestamp ==
opCtx->recoveryUnit()->getTimestampReadSource());
for (auto const& [ns, coll] : _pendingCommitNamespaces) {
if (!visitedDBs.contains(ns.dbName())) {
if (establishConsistentCollection(opCtx, ns, readTimestamp)) {
insertSortedIfUnique(ns.dbName());
}
}
}
return ret;
}
void CollectionCatalog::setAllDatabaseProfileFilters(std::shared_ptr<ProfileFilter> filter) {
auto dbProfileSettingsWriter = _databaseProfileSettings.transient();
for (const auto& [dbName, settings] : _databaseProfileSettings) {

View File

@ -570,6 +570,29 @@ public:
*/
std::set<TenantId> getAllTenants() const;
/**
* This function gets all the database names. The result is sorted in alphabetical ascending
* order. The returned list is consistent with the storage snapshot.
*
* Callers of this method must hold an active storage snapshot. This method takes a global lock
* in MODE_IS.
*
* Unlike DatabaseHolder::getNames(), this does not return databases that are empty.
*/
std::vector<DatabaseName> getAllConsistentDbNames(OperationContext* opCtx) const;
/**
* This function gets all the database names associated with tenantId. The result is sorted in
* alphabetical ascending order. The returned list is consistent with the storage snapshot.
*
* Callers of this method must hold an active storage snapshot. This method takes a global lock
* in MODE_IS.
*
* Unlike DatabaseHolder::getNames(), this does not return databases that are empty.
*/
std::vector<DatabaseName> getAllConsistentDbNamesForTenant(
OperationContext* opCtx, boost::optional<TenantId> tenantId) const;
/**
* Updates the profile filter on all databases with non-default settings.
*/

View File

@ -169,7 +169,8 @@ void validateTTLOptions(OperationContext* opCtx,
}
const auto clusteredAndCapped = [&](LockMode mode) {
AutoGetCollection collection(opCtx, ns, mode);
AutoGetCollection collection(
opCtx, ns, mode, AutoGetCollection::Options{}.expectedUUID(cmd.getCollectionUUID()));
if (collection) {
const auto c = collection.getCollection().get();
if (c->getClusteredInfo() && c->isCapped()) {
@ -190,7 +191,8 @@ void validateTTLOptions(OperationContext* opCtx,
void checkEncryptedFieldIndexRestrictions(OperationContext* opCtx,
const NamespaceString& ns,
const CreateIndexesCommand& cmd) {
AutoGetCollection collection(opCtx, ns, MODE_IS);
AutoGetCollection collection(
opCtx, ns, MODE_IS, AutoGetCollection::Options{}.expectedUUID(cmd.getCollectionUUID()));
if (!collection) {
return;
}

View File

@ -157,7 +157,13 @@ HostInfoReply HostInfoCmd::Invocation::typedRun(OperationContext*) {
system.setCpuAddrSize(static_cast<int>(p.getAddrSize()));
system.setMemSizeMB(static_cast<long>(p.getSystemMemSizeMB()));
system.setMemLimitMB(static_cast<long>(p.getMemSizeMB()));
system.setNumCores(static_cast<int>(p.getNumAvailableCores()));
system.setNumCores(static_cast<int>(p.getNumLogicalCores()));
const auto num_cores_avl_to_process = p.getNumCoresAvailableToProcess();
// Adding the num cores available to process only if API returns successfully ie. value >=0
if (num_cores_avl_to_process >= 0) {
system.setNumCoresAvailableToProcess(static_cast<int>(num_cores_avl_to_process));
}
system.setNumPhysicalCores(static_cast<int>(p.getNumPhysicalCores()));
system.setNumCpuSockets(static_cast<int>(p.getNumCpuSockets()));
system.setCpuArch(p.getArch());

View File

@ -65,6 +65,7 @@ structs:
memSizeMB: long
memLimitMB: long
numCores: int
numCoresAvailableToProcess: int
numPhysicalCores: int
numCpuSockets: int
cpuArch: string

View File

@ -348,6 +348,10 @@ public:
auto ws = std::make_unique<WorkingSet>();
auto root = std::make_unique<QueuedDataStage>(expCtx.get(), ws.get());
auto readTimestamp = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx);
tassert(9089302,
"point in time catalog lookup for a collection list is not supported",
RecoveryUnit::ReadSource::kNoTimestamp ==
opCtx->recoveryUnit()->getTimestampReadSource());
if (DatabaseHolder::get(opCtx)->dbExists(opCtx, dbName)) {
if (auto collNames = _getExactNameMatches(matcher.get())) {

View File

@ -117,19 +117,22 @@ public:
std::unique_ptr<MatchExpression> filter = list_databases::getFilter(cmd, opCtx, ns());
std::vector<DatabaseName> dbNames;
StorageEngine* storageEngine = getGlobalServiceContext()->getStorageEngine();
{
Lock::GlobalLock lk(opCtx, MODE_IS);
// Read lock free through a consistent in-memory catalog and storage snapshot.
AutoReadLockFree lockFreeReadBlock(opCtx);
auto catalog = CollectionCatalog::get(opCtx);
CurOpFailpointHelpers::waitWhileFailPointEnabled(
&hangBeforeListDatabases, opCtx, "hangBeforeListDatabases", []() {});
dbNames = storageEngine->listDatabases(cmd.getDbName().tenantId());
dbNames =
catalog->getAllConsistentDbNamesForTenant(opCtx, cmd.getDbName().tenantId());
}
std::vector<ListDatabasesReplyItem> items;
int64_t totalSize = list_databases::setReplyItems(opCtx,
int64_t totalSize =
list_databases::setReplyItems(opCtx,
dbNames,
items,
storageEngine,
getGlobalServiceContext()->getStorageEngine(),
nameOnly,
filter,
false /* setTenantId */,

View File

@ -112,22 +112,23 @@ public:
std::unique_ptr<MatchExpression> filter = list_databases::getFilter(cmd, opCtx, ns());
std::vector<DatabaseName> dbNames;
StorageEngine* storageEngine = getGlobalServiceContext()->getStorageEngine();
{
Lock::GlobalLock lk(opCtx, MODE_IS);
dbNames = storageEngine->listDatabases();
// Read lock free through a consistent in-memory catalog and storage snapshot.
AutoReadLockFree lockFreeReadBlock(opCtx);
auto catalog = CollectionCatalog::get(opCtx);
dbNames = catalog->getAllConsistentDbNames(opCtx);
}
std::vector<ListDatabasesForAllTenantsReplyItem> items;
int64_t totalSize = list_databases::setReplyItems(opCtx,
int64_t totalSize =
list_databases::setReplyItems(opCtx,
dbNames,
items,
storageEngine,
getGlobalServiceContext()->getStorageEngine(),
nameOnly,
filter,
true /* setTenantId */,
false /* authorizedDatabases*/);
Reply reply(items);
if (!nameOnly) {
reply.setTotalSize(totalSize);

View File

@ -156,6 +156,11 @@ StatusWith<bool> ClusterParameterDBClientService::updateParameterOnDisk(
return Status(ErrorCodes::FailedToParse, errmsg);
}
auto responseStatus = response.toStatus();
if (!responseStatus.isOK()) {
return responseStatus;
}
return response.getNModified() > 0 || response.getN() > 0;
}

View File

@ -1130,6 +1130,11 @@ ConsistentCatalogAndSnapshot getConsistentCatalogAndSnapshot(
// openCollection is eventually called to construct a Collection object from the durable
// catalog.
establishCappedSnapshotIfNeeded(opCtx, catalogBeforeSnapshot, nsOrUUID);
if (resolvedSecondaryNamespaces) {
for (const auto& secondaryNss : *resolvedSecondaryNamespaces) {
establishCappedSnapshotIfNeeded(opCtx, catalogBeforeSnapshot, {secondaryNss});
}
}
openSnapshot(opCtx, nss.isOplog());

View File

@ -96,7 +96,8 @@ auto rehydrateIndexKey(const BSONObj& keyPattern, const BSONObj& dehydratedKey)
BSONObjIterator valueIter{dehydratedKey};
while (keyIter.more() && valueIter.more()) {
auto fieldName = keyIter.next().fieldNameStringData();
const auto& keyElt = keyIter.next();
auto fieldName = keyElt.fieldNameStringData();
auto value = valueIter.next();
// Skip the $** index virtual field, as it's not part of the actual index key.
@ -104,6 +105,14 @@ auto rehydrateIndexKey(const BSONObj& keyPattern, const BSONObj& dehydratedKey)
continue;
}
// Skip hashed index fields. Rehydrating of index keys is used for covered projections.
// Rehydrating of hashed field value is pointless on its own. The query planner dependency
// analysis should make sure that a covered projection can only be generated for non-hashed
// fields.
if (keyElt.type() == mongo::String && keyElt.valueStringData() == IndexNames::HASHED) {
continue;
}
md.setNestedField(fieldName, Value{value});
}

View File

@ -227,6 +227,26 @@ TEST(SBEValues, HashCompound) {
obj2->push_back("b"_sd, value::TypeTags::NumberDouble, value::bitcastFrom<double>(-6.0));
obj2->push_back("c"_sd, value::TypeTags::NumberDouble, value::bitcastFrom<double>(-7.0));
ASSERT_EQUALS(value::hashValue(tag1, val1), value::hashValue(tag2, val2));
value::releaseValue(tag1, val1);
value::releaseValue(tag2, val2);
}
{
auto [tag1, val1] = value::makeNewArraySet();
auto set1 = value::getArraySetView(val1);
set1->push_back(value::TypeTags::NumberInt32, value::bitcastFrom<int32_t>(-5));
set1->push_back(value::TypeTags::NumberInt32, value::bitcastFrom<int32_t>(-6));
set1->push_back(value::TypeTags::NumberInt32, value::bitcastFrom<int32_t>(-7));
auto [tag2, val2] = value::makeNewArraySet();
auto set2 = value::getArraySetView(val2);
set2->push_back(value::TypeTags::NumberDouble, value::bitcastFrom<double>(-7.0));
set2->push_back(value::TypeTags::NumberDouble, value::bitcastFrom<double>(-6.0));
set2->push_back(value::TypeTags::NumberDouble, value::bitcastFrom<double>(-5.0));
ASSERT_EQUALS(value::hashValue(tag1, val1), value::hashValue(tag2, val2));
value::releaseValue(tag1, val1);

View File

@ -510,7 +510,6 @@ std::size_t hashValue(TypeTags tag, Value val, const CollatorInterface* collator
case TypeTags::ksValue:
return getKeyStringView(val)->hash();
case TypeTags::Array:
case TypeTags::ArraySet:
case TypeTags::bsonArray: {
auto arr = ArrayEnumerator{tag, val};
auto res = hashInit();
@ -524,6 +523,19 @@ std::size_t hashValue(TypeTags tag, Value val, const CollatorInterface* collator
return res;
}
case TypeTags::ArraySet: {
size_t size = getArraySetView(val)->size();
std::vector<size_t> valueHashes;
valueHashes.reserve(size);
for (ArrayEnumerator arr{tag, val}; !arr.atEnd(); arr.advance()) {
auto [elemTag, elemVal] = arr.getViewOfValue();
valueHashes.push_back(hashValue(elemTag, elemVal));
}
// TODO SERVER-92666 Implement a more efficient hashing algorithm
std::sort(valueHashes.begin(), valueHashes.end());
return std::accumulate(
valueHashes.begin(), valueHashes.end(), hashInit(), &hashCombine);
}
case TypeTags::Object:
case TypeTags::bsonObject: {
auto obj = ObjectEnumerator{tag, val};

View File

@ -107,7 +107,13 @@ public:
// Include the number of cpus to simplify client calculations
ProcessInfo p;
subObjBuilder.append("num_cpus", static_cast<int>(p.getNumAvailableCores()));
subObjBuilder.append("num_logical_cores", static_cast<int>(p.getNumLogicalCores()));
const auto num_cores_avlbl_to_process = p.getNumCoresAvailableToProcess();
// Adding the num cores available to process only if API is successful ie. value >=0
if (num_cores_avlbl_to_process >= 0) {
subObjBuilder.append("num_cores_available_to_process",
static_cast<int>(num_cores_avlbl_to_process));
}
processStatusErrors(
procparser::parseProcStatFile("/proc/stat"_sd, kCpuKeys, &subObjBuilder),

View File

@ -354,7 +354,7 @@ std::list<boost::intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::_bui
// If 'showExpandedEvents' is NOT set, add a filter that returns only classic change events.
if (!spec.getShowExpandedEvents()) {
stages.push_back(DocumentSourceMatch::create(
stages.push_back(DocumentSourceInternalChangeStreamMatch::create(
change_stream_filter::getMatchFilterForClassicOperationTypes(), expCtx));
}
return stages;
@ -371,6 +371,12 @@ void DocumentSourceChangeStream::assertIsLegalSpecification(
(replCoord &&
replCoord->getReplicationMode() == repl::ReplicationCoordinator::Mode::modeReplSet));
// We will not validate user specified options when we are not expecting to execute queries,
// such as during $queryStats.
if (!expCtx->mongoProcessInterface->isExpectedToExecuteQueries()) {
return;
}
// If 'allChangesForCluster' is true, the stream must be opened on the 'admin' database with
// {aggregate: 1}.
uassert(ErrorCodes::InvalidOptions,

View File

@ -342,4 +342,29 @@ public:
}
};
/**
* A DocumentSource class for all internal change stream stages. This class is useful for
* shared logic between all of the internal change stream stages. For internally created match
* stages see 'DocumentSourceInternalChangeStreamMatch'.
*/
class DocumentSourceInternalChangeStreamStage : public DocumentSource {
public:
DocumentSourceInternalChangeStreamStage(StringData stageName,
const boost::intrusive_ptr<ExpressionContext>& expCtx)
: DocumentSource(stageName, expCtx) {}
Value serialize(const SerializationOptions& opts = SerializationOptions{}) const override {
if (opts.literalPolicy != LiteralSerializationPolicy::kUnchanged ||
opts.transformIdentifiers) {
// Stages made internally by 'DocumentSourceChangeStream' should not be serialized for
// query stats. For query stats we will serialize only the user specified $changeStream
// stage.
return Value();
}
return doSerialize(opts);
}
virtual Value doSerialize(const SerializationOptions& opts) const = 0;
};
} // namespace mongo

View File

@ -203,51 +203,41 @@ structs:
strict: true
description: A document used to specify the $_internalChangeStreamCheckInvalidate stage of
an aggregation pipeline.
query_shape_component: true
fields:
startAfterInvalidate:
type: resumeToken
optional: true
query_shape: custom
DocumentSourceChangeStreamCheckResumabilitySpec:
strict: true
description: A document used to specify the $_internalChangeStreamCheckResumability stage of
an aggregation pipeline.
query_shape_component: true
fields:
resumeToken:
type: resumeToken
optional: false
query_shape: custom
DocumentSourceChangeStreamAddPreImageSpec:
strict: true
description: A document used to specify the $_internalChangeStreamAddPreImage stage of
an aggregation pipeline.
query_shape_component: true
fields:
fullDocumentBeforeChange:
type: FullDocumentBeforeChangeMode
query_shape: parameter
DocumentSourceChangeStreamAddPostImageSpec:
strict: true
description: A document used to specify the $_internalChangeStreamAddPostImage stage of
an aggregation pipeline.
query_shape_component: true
fields:
fullDocument:
type: FullDocumentMode
query_shape: parameter
DocumentSourceChangeStreamHandleTopologyChangeSpec:
strict: true
description: A document used to specify the $_internalChangeStreamHandleTopologyChange stage of
an aggregation pipeline.
query_shape_component: true
fields:
originalAggregateCommand:
type: object
optional: true
query_shape: literal

View File

@ -209,14 +209,13 @@ boost::optional<Document> DocumentSourceChangeStreamAddPostImage::lookupLatestPo
pExpCtx, nss, *resumeTokenData.uuid, documentKey, std::move(readConcern));
}
Value DocumentSourceChangeStreamAddPostImage::serialize(const SerializationOptions& opts) const {
Value DocumentSourceChangeStreamAddPostImage::doSerialize(const SerializationOptions& opts) const {
return opts.verbosity
? Value(Document{
{DocumentSourceChangeStream::kStageName,
Document{{"stage"_sd, kStageName},
{kFullDocumentFieldName, FullDocumentMode_serializer(_fullDocumentMode)}}}})
: Value(Document{
{kStageName,
DocumentSourceChangeStreamAddPostImageSpec(_fullDocumentMode).toBSON(opts)}});
: Value(Document{{kStageName,
DocumentSourceChangeStreamAddPostImageSpec(_fullDocumentMode).toBSON()}});
}
} // namespace mongo

View File

@ -38,7 +38,8 @@ namespace mongo {
* Part of the change stream API machinery used to look up the post-image of a document. Uses the
* "documentKey" field of the input to look up the new version of the document.
*/
class DocumentSourceChangeStreamAddPostImage final : public DocumentSource {
class DocumentSourceChangeStreamAddPostImage final
: public DocumentSourceInternalChangeStreamStage {
public:
static constexpr StringData kStageName = "$_internalChangeStreamAddPostImage"_sd;
static constexpr StringData kFullDocumentFieldName =
@ -116,7 +117,7 @@ public:
void addVariableRefs(std::set<Variables::Id>* refs) const final {}
Value serialize(const SerializationOptions& opts = SerializationOptions{}) const final override;
Value doSerialize(const SerializationOptions& opts = SerializationOptions{}) const final;
const char* getSourceName() const final {
return kStageName.rawData();
@ -125,7 +126,8 @@ public:
private:
DocumentSourceChangeStreamAddPostImage(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const FullDocumentModeEnum fullDocumentMode)
: DocumentSource(kStageName, expCtx), _fullDocumentMode(fullDocumentMode) {
: DocumentSourceInternalChangeStreamStage(kStageName, expCtx),
_fullDocumentMode(fullDocumentMode) {
tassert(5842300,
"the 'fullDocument' field cannot be 'default'",
_fullDocumentMode != FullDocumentModeEnum::kDefault);

View File

@ -139,16 +139,16 @@ boost::optional<Document> DocumentSourceChangeStreamAddPreImage::lookupPreImage(
return preImageField.getDocument().getOwned();
}
Value DocumentSourceChangeStreamAddPreImage::serialize(const SerializationOptions& opts) const {
Value DocumentSourceChangeStreamAddPreImage::doSerialize(const SerializationOptions& opts) const {
return opts.verbosity
? Value(Document{
{DocumentSourceChangeStream::kStageName,
Document{{"stage"_sd, "internalAddPreImage"_sd},
{"fullDocumentBeforeChange"_sd,
FullDocumentBeforeChangeMode_serializer(_fullDocumentBeforeChangeMode)}}}})
: Value(Document{{kStageName,
DocumentSourceChangeStreamAddPreImageSpec(_fullDocumentBeforeChangeMode)
.toBSON(opts)}});
: Value(Document{
{kStageName,
DocumentSourceChangeStreamAddPreImageSpec(_fullDocumentBeforeChangeMode).toBSON()}});
}
std::string DocumentSourceChangeStreamAddPreImage::makePreImageNotFoundErrorMsg(

View File

@ -40,7 +40,7 @@ namespace mongo {
* The identifier of pre-image is in "preImageId" field of the incoming document. The pre-image is
* set to "fullDocumentBeforeChange" field of the returned document.
*/
class DocumentSourceChangeStreamAddPreImage final : public DocumentSource {
class DocumentSourceChangeStreamAddPreImage final : public DocumentSourceInternalChangeStreamStage {
public:
static constexpr StringData kStageName = "$_internalChangeStreamAddPreImage"_sd;
static constexpr StringData kFullDocumentBeforeChangeFieldName =
@ -67,7 +67,8 @@ public:
DocumentSourceChangeStreamAddPreImage(const boost::intrusive_ptr<ExpressionContext>& expCtx,
FullDocumentBeforeChangeModeEnum mode)
: DocumentSource(kStageName, expCtx), _fullDocumentBeforeChangeMode(mode) {
: DocumentSourceInternalChangeStreamStage(kStageName, expCtx),
_fullDocumentBeforeChangeMode(mode) {
// This stage should never be created with FullDocumentBeforeChangeMode::kOff.
invariant(_fullDocumentBeforeChangeMode != FullDocumentBeforeChangeModeEnum::kOff);
}
@ -109,7 +110,7 @@ public:
void addVariableRefs(std::set<Variables::Id>* refs) const final {}
Value serialize(const SerializationOptions& opts = SerializationOptions{}) const final override;
Value doSerialize(const SerializationOptions& opts = SerializationOptions{}) const final;
const char* getSourceName() const final {
return kStageName.rawData();

View File

@ -181,7 +181,8 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamCheckInvalidate::doGetNe
return nextInput;
}
Value DocumentSourceChangeStreamCheckInvalidate::serialize(const SerializationOptions& opts) const {
Value DocumentSourceChangeStreamCheckInvalidate::doSerialize(
const SerializationOptions& opts) const {
BSONObjBuilder builder;
if (opts.verbosity) {
BSONObjBuilder sub(builder.subobjStart(DocumentSourceChangeStream::kStageName));
@ -192,7 +193,7 @@ Value DocumentSourceChangeStreamCheckInvalidate::serialize(const SerializationOp
if (_startAfterInvalidate) {
spec.setStartAfterInvalidate(ResumeToken(*_startAfterInvalidate));
}
builder.append(DocumentSourceChangeStreamCheckInvalidate::kStageName, spec.toBSON(opts));
builder.append(DocumentSourceChangeStreamCheckInvalidate::kStageName, spec.toBSON());
return Value(builder.obj());
}

View File

@ -39,7 +39,8 @@ namespace mongo {
* "invalidate" entry for commands that should invalidate the change stream (e.g. collection drop
* for a single-collection change stream). It is not intended to be created by the user.
*/
class DocumentSourceChangeStreamCheckInvalidate final : public DocumentSource {
class DocumentSourceChangeStreamCheckInvalidate final
: public DocumentSourceInternalChangeStreamStage {
public:
static constexpr StringData kStageName = "$_internalChangeStreamCheckInvalidate"_sd;
@ -64,7 +65,7 @@ public:
return boost::none;
}
Value serialize(const SerializationOptions& opts = SerializationOptions{}) const final override;
Value doSerialize(const SerializationOptions& opts = SerializationOptions{}) const final;
void addVariableRefs(std::set<Variables::Id>* refs) const final {}
@ -81,7 +82,7 @@ private:
*/
DocumentSourceChangeStreamCheckInvalidate(const boost::intrusive_ptr<ExpressionContext>& expCtx,
boost::optional<ResumeTokenData> startAfterInvalidate)
: DocumentSource(kStageName, expCtx),
: DocumentSourceInternalChangeStreamStage(kStageName, expCtx),
_startAfterInvalidate(std::move(startAfterInvalidate)) {
invariant(!_startAfterInvalidate ||
_startAfterInvalidate->fromInvalidate == ResumeTokenData::kFromInvalidate);

View File

@ -133,7 +133,8 @@ DocumentSourceChangeStreamCheckResumability::compareAgainstClientResumeToken(
DocumentSourceChangeStreamCheckResumability::DocumentSourceChangeStreamCheckResumability(
const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token)
: DocumentSource(getSourceName(), expCtx), _tokenFromClient(std::move(token)) {}
: DocumentSourceInternalChangeStreamStage(getSourceName(), expCtx),
_tokenFromClient(std::move(token)) {}
intrusive_ptr<DocumentSourceChangeStreamCheckResumability>
DocumentSourceChangeStreamCheckResumability::create(const intrusive_ptr<ExpressionContext>& expCtx,
@ -211,7 +212,7 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamCheckResumability::doGet
MONGO_UNREACHABLE;
}
Value DocumentSourceChangeStreamCheckResumability::serialize(
Value DocumentSourceChangeStreamCheckResumability::doSerialize(
const SerializationOptions& opts) const {
BSONObjBuilder builder;
if (opts.verbosity) {
@ -223,7 +224,7 @@ Value DocumentSourceChangeStreamCheckResumability::serialize(
builder.append(
kStageName,
DocumentSourceChangeStreamCheckResumabilitySpec(ResumeToken(_tokenFromClient))
.toBSON(opts));
.toBSON());
}
return Value(builder.obj());
}

View File

@ -59,7 +59,7 @@ namespace mongo {
* - Otherwise we cannot resume, as we do not know if there were any events between the resume token
* and the first matching document in the oplog.
*/
class DocumentSourceChangeStreamCheckResumability : public DocumentSource {
class DocumentSourceChangeStreamCheckResumability : public DocumentSourceInternalChangeStreamStage {
public:
static constexpr StringData kStageName = "$_internalChangeStreamCheckResumability"_sd;
@ -90,7 +90,7 @@ public:
return boost::none;
}
Value serialize(const SerializationOptions& opts = SerializationOptions{}) const override;
Value doSerialize(const SerializationOptions& opts = SerializationOptions{}) const override;
void addVariableRefs(std::set<Variables::Id>* refs) const final {}

View File

@ -87,7 +87,7 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamCheckTopologyChange::doG
return nextInput;
}
Value DocumentSourceChangeStreamCheckTopologyChange::serialize(
Value DocumentSourceChangeStreamCheckTopologyChange::doSerialize(
const SerializationOptions& opts) const {
if (opts.verbosity) {
return Value(DOC(DocumentSourceChangeStream::kStageName

View File

@ -45,7 +45,8 @@ namespace mongo {
* that previously may not have held any data for the collection being watched, and they contain the
* information necessary for the mongoS to include the new shard in the merged change stream.
*/
class DocumentSourceChangeStreamCheckTopologyChange final : public DocumentSource {
class DocumentSourceChangeStreamCheckTopologyChange final
: public DocumentSourceInternalChangeStreamStage {
public:
static constexpr StringData kStageName = "$_internalChangeStreamCheckTopologyChange"_sd;
@ -67,14 +68,14 @@ public:
return boost::none;
}
Value serialize(const SerializationOptions& opts = SerializationOptions{}) const final override;
Value doSerialize(const SerializationOptions& opts = SerializationOptions{}) const final;
void addVariableRefs(std::set<Variables::Id>* refs) const final {}
private:
DocumentSourceChangeStreamCheckTopologyChange(
const boost::intrusive_ptr<ExpressionContext>& expCtx)
: DocumentSource(kStageName, expCtx) {}
: DocumentSourceInternalChangeStreamStage(kStageName, expCtx) {}
GetNextResult doGetNext() final;
};

View File

@ -153,7 +153,7 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamEnsureResumeTokenPresent
}
}
Value DocumentSourceChangeStreamEnsureResumeTokenPresent::serialize(
Value DocumentSourceChangeStreamEnsureResumeTokenPresent::doSerialize(
const SerializationOptions& opts) const {
BSONObjBuilder builder;
if (opts.verbosity) {

View File

@ -55,7 +55,7 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec);
Value serialize(const SerializationOptions& opts = SerializationOptions{}) const final override;
Value doSerialize(const SerializationOptions& opts = SerializationOptions{}) const final;
private:
/**

View File

@ -129,7 +129,7 @@ DocumentSourceChangeStreamHandleTopologyChange::create(
DocumentSourceChangeStreamHandleTopologyChange::DocumentSourceChangeStreamHandleTopologyChange(
const boost::intrusive_ptr<ExpressionContext>& expCtx)
: DocumentSource(kStageName, expCtx) {}
: DocumentSourceInternalChangeStreamStage(kStageName, expCtx) {}
StageConstraints DocumentSourceChangeStreamHandleTopologyChange::constraints(
Pipeline::SplitState) const {
@ -269,7 +269,7 @@ BSONObj DocumentSourceChangeStreamHandleTopologyChange::replaceResumeTokenInComm
return newCmd.freeze().toBson();
}
Value DocumentSourceChangeStreamHandleTopologyChange::serialize(
Value DocumentSourceChangeStreamHandleTopologyChange::doSerialize(
const SerializationOptions& opts) const {
if (opts.verbosity) {
return Value(DOC(DocumentSourceChangeStream::kStageName

View File

@ -46,7 +46,8 @@ namespace mongo {
* the first time. When this event is detected, this stage will establish a new cursor on that
* shard and add it to the cursors being merged.
*/
class DocumentSourceChangeStreamHandleTopologyChange final : public DocumentSource {
class DocumentSourceChangeStreamHandleTopologyChange final
: public DocumentSourceInternalChangeStreamStage {
public:
static constexpr StringData kStageName =
change_stream_constants::stage_names::kHandleTopologyChange;
@ -64,7 +65,7 @@ public:
return kStageName.rawData();
}
Value serialize(const SerializationOptions& opts = SerializationOptions{}) const final override;
Value doSerialize(const SerializationOptions& opts = SerializationOptions{}) const final;
StageConstraints constraints(Pipeline::SplitState) const final;

View File

@ -96,8 +96,8 @@ std::unique_ptr<MatchExpression> buildOplogMatchFilter(
DocumentSourceChangeStreamOplogMatch::DocumentSourceChangeStreamOplogMatch(
Timestamp clusterTime, const boost::intrusive_ptr<ExpressionContext>& expCtx)
: DocumentSourceMatch(change_stream_filter::buildOplogMatchFilter(expCtx, clusterTime),
expCtx) {
: DocumentSourceInternalChangeStreamMatch(
change_stream_filter::buildOplogMatchFilter(expCtx, clusterTime), expCtx) {
_clusterTime = clusterTime;
expCtx->tailableMode = TailableModeEnum::kTailableAndAwaitData;
}
@ -205,7 +205,7 @@ Pipeline::SourceContainer::iterator DocumentSourceChangeStreamOplogMatch::doOpti
return nextChangeStreamStageItr;
}
Value DocumentSourceChangeStreamOplogMatch::serialize(const SerializationOptions& opts) const {
Value DocumentSourceChangeStreamOplogMatch::doSerialize(const SerializationOptions& opts) const {
BSONObjBuilder builder;
if (opts.verbosity) {
BSONObjBuilder sub(builder.subobjStart(DocumentSourceChangeStream::kStageName));
@ -215,13 +215,10 @@ Value DocumentSourceChangeStreamOplogMatch::serialize(const SerializationOptions
sub.done();
} else {
BSONObjBuilder sub(builder.subobjStart(kStageName));
if (opts.literalPolicy != LiteralSerializationPolicy::kUnchanged ||
opts.transformIdentifiers) {
sub.append(DocumentSourceChangeStreamOplogMatchSpec::kFilterFieldName,
getMatchExpression()->serialize(opts));
} else {
// 'SerializationOptions' are not required here, since serialization for explain and query
// stats occur before this function call.
DocumentSourceChangeStreamOplogMatchSpec(_predicate).serialize(&sub);
}
sub.done();
}
return Value(builder.obj());

View File

@ -36,7 +36,7 @@ namespace mongo {
* A custom subclass of DocumentSourceMatch which is used to generate a $match stage to be applied
* on the oplog. The stage requires itself to be the first stage in the pipeline.
*/
class DocumentSourceChangeStreamOplogMatch final : public DocumentSourceMatch {
class DocumentSourceChangeStreamOplogMatch final : public DocumentSourceInternalChangeStreamMatch {
public:
static constexpr StringData kStageName = "$_internalChangeStreamOplogMatch"_sd;
@ -45,7 +45,7 @@ public:
DocumentSourceChangeStreamOplogMatch(const DocumentSourceChangeStreamOplogMatch& other,
const boost::intrusive_ptr<ExpressionContext>& newExpCtx)
: DocumentSourceMatch(other, newExpCtx) {
: DocumentSourceInternalChangeStreamMatch(other, newExpCtx) {
_clusterTime = other._clusterTime;
_optimizedEndOfPipeline = other._optimizedEndOfPipeline;
}
@ -74,7 +74,7 @@ public:
StageConstraints constraints(Pipeline::SplitState pipeState) const final;
Value serialize(const SerializationOptions& opts = SerializationOptions{}) const final override;
Value doSerialize(const SerializationOptions& opts) const final;
protected:
Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
@ -88,7 +88,8 @@ private:
*/
DocumentSourceChangeStreamOplogMatch(BSONObj filter,
const boost::intrusive_ptr<ExpressionContext>& expCtx)
: DocumentSourceMatch(std::move(filter), expCtx), _optimizedEndOfPipeline(true) {
: DocumentSourceInternalChangeStreamMatch(std::move(filter), expCtx),
_optimizedEndOfPipeline(true) {
expCtx->tailableMode = TailableModeEnum::kTailableAndAwaitData;
}

View File

@ -95,15 +95,23 @@ using DSChangeStream = DocumentSourceChangeStream;
// Deterministic values used for testing
const UUID testConstUuid = UUID::parse("6948DF80-14BD-4E04-8842-7668D9C001F5").getValue();
class ExecutableStubMongoProcessInterface : public StubMongoProcessInterface {
bool isExpectedToExecuteQueries() override {
return true;
}
};
class ChangeStreamStageTestNoSetup : public AggregationContextFixture {
public:
ChangeStreamStageTestNoSetup() : ChangeStreamStageTestNoSetup(nss) {}
explicit ChangeStreamStageTestNoSetup(NamespaceString nsString)
: AggregationContextFixture(nsString) {}
: AggregationContextFixture(nsString) {
getExpCtx()->mongoProcessInterface =
std::make_unique<ExecutableStubMongoProcessInterface>();
};
};
struct MockMongoInterface final : public StubMongoProcessInterface {
struct MockMongoInterface final : public ExecutableStubMongoProcessInterface {
// Used by operations which need to obtain the oplog's UUID.
static const UUID& oplogUuid() {
static const UUID* oplog_uuid = new UUID(UUID::gen());
@ -4476,7 +4484,7 @@ TEST_F(MultiTokenFormatVersionTest, CanResumeFromV1HighWaterMark) {
ASSERT_FALSE(next.isAdvanced());
}
TEST_F(ChangeStreamStageTestNoSetup, RedactDocumentSourceChangeStreamAddPostImage) {
TEST_F(ChangeStreamStageTestNoSetup, DocumentSourceChangeStreamAddPostImageEmptyForQueryStats) {
auto spec = DocumentSourceChangeStreamSpec();
spec.setFullDocument(FullDocumentModeEnum::kUpdateLookup);
@ -4485,12 +4493,13 @@ TEST_F(ChangeStreamStageTestNoSetup, RedactDocumentSourceChangeStreamAddPostImag
ASSERT_BSONOBJ_EQ_AUTO( // NOLINT
R"({"$_internalChangeStreamAddPostImage":{"fullDocument":"updateLookup"}})",
docSource->serialize().getDocument().toBson());
ASSERT_BSONOBJ_EQ_AUTO( // NOLINT
R"({"$_internalChangeStreamAddPostImage":{"fullDocument":"updateLookup"}})",
redact(*docSource));
auto opts = SerializationOptions{
.literalPolicy = LiteralSerializationPolicy::kToRepresentativeParseableValue};
ASSERT(docSource->serialize(opts).missing());
}
TEST_F(ChangeStreamStageTestNoSetup, RedactDocumentSourceChangeStreamAddPreImage) {
TEST_F(ChangeStreamStageTestNoSetup, DocumentSourceChangeStreamAddPreImageEmptyForQueryStats) {
auto docSource = DocumentSourceChangeStreamAddPreImage{
getExpCtx(), FullDocumentBeforeChangeModeEnum::kWhenAvailable};
@ -4501,12 +4510,14 @@ TEST_F(ChangeStreamStageTestNoSetup, RedactDocumentSourceChangeStreamAddPreImage
}
})",
docSource.serialize().getDocument().toBson());
ASSERT_BSONOBJ_EQ_AUTO( // NOLINT
R"({"$_internalChangeStreamAddPreImage":{"fullDocumentBeforeChange":"whenAvailable"}})",
redact(docSource));
auto opts = SerializationOptions{
.literalPolicy = LiteralSerializationPolicy::kToRepresentativeParseableValue};
ASSERT(docSource.serialize(opts).missing());
}
TEST_F(ChangeStreamStageTestNoSetup, RedactDocumentSourceChangeStreamCheckInvalidate) {
TEST_F(ChangeStreamStageTestNoSetup, DocumentSourceChangeStreamCheckInvalidateEmptyForQueryStats) {
DocumentSourceChangeStreamSpec spec;
spec.setResumeAfter(
ResumeToken::parse(makeResumeToken(Timestamp(),
@ -4526,24 +4537,14 @@ TEST_F(ChangeStreamStageTestNoSetup, RedactDocumentSourceChangeStreamCheckInvali
}
})",
docSource->serialize().getDocument().toBson());
ASSERT_BSONOBJ_EQ_AUTO( // NOLINT
R"({"$_internalChangeStreamCheckInvalidate":{"startAfterInvalidate":{
"_data": "?string"
}}})",
redact(*docSource));
ASSERT_BSONOBJ_EQ_AUTO( // NOLINT
R"({"$_internalChangeStreamCheckInvalidate":{"startAfterInvalidate":{
"_data": "820000000000000000292904"
}}})",
docSource
->serialize(SerializationOptions{
.literalPolicy = LiteralSerializationPolicy::kToRepresentativeParseableValue})
.getDocument()
.toBson());
auto opts = SerializationOptions{
.literalPolicy = LiteralSerializationPolicy::kToRepresentativeParseableValue};
ASSERT(docSource->serialize(opts).missing());
}
TEST_F(ChangeStreamStageTestNoSetup, RedactDocumentSourceChangeStreamCheckResumability) {
TEST_F(ChangeStreamStageTestNoSetup,
DocumentSourceChangeStreamCheckResumabilityEmptyForQueryStats) {
DocumentSourceChangeStreamSpec spec;
spec.setResumeAfter(
ResumeToken::parse(makeResumeToken(Timestamp(),
@ -4562,39 +4563,27 @@ TEST_F(ChangeStreamStageTestNoSetup, RedactDocumentSourceChangeStreamCheckResuma
}
})",
docSource->serialize().getDocument().toBson());
ASSERT_BSONOBJ_EQ_AUTO( // NOLINT
R"({"$_internalChangeStreamCheckResumability":{
"resumeToken": {
"_data": "?string"
}
}})",
redact(*docSource));
ASSERT_BSONOBJ_EQ_AUTO( // NOLINT
R"({"$_internalChangeStreamCheckResumability":{
"resumeToken": {
"_data": "820000000000000000292904"
}
}})",
docSource
->serialize(SerializationOptions{
.literalPolicy = LiteralSerializationPolicy::kToRepresentativeParseableValue})
.getDocument()
.toBson());
auto opts = SerializationOptions{
.literalPolicy = LiteralSerializationPolicy::kToRepresentativeParseableValue};
ASSERT(docSource->serialize(opts).missing());
}
TEST_F(ChangeStreamStageTestNoSetup, RedactDocumentSourceChangeStreamCheckTopologyChange) {
TEST_F(ChangeStreamStageTestNoSetup,
DocumentSourceChangeStreamCheckTopologyChangeEmptyForQueryStats) {
auto docSource = DocumentSourceChangeStreamCheckTopologyChange::create(getExpCtx());
ASSERT_BSONOBJ_EQ_AUTO( // NOLINT
R"({"$_internalChangeStreamCheckTopologyChange":{}})",
docSource->serialize().getDocument().toBson());
ASSERT_BSONOBJ_EQ_AUTO( // NOLINT
R"({"$_internalChangeStreamCheckTopologyChange":{}})",
redact(*docSource));
auto opts = SerializationOptions{
.literalPolicy = LiteralSerializationPolicy::kToRepresentativeParseableValue};
ASSERT(docSource->serialize(opts).missing());
}
TEST_F(ChangeStreamStageTestNoSetup, RedactDocumentSourceChangeStreamEnsureResumeTokenPresent) {
TEST_F(ChangeStreamStageTestNoSetup,
DocumentSourceChangeStreamEnsureResumeTokenPresentEmptyForQueryStats) {
DocumentSourceChangeStreamSpec spec;
spec.setResumeAfter(
ResumeToken::parse(makeResumeToken(Timestamp(),
@ -4613,26 +4602,23 @@ TEST_F(ChangeStreamStageTestNoSetup, RedactDocumentSourceChangeStreamEnsureResum
}
})",
docSource->serialize().getDocument().toBson());
ASSERT_BSONOBJ_EQ_AUTO( // NOLINT
R"({
"$_internalChangeStreamEnsureResumeTokenPresent": {
"resumeToken": {
"_data": "?string"
}
}
})",
redact(*docSource));
auto opts = SerializationOptions{
.literalPolicy = LiteralSerializationPolicy::kToRepresentativeParseableValue};
ASSERT(docSource->serialize(opts).missing());
}
TEST_F(ChangeStreamStageTestNoSetup, RedactDocumentSourceChangeStreamHandleTopologyChange) {
TEST_F(ChangeStreamStageTestNoSetup,
DocumentSourceChangeStreamHandleTopologyChangeEmptyForQueryStats) {
auto docSource = DocumentSourceChangeStreamHandleTopologyChange::create(getExpCtx());
ASSERT_BSONOBJ_EQ_AUTO( // NOLINT
R"({"$_internalChangeStreamHandleTopologyChange":{}})",
docSource->serialize().getDocument().toBson());
ASSERT_BSONOBJ_EQ_AUTO( // NOLINT
R"({"$_internalChangeStreamHandleTopologyChange":{}})",
redact(*docSource));
auto opts = SerializationOptions{
.literalPolicy = LiteralSerializationPolicy::kToRepresentativeParseableValue};
ASSERT(docSource->serialize(opts).missing());
}
TEST_F(ChangeStreamStageTestNoSetup, RedactDocumentSourceChangeStreamSplitLargeEvent) {
@ -4674,9 +4660,10 @@ TEST_F(ChangeStreamStageTestNoSetup, RedactDocumentSourceChangeStreamTransform)
}
})",
docSource->serialize().getDocument().toBson());
ASSERT_BSONOBJ_EQ_AUTO( // NOLINT
R"({
"$_internalChangeStreamTransform": {
"$changeStream": {
"resumeAfter": {
"_data": "?string"
},
@ -4688,9 +4675,9 @@ TEST_F(ChangeStreamStageTestNoSetup, RedactDocumentSourceChangeStreamTransform)
ASSERT_BSONOBJ_EQ_AUTO( // NOLINT
R"({
"$_internalChangeStreamTransform": {
"$changeStream": {
"resumeAfter": {
"_data": "820000000000000000292904"
"_data": "8200000000000000002B0229296E04"
},
"fullDocument": "default",
"fullDocumentBeforeChange": "off"
@ -4703,6 +4690,64 @@ TEST_F(ChangeStreamStageTestNoSetup, RedactDocumentSourceChangeStreamTransform)
.toBson());
}
TEST_F(ChangeStreamStageTestNoSetup, RedactDocumentSourceChangeStreamTransformMoreFields) {
DocumentSourceChangeStreamSpec spec;
spec.setStartAfter(
ResumeToken::parse(makeResumeToken(Timestamp(),
testConstUuid,
BSON("_id" << 1 << "x" << 2),
DocumentSourceChangeStream::kInsertOpType)));
spec.setFullDocument(FullDocumentModeEnum::kRequired);
spec.setFullDocumentBeforeChange(FullDocumentBeforeChangeModeEnum::kWhenAvailable);
spec.setShowExpandedEvents(true);
auto docSource = DocumentSourceChangeStreamTransform::create(getExpCtx(), spec);
ASSERT_BSONOBJ_EQ_AUTO( // NOLINT
R"({
"$_internalChangeStreamTransform": {
"startAfter": {
"_data": "8200000000000000002B042C0100296E5A10046948DF8014BD4E0488427668D9C001F5463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B657900461E5F6964002B021E78002B04000004"
},
"fullDocument": "required",
"fullDocumentBeforeChange": "whenAvailable",
"showExpandedEvents": true
}
})",
docSource->serialize().getDocument().toBson());
ASSERT_BSONOBJ_EQ_AUTO( // NOLINT
R"({
"$changeStream": {
"startAfter": {
"_data": "?string"
},
"fullDocument": "required",
"fullDocumentBeforeChange": "whenAvailable",
"showExpandedEvents": true
}
})",
redact(*docSource));
ASSERT_BSONOBJ_EQ_AUTO( // NOLINT
R"({
"$changeStream": {
"startAfter": {
"_data": "8200000000000000002B0229296E04"
},
"fullDocument": "required",
"fullDocumentBeforeChange": "whenAvailable",
"showExpandedEvents": true
}
})",
docSource
->serialize(SerializationOptions{
.literalPolicy = LiteralSerializationPolicy::kToRepresentativeParseableValue})
.getDocument()
.toBson());
}
// For DocumentSource types which contain an arbitrarily internal
// MatchExpression, we don't want match the entire structure. This
// assertion allows us to check some basic structure.
@ -4747,21 +4792,16 @@ void assertRedactedMatchExpressionContainsOperatorsAndRedactedFieldPaths(BSONEle
ASSERT(redactedFieldPaths > 0);
}
TEST_F(ChangeStreamStageTestNoSetup, RedactDocumentSourceChangeStreamUnwindTransaction) {
TEST_F(ChangeStreamStageTestNoSetup,
DocumentSourceChangeStreamUnwindTransactionEmptyForQueryStats) {
auto docSource = DocumentSourceChangeStreamUnwindTransaction::create(getExpCtx());
auto redacted = redact(*docSource);
// First, check the outermost structure.
BSONElement el = redacted.getField("$_internalChangeStreamUnwindTransaction"_sd);
ASSERT(el);
el = el.Obj().getField("filter");
ASSERT(el);
el = el.Obj().firstElement();
assertRedactedMatchExpressionContainsOperatorsAndRedactedFieldPaths(el);
auto opts = SerializationOptions{
.literalPolicy = LiteralSerializationPolicy::kToRepresentativeParseableValue};
ASSERT(docSource->serialize(opts).missing());
}
TEST_F(ChangeStreamStageTestNoSetup, RedactDocumentSourceChangeStreamOplogMatch) {
TEST_F(ChangeStreamStageTestNoSetup, DocumentSourceChangeStreamOplogMatchEmptyForQueryStats) {
DocumentSourceChangeStreamSpec spec;
spec.setResumeAfter(
ResumeToken::parse(makeResumeToken(Timestamp(),
@ -4771,15 +4811,9 @@ TEST_F(ChangeStreamStageTestNoSetup, RedactDocumentSourceChangeStreamOplogMatch)
auto docSource = DocumentSourceChangeStreamOplogMatch::create(getExpCtx(), spec);
auto redacted = redact(*docSource);
// First, check the outermost structure.
BSONElement el = redacted.getField("$_internalChangeStreamOplogMatch"_sd);
ASSERT(el);
el = el.Obj().getField("filter");
ASSERT(el);
el = el.Obj().firstElement();
assertRedactedMatchExpressionContainsOperatorsAndRedactedFieldPaths(el);
auto opts = SerializationOptions{
.literalPolicy = LiteralSerializationPolicy::kToRepresentativeParseableValue};
ASSERT(docSource->serialize(opts).missing());
}
} // namespace

View File

@ -75,7 +75,8 @@ DocumentSourceChangeStreamTransform::createFromBson(
DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
const boost::intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceChangeStreamSpec spec)
: DocumentSource(DocumentSourceChangeStreamTransform::kStageName, expCtx),
: DocumentSourceInternalChangeStreamStage(DocumentSourceChangeStreamTransform::kStageName,
expCtx),
_changeStreamSpec(std::move(spec)),
_transformer(expCtx, _changeStreamSpec),
_isIndependentOfAnyCollection(expCtx->ns.isCollectionlessAggregateNS()) {
@ -206,8 +207,14 @@ Value DocumentSourceChangeStreamTransform::serialize(const SerializationOptions&
{"options"_sd, _changeStreamSpec.toBSON(opts)}}}});
}
return Value(Document{
{DocumentSourceChangeStreamTransform::kStageName, _changeStreamSpec.toBSON(opts)}});
// Internal change stream stages are not serialized for query stats. Query stats uses this stage
// to serialize the user specified stage, and therefore if serializing for query stats, we
// should use the '$changeStream' stage name.
auto stageName =
(opts.literalPolicy != LiteralSerializationPolicy::kUnchanged || opts.transformIdentifiers)
? DocumentSourceChangeStream::kStageName
: DocumentSourceChangeStreamTransform::kStageName;
return Value(Document{{stageName, _changeStreamSpec.toBSON(opts)}});
}
DepsTracker::State DocumentSourceChangeStreamTransform::getDependencies(DepsTracker* deps) const {

View File

@ -34,7 +34,7 @@
namespace mongo {
class DocumentSourceChangeStreamTransform : public DocumentSource {
class DocumentSourceChangeStreamTransform : public DocumentSourceInternalChangeStreamStage {
public:
static constexpr StringData kStageName = "$_internalChangeStreamTransform"_sd;
@ -58,6 +58,13 @@ public:
Value serialize(const SerializationOptions& opts = SerializationOptions{}) const final override;
/**
* This function should never be called, since this DocumentSource has its own serialize method.
*/
Value doSerialize(const SerializationOptions& opts) const final {
MONGO_UNREACHABLE;
}
StageConstraints constraints(Pipeline::SplitState pipeState) const final;
boost::optional<DistributedPlanLogic> distributedPlanLogic() final {

View File

@ -106,7 +106,7 @@ DocumentSourceChangeStreamUnwindTransaction::createFromBson(
DocumentSourceChangeStreamUnwindTransaction::DocumentSourceChangeStreamUnwindTransaction(
const BSONObj& filter, const boost::intrusive_ptr<ExpressionContext>& expCtx)
: DocumentSource(kStageName, expCtx) {
: DocumentSourceInternalChangeStreamStage(kStageName, expCtx) {
rebuild(filter);
}
@ -128,7 +128,7 @@ StageConstraints DocumentSourceChangeStreamUnwindTransaction::constraints(
ChangeStreamRequirement::kChangeStreamStage);
}
Value DocumentSourceChangeStreamUnwindTransaction::serialize(
Value DocumentSourceChangeStreamUnwindTransaction::doSerialize(
const SerializationOptions& opts) const {
tassert(7481400, "expression has not been initialized", _expression);
@ -141,14 +141,10 @@ Value DocumentSourceChangeStreamUnwindTransaction::serialize(
return Value(DOC(DocumentSourceChangeStream::kStageName << builder.obj()));
}
Value spec;
if (opts.literalPolicy != LiteralSerializationPolicy::kUnchanged || opts.transformIdentifiers) {
spec = Value(DOC(DocumentSourceChangeStreamUnwindTransactionSpec::kFilterFieldName
<< _expression->serialize(opts)));
} else {
spec = Value(DocumentSourceChangeStreamUnwindTransactionSpec(_filter).toBSON());
}
return Value(Document{{kStageName, spec}});
// 'SerializationOptions' are not required here, since serialization for explain and query
// stats occur before this function call.
return Value(Document{
{kStageName, Value{DocumentSourceChangeStreamUnwindTransactionSpec{_filter}.toBSON()}}});
}
DepsTracker::State DocumentSourceChangeStreamUnwindTransaction::getDependencies(

View File

@ -41,7 +41,7 @@ namespace mongo {
* output, but all other entries pass through unmodified. Note that the namespace filter applies
* only to unwound transaction operations, not to any other entries.
*/
class DocumentSourceChangeStreamUnwindTransaction : public DocumentSource {
class DocumentSourceChangeStreamUnwindTransaction : public DocumentSourceInternalChangeStreamStage {
public:
static constexpr StringData kStageName = "$_internalChangeStreamUnwindTransaction"_sd;
@ -57,7 +57,7 @@ public:
DocumentSource::GetModPathsReturn getModifiedPaths() const final;
Value serialize(const SerializationOptions& opts = SerializationOptions{}) const final override;
Value doSerialize(const SerializationOptions& opts = SerializationOptions{}) const final;
StageConstraints constraints(Pipeline::SplitState pipeState) const final;

View File

@ -1093,7 +1093,8 @@ void DocumentSourceLookUp::serializeToArray(std::vector<Value>& array,
}
if (opts.transformIdentifiers ||
opts.literalPolicy != LiteralSerializationPolicy::kUnchanged) {
return Pipeline::parse(*_userPipeline, _fromExpCtx)->serializeToBson(opts);
return Pipeline::parse(*_userPipeline, _fromExpCtx, lookupPipeValidator)
->serializeToBson(opts);
}
if (opts.serializeForQueryAnalysis) {
// If we are in query analysis, encrypted fields will have been marked in the

View File

@ -582,4 +582,22 @@ void DocumentSourceMatch::rebuild(BSONObj filter) {
getDependencies(&_dependencies);
}
Value DocumentSourceInternalChangeStreamMatch::serialize(const SerializationOptions& opts) const {
if (opts.literalPolicy != LiteralSerializationPolicy::kUnchanged || opts.transformIdentifiers) {
// Stages made internally by 'DocumentSourceChangeStream' should not be serialized for
// query stats. For query stats we will serialize only the user specified $changeStream
// stage.
return Value();
}
return doSerialize(opts);
}
intrusive_ptr<DocumentSourceInternalChangeStreamMatch>
DocumentSourceInternalChangeStreamMatch::create(BSONObj filter,
const intrusive_ptr<ExpressionContext>& expCtx) {
intrusive_ptr<DocumentSourceInternalChangeStreamMatch> internalMatch(
new DocumentSourceInternalChangeStreamMatch(filter, expCtx));
return internalMatch;
}
} // namespace mongo

View File

@ -233,4 +233,42 @@ private:
DepsTracker _dependencies;
};
/**
* A DocumentSource class for all internal change stream stages that are also match stages. This
* currently handles parsing for query stats.
*/
class DocumentSourceInternalChangeStreamMatch : public DocumentSourceMatch {
public:
DocumentSourceInternalChangeStreamMatch(std::unique_ptr<MatchExpression> expr,
const boost::intrusive_ptr<ExpressionContext>& expCtx)
: DocumentSourceMatch(std::move(expr), expCtx) {}
static boost::intrusive_ptr<DocumentSourceInternalChangeStreamMatch> create(
BSONObj filter, const boost::intrusive_ptr<ExpressionContext>& expCtx);
/**
* Must override the serialize method, since internal change stream stages are serialized
* differently than match stages. This function mirrors
* DocumentSourceInternalChangeStreamStage::serialize and was added because this class cannot
* inherit from both DocumentSourceInternalChangeStreamStage and DocumentSourceMatch.
*/
Value serialize(const SerializationOptions& opts = SerializationOptions{}) const final override;
virtual Value doSerialize(const SerializationOptions& opts) const {
return DocumentSourceMatch::serialize(opts);
};
protected:
DocumentSourceInternalChangeStreamMatch(const BSONObj& query,
const boost::intrusive_ptr<ExpressionContext>& expCtx)
: DocumentSourceMatch(query, expCtx) {}
DocumentSourceInternalChangeStreamMatch(
const DocumentSourceInternalChangeStreamMatch& other,
const boost::intrusive_ptr<ExpressionContext>& newExpCtx)
: DocumentSourceMatch(
other.serialize().getDocument().toBson().firstElement().embeddedObject(),
newExpCtx ? newExpCtx : other.pExpCtx) {}
};
} // namespace mongo

View File

@ -53,12 +53,7 @@ REGISTER_DOCUMENT_SOURCE(unionWith,
AllowedWithApiStrict::kAlways);
namespace {
std::unique_ptr<Pipeline, PipelineDeleter> buildPipelineFromViewDefinition(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
ExpressionContext::ResolvedNamespace resolvedNs,
std::vector<BSONObj> currentPipeline) {
auto validatorCallback = [](const Pipeline& pipeline) {
void validatorCallback(const Pipeline& pipeline) {
const auto& sources = pipeline.getSources();
std::for_each(sources.begin(), sources.end(), [](auto& src) {
uassert(31441,
@ -66,7 +61,12 @@ std::unique_ptr<Pipeline, PipelineDeleter> buildPipelineFromViewDefinition(
<< " is not allowed within a $unionWith's sub-pipeline",
src->constraints().isAllowedInUnionPipeline());
});
};
}
std::unique_ptr<Pipeline, PipelineDeleter> buildPipelineFromViewDefinition(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
ExpressionContext::ResolvedNamespace resolvedNs,
std::vector<BSONObj> currentPipeline) {
MakePipelineOptions opts;
opts.attachCursorSource = false;
@ -370,7 +370,9 @@ Value DocumentSourceUnionWith::serialize(const SerializationOptions& opts) const
std::move(_pushedDownStages.begin(),
_pushedDownStages.end(),
std::back_inserter(recoveredPipeline));
pipeCopy = Pipeline::parse(recoveredPipeline, _pipeline->getContext()).release();
pipeCopy =
Pipeline::parse(recoveredPipeline, _pipeline->getContext(), validatorCallback)
.release();
} else {
// The plan does not require reading from the sub-pipeline, so just include the
// serialization in the explain output.
@ -403,7 +405,7 @@ Value DocumentSourceUnionWith::serialize(const SerializationOptions& opts) const
auto serializedPipeline = [&]() -> std::vector<BSONObj> {
if (opts.transformIdentifiers ||
opts.literalPolicy != LiteralSerializationPolicy::kUnchanged) {
return Pipeline::parse(_userPipeline, _pipeline->getContext())
return Pipeline::parse(_userPipeline, _pipeline->getContext(), validatorCallback)
->serializeToBson(opts);
}
return _pipeline->serializeToBson(opts);

View File

@ -299,21 +299,21 @@ ResumeTokenData ResumeToken::getData() const {
}
Document ResumeToken::toDocument(const SerializationOptions& options) const {
/*
* This is our default resume token for the representative query shape.
* We use a high water mark token, otherwise a resume event is expected when reparsing.
* When serializing the "_typeBits", we purposely avoid serializing with SerializationOptions,
* as this will result in mistakenly add '?undefined' to the Document.
* The serialization of the Document will typically exclude the "_typeBits" if they
* were unset, which is the case for "kDefaultToken".
*/
static const auto kDefaultToken = makeHighWaterMarkToken(Timestamp(), 0);
return Document{{kDataFieldName,
options.serializeLiteral(_hexKeyString, Value(kDefaultToken._hexKeyString))},
// This is our default resume token for the representative query shape.
static const auto kDefaultTokenQueryStats = makeHighWaterMarkToken(Timestamp(), 1);
return Document{
{kDataFieldName,
options.serializeLiteral(_hexKeyString, Value(kDefaultTokenQueryStats._hexKeyString))},
// When serializing with 'kToDebugTypeString' 'serializeLiteral' will return an
// incorrect result. Therefore, we prefer to always exclude '_typeBits' when serializing
// the debug string by passing an empty value, since '_typeBits' is rarely set and will
// always be either missing or of type BinData.
{kTypeBitsFieldName,
options.literalPolicy != LiteralSerializationPolicy::kToDebugTypeString
? options.serializeLiteral(_typeBits, kDefaultToken._typeBits)
: kDefaultToken._typeBits}};
options.literalPolicy == LiteralSerializationPolicy::kToDebugTypeString
? Value()
: options.serializeLiteral(_typeBits, kDefaultTokenQueryStats._typeBits)}};
}
BSONObj ResumeToken::toBSON(const SerializationOptions& options) const {

View File

@ -869,7 +869,7 @@ void testIntervalFuzz(const uint64_t seed, PseudoRandom& threadLocalRNG) {
// Number of bits held in the bitset. This include MinKey and MaxKey, so it must be at least two.
static constexpr int bitsetSize = 11;
static const size_t numThreads = ProcessInfo::getNumCores();
static const size_t numThreads = ProcessInfo::getNumLogicalCores();
TEST_F(IntervalIntersection, IntervalPermutations) {
// Number of permutations is bitsetSize^4 * 2^4 * 2

View File

@ -153,7 +153,7 @@ ServiceContext::ConstructorActionRegisterer queryStatsStoreManagerRegisterer{
// number of cores. The size needs to be cast to a double since we want to round up the
// number of partitions, and therefore need to avoid int division.
size_t numPartitions = std::ceil(double(size) / (16 * 1024 * 1024));
auto numLogicalCores = ProcessInfo::getNumCores();
auto numLogicalCores = ProcessInfo::getNumLogicalCores();
if (numPartitions < numLogicalCores) {
numPartitions = numLogicalCores;
}

View File

@ -86,7 +86,7 @@ ServiceContext::ConstructorActionRegisterer planCacheRegisterer{
}
auto& globalPlanCache = sbePlanCacheDecoration(serviceCtx);
globalPlanCache =
std::make_unique<sbe::PlanCache>(cappedCacheSize, ProcessInfo::getNumCores());
std::make_unique<sbe::PlanCache>(cappedCacheSize, ProcessInfo::getNumLogicalCores());
}};
} // namespace

View File

@ -95,7 +95,7 @@ size_t getRequestedMemSizeInBytes(const MemorySize& memSize) {
size_t planCacheSize = convertToSizeInBytes(memSize);
uassert(5968001,
"Cache size must be at least 1KB * number of cores",
planCacheSize >= 1024 * ProcessInfo::getNumCores());
planCacheSize >= 1024 * ProcessInfo::getNumLogicalCores());
return planCacheSize;
}

View File

@ -42,6 +42,7 @@
#include "mongo/db/repl/replication_coordinator_test_fixture.h"
#include "mongo/db/repl/topology_version_observer.h"
#include "mongo/unittest/barrier.h"
#include "mongo/unittest/log_test.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/clock_source.h"
@ -120,6 +121,9 @@ protected:
const Milliseconds sleepTime = Milliseconds(100);
std::unique_ptr<TopologyVersionObserver> observer;
unittest::MinimumLoggedSeverityGuard severityGuard{logv2::LogComponent::kDefault,
logv2::LogSeverity::Debug(4)};
};
@ -142,11 +146,15 @@ TEST_F(TopologyVersionObserverTest, UpdateCache) {
auto electionTimeoutWhen = getReplCoord()->getElectionTimeout_forTest();
simulateSuccessfulV1ElectionWithoutExitingDrainMode(electionTimeoutWhen, opCtx.get());
auto sleepCounter = 0;
// Wait for the observer to update its cache
while (observer->getCached()->getTopologyVersion()->getCounter() ==
cachedResponse->getTopologyVersion()->getCounter()) {
sleepFor(sleepTime);
// Make sure the test doesn't wait here for longer than 15 seconds.
ASSERT_LTE(sleepCounter++, 150);
}
LOGV2(9326401, "Observer topology incremented after successful election");
auto newResponse = observer->getCached();
ASSERT(newResponse && newResponse->getTopologyVersion());

View File

@ -93,6 +93,7 @@ CoordinatorCommitMonitor::CoordinatorCommitMonitor(
std::vector<ShardId> recipientShards,
CoordinatorCommitMonitor::TaskExecutorPtr executor,
CancellationToken cancelToken,
int delayBeforeInitialQueryMillis,
Milliseconds maxDelayBetweenQueries)
: _metrics{std::move(metrics)},
_ns(std::move(ns)),
@ -100,11 +101,12 @@ CoordinatorCommitMonitor::CoordinatorCommitMonitor(
_executor(std::move(executor)),
_cancelToken(std::move(cancelToken)),
_threshold(Milliseconds(gRemainingReshardingOperationTimeThresholdMillis.load())),
_delayBeforeInitialQueryMillis(Milliseconds(delayBeforeInitialQueryMillis)),
_maxDelayBetweenQueries(maxDelayBetweenQueries) {}
SemiFuture<void> CoordinatorCommitMonitor::waitUntilRecipientsAreWithinCommitThreshold() const {
return _makeFuture()
return _makeFuture(_delayBeforeInitialQueryMillis)
.onError([](Status status) {
if (ErrorCodes::isCancellationError(status.code()) ||
ErrorCodes::isInterruption(status.code())) {
@ -198,9 +200,16 @@ CoordinatorCommitMonitor::queryRemainingOperationTimeForRecipients() const {
return {minRemainingTime, maxRemainingTime};
}
ExecutorFuture<void> CoordinatorCommitMonitor::_makeFuture() const {
ExecutorFuture<void> CoordinatorCommitMonitor::_makeFuture(Milliseconds delayBetweenQueries) const {
return ExecutorFuture<void>(_executor)
.then([this] { return queryRemainingOperationTimeForRecipients(); })
// Start waiting so that we have a more time to calculate a more realistic remaining time
// estimate.
.then([this, anchor = shared_from_this(), delayBetweenQueries] {
return _executor->sleepFor(delayBetweenQueries, _cancelToken)
.then([this, anchor = std::move(anchor)] {
return queryRemainingOperationTimeForRecipients();
});
})
.onError([this](Status status) {
if (_cancelToken.isCanceled()) {
// Do not retry on cancellation errors.
@ -235,12 +244,10 @@ ExecutorFuture<void> CoordinatorCommitMonitor::_makeFuture() const {
// The following ensures that the monitor would never sleep for more than a predefined
// maximum delay between querying recipient shards. Thus, it can handle very large,
// and potentially inaccurate estimates of the remaining operation time.
auto sleepTime = std::min(remainingTimes.max - _threshold, _maxDelayBetweenQueries);
return _executor->sleepFor(sleepTime, _cancelToken)
.then([this, anchor = std::move(anchor)] {
// We are not canceled yet, so schedule new queries against recipient shards.
return _makeFuture();
});
auto delayBetweenQueries =
std::min(remainingTimes.max - _threshold, _maxDelayBetweenQueries);
return _makeFuture(delayBetweenQueries);
});
}

View File

@ -74,6 +74,7 @@ public:
std::vector<ShardId> recipientShards,
TaskExecutorPtr executor,
CancellationToken cancelToken,
int delayBeforeInitialQueryMillis,
Milliseconds maxDelayBetweenQueries = kMaxDelayBetweenQueries);
SemiFuture<void> waitUntilRecipientsAreWithinCommitThreshold() const;
@ -90,7 +91,7 @@ public:
RemainingOperationTimes queryRemainingOperationTimeForRecipients() const;
private:
ExecutorFuture<void> _makeFuture() const;
ExecutorFuture<void> _makeFuture(Milliseconds delayBetweenQueries) const;
static constexpr auto kDiagnosticLogLevel = 0;
static constexpr auto kMaxDelayBetweenQueries = Seconds(30);
@ -102,6 +103,8 @@ private:
const CancellationToken _cancelToken;
const Milliseconds _threshold;
const Milliseconds _delayBeforeInitialQueryMillis;
const Milliseconds _maxDelayBetweenQueries;
TaskExecutorPtr _networkExecutor;

View File

@ -169,6 +169,7 @@ void CoordinatorCommitMonitorTest::setUp() {
_recipientShards,
_futureExecutor,
_cancellationSource->token(),
0,
Milliseconds(0));
_commitMonitor->setNetworkExecutorForTest(executor());
}

View File

@ -2035,7 +2035,8 @@ void ReshardingCoordinator::_startCommitMonitor(
_coordinatorDoc.getSourceNss(),
resharding::extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()),
**executor,
_ctHolder->getCommitMonitorToken());
_ctHolder->getCommitMonitorToken(),
resharding::gReshardingDelayBeforeRemainingOperationTimeQueryMillis.load());
_commitMonitorQuiesced = _commitMonitor->waitUntilRecipientsAreWithinCommitThreshold()
.thenRunOn(**executor)

View File

@ -180,10 +180,10 @@ boost::optional<Milliseconds> ReshardingMetrics::getRecipientHighEstimateRemaini
getStartFor(TimedPhase::kApplying).has_value(),
getBytesWrittenCount(),
getApproxBytesToScanCount(),
getElapsed<Seconds>(TimedPhase::kCloning, getClockSource()).value_or(Seconds{0}),
getElapsed<Milliseconds>(TimedPhase::kCloning, getClockSource()).value_or(Seconds{0}),
getOplogEntriesApplied(),
getOplogEntriesFetched(),
getElapsed<Seconds>(TimedPhase::kApplying, getClockSource()).value_or(Seconds{0}));
getElapsed<Milliseconds>(TimedPhase::kApplying, getClockSource()).value_or(Seconds{0}));
}
std::unique_ptr<ReshardingMetrics> ReshardingMetrics::makeInstance(UUID instanceId,

View File

@ -714,5 +714,23 @@ TEST_F(ReshardingMetricsTest, OnStateTransitionInformsCumulativeMetrics) {
});
}
TEST_F(ReshardingMetricsTest, RecipientReportsRemainingTimeLowElapsed) {
auto metrics = createInstanceMetrics(getClockSource(), UUID::gen(), Role::kRecipient);
const auto& clock = getClockSource();
constexpr auto timeSpentCloning = Seconds(20);
constexpr auto timeSpentApplying = Milliseconds(50);
metrics->onOplogEntriesFetched(500000);
metrics->setStartFor(TimedPhase::kCloning, clock->now());
clock->advance(timeSpentCloning);
metrics->setEndFor(TimedPhase::kCloning, clock->now());
metrics->setStartFor(TimedPhase::kApplying, clock->now());
clock->advance(timeSpentApplying);
metrics->onOplogEntriesApplied(300000);
auto report = metrics->getHighEstimateRemainingTimeMillis();
ASSERT_NE(report, Milliseconds{0});
}
} // namespace
} // namespace mongo

Some files were not shown because too many files have changed in this diff Show More