Revert "SERVER-112674 Open and listen for connections on maintenance port (#43939)" (#45095)

Co-authored-by: auto-revert-processor <devprod-si-team@mongodb.com>
GitOrigin-RevId: 46d893eda4b592dde694e1f4bc024e44ea19befe
auto-revert-app[bot] 2025-12-10 22:27:09 +00:00 committed by MongoDB Bot
parent bdc1063064
commit bd070b078f
17 changed files with 318 additions and 844 deletions

View File

@ -316,16 +316,6 @@ export class ReplSetTest {
return this._maintenancePorts[translatedN];
}
getNewConnectionToMaintenancePort(node) {
const maintenancePort = this.getMaintenancePort(node);
return new Mongo(node.host.split(":")[0] + ":" + maintenancePort);
}
getNewConnectionToMaintenanceSocket(node) {
const maintenancePort = this.getMaintenancePort(node);
return new Mongo("/tmp/mongodb-" + maintenancePort + ".sock");
}
getDbPath(node) {
// Get a replica set node (check for use of bridge).
const n = this.getNodeId(node);
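
For context, a minimal sketch (not part of this commit) of how a test could exercise the two maintenance-port connection helpers removed above. It assumes a ReplSetTest started with useMaintenancePorts and uses only the helper signatures shown in this hunk:

// Sketch only: exercises the helpers removed by this revert.
import {ReplSetTest} from "jstests/libs/replsettest.js";

const rs = new ReplSetTest({nodes: 1, useMaintenancePorts: true});
rs.startSet();
rs.initiate();
const node = rs.getPrimary();

// TCP connection to the node's maintenance port.
const tcpConn = rs.getNewConnectionToMaintenancePort(node);
assert.commandWorked(tcpConn.getDB("admin").runCommand({ping: 1}));

// Unix domain socket connection to the same port (non-Windows only).
if (!_isWindows()) {
    const sockConn = rs.getNewConnectionToMaintenanceSocket(node);
    assert.commandWorked(sockConn.getDB("admin").runCommand({ping: 1}));
}
rs.stopSet();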

View File

@ -1,6 +1,8 @@
/**
* Test that we can connect to the maintenance port either through TCP or a Unix socket (only on
* non-Windows platforms) and that it can handle connections in parallel with the main port.
* Test that the maintenance port option is specified correctly in a ReplSetTest and a ShardingTest.
*
* TODO (SERVER-112674): Extend the integration to expose connections on both the main and
* maintenance ports, and replace the logging-based testing coverage with connection-based tests.
*
* @tags: [
* requires_replication,
@ -11,133 +13,47 @@
import {ReplSetTest} from "jstests/libs/replsettest.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {describe, before, it} from "jstests/libs/mochalite.js";
import {Thread} from "jstests/libs/parallelTester.js";
import {describe, it} from "jstests/libs/mochalite.js";
describe("Tests for maintenance port usage within JS test replica set helper", function () {
before(function () {
// Check that each node has the maintenance port and the corresponding unix socket set and listening
this.checkMaintenancePortSetAndListening = (conn, rs) => {
jsTest.log.info(`Checking maintenance port and corresponding unix socket are set and listening`);
// Checking connection through TCP
const connToMaintenancePort = rs
? rs.getNewConnectionToMaintenancePort(conn)
: new Mongo(conn.host.split(":")[0] + ":" + conn.maintenancePort);
const dbThroughPort = connToMaintenancePort.getDB("admin");
assert.commandWorked(dbThroughPort.runCommand({ping: 1}));
/** TODO(SERVER-115111): Enable unix domain socket connection testing.
if (!_isWindows()) {
// Checking connection through unix socket
const connToMaintenanceSocket = rs
? rs.getNewConnectionToMaintenanceSocket(conn)
: new Mongo("/tmp/mongodb-" + conn.maintenancePort + ".sock");
const dbThroughSocket = connToMaintenanceSocket.getDB("admin");
assert.commandWorked(dbThroughSocket.runCommand({ping: 1}));
}
*/
};
// Spawn a thread that 1) creates a connection, 2) issues a 'ping' command, and 3) closes
// the connection. See `runParallelConnections`.
this.spawnThreadForParallelConnections = (host, port) => {
return new Thread(
(host, port) => {
try {
const conn = new Mongo(host + ":" + port);
/** TODO(SERVER-115111): Enable unix domain socket connection testing.
// Randomly choose to connect via TCP or unix socket
const chooseTCP = Math.round(Math.random());
const conn =
_isWindows() || chooseTCP
? new Mongo(host + ":" + port)
: new Mongo("/tmp/mongodb-" + port + ".sock");
*/
const db = conn.getDB("admin");
assert.commandWorked(db.runCommand({ping: 1}));
conn.close();
} catch (opError) {
return {success: false, error: opError};
}
return {success: true};
},
host,
port,
);
};
// Check that we can run parallel connections on both main and maintenance ports without
// race conditions
this.runParallelConnections = (conn, rs) => {
jsTest.log.info(`Checking parallel connections on main and maintenance ports`);
const host = conn.host.split(":")[0];
const workers = [];
// Spawning threads for both main and maintenance ports (a low number of threads lets us check that
// we are not triggering any thread sanitizer race condition while avoiding port exhaustion)
const numThreadsPerPort = 4;
for (let i = 0; i < numThreadsPerPort; i++) {
workers.push({portName: "main", thread: this.spawnThreadForParallelConnections(host, conn.port)});
}
for (let i = 0; i < numThreadsPerPort; i++) {
workers.push({
portName: "maintenance",
thread: this.spawnThreadForParallelConnections(
host,
rs ? rs.getMaintenancePort(conn) : conn.maintenancePort,
),
});
}
// Running parallel connections on both main and maintenance ports
workers.forEach((worker) => worker.thread.start());
// Joining threads
for (const {portName, thread} of workers) {
thread.join();
const result = thread.returnData();
describe("Tests for maintenance port usage within JS test helpers", function () {
const checkMaintenancePortSet = (conn, port) => {
assert.eq(port, conn.maintenancePort);
assert(
result && result.success,
`Error while running multiple connection test on ${portName} port: ${tojson(result)}`,
checkLog.checkContainsOnceJson(conn, 21951, {
options: (opts) => {
return opts.net.maintenancePort == port;
},
}),
);
}
};
});
it("Starting up a replica set with maintenance port enabled on all nodes", () => {
const rs = new ReplSetTest({
nodes: 3,
useMaintenancePorts: true,
});
// TODO (SERVER-112674): Remove this. For now, we set the log verbosity very low to ensure we still see the startup logs
const verbosityOptions = {setParameter: {logComponentVerbosity: {verbosity: 0}}};
it("Starting up a replica set with maintenance ports", () => {
const rs = new ReplSetTest({nodes: 3, useMaintenancePorts: true, nodeOptions: verbosityOptions});
rs.startSet();
rs.initiate();
jsTest.log.info("Testing replica set nodes for maintenance port availability");
rs.nodes.forEach((conn) => {
this.checkMaintenancePortSetAndListening(conn, rs);
this.runParallelConnections(conn, rs);
let maintenancePort = rs.getMaintenancePort(conn);
checkMaintenancePortSet(conn, maintenancePort);
});
rs.stopSet();
});
it("Starting up a replica set with a specific node having a maintenance port", () => {
// The choice of which node has the maintenance port is arbitrary; here we choose the second node
// (index 1). The purpose of this test is to verify that only the node with the maintenance port
// set is listening on that port.
const rs = new ReplSetTest({nodes: [{}, {maintenancePort: allocatePort()}, {}]});
const rs = new ReplSetTest({nodes: [{}, {maintenancePort: 27021}, {}], nodeOptions: verbosityOptions});
rs.startSet();
rs.initiate();
let countFoundPorts = 0;
jsTest.log.info("Testing replica set nodes for maintenance port availability when it is set");
rs.nodes.forEach((conn) => {
try {
this.checkMaintenancePortSetAndListening(conn, rs);
assert.eq(27021, rs.getMaintenancePort(conn));
checkMaintenancePortSet(conn, 27021);
countFoundPorts += 1;
} catch (e) {
// This should throw if the maintenance port is not set for the node.
@ -147,35 +63,30 @@ describe("Tests for maintenance port usage within JS test replica set helper", f
assert.eq(countFoundPorts, 1);
jsTest.log.info("Testing parallel connections to maintenance port on the node that has it set");
this.runParallelConnections(rs.nodes[1], rs);
rs.stopSet();
});
it("Starting up a sharded cluster with maintenance port enabled on all nodes", () => {
it("Starting up a sharded cluster with maintenance ports", () => {
const st = new ShardingTest({
shards: 1,
mongos: 2,
useMaintenancePorts: true,
other: {nodeOptions: verbosityOptions, configOptions: verbosityOptions, mongosOptions: verbosityOptions},
});
jsTest.log.info("Testing config server nodes");
st.configRS.nodes.forEach((conn) => {
this.checkMaintenancePortSetAndListening(conn, st.configRS);
this.runParallelConnections(conn, st.configRS);
let maintenancePort = st.configRS.getMaintenancePort(conn);
checkMaintenancePortSet(conn, maintenancePort);
});
jsTest.log.info("Testing shard server nodes");
st.rs0.nodes.forEach((conn) => {
this.checkMaintenancePortSetAndListening(conn, st.rs0);
this.runParallelConnections(conn, st.rs0);
let maintenancePort = st.rs0.getMaintenancePort(conn);
checkMaintenancePortSet(conn, maintenancePort);
});
jsTest.log.info("Testing mongos nodes");
st._mongos.forEach((conn) => {
this.checkMaintenancePortSetAndListening(conn);
this.runParallelConnections(conn);
let maintenancePort = conn.maintenancePort;
checkMaintenancePortSet(conn, maintenancePort);
});
st.stop();
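
For reference, a minimal sketch of the two configuration shapes these tests use, plus the log-based verification the restored tests rely on; log id 21951 is assumed here to be the startup-options log message, mirroring checkMaintenancePortSet above:

// Sketch: cluster-wide allocation vs. an explicit per-node port (27021, as above).
const rsAll = new ReplSetTest({nodes: 3, useMaintenancePorts: true});
const rsOne = new ReplSetTest({nodes: [{}, {maintenancePort: 27021}, {}]});

// After this revert, verification is log-based: assert that the node logged the
// expected net.maintenancePort among its startup options (log id 21951 assumed).
const verifyPortLogged = (conn, port) =>
    assert(checkLog.checkContainsOnceJson(conn, 21951, {
        options: (opts) => opts.net.maintenancePort == port,
    }));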

View File

@ -133,16 +133,17 @@ describe("Tests for maintenance port usage within JS test helpers", function ()
rs.stopSet();
});
it("Initiate with maintenance port plus bindIp works when fast resolution does not", () => {
let ips = "localhost," + getHostName();
const rs = new ReplSetTest({
nodes: 1,
useMaintenancePorts: true,
nodeOptions: {bind_ip: ips},
});
rs.startSet();
rs.initiate();
// TODO (SERVER-112674) Enable this test once isSelfSlowPath works for the maintenancePort
// it("Initiate with maintenance port plus bindIp works when fast resolution does not", () => {
// let ips = "localhost," + getHostName();
// const rs = new ReplSetTest({
// nodes: 1,
// useMaintenancePorts: true,
// nodeOptions: {bind_ip: ips},
// });
// rs.startSet();
// rs.initiate();
rs.stopSet();
});
// rs.stopSet();
// });
});

View File

@ -177,13 +177,6 @@ CommonAsioSession::CommonAsioSession(
if (tl->loadBalancerPort()) {
_isConnectedToLoadBalancerPort = _local.port() == *tl->loadBalancerPort();
}
if (tl->maintenancePort()) {
auto isUnixSocket =
(_localAddr.getType() == AF_LOCAL || _localAddr.getType() == AF_UNIX);
_isConnectedToMaintenancePort = !isUnixSocket
? _local.port() == *tl->maintenancePort()
: parsePortFromUnixSockPath(_localAddr.toString(true)) == *tl->maintenancePort();
}
} catch (...) {
LOGV2_DEBUG(9079002,
1,
@ -222,10 +215,6 @@ bool CommonAsioSession::isConnectedToLoadBalancerPort() const {
_isConnectedToLoadBalancerPort;
}
bool CommonAsioSession::isConnectedToMaintenancePort() const {
return _isConnectedToMaintenancePort;
}
bool CommonAsioSession::isLoadBalancerPeer() const {
return MONGO_unlikely(clientIsLoadBalancedPeer.shouldFail()) || _isLoadBalancerPeer;
}

View File

@ -137,8 +137,6 @@ public:
bool isConnectedToLoadBalancerPort() const override;
bool isConnectedToMaintenancePort() const override;
bool isLoadBalancerPeer() const override;
void setisLoadBalancerPeer(bool helloHasLoadBalancedOption) override;
@ -321,14 +319,6 @@ protected:
*/
bool _isConnectedToLoadBalancerPort = false;
bool _isLoadBalancerPeer = false;
/**
* Indicates whether the connection targets the maintenance port or its corresponding unix
* socket. These connections are intended to allow high-priority operations during connection
* storms.
*/
bool _isConnectedToMaintenancePort = false;
boost::optional<HostAndPort> _proxiedSrcEndpoint;
boost::optional<HostAndPort> _proxiedDstEndpoint;

View File

@ -97,8 +97,6 @@
namespace mongo {
namespace transport {
class AsioReactor;
class WrappedEndpoint;
class WrappedResolver;
} // namespace transport
} // namespace mongo
namespace asio {
@ -120,12 +118,6 @@ using TcpInfoOption = SocketOption<IPPROTO_TCP, TCP_INFO, tcp_info>;
const Seconds kSessionShutdownTimeout{10};
std::set<mongo::transport::WrappedEndpoint> getEndpoints(
const std::vector<int>& ports,
const std::vector<std::string>& listenAddrs,
const WrappedResolver& resolver,
const AsioTransportLayer::Options& listenerOptions);
bool shouldDiscardSocketDueToLostConnectivity(AsioSession::GenericSocket& peerSocket) {
#ifdef __linux__
if (gPessimisticConnectivityCheckForAcceptedConnections.load()) {
@ -387,12 +379,7 @@ AsioTransportLayer::AsioTransportLayer(const AsioTransportLayer::Options& opts,
std::unique_ptr<SessionManager> sessionManager)
: _ingressReactor(std::make_shared<AsioReactor>()),
_egressReactor(std::make_shared<AsioReactor>()),
_listenerInterfaceMainPort(
std::make_unique<ListenerInterface>(_mutex, std::make_shared<AsioReactor>(), this)),
_listenerInterfaceMaintenancePort(
opts.maintenancePort
? std::make_unique<ListenerInterface>(_mutex, std::make_shared<AsioReactor>(), this)
: nullptr),
_acceptorReactor(std::make_shared<AsioReactor>()),
_sessionManager(std::move(sessionManager)),
_listenerOptions(opts),
_timerService(std::make_unique<TimerService>()) {
@ -580,307 +567,6 @@ private:
Resolver _resolver;
};
std::set<mongo::transport::WrappedEndpoint> getEndpoints(
const std::vector<int>& ports,
const std::vector<std::string>& listenAddrs,
WrappedResolver& resolver,
const AsioTransportLayer::Options& listenerOptions) {
std::set<mongo::transport::WrappedEndpoint> endpoints;
for (const auto& port : ports) {
for (const auto& listenAddr : listenAddrs) {
if (listenAddr.empty()) {
LOGV2_WARNING(23020, "Skipping empty bind address");
continue;
}
const auto& swAddrs =
resolver.resolve(HostAndPort(listenAddr, port), listenerOptions.enableIPv6);
if (!swAddrs.isOK()) {
LOGV2_WARNING(
23021, "Found no addresses for peer", "peer"_attr = swAddrs.getStatus());
continue;
}
const auto& addrs = swAddrs.getValue();
endpoints.insert(addrs.begin(), addrs.end());
}
}
return endpoints;
}
AsioTransportLayer::ListenerInterface::ListenerInterface(stdx::mutex& sharedMutex,
std::shared_ptr<AsioReactor> reactor,
AsioTransportLayer* asioTransportLayer)
: _sharedMutex(sharedMutex),
_acceptorReactor(reactor),
_asioTransportLayer(asioTransportLayer) {};
AsioTransportLayer::ListenerInterface::~ListenerInterface() = default;
StatusWith<std::vector<std::unique_ptr<AsioTransportLayer::AcceptorRecord>>>
AsioTransportLayer::ListenerInterface::_createAcceptorRecords(
const std::vector<int>& ports,
const std::set<WrappedEndpoint>& endpoints,
const Options& listenerOptions) {
std::vector<std::unique_ptr<AcceptorRecord>> acceptorRecords;
for (const auto& addr : endpoints) {
#ifndef _WIN32
if (addr.family() == AF_UNIX) {
if (::unlink(addr.toString().c_str()) == -1) {
auto ec = lastPosixError();
if (ec != posixError(ENOENT)) {
LOGV2_ERROR(23024,
"Failed to unlink socket file",
"path"_attr = addr.toString().c_str(),
"error"_attr = errorMessage(ec));
fassertFailedNoTrace(40486);
}
}
}
#endif
if (addr.family() == AF_INET6 && !listenerOptions.enableIPv6) {
LOGV2_ERROR(23025, "Specified ipv6 bind address, but ipv6 is disabled");
fassertFailedNoTrace(40488);
}
GenericAcceptor acceptor(*getReactor());
try {
acceptor.open(addr->protocol());
} catch (std::exception&) {
// Allow the server to start when "ipv6: true" and "bindIpAll: true", but the
// platform does not support ipv6 (e.g., ipv6 kernel module is not loaded in
// Linux).
auto addrIsBindAll = [&] {
for (auto port : ports) {
if (addr.toString() == fmt::format(":::{}", port)) {
return true;
}
}
return false;
};
if (errno == EAFNOSUPPORT && listenerOptions.enableIPv6 && addr.family() == AF_INET6 &&
addrIsBindAll()) {
LOGV2_WARNING(4206501,
"Failed to bind to {address} as the platform does not support ipv6",
"address"_attr = addr.toString());
continue;
}
throw;
}
setSocketOption(
acceptor, ReuseAddrOption(true), "acceptor reuse address", logv2::LogSeverity::Info());
std::error_code ec;
if (auto af = addr.family(); af == AF_INET || af == AF_INET6) {
if (auto ec = tfo::initAcceptorSocket(acceptor))
return errorCodeToStatus(ec, "setup tcpFastOpenIsConfigured");
}
if (addr.family() == AF_INET6) {
setSocketOption(
acceptor, IPV6OnlyOption(true), "acceptor v6 only", logv2::LogSeverity::Info());
}
(void)acceptor.non_blocking(true, ec);
if (ec) {
return errorCodeToStatus(ec, "setup non_blocking");
}
(void)acceptor.bind(*addr, ec);
if (ec) {
return errorCodeToStatus(ec, "setup bind").withContext(addr.toString());
}
#ifndef _WIN32
if (addr.family() == AF_UNIX) {
setUnixDomainSocketPermissions(addr.toString(),
serverGlobalParams.unixSocketPermissions);
}
#endif
auto endpoint = acceptor.local_endpoint(ec);
if (ec) {
return errorCodeToStatus(ec);
}
auto hostAndPort = endpointToHostAndPort(endpoint);
auto record = std::make_unique<AcceptorRecord>(SockAddr(addr->data(), addr->size()),
std::move(acceptor));
if (_listenerPort == 0 && (addr.family() == AF_INET || addr.family() == AF_INET6)) {
record->address.setPort(hostAndPort.port());
_listenerPort = hostAndPort.port();
}
acceptorRecords.push_back(std::move(record));
}
return std::move(acceptorRecords);
};
StatusWith<std::vector<std::unique_ptr<AsioTransportLayer::AcceptorRecord>>>
AsioTransportLayer::ListenerInterface::_retrieveAllAcceptorRecords(
const std::vector<int>& ports,
const std::vector<std::string>& listenIPAddrs,
const std::vector<std::string>& listenUnixSocketsAddrs,
const Options& listenerOptions) {
// Self-deduplicating list of unique endpoint addresses.
std::set<WrappedEndpoint> inetEndpoints;
std::set<WrappedEndpoint> unixEndpoints;
WrappedResolver resolver(*getReactor());
std::vector<std::unique_ptr<AcceptorRecord>> acceptorRecords;
inetEndpoints = getEndpoints(ports, listenIPAddrs, resolver, listenerOptions);
auto resultIpAddrs = _createAcceptorRecords(ports, inetEndpoints, listenerOptions);
if (!resultIpAddrs.isOK()) {
return resultIpAddrs.getStatus();
}
acceptorRecords = std::move(resultIpAddrs.getValue());
unixEndpoints = getEndpoints(ports, listenUnixSocketsAddrs, resolver, listenerOptions);
auto resultUnixSockets = _createAcceptorRecords(ports, unixEndpoints, listenerOptions);
if (!resultUnixSockets.isOK()) {
return resultUnixSockets.getStatus();
}
auto& acceptorRecordsSockets = resultUnixSockets.getValue();
acceptorRecords.insert(acceptorRecords.end(),
std::make_move_iterator(acceptorRecordsSockets.begin()),
std::make_move_iterator(acceptorRecordsSockets.end()));
return std::move(acceptorRecords);
}
void AsioTransportLayer::ListenerInterface::stopListenerWithLock(
stdx::unique_lock<stdx::mutex>& lk) {
if (auto oldState = _listener.state; oldState != Listener::State::kShutdown) {
_listener.state = Listener::State::kShuttingDown;
if (oldState == Listener::State::kActive) {
while (_listener.state != Listener::State::kShutdown) {
lk.unlock();
_acceptorReactor->stop();
lk.lock();
}
}
}
}
void AsioTransportLayer::ListenerInterface::_runListener(std::string threadName) {
setThreadName(threadName);
stdx::unique_lock lk(_sharedMutex);
ON_BLOCK_EXIT([&] {
if (!lk.owns_lock()) {
lk.lock();
}
_listener.state = Listener::State::kShutdown;
_listener.cv.notify_all();
});
// Because multiple listener interfaces can be active at the same time (one thread per
// interface), we must hold the lock when accessing the shared state in the following
// operations. Currently, listener threads run for _listenerInterfaceMainPort and
// _listenerInterfaceMaintenancePort.
_startListening(lk);
_listener.state = Listener::State::kActive;
_listener.cv.notify_all();
while (!_asioTransportLayer->_isShutdown.load() &&
(_listener.state == Listener::State::kActive)) {
lk.unlock();
_acceptorReactor->run();
lk.lock();
}
_stopAcceptors(lk);
}
void AsioTransportLayer::ListenerInterface::startListener(std::string threadName) {
_listener.thread =
stdx::thread([this, threadName = std::move(threadName)] { _runListener(threadName); });
}
bool AsioTransportLayer::ListenerInterface::isListenerStarted() const {
return _listener.thread.joinable();
}
void AsioTransportLayer::ListenerInterface::waitListenerThreadJoin() {
_listener.thread.join();
}
void AsioTransportLayer::ListenerInterface::waitUntilListenerStarted(
stdx::unique_lock<stdx::mutex>& lk) {
_listener.cv.wait(lk, [this] { return _listener.state != Listener::State::kNew; });
}
std::shared_ptr<AsioReactor> AsioTransportLayer::ListenerInterface::getReactor() const {
return _acceptorReactor;
}
AsioTransportLayer::Listener::State AsioTransportLayer::ListenerInterface::getListenerState()
const {
return _listener.state;
}
void AsioTransportLayer::ListenerInterface::_startListening(WithLock lk) {
if (_asioTransportLayer->_isShutdown.load() ||
_listener.state == Listener::State::kShuttingDown) {
LOGV2_DEBUG(9484000, 3, "Unable to start listening: transport layer in shutdown");
return;
}
const int listenBacklog = serverGlobalParams.listenBacklog
? *serverGlobalParams.listenBacklog
: ProcessInfo::getDefaultListenBacklog();
for (auto& acceptorRecord : _acceptorRecords) {
asio::error_code ec;
(void)acceptorRecord->acceptor.listen(listenBacklog, ec);
if (ec) {
LOGV2_FATAL(31339,
"Error listening for new connections on listen address",
"listenAddrs"_attr = acceptorRecord->address,
"error"_attr = ec.message());
}
_asioTransportLayer->_acceptConnection(acceptorRecord->acceptor);
LOGV2(23015, "Listening on", "address"_attr = acceptorRecord->address);
}
const char* ssl = "off";
#ifdef MONGO_CONFIG_SSL
if (_asioTransportLayer->sslMode() != SSLParams::SSLMode_disabled) {
ssl = "on";
}
#endif
LOGV2(23016, "Waiting for connections", "port"_attr = _listenerPort, "ssl"_attr = ssl);
}
void AsioTransportLayer::ListenerInterface::_stopAcceptors(WithLock lk) {
// Loop through the acceptors and cancel their calls to async_accept. This will prevent new
// connections from being opened.
for (auto& acceptorRecord : _acceptorRecords) {
acceptorRecord->acceptor.cancel();
auto& addr = acceptorRecord->address;
if (addr.getType() == AF_UNIX && !addr.isAnonymousUNIXSocket()) {
auto path = addr.getAddr();
LOGV2(23017, "removing socket file", "path"_attr = path);
if (::unlink(path.c_str()) != 0) {
auto ec = lastPosixError();
LOGV2_WARNING(23022,
"Unable to remove UNIX socket",
"path"_attr = path,
"error"_attr = errorMessage(ec));
}
}
}
}
void AsioTransportLayer::ListenerInterface::setAcceptorRecords(
std::vector<std::unique_ptr<AcceptorRecord>>&& records) {
_acceptorRecords = std::move(records);
}
const std::vector<std::unique_ptr<AsioTransportLayer::AcceptorRecord>>&
AsioTransportLayer::ListenerInterface::getAcceptorRecords() {
return _acceptorRecords;
}
Status makeConnectError(Status status, const HostAndPort& peer, const WrappedEndpoint& endpoint) {
std::string errmsg;
if (peer.toString() != endpoint.toString() && !endpoint.toString().empty()) {
@ -1302,47 +988,7 @@ Future<std::shared_ptr<Session>> AsioTransportLayer::asyncConnect(
return mergedFuture;
}
Status AsioTransportLayer::ListenerInterface::setup(
const std::vector<int>& ports,
const std::vector<std::string>& listenIPAddrs,
const std::vector<std::string>& listenUnixSocketsAddrs,
Options& listenerOptions) {
if (auto foStatus = tfo::ensureInitialized(); !foStatus.isOK()) {
return foStatus;
}
if (!listenerOptions.isIngress() &&
(!listenIPAddrs.empty() || !listenUnixSocketsAddrs.empty())) {
return {ErrorCodes::BadValue,
"Cannot bind to listening sockets with ingress networking is disabled"};
}
auto result =
_retrieveAllAcceptorRecords(ports, listenIPAddrs, listenUnixSocketsAddrs, listenerOptions);
if (!result.isOK()) {
return result.getStatus();
}
setAcceptorRecords(std::move(result.getValue()));
if ((getAcceptorRecords()).empty() && listenerOptions.isIngress()) {
return Status(ErrorCodes::SocketException, "No available addresses/ports to bind to");
}
return Status::OK();
}
Status AsioTransportLayer::setup() {
// Since the load balancer port value is set to zero to disable the port entirely and the
// maintenance port cannot be zero (its range of possible values starts at 1), the main port is
// the only port that could be specified as ephemeral.
if (_listenerOptions.port == 0 && _listenerOptions.ipList.size() > 1) {
return Status(ErrorCodes::BadValue,
"Port 0 (ephemeral port) is not allowed when"
" listening on multiple IP interfaces");
}
auto getlistenIPAddrs = [&]() {
std::vector<std::string> listenAddrs;
if (_listenerOptions.ipList.empty() && _listenerOptions.isIngress()) {
listenAddrs = {"127.0.0.1"};
@ -1352,44 +998,154 @@ Status AsioTransportLayer::setup() {
} else if (!_listenerOptions.ipList.empty()) {
listenAddrs = _listenerOptions.ipList;
}
return listenAddrs;
};
auto getUnixDomainSocketAddrs = [&](std::vector<int> ports) -> std::vector<std::string> {
#ifndef _WIN32
std::vector<std::string> listenAddrs;
if (_listenerOptions.useUnixSockets && _listenerOptions.isIngress()) {
for (const auto port : ports) {
listenAddrs.push_back(makeUnixSockPath(port));
listenAddrs.push_back(makeUnixSockPath(_listenerOptions.port));
if (_listenerOptions.loadBalancerPort) {
listenAddrs.push_back(makeUnixSockPath(*_listenerOptions.loadBalancerPort));
}
}
return listenAddrs;
#else
return {};
#endif
};
const auto listenIPAddrs = getlistenIPAddrs();
if (auto foStatus = tfo::ensureInitialized(); !foStatus.isOK()) {
return foStatus;
}
std::vector<int> ports = {_listenerOptions.port};
if (!(_listenerOptions.isIngress()) && !listenAddrs.empty()) {
return {ErrorCodes::BadValue,
"Cannot bind to listening sockets with ingress networking is disabled"};
}
_listenerPort = _listenerOptions.port;
WrappedResolver resolver(*_acceptorReactor);
std::vector<int> ports = {_listenerPort};
if (_listenerOptions.loadBalancerPort) {
ports.push_back(*_listenerOptions.loadBalancerPort);
}
const auto listenUnixDomainSocketAddrs = getUnixDomainSocketAddrs(ports);
auto status = _listenerInterfaceMainPort->setup(
ports, listenIPAddrs, listenUnixDomainSocketAddrs, _listenerOptions);
if (!status.isOK()) {
return status;
// Self-deduplicating list of unique endpoint addresses.
std::set<WrappedEndpoint> endpoints;
for (const auto& port : ports) {
for (const auto& listenAddr : listenAddrs) {
if (listenAddr.empty()) {
LOGV2_WARNING(23020, "Skipping empty bind address");
continue;
}
if (_listenerInterfaceMaintenancePort) {
std::vector<int> ports = {*_listenerOptions.maintenancePort};
const auto listenUnixDomainSocketAddrs = getUnixDomainSocketAddrs(ports);
auto status = _listenerInterfaceMaintenancePort->setup(
ports, listenIPAddrs, listenUnixDomainSocketAddrs, _listenerOptions);
const auto& swAddrs =
resolver.resolve(HostAndPort(listenAddr, port), _listenerOptions.enableIPv6);
if (!swAddrs.isOK()) {
LOGV2_WARNING(
23021, "Found no addresses for peer", "peer"_attr = swAddrs.getStatus());
continue;
}
if (!status.isOK()) {
return status;
const auto& addrs = swAddrs.getValue();
endpoints.insert(addrs.begin(), addrs.end());
}
}
for (const auto& addr : endpoints) {
#ifndef _WIN32
if (addr.family() == AF_UNIX) {
if (::unlink(addr.toString().c_str()) == -1) {
auto ec = lastPosixError();
if (ec != posixError(ENOENT)) {
LOGV2_ERROR(23024,
"Failed to unlink socket file",
"path"_attr = addr.toString().c_str(),
"error"_attr = errorMessage(ec));
fassertFailedNoTrace(40486);
}
}
}
#endif
if (addr.family() == AF_INET6 && !_listenerOptions.enableIPv6) {
LOGV2_ERROR(23025, "Specified ipv6 bind address, but ipv6 is disabled");
fassertFailedNoTrace(40488);
}
GenericAcceptor acceptor(*_acceptorReactor);
try {
acceptor.open(addr->protocol());
} catch (std::exception&) {
// Allow the server to start when "ipv6: true" and "bindIpAll: true", but the platform
// does not support ipv6 (e.g., ipv6 kernel module is not loaded in Linux).
auto addrIsBindAll = [&] {
for (auto port : ports) {
if (addr.toString() == fmt::format(":::{}", port)) {
return true;
}
}
return false;
};
if (errno == EAFNOSUPPORT && _listenerOptions.enableIPv6 && addr.family() == AF_INET6 &&
addrIsBindAll()) {
LOGV2_WARNING(4206501,
"Failed to bind to {address} as the platform does not support ipv6",
"address"_attr = addr.toString());
continue;
}
throw;
}
setSocketOption(
acceptor, ReuseAddrOption(true), "acceptor reuse address", logv2::LogSeverity::Info());
std::error_code ec;
if (auto af = addr.family(); af == AF_INET || af == AF_INET6) {
if (auto ec = tfo::initAcceptorSocket(acceptor))
return errorCodeToStatus(ec, "setup tcpFastOpenIsConfigured");
}
if (addr.family() == AF_INET6) {
setSocketOption(
acceptor, IPV6OnlyOption(true), "acceptor v6 only", logv2::LogSeverity::Info());
}
(void)acceptor.non_blocking(true, ec);
if (ec) {
return errorCodeToStatus(ec, "setup non_blocking");
}
(void)acceptor.bind(*addr, ec);
if (ec) {
return errorCodeToStatus(ec, "setup bind").withContext(addr.toString());
}
#ifndef _WIN32
if (addr.family() == AF_UNIX) {
setUnixDomainSocketPermissions(addr.toString(),
serverGlobalParams.unixSocketPermissions);
}
#endif
auto endpoint = acceptor.local_endpoint(ec);
if (ec) {
return errorCodeToStatus(ec);
}
auto hostAndPort = endpointToHostAndPort(endpoint);
auto record = std::make_unique<AcceptorRecord>(SockAddr(addr->data(), addr->size()),
std::move(acceptor));
if (_listenerOptions.port == 0 && (addr.family() == AF_INET || addr.family() == AF_INET6)) {
if (_listenerPort != _listenerOptions.port) {
return Status(ErrorCodes::BadValue,
"Port 0 (ephemeral port) is not allowed when"
" listening on multiple IP interfaces");
}
_listenerPort = hostAndPort.port();
record->address.setPort(_listenerPort);
}
_acceptorRecords.push_back(std::move(record));
}
if (_acceptorRecords.empty() && _listenerOptions.isIngress()) {
return Status(ErrorCodes::SocketException, "No available addresses/ports to bind to");
}
#ifdef MONGO_CONFIG_SSL
@ -1406,14 +1162,9 @@ Status AsioTransportLayer::setup() {
std::vector<std::pair<SockAddr, int>> AsioTransportLayer::getListenerSocketBacklogQueueDepths()
const {
std::vector<std::pair<SockAddr, int>> queueDepths;
for (auto&& record : _listenerInterfaceMainPort->getAcceptorRecords()) {
for (auto&& record : _acceptorRecords) {
queueDepths.push_back({SockAddr(record->address), record->backlogQueueDepth.load()});
}
if (_listenerInterfaceMaintenancePort) {
for (auto&& record : _listenerInterfaceMaintenancePort->getAcceptorRecords()) {
queueDepths.push_back({SockAddr(record->address), record->backlogQueueDepth.load()});
}
}
return queueDepths;
}
@ -1421,16 +1172,10 @@ void AsioTransportLayer::appendStatsForServerStatus(BSONObjBuilder* bob) const {
bob->append("listenerProcessingTime", _listenerProcessingTime.load().toBSON());
BSONArrayBuilder queueDepthsArrayBuilder(
bob->subarrayStart("listenerSocketBacklogQueueDepths"));
for (const auto& record : _listenerInterfaceMainPort->getAcceptorRecords()) {
for (const auto& record : _acceptorRecords) {
BSONObjBuilder{queueDepthsArrayBuilder.subobjStart()}.append(
record->address.toString(), record->backlogQueueDepth.load());
}
if (_listenerInterfaceMaintenancePort) {
for (const auto& record : _listenerInterfaceMaintenancePort->getAcceptorRecords()) {
BSONObjBuilder{queueDepthsArrayBuilder.subobjStart()}.append(
record->address.toString(), record->backlogQueueDepth.load());
}
}
queueDepthsArrayBuilder.done();
bob->append("connsDiscardedDueToClientDisconnect", _discardedDueToClientDisconnect.get());
bob->append("connsRejectedDueToMaxPendingProxyProtocolHeader",
@ -1448,6 +1193,75 @@ void AsioTransportLayer::appendStatsForServerStatus(BSONObjBuilder* bob) const {
void AsioTransportLayer::appendStatsForFTDC(BSONObjBuilder&) const {}
void AsioTransportLayer::_runListener() {
setThreadName("listener");
stdx::unique_lock lk(_mutex);
ON_BLOCK_EXIT([&] {
if (!lk.owns_lock()) {
lk.lock();
}
_listener.state = Listener::State::kShutdown;
_listener.cv.notify_all();
});
if (_isShutdown.load() || _listener.state == Listener::State::kShuttingDown) {
LOGV2_DEBUG(9484000, 3, "Unable to start listening: transport layer in shutdown");
return;
}
const int listenBacklog = serverGlobalParams.listenBacklog
? *serverGlobalParams.listenBacklog
: ProcessInfo::getDefaultListenBacklog();
for (auto& acceptorRecord : _acceptorRecords) {
asio::error_code ec;
(void)acceptorRecord->acceptor.listen(listenBacklog, ec);
if (ec) {
LOGV2_FATAL(31339,
"Error listening for new connections on listen address",
"listenAddrs"_attr = acceptorRecord->address,
"error"_attr = ec.message());
}
_acceptConnection(acceptorRecord->acceptor);
LOGV2(23015, "Listening on", "address"_attr = acceptorRecord->address);
}
const char* ssl = "off";
#ifdef MONGO_CONFIG_SSL
if (sslMode() != SSLParams::SSLMode_disabled) {
ssl = "on";
}
#endif
LOGV2(23016, "Waiting for connections", "port"_attr = _listenerPort, "ssl"_attr = ssl);
_listener.state = Listener::State::kActive;
_listener.cv.notify_all();
while (!_isShutdown.load() && (_listener.state == Listener::State::kActive)) {
lk.unlock();
_acceptorReactor->run();
lk.lock();
}
// Loop through the acceptors and cancel their calls to async_accept. This will prevent new
// connections from being opened.
for (auto& acceptorRecord : _acceptorRecords) {
acceptorRecord->acceptor.cancel();
auto& addr = acceptorRecord->address;
if (addr.getType() == AF_UNIX && !addr.isAnonymousUNIXSocket()) {
auto path = addr.getAddr();
LOGV2(23017, "removing socket file", "path"_attr = path);
if (::unlink(path.c_str()) != 0) {
auto ec = lastPosixError();
LOGV2_WARNING(23022,
"Unable to remove UNIX socket",
"path"_attr = path,
"error"_attr = errorMessage(ec));
}
}
}
}
Status AsioTransportLayer::start() {
stdx::unique_lock lk(_mutex);
if (_isShutdown.load()) {
@ -1456,25 +1270,14 @@ Status AsioTransportLayer::start() {
}
if (_listenerOptions.isIngress()) {
// Only start the listener threads if the TL wasn't shut down before start() was invoked.
if (_listenerInterfaceMainPort->getListenerState() == Listener::State::kNew &&
(!_listenerInterfaceMaintenancePort ||
_listenerInterfaceMaintenancePort->getListenerState() == Listener::State::kNew)) {
// Only start the listener thread if the TL wasn't shut down before start() was invoked.
if (_listener.state == Listener::State::kNew) {
invariant(_sessionManager);
_listenerInterfaceMainPort->startListener("listener");
if (_listenerInterfaceMaintenancePort) {
_listenerInterfaceMaintenancePort->startListener("listenerForMaintenance");
}
_listenerInterfaceMainPort->waitUntilListenerStarted(lk);
if (_listenerInterfaceMaintenancePort) {
_listenerInterfaceMaintenancePort->waitUntilListenerStarted(lk);
}
_listener.thread = stdx::thread([this] { _runListener(); });
_listener.cv.wait(lk, [&] { return _listener.state != Listener::State::kNew; });
}
} else {
invariant(_listenerInterfaceMainPort->getAcceptorRecords().empty());
if (_listenerInterfaceMaintenancePort) {
invariant(_listenerInterfaceMaintenancePort->getAcceptorRecords().empty());
}
invariant(_acceptorRecords.empty());
}
return Status::OK();
@ -1506,25 +1309,26 @@ void AsioTransportLayer::stopAcceptingSessionsWithLock(stdx::unique_lock<stdx::m
return;
}
_listenerInterfaceMainPort->stopListenerWithLock(lk);
if (_listenerInterfaceMaintenancePort) {
_listenerInterfaceMaintenancePort->stopListenerWithLock(lk);
if (auto oldState = _listener.state; oldState != Listener::State::kShutdown) {
_listener.state = Listener::State::kShuttingDown;
if (oldState == Listener::State::kActive) {
while (_listener.state != Listener::State::kShutdown) {
lk.unlock();
_acceptorReactor->stop();
lk.lock();
}
}
}
if (!_listenerInterfaceMainPort->isListenerStarted() &&
(!_listenerInterfaceMaintenancePort ||
!_listenerInterfaceMaintenancePort->isListenerStarted())) {
// If the listeners never started, then we can return now
auto thread = std::exchange(_listener.thread, {});
if (!thread.joinable()) {
// If the listener never started, then we can return now
return;
}
// Release the lock and wait for the threads to die
// Release the lock and wait for the thread to die
lk.unlock();
_listenerInterfaceMainPort->waitListenerThreadJoin();
if (_listenerInterfaceMaintenancePort &&
_listenerInterfaceMaintenancePort->isListenerStarted()) {
_listenerInterfaceMaintenancePort->waitListenerThreadJoin();
}
thread.join();
}
void AsioTransportLayer::stopAcceptingSessions() {
@ -1637,7 +1441,7 @@ void AsioTransportLayer::_acceptConnection(GenericAcceptor& acceptor) {
_acceptConnection(acceptor);
return;
}
session->parseProxyProtocolHeader(_listenerInterfaceMainPort->getReactor())
session->parseProxyProtocolHeader(_acceptorReactor)
.getAsync([this, session = std::move(session), t = std::move(token)](Status s) {
if (s.isOK()) {
invariant(!!_sessionManager);
@ -1658,7 +1462,8 @@ void AsioTransportLayer::_acceptConnection(GenericAcceptor& acceptor) {
LOGV2_WARNING(23023, "Error accepting new connection", "error"_attr = e);
}
// TODO(SERVER-114929): Fix possible race condition on _listenerProcessingTime
// _acceptConnection() is accessed by only one thread (i.e. the listener thread), so an
// atomic increment is not required here
_listenerProcessingTime.store(_listenerProcessingTime.load() + timer.elapsed());
_acceptConnection(acceptor);
};
@ -1675,24 +1480,11 @@ void AsioTransportLayer::_trySetListenerSocketBacklogQueueDepth(GenericAcceptor&
try {
if (!isTcp(acceptor.local_endpoint().protocol()))
return;
auto matchingRecord = std::ranges::find_if(
_listenerInterfaceMainPort->getAcceptorRecords(), [&](const auto& record) {
auto matchingRecord =
std::find_if(begin(_acceptorRecords), end(_acceptorRecords), [&](const auto& record) {
return acceptor.local_endpoint() == record->acceptor.local_endpoint();
});
if (_listenerInterfaceMaintenancePort &&
matchingRecord == std::end(_listenerInterfaceMainPort->getAcceptorRecords())) {
// No match with the main port's acceptors; search over the maintenance port acceptors
matchingRecord = std::ranges::find_if(
_listenerInterfaceMaintenancePort->getAcceptorRecords(), [&](const auto& record) {
return acceptor.local_endpoint() == record->acceptor.local_endpoint();
});
invariant(matchingRecord !=
std::ranges::end(_listenerInterfaceMaintenancePort->getAcceptorRecords()));
} else {
invariant(matchingRecord !=
std::ranges::end(_listenerInterfaceMainPort->getAcceptorRecords()));
}
invariant(matchingRecord != std::end(_acceptorRecords));
TcpInfoOption tcpi;
acceptor.get_option(tcpi);
(*matchingRecord)->backlogQueueDepth.store(tcpi->tcpi_unacked);

View File

@ -72,7 +72,6 @@ extern FailPoint asioTransportLayerHangDuringAcceptCallback;
class AsioNetworkingBaton;
class AsioReactor;
class AsioSession;
class WrappedEndpoint;
/**
* A TransportLayer implementation based on ASIO networking primitives.
@ -105,7 +104,6 @@ public:
int port = ServerGlobalParams::DefaultDBPort; // port to bind to
boost::optional<int> loadBalancerPort; // accepts load balancer connections
boost::optional<int> maintenancePort; // accepts maintenance connections
std::vector<std::string> ipList; // addresses to bind to
#ifndef _WIN32
bool useUnixSockets = true; // whether to allow UNIX sockets in ipList
@ -214,18 +212,14 @@ public:
return TransportProtocol::MongoRPC;
}
int listenerMainPort() const {
return _listenerInterfaceMainPort->getListenerPort();
int listenerPort() const {
return _listenerPort;
}
boost::optional<int> loadBalancerPort() const {
return _listenerOptions.loadBalancerPort;
}
boost::optional<int> maintenancePort() const {
return _listenerOptions.maintenancePort;
}
SessionManager* getSessionManager() const override {
return _sessionManager.get();
}
@ -290,14 +284,16 @@ private:
// server has already accepted too many of such connections, returns `nullptr`.
std::unique_ptr<TokenType> _makeParseProxyTokenIfPossible();
void _runListener();
void _trySetListenerSocketBacklogQueueDepth(GenericAcceptor& acceptor);
stdx::mutex _mutex;
void stopAcceptingSessionsWithLock(stdx::unique_lock<stdx::mutex> lk);
// There are three reactors that are used by AsioTransportLayer. The _ingressReactor contains
// all the accepted sockets and all ingress networking activity. The _egressReactor contains
// egress connections.
// all the accepted sockets and all ingress networking activity. The _acceptorReactor contains
// all the sockets in _acceptors. The _egressReactor contains egress connections.
//
// AsioTransportLayer should never call run() on the _ingressReactor.
// In synchronous mode, this will cause a massive performance degradation due to
@ -306,6 +302,10 @@ private:
// with the acceptors epoll set, thus avoiding those wakeups. Calling run will
// undo that benefit.
//
// AsioTransportLayer should run its own thread that calls run() on the _acceptorReactor
// to process calls to async_accept - this is the equivalent of the "listener" thread in
// other TransportLayers.
//
// The underlying problem that caused this is here:
// https://github.com/chriskohlhoff/asio/issues/240
//
@ -315,12 +315,14 @@ private:
// it.
std::shared_ptr<AsioReactor> _ingressReactor;
std::shared_ptr<AsioReactor> _egressReactor;
std::shared_ptr<AsioReactor> _acceptorReactor;
#ifdef MONGO_CONFIG_SSL
synchronized_value<std::shared_ptr<const SSLConnectionContext>> _sslContext;
#endif
struct AcceptorRecord;
std::vector<std::unique_ptr<AcceptorRecord>> _acceptorRecords;
// Only used if _listenerOptions.async is false.
struct Listener {
@ -341,94 +343,13 @@ private:
stdx::condition_variable cv;
State state{State::kNew};
};
class ListenerInterface {
public:
ListenerInterface(stdx::mutex& sharedMutex,
std::shared_ptr<AsioReactor> reactor,
AsioTransportLayer* layer);
~ListenerInterface();
Status setup(const std::vector<int>& ports,
const std::vector<std::string>& listenIPAddrs,
const std::vector<std::string>& listenUnixSocketsAddr,
Options& listenerOptions);
void stopListenerWithLock(stdx::unique_lock<stdx::mutex>& lk);
AsioTransportLayer::Listener::State getListenerState() const;
std::shared_ptr<AsioReactor> getReactor() const;
void startListener(std::string threadName);
bool isListenerStarted() const;
int getListenerPort() const {
return _listenerPort;
}
void waitUntilListenerStarted(stdx::unique_lock<stdx::mutex>& lk);
void waitListenerThreadJoin();
void setAcceptorRecords(
std::vector<std::unique_ptr<AsioTransportLayer::AcceptorRecord>>&& records);
const std::vector<std::unique_ptr<AsioTransportLayer::AcceptorRecord>>&
getAcceptorRecords();
private:
void _runListener(std::string threadName);
void _startListening(WithLock lk);
void _waitForConnections(stdx::unique_lock<stdx::mutex>& lk);
void _stopAcceptors(WithLock lk);
StatusWith<std::vector<std::unique_ptr<AsioTransportLayer::AcceptorRecord>>>
_createAcceptorRecords(const std::vector<int>& ports,
const std::set<WrappedEndpoint>& endpoints,
const Options& listenerOptions);
// The lock is not necessary for this method since there are no concurrent operations
// accessing the shared state.
StatusWith<std::vector<std::unique_ptr<AsioTransportLayer::AcceptorRecord>>>
_retrieveAllAcceptorRecords(const std::vector<int>& ports,
const std::vector<std::string>& listenIPAddrs,
const std::vector<std::string>& listenUnixSocketsAddrs,
const Options& listenerOptions);
// The real incoming port, in case the corresponding listener option port is 0
// (ephemeral).
int _listenerPort = 0;
stdx::mutex& _sharedMutex;
// The _acceptorReactor contains all the sockets in _acceptors.
std::shared_ptr<AsioReactor> _acceptorReactor;
std::vector<std::unique_ptr<AsioTransportLayer::AcceptorRecord>> _acceptorRecords;
Listener _listener;
AsioTransportLayer* _asioTransportLayer;
};
// AsioTransportLayer should run its own thread that calls run() on _listenerInterfaceMainPort's
// _acceptorReactor to process calls to async_accept on the main port - this is the equivalent
// of the "listener" thread in other TransportLayers. If the maintenance port is specified,
// AsioTransportLayer should run a second thread that calls run() on
// _listenerInterfaceMaintenancePort's _acceptorReactor to process calls to async_accept on the
// maintenance port.
const std::unique_ptr<ListenerInterface> _listenerInterfaceMainPort;
const std::unique_ptr<ListenerInterface> _listenerInterfaceMaintenancePort;
std::shared_ptr<SessionManager> _sessionManager;
Options _listenerOptions;
// The real incoming port in case of _listenerOptions.port==0 (ephemeral).
int _listenerPort = 0;
Atomic<bool> _isShutdown{false};

View File

@ -298,7 +298,7 @@ public:
LOGV2(6109515, "creating test client connection");
auto& fp = asioTransportLayerHangDuringAcceptCallback;
auto timesEntered = fp.setMode(FailPoint::alwaysOn);
ConnectionThread connectThread(tla().listenerMainPort(), onConnectFunc);
ConnectionThread connectThread(tla().listenerPort(), onConnectFunc);
fp.waitForTimesEntered(timesEntered + 1);
connectThread.wait();
@ -309,7 +309,7 @@ public:
// Using a second connection as a means to wait for the server to process the closed
// connection and move on to accept the next connection.
ConnectionThread dummyConnection(tla().listenerMainPort(), nullptr);
ConnectionThread dummyConnection(tla().listenerPort(), nullptr);
dummyConnection.wait();
sessionsCreated.wait(0);
@ -333,7 +333,7 @@ TEST(AsioTransportLayer, ListenerPortZeroTreatedAsEphemeral) {
TestFixture tf;
tf.sessionManager().setOnStartSession([&](auto&&) { connected.set(true); });
int port = tf.tla().listenerMainPort();
int port = tf.tla().listenerPort();
ASSERT_GT(port, 0);
LOGV2(6109514, "AsioTransportLayer listening", "port"_attr = port);
@ -439,9 +439,9 @@ TEST(AsioTransportLayer, TCPCheckQueueDepth) {
tf.stopHangDuringAcceptingConnection();
});
ConnectionThread connectThread1(tf.tla().listenerMainPort());
ConnectionThread connectThread2(tf.tla().listenerMainPort());
ConnectionThread connectThread3(tf.tla().listenerMainPort());
ConnectionThread connectThread1(tf.tla().listenerPort());
ConnectionThread connectThread2(tf.tla().listenerPort());
ConnectionThread connectThread3(tf.tla().listenerPort());
tf.waitForHangDuringAcceptingFirstConnection();
connectThread1.wait();
@ -456,7 +456,7 @@ TEST(AsioTransportLayer, TCPCheckQueueDepth) {
ASSERT_EQ(depths.size(), 1);
auto depth = depths[0];
ASSERT_EQ(depth.first.getPort(), tf.tla().listenerMainPort());
ASSERT_EQ(depth.first.getPort(), tf.tla().listenerPort());
ASSERT_EQ(depth.second, 2);
@ -469,8 +469,7 @@ TEST(AsioTransportLayer, TCPCheckQueueDepth) {
ASSERT_EQ(queueDepthsArray.size(), 1);
const auto& queueDepthObj = queueDepthsArray[0].Obj();
ASSERT_EQ(HostAndPort(queueDepthObj.firstElementFieldName()).port(),
tf.tla().listenerMainPort());
ASSERT_EQ(HostAndPort(queueDepthObj.firstElementFieldName()).port(), tf.tla().listenerPort());
ASSERT_EQ(queueDepthObj.firstElement().Int(), 2);
}
#endif
@ -481,7 +480,7 @@ TEST(AsioTransportLayer, ThrowOnNetworkErrorInEnsureSync) {
tf.sessionManager().setOnStartSession(
[&](test::SessionThread& st) { mockSessionCreated.set(&st); });
ConnectionThread connectThread(tf.tla().listenerMainPort(), &setNoLinger);
ConnectionThread connectThread(tf.tla().listenerPort(), &setNoLinger);
// We set the timeout to ensure that the setsockopt calls are actually made in ensureSync()
auto& st = *mockSessionCreated.get();
@ -509,7 +508,7 @@ TEST(AsioTransportLayer, SourceSyncTimeoutTimesOut) {
st.session()->setTimeout(Milliseconds{500});
st.schedule([&](auto& session) { received.set(session.sourceMessage()); });
});
SyncClient conn(tf.tla().listenerMainPort());
SyncClient conn(tf.tla().listenerPort());
ASSERT_EQ(received.get().getStatus(), ErrorCodes::NetworkTimeout);
}
@ -554,7 +553,7 @@ TEST(AsioTransportLayer, SourceSyncTimeoutSucceeds) {
st.session()->setTimeout(Milliseconds{500});
st.schedule([&](auto& session) { received.set(session.sourceMessage()); });
});
SyncClient conn(tf.tla().listenerMainPort());
SyncClient conn(tf.tla().listenerPort());
ping(conn); // This time we send a message
ASSERT_OK(received.get().getStatus());
}
@ -586,7 +585,7 @@ TEST(AsioTransportLayer, IngressPhysicalNetworkMetricsTest) {
responsed.promise.setFrom(session.sinkMessage(resp));
});
});
SyncClient conn(tf.tla().listenerMainPort());
SyncClient conn(tf.tla().listenerPort());
auto stats = test::NetworkConnectionStats::get(NetworkCounter::ConnectionType::kIngress);
auto ec = conn.write(req.buf(), req.size());
ASSERT_FALSE(ec) << errorMessage(ec);
@ -608,7 +607,7 @@ TEST(AsioTransportLayer, SwitchTimeoutModes) {
tf.sessionManager().setOnStartSession(
[&](test::SessionThread& st) { mockSessionCreated.set(&st); });
SyncClient conn(tf.tla().listenerMainPort());
SyncClient conn(tf.tla().listenerPort());
auto& st = *mockSessionCreated.get();
{
@ -746,7 +745,7 @@ void runTfoScenario(bool serverOn, bool clientOn, bool expectTfo) {
test::BlockingQueue<StatusWith<Message>> received;
auto connectOnce = [&] {
TfoClient conn(tf.tla().listenerMainPort());
TfoClient conn(tf.tla().listenerPort());
ping(conn);
ASSERT_OK(received.pop().getStatus());
};
@ -1054,7 +1053,7 @@ TEST_F(AsioTransportLayerWithServiceContextTest, ShutdownDuringSSLHandshake) {
* The goal is to simulate a server crash, and verify the behavior of the client, during the
* handshake process.
*/
int port = tla().listenerMainPort();
int port = tla().listenerPort();
DBClientConnection conn;
conn.setSoTimeout(1); // 1 second timeout
@ -1163,7 +1162,7 @@ public:
configureSessionManager(*sessionManager);
auto tl = makeTLA(std::move(sessionManager));
const auto listenerPort = tl->listenerMainPort();
const auto listenerPort = tl->listenerPort();
auto* svcCtx = getServiceContext();
svcCtx->getService()->setServiceEntryPoint(
@ -1719,7 +1718,7 @@ private:
auto tla = checked_cast<AsioTransportLayer*>(
_sc->getTransportLayerManager()->getDefaultEgressLayer());
HostAndPort localTarget(testHostName(), tla->listenerMainPort());
HostAndPort localTarget(testHostName(), tla->listenerPort());
return _sc->getTransportLayerManager()
->getDefaultEgressLayer()

View File

@ -167,13 +167,6 @@ public:
void setisLoadBalancerPeer(bool helloHasLoadBalancedOption) final;
/**
* The maintenance port is unavailable with grpc enabled.
*/
bool isConnectedToMaintenancePort() const final {
return false;
}
/**
* All gRPC sessions are considered bound to the operation state.
*/

View File

@ -89,10 +89,6 @@ public:
return false;
};
bool isConnectedToMaintenancePort() const override {
return false;
}
void setisLoadBalancerPeer(bool helloHasLoadBalancedOption) override {}
bool bindsToOperationState() const override {

View File

@ -183,12 +183,6 @@ public:
*/
virtual void setisLoadBalancerPeer(bool helloHasLoadBalancedOption) = 0;
/**
* Returns true if the connection is on the maintenance port or corresponding unix socket.
*/
virtual bool isConnectedToMaintenancePort() const = 0;
/**
* Returns true if this session binds to the operation state, which implies open cursors and
* in-progress transactions should be killed upon client disconnection.

View File

@ -93,7 +93,7 @@ public:
uassertStatusOK(getServiceContext()->getTransportLayerManager()->setup());
uassertStatusOK(getServiceContext()->getTransportLayerManager()->start());
_asioListenAddress = HostAndPort("127.0.0.1", _asioTL->listenerMainPort());
_asioListenAddress = HostAndPort("127.0.0.1", _asioTL->listenerPort());
_grpcEgressReactor = std::dynamic_pointer_cast<grpc::GRPCReactor>(
_grpcTL->getReactor(TransportLayer::WhichReactor::kEgress));
}
@ -423,7 +423,7 @@ private:
#define ASSERT_ASIO_ECHO_SUCCEEDS(tl) \
{ \
auto swSession = tl.connect(HostAndPort("localhost", tl.listenerMainPort()), \
auto swSession = tl.connect(HostAndPort("localhost", tl.listenerPort()), \
ConnectSSLMode::kEnableSSL, \
grpc::CommandServiceTestFixtures::kDefaultConnectTimeout, \
boost::none); \

View File

@ -261,6 +261,7 @@ TransportLayerManagerImpl::makeDefaultEgressTransportLayer() {
std::make_unique<transport::AsioTransportLayer>(opts, nullptr));
}
// TODO: SERVER-112674 Open and listen for connections on maintenancePort
std::unique_ptr<TransportLayerManager> TransportLayerManagerImpl::createWithConfig(
const ServerGlobalParams* config,
ServiceContext* svcCtx,
@ -278,7 +279,6 @@ std::unique_ptr<TransportLayerManager> TransportLayerManagerImpl::createWithConf
{
AsioTransportLayer::Options opts(config);
opts.loadBalancerPort = std::move(loadBalancerPort);
opts.maintenancePort = std::move(maintenancePort);
auto sm = std::make_unique<AsioSessionManager>(svcCtx, observers);
auto tl = std::make_unique<AsioTransportLayer>(opts, std::move(sm));

View File

@ -283,17 +283,6 @@ mongo_cc_library(
],
)
mongo_cc_unit_test(
name = "socket_utils_test",
srcs = [
"socket_utils_test.cpp",
],
tags = ["mongo_unittest_eighth_group"],
deps = [
"//src/mongo/unittest",
],
)
mongo_cc_library(
name = "ssl_util",
srcs = [

View File

@ -247,29 +247,6 @@ std::string makeUnixSockPath(int port, StringData label) {
return stream << port << ".sock";
}
int parsePortFromUnixSockPath(StringData path) {
constexpr StringData extension = ".sock";
if (!path.ends_with(extension)) {
return -1;
}
path.remove_suffix(extension.size());
const auto lastHyphenIndex = path.rfind('-');
if (lastHyphenIndex == StringData::npos) {
return -1;
}
path.remove_prefix(lastHyphenIndex + 1);
int port;
const char* const begin = path.data();
const char* const end = begin + path.size();
const auto result = std::from_chars(begin, end, port);
if (result.ec != std::errc{} || result.ptr != end) {
return -1;
}
return port;
}
#ifndef _WIN32
void setUnixDomainSocketPermissions(const std::string& path, int permissions) {
if (::chmod(path.c_str(), permissions) == -1) {

View File

@ -49,15 +49,6 @@ void setSocketKeepAliveParams(int sock,
std::string makeUnixSockPath(int port, StringData label = "");
/**
* Extracts the port number from the specified unix domain socket path name, under the assumption
* that the path was produced by a call to makeUnixSockPath, which takes a port number as an
* argument.
* Returns -1 if an error occurs.
* Note that this function assumes that the port passed to makeUnixSockPath was not negative.
*/
int parsePortFromUnixSockPath(StringData path);
inline bool isUnixDomainSocket(StringData hostname) {
return hostname.find('/') != std::string::npos;
}

View File

@ -1,59 +0,0 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/util/net/socket_utils.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
TEST(UnixSockPath, mustContainHyphen) {
const auto path = "/tmp/mongodb27018.sock";
ASSERT_EQ(parsePortFromUnixSockPath(path), -1);
}
TEST(UnixSockPath, mustContainPort) {
const auto path = "/tmp/mongodb-.sock";
ASSERT_EQ(parsePortFromUnixSockPath(path), -1);
}
TEST(UnixSockPath, mustHaveCorrectExtension) {
const auto path = "/tmp/mongodb-27018";
ASSERT_EQ(parsePortFromUnixSockPath(path), -1);
}
TEST(UnixSockPath, inverseOfMakeUnixSockPath) {
const int port = 27018;
const auto path = makeUnixSockPath(port);
ASSERT_EQ(parsePortFromUnixSockPath(path), port);
}
} // namespace mongo