среда, 14 августа 2013 г.

Запуск процессов в tulip

В последнее время я работаю над запуском процессов в tulip: PEP 3156 и реализация на гуглокоде

Состояние дел на сегодня

Базовые конструкции выглядят так:

Транспорт:

class BaseTransport:
    """Base ABC for transports."""

    def get_extra_info(self, name, default=None):
        """Get optional transport information."""

    def close(self):
        """Closes the transport.

        Buffered data will be flushed asynchronously.  No more data
        will be received.  After all buffered data is flushed, the
        protocol's connection_lost() method will (eventually) called
        with None as its argument.
        """

class SubprocessTransport(BaseTransport):

    def get_pid(self):
        """Get subprocess id."""

    def get_returncode(self):
        """Get subprocess returncode.

        See also
        http://docs.python.org/3/library/subprocess#subprocess.Popen.returncode
        """

    def get_pipe_transport(self, fd):
        """Get transport for pipe with number fd."""

    def send_signal(self, signal):
        """Send signal to subprocess.

        See also:
        http://docs.python.org/3/library/subprocess#subprocess.Popen.send_signal
        """

    def terminate(self):
        """Stop the subprocess.

        Alias for close() method.

        On Posix OSs the method sends SIGTERM to the subprocess.
        On Windows the Win32 API function TerminateProcess()
         is called to stop the subprocess.

        See also:
        http://docs.python.org/3/library/subprocess#subprocess.Popen.terminate
        """

    def kill(self):
        """Kill the subprocess.

        On Posix OSs the function sends SIGKILL to the subprocess.
        On Windows kill() is an alias for terminate().

        See also:
        http://docs.python.org/3/library/subprocess#subprocess.Popen.kill
        """

Протокол:

class BaseProtocol:
    """ABC for base protocol class.

    Usually user implements protocols that derived from BaseProtocol
    like Protocol or ProcessProtocol.

    The only case when BaseProtocol should be implemented directly is
    write-only transport like write pipe
    """

    def connection_made(self, transport):
        """Called when a connection is made.

        The argument is the transport representing the pipe connection.
        To receive data, wait for data_received() calls.
        When the connection is closed, connection_lost() is called.
        """

    def connection_lost(self, exc):
        """Called when the connection is lost or closed.

        The argument is an exception object or None (the latter
        meaning a regular EOF is received or the connection was
        aborted or closed).
        """

class SubprocessProtocol(BaseProtocol):
    """ABC representing a protocol for subprocess calls."""

    def pipe_data_received(self, fd, data):
        """Called when subprocess write a data into stdout/stderr pipes.

        fd is int file dascriptor.
        data is bytes object.
        """

    def pipe_connection_lost(self, fd, exc):
        """Called when a file descriptor associated with the child process is
        closed.

        fd is the int file descriptor that was closed.
        """

    def process_exited(self):
        """Called when subprocess has exited.
        """

Нужные методы в event loop:

class AbstractEventLoop:
    """Abstract event loop."""

    def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         **kwargs):
    """Run cmd in shell"""

    def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                        **kwargs):
    """Subprocess *args"""

Т.е. интерфейс запуска процесса почти повторяет subprocess.Popen за исключением того, что subprocess.PIPE теперь вариант по умолчaнию. Заодно еще избавляемся от кошмара с правильным использованием shell=True (см. пост на эту тему). Поддерживаются только байтовые потоки, как и везде в tulip.

Оно уже в целом работает на Unix, код для Windows тоже готовится.

Делает всё что можно и нужно за исключением TTY. C TTY ван Россум предложил пока не связываться, да и subprocess его не поддерживает.

Проблема

Рабочие транспорты и протоколы — это, конечно, классно. Вполне подходящий низкоуровневый строительный блок.

Но простому программисту хочется иметь что-то более удобное и привычное.

Для tulip это должен быть код на основе yield from.

Проблема в том, что для процессов мы имеем не один поток ввода-вывода, а три однонаправленных: stdin, stdout и stderr. А еще процесс может сам решить закрыться, и это тоже нужно удобно обрабатывать.

Просьба

Я пытался придумать что-то такое, но результат пока мне не нравится.

Может, кто сумеет посоветовать дельную конструкцию? Или указать на готовую библиотеку, у которой можно поучиться?

7 комментариев:

  1. смотрел, как в twisted реализовано ? глядеть надо http://twistedmatrix.com/trac/browser/trunk/twisted/internet/process.py и потом реализацию IReactorProcess в http://twistedmatrix.com/trac/browser/trunk/twisted/internet/posixbase.py (кстати интерфейс протокола у тебя очень похож на твистедовский ProcessProtocol, ставлю, что читал)

    ОтветитьУдалить
  2. На twisted смотрел, разумеется. Транспорт-протокол оттуда слизал творчески переработав. Но это нижний уровень.

    Теперь нужен удобный интерфейс. В twisted есть только twisted,internet.utils но это слишком мало и скучно.

    sh, envoy, sarge не могут заработать в асинхре, они не для того. Как и gevent.subprocess.

    Нужна красивая асинхронная либа, чтобы оттуда позаимствовать дизайн.

    ОтветитьУдалить
  3. Привет,
    А что передавать-то нужно через эти потоки ввода и потоки вывода?
    Чем плох обычный print >>stream или .read / .write ? Синхронностью?
    И причём тут yield from ? Он только инструмент асинхронности?
    Не скажу, что он очень красив, и вряд ли он очень быстр, как и генераторы в целом.
    Channels в Go (кстати, рекомендую посмотреть в ту сторону) мне давали 1.5 млн переключений в секунду, вроде бы даже очень много, но какое-то сложное взаимодействие между маленькими процедурками и отдельными байтиками на нём уже не построить -- медленно.

    ОтветитьУдалить
  4. И ещё: почему этот подход с Futures/Coroutines не кажется тебе сложным?
    Может всё же подумать, как можно всё это сделать действительно _ПРОСТЫМ_ для пользователя?
    (И если будет поддержка только с Python 3.3, то почему не изобрести новый синтаксис)
    Порылся в постах, ссылках на эту тему... выглядит всё ужасно сложным. Нет, я понимаю, как всё это работает, но я точно не захочу это объяснять новичкам. Выглядит как каша, крякает как каша...

    ОтветитьУдалить
  5. Мне нравится как чтение/запись потока реализовано в tornado.iostream. proc.stdin.read_until('\n\n') который возвращает Future мне был бы по душе.

    @Yuri легкость c Future приходит со временем ;) Куча толковых ребят поддерживают coroutines.

    ОтветитьУдалить
  6. А как насчет IOLoop из Tornado?

    ОтветитьУдалить
  7. С tulip еще не разбирался, но немного посмотрел пример https://code.google.com/p/tulip/source/browse/examples/child_process.py
    Во многом могу ошибаться.

    Мне бы хотелось видеть что-то наподобие следующего:
    - оберкта над subprocess.Popen, в которой атрибутами класса можно задать протокол, reader и т.д.
    - чтобы stdin, stdout, stderr были тоже обертками над потоками, методы чтения и записи которых возвращали бы future
    - с неожиданным завершением процесса, например: как в subprocess - p.returncode is not None, а также опциональное исключение о завершении (выбрасываемое, допустим, при чтении или записи), ну и возможность добавить callback.

    Т.е. в коде я вижу это так:

    tulip.task
    def main(loop)
    p = tulip.Popen(...)
    while True:
    if p.returncode is not None:
    # exit
    yield from p.stdin.write(something)
    data = yield from p.stdout.readline() # or .read(...)


    Возможно, должен быть еще спец поток в Popen, который бы объединял stdout и stderr, и при обращении к методам чтения возвращал то, что готово первее + тип потока, т.е. как в примере yield from tulip.wait(...).

    Еще думаю, должна быть возможность чтения/записи из нескольких процессов (через tulip.wait()?).

    ОтветитьУдалить