вторник, 30 ноября 2010 г.

Мультипоточность в питоне. Часть 2 - синхронизация

В части 1 мы рассмотрели базовые основы работы с потоками. Перейдем к примитивам синхронизации.

В мультипоточной программе доступ к объектам иногда нужно синхронизировать.

С этой точки зрения все объекты (переменные) разделяются на:

  • Неизменяемые. Мои самые любимые. Если объект никто не меняет, то синхронизация доступа ему не нужна. К сожалению, таких не очень много.
  • Локальные. Если объект не виден остальным потокам, то доступ к нему синхронизировать тоже не требуется.
  • Разделяемые и изменяемые. Синхронизация необходима.

Синхронизация доступа к объектам осуществляется с помощью объектов синхронизации (простите за тавтологию).

Объектов синхронизации существует множество - и постоянно придумывают новые. Это не то чтобы плохо - но избыточно (по крайней мере на начальном этапе изучения).

Каждый объект синхронизации имеет свои плюсы и минусы (обычно это баланс между быстродействием и областью применения). Ошибка ведет к тому, что

  • код будет выполнятся не так быстро, как мог бы (меньшее зло)
  • или к тому, что он будет выполнятся неправильно (тушите свет).

Давным-давно Дейкстра работал не только над алгоритмами на графах, но и над проблемами многопоточного кода. Ему удалось в одной из своих работ доказать, что минимальный базис, необходимый для любой синхронизации, сводится к двум объектам:

  • блокировки (они же lock, mutex).
  • условные переменные (condition variable).

Давайте на них и остановимся.

Ремарка о скорости. Питон - не первый спринтер на деревне. Если для программы на С выигрыш от правильного использования специфичного объекта синхронизации может быть существенным, то для Питона в целом они все примерно одинаковы.

Выбирать нужно более безопасный способ, не обращая внимания на быстродействие.

Кстати, это не относится к реализации CPython как такового - в коде интерпретатора присутствуют довольно экстравагантные блокировки, дающие выигрыш в пару процентов. До тех пор, пока их использует небольшая очень квалифицированная команда разработчиков - это оправдано.

Вторая ремарка: рассматриваемые объекты блокировки очень примитивны.

Откровенно говоря, не существует высокоуровневой теории мультипоточных программ. Если сравнивать с развитием языков, то уровень мультипоточного программирования сейчас находится где-то там, где были процедурные языки программирования. Объекты синхронизации схожи с оператором goto - использовать приходится, но всегда следует внимательно смотреть, что вы делаете.

Более высокоуровневые концепты, такие как monitor и active object хороши, но не решают всех проблем.

Языки вроде erlang, прекрасно справляющиеся с распараллеливанием задач на оптимальное количество потоков, останутся в своей узкой нише - примерно как lisp и все семейство языков функционального программирования навсегда остануться инструментом для маргиналов.

Парадигма многопоточных языков общего употребления все еще не создана.

Блокировки

Это - основа всего.

Простейший пример использования:

import threading

class Point(object):
    def __init__(self):
        self._mutex = threading.RLock()
        self._x = 0
        self._y = 0

    def get(self):
        with self._mutex:
            return (self._x, self._y)

    def set(self, x, y):
        with self._mutex:
            self._x = x
            self._x = y

Итак, имеем тривиальный класс. Это точка в двухмерном пространстве. Пусть координаты _x и _y начинаются с подчеркивания - чтобы программист, пожелавший менять их непосредственно немного задумался.

Еще есть _mutex типа RLock.

Публичная часть - два метода get и set.

Работает все это так: - при вызове метода берем блокировку через with self._mutex: - весь код внутри with блока будет выполнятся только в одном потоке. Другими словами, если два разных потока вызовут .get то пока первый поток не выйдет из блока второй будет его ждать - и только потом продолжит выполнение.

Зачем это все нужно? Координаты нужно менять одновременно - ведь точка это цельный объект. Если позволить одному потоку поменять x, а другой в это же время поправит y логика алгоритма может поломаться.

Есть и другой вопрос: зачем методу get блокировка? В приведенном примере она действительно не нужна. Но я всё же настоятельно рекомендую использовать блокировки даже для методов, вроде бы не изменяющих содержимое.

Во первых, это просто хорошая привычка. Иногда блокировки можно опускать для повышения производительности - но только после тщательного изучения побочных эффектов.

Во вторых (и это важнее) - блокировки при чтении позволяют корректно работать с объектами, части которых могут изменятся независимо.

Для примера возьмет цветную точку.

class ColoredPoint(Point):
    def __init__(self):
        super(ColoredPoint, self).__init__()
        self._color = 'green'

    @property
    def color(self):
        with self._mutex:
            return self._color

    @color.setter
    def color(self, val):
        with self._mutex:
            self._color = val

    def do(self, observer):
        with self._mutex:
            if self._color == 'red':
                observer(self.get())

Без блокировки в методе .do возможна ситуация, при которой один поток поменяет цвет, в второй в это же время изменит координаты. И тогда observer будет вызван с неправильными значениями.

В модуле threading существует еще и Lock - никогда его не используйте. Дело в том, что RLock (recursive lock) допускает повторную блокировку.

В примере для цветной точки это хорошо видно - .do берет блокировку, а затем вызывает метод .get, который берет эту блокировку еще раз.

В случае с Lock метод будет намертво повешен. RLock позволяет повторно брать блокировку тому потоку, который уже ее получил.

Говоря о блокировках, нужно упомянуть и две проблемы, которые возникают при их использовании:

  • Race condition: неправильное поведение из за отсутствия блокировки.
  • Dead lock: взаимная блокировка.

Первый случай я уже описал: при отсутствии блокировки один поток меняет часть объекта, временно приводя его в некорректное состояние - а второй поток именно в это время пытается работать с этим некорректным на данный момент объектом.

Пример для взаимных блокировок:

a = threading.RLock()
b = threading.RLock()

def f():
    with a:
        # do something
        with b:
            # do something also

def g():
    with b:
        # do something
        with a:
            # do something also

При одновременно вызове f из одного потока и g из другого оба потока навсегда повиснут: первый будет ждать захвата b а второй, захватив b остановится на ожидании блокировки a.

Обратите внимание: эта проблема может проявляться не сразу и нелегко диагностируется/воспроизводится.

Современные программы бывают большими и сложными с очень запутанными потоками исполнения, отследить которые нелегко.

Еще один пример:

def h():
    try:
        a.acquire()
        b.acquire()
    # do something
    finally:
        a.release()
        b.release()

При одновременном вызове h возможна взаимная блокировка.

Общее правило, немного помогающее при проектировании многопоточных систем: блокировки всегда должны браться в одном и том же порядке, а освобождаться в противоположном. Использование оператора with помогает решить эту проблему.

Условные переменные

С блокировками все более или менее ясно. Нужен еще один объект синхронизации.

Рассмотрим очередь на фиксированное количество элементов. Когда очередь пуста - поток, желающий получить новый элемент должен ждать. Аналогично с переполненной очередью.

Если решать все только на блокировках, придется проверять - а не пуста ли очередь? И если таки пуста - подождать еще немного. Сколько времени ждать - сложный вопрос.

Очевидно, нужен какой-то сигнал от заполняющего очередь потока - ожидающему.

Чтобы последний, выйдя из вечной блокировки мог продолжить работу.

Несмотря на то, что Питон непосредственно поддерживает сигналы (threading.Event) - я настоятельно не рекомендую их использовать.

Проблема в следующем: поток послал сигнал о поступлении элемента в очередь.

Кто его получит, если получение нового элемента ожидают несколько потоков?

Один из них забрал данные и очередь снова пуста. Как нужно дожидаться следующего поступления?

Условные переменные (condition variables) совмещают сигналы с блокировками.

Рассмотрим пример:

class Queue(object):
    def __init__(self, size=5):
        self._size = size
        self._queue = []
        self._mutex = threading.RLock()
        self._empty = threading.Condition(self._mutex)
        self._full = threading.Condition(self._mutex)

    def put(self, val):
        with self._full:
            while len(self._queue) >= self._size:
                self._full.wait()
            self._queue.append(val)
            self._empty.notify()

    def get(self):
        with self._empty:
            while len(self._queue) == 0:
                self._empty.wait()
            ret = self._queue.pop(0)
            self._full.notify()
            return ret

У нас два почти симметричных метода - положить в очередь и взять из нее.

Разберем .get. Сначала берем блокировку.

Обратите внимание - блокиратор один на обе условные переменные. Это важно Дело в том, что _full и _empty взаимозависимы. Хотя threading.Condition позволяет не указывать блокиратор, создавая новый RLock автоматически - не поступайте так. Можно поймать race condition, о котором я говорил раньше. Гораздо надежней и наглядней делать все явно. В нашем случае race condition был бы в явном виде - пусть и скрытый из за GIL.

  • Входим в блокиратор.
  • Проверяем наше условие - обычно это всегда цикл while.
  • Если условие не выполнилось - вызываем .wait(). Этот метод освободит блокировку и будет ждать извещения .notify. После получения сигнала выполнение продолжится при снова взятой блокировке. Еще раз, простыми словами.
    1. Проверка условия всегда выполняется в блокировке. Никто другой в это же время условие поменять не сможет (если все сделано правильно, конечно).
    2. Если условие не выполнилось - ждем опять, отдав управление операционной системе.
    3. При повторном вхождении у нас снова есть блокировка.
  • Условие выполнилось - очередь не пуста. Как минимум один элемент в ней есть. Может быть и больше - нас сейчас это не волнует. Блокировка все еще есть.
  • Получаем этот элемент и извещаем очередь, что она стала не до конца заполненной - т.е. в нее можно еще что-то положить (self._full.notify()).
  • Возвращаем полученное. Выходя из with освобождаем блокировку - другие потоки могут работать с очередью дальше.

Итак, условные переменные - это способ синхронизировать доступ к объектам при помощи блокировки и при этом возможность послать сигнал ожидающим потокам.

Альтернатива оператору with.

Вместо

with lock:
    # do something

можно использовать более традиционную форму:

try:
    lock.acquire()
    # do domething
finally:
    lock.release()

Эти два подхода абсолютно эквивалентны, но первый на три строчки короче. Выбросить try/finally блок нельзя - при возникновении исключения блокировка должна быть всё равно отпущена. К слову, неснятая блокировка при выходе из функции - одна из самых распространенных ошибок. Будьте внимательны - а еще лучше научитесь писать так, чтобы всё получалось просто, ясно и правильно.

Нужно отметить, что .acquire позволяет указывать параметры: blocking и, начиная с Python 3.2 - timeout. Подробности их использования неплохо описаны в документации.

Продолжение - в третьей части.

Мультипоточность в Питоне. Часть 1 - потоки

Продолжение - здесь

Писать многопоточные программы сложно. К сожалению, время от времени это приходится делать.

Ключ к созданию стабильно работающих мультипоточных програм лежит в соблюдении нескольких простых правил. В этой серии статей я попытаюсь их описать.

Для начала рассмотрим минимально необходимый базис:

  • работа с потоками
  • синхронизация разделяемых объектов (тех, которые одновременно используются в нескольких потоках)

Различные интересные вариации (неблокирующий доступ, невытесняющая многозадачность, распараллеливание на процессы) лежат несколько в стороне от "основной линии". Возможно, они будут рассмотрены в конце серии.

Потоки.

Потоки нужно

  • создавать
  • завершать
  • дожидаться завершения

Изначально у каждой программы есть один поток - он же "главный". Этот поток создается операционной системой при запуске процесса.

С точки зрения программиста почти не отличается от созданных вручную. Практически же существуют некоторые особенности.

Например, некоторые библиотеки для работы с графическим интерфейсом ожидают, что цикл обработки сообщений будет запущен в главном потоке, а не в каком-нибудь другом. То же самое относится и к высокоуровневым библиотекам для работы с сетью.

Создание потока.

Работой с потоком занимается threading.Thread (документация).

Меня захотят поправить - есть еще thread. Добрый совет - никогда его не используйте. Потому что!.. Цитата из документации: "The threading module provides an easier to use and higher-level threading API built on top of thread module."

Поток может быть создан двумя способами: передачей исполняемой функции в конструктор и наследованием.

Первый вариант:

def f(a, b, c):
    # do something
    pass

th = threading.Thread(name='th1', target=f, args=(1, 2), kwargs={'c': 3})
  • name - имя потока. Ни на что не влияет, но может быть полезно при отладке.
  • target - точка входа. Любой callable object - функция, связязанный метод класса или что-то еще
  • args - позиционные аргументы
  • kwargs - именованные аргументы.

Поток с именем 'th1' будет создан, но не запущен. После запуска будет вызвана функция f с параметрами a=1, b=2, c=3. Все аргументы могут быть опущены.

Второй вариант:

class MyThread(threading.Thread):
    def __init__(self, a, b, c):
        threading.Thread.__init__(self)
        self.a = a
        self.b = b
        self.c = c

    def run(self):
        # do something, using self.a, self.b, self.c
        pass

th = MyThread(1, 2, 3)

Результат практически тот же самый, но в новом потоке будет запущен метод run.

После того, как объект создан - его нужно запустить. В обоих случаях это делается через вызов

th1.start()

Все очень просто и понятно, верно?

Завершение потока.

Любой поток рано или поздно нужно завершить. Делается это простым выходом из функции потока. Не существует ПРАВИЛЬНОГО способа завершить поток снаружи. Это - принципиальное ограничение. Т.е. если вы хотите завершить поток из другого - просигнализируйте ему о своей просьбе (выставив флаг-переменную, например). В конце статьи я объясню, почему это важно.

Присоединение потока.

Культурные люди должны убирать за собой. Поэтому после сигнала о завершении нужно дождаться, когда поток реально закончит свою работу. Делается это так:

# signal to th1 for exiting
th1.join()

.join приостановит выполнение потока, вызвавшего ее, и будет ждать когда поток th1 завершит свое выполнение. Зачастую поток, стартовавший th1 его же и ждет - но бывают и исключения.

Почему нельзя "прибить" поток снаружи?

Если подумать, то все очень просто.

С точки зрения операционной системы программа (запущенный процесс) и потоки - ортогональные вещи.

Процесс владеет ресурсами - памятью, открытыми файлами, сокетами и подключениями к базе данных, окнами пользовательского интерфейса Типов ресурсов очень много. Грубо говоря, каждую переменную можно рассматривать как ресурс.

Поток - это нить исполнения. Он просто исполняет код параллельно с другими потоками. При этом поток ресурсами не владеет.

Открытый файл нужно закрыть, выделенную память вернуть обратно. Поэтому только правильно написанный код самого потока может корректно завершить самого себя.

Возникает еще один вопрос: а почему нельзя послать потоку исключение, чтобы он корректно свернулся - освободив при этом все что нужно?

Забегая вперед, вспомним об объектах синхронизации. Поток может быть в состоянии блокировки, когда его исполнение приостановлено в ожидании освобождения мьютекса (критической секции), занятого другим потоком.

Объекты блокировки - это в общем случае объекты ядра операционной системы. Т.е. блокированный поток нельзя "разблокировать", не затронув другие потоки.

Т.е. принудительное завершение потока - это потенциально опасная операция. В некоторых случаях все получается, иногда происходит утечка ресурсов.

Несмотря на то, что в низкоуровневом API операционной системы почти всегда есть способ аварийно завершить поток, в Питоне он не реализован.

Давайте просто писать "правильно", к чему нас и подталкивают.

Зачем нужно ждать завершения потока?

Хорошо, поток создан и запущен. В свое время ему послана просьба завершить работу. Зачем ждать подтверждения?

Нет, это не паранойя.

Во первых, так заведено в Питоне (корни растут из posix threads) - процесс завершается только тогда, когда все его потоки завершены.

Т.е. окончание работы главного потока программы еще ни о чем не говорит - Питон будет ждать окончания работы остальных потоков. Это выглядит так, будто в конце программы добавили .join на каждый работающий поток (еще одна причина делать это явно - ведь явное лучше неявного, верно?)

Откровенно говоря, есть лазейка, позволяющая это правило обойти. Достаточно указать

th1.setDaemon(True)

и можно о нем забыть. Отличие потока-демона от "стандартного" только в том, что демоны "не считаются" при завершении процесса.

Вуаля, решение найдено и оно работает? Не совсем.

Дело в том, что программа завершается не мгновенно. После того, как закончился главный поток с точки зрения программиста, Питон начинает чистить свои ресурсы, разрушая интерпретатор.

Закрываются файлы, уничтожаются модули.

И такой безобидный код как

import logging

def f():
    a = logging.getLogger("a.b.c").debug("f is called")

Может "поломаться", потому что модуль logging уже не существует, вернее - он частично разрушен сборщиком мусора.

При аккуратном программировании можно обойти все проблемы, связанные с саморазрушением интерпретатора - но оно вам нужно?

Проще и безопасней писать "по правилам".

Заключение.

С потоками покончено.

  • Потоки можно создавать и запускать.
  • Можно просить их закончить свою работу, но нельзя приказывать.
  • Завершения потока нужно дожидаться.

Но мультипоточное программирование только начинается.

Основная проблема здесь следующая: потоки должны взаимодействовать.

Или, другими словами, изменять состояние разделяемых между ними объектов. Несколько потоков могут пытаться изменить один объект одновременно, с непредсказуемым результатом.

Объекты могут быть сложными, и запись в объект "пользователь" фамилии одним потоком, а имени другим может привести к неожиданному результату. Существуют и более "разрушительные" примеры.

Для решения проблемы придуманы объекты синхронизации. О них - в следующей статье.