КаталогИндекс раздела
НазадОглавлениеВперед

6. Дополнительные примеры

В предложениях новых свойств для языка высокого уровня очень трудно привести убедительное доказательство того, что свойство будет и легким в эффективном использовании, и легким в эффективной реализации. Качество реализации может быть доказано единственным хорошим примером, но легкость и эффективность использования требует большого числа реальных примеров; иначе может показаться, что новое свойство разработано специально для этих примеров, а не наоборот. Эта секция содержит ряд дополнительных примеров решения известных задач. Другие примеры могут быть найдены в [14].

6.1 Выделение буферов

Ограниченный буфер, описанный в секции 4, приспособлен только для последовательностей с малыми порциями, например, для очередей сообщений. Если буферы содержат информацию большого объема (например, файлы оффлайнового ввода и вывода), ограниченный буфер по-прежнему может использоваться для запоминания адресов буферов, которые используются для хранения информации. Таким образом, производитель может заполнять один буфер, в то время как потребитель разгружает другой буфер той же самой последовательности. Но это требует программы-распределителя для динамического получения и освобождения буферных адресов.

Они могут быть объявлены как тип

type bufferaddress = 1..B;

где B - число буферов, доступных для распределения. Распределитель буферов имеет два входа:

procedure acquire (result b:bufferaddress);

который выдает адрес b - свободного буфера; и

procedure release(b: bufferaddress);

который возвращает в пул bufferaddress; когда он больше не требуется. Чтобы вести учет адресов свободных буферов, монитору потребуется

freepool:powerset bufferaddress;

который использует средство powerset языка ПАСКАЛЬ, чтобы определить переменную, значения которой охватывают все множество буферных адресов от пустого множества до множества, содержащего все буферные адреса. Оно может быть реализовано как битовая карта из B последовательных битов, где i-й бит равен 1, тогда и только тогда, когда i входит во множество. Нужна только одна переменная условия:

nonempty:condition 

которая означает, что freepool < > пусто. Код программы-распределителя:

buffer allocator: monitor 
begin freepool:powerset bufferaddress; 
     nonempty:condition; 
  procedure acquire (result b:bufferaddress); 
    begin if freepool = empty then nonempty.wait; 
      b := first(freepool); 
      comment Делается что угодно; 
      freepool := freepool - { b }; 
      comment Вычитание из множества; 
    end acquire; 
  procedure release(b:bufferaddress); 
    begin freepool := freepool + { b };
      nonempty.signal
    end release;
  freepool := all buffer addresses 
end buffer allocator

Действия производителя и потребителя генератора и потребителя могут быть сведены к:

producer: begin b:bufferaddress; ...
            while not finished do
              begin bufferallocator.acquire(b);
               ... заполнение буфера b ...;
               bounded buffer.append(b)
              end; ...
          end producer;

consumer: begin b:bufferaddress; ...
            while not finished do
              begin bounded buffer.remove(b);
                ... опустошение буфера b ...;
                buffer allocator.release(b)
              end; ...
          end consumer;

Этот распределитель буферов представляется пригодным для применения в совместном использовании буфера несколькими потоками, каждый с собственным производителем и собственным потребителем и с собственным экземпляром монитора ограниченного буфера. К сожалению, когда потоки выполняются при широко диапазоне изменения скоростей и когда freepool пуст, алгоритм планирования, может постоянно демонстрировать нежелательное поведение. Если два производителя соревнуются за каждый буфер при его освобождении, дисциплина распределения "первым прибыл - первым обслуживается", будет гарантировать (довольно справедливо), что каждый из них получит очередной буфер; и они, следовательно, станут производить на равных скоростях. Но если один потребитель - принтер со скоростью 1000 строк/мин, а другой - телетайп со скоростью 10 строк/мин, то скорость более быстрого потребителя будет в конечном счете понижена до скорости более медленного, так как он никогда не может выполняться быстрее, чем производитель. В этой стадии почти все буфера будут принадлежать более медленному потоку, так что ситуация может потребовать длительного времени для прояснения.

Путь преодоления этого состоит в использовании планируемого ожидания, чтобы гарантировать, что в условиях большой нагрузки доступные буферы будут разделяться достаточно справедливо между потоками, которые соревнуются за них. Конечно, не должны рассматриваться неактивные потоки и потоки в которых потребитель работает в настоящее время быстрее, чем производитель, никогда не будут запрашивать больше, чем два буфера. Чтобы достичь справедливости в распределении, достаточно выделять только что освобожденный буфер тому из конкурирующих производителей, чей поток в настоящее время имеет наименьшее количество буферов. Таким образом система будет искать точку, настолько далеко отстоящую от нежелательного экстремального значения, насколько возможно.

По этой причине входы в распределитель должны указывать, для какого потока буфер должен быть (или был) использован, и распределитель должен хранить счетчик текущего распределения для каждого потока в массиве:

count:array stream of integer;

Новая версия распределителя:

bufferallocator: monitor
  begin freepool:powerset bufferaddress;
        nonempty:condition
        count:array stream of integer;
    procedure acquire(result b:bufferaddress; s:stream);
      begin if freepool = empty then nonempty.wait(count[s]);
        count[s] := count[s] + 1;
        b := first(freepool);
        freepool := freepool - {b}
      end acquire;
    procedure release(b:bufferaddress; s:stream)
      begin count[s] := count[s] - 1;
        freepool := freepool - {b};
        nonempty.signal
      end;
    freepool := all buffer addresses;
    for s:stream do count[s] := 0
  end bufferallocator

Конечно, возможно, что потребитель остановится совсем, например, вследствие механической неисправности. Производитель должен также быть приостановлен прежде чем он получит слишком много буферов, даже если никто больше в настоящее время не претендует на них. Это можно легко осуществить соответствующей фиксацией размера ограниченного буфера для потока и/или, гарантируя, чтобы по крайней мере два буфера были зарезервированы для каждого потока, даже когда он неактивен. Вот интересное примечание к динамическому распределению ресурсов: система должна быть разработана так, чтобы при тяжелой загрузке ресурсов, откатываться к более статичному режиму.

Я благодарен E. W. Dijkstra за постановку этой проблемы и решение в [10].

6.2 Планировщик головок диска

На вращающемся диске время перемещения головок увеличивается монотонно с увеличением расстоянием. Если несколько программ желают переместить головки, среднее время ожидания может быть снижено, если обслуживать первой ту программу, которая желает переместить их на кратчайшее расстояние. Но, к сожалению, эта стратегия неустойчива, так как программа, желающая обратиться к цилиндру на одном краю диска, может неопределенно долго откладываться программами, работающими на другом краю или в середине.

Решение состоит в минимизации частоты изменения направления движения при перемещении головок. В любом случае головки сохраняют движение в данном направлении, и они обслуживают программные запросы к самому близкому цилиндру в этом направлении. Если же не больше нет таких запросов, изменяется направление, и головки делают другой проход по поверхности диска. Это может быть названо алгоритмом "лифта", так как моделирует поведение лифта в многоэтажном здании. Имеются два входа в планировщик головок диска:

(1)  request(dest:cylinder); 

где

    type cylinder = 0..cylmax.

который вводится программой перед самой выдачей команды на перемещение головок к цилиндру dest.

(2)  release; 

который вводится программой, когда она выполнила все передачи информации, требуемые на текущем цилиндре.

Локальные данные монитора должны включать в себя headpos - запись о текущей позиции головок, текущее направление прохода, и признак, является ли диск занятым:

     headpos:cylinder;
     direction:(up,down);
     busy:Boolean

Мы нуждаемся в двух условиях: одно для запросов, ожидающих прохода вверх (upsweep), и другое - для запросов, ожидающих прохода вниз (downsweep):

dischead:monitor
begin headpos:cylinder;
     direction:(up,down);
     busy:Boolean;
     upsweep,downsweep:condition;
   procedure request(dest:cylinder);
      begin if busy then
         {if headpos < dest or headpos = dest & direction = up
                  then upsweep.wait(dest)
            else downsweep.wait(dest)};
          busy := true; headpos := dest
      end request;
   procedure release;
      begin busy := false;
         if direction = up then
            {if upsweep.queue then upsweep.signal
                                else {direction := down;
                   downsweep.signal}}
         else if downsweep.queue then downsweep.signal
                                               else {direction := up;
                                                    upsweep.signal}
     end release;
     headpos := 0; direction := up; busy := false
end dischead;

6.3 Читатели и писатели

Как более значительный пример, мы возьмем проблему, которая возникает в онлайновых приложениях систем реального времени, таких как управление воздушным пространством. Предположим, что каждый самолет представлен записью, и что актуальное состояние этой записи поддерживается рядом процессов "писателей", и к записи обращается ряд процессов "читателей". Любое число "читателей" может одновременно обращаться одной и той же записи, но очевидно, что любой процесс, который модифицирует (записывает) отдельные компоненты записи, должен иметь монопольный доступ к ней, иначе мы получим хаос. Итак, нам нужен класс мониторов; экземпляр этого класса, локальный для каждой записи о самолете, приводит в исполнение заданную дисциплину для этой записи. Если есть много самолета, имеется веское основание для сокращения локальных данных монитора; и если каждая операция чтения или записи короткая, то мы должны также минимизировать фактическое время, требуемое для выполнения каждого обращения к монитору.

Когда много читателей заинтересованы в единственной записи о самолете, есть опасность, что писатель будет на неопределенное время откладываться и не сможет поддерживать актуальность записи. Мы поэтому решаем, что новому читателю нельзя разрешить начаться, если имеется ожидающий писатель. Точно так же, чтобы избежать опасности неопределенного откладывания читателей, все читатели, ожидающие завершения записи должны иметь приоритет перед следующими писателями. Заметьте, что эти правила планирования очень отличны от обсуждаемых в [4], и представляется, что они не требуют такой тонкости в реализации. Тем не менее, они могут лучше подходить для таких приложений, где лучше читать просроченную информацию, чем ждать неопределенно долго!

Монитор очевидно требует четырех локальных процедур:

startread 
    вызывается читателем, который желает читать.
endread 
    вызывается читателем, который закончил чтение.
startwrite 
    вызывается писателем, который желает писать.
endwrite 
    вызывается писателем, который закончил писать.

Мы должны хранить счетчик числа читающих пользователей, чтобы последний заканчивающийся читатель знал об этом:

    readercount: integer

Мы также нуждаемся в булевом индикаторе того, что кто-то сейчас пишет:

    busy: Boolean; 

Мы вводим отдельные условия для ожидания читателей и писателей:

    OKtoread, OKtowrite:condition; 

Следует отметить, что:

    OKtoread = not busy
    OKtowrite = not busy & readercount = 0
    invariant: busy => readercount = 0

class readers and writers: monitors
   begin readercount:integer;
        busy:Boolean;
        OKtoread, OKtowrite:condition;
     procedure startread;
        begin if busy or OKtowrite.queue then OKtoread.wait;
          readercount := readercount + 1;
          OKtoread.signal;
          comment Если может статровать один читатель, то могут и все;
        end startread;
     procedure endread;
        begin readercount := readercount - 1;
          if readercount = 0 then OKtowrite.signal
        end endread;
     procedure startwrite;
        begin
          if readercount > 0 or busy then OKtowrite.wait
          busy := true
        end startwrite;
     procedure endwrite;
        begin busy := false;
          if OKtoread.queue then OKtoread.signal
                             else OKtowrite.signal
        end endwrite;
     readercount := 0;
     busy := false;
   end readers and writers;

Я благодарен Dave Gorman за помощь в нахождении этого решения.


НазадОглавлениеВперед
КаталогИндекс раздела