SERVER-103828 Handle empty socket in peekASIOStream (#35382)

GitOrigin-RevId: f882ef816d531ecfbb593843e4c554fda90ca416
This commit is contained in:
Erin McNulty 2025-04-25 11:54:08 -04:00 committed by MongoDB Bot
parent 0f234c3106
commit 752f375499
7 changed files with 267 additions and 14 deletions

View File

@ -0,0 +1,111 @@
/**
* Helpers for testing the proxy protocol.
*/
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {Thread} from "jstests/libs/parallelTester.js";
import {ProxyProtocolServer} from "jstests/sharding/libs/proxy_protocol.js";
/**
 * Opens a client connection to `port` (as a load-balanced client when `isRouter` is true),
 * asserts the connection is established within 10 seconds, and runs a `hello` command.
 */
export const connectAndHello = (port, isRouter) => {
    jsTestLog(`Attempting to connect to port ${port}`);
    const startMillis = Date.now();
    const uriSuffix = isRouter ? '/?loadBalanced=true' : '';
    const conn = new Mongo(`mongodb://127.0.0.1:${port}${uriSuffix}`);
    assert.neq(null, conn, `Client was unable to connect to port ${port}`);
    const elapsedMillis = Date.now() - startMillis;
    assert.lt(elapsedMillis, 10 * 1000, 'Client was unable to connect within 10 seconds');
    const adminDB = conn.getDB('admin');
    assert.commandWorked(adminDB.runCommand({hello: 1}));
};
/**
 * Validates that a connection which never sends any bytes is timed out by the server.
 * Enables a failpoint that lowers the proxy-header timeout to 1 second, then opens a raw
 * TCP connection that sends no data and waits for the server to close it.
 */
export const timeoutEmptyConnection = (ingressPort, egressPort, isRouter) => {
    const uriSuffix = isRouter ? '/?loadBalanced=true' : '';
    const conn = new Mongo(`mongodb://127.0.0.1:${ingressPort}${uriSuffix}`);
    const proxyTimeoutFailPoint = configureFailPoint(conn, "asioTransportLayer1sProxyTimeout");
    // runProgram blocks until the spawned program exits. The `cat` reading from /dev/tcp
    // sends nothing, so it only terminates once the server times out the idle connection
    // after 1 second; if the server never does, this test hangs here.
    assert.eq(0, runProgram("bash", "-c", `cat </dev/tcp/127.0.0.1/${egressPort}`));
    proxyTimeoutFailPoint.off();
};
/**
 * Verifies the server tolerates a proxy-port connection that has sent no data yet:
 * regular clients can still connect to both the proxy and default ports, and the
 * idle connection is eventually timed out.
 */
export const emptyMessageTest = (ingressPort, egressPort, node, isRouter) => {
    jsTestLog("Connect to proxy port without sending data");
    // Hold open a raw TCP connection that never writes any bytes.
    const pid = _startMongoProgram("bash", "-c", `exec cat < /dev/tcp/127.0.0.1/${egressPort}`);

    // Real clients must still be able to connect promptly on both ports.
    connectAndHello(ingressPort, isRouter);
    connectAndHello(node.port, isRouter);

    // The data-less connection must eventually be timed out by the server.
    timeoutEmptyConnection(ingressPort, egressPort, isRouter);

    assert(checkProgram(pid).alive);
    stopMongoProgramByPid(pid);
};
/**
 * Fuzzes the proxy-protocol parser: repeatedly writes random bytes to the proxy port and
 * verifies the server closes those connections while legitimate clients remain unaffected.
 */
export const fuzzingTest = (ingressPort, egressPort, node, isRouter) => {
const numConnections = 200;
for (let i = 0; i < numConnections; i++) {
jsTestLog("Sending random data to proxy port");
// Write up to 5000 random bytes (possibly zero) to the proxy port; the random size
// exercises partial, empty, and oversized "headers".
const pid = _startMongoProgram(
'bash',
'-c',
`head -c ${Math.floor(Math.random() * 5000)} /dev/urandom >/dev/tcp/127.0.0.1/${
egressPort}`);
// Connecting to the proxy port still succeeds within a reasonable time
// limit.
connectAndHello(ingressPort, isRouter);
// Connecting to the default port still succeeds within a reasonable time limit.
connectAndHello(node.port, isRouter);
assert.soon(() => !checkProgram(pid).alive,
"Server should have closed connection with invalid proxy protocol header");
}
};
/**
 * Opens many concurrent data-less proxy connections while verifying that real clients can
 * still connect to both the proxy and default ports from every thread.
 */
export const loadTest = (ingressPort, egressPort, node, isRouter) => {
    const numConnections = 200;
    // Each thread holds open an empty raw connection, then performs real connections.
    // Thread serializes this function, so it may only use its arguments and globals.
    const worker = (regularPort, proxyPort, rawPort, connectFn, routerFlag) => {
        // Throw in some connections without data to make sure we handle those correctly.
        const pid =
            _startMongoProgram("bash", "-c", `exec cat < /dev/tcp/127.0.0.1/${rawPort}`);
        // Connecting to the proxy port still succeeds within a reasonable time
        // limit.
        connectFn(proxyPort, routerFlag);
        // Connecting to the default port still succeeds within a reasonable time limit.
        connectFn(regularPort, routerFlag);
        assert(checkProgram(pid).alive);
        stopMongoProgramByPid(pid);
    };

    const threads = [];
    for (let i = 0; i < numConnections; i++) {
        const thread =
            new Thread(worker, node.port, ingressPort, egressPort, connectAndHello, isRouter);
        thread.start();
        threads.push(thread);
    }
    threads.forEach((thread) => thread.join());
};
/**
 * Runs `testFn(ingressPort, egressPort, mongos, true)` against a one-shard cluster whose
 * mongos accepts proxy-protocol (version 1 or 2) connections on `egressPort`, fronted by a
 * ProxyProtocolServer listening on `ingressPort`.
 *
 * The proxy server and cluster are always torn down, even if `testFn` throws; without the
 * try/finally a failing test would leak the proxy process and the sharded cluster.
 */
export const testProxyProtocolShardedCluster = (ingressPort, egressPort, version, testFn) => {
    const proxy_server = new ProxyProtocolServer(ingressPort, egressPort, version);
    proxy_server.start();
    const st = new ShardingTest(
        {shards: 1, mongos: 1, mongosOptions: {setParameter: {"loadBalancerPort": egressPort}}});
    try {
        testFn(ingressPort, egressPort, st.s, true);
    } finally {
        proxy_server.stop();
        st.stop();
    }
};

View File

@ -0,0 +1,31 @@
/**
* Verify mongos supports proxy protocol connections.
* @tags: [
* # TODO (SERVER-97257): Re-enable this test or add an explanation why it is incompatible.
* embedded_router_incompatible,
* grpc_incompatible,
* ]
*/
import {
emptyMessageTest,
fuzzingTest,
loadTest,
testProxyProtocolShardedCluster
} from "jstests/noPassthrough/libs/proxy_protocol_helpers.js";
// Proxy protocol is not supported on Windows.
if (_isWindows()) {
    quit();
}

const ingressPort = allocatePort();
const egressPort = allocatePort();

// Run every scenario against both proxy protocol versions, in the same order as before:
// empty-message, then fuzzing, then load, each with version 1 followed by version 2.
for (const testFn of [emptyMessageTest, fuzzingTest, loadTest]) {
    for (const version of [1, 2]) {
        testProxyProtocolShardedCluster(ingressPort, egressPort, version, testFn);
    }
}

View File

@ -53,6 +53,7 @@ MONGO_FAIL_POINT_DEFINE(asioTransportLayerSessionPauseBeforeSetSocketOption);
MONGO_FAIL_POINT_DEFINE(asioTransportLayerBlockBeforeOpportunisticRead);
MONGO_FAIL_POINT_DEFINE(asioTransportLayerBlockBeforeAddSession);
MONGO_FAIL_POINT_DEFINE(clientIsFromLoadBalancer);
MONGO_FAIL_POINT_DEFINE(asioTransportLayer1sProxyTimeout);
namespace {
@ -456,15 +457,27 @@ auto CommonAsioSession::getSocket() -> GenericSocket& {
ExecutorFuture<void> CommonAsioSession::parseProxyProtocolHeader(const ReactorHandle& reactor) {
invariant(_isIngressSession);
invariant(reactor);
const Backoff kExponentialBackoff(Milliseconds(2), Milliseconds::max());
const Seconds proxyHeaderTimeout =
MONGO_unlikely(asioTransportLayer1sProxyTimeout.shouldFail()) ? Seconds(1) : Seconds(120);
const Date_t deadline = reactor->now() + proxyHeaderTimeout;
auto buffer = std::make_shared<std::array<char, kProxyProtocolHeaderSizeUpperBound>>();
return AsyncTry([this, buffer] {
const auto bytesRead = peekASIOStream(
_socket, asio::buffer(buffer->data(), kProxyProtocolHeaderSizeUpperBound));
return transport::parseProxyProtocolHeader(StringData(buffer->data(), bytesRead));
})
.until([](StatusWith<boost::optional<ParserResults>> sw) {
.until([deadline, proxyHeaderTimeout, reactor](
StatusWith<boost::optional<ParserResults>> sw) {
uassert(10382800,
fmt::format("Did not receive proxy protocol header within the time limit: {}",
proxyHeaderTimeout.toString()),
reactor->now() < deadline);
return !sw.isOK() || sw.getValue();
})
.withBackoffBetweenIterations(kExponentialBackoff)
.on(reactor, CancellationToken::uncancelable())
.then([this, buffer](const boost::optional<ParserResults>& results) mutable {
invariant(results);

View File

@ -173,6 +173,15 @@ StatusWith<unsigned> pollASIOSocket(asio::generic::stream_protocol::socket& sock
*/
template <typename Stream, typename MutableBufferSequence>
size_t peekASIOStream(Stream& stream, const MutableBufferSequence& buffers) {
// Check that the socket has bytes available to read so that receive does not block.
asio::socket_base::bytes_readable command;
stream.io_control(command);
std::size_t bytes_readable = command.get();
if (bytes_readable == 0) {
return 0;
}
std::error_code ec;
size_t bytesRead;
do {

View File

@ -56,6 +56,15 @@ void writeToSocketAndPollForResponse(Stream& writeSocket, Stream& readSocket, St
}
}
template <typename Stream>
void peekEmptySocket(Stream& readSocket) {
const auto bufferSize = 10;
auto inBuffer = std::make_unique<char[]>(bufferSize);
const auto bytesRead =
peekASIOStream(readSocket, asio::mutable_buffer(inBuffer.get(), bufferSize));
ASSERT_EQ(bytesRead, 0);
}
template <typename Stream>
void peekAllSubstrings(Stream& writeSocket, Stream& readSocket, StringData data) {
writeToSocketAndPollForResponse(writeSocket, readSocket, data);
@ -85,31 +94,59 @@ void peekPastBuffer(Stream& writeSocket, Stream& readSocket, StringData data) {
}
#ifdef ASIO_HAS_LOCAL_SOCKETS
auto prepareUnixSocketPair(asio::io_context& io_context) {
auto prepareUnixSocketPair(asio::io_context& io_context, bool blocking) {
asio::local::stream_protocol::socket writeSocket(io_context);
asio::local::stream_protocol::socket readSocket(io_context);
asio::local::connect_pair(writeSocket, readSocket);
readSocket.non_blocking(true);
readSocket.non_blocking(blocking);
return std::pair(std::move(writeSocket), std::move(readSocket));
}
TEST(ASIOUtils, PeekAvailableBytes) {
TEST(ASIOUtils, PeekEmptySocketBlocking) {
asio::io_context io_context;
auto [writeSocket, readSocket] = prepareUnixSocketPair(io_context);
auto [_, readSocket] = prepareUnixSocketPair(io_context, false);
peekEmptySocket(readSocket);
}
TEST(ASIOUtils, PeekEmptySocketNonBlocking) {
asio::io_context io_context;
auto [_, readSocket] = prepareUnixSocketPair(io_context, true);
peekEmptySocket(readSocket);
}
TEST(ASIOUtils, PeekAvailableBytesBlocking) {
asio::io_context io_context;
auto [writeSocket, readSocket] = prepareUnixSocketPair(io_context, false);
peekAllSubstrings(writeSocket, readSocket, "example"_sd);
}
TEST(ASIOUtils, PeekPastAvailableBytes) {
TEST(ASIOUtils, PeekAvailableBytesNonBlocking) {
asio::io_context io_context;
auto [writeSocket, readSocket] = prepareUnixSocketPair(io_context);
auto [writeSocket, readSocket] = prepareUnixSocketPair(io_context, true);
peekAllSubstrings(writeSocket, readSocket, "example"_sd);
}
TEST(ASIOUtils, PeekPastAvailableBytesBlocking) {
asio::io_context io_context;
auto [writeSocket, readSocket] = prepareUnixSocketPair(io_context, false);
peekPastBuffer(writeSocket, readSocket, "example"_sd);
}
TEST(ASIOUtils, PeekPastAvailableBytesNonBlocking) {
asio::io_context io_context;
auto [writeSocket, readSocket] = prepareUnixSocketPair(io_context, true);
peekPastBuffer(writeSocket, readSocket, "example"_sd);
}
#endif // ASIO_HAS_LOCAL_SOCKETS
auto prepareTCPSocketPair(asio::io_context& io_context) {
auto prepareTCPSocketPair(asio::io_context& io_context, bool blocking) {
// Make a local loopback connection on an arbitrary ephemeral port.
asio::ip::tcp::endpoint ep(asio::ip::make_address("127.0.0.1"), 0);
asio::ip::tcp::acceptor acceptor(io_context, ep.protocol());
@ -127,21 +164,49 @@ auto prepareTCPSocketPair(asio::io_context& io_context) {
writeSocket.non_blocking(false);
// Set no_delay so that our output doesn't get buffered in a kernel buffer.
writeSocket.set_option(asio::ip::tcp::no_delay(true));
readSocket.non_blocking(true);
readSocket.non_blocking(blocking);
return std::pair(std::move(writeSocket), std::move(readSocket));
}
TEST(ASIOUtils, PeekAvailableBytesTCP) {
TEST(ASIOUtils, PeekEmptySocketTCPBlocking) {
asio::io_context io_context;
auto [writeSocket, readSocket] = prepareTCPSocketPair(io_context);
auto [_, readSocket] = prepareTCPSocketPair(io_context, false);
peekEmptySocket(readSocket);
}
TEST(ASIOUtils, PeekEmptySocketTCPNonBlocking) {
asio::io_context io_context;
auto [_, readSocket] = prepareTCPSocketPair(io_context, true);
peekEmptySocket(readSocket);
}
TEST(ASIOUtils, PeekAvailableBytesTCPBlocking) {
asio::io_context io_context;
auto [writeSocket, readSocket] = prepareTCPSocketPair(io_context, false);
peekAllSubstrings(writeSocket, readSocket, "example"_sd);
}
TEST(ASIOUtils, PeekPastAvailableBytesTCP) {
TEST(ASIOUtils, PeekAvailableBytesTCPNonBlocking) {
asio::io_context io_context;
auto [writeSocket, readSocket] = prepareTCPSocketPair(io_context);
auto [writeSocket, readSocket] = prepareTCPSocketPair(io_context, true);
peekAllSubstrings(writeSocket, readSocket, "example"_sd);
}
TEST(ASIOUtils, PeekPastAvailableBytesTCPBlocking) {
asio::io_context io_context;
auto [writeSocket, readSocket] = prepareTCPSocketPair(io_context, false);
peekPastBuffer(writeSocket, readSocket, "example"_sd);
}
TEST(ASIOUtils, PeekPastAvailableBytesTCPNonBlocking) {
asio::io_context io_context;
auto [writeSocket, readSocket] = prepareTCPSocketPair(io_context, true);
peekPastBuffer(writeSocket, readSocket, "example"_sd);
}

View File

@ -33,6 +33,7 @@
#include "mongo/base/status.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/util/future_util.h"
namespace mongo {
namespace transport {
@ -57,5 +58,23 @@ const Status TransportLayer::TicketSessionClosedStatus = Status(
ReactorTimer::ReactorTimer() : _id(reactorTimerIdCounter.addAndFetch(1)) {}
/**
 * Returns a future that resolves on this reactor after `duration`, or fails with
 * CallbackCanceled if `token` is (or becomes) canceled. Implemented so the reactor can be
 * used with AsyncTry's backoff-between-iterations API.
 */
ExecutorFuture<void> Reactor::sleepFor(Milliseconds duration, const CancellationToken& token) {
auto when = now() + duration;
// Already-canceled token: fail fast without scheduling a timer.
if (token.isCanceled()) {
return ExecutorFuture<void>(
shared_from_this(), Status(ErrorCodes::CallbackCanceled, "Cancelled reactor sleep"));
}
// Non-positive sleep: resolve immediately on this reactor.
if (when <= now()) {
return ExecutorFuture<void>(shared_from_this());
}
std::unique_ptr<ReactorTimer> timer = makeTimer();
// Move the timer into the completion callback so it stays alive until the wait
// finishes (or is canceled); destroying it earlier would cancel the wait.
return future_util::withCancellation(timer->waitUntil(when), token)
.thenRunOn(shared_from_this())
.onCompletion([t = std::move(timer)](const Status& s) { return s; });
}
} // namespace transport
} // namespace mongo

View File

@ -216,7 +216,7 @@ private:
const size_t _id;
};
class Reactor : public OutOfLineExecutor {
class Reactor : public OutOfLineExecutor, public std::enable_shared_from_this<Reactor> {
public:
Reactor(const Reactor&) = delete;
Reactor& operator=(const Reactor&) = delete;
@ -241,6 +241,11 @@ public:
* executed in a thread calling run() or runFor().
*/
virtual std::unique_ptr<ReactorTimer> makeTimer() = 0;
// sleepFor is implemented so that the reactor is compatible with the AsyncTry exponential
// backoff API.
ExecutorFuture<void> sleepFor(Milliseconds duration, const CancellationToken& token);
virtual Date_t now() = 0;
virtual void appendStats(BSONObjBuilder& bob) const = 0;