Асинхронное программирование для начинающих (Часть 2)

В предыдущей статье мы рассмотрели теоретические основы, что такое синхронные и асинхронные приложения. Также рассмотрели основные проблемы, которые связаны с тем или иным типом программ. Например, два процесса могут блокировать друг друга. 

Сейчас поговорим о том, как реализовать асинхронные особенности Python. Приведем некоторые варианты исполнения этой задачи. Обязательно прочитайте предыдущий материал. Без него содержимое этой статьи не будет понятным.

Все примеры, которые приводятся в этой статье, были предварительно протестированы на Python 3.8. Вот эти модули Вам необходимо установить для того, чтобы эти примеры работали у вас на компьютере. 

aiohttp==3.6.2

async-timeout==3.0.1

attrs==19.3.0

certifi==2019.11.28

chardet==3.0.4

codetiming==1.1.0

idna==2.8

multidict==4.7.4

requests==2.22.0

urllib3==1.25.7

yarl==1.4.2

Эту последовательность строк сохраните как requirements.txt, после чего запустите терминал и выполните следующую строку.

pip3 install -r requirements.txt

Помимо этого, необходимо будет установить виртуальную среду Python, чтобы системный Python работал стабильно.

Пример синхронного программирования

Если говорить о первом примере, приведенном в прошлом материале, то в нем идет речь о создании очереди. Она являет собой структуру FIFO. То есть, эта структура предоставляет специальные инструменты, которые используются для создания очереди элементов, а потом повторного их вывода в том порядке, каком они были до этого.

В данном примере наша задача – получить номер из очереди и определить число циклов, необходимых для достижения данного числа. И получившееся значение потом выводится в консоль. Вернее, это происходит два раза: когда цикл стартует и потом уже при повторно в общем выводе. 

Приведем пример кода приложения, который будет показывать метод, с помощью которого несколько синхронных задач обрабатывают работу в очереди.

Пусть это приложение будет называться example_1.py.  

import queue

 

def task(name, work_queue):

    if work_queue.empty():

        print(f"Task {name} nothing to do")

    else:

        while not work_queue.empty():

            count = work_queue.get()

            total = 0

            print(f"Task {name} running")

            for x in range(count):

                total += 1

            print(f"Task {name} total: {total}")

 

def main():

    """

    Это основная точка входа в программу

    """

    # Создание очереди работы

    work_queue = queue.Queue()

 

    # Помещение работы в очередь

    for work in [15, 10, 5, 2]:

        work_queue.put(work)

 

    # Создание нескольких синхронных задач

    tasks = [(task, "One", work_queue), (task, "Two", work_queue)]

 

    # Запуск задач

    for t, n, q in tasks:

        t(n, q)

 

if __name__ == "__main__":

    main()

В целом, алгоритм приложения следующий:

  1. Сначала осуществляется импорт модуля queue, который используется для создания очереди.
  2. 3-13 строки определяют task(). Обработка очереди осуществляется вплоть до того момента, как заканчиваются задачи для выполнения.
  3. Строка 15. С ее помощью определяется функция main(), чтобы запустить задачу приложения.
  4. Строка 20. Она создает очередь. И этот общий ресурс используется для извлечения работы. 
  5. Строки 23-24. Работа помещается в очередь. В этом примере ею является произвольное число значений для тех задач, обработка которых требуется.
  6. Строка 27. Генерируется список кортежей задач, которые содержат значения параметров, которые передаются задачами. 
  7. Строки 30-31. Отвечают за то, чтобы выполнять итерацию по списку кортежей задач, передавая ранее определенные значения параметров.
  8. Строка 34. Вызывает функцию main(), чтобы приложение было запущено.

Задача в этом приложении являет собой обычную функцию, принимающую в роли параметров данные строкового типа и очередь. Когда она выполняется, то просматривает очередь для того, чтобы обработать соответствующие данные. Если такое значение есть, оно извлекается, выполняется цикл, а потом результирующее значение в конце выдается пользователю. И это будет происходить ровно до того момента, пока не будут перебраны все значения в очереди.

Вывод в процессе запуска этого приложения получится следующим.

Task One running

Task One total: 15

Task One running

Task One total: 10

Task One running

Task One total: 5

Task One running

Task One total: 2

Task Two nothing to do

Простой кооперативный параллелизм в Python

Теперь приведем пример приложения, которое дает возможность двум задачам выполняться параллельно. За счет добавления yield мы передаем циклу контроль при том, что свой контекст сохраняется. Простыми словами, это нужно, чтобы можно было возобновить уступающую задачу.

Фактически мы реализовываем переключение контекста, так как управлением начинает заниматься вызывающая сторона, а не функция генератора. 

Приведем пример кода приложения, которое задействует простой параллелизм. 

import queue

 

 

def task(name, queue):

    while not queue.empty():

        count = queue.get()

        total = 0

        print(f"Task {name} running")

        for x in range(count):

            total += 1

            yield

        print(f"Task {name} total: {total}")

 

 

def main():

    """

    Это основная точка входа в программу

    """

    # Создание очереди работы

    work_queue = queue.Queue()

 

    # Размещение работы в очереди

    for work in [15, 10, 5, 2]:

        work_queue.put(work)

 

    # Создание задач

    tasks = [task("One", work_queue), task("Two", work_queue)]

 

    # Запуск задач

    done = False

    while not done:

        for t in tasks:

            try:

                next(t)

            except StopIteration:

                tasks.remove(t)

            if len(tasks) == 0:

                done = True

 

 

if __name__ == "__main__":

    main()

Программа работает следующим образом:

  1. Строки 3-11. Как и прежде, определяют task(). Помимо этого, в Строке 10 мы превращаем функцию в генератор с помощью yield. Тогда контекст переключается, а цикл while в main() берет управление циклом на себя.
  2. Строка 25. Она нужна, чтобы создать список задач. Правда, это будет происходить немного по-другому, не так, как в прошлом примере. Здесь вызов каждой задачи производится с теми параметрами, которые прописаны в переменной списка задач. Это требуется для первого выполнения функции генератора task().
  3. Строки 31-36. Они являют собой разновидностями цикла while в main(), что дает возможность параллельного запуска task(). Возврат управления осуществляется каждому экземпляру task(), что дает возможность продолжать цикл и начать выполнение другой задачи.
  4. Строка 32. С ее помощью контроль к task() возвращается, и выполнение продолжается после того места, где был осуществлен вызов yield.
  5. Строка 36. Определяет переменную done. Цикл заканчивается тогда, когда все задачи выполнены.

Если попробовать выполнить программу, которая приводится выше, то получаем следующее:

Task One running

Task Two running

Task Two total: 10

Task Two running

Task One total: 15

Task One running

Task Two total: 5

Task One total: 2

Здесь отчетливо показано, что Task One и Task Two работают одновременно. Правда, здесь затрачивается много лишних ресурсов.

Совместный параллелизм с блокирующими вызовами

Эта программа, по сути, являет собой полную копию предыдущей за тем лишь исключением, что в ней будет отсутствовать time.sleep(delay). Задержка будет основываться на значении, которое будет получено из рабочей очереди, связанного с каждой итерацией цикла задачи. Таким образом, имитируется эффект блокирующего вызова. 

Если говорить о примерах блокирующего вызова, то это описанная ранее ситуация, когда родитель из-за того, что был занят подсчетом коммунальных услуг не мог следить за детьми. 

Сначала нам нужно установить требуемые библиотеки.

pip3 install codetiming

А потом записать и выполнить следующий код. 

import time

import queue

from codetiming import Timer





def task(name, queue):

    timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")

    while not queue.empty():

        delay = queue.get()

        print(f"Task {name} running")

        timer.start()

        time.sleep(delay)

        timer.stop()

        yield





def main():

    """

    Это основная точка входа в программу

    """

    # Создание очереди работы

    work_queue = queue.Queue()




    # Добавление работы в очередь

    for work in [15, 10, 5, 2]:

        work_queue.put(work)




    tasks = [task("One", work_queue), task("Two", work_queue)]




    # Запуск задач

    done = False

    with Timer(text="\nTotal elapsed time: {:.1f}"):

        while not done:

            for t in tasks:

                try:

                    next(t)

                except StopIteration:

                    tasks.remove(t)

                if len(tasks) == 0:

                    done = True




if __name__ == "__main__":

    main()

Какие изменения были внесены в этом примере?

  1. Строка 1. Чтобы приложение могло получить доступ к time.sleep(), осуществляется импорт модуля time.
  2. Строка 3. Выполняет импорт кода Timer из модуля codetiming.
  3. Строка 6. Создает объект, который относится к классу Timer, который в нашем примере засекает время, которое требуется для выполнения каждой итерации. 
  4. Строка 10. Осуществляет запуск Timer.
  5. Строка 11. Изменяет task() таким образом, чтобы включить time.sleep(day), чтобы имитировать задержку. Это нужно, чтобы заменить цикл for, который отвечал за выполнение расчетов в example_1.py.
  6. Строка 12. Она останавливает таймер и выводит количество времени, которое прошло с того момента, как произошел вызов метода timer.start().
  7. Строка 30. Она создает менеджер контекста Timer, что позволяет показать время, которое истекло с момента начала цикла в целом.

Если мы запустим эту программу, то результат будет приблизительно следующим.

Task One running

Task One elapsed time: 15.0

Task Two running

Task Two elapsed time: 10.0

Task One running

Task One elapsed time: 5.0

Task Two running

Task Two elapsed time: 2.0

 

Total elapsed time: 32.0

Мы рассмотрели некоторые способы реализации асинхронных приложений. Конечно, их намного больше, но эти являются самыми простыми для понимания. Поэтому разумно с них и начать.

ОфисГуру
Adblock
detector