Jump to content

Проблема производителя и потребителя

(Перенаправлено с Производитель-потребитель )

В вычислительной технике проблема производителя -потребителя (также известная как проблема ограниченного буфера ) представляет собой семейство проблем, описанных Эдсгером В. Дейкстрой с 1965 года.

Дейкстра нашел решение проблемы производитель-потребитель, работая консультантом по компьютерам Electrologica X1 и X8: «Первое использование производителя-потребителя было частично программным, частично аппаратным: компонент, обеспечивающий передачу информации между магазином и магазином. периферийное устройство называлось «каналом»... Синхронизация контролировалась двумя счетными семафорами в том, что мы теперь знаем как механизм производитель/потребитель: один семафор, указывающий длину очереди, увеличивался (в V) процессором и уменьшается (в P) каналом, другой, подсчитывая количество неподтвержденных завершений, увеличивается каналом и уменьшается процессором [второй семафор, являющийся положительным, поднимет соответствующий флаг прерывания.]" [ 1 ]

Дейкстра писал о случае неограниченного буфера: «Мы рассматриваем два процесса, которые называются «производителем» и «потребителем» соответственно. Производитель — это циклический процесс, и каждый раз, когда он проходит свой цикл, он производит определенную порцию информации. который должен быть обработан потребителем. Потребитель также представляет собой циклический процесс, и каждый раз, когда он проходит свой цикл, он может обрабатывать следующую порцию информации, произведенную производителем... Мы предполагаем, что эти два процесса быть подключен для этой цели через буфер с неограниченной емкостью». [ 2 ]

Он писал о случае ограниченного буфера: «Мы изучили производителя и потребителя, связанных через буфер с неограниченной емкостью... Отношение становится симметричным, если они связаны через буфер конечного размера, скажем, N порций» [ 3 ]

А о случае с несколькими производителями-потребителями: «Мы рассматриваем несколько пар производитель/потребитель, где пара i связана через информационный поток, содержащий n i частей. Мы предполагаем... конечный буфер, который должен содержать все части всех потоков. иметь вместимость «всего» порций». [ 4 ]

Пер Бринч Хансен и Никлаус Вирт вскоре увидели проблему семафоров: «Я пришел к тому же выводу в отношении семафоров, а именно, что они не подходят для языков более высокого уровня. Вместо этого естественными событиями синхронизации являются обмены сообщениями ». [ 5 ]

Ограниченное буферное решение Дейкстры

[ редактировать ]

Исходное решение с ограниченным семафором буфером было написано в ALGOL стиле . Буфер может хранить N частей или элементов. «количество частей в очереди» Семафор подсчитывает заполненные ячейки в буфере, семафор «количество пустых позиций» подсчитывает пустые ячейки в буфере, а семафор «манипулирование буфером» работает как мьютекс для операций размещения и получения буфера. Если буфер заполнен, то есть количество пустых позиций равно нулю, поток-производитель будет ожидать операции P (количество пустых позиций). Если буфер пуст, то есть количество частей в очереди равно нулю, потребительский поток будет ожидать операции P (количество частей в очереди). Операции V() освобождают семафоры. В качестве побочного эффекта поток может перейти из очереди ожидания в очередь готовности. Операция P() уменьшает значение семафора до нуля. Операция V() увеличивает значение семафора. [ 6 ]

begin integer number of queueing portions, number of empty positions,
      buffer manipulation;
      number of queueing portions:= 0;
      number of empty positions:= N;
      buffer manipulation:= 1;
      parbegin
      producer: begin
              again 1: produce next portion;
                       P(number of empty positions);
                       P(buffer manipulation);
                       add portion to buffer;
                       V(buffer manipulation);
                       V(number of queueing portions); goto again 1 end;
      consumer: begin
              again 2: P(number of queueing portions);
                       P(buffer manipulation);
                       take portion from buffer;
                       V(buffer manipulation) ;
                       V(number of empty positions);
                       process portion taken; goto again 2 end
      parend
end

Начиная с C++ 20, семафоры являются частью языка. Решение Дейкстры можно легко написать на современном C++. Переменная buffer_manipulation является мьютексом. Функция семафора для получения данных в одном потоке и освобождения в другом потоке не требуется. Оператор lock_guard() вместо пары lock() и unlock() — это C++ RAII . Деструктор lock_guard обеспечивает снятие блокировки в случае исключения. Это решение может обрабатывать несколько потоков потребителей и/или несколько потоков производителей.

#include <thread>
#include <mutex>
#include <semaphore>

std::counting_semaphore<N> number_of_queueing_portions{0};
std::counting_semaphore<N> number_of_empty_positions{N};
std::mutex buffer_manipulation;

void producer() {
  for (;;) {
    Portion portion = produce_next_portion();
    number_of_empty_positions.acquire();
    {
      std::lock_guard<std::mutex> g(buffer_manipulation);
      add_portion_to_buffer(portion);
    }
    number_of_queueing_portions.release();
  }
}

void consumer() {
  for (;;) {
    number_of_queueing_portions.acquire();
    Portion portion;
    {
      std::lock_guard<std::mutex> g(buffer_manipulation);
      portion = take_portion_from_buffer();
    }
    number_of_empty_positions.release();
    process_portion_taken(portion);
  }
}

int main() {
  std::thread t1(producer);
  std::thread t2(consumer);
  t1.join();
  t2.join();
}

[ неправильный синтез? ]

Использование мониторов

[ редактировать ]

Пер Бринч Хансен определил монитор: Я буду использовать термин «монитор» для обозначения общей переменной и набора значимых операций над ней. Целью монитора является управление планированием ресурсов между отдельными процессами в соответствии с определенной политикой. [ 7 ] Тони Хоар заложил теоретическую основу монитора. [ 8 ]

bounded buffer: monitor
  begin buffer:array 0..N-1 of portion;
    head, tail: 0..N-1;
    count: 0..N;
    nonempty, nonfull: condition;
  procedure append(x: portion);
    begin if count = N then nonfull.wait;
      note 0 <= count < N;
      buffer[tail] := x;
      tail := tail (+) 1;
      count := count + 1;
      nonempty.signal
    end append;
  procedure remove(result x: portion) ;
    begin if count = 0 then nonempty.wait;
      note 0 < count <= N;
      x := buffer[head];
      head := head (+) 1;
      count := count - 1;
      nonfull.signal
    end remove;
  head := 0; tail := 0; count := 0;
end bounded buffer;

Монитор — это объект, содержащий переменные buffer, head, tail и count чтобы реализовать циклический буфер , переменные условия nonempty и nonfull для синхронизации и методов append и remove для доступа к ограниченному буферу. Ожидание операции монитора соответствует операции семафора P или захвату, сигнал соответствует V или освобождению. Операция, обведенная кружком (+), берется по модулю N. Представленный псевдокод в стиле Паскаля показывает монитор Хоара. Монитор Mesa использует while count вместо if count. Версия языка программирования C++:

class Bounded_buffer {
  Portion buffer[N];    // 0..N-1
  unsigned head, tail;  // 0..N-1
  unsigned count;       // 0..N
  std::condition_variable nonempty, nonfull;
  std::mutex mtx;
public:
  void append(Portion x) {
    std::unique_lock<std::mutex> lck(mtx);
    nonfull.wait(lck, [&]{ return !(N == count); });
    assert(0 <= count && count < N);
    buffer[tail++] = x;
    tail %= N;
    ++count;
    nonempty.notify_one();
  }
  Portion remove() {
    std::unique_lock<std::mutex> lck(mtx);
    nonempty.wait(lck, [&]{ return !(0 == count); });
    assert(0 < count && count <= N);
    Portion x = buffer[head++];
    head %= N; 
    --count;
    nonfull.notify_one();
    return x;
  }
  Bounded_buffer() {
    head = 0; tail = 0; count = 0;
  }
};

Версия C++ требует дополнительного мьютекса по техническим причинам. Он использует утверждение для обеспечения выполнения предварительных условий для операций добавления и удаления буфера.

Использование каналов

[ редактировать ]

Самое первое решение производитель-потребитель в компьютерах Electrologica использовало «каналы». определенные Хоаром Каналы, : альтернатива явному указанию источника и пункта назначения. было бы назвать порт, через который будет осуществляться связь. Имена портов будут локальными для процессов, а способ соединения пар портов по каналам может быть объявлен в заголовке параллельной команды. [ 9 ] Бринч Хансен реализовал каналы на языках программирования Joyce и Super Pascal . Язык программирования операционной системы Plan 9 Alef и язык программирования операционной системы Inferno Limbo имеют каналы. Следующий исходный код C компилируется в Plan 9 из пользовательского пространства :

#include "u.h"
#include "libc.h"
#include "thread.h"

enum { STACK = 8192 };

void producer(void *v) {
  Channel *ch = v;
  for (uint i = 1; ; ++i) {
    sleep(400);
    print("p %d\n", i);
    sendul(ch, i);
  }
}
void consumer(void *v) {
  Channel *ch = v;
  for (;;) {
    uint p = recvul(ch);
    print("\t\tc %d\n", p);
    sleep(200 + nrand(600));
  }
}
void threadmain(int argc, char **argv) {
  int (*mk)(void (*fn)(void*), void *arg, uint stack);
  mk = threadcreate;
  Channel *ch = chancreate(sizeof(ulong), 1);
  mk(producer, ch, STACK);
  mk(consumer, ch, STACK);
  recvp(chancreate(sizeof(void*), 0));
  threadexitsall(0);
}

Точка входа в программу находится в функции threadmain. Вызов функции ch = chancreate(sizeof(ulong), 1) создает канал, вызов функции sendul(ch, i) отправляет значение в канал и вызов функции p = recvul(ch) получает значение из канала. В языке программирования Go тоже есть каналы. Пример Го:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

var sendMsg = 0

func produceMessage() int {
	time.Sleep(400 * time.Millisecond)
	sendMsg++
	fmt.Printf("sendMsg = %v\n", sendMsg)
	return sendMsg
}
func consumeMessage(recvMsg int) {
	fmt.Printf("\t\trecvMsg = %v\n", recvMsg)
	time.Sleep(time.Duration(200+rand.Intn(600)) * time.Millisecond)
}
func main() {
	ch := make(chan int, 3)
	go func() {
		for {
			ch <- produceMessage()
		}
	}()
	for recvMsg := range ch {
		consumeMessage(recvMsg)
	}
}

Решение Go производитель-потребитель использует основную процедуру Go для потребителя и создает новую безымянную процедуру Go для производителя. Две процедуры Go связаны с каналом ch. Этот канал может поставить в очередь до трех значений int. Заявление ch := make(chan int, 3) создает канал, утверждение ch <- produceMessage() отправляет значение в канал и оператор recvMsg := range ch получает значение из канала. [ 10 ] Распределение ресурсов памяти, выделение ресурсов обработки и синхронизация ресурсов выполняются языком программирования автоматически.

Без семафоров и мониторов

[ редактировать ]

Лесли Лэмпорт задокументировал решение «производитель-потребитель» с ограниченным буфером для одного производителя и одного потребителя: Мы предполагаем, что буфер может содержать не более b сообщений, b >= 1. В нашем решении мы позволяем k быть константой, большей, чем b, и пусть s и r — целочисленные переменные, принимающие значения от 0 до k-1. Мы предполагаем, что изначально s=r и буфер пуст. Выбрав k кратным b, буфер можно реализовать как массив B [0: b - 1]. Производитель просто помещает каждое новое сообщение в B[s mod b], а потребитель берет каждое сообщение из B[r mod b]. [ 11 ] Алгоритм показан ниже, обобщенный для бесконечного k.

Producer:
  L:  if (s - r) mod k = b then goto L fi;
      put message in buffer;
      s := (s + 1) mod k;
      goto L;
Consumer:
  L:  if (s - r) mod k = 0 then goto L fi;
      take message from buffer;
      r := (r + 1) mod k;
      goto L;

Решение Лампорта использует ожидание занятости в потоке вместо ожидания в планировщике. Это решение не учитывает влияние переключения потоков планировщика в неподходящее время. Если первый поток прочитал значение переменной из памяти, планировщик переключается на второй поток, который меняет значение переменной, а планировщик переключается обратно на первый поток, тогда первый поток использует старое значение переменной, а не текущее значение. . Атомное чтение-изменение-запись решает эту проблему. Современные предложения C++ atomic переменные и операции для многопоточного программирования. Следующее решение C++11 с занятым ожиданием для одного производителя и одного потребителя использует атомарные операции чтения-изменения-записи. fetch_add и fetch_sub об атомарной переменной count.

enum {N = 4 };
Message buffer[N];
std::atomic<unsigned> count {0};
void producer() {
  unsigned tail {0};
  for (;;) {
    Message message = produceMessage();
    while (N == count) 
      ; // busy waiting
    buffer[tail++] = message;
    tail %= N;
    count.fetch_add(1, std::memory_order_relaxed);
  }
}
void consumer() {
  unsigned head {0};
  for (;;) {
    while (0 == count) 
      ; // busy waiting
    Message message = buffer[head++];
    head %= N;
    count.fetch_sub(1, std::memory_order_relaxed);
    consumeMessage(message);
  }
}
int main() {
  std::thread t1(producer);
  std::thread t2(consumer);
  t1.join();
  t2.join();
}

Переменные индекса кольцевого буфера head и tail являются локальными для потока и поэтому не имеют отношения к согласованности памяти. Переменная count контролирует занятое ожидание потока производителя и потребителя.

См. также

[ редактировать ]
  1. ^ Дейкстра; 2000 г.; EWD1303 Мои воспоминания о проектировании операционной системы
  2. ^ Дейкстра; 1965 год; EWD123 Сотрудничающие последовательные процессы, раздел 4.1. Типичное использование общего семафора.
  3. ^ Дейкстра; 1965 год; EWD123 Сотрудничающие последовательные процессы, раздел 4.3. Ограниченный буфер.
  4. ^ Дейкстра; 1972 год; Информационные потоки EWD329, совместно использующие конечный буфер
  5. ^ Вирт; 1969 год; Письмо Никлауса Вирта от 14 июля 1969 г. в Бринч-Хансене; 2004 г.; История программиста, глава 4 Спешащий молодой человек
  6. ^ Дейкстра; 1965 год; EWD123 Сотрудничающие последовательные процессы, раздел 4.3. Ограниченный буфер.
  7. ^ Пер Бринч Хансен; 1973 год; Принципы работы операционной системы, 3.4.7. Очереди событий
  8. ^ АВТОМОБИЛЬ Хоара; 1974 год; Мониторы: концепция структурирования операционной системы, 4. Пример: ограниченный буфер
  9. ^ Хоар; 1978 год; Взаимодействие последовательных процессов, 7.3 Имена портов
  10. ^ Экскурсия по Го, Каналам
  11. ^ Лэмпорт, Лесли; 1977 год; Доказательство корректности многопроцессных программ, пример производителя/потребителя

Дальнейшее чтение

[ редактировать ]
Arc.Ask3.Ru: конец переведенного документа.
Arc.Ask3.Ru
Номер скриншота №: 1d4e4edce533a14af39dfc88c979c791__1716981000
URL1:https://arc.ask3.ru/arc/aa/1d/91/1d4e4edce533a14af39dfc88c979c791.html
Заголовок, (Title) документа по адресу, URL1:
Producer–consumer problem - Wikipedia
Данный printscreen веб страницы (снимок веб страницы, скриншот веб страницы), визуально-программная копия документа расположенного по адресу URL1 и сохраненная в файл, имеет: квалифицированную, усовершенствованную (подтверждены: метки времени, валидность сертификата), открепленную ЭЦП (приложена к данному файлу), что может быть использовано для подтверждения содержания и факта существования документа в этот момент времени. Права на данный скриншот принадлежат администрации Ask3.ru, использование в качестве доказательства только с письменного разрешения правообладателя скриншота. Администрация Ask3.ru не несет ответственности за информацию размещенную на данном скриншоте. Права на прочие зарегистрированные элементы любого права, изображенные на снимках принадлежат их владельцам. Качество перевода предоставляется как есть. Любые претензии, иски не могут быть предъявлены. Если вы не согласны с любым пунктом перечисленным выше, вы не можете использовать данный сайт и информация размещенную на нем (сайте/странице), немедленно покиньте данный сайт. В случае нарушения любого пункта перечисленного выше, штраф 55! (Пятьдесят пять факториал, Денежную единицу (имеющую самостоятельную стоимость) можете выбрать самостоятельно, выплаичвается товарами в течение 7 дней с момента нарушения.)