chitay-knigi.com » Разная литература » C++17 STL Стандартная библиотека шаблонов - Яцек Галовиц

Шрифт:

-
+

Интервал:

-
+

Закладка:

Сделать
1 ... 103 104 105 106 107 108 109 110 111 ... 121
Перейти на страницу:
блокируем мьютекс, поскольку нужно задать значение для бита finished. Затем опять вызываем метод cv.notify_all():

  {

    lock_guard<mutex> lk {mut};

    finished = true;

  }

  cv.notify_all();

}

5. Теперь можем реализовать функцию потребителя. Она не принимает никаких аргументов, поскольку будет слепо перебирать все элементы до тех пор, пока очередь не опустеет. В цикле, который станет выполняться до тех пор, пока не установится значение переменной finished, функция будет блокировать мьютекс, который защищает очередь и флаг finished. В момент получения блокировки последняя вызовет функцию cv.wait, передав ей блокировку и лямбда-выражение в качестве аргументов. Лямбда-выражение — это предикат, указывающий, жив ли еще поток-производитель и есть ли значения в очереди.

static void consumer() {

  while (!finished) {

    unique_lock<mutex> l {mut};

    cv.wait(l, [] { return !q.empty() || finished; });

6. Вызов cv.wait разблокирует блокировку и ждет до тех пор, пока условие, описанное предикатом, не перестанет выполняться. Затем функция снова блокирует мьютекс и перебирает все из очереди до тех пор, пока та не опустеет. Если производитель все еще жив, то она снова проитерирует по циклу. В противном случае функция завершит работу, поскольку установлен флаг finished — таким способом производитель указывает, что новых элементов больше не будет.

    while (!q.empty()) {

    cout << "Got " << q.front()

         << " from queue.n"; q.pop();

    }

  }

}

7. В функции main запускаем поток-производитель, который создает десять элементов, а также поток-потребитель. Затем ждем их выполнения и завершаем программу.

int main()

{

  thread t1 {producer, 10};

  thread t2 {consumer};

  t1.join();

  t2.join();

  cout << "finished!n";

}

8. Компиляция и запуск программы дадут следующий результат. При выполнении программы можно увидеть, что между появлением каждой строки проходит какое-то время (100 миллисекунд), поскольку создание элементов занимает время:

$ ./producer_consumer

Got 0 from queue.

Got 1 from queue.

Got 2 from queue.

Got 3 from queue.

Got 4 from queue.

Got 5 from queue.

Got 6 from queue.

Got 7 from queue.

Got 8 from queue.

Got 9 from queue.

finished!

Как это работает

В данном примере мы просто запустили два потока. Первый создает элементы и помещает их в очередь. Второй извлекает их из очереди. При взаимодействии с очередью один из этих потоков блокирует общий мьютекс mut, доступный им обоим. Таким образом, можно быть уверенными, что оба потока не будут взаимодействовать с состоянием очереди в одно и то же время.

Помимо очереди и мьютекса мы объявили четыре переменные, которые были включены во взаимодействие производителя/потребителя:

queue<size_t>      q;

mutex              mut;

condition_variable cv;

bool               finished {false};

Переменную finished объяснить очень просто. Ее значение было установлено в true, когда производитель создал ограниченное количество элементов. Когда потребитель видит, что значение этой переменной равно true, он использует последние элементы и завершает работу. Но для чего нужна переменная condition_variable cv? Мы использовали cv в двух разных контекстах. Один из контекстов ждал наступления конкретного условия, а второй — указывал на выполнение этого условия.

Сторона-потребитель, ждущая выполнения конкретного условия, выглядит так. Поток-потребитель проходит в цикле по блоку, который изначально блокирует мьютекс mut с помощью unique_lock. Затем вызывает cv.wait:

while (!finished) {

  unique_lock<mutex> l {mut};

  cv.wait(l, [] { return !q.empty() || finished; });

  while (!q.empty()) {

    // consume

  }

}

Данный код чем-то похож на следующий эквивалентный код. Вскоре мы рассмотрим, почему эти фрагменты не похожи друг на друга:

while (!finished) {

  unique_lock<mutex> l {mut};

  while (q.empty() && !finished) {

    l.unlock();

    l.lock();

  }

  while (!q.empty()) {

    // consume

  }

}

Это значит, что сначала мы получили блокировку, а затем проверили, с каким сценарием сейчас работаем.

1. Есть ли элементы, которые можно использовать? В таком случае сохраняем блокировку, потребляем эти элементы, возвращаем блокировку и начинаем сначала.

2. В противном случае, если элементов для потребления нет, но производитель все еще жив, то возвращаем мьютекс с целью дать ему шанс добавить новые элементы в очередь. Далее снова пытаемся получить блокировку в надежде, что ситуация изменится и мы перейдем к ситуации 1.

Реальная причина, по которой строка cv.wait не эквивалентна конструкции while (q.empty() && ...), заключается в том, что мы не можем просто пройти по циклу l.unlock(); l.lock();. Отсутствие активности потока-производителя в течение какого-то промежутка времени приводит к постоянным блокировкам/ разблокировкам мьютекса, что не имеет смысла, поскольку мы впустую тратим циклы процессора.

Выражение наподобие cv.wait(lock, predicate) будет ждать до тех пор, пока вызов predicate() не вернет значение true. Но это не делается путем постоянной блокировки/разблокировки. Чтобы возобновить поток, который блокируется вызовом wait объекта condition_variable, другой поток должен вызывать методы notify_one() или notify_all() для одного объекта. Только тогда ожидающий поток будет возобновлен и проверит условие predicate(). (Последнее действительно и для нескольких потоков.)

Положительный момент таков: после вызова wait, проверившего предикат так, словно поступил внезапный сигнал к пробуждению, поток будет снова мгновенно приостановлен. Это значит, что мы не навредим рабочему потоку программы (но, возможно, пострадает производительность), если добавим в код слишком много вызовов notify.

На стороне производителя мы просто вызвали cv.notify_all() после того, как производитель добавил новый элемент в очередь, а также вслед за тем, как создал последний элемент и установил значение флага finished, равное true. Этого было достаточно для того, чтобы направить потребителя. 

Реализуем идиому «несколько производителей/потребителей» с помощью std::condition_variable

Возьмем задачу из прошлого примера и усложним ее: создадим несколько производителей и несколько потребителей. Кроме того, укажем, что размер очереди не должен превышать определенное значение.

Таким образом, нужно приостанавливать не только потребителей, при отсутствии в очереди элементов, но и производителей, если элементов в ней слишком много.

1 ... 103 104 105 106 107 108 109 110 111 ... 121
Перейти на страницу:

Комментарии
Минимальная длина комментария - 25 символов.
Комментариев еще нет. Будьте первым.