SERVER-82043 Enhancement of Commit Message Validation for 10gen/mongo Commits (#16160)

GitOrigin-RevId: 26c212f31e
Juan Gu 2023-10-10 10:58:21 -07:00 committed by MongoDB Bot
parent 78aeadff05
commit e88e3415ed
50 changed files with 1310 additions and 1340 deletions

View File

@ -1,14 +1,8 @@
"""Module for syncing a repo with Copybara and setting up configurations."""
import argparse
import subprocess
import os
import sys
from typing import Optional
from github import GithubIntegration
from buildscripts.util.read_config import read_config_file
def run_command(command): # noqa: D406,D407
"""
@ -49,38 +43,8 @@ def create_mongodb_bot_gitconfig():
print("mongodb-bot.gitconfig file created.")
def get_installation_access_token(app_id: int, private_key: str,
installation_id: int) -> Optional[str]: # noqa: D406,D407,D413
"""
Obtain an installation access token using JWT.
Args:
- app_id (int): The application ID for GitHub App.
- private_key (str): The private key associated with the GitHub App.
- installation_id (int): The installation ID of the GitHub App for a particular account.
Returns:
- Optional[str]: The installation access token. Returns `None` if there's an error obtaining the token.
"""
integration = GithubIntegration(app_id, private_key)
auth = integration.get_access_token(installation_id)
if auth:
return auth.token
else:
print("Error obtaining installation token")
return None
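For illustration only, a minimal usage sketch of this helper, assuming the module is importable as buildscripts.sync_repo_with_copybara and using placeholder credentials (the IDs and key path below are hypothetical):

from buildscripts.sync_repo_with_copybara import get_installation_access_token

app_id = 123456                    # placeholder GitHub App ID
installation_id = 654321           # placeholder installation ID
with open("app-private-key.pem") as key_file:  # placeholder PEM file for the app
    private_key = key_file.read()

token = get_installation_access_token(app_id, private_key, installation_id)
if token is None:
    raise SystemExit("could not obtain installation access token")
# The token can then be embedded in an HTTPS push URL, as main() does below.
push_url = f"https://x-access-token:{token}@github.com/mongodb/mongo.git"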
def main():
"""Clone the Copybara repo, build its Docker image, and set up and run migrations."""
parser = argparse.ArgumentParser()
parser.add_argument("--expansion-file", dest="expansion_file", type=str,
help="Location of expansions file generated by evergreen.")
args = parser.parse_args()
# Check if the copybara directory already exists
if os.path.exists('copybara'):
print("Copybara directory already exists.")
@ -90,47 +54,25 @@ def main():
# Navigate to the Copybara directory and build the Copybara Docker image
run_command("cd copybara && docker build --rm -t copybara .")
# Read configurations
expansions = read_config_file(args.expansion_file)
access_token_copybara_syncer = get_installation_access_token(
expansions["app_id_copybara_syncer"], expansions["private_key_copybara_syncer"],
expansions["installation_id_copybara_syncer"])
# Create the mongodb-bot.gitconfig file as necessary.
create_mongodb_bot_gitconfig()
current_dir = os.getcwd()
git_destination_url_with_token = f"https://x-access-token:{access_token_copybara_syncer}@github.com/mongodb/mongo.git"
# Set up the Docker command and execute it
current_dir = os.getcwd()
docker_cmd = [
"docker run",
"-v ~/.ssh:/root/.ssh",
"-v ~/mongodb-bot.gitconfig:/root/.gitconfig",
f'-v "{current_dir}/copybara.sky":/usr/src/app/copy.bara.sky',
"-e COPYBARA_CONFIG='copy.bara.sky'",
"-e COPYBARA_SUBCOMMAND='migrate'",
f"-e COPYBARA_OPTIONS='-v --git-destination-url={git_destination_url_with_token}'",
"copybara copybara",
"docker run", "-v ~/.ssh:/root/.ssh", "-v ~/mongodb-bot.gitconfig:/root/.gitconfig",
f'-v "{current_dir}/copybara.staging.sky":/usr/src/app/copy.bara.sky',
"-e COPYBARA_CONFIG='copy.bara.sky'", "-e COPYBARA_SUBCOMMAND='migrate'",
"-e COPYBARA_OPTIONS='-v'", "copybara copybara"
]
try:
run_command(" ".join(docker_cmd))
except subprocess.CalledProcessError as err:
error_message = str(err.stderr)
acceptable_error_messages = [
# Indicates the two repositories are identical
"No new changes to import for resolved ref",
# Indicates differences exist but no changes affect the destination, for example: exclusion rules
"Iterative workflow produced no changes in the destination for resolved ref",
]
if any(acceptable_message in error_message
for acceptable_message in acceptable_error_messages):
return
raise
# Handle the specific error case for "No new changes..." between two repos
if "No new changes to import for resolved ref" not in str(err.stderr):
raise
if __name__ == "__main__":
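As a usage note, a self-contained sketch of the error-handling pattern above, simulating a benign Copybara failure with a hypothetical runner (the message strings mirror the acceptable_error_messages list):

import subprocess

def fake_sync():  # hypothetical stand-in for the real docker/copybara invocation
    raise subprocess.CalledProcessError(
        returncode=4, cmd="copybara migrate",
        stderr="Iterative workflow produced no changes in the destination for resolved ref")

try:
    fake_sync()
except subprocess.CalledProcessError as err:
    acceptable = [
        "No new changes to import for resolved ref",
        "Iterative workflow produced no changes in the destination for resolved ref",
    ]
    if any(msg in str(err.stderr) for msg in acceptable):
        print("Nothing to sync; treating the failure as success.")
    else:
        raise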

View File

@ -1,28 +0,0 @@
# This configuration is for migrating code from one Git repository to another using Copybara.
# It selectively copies content, excluding specific paths and preserving authorship.
sourceUrl = "git@github.com:10gen/mongo.git"
destinationUrl = "git@github.com:mongodb/mongo.git"
core.workflow(
name = "default",
origin = git.origin(
url = sourceUrl,
ref = "v4.4",
# VersionSelector
),
destination = git.destination(
url = destinationUrl,
fetch = "v4.4",
push = "v4.4",
),
# Change path to the folder you want to publish publicly
origin_files = glob(["**"], exclude=["src/mongo/db/modules/**"]),
authoring = authoring.pass_thru("MongoDB <mongodb@mongodb.com>"),
mode = "ITERATIVE",
# Change the path here to the folder you want to publish publicly
# transformations = [
# core.move("path/to/folder/you/want/exported", ""),
# ],
)

View File

@ -165,8 +165,6 @@ all:
ticket: SERVER-78793
- test_file: jstests/sharding/fsync_lock_unlock.js
ticket: SERVER-78149
- test_file: jstests/sharding/flushRoutingTableCacheUpdates_enforced_on_collections.js
ticket: SERVER-81985
suites:
change_streams_multiversion: null
concurrency_replication_multiversion: null

View File

@ -3721,7 +3721,7 @@ functions:
set -o errexit
${activate_virtualenv}
$python buildscripts/sync_repo_with_copybara.py --expansion-file expansions.yml
$python buildscripts/sync_repo_with_copybara.py
# Pre task steps
pre:
@ -5339,9 +5339,6 @@ tasks:
- *git_get_project
- *set_task_expansion_macros
- *set_up_virtualenv
- command: expansions.write
params:
file: src/expansions.yml
- func: "sync repo with copybara"
## integration test suites ##

View File

@ -9,4 +9,3 @@ flask == 1.1.1
itsdangerous == 2.0.0
Werkzeug == 2.0.3
ocspbuilder == 0.10.2
PyGithub == 1.53

View File

@ -1,20 +0,0 @@
// Ensure that a call to _flushRoutingTableCacheUpdates in a sharded cluster returns an error if
// attempted on a database instead of a collection.
let st = new ShardingTest({});
const testDBName = jsTestName();
const collName = 'coll';
const testDB = st.s.getDB(testDBName);
assert.commandWorked(testDB.adminCommand({enableSharding: testDBName}));
assert.commandWorked(
testDB.adminCommand({shardCollection: testDB[collName].getFullName(), key: {x: 1}}));
// On a collection, the command works
assert.commandWorked(
st.shard0.adminCommand({_flushRoutingTableCacheUpdates: testDB[collName].getFullName()}));
// But on a database, the command fails with error "IllegalOperation"
assert.commandFailedWithCode(st.shard0.adminCommand({_flushRoutingTableCacheUpdates: testDBName}),
ErrorCodes.IllegalOperation);
st.stop();

View File

@ -10,34 +10,24 @@
load("jstests/libs/chunk_manipulation_util.js");
load("jstests/libs/parallelTester.js");
// Documents inserted in this test are in the shape {_id: int} so the size is 18 bytes
const docSizeInBytes = 18;
function ShardStat() {
this.countDonorMoveChunkStarted = 0;
this.countRecipientMoveChunkStarted = 0;
this.countDocsClonedOnRecipient = 0;
this.countDocsClonedOnDonor = 0;
this.countDocsDeletedByRangeDeleter = 0;
this.countBytesDeletedByRangeDeleter = 0;
this.countDocsDeletedOnDonor = 0;
}
function incrementStatsAndCheckServerShardStats(db, donor, recipient, numDocs) {
function incrementStatsAndCheckServerShardStats(donor, recipient, numDocs) {
++donor.countDonorMoveChunkStarted;
donor.countDocsClonedOnDonor += numDocs;
++recipient.countRecipientMoveChunkStarted;
recipient.countDocsClonedOnRecipient += numDocs;
donor.countDocsDeletedByRangeDeleter += numDocs;
// Each document inserted in this test is `docSizeInBytes` bytes, so the number of bytes
// deleted must be exactly `numDocs * docSizeInBytes`
donor.countBytesDeletedByRangeDeleter += numDocs * docSizeInBytes;
donor.countDocsDeletedOnDonor += numDocs;
const statsFromServerStatus = shardArr.map(function(shardVal) {
return shardVal.getDB('admin').runCommand({serverStatus: 1}).shardingStatistics;
});
for (let i = 0; i < shardArr.length; ++i) {
let countDocsDeleted = statsFromServerStatus[i].hasOwnProperty('countDocsDeletedOnDonor')
? statsFromServerStatus[i].countDocsDeletedOnDonor
: statsFromServerStatus[i].countDocsDeletedByRangeDeleter;
assert(statsFromServerStatus[i]);
assert(statsFromServerStatus[i].countStaleConfigErrors);
assert(statsFromServerStatus[i].totalCriticalSectionCommitTimeMillis);
@ -53,13 +43,8 @@ function incrementStatsAndCheckServerShardStats(db, donor, recipient, numDocs) {
assert.eq(stats[i].countDocsClonedOnRecipient,
statsFromServerStatus[i].countDocsClonedOnRecipient);
assert.eq(stats[i].countDocsClonedOnDonor, statsFromServerStatus[i].countDocsClonedOnDonor);
assert.eq(stats[i].countDocsDeletedByRangeDeleter, countDocsDeleted);
const fcvDoc = db.adminCommand({getParameter: 1, featureCompatibilityVersion: 1});
if (MongoRunner.compareBinVersions(fcvDoc.featureCompatibilityVersion.version, '5.0') >=
0) {
assert.eq(stats[i].countBytesDeletedByRangeDeleter,
statsFromServerStatus[i].countBytesDeletedByRangeDeleter);
}
assert.eq(stats[i].countDocsDeletedOnDonor,
statsFromServerStatus[i].countDocsDeletedOnDonor);
assert.eq(stats[i].countRecipientMoveChunkStarted,
statsFromServerStatus[i].countRecipientMoveChunkStarted);
}
@ -136,7 +121,7 @@ if (MongoRunner.compareBinVersions(fcvDoc.featureCompatibilityVersion.version, '
// Move chunk from shard0 to shard1 without docs.
assert.commandWorked(
mongos.adminCommand({moveChunk: coll + '', find: {_id: 1}, to: st.shard1.shardName}));
incrementStatsAndCheckServerShardStats(testDB, stats[0], stats[1], numDocsInserted);
incrementStatsAndCheckServerShardStats(stats[0], stats[1], numDocsInserted);
// Insert docs and then move chunk again from shard1 to shard0.
for (let i = 0; i < numDocsToInsert; ++i) {
@ -145,17 +130,17 @@ for (let i = 0; i < numDocsToInsert; ++i) {
}
assert.commandWorked(mongos.adminCommand(
{moveChunk: coll + '', find: {_id: 1}, to: st.shard0.shardName, _waitForDelete: true}));
incrementStatsAndCheckServerShardStats(testDB, stats[1], stats[0], numDocsInserted);
incrementStatsAndCheckServerShardStats(stats[1], stats[0], numDocsInserted);
// Check that numbers are indeed cumulative. Move chunk from shard0 to shard1.
assert.commandWorked(mongos.adminCommand(
{moveChunk: coll + '', find: {_id: 1}, to: st.shard1.shardName, _waitForDelete: true}));
incrementStatsAndCheckServerShardStats(testDB, stats[0], stats[1], numDocsInserted);
incrementStatsAndCheckServerShardStats(stats[0], stats[1], numDocsInserted);
// Move chunk from shard1 to shard0.
assert.commandWorked(mongos.adminCommand(
{moveChunk: coll + '', find: {_id: 1}, to: st.shard0.shardName, _waitForDelete: true}));
incrementStatsAndCheckServerShardStats(testDB, stats[1], stats[0], numDocsInserted);
incrementStatsAndCheckServerShardStats(stats[1], stats[0], numDocsInserted);
//
// Tests for the count of migrations aborting from lock timeouts.

View File

@ -379,20 +379,19 @@ private:
invariant(!_commandObj.isEmpty());
bob->append("find", _commandObj["update"].String());
extractQueryDetails(_updateOpObj, bob);
bob->append("batchSize", 1);
bob->append("singleBatch", true);
if (const auto& shardVersion = _commandObj.getField("shardVersion");
!shardVersion.eoo()) {
bob->append(shardVersion);
}
if (const auto& databaseVersion = _commandObj.getField("databaseVersion");
!databaseVersion.eoo()) {
bob->append(databaseVersion);
}
bob->append("find", _commandObj["update"].String());
extractQueryDetails(_updateOpObj, bob);
bob->append("batchSize", 1);
bob->append("singleBatch", true);
}
private:

View File

@ -567,6 +567,10 @@ Status storeMongodOptions(const moe::Environment& params) {
return Status(ErrorCodes::BadValue, sb.str());
}
}
} else {
if (serverGlobalParams.port < 0 || serverGlobalParams.port > 65535) {
return Status(ErrorCodes::BadValue, "bad --port number");
}
}
if (params.count("sharding.clusterRole")) {
auto clusterRoleParam = params["sharding.clusterRole"].as<std::string>();

View File

@ -47,9 +47,9 @@
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_tags.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/grid.h"
#include "mongo/util/str.h"
namespace mongo {
using MigrateInfoVector = BalancerChunkSelectionPolicy::MigrateInfoVector;
@ -90,27 +90,6 @@ StatusWith<ZoneInfo> createCollectionZoneInfo(OperationContext* opCtx,
*/
StatusWith<DistributionStatus> createCollectionDistributionStatus(
OperationContext* opCtx, const ShardStatisticsVector& allShards, ChunkManager* chunkMgr) {
ShardToChunksMap shardToChunksMap;
// Makes sure there is an entry in shardToChunksMap for every shard, so empty shards will also
// be accounted for
for (const auto& stat : allShards) {
shardToChunksMap[stat.shardId];
}
chunkMgr->forEachChunk([&](const auto& chunkEntry) {
ChunkType chunk;
chunk.setNS(chunkMgr->getns());
chunk.setMin(chunkEntry.getMin());
chunk.setMax(chunkEntry.getMax());
chunk.setJumbo(chunkEntry.isJumbo());
chunk.setShard(chunkEntry.getShardId());
chunk.setVersion(chunkEntry.getLastmod());
shardToChunksMap[chunkEntry.getShardId()].push_back(chunk);
return true;
});
auto swZoneInfo = createCollectionZoneInfo(
opCtx, chunkMgr->getns(), chunkMgr->getShardKeyPattern().getKeyPattern());
@ -118,8 +97,7 @@ StatusWith<DistributionStatus> createCollectionDistributionStatus(
return swZoneInfo.getStatus();
}
return {DistributionStatus{
chunkMgr->getns(), std::move(shardToChunksMap), std::move(swZoneInfo.getValue())}};
return {DistributionStatus{chunkMgr->getns(), std::move(swZoneInfo.getValue()), chunkMgr}};
}
/**
@ -548,7 +526,7 @@ Status BalancerChunkSelectionPolicyImpl::checkMoveAllowed(OperationContext* opCt
}
return BalancerPolicy::isShardSuitableReceiver(*newShardIterator,
distribution.getTagForChunk(chunk));
distribution.getTagForRange(chunk.getRange()));
}
StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::_getSplitCandidatesForCollection(
@ -608,7 +586,7 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::_getMigrateCandi
const DistributionStatus& distribution = collInfoStatus.getValue();
for (const auto& tagRangeEntry : distribution.tagRanges()) {
for (const auto& tagRangeEntry : distribution.getZoneInfo().zoneRanges()) {
const auto& tagRange = tagRangeEntry.second;
const auto chunkAtZoneMin = cm->findIntersectingChunkWithSimpleCollation(tagRange.min);

View File

@ -58,66 +58,140 @@ namespace {
// optimal average across all shards for a zone for a rebalancing migration to be initiated.
const size_t kDefaultImbalanceThreshold = 1;
ChunkType makeChunkType(const NamespaceString& nss, const Chunk& chunk) {
ChunkType ct{nss, chunk.getRange(), chunk.getLastmod(), chunk.getShardId()};
ct.setJumbo(chunk.isJumbo());
return ct;
}
/**
* Return a vector of zones after they have been normalized according to the given chunk
* configuration.
*
* If a zone only partially covers a chunk, the boundaries of that zone are shrunk so that the
* normalized zone does not overlap with that chunk. The boundaries of a normalized zone will never
* fall in the middle of a chunk.
*
* Additionally, the vector will also contain ranges tagged "NoZone" for the portions of the key
* space that are not covered by any zone.
*/
std::vector<ZoneRange> normalizeZones(const ChunkManager* cm, const ZoneInfo& zoneInfo) {
std::vector<ZoneRange> normalizedRanges;
const auto& globalMaxKey = cm->getShardKeyPattern().getKeyPattern().globalMax();
auto lastMax = cm->getShardKeyPattern().getKeyPattern().globalMin();
for (const auto& zoneRangeIt : zoneInfo.zoneRanges()) {
const auto& zoneRange = zoneRangeIt.second;
const auto& minChunk = cm->findIntersectingChunkWithSimpleCollation(zoneRange.min);
const auto gtMin =
SimpleBSONObjComparator::kInstance.evaluate(zoneRange.min > minChunk.getMin());
const auto& normalizedMin = gtMin ? minChunk.getMax() : zoneRange.min;
const auto& normalizedMax = [&] {
if (SimpleBSONObjComparator::kInstance.evaluate(zoneRange.max == globalMaxKey)) {
return zoneRange.max;
}
const auto& maxChunk = cm->findIntersectingChunkWithSimpleCollation(zoneRange.max);
const auto gtMax =
SimpleBSONObjComparator::kInstance.evaluate(zoneRange.max > maxChunk.getMin()) &&
SimpleBSONObjComparator::kInstance.evaluate(
zoneRange.max != cm->getShardKeyPattern().getKeyPattern().globalMax());
return gtMax ? maxChunk.getMin() : zoneRange.max;
}();
if (SimpleBSONObjComparator::kInstance.evaluate(normalizedMin == normalizedMax)) {
// This zone does not fully contain any chunk thus we can ignore it
continue;
}
if (SimpleBSONObjComparator::kInstance.evaluate(normalizedMin != lastMax)) {
// The zone is not contiguous with the previous one, so add a kNoZoneName range to fill the gap
normalizedRanges.emplace_back(lastMax, normalizedMin, ZoneInfo::kNoZoneName);
}
normalizedRanges.emplace_back(normalizedMin, normalizedMax, zoneRange.zone);
lastMax = normalizedMax;
}
if (SimpleBSONObjComparator::kInstance.evaluate(lastMax != globalMaxKey)) {
normalizedRanges.emplace_back(lastMax, globalMaxKey, ZoneInfo::kNoZoneName);
}
return normalizedRanges;
}
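To make the normalization above concrete, here is a simplified, hypothetical sketch in Python, with integer split points standing in for BSON chunk bounds and MIN/MAX for the shard key pattern's globalMin()/globalMax(); all names are illustrative, not MongoDB APIs:

MIN, MAX = float("-inf"), float("inf")
NO_ZONE = ""  # stands in for ZoneInfo::kNoZoneName

def normalize_zones(chunk_bounds, zone_ranges):
    """chunk_bounds: sorted split points, e.g. [MIN, 10, 20, MAX] describes chunks
    [MIN,10), [10,20), [20,MAX). zone_ranges: sorted, non-overlapping (min, max, name)."""
    def chunk_containing(key):
        # Return (min, max) of the chunk whose range contains `key`.
        for lo, hi in zip(chunk_bounds, chunk_bounds[1:]):
            if lo <= key < hi:
                return lo, hi
        raise ValueError(key)

    normalized = []
    last_max = MIN
    for zmin, zmax, name in zone_ranges:
        cmin_lo, cmin_hi = chunk_containing(zmin)
        norm_min = cmin_hi if zmin > cmin_lo else zmin   # shrink min up to the next chunk boundary
        if zmax == MAX:
            norm_max = zmax
        else:
            cmax_lo, _ = chunk_containing(zmax)
            norm_max = cmax_lo if zmax > cmax_lo else zmax  # shrink max down to the previous boundary
        if norm_min == norm_max:
            continue  # zone does not fully contain any chunk; ignore it
        if norm_min != last_max:
            normalized.append((last_max, norm_min, NO_ZONE))  # fill the gap with a "NoZone" range
        normalized.append((norm_min, norm_max, name))
        last_max = norm_max
    if last_max != MAX:
        normalized.append((last_max, MAX, NO_ZONE))
    return normalized

# With chunks [MIN,10), [10,20), [20,MAX): zone [5, 15) fully contains no chunk and is dropped,
# while zone [5, 25) is shrunk to [10, 20) and the rest of the key space becomes "NoZone".
print(normalize_zones([MIN, 10, 20, MAX], [(5, 25, "zoneA")]))
# -> [(-inf, 10, ''), (10, 20, 'zoneA'), (20, inf, '')]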
} // namespace
DistributionStatus::DistributionStatus(NamespaceString nss,
ShardToChunksMap shardToChunksMap,
ZoneInfo zoneInfo)
: _nss(std::move(nss)),
_shardChunks(std::move(shardToChunksMap)),
_zoneInfo(std::move(zoneInfo)) {}
ZoneInfo zoneInfo,
const ChunkManager* chunkMngr)
: _nss(std::move(nss)), _zoneInfo(std::move(zoneInfo)), _chunkMngr(chunkMngr) {
size_t DistributionStatus::totalChunks() const {
size_t total = 0;
_normalizedZones = normalizeZones(_chunkMngr, _zoneInfo);
for (const auto& shardChunk : _shardChunks) {
total += shardChunk.second.size();
for (const auto& zoneRange : _normalizedZones) {
chunkMngr->forEachOverlappingChunk(
zoneRange.min, zoneRange.max, false /* isMaxInclusive */, [&](const auto& chunkInfo) {
_shardToZoneSizeMap[chunkInfo.getShardId()][zoneRange.zone]++;
return true;
});
}
return total;
}
size_t DistributionStatus::totalChunksWithTag(const std::string& tag) const {
size_t total = 0;
for (const auto& shardChunk : _shardChunks) {
total += numberOfChunksInShardWithTag(shardChunk.first, tag);
for (const auto& [_, zoneSizeMap] : _shardToZoneSizeMap) {
const auto& zoneIt = zoneSizeMap.find(tag);
if (zoneIt != zoneSizeMap.end()) {
total += zoneIt->second;
}
}
return total;
}
size_t DistributionStatus::numberOfChunksInShard(const ShardId& shardId) const {
const auto& shardChunks = getChunks(shardId);
return shardChunks.size();
const auto shardZonesIt = _shardToZoneSizeMap.find(shardId);
if (shardZonesIt == _shardToZoneSizeMap.end()) {
return 0;
}
size_t total = 0;
for (const auto& [_, numChunks] : shardZonesIt->second) {
total += numChunks;
}
return total;
}
size_t DistributionStatus::numberOfChunksInShardWithTag(const ShardId& shardId,
const string& tag) const {
const auto& shardChunks = getChunks(shardId);
size_t total = 0;
for (const auto& chunk : shardChunks) {
if (tag == getTagForChunk(chunk)) {
total++;
}
const auto shardZonesIt = _shardToZoneSizeMap.find(shardId);
if (shardZonesIt == _shardToZoneSizeMap.end()) {
return 0;
}
const auto& shardTags = shardZonesIt->second;
return total;
const auto& zoneIt = shardTags.find(tag);
if (zoneIt == shardTags.end()) {
return 0;
}
return zoneIt->second;
}
const vector<ChunkType>& DistributionStatus::getChunks(const ShardId& shardId) const {
ShardToChunksMap::const_iterator i = _shardChunks.find(shardId);
invariant(i != _shardChunks.end());
return i->second;
string DistributionStatus::getTagForRange(const ChunkRange& range) const {
return _zoneInfo.getZoneForChunk(range);
}
string DistributionStatus::getTagForChunk(const ChunkType& chunk) const {
return _zoneInfo.getZoneForChunk(chunk.getRange());
const StringMap<size_t>& DistributionStatus::getChunksPerTagMap(const ShardId& shardId) const {
static const StringMap<size_t> emptyMap;
const auto shardZonesIt = _shardToZoneSizeMap.find(shardId);
if (shardZonesIt == _shardToZoneSizeMap.end()) {
return emptyMap;
}
return shardZonesIt->second;
}
const string ZoneInfo::kNoZoneName = "";
ZoneInfo::ZoneInfo()
: _zoneRanges(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<ZoneRange>()) {}
@ -168,11 +242,11 @@ string ZoneInfo::getZoneForChunk(const ChunkRange& chunk) const {
// We should never have a partial overlap with a chunk range. If it happens, treat it as if this
// chunk doesn't belong to a tag
if (minIntersect != maxIntersect) {
return "";
return ZoneInfo::kNoZoneName;
}
if (minIntersect == _zoneRanges.end()) {
return "";
return ZoneInfo::kNoZoneName;
}
const ZoneRange& intersectRange = minIntersect->second;
@ -183,7 +257,7 @@ string ZoneInfo::getZoneForChunk(const ChunkRange& chunk) const {
return intersectRange.zone;
}
return "";
return ZoneInfo::kNoZoneName;
}
void DistributionStatus::report(BSONObjBuilder* builder) const {
@ -191,15 +265,15 @@ void DistributionStatus::report(BSONObjBuilder* builder) const {
// Report all shards
BSONArrayBuilder shardArr(builder->subarrayStart("shards"));
for (const auto& shardChunk : _shardChunks) {
for (const auto& [shardId, zoneSizeMap] : _shardToZoneSizeMap) {
BSONObjBuilder shardEntry(shardArr.subobjStart());
shardEntry.append("name", shardChunk.first.toString());
shardEntry.append("name", shardId.toString());
BSONArrayBuilder chunkArr(shardEntry.subarrayStart("chunks"));
for (const auto& chunk : shardChunk.second) {
chunkArr.append(chunk.toConfigBSON());
BSONObjBuilder tagsObj(shardEntry.subobjStart("tags"));
for (const auto& [tagName, numChunks] : zoneSizeMap) {
tagsObj.appendNumber(tagName, static_cast<long long>(numChunks));
}
chunkArr.doneFast();
tagsObj.doneFast();
shardEntry.doneFast();
}
@ -242,7 +316,7 @@ Status BalancerPolicy::isShardSuitableReceiver(const ClusterStatistics::ShardSta
str::stream() << stat.shardId << " is currently draining."};
}
if (!chunkTag.empty() && !stat.shardTags.count(chunkTag)) {
if (chunkTag != ZoneInfo::kNoZoneName && !stat.shardTags.count(chunkTag)) {
return {ErrorCodes::IllegalOperation,
str::stream() << stat.shardId << " is not in the correct zone " << chunkTag};
}
@ -360,10 +434,28 @@ MigrateInfo chooseRandomMigration(const ShardStatisticsVector& shardStats,
"fromShardId"_attr = sourceShardId,
"toShardId"_attr = destShardId);
const auto& chunks = distribution.getChunks(sourceShardId);
const auto& randomChunk = [&] {
const auto numChunksOnSourceShard = distribution.numberOfChunksInShard(sourceShardId);
const auto rndChunkIdx = getRandomIndex(numChunksOnSourceShard);
ChunkType rndChunk;
int idx{0};
distribution.getChunkManager()->forEachChunk([&](const auto& chunk) {
if (chunk.getShardId() == sourceShardId && idx++ == rndChunkIdx) {
rndChunk = makeChunkType(distribution.nss(), chunk);
rndChunk.setJumbo(chunk.isJumbo());
return false;
}
return true;
});
invariant(rndChunk.getShard().isValid());
return rndChunk;
}();
return {destShardId,
chunks[getRandomIndex(chunks.size())],
randomChunk,
MoveChunkRequest::ForceJumbo::kDoNotForce,
MigrateInfo::chunksImbalance};
}
@ -395,57 +487,70 @@ vector<MigrateInfo> BalancerPolicy::balance(const ShardStatisticsVector& shardSt
if (!availableShards->count(stat.shardId))
continue;
const vector<ChunkType>& chunks = distribution.getChunks(stat.shardId);
if (chunks.empty())
continue;
// Now we know we need to move the chunks off this shard, but only if permitted by the
// tags policy
unsigned numJumboChunks = 0;
// Since we have to move all chunks, let's just do it in order
for (const auto& chunk : chunks) {
if (chunk.getJumbo()) {
numJumboChunks++;
continue;
}
const string tag = distribution.getTagForChunk(chunk);
const ShardId to =
_getLeastLoadedReceiverShard(shardStats, distribution, tag, *availableShards);
if (!to.isValid()) {
if (migrations.empty()) {
LOGV2_WARNING(21889,
"Chunk {chunk} is on a draining shard, but no appropriate "
"recipient found",
"Chunk is on a draining shard, but no appropriate "
"recipient found",
"chunk"_attr = redact(chunk.toString()));
const auto& chunksPerTagMap = distribution.getChunksPerTagMap(stat.shardId);
for (const auto& tagIt : chunksPerTagMap) {
const auto& zoneName = tagIt.first;
for (const auto& zoneRange : distribution.getNormalizedZones()) {
if (zoneRange.zone != zoneName) {
continue;
}
distribution.getChunkManager()->forEachOverlappingChunk(
zoneRange.min,
zoneRange.max,
false /* isMaxInclusive */,
[&](const auto& chunk) {
if (chunk.getShardId() != stat.shardId) {
return true; // continue
}
if (chunk.isJumbo()) {
numJumboChunks++;
return true; // continue
}
const ShardId to = _getLeastLoadedReceiverShard(
shardStats, distribution, zoneName, *availableShards);
if (!to.isValid()) {
if (migrations.empty()) {
LOGV2_WARNING(
21889,
"Chunk {chunk} is on a draining shard, but no appropriate "
"recipient found",
"Chunk is on a draining shard, but no appropriate "
"recipient found",
"chunk"_attr = redact(
makeChunkType(distribution.nss(), chunk).toString()));
}
return true; // continue
}
invariant(to != stat.shardId);
migrations.emplace_back(to,
makeChunkType(distribution.nss(), chunk),
MoveChunkRequest::ForceJumbo::kForceBalancer,
MigrateInfo::drain);
invariant(availableShards->erase(stat.shardId));
invariant(availableShards->erase(to));
return false; // break
});
if (migrations.empty()) {
LOGV2_WARNING(21890,
"Unable to find any chunk to move from draining shard "
"{shardId}. numJumboChunks: {numJumboChunks}",
"Unable to find any chunk to move from draining shard",
"shardId"_attr = stat.shardId,
"numJumboChunks"_attr = numJumboChunks);
}
if (availableShards->size() < 2) {
return migrations;
}
continue;
}
invariant(to != stat.shardId);
migrations.emplace_back(
to, chunk, MoveChunkRequest::ForceJumbo::kForceBalancer, MigrateInfo::drain);
invariant(availableShards->erase(stat.shardId));
invariant(availableShards->erase(to));
break;
}
if (migrations.empty()) {
LOGV2_WARNING(21890,
"Unable to find any chunk to move from draining shard "
"{shardId}. numJumboChunks: {numJumboChunks}",
"Unable to find any chunk to move from draining shard",
"shardId"_attr = stat.shardId,
"numJumboChunks"_attr = numJumboChunks);
}
if (availableShards->size() < 2) {
return migrations;
}
}
}
@ -453,57 +558,77 @@ vector<MigrateInfo> BalancerPolicy::balance(const ShardStatisticsVector& shardSt
// 2) Check for chunks, which are on the wrong shard and must be moved off of it
if (!distribution.tags().empty()) {
for (const auto& stat : shardStats) {
if (!availableShards->count(stat.shardId))
continue;
const vector<ChunkType>& chunks = distribution.getChunks(stat.shardId);
const auto& chunksPerTagMap = distribution.getChunksPerTagMap(stat.shardId);
for (const auto& tagIt : chunksPerTagMap) {
const auto& zoneName = tagIt.first;
for (const auto& chunk : chunks) {
const string tag = distribution.getTagForChunk(chunk);
if (tag.empty())
if (zoneName == ZoneInfo::kNoZoneName)
continue;
if (stat.shardTags.count(tag))
if (stat.shardTags.count(zoneName))
continue;
if (chunk.getJumbo()) {
LOGV2_WARNING(
21891,
"Chunk {chunk} violates zone {zone}, but it is jumbo and cannot be moved",
"Chunk violates zone, but it is jumbo and cannot be moved",
"chunk"_attr = redact(chunk.toString()),
"zone"_attr = redact(tag));
continue;
}
const ShardId to =
_getLeastLoadedReceiverShard(shardStats, distribution, tag, *availableShards);
if (!to.isValid()) {
if (migrations.empty()) {
LOGV2_WARNING(21892,
"Chunk {chunk} violates zone {zone}, but no appropriate "
"recipient found",
"Chunk violates zone, but no appropriate recipient found",
"chunk"_attr = redact(chunk.toString()),
"zone"_attr = redact(tag));
for (const auto& zoneRange : distribution.getNormalizedZones()) {
if (zoneRange.zone != zoneName) {
continue;
}
continue;
distribution.getChunkManager()->forEachOverlappingChunk(
zoneRange.min,
zoneRange.max,
false /* isMaxInclusive */,
[&](const auto& chunk) {
if (chunk.getShardId() != stat.shardId) {
return true; // continue
}
if (chunk.isJumbo()) {
LOGV2_WARNING(
21891,
"Chunk {chunk} violates zone {zone}, but it is jumbo and "
"cannot be "
"moved",
"Chunk violates zone, but it is jumbo and cannot be moved",
"chunk"_attr =
redact(makeChunkType(distribution.nss(), chunk).toString()),
"zone"_attr = redact(zoneName));
return true; // continue
}
const ShardId to = _getLeastLoadedReceiverShard(
shardStats, distribution, zoneName, *availableShards);
if (!to.isValid()) {
if (migrations.empty()) {
LOGV2_WARNING(
21892,
"Chunk {chunk} violates zone {zone}, but no appropriate "
"recipient found",
"Chunk violates zone, but no appropriate recipient found",
"chunk"_attr = redact(
makeChunkType(distribution.nss(), chunk).toString()),
"zone"_attr = redact(zoneName));
}
return true; // continue
}
invariant(to != stat.shardId);
migrations.emplace_back(
to,
makeChunkType(distribution.nss(), chunk),
forceJumbo ? MoveChunkRequest::ForceJumbo::kForceBalancer
: MoveChunkRequest::ForceJumbo::kDoNotForce,
MigrateInfo::zoneViolation);
invariant(availableShards->erase(stat.shardId));
invariant(availableShards->erase(to));
return false; // break
});
}
invariant(to != stat.shardId);
migrations.emplace_back(to,
chunk,
forceJumbo ? MoveChunkRequest::ForceJumbo::kForceBalancer
: MoveChunkRequest::ForceJumbo::kDoNotForce,
MigrateInfo::zoneViolation);
invariant(availableShards->erase(stat.shardId));
invariant(availableShards->erase(to));
break;
}
if (availableShards->size() < 2) {
return migrations;
if (availableShards->size() < 2) {
return migrations;
}
}
}
}
@ -511,24 +636,35 @@ vector<MigrateInfo> BalancerPolicy::balance(const ShardStatisticsVector& shardSt
// 3) for each tag balance
vector<string> tagsPlusEmpty(distribution.tags().begin(), distribution.tags().end());
tagsPlusEmpty.push_back("");
tagsPlusEmpty.push_back(ZoneInfo::kNoZoneName);
for (const auto& tag : tagsPlusEmpty) {
const size_t totalNumberOfChunksWithTag =
(tag.empty() ? distribution.totalChunks() : distribution.totalChunksWithTag(tag));
size_t totalNumberOfShardsWithTag = 0;
for (const auto& stat : shardStats) {
if (tag.empty() || stat.shardTags.count(tag)) {
totalNumberOfShardsWithTag++;
const auto totalNumberOfChunksWithTag = [&] {
if (tag == ZoneInfo::kNoZoneName) {
return static_cast<size_t>(distribution.getChunkManager()->numChunks());
}
}
return distribution.totalChunksWithTag(tag);
}();
const auto totalNumberOfShardsWithTag = [&] {
if (tag == ZoneInfo::kNoZoneName) {
return shardStats.size();
}
size_t numShardsWithTag{0};
for (const auto& stat : shardStats) {
if (stat.shardTags.count(tag)) {
numShardsWithTag++;
}
}
return numShardsWithTag;
}();
// Skip zones which have no shards assigned to them. This situation is not harmful, but
// should not be possible so warn the operator to correct it.
if (totalNumberOfShardsWithTag == 0) {
if (!tag.empty()) {
if (tag != ZoneInfo::kNoZoneName) {
LOGV2_WARNING(
21893,
"Zone {zone} in collection {namespace} has no assigned shards and chunks "
@ -565,7 +701,7 @@ boost::optional<MigrateInfo> BalancerPolicy::balanceSingleChunk(
const ChunkType& chunk,
const ShardStatisticsVector& shardStats,
const DistributionStatus& distribution) {
const string tag = distribution.getTagForChunk(chunk);
const string tag = distribution.getTagForRange(chunk.getRange());
stdx::unordered_set<ShardId> availableShards;
std::transform(shardStats.begin(),
@ -642,26 +778,41 @@ bool BalancerPolicy::_singleZoneBalance(const ShardStatisticsVector& shardStats,
if (imbalance < kDefaultImbalanceThreshold)
return false;
const vector<ChunkType>& chunks = distribution.getChunks(from);
unsigned numJumboChunks = 0;
bool chunkFound = false;
for (const auto& chunk : chunks) {
if (distribution.getTagForChunk(chunk) != tag)
continue;
if (chunk.getJumbo()) {
numJumboChunks++;
for (const auto& zoneRange : distribution.getNormalizedZones()) {
if (zoneRange.zone != tag) {
continue;
}
migrations->emplace_back(to, chunk, forceJumbo, MigrateInfo::chunksImbalance);
invariant(availableShards->erase(chunk.getShard()));
invariant(availableShards->erase(to));
return true;
distribution.getChunkManager()->forEachOverlappingChunk(
zoneRange.min, zoneRange.max, false /* isMaxInclusive */, [&](const auto& chunk) {
if (chunk.getShardId() != from) {
return true; // continue
}
if (chunk.isJumbo()) {
numJumboChunks++;
return true; // continue
}
migrations->emplace_back(to,
makeChunkType(distribution.nss(), chunk),
forceJumbo,
MigrateInfo::chunksImbalance);
invariant(availableShards->erase(chunk.getShardId()));
invariant(availableShards->erase(to));
chunkFound = true;
return false; // break
});
if (chunkFound) {
return chunkFound;
}
}
if (numJumboChunks) {
if (!chunkFound && numJumboChunks) {
LOGV2_WARNING(
21894,
"Shard: {shardId}, collection: {namespace} has only jumbo chunks for "
@ -673,7 +824,7 @@ bool BalancerPolicy::_singleZoneBalance(const ShardStatisticsVector& shardStats,
"numJumboChunks"_attr = numJumboChunks);
}
return false;
return chunkFound;
}
ZoneRange::ZoneRange(const BSONObj& a_min, const BSONObj& a_max, const std::string& _zone)

View File

@ -37,12 +37,12 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/s/balancer/cluster_statistics.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/request_types/move_chunk_request.h"
#include "mongo/s/shard_id.h"
namespace mongo {
struct ZoneRange {
ZoneRange(const BSONObj& a_min, const BSONObj& a_max, const std::string& _zone);
@ -78,13 +78,15 @@ struct MigrateInfo {
};
typedef std::vector<ClusterStatistics::ShardStatistics> ShardStatisticsVector;
typedef std::map<ShardId, std::vector<ChunkType>> ShardToChunksMap;
typedef std::map<ShardId, StringMap<size_t>> ShardToZoneSizeMap;
/**
* Keeps track of zones for a collection.
*/
class ZoneInfo {
public:
static const std::string kNoZoneName;
ZoneInfo();
ZoneInfo(ZoneInfo&&) = default;
@ -114,6 +116,14 @@ public:
return _zoneRanges;
}
const ZoneRange& getZoneRange(const std::string& zoneName) const {
for (const auto& [_, zoneRange] : _zoneRanges) {
if (zoneRange.zone == zoneName)
return zoneRange;
}
MONGO_UNREACHABLE;
}
private:
// Map of zone max key to the zone description
BSONObjIndexedMap<ZoneRange> _zoneRanges;
@ -132,7 +142,7 @@ class DistributionStatus {
DistributionStatus& operator=(const DistributionStatus&) = delete;
public:
DistributionStatus(NamespaceString nss, ShardToChunksMap shardToChunksMap, ZoneInfo zoneInfo);
DistributionStatus(NamespaceString nss, ZoneInfo zoneInfo, const ChunkManager* chunkMngr);
DistributionStatus(DistributionStatus&&) = default;
/**
@ -142,11 +152,6 @@ public:
return _nss;
}
/**
* Returns total number of chunks across all shards.
*/
size_t totalChunks() const;
/**
* Returns the total number of chunks across all shards, which fall into the specified zone's
* range.
@ -163,18 +168,6 @@ public:
*/
size_t numberOfChunksInShardWithTag(const ShardId& shardId, const std::string& tag) const;
/**
* Returns all chunks for the specified shard.
*/
const std::vector<ChunkType>& getChunks(const ShardId& shardId) const;
/**
* Returns all tag ranges defined for the collection.
*/
const BSONObjIndexedMap<ZoneRange>& tagRanges() const {
return _zoneInfo.zoneRanges();
}
/**
* Returns all tags defined for the collection.
*/
@ -186,7 +179,21 @@ public:
* Using the set of tags defined for the collection, returns what tag corresponds to the
* specified chunk. If the chunk doesn't fall into any tag returns the empty string.
*/
std::string getTagForChunk(const ChunkType& chunk) const;
std::string getTagForRange(const ChunkRange& range) const;
const ChunkManager* getChunkManager() const {
return _chunkMngr;
}
const std::vector<ZoneRange>& getNormalizedZones() const {
return _normalizedZones;
}
const ZoneInfo& getZoneInfo() const {
return _zoneInfo;
}
const StringMap<size_t>& getChunksPerTagMap(const ShardId& shardId) const;
/**
* Returns a BSON/string representation of this distribution status.
@ -198,11 +205,16 @@ private:
// Namespace for which this distribution applies
NamespaceString _nss;
// Map of what chunks are owned by each shard
ShardToChunksMap _shardChunks;
// Map that tracks how many chunks each shard owns in each zone
// shardId -> zoneName -> numChunks
ShardToZoneSizeMap _shardToZoneSizeMap;
// Info for zones.
ZoneInfo _zoneInfo;
std::vector<ZoneRange> _normalizedZones;
const ChunkManager* _chunkMngr;
};
class BalancerPolicy {

View File

@ -46,9 +46,11 @@ using std::stringstream;
using std::vector;
using ShardStatistics = ClusterStatistics::ShardStatistics;
typedef std::map<ShardId, std::vector<ChunkType>> ShardToChunksMap;
const auto emptyTagSet = std::set<std::string>();
const std::string emptyShardVersion = "";
const auto kConfigId = ShardId("config");
const auto kShardId0 = ShardId("shard0");
const auto kShardId1 = ShardId("shard1");
const auto kShardId2 = ShardId("shard2");
@ -57,6 +59,25 @@ const auto kShardId4 = ShardId("shard4");
const auto kShardId5 = ShardId("shard5");
const NamespaceString kNamespace("TestDB", "TestColl");
const uint64_t kNoMaxSize = 0;
const KeyPattern kSKeyPattern(BSON("x" << 1));
const boost::optional<Timestamp> kCollTimestamp;
const OID kCollEpoch;
std::shared_ptr<RoutingTableHistory> makeRoutingTable(const std::vector<ChunkType>& chunks) {
static const UUID kCollectionUUID{UUID::gen()};
return RoutingTableHistory::makeNew(
kNamespace, kCollectionUUID, kSKeyPattern, nullptr, false, kCollEpoch, chunks);
}
std::shared_ptr<ChunkManager> makeChunkManager(const std::vector<ChunkType>& chunks) {
return std::make_shared<ChunkManager>(makeRoutingTable(chunks), boost::none /* clusterTime */);
}
DistributionStatus makeDistStatus(std::shared_ptr<ChunkManager> cm,
ZoneInfo zoneInfo = ZoneInfo()) {
return {kNamespace, std::move(zoneInfo), cm.get()};
}
/**
* Constructs a shard statistics vector and a consistent mapping of chunks to shards given the
@ -65,8 +86,8 @@ const uint64_t kNoMaxSize = 0;
*
* [MinKey, 1), [1, 2), [2, 3) ... [N - 1, MaxKey)
*/
std::pair<ShardStatisticsVector, ShardToChunksMap> generateCluster(
const vector<std::pair<ShardStatistics, size_t>>& shardsAndNumChunks) {
std::pair<std::pair<ShardStatisticsVector, ShardToChunksMap>, std::shared_ptr<ChunkManager>>
generateCluster(const vector<std::pair<ShardStatistics, size_t>>& shardsAndNumChunks) {
int64_t totalNumChunks = 0;
for (const auto& entry : shardsAndNumChunks) {
totalNumChunks += std::get<1>(entry);
@ -77,9 +98,9 @@ std::pair<ShardStatisticsVector, ShardToChunksMap> generateCluster(
int64_t currentChunk = 0;
ChunkVersion chunkVersion(1, 0, OID::gen());
ChunkVersion chunkVersion(1, 0, kCollEpoch);
const KeyPattern shardKeyPattern(BSON("x" << 1));
std::vector<ChunkType> chunks;
for (auto it = shardsAndNumChunks.begin(); it != shardsAndNumChunks.end(); it++) {
ShardStatistics shard = std::move(it->first);
@ -92,22 +113,23 @@ std::pair<ShardStatisticsVector, ShardToChunksMap> generateCluster(
ChunkType chunk;
chunk.setNS(kNamespace);
chunk.setMin(currentChunk == 0 ? shardKeyPattern.globalMin()
: BSON("x" << currentChunk));
chunk.setMax(currentChunk == totalNumChunks - 1 ? shardKeyPattern.globalMax()
chunk.setMin(currentChunk == 0 ? kSKeyPattern.globalMin() : BSON("x" << currentChunk));
chunk.setMax(currentChunk == totalNumChunks - 1 ? kSKeyPattern.globalMax()
: BSON("x" << currentChunk + 1));
chunk.setShard(shard.shardId);
chunk.setVersion(chunkVersion);
chunkVersion.incMajor();
chunkMap[shard.shardId].push_back(std::move(chunk));
chunkMap[shard.shardId].push_back(chunk);
chunks.push_back(std::move(chunk));
}
shardStats.push_back(std::move(shard));
}
return std::make_pair(std::move(shardStats), std::move(chunkMap));
return std::make_pair(std::make_pair(std::move(shardStats), std::move(chunkMap)),
makeChunkManager(chunks));
}
std::vector<MigrateInfo> balanceChunks(const ShardStatisticsVector& shardStats,
@ -125,13 +147,12 @@ std::vector<MigrateInfo> balanceChunks(const ShardStatisticsVector& shardStats,
}
TEST(BalancerPolicy, Basic) {
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4},
{ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0},
{ShardStatistics(kShardId2, kNoMaxSize, 3, false, emptyTagSet, emptyShardVersion), 3}});
const auto migrations(balanceChunks(
cluster.first, DistributionStatus(kNamespace, cluster.second, ZoneInfo()), false, false));
const auto migrations(balanceChunks(cluster.first, makeDistStatus(cm), false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
ASSERT_EQ(kShardId1, migrations[0].to);
@ -141,13 +162,12 @@ TEST(BalancerPolicy, Basic) {
}
TEST(BalancerPolicy, SmallClusterShouldBePerfectlyBalanced) {
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 1, false, emptyTagSet, emptyShardVersion), 1},
{ShardStatistics(kShardId1, kNoMaxSize, 2, false, emptyTagSet, emptyShardVersion), 2},
{ShardStatistics(kShardId2, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
const auto migrations(balanceChunks(
cluster.first, DistributionStatus(kNamespace, cluster.second, ZoneInfo()), false, false));
const auto migrations(balanceChunks(cluster.first, makeDistStatus(cm), false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId1, migrations[0].from);
ASSERT_EQ(kShardId2, migrations[0].to);
@ -157,46 +177,33 @@ TEST(BalancerPolicy, SmallClusterShouldBePerfectlyBalanced) {
}
TEST(BalancerPolicy, SingleChunkShouldNotMove) {
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 1, false, emptyTagSet, emptyShardVersion), 1},
{ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
ASSERT(
balanceChunks(
cluster.first, DistributionStatus(kNamespace, cluster.second, ZoneInfo()), true, false)
.empty());
ASSERT(
balanceChunks(
cluster.first, DistributionStatus(kNamespace, cluster.second, ZoneInfo()), false, false)
.empty());
ASSERT(balanceChunks(cluster.first, makeDistStatus(cm), true, false).empty());
ASSERT(balanceChunks(cluster.first, makeDistStatus(cm), false, false).empty());
}
TEST(BalancerPolicy, BalanceThresholdObeyed) {
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 2, false, emptyTagSet, emptyShardVersion), 2},
{ShardStatistics(kShardId1, kNoMaxSize, 2, false, emptyTagSet, emptyShardVersion), 2},
{ShardStatistics(kShardId2, kNoMaxSize, 1, false, emptyTagSet, emptyShardVersion), 1},
{ShardStatistics(kShardId3, kNoMaxSize, 1, false, emptyTagSet, emptyShardVersion), 1}});
ASSERT(
balanceChunks(
cluster.first, DistributionStatus(kNamespace, cluster.second, ZoneInfo()), true, false)
.empty());
ASSERT(
balanceChunks(
cluster.first, DistributionStatus(kNamespace, cluster.second, ZoneInfo()), false, false)
.empty());
ASSERT(balanceChunks(cluster.first, makeDistStatus(cm), true, false).empty());
ASSERT(balanceChunks(cluster.first, makeDistStatus(cm), false, false).empty());
}
TEST(BalancerPolicy, ParallelBalancing) {
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4},
{ShardStatistics(kShardId1, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4},
{ShardStatistics(kShardId2, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0},
{ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
const auto migrations(balanceChunks(
cluster.first, DistributionStatus(kNamespace, cluster.second, ZoneInfo()), false, false));
const auto migrations(balanceChunks(cluster.first, makeDistStatus(cm), false, false));
ASSERT_EQ(2U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
@ -213,7 +220,7 @@ TEST(BalancerPolicy, ParallelBalancing) {
}
TEST(BalancerPolicy, ParallelBalancingDoesNotPutChunksOnShardsAboveTheOptimal) {
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 100, false, emptyTagSet, emptyShardVersion), 100},
{ShardStatistics(kShardId1, kNoMaxSize, 90, false, emptyTagSet, emptyShardVersion), 90},
{ShardStatistics(kShardId2, kNoMaxSize, 90, false, emptyTagSet, emptyShardVersion), 90},
@ -221,8 +228,7 @@ TEST(BalancerPolicy, ParallelBalancingDoesNotPutChunksOnShardsAboveTheOptimal) {
{ShardStatistics(kShardId4, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0},
{ShardStatistics(kShardId5, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
const auto migrations(balanceChunks(
cluster.first, DistributionStatus(kNamespace, cluster.second, ZoneInfo()), false, false));
const auto migrations(balanceChunks(cluster.first, makeDistStatus(cm), false, false));
ASSERT_EQ(2U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
@ -239,14 +245,13 @@ TEST(BalancerPolicy, ParallelBalancingDoesNotPutChunksOnShardsAboveTheOptimal) {
}
TEST(BalancerPolicy, ParallelBalancingDoesNotMoveChunksFromShardsBelowOptimal) {
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 100, false, emptyTagSet, emptyShardVersion), 100},
{ShardStatistics(kShardId1, kNoMaxSize, 30, false, emptyTagSet, emptyShardVersion), 30},
{ShardStatistics(kShardId2, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 5},
{ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
const auto migrations(balanceChunks(
cluster.first, DistributionStatus(kNamespace, cluster.second, ZoneInfo()), false, false));
const auto migrations(balanceChunks(cluster.first, makeDistStatus(cm), false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
@ -257,7 +262,7 @@ TEST(BalancerPolicy, ParallelBalancingDoesNotMoveChunksFromShardsBelowOptimal) {
}
TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseSourceShardsWithMoveNecessary) {
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 8, false, emptyTagSet, emptyShardVersion), 8},
{ShardStatistics(kShardId1, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4},
{ShardStatistics(kShardId2, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0},
@ -266,10 +271,7 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseSourceShardsWithMoveNe
// Here kShardId0 would have been selected as a donor
stdx::unordered_set<ShardId> availableShards{kShardId1, kShardId2, kShardId3};
const auto migrations(
BalancerPolicy::balance(cluster.first,
DistributionStatus(kNamespace, cluster.second, ZoneInfo()),
&availableShards,
false));
BalancerPolicy::balance(cluster.first, makeDistStatus(cm), &availableShards, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId1, migrations[0].from);
@ -280,7 +282,7 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseSourceShardsWithMoveNe
}
TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseSourceShardsWithMoveNotNecessary) {
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 12, false, emptyTagSet, emptyShardVersion), 12},
{ShardStatistics(kShardId1, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4},
{ShardStatistics(kShardId2, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0},
@ -289,15 +291,12 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseSourceShardsWithMoveNo
// Here kShardId0 would have been selected as a donor
stdx::unordered_set<ShardId> availableShards{kShardId1, kShardId2, kShardId3};
const auto migrations(
BalancerPolicy::balance(cluster.first,
DistributionStatus(kNamespace, cluster.second, ZoneInfo()),
&availableShards,
false));
BalancerPolicy::balance(cluster.first, makeDistStatus(cm), &availableShards, false));
ASSERT_EQ(0U, migrations.size());
}
TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseDestinationShards) {
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4},
{ShardStatistics(kShardId1, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4},
{ShardStatistics(kShardId2, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0},
@ -306,10 +305,7 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseDestinationShards) {
// Here kShardId2 would have been selected as a recipient
stdx::unordered_set<ShardId> availableShards{kShardId0, kShardId1, kShardId3};
const auto migrations(
BalancerPolicy::balance(cluster.first,
DistributionStatus(kNamespace, cluster.second, ZoneInfo()),
&availableShards,
false));
BalancerPolicy::balance(cluster.first, makeDistStatus(cm), &availableShards, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
@ -319,68 +315,116 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseDestinationShards) {
ASSERT_EQ(MigrateInfo::chunksImbalance, migrations[0].reason);
}
TEST(BalancerPolicy, JumboChunksNotMoved) {
auto cluster = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 2, false, emptyTagSet, emptyShardVersion), 4},
{ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
TEST(BalancerPolicy, JumboChunksNotMovedWhileEnforcingZones) {
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 3, false, emptyTagSet, emptyShardVersion), 3},
{ShardStatistics(kShardId1, kNoMaxSize, 3, false, {"a"}, emptyShardVersion), 3}});
cluster.second[kShardId0][0].setJumbo(true);
cluster.second[kShardId0][1].setJumbo(false); // Only chunk 1 is not jumbo
cluster.second[kShardId0][2].setJumbo(true);
cluster.second[kShardId0][3].setJumbo(true);
// construct a new chunk map where all the chunks are jumbo except this one
const auto& jumboChunk = cluster.second[kShardId0][1];
const auto migrations(balanceChunks(
cluster.first, DistributionStatus(kNamespace, cluster.second, ZoneInfo()), false, false));
std::vector<ChunkType> chunks;
cm->forEachChunk([&](const auto& chunk) {
ChunkType ct{kNamespace, chunk.getRange(), chunk.getLastmod(), chunk.getShardId()};
if (chunk.getLastmod() == jumboChunk.getVersion())
ct.setJumbo(false);
else
ct.setJumbo(true);
chunks.emplace_back(std::move(ct));
return true;
});
ZoneInfo zoneInfo;
ASSERT_OK(zoneInfo.addRangeToZone(
ZoneRange(kSKeyPattern.globalMin(), kSKeyPattern.globalMax(), "a")));
auto newCm = makeChunkManager(chunks);
const auto distribution = makeDistStatus(newCm, std::move(zoneInfo));
const auto migrations(balanceChunks(cluster.first, distribution, false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
ASSERT_EQ(kShardId1, migrations[0].to);
ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][1].getMin(), migrations[0].minKey);
ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][1].getMax(), migrations[0].maxKey);
ASSERT_BSONOBJ_EQ(jumboChunk.getMin(), migrations[0].minKey);
ASSERT_BSONOBJ_EQ(jumboChunk.getMax(), migrations[0].maxKey);
ASSERT_EQ(MigrateInfo::zoneViolation, migrations[0].reason);
}
TEST(BalancerPolicy, JumboChunksNotMoved) {
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 2, false, emptyTagSet, emptyShardVersion), 4},
{ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
// construct a new chunk map where all the chunks are jumbo except this one
const auto& jumboChunk = cluster.second[kShardId0][1];
std::vector<ChunkType> chunks;
cm->forEachChunk([&](const auto& chunk) {
ChunkType ct{kNamespace, chunk.getRange(), chunk.getLastmod(), chunk.getShardId()};
if (chunk.getLastmod() == jumboChunk.getVersion())
ct.setJumbo(false);
else
ct.setJumbo(true);
chunks.emplace_back(std::move(ct));
return true;
});
auto newCm = makeChunkManager(chunks);
const auto migrations(balanceChunks(cluster.first, makeDistStatus(newCm), false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
ASSERT_EQ(kShardId1, migrations[0].to);
ASSERT_BSONOBJ_EQ(jumboChunk.getMin(), migrations[0].minKey);
ASSERT_BSONOBJ_EQ(jumboChunk.getMax(), migrations[0].maxKey);
ASSERT_EQ(MigrateInfo::chunksImbalance, migrations[0].reason);
}
TEST(BalancerPolicy, JumboChunksNotMovedParallel) {
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 2, false, emptyTagSet, emptyShardVersion), 4},
{ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0},
{ShardStatistics(kShardId2, kNoMaxSize, 2, false, emptyTagSet, emptyShardVersion), 4},
{ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 0}});
cluster.second[kShardId0][0].setJumbo(true);
cluster.second[kShardId0][1].setJumbo(false); // Only chunk 1 is not jumbo
cluster.second[kShardId0][2].setJumbo(true);
cluster.second[kShardId0][3].setJumbo(true);
// construct a new chunk map where all the chunks are jumbo except the ones listed below
const auto& jumboChunk0 = cluster.second[kShardId0][1];
const auto& jumboChunk1 = cluster.second[kShardId2][2];
cluster.second[kShardId2][0].setJumbo(true);
cluster.second[kShardId2][1].setJumbo(true);
cluster.second[kShardId2][2].setJumbo(false); // Only chunk 1 is not jumbo
cluster.second[kShardId2][3].setJumbo(true);
std::vector<ChunkType> chunks;
cm->forEachChunk([&](const auto& chunk) {
ChunkType ct{kNamespace, chunk.getRange(), chunk.getLastmod(), chunk.getShardId()};
if (chunk.getLastmod() == jumboChunk0.getVersion() ||
chunk.getLastmod() == jumboChunk1.getVersion())
ct.setJumbo(false);
else
ct.setJumbo(true);
chunks.emplace_back(std::move(ct));
return true;
});
const auto migrations(balanceChunks(
cluster.first, DistributionStatus(kNamespace, cluster.second, ZoneInfo()), false, false));
auto newCm = makeChunkManager(chunks);
const auto migrations(balanceChunks(cluster.first, makeDistStatus(newCm), false, false));
ASSERT_EQ(2U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
ASSERT_EQ(kShardId1, migrations[0].to);
ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][1].getMin(), migrations[0].minKey);
ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][1].getMax(), migrations[0].maxKey);
ASSERT_BSONOBJ_EQ(jumboChunk0.getMin(), migrations[0].minKey);
ASSERT_BSONOBJ_EQ(jumboChunk0.getMax(), migrations[0].maxKey);
ASSERT_EQ(MigrateInfo::chunksImbalance, migrations[0].reason);
ASSERT_EQ(kShardId2, migrations[1].from);
ASSERT_EQ(kShardId3, migrations[1].to);
ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][2].getMin(), migrations[1].minKey);
ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][2].getMax(), migrations[1].maxKey);
ASSERT_BSONOBJ_EQ(jumboChunk1.getMin(), migrations[1].minKey);
ASSERT_BSONOBJ_EQ(jumboChunk1.getMax(), migrations[1].maxKey);
ASSERT_EQ(MigrateInfo::chunksImbalance, migrations[1].reason);
}
TEST(BalancerPolicy, DrainingSingleChunk) {
// shard0 is draining and chunks will go to shard1, even though it has a lot more chunks
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 2, true, emptyTagSet, emptyShardVersion), 1},
{ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 5}});
const auto migrations(balanceChunks(
cluster.first, DistributionStatus(kNamespace, cluster.second, ZoneInfo()), false, false));
const auto migrations(balanceChunks(cluster.first, makeDistStatus(cm), false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
ASSERT_EQ(kShardId1, migrations[0].to);
@ -391,14 +435,13 @@ TEST(BalancerPolicy, DrainingSingleChunk) {
TEST(BalancerPolicy, DrainingSingleChunkPerShard) {
// shard0 and shard2 are draining and chunks will go to shard1 and shard3 in parallel
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 2, true, emptyTagSet, emptyShardVersion), 1},
{ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 5},
{ShardStatistics(kShardId2, kNoMaxSize, 2, true, emptyTagSet, emptyShardVersion), 1},
{ShardStatistics(kShardId3, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 5}});
const auto migrations(balanceChunks(
cluster.first, DistributionStatus(kNamespace, cluster.second, ZoneInfo()), false, false));
const auto migrations(balanceChunks(cluster.first, makeDistStatus(cm), false, false));
ASSERT_EQ(2U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
@ -416,12 +459,11 @@ TEST(BalancerPolicy, DrainingSingleChunkPerShard) {
TEST(BalancerPolicy, DrainingWithTwoChunksFirstOneSelected) {
// shard0 is draining and chunks will go to shard1, even though it has a lot more chunks
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 2, true, emptyTagSet, emptyShardVersion), 2},
{ShardStatistics(kShardId1, kNoMaxSize, 0, false, emptyTagSet, emptyShardVersion), 5}});
const auto migrations(balanceChunks(
cluster.first, DistributionStatus(kNamespace, cluster.second, ZoneInfo()), false, false));
const auto migrations(balanceChunks(cluster.first, makeDistStatus(cm), false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
ASSERT_EQ(kShardId1, migrations[0].to);
@ -433,13 +475,12 @@ TEST(BalancerPolicy, DrainingWithTwoChunksFirstOneSelected) {
TEST(BalancerPolicy, DrainingMultipleShardsFirstOneSelected) {
// shard0 and shard1 are both draining with very few chunks in them, and chunks will go to
// shard2, even though it has a lot more chunks than the other two
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 5, true, emptyTagSet, emptyShardVersion), 1},
{ShardStatistics(kShardId1, kNoMaxSize, 5, true, emptyTagSet, emptyShardVersion), 2},
{ShardStatistics(kShardId2, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 16}});
const auto migrations(balanceChunks(
cluster.first, DistributionStatus(kNamespace, cluster.second, ZoneInfo()), false, false));
const auto migrations(balanceChunks(cluster.first, makeDistStatus(cm), false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
ASSERT_EQ(kShardId2, migrations[0].to);
@ -450,18 +491,17 @@ TEST(BalancerPolicy, DrainingMultipleShardsFirstOneSelected) {
TEST(BalancerPolicy, DrainingMultipleShardsWontAcceptChunks) {
// shard0 has many chunks, but can't move them to shard1 or shard2 because they are draining
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 2, false, emptyTagSet, emptyShardVersion), 4},
{ShardStatistics(kShardId1, kNoMaxSize, 0, true, emptyTagSet, emptyShardVersion), 0},
{ShardStatistics(kShardId2, kNoMaxSize, 0, true, emptyTagSet, emptyShardVersion), 0}});
const auto migrations(balanceChunks(
cluster.first, DistributionStatus(kNamespace, cluster.second, ZoneInfo()), false, false));
const auto migrations(balanceChunks(cluster.first, makeDistStatus(cm), false, false));
ASSERT(migrations.empty());
}
TEST(BalancerPolicy, DrainingSingleAppropriateShardFoundDueToTag) {
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 2, false, {"NYC"}, emptyShardVersion), 4},
{ShardStatistics(kShardId1, kNoMaxSize, 2, false, {"LAX"}, emptyShardVersion), 4},
{ShardStatistics(kShardId2, kNoMaxSize, 1, true, {"LAX"}, emptyShardVersion), 1}});
@ -469,7 +509,7 @@ TEST(BalancerPolicy, DrainingSingleAppropriateShardFoundDueToTag) {
ZoneInfo zoneInfo;
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(
cluster.second[kShardId2][0].getMin(), cluster.second[kShardId2][0].getMax(), "LAX")));
DistributionStatus distribution(kNamespace, cluster.second, std::move(zoneInfo));
const auto distribution = makeDistStatus(cm, std::move(zoneInfo));
const auto migrations(balanceChunks(cluster.first, distribution, false, false));
ASSERT_EQ(1U, migrations.size());
@ -481,7 +521,7 @@ TEST(BalancerPolicy, DrainingSingleAppropriateShardFoundDueToTag) {
}
TEST(BalancerPolicy, DrainingNoAppropriateShardsFoundDueToTag) {
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 2, false, {"NYC"}, emptyShardVersion), 4},
{ShardStatistics(kShardId1, kNoMaxSize, 2, false, {"LAX"}, emptyShardVersion), 4},
{ShardStatistics(kShardId2, kNoMaxSize, 1, true, {"SEA"}, emptyShardVersion), 1}});
@ -489,7 +529,7 @@ TEST(BalancerPolicy, DrainingNoAppropriateShardsFoundDueToTag) {
ZoneInfo zoneInfo;
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(
cluster.second[kShardId2][0].getMin(), cluster.second[kShardId2][0].getMax(), "SEA")));
DistributionStatus distribution(kNamespace, cluster.second, std::move(zoneInfo));
const auto distribution = makeDistStatus(cm, std::move(zoneInfo));
const auto migrations(balanceChunks(cluster.first, distribution, false, false));
ASSERT(migrations.empty());
@ -497,13 +537,12 @@ TEST(BalancerPolicy, DrainingNoAppropriateShardsFoundDueToTag) {
TEST(BalancerPolicy, NoBalancingDueToAllNodesEitherDrainingOrMaxedOut) {
// shard0 and shard2 are draining, shard1 is maxed out
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 2, true, emptyTagSet, emptyShardVersion), 1},
{ShardStatistics(kShardId1, 1, 1, false, emptyTagSet, emptyShardVersion), 6},
{ShardStatistics(kShardId2, kNoMaxSize, 1, true, emptyTagSet, emptyShardVersion), 1}});
const auto migrations(balanceChunks(
cluster.first, DistributionStatus(kNamespace, cluster.second, ZoneInfo()), false, false));
const auto migrations(balanceChunks(cluster.first, makeDistStatus(cm), false, false));
ASSERT(migrations.empty());
}
@ -511,13 +550,12 @@ TEST(BalancerPolicy, BalancerRespectsMaxShardSizeOnlyBalanceToNonMaxed) {
// Note that maxSize of shard0 is 1, and it is therefore overloaded with currSize = 3. Other
// shards have maxSize = 0 = unset. Even though the overloaded shard has the fewest chunks, we
// shouldn't move chunks to that shard.
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, 1, 3, false, emptyTagSet, emptyShardVersion), 2},
{ShardStatistics(kShardId1, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 5},
{ShardStatistics(kShardId2, kNoMaxSize, 10, false, emptyTagSet, emptyShardVersion), 10}});
const auto migrations(balanceChunks(
cluster.first, DistributionStatus(kNamespace, cluster.second, ZoneInfo()), false, false));
const auto migrations(balanceChunks(cluster.first, makeDistStatus(cm), false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId2, migrations[0].from);
ASSERT_EQ(kShardId1, migrations[0].to);
@ -529,27 +567,26 @@ TEST(BalancerPolicy, BalancerRespectsMaxShardSizeWhenAllBalanced) {
// Note that maxSize of shard0 is 1, and it is therefore overloaded with currSize = 4. Other
// shards have maxSize = 0 = unset. We check that being over the maxSize is NOT equivalent to
// draining; we don't want to empty shards for no other reason than that they are over this limit.
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, 1, 4, false, emptyTagSet, emptyShardVersion), 4},
{ShardStatistics(kShardId1, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4},
{ShardStatistics(kShardId2, kNoMaxSize, 4, false, emptyTagSet, emptyShardVersion), 4}});
const auto migrations(balanceChunks(
cluster.first, DistributionStatus(kNamespace, cluster.second, ZoneInfo()), false, false));
const auto migrations(balanceChunks(cluster.first, makeDistStatus(cm), false, false));
ASSERT(migrations.empty());
}
TEST(BalancerPolicy, BalancerRespectsTagsWhenDraining) {
// shard1 drains the proper chunk to shard0, even though it is more loaded than shard2
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 6},
{ShardStatistics(kShardId1, kNoMaxSize, 5, true, {"a", "b"}, emptyShardVersion), 2},
{ShardStatistics(kShardId1, kNoMaxSize, 5, true, {"a", "b"}, emptyShardVersion), 1},
{ShardStatistics(kShardId2, kNoMaxSize, 5, false, {"b"}, emptyShardVersion), 2}});
ZoneInfo zoneInfo;
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 7), "a")));
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(BSON("x" << 8), kMaxBSONKey, "b")));
DistributionStatus distribution(kNamespace, cluster.second, std::move(zoneInfo));
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(kSKeyPattern.globalMin(), BSON("x" << 7), "a")));
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(BSON("x" << 8), kSKeyPattern.globalMax(), "b")));
const auto distribution = makeDistStatus(cm, std::move(zoneInfo));
const auto migrations(balanceChunks(cluster.first, distribution, false, false));
ASSERT_EQ(1U, migrations.size());
@ -563,14 +600,14 @@ TEST(BalancerPolicy, BalancerRespectsTagsWhenDraining) {
TEST(BalancerPolicy, BalancerRespectsTagPolicyBeforeImbalance) {
// There is a large imbalance between shard0 and shard1, but the balancer must first fix the
// chunks, which are on the wrong shard due to the tag policy
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 2},
{ShardStatistics(kShardId1, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 6},
{ShardStatistics(kShardId2, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 2}});
ZoneInfo zoneInfo;
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 100), "a")));
DistributionStatus distribution(kNamespace, cluster.second, std::move(zoneInfo));
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(kSKeyPattern.globalMin(), BSON("x" << 100), "a")));
const auto distribution = makeDistStatus(cm, std::move(zoneInfo));
const auto migrations(balanceChunks(cluster.first, distribution, false, false));
ASSERT_EQ(1U, migrations.size());
@ -584,15 +621,15 @@ TEST(BalancerPolicy, BalancerRespectsTagPolicyBeforeImbalance) {
TEST(BalancerPolicy, BalancerFixesIncorrectTagsWithCrossShardViolationOfTags) {
// The zone policy dictates that the same shard must donate and also receive chunks. The test
// validates that the same shard is not used as a donor and recipient as part of the same round.
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 3},
{ShardStatistics(kShardId1, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 3},
{ShardStatistics(kShardId2, kNoMaxSize, 5, false, {"b"}, emptyShardVersion), 3}});
ZoneInfo zoneInfo;
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 1), "b")));
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(BSON("x" << 8), kMaxBSONKey, "a")));
DistributionStatus distribution(kNamespace, cluster.second, std::move(zoneInfo));
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(kSKeyPattern.globalMin(), BSON("x" << 1), "b")));
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(BSON("x" << 8), kSKeyPattern.globalMax(), "a")));
const auto distribution = makeDistStatus(cm, std::move(zoneInfo));
const auto migrations(balanceChunks(cluster.first, distribution, false, false));
ASSERT_EQ(1U, migrations.size());
@ -605,14 +642,14 @@ TEST(BalancerPolicy, BalancerFixesIncorrectTagsWithCrossShardViolationOfTags) {
TEST(BalancerPolicy, BalancerFixesIncorrectTagsInOtherwiseBalancedCluster) {
// Chunks are balanced across shards, but there are wrong tags, which need to be fixed
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 3},
{ShardStatistics(kShardId1, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 3},
{ShardStatistics(kShardId2, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 3}});
ZoneInfo zoneInfo;
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 10), "a")));
DistributionStatus distribution(kNamespace, cluster.second, std::move(zoneInfo));
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(kSKeyPattern.globalMin(), BSON("x" << 10), "a")));
const auto distribution = makeDistStatus(cm, std::move(zoneInfo));
const auto migrations(balanceChunks(cluster.first, distribution, false, false));
ASSERT_EQ(1U, migrations.size());
@ -625,29 +662,30 @@ TEST(BalancerPolicy, BalancerFixesIncorrectTagsInOtherwiseBalancedCluster) {
TEST(BalancerPolicy, BalancerTagAlreadyBalanced) {
// Chunks are balanced across shards for the tag.
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 3, false, {"a"}, emptyShardVersion), 2},
{ShardStatistics(kShardId1, kNoMaxSize, 2, false, {"a"}, emptyShardVersion), 2}});
ZoneInfo zoneInfo;
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(kMinBSONKey, kMaxBSONKey, "a")));
DistributionStatus distribution(kNamespace, cluster.second, std::move(zoneInfo));
ASSERT_OK(zoneInfo.addRangeToZone(
ZoneRange(kSKeyPattern.globalMin(), kSKeyPattern.globalMax(), "a")));
const auto distribution = makeDistStatus(cm, std::move(zoneInfo));
ASSERT(balanceChunks(cluster.first, distribution, false, false).empty());
}
TEST(BalancerPolicy, BalancerMostOverLoadShardHasMultipleTags) {
// shard0 has chunks [MinKey, 1), [1, 2), [2, 3), [3, 4), [4, 5), so two chunks each
// for tag "b" and "c". So [1, 2) is expected to be moved to shard1 in round 1.
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 5, false, {"a", "b", "c"}, emptyShardVersion), 5},
{ShardStatistics(kShardId1, kNoMaxSize, 1, false, {"b"}, emptyShardVersion), 1},
{ShardStatistics(kShardId2, kNoMaxSize, 1, false, {"c"}, emptyShardVersion), 1}});
ZoneInfo zoneInfo;
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 1), "a")));
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(kSKeyPattern.globalMin(), BSON("x" << 1), "a")));
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(BSON("x" << 1), BSON("x" << 3), "b")));
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(BSON("x" << 3), BSON("x" << 5), "c")));
DistributionStatus distribution(kNamespace, cluster.second, std::move(zoneInfo));
const auto distribution = makeDistStatus(cm, std::move(zoneInfo));
const auto migrations(balanceChunks(cluster.first, distribution, false, false));
ASSERT_EQ(1U, migrations.size());
@ -662,16 +700,16 @@ TEST(BalancerPolicy, BalancerMostOverLoadShardHasMultipleTagsSkipTagWithShardInU
// shard0 has chunks [MinKey, 1), [1, 2), [2, 3), [3, 4), [4, 5), so two chunks each
// for tag "b" and "c". So [3, 4) is expected to be moved to shard2 because shard1 is
// in use.
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 5, false, {"a", "b", "c"}, emptyShardVersion), 5},
{ShardStatistics(kShardId1, kNoMaxSize, 1, false, {"b"}, emptyShardVersion), 1},
{ShardStatistics(kShardId2, kNoMaxSize, 1, false, {"c"}, emptyShardVersion), 1}});
ZoneInfo zoneInfo;
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 1), "a")));
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(kSKeyPattern.globalMin(), BSON("x" << 1), "a")));
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(BSON("x" << 1), BSON("x" << 3), "b")));
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(BSON("x" << 3), BSON("x" << 5), "c")));
DistributionStatus distribution(kNamespace, cluster.second, std::move(zoneInfo));
const auto distribution = makeDistStatus(cm, std::move(zoneInfo));
stdx::unordered_set<ShardId> availableShards{kShardId0, kShardId2, kShardId3};
const auto migrations(
@ -686,15 +724,15 @@ TEST(BalancerPolicy, BalancerMostOverLoadShardHasMultipleTagsSkipTagWithShardInU
TEST(BalancerPolicy, BalancerFixesIncorrectTagsInOtherwiseBalancedClusterParallel) {
// Chunks are balanced across shards, but there are wrong tags, which need to be fixed
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 3},
{ShardStatistics(kShardId1, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 3},
{ShardStatistics(kShardId2, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 3},
{ShardStatistics(kShardId3, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 3}});
ZoneInfo zoneInfo;
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 20), "a")));
DistributionStatus distribution(kNamespace, cluster.second, std::move(zoneInfo));
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(kSKeyPattern.globalMin(), BSON("x" << 20), "a")));
const auto distribution = makeDistStatus(cm, std::move(zoneInfo));
const auto migrations(balanceChunks(cluster.first, distribution, false, false));
ASSERT_EQ(2U, migrations.size());
@ -712,14 +750,55 @@ TEST(BalancerPolicy, BalancerFixesIncorrectTagsInOtherwiseBalancedClusterParalle
ASSERT_EQ(MigrateInfo::zoneViolation, migrations[0].reason);
}
TEST(BalancerPolicy, ChunksInNoZoneSpanOnAllShardsWithEmptyZones) {
// Balancer is able to move chunks in the noZone to shards with tags
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 3},
{ShardStatistics(kShardId1, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 0}});
ZoneInfo zoneInfo;
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(BSON("x" << 100), kSKeyPattern.globalMax(), "a")));
const auto distribution = makeDistStatus(cm, std::move(zoneInfo));
const auto migrations(balanceChunks(cluster.first, distribution, false, false));
ASSERT_EQ(1U, migrations.size());
ASSERT_EQ(kShardId0, migrations[0].from);
ASSERT_EQ(kShardId1, migrations[0].to);
ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey);
ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey);
ASSERT_EQ(MigrateInfo::chunksImbalance, migrations[0].reason);
}
TEST(BalancerPolicy, BalancingNoZoneIgnoreTotalShardSize) {
// Shard1 is overloaded and contains:
// [min, 1) [1, 2) [2, 3) -> zone("a")
// [3, 4) [4, 5) [5, 6) -> NoZone
//
// But it won't donate any chunk, because balancing is evaluated per zone rather than on the
// total number of chunks per shard.
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 3},
{ShardStatistics(kShardId1, kNoMaxSize, 5, false, {"a"}, emptyShardVersion), 6},
{ShardStatistics(kShardId2, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 3}});
ZoneInfo zoneInfo;
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(kSKeyPattern.globalMin(), BSON("x" << 6), "a")));
const auto distribution = makeDistStatus(cm, std::move(zoneInfo));
const auto migrations(balanceChunks(cluster.first, distribution, false, false));
ASSERT_EQ(0U, migrations.size());
}
TEST(BalancerPolicy, BalancerHandlesNoShardsWithTag) {
auto cluster = generateCluster(
auto [cluster, cm] = generateCluster(
{{ShardStatistics(kShardId0, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 2},
{ShardStatistics(kShardId1, kNoMaxSize, 5, false, emptyTagSet, emptyShardVersion), 2}});
ZoneInfo zoneInfo;
ASSERT_OK(zoneInfo.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 7), "NonExistentZone")));
DistributionStatus distribution(kNamespace, cluster.second, std::move(zoneInfo));
ASSERT_OK(zoneInfo.addRangeToZone(
ZoneRange(kSKeyPattern.globalMin(), BSON("x" << 7), "NonExistentZone")));
const auto distribution = makeDistStatus(cm, std::move(zoneInfo));
ASSERT(balanceChunks(cluster.first, distribution, false, false).empty());
}
@ -732,7 +811,7 @@ TEST(DistributionStatus, AddTagRangeOverlap) {
ASSERT_OK(zInfo.addRangeToZone(ZoneRange(BSON("x" << 20), BSON("x" << 30), "b")));
ASSERT_EQ(ErrorCodes::RangeOverlapConflict,
zInfo.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << 2), "d")));
zInfo.addRangeToZone(ZoneRange(kSKeyPattern.globalMin(), BSON("x" << 2), "d")));
ASSERT_EQ(ErrorCodes::RangeOverlapConflict,
zInfo.addRangeToZone(ZoneRange(BSON("x" << -1), BSON("x" << 5), "d")));
ASSERT_EQ(ErrorCodes::RangeOverlapConflict,
@ -744,7 +823,7 @@ TEST(DistributionStatus, AddTagRangeOverlap) {
ASSERT_EQ(ErrorCodes::RangeOverlapConflict,
zInfo.addRangeToZone(ZoneRange(BSON("x" << -1), BSON("x" << 32), "d")));
ASSERT_EQ(ErrorCodes::RangeOverlapConflict,
zInfo.addRangeToZone(ZoneRange(BSON("x" << 25), kMaxBSONKey, "d")));
zInfo.addRangeToZone(ZoneRange(BSON("x" << 25), kSKeyPattern.globalMax(), "d")));
}
TEST(DistributionStatus, ChunkTagsSelectorWithRegularKeys) {
@ -752,128 +831,37 @@ TEST(DistributionStatus, ChunkTagsSelectorWithRegularKeys) {
ASSERT_OK(zInfo.addRangeToZone(ZoneRange(BSON("x" << 1), BSON("x" << 10), "a")));
ASSERT_OK(zInfo.addRangeToZone(ZoneRange(BSON("x" << 10), BSON("x" << 20), "b")));
ASSERT_OK(zInfo.addRangeToZone(ZoneRange(BSON("x" << 20), BSON("x" << 30), "c")));
DistributionStatus d(kNamespace, ShardToChunksMap{}, std::move(zInfo));
{
ChunkType chunk;
chunk.setMin(kMinBSONKey);
chunk.setMax(BSON("x" << 1));
ASSERT_EQUALS("", d.getTagForChunk(chunk));
}
{
ChunkType chunk;
chunk.setMin(BSON("x" << 0));
chunk.setMax(BSON("x" << 1));
ASSERT_EQUALS("", d.getTagForChunk(chunk));
}
{
ChunkType chunk;
chunk.setMin(BSON("x" << 1));
chunk.setMax(BSON("x" << 5));
ASSERT_EQUALS("a", d.getTagForChunk(chunk));
}
{
ChunkType chunk;
chunk.setMin(BSON("x" << 10));
chunk.setMax(BSON("x" << 20));
ASSERT_EQUALS("b", d.getTagForChunk(chunk));
}
{
ChunkType chunk;
chunk.setMin(BSON("x" << 15));
chunk.setMax(BSON("x" << 20));
ASSERT_EQUALS("b", d.getTagForChunk(chunk));
}
{
ChunkType chunk;
chunk.setMin(BSON("x" << 25));
chunk.setMax(BSON("x" << 30));
ASSERT_EQUALS("c", d.getTagForChunk(chunk));
}
{
ChunkType chunk;
chunk.setMin(BSON("x" << 35));
chunk.setMax(BSON("x" << 40));
ASSERT_EQUALS("", d.getTagForChunk(chunk));
}
{
ChunkType chunk;
chunk.setMin(BSON("x" << 30));
chunk.setMax(kMaxBSONKey);
ASSERT_EQUALS("", d.getTagForChunk(chunk));
}
{
ChunkType chunk;
chunk.setMin(BSON("x" << 40));
chunk.setMax(kMaxBSONKey);
ASSERT_EQUALS("", d.getTagForChunk(chunk));
}
ASSERT_EQUALS(ZoneInfo::kNoZoneName,
zInfo.getZoneForChunk({kSKeyPattern.globalMin(), BSON("x" << 1)}));
ASSERT_EQUALS(ZoneInfo::kNoZoneName, zInfo.getZoneForChunk({BSON("x" << 0), BSON("x" << 1)}));
ASSERT_EQUALS("a", zInfo.getZoneForChunk({BSON("x" << 1), BSON("x" << 5)}));
ASSERT_EQUALS("b", zInfo.getZoneForChunk({BSON("x" << 10), BSON("x" << 20)}));
ASSERT_EQUALS("b", zInfo.getZoneForChunk({BSON("x" << 15), BSON("x" << 20)}));
ASSERT_EQUALS("c", zInfo.getZoneForChunk({BSON("x" << 25), BSON("x" << 30)}));
ASSERT_EQUALS(ZoneInfo::kNoZoneName, zInfo.getZoneForChunk({BSON("x" << 35), BSON("x" << 40)}));
ASSERT_EQUALS(ZoneInfo::kNoZoneName,
zInfo.getZoneForChunk({BSON("x" << 30), kSKeyPattern.globalMax()}));
ASSERT_EQUALS(ZoneInfo::kNoZoneName,
zInfo.getZoneForChunk({BSON("x" << 40), kSKeyPattern.globalMax()}));
}
TEST(DistributionStatus, ChunkTagsSelectorWithMinMaxKeys) {
ZoneInfo zInfo;
ASSERT_OK(zInfo.addRangeToZone(ZoneRange(kMinBSONKey, BSON("x" << -100), "a")));
ASSERT_OK(zInfo.addRangeToZone(ZoneRange(kSKeyPattern.globalMin(), BSON("x" << -100), "a")));
ASSERT_OK(zInfo.addRangeToZone(ZoneRange(BSON("x" << -10), BSON("x" << 10), "b")));
ASSERT_OK(zInfo.addRangeToZone(ZoneRange(BSON("x" << 100), kMaxBSONKey, "c")));
DistributionStatus d(kNamespace, ShardToChunksMap{}, std::move(zInfo));
ASSERT_OK(zInfo.addRangeToZone(ZoneRange(BSON("x" << 100), kSKeyPattern.globalMax(), "c")));
{
ChunkType chunk;
chunk.setMin(kMinBSONKey);
chunk.setMax(BSON("x" << -100));
ASSERT_EQUALS("a", d.getTagForChunk(chunk));
}
{
ChunkType chunk;
chunk.setMin(BSON("x" << -100));
chunk.setMax(BSON("x" << -11));
ASSERT_EQUALS("", d.getTagForChunk(chunk));
}
{
ChunkType chunk;
chunk.setMin(BSON("x" << -10));
chunk.setMax(BSON("x" << 0));
ASSERT_EQUALS("b", d.getTagForChunk(chunk));
}
{
ChunkType chunk;
chunk.setMin(BSON("x" << 0));
chunk.setMax(BSON("x" << 10));
ASSERT_EQUALS("b", d.getTagForChunk(chunk));
}
{
ChunkType chunk;
chunk.setMin(BSON("x" << 10));
chunk.setMax(BSON("x" << 20));
ASSERT_EQUALS("", d.getTagForChunk(chunk));
}
{
ChunkType chunk;
chunk.setMin(BSON("x" << 10));
chunk.setMax(BSON("x" << 100));
ASSERT_EQUALS("", d.getTagForChunk(chunk));
}
{
ChunkType chunk;
chunk.setMin(BSON("x" << 200));
chunk.setMax(kMaxBSONKey);
ASSERT_EQUALS("c", d.getTagForChunk(chunk));
}
ASSERT_EQUALS("a", zInfo.getZoneForChunk({kSKeyPattern.globalMin(), BSON("x" << -100)}));
ASSERT_EQUALS(ZoneInfo::kNoZoneName,
zInfo.getZoneForChunk({BSON("x" << -100), BSON("x" << -11)}));
ASSERT_EQUALS("b", zInfo.getZoneForChunk({BSON("x" << -10), BSON("x" << 0)}));
ASSERT_EQUALS("b", zInfo.getZoneForChunk({BSON("x" << 0), BSON("x" << 10)}));
ASSERT_EQUALS(ZoneInfo::kNoZoneName, zInfo.getZoneForChunk({BSON("x" << 10), BSON("x" << 20)}));
ASSERT_EQUALS(ZoneInfo::kNoZoneName,
zInfo.getZoneForChunk({BSON("x" << 10), BSON("x" << 100)}));
ASSERT_EQUALS("c", zInfo.getZoneForChunk({BSON("x" << 200), kSKeyPattern.globalMax()}));
}
} // namespace

View File

@ -112,10 +112,6 @@ public:
<< " if in read-only mode",
!storageGlobalParams.readOnly);
uassert(ErrorCodes::IllegalOperation,
str::stream() << "Can only call " << Request::kCommandName << " on collections",
!ns().coll().empty());
auto& oss = OperationShardingState::get(opCtx);
{

View File

@ -37,7 +37,6 @@
#include "mongo/db/auth/privilege.h"
#include "mongo/db/commands.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/active_migrations_registry.h"
#include "mongo/db/s/chunk_move_write_concern_options.h"
#include "mongo/db/s/migration_destination_manager.h"
@ -45,7 +44,6 @@
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/start_chunk_clone_request.h"
#include "mongo/logv2/log.h"
#include "mongo/s/catalog_cache_loader.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/request_types/migration_secondary_throttle_options.h"
#include "mongo/util/assert_util.h"
@ -113,12 +111,6 @@ public:
// shard will not receive a chunk after refreshing.
const auto shardVersion = forceShardFilteringMetadataRefresh(opCtx, nss);
// Wait for the ShardServerCatalogCacheLoader to finish flushing the metadata to the
// storage. This is not required for correctness, but helps mitigate stalls on secondaries
// when a shard receives the first chunk for a collection with a large routing table.
CatalogCacheLoader::get(opCtx).waitForCollectionFlush(opCtx, nss);
repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx);
uassertStatusOK(
MigrationDestinationManager::get(opCtx)->start(opCtx,
nss,

View File

@ -204,8 +204,7 @@ StatusWith<int> deleteNextBatch(OperationContext* opCtx,
hangBeforeDoingDeletion.pauseWhileSet(opCtx);
}
long long bytesDeleted = 0;
int numDocsDeleted = 0;
int numDeleted = 0;
do {
BSONObj deletedObj;
@ -240,14 +239,12 @@ StatusWith<int> deleteNextBatch(OperationContext* opCtx,
<< ", stats: " << Explain::getWinningPlanStats(exec.get()));
}
bytesDeleted += deletedObj.objsize();
invariant(PlanExecutor::ADVANCED == state);
} while (++numDocsDeleted < numDocsToRemovePerBatch);
ShardingStatistics::get(opCtx).countDocsDeletedOnDonor.addAndFetch(1);
ShardingStatistics::get(opCtx).countDocsDeletedByRangeDeleter.addAndFetch(numDocsDeleted);
ShardingStatistics::get(opCtx).countBytesDeletedByRangeDeleter.addAndFetch(bytesDeleted);
} while (++numDeleted < numDocsToRemovePerBatch);
return numDocsDeleted;
return numDeleted;
}

View File

@ -61,8 +61,7 @@ void ShardingStatistics::report(BSONObjBuilder* builder) const {
builder->append("countDocsClonedOnRecipient", countDocsClonedOnRecipient.load());
builder->append("countDocsClonedOnDonor", countDocsClonedOnDonor.load());
builder->append("countRecipientMoveChunkStarted", countRecipientMoveChunkStarted.load());
builder->append("countDocsDeletedByRangeDeleter", countDocsDeletedByRangeDeleter.load());
builder->append("countBytesDeletedByRangeDeleter", countBytesDeletedByRangeDeleter.load());
builder->append("countDocsDeletedOnDonor", countDocsDeletedOnDonor.load());
builder->append("countDonorMoveChunkLockTimeout", countDonorMoveChunkLockTimeout.load());
builder->append("countDonorMoveChunkAbortConflictingIndexOperation",
countDonorMoveChunkAbortConflictingIndexOperation.load());

View File

@ -65,13 +65,9 @@ struct ShardingStatistics {
// node.
AtomicWord<long long> countDocsClonedOnDonor{0};
// Cumulative, always-increasing counter of how many documents have been deleted by the
// rangeDeleter.
AtomicWord<long long> countDocsDeletedByRangeDeleter{0};
// Cumulative, always-increasing counter of how many bytes have been deleted by the
// rangeDeleter.
AtomicWord<long long> countBytesDeletedByRangeDeleter{0};
// Cumulative, always-increasing counter of how many documents have been deleted on the donor
// node by the rangeDeleter.
AtomicWord<long long> countDocsDeletedOnDonor{0};
// Cumulative, always-increasing counter of how many chunks this node started to receive
// (whether the receiving succeeded or not)

View File

@ -75,7 +75,6 @@ configs:
is_constexpr: false
short_name: port
arg_vartype: Int
validator: { gte: 0, lte: 65535 }
'net.ipv6':
description: 'Enable IPv6 support (disabled by default)'
short_name: ipv6

View File

@ -555,17 +555,6 @@ void CatalogCache::purgeCollection(const NamespaceString& nss) {
itDb->second.erase(nss.ns());
}
void CatalogCache::resetCollection(const NamespaceString& nss) {
stdx::lock_guard<Latch> lg(_mutex);
auto itDb = _collectionsByDb.find(nss.db());
if (itDb == _collectionsByDb.end()) {
return;
}
itDb->second[nss.ns()] = std::make_shared<CollectionRoutingInfoEntry>();
}
void CatalogCache::purgeDatabase(StringData dbName) {
stdx::lock_guard<Latch> lg(_mutex);
_databases.erase(dbName);

View File

@ -283,16 +283,11 @@ public:
void invalidateEntriesThatReferenceShard(const ShardId& shardId);
/**
* Non-blocking method, which removes the entire specified collection from the cache.
* Non-blocking method, which removes the entire specified collection from the cache (resulting
* in a full refresh on subsequent access).
*/
void purgeCollection(const NamespaceString& nss);
/**
* Non-blocking method, which purges the routing info of the specified collection and marks the
* entry as to "needs refresh"
*/
void resetCollection(const NamespaceString& nss);
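// Illustrative usage sketch (hypothetical call site, modelled on the callers updated in this
// change): with resetCollection() removed, callers purge the cached entry instead, and the next
// routing lookup performs a full refresh.
//
//     auto* catalogCache = Grid::get(opCtx)->catalogCache();
//     catalogCache->purgeCollection(nss);
//     // A subsequent getCollectionRoutingInfo(opCtx, nss) rebuilds the entry from scratch.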
/**
* Non-blocking method, which removes the entire specified database (including its collections)
* from the cache.

View File

@ -661,57 +661,5 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMoveLastChunk) {
ASSERT_EQ(version, cm->getVersion({"1"}));
}
TEST_F(CatalogCacheRefreshTest, CachedCollectionShouldRefreshAfterResetCollection) {
const ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
auto initalChunkManager(makeChunkManager(kNss, shardKeyPattern, nullptr, true, {}));
ASSERT_EQ(1, initalChunkManager->numChunks());
setupNShards(2);
// 1 - purge the collection: expect the collection to be unsharded after an unforced refresh
Grid::get(getServiceContext())->catalogCache()->purgeCollection(kNss);
auto future = scheduleRoutingInfoUnforcedRefresh(kNss);
auto newRoutingInfo = future.default_timed_get();
ASSERT(!newRoutingInfo->cm());
// 2 - forced refresh - to refill the cache
future = scheduleRoutingInfoForcedRefresh(kNss);
auto version = initalChunkManager->getVersion();
expectGetCollection(version.epoch(), shardKeyPattern);
expectFindSendBSONObjVector(kConfigHostAndPort, [&]() {
version.incMajor();
ChunkType chunk1(kNss,
{shardKeyPattern.getKeyPattern().globalMin(),
shardKeyPattern.getKeyPattern().globalMax()},
version,
{"1"});
chunk1.setName(OID::gen());
return std::vector<BSONObj>{chunk1.toConfigBSON()};
}());
auto newRoutingInfo2 = future.default_timed_get();
ASSERT(newRoutingInfo2->cm());
// 3 - reset the collection: expect the collection to be sharded even if the refresh is unforced
Grid::get(getServiceContext())->catalogCache()->resetCollection(kNss);
future = scheduleRoutingInfoUnforcedRefresh(kNss);
version = newRoutingInfo2->cm()->getVersion();
expectGetCollection(version.epoch(), shardKeyPattern);
// Return set of chunks, which represent a move
expectFindSendBSONObjVector(kConfigHostAndPort, [&]() {
version.incMajor();
ChunkType chunk1(kNss,
{shardKeyPattern.getKeyPattern().globalMin(),
shardKeyPattern.getKeyPattern().globalMax()},
version,
{"1"});
chunk1.setName(OID::gen());
return std::vector<BSONObj>{chunk1.toConfigBSON()};
}());
auto newRoutingInfo3 = future.default_timed_get();
ASSERT(newRoutingInfo3->cm());
}
} // namespace
} // namespace mongo

View File

@ -474,6 +474,23 @@ public:
return _rt->numChunks();
}
template <typename Callable>
void forEachOverlappingChunk(const BSONObj& min,
const BSONObj& max,
bool isMaxInclusive,
Callable&& handler) const {
_rt->forEachOverlappingChunk(
min,
max,
isMaxInclusive,
[this, handler = std::forward<Callable>(handler)](const auto& chunkInfo) mutable {
if (!handler(Chunk{*chunkInfo, _clusterTime})) {
return false;
}
return true;
});
}
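// Illustrative usage sketch (assumes a ChunkManager instance `cm` for a collection sharded on
// {x: 1}; the names below are hypothetical): the handler receives each chunk overlapping the
// requested range and returns true to keep iterating, false to stop early.
//
//     std::vector<std::pair<BSONObj, BSONObj>> ranges;
//     cm.forEachOverlappingChunk(
//         BSON("x" << 0), BSON("x" << 100), false /* isMaxInclusive */, [&](const Chunk& chunk) {
//             ranges.emplace_back(chunk.getMin(), chunk.getMax());
//             return true;  // return false to stop the iteration early
//         });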
/**
* Returns true if a document with the given "shardKey" is owned by the shard with the given
* "shardId" in this routing table. If "shardKey" is empty returns false. If "shardKey" is not a

View File

@ -61,9 +61,9 @@ std::vector<BSONObj> genRandomSplitPoints(size_t numChunks) {
std::vector<BSONObj> splitPoints;
splitPoints.reserve(numChunks + 1);
splitPoints.emplace_back(kShardKeyPattern.globalMin());
int64_t nextSplit{-1000};
int nextSplit{-1000};
for (size_t i = 0; i < numChunks - 1; ++i) {
nextSplit += 10 * (_random.nextInt32(10) + 1);
nextSplit += i * 10 * (_random.nextInt32(10) + 1);
splitPoints.emplace_back(BSON(kSKey << nextSplit));
}
splitPoints.emplace_back(kShardKeyPattern.globalMax());
@ -183,25 +183,25 @@ BSONObj calculateIntermediateShardKey(const BSONObj& leftKey,
const auto isMinKey = leftKey.woCompare(kShardKeyPattern.globalMin()) == 0;
const auto isMaxKey = rightKey.woCompare(kShardKeyPattern.globalMax()) == 0;
int64_t splitPoint;
int splitPoint;
if (isMinKey && isMaxKey) {
// [min, max] -> split at 0
splitPoint = 0;
} else if (!isMinKey && !isMaxKey) {
// [x, y] -> split in the middle
auto min = leftKey.firstElement().numberLong();
auto max = rightKey.firstElement().numberLong();
auto min = leftKey.firstElement().numberInt();
auto max = rightKey.firstElement().numberInt();
invariant(min + 1 < max,
str::stream() << "Can't split range [" << min << ", " << max << "]");
splitPoint = min + ((max - min) / 2);
} else if (isMaxKey) {
// [x, maxKey] -> split at x*2;
auto prevBound = leftKey.firstElement().numberLong();
auto prevBound = leftKey.firstElement().numberInt();
auto increment = prevBound ? prevBound : _random.nextInt32(100) + 1;
splitPoint = prevBound + std::abs(increment);
} else if (isMinKey) {
// [minKey, x] -> split at x*2;
auto prevBound = rightKey.firstElement().numberLong();
auto prevBound = rightKey.firstElement().numberInt();
auto increment = prevBound ? prevBound : _random.nextInt32(100) + 1;
splitPoint = prevBound - std::abs(increment);
} else {
@ -235,8 +235,8 @@ void performRandomChunkOperations(std::vector<ChunkType>* chunksPtr, size_t numO
auto splitChunk = [&] {
auto chunkToSplitIt = chunks.begin() + _random.nextInt32(chunks.size());
while (chunkToSplitIt != chunks.begin() && chunkToSplitIt != std::prev(chunks.end()) &&
(chunkToSplitIt->getMax().firstElement().numberLong() -
chunkToSplitIt->getMin().firstElement().numberLong()) < 2) {
(chunkToSplitIt->getMax().firstElement().numberInt() -
chunkToSplitIt->getMin().firstElement().numberInt()) < 2) {
// If the chunk is unsplittable select another one
chunkToSplitIt = chunks.begin() + _random.nextInt32(chunks.size());
}

View File

@ -102,9 +102,7 @@ public:
"Routing metadata flushed for collection {namespace}",
"Routing metadata flushed for collection",
"namespace"_attr = nss);
// resetCollection marks the collection as "needs to refresh" instead of erasing it
// from memory. This will force a refresh at the next getCollectionRoutingInfoAt.
catalogCache->resetCollection(nss);
catalogCache->purgeCollection(nss);
}
}

View File

@ -115,6 +115,13 @@ Status storeMongosOptions(const moe::Environment& params) {
return ret;
}
if (params.count("net.port")) {
int port = params["net.port"].as<int>();
if (port <= 0 || port > 65535) {
return Status(ErrorCodes::BadValue, "error: port number must be between 1 and 65535");
}
}
if (params.count("security.javascriptEnabled")) {
mongosGlobalParams.scriptingEnabled = params["security.javascriptEnabled"].as<bool>();
}

View File

@ -18,22 +18,10 @@ def flag_declare(name):
tfile = open(tmp_file, 'w')
lcnt = 0
max_flags = 0
parsing = False
start = 0
stopped = ''
for line in f:
lcnt = lcnt + 1
if stopped != '':
stopped = ''
m = re.search("\d+", line)
if m != None:
fld_size = int(m.group(0))
if max_flags > fld_size:
print("file: " + name + " line: " + str(lcnt))
print("Flag stop value of " + str(max_flags) \
+ " is larger than field size " + str(fld_size))
sys.exit(1)
if line.find('AUTOMATIC FLAG VALUE GENERATION START') != -1:
m = re.search("\d+", line)
if m == None:
@ -53,22 +41,15 @@ def flag_declare(name):
" GENERATION STOP 32", file=sys.stderr)
sys.exit(1)
end = int(m.group(0))
poweroftwo = (end != 0) and ((end & (end-1)) == 0)
if not poweroftwo and end != 12:
print(name + ": line " + str(lcnt) + ", the stop value " + str(end) +
" is not a power of 2", file=sys.stderr)
sys.exit(1)
# Compare the number of flags defined and against the number
# of flags allowed
max_flags = len(defines)
if max_flags > end - start:
if len(defines) > end - start:
print(name + ": line " + str(lcnt) +\
": exceeds maximum {0} limit bit flags".format(end), file=sys.stderr)
sys.exit(1)
# Calculate number of hex bytes, create format string
fmt = "0x%%0%dxu" % ((start + max_flags + 3) / 4)
fmt = "0x%%0%dxu" % ((start + len(defines) + 3) / 4)
# Generate the flags starting from an offset set from the start value.
tfile.write(header)
@ -80,8 +61,6 @@ def flag_declare(name):
parsing = False
start = 0
stopped = line
continue
elif parsing and line.find('#define') == -1:
print(name + ": line " + str(lcnt) +\
": unexpected flag line, no #define", file=sys.stderr)
@ -90,7 +69,6 @@ def flag_declare(name):
defines.append(line)
else:
tfile.write(line)
stopped = ''
tfile.close()
compare_srcfile(tmp_file, name)

View File

@ -206,7 +206,6 @@ conn_stats = [
CacheStat('cache_eviction_aggressive_set', 'eviction currently operating in aggressive mode', 'no_clear,no_scale'),
CacheStat('cache_eviction_app', 'pages evicted by application threads'),
CacheStat('cache_eviction_app_dirty', 'modified pages evicted by application threads'),
CacheStat('cache_eviction_clear_ordinary', 'pages removed from the ordinary queue to be queued for urgent eviction'),
CacheStat('cache_eviction_empty_score', 'eviction empty score', 'no_clear,no_scale'),
CacheStat('cache_eviction_fail', 'pages selected for eviction unable to be evicted'),
CacheStat('cache_eviction_fail_active_children_on_an_internal_page', 'pages selected for eviction unable to be evicted because of active children on an internal page'),
@ -750,6 +749,7 @@ dsrc_stats = [
RecStat('rec_overflow_key_internal', 'internal-page overflow keys'),
RecStat('rec_overflow_key_leaf', 'leaf-page overflow keys'),
RecStat('rec_overflow_value', 'overflow values written'),
RecStat('rec_page_match', 'page checksum matches'),
RecStat('rec_prefix_compression', 'leaf page key bytes discarded using prefix compression', 'size'),
RecStat('rec_suffix_compression', 'internal page key bytes discarded using suffix compression', 'size'),
RecStat('rec_time_window_pages_prepared', 'pages written including at least one prepare'),

View File

@ -2,5 +2,5 @@
"vendor": "wiredtiger",
"github": "wiredtiger/wiredtiger.git",
"branch": "mongodb-4.4",
"commit": "5cb0a12c80a7edce61f72abea1bee37e14b70d5f"
"commit": "4b7b81665cdfd23a303f5e0573e2a2f4b62607bc"
}

View File

@ -1120,19 +1120,19 @@ __debug_page_metadata(WT_DBG *ds, WT_REF *ref)
WT_RET(ds->f(ds, ", entries %" PRIu32, entries));
WT_RET(ds->f(ds, ", %s", __wt_page_is_modified(page) ? "dirty" : "clean"));
if (F_ISSET_ATOMIC_16(page, WT_PAGE_BUILD_KEYS))
if (F_ISSET_ATOMIC(page, WT_PAGE_BUILD_KEYS))
WT_RET(ds->f(ds, ", keys-built"));
if (F_ISSET_ATOMIC_16(page, WT_PAGE_DISK_ALLOC))
if (F_ISSET_ATOMIC(page, WT_PAGE_DISK_ALLOC))
WT_RET(ds->f(ds, ", disk-alloc"));
if (F_ISSET_ATOMIC_16(page, WT_PAGE_DISK_MAPPED))
if (F_ISSET_ATOMIC(page, WT_PAGE_DISK_MAPPED))
WT_RET(ds->f(ds, ", disk-mapped"));
if (F_ISSET_ATOMIC_16(page, WT_PAGE_EVICT_LRU))
if (F_ISSET_ATOMIC(page, WT_PAGE_EVICT_LRU))
WT_RET(ds->f(ds, ", evict-lru"));
if (F_ISSET_ATOMIC_16(page, WT_PAGE_OVERFLOW_KEYS))
if (F_ISSET_ATOMIC(page, WT_PAGE_OVERFLOW_KEYS))
WT_RET(ds->f(ds, ", overflow-keys"));
if (F_ISSET_ATOMIC_16(page, WT_PAGE_SPLIT_INSERT))
if (F_ISSET_ATOMIC(page, WT_PAGE_SPLIT_INSERT))
WT_RET(ds->f(ds, ", split-insert"));
if (F_ISSET_ATOMIC_16(page, WT_PAGE_UPDATE_IGNORE))
if (F_ISSET_ATOMIC(page, WT_PAGE_UPDATE_IGNORE))
WT_RET(ds->f(ds, ", update-ignore"));
if (mod != NULL)

View File

@ -74,7 +74,7 @@ __wt_page_out(WT_SESSION_IMPL *session, WT_PAGE **pagep)
/* Assert we never discard a dirty page or a page queue for eviction. */
WT_ASSERT(session, !__wt_page_is_modified(page));
WT_ASSERT(session, !F_ISSET_ATOMIC_16(page, WT_PAGE_EVICT_LRU));
WT_ASSERT(session, !F_ISSET_ATOMIC(page, WT_PAGE_EVICT_LRU));
/*
* If a root page split, there may be one or more pages linked from the page; walk the list,
@ -93,11 +93,11 @@ __wt_page_out(WT_SESSION_IMPL *session, WT_PAGE **pagep)
__wt_cache_page_evict(session, page);
dsk = (WT_PAGE_HEADER *)page->dsk;
if (F_ISSET_ATOMIC_16(page, WT_PAGE_DISK_ALLOC))
if (F_ISSET_ATOMIC(page, WT_PAGE_DISK_ALLOC))
__wt_cache_page_image_decr(session, page);
/* Discard any mapped image. */
if (F_ISSET_ATOMIC_16(page, WT_PAGE_DISK_MAPPED))
if (F_ISSET_ATOMIC(page, WT_PAGE_DISK_MAPPED))
(void)S2BT(session)->bm->map_discard(
S2BT(session)->bm, session, dsk, (size_t)dsk->mem_size);
@ -128,7 +128,7 @@ __wt_page_out(WT_SESSION_IMPL *session, WT_PAGE **pagep)
}
/* Discard any allocated disk image. */
if (F_ISSET_ATOMIC_16(page, WT_PAGE_DISK_ALLOC))
if (F_ISSET_ATOMIC(page, WT_PAGE_DISK_ALLOC))
__wt_overwrite_and_free_len(session, dsk, dsk->mem_size);
__wt_overwrite_and_free(session, page);
@ -150,7 +150,7 @@ __free_page_modify(WT_SESSION_IMPL *session, WT_PAGE *page)
mod = page->modify;
/* In some failed-split cases, we can't discard updates. */
update_ignore = F_ISSET_ATOMIC_16(page, WT_PAGE_UPDATE_IGNORE);
update_ignore = F_ISSET_ATOMIC(page, WT_PAGE_UPDATE_IGNORE);
switch (mod->rec_result) {
case WT_PM_REC_MULTIBLOCK:

View File

@ -361,7 +361,7 @@ __wt_page_inmem(WT_SESSION_IMPL *session, WT_REF *ref, const void *image, uint32
/* Allocate and initialize a new WT_PAGE. */
WT_RET(__wt_page_alloc(session, dsk->type, alloc_entries, true, &page));
page->dsk = dsk;
F_SET_ATOMIC_16(page, flags);
F_SET_ATOMIC(page, flags);
/*
* Track the memory allocated to build this page so we can update the cache statistics in a
@ -663,7 +663,7 @@ __inmem_row_int(WT_SESSION_IMPL *session, WT_PAGE *page, size_t *sizep)
* we can do during a checkpoint.
*/
if (overflow_keys)
F_SET_ATOMIC_16(page, WT_PAGE_OVERFLOW_KEYS);
F_SET_ATOMIC(page, WT_PAGE_OVERFLOW_KEYS);
err:
__wt_scr_free(session, &current);
@ -851,7 +851,7 @@ __inmem_row_leaf(WT_SESSION_IMPL *session, WT_PAGE *page, bool *preparedp)
* Mark the page as not needing that work if there aren't stretches of prefix-compressed keys.
*/
if (best_prefix_count <= 10)
F_SET_ATOMIC_16(page, WT_PAGE_BUILD_KEYS);
F_SET_ATOMIC(page, WT_PAGE_BUILD_KEYS);
if (preparedp != NULL && prepare)
*preparedp = true;

View File

@ -160,7 +160,7 @@ __split_ovfl_key_cleanup(WT_SESSION_IMPL *session, WT_PAGE *page, WT_REF *ref)
uint32_t cell_offset;
/* There's a per-page flag if there are any overflow keys at all. */
if (!F_ISSET_ATOMIC_16(page, WT_PAGE_OVERFLOW_KEYS))
if (!F_ISSET_ATOMIC(page, WT_PAGE_OVERFLOW_KEYS))
return (0);
/*
@ -1620,7 +1620,7 @@ __split_multi_inmem_fail(WT_SESSION_IMPL *session, WT_PAGE *orig, WT_MULTI *mult
*/
if (ref != NULL) {
if (ref->page != NULL)
F_SET_ATOMIC_16(ref->page, WT_PAGE_UPDATE_IGNORE);
F_SET_ATOMIC(ref->page, WT_PAGE_UPDATE_IGNORE);
__wt_free_ref(session, ref, orig->type, true);
}
}
@ -1754,7 +1754,7 @@ __split_insert(WT_SESSION_IMPL *session, WT_REF *ref)
WT_ASSERT(session, __wt_page_is_modified(page));
WT_ASSERT(session, ref->ft_info.del == NULL);
F_SET_ATOMIC_16(page, WT_PAGE_SPLIT_INSERT); /* Only split in-memory once. */
F_SET_ATOMIC(page, WT_PAGE_SPLIT_INSERT); /* Only split in-memory once. */
/* Find the last item on the page. */
if (type == WT_PAGE_ROW_LEAF)
@ -2245,7 +2245,7 @@ __wt_split_rewrite(WT_SESSION_IMPL *session, WT_REF *ref, WT_MULTI *multi)
*/
__wt_page_modify_clear(session, page);
if (!F_ISSET(S2C(session)->cache, WT_CACHE_EVICT_SCRUB) || multi->supd_restore)
F_SET_ATOMIC_16(page, WT_PAGE_EVICT_NO_PROGRESS);
F_SET_ATOMIC(page, WT_PAGE_EVICT_NO_PROGRESS);
__wt_ref_out(session, ref);
/* Swap the new page into place. */

View File

@ -9,7 +9,6 @@
#include "wt_internal.h"
static int __evict_clear_all_walks(WT_SESSION_IMPL *);
static void __evict_list_clear_page_locked(WT_SESSION_IMPL *, WT_REF *, bool);
static int WT_CDECL __evict_lru_cmp(const void *, const void *);
static int __evict_lru_pages(WT_SESSION_IMPL *, bool);
static int __evict_lru_walk(WT_SESSION_IMPL *);
@ -147,32 +146,37 @@ static inline void
__evict_list_clear(WT_SESSION_IMPL *session, WT_EVICT_ENTRY *e)
{
if (e->ref != NULL) {
WT_ASSERT(session, F_ISSET_ATOMIC_16(e->ref->page, WT_PAGE_EVICT_LRU));
F_CLR_ATOMIC_16(e->ref->page, WT_PAGE_EVICT_LRU | WT_PAGE_EVICT_LRU_URGENT);
WT_ASSERT(session, F_ISSET_ATOMIC(e->ref->page, WT_PAGE_EVICT_LRU));
F_CLR_ATOMIC(e->ref->page, WT_PAGE_EVICT_LRU);
}
e->ref = NULL;
e->btree = WT_DEBUG_POINT;
}
/*
* __evict_list_clear_page_locked --
* This function searches for the page in all the eviction queues (skipping the urgent queue if
* requested) and clears it if found. It does not take the eviction queue lock, so the caller
* should hold the appropriate locks before calling this function.
* __wt_evict_list_clear_page --
* Make sure a page is not in the LRU eviction list. This is called from the page eviction code to
* make sure there is no attempt to evict a child page multiple times.
*/
static void
__evict_list_clear_page_locked(WT_SESSION_IMPL *session, WT_REF *ref, bool exclude_urgent)
void
__wt_evict_list_clear_page(WT_SESSION_IMPL *session, WT_REF *ref)
{
WT_CACHE *cache;
WT_EVICT_ENTRY *evict;
uint32_t elem, i, q, last_queue_idx;
uint32_t i, elem, q;
bool found;
last_queue_idx = exclude_urgent ? WT_EVICT_URGENT_QUEUE : WT_EVICT_QUEUE_MAX;
cache = S2C(session)->cache;
found = false;
WT_ASSERT(session, __wt_ref_is_root(ref) || ref->state == WT_REF_LOCKED);
for (q = 0; q < last_queue_idx && !found; q++) {
/* Fast path: if the page isn't on the queue, don't bother searching. */
if (!F_ISSET_ATOMIC(ref->page, WT_PAGE_EVICT_LRU))
return;
cache = S2C(session)->cache;
__wt_spin_lock(session, &cache->evict_queue_lock);
found = false;
for (q = 0; q < WT_EVICT_QUEUE_MAX && !found; q++) {
__wt_spin_lock(session, &cache->evict_queues[q].evict_lock);
elem = cache->evict_queues[q].evict_max;
for (i = 0, evict = cache->evict_queues[q].evict_queue; i < elem; i++, evict++)
@ -183,31 +187,7 @@ __evict_list_clear_page_locked(WT_SESSION_IMPL *session, WT_REF *ref, bool exclu
}
__wt_spin_unlock(session, &cache->evict_queues[q].evict_lock);
}
WT_ASSERT(session, !F_ISSET_ATOMIC_16(ref->page, WT_PAGE_EVICT_LRU));
}
/*
* __wt_evict_list_clear_page --
* Check whether a page is present in the LRU eviction list. If the page is found in the list,
* remove it. This is called from the page eviction code to make sure there is no attempt to
* evict a child page multiple times.
*/
void
__wt_evict_list_clear_page(WT_SESSION_IMPL *session, WT_REF *ref)
{
WT_CACHE *cache;
WT_ASSERT(session, __wt_ref_is_root(ref) || ref->state == WT_REF_LOCKED);
/* Fast path: if the page isn't in the queue, don't bother searching. */
if (!F_ISSET_ATOMIC_16(ref->page, WT_PAGE_EVICT_LRU))
return;
cache = S2C(session)->cache;
__wt_spin_lock(session, &cache->evict_queue_lock);
/* Remove the reference from the eviction queues. */
__evict_list_clear_page_locked(session, ref, false);
WT_ASSERT(session, !F_ISSET_ATOMIC(ref->page, WT_PAGE_EVICT_LRU));
__wt_spin_unlock(session, &cache->evict_queue_lock);
}
@ -1602,7 +1582,7 @@ static bool
__evict_push_candidate(
WT_SESSION_IMPL *session, WT_EVICT_QUEUE *queue, WT_EVICT_ENTRY *evict, WT_REF *ref)
{
uint16_t orig_flags, new_flags;
uint8_t orig_flags, new_flags;
u_int slot;
/*
@ -1612,7 +1592,7 @@ __evict_push_candidate(
orig_flags = new_flags = ref->page->flags_atomic;
FLD_SET(new_flags, WT_PAGE_EVICT_LRU);
if (orig_flags == new_flags ||
!__wt_atomic_cas16(&ref->page->flags_atomic, orig_flags, new_flags))
!__wt_atomic_cas8(&ref->page->flags_atomic, orig_flags, new_flags))
return (false);
/* Keep track of the maximum slot we are using. */
@ -1943,7 +1923,7 @@ __evict_walk_tree(WT_SESSION_IMPL *session, WT_EVICT_QUEUE *queue, u_int max_ent
internal_pages_seen++;
/* Use the EVICT_LRU flag to avoid putting pages onto the list multiple times. */
if (F_ISSET_ATOMIC_16(page, WT_PAGE_EVICT_LRU)) {
if (F_ISSET_ATOMIC(page, WT_PAGE_EVICT_LRU)) {
pages_already_queued++;
if (F_ISSET(ref, WT_REF_FLAG_INTERNAL))
internal_pages_already_queued++;
@ -2500,35 +2480,18 @@ __wt_page_evict_urgent(WT_SESSION_IMPL *session, WT_REF *ref)
WT_ASSERT(session, !__wt_ref_is_root(ref));
page = ref->page;
if (S2BT(session)->evict_disabled > 0 || F_ISSET_ATOMIC_16(page, WT_PAGE_EVICT_LRU_URGENT))
return (false);
cache = S2C(session)->cache;
if (F_ISSET_ATOMIC_16(page, WT_PAGE_EVICT_LRU) && F_ISSET(cache, WT_CACHE_EVICT_ALL))
if (F_ISSET_ATOMIC(page, WT_PAGE_EVICT_LRU) || S2BT(session)->evict_disabled > 0)
return (false);
/* Append to the urgent queue if we can. */
cache = S2C(session)->cache;
urgent_queue = &cache->evict_queues[WT_EVICT_URGENT_QUEUE];
queued = false;
__wt_spin_lock(session, &cache->evict_queue_lock);
/* Check again, in case we raced with another thread. */
if (S2BT(session)->evict_disabled > 0 || F_ISSET_ATOMIC_16(page, WT_PAGE_EVICT_LRU_URGENT))
if (F_ISSET_ATOMIC(page, WT_PAGE_EVICT_LRU) || S2BT(session)->evict_disabled > 0)
goto done;
/*
* If the page is already in the LRU eviction list, clear it from the list if eviction server is
* not running.
*/
if (F_ISSET_ATOMIC_16(page, WT_PAGE_EVICT_LRU)) {
if (!F_ISSET(cache, WT_CACHE_EVICT_ALL)) {
__evict_list_clear_page_locked(session, ref, true);
WT_STAT_CONN_INCR(session, cache_eviction_clear_ordinary);
} else
goto done;
}
__wt_spin_lock(session, &urgent_queue->evict_lock);
if (__evict_queue_empty(urgent_queue, false)) {
urgent_queue->evict_current = urgent_queue->evict_queue;
@ -2539,7 +2502,6 @@ __wt_page_evict_urgent(WT_SESSION_IMPL *session, WT_REF *ref)
__evict_push_candidate(session, urgent_queue, evict, ref)) {
++urgent_queue->evict_candidates;
queued = true;
FLD_SET(page->flags_atomic, WT_PAGE_EVICT_LRU_URGENT);
}
__wt_spin_unlock(session, &urgent_queue->evict_lock);

View File

@ -149,8 +149,7 @@ __evict_stats_update(WT_SESSION_IMPL *session, uint8_t flags)
if (eviction_time_milliseconds > WT_MINUTE * WT_THOUSAND)
__wt_verbose(session, WT_VERB_EVICT,
"Warning: Eviction took more than 1 minute (%" PRIu64
"us). Building disk image took %" PRIu64 "us. History store wrapup took %" PRIu64
"us.",
"). Building disk image took %" PRIu64 "us. History store wrapup took %" PRIu64 "us.",
eviction_time,
WT_CLOCKDIFF_US(session->reconcile_timeline.image_build_finish,
session->reconcile_timeline.image_build_start),

View File

@ -52,7 +52,7 @@ __evict_stat_walk(WT_SESSION_IMPL *session)
if (!__wt_ref_is_root(next_walk) && !__wt_page_can_evict(session, next_walk, NULL))
++num_not_queueable;
if (F_ISSET_ATOMIC_16(page, WT_PAGE_EVICT_LRU))
if (F_ISSET_ATOMIC(page, WT_PAGE_EVICT_LRU))
++num_queued;
if (size > max_pagesize)

View File

@ -137,6 +137,15 @@ struct __wt_addr {
#define WT_ADDR_LEAF 2 /* Leaf page */
#define WT_ADDR_LEAF_NO 3 /* Leaf page, no overflow */
uint8_t type;
/*
* If an address is used by both the previous and the current multi-block reconciliations, that
* is, a block we're writing matches the block written the last time, it will appear in both the
* current boundary points as well as the page modification's list of previous blocks. The reuse
* flag is how we know that's happening so the block is treated
* correctly (not free'd on error, for example).
*/
uint8_t reuse;
};
/*
@ -280,7 +289,14 @@ struct __wt_multi {
uint32_t supd_entries;
bool supd_restore; /* Whether to restore saved update chains to this page */
WT_ADDR addr; /* Disk image written address */
/*
* Disk image was written: address, size and checksum. On subsequent reconciliations of this
* page, we avoid writing the block if it's unchanged by comparing size and checksum; the reuse
* flag is set when the block is unchanged and we're reusing a previous address.
*/
WT_ADDR addr;
uint32_t size;
uint32_t checksum;
};
/*
@ -644,19 +660,18 @@ struct __wt_page {
uint8_t type; /* Page type */
/* AUTOMATIC FLAG VALUE GENERATION START 0 */
#define WT_PAGE_BUILD_KEYS 0x001u /* Keys have been built in memory */
#define WT_PAGE_DISK_ALLOC 0x002u /* Disk image in allocated memory */
#define WT_PAGE_DISK_MAPPED 0x004u /* Disk image in mapped memory */
#define WT_PAGE_EVICT_LRU 0x008u /* Page is on the LRU queue */
#define WT_PAGE_EVICT_LRU_URGENT 0x010u /* Page is in the urgent queue */
#define WT_PAGE_EVICT_NO_PROGRESS 0x020u /* Eviction doesn't count as progress */
#define WT_PAGE_OVERFLOW_KEYS 0x040u /* Page has overflow keys */
#define WT_PAGE_SPLIT_INSERT 0x080u /* A leaf page was split for append */
#define WT_PAGE_UPDATE_IGNORE 0x100u /* Ignore updates on page discard */
/* AUTOMATIC FLAG VALUE GENERATION STOP 16 */
uint16_t flags_atomic; /* Atomic flags, use F_*_ATOMIC_16 */
#define WT_PAGE_BUILD_KEYS 0x01u /* Keys have been built in memory */
#define WT_PAGE_DISK_ALLOC 0x02u /* Disk image in allocated memory */
#define WT_PAGE_DISK_MAPPED 0x04u /* Disk image in mapped memory */
#define WT_PAGE_EVICT_LRU 0x08u /* Page is on the LRU queue */
#define WT_PAGE_EVICT_NO_PROGRESS 0x10u /* Eviction doesn't count as progress */
#define WT_PAGE_OVERFLOW_KEYS 0x20u /* Page has overflow keys */
#define WT_PAGE_SPLIT_INSERT 0x40u /* A leaf page was split for append */
#define WT_PAGE_UPDATE_IGNORE 0x80u /* Ignore updates on page discard */
/* AUTOMATIC FLAG VALUE GENERATION STOP 8 */
uint8_t flags_atomic; /* Atomic flags, use F_*_ATOMIC */
uint8_t unused; /* Unused padding */
uint8_t unused[2]; /* Unused padding */
size_t memory_footprint; /* Memory attached to the page */

View File

@ -574,7 +574,7 @@ __wt_cache_page_evict(WT_SESSION_IMPL *session, WT_PAGE *page)
* Track if eviction makes progress. This is used in various places to determine whether
* eviction is stuck.
*/
if (!F_ISSET_ATOMIC_16(page, WT_PAGE_EVICT_NO_PROGRESS))
if (!F_ISSET_ATOMIC(page, WT_PAGE_EVICT_NO_PROGRESS))
(void)__wt_atomic_addv64(&cache->eviction_progress, 1);
}
@ -1249,9 +1249,9 @@ __wt_row_leaf_key_instantiate(WT_SESSION_IMPL *session, WT_PAGE *page)
* doing a cursor previous call, and this page has never been checked for excessively long
* stretches of prefix-compressed keys, do it now.
*/
if (F_ISSET_ATOMIC_16(page, WT_PAGE_BUILD_KEYS))
if (F_ISSET_ATOMIC(page, WT_PAGE_BUILD_KEYS))
return (0);
F_SET_ATOMIC_16(page, WT_PAGE_BUILD_KEYS);
F_SET_ATOMIC(page, WT_PAGE_BUILD_KEYS);
/* Walk the keys, making sure there's something easy to work with periodically. */
skip = 0;
@ -1543,7 +1543,7 @@ __wt_leaf_page_can_split(WT_SESSION_IMPL *session, WT_PAGE *page)
* Only split a page once, otherwise workloads that update in the middle of the page could
* continually split without benefit.
*/
if (F_ISSET_ATOMIC_16(page, WT_PAGE_SPLIT_INSERT))
if (F_ISSET_ATOMIC(page, WT_PAGE_SPLIT_INSERT))
return (false);
/*
@ -1694,7 +1694,7 @@ __wt_page_can_evict(WT_SESSION_IMPL *session, WT_REF *ref, bool *inmem_splitp)
* no-longer-used overflow keys, which will corrupt the checkpoint's block management.
*/
if (__wt_btree_syncing_by_other_session(session) &&
F_ISSET_ATOMIC_16(ref->home, WT_PAGE_OVERFLOW_KEYS)) {
F_ISSET_ATOMIC(ref->home, WT_PAGE_OVERFLOW_KEYS)) {
WT_STAT_CONN_INCR(session, cache_eviction_fail_parent_has_overflow_items);
return (false);
}

View File

@ -28,27 +28,22 @@
/*
* Atomic versions of the flag set/clear macros.
*/
#define F_ISSET_ATOMIC(p, mask) ((p)->flags_atomic & (uint8_t)(mask))
#define F_ISSET_ATOMIC_16(p, mask) ((p)->flags_atomic & (uint16_t)(mask))
#define F_SET_ATOMIC_16(p, mask) \
do { \
uint16_t __orig; \
if (F_ISSET_ATOMIC_16(p, mask)) \
break; \
do { \
__orig = (p)->flags_atomic; \
} while (!__wt_atomic_cas16(&(p)->flags_atomic, __orig, __orig | (uint16_t)(mask))); \
#define F_SET_ATOMIC(p, mask) \
do { \
uint8_t __orig; \
do { \
__orig = (p)->flags_atomic; \
} while (!__wt_atomic_cas8(&(p)->flags_atomic, __orig, __orig | (uint8_t)(mask))); \
} while (0)
#define F_CLR_ATOMIC_16(p, mask) \
do { \
uint16_t __orig; \
if (!F_ISSET_ATOMIC_16(p, mask)) \
break; \
do { \
__orig = (p)->flags_atomic; \
} while (!__wt_atomic_cas16(&(p)->flags_atomic, __orig, __orig & ~(uint16_t)(mask))); \
#define F_CLR_ATOMIC(p, mask) \
do { \
uint8_t __orig; \
do { \
__orig = (p)->flags_atomic; \
} while (!__wt_atomic_cas8(&(p)->flags_atomic, __orig, __orig & ~(uint8_t)(mask))); \
} while (0)
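/*
 * Illustrative sketch (not WiredTiger API; assumes C++ std::atomic): the macros above perform a
 * lock-free read-modify-write of the 8-bit flags word. Written out, the retry loop is roughly:
 *
 *     #include <atomic>
 *     #include <cstdint>
 *
 *     inline void set_flags_atomic(std::atomic<uint8_t> &flags, uint8_t mask) {
 *         uint8_t orig = flags.load();
 *         // compare_exchange_weak reloads `orig` on failure, so the loop retries until the
 *         // OR'd value is installed without losing a concurrent update to the other bits.
 *         while (!flags.compare_exchange_weak(orig, static_cast<uint8_t>(orig | mask))) {
 *         }
 *     }
 */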
/*

View File

@ -236,6 +236,12 @@ struct __wt_reconcile {
*/
bool cell_zero; /* Row-store internal page 0th key */
/*
* We calculate checksums to find previously written identical blocks, but once a match fails
* during an eviction, there's no point trying again.
*/
bool evict_matching_checksum_failed;
WT_REC_DICTIONARY **dictionary; /* Dictionary */
u_int dictionary_next, dictionary_slots; /* Next, max entries */
/* Skiplist head. */

View File

@ -455,7 +455,6 @@ struct __wt_connection_stats {
int64_t cache_read;
int64_t cache_read_deleted;
int64_t cache_read_deleted_prepared;
int64_t cache_eviction_clear_ordinary;
int64_t cache_pages_requested;
int64_t cache_eviction_pages_seen;
int64_t cache_eviction_pages_already_queued;
@ -1006,6 +1005,7 @@ struct __wt_dsrc_stats {
int64_t rec_overflow_key_leaf;
int64_t rec_multiblock_max;
int64_t rec_overflow_value;
int64_t rec_page_match;
int64_t rec_pages;
int64_t rec_pages_eviction;
int64_t rec_page_delete;

File diff suppressed because it is too large

View File

@ -475,7 +475,7 @@ __rec_root_write(WT_SESSION_IMPL *session, WT_PAGE *page, uint32_t flags)
* discard these pages.
*/
WT_RET(__wt_page_alloc(session, page->type, mod->mod_multi_entries, false, &next));
F_SET_ATOMIC_16(next, WT_PAGE_EVICT_NO_PROGRESS);
F_SET_ATOMIC(next, WT_PAGE_EVICT_NO_PROGRESS);
WT_INTL_INDEX_GET(session, next, pindex);
for (i = 0; i < mod->mod_multi_entries; ++i) {
@ -654,6 +654,8 @@ __rec_init(WT_SESSION_IMPL *session, WT_REF *ref, uint32_t flags, WT_SALVAGE_COO
r->wrapup_checkpoint = NULL;
r->wrapup_checkpoint_compressed = false;
r->evict_matching_checksum_failed = false;
/*
* Dictionary compression only writes repeated values once. We grow the dictionary as necessary,
* always using the largest size we've seen.
@ -1643,7 +1645,7 @@ __rec_split_write_header(WT_SESSION_IMPL *session, WT_RECONCILE *r, WT_REC_CHUNK
dsk->recno = btree->type == BTREE_ROW ? WT_RECNO_OOB : multi->key.recno;
__rec_set_page_write_gen(btree, dsk);
dsk->mem_size = WT_STORE_SIZE(chunk->image.size);
dsk->mem_size = multi->size;
dsk->u.entries = chunk->entries;
dsk->type = page->type;
@ -1666,6 +1668,83 @@ __rec_split_write_header(WT_SESSION_IMPL *session, WT_RECONCILE *r, WT_REC_CHUNK
memset(WT_BLOCK_HEADER_REF(dsk), 0, btree->block_header);
}
/*
* __rec_split_write_reuse --
* Check if a previously written block can be reused.
*/
static bool
__rec_split_write_reuse(
WT_SESSION_IMPL *session, WT_RECONCILE *r, WT_MULTI *multi, WT_ITEM *image, bool last_block)
{
WT_MULTI *multi_match;
WT_PAGE_MODIFY *mod;
mod = r->page->modify;
/*
* Don't bother calculating checksums for bulk loads; there's no reason to believe they'll be
* useful. We bother checking because LSM does bulk loads as part of normal operations and the
* check is cheap.
*/
if (r->is_bulk_load)
return (false);
/*
* Calculating the checksum is the expensive part, so try to avoid it.
*
* Ignore the last block of any reconciliation. Pages are written in the same block order every
* time, so the last block written for a page is unlikely to match any previously written block
* or any block written in the future (absent a point-update earlier in the page that didn't
* change the size of the on-page object in any way).
*/
if (last_block)
return (false);
/*
* Quit if evicting with no previously written block to compare against. (In other words, if
* there's eviction pressure and the page was never written by a checkpoint, calculating a
* checksum is worthless.)
*
* Quit if evicting and a previous check failed; once there's a miss, no future block will match.
*/
if (F_ISSET(r, WT_REC_EVICT)) {
if (mod->rec_result != WT_PM_REC_MULTIBLOCK || mod->mod_multi_entries < r->multi_next)
return (false);
if (r->evict_matching_checksum_failed)
return (false);
}
/* Calculate the checksum for this block. */
multi->checksum = __wt_checksum(image->data, image->size);
/*
* Don't check for a block match when writing blocks during compaction; the whole idea is to
* move those blocks. Make this check after calculating the checksum: we don't distinguish
* between pages written solely as part of the compaction and pages written at around the same
* time, so there's a possibility the calculated checksum will be useful in the future.
*/
if (session->compact_state != WT_COMPACT_NONE)
return (false);
/*
* Pages are written in the same block order every time, so only check the appropriate slot.
*/
if (mod->rec_result != WT_PM_REC_MULTIBLOCK || mod->mod_multi_entries < r->multi_next)
return (false);
multi_match = &mod->mod_multi[r->multi_next - 1];
if (multi_match->size != multi->size || multi_match->checksum != multi->checksum) {
r->evict_matching_checksum_failed = true;
return (false);
}
multi_match->addr.reuse = 1;
multi->addr = multi_match->addr;
WT_STAT_DATA_INCR(session, rec_page_match);
return (true);
}
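A condensed, standalone sketch of the matching idea __rec_split_write_reuse implements: remember a (size, checksum) pair per block slot from the previous reconciliation and, because blocks are written in the same order every time, compare only against the slot in the same position. The structures and the FNV-1a checksum below are hypothetical stand-ins for WT_MULTI / WT_PAGE_MODIFY and __wt_checksum, and the bulk-load, eviction and compaction early-exits above are deliberately omitted.

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical pared-down block slot from the previous reconciliation of a page. */
struct block_slot {
    uint32_t size;
    uint32_t checksum;
    bool reuse;   /* Block carried forward into the current write. */
};

struct page_mod {
    struct block_slot *slots;   /* Blocks written by the previous reconciliation. */
    uint32_t nslots;
};

/* Simple FNV-1a stand-in for __wt_checksum. */
static uint32_t
checksum32(const void *data, size_t len)
{
    const uint8_t *p = data;
    uint32_t h = 2166136261u;
    for (size_t i = 0; i < len; ++i) {
        h ^= p[i];
        h *= 16777619u;
    }
    return (h);
}

/*
 * Decide whether block number "slot" of the current write can reuse the block written in the
 * same position last time: both the size and the checksum have to match.
 */
static bool
block_can_reuse(struct page_mod *mod, uint32_t slot, const void *image, uint32_t size)
{
    struct block_slot *prev;
    uint32_t checksum;

    if (mod == NULL || slot >= mod->nslots)
        return (false);

    checksum = checksum32(image, size);
    prev = &mod->slots[slot];
    if (prev->size != size || prev->checksum != checksum)
        return (false);

    prev->reuse = true;   /* Don't free this block when the previous write is discarded. */
    return (true);
}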
/*
* __rec_compression_adjust --
* Adjust the pre-compression page size based on compression results.
@ -1792,6 +1871,8 @@ __rec_split_write(WT_SESSION_IMPL *session, WT_RECONCILE *r, WT_REC_CHUNK *chunk
default:
return (__wt_illegal_value(session, page->type));
}
multi->size = WT_STORE_SIZE(chunk->image.size);
multi->checksum = 0;
multi->supd_restore = false;
/* Set the key. */
@ -1853,6 +1934,14 @@ __rec_split_write(WT_SESSION_IMPL *session, WT_RECONCILE *r, WT_REC_CHUNK *chunk
WT_ASSERT(session, chunk->entries > 0);
}
/*
* If we wrote this block before, re-use it. Prefer a checksum of the compressed image when one
* exists: the comparison is identical and the smaller image should be faster to checksum.
*/
if (__rec_split_write_reuse(session, r, multi,
compressed_image == NULL ? &chunk->image : compressed_image, last_block))
goto copy_image;
/* Write the disk image and get an address. */
WT_RET(__wt_bt_write(session, compressed_image == NULL ? &chunk->image : compressed_image, addr,
&addr_size, &compressed_size, false, F_ISSET(r, WT_REC_CHECKPOINT),
@ -2009,13 +2098,14 @@ __rec_split_discard(WT_SESSION_IMPL *session, WT_PAGE *page)
__wt_free(session, multi->supd);
/*
* If the page was re-written free the backing disk blocks used in the previous write. The
* page may instead have been a disk image with associated saved updates: ownership of the
* disk image is transferred when rewriting the page in-memory and there may not have been
* saved updates. We've gotten this wrong a few times, so use the existence of an address to
* confirm backing blocks we care about, and free any disk image/saved updates.
* If the page was re-written free the backing disk blocks used in the previous write
* (unless the blocks were reused in this write). The page may instead have been a disk
* image with associated saved updates: ownership of the disk image is transferred when
* rewriting the page in-memory and there may not have been saved updates. We've gotten this
* wrong a few times, so use the existence of an address to confirm backing blocks we care
* about, and free any disk image/saved updates.
*/
if (multi->addr.addr != NULL) {
if (multi->addr.addr != NULL && !multi->addr.reuse) {
WT_RET(__wt_btree_block_free(session, multi->addr.addr, multi->addr.size));
__wt_free(session, multi->addr.addr);
}
@ -2077,9 +2167,11 @@ __rec_write_wrapup(WT_SESSION_IMPL *session, WT_RECONCILE *r, WT_PAGE *page)
WT_BM *bm;
WT_BTREE *btree;
WT_DECL_RET;
WT_MULTI *multi;
WT_PAGE_MODIFY *mod;
WT_REF *ref;
WT_TIME_AGGREGATE ta;
uint32_t i;
btree = S2BT(session);
bm = btree->bm;
@ -2228,7 +2320,14 @@ __rec_write_wrapup(WT_SESSION_IMPL *session, WT_RECONCILE *r, WT_PAGE *page)
if (WT_VERBOSE_ISSET(session, WT_VERB_SPLIT))
WT_RET(__rec_split_dump_keys(session, r));
/*
* The reuse flag may have been set on some blocks, but we have to clear it; otherwise, on a
* subsequent reconciliation we would fail to remove blocks that are being discarded.
*/
split:
for (multi = r->multi, i = 0; i < r->multi_next; ++multi, ++i)
multi->addr.reuse = 0;
mod->mod_multi = r->multi;
mod->mod_multi_entries = r->multi_next;
mod->rec_result = WT_PM_REC_MULTIBLOCK;
@ -2250,15 +2349,33 @@ __rec_write_err(WT_SESSION_IMPL *session, WT_RECONCILE *r, WT_PAGE *page)
{
WT_DECL_RET;
WT_MULTI *multi;
WT_PAGE_MODIFY *mod;
uint32_t i;
mod = page->modify;
/*
* Clear the address-reused flag from the multiblock reconciliation information (otherwise we
* might think the backing block is being reused on a subsequent reconciliation where we want to
* free it).
*/
if (mod->rec_result == WT_PM_REC_MULTIBLOCK)
for (multi = mod->mod_multi, i = 0; i < mod->mod_multi_entries; ++multi, ++i)
multi->addr.reuse = 0;
/*
* On error, discard blocks we've written; they're unreferenced by the tree. This is not a
* question of correctness, we're simply avoiding block leaks.
*
* Don't discard backing blocks marked for reuse; they remain part of a previous reconciliation.
*/
for (multi = r->multi, i = 0; i < r->multi_next; ++multi, ++i)
if (multi->addr.addr != NULL)
WT_TRET(__wt_btree_block_free(session, multi->addr.addr, multi->addr.size));
if (multi->addr.addr != NULL) {
if (multi->addr.reuse)
multi->addr.addr = NULL;
else
WT_TRET(__wt_btree_block_free(session, multi->addr.addr, multi->addr.size));
}
WT_TRET(__wt_ovfl_track_wrapup_err(session, page));
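The error path above has to treat reused addresses differently from freshly written ones. A minimal sketch of that cleanup rule follows, using a hypothetical pared-down structure rather than WT_MULTI: free only addresses that were freshly written by the failed reconciliation, and merely forget addresses that still belong to the previous one.

#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical per-block state for the write that just failed. */
struct new_block {
    void *addr;   /* Address cookie for the block, or NULL if never written. */
    bool reuse;   /* Address borrowed from the previous reconciliation. */
};

/*
 * Error-path cleanup: blocks written by the failed reconciliation are unreferenced by the tree
 * and are freed to avoid leaks, but addresses marked for reuse still back the previous
 * reconciliation and are only forgotten, never freed.
 */
static void
write_err_cleanup(struct new_block *blocks, uint32_t nblocks)
{
    for (uint32_t i = 0; i < nblocks; ++i) {
        if (blocks[i].addr == NULL)
            continue;
        if (blocks[i].reuse)
            blocks[i].addr = NULL;   /* Still owned by the previous write. */
        else
            free(blocks[i].addr);    /* Freshly written and now orphaned. */
    }
}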

View File

@ -195,6 +195,7 @@ static const char *const __stats_dsrc_desc[] = {
"reconciliation: leaf-page overflow keys",
"reconciliation: maximum blocks required for a page",
"reconciliation: overflow values written",
"reconciliation: page checksum matches",
"reconciliation: page reconciliation calls",
"reconciliation: page reconciliation calls for eviction",
"reconciliation: pages deleted",
@ -461,6 +462,7 @@ __wt_stat_dsrc_clear_single(WT_DSRC_STATS *stats)
stats->rec_overflow_key_leaf = 0;
stats->rec_multiblock_max = 0;
stats->rec_overflow_value = 0;
stats->rec_page_match = 0;
stats->rec_pages = 0;
stats->rec_pages_eviction = 0;
stats->rec_page_delete = 0;
@ -717,6 +719,7 @@ __wt_stat_dsrc_aggregate_single(WT_DSRC_STATS *from, WT_DSRC_STATS *to)
if (from->rec_multiblock_max > to->rec_multiblock_max)
to->rec_multiblock_max = from->rec_multiblock_max;
to->rec_overflow_value += from->rec_overflow_value;
to->rec_page_match += from->rec_page_match;
to->rec_pages += from->rec_pages;
to->rec_pages_eviction += from->rec_pages_eviction;
to->rec_page_delete += from->rec_page_delete;
@ -977,6 +980,7 @@ __wt_stat_dsrc_aggregate(WT_DSRC_STATS **from, WT_DSRC_STATS *to)
if ((v = WT_STAT_READ(from, rec_multiblock_max)) > to->rec_multiblock_max)
to->rec_multiblock_max = v;
to->rec_overflow_value += WT_STAT_READ(from, rec_overflow_value);
to->rec_page_match += WT_STAT_READ(from, rec_page_match);
to->rec_pages += WT_STAT_READ(from, rec_pages);
to->rec_pages_eviction += WT_STAT_READ(from, rec_pages_eviction);
to->rec_page_delete += WT_STAT_READ(from, rec_page_delete);
@ -1180,7 +1184,6 @@ static const char *const __stats_connection_desc[] = {
"cache: pages read into cache",
"cache: pages read into cache after truncate",
"cache: pages read into cache after truncate in prepare state",
"cache: pages removed from the ordinary queue to be queued for urgent eviction",
"cache: pages requested from the cache",
"cache: pages seen by eviction walk",
"cache: pages seen by eviction walk that are already queued",
@ -1730,7 +1733,6 @@ __wt_stat_connection_clear_single(WT_CONNECTION_STATS *stats)
stats->cache_read = 0;
stats->cache_read_deleted = 0;
stats->cache_read_deleted_prepared = 0;
stats->cache_eviction_clear_ordinary = 0;
stats->cache_pages_requested = 0;
stats->cache_eviction_pages_seen = 0;
stats->cache_eviction_pages_already_queued = 0;
@ -2275,7 +2277,6 @@ __wt_stat_connection_aggregate(WT_CONNECTION_STATS **from, WT_CONNECTION_STATS *
to->cache_read += WT_STAT_READ(from, cache_read);
to->cache_read_deleted += WT_STAT_READ(from, cache_read_deleted);
to->cache_read_deleted_prepared += WT_STAT_READ(from, cache_read_deleted_prepared);
to->cache_eviction_clear_ordinary += WT_STAT_READ(from, cache_eviction_clear_ordinary);
to->cache_pages_requested += WT_STAT_READ(from, cache_pages_requested);
to->cache_eviction_pages_seen += WT_STAT_READ(from, cache_eviction_pages_seen);
to->cache_eviction_pages_already_queued +=

View File

@ -65,7 +65,7 @@ class test_checkpoint04(wttest.WiredTigerTestCase):
def test_checkpoint_stats(self):
nrows = 100
ntables = 50
ntables = 10
multiplier = 1
# Run the loop and increase the value size with each iteration until
@ -119,10 +119,7 @@ class test_checkpoint04(wttest.WiredTigerTestCase):
time_total = self.get_stat(stat.conn.txn_checkpoint_time_total)
self.pr('txn_checkpoint_time_total ' + str(time_total))
# Account for When the connection re-opens on an existing datable as we perform a
# checkpoint during the open stage.
expected_ckpts = 3 if multiplier > 1 else 2
self.assertEqual(num_ckpt, expected_ckpts)
self.assertEqual(num_ckpt, 2)
self.assertEqual(running, 0)
self.assertEqual(prep_running, 0)
# Assert if this loop continues for more than 100 iterations.
@ -132,11 +129,11 @@ class test_checkpoint04(wttest.WiredTigerTestCase):
# Run the loop again if any of the below condition fails and exit if the test passes.
if prep_min < time_min and prep_max < time_max and prep_recent < time_recent and prep_total < time_total:
break
multiplier += 1
# Reopen the connection to reset statistics.
# We don't want stats from earlier runs to interfere with later runs.
self.reopen_conn()
else:
multiplier += 1
# Reopen the connection to reset statistics.
# We don't want stats from earlier runs to interfere with later runs.
self.reopen_conn()
if __name__ == '__main__':
wttest.run()

View File

@ -26,7 +26,7 @@
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
import wiredtiger, wttest, sys
import wiredtiger, wttest
from wtdataset import SimpleDataSet, ComplexDataSet, simple_key, simple_value
from wtscenario import make_scenarios
@ -42,7 +42,6 @@ class test_cursor_random(wttest.WiredTigerTestCase):
('not-sample', dict(config='next_random=true'))
]
scenarios = make_scenarios(types, config)
expected_warning_msg = 'Eviction took more than 1 minute'
# Check that opening a random cursor on a row-store returns not-supported
# for methods other than next, reconfigure and reset, and next returns
@ -111,12 +110,6 @@ class test_cursor_random(wttest.WiredTigerTestCase):
list.append(cursor.get_key())
self.assertGreater(len(set(list)), 80)
# Ignore the eviction generation drain warning as it is possible for eviction to
# take longer to evict pages due to overflow items on the page.
self.conn.close()
if (sys.platform.startswith('darwin')):
self.ignoreStdoutPatternIfExists(self.expected_warning_msg)
def test_cursor_random_multiple_insert_records_small(self):
self.cursor_random_multiple_insert_records(2000)
def test_cursor_random_multiple_insert_records_large(self):
@ -144,12 +137,6 @@ class test_cursor_random(wttest.WiredTigerTestCase):
list.append(cursor.get_key())
self.assertGreater(len(set(list)), 80)
# Ignore the eviction generation drain warning as it is possible for eviction to
# take longer to evict pages due to overflow items on the page.
self.conn.close()
if (sys.platform.startswith('darwin')):
self.ignoreStdoutPatternIfExists(self.expected_warning_msg)
def test_cursor_random_multiple_page_records_reopen_small(self):
self.cursor_random_multiple_page_records(2000, True)
def test_cursor_random_multiple_page_records_reopen_large(self):
@ -182,12 +169,6 @@ class test_cursor_random(wttest.WiredTigerTestCase):
for i in range(1,10):
self.assertEqual(cursor.next(), 0)
# Ignore the eviction generation drain warning as it is possible for eviction to
# take longer to evict pages due to overflow items on the page.
self.conn.close()
if (sys.platform.startswith('darwin')):
self.ignoreStdoutPatternIfExists(self.expected_warning_msg)
# Check that next_random fails in the presence of a set of values, all of
# which are deleted.
def test_cursor_random_deleted_all(self):
@ -205,12 +186,6 @@ class test_cursor_random(wttest.WiredTigerTestCase):
for i in range(1,10):
self.assertTrue(cursor.next(), wiredtiger.WT_NOTFOUND)
# Ignore the eviction generation drain warning as it is possible for eviction to
# take longer to evict pages due to overflow items on the page.
self.conn.close()
if (sys.platform.startswith('darwin')):
self.ignoreStdoutPatternIfExists(self.expected_warning_msg)
# Check that opening a random cursor on column-store returns not-supported.
class test_cursor_random_column(wttest.WiredTigerTestCase):
scenarios = make_scenarios([

View File

@ -198,7 +198,5 @@ class test_hs07(wttest.WiredTigerTestCase):
# Check that the new updates are only seen after the update timestamp
self.check(bigvalue, uri, nrows, 300)
self.ignoreStdoutPatternIfExists('Eviction took more than 1 minute')
if __name__ == '__main__':
wttest.run()

View File

@ -26,7 +26,7 @@
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
import wiredtiger, wttest, sys
import time, wiredtiger, wttest
from wtscenario import make_scenarios
# test_hs20.py
@ -102,8 +102,3 @@ class test_hs20(wttest.WiredTigerTestCase):
self.session.begin_transaction('read_timestamp=' + self.timestamp_str(3))
self.assertEqual(cursor[self.make_key(i)], value1 + "B")
self.session.rollback_transaction()
if (sys.platform.startswith('darwin')):
# Ignore the eviction generation drain warning as it is possible for eviction to take
# longer to evict pages due to overflow items on the page.
self.ignoreStdoutPatternIfExists('Eviction took more than 1 minute')

View File

@ -141,7 +141,5 @@ class test_prepare_hs01(wttest.WiredTigerTestCase):
nkeys = 4000
self.prepare_updates(uri, ds, nrows, nsessions, nkeys)
self.ignoreStdoutPatternIfExists('Eviction took more than 1 minute')
if __name__ == '__main__':
wttest.run()

View File

@ -91,7 +91,6 @@ class test_txn13(wttest.WiredTigerTestCase, suite_subprocess):
else:
self.session.commit_transaction()
self.ignoreStdoutPatternIfExists('Eviction took more than 1 minute')
self.assertTrue(gotException == self.expect_err)
if __name__ == '__main__':