mirror of https://github.com/mongodb/mongo
GitOrigin-RevId: 854814761ef474dbcc388cdc519a3374f489c814
This commit is contained in:
parent
2aacd501c8
commit
368b72db72
|
|
@ -4,15 +4,15 @@ Egress networking entails outbound communication (i.e. requests) from a client p
|
||||||
|
|
||||||
## Remote Commands
|
## Remote Commands
|
||||||
|
|
||||||
Remote commands represent the "packages" in which data is transmitted via egress networking. There are two types of remote commands: requests and responses. The [request object][remote_command_request_h] is in essence a wrapper for a command in BSON format, that is to be delivered to and executed by a remote MongoDB node against a database specified by a member in the object. The [response object][remote_command_response_h], in turn, contains data that describes the response to a previously sent request, also in BSON format. Besides the actual response data, the response object also stores useful information such as the duration of running the command specified in the corresponding request, as well as a `Status` member that indicates whether the operation was a success, and the cause of error if not.
|
A remote command represents an exchange of data between a client and a server. A remote command consists of two steps: a request, which the clients sends to the server, and a response, which the client receives from the server. These elements are represented by the [request][remote_command_request_h] and [response][remote_command_response_h] objects; each wraps the BSON that represents the on-wire transacted data and metadata that describes the context of the command, such as the host that the command targets. Each object also contains metadata that corresponds to its half of the command lifecycle. For example, the request object notes the timeout of the command and the operation's unique identifier, among other fields, and the response object notes the final disposition of the command's data exchange as a `Status` object (which takes no position on the success of the command's semantics at the remote) and the time that the command actually took to execute, among other fields. In the case of an exhaust command, there may be multiple responses for a single request.
|
||||||
|
|
||||||
There are two variants of both the request and response classes that are used in egress networking. The distinction between the `RemoteCommandRequest` and `RemoteCommandRequestOnAny` classes is that the former specifies a particular host/server to connect to, whereas the latter houses a vector of hosts, for when a command may be run on multiple nodes in a replica set. The distinction between `RemoteCommandResponse` and `RemoteCommandOnAnyResponse` is that the latter includes additional information as to what host the originating request was ultimately run on. It should be noted that the distinctions between the request and response classes are characteristically different; that is to say, whereas the _OnAny_ variant of the request object is a augmented version of the other, the response classes should be understood as being different return types altogether.
|
|
||||||
|
|
||||||
## Connection Pooling
|
## Connection Pooling
|
||||||
|
|
||||||
[Connection pooling][connection_pool] is largely taken care of by the [executor::connection_pool][connection_pool_h] class. This class houses a collection of `ConnectionPool::SpecificPool` objects, each of which shares a one-to-one mapping with a unique host. This lends itself to a parent-child relationship between a "parent" ConnectionPool and its constituent "children" SpecificPool members. The `ConnectionPool::ControllerInterface` subclass is used to direct the behavior of the SpecificPools that belong to it. The main operations associated with the ControllerInterface are the addition, removal, and updating of hosts (and thereby corresponding SpecificPools) to/from/in the parent pool. SpecificPools are created when a connection to a new host is requested, and expire when `hostTimeout` has passed without there having been any new requests or checked-out connections (i.e. connections in use). A pool can have its expiration status lifted whenever a connection is requested, but once a pool is shutdown, the pool becomes unusable. The `hostTimeout` field is one of many parameters belonging to the `ConnectionPool::Options` struct that determines how pools operate.
|
The [executor::ConnectionPool][connection_pool_h] class is responsible for pooling connections to any number of hosts. It contains zero or more `ConnectionPool::SpecificPool` objects, each of which pools connections for a unique host, and exactly one `ConnectionPool::ControllerInterface` object, which is responsible for the addition, removal, and updating of `SpecificPool`s to, from, and in its owning `ConnectionPool`. When a caller requests a connection to a host from the `ConnectionPool`, the `ConnectionPool` creates a new `SpecificPool` to pool connections for that host if one does not exist already, and then the `ConnectionPool` forwards the request to the `SpecificPool`. A `SpecificPool` expires when its `hostTimeout` has passed without any connection requests, after which time it becomes unusable; further requests for connections to that host will trigger the creation of a fresh `SpecificPool`.
|
||||||
|
|
||||||
The `ConnectionPool::ConnectionInterface` is responsible for handling the connections _within_ a pool. The ConnectionInterface's operations include, but are not limited to, connection setup (establishing a connection, authenticating, etc.), refreshing connections, and managing a timer. This interface also maintains the notion of a pool/connection **generation**, which is used to identify whether some particular connection's generation is older than that of the pool it belongs to (i.e. the connection is out-of-date), in which case it is dropped. The ConnectionPool uses a global mutex for access to SpecificPools as well as generation counters. Another component of the ConnectionPool is its `EgressConnectionCloserManager`. The manager consists of multiple `EgressConnectionClosers`, which are used to determine whether hosts should be dropped. In the context of the ConnectionPool, the manager's purpose is to drop _connections_ to hosts based on whether they have been marked as keep open or not.
|
The final result of a successful connection request made through `ConnectionPool::getConnection` is a `ConnectionPool::ConnectionInterface`, which represents a connection ready for use. Externally, the `ConnectionInterface` is primarily used by the caller to exchange data with its remote host. Callers return `ConnectionInterface`s to the pool by allowing them to destruct and callers must signal to the pool the final disposition of the connection beforehand through the `indicate*` family of methods. `ConnectionInterface`s also support setting timers to schedule future activities. Internally, the `ConnectionInterface` is used to prepare the connection for data exchange before transferring ownership to the caller and refreshing the health of a connection when the caller returns the connection to the pool. `ConnectionInterface` also maintains a notion of generation, which is implemented as a monotonically-incrementing counter. When a caller returns a `ConnectionInterface` to a `ConnectionPool` from a generation prior to the current generation of the corresponding `SpecificPool`, the connection is dropped. The current generation of a `SpecificPool` is incremented when the pool experiences certain failures (e.g., when to establish a new connection). `ConnectionPool` also drops a connection if the caller called `indicateFailure` on the connection before returning it. `ConnectionPool` uses a global mutex for access to `SpecificPool`s as well as generation counters.
|
||||||
|
|
||||||
|
`ConnectionPool` uses its single instance of `EgressConnectionCloserManager` to determine when hosts should be dropped. The manager consists of multiple `EgressConnectionClosers`, which are used to determine whether hosts should be dropped. In the context of the ConnectionPool, the manager's purpose is to drop _connections_ to hosts based on whether they have been marked as keep open or not.
|
||||||
|
|
||||||
## Internal Network Clients
|
## Internal Network Clients
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -94,8 +94,6 @@ using std::stringstream;
|
||||||
using std::unique_ptr;
|
using std::unique_ptr;
|
||||||
using std::vector;
|
using std::vector;
|
||||||
|
|
||||||
using executor::RemoteCommandRequest;
|
|
||||||
|
|
||||||
AtomicWord<long long> DBClientBase::ConnectionIdSequence;
|
AtomicWord<long long> DBClientBase::ConnectionIdSequence;
|
||||||
|
|
||||||
void (*DBClientBase::withConnection_do_not_use)(std::string host,
|
void (*DBClientBase::withConnection_do_not_use)(std::string host,
|
||||||
|
|
|
||||||
|
|
@ -58,8 +58,6 @@ namespace mongo {
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
using executor::RemoteCommandRequest;
|
|
||||||
|
|
||||||
using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs;
|
using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs;
|
||||||
const char* kCursorFieldName = "cursor";
|
const char* kCursorFieldName = "cursor";
|
||||||
const char* kCursorIdFieldName = "id";
|
const char* kCursorIdFieldName = "id";
|
||||||
|
|
|
||||||
|
|
@ -78,9 +78,6 @@
|
||||||
|
|
||||||
|
|
||||||
namespace mongo {
|
namespace mongo {
|
||||||
|
|
||||||
using executor::RemoteCommandRequest;
|
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
constexpr auto saslClientLogFieldName = "clientLogLevel"_sd;
|
constexpr auto saslClientLogFieldName = "clientLogLevel"_sd;
|
||||||
|
|
|
||||||
|
|
@ -94,8 +94,6 @@
|
||||||
namespace mongo {
|
namespace mongo {
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
using executor::RemoteCommandRequest;
|
|
||||||
|
|
||||||
using ResponseStatus = executor::TaskExecutor::ResponseStatus;
|
using ResponseStatus = executor::TaskExecutor::ResponseStatus;
|
||||||
|
|
||||||
const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345);
|
const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345);
|
||||||
|
|
|
||||||
|
|
@ -55,7 +55,6 @@
|
||||||
namespace mongo {
|
namespace mongo {
|
||||||
MONGO_FAIL_POINT_DEFINE(failClassicSearch);
|
MONGO_FAIL_POINT_DEFINE(failClassicSearch);
|
||||||
|
|
||||||
using executor::RemoteCommandRequest;
|
|
||||||
using executor::TaskExecutorCursor;
|
using executor::TaskExecutorCursor;
|
||||||
|
|
||||||
DocumentSourceInternalSearchMongotRemote::DocumentSourceInternalSearchMongotRemote(
|
DocumentSourceInternalSearchMongotRemote::DocumentSourceInternalSearchMongotRemote(
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,6 @@
|
||||||
namespace mongo {
|
namespace mongo {
|
||||||
|
|
||||||
using boost::intrusive_ptr;
|
using boost::intrusive_ptr;
|
||||||
using executor::RemoteCommandRequest;
|
|
||||||
|
|
||||||
REGISTER_DOCUMENT_SOURCE_WITH_FEATURE_FLAG(vectorSearch,
|
REGISTER_DOCUMENT_SOURCE_WITH_FEATURE_FLAG(vectorSearch,
|
||||||
LiteParsedSearchStage::parse,
|
LiteParsedSearchStage::parse,
|
||||||
|
|
|
||||||
|
|
@ -52,8 +52,6 @@
|
||||||
namespace mongo {
|
namespace mongo {
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
using executor::RemoteCommandRequest;
|
|
||||||
|
|
||||||
const DatabaseName kDbName = DatabaseName::createDatabaseName_forTest(boost::none, "TestDb");
|
const DatabaseName kDbName = DatabaseName::createDatabaseName_forTest(boost::none, "TestDb");
|
||||||
const auto kNamespace = NamespaceString::createNamespaceString_forTest(kDbName, "TestColl");
|
const auto kNamespace = NamespaceString::createNamespaceString_forTest(kDbName, "TestColl");
|
||||||
const int kSizeOnDisk = 1;
|
const int kSizeOnDisk = 1;
|
||||||
|
|
|
||||||
|
|
@ -87,7 +87,6 @@ namespace mongo {
|
||||||
|
|
||||||
using executor::NetworkInterfaceMock;
|
using executor::NetworkInterfaceMock;
|
||||||
using executor::NetworkTestEnv;
|
using executor::NetworkTestEnv;
|
||||||
using executor::RemoteCommandRequest;
|
|
||||||
|
|
||||||
using std::string;
|
using std::string;
|
||||||
using std::vector;
|
using std::vector;
|
||||||
|
|
|
||||||
|
|
@ -81,7 +81,6 @@
|
||||||
namespace mongo {
|
namespace mongo {
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
using executor::RemoteCommandRequest;
|
|
||||||
using std::string;
|
using std::string;
|
||||||
using std::vector;
|
using std::vector;
|
||||||
using unittest::assertGet;
|
using unittest::assertGet;
|
||||||
|
|
|
||||||
|
|
@ -76,7 +76,6 @@
|
||||||
namespace mongo {
|
namespace mongo {
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
using executor::RemoteCommandRequest;
|
|
||||||
using unittest::assertGet;
|
using unittest::assertGet;
|
||||||
|
|
||||||
class ShardCollectionTestBase : public ConfigServerTestFixture {
|
class ShardCollectionTestBase : public ConfigServerTestFixture {
|
||||||
|
|
|
||||||
|
|
@ -80,8 +80,6 @@
|
||||||
namespace mongo {
|
namespace mongo {
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
using executor::RemoteCommandRequest;
|
|
||||||
|
|
||||||
const NamespaceString kNs = NamespaceString::createNamespaceString_forTest("a.b");
|
const NamespaceString kNs = NamespaceString::createNamespaceString_forTest("a.b");
|
||||||
const NamespaceString kOtherNs = NamespaceString::createNamespaceString_forTest("a.b.c");
|
const NamespaceString kOtherNs = NamespaceString::createNamespaceString_forTest("a.b.c");
|
||||||
const KeyPattern kShardKey(BSON("x" << 1));
|
const KeyPattern kShardKey(BSON("x" << 1));
|
||||||
|
|
|
||||||
|
|
@ -69,7 +69,6 @@
|
||||||
namespace mongo {
|
namespace mongo {
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
using executor::RemoteCommandRequest;
|
|
||||||
using std::vector;
|
using std::vector;
|
||||||
using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunks;
|
using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunks;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -89,7 +89,6 @@ namespace mongo {
|
||||||
|
|
||||||
using executor::NetworkInterfaceMock;
|
using executor::NetworkInterfaceMock;
|
||||||
using executor::NetworkTestEnv;
|
using executor::NetworkTestEnv;
|
||||||
using executor::RemoteCommandRequest;
|
|
||||||
using repl::ReplicationCoordinatorMock;
|
using repl::ReplicationCoordinatorMock;
|
||||||
using repl::ReplSettings;
|
using repl::ReplSettings;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -62,29 +62,35 @@ AtomicWord<unsigned long long> requestIdCounter(0);
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
constexpr Milliseconds RemoteCommandRequestBase::kNoTimeout;
|
constexpr Milliseconds RemoteCommandRequest::kNoTimeout;
|
||||||
|
|
||||||
RemoteCommandRequestBase::RemoteCommandRequestBase(RequestId requestId,
|
RemoteCommandRequest::RemoteCommandRequest()
|
||||||
const DatabaseName& theDbName,
|
: id(requestIdCounter.addAndFetch(1)), operationKey(UUID::gen()) {}
|
||||||
const BSONObj& theCmdObj,
|
|
||||||
const BSONObj& metadataObj,
|
RemoteCommandRequest::RemoteCommandRequest(RequestId requestId_,
|
||||||
OperationContext* opCtx,
|
const HostAndPort& target_,
|
||||||
Milliseconds timeoutMillis,
|
const DatabaseName& dbName_,
|
||||||
bool fireAndForget,
|
const BSONObj& cmdObj_,
|
||||||
boost::optional<UUID> opKey)
|
const BSONObj& metadataObj_,
|
||||||
: id(requestId),
|
OperationContext* opCtx_,
|
||||||
dbname(theDbName),
|
Milliseconds timeoutMillis_,
|
||||||
metadata(metadataObj),
|
bool fireAndForget_,
|
||||||
opCtx(opCtx),
|
boost::optional<UUID> opKey_)
|
||||||
fireAndForget(fireAndForget),
|
: id(requestId_),
|
||||||
operationKey(opKey),
|
target(target_),
|
||||||
timeout(timeoutMillis) {
|
dbname(dbName_),
|
||||||
|
cmdObj(cmdObj_),
|
||||||
|
metadata(metadataObj_),
|
||||||
|
opCtx(opCtx_),
|
||||||
|
timeout(timeoutMillis_),
|
||||||
|
fireAndForget(fireAndForget_),
|
||||||
|
operationKey(opKey_) {
|
||||||
|
|
||||||
// If there is a comment associated with the current operation, append it to the command that we
|
// If there is a comment associated with the current operation, append it to the command that we
|
||||||
// are about to dispatch to the shards.
|
// are about to dispatch to the shards.
|
||||||
cmdObj = opCtx && opCtx->getComment() && !theCmdObj["comment"]
|
if (opCtx && opCtx->getComment() && !cmdObj["comment"]) {
|
||||||
? theCmdObj.addField(*opCtx->getComment())
|
cmdObj = cmdObj.addField(*opCtx->getComment());
|
||||||
: cmdObj = theCmdObj;
|
}
|
||||||
|
|
||||||
if (cmdObj.hasField("maxTimeMSOpOnly")) {
|
if (cmdObj.hasField("maxTimeMSOpOnly")) {
|
||||||
int maxTimeField = cmdObj["maxTimeMSOpOnly"].Number();
|
int maxTimeField = cmdObj["maxTimeMSOpOnly"].Number();
|
||||||
|
|
@ -103,10 +109,25 @@ RemoteCommandRequestBase::RemoteCommandRequestBase(RequestId requestId,
|
||||||
_updateTimeoutFromOpCtxDeadline(opCtx);
|
_updateTimeoutFromOpCtxDeadline(opCtx);
|
||||||
}
|
}
|
||||||
|
|
||||||
RemoteCommandRequestBase::RemoteCommandRequestBase()
|
RemoteCommandRequest::RemoteCommandRequest(const HostAndPort& target_,
|
||||||
: id(requestIdCounter.addAndFetch(1)), operationKey(UUID::gen()) {}
|
const DatabaseName& dbName_,
|
||||||
|
const BSONObj& cmdObj_,
|
||||||
|
const BSONObj& metadataObj_,
|
||||||
|
OperationContext* opCtx_,
|
||||||
|
Milliseconds timeoutMillis_,
|
||||||
|
bool fireAndForget_,
|
||||||
|
boost::optional<UUID> operationKey_)
|
||||||
|
: RemoteCommandRequest(requestIdCounter.addAndFetch(1),
|
||||||
|
target_,
|
||||||
|
dbName_,
|
||||||
|
cmdObj_,
|
||||||
|
metadataObj_,
|
||||||
|
opCtx_,
|
||||||
|
timeoutMillis_,
|
||||||
|
fireAndForget_,
|
||||||
|
operationKey_) {}
|
||||||
|
|
||||||
RemoteCommandRequestBase::operator OpMsgRequest() const {
|
RemoteCommandRequest::operator OpMsgRequest() const {
|
||||||
const auto& tenantId = this->dbname.tenantId();
|
const auto& tenantId = this->dbname.tenantId();
|
||||||
const auto vts = tenantId
|
const auto vts = tenantId
|
||||||
? auth::ValidatedTenancyScopeFactory::create(
|
? auth::ValidatedTenancyScopeFactory::create(
|
||||||
|
|
@ -116,7 +137,7 @@ RemoteCommandRequestBase::operator OpMsgRequest() const {
|
||||||
return OpMsgRequestBuilder::create(vts, this->dbname, std::move(this->cmdObj), this->metadata);
|
return OpMsgRequestBuilder::create(vts, this->dbname, std::move(this->cmdObj), this->metadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
void RemoteCommandRequestBase::_updateTimeoutFromOpCtxDeadline(const OperationContext* opCtx) {
|
void RemoteCommandRequest::_updateTimeoutFromOpCtxDeadline(const OperationContext* opCtx) {
|
||||||
if (!opCtx || !opCtx->hasDeadline()) {
|
if (!opCtx || !opCtx->hasDeadline()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
@ -136,61 +157,10 @@ void RemoteCommandRequestBase::_updateTimeoutFromOpCtxDeadline(const OperationCo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename T>
|
std::string RemoteCommandRequest::toString() const {
|
||||||
RemoteCommandRequestImpl<T>::RemoteCommandRequestImpl() = default;
|
|
||||||
|
|
||||||
template <typename T>
|
|
||||||
RemoteCommandRequestImpl<T>::RemoteCommandRequestImpl(RequestId requestId,
|
|
||||||
const T& theTarget,
|
|
||||||
const DatabaseName& theDbName,
|
|
||||||
const BSONObj& theCmdObj,
|
|
||||||
const BSONObj& metadataObj,
|
|
||||||
OperationContext* opCtx,
|
|
||||||
Milliseconds timeoutMillis,
|
|
||||||
bool fireAndForget,
|
|
||||||
boost::optional<UUID> operationKey)
|
|
||||||
: RemoteCommandRequestBase(requestId,
|
|
||||||
theDbName,
|
|
||||||
theCmdObj,
|
|
||||||
metadataObj,
|
|
||||||
opCtx,
|
|
||||||
timeoutMillis,
|
|
||||||
fireAndForget,
|
|
||||||
operationKey),
|
|
||||||
target(theTarget) {
|
|
||||||
if constexpr (std::is_same_v<T, std::vector<HostAndPort>>) {
|
|
||||||
invariant(!theTarget.empty());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename T>
|
|
||||||
RemoteCommandRequestImpl<T>::RemoteCommandRequestImpl(const T& theTarget,
|
|
||||||
const DatabaseName& theDbName,
|
|
||||||
const BSONObj& theCmdObj,
|
|
||||||
const BSONObj& metadataObj,
|
|
||||||
OperationContext* opCtx,
|
|
||||||
Milliseconds timeoutMillis,
|
|
||||||
bool fireAndForget,
|
|
||||||
boost::optional<UUID> operationKey)
|
|
||||||
: RemoteCommandRequestImpl(requestIdCounter.addAndFetch(1),
|
|
||||||
theTarget,
|
|
||||||
theDbName,
|
|
||||||
theCmdObj,
|
|
||||||
metadataObj,
|
|
||||||
opCtx,
|
|
||||||
timeoutMillis,
|
|
||||||
fireAndForget,
|
|
||||||
operationKey) {}
|
|
||||||
|
|
||||||
template <typename T>
|
|
||||||
std::string RemoteCommandRequestImpl<T>::toString() const {
|
|
||||||
str::stream out;
|
str::stream out;
|
||||||
out << "RemoteCommand " << id << " -- target:";
|
out << "RemoteCommand " << id << " -- target:";
|
||||||
if constexpr (std::is_same_v<HostAndPort, T>) {
|
|
||||||
out << target.toString();
|
out << target.toString();
|
||||||
} else {
|
|
||||||
out << "[{}]"_format(fmt::join(target, ", "));
|
|
||||||
}
|
|
||||||
out << " db:" << toStringForLogging(dbname);
|
out << " db:" << toStringForLogging(dbname);
|
||||||
out << " fireAndForget:" << fireAndForget;
|
out << " fireAndForget:" << fireAndForget;
|
||||||
|
|
||||||
|
|
@ -206,8 +176,7 @@ std::string RemoteCommandRequestImpl<T>::toString() const {
|
||||||
return out;
|
return out;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename T>
|
bool RemoteCommandRequest::operator==(const RemoteCommandRequest& rhs) const {
|
||||||
bool RemoteCommandRequestImpl<T>::operator==(const RemoteCommandRequestImpl& rhs) const {
|
|
||||||
if (this == &rhs) {
|
if (this == &rhs) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
@ -217,12 +186,8 @@ bool RemoteCommandRequestImpl<T>::operator==(const RemoteCommandRequestImpl& rhs
|
||||||
timeout == rhs.timeout;
|
timeout == rhs.timeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename T>
|
bool RemoteCommandRequest::operator!=(const RemoteCommandRequest& rhs) const {
|
||||||
bool RemoteCommandRequestImpl<T>::operator!=(const RemoteCommandRequestImpl& rhs) const {
|
|
||||||
return !(*this == rhs);
|
return !(*this == rhs);
|
||||||
}
|
}
|
||||||
|
|
||||||
template struct RemoteCommandRequestImpl<HostAndPort>;
|
|
||||||
template struct RemoteCommandRequestImpl<std::vector<HostAndPort>>;
|
|
||||||
} // namespace executor
|
} // namespace executor
|
||||||
} // namespace mongo
|
} // namespace mongo
|
||||||
|
|
|
||||||
|
|
@ -53,35 +53,86 @@
|
||||||
namespace mongo {
|
namespace mongo {
|
||||||
namespace executor {
|
namespace executor {
|
||||||
|
|
||||||
struct RemoteCommandRequestBase {
|
struct RemoteCommandRequest {
|
||||||
|
|
||||||
// Indicates that there is no timeout for the request to complete
|
// Indicates that there is no timeout for the request to complete
|
||||||
static constexpr Milliseconds kNoTimeout{-1};
|
static constexpr Milliseconds kNoTimeout{-1};
|
||||||
|
|
||||||
// Type to represent the internal id of this request
|
// Type to represent the internal id of this request
|
||||||
typedef uint64_t RequestId;
|
typedef uint64_t RequestId;
|
||||||
|
|
||||||
RemoteCommandRequestBase();
|
RemoteCommandRequest();
|
||||||
RemoteCommandRequestBase(RequestId requestId,
|
|
||||||
const DatabaseName& theDbName,
|
RemoteCommandRequest(RequestId requestId,
|
||||||
const BSONObj& theCmdObj,
|
const HostAndPort& hostAndPort,
|
||||||
|
const DatabaseName& dbName,
|
||||||
|
const BSONObj& cmdObj,
|
||||||
const BSONObj& metadataObj,
|
const BSONObj& metadataObj,
|
||||||
OperationContext* opCtx,
|
OperationContext* opCtx,
|
||||||
Milliseconds timeoutMillis,
|
Milliseconds timeoutMillis = kNoTimeout,
|
||||||
bool fireAndForget,
|
bool fireAndForget = false,
|
||||||
boost::optional<UUID> operationKey = boost::none);
|
boost::optional<UUID> operationKey = boost::none);
|
||||||
|
|
||||||
// Internal id of this request. Not interpreted and used for tracing purposes only.
|
RemoteCommandRequest(const HostAndPort& target,
|
||||||
RequestId id;
|
const DatabaseName& dbName,
|
||||||
|
const BSONObj& cmdObj,
|
||||||
|
const BSONObj& metadataObj,
|
||||||
|
OperationContext* opCtx,
|
||||||
|
Milliseconds timeoutMillis = kNoTimeout,
|
||||||
|
bool fireAndForget = false,
|
||||||
|
boost::optional<UUID> operationKey = boost::none);
|
||||||
|
|
||||||
DatabaseName dbname;
|
RemoteCommandRequest(const HostAndPort& target,
|
||||||
BSONObj metadata{rpc::makeEmptyMetadata()};
|
const DatabaseName& dbName,
|
||||||
BSONObj cmdObj;
|
const BSONObj& cmdObj,
|
||||||
|
const BSONObj& metadataObj,
|
||||||
|
OperationContext* opCtx,
|
||||||
|
bool fireAndForget,
|
||||||
|
boost::optional<UUID> operationKey = boost::none)
|
||||||
|
: RemoteCommandRequest(
|
||||||
|
target, dbName, cmdObj, metadataObj, opCtx, kNoTimeout, fireAndForget, operationKey) {
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
RemoteCommandRequest(const HostAndPort& target,
|
||||||
|
const DatabaseName& dbName,
|
||||||
|
const BSONObj& cmdObj,
|
||||||
|
OperationContext* opCtx,
|
||||||
|
Milliseconds timeoutMillis = kNoTimeout,
|
||||||
|
bool fireAndForget = false,
|
||||||
|
boost::optional<UUID> operationKey = boost::none)
|
||||||
|
: RemoteCommandRequest(target,
|
||||||
|
dbName,
|
||||||
|
cmdObj,
|
||||||
|
rpc::makeEmptyMetadata(),
|
||||||
|
opCtx,
|
||||||
|
timeoutMillis,
|
||||||
|
fireAndForget,
|
||||||
|
operationKey) {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Conversion function that performs the RemoteCommandRequest conversion into OpMsgRequest
|
* Conversion function that performs the RemoteCommandRequest conversion into OpMsgRequest
|
||||||
*/
|
*/
|
||||||
explicit operator OpMsgRequest() const;
|
explicit operator OpMsgRequest() const;
|
||||||
|
|
||||||
|
std::string toString() const;
|
||||||
|
|
||||||
|
bool operator==(const RemoteCommandRequest& rhs) const;
|
||||||
|
bool operator!=(const RemoteCommandRequest& rhs) const;
|
||||||
|
|
||||||
|
friend std::ostream& operator<<(std::ostream& os, const RemoteCommandRequest& response) {
|
||||||
|
return (os << response.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Internal id of this request. Not interpreted and used for tracing purposes only.
|
||||||
|
RequestId id;
|
||||||
|
|
||||||
|
HostAndPort target;
|
||||||
|
|
||||||
|
DatabaseName dbname;
|
||||||
|
BSONObj cmdObj;
|
||||||
|
BSONObj metadata{rpc::makeEmptyMetadata()};
|
||||||
|
|
||||||
// OperationContext is added to each request to allow OP_Command metadata attachment access to
|
// OperationContext is added to each request to allow OP_Command metadata attachment access to
|
||||||
// the Client object. The OperationContext is only accessed on the thread that calls
|
// the Client object. The OperationContext is only accessed on the thread that calls
|
||||||
// NetworkInterface::startCommand. It is not safe to access from a thread that does not own the
|
// NetworkInterface::startCommand. It is not safe to access from a thread that does not own the
|
||||||
|
|
@ -91,6 +142,9 @@ struct RemoteCommandRequestBase {
|
||||||
// metadata attachment (i.e., replication).
|
// metadata attachment (i.e., replication).
|
||||||
OperationContext* opCtx{nullptr};
|
OperationContext* opCtx{nullptr};
|
||||||
|
|
||||||
|
Milliseconds timeout = kNoTimeout;
|
||||||
|
boost::optional<ErrorCodes::Error> timeoutCode;
|
||||||
|
|
||||||
bool fireAndForget = false;
|
bool fireAndForget = false;
|
||||||
|
|
||||||
boost::optional<UUID> operationKey;
|
boost::optional<UUID> operationKey;
|
||||||
|
|
@ -99,17 +153,11 @@ struct RemoteCommandRequestBase {
|
||||||
// but will still pass the timeout on as maxTimeMSOpOnly.
|
// but will still pass the timeout on as maxTimeMSOpOnly.
|
||||||
bool enforceLocalTimeout = true;
|
bool enforceLocalTimeout = true;
|
||||||
|
|
||||||
Milliseconds timeout = kNoTimeout;
|
|
||||||
boost::optional<ErrorCodes::Error> timeoutCode;
|
|
||||||
|
|
||||||
// Time when the request was scheduled.
|
// Time when the request was scheduled.
|
||||||
boost::optional<Date_t> dateScheduled;
|
boost::optional<Date_t> dateScheduled;
|
||||||
|
|
||||||
transport::ConnectSSLMode sslMode = transport::kGlobalSSLMode;
|
transport::ConnectSSLMode sslMode = transport::kGlobalSSLMode;
|
||||||
|
|
||||||
protected:
|
|
||||||
~RemoteCommandRequestBase() = default;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/**
|
/**
|
||||||
* Sets 'timeout' to the min of the current 'timeout' value and the remaining time on the OpCtx.
|
* Sets 'timeout' to the min of the current 'timeout' value and the remaining time on the OpCtx.
|
||||||
|
|
@ -121,80 +169,5 @@ private:
|
||||||
void _updateTimeoutFromOpCtxDeadline(const OperationContext* opCtx);
|
void _updateTimeoutFromOpCtxDeadline(const OperationContext* opCtx);
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
|
||||||
* Type of object describing a command to execute against a remote MongoDB node.
|
|
||||||
*/
|
|
||||||
template <typename Target>
|
|
||||||
struct RemoteCommandRequestImpl : RemoteCommandRequestBase {
|
|
||||||
RemoteCommandRequestImpl();
|
|
||||||
|
|
||||||
RemoteCommandRequestImpl(RequestId requestId,
|
|
||||||
const Target& theTarget,
|
|
||||||
const DatabaseName& theDbName,
|
|
||||||
const BSONObj& theCmdObj,
|
|
||||||
const BSONObj& metadataObj,
|
|
||||||
OperationContext* opCtx,
|
|
||||||
Milliseconds timeoutMillis = kNoTimeout,
|
|
||||||
bool fireAndForget = false,
|
|
||||||
boost::optional<UUID> operationKey = boost::none);
|
|
||||||
|
|
||||||
RemoteCommandRequestImpl(const Target& theTarget,
|
|
||||||
const DatabaseName& theDbName,
|
|
||||||
const BSONObj& theCmdObj,
|
|
||||||
const BSONObj& metadataObj,
|
|
||||||
OperationContext* opCtx,
|
|
||||||
Milliseconds timeoutMillis = kNoTimeout,
|
|
||||||
bool fireAndForget = false,
|
|
||||||
boost::optional<UUID> operationKey = boost::none);
|
|
||||||
|
|
||||||
RemoteCommandRequestImpl(const Target& theTarget,
|
|
||||||
const DatabaseName& theDbName,
|
|
||||||
const BSONObj& theCmdObj,
|
|
||||||
const BSONObj& metadataObj,
|
|
||||||
OperationContext* opCtx,
|
|
||||||
bool fireAndForget,
|
|
||||||
boost::optional<UUID> operationKey = boost::none)
|
|
||||||
: RemoteCommandRequestImpl(theTarget,
|
|
||||||
theDbName,
|
|
||||||
theCmdObj,
|
|
||||||
metadataObj,
|
|
||||||
opCtx,
|
|
||||||
kNoTimeout,
|
|
||||||
fireAndForget,
|
|
||||||
operationKey) {}
|
|
||||||
|
|
||||||
RemoteCommandRequestImpl(const Target& theTarget,
|
|
||||||
const DatabaseName& theDbName,
|
|
||||||
const BSONObj& theCmdObj,
|
|
||||||
OperationContext* opCtx,
|
|
||||||
Milliseconds timeoutMillis = kNoTimeout,
|
|
||||||
bool fireAndForget = false,
|
|
||||||
boost::optional<UUID> operationKey = boost::none)
|
|
||||||
: RemoteCommandRequestImpl(theTarget,
|
|
||||||
theDbName,
|
|
||||||
theCmdObj,
|
|
||||||
rpc::makeEmptyMetadata(),
|
|
||||||
opCtx,
|
|
||||||
timeoutMillis,
|
|
||||||
fireAndForget,
|
|
||||||
operationKey) {}
|
|
||||||
|
|
||||||
std::string toString() const;
|
|
||||||
|
|
||||||
bool operator==(const RemoteCommandRequestImpl& rhs) const;
|
|
||||||
bool operator!=(const RemoteCommandRequestImpl& rhs) const;
|
|
||||||
|
|
||||||
friend std::ostream& operator<<(std::ostream& os, const RemoteCommandRequestImpl& response) {
|
|
||||||
return (os << response.toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
Target target;
|
|
||||||
};
|
|
||||||
|
|
||||||
extern template struct RemoteCommandRequestImpl<HostAndPort>;
|
|
||||||
extern template struct RemoteCommandRequestImpl<std::vector<HostAndPort>>;
|
|
||||||
|
|
||||||
using RemoteCommandRequest = RemoteCommandRequestImpl<HostAndPort>;
|
|
||||||
|
|
||||||
} // namespace executor
|
} // namespace executor
|
||||||
} // namespace mongo
|
} // namespace mongo
|
||||||
|
|
|
||||||
|
|
@ -969,7 +969,7 @@ void AsyncResultsMerger::_scheduleKillCursors(WithLock lk, OperationContext* opC
|
||||||
// the cursor was killed due to a maxTimeMs timeout, the remaining time will be 0, and
|
// the cursor was killed due to a maxTimeMs timeout, the remaining time will be 0, and
|
||||||
// the remote request will not be sent. To avoid this, we remove the timeout for the
|
// the remote request will not be sent. To avoid this, we remove the timeout for the
|
||||||
// remote 'killCursor' command.
|
// remote 'killCursor' command.
|
||||||
request.timeout = executor::RemoteCommandRequestBase::kNoTimeout;
|
request.timeout = executor::RemoteCommandRequest::kNoTimeout;
|
||||||
|
|
||||||
// Send kill request; discard callback handle, if any, or failure report, if not.
|
// Send kill request; discard callback handle, if any, or failure report, if not.
|
||||||
_executor
|
_executor
|
||||||
|
|
|
||||||
|
|
@ -1002,7 +1002,7 @@ TEST_F(AsyncResultsMergerTest, KillCursorCmdHasNoTimeout) {
|
||||||
auto* opCtx = operationContext();
|
auto* opCtx = operationContext();
|
||||||
opCtx->setDeadlineAfterNowBy(Microseconds::zero(), ErrorCodes::MaxTimeMSExpired);
|
opCtx->setDeadlineAfterNowBy(Microseconds::zero(), ErrorCodes::MaxTimeMSExpired);
|
||||||
auto killFuture = arm->kill(opCtx);
|
auto killFuture = arm->kill(opCtx);
|
||||||
ASSERT_EQ(executor::RemoteCommandRequestBase::kNoTimeout, getNthPendingRequest(0u).timeout);
|
ASSERT_EQ(executor::RemoteCommandRequest::kNoTimeout, getNthPendingRequest(0u).timeout);
|
||||||
killFuture.wait();
|
killFuture.wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -431,7 +431,7 @@ void CursorEstablisher::killOpOnShards(ServiceContext* srvCtx,
|
||||||
DatabaseName::kAdmin,
|
DatabaseName::kAdmin,
|
||||||
BSON("_killOperations" << 1 << "operationKeys" << opKeyArrayBuilder.arr()),
|
BSON("_killOperations" << 1 << "operationKeys" << opKeyArrayBuilder.arr()),
|
||||||
opCtx.get(),
|
opCtx.get(),
|
||||||
executor::RemoteCommandRequestBase::kNoTimeout,
|
executor::RemoteCommandRequest::kNoTimeout,
|
||||||
true /* fireAndForget */);
|
true /* fireAndForget */);
|
||||||
|
|
||||||
// We do not process the response to the killOperations request (we make a good-faith
|
// We do not process the response to the killOperations request (we make a good-faith
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue