目标

在以往,我们想要让程序等待100ms,我们只能调用线程的sleep函数来阻塞当前线程100ms。

这样做的确可以使得程序等待100ms,但坏处是在这100ms期间,被阻塞的当前线程什么也做不了,白白占用了内存。有了协程之后,我们可以让协程在需要sleep的时候挂起,100ms之后再来恢复执行,完全不需要阻塞当前线程。

用例如下:

Task<int, AsyncExecutor> simple_task2() {
  debug("task 2 start ...");
  using namespace std::chrono_literals;
  // 之前的写法,用 sleep_for 让当前线程睡眠 1 秒
  // std::this_thread::sleep_for(1s);
  // 等待 1 秒,注意 1s 是 chrono_literals 的字面值写法
  co_await 1s;
  debug("task 2 returns after 1s.");
  co_return 2;
}

这个例子应该很容易理解,之前使用sleep_for让线程睡眠1秒,这次我们直接使用co_await 1s,这样就可以在当前协程等待1s的时间内去调度其他协程。

为duration实现await_transform

如果读者对C++11不熟悉,可能会疑惑co_await 1s中的1s是个什么东西。实际上这是C++11对字面值的一种支持,本质上就是一个运算符重载,这里的1s的类型是duration<long long>。除了秒以外,时间的单位也可以是毫秒、纳秒、分钟、小时等等,这些C++11的duration都已经提供了完善的支持,因此我们只要对duration做支持就行。

template<typename ResultType, typename Executor>
struct TaskPromise {
  ...
  template<typename _Rep, typename _Period>
  SleepAwaiter await_transform(std::chrono::duration<_Rep, _Period> &&duration) {
    return SleepAwaiter(&executor, std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
  }
  ...
}

这里引入了一个新的类型 SleepAwaiter,它的任务有两个:

  1. 确保当前协程在若干毫秒之后恢复执行。
  2. 确保当前协程恢复执行时要调度到对应的调度器上。

不难想到,std::chrono::duration_cast<std::chrono::milliseconds>(duration).count() 实际上就是把任意单位的 duration 转换成毫秒。

SleepAwaiter 的实现也很简单,我们直接给出:

cpp

复制代码
truct SleepAwaiter {

  explicit SleepAwaiter(AbstractExecutor *executor, long long duration) noexcept
      : _executor(executor), _duration(duration) {}

  bool await_ready() const { return false; }

  void await_suspend(std::coroutine_handle<> handle) const {
    // 自定义的延时执行工具类,全局只需要一个实例
    static Scheduler scheduler;

    scheduler.execute([this, handle]() {
      // _duration 毫秒之后执行下面的代码
      _executor->execute([handle]() {
        handle.resume();
      });
    }, _duration);
  }

  void await_resume() {}

 private:
  AbstractExecutor *_executor;
  long long _duration;
}

当中最关键的就是Scheduler的实现了,这个类实际上就是一个独立的定时任务调度器。

定时任务调度器Scheduler

定时任务调度器,可以理解为一个定时器。任何交给它的任务都需要有优先级,优先级的计算规则就是任务延时的长短,于是我们可以使用优先级队列来存储需要执行的任务。

如果我们给它加上计时执行的能力,Scheduler的功能就差不多完成了。换个角度看,LooperExecutor其实就是Scheduler的一个特化版本,它的所有任务的延时都是0。

定时任务的数据类型

为了方便对定时任务管理,我们可以定义一个DelayedExecutable类型,它包含一个函数和它要执行的绝对时间:

cpp

复制代码
class DelayedExecutable {
 public:
  DelayedExecutable(std::function<void()> &&func, long long delay) : func(std::move(func)) {
    using namespace std;
    using namespace std::chrono;
    auto now = system_clock::now();
    // 当前的时间戳,单位毫秒
    auto current = duration_cast<milliseconds>(now.time_since_epoch()).count();

    // 计算出任务的计划执行时间
    scheduled_time = current + delay;
  }

  // 调用时,返回从当前时间还需要多少毫秒到任务执行时间
  long long delay() const {
    using namespace std;
    using namespace std::chrono;

    auto now = system_clock::now();
    auto current = duration_cast<milliseconds>(now.time_since_epoch()).count();
    return scheduled_time - current;
  }

  long long get_scheduled_time() const {
    return scheduled_time;
  }

  void operator()() {
    func();
  }

 private:
  long long scheduled_time;
  std::function<void()> func;
};

定时任务的描述类DelayedExecutable非常简单,相信大家一看就明白。

为了将DelayedExecutable存入优先级队列当中,我们还需要给它提供一个比较大小的类:

cpp

复制代码
class DelayedExecutableCompare {
 public:
  bool operator()(DelayedExecutable &left, DelayedExecutable &right) {
    return left.get_scheduled_time() > right.get_scheduled_time();
  }
};

这个类将对DelayedExecutable的比较转换成对它们的执行时间的比较。使用这个类对DelayedExecutable进行排序时,会使得时间靠前的对象排到前面。

实现定时任务调度器

接下来我们直接给出Scheduler的实现,由于这个类与前面的LooperExecutor很像,我们只给出不同的部分:

class Scheduler {
 private:
  std::condition_variable queue_condition;
  std::mutex queue_lock;
  // 注意这里改用优先级队列
  std::priority_queue<DelayedExecutable, std::vector<DelayedExecutable>, DelayedExecutableCompare> executable_queue;

  std::atomic<bool> is_active;
  std::thread work_thread;

  void run_loop() {
    while (is_active.load(std::memory_order_relaxed) || !executable_queue.empty()) {
      std::unique_lock lock(queue_lock);
      if (executable_queue.empty()) {
        queue_condition.wait(lock);
        if (executable_queue.empty()) {
          continue;
        }
      }

      // 从这里开始于 LooperExecutor 不同,这里需要判断优先级队头的任务,也就是最先要执行的任务是否需要立即执行
      auto executable = executable_queue.top();
      long long delay = executable.delay();
      if (delay > 0) {
        // 队头的任务还没到执行时间,等待 delay 毫秒
        auto status = queue_condition.wait_for(lock, std::chrono::milliseconds(delay));
        // 如果等待期间没有延时比 delay 更小的任务加入,这里就会返回 timeout
        if (status != std::cv_status::timeout) {
          // 不是 timeout,需要重新计算队头的延时
          continue;
        }
      }
      executable_queue.pop();
      lock.unlock();
      executable();
    }
  }
 public:

  Scheduler() {
    ... // 与 LooperExecutor 完全相同
  }

  ~Scheduler() {
    ... // 与 LooperExecutor 完全相同
  }

  void execute(std::function<void()> &&func, long long delay) {
    delay = delay < 0 ? 0 : delay;
    std::unique_lock lock(queue_lock);
    if (is_active.load(std::memory_order_relaxed)) {
      // 只有队列为空或者比当前队头任务的延时更小时,需要调用 notify_one
      // 其他情况只需要按顺序依次执行即可
      bool need_notify = executable_queue.empty() || executable_queue.top().delay() > delay;
      executable_queue.push(DelayedExecutable(std::move(func), delay));
      lock.unlock();
      if (need_notify) {
        queue_condition.notify_one();
      }
    }
  }

  void shutdown(bool wait_for_complete = true) {
    ... // 与 LooperExecutor 完全相同
  }

  void join() {
    if (work_thread.joinable()) {
      work_thread.join();
    }
  }
};

通过阅读代码和注释,可以发现延时的实现其实时通过阻塞一个专门用于调度延时任务的线程来做到的。

可能你会说,这不还是有阻塞吗?

是这样的,阻塞是避免不了的。通常来说,我们也不会用一个线程去严格对应一个协程,当一个协程挂起时,执行这个协程的线程就会被空闲出来有机会去调度执行其它协程,进而让线程的利用率得到充分提升。如果有10个协程都需要执行延时,相较于阻塞这10个协程当前所在的10个线程而言,阻塞一个线程显然时更加经济的。

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。