C ++ 20のCoroutines。パート2

前書き



この記事では、の続きです。この記事



無限のデータストリーム co_yield



以下のコードは、無限のデータストリームを実装します。コルーチンは、要求に応じてステップで始まり、新しい値を発行するデータのストリームを作成するためにgetNext使用さco_yieldれますstartstep



無限のデータストリーム
//infiniteDataStream.cpp
#include <coroutine>
#include <memory>
#include <iostream>

template <typename T>
struct Generator {
    struct promise_type;
    using handle_type = std::coroutine_handle<promise_type>;
    Generator(handle_type h) : coro(h) {}                       // (3)
    handle_type coro;
    std::shared_ptr<T> value;
    ~Generator() {
        if (coro) {
            coro.destroy();
        }
    }
    Generator(const Generator &) = delete;
    Generator& operator=(const Generator &) = delete;
    Generator(Generator &&other) : coro(other.coro) {
        other.coro = nullptr;
    }
    Generator& operator=(Generator &&other) {
        coro = other.coro;
        other.coro = nullptr;
        return *this;
    }
    T getValue() {
        return coro.promise().current_value;
    }
    bool next() {                                               // (5)
        coro.resume();
        return not coro.done();
    }
    struct promise_type {
        promise_type() = default;                               // (1)
        ~promise_type() = default;
        auto initial_suspend() {                                // (4)
            return std::suspend_always{};
        }
        auto final_suspend() {
            return std::suspend_always{};
        }
        auto get_return_object() {                              // (2)
            return Generator{handle_type::from_promise(*this)};
        }
        auto return_void() {
            return std::suspend_never{};
        }
        auto yield_value(T value) {                             // (6)
            current_value = value;
            return std::suspend_always{};
        }
        void unhandled_exception() {
            std::exit(1);
        }
        T current_value;
    };
};
Generator <int> getNext(int start = 0, int step = 1) {
    auto value = start;
    for (int i = 0; ; ++i) {
        co_yield value;
        value += step;
    }
}
int main() {
    std::cout << "getNext():";
    auto gen = getNext();
    for (int i = 0; i <= 10; ++i) {
        gen.next();
        std::cout << " " << gen.getValue();                     // (7)
    }
    std::cout << "\ngetNext(100, -10):";
    auto gen2 = getNext(100, -10);
    for (int i = 0; i <= 20; ++i) {
        gen2.next();
        std::cout << " " << gen2.getValue();
    }
    std::cout << std::endl;
}


翻訳者のメモ:アセンブリはコマンドによって実行されg++ -fcoroutines infiniteDataStream.cppました。

関数ではmain、2つの列が作成されます。最初のgen、、は0から10までの値を返します。2番目の、、gen2は10の増分で100から-100までです。プログラム出力:



$ ./infDS
getNext(): 0 1 2 3 4 5 6 7 8 9 10
getNext(100, -10): 100 90 80 70 60 50 40 30 20 10 0 -10 -20 -30 -40 -50 -60 -70 -80 -90 -100


プログラムのコメントに番号が付いているラベルinfiniteDataStream.cppは、次の順序で最初の反復を説明します。



  1. promiseオブジェクトを作成します
  2. promise.get_return_object()結果を呼び出してローカル変数に保存する
  3. ジェネレータの作成
  4. 電話してくださいpromise.initial_suspend()ジェネレータは「レイジー」であるため、suspend_always
  5. 次の値を要求し、ジェネレーターが使い果たされた場合はフラグを返します
  6. に対するアクションco_yield。その後、次の値が使用可能になります
  7. 次の値を取得する


以降の反復では、ステップ5と6のみが実行されます。



を使用してストリームを同期する co_await



co_await. , — . (condition variables), promises futures, -. , (spurious wakeups) (lost wakeups).



// senderReceiver.cpp
#include <coroutine>
#include <chrono>
#include <iostream>
#include <functional>
#include <string>
#include <stdexcept>
#include <atomic>
#include <thread>

class Event {
public:
    Event() = default;
    Event(const Event &) = delete;
    Event(Event &&) = delete;
    Event& operator=(const Event &) = delete;
    Event& operator=(Event &&) = delete;
    class Awaiter;
    Awaiter operator co_await() const;
    void notify();
private:
    friend class Awaiter;
    mutable std::atomic<void *> suspendedWaiter{nullptr};
    mutable std::atomic<bool> notified{false};
};

class Event::Awaiter {
public:
    Awaiter(const Event &e) : event(e) {}
    bool await_ready() const;
    bool await_suspend(std::coroutine_handle<> ch);
    void await_resume() {}
private:
    friend class Event;
    const Event &event;
    std::coroutine_handle<> coroutineHandle;
};

bool Event::Awaiter::await_ready() const {
    if (event.suspendedWaiter.load() != nullptr) {
        throw std::runtime_error("More than one waiter is not valid");
    }
    return event.notified; // true -     , false -  
}

bool Event::Awaiter::await_suspend(std::coroutine_handle<> ch) {
    coroutineHandle = ch;
    if (event.notified) {
        return false;
    }
    //  waiter   
    event.suspendedWaiter.store(this);
    return true;
}

void Event::notify() {
    notified = true;
    //   waiter
    auto *waiter = static_cast<Awaiter *>(suspendedWaiter.load());
    //    waiter
    if (waiter != nullptr) {
        //   
        waiter->coroutineHandle.resume();
    }
}

Event::Awaiter Event::operator co_await() const {
    return Awaiter{*this};
}

struct Task {
    struct promise_type {
        Task get_return_object() { return {}; }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() { return {}; }
        void return_void() {}
        void unhandled_exception() {}
    };
};

Task receiver(Event &event) {
    auto start = std::chrono::high_resolution_clock::now();
    co_await event;
    std::cout << "Got the notification!" << std::endl;
    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> elapsed = end - start;
    std::cout << "Waited " << elapsed.count() << " seconds." << std::endl;
}

int main() {
    std::cout << "Notification before waiting" << std::endl;
    Event event1{};
    auto senderThread1 = std::thread([&event1] { event1.notify(); });
    auto receiverThread1 = std::thread(receiver, std::ref(event1));
    receiverThread1.join();
    senderThread1.join();

    std::cout << "\nNotification after 2 seconds waiting" << std::endl;
    Event event2{};
    auto receiverThread2 = std::thread(receiver, std::ref(event2));
    auto senderThread2 = std::thread([&event2] {
                                         using namespace std::chrono_literals;
                                         std::this_thread::sleep_for(2s);
                                         event2.notify();
                                     });
    receiverThread2.join();
    senderThread2.join();
}


: g++ -pthread -fcoroutines senderReceiver.cpp



, . , senderReceiver.cpp senderThread1 senderThread2 event (eventN.notify()). receiver , receiverThread1 receiverThread2. , . .



senderReceiver



$ ./senderReceiver
Notification before waiting
Got the notification!
Waited 3.7006e-05 seconds.

Notification after 2 seconds waiting
Got the notification!
Waited 2.00056 seconds.


Generator Event , . , Generator awaitable awaiter; Event operator co_await awaiter. awaitable awaiter .



, , 2 . , event1 , , event2 , 2 .

senderReceiver.cpp . Event : suspendedWaiter notified. waiter , .



, event1 receiverThread1 . even1.notify() notified waiter. waiter nullptr .. , , waiter->coroutineHandle.resume() . await_ready waiter , , std::runtime_error. . notified true notify, , , .



event2 co_await event , . await_ready. , event.notified false . , await_suspend handle ch corotineHandle. , , . , waiter suspendedWaiter. event2.notify notify. , waiter nullptr. waitercoroutineHandleコロティンを使用して作業を再開します。




All Articles