среда, 9 февраля 2011 г.

Мультипоточность в Питоне. Часть 3 - высокоуровневые объекты

В частях 1 и 2 был описан низкоуровневый базис мультипоточных программ.

Эта основа крайне необходима для понимания. Но, тем не менее, работать с низким уровнем слишком утомительно и небезопасно.

Поэтому поговорим об объектах мультипоточности.

С точки зрения объектно ориентированного программирования в применении к многопоточности изменяемые объекты делятся на:

  • Неизменяемые. Блокировать ничего не нужно, объект не может изменить свое состояние во время работы.

  • Не имеющие представления по потоках. Доступ к ним должен обеспечиваться явными блокировками, выполнение которых - ответственность программиста. Иначе при одновременном обращении из разных потоков возможно повреждение объекта и прочие неприятности.

  • Мониторы (monitors). Содержат внутри блокировку, которая берется при обращении к публичным атрибутам. Полностью потокобезопасные - одновременно с монитором работает только один поток.

  • Активные объекты (active objects). Содержат внутри как минимум один поток. Вызовы публичных методов активного объекта возвращают отложенное значение (future), которое будет заполнено после того, как вложенный поток произведет нужные операции.

Мониторы

Назовем объектом-монитором такой объект, доступ к публичной части которого защищен блокировкой.

import threading

class A(object):
    def __init__(self, val):
        self._lock = threaing.RLock()
        self._val = val

    @property
    def val(self):
        with self._lock:
            return self._val

    @val.setter
    def val(self, val):
        with self._lock:
            self._val = val

    def do(self):
        with self._lock:
            pass # do stuff

Выглядит очень просто, верно? Ничем не отличается от примеров из предыдущей статьи.

Дело вот в чём: монитор - это концепт. Если часть публичного интерфейса защищена блокировками, а часть нет - концепт оказывается порванным.

Прелесть монитора в том, что он потокобезопасный по построению. Т.е. блокировка берется всегда - не смотря на то, нужна она или нет в конкретном случае. Например, мы уверены что метод .do из примера вызывается тогда, когда доступ к экземпляру класса имеет только один поток.

Возникает соблазн убрать эти "лишние" блокировки. С ним нужно бороться. Многопоточные программы - штука сложная. Отдельные потоки выполняются параллельно. Время их работы и график взаимодействия могут довольно сильно варьироваться от запуска к запуску. При развитии программы легко оказывается, что неблокирующая часть интерфейса начинает требовать синхронизации. Отсутствие блокировок в нужных местах ведет к непредсказуемому поведению, порче данных, обескураженным заказчикам и прочим неприятностям.

Вместо того, чтобы следить за всем, держа картинку в уме и добавляя синхронизацию по мере необходимости - проще и надежней сразу всё делать правильно.

Если у вас очень большой класс, лишь часть интерфейса которого требует блокировок - возможно, это повод для вынесения этой части в отдельный класс с последующей агрегацией.

Явное выделение "чистых" мониторов способствует ясному, чистому и надежному дизайну.

Пулы потоков (thread pools)

Прежде чем перейти к обсуждению активных объектов давайте поговорим о пулах потоков.

Дело в следующем: создание потока (объекта ядра вашей операционной системы, между прочим) - довольно дорогая операция. Если код потока выполняется быстро, время создания потока может быть сопоставимо со временем исполнения этого кода.

Есть и еще одна проблема - операционные системы не очень любят когда потоков слишком много. У процесса заканчивается виртуальная память и количество доступных объектов ядра. Детали зависят от конкретной операционной системы, да на самом деле они и не нужны.

Важно запомнить: тысяча потоков - это практически всегда плохо, сотня - тоже скорее всего излишне.

Теория считает, что оптимальное количество активных (т.е. не спящих в ожидании того или иного события) потоков равно числу ядер процессора.

Для Питона это не совсем верно (всё тот же GIL), но примерный порядок чисел ясен.

Обе проблемы эффективно решаются созданием поточного пула.

Это такой объект, в котором уже запущены несколько рабочих потоков. Когда приходит новый запрос на обработку, ссылка на обрабатывающую функцию и нужные параметры помещаются во внутреннюю очередь.

Рабочий поток берет из очереди запрос и выполняет его в своем контексте. Получившийся результат возвращается обратно.

Пул потоков обычно создается при старте программы и живет до ее завершения. Чаще всего программе нужно очень немного пулов - несколько штук максимум, при том что чисто технически почти всегда можно обойтись лишь одним.

Я видел пару десятков различных реализаций этого механизма. К счастью, анархии пришел конец и в Питоне 3.2 появляется concurent.futures .

Короткий пример использования:

from concurent.futures import ThreadPoolExecutor, as_completed

def f(a):
    # do something
    return a

with ThreadPoolExecutor(max_workers=5) as pool:
    results = [pool.submit(f, i) for i in range(100)]

    for future in as_completed(results):
        print(future.result())

Основные моменты перечислю здесь, а подробности можно посмотреть в документации.

  • Создается объект pool - пул потоков. Сколько этих потоков запущено - не важно. Потоки могут автоматически создаваться по мере необходимости. Создание и завершение потоков - часть внутренней реализации, которая может изменяться.

    max_workers - ограничение на максимально возможное для пула количество потоков.

    Пул работает в with блоке, чтобы вызвать .shutdown при выходе.

    Метод shutdown дождется завершения всех запущенных потоков.

  • Самый важный метод - submit. Он ставит в очередь запрос на обработку вместе со всеми требуемыми параметрами и возвращает future - отложенное значение.

  • У отложенного значения можно узнать, завершено ли асинхронное вычисление или еще нет, получить результат вычисления (или поймать исключение) и т.д.

  • as_completed - очень удобная утилита. Принимает список отложенных вычислений и возвращает генератор, выдающий завершенные вычисления по мере готовности.

Конечно же, поддерживается принудительная отмена отложенного, но еще не начавшегося выполнятся вычисления, добавление обратного вызова на завершение future и таймауты.

Активные объекты

Активным объектом будем называть объект, выполняющим свои методы в отдельных потоках.

Имея пул потоков создать активный объект очень легко.

Давайте сделаем простенький пример, проверяющий - жив ли еще интернет?

#!/home/andrew/projects/py3k/python

import threading
from concurrent import futures
from collections import defaultdict, namedtuple
from urllib.request import urlopen, URLError

State = namedtuple('State', 'addr ok fail')

class Pinger:
    def __init__(self, pool):
        self._pool = pool
        self._lock = threading.RLock()
        self._results = defaultdict(lambda: {'ok': 0, 'fail': 0})
        self._pendings = set()

    def result(self, addr=None):
        def _make_state(addr, res):
            return State(addr=addr, ok=res['ok'], fail=res['fail'])
        with self._lock:
            if addr is not None:
                return _make_state(addr, self._results[addr])
            else:
                return {_make_state(addr, val)
                        for addr, val in self._results.items()}

    @property
    def pendings(self):
        with self._lock:
            return set(self._pendings)

    def ping(self, addr):
        with self._lock:
            future = self._pool.submit(self._ping, addr)
            self._pendings.add(future)
            future.add_done_callback(self._discard_pending)
            return future

    def _discard_pending(self, future):
        with self._lock:
            self._pendings.discard(future)

    def _ping(self, addr):
        try:
            ret = urlopen(addr)
            ret.read()
        except URLError:
            result = False
        else:
            result = True

        with self._lock:
            if result:
                self._results[addr]['ok'] += 1
            else:
                self._results[addr]['fail'] += 1

        return result

if __name__ == '__main__':
    from pprint import pprint

    with futures.ThreadPoolExecutor(max_workers=3) as pool:
        pinger = Pinger(pool)

        pinger.ping('http://google.com')
        pinger.ping('http://ya.ru')

        print("State for 'ya.ru'", pinger.result('http://ya.ru')) # 1

        future = pinger.ping('http://python.su/forum/index.php')
        print("Result for 'python.su'", future.result()) # 2

        pinger.ping('http://asvetlov.blogspot.com')

        futures.wait(pinger.pendings) # 3

        print("Total table")
        pprint(pinger.result()) # 4

Вышло не так уж и коротко - но оно того стоит.

Давайте разберем код по порядку, обращая внимание на существенные места.

  • Имеем класс Pinger, который является активным объектом. Конструктор класса принимает параметром ThreadingPoolExecutor, а не создает его внутри.

    Зачем? Потому что пингователей может быть много. Каждый будет пытаться создавать новые потоки, а операционная система это не любит.

    Лучше передавать один правильно сконфигурированный пул всем желающим, а они уж пусть сражаются за потоки в нем.

    Или создать несколько пулов для разных целей - но оставить себе контроль за их применением.

  • Внутри класса есть блокировка self._lock (куда без нее), таблица результатов и множество ожидающих вызовов - тех future, которые были запланированы на выполнение но еще не завершились.

  • Два публичных атрибута для опроса текущего состояния.

    • метод result возвращает всю таблицу или только ее строку для указанного адреса.

    • свойство pendings вернет список еще не выполненных запросов.

    Обратите внимание: эти действия берут блокировку _lock и возвращают копию изменяемых данных. Оба пункта очень важны.

    Без блокировки мы можем попасть в ситуацию, когда выполняющийся в пуле код изменит результат, и возвращаемые данные будут частично повреждены.

    Если возвращать ссылку на внутреннюю структуру вместо копии - повреждение данных произойдет позже, при обработке результата вызова во внешнем коде.

    Вероятно, вы заметите проблемы далеко не сразу - и тем сложнее их будет выявить и поправить.

  • Метод ping ставит обработчик (непубличный метод _ping, делающий всю работу) в очередь пула потоков.

    Блокировка нужна и здесь, так как изменяется _pendings.

    К полученному отложенному вызову добавляем callback, чтобы удалить наше вычисление из _pendings после того, как оно будет выполнено.

    Удаление (_discard_pending) тоже происходит с блокировкой.

  • Рабочий код _ping пытается получить html страницу через urlopen.

    Блокировка не берется - доступ к интернету медленный, а мы как раз хотели получить возможность пинговать несколько сайтов одновременно. Столько, сколько окажется свободных потоков у _pool.

    Тем не менее для обновления внутренних структур класса блокировка нужна.

    Наконец, возвращаем результат, который попадет в future и заодно вызовет callback _discard_pending.

  • _discard_pending будет вызван в контексте рабочего потока из пула, запомните этот факт. Не в потоке, вызвавшем 'ping' - а в рабочем, внутри пула.

Теперь проверка. У меня довольно быстрый интернет, и тем не менее результат запуска такой:

andrew@ocean ~/p/a/threading> ./pinger.py 
State for 'ya.ru' State(addr='ya.ru', ok=0, fail=0)
Result for 'python.su' True
Total table
{State(addr='http://asvetlov.blogspot.com', ok=1, fail=0),
 State(addr='http://google.com', ok=1, fail=0),
 State(addr='http://python.su/forum/index.php', ok=1, fail=0),
 State(addr='http://ya.ru', ok=1, fail=0),
  • На момент постановки 'ya.ru' в очередь пинг только выполняется, результата нет.

  • Явный вызов future.result() для 'python.su' дожидается завершения, True демонстрирует вечноживучесть форума.

  • futures.wait(pinger.pendings) ждет завершения всех отложенных задач. Если future на тот момент уже готова - ждать не потребуется.

Другие высокоуровневые мультипоточные объекты

Откровенно говоря, их нет.

Всё, что вы придумаете будет является комбинацией мониторов и активных объектов.

Иногда эти поделки удобны.

Временами, - в случае нарушения правил работы с потоками, - больно бьют по рукам.

Общее правило остается в силе - объект потокобезопасен только в двух случаях:

  1. он константный

  2. все публичные операции с объектом защищены тем или иным методом синхронизации и передаваемые/возвращаемые параметры тоже потокобезопасны.

Естественно, в реальной жизни всё куда сложнее. Сплошь и рядом приходится нарушать "правила безопасности жизнедеятельности", идя на компромисс в целях улучшения производительности или по каким-нибудь другим причинам.

В каждом таком случае делайте выбор осознанно. Не обижайтесь, если на голову упадет кирпич.