вторник, 30 ноября 2010 г.

Мультипоточность в питоне. Часть 2 - синхронизация

В части 1 мы рассмотрели базовые основы работы с потоками. Перейдем к примитивам синхронизации.

В мультипоточной программе доступ к объектам иногда нужно синхронизировать.

С этой точки зрения все объекты (переменные) разделяются на:

  • Неизменяемые. Мои самые любимые. Если объект никто не меняет, то синхронизация доступа ему не нужна. К сожалению, таких не очень много.
  • Локальные. Если объект не виден остальным потокам, то доступ к нему синхронизировать тоже не требуется.
  • Разделяемые и изменяемые. Синхронизация необходима.

Синхронизация доступа к объектам осуществляется с помощью объектов синхронизации (простите за тавтологию).

Объектов синхронизации существует множество - и постоянно придумывают новые. Это не то чтобы плохо - но избыточно (по крайней мере на начальном этапе изучения).

Каждый объект синхронизации имеет свои плюсы и минусы (обычно это баланс между быстродействием и областью применения). Ошибка ведет к тому, что

  • код будет выполнятся не так быстро, как мог бы (меньшее зло)
  • или к тому, что он будет выполнятся неправильно (тушите свет).

Давным-давно Дейкстра работал не только над алгоритмами на графах, но и над проблемами многопоточного кода. Ему удалось в одной из своих работ доказать, что минимальный базис, необходимый для любой синхронизации, сводится к двум объектам:

  • блокировки (они же lock, mutex).
  • условные переменные (condition variable).

Давайте на них и остановимся.

Ремарка о скорости. Питон - не первый спринтер на деревне. Если для программы на С выигрыш от правильного использования специфичного объекта синхронизации может быть существенным, то для Питона в целом они все примерно одинаковы.

Выбирать нужно более безопасный способ, не обращая внимания на быстродействие.

Кстати, это не относится к реализации CPython как такового - в коде интерпретатора присутствуют довольно экстравагантные блокировки, дающие выигрыш в пару процентов. До тех пор, пока их использует небольшая очень квалифицированная команда разработчиков - это оправдано.

Вторая ремарка: рассматриваемые объекты блокировки очень примитивны.

Откровенно говоря, не существует высокоуровневой теории мультипоточных программ. Если сравнивать с развитием языков, то уровень мультипоточного программирования сейчас находится где-то там, где были процедурные языки программирования. Объекты синхронизации схожи с оператором goto - использовать приходится, но всегда следует внимательно смотреть, что вы делаете.

Более высокоуровневые концепты, такие как monitor и active object хороши, но не решают всех проблем.

Языки вроде erlang, прекрасно справляющиеся с распараллеливанием задач на оптимальное количество потоков, останутся в своей узкой нише - примерно как lisp и все семейство языков функционального программирования навсегда остануться инструментом для маргиналов.

Парадигма многопоточных языков общего употребления все еще не создана.

Блокировки

Это - основа всего.

Простейший пример использования:

import threading

class Point(object):
    def __init__(self):
        self._mutex = threading.RLock()
        self._x = 0
        self._y = 0

    def get(self):
        with self._mutex:
            return (self._x, self._y)

    def set(self, x, y):
        with self._mutex:
            self._x = x
            self._x = y

Итак, имеем тривиальный класс. Это точка в двухмерном пространстве. Пусть координаты _x и _y начинаются с подчеркивания - чтобы программист, пожелавший менять их непосредственно немного задумался.

Еще есть _mutex типа RLock.

Публичная часть - два метода get и set.

Работает все это так: - при вызове метода берем блокировку через with self._mutex: - весь код внутри with блока будет выполнятся только в одном потоке. Другими словами, если два разных потока вызовут .get то пока первый поток не выйдет из блока второй будет его ждать - и только потом продолжит выполнение.

Зачем это все нужно? Координаты нужно менять одновременно - ведь точка это цельный объект. Если позволить одному потоку поменять x, а другой в это же время поправит y логика алгоритма может поломаться.

Есть и другой вопрос: зачем методу get блокировка? В приведенном примере она действительно не нужна. Но я всё же настоятельно рекомендую использовать блокировки даже для методов, вроде бы не изменяющих содержимое.

Во первых, это просто хорошая привычка. Иногда блокировки можно опускать для повышения производительности - но только после тщательного изучения побочных эффектов.

Во вторых (и это важнее) - блокировки при чтении позволяют корректно работать с объектами, части которых могут изменятся независимо.

Для примера возьмет цветную точку.

class ColoredPoint(Point):
    def __init__(self):
        super(ColoredPoint, self).__init__()
        self._color = 'green'

    @property
    def color(self):
        with self._mutex:
            return self._color

    @color.setter
    def color(self, val):
        with self._mutex:
            self._color = val

    def do(self, observer):
        with self._mutex:
            if self._color == 'red':
                observer(self.get())

Без блокировки в методе .do возможна ситуация, при которой один поток поменяет цвет, в второй в это же время изменит координаты. И тогда observer будет вызван с неправильными значениями.

В модуле threading существует еще и Lock - никогда его не используйте. Дело в том, что RLock (recursive lock) допускает повторную блокировку.

В примере для цветной точки это хорошо видно - .do берет блокировку, а затем вызывает метод .get, который берет эту блокировку еще раз.

В случае с Lock метод будет намертво повешен. RLock позволяет повторно брать блокировку тому потоку, который уже ее получил.

Говоря о блокировках, нужно упомянуть и две проблемы, которые возникают при их использовании:

  • Race condition: неправильное поведение из за отсутствия блокировки.
  • Dead lock: взаимная блокировка.

Первый случай я уже описал: при отсутствии блокировки один поток меняет часть объекта, временно приводя его в некорректное состояние - а второй поток именно в это время пытается работать с этим некорректным на данный момент объектом.

Пример для взаимных блокировок:

a = threading.RLock()
b = threading.RLock()

def f():
    with a:
        # do something
        with b:
            # do something also

def g():
    with b:
        # do something
        with a:
            # do something also

При одновременно вызове f из одного потока и g из другого оба потока навсегда повиснут: первый будет ждать захвата b а второй, захватив b остановится на ожидании блокировки a.

Обратите внимание: эта проблема может проявляться не сразу и нелегко диагностируется/воспроизводится.

Современные программы бывают большими и сложными с очень запутанными потоками исполнения, отследить которые нелегко.

Еще один пример:

def h():
    try:
        a.acquire()
        b.acquire()
    # do something
    finally:
        a.release()
        b.release()

При одновременном вызове h возможна взаимная блокировка.

Общее правило, немного помогающее при проектировании многопоточных систем: блокировки всегда должны браться в одном и том же порядке, а освобождаться в противоположном. Использование оператора with помогает решить эту проблему.

Условные переменные

С блокировками все более или менее ясно. Нужен еще один объект синхронизации.

Рассмотрим очередь на фиксированное количество элементов. Когда очередь пуста - поток, желающий получить новый элемент должен ждать. Аналогично с переполненной очередью.

Если решать все только на блокировках, придется проверять - а не пуста ли очередь? И если таки пуста - подождать еще немного. Сколько времени ждать - сложный вопрос.

Очевидно, нужен какой-то сигнал от заполняющего очередь потока - ожидающему.

Чтобы последний, выйдя из вечной блокировки мог продолжить работу.

Несмотря на то, что Питон непосредственно поддерживает сигналы (threading.Event) - я настоятельно не рекомендую их использовать.

Проблема в следующем: поток послал сигнал о поступлении элемента в очередь.

Кто его получит, если получение нового элемента ожидают несколько потоков?

Один из них забрал данные и очередь снова пуста. Как нужно дожидаться следующего поступления?

Условные переменные (condition variables) совмещают сигналы с блокировками.

Рассмотрим пример:

class Queue(object):
    def __init__(self, size=5):
        self._size = size
        self._queue = []
        self._mutex = threading.RLock()
        self._empty = threading.Condition(self._mutex)
        self._full = threading.Condition(self._mutex)

    def put(self, val):
        with self._full:
            while len(self._queue) >= self._size:
                self._full.wait()
            self._queue.append(val)
            self._empty.notify()

    def get(self):
        with self._empty:
            while len(self._queue) == 0:
                self._empty.wait()
            ret = self._queue.pop(0)
            self._full.notify()
            return ret

У нас два почти симметричных метода - положить в очередь и взять из нее.

Разберем .get. Сначала берем блокировку.

Обратите внимание - блокиратор один на обе условные переменные. Это важно Дело в том, что _full и _empty взаимозависимы. Хотя threading.Condition позволяет не указывать блокиратор, создавая новый RLock автоматически - не поступайте так. Можно поймать race condition, о котором я говорил раньше. Гораздо надежней и наглядней делать все явно. В нашем случае race condition был бы в явном виде - пусть и скрытый из за GIL.

  • Входим в блокиратор.
  • Проверяем наше условие - обычно это всегда цикл while.
  • Если условие не выполнилось - вызываем .wait(). Этот метод освободит блокировку и будет ждать извещения .notify. После получения сигнала выполнение продолжится при снова взятой блокировке. Еще раз, простыми словами.
    1. Проверка условия всегда выполняется в блокировке. Никто другой в это же время условие поменять не сможет (если все сделано правильно, конечно).
    2. Если условие не выполнилось - ждем опять, отдав управление операционной системе.
    3. При повторном вхождении у нас снова есть блокировка.
  • Условие выполнилось - очередь не пуста. Как минимум один элемент в ней есть. Может быть и больше - нас сейчас это не волнует. Блокировка все еще есть.
  • Получаем этот элемент и извещаем очередь, что она стала не до конца заполненной - т.е. в нее можно еще что-то положить (self._full.notify()).
  • Возвращаем полученное. Выходя из with освобождаем блокировку - другие потоки могут работать с очередью дальше.

Итак, условные переменные - это способ синхронизировать доступ к объектам при помощи блокировки и при этом возможность послать сигнал ожидающим потокам.

Альтернатива оператору with.

Вместо

with lock:
    # do something

можно использовать более традиционную форму:

try:
    lock.acquire()
    # do domething
finally:
    lock.release()

Эти два подхода абсолютно эквивалентны, но первый на три строчки короче. Выбросить try/finally блок нельзя - при возникновении исключения блокировка должна быть всё равно отпущена. К слову, неснятая блокировка при выходе из функции - одна из самых распространенных ошибок. Будьте внимательны - а еще лучше научитесь писать так, чтобы всё получалось просто, ясно и правильно.

Нужно отметить, что .acquire позволяет указывать параметры: blocking и, начиная с Python 3.2 - timeout. Подробности их использования неплохо описаны в документации.

Продолжение - в третьей части.