From bd070b078f8adb6a65f1357cb4adbf22d8352d72 Mon Sep 17 00:00:00 2001 From: "auto-revert-app[bot]" <166078896+auto-revert-app[bot]@users.noreply.github.com> Date: Wed, 10 Dec 2025 22:27:09 +0000 Subject: [PATCH] Revert "SERVER-112674 Open and listen for connections on maintenance port (#43939)" (#45095) Co-authored-by: auto-revert-processor GitOrigin-RevId: 46d893eda4b592dde694e1f4bc024e44ea19befe --- jstests/libs/replsettest.js | 10 - .../noPassthrough/network/maintenance_port.js | 153 +--- .../replication/maintenance_port_reconfig.js | 23 +- .../transport/asio/asio_session_impl.cpp | 11 - src/mongo/transport/asio/asio_session_impl.h | 10 - .../transport/asio/asio_transport_layer.cpp | 686 ++++++------------ .../transport/asio/asio_transport_layer.h | 109 +-- .../asio/asio_transport_layer_test.cpp | 35 +- src/mongo/transport/grpc/grpc_session.h | 7 - src/mongo/transport/mock_session.h | 4 - src/mongo/transport/session.h | 6 - .../transport_layer_manager_grpc_test.cpp | 4 +- .../transport_layer_manager_impl.cpp | 2 +- src/mongo/util/net/BUILD.bazel | 11 - src/mongo/util/net/socket_utils.cpp | 23 - src/mongo/util/net/socket_utils.h | 9 - src/mongo/util/net/socket_utils_test.cpp | 59 -- 17 files changed, 318 insertions(+), 844 deletions(-) delete mode 100644 src/mongo/util/net/socket_utils_test.cpp diff --git a/jstests/libs/replsettest.js b/jstests/libs/replsettest.js index 0366c9adffa..508d055baeb 100644 --- a/jstests/libs/replsettest.js +++ b/jstests/libs/replsettest.js @@ -316,16 +316,6 @@ export class ReplSetTest { return this._maintenancePorts[translatedN]; } - getNewConnectionToMaintenancePort(node) { - const maintenancePort = this.getMaintenancePort(node); - return new Mongo(node.host.split(":")[0] + ":" + maintenancePort); - } - - getNewConnectionToMaintenanceSocket(node) { - const maintenancePort = this.getMaintenancePort(node); - return new Mongo("/tmp/mongodb-" + maintenancePort + ".sock"); - } - getDbPath(node) { // Get a replica set node (check for use of bridge). const n = this.getNodeId(node); diff --git a/jstests/noPassthrough/network/maintenance_port.js b/jstests/noPassthrough/network/maintenance_port.js index cdb988f7383..2437f8d3839 100644 --- a/jstests/noPassthrough/network/maintenance_port.js +++ b/jstests/noPassthrough/network/maintenance_port.js @@ -1,6 +1,8 @@ /** - * Test that we can connect to the maintenance port either through TCP or Unix socket (only on non-Windows - * platforms) and they can handle connections in parallel with the main port. + * Test that the maintenance port option is specified correctly in a ReplSetTest and a ShardingTest. + * + * TODO (SERVER-112674): Extend the integration to expose connections on both the main and + * maintenance ports and add replace the testing coverage via logging with connection based tests. * * @tags: [ * requires_replication, @@ -11,133 +13,47 @@ import {ReplSetTest} from "jstests/libs/replsettest.js"; import {ShardingTest} from "jstests/libs/shardingtest.js"; -import {describe, before, it} from "jstests/libs/mochalite.js"; -import {Thread} from "jstests/libs/parallelTester.js"; +import {describe, it} from "jstests/libs/mochalite.js"; -describe("Tests for maintenance port usage within JS test replica set helper", function () { - before(function () { - // Check that each node has the maintenance port and the corresponding unix socket set and listening - this.checkMaintenancePortSetAndListening = (conn, rs) => { - jsTest.log.info(`Checking maintenance port and corresponding unix socket are set and listening`); - // Checking connection through TCP - const connToMaintenancePort = rs - ? rs.getNewConnectionToMaintenancePort(conn) - : new Mongo(conn.host.split(":")[0] + ":" + conn.maintenancePort); - const dbThroughPort = connToMaintenancePort.getDB("admin"); - assert.commandWorked(dbThroughPort.runCommand({ping: 1})); - - /** TODO(SERVER-115111): Enable unix domain socket connection testing. - if (!_isWindows()) { - // Checking connection through unix socket - const connToMaintenanceSocket = rs - ? rs.getNewConnectionToMaintenanceSocket(conn) - : new Mongo("/tmp/mongodb-" + conn.maintenancePort + ".sock"); - const dbThroughSocket = connToMaintenanceSocket.getDB("admin"); - assert.commandWorked(dbThroughSocket.runCommand({ping: 1})); - } - */ - }; - - // Spawn a thread that 1) creates a connection, 2) issues a 'ping' command, and 3) closes - // the connection. See `runParallelConnections`. - this.spawnThreadForParallelConnections = (host, port) => { - return new Thread( - (host, port) => { - try { - const conn = new Mongo(host + ":" + port); - - /** TODO(SERVER-115111): Enable unix domain socket connection testing. - // Randomly choose to connect via TCP or unix socket - const chooseTCP = Math.round(Math.random()); - const conn = - _isWindows() || chooseTCP - ? new Mongo(host + ":" + port) - : new Mongo("/tmp/mongodb-" + port + ".sock"); - */ - const db = conn.getDB("admin"); - - assert.commandWorked(db.runCommand({ping: 1})); - - conn.close(); - } catch (opError) { - return {success: false, error: opError}; - } - return {success: true}; +describe("Tests for maintenance port usage within JS test helpers", function () { + const checkMaintenancePortSet = (conn, port) => { + assert.eq(port, conn.maintenancePort); + assert( + checkLog.checkContainsOnceJson(conn, 21951, { + options: (opts) => { + return opts.net.maintenancePort == port; }, - host, - port, - ); - }; + }), + ); + }; - // Check that we can run parallel connections on both main and maintenance ports without - // race conditions - this.runParallelConnections = (conn, rs) => { - jsTest.log.info(`Checking parallel connections on main and maintenance ports`); - const host = conn.host.split(":")[0]; - const workers = []; + // TODO (SERVER-112674): Remove this. For now, we set the log verbosity very low to ensure we still see the startup logs + const verbosityOptions = {setParameter: {logComponentVerbosity: {verbosity: 0}}}; - // Spawning threads for both main and maintenance ports (a low number of threads allows to check that - // we are not triggering any thread sanitizer race condition while avoiding port exhaustion) - const numThreadsPerPort = 4; - for (let i = 0; i < numThreadsPerPort; i++) { - workers.push({portName: "main", thread: this.spawnThreadForParallelConnections(host, conn.port)}); - } - for (let i = 0; i < numThreadsPerPort; i++) { - workers.push({ - portName: "maintenance", - thread: this.spawnThreadForParallelConnections( - host, - rs ? rs.getMaintenancePort(conn) : conn.maintenancePort, - ), - }); - } - - // Running parallel connections on both main and maintenance ports - workers.forEach((worker) => worker.thread.start()); - - // Joining threads - for (const {portName, thread} of workers) { - thread.join(); - const result = thread.returnData(); - assert( - result && result.success, - `Error while running multiple connection test on ${portName} port: ${tojson(result)}`, - ); - } - }; - }); - - it("Starting up a replica set with maintenance port enabled on all nodes", () => { - const rs = new ReplSetTest({ - nodes: 3, - useMaintenancePorts: true, - }); + it("Starting up a replica set with maintenance ports", () => { + const rs = new ReplSetTest({nodes: 3, useMaintenancePorts: true, nodeOptions: verbosityOptions}); rs.startSet(); rs.initiate(); - jsTest.log.info("Testing replica set nodes for maintenance port availability"); rs.nodes.forEach((conn) => { - this.checkMaintenancePortSetAndListening(conn, rs); - this.runParallelConnections(conn, rs); + let maintenancePort = rs.getMaintenancePort(conn); + checkMaintenancePortSet(conn, maintenancePort); }); rs.stopSet(); }); it("Starting up a replica set with a specific node having a maintenance port", () => { - // The choice of which node has the maintenance port is arbitrary. Here we choose the second node - // (index 1). The purpose of this test is to verify that only the node with the maintenance port - // set can be listening to the port. - const rs = new ReplSetTest({nodes: [{}, {maintenancePort: allocatePort()}, {}]}); + const rs = new ReplSetTest({nodes: [{}, {maintenancePort: 27021}, {}], nodeOptions: verbosityOptions}); rs.startSet(); rs.initiate(); let countFoundPorts = 0; - jsTest.log.info("Testing replica set nodes for maintenance port availability when it is set"); rs.nodes.forEach((conn) => { try { - this.checkMaintenancePortSetAndListening(conn, rs); + assert.eq(27021, rs.getMaintenancePort(conn)); + checkMaintenancePortSet(conn, 27021); countFoundPorts += 1; } catch (e) { // This should throw if the maintenance port is not set for the node. @@ -147,35 +63,30 @@ describe("Tests for maintenance port usage within JS test replica set helper", f assert.eq(countFoundPorts, 1); - jsTest.log.info("Testing parallel connections to maintenance port on the node that has it set"); - this.runParallelConnections(rs.nodes[1], rs); - rs.stopSet(); }); - it("Starting up a sharded cluster with maintenance port enabled on all nodes", () => { + it("Starting up a sharded cluster with maintenance ports", () => { const st = new ShardingTest({ shards: 1, mongos: 2, useMaintenancePorts: true, + other: {nodeOptions: verbosityOptions, configOptions: verbosityOptions, mongosOptions: verbosityOptions}, }); - jsTest.log.info("Testing config server nodes"); st.configRS.nodes.forEach((conn) => { - this.checkMaintenancePortSetAndListening(conn, st.configRS); - this.runParallelConnections(conn, st.configRS); + let maintenancePort = st.configRS.getMaintenancePort(conn); + checkMaintenancePortSet(conn, maintenancePort); }); - jsTest.log.info("Testing shard server nodes"); st.rs0.nodes.forEach((conn) => { - this.checkMaintenancePortSetAndListening(conn, st.rs0); - this.runParallelConnections(conn, st.rs0); + let maintenancePort = st.rs0.getMaintenancePort(conn); + checkMaintenancePortSet(conn, maintenancePort); }); - jsTest.log.info("Testing mongos nodes"); st._mongos.forEach((conn) => { - this.checkMaintenancePortSetAndListening(conn); - this.runParallelConnections(conn); + let maintenancePort = conn.maintenancePort; + checkMaintenancePortSet(conn, maintenancePort); }); st.stop(); diff --git a/jstests/noPassthrough/replication/maintenance_port_reconfig.js b/jstests/noPassthrough/replication/maintenance_port_reconfig.js index 68686c069df..7ebb5c8cfb2 100644 --- a/jstests/noPassthrough/replication/maintenance_port_reconfig.js +++ b/jstests/noPassthrough/replication/maintenance_port_reconfig.js @@ -133,16 +133,17 @@ describe("Tests for maintenance port usage within JS test helpers", function () rs.stopSet(); }); - it("Initiate with maintenance port plus bindIp works when fast resolution does not", () => { - let ips = "localhost," + getHostName(); - const rs = new ReplSetTest({ - nodes: 1, - useMaintenancePorts: true, - nodeOptions: {bind_ip: ips}, - }); - rs.startSet(); - rs.initiate(); + // TODO (SERVER-112674) Enable this test once isSelfSlowPath works for the maintenancePort + // it("Initiate with maintenance port plus bindIp works when fast resolution does not", () => { + // let ips = "localhost," + getHostName(); + // const rs = new ReplSetTest({ + // nodes: 1, + // useMaintenancePorts: true, + // nodeOptions: {bind_ip: ips}, + // }); + // rs.startSet(); + // rs.initiate(); - rs.stopSet(); - }); + // rs.stopSet(); + // }); }); diff --git a/src/mongo/transport/asio/asio_session_impl.cpp b/src/mongo/transport/asio/asio_session_impl.cpp index 11f00d6ac01..c88ac028076 100644 --- a/src/mongo/transport/asio/asio_session_impl.cpp +++ b/src/mongo/transport/asio/asio_session_impl.cpp @@ -177,13 +177,6 @@ CommonAsioSession::CommonAsioSession( if (tl->loadBalancerPort()) { _isConnectedToLoadBalancerPort = _local.port() == *tl->loadBalancerPort(); } - if (tl->maintenancePort()) { - auto isUnixSocket = - (_localAddr.getType() == AF_LOCAL || _localAddr.getType() == AF_UNIX); - _isConnectedToMaintenancePort = !isUnixSocket - ? _local.port() == *tl->maintenancePort() - : parsePortFromUnixSockPath(_localAddr.toString(true)) == *tl->maintenancePort(); - } } catch (...) { LOGV2_DEBUG(9079002, 1, @@ -222,10 +215,6 @@ bool CommonAsioSession::isConnectedToLoadBalancerPort() const { _isConnectedToLoadBalancerPort; } -bool CommonAsioSession::isConnectedToMaintenancePort() const { - return _isConnectedToMaintenancePort; -} - bool CommonAsioSession::isLoadBalancerPeer() const { return MONGO_unlikely(clientIsLoadBalancedPeer.shouldFail()) || _isLoadBalancerPeer; } diff --git a/src/mongo/transport/asio/asio_session_impl.h b/src/mongo/transport/asio/asio_session_impl.h index b68b7ab4145..5e0dce875db 100644 --- a/src/mongo/transport/asio/asio_session_impl.h +++ b/src/mongo/transport/asio/asio_session_impl.h @@ -137,8 +137,6 @@ public: bool isConnectedToLoadBalancerPort() const override; - bool isConnectedToMaintenancePort() const override; - bool isLoadBalancerPeer() const override; void setisLoadBalancerPeer(bool helloHasLoadBalancedOption) override; @@ -321,14 +319,6 @@ protected: */ bool _isConnectedToLoadBalancerPort = false; bool _isLoadBalancerPeer = false; - - /** - * Indicates whether the connection targets the maintenance port or its corresponding unix - * socket. These connection are intended to allow high-priority operations during connection - * storms. - */ - bool _isConnectedToMaintenancePort = false; - boost::optional _proxiedSrcEndpoint; boost::optional _proxiedDstEndpoint; diff --git a/src/mongo/transport/asio/asio_transport_layer.cpp b/src/mongo/transport/asio/asio_transport_layer.cpp index 0b9c141b4e6..a7a15a747d3 100644 --- a/src/mongo/transport/asio/asio_transport_layer.cpp +++ b/src/mongo/transport/asio/asio_transport_layer.cpp @@ -97,8 +97,6 @@ namespace mongo { namespace transport { class AsioReactor; -class WrappedEndpoint; -class WrappedResolver; } // namespace transport } // namespace mongo namespace asio { @@ -120,12 +118,6 @@ using TcpInfoOption = SocketOption; const Seconds kSessionShutdownTimeout{10}; -std::set getEndpoints( - const std::vector& ports, - const std::vector& listenAddrs, - const WrappedResolver& resolver, - const AsioTransportLayer::Options& listenerOptions); - bool shouldDiscardSocketDueToLostConnectivity(AsioSession::GenericSocket& peerSocket) { #ifdef __linux__ if (gPessimisticConnectivityCheckForAcceptedConnections.load()) { @@ -387,12 +379,7 @@ AsioTransportLayer::AsioTransportLayer(const AsioTransportLayer::Options& opts, std::unique_ptr sessionManager) : _ingressReactor(std::make_shared()), _egressReactor(std::make_shared()), - _listenerInterfaceMainPort( - std::make_unique(_mutex, std::make_shared(), this)), - _listenerInterfaceMaintenancePort( - opts.maintenancePort - ? std::make_unique(_mutex, std::make_shared(), this) - : nullptr), + _acceptorReactor(std::make_shared()), _sessionManager(std::move(sessionManager)), _listenerOptions(opts), _timerService(std::make_unique()) { @@ -580,307 +567,6 @@ private: Resolver _resolver; }; -std::set getEndpoints( - const std::vector& ports, - const std::vector& listenAddrs, - WrappedResolver& resolver, - const AsioTransportLayer::Options& listenerOptions) { - std::set endpoints; - for (const auto& port : ports) { - for (const auto& listenAddr : listenAddrs) { - if (listenAddr.empty()) { - LOGV2_WARNING(23020, "Skipping empty bind address"); - continue; - } - - const auto& swAddrs = - resolver.resolve(HostAndPort(listenAddr, port), listenerOptions.enableIPv6); - if (!swAddrs.isOK()) { - LOGV2_WARNING( - 23021, "Found no addresses for peer", "peer"_attr = swAddrs.getStatus()); - continue; - } - const auto& addrs = swAddrs.getValue(); - endpoints.insert(addrs.begin(), addrs.end()); - } - } - return endpoints; -} - -AsioTransportLayer::ListenerInterface::ListenerInterface(stdx::mutex& sharedMutex, - std::shared_ptr reactor, - AsioTransportLayer* asioTransportLayer) - : _sharedMutex(sharedMutex), - _acceptorReactor(reactor), - _asioTransportLayer(asioTransportLayer) {}; - -AsioTransportLayer::ListenerInterface::~ListenerInterface() = default; - -StatusWith>> -AsioTransportLayer::ListenerInterface::_createAcceptorRecords( - const std::vector& ports, - const std::set& endpoints, - const Options& listenerOptions) { - std::vector> acceptorRecords; - for (const auto& addr : endpoints) { -#ifndef _WIN32 - if (addr.family() == AF_UNIX) { - if (::unlink(addr.toString().c_str()) == -1) { - auto ec = lastPosixError(); - if (ec != posixError(ENOENT)) { - LOGV2_ERROR(23024, - "Failed to unlink socket file", - "path"_attr = addr.toString().c_str(), - "error"_attr = errorMessage(ec)); - fassertFailedNoTrace(40486); - } - } - } -#endif - if (addr.family() == AF_INET6 && !listenerOptions.enableIPv6) { - LOGV2_ERROR(23025, "Specified ipv6 bind address, but ipv6 is disabled"); - fassertFailedNoTrace(40488); - } - - GenericAcceptor acceptor(*getReactor()); - try { - acceptor.open(addr->protocol()); - } catch (std::exception&) { - // Allow the server to start when "ipv6: true" and "bindIpAll: true", but the - // platform does not support ipv6 (e.g., ipv6 kernel module is not loaded in - // Linux). - auto addrIsBindAll = [&] { - for (auto port : ports) { - if (addr.toString() == fmt::format(":::{}", port)) { - return true; - } - } - return false; - }; - - if (errno == EAFNOSUPPORT && listenerOptions.enableIPv6 && addr.family() == AF_INET6 && - addrIsBindAll()) { - LOGV2_WARNING(4206501, - "Failed to bind to {address} as the platform does not support ipv6", - "address"_attr = addr.toString()); - continue; - } - - throw; - } - setSocketOption( - acceptor, ReuseAddrOption(true), "acceptor reuse address", logv2::LogSeverity::Info()); - - std::error_code ec; - - if (auto af = addr.family(); af == AF_INET || af == AF_INET6) { - if (auto ec = tfo::initAcceptorSocket(acceptor)) - return errorCodeToStatus(ec, "setup tcpFastOpenIsConfigured"); - } - if (addr.family() == AF_INET6) { - setSocketOption( - acceptor, IPV6OnlyOption(true), "acceptor v6 only", logv2::LogSeverity::Info()); - } - - (void)acceptor.non_blocking(true, ec); - if (ec) { - return errorCodeToStatus(ec, "setup non_blocking"); - } - - (void)acceptor.bind(*addr, ec); - if (ec) { - return errorCodeToStatus(ec, "setup bind").withContext(addr.toString()); - } - -#ifndef _WIN32 - if (addr.family() == AF_UNIX) { - setUnixDomainSocketPermissions(addr.toString(), - serverGlobalParams.unixSocketPermissions); - } -#endif - auto endpoint = acceptor.local_endpoint(ec); - if (ec) { - return errorCodeToStatus(ec); - } - auto hostAndPort = endpointToHostAndPort(endpoint); - - auto record = std::make_unique(SockAddr(addr->data(), addr->size()), - std::move(acceptor)); - - if (_listenerPort == 0 && (addr.family() == AF_INET || addr.family() == AF_INET6)) { - record->address.setPort(hostAndPort.port()); - _listenerPort = hostAndPort.port(); - } - acceptorRecords.push_back(std::move(record)); - } - return std::move(acceptorRecords); -}; - -StatusWith>> -AsioTransportLayer::ListenerInterface::_retrieveAllAcceptorRecords( - const std::vector& ports, - const std::vector& listenIPAddrs, - const std::vector& listenUnixSocketsAddrs, - const Options& listenerOptions) { - // Self-deduplicating list of unique endpoint addresses. - std::set inetEndpoints; - std::set unixEndpoints; - WrappedResolver resolver(*getReactor()); - std::vector> acceptorRecords; - - inetEndpoints = getEndpoints(ports, listenIPAddrs, resolver, listenerOptions); - auto resultIpAddrs = _createAcceptorRecords(ports, inetEndpoints, listenerOptions); - if (!resultIpAddrs.isOK()) { - return resultIpAddrs.getStatus(); - } - acceptorRecords = std::move(resultIpAddrs.getValue()); - - unixEndpoints = getEndpoints(ports, listenUnixSocketsAddrs, resolver, listenerOptions); - auto resultUnixSockets = _createAcceptorRecords(ports, unixEndpoints, listenerOptions); - if (!resultUnixSockets.isOK()) { - return resultUnixSockets.getStatus(); - } - auto& acceptorRecordsSockets = resultUnixSockets.getValue(); - acceptorRecords.insert(acceptorRecords.end(), - std::make_move_iterator(acceptorRecordsSockets.begin()), - std::make_move_iterator(acceptorRecordsSockets.end())); - - return std::move(acceptorRecords); -} - -void AsioTransportLayer::ListenerInterface::stopListenerWithLock( - stdx::unique_lock& lk) { - if (auto oldState = _listener.state; oldState != Listener::State::kShutdown) { - _listener.state = Listener::State::kShuttingDown; - if (oldState == Listener::State::kActive) { - while (_listener.state != Listener::State::kShutdown) { - lk.unlock(); - _acceptorReactor->stop(); - lk.lock(); - } - } - } -} - -void AsioTransportLayer::ListenerInterface::_runListener(std::string threadName) { - setThreadName(threadName); - - stdx::unique_lock lk(_sharedMutex); - ON_BLOCK_EXIT([&] { - if (!lk.owns_lock()) { - lk.lock(); - } - _listener.state = Listener::State::kShutdown; - _listener.cv.notify_all(); - }); - // Because multiple listener interfaces can be active at the same time (one thread per - // interface), we must hold the lock when accessing the shared state in the following - // operations. Currently, listener threads run for _listenerInterfaceMainPort and - // _listenerInterfaceMaintenancePort. - _startListening(lk); - - _listener.state = Listener::State::kActive; - _listener.cv.notify_all(); - while (!_asioTransportLayer->_isShutdown.load() && - (_listener.state == Listener::State::kActive)) { - lk.unlock(); - _acceptorReactor->run(); - lk.lock(); - } - - _stopAcceptors(lk); -} - -void AsioTransportLayer::ListenerInterface::startListener(std::string threadName) { - _listener.thread = - stdx::thread([this, threadName = std::move(threadName)] { _runListener(threadName); }); -} - -bool AsioTransportLayer::ListenerInterface::isListenerStarted() const { - return _listener.thread.joinable(); -} - -void AsioTransportLayer::ListenerInterface::waitListenerThreadJoin() { - _listener.thread.join(); -} - -void AsioTransportLayer::ListenerInterface::waitUntilListenerStarted( - stdx::unique_lock& lk) { - _listener.cv.wait(lk, [this] { return _listener.state != Listener::State::kNew; }); -} - - -std::shared_ptr AsioTransportLayer::ListenerInterface::getReactor() const { - return _acceptorReactor; -} - -AsioTransportLayer::Listener::State AsioTransportLayer::ListenerInterface::getListenerState() - const { - return _listener.state; -} - -void AsioTransportLayer::ListenerInterface::_startListening(WithLock lk) { - if (_asioTransportLayer->_isShutdown.load() || - _listener.state == Listener::State::kShuttingDown) { - LOGV2_DEBUG(9484000, 3, "Unable to start listening: transport layer in shutdown"); - return; - } - - const int listenBacklog = serverGlobalParams.listenBacklog - ? *serverGlobalParams.listenBacklog - : ProcessInfo::getDefaultListenBacklog(); - for (auto& acceptorRecord : _acceptorRecords) { - asio::error_code ec; - (void)acceptorRecord->acceptor.listen(listenBacklog, ec); - if (ec) { - LOGV2_FATAL(31339, - "Error listening for new connections on listen address", - "listenAddrs"_attr = acceptorRecord->address, - "error"_attr = ec.message()); - } - _asioTransportLayer->_acceptConnection(acceptorRecord->acceptor); - LOGV2(23015, "Listening on", "address"_attr = acceptorRecord->address); - } - - const char* ssl = "off"; -#ifdef MONGO_CONFIG_SSL - if (_asioTransportLayer->sslMode() != SSLParams::SSLMode_disabled) { - ssl = "on"; - } -#endif - LOGV2(23016, "Waiting for connections", "port"_attr = _listenerPort, "ssl"_attr = ssl); -} - -void AsioTransportLayer::ListenerInterface::_stopAcceptors(WithLock lk) { - // Loop through the acceptors and cancel their calls to async_accept. This will prevent new - // connections from being opened. - for (auto& acceptorRecord : _acceptorRecords) { - acceptorRecord->acceptor.cancel(); - auto& addr = acceptorRecord->address; - if (addr.getType() == AF_UNIX && !addr.isAnonymousUNIXSocket()) { - auto path = addr.getAddr(); - LOGV2(23017, "removing socket file", "path"_attr = path); - if (::unlink(path.c_str()) != 0) { - auto ec = lastPosixError(); - LOGV2_WARNING(23022, - "Unable to remove UNIX socket", - "path"_attr = path, - "error"_attr = errorMessage(ec)); - } - } - } -} - -void AsioTransportLayer::ListenerInterface::setAcceptorRecords( - std::vector>&& records) { - _acceptorRecords = std::move(records); -} - -const std::vector>& -AsioTransportLayer::ListenerInterface::getAcceptorRecords() { - return _acceptorRecords; -} - Status makeConnectError(Status status, const HostAndPort& peer, const WrappedEndpoint& endpoint) { std::string errmsg; if (peer.toString() != endpoint.toString() && !endpoint.toString().empty()) { @@ -1302,94 +988,164 @@ Future> AsioTransportLayer::asyncConnect( return mergedFuture; } -Status AsioTransportLayer::ListenerInterface::setup( - const std::vector& ports, - const std::vector& listenIPAddrs, - const std::vector& listenUnixSocketsAddrs, - Options& listenerOptions) { +Status AsioTransportLayer::setup() { + std::vector listenAddrs; + if (_listenerOptions.ipList.empty() && _listenerOptions.isIngress()) { + listenAddrs = {"127.0.0.1"}; + if (_listenerOptions.enableIPv6) { + listenAddrs.emplace_back("::1"); + } + } else if (!_listenerOptions.ipList.empty()) { + listenAddrs = _listenerOptions.ipList; + } + +#ifndef _WIN32 + if (_listenerOptions.useUnixSockets && _listenerOptions.isIngress()) { + listenAddrs.push_back(makeUnixSockPath(_listenerOptions.port)); + + if (_listenerOptions.loadBalancerPort) { + listenAddrs.push_back(makeUnixSockPath(*_listenerOptions.loadBalancerPort)); + } + } +#endif if (auto foStatus = tfo::ensureInitialized(); !foStatus.isOK()) { return foStatus; } - if (!listenerOptions.isIngress() && - (!listenIPAddrs.empty() || !listenUnixSocketsAddrs.empty())) { + if (!(_listenerOptions.isIngress()) && !listenAddrs.empty()) { return {ErrorCodes::BadValue, "Cannot bind to listening sockets with ingress networking is disabled"}; } - auto result = - _retrieveAllAcceptorRecords(ports, listenIPAddrs, listenUnixSocketsAddrs, listenerOptions); - if (!result.isOK()) { - return result.getStatus(); - } - setAcceptorRecords(std::move(result.getValue())); + _listenerPort = _listenerOptions.port; + WrappedResolver resolver(*_acceptorReactor); - if ((getAcceptorRecords()).empty() && listenerOptions.isIngress()) { - return Status(ErrorCodes::SocketException, "No available addresses/ports to bind to"); - } - - return Status::OK(); -} - -Status AsioTransportLayer::setup() { - // Since the load balancer port value is set to zero to disable the port entirely and the - // maintenance port cannot be zero (range of possible value starting from 1), the main port is - // the only port which could be specified as ephemeral. - if (_listenerOptions.port == 0 && _listenerOptions.ipList.size() > 1) { - return Status(ErrorCodes::BadValue, - "Port 0 (ephemeral port) is not allowed when" - " listening on multiple IP interfaces"); - } - - auto getlistenIPAddrs = [&]() { - std::vector listenAddrs; - if (_listenerOptions.ipList.empty() && _listenerOptions.isIngress()) { - listenAddrs = {"127.0.0.1"}; - if (_listenerOptions.enableIPv6) { - listenAddrs.emplace_back("::1"); - } - } else if (!_listenerOptions.ipList.empty()) { - listenAddrs = _listenerOptions.ipList; - } - return listenAddrs; - }; - - auto getUnixDomainSocketAddrs = [&](std::vector ports) -> std::vector { -#ifndef _WIN32 - std::vector listenAddrs; - if (_listenerOptions.useUnixSockets && _listenerOptions.isIngress()) { - for (const auto port : ports) { - listenAddrs.push_back(makeUnixSockPath(port)); - } - } - return listenAddrs; -#else - return {}; -#endif - }; - - const auto listenIPAddrs = getlistenIPAddrs(); - - std::vector ports = {_listenerOptions.port}; + std::vector ports = {_listenerPort}; if (_listenerOptions.loadBalancerPort) { ports.push_back(*_listenerOptions.loadBalancerPort); } - const auto listenUnixDomainSocketAddrs = getUnixDomainSocketAddrs(ports); - auto status = _listenerInterfaceMainPort->setup( - ports, listenIPAddrs, listenUnixDomainSocketAddrs, _listenerOptions); - if (!status.isOK()) { - return status; + + // Self-deduplicating list of unique endpoint addresses. + std::set endpoints; + for (const auto& port : ports) { + for (const auto& listenAddr : listenAddrs) { + if (listenAddr.empty()) { + LOGV2_WARNING(23020, "Skipping empty bind address"); + continue; + } + + const auto& swAddrs = + resolver.resolve(HostAndPort(listenAddr, port), _listenerOptions.enableIPv6); + if (!swAddrs.isOK()) { + LOGV2_WARNING( + 23021, "Found no addresses for peer", "peer"_attr = swAddrs.getStatus()); + continue; + } + const auto& addrs = swAddrs.getValue(); + endpoints.insert(addrs.begin(), addrs.end()); + } } - if (_listenerInterfaceMaintenancePort) { - std::vector ports = {*_listenerOptions.maintenancePort}; - const auto listenUnixDomainSocketAddrs = getUnixDomainSocketAddrs(ports); - auto status = _listenerInterfaceMaintenancePort->setup( - ports, listenIPAddrs, listenUnixDomainSocketAddrs, _listenerOptions); + for (const auto& addr : endpoints) { +#ifndef _WIN32 + if (addr.family() == AF_UNIX) { + if (::unlink(addr.toString().c_str()) == -1) { + auto ec = lastPosixError(); + if (ec != posixError(ENOENT)) { + LOGV2_ERROR(23024, + "Failed to unlink socket file", + "path"_attr = addr.toString().c_str(), + "error"_attr = errorMessage(ec)); + fassertFailedNoTrace(40486); + } + } + } +#endif + if (addr.family() == AF_INET6 && !_listenerOptions.enableIPv6) { + LOGV2_ERROR(23025, "Specified ipv6 bind address, but ipv6 is disabled"); + fassertFailedNoTrace(40488); + } + + GenericAcceptor acceptor(*_acceptorReactor); + try { + acceptor.open(addr->protocol()); + } catch (std::exception&) { + // Allow the server to start when "ipv6: true" and "bindIpAll: true", but the platform + // does not support ipv6 (e.g., ipv6 kernel module is not loaded in Linux). + auto addrIsBindAll = [&] { + for (auto port : ports) { + if (addr.toString() == fmt::format(":::{}", port)) { + return true; + } + } + return false; + }; + + if (errno == EAFNOSUPPORT && _listenerOptions.enableIPv6 && addr.family() == AF_INET6 && + addrIsBindAll()) { + LOGV2_WARNING(4206501, + "Failed to bind to {address} as the platform does not support ipv6", + "address"_attr = addr.toString()); + continue; + } + + throw; + } + setSocketOption( + acceptor, ReuseAddrOption(true), "acceptor reuse address", logv2::LogSeverity::Info()); + + std::error_code ec; + + if (auto af = addr.family(); af == AF_INET || af == AF_INET6) { + if (auto ec = tfo::initAcceptorSocket(acceptor)) + return errorCodeToStatus(ec, "setup tcpFastOpenIsConfigured"); + } + if (addr.family() == AF_INET6) { + setSocketOption( + acceptor, IPV6OnlyOption(true), "acceptor v6 only", logv2::LogSeverity::Info()); + } + + (void)acceptor.non_blocking(true, ec); + if (ec) { + return errorCodeToStatus(ec, "setup non_blocking"); + } + + (void)acceptor.bind(*addr, ec); + if (ec) { + return errorCodeToStatus(ec, "setup bind").withContext(addr.toString()); + } + +#ifndef _WIN32 + if (addr.family() == AF_UNIX) { + setUnixDomainSocketPermissions(addr.toString(), + serverGlobalParams.unixSocketPermissions); + } +#endif + auto endpoint = acceptor.local_endpoint(ec); + if (ec) { + return errorCodeToStatus(ec); + } + auto hostAndPort = endpointToHostAndPort(endpoint); + + auto record = std::make_unique(SockAddr(addr->data(), addr->size()), + std::move(acceptor)); + + if (_listenerOptions.port == 0 && (addr.family() == AF_INET || addr.family() == AF_INET6)) { + if (_listenerPort != _listenerOptions.port) { + return Status(ErrorCodes::BadValue, + "Port 0 (ephemeral port) is not allowed when" + " listening on multiple IP interfaces"); + } + _listenerPort = hostAndPort.port(); + record->address.setPort(_listenerPort); + } + + _acceptorRecords.push_back(std::move(record)); } - if (!status.isOK()) { - return status; + + if (_acceptorRecords.empty() && _listenerOptions.isIngress()) { + return Status(ErrorCodes::SocketException, "No available addresses/ports to bind to"); } #ifdef MONGO_CONFIG_SSL @@ -1406,14 +1162,9 @@ Status AsioTransportLayer::setup() { std::vector> AsioTransportLayer::getListenerSocketBacklogQueueDepths() const { std::vector> queueDepths; - for (auto&& record : _listenerInterfaceMainPort->getAcceptorRecords()) { + for (auto&& record : _acceptorRecords) { queueDepths.push_back({SockAddr(record->address), record->backlogQueueDepth.load()}); } - if (_listenerInterfaceMaintenancePort) { - for (auto&& record : _listenerInterfaceMaintenancePort->getAcceptorRecords()) { - queueDepths.push_back({SockAddr(record->address), record->backlogQueueDepth.load()}); - } - } return queueDepths; } @@ -1421,16 +1172,10 @@ void AsioTransportLayer::appendStatsForServerStatus(BSONObjBuilder* bob) const { bob->append("listenerProcessingTime", _listenerProcessingTime.load().toBSON()); BSONArrayBuilder queueDepthsArrayBuilder( bob->subarrayStart("listenerSocketBacklogQueueDepths")); - for (const auto& record : _listenerInterfaceMainPort->getAcceptorRecords()) { + for (const auto& record : _acceptorRecords) { BSONObjBuilder{queueDepthsArrayBuilder.subobjStart()}.append( record->address.toString(), record->backlogQueueDepth.load()); } - if (_listenerInterfaceMaintenancePort) { - for (const auto& record : _listenerInterfaceMaintenancePort->getAcceptorRecords()) { - BSONObjBuilder{queueDepthsArrayBuilder.subobjStart()}.append( - record->address.toString(), record->backlogQueueDepth.load()); - } - } queueDepthsArrayBuilder.done(); bob->append("connsDiscardedDueToClientDisconnect", _discardedDueToClientDisconnect.get()); bob->append("connsRejectedDueToMaxPendingProxyProtocolHeader", @@ -1448,6 +1193,75 @@ void AsioTransportLayer::appendStatsForServerStatus(BSONObjBuilder* bob) const { void AsioTransportLayer::appendStatsForFTDC(BSONObjBuilder&) const {} +void AsioTransportLayer::_runListener() { + setThreadName("listener"); + + stdx::unique_lock lk(_mutex); + ON_BLOCK_EXIT([&] { + if (!lk.owns_lock()) { + lk.lock(); + } + _listener.state = Listener::State::kShutdown; + _listener.cv.notify_all(); + }); + + if (_isShutdown.load() || _listener.state == Listener::State::kShuttingDown) { + LOGV2_DEBUG(9484000, 3, "Unable to start listening: transport layer in shutdown"); + return; + } + + const int listenBacklog = serverGlobalParams.listenBacklog + ? *serverGlobalParams.listenBacklog + : ProcessInfo::getDefaultListenBacklog(); + for (auto& acceptorRecord : _acceptorRecords) { + asio::error_code ec; + (void)acceptorRecord->acceptor.listen(listenBacklog, ec); + if (ec) { + LOGV2_FATAL(31339, + "Error listening for new connections on listen address", + "listenAddrs"_attr = acceptorRecord->address, + "error"_attr = ec.message()); + } + + _acceptConnection(acceptorRecord->acceptor); + LOGV2(23015, "Listening on", "address"_attr = acceptorRecord->address); + } + + const char* ssl = "off"; +#ifdef MONGO_CONFIG_SSL + if (sslMode() != SSLParams::SSLMode_disabled) { + ssl = "on"; + } +#endif + LOGV2(23016, "Waiting for connections", "port"_attr = _listenerPort, "ssl"_attr = ssl); + + _listener.state = Listener::State::kActive; + _listener.cv.notify_all(); + while (!_isShutdown.load() && (_listener.state == Listener::State::kActive)) { + lk.unlock(); + _acceptorReactor->run(); + lk.lock(); + } + + // Loop through the acceptors and cancel their calls to async_accept. This will prevent new + // connections from being opened. + for (auto& acceptorRecord : _acceptorRecords) { + acceptorRecord->acceptor.cancel(); + auto& addr = acceptorRecord->address; + if (addr.getType() == AF_UNIX && !addr.isAnonymousUNIXSocket()) { + auto path = addr.getAddr(); + LOGV2(23017, "removing socket file", "path"_attr = path); + if (::unlink(path.c_str()) != 0) { + auto ec = lastPosixError(); + LOGV2_WARNING(23022, + "Unable to remove UNIX socket", + "path"_attr = path, + "error"_attr = errorMessage(ec)); + } + } + } +} + Status AsioTransportLayer::start() { stdx::unique_lock lk(_mutex); if (_isShutdown.load()) { @@ -1456,25 +1270,14 @@ Status AsioTransportLayer::start() { } if (_listenerOptions.isIngress()) { - // Only start the listener threads if the TL wasn't shut down before start() was invoked. - if (_listenerInterfaceMainPort->getListenerState() == Listener::State::kNew && - (!_listenerInterfaceMaintenancePort || - _listenerInterfaceMaintenancePort->getListenerState() == Listener::State::kNew)) { + // Only start the listener thread if the TL wasn't shut down before start() was invoked. + if (_listener.state == Listener::State::kNew) { invariant(_sessionManager); - _listenerInterfaceMainPort->startListener("listener"); - if (_listenerInterfaceMaintenancePort) { - _listenerInterfaceMaintenancePort->startListener("listenerForMaintenance"); - } - _listenerInterfaceMainPort->waitUntilListenerStarted(lk); - if (_listenerInterfaceMaintenancePort) { - _listenerInterfaceMaintenancePort->waitUntilListenerStarted(lk); - } + _listener.thread = stdx::thread([this] { _runListener(); }); + _listener.cv.wait(lk, [&] { return _listener.state != Listener::State::kNew; }); } } else { - invariant(_listenerInterfaceMainPort->getAcceptorRecords().empty()); - if (_listenerInterfaceMaintenancePort) { - invariant(_listenerInterfaceMaintenancePort->getAcceptorRecords().empty()); - } + invariant(_acceptorRecords.empty()); } return Status::OK(); @@ -1506,25 +1309,26 @@ void AsioTransportLayer::stopAcceptingSessionsWithLock(stdx::unique_lockstopListenerWithLock(lk); - if (_listenerInterfaceMaintenancePort) { - _listenerInterfaceMaintenancePort->stopListenerWithLock(lk); + if (auto oldState = _listener.state; oldState != Listener::State::kShutdown) { + _listener.state = Listener::State::kShuttingDown; + if (oldState == Listener::State::kActive) { + while (_listener.state != Listener::State::kShutdown) { + lk.unlock(); + _acceptorReactor->stop(); + lk.lock(); + } + } } - if (!_listenerInterfaceMainPort->isListenerStarted() && - (!_listenerInterfaceMaintenancePort || - !_listenerInterfaceMaintenancePort->isListenerStarted())) { - // If the listeners never started, then we can return now + auto thread = std::exchange(_listener.thread, {}); + if (!thread.joinable()) { + // If the listener never started, then we can return now return; } - // Release the lock and wait for the threads to die + // Release the lock and wait for the thread to die lk.unlock(); - _listenerInterfaceMainPort->waitListenerThreadJoin(); - if (_listenerInterfaceMaintenancePort && - _listenerInterfaceMaintenancePort->isListenerStarted()) { - _listenerInterfaceMaintenancePort->waitListenerThreadJoin(); - } + thread.join(); } void AsioTransportLayer::stopAcceptingSessions() { @@ -1637,7 +1441,7 @@ void AsioTransportLayer::_acceptConnection(GenericAcceptor& acceptor) { _acceptConnection(acceptor); return; } - session->parseProxyProtocolHeader(_listenerInterfaceMainPort->getReactor()) + session->parseProxyProtocolHeader(_acceptorReactor) .getAsync([this, session = std::move(session), t = std::move(token)](Status s) { if (s.isOK()) { invariant(!!_sessionManager); @@ -1658,7 +1462,8 @@ void AsioTransportLayer::_acceptConnection(GenericAcceptor& acceptor) { LOGV2_WARNING(23023, "Error accepting new connection", "error"_attr = e); } - // TODO(SERVER-114929): Fix possible race condition on _listenerProcessingTime + // _acceptConnection() is accessed by only one thread (i.e. the listener thread), so an + // atomic increment is not required here _listenerProcessingTime.store(_listenerProcessingTime.load() + timer.elapsed()); _acceptConnection(acceptor); }; @@ -1675,24 +1480,11 @@ void AsioTransportLayer::_trySetListenerSocketBacklogQueueDepth(GenericAcceptor& try { if (!isTcp(acceptor.local_endpoint().protocol())) return; - auto matchingRecord = std::ranges::find_if( - _listenerInterfaceMainPort->getAcceptorRecords(), [&](const auto& record) { + auto matchingRecord = + std::find_if(begin(_acceptorRecords), end(_acceptorRecords), [&](const auto& record) { return acceptor.local_endpoint() == record->acceptor.local_endpoint(); }); - if (_listenerInterfaceMaintenancePort && - matchingRecord == std::end(_listenerInterfaceMainPort->getAcceptorRecords())) { - // No match with the main ports acceptors, search over the maintenance port acceptors - matchingRecord = std::ranges::find_if( - _listenerInterfaceMaintenancePort->getAcceptorRecords(), [&](const auto& record) { - return acceptor.local_endpoint() == record->acceptor.local_endpoint(); - }); - invariant(matchingRecord != - std::ranges::end(_listenerInterfaceMaintenancePort->getAcceptorRecords())); - } else { - invariant(matchingRecord != - std::ranges::end(_listenerInterfaceMainPort->getAcceptorRecords())); - } - + invariant(matchingRecord != std::end(_acceptorRecords)); TcpInfoOption tcpi; acceptor.get_option(tcpi); (*matchingRecord)->backlogQueueDepth.store(tcpi->tcpi_unacked); diff --git a/src/mongo/transport/asio/asio_transport_layer.h b/src/mongo/transport/asio/asio_transport_layer.h index a0099f527c4..3af95a9c277 100644 --- a/src/mongo/transport/asio/asio_transport_layer.h +++ b/src/mongo/transport/asio/asio_transport_layer.h @@ -72,7 +72,6 @@ extern FailPoint asioTransportLayerHangDuringAcceptCallback; class AsioNetworkingBaton; class AsioReactor; class AsioSession; -class WrappedEndpoint; /** * A TransportLayer implementation based on ASIO networking primitives. @@ -105,7 +104,6 @@ public: int port = ServerGlobalParams::DefaultDBPort; // port to bind to boost::optional loadBalancerPort; // accepts load balancer connections - boost::optional maintenancePort; // accepts maintenance connections std::vector ipList; // addresses to bind to #ifndef _WIN32 bool useUnixSockets = true; // whether to allow UNIX sockets in ipList @@ -214,18 +212,14 @@ public: return TransportProtocol::MongoRPC; } - int listenerMainPort() const { - return _listenerInterfaceMainPort->getListenerPort(); + int listenerPort() const { + return _listenerPort; } boost::optional loadBalancerPort() const { return _listenerOptions.loadBalancerPort; } - boost::optional maintenancePort() const { - return _listenerOptions.maintenancePort; - } - SessionManager* getSessionManager() const override { return _sessionManager.get(); } @@ -290,14 +284,16 @@ private: // server has already accepted too many of such connections, returns `nullptr`. std::unique_ptr _makeParseProxyTokenIfPossible(); + void _runListener(); + void _trySetListenerSocketBacklogQueueDepth(GenericAcceptor& acceptor); stdx::mutex _mutex; void stopAcceptingSessionsWithLock(stdx::unique_lock lk); // There are three reactors that are used by AsioTransportLayer. The _ingressReactor contains - // all the accepted sockets and all ingress networking activity. The _egressReactor contains - // egress connections. + // all the accepted sockets and all ingress networking activity. The _acceptorReactor contains + // all the sockets in _acceptors. The _egressReactor contains egress connections. // // AsioTransportLayer should never call run() on the _ingressReactor. // In synchronous mode, this will cause a massive performance degradation due to @@ -306,6 +302,10 @@ private: // with the acceptors epoll set, thus avoiding those wakeups. Calling run will // undo that benefit. // + // AsioTransportLayer should run its own thread that calls run() on the _acceptorReactor + // to process calls to async_accept - this is the equivalent of the "listener" thread in + // other TransportLayers. + // // The underlying problem that caused this is here: // https://github.com/chriskohlhoff/asio/issues/240 // @@ -315,12 +315,14 @@ private: // it. std::shared_ptr _ingressReactor; std::shared_ptr _egressReactor; + std::shared_ptr _acceptorReactor; #ifdef MONGO_CONFIG_SSL synchronized_value> _sslContext; #endif struct AcceptorRecord; + std::vector> _acceptorRecords; // Only used if _listenerOptions.async is false. struct Listener { @@ -341,94 +343,13 @@ private: stdx::condition_variable cv; State state{State::kNew}; }; - - class ListenerInterface { - public: - ListenerInterface(stdx::mutex& sharedMutex, - std::shared_ptr reactor, - AsioTransportLayer* layer); - - ~ListenerInterface(); - - Status setup(const std::vector& ports, - const std::vector& listenIPAddrs, - const std::vector& listenUnixSocketsAddr, - Options& listenerOptions); - - void stopListenerWithLock(stdx::unique_lock& lk); - - AsioTransportLayer::Listener::State getListenerState() const; - - std::shared_ptr getReactor() const; - - void startListener(std::string threadName); - - bool isListenerStarted() const; - - int getListenerPort() const { - return _listenerPort; - } - - void waitUntilListenerStarted(stdx::unique_lock& lk); - - void waitListenerThreadJoin(); - - void setAcceptorRecords( - std::vector>&& records); - - const std::vector>& - getAcceptorRecords(); - - private: - void _runListener(std::string threadName); - - void _startListening(WithLock lk); - - void _waitForConnections(stdx::unique_lock& lk); - - void _stopAcceptors(WithLock lk); - - StatusWith>> - _createAcceptorRecords(const std::vector& ports, - const std::set& endpoints, - const Options& listenerOptions); - - // The lock is not necessary for this method since there are no concurrent operations - // accessing the shared state. - StatusWith>> - _retrieveAllAcceptorRecords(const std::vector& ports, - const std::vector& listenIPAddrs, - const std::vector& listenUnixSocketsAddrs, - const Options& listenerOptions); - - // The real incoming port in case of the corresponding listener option port is 0 - // (ephemeral). - int _listenerPort = 0; - - stdx::mutex& _sharedMutex; - - // The _acceptorReactor contains all the sockets in _acceptors. - std::shared_ptr _acceptorReactor; - - std::vector> _acceptorRecords; - - Listener _listener; - - AsioTransportLayer* _asioTransportLayer; - }; - - // AsioTransportLayer should run its own thread that calls run() on _listenerInterfaceMainPort's - // _acceptorReactor to process calls to async_accept on the main port - this is the equivalent - // of the "listener" thread in other TransportLayers. If the maintenance port is specified, - // AsioTransportLayer should run a second thread that calls run() on - // _listenerInterfaceMaintenancePort's _acceptorReactor to process calls to async_accept on the - // maintenance port. - const std::unique_ptr _listenerInterfaceMainPort; - const std::unique_ptr _listenerInterfaceMaintenancePort; + Listener _listener; std::shared_ptr _sessionManager; Options _listenerOptions; + // The real incoming port in case of _listenerOptions.port==0 (ephemeral). + int _listenerPort = 0; Atomic _isShutdown{false}; diff --git a/src/mongo/transport/asio/asio_transport_layer_test.cpp b/src/mongo/transport/asio/asio_transport_layer_test.cpp index 9b064c32ec4..001e36f20bc 100644 --- a/src/mongo/transport/asio/asio_transport_layer_test.cpp +++ b/src/mongo/transport/asio/asio_transport_layer_test.cpp @@ -298,7 +298,7 @@ public: LOGV2(6109515, "creating test client connection"); auto& fp = asioTransportLayerHangDuringAcceptCallback; auto timesEntered = fp.setMode(FailPoint::alwaysOn); - ConnectionThread connectThread(tla().listenerMainPort(), onConnectFunc); + ConnectionThread connectThread(tla().listenerPort(), onConnectFunc); fp.waitForTimesEntered(timesEntered + 1); connectThread.wait(); @@ -309,7 +309,7 @@ public: // Using a second connection as a means to wait for the server to process the closed // connection and move on to accept the next connection. - ConnectionThread dummyConnection(tla().listenerMainPort(), nullptr); + ConnectionThread dummyConnection(tla().listenerPort(), nullptr); dummyConnection.wait(); sessionsCreated.wait(0); @@ -333,7 +333,7 @@ TEST(AsioTransportLayer, ListenerPortZeroTreatedAsEphemeral) { TestFixture tf; tf.sessionManager().setOnStartSession([&](auto&&) { connected.set(true); }); - int port = tf.tla().listenerMainPort(); + int port = tf.tla().listenerPort(); ASSERT_GT(port, 0); LOGV2(6109514, "AsioTransportLayer listening", "port"_attr = port); @@ -439,9 +439,9 @@ TEST(AsioTransportLayer, TCPCheckQueueDepth) { tf.stopHangDuringAcceptingConnection(); }); - ConnectionThread connectThread1(tf.tla().listenerMainPort()); - ConnectionThread connectThread2(tf.tla().listenerMainPort()); - ConnectionThread connectThread3(tf.tla().listenerMainPort()); + ConnectionThread connectThread1(tf.tla().listenerPort()); + ConnectionThread connectThread2(tf.tla().listenerPort()); + ConnectionThread connectThread3(tf.tla().listenerPort()); tf.waitForHangDuringAcceptingFirstConnection(); connectThread1.wait(); @@ -456,7 +456,7 @@ TEST(AsioTransportLayer, TCPCheckQueueDepth) { ASSERT_EQ(depths.size(), 1); auto depth = depths[0]; - ASSERT_EQ(depth.first.getPort(), tf.tla().listenerMainPort()); + ASSERT_EQ(depth.first.getPort(), tf.tla().listenerPort()); ASSERT_EQ(depth.second, 2); @@ -469,8 +469,7 @@ TEST(AsioTransportLayer, TCPCheckQueueDepth) { ASSERT_EQ(queueDepthsArray.size(), 1); const auto& queueDepthObj = queueDepthsArray[0].Obj(); - ASSERT_EQ(HostAndPort(queueDepthObj.firstElementFieldName()).port(), - tf.tla().listenerMainPort()); + ASSERT_EQ(HostAndPort(queueDepthObj.firstElementFieldName()).port(), tf.tla().listenerPort()); ASSERT_EQ(queueDepthObj.firstElement().Int(), 2); } #endif @@ -481,7 +480,7 @@ TEST(AsioTransportLayer, ThrowOnNetworkErrorInEnsureSync) { tf.sessionManager().setOnStartSession( [&](test::SessionThread& st) { mockSessionCreated.set(&st); }); - ConnectionThread connectThread(tf.tla().listenerMainPort(), &setNoLinger); + ConnectionThread connectThread(tf.tla().listenerPort(), &setNoLinger); // We set the timeout to ensure that the setsockopt calls are actually made in ensureSync() auto& st = *mockSessionCreated.get(); @@ -509,7 +508,7 @@ TEST(AsioTransportLayer, SourceSyncTimeoutTimesOut) { st.session()->setTimeout(Milliseconds{500}); st.schedule([&](auto& session) { received.set(session.sourceMessage()); }); }); - SyncClient conn(tf.tla().listenerMainPort()); + SyncClient conn(tf.tla().listenerPort()); ASSERT_EQ(received.get().getStatus(), ErrorCodes::NetworkTimeout); } @@ -554,7 +553,7 @@ TEST(AsioTransportLayer, SourceSyncTimeoutSucceeds) { st.session()->setTimeout(Milliseconds{500}); st.schedule([&](auto& session) { received.set(session.sourceMessage()); }); }); - SyncClient conn(tf.tla().listenerMainPort()); + SyncClient conn(tf.tla().listenerPort()); ping(conn); // This time we send a message ASSERT_OK(received.get().getStatus()); } @@ -586,7 +585,7 @@ TEST(AsioTransportLayer, IngressPhysicalNetworkMetricsTest) { responsed.promise.setFrom(session.sinkMessage(resp)); }); }); - SyncClient conn(tf.tla().listenerMainPort()); + SyncClient conn(tf.tla().listenerPort()); auto stats = test::NetworkConnectionStats::get(NetworkCounter::ConnectionType::kIngress); auto ec = conn.write(req.buf(), req.size()); ASSERT_FALSE(ec) << errorMessage(ec); @@ -608,7 +607,7 @@ TEST(AsioTransportLayer, SwitchTimeoutModes) { tf.sessionManager().setOnStartSession( [&](test::SessionThread& st) { mockSessionCreated.set(&st); }); - SyncClient conn(tf.tla().listenerMainPort()); + SyncClient conn(tf.tla().listenerPort()); auto& st = *mockSessionCreated.get(); { @@ -746,7 +745,7 @@ void runTfoScenario(bool serverOn, bool clientOn, bool expectTfo) { test::BlockingQueue> received; auto connectOnce = [&] { - TfoClient conn(tf.tla().listenerMainPort()); + TfoClient conn(tf.tla().listenerPort()); ping(conn); ASSERT_OK(received.pop().getStatus()); }; @@ -1054,7 +1053,7 @@ TEST_F(AsioTransportLayerWithServiceContextTest, ShutdownDuringSSLHandshake) { * The goal is to simulate a server crash, and verify the behavior of the client, during the * handshake process. */ - int port = tla().listenerMainPort(); + int port = tla().listenerPort(); DBClientConnection conn; conn.setSoTimeout(1); // 1 second timeout @@ -1163,7 +1162,7 @@ public: configureSessionManager(*sessionManager); auto tl = makeTLA(std::move(sessionManager)); - const auto listenerPort = tl->listenerMainPort(); + const auto listenerPort = tl->listenerPort(); auto* svcCtx = getServiceContext(); svcCtx->getService()->setServiceEntryPoint( @@ -1719,7 +1718,7 @@ private: auto tla = checked_cast( _sc->getTransportLayerManager()->getDefaultEgressLayer()); - HostAndPort localTarget(testHostName(), tla->listenerMainPort()); + HostAndPort localTarget(testHostName(), tla->listenerPort()); return _sc->getTransportLayerManager() ->getDefaultEgressLayer() diff --git a/src/mongo/transport/grpc/grpc_session.h b/src/mongo/transport/grpc/grpc_session.h index 1f9f7bba691..9f1c8ca2f65 100644 --- a/src/mongo/transport/grpc/grpc_session.h +++ b/src/mongo/transport/grpc/grpc_session.h @@ -167,13 +167,6 @@ public: void setisLoadBalancerPeer(bool helloHasLoadBalancedOption) final; - /** - * The maintenance port is unavailable with grpc enabled. - */ - bool isConnectedToMaintenancePort() const final { - return false; - } - /** * All gRPC sessions are considered bound to the operation state. */ diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h index a36547c2592..165fa233c94 100644 --- a/src/mongo/transport/mock_session.h +++ b/src/mongo/transport/mock_session.h @@ -89,10 +89,6 @@ public: return false; }; - bool isConnectedToMaintenancePort() const override { - return false; - } - void setisLoadBalancerPeer(bool helloHasLoadBalancedOption) override {} bool bindsToOperationState() const override { diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h index 32ae2abeba4..9fad28655d3 100644 --- a/src/mongo/transport/session.h +++ b/src/mongo/transport/session.h @@ -183,12 +183,6 @@ public: */ virtual void setisLoadBalancerPeer(bool helloHasLoadBalancedOption) = 0; - /** - * Returns true if the connection is on the maintenance port or corresponding unix socket. - */ - virtual bool isConnectedToMaintenancePort() const = 0; - - /** * Returns true if this session binds to the operation state, which implies open cursors and * in-progress transactions should be killed upon client disconnection. diff --git a/src/mongo/transport/transport_layer_manager_grpc_test.cpp b/src/mongo/transport/transport_layer_manager_grpc_test.cpp index 002e0b3d427..78bbed612f1 100644 --- a/src/mongo/transport/transport_layer_manager_grpc_test.cpp +++ b/src/mongo/transport/transport_layer_manager_grpc_test.cpp @@ -93,7 +93,7 @@ public: uassertStatusOK(getServiceContext()->getTransportLayerManager()->setup()); uassertStatusOK(getServiceContext()->getTransportLayerManager()->start()); - _asioListenAddress = HostAndPort("127.0.0.1", _asioTL->listenerMainPort()); + _asioListenAddress = HostAndPort("127.0.0.1", _asioTL->listenerPort()); _grpcEgressReactor = std::dynamic_pointer_cast( _grpcTL->getReactor(TransportLayer::WhichReactor::kEgress)); } @@ -423,7 +423,7 @@ private: #define ASSERT_ASIO_ECHO_SUCCEEDS(tl) \ { \ - auto swSession = tl.connect(HostAndPort("localhost", tl.listenerMainPort()), \ + auto swSession = tl.connect(HostAndPort("localhost", tl.listenerPort()), \ ConnectSSLMode::kEnableSSL, \ grpc::CommandServiceTestFixtures::kDefaultConnectTimeout, \ boost::none); \ diff --git a/src/mongo/transport/transport_layer_manager_impl.cpp b/src/mongo/transport/transport_layer_manager_impl.cpp index 64dcd88c0fb..9f1faac7be0 100644 --- a/src/mongo/transport/transport_layer_manager_impl.cpp +++ b/src/mongo/transport/transport_layer_manager_impl.cpp @@ -261,6 +261,7 @@ TransportLayerManagerImpl::makeDefaultEgressTransportLayer() { std::make_unique(opts, nullptr)); } +// TODO: SERVER-112674 Open and listen for connections on maintenancePort std::unique_ptr TransportLayerManagerImpl::createWithConfig( const ServerGlobalParams* config, ServiceContext* svcCtx, @@ -278,7 +279,6 @@ std::unique_ptr TransportLayerManagerImpl::createWithConf { AsioTransportLayer::Options opts(config); opts.loadBalancerPort = std::move(loadBalancerPort); - opts.maintenancePort = std::move(maintenancePort); auto sm = std::make_unique(svcCtx, observers); auto tl = std::make_unique(opts, std::move(sm)); diff --git a/src/mongo/util/net/BUILD.bazel b/src/mongo/util/net/BUILD.bazel index 97b0d3dd2bf..d3b4c22ff4b 100644 --- a/src/mongo/util/net/BUILD.bazel +++ b/src/mongo/util/net/BUILD.bazel @@ -283,17 +283,6 @@ mongo_cc_library( ], ) -mongo_cc_unit_test( - name = "socket_utils_test", - srcs = [ - "socket_utils_test.cpp", - ], - tags = ["mongo_unittest_eighth_group"], - deps = [ - "//src/mongo/unittest", - ], -) - mongo_cc_library( name = "ssl_util", srcs = [ diff --git a/src/mongo/util/net/socket_utils.cpp b/src/mongo/util/net/socket_utils.cpp index 423085a3ef4..ef3148fd5ab 100644 --- a/src/mongo/util/net/socket_utils.cpp +++ b/src/mongo/util/net/socket_utils.cpp @@ -247,29 +247,6 @@ std::string makeUnixSockPath(int port, StringData label) { return stream << port << ".sock"; } -int parsePortFromUnixSockPath(StringData path) { - constexpr StringData extension = ".sock"; - if (!path.ends_with(extension)) { - return -1; - } - path.remove_suffix(extension.size()); - - const auto lastHyphenIndex = path.rfind('-'); - if (lastHyphenIndex == StringData::npos) { - return -1; - } - path.remove_prefix(lastHyphenIndex + 1); - - int port; - const char* const begin = path.data(); - const char* const end = begin + path.size(); - const auto result = std::from_chars(begin, end, port); - if (result.ec != std::errc{} || result.ptr != end) { - return -1; - } - return port; -} - #ifndef _WIN32 void setUnixDomainSocketPermissions(const std::string& path, int permissions) { if (::chmod(path.c_str(), permissions) == -1) { diff --git a/src/mongo/util/net/socket_utils.h b/src/mongo/util/net/socket_utils.h index a5eaf7b0398..c8b03b57c5b 100644 --- a/src/mongo/util/net/socket_utils.h +++ b/src/mongo/util/net/socket_utils.h @@ -49,15 +49,6 @@ void setSocketKeepAliveParams(int sock, std::string makeUnixSockPath(int port, StringData label = ""); -/** - * Extracts the port number from the specified unix domain socket path name, under the assumption - * that the path was produced by a call to makeUnixSockPath, which takes a port number as an - * argument. - * Returns -1 if an error occurs. - * Note that this function assumes that the port passed to makeUnixSockPath was not negative. - */ -int parsePortFromUnixSockPath(StringData path); - inline bool isUnixDomainSocket(StringData hostname) { return hostname.find('/') != std::string::npos; } diff --git a/src/mongo/util/net/socket_utils_test.cpp b/src/mongo/util/net/socket_utils_test.cpp deleted file mode 100644 index 3ad25d676dd..00000000000 --- a/src/mongo/util/net/socket_utils_test.cpp +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Copyright (C) 2025-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - - -#include "mongo/util/net/socket_utils.h" - -#include "mongo/unittest/unittest.h" - -namespace mongo { - -TEST(UnixSockPath, mustContainHyphen) { - const auto path = "/tmp/mongodb27018.sock"; - ASSERT_EQ(parsePortFromUnixSockPath(path), -1); -} - -TEST(UnixSockPath, mustContainPort) { - const auto path = "/tmp/mongodb-.sock"; - ASSERT_EQ(parsePortFromUnixSockPath(path), -1); -} - -TEST(UnixSockPath, mustHaveCorrectExtension) { - const auto path = "/tmp/mongodb-27018"; - ASSERT_EQ(parsePortFromUnixSockPath(path), -1); -} - -TEST(UnixSockPath, inverseOfMakeUnixSockPath) { - const int port = 27018; - const auto path = makeUnixSockPath(port); - ASSERT_EQ(parsePortFromUnixSockPath(path), port); -} - -} // namespace mongo -