mirror of https://github.com/mongodb/mongo
SERVER-114059 Update quorum checking to use the maintenance port (#44097)
GitOrigin-RevId: cf17bc5c326a07822e72cce4b5ce1d3cc828fab9
parent 4844f4b6f1
commit 9c4b72367f
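The diff below spans the JS test helpers, the QuorumChecker, the replica set config classes, the topology coordinator, and the session manager. For orientation, the new accounting rule is that the quorum check waits for one response per member plus one extra response for every member that exposes a maintenance port. A minimal sketch of that count, using invented names rather than the code from this commit:

    // Sketch only: illustrative types, not MongoDB's QuorumChecker API.
    #include <cstdio>
    #include <optional>
    #include <vector>

    struct MemberSketch {
        std::optional<int> maintenancePort;  // empty when no maintenance port is configured
    };

    // Total responses the quorum check now waits for: one per member, plus one extra
    // per member that also exposes a maintenance port.
    int requiredResponses(const std::vector<MemberSketch>& members) {
        int n = static_cast<int>(members.size());
        for (const auto& m : members) {
            if (m.maintenancePort) {
                ++n;
            }
        }
        return n;
    }

    int main() {
        std::vector<MemberSketch> members = {{std::nullopt}, {27022}, {27023}};
        std::printf("responses required: %d\n", requiredResponses(members));  // prints 5
        return 0;
    }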
@@ -9,6 +9,7 @@ import {ReplSetTest} from "jstests/libs/replsettest.js";
import {afterEach, beforeEach, describe, it} from "jstests/libs/mochalite.js";
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
import {get_ipaddr} from "jstests/libs/host_ipaddr.js";
import {configureFailPointForRS} from "jstests/libs/fail_point_util.js";

describe("Tests for maintenance port usage within JS test helpers", function () {
beforeEach(() => {
@@ -20,6 +21,14 @@ describe("Tests for maintenance port usage within JS test helpers", function ()
).version;
return MongoRunner.compareBinVersions(lastLTSFCV, maintenancePortEnabledFCV) == -1;
};

this.dropAllConns = function (rs) {
rs.nodes.forEach((conn) => {
const cfg = conn.getDB("local").system.replset.findOne();
const allHosts = cfg.members.map((x) => x.host);
assert.commandWorked(conn.adminCommand({dropConnections: 1, hostAndPort: allHosts}));
});
};
});

it("ReplSetInitiate with maintenance port on FCV 8.0 will fail", () => {
@@ -120,6 +129,73 @@ describe("Tests for maintenance port usage within JS test helpers", function ()
rs.stopSet();
});

it("Initiate with wrong maintenance port on a majority of nodes should fail", () => {
const rs = new ReplSetTest({nodes: [{maintenancePort: allocatePort()}, {}, {}]});
rs.startSet();

jsTest.log.info(
"Initiate should fail because we specify maintenance ports which are not open on the secondaries",
);
let config = rs.getReplSetConfig();
config.members[1].maintenancePort = 27022;
config.members[2].maintenancePort = 27023;

assert.commandFailedWithCode(rs.nodes[0].adminCommand({replSetInitiate: config}), ErrorCodes.NodeNotFound);

rs.stopSet();
});

it("Reconfig with wrong maintenance port on a majority of nodes should fail", () => {
const rs = new ReplSetTest({nodes: [{maintenancePort: allocatePort()}, {}, {}]});
rs.startSet();
rs.initiate();
jsTest.log.info(
"Reconfig should fail because we specify maintenance ports which are not open on the secondaries",
);
let config = rs.getReplSetConfigFromNode();
config.members[1].maintenancePort = 27022;
config.members[2].maintenancePort = 27023;
config.version += 1;

assert.commandFailedWithCode(rs.getPrimary().adminCommand({replSetReconfig: config}), ErrorCodes.NodeNotFound);

rs.stopSet();
});

it("Initiate when we can only reach the maintenance port on a majority of nodes should fail", () => {
const rs = new ReplSetTest({nodes: 3, useMaintenancePorts: true});
rs.startSet();

jsTest.log.info("Block connections on the main ports");
let fps = configureFailPointForRS(rs.nodes, "rejectNewNonPriorityConnections");

jsTest.log.info("Initiate should fail because we can only connect on the maintenance ports");
let config = rs.getReplSetConfig();
assert.commandFailedWithCode(rs.nodes[0].adminCommand({replSetInitiate: config}), ErrorCodes.NodeNotFound);

rs.stopSet();
});

it("Reconfig when we can only reach the maintenance port on a majority of nodes should fail", () => {
const rs = new ReplSetTest({nodes: 3, useMaintenancePorts: true});
rs.startSet();
rs.initiate();

let config = rs.getReplSetConfigFromNode();

jsTest.log.info("Block connections on the main ports and drop existing connections");
let fps = configureFailPointForRS(rs.nodes, "rejectNewNonPriorityConnections");
this.dropAllConns(rs);

jsTest.log.info("Reconfig should fail because we can only connect on the maintenance ports");
config.version += 1;
assert.commandFailedWithCode(rs.getPrimary().adminCommand({replSetReconfig: config}), ErrorCodes.NodeNotFound);

fps.off();
rs.stopSet();
});

it("Initiate with maintenance port plus bindIp works when fast resolution works", () => {
let ips = "localhost," + get_ipaddr();
const rs = new ReplSetTest({

@@ -65,15 +65,23 @@ QuorumChecker::QuorumChecker(const ReplSetConfig* rsConfig, int myIndex, long lo
: _rsConfig(rsConfig),
_myIndex(myIndex),
_term(term),
_responses(_rsConfig->getNumMembers(), {false, false, false}),
_successfulVoterCount(0),
_numResponses(1), // We "responded" to ourself already.
_numElectable(0),
_numResponsesRequired(_rsConfig->getNumMembers() +
_rsConfig->getCountOfMembersWithMaintenancePort()),
_vetoStatus(Status::OK()),
_finalStatus(ErrorCodes::CallbackCanceled, "Quorum check canceled") {
invariant(myIndex < _rsConfig->getNumMembers());
const MemberConfig& myConfig = _rsConfig->getMemberAt(_myIndex);

_responses.at(myIndex) = {true, myConfig.getMaintenancePort().is_initialized(), true};
if (myConfig.isVoter()) {
_voters.push_back(myConfig.getHostAndPort());
_successfulVoterCount++;
}
if (myConfig.getMaintenancePort()) {
_numResponses++;
}
if (myConfig.isElectable()) {
_numElectable = 1;
@@ -107,7 +115,9 @@ std::vector<RemoteCommandRequest> QuorumChecker::getRequests() const {
}
// hbArgs allows (but doesn't require) us to pass the current primary id as an optimization,
// but it is not readily available within QuorumChecker.
hbArgs.setSenderHost(myConfig.getHostAndPort());
// Use the maintenance port because the recipient may send a heartbeat back to get a newer
// configuration and we want them to use the maintenance port if it is available.
hbArgs.setSenderHost(myConfig.getHostAndPortMaintenance());
hbArgs.setSenderId(myConfig.getId().getData());
hbArgs.setTerm(_term);
hbRequest = hbArgs.toBSON();
@@ -121,12 +131,24 @@ std::vector<RemoteCommandRequest> QuorumChecker::getRequests() const {
// No need to check self for liveness or unreadiness.
continue;
}
requests.push_back(RemoteCommandRequest(_rsConfig->getMemberAt(i).getHostAndPort(),
const auto& member = _rsConfig->getMemberAt(i);
requests.push_back(RemoteCommandRequest(member.getHostAndPort(),
DatabaseName::kAdmin,
hbRequest,
BSON(rpc::kReplSetMetadataFieldName << 1),
nullptr,
_rsConfig->getHeartbeatTimeoutPeriodMillis()));

// If a member has a maintenance port specified then we need to check connectivity to both
// the main and the maintenance ports.
if (member.getMaintenancePort()) {
requests.push_back(RemoteCommandRequest(member.getHostAndPortMaintenance(),
DatabaseName::kAdmin,
hbRequest,
BSON(rpc::kReplSetMetadataFieldName << 1),
nullptr,
_rsConfig->getHeartbeatTimeoutPeriodMillis()));
}
}

return requests;
@@ -140,6 +162,38 @@ void QuorumChecker::processResponse(const RemoteCommandRequest& request,
}
}

void QuorumChecker::_appendFailedHeartbeatResponses(str::stream& stream) {
for (std::vector<std::pair<HostAndPort, Status>>::const_iterator it = _badResponses.begin();
it != _badResponses.end();
++it) {
if (it != _badResponses.begin()) {
stream << ", ";
}
stream << it->first.toString() << " failed with " << it->second.reason();
}
}

void QuorumChecker::_appendFullySuccessfulVotingHostAndPorts(str::stream& stream,
int expectedResponses) {
int count = 0;
for (int i = 0; i < _rsConfig->getNumMembers(); ++i) {
if (!_responses.at(i).fullySuccessful) {
continue;
}
const auto& member = _rsConfig->getMemberAt(i);
if (!member.isVoter()) {
continue;
}
if (count != 0) {
stream << ", ";
}
stream << member.getHostAndPort().toString();
if (++count == expectedResponses) {
break;
}
}
}

void QuorumChecker::_onQuorumCheckComplete() {
if (!_vetoStatus.isOK()) {
_finalStatus = _vetoStatus;
@@ -149,14 +203,7 @@ void QuorumChecker::_onQuorumCheckComplete() {
str::stream message;
message << "replSetInitiate quorum check failed because not all proposed set members "
"responded affirmatively: ";
for (std::vector<std::pair<HostAndPort, Status>>::const_iterator it = _badResponses.begin();
it != _badResponses.end();
++it) {
if (it != _badResponses.begin()) {
message << ", ";
}
message << it->first.toString() << " failed with " << it->second.reason();
}
_appendFailedHeartbeatResponses(message);
_finalStatus = Status(ErrorCodes::NodeNotFound, message);
return;
}
@@ -166,31 +213,21 @@ void QuorumChecker::_onQuorumCheckComplete() {
"electable nodes responded; at least one required for config");
return;
}
if (int(_voters.size()) < _rsConfig->getMajorityVoteCount()) {
if (_successfulVoterCount < _rsConfig->getMajorityVoteCount()) {
str::stream message;
message << "Quorum check failed because not enough voting nodes responded; required "
<< _rsConfig->getMajorityVoteCount() << " but ";

if (_voters.size() == 0) {
if (_successfulVoterCount == 0) {
message << "none responded";
} else {
message << "only the following " << _voters.size()
<< " voting nodes responded: " << _voters.front().toString();
for (size_t i = 1; i < _voters.size(); ++i) {
message << ", " << _voters[i].toString();
}
message << "only the following " << _successfulVoterCount
<< " voting nodes responded: ";
_appendFullySuccessfulVotingHostAndPorts(message, _successfulVoterCount);
}
if (!_badResponses.empty()) {
message << "; the following nodes did not respond affirmatively: ";
for (std::vector<std::pair<HostAndPort, Status>>::const_iterator it =
_badResponses.begin();
it != _badResponses.end();
++it) {
if (it != _badResponses.begin()) {
message << ", ";
}
message << it->first.toString() << " failed with " << it->second.reason();
}
_appendFailedHeartbeatResponses(message);
}
_finalStatus = Status(ErrorCodes::NodeNotFound, message);
return;
@@ -257,14 +294,27 @@ void QuorumChecker::_tabulateHeartbeatResponse(const RemoteCommandRequest& reque

for (int i = 0; i < _rsConfig->getNumMembers(); ++i) {
const MemberConfig& memberConfig = _rsConfig->getMemberAt(i);
if (memberConfig.getHostAndPort() != request.target) {
if (memberConfig.getHostAndPort() != request.target &&
memberConfig.getHostAndPortMaintenance() != request.target) {
continue;
}
if (memberConfig.isElectable()) {
++_numElectable;
if (memberConfig.getMaintenancePort() &&
memberConfig.getHostAndPortMaintenance() == request.target) {
_responses.at(i).maintenanceResponseReceived = true;
} else {
_responses.at(i).mainResponseReceived = true;
}
if (memberConfig.isVoter()) {
_voters.push_back(request.target);
// Check if we have now received both responses for this node.
_responses.at(i).fullySuccessful = _responses.at(i).mainResponseReceived &&
(!memberConfig.getMaintenancePort() || _responses.at(i).maintenanceResponseReceived);
// If we have received both responses for this node then update our global counters.
if (_responses.at(i).fullySuccessful) {
if (memberConfig.isVoter()) {
++_successfulVoterCount;
}
if (memberConfig.isElectable()) {
++_numElectable;
}
}
return;
}
@@ -272,7 +322,7 @@ void QuorumChecker::_tabulateHeartbeatResponse(const RemoteCommandRequest& reque
}

bool QuorumChecker::hasReceivedSufficientResponses() const {
if (!_vetoStatus.isOK() || _numResponses == _rsConfig->getNumMembers()) {
if (!_vetoStatus.isOK() || _numResponses == _numResponsesRequired) {
// Vetoed or everybody has responded. All done.
return true;
}

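Before the header changes below, it may help to see the matching rule from the tabulation step in isolation. This is a hedged, self-contained sketch with invented names, not the QuorumChecker implementation: a response target is matched against either of a member's addresses, the corresponding flag is recorded, and the voter and electable tallies only advance once the member is fully successful.

    // Sketch of the per-response bookkeeping (illustrative types only).
    #include <cstdio>
    #include <string>
    #include <vector>

    struct MemberInfo {
        std::string mainHost;         // e.g. "h1:1"
        std::string maintenanceHost;  // e.g. "h1:2"; same as mainHost when no maintenance port
        bool hasMaintenancePort;
        bool voter;
        bool electable;
    };

    struct Responses {
        bool mainOk = false;
        bool maintenanceOk = false;
    };

    struct Tallies {
        std::vector<Responses> perMember;
        int successfulVoters = 0;
        int electableResponders = 0;
    };

    // Record one successful heartbeat response addressed to `target`.
    void tabulate(const std::vector<MemberInfo>& members, Tallies& t, const std::string& target) {
        for (size_t i = 0; i < members.size(); ++i) {
            const MemberInfo& m = members[i];
            if (target != m.mainHost && target != m.maintenanceHost) {
                continue;  // the response did not come from this member
            }
            if (m.hasMaintenancePort && target == m.maintenanceHost) {
                t.perMember[i].maintenanceOk = true;
            } else {
                t.perMember[i].mainOk = true;
            }
            // The member only counts once every configured port has answered, so the
            // counters are bumped at most once per member.
            const bool fully = t.perMember[i].mainOk &&
                (!m.hasMaintenancePort || t.perMember[i].maintenanceOk);
            if (fully) {
                if (m.voter) {
                    ++t.successfulVoters;
                }
                if (m.electable) {
                    ++t.electableResponders;
                }
            }
            return;
        }
    }

    int main() {
        std::vector<MemberInfo> members = {{"h1:1", "h1:2", true, true, true}};
        Tallies t{std::vector<Responses>(members.size())};
        tabulate(members, t, "h1:1");
        std::printf("voters after main-port reply: %d\n", t.successfulVoters);    // 0
        tabulate(members, t, "h1:2");
        std::printf("voters after maintenance reply: %d\n", t.successfulVoters);  // 1
        return 0;
    }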
@@ -94,6 +94,22 @@ private:
void _tabulateHeartbeatResponse(const executor::RemoteCommandRequest& request,
const executor::RemoteCommandResponse& response);

/**
* Adds information about each failed heartbeat response to the provided stream with the format:
* "<host:port> failed with <errmsg>, <host:port> failed with <errmsg>".
*/
void _appendFailedHeartbeatResponses(str::stream& stream);

/**
* Adds information about each fully successful voting node to the provided stream with the
* format:
* "<host:port>, <host:port>"
* A fully successful voting node is one which replied success over its main port and either
* does not have a maintenance port configured or also replied success over the maintenance
* port.
*/
void _appendFullySuccessfulVotingHostAndPorts(str::stream& stream, int expectedResponses);

// Pointer to the replica set configuration for which we're checking quorum.
const ReplSetConfig* const _rsConfig;

@@ -103,8 +119,18 @@ private:
// The term of this node.
const long long _term;

// List of voting nodes that have responded affirmatively.
std::vector<HostAndPort> _voters;
struct ResponseStatus {
bool mainResponseReceived;
bool maintenanceResponseReceived;
bool fullySuccessful;
};
// Tracks main and maintenance port responses for each member. The indexes into this vector will
// be the same as that of the _rsConfig and entries for non-voters will be all false.
std::vector<ResponseStatus> _responses;
// Tracks the number of voters for which their state is fully successful (meaning they have
// responded on the main port and do not have a maintenance port configured or have responded
// on both main and maintenance ports).
int _successfulVoterCount;

// List of nodes with bad responses and the bad response status they returned.
std::vector<std::pair<HostAndPort, Status>> _badResponses;
@@ -112,9 +138,15 @@ private:
// Total number of responses and timeouts processed.
int _numResponses;

// Number of electable nodes that have responded affirmatively.
// Number of electable nodes that have responded affirmatively (on both their main and
// maintenance ports).
int _numElectable;

// Number of responses required. This will be equal to the number of members in the config plus
// the number of members that have maintenance ports specified since we need to contact both
// ports in that case.
int _numResponsesRequired;

// Set to a non-OK status if a response from a remote node indicates
// that the quorum check should definitely fail, such as because of
// a replica set name mismatch.

@@ -232,7 +232,7 @@ BSONObj makeHeartbeatRequest(const ReplSetConfig& rsConfig, int myConfigIndex) {
if (rsConfig.getConfigVersion() == 1) {
hbArgs.setCheckEmpty();
}
hbArgs.setSenderHost(myConfig.getHostAndPort());
hbArgs.setSenderHost(myConfig.getHostAndPortMaintenance());
hbArgs.setSenderId(myConfig.getId().getData());
hbArgs.setTerm(0);
return hbArgs.toBSON();
@@ -500,6 +500,65 @@ TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToSetIdMismatch) {
ASSERT_NOT_REASON_CONTAINS(status, "h5:1");
}

TEST_F(CheckQuorumForInitiate, QuorumCheckFailedDueToMaintenancePortUnreachable) {
// In this test, "we" are host "h1:1". All nodes respond successfully to their heartbeat
// requests on the main port, but "h3" does not respond via its maintenance port.
const ReplSetConfig rsConfig = assertMakeRSConfig(
BSON("_id" << "rs0"
<< "version" << 1 << "protocolVersion" << 1 << "members"
<< BSON_ARRAY(BSON("_id" << 1 << "host"
<< "h1:1")
<< BSON("_id" << 2 << "host"
<< "h2:1")
<< BSON("_id" << 3 << "host"
<< "h3:1" << "maintenancePort" << 2)
<< BSON("_id" << 4 << "host"
<< "h4:1")
<< BSON("_id" << 5 << "host"
<< "h5:1"))));
const int myConfigIndex = 0;
const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex);

startQuorumCheck(rsConfig, myConfigIndex);
const Date_t startDate = getNet()->now();
const int numCommandsExpected =
rsConfig.getNumMembers(); // One more than normal because one maintenance port.
stdx::unordered_set<HostAndPort> seenHosts;
getNet()->enterNetwork();
for (int i = 0; i < numCommandsExpected; ++i) {
const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
ASSERT_EQUALS(DatabaseName::kAdmin, request.dbname);
ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj);
ASSERT(seenHosts.insert(request.target).second)
<< "Already saw " << request.target.toString();
if (request.target == HostAndPort("h3", 2)) {
getNet()->scheduleResponse(noi,
startDate + Milliseconds(10),
RemoteCommandResponse::make_forTest(
Status(ErrorCodes::HostUnreachable, "No response")));
} else {
getNet()->scheduleResponse(noi,
startDate + Milliseconds(10),
makeHeartbeatResponse(rsConfig, Milliseconds(8)));
}
}
getNet()->runUntil(startDate + Milliseconds(10));
getNet()->exitNetwork();
Status status = waitForQuorumCheck();
ASSERT_EQUALS(ErrorCodes::NodeNotFound, status);
ASSERT_REASON_CONTAINS(
status, "replSetInitiate quorum check failed because not all proposed set members");
ASSERT_NOT_REASON_CONTAINS(status, "h1:1");
ASSERT_NOT_REASON_CONTAINS(status, "h2:1");
ASSERT_REASON_CONTAINS(status, "h3:2");
ASSERT_NOT_REASON_CONTAINS(status, "h3:1");
ASSERT_NOT_REASON_CONTAINS(status, "h4:1");
ASSERT_NOT_REASON_CONTAINS(status, "h5:1");
ASSERT_NOT_REASON_CONTAINS(status, "h6:1");
}

TEST_F(CheckQuorumForReconfig, QuorumCheckSucceedsWhenOtherNodesHaveHigherVersion) {
// In this test, "we" are host "h3:1". The request to "h2" does not arrive before the end
// of the test, and the request to "h1" comes back indicating a higher config version.
@@ -661,6 +720,187 @@ TEST_F(CheckQuorumForReconfig, QuorumCheckFailsDueToInsufficientVoters) {
ASSERT_NOT_REASON_CONTAINS(status, "h5:1");
}

TEST_F(CheckQuorumForReconfig, QuorumCheckFailsDueToInsufficientMaintenancePortResponses) {
// In this test, "we" are host "h4". All nodes respond via their main port, but among the
// voting nodes only "h1" responds via its maintenance port.
const ReplSetConfig rsConfig = assertMakeRSConfig(BSON(
"_id" << "rs0"
<< "version" << 2 << "protocolVersion" << 1 << "members"
<< BSON_ARRAY(
BSON("_id" << 1 << "host"
<< "h1:1" << "maintenancePort" << 2)
<< BSON("_id" << 2 << "host"
<< "h2:1" << "maintenancePort" << 2)
<< BSON("_id" << 3 << "host"
<< "h3:1" << "maintenancePort" << 2)
<< BSON("_id" << 4 << "host"
<< "h4:1"
<< "votes" << 0 << "priority" << 0 << "maintenancePort" << 2)
<< BSON("_id" << 5 << "host"
<< "h5:1"
<< "votes" << 0 << "priority" << 0 << "maintenancePort" << 2))));
const int myConfigIndex = 3;
const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex);

std::set<HostAndPort> respondFailure = {HostAndPort("h2", 2), HostAndPort("h3", 2)};

startQuorumCheck(rsConfig, myConfigIndex);
const Date_t startDate = getNet()->now();
const int numCommandsExpected = (rsConfig.getNumMembers() * 2) - 2;
stdx::unordered_set<HostAndPort> seenHosts;
getNet()->enterNetwork();
for (int i = 0; i < numCommandsExpected; ++i) {
const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
ASSERT_EQUALS(DatabaseName::kAdmin, request.dbname);
ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj);
ASSERT(seenHosts.insert(request.target).second)
<< "Already saw " << request.target.toString();
if (respondFailure.contains(request.target)) {
getNet()->scheduleResponse(noi,
startDate + Milliseconds(10),
RemoteCommandResponse::make_forTest(
Status(ErrorCodes::HostUnreachable, "No response")));
} else {
getNet()->scheduleResponse(noi,
startDate + Milliseconds(10),
makeHeartbeatResponse(rsConfig, Milliseconds(8)));
}
}
getNet()->runUntil(startDate + Milliseconds(10));
getNet()->exitNetwork();
Status status = waitForQuorumCheck();
ASSERT_EQUALS(ErrorCodes::NodeNotFound, status);
ASSERT_REASON_CONTAINS(status, "not enough voting nodes responded; required 2 but only");
ASSERT_REASON_CONTAINS(status, "h1:1");
ASSERT_REASON_CONTAINS(status, "h2:2 failed with");
ASSERT_REASON_CONTAINS(status, "h3:2 failed with");
ASSERT_NOT_REASON_CONTAINS(status, "h4:1");
ASSERT_NOT_REASON_CONTAINS(status, "h5:1");
}

TEST_F(CheckQuorumForReconfig, QuorumCheckFailsDueToInsufficientMainPortResponses) {
// In this test, "we" are host "h4". All nodes respond via their maintenance port, but among the
// voting nodes only "h1" responds via its main port.
const ReplSetConfig rsConfig = assertMakeRSConfig(BSON(
"_id" << "rs0"
<< "version" << 2 << "protocolVersion" << 1 << "members"
<< BSON_ARRAY(
BSON("_id" << 1 << "host"
<< "h1:1" << "maintenancePort" << 2)
<< BSON("_id" << 2 << "host"
<< "h2:1" << "maintenancePort" << 2)
<< BSON("_id" << 3 << "host"
<< "h3:1" << "maintenancePort" << 2)
<< BSON("_id" << 4 << "host"
<< "h4:1"
<< "votes" << 0 << "priority" << 0 << "maintenancePort" << 2)
<< BSON("_id" << 5 << "host"
<< "h5:1"
<< "votes" << 0 << "priority" << 0 << "maintenancePort" << 2))));
const int myConfigIndex = 3;
const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex);

std::set<HostAndPort> respondFailure = {HostAndPort("h2", 1), HostAndPort("h3", 1)};

startQuorumCheck(rsConfig, myConfigIndex);
const Date_t startDate = getNet()->now();
const int numCommandsExpected = (rsConfig.getNumMembers() * 2) - 2;
stdx::unordered_set<HostAndPort> seenHosts;
getNet()->enterNetwork();
for (int i = 0; i < numCommandsExpected; ++i) {
const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
ASSERT_EQUALS(DatabaseName::kAdmin, request.dbname);
ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj);
ASSERT(seenHosts.insert(request.target).second)
<< "Already saw " << request.target.toString();
if (respondFailure.contains(request.target)) {
getNet()->scheduleResponse(noi,
startDate + Milliseconds(10),
RemoteCommandResponse::make_forTest(
Status(ErrorCodes::HostUnreachable, "No response")));
} else {
getNet()->scheduleResponse(noi,
startDate + Milliseconds(10),
makeHeartbeatResponse(rsConfig, Milliseconds(8)));
}
}
getNet()->runUntil(startDate + Milliseconds(10));
getNet()->exitNetwork();
Status status = waitForQuorumCheck();
ASSERT_EQUALS(ErrorCodes::NodeNotFound, status);
ASSERT_REASON_CONTAINS(status, "not enough voting nodes responded; required 2 but only");
ASSERT_REASON_CONTAINS(status, "h1:1");
ASSERT_REASON_CONTAINS(status, "h2:1 failed with");
ASSERT_REASON_CONTAINS(status, "h3:1 failed with");
ASSERT_NOT_REASON_CONTAINS(status, "h4:1");
ASSERT_NOT_REASON_CONTAINS(status, "h5:1");
}

TEST_F(CheckQuorumForReconfig, QuorumCheckFailsDueToNonOverlappingMainAndMaintenancePortResponses) {
// In this test, "we" are host "h3". Nodes 1 and 2 respond via their main port but not their
// maintenance port, and hosts 4 and 5 respond via their maintenance port but not their main
// port. Thus we have no overlapping majority.
const ReplSetConfig rsConfig = assertMakeRSConfig(
BSON("_id" << "rs0"
<< "version" << 2 << "protocolVersion" << 1 << "members"
<< BSON_ARRAY(BSON("_id" << 1 << "host"
<< "h1:1" << "maintenancePort" << 2)
<< BSON("_id" << 2 << "host"
<< "h2:1" << "maintenancePort" << 2)
<< BSON("_id" << 3 << "host"
<< "h3:1" << "maintenancePort" << 2)
<< BSON("_id" << 4 << "host"
<< "h4:1"
<< "maintenancePort" << 2)
<< BSON("_id" << 5 << "host"
<< "h5:1"
<< "maintenancePort" << 2))));
const int myConfigIndex = 2;
const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex);

std::set<HostAndPort> respondFailure = {
HostAndPort("h4", 1), HostAndPort("h5", 1), HostAndPort("h1", 2), HostAndPort("h2", 2)};

startQuorumCheck(rsConfig, myConfigIndex);
const Date_t startDate = getNet()->now();
const int numCommandsExpected = (rsConfig.getNumMembers() * 2) - 2;
stdx::unordered_set<HostAndPort> seenHosts;
getNet()->enterNetwork();
for (int i = 0; i < numCommandsExpected; ++i) {
const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
ASSERT_EQUALS(DatabaseName::kAdmin, request.dbname);
ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj);
ASSERT(seenHosts.insert(request.target).second)
<< "Already saw " << request.target.toString();
if (respondFailure.contains(request.target)) {
getNet()->scheduleResponse(noi,
startDate + Milliseconds(10),
RemoteCommandResponse::make_forTest(
Status(ErrorCodes::HostUnreachable, "No response")));
} else {
getNet()->scheduleResponse(noi,
startDate + Milliseconds(10),
makeHeartbeatResponse(rsConfig, Milliseconds(8)));
}
}
getNet()->runUntil(startDate + Milliseconds(10));
getNet()->exitNetwork();
Status status = waitForQuorumCheck();
ASSERT_EQUALS(ErrorCodes::NodeNotFound, status);
ASSERT_REASON_CONTAINS(status, "not enough voting nodes responded; required 3 but only");
ASSERT_REASON_CONTAINS(status, "h3:1");
ASSERT_REASON_CONTAINS(status, "h1:2 failed with");
ASSERT_REASON_CONTAINS(status, "h2:2 failed with");
ASSERT_REASON_CONTAINS(status, "h4:1 failed with");
ASSERT_REASON_CONTAINS(status, "h5:1 failed with");
}

TEST_F(CheckQuorumForReconfig, QuorumCheckFailsDueToNoElectableNodeResponding) {
// In this test, "we" are host "h4". Only "h1", "h2" and "h3" are electable,
// and none of them respond.
@@ -713,6 +953,61 @@ TEST_F(CheckQuorumForReconfig, QuorumCheckFailsDueToNoElectableNodeResponding) {
ASSERT_REASON_CONTAINS(status, "no electable nodes responded");
}

TEST_F(CheckQuorumForReconfig, QuorumCheckFailsDueToNoElectableNodeRespondingViaMaintenancePort) {
// In this test, "we" are host "h4". Only "h1", "h2" and "h3" are electable,
// and none of them respond via their maintenance ports.
const ReplSetConfig rsConfig = assertMakeRSConfig(
BSON("_id" << "rs0"
<< "version" << 2 << "protocolVersion" << 1 << "members"
<< BSON_ARRAY(BSON("_id" << 1 << "host"
<< "h1:1" << "maintenancePort" << 2)
<< BSON("_id" << 2 << "host"
<< "h2:1" << "maintenancePort" << 2)
<< BSON("_id" << 3 << "host"
<< "h3:1" << "maintenancePort" << 2)
<< BSON("_id" << 4 << "host"
<< "h4:1"
<< "priority" << 0 << "maintenancePort" << 2)
<< BSON("_id" << 5 << "host"
<< "h5:1"
<< "priority" << 0 << "maintenancePort" << 2))));
const int myConfigIndex = 3;
const BSONObj hbRequest = makeHeartbeatRequest(rsConfig, myConfigIndex);
std::set<HostAndPort> respondFailure = {
HostAndPort("h1", 2), HostAndPort("h2", 2), HostAndPort("h3", 2)};

startQuorumCheck(rsConfig, myConfigIndex);
const Date_t startDate = getNet()->now();
const int numCommandsExpected = (2 * rsConfig.getNumMembers()) - 2;
stdx::unordered_set<HostAndPort> seenHosts;
getNet()->enterNetwork();
for (int i = 0; i < numCommandsExpected; ++i) {
const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
const RemoteCommandRequest& request = noi->getRequest();
ASSERT_EQUALS(DatabaseName::kAdmin, request.dbname);
ASSERT_BSONOBJ_EQ(hbRequest, request.cmdObj);
ASSERT(seenHosts.insert(request.target).second)
<< "Already saw " << request.target.toString();
if (respondFailure.contains(request.target)) {
getNet()->scheduleResponse(noi,
startDate + Milliseconds(10),
RemoteCommandResponse::make_forTest(
Status(ErrorCodes::HostUnreachable, "No response")));
} else {
getNet()->scheduleResponse(noi,
startDate + Milliseconds(10),
makeHeartbeatResponse(rsConfig, Milliseconds(8)));
}
}
getNet()->runUntil(startDate + Milliseconds(10));
getNet()->exitNetwork();
Status status = waitForQuorumCheck();
ASSERT_EQUALS(ErrorCodes::NodeNotFound, status);
ASSERT_REASON_CONTAINS(status, "no electable nodes responded");
}

TEST_F(CheckQuorumForReconfig, QuorumCheckSucceedsIfMinoritySetTimesOut) {
// In this test, "we" are host "h4". Only "h1", "h2" and "h3" can vote.
// The quorum check should succeed even if we do not respond to a minority number of heartbeats,

@@ -106,6 +106,19 @@ public:
return _splitHorizon.getHostAndPort(horizon);
}

/**
* Gets the host and maintenance port if the maintenance port is specified and the host and main
* port if not. Always returns the value for the default horizon since this is only intended for
* use by internal replication systems.
*/
HostAndPort getHostAndPortMaintenance() const {
if (getMaintenancePort()) {
return HostAndPort(getHostAndPort().host(), *getMaintenancePort());
} else {
return getHostAndPort();
}
}

/**
* Gets the mapping of horizon names to `HostAndPort` for this replica set member.
*/

@@ -203,11 +203,14 @@ Status ReplSetConfig::_initialize(bool forInitiate,
_addInternalWriteConcernModes();
_initializeConnectionString();

// Count how many members can vote
// Count how many members can vote and how many members have maintenance ports available.
for (const MemberConfig& m : getMembers()) {
if (m.getNumVotes() > 0) {
++_votingMemberCount;
}
if (m.getMaintenancePort()) {
++_maintenancePortCount;
}
}

return Status::OK();
@@ -550,7 +553,7 @@ int ReplSetConfig::findMemberIndexByHostAndPort(const HostAndPort& hap) const {
for (std::vector<MemberConfig>::const_iterator it = getMembers().begin();
it != getMembers().end();
++it) {
if (it->getHostAndPort() == hap) {
if (it->getHostAndPort() == hap || it->getHostAndPortMaintenance() == hap) {
return x;
}
++x;

@@ -342,6 +342,13 @@ public:
return _votingMemberCount;
};

/**
* Returns a count of members with a maintenance port specified in this ReplSetConfig.
*/
int getCountOfMembersWithMaintenancePort() const {
return _maintenancePortCount;
};

/**
* Access a MemberConfig element by index.
*/
@@ -617,6 +624,7 @@ private:
int _writeMajority = 0;
int _totalVotingMembers = 0;
int _votingMemberCount = 0;
int _maintenancePortCount = 0;
ReplSetTagConfig _tagConfig;
StringMap<ReplSetTagPattern> _customWriteConcernModes;
ConnectionString _connectionString;

@@ -60,11 +60,11 @@ inline bool operator==(const MemberConfig& a, const MemberConfig& b) {
}
}
return a.getId() == b.getId() && a.getHostAndPort() == b.getHostAndPort() &&
a.getPriority() == b.getPriority() && a.getSecondaryDelay() == b.getSecondaryDelay() &&
a.isVoter() == b.isVoter() && a.isArbiter() == b.isArbiter() &&
a.isNewlyAdded() == b.isNewlyAdded() && a.isHidden() == b.isHidden() &&
a.shouldBuildIndexes() == b.shouldBuildIndexes() && a.getNumTags() == b.getNumTags() &&
a.getHorizonMappings() == b.getHorizonMappings() &&
a.getMaintenancePort() == b.getMaintenancePort() && a.getPriority() == b.getPriority() &&
a.getSecondaryDelay() == b.getSecondaryDelay() && a.isVoter() == b.isVoter() &&
a.isArbiter() == b.isArbiter() && a.isNewlyAdded() == b.isNewlyAdded() &&
a.isHidden() == b.isHidden() && a.shouldBuildIndexes() == b.shouldBuildIndexes() &&
a.getNumTags() == b.getNumTags() && a.getHorizonMappings() == b.getHorizonMappings() &&
a.getHorizonReverseHostMappings() == b.getHorizonReverseHostMappings();
}

@@ -1032,7 +1032,9 @@ std::pair<ReplSetHeartbeatArgsV1, Milliseconds> TopologyCoordinator::prepareHear
if (_selfIndex >= 0) {
const MemberConfig& me = _selfConfig();
hbArgs.setSenderId(me.getId().getData());
hbArgs.setSenderHost(me.getHostAndPort());
// Use the maintenance port because the recipient may send a heartbeat back to get a
// newer configuration and we want them to use the maintenance port if it is available.
hbArgs.setSenderHost(me.getHostAndPortMaintenance());
}
hbArgs.setTerm(_term);
} else {

@@ -56,6 +56,8 @@
namespace mongo::transport {
namespace {

MONGO_FAIL_POINT_DEFINE(rejectNewNonPriorityConnections);

thread_local decltype(ServerGlobalParams::maxIncomingConnsOverride)::Snapshot
maxIncomingConnsOverride;

@@ -275,8 +277,9 @@ void SessionManagerCommon::startSession(std::shared_ptr<Session> session) {
IngressHandshakeMetrics::get(*session).onSessionStarted(_svcCtx->getTickSource());

serverGlobalParams.maxIncomingConnsOverride.refreshSnapshot(maxIncomingConnsOverride);
const bool isPrivilegedSession =
maxIncomingConnsOverride && session->isExemptedByCIDRList(*maxIncomingConnsOverride);
// TODO (SERVER-113219) Check and modify this if needed.
const bool isPrivilegedSession = session->isConnectedToMaintenancePort() ||
(maxIncomingConnsOverride && session->isExemptedByCIDRList(*maxIncomingConnsOverride));
const bool verbose = !quiet();

auto service = _svcCtx->getService();
@@ -287,7 +290,9 @@ void SessionManagerCommon::startSession(std::shared_ptr<Session> session) {
std::shared_ptr<transport::SessionWorkflow> workflow;
{
auto sync = _sessions->sync();
if (sync.size() >= _maxOpenSessions && !isPrivilegedSession) {
if ((sync.size() >= _maxOpenSessions ||
MONGO_unlikely(rejectNewNonPriorityConnections.shouldFail())) &&
!isPrivilegedSession) {
_sessions->incrementRejected();
if (verbose) {
ClientSummary cs(client);
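The session manager hunks above gate admission on a privileged-session check that now also covers the maintenance port. The boolean shape of that check, shown here as a hedged sketch with an invented function name (the real logic lives in SessionManagerCommon::startSession), is:

    // Sketch of the admission rule exercised by the new
    // "rejectNewNonPriorityConnections" fail point.
    #include <cstdio>

    bool shouldRejectSession(bool connectedToMaintenancePort,
                             bool exemptedByCidrList,
                             bool failPointActive,
                             int openSessions,
                             int maxOpenSessions) {
        // Maintenance-port connections are treated as privileged, so they are admitted
        // even when the ordinary limit is reached or the fail point blocks new
        // non-priority connections.
        const bool privileged = connectedToMaintenancePort || exemptedByCidrList;
        const bool overLimit = openSessions >= maxOpenSessions || failPointActive;
        return overLimit && !privileged;
    }

    int main() {
        // With the fail point on, a plain connection is rejected...
        std::printf("%d\n", shouldRejectSession(false, false, true, 10, 100));  // 1
        // ...but a maintenance-port connection is still admitted.
        std::printf("%d\n", shouldRejectSession(true, false, true, 10, 100));   // 0
        return 0;
    }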