Проблема производителя и потребителя
В этой статье есть несколько проблем. Пожалуйста, помогите улучшить его или обсудите эти проблемы на странице обсуждения . ( Узнайте, как и когда удалять эти шаблонные сообщения )
|
В вычислительной технике проблема производителя -потребителя (также известная как проблема ограниченного буфера ) представляет собой семейство проблем, описанных Эдсгером В. Дейкстрой с 1965 года.
Дейкстра нашел решение проблемы производитель-потребитель, работая консультантом по компьютерам Electrologica X1 и X8: «Первое использование производителя-потребителя было частично программным, частично аппаратным: компонент, обеспечивающий передачу информации между магазином и магазином. периферийное устройство называлось «каналом»... Синхронизация контролировалась двумя счетными семафорами в том, что мы теперь знаем как механизм производитель/потребитель: один семафор, указывающий длину очереди, увеличивался (в V) процессором и уменьшается (в P) каналом, другой, подсчитывая количество неподтвержденных завершений, увеличивается каналом и уменьшается процессором [второй семафор, являющийся положительным, поднимет соответствующий флаг прерывания.]" [ 1 ]
Дейкстра писал о случае неограниченного буфера: «Мы рассматриваем два процесса, которые называются «производителем» и «потребителем» соответственно. Производитель — это циклический процесс, и каждый раз, когда он проходит свой цикл, он производит определенную порцию информации. который должен быть обработан потребителем. Потребитель также представляет собой циклический процесс, и каждый раз, когда он проходит свой цикл, он может обрабатывать следующую порцию информации, произведенную производителем... Мы предполагаем, что эти два процесса быть подключен для этой цели через буфер с неограниченной емкостью». [ 2 ]
Он писал о случае ограниченного буфера: «Мы изучили производителя и потребителя, связанных через буфер с неограниченной емкостью... Отношение становится симметричным, если они связаны через буфер конечного размера, скажем, N порций» [ 3 ]
А о случае с несколькими производителями-потребителями: «Мы рассматриваем несколько пар производитель/потребитель, где пара i связана через информационный поток, содержащий n i частей. Мы предполагаем... конечный буфер, который должен содержать все части всех потоков. иметь вместимость «всего» порций». [ 4 ]
Пер Бринч Хансен и Никлаус Вирт вскоре увидели проблему семафоров: «Я пришел к тому же выводу в отношении семафоров, а именно, что они не подходят для языков более высокого уровня. Вместо этого естественными событиями синхронизации являются обмены сообщениями ». [ 5 ]
Ограниченное буферное решение Дейкстры
[ редактировать ]Исходное решение с ограниченным семафором буфером было написано в ALGOL стиле . Буфер может хранить N частей или элементов. «количество частей в очереди» Семафор подсчитывает заполненные ячейки в буфере, семафор «количество пустых позиций» подсчитывает пустые ячейки в буфере, а семафор «манипулирование буфером» работает как мьютекс для операций размещения и получения буфера. Если буфер заполнен, то есть количество пустых позиций равно нулю, поток-производитель будет ожидать операции P (количество пустых позиций). Если буфер пуст, то есть количество частей в очереди равно нулю, потребительский поток будет ожидать операции P (количество частей в очереди). Операции V() освобождают семафоры. В качестве побочного эффекта поток может перейти из очереди ожидания в очередь готовности. Операция P() уменьшает значение семафора до нуля. Операция V() увеличивает значение семафора. [ 6 ]
begin integer number of queueing portions, number of empty positions,
buffer manipulation;
number of queueing portions:= 0;
number of empty positions:= N;
buffer manipulation:= 1;
parbegin
producer: begin
again 1: produce next portion;
P(number of empty positions);
P(buffer manipulation);
add portion to buffer;
V(buffer manipulation);
V(number of queueing portions); goto again 1 end;
consumer: begin
again 2: P(number of queueing portions);
P(buffer manipulation);
take portion from buffer;
V(buffer manipulation) ;
V(number of empty positions);
process portion taken; goto again 2 end
parend
end
Начиная с C++ 20, семафоры являются частью языка. Решение Дейкстры можно легко написать на современном C++. Переменная buffer_manipulation является мьютексом. Функция семафора для получения данных в одном потоке и освобождения в другом потоке не требуется. Оператор lock_guard() вместо пары lock() и unlock() — это C++ RAII . Деструктор lock_guard обеспечивает снятие блокировки в случае исключения. Это решение может обрабатывать несколько потоков потребителей и/или несколько потоков производителей.
#include <thread>
#include <mutex>
#include <semaphore>
std::counting_semaphore<N> number_of_queueing_portions{0};
std::counting_semaphore<N> number_of_empty_positions{N};
std::mutex buffer_manipulation;
void producer() {
for (;;) {
Portion portion = produce_next_portion();
number_of_empty_positions.acquire();
{
std::lock_guard<std::mutex> g(buffer_manipulation);
add_portion_to_buffer(portion);
}
number_of_queueing_portions.release();
}
}
void consumer() {
for (;;) {
number_of_queueing_portions.acquire();
Portion portion;
{
std::lock_guard<std::mutex> g(buffer_manipulation);
portion = take_portion_from_buffer();
}
number_of_empty_positions.release();
process_portion_taken(portion);
}
}
int main() {
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
}
Использование мониторов
[ редактировать ]Пер Бринч Хансен определил монитор: Я буду использовать термин «монитор» для обозначения общей переменной и набора значимых операций над ней. Целью монитора является управление планированием ресурсов между отдельными процессами в соответствии с определенной политикой. [ 7 ] Тони Хоар заложил теоретическую основу монитора. [ 8 ]
bounded buffer: monitor
begin buffer:array 0..N-1 of portion;
head, tail: 0..N-1;
count: 0..N;
nonempty, nonfull: condition;
procedure append(x: portion);
begin if count = N then nonfull.wait;
note 0 <= count < N;
buffer[tail] := x;
tail := tail (+) 1;
count := count + 1;
nonempty.signal
end append;
procedure remove(result x: portion) ;
begin if count = 0 then nonempty.wait;
note 0 < count <= N;
x := buffer[head];
head := head (+) 1;
count := count - 1;
nonfull.signal
end remove;
head := 0; tail := 0; count := 0;
end bounded buffer;
Монитор — это объект, содержащий переменные buffer
, head
, tail
и count
чтобы реализовать циклический буфер , переменные условия nonempty
и nonfull
для синхронизации и методов append
и remove
для доступа к ограниченному буферу. Ожидание операции монитора соответствует операции семафора P или захвату, сигнал соответствует V или освобождению. Операция, обведенная кружком (+), берется по модулю N. Представленный псевдокод в стиле Паскаля показывает монитор Хоара. Монитор Mesa использует while count
вместо if count
. Версия языка программирования C++:
class Bounded_buffer {
Portion buffer[N]; // 0..N-1
unsigned head, tail; // 0..N-1
unsigned count; // 0..N
std::condition_variable nonempty, nonfull;
std::mutex mtx;
public:
void append(Portion x) {
std::unique_lock<std::mutex> lck(mtx);
nonfull.wait(lck, [&]{ return !(N == count); });
assert(0 <= count && count < N);
buffer[tail++] = x;
tail %= N;
++count;
nonempty.notify_one();
}
Portion remove() {
std::unique_lock<std::mutex> lck(mtx);
nonempty.wait(lck, [&]{ return !(0 == count); });
assert(0 < count && count <= N);
Portion x = buffer[head++];
head %= N;
--count;
nonfull.notify_one();
return x;
}
Bounded_buffer() {
head = 0; tail = 0; count = 0;
}
};
Версия C++ требует дополнительного мьютекса по техническим причинам. Он использует утверждение для обеспечения выполнения предварительных условий для операций добавления и удаления буфера.
Использование каналов
[ редактировать ]Самое первое решение производитель-потребитель в компьютерах Electrologica использовало «каналы». определенные Хоаром Каналы, : альтернатива явному указанию источника и пункта назначения. было бы назвать порт, через который будет осуществляться связь. Имена портов будут локальными для процессов, а способ соединения пар портов по каналам может быть объявлен в заголовке параллельной команды. [ 9 ] Бринч Хансен реализовал каналы на языках программирования Joyce и Super Pascal . Язык программирования операционной системы Plan 9 Alef и язык программирования операционной системы Inferno Limbo имеют каналы. Следующий исходный код C компилируется в Plan 9 из пользовательского пространства :
#include "u.h"
#include "libc.h"
#include "thread.h"
enum { STACK = 8192 };
void producer(void *v) {
Channel *ch = v;
for (uint i = 1; ; ++i) {
sleep(400);
print("p %d\n", i);
sendul(ch, i);
}
}
void consumer(void *v) {
Channel *ch = v;
for (;;) {
uint p = recvul(ch);
print("\t\tc %d\n", p);
sleep(200 + nrand(600));
}
}
void threadmain(int argc, char **argv) {
int (*mk)(void (*fn)(void*), void *arg, uint stack);
mk = threadcreate;
Channel *ch = chancreate(sizeof(ulong), 1);
mk(producer, ch, STACK);
mk(consumer, ch, STACK);
recvp(chancreate(sizeof(void*), 0));
threadexitsall(0);
}
Точка входа в программу находится в функции threadmain
. Вызов функции ch = chancreate(sizeof(ulong), 1)
создает канал, вызов функции sendul(ch, i)
отправляет значение в канал и вызов функции p = recvul(ch)
получает значение из канала. В языке программирования Go тоже есть каналы. Пример Го:
package main
import (
"fmt"
"math/rand"
"time"
)
var sendMsg = 0
func produceMessage() int {
time.Sleep(400 * time.Millisecond)
sendMsg++
fmt.Printf("sendMsg = %v\n", sendMsg)
return sendMsg
}
func consumeMessage(recvMsg int) {
fmt.Printf("\t\trecvMsg = %v\n", recvMsg)
time.Sleep(time.Duration(200+rand.Intn(600)) * time.Millisecond)
}
func main() {
ch := make(chan int, 3)
go func() {
for {
ch <- produceMessage()
}
}()
for recvMsg := range ch {
consumeMessage(recvMsg)
}
}
Решение Go производитель-потребитель использует основную процедуру Go для потребителя и создает новую безымянную процедуру Go для производителя. Две процедуры Go связаны с каналом ch. Этот канал может поставить в очередь до трех значений int. Заявление ch := make(chan int, 3)
создает канал, утверждение ch <- produceMessage()
отправляет значение в канал и оператор recvMsg := range ch
получает значение из канала. [ 10 ] Распределение ресурсов памяти, выделение ресурсов обработки и синхронизация ресурсов выполняются языком программирования автоматически.
Без семафоров и мониторов
[ редактировать ]Лесли Лэмпорт задокументировал решение «производитель-потребитель» с ограниченным буфером для одного производителя и одного потребителя: Мы предполагаем, что буфер может содержать не более b сообщений, b >= 1. В нашем решении мы позволяем k быть константой, большей, чем b, и пусть s и r — целочисленные переменные, принимающие значения от 0 до k-1. Мы предполагаем, что изначально s=r и буфер пуст. Выбрав k кратным b, буфер можно реализовать как массив B [0: b - 1]. Производитель просто помещает каждое новое сообщение в B[s mod b], а потребитель берет каждое сообщение из B[r mod b]. [ 11 ] Алгоритм показан ниже, обобщенный для бесконечного k.
Producer:
L: if (s - r) mod k = b then goto L fi;
put message in buffer;
s := (s + 1) mod k;
goto L;
Consumer:
L: if (s - r) mod k = 0 then goto L fi;
take message from buffer;
r := (r + 1) mod k;
goto L;
Решение Лампорта использует ожидание занятости в потоке вместо ожидания в планировщике. Это решение не учитывает влияние переключения потоков планировщика в неподходящее время. Если первый поток прочитал значение переменной из памяти, планировщик переключается на второй поток, который меняет значение переменной, а планировщик переключается обратно на первый поток, тогда первый поток использует старое значение переменной, а не текущее значение. . Атомное чтение-изменение-запись решает эту проблему. Современные предложения C++ atomic
переменные и операции для многопоточного программирования. Следующее решение C++11 с занятым ожиданием для одного производителя и одного потребителя использует атомарные операции чтения-изменения-записи. fetch_add
и fetch_sub
об атомарной переменной count
.
enum {N = 4 };
Message buffer[N];
std::atomic<unsigned> count {0};
void producer() {
unsigned tail {0};
for (;;) {
Message message = produceMessage();
while (N == count)
; // busy waiting
buffer[tail++] = message;
tail %= N;
count.fetch_add(1, std::memory_order_relaxed);
}
}
void consumer() {
unsigned head {0};
for (;;) {
while (0 == count)
; // busy waiting
Message message = buffer[head++];
head %= N;
count.fetch_sub(1, std::memory_order_relaxed);
consumeMessage(message);
}
}
int main() {
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
}
Переменные индекса кольцевого буфера head
и tail
являются локальными для потока и поэтому не имеют отношения к согласованности памяти. Переменная count
контролирует занятое ожидание потока производителя и потребителя.
См. также
[ редактировать ]- Атомная операция
- Шаблон проектирования
- ФИФО
- Трубопровод
- Канал
- Реализация на Java: Служба сообщений Java
Ссылки
[ редактировать ]- ^ Дейкстра; 2000 г.; EWD1303 Мои воспоминания о проектировании операционной системы
- ^ Дейкстра; 1965 год; EWD123 Сотрудничающие последовательные процессы, раздел 4.1. Типичное использование общего семафора.
- ^ Дейкстра; 1965 год; EWD123 Сотрудничающие последовательные процессы, раздел 4.3. Ограниченный буфер.
- ^ Дейкстра; 1972 год; Информационные потоки EWD329, совместно использующие конечный буфер
- ^ Вирт; 1969 год; Письмо Никлауса Вирта от 14 июля 1969 г. в Бринч-Хансене; 2004 г.; История программиста, глава 4 Спешащий молодой человек
- ^ Дейкстра; 1965 год; EWD123 Сотрудничающие последовательные процессы, раздел 4.3. Ограниченный буфер.
- ^ Пер Бринч Хансен; 1973 год; Принципы работы операционной системы, 3.4.7. Очереди событий
- ^ АВТОМОБИЛЬ Хоара; 1974 год; Мониторы: концепция структурирования операционной системы, 4. Пример: ограниченный буфер
- ^ Хоар; 1978 год; Взаимодействие последовательных процессов, 7.3 Имена портов
- ^ Экскурсия по Го, Каналам
- ^ Лэмпорт, Лесли; 1977 год; Доказательство корректности многопроцессных программ, пример производителя/потребителя
Дальнейшее чтение
[ редактировать ]- Отметьте грандиозные шаблоны в Java, том 1, Каталог повторно используемых шаблонов проектирования, иллюстрированный с помощью UML
- Журнал пользователей C/C++ (Dr.Dobb's), январь 2004 г., «Библиотека шаблонов параллелизма производитель-потребитель C++» , автор Тед Юань, представляет собой готовую к использованию библиотеку шаблонов C++. Исходный код небольшой библиотеки шаблонов и примеры можно найти здесь.
- Иоан Тинка, Эволюция проблемы производителя-потребителя в Java