mirror of https://github.com/mongodb/mongo
1289 lines
50 KiB
C++
1289 lines
50 KiB
C++
/**
|
|
* Copyright (C) 2018-present MongoDB, Inc.
|
|
*
|
|
* This program is free software: you can redistribute it and/or modify
|
|
* it under the terms of the Server Side Public License, version 1,
|
|
* as published by MongoDB, Inc.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* Server Side Public License for more details.
|
|
*
|
|
* You should have received a copy of the Server Side Public License
|
|
* along with this program. If not, see
|
|
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
|
*
|
|
* As a special exception, the copyright holders give permission to link the
|
|
* code of portions of this program with the OpenSSL library under certain
|
|
* conditions as described in each individual source file and distribute
|
|
* linked combinations including the program with the OpenSSL library. You
|
|
* must comply with the Server Side Public License in all respects for
|
|
* all of the code used other than as permitted herein. If you modify file(s)
|
|
* with this exception, you may extend this exception to your version of the
|
|
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
|
* delete this exception statement from your version. If you delete this
|
|
* exception statement from all source files in the program, then also delete
|
|
* it in the license file.
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
// Keeping this first to ensure it compiles by itself
|
|
#include "mongo/util/future_impl.h"
|
|
|
|
#include <boost/intrusive_ptr.hpp>
|
|
#include <type_traits>
|
|
|
|
#include "mongo/base/status.h"
|
|
#include "mongo/base/status_with.h"
|
|
#include "mongo/stdx/type_traits.h"
|
|
#include "mongo/util/assert_util.h"
|
|
#include "mongo/util/debug_util.h"
|
|
#include "mongo/util/interruptible.h"
|
|
#include "mongo/util/intrusive_counter.h"
|
|
#include "mongo/util/out_of_line_executor.h"
|
|
|
|
namespace mongo {
|
|
|
|
/**
|
|
* SemiFuture<T> is logically a possibly-deferred StatusWith<T> (or Status when T is void).
|
|
*
|
|
* Unlike Future<T> it only supports blocking operations, not directly chained continuations. You
|
|
* are only allowed to chain continuations by passing an executor to thenRunOn(). This is intended
|
|
* to protect the promise-completer's execution context from needing to perform arbitrary operations
|
|
* requested by other subsystem's continuations.
|
|
*
|
|
* SemiFutures can't convert to or be assigned to Futures since that would allow adding
|
|
* continuations which would defeat the purpose of SemiFuture.
|
|
*
|
|
* A future may be passed between threads, but only one thread may use it at a time.
|
|
*
|
|
* TODO decide if destroying a Future before extracting the result should cancel work or should
|
|
* cancellation be explicit. For now avoid unnecessarily throwing away active Futures since the
|
|
* behavior may change. End all Future chains with either a blocking call to get()/getNoThrow() or a
|
|
* non-blocking call to getAsync().
|
|
*
|
|
* SemiFuture<void> is the same as the generic SemiFuture<T> with the following exceptions:
|
|
* - Anything mentioning StatusWith<T> will use Status instead.
|
|
* - Anything returning references to T will just return void since there are no void references.
|
|
* - Anything taking a T argument will receive no arguments.
|
|
*/
|
|
template <typename T>
|
|
class MONGO_WARN_UNUSED_RESULT_CLASS SemiFuture {
|
|
using Impl = future_details::FutureImpl<T>;
|
|
using T_unless_void = std::conditional_t<std::is_void_v<T>, future_details::FakeVoid, T>;
|
|
|
|
public:
|
|
static_assert(!std::is_same<T, Status>::value,
|
|
"Future<Status> is banned. Use Future<void> instead.");
|
|
static_assert(!isStatusWith<T>, "Future<StatusWith<T>> is banned. Just use Future<T> instead.");
|
|
static_assert(!future_details::isFutureLike<T>,
|
|
"Future of Future types is banned. Just use Future<T> instead.");
|
|
static_assert(!std::is_reference<T>::value, "Future<T&> is banned.");
|
|
static_assert(!std::is_const<T>::value, "Future<const T> is banned.");
|
|
static_assert(!std::is_array<T>::value, "Future<T[]> is banned.");
|
|
|
|
using value_type = T;
|
|
|
|
/**
|
|
* For non-void T: Constructs a SemiFuture in a moved-from state that can only be assigned to
|
|
* or destroyed.
|
|
*
|
|
* For void T: Constructs a ready Semifuture for parity with SemiFuture<T>(T)
|
|
*/
|
|
SemiFuture() = default;
|
|
|
|
SemiFuture& operator=(SemiFuture&&) = default;
|
|
SemiFuture(SemiFuture&&) = default;
|
|
|
|
SemiFuture(const SemiFuture&) = delete;
|
|
SemiFuture& operator=(const SemiFuture&) = delete;
|
|
|
|
/**
|
|
* For non-void T: This must be passed a not-OK Status.
|
|
*
|
|
* For void T: This behaves like the StatusWith constructor and accepts any Status.
|
|
*/
|
|
/* implicit */ SemiFuture(Status status) : SemiFuture(Impl::makeReady(std::move(status))) {}
|
|
|
|
// These should not be used with T=void.
|
|
/* implicit */ SemiFuture(T_unless_void val) : SemiFuture(Impl::makeReady(std::move(val))) {
|
|
static_assert(!std::is_void_v<T>);
|
|
}
|
|
/* implicit */ SemiFuture(StatusWith<T_unless_void> sw)
|
|
: SemiFuture(Impl::makeReady(std::move(sw))) {
|
|
static_assert(!std::is_void_v<T>);
|
|
}
|
|
|
|
/**
|
|
* Make a ready SemiFuture<T> from a value for cases where you don't need to wait
|
|
* asynchronously.
|
|
*
|
|
* Calling this is faster than getting a SemiFuture out of a Promise, and is effectively free.
|
|
* It is
|
|
* fast enough that you never need to avoid returning a SemiFuture from an API, even if the
|
|
* result
|
|
* is ready 99.99% of the time.
|
|
*
|
|
* As an example, if you are handing out results from a batch, you can use this when for each
|
|
* result while you have a batch, then use a Promise to return a not-ready SemiFuture when you
|
|
* need
|
|
* to get another batch.
|
|
*/
|
|
static SemiFuture<T> makeReady(T_unless_void val) {
|
|
return SemiFuture(Impl::makeReady(std::move(val)));
|
|
}
|
|
|
|
static SemiFuture<T> makeReady(Status status) {
|
|
return SemiFuture(Impl::makeReady(std::move(status)));
|
|
}
|
|
|
|
static SemiFuture<T> makeReady(StatusWith<T_unless_void> val) {
|
|
return SemiFuture(Impl::makeReady(std::move(val)));
|
|
}
|
|
|
|
REQUIRES_FOR_NON_TEMPLATE(std::is_void_v<T>)
|
|
static SemiFuture<void> makeReady() {
|
|
return SemiFuture(Impl::makeReady());
|
|
}
|
|
|
|
/**
|
|
* A no-op so that you can always do `return makesFutureOrSemiFuture().semi()` when you want to
|
|
* protect your execution context.
|
|
*/
|
|
SemiFuture<T> semi() && noexcept {
|
|
return std::move(*this);
|
|
}
|
|
|
|
/**
|
|
* Convert this SemiFuture to a SharedSemiFuture.
|
|
*/
|
|
SharedSemiFuture<T> share() && noexcept {
|
|
return std::move(_impl).share();
|
|
}
|
|
|
|
/**
|
|
* If this returns true, get() is guaranteed not to block and callbacks will be immediately
|
|
* invoked. You can't assume anything if this returns false since it may be completed
|
|
* immediately after checking (unless you have independent knowledge that this SemiFuture can't
|
|
* complete in the background).
|
|
*
|
|
* Callers must still call get() or similar, even on SemiFuture<void>, to ensure that they are
|
|
* correctly sequenced with the completing task, and to be informed about whether the Promise
|
|
* completed successfully.
|
|
*
|
|
* This is generally only useful as an optimization to avoid prep work, such as setting up
|
|
* timeouts, that is unnecessary if the SemiFuture is ready already.
|
|
*/
|
|
bool isReady() const {
|
|
return _impl.isReady();
|
|
}
|
|
|
|
/**
|
|
* Returns when the Semifuture isReady().
|
|
*
|
|
* Throws if the interruptible passed is interrupted (explicitly or via deadline).
|
|
*/
|
|
void wait(Interruptible* interruptible = Interruptible::notInterruptible()) const {
|
|
return _impl.wait(interruptible);
|
|
}
|
|
|
|
/**
|
|
* Returns Status::OK() when the Semifuture isReady().
|
|
*
|
|
* Returns a non-okay status if the interruptible is interrupted.
|
|
*/
|
|
Status waitNoThrow(Interruptible* interruptible = Interruptible::notInterruptible()) const
|
|
noexcept {
|
|
return _impl.waitNoThrow(interruptible);
|
|
}
|
|
|
|
/**
|
|
* Gets the value out of this SemiFuture, blocking until it is ready.
|
|
*
|
|
* get() methods throw on error, while getNoThrow() returns a !OK status.
|
|
*
|
|
* These methods can be called multiple times, except for the rvalue overloads.
|
|
*
|
|
* Note: It is impossible to differentiate interruptible interruption from an error propagating
|
|
* down the Semifuture chain with these methods. If you need to distinguish the two cases, call
|
|
* wait() first.
|
|
*/
|
|
T get(Interruptible* interruptible = Interruptible::notInterruptible()) && {
|
|
return std::move(_impl).get(interruptible);
|
|
}
|
|
|
|
future_details::AddRefUnlessVoid<T> get(
|
|
Interruptible* interruptible = Interruptible::notInterruptible()) & {
|
|
return _impl.get(interruptible);
|
|
}
|
|
future_details::AddRefUnlessVoid<const T> get(
|
|
Interruptible* interruptible = Interruptible::notInterruptible()) const& {
|
|
return _impl.get(interruptible);
|
|
}
|
|
StatusOrStatusWith<T> getNoThrow(
|
|
Interruptible* interruptible = Interruptible::notInterruptible()) &&
|
|
noexcept {
|
|
return std::move(_impl).getNoThrow(interruptible);
|
|
}
|
|
StatusOrStatusWith<T> getNoThrow(
|
|
Interruptible* interruptible = Interruptible::notInterruptible()) const& noexcept {
|
|
return _impl.getNoThrow(interruptible);
|
|
}
|
|
|
|
/**
|
|
* Ignores the return value of a future, transforming it down into a SemiFuture<void>.
|
|
*
|
|
* This only ignores values, not errors. Those remain propagated to an onError handler.
|
|
*/
|
|
SemiFuture<void> ignoreValue() && noexcept {
|
|
return SemiFuture<void>(std::move(this->_impl).ignoreValue());
|
|
}
|
|
|
|
/**
|
|
* Returns a future that allows you to add continuations that are guaranteed to run on the
|
|
* provided executor.
|
|
*
|
|
* Be sure to read the ExecutorFuture class comment.
|
|
*/
|
|
ExecutorFuture<T> thenRunOn(ExecutorPtr exec) && noexcept;
|
|
|
|
/**
|
|
* Returns an inline Future type from this SemiFuture.
|
|
*
|
|
* WARNING: Do not use this unless you're extremely sure of what you're doing, as callbacks
|
|
* chained to the resulting Future may run in unexpected places.
|
|
*/
|
|
Future<T> unsafeToInlineFuture() && noexcept;
|
|
|
|
private:
|
|
friend class Promise<T>;
|
|
friend class SharedPromise<T>;
|
|
template <typename>
|
|
friend class Future;
|
|
template <typename>
|
|
friend class ExecutorFuture;
|
|
template <typename>
|
|
friend class future_details::FutureImpl;
|
|
template <typename>
|
|
friend class SharedSemiFuture;
|
|
template <typename>
|
|
friend class SemiFuture;
|
|
|
|
|
|
explicit SemiFuture(future_details::SharedStateHolder<T_unless_void>&& impl)
|
|
: _impl(std::move(impl)) {}
|
|
|
|
explicit SemiFuture(Impl&& impl) : _impl(std::move(impl)) {}
|
|
operator Impl &&() && {
|
|
return std::move(_impl);
|
|
}
|
|
|
|
template <typename U>
|
|
void propagateResultTo(U&& arg) && {
|
|
std::move(_impl).propagateResultTo(std::forward<U>(arg));
|
|
}
|
|
|
|
Impl _impl;
|
|
};
|
|
|
|
// Deduction Guides
|
|
TEMPLATE(typename T)
|
|
REQUIRES(!isStatusOrStatusWith<T> && !future_details::isFutureLike<T>)
|
|
SemiFuture(T)->SemiFuture<T>;
|
|
template <typename T>
|
|
SemiFuture(StatusWith<T>)->SemiFuture<T>;
|
|
|
|
/**
|
|
* Future<T> is a SemiFuture<T> (which is logically a possibly deferred StatusOrStatusWith<T>),
|
|
* extended with the ability to chain additional continuations that will be invoked when the result
|
|
* is ready.
|
|
*
|
|
* All comments on SemiFuture<T> apply to Future<T> as well.
|
|
*/
|
|
template <typename T>
|
|
class MONGO_WARN_UNUSED_RESULT_CLASS Future : private SemiFuture<T> {
|
|
using Impl = typename SemiFuture<T>::Impl;
|
|
using T_unless_void = typename SemiFuture<T>::T_unless_void;
|
|
|
|
public:
|
|
/**
|
|
* Re-export the API of SemiFuture. The API of Future is a superset, except you can't convert
|
|
* from a SemiFuture to a Future.
|
|
*/
|
|
using value_type = T;
|
|
using SemiFuture<T>::SemiFuture; // Constructors.
|
|
using SemiFuture<T>::share;
|
|
using SemiFuture<T>::isReady;
|
|
using SemiFuture<T>::wait;
|
|
using SemiFuture<T>::waitNoThrow;
|
|
using SemiFuture<T>::get;
|
|
using SemiFuture<T>::getNoThrow;
|
|
using SemiFuture<T>::semi;
|
|
using SemiFuture<T>::thenRunOn;
|
|
|
|
/**
|
|
* Re-export makeReady, but return a Future<T>
|
|
*/
|
|
static Future<T> makeReady(T_unless_void val) {
|
|
return Future(Impl::makeReady(std::move(val)));
|
|
}
|
|
static Future<T> makeReady(Status status) {
|
|
return Future(Impl::makeReady(std::move(status)));
|
|
}
|
|
static Future<T> makeReady(StatusWith<T_unless_void> val) {
|
|
return Future(Impl::makeReady(std::move(val)));
|
|
}
|
|
REQUIRES_FOR_NON_TEMPLATE(std::is_void_v<T>)
|
|
static Future<void> makeReady() {
|
|
return Future(Impl::makeReady());
|
|
}
|
|
|
|
Future<void> ignoreValue() && noexcept {
|
|
return Future<void>(std::move(this->_impl).ignoreValue());
|
|
}
|
|
|
|
/**
|
|
* Attaches a `func` callback that will consume the result of this `Future` when it completes. A
|
|
* `getAsync` call can be the tail end of a continuation chain, and that is only position at
|
|
* which it can appear.
|
|
*
|
|
* The result argument passed to `func` is `Status` if `T` is `void`, otherwise `StatusWith<T>`.
|
|
* The `func(result)` return type must be `void`, and not a discarded return value.
|
|
*
|
|
* `func` must not throw an exception. It is invoked as if by a `noexcept` function, and it will
|
|
* `std::terminate` the process. This is because there is no appropriate context in which to
|
|
* handle such an asynchronous exception.
|
|
*
|
|
* TODO decide how to handle func throwing.
|
|
*/
|
|
TEMPLATE(typename Func)
|
|
REQUIRES(future_details::isCallableExactR<void, Func, StatusOrStatusWith<T>>)
|
|
void getAsync(Func&& func) && noexcept {
|
|
std::move(this->_impl).getAsync(std::forward<Func>(func));
|
|
}
|
|
|
|
//
|
|
// The remaining methods are all continuation-based and take a callback and return a Future-like
|
|
// type based on the return type of the callback, except for the "tap" methods that always
|
|
// return Future<T>. When passed a callback that returns a FutureLike<U> type, the return type
|
|
// of the method will be either Future<U> if FutureLike is Future, otherwise SemiFuture<U>. The
|
|
// result of the callback will be automatically unwrapped and connected to the returned
|
|
// FutureLike<U> rather than producing a Future<FutureLike<U>>. When the callback returns a
|
|
// non-FutureLike type U, the return type of the method will be Future<U>, with the adjustment
|
|
// for Status/StatusWith described below.
|
|
//
|
|
// Each method has a comment indicating the supported signatures for that callback, and a
|
|
// description of when the callback is invoked and how the impacts the returned Future. It may
|
|
// be helpful to think of Future continuation chains as a pipeline of stages that take input
|
|
// from earlier stages and produce output for later stages.
|
|
//
|
|
// Be aware that the callback may be invoked inline at the call-site or at the producer when
|
|
// setting the value. Therefore, you should avoid doing blocking work inside of a callback.
|
|
// Additionally, avoid acquiring any locks or mutexes that the caller already holds, otherwise
|
|
// you risk a deadlock. If either of these concerns apply to your callback, it should schedule
|
|
// itself on an executor, rather than doing work in the callback.
|
|
// TODO make this easier to do by having executor APIs return Futures.
|
|
//
|
|
// Error handling in callbacks: all exceptions thrown propagate to the returned Future
|
|
// automatically. Callbacks that return Status or StatusWith<T> behave as-if they were wrapped
|
|
// in something that called uassertStatusOK() on the return value. There is no way to
|
|
// distinguish between a function throwing or returning a !OK status.
|
|
//
|
|
|
|
/**
|
|
* Callbacks passed to then() are only called if the input Future completes successfully.
|
|
* Otherwise the error propagates automatically, bypassing the callback.
|
|
*
|
|
* The callback takes a T and can return anything (see above for how Statusy and Futurey returns
|
|
* are handled.)
|
|
*/
|
|
TEMPLATE(typename Func)
|
|
REQUIRES(future_details::isCallable<Func, T>)
|
|
/*see above*/ auto then(Func&& func) && noexcept {
|
|
return wrap<Func, T>(std::move(this->_impl).then(std::forward<Func>(func)));
|
|
}
|
|
|
|
/**
|
|
* Callbacks passed to onCompletion() are called if the input Future completes with or without
|
|
* an error.
|
|
*
|
|
* The callback takes a StatusOrStatusWith<T> and can return anything (see above for how Statusy
|
|
* and Futurey returns are handled.)
|
|
*/
|
|
TEMPLATE(typename Func)
|
|
REQUIRES(future_details::isCallable<Func, StatusOrStatusWith<T>>)
|
|
/*see above*/ auto onCompletion(Func&& func) && noexcept {
|
|
return wrap<Func, Status>(std::move(this->_impl).onCompletion(std::forward<Func>(func)));
|
|
}
|
|
|
|
/**
|
|
* Callbacks passed to onError() are only called if the input Future completes with an error.
|
|
* Otherwise, the successful result propagates automatically, bypassing the callback.
|
|
*
|
|
* The callback can either produce a replacement value (which must be a T), return a replacement
|
|
* Future<T> (such as by retrying), or return/throw a replacement error.
|
|
*
|
|
* Note that this will only catch errors produced by earlier stages; it is not registering a
|
|
* general error handler for the entire chain.
|
|
*
|
|
* The callback takes a non-OK Status and returns a possibly-wrapped T (see above for how
|
|
* Statusy and Futurey returns are handled.)
|
|
*/
|
|
TEMPLATE(typename Func)
|
|
REQUIRES(future_details::isCallableR<T, Func, Status>)
|
|
/*see above*/ auto onError(Func&& func) && noexcept {
|
|
return wrap<Func, Status>(std::move(this->_impl).onError(std::forward<Func>(func)));
|
|
}
|
|
|
|
/**
|
|
* Same as the other two onErrors but only calls the callback if the code matches the template
|
|
* parameter. Otherwise lets the error propagate unchanged.
|
|
*
|
|
* The callback takes a non-OK Status and returns a possibly-wrapped T (see above for how
|
|
* Statusy and Futurey returns are handled.)
|
|
*/
|
|
TEMPLATE(ErrorCodes::Error code, typename Func)
|
|
REQUIRES(future_details::isCallableR<T, Func, Status>)
|
|
/*see above*/ auto onError(Func&& func) && noexcept {
|
|
return wrap<Func, Status>(
|
|
std::move(this->_impl).template onError<code>(std::forward<Func>(func)));
|
|
}
|
|
|
|
/**
|
|
* Similar to the first two onErrors, but only calls the callback if the category matches
|
|
* the template parameter. Otherwise lets the error propagate unchanged.
|
|
*
|
|
* The callback takes a non-OK Status and returns a possibly-wrapped T (see above for how
|
|
* Statusy and Futurey returns are handled.)
|
|
*/
|
|
TEMPLATE(ErrorCategory category, typename Func)
|
|
REQUIRES(future_details::isCallableR<T, Func, Status>)
|
|
/*see above*/ auto onErrorCategory(Func&& func) && noexcept {
|
|
return wrap<Func, Status>(
|
|
std::move(this->_impl).template onErrorCategory<category>(std::forward<Func>(func)));
|
|
}
|
|
|
|
//
|
|
// The tap/tapError/tapAll family of functions take callbacks to observe the flow through a
|
|
// future chain without affecting the propagating result, except possibly if they throw. If the
|
|
// naming seems odd, you can think of it like a "wire tap" in that it allows you to observe a
|
|
// conversation between two parties (the promise-producer and future-consumer) without adding
|
|
// messages of your own. This is why all callbacks are required to return void.
|
|
//
|
|
// TODO decide what to do if callback throws:
|
|
// - transition the future chain to failure
|
|
// - ignore
|
|
// - fatal (current impl)
|
|
//
|
|
|
|
/**
|
|
* Callback is called if the input completes successfully.
|
|
*
|
|
* This can be used to inform some outside system of the result.
|
|
*
|
|
* The callback takes a const T& and must return void.
|
|
*/
|
|
TEMPLATE(typename Func)
|
|
REQUIRES(future_details::isCallableExactR<void, Func, const T>)
|
|
Future<T> tap(Func&& func) && noexcept {
|
|
return Future<T>(std::move(this->_impl).tap(std::forward<Func>(func)));
|
|
}
|
|
|
|
/**
|
|
* Callback is called if the input completes with an error.
|
|
*
|
|
* This can be used to log.
|
|
*
|
|
* The callback takes a non-OK Status and must return void.
|
|
*/
|
|
TEMPLATE(typename Func)
|
|
REQUIRES(future_details::isCallableExactR<void, Func, const Status>)
|
|
Future<T> tapError(Func&& func) && noexcept {
|
|
return Future<T>(std::move(this->_impl).tapError(std::forward<Func>(func)));
|
|
}
|
|
|
|
/**
|
|
* Callback is called when the input completes, regardless of success or failure.
|
|
*
|
|
* This can be used for cleanup. Some other libraries name the equivalent method finally to
|
|
* match the common semantic from other languages.
|
|
*
|
|
* The callback takes a StatusOrStatusWith<T> and must return void.
|
|
*/
|
|
TEMPLATE(typename Func)
|
|
REQUIRES(future_details::isCallableExactR<void, Func, const StatusOrStatusWith<T>>)
|
|
Future<T> tapAll(Func&& func) && noexcept {
|
|
return Future<T>(std::move(this->_impl).tapAll(std::forward<Func>(func)));
|
|
}
|
|
|
|
// Calling this on a Future is never strictly necessary, since this is already an inline
|
|
// Future, but making this public helps for writing generic utilities which need to use
|
|
// unsafeToInlineFuture for some future types but not others.
|
|
using SemiFuture<T>::unsafeToInlineFuture;
|
|
|
|
private:
|
|
template <typename>
|
|
friend class ExecutorFuture;
|
|
template <typename>
|
|
friend class Future;
|
|
template <typename>
|
|
friend class future_details::FutureImpl;
|
|
friend class Promise<T>;
|
|
friend class SharedPromise<T>;
|
|
|
|
|
|
template <typename Func, typename Arg, typename U>
|
|
static auto wrap(future_details::FutureImpl<U>&& impl) {
|
|
using namespace future_details;
|
|
return FutureContinuationKind<NormalizedCallResult<Func, Arg>>(std::move(impl));
|
|
}
|
|
};
|
|
|
|
// Deduction Guides
|
|
TEMPLATE(typename T)
|
|
REQUIRES(!isStatusOrStatusWith<T> && !future_details::isFutureLike<T>)
|
|
Future(T)->Future<T>;
|
|
template <typename T>
|
|
Future(StatusWith<T>)->Future<T>;
|
|
|
|
/**
|
|
* An ExecutorFuture is like a Future that ensures that all callbacks are run on a supplied
|
|
* executor.
|
|
*
|
|
* IMPORTANT: Executors are allowed to refuse work by invoking their task callbacks with a non-OK
|
|
* Status. In that event, callbacks passed to continuation functions WILL NOT RUN. Instead, the
|
|
* error status will propagate down the future chain until it would run a callback on an executor
|
|
* that doesn't refuse the work, or it is extracted by calling a blocking get() method. Destructors
|
|
* for these callbacks can run in any context, so be suspicious of callbacks that capture Promises
|
|
* because they will propagate out BrokenPromise if the executor refuses work.
|
|
*/
|
|
template <typename T>
|
|
class MONGO_WARN_UNUSED_RESULT_CLASS ExecutorFuture : private SemiFuture<T> {
|
|
using Impl = typename SemiFuture<T>::Impl;
|
|
using T_unless_void = typename SemiFuture<T>::T_unless_void;
|
|
|
|
public:
|
|
/**
|
|
* Default construction is disallowed to ensure that every ExecutorFuture has an associated
|
|
* Executor (unless it has been moved-from).
|
|
*/
|
|
ExecutorFuture() = delete;
|
|
|
|
ExecutorFuture(ExecutorPtr exec, Status status)
|
|
: SemiFuture<T>(std::move(status)), _exec(std::move(exec)) {}
|
|
|
|
// These should not be used with T=void.
|
|
ExecutorFuture(ExecutorPtr exec, T_unless_void val)
|
|
: SemiFuture<T>(std::move(val)), _exec(std::move(exec)) {
|
|
static_assert(!std::is_void_v<T>);
|
|
}
|
|
ExecutorFuture(ExecutorPtr exec, StatusWith<T_unless_void> sw)
|
|
: SemiFuture<T>(std::move(sw)), _exec(std::move(exec)) {
|
|
static_assert(!std::is_void_v<T>);
|
|
}
|
|
|
|
REQUIRES_FOR_NON_TEMPLATE(std::is_void_v<T>)
|
|
explicit ExecutorFuture(ExecutorPtr exec) : SemiFuture<void>(), _exec(std::move(exec)) {}
|
|
|
|
/**
|
|
* Re-export the accessor API of SemiFuture. The access API of ExecutorFuture is a superset, but
|
|
* you can't create an ExecutorFuture without supplying an executor.
|
|
*/
|
|
using value_type = T;
|
|
using SemiFuture<T>::share;
|
|
using SemiFuture<T>::isReady;
|
|
using SemiFuture<T>::wait;
|
|
using SemiFuture<T>::waitNoThrow;
|
|
using SemiFuture<T>::get;
|
|
using SemiFuture<T>::getNoThrow;
|
|
using SemiFuture<T>::semi;
|
|
using SemiFuture<T>::thenRunOn;
|
|
|
|
ExecutorFuture<void> ignoreValue() && noexcept {
|
|
return ExecutorFuture<void>(std::move(_exec), std::move(this->_impl).ignoreValue());
|
|
}
|
|
|
|
//
|
|
// Provide the callback-taking API from Future (except for the taps). All callbacks will be run
|
|
// on the executor associated with this ExecutorFuture. See class comment for how we handle
|
|
// executors that refuse work.
|
|
//
|
|
// All methods that return non-void will return an ExecutorFuture bound to the same executor as
|
|
// this.
|
|
//
|
|
// There is no tap support because we can't easily be both non-intrusive in the value flow and
|
|
// schedule on an executor that is allowed to fail. In particular, the inability to copy
|
|
// move-only values means that we would need to refer directly into the internal SharedState
|
|
// objects and keep them alive longer that we otherwise would. If there is a real need for this,
|
|
// it should be doable, but will be fairly complicated.
|
|
//
|
|
|
|
/**
|
|
* Attach a completion callback to asynchronously consume this `ExecutorFuture`'s result.
|
|
* \see `Future<T>::getAsync()`.
|
|
*/
|
|
TEMPLATE(typename Func)
|
|
REQUIRES(future_details::isCallableExactR<void, Func, StatusOrStatusWith<T>>)
|
|
void getAsync(Func&& func) && noexcept {
|
|
static_assert(std::is_void_v<decltype(func(std::declval<StatusOrStatusWith<T>>()))>,
|
|
"func passed to getAsync must return void");
|
|
|
|
// Can't use wrapCB since we don't want to return a future, just schedule a non-chainable
|
|
// callback.
|
|
std::move(this->_impl).getAsync([
|
|
exec = std::move(_exec), // Unlike wrapCB this can move because we won't need it later.
|
|
func = std::forward<Func>(func)
|
|
](StatusOrStatusWith<T> arg) mutable noexcept {
|
|
exec->schedule([ func = std::move(func),
|
|
arg = std::move(arg) ](Status execStatus) mutable noexcept {
|
|
if (execStatus.isOK())
|
|
func(std::move(arg));
|
|
});
|
|
});
|
|
}
|
|
|
|
TEMPLATE(typename Func)
|
|
REQUIRES(future_details::isCallable<Func, T>)
|
|
auto then(Func&& func) && noexcept {
|
|
return mongo::ExecutorFuture(
|
|
std::move(_exec), std::move(this->_impl).then(wrapCB<T>(std::forward<Func>(func))));
|
|
}
|
|
|
|
TEMPLATE(typename Func)
|
|
REQUIRES(future_details::isCallable<Func, StatusOrStatusWith<T>>)
|
|
auto onCompletion(Func&& func) && noexcept {
|
|
return mongo::ExecutorFuture(
|
|
std::move(_exec),
|
|
std::move(this->_impl)
|
|
.onCompletion(wrapCB<StatusOrStatusWith<T>>(std::forward<Func>(func))));
|
|
}
|
|
|
|
TEMPLATE(typename Func)
|
|
REQUIRES(future_details::isCallableR<T, Func, Status>)
|
|
ExecutorFuture<T> onError(Func&& func) && noexcept {
|
|
return mongo::ExecutorFuture(
|
|
std::move(_exec),
|
|
std::move(this->_impl).onError(wrapCB<Status>(std::forward<Func>(func))));
|
|
}
|
|
|
|
TEMPLATE(ErrorCodes::Error code, typename Func)
|
|
REQUIRES(future_details::isCallableR<T, Func, Status>)
|
|
ExecutorFuture<T> onError(Func&& func) && noexcept {
|
|
return mongo::ExecutorFuture(
|
|
std::move(_exec),
|
|
std::move(this->_impl)
|
|
.template onError<code>(wrapCB<Status>(std::forward<Func>(func))));
|
|
}
|
|
|
|
TEMPLATE(ErrorCategory category, typename Func)
|
|
REQUIRES(future_details::isCallableR<T, Func, Status>)
|
|
ExecutorFuture<T> onErrorCategory(Func&& func) && noexcept {
|
|
return mongo::ExecutorFuture(
|
|
std::move(_exec),
|
|
std::move(this->_impl)
|
|
.template onErrorCategory<category>(wrapCB<Status>(std::forward<Func>(func))));
|
|
}
|
|
|
|
/**
|
|
* Returns an inline Future type from this ExecutorFuture.
|
|
*
|
|
* WARNING: Do not use this unless you're extremely sure of what you're doing, as callbacks
|
|
* chained to the resulting Future may run in unexpected places.
|
|
*/
|
|
using SemiFuture<T>::unsafeToInlineFuture;
|
|
|
|
private:
|
|
// This *must* take exec by ref to ensure it isn't moved from while evaluating wrapCB above.
|
|
ExecutorFuture(ExecutorPtr&& exec, Impl&& impl) : SemiFuture<T>(std::move(impl)), _exec(exec) {
|
|
dassert(_exec);
|
|
}
|
|
|
|
/**
|
|
* Wraps func in a callback that takes the argument it would and returns an appropriately typed
|
|
* Future<U>, then schedules a task on _exec to complete the associated promise with the result
|
|
* of calling func with that argument.
|
|
*/
|
|
template <typename RawArg, typename Func>
|
|
auto wrapCB(Func&& func) {
|
|
// Have to take care to never put void in argument position, since that is a hard error.
|
|
using Result = typename std::conditional_t<std::is_void_v<RawArg>,
|
|
std::invoke_result<Func>,
|
|
std::invoke_result<Func, RawArg>>::type;
|
|
using DummyArg = std::conditional_t<std::is_void_v<RawArg>, //
|
|
future_details::FakeVoid,
|
|
RawArg>;
|
|
using Sig = std::conditional_t<std::is_void_v<RawArg>, //
|
|
Result(),
|
|
Result(DummyArg)>;
|
|
return _wrapCBHelper(_exec, unique_function<Sig>(std::forward<Func>(func)));
|
|
}
|
|
|
|
template <typename UniqueFunc>
|
|
MONGO_COMPILER_NOINLINE static auto _wrapCBHelper(ExecutorPtr exec, UniqueFunc&& func);
|
|
|
|
template <typename>
|
|
friend class ExecutorFuture;
|
|
template <typename>
|
|
friend class SemiFuture;
|
|
template <typename>
|
|
friend class SharedSemiFuture;
|
|
template <typename>
|
|
friend class future_details::FutureImpl;
|
|
|
|
ExecutorPtr _exec;
|
|
};
|
|
|
|
// Deduction Guides
|
|
TEMPLATE(typename T)
|
|
REQUIRES(!isStatusOrStatusWith<T> && !future_details::isFutureLike<T>)
|
|
ExecutorFuture(ExecutorPtr, T)->ExecutorFuture<T>;
|
|
template <typename T>
|
|
ExecutorFuture(ExecutorPtr, future_details::FutureImpl<T>)->ExecutorFuture<T>;
|
|
template <typename T>
|
|
ExecutorFuture(ExecutorPtr, StatusWith<T>)->ExecutorFuture<T>;
|
|
ExecutorFuture(ExecutorPtr)->ExecutorFuture<void>;
|
|
|
|
/** Constructor tag (see the corresponding `Promise` constructor). */
|
|
struct NonNullPromiseTag {};
|
|
|
|
/**
|
|
* This class represents the producer side of a Future.
|
|
*
|
|
* This is a single-shot class: you may either set a value or error at most once. If no value or
|
|
* error has been set at the time this Promise is destroyed, a error will be set with
|
|
* ErrorCode::BrokenPromise. This should generally be considered a programmer error, and should not
|
|
* be relied upon. We may make it debug-fatal in the future.
|
|
*
|
|
* Only one thread can use a given Promise at a time, but another thread may be using the associated
|
|
* Future object.
|
|
*
|
|
* If the result is ready when producing the Future, it is more efficient to use
|
|
* makeReadyFutureWith() or Future<T>::makeReady() than to use a Promise<T>.
|
|
*
|
|
* A default constructed `Promise` is in a null state. Null `Promises` can only be assigned over
|
|
* and destroyed. It is a programmer error to call any methods on a null `Promise`. Any methods
|
|
* that complete a `Promise` leave it in the null state.
|
|
*/
|
|
template <typename T>
|
|
class Promise {
|
|
using SharedStateT = future_details::SharedState<T>;
|
|
using T_unless_void = typename SemiFuture<T>::T_unless_void;
|
|
|
|
public:
|
|
using value_type = T;
|
|
|
|
/**
|
|
* Creates a null `Promise`.
|
|
*/
|
|
Promise() = default;
|
|
|
|
/** Makes a `Promise` that has a `Future`. */
|
|
explicit Promise(NonNullPromiseTag) : Promise{make_intrusive<SharedStateT>()} {}
|
|
|
|
~Promise() {
|
|
breakPromiseIfNeeded();
|
|
}
|
|
|
|
Promise(const Promise&) = delete;
|
|
Promise& operator=(const Promise&) = delete;
|
|
|
|
|
|
/**
|
|
* Breaks this `Promise`, if not fulfilled and not in a null state.
|
|
*/
|
|
Promise& operator=(Promise&& p) noexcept {
|
|
breakPromiseIfNeeded();
|
|
_sharedState = std::move(p._sharedState);
|
|
return *this;
|
|
}
|
|
|
|
Promise(Promise&&) = default;
|
|
|
|
/**
|
|
* Sets a value or error into this Promise by calling func, which must take no arguments and
|
|
* return one of T, StatusWith<T> (or Status when T is void), or Future<T>. All errors, whether
|
|
* returned or thrown, will be correctly propagated.
|
|
*
|
|
* If the function returns a Future<T>, this Promise's Future will complete when the returned
|
|
* Future<T> completes, as-if it was passed to Promise::setFrom().
|
|
*
|
|
* If any work is needed to produce the result, prefer doing something like:
|
|
* promise.setWith([&]{ return makeResult(); });
|
|
* over code like:
|
|
* promise.emplaceValue(makeResult());
|
|
* because this method will correctly propagate errors thrown from makeResult(), rather than
|
|
* ErrorCodes::BrokenPromise.
|
|
*/
|
|
TEMPLATE(typename Func)
|
|
REQUIRES(future_details::isCallableR<T, Func, void>)
|
|
void setWith(Func&& func) noexcept {
|
|
setFrom(Future<void>::makeReady().then(std::forward<Func>(func)));
|
|
}
|
|
|
|
/**
|
|
* Sets the value into this Promise when the passed-in Future completes, which may have already
|
|
* happened. If it hasn't, it is still safe to destroy this Promise since it is no longer
|
|
* involved.
|
|
*/
|
|
void setFrom(Future<T>&& future) noexcept {
|
|
setImpl([&](boost::intrusive_ptr<SharedStateT>&& sharedState) {
|
|
std::move(future).propagateResultTo(sharedState.get());
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Sets the value into this Promise immediately.
|
|
*
|
|
* This accepts a Status for Promises<void> or a StatusWith<T> for Promise<T>.
|
|
*/
|
|
void setFrom(StatusOrStatusWith<T> sosw) noexcept {
|
|
setImpl([&](boost::intrusive_ptr<SharedStateT>&& sharedState) {
|
|
sharedState->setFrom(std::move(sosw));
|
|
});
|
|
}
|
|
|
|
// Use emplaceValue(Args&&...) instead.
|
|
REQUIRES_FOR_NON_TEMPLATE(!std::is_void_v<T>)
|
|
void setFrom(T_unless_void val) noexcept = delete;
|
|
|
|
// Use setError(Status) instead.
|
|
REQUIRES_FOR_NON_TEMPLATE(!std::is_void_v<T>)
|
|
void setFrom(Status) noexcept = delete;
|
|
|
|
TEMPLATE(typename... Args)
|
|
REQUIRES(std::is_constructible_v<T, Args...> || (std::is_void_v<T> && sizeof...(Args) == 0))
|
|
void emplaceValue(Args&&... args) noexcept {
|
|
setImpl([&](boost::intrusive_ptr<SharedStateT>&& sharedState) {
|
|
sharedState->emplaceValue(std::forward<Args>(args)...);
|
|
});
|
|
}
|
|
|
|
void setError(Status status) noexcept {
|
|
invariant(!status.isOK());
|
|
setImpl([&](boost::intrusive_ptr<SharedStateT>&& sharedState) {
|
|
sharedState->setError(std::move(status));
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Create and return the Future corresponding to this Promise. Cannot be
|
|
* called more than once (but see `SharedPromise` which supports multiple
|
|
* `getFuture` calls).
|
|
*
|
|
* Caution: calling `getFuture` is only valid before the promise is
|
|
* completed, because a Promise is nullified upon completion. This Promise
|
|
* behavior is different from the semantics of other future libraries. The
|
|
* PromiseAndFuture helper avoids a potential data race by eagerly calling
|
|
* `getFuture`, returning the Promise together with its Future.
|
|
*/
|
|
Future<T> getFuture() noexcept {
|
|
// Copy `_sharedState` into a SharedStateHolder to make a Future,
|
|
// exploiting the knowledge that this Promise is its sole owner.
|
|
_sharedState->threadUnsafeIncRefCountTo(2);
|
|
return Future<T>(future_details::SharedStateHolder<future_details::VoidToFakeVoid<T>>(
|
|
boost::intrusive_ptr<SharedStateT>(_sharedState.get(), false)));
|
|
}
|
|
|
|
private:
|
|
explicit Promise(boost::intrusive_ptr<SharedStateT>&& sharedState)
|
|
: _sharedState(std::move(sharedState)) {}
|
|
|
|
friend class Future<void>;
|
|
|
|
template <typename Func>
|
|
void setImpl(Func&& doSet) noexcept {
|
|
invariant(_sharedState);
|
|
// We keep `sharedState` as a stack local, to preserve ownership of the resource,
|
|
// in case the code in `doSet` unblocks a thread which winds up causing
|
|
// `~Promise` to be invoked.
|
|
auto sharedState = std::move(_sharedState);
|
|
doSet(std::move(sharedState));
|
|
// Note: `this` is potentially dead, at this point.
|
|
}
|
|
|
|
// The current promise will be broken, if not already fulfilled.
|
|
void breakPromiseIfNeeded() {
|
|
if (MONGO_unlikely(_sharedState)) {
|
|
_sharedState->setError({ErrorCodes::BrokenPromise, "broken promise"});
|
|
}
|
|
}
|
|
|
|
boost::intrusive_ptr<SharedStateT> _sharedState;
|
|
};
|
|
|
|
/**
|
|
* SharedSemiFuture<T> is logically a possibly-deferred StatusWith<T> (or Status when T is void).
|
|
*
|
|
* All methods that are present do the same as on a Future<T> so see it for documentation.
|
|
*
|
|
* Unlike Future<T> it only supports blocking operation, not chained continuations. This is intended
|
|
* to protect the promise-completer's execution context from needing to perform arbitrary operations
|
|
* requested by other subsystem's continuations.
|
|
* TODO Support continuation chaining when supplied with an executor to run them on.
|
|
*
|
|
* A SharedSemiFuture may be passed between threads, but only one thread may use it at a time.
|
|
*/
|
|
template <typename T>
|
|
class MONGO_WARN_UNUSED_RESULT_CLASS SharedSemiFuture {
|
|
using Impl = future_details::SharedStateHolder<T>;
|
|
using T_unless_void = std::conditional_t<std::is_void_v<T>, future_details::FakeVoid, T>;
|
|
|
|
public:
|
|
static_assert(!std::is_same<T, Status>::value,
|
|
"SharedSemiFuture<Status> is banned. Use SharedSemiFuture<void> instead.");
|
|
static_assert(
|
|
!isStatusWith<T>,
|
|
"SharedSemiFuture<StatusWith<T>> is banned. Just use SharedSemiFuture<T> instead.");
|
|
static_assert(
|
|
!future_details::isFutureLike<T>,
|
|
"SharedSemiFuture of Future types is banned. Just use SharedSemiFuture<T> instead.");
|
|
static_assert(!std::is_reference<T>::value, "SharedSemiFuture<T&> is banned.");
|
|
static_assert(!std::is_const<T>::value, "SharedSemiFuture<const T> is banned.");
|
|
static_assert(!std::is_array<T>::value, "SharedSemiFuture<T[]> is banned.");
|
|
|
|
static_assert(std::is_void_v<T> || std::is_copy_constructible_v<T>,
|
|
"SharedSemiFuture currently requires copyable types. Let us know if this is a "
|
|
"problem. Supporting this for blocking use cases is easy, but it will require "
|
|
"more work for async usage.");
|
|
|
|
using value_type = T;
|
|
|
|
SharedSemiFuture() = default;
|
|
|
|
/*implicit*/ SharedSemiFuture(const Future<T>& fut) = delete;
|
|
/*implicit*/ SharedSemiFuture(Future<T>&& fut) : SharedSemiFuture(std::move(fut).share()) {}
|
|
/*implicit*/ SharedSemiFuture(Status error) : _shared(Impl::makeReady(std::move(error))) {}
|
|
|
|
|
|
// These should not be used with T=void.
|
|
/*implicit*/ SharedSemiFuture(T_unless_void val) : _shared(Impl::makeReady(std::move(val))) {
|
|
static_assert(!std::is_void_v<T>);
|
|
}
|
|
/*implicit*/ SharedSemiFuture(StatusWith<T_unless_void> sw)
|
|
: _shared(Impl::makeReady(std::move(sw))) {
|
|
static_assert(!std::is_void_v<T>);
|
|
}
|
|
|
|
bool isReady() const {
|
|
return _shared.isReady();
|
|
}
|
|
|
|
void wait(Interruptible* interruptible = Interruptible::notInterruptible()) const {
|
|
_shared.wait(interruptible);
|
|
}
|
|
|
|
Status waitNoThrow(Interruptible* interruptible = Interruptible::notInterruptible()) const
|
|
noexcept {
|
|
return _shared.waitNoThrow(interruptible);
|
|
}
|
|
|
|
future_details::AddRefUnlessVoid<const T> get(
|
|
Interruptible* interruptible = Interruptible::notInterruptible()) const& {
|
|
return _shared.get(interruptible);
|
|
}
|
|
|
|
StatusOrStatusWith<T> getNoThrow(
|
|
Interruptible* interruptible = Interruptible::notInterruptible()) const& noexcept {
|
|
return _shared.getNoThrow(interruptible);
|
|
}
|
|
|
|
ExecutorFuture<T> thenRunOn(ExecutorPtr exec) const noexcept {
|
|
return ExecutorFuture<T>(std::move(exec), toFutureImpl());
|
|
}
|
|
|
|
/**
|
|
* Makes a copy of this SharedSemiFuture that resolves at the same time as the original.
|
|
*/
|
|
SharedSemiFuture split() const noexcept {
|
|
return toFutureImpl().share();
|
|
}
|
|
|
|
SemiFuture<T> semi() && noexcept {
|
|
return SemiFuture<T>(toFutureImpl());
|
|
}
|
|
|
|
/**
|
|
* Returns an inline Future type from this SharedSemiFuture.
|
|
*
|
|
* WARNING: Do not use this unless you're extremely sure of what you're doing, as callbacks
|
|
* chained to the resulting Future may run in unexpected places.
|
|
*/
|
|
Future<T> unsafeToInlineFuture() const noexcept {
|
|
return Future<T>(toFutureImpl());
|
|
}
|
|
|
|
private:
|
|
template <typename>
|
|
friend class SharedPromise;
|
|
template <typename>
|
|
friend class future_details::FutureImpl;
|
|
friend class SharedSemiFuture<void>;
|
|
template <typename>
|
|
friend class ExecutorFuture;
|
|
|
|
future_details::FutureImpl<T> toFutureImpl() const noexcept {
|
|
static_assert(std::is_void_v<T> || std::is_copy_constructible_v<T>);
|
|
return future_details::FutureImpl<T>(_shared.addChild());
|
|
}
|
|
|
|
// These are needed to support chaining where a SharedSemiFuture is returned from a
|
|
// continuation.
|
|
explicit operator future_details::FutureImpl<T>() const noexcept {
|
|
return toFutureImpl();
|
|
}
|
|
template <typename U>
|
|
void propagateResultTo(U&& arg) const noexcept {
|
|
toFutureImpl().propagateResultTo(std::forward<U>(arg));
|
|
}
|
|
explicit SharedSemiFuture(boost::intrusive_ptr<future_details::SharedState<T>> ptr)
|
|
: _shared(std::move(ptr)) {}
|
|
explicit SharedSemiFuture(future_details::SharedStateHolder<T>&& holder)
|
|
: _shared(std::move(holder)) {}
|
|
|
|
future_details::SharedStateHolder<T> _shared;
|
|
};
|
|
|
|
// Deduction Guides
|
|
TEMPLATE(typename T)
|
|
REQUIRES(!isStatusOrStatusWith<T> && !future_details::isFutureLike<T>)
|
|
SharedSemiFuture(T)->SharedSemiFuture<T>;
|
|
template <typename T>
|
|
SharedSemiFuture(StatusWith<T>)->SharedSemiFuture<T>;
|
|
|
|
/**
|
|
* This class represents the producer of SharedSemiFutures.
|
|
*
|
|
* This is a single-shot class: you may either set a value or error at most once. However you may
|
|
* extract as many futures as you want and they will all be completed at the same time. Any number
|
|
* of threads can extract a future at the same time. It is also safe to extract a future
|
|
* concurrently with completing the promise. If you extract a future after the promise has been
|
|
* completed, a ready future will be returned. You must still ensure that all calls to getFuture()
|
|
* complete prior to destroying the Promise.
|
|
*
|
|
* If no value or error has been set at the time this Promise is destroyed, an error will be set
|
|
* with ErrorCode::BrokenPromise. This should generally be considered a programmer error, and should
|
|
* not be relied upon. We may make it debug-fatal in the future.
|
|
*
|
|
* Unless otherwise specified, all methods behave the same as on Promise<T>.
|
|
*/
|
|
template <typename T>
|
|
class SharedPromise {
|
|
using SharedStateT = future_details::SharedState<T>;
|
|
using T_unless_void = std::conditional_t<std::is_void_v<T>, future_details::FakeVoid, T>;
|
|
|
|
public:
|
|
using value_type = T;
|
|
|
|
/**
|
|
* Creates a `SharedPromise` ready for use.
|
|
*/
|
|
SharedPromise() = default;
|
|
|
|
~SharedPromise() {
|
|
if (MONGO_unlikely(!_haveCompleted)) {
|
|
_sharedState->setError({ErrorCodes::BrokenPromise, "broken promise"});
|
|
}
|
|
}
|
|
|
|
SharedPromise(const SharedPromise&) = delete;
|
|
SharedPromise(SharedPromise&&) = delete;
|
|
SharedPromise& operator=(const SharedPromise&) = delete;
|
|
SharedPromise& operator=(SharedPromise&& p) noexcept = delete;
|
|
|
|
/**
|
|
* Returns a future associated with this promise. All returned futures will be completed when
|
|
* the promise is completed.
|
|
*/
|
|
SharedSemiFuture<T> getFuture() const {
|
|
return SharedSemiFuture<T>(_sharedState);
|
|
}
|
|
|
|
TEMPLATE(typename Func)
|
|
REQUIRES(future_details::isCallableR<T, Func, void>)
|
|
void setWith(Func&& func) noexcept {
|
|
setFrom(Future<void>::makeReady().then(std::forward<Func>(func)));
|
|
}
|
|
|
|
/**
|
|
* Sets the value into this SharedPromise when the passed-in Future completes, which may have
|
|
* already happened.
|
|
*/
|
|
void setFrom(Future<T>&& future) noexcept {
|
|
invariant(!std::exchange(_haveCompleted, true));
|
|
std::move(future).propagateResultTo(_sharedState.get());
|
|
}
|
|
|
|
/**
|
|
* Sets the value into this SharedPromise immediately.
|
|
*
|
|
* This accepts a Status for SharedPromises<void> or a StatusWith<T> for SharedPromise<T>.
|
|
*/
|
|
void setFrom(StatusOrStatusWith<T> sosw) noexcept {
|
|
invariant(!std::exchange(_haveCompleted, true));
|
|
_sharedState->setFrom(std::move(sosw));
|
|
}
|
|
|
|
// Use emplaceValue(Args&&...) instead.
|
|
REQUIRES_FOR_NON_TEMPLATE(!std::is_void_v<T>)
|
|
void setFrom(T_unless_void val) noexcept = delete;
|
|
|
|
// Use setError(Status) instead.
|
|
REQUIRES_FOR_NON_TEMPLATE(!std::is_void_v<T>)
|
|
void setFrom(Status) noexcept = delete;
|
|
|
|
TEMPLATE(typename... Args)
|
|
REQUIRES(std::is_constructible_v<T, Args...> || (std::is_void_v<T> && sizeof...(Args) == 0))
|
|
void emplaceValue(Args&&... args) noexcept {
|
|
invariant(!std::exchange(_haveCompleted, true));
|
|
_sharedState->emplaceValue(std::forward<Args>(args)...);
|
|
}
|
|
|
|
void setError(Status status) noexcept {
|
|
invariant(!status.isOK());
|
|
invariant(!std::exchange(_haveCompleted, true));
|
|
_sharedState->setError(std::move(status));
|
|
}
|
|
|
|
private:
|
|
friend class Future<void>;
|
|
|
|
// This is slightly different from whether the SharedState is in kFinished, because this
|
|
// SharedPromise may have been completed with a Future that isn't ready yet.
|
|
bool _haveCompleted = false;
|
|
const boost::intrusive_ptr<SharedStateT> _sharedState = make_intrusive<SharedStateT>();
|
|
};
|
|
|
|
/**
|
|
* Holds a Promise and its corresponding Future.
|
|
* Upon construction, contains a new Promise and its Future.
|
|
*/
|
|
template <typename T>
|
|
struct PromiseAndFuture {
|
|
Promise<T> promise{NonNullPromiseTag{}};
|
|
Future<T> future{promise.getFuture()};
|
|
};
|
|
|
|
/** Another way to write `PromiseAndFuture<T>{}`. */
|
|
template <typename T>
|
|
PromiseAndFuture<T> makePromiseFuture() {
|
|
return {};
|
|
}
|
|
|
|
/**
|
|
* This metafunction allows APIs that take callbacks and return Future to avoid doing their own type
|
|
* calculus. This results in the base value_type that would result from passing Func to a
|
|
* Future<T>::then(), with the same normalizing of T/StatusWith<T>/Future<T> returns. This is
|
|
* primarily useful for implementations of executors rather than their users.
|
|
*
|
|
* This returns the unwrapped T rather than Future<T> so it will be easy to create a Promise<T>.
|
|
*
|
|
* Examples:
|
|
*
|
|
* FutureContinuationResult<std::function<void()>> == void
|
|
* FutureContinuationResult<std::function<Status()>> == void
|
|
* FutureContinuationResult<std::function<Future<void>()>> == void
|
|
*
|
|
* FutureContinuationResult<std::function<int()>> == int
|
|
* FutureContinuationResult<std::function<StatusWith<int>()>> == int
|
|
* FutureContinuationResult<std::function<Future<int>()>> == int
|
|
*
|
|
* FutureContinuationResult<std::function<int(bool)>, bool> == int
|
|
*
|
|
* FutureContinuationResult<std::function<int(bool)>, NotBool> SFINAE-safe substitution failure.
|
|
*/
|
|
template <typename Func, typename... Args>
|
|
using FutureContinuationResult =
|
|
future_details::UnwrappedType<std::invoke_result_t<Func, Args&&...>>;
|
|
|
|
/**
|
|
* This type transform is useful for coercing (T,StatusWith<T>)->Future<T> and Status->Future<void>.
|
|
*/
|
|
template <typename T>
|
|
using FutureFor = Future<future_details::UnwrappedType<T>>;
|
|
|
|
template <typename T>
|
|
auto coerceToFuture(T&& value) {
|
|
return FutureFor<T>(std::forward<T>(value));
|
|
}
|
|
|
|
/**
|
|
* Makes a Future with the return value of a nullary function. This has the same semantics as
|
|
* Promise::setWith, and has the same reasons to prefer it over Future<T>::makeReady(). Also, it
|
|
* deduces the T, so it is easier to use.
|
|
*
|
|
* Note that if func returns an unready Future, this function will not block until it is ready.
|
|
*/
|
|
TEMPLATE(typename Func)
|
|
REQUIRES(future_details::isCallable<Func, void>)
|
|
auto makeReadyFutureWith(Func&& func) -> Future<FutureContinuationResult<Func&&>> try {
|
|
if constexpr (std::is_void_v<std::invoke_result_t<Func>>) {
|
|
std::forward<Func>(func)();
|
|
return Future<void>::makeReady();
|
|
} else {
|
|
return std::forward<Func>(func)();
|
|
}
|
|
} catch (const DBException& ex) {
|
|
return ex.toStatus();
|
|
}
|
|
|
|
//
|
|
// Implementations of methods that couldn't be defined in the class due to ordering requirements.
|
|
//
|
|
|
|
template <typename T>
|
|
template <typename UniqueFunc>
|
|
auto ExecutorFuture<T>::_wrapCBHelper(ExecutorPtr exec, UniqueFunc&& func) {
|
|
return [ exec = std::move(exec), func = std::move(func) ](auto&&... args) mutable noexcept {
|
|
using FuncR = typename UniqueFunc::result_type;
|
|
using BoundArgs = std::tuple<std::decay_t<decltype(args)>...>;
|
|
Promise<future_details::UnwrappedType<FuncR>> promise{NonNullPromiseTag{}};
|
|
auto future = promise.getFuture();
|
|
|
|
exec->schedule([
|
|
promise = std::move(promise),
|
|
func = std::move(func),
|
|
boundArgs = BoundArgs{std::forward<decltype(args)>(args)...}
|
|
](Status execStatus) mutable noexcept {
|
|
if (!execStatus.isOK()) {
|
|
promise.setError(std::move(execStatus));
|
|
return;
|
|
}
|
|
promise.setWith([&] {
|
|
auto closure = [&] { return std::apply(func, std::move(boundArgs)); };
|
|
if constexpr (future_details::isFutureLike<FuncR>) {
|
|
// Cheat and convert to an inline Future since we know we will schedule
|
|
// further user callbacks onto an executor.
|
|
return closure().unsafeToInlineFuture();
|
|
} else {
|
|
return closure();
|
|
}
|
|
});
|
|
});
|
|
|
|
return future;
|
|
};
|
|
}
|
|
|
|
template <typename T>
|
|
inline ExecutorFuture<T> SemiFuture<T>::thenRunOn(ExecutorPtr exec) && noexcept {
|
|
return ExecutorFuture<T>(std::move(exec), std::move(_impl));
|
|
}
|
|
|
|
template <typename T>
|
|
Future<T> SemiFuture<T>::unsafeToInlineFuture() && noexcept {
|
|
return Future<T>(std::move(_impl));
|
|
}
|
|
|
|
template <typename T>
|
|
inline SharedSemiFuture<future_details::FakeVoidToVoid<T>>
|
|
future_details::FutureImpl<T>::share() && noexcept {
|
|
using Out = SharedSemiFuture<FakeVoidToVoid<T>>;
|
|
if (_immediate)
|
|
return Out(SharedStateHolder<FakeVoidToVoid<T>>::makeReady(std::move(*_immediate)));
|
|
return Out(SharedStateHolder<FakeVoidToVoid<T>>(std::move(_shared)));
|
|
}
|
|
|
|
} // namespace mongo
|