Redis в Python — полная документация на примерах (Часть 4)

Мы уже много чего рассмотрели по библиотеке Redis. В частности, что это такое, как правильно устанавливать сервер, как давать команды для этого хранилища ключей в среде Python, и так далее. Также мы начали работать над интернет-магазином и тем, как обрабатываются в нем данные о том, сколько товаров осталось на складе. Продолжаем теперь рассматривать эту тему.

Создание интернет-магазина с использованием Redis: продолжение

Давайте для начала рассмотрим большой фрагмент кода, а потом пошагово его проанализируем. Пользователь может представить buyitem() в виде чего-то, что вызывается каждый раз, когда пользователем нажимается кнопка «Купить». Его задача – подтвердить факт того, что объект имеется в инвентаре, а также выполнить действие, основываясь на получившемся результате. 

При этом все должно быть безопасно: так, как было описано в предыдущих частях этого руководства.

Итак, давайте приведем сам фрагмент кода. 

import logging

import redis




logging.basicConfig()




class OutOfStockError(Exception):

    """Используется, когда на PyHats заканчивается самый популярный товар"""




def buyitem(r: redis.Redis, itemid: int) -> None:

    with r.pipeline() as pipe:

        error_count = 0

        while True:

            try:

                # Получение доступного инвентаря, поиск изменений

                # связанных с ID объекта перед транзакцией

                pipe.watch(itemid)

                nleft: bytes = r.hget(itemid, "quantity")

                if nleft > b"0":

                    pipe.multi()

                    pipe.hincrby(itemid, "quantity", -1)

                    pipe.hincrby(itemid, "npurchased", 1)

                    pipe.execute()

                    break

                else:

                    # Остановка поиска ID объекта

                    pipe.unwatch()

                    raise OutOfStockError(

                        f"Sorry, {itemid} is out of stock!"

                    )

            except redis.WatchError:

                # Регистрация общего количества ошибок данного пользователя,

                # с последующей попыткой повторения процесса WATCH/HGET/MULTI/EXEC

                error_count += 1

                logging.warning(

                    "WatchError #%d: %s; retrying",

                    error_count, itemid

                )

    return None

 Давайте для начала обратимся к строке 16. Она, вместе с pipe.watch(itemid) говорит Redis, чтобы он проверил ID объекта itemid на предмет наличия изменений. Так, приложение проверяет инвентарь с помощью вызова r.hget(itemid, «quantity») в семнадцатой строке. 

pipe.watch(itemid)

nleft: bytes = r.hget(itemid, "quantity")

if nleft > b"0":

    # Объект в инвентаре. Переход к транзакции.

Если инвентарь затрагивается во временной промежуток небольшого окна между тем, как пользователь проверит содержимое на складе и когда он попробует приобрести товар, то Redis осуществит вызов исключения. В то же время redis-py осуществит вызов WatchError (строка 13). 

Следовательно, при указании изменений itemid на любой из хэшей после вызова .hget() (перед .hincrby() в строках 20 и 21), необходимо повторно запустить весь этот процесс. Он будет проходить в рамках цикла while (вечного), но в другой итерации.

Это и есть «оптимистичная» часть блокировки. Ведь вместо лишней траты времени из-за отправки и получения (в это время база данных блокируется полностью), все задачи возлагаются на Redis. В свою очередь, клиент будет уведомлен о том, что вызывается повторная попытка проверки инвентаря, если такая есть.

Главным моментом здесь выступает понимание разницы между клиентской и серверной сторонами. 

nleft = r.hget(itemid, "quantity")

Это назначение дает результат клиентской стороны r.hget(). И, соответственно, функции, которые запускаются на конвейере, будут хорошо буферизировать все инструкции в одну. И потом уже отправлять их на сервер в качестве одного запроса. 

pipe.multi()

pipe.hincrby(itemid, "quantity", -1)

pipe.hincrby(itemid, "npurchased", 1)

pipe.execute()

Так что данные посреди транзакции уже точно не будут отправляться на клиентскую часть. Достаточно лишь вызвать .execute() (эта команда дается на 19 строке), чтобы вернуть последовательность результатов одним махом.

Хотя здесь две команды есть, все же этот блок включает лишь одну двухстороннюю операцию. Проще говоря, у клиента не получится сразу же использовать результат ipe.hincrby(itemid, «quantity», -1), который приводится на двадцатой строке. Ведь только непосредственно экземпляр начала будет возвращаться методами в конвейере.

Хотя, как правило, когда .hincrby() осуществляет возврат итогового результата, невозможно немедленно на него сослаться с клиентской стороны. По крайней мере до тех пор, пока транзакция не будет полностью завершена.

Здесь мы имеем дело с парадоксом. Невозможно размещение вызова .hget() в блоке транзакции. Ведь когда это сделать, не получится понять, а вообще надо ли увеличивать npurchased. Поэтому в реальном времени невозможно узнать результаты команд, которые внесены в конвейер транзакций.

Итак, если инвентарь доходит до нуля, то применяем UNWATCH на ID объекта и поднимает ошибку OutOfStockError (строка 27), показывается в результате строку «Продано», из-за которой покупателям еще больше захочется приобрести шляпу по еще более неадекватной стоимости.  

else:

    # Остановка просмотра itemid

    pipe.unwatch()

    raise OutOfStockError(

        f"Sorry, {itemid} is out of stock!"

    )

Давайте приведем такой пример. Помните, что первоначальное число шляп с ID товара 56854717 составляло 199 штук, поскольку ранее был вызван .hincrby(). Допустим, было выполнено три покупки. Соответственно, изменятся поля quantity и npurchased

>>> buyitem(r, "hat:56854717")

>>> buyitem(r, "hat:56854717")

>>> buyitem(r, "hat:56854717")

>>> r.hmget("hat:56854717", "quantity", "npurchased")  # Хеш multi-get

[b'196', b'4']

Теперь мы можем пройти через большее количество транзакций, имитируя их поток до того момента, пока на складе не останется 0 единиц товара. Еще раз, представьте, что они исходя от группы разных клиентов, а не от одного экземпляра Redis

Python

>>> # Покупка оставшихся 196 шляп под номером 56854717 и уменьшение запаса до 0

>>> for _ in range(196):

...     buyitem(r, "hat:56854717")

>>> r.hmget("hat:56854717", "quantity", "npurchased")

[b'0', b'200']

Теперь, когда какой-то недобросовестный пользователь опоздает к распродаже, ему будет выдана ошибка OutOfStockError. Соответствующее уведомление будет показано непосредственно в том интерфейсе, с которым работает этот покупатель. 

Python

>>> buyitem(r, "hat:56854717")

Traceback (most recent call last):

  File "<stdin>", line 1, in <module>

  File "<stdin>", line 20, in buyitem

__main__.OutOfStockError: Sorry, hat:56854717 is out of stock!

В такой ситуации необходим новый завоз.

Срок действия ключа в Redis Python

А теперь давайте посмотрим на срок, в течение которого ключ в Redis действует. Это его характерная особенность. По окончанию периода, в течение ключ действует, соответствующее значение автоматически стирается из базы. На это обычно уходит несколько секунд. 

Чтобы добиться этого эффекта в redis-py, возможно использование метода .setex(). С его помощью можно установить базовую пару значения ключа string:string с определенным периодом действия. 

>>> from datetime import timedelta

 

>>> # setex: "SET" со сроком действия

>>> r.setex(

...     "runner",

...     timedelta(minutes=1),

...     value="now you see me, now you don't"

... )

True

Возможно определение второго аргумента в виде количества секунд либо объекта timedelta, аналогично строке №6 приведенного выше фрагмента кода. Второй вариант лучше, поскольку он чуть лучше продуман, а также нет никаких двусмысленностей в нем.

Также существуют функции (и соответствующие команды Redis, естественно), которые позволяют получить сведения о количестве времени, которое осталось у ключа до того момента, как он будет удален. 

>>> r.ttl("runner")  # Срок годности (time to live) в секундах

58

>>> r.pttl("runner")  # Тот же срок годности, но в миллисекундах

54368

Затем вы можете ускорить окно до момента истечения срока действия, а потом наблюдать за этим. После удаления r.get() вернут None, а .exists() – 0. 

 >>> r.get("runner")  # Срок действия еще не истек

b"now you see me, now you don't"

 

>>> r.expire("runner", timedelta(seconds=3))  # Установка нового окна срока действия

True

>>> # Остановка на несколько секунд

>>> r.get("runner")

>>> r.exists("runner")  # Ключ и значение ушли (закончился срок действия)

0

Приведем таблицу, которая суммирует команды, которые связаны со сроком действия ключ-значения, учитывая и те, которые указаны ранее.

Обозначение Назначение
r.setex(name, time, value) Настраивает срок действия указанного ключа которого исчисляется в секундах, где время может быть представлено как int или объект Python timedelta.
r.psetex(name, time_ms, value) Настраивает срок действия ключа которого исчисляется в миллисекундах time_ms, где time_ms может быть представлено как как int, или объект  timedelta.
r.expire(name, time) Настраивает флаг срока действия  ключа, где time может быть отображено как int или объект  timedelta.
r.expireat(name, when) Настраивает флаг срока действия  ключа, где when может быть представлен как int, указывающий время Unix формате, или datetime.
r.persist(name) Удаляет срок действия в для указанного ключа.
r.pexpire(name, time) Настраивает флаг срока действия на  ключ в миллисекунды, а время может быть представлено как как int, или объект timedelta.
r.pexpireat(name, when) Настраивает флаг срока действия для ключа, где when может быть представлено как время Unix в миллисекундах (время Unix * 1000), или объектом datetime.
r.pttl(name) Возвращает количество миллисекунд до тех пор, пока срок действия ключа не закончится.
r.ttl(name) Возвращает число секунд до тех пор, пока срок действия ключа не закончится.

Пример использования Redis на сайте PyHats, часть 2

Итак, давайте теперь знания, которые вы получили ранее, попробуем применить на практике в бэкенде проекта PyHats.

Мы создали клиент Redis, который будет совершать такие же действия, какие обычно совершает покупатель. Он обрабатывает поток поступающих IP-адресов, который в будущем может поступать от большого количества соединений типа HTTPS.

Задача наблюдателя в этом случае будет в том, чтобы анализировать поток адресов, которые подключаются к серверу и приглядывать за ним с одного подозрительного адреса.

Определенное промежуточное программное обеспечение на сервере веб-сайта размещает все входящие IP-адреса в список Redis, используя метод .lpush(). Вот такой вариант можно использовать для того, чтобы сымитировать некоторые из IP-адресов. Он довольно грубый, но вполне пригодный к использованию. 

>>> r = redis.Redis(db=5)

>>> r.lpush("ips", "51.218.112.236")

1

>>> r.lpush("ips", "90.213.45.98")

2

>>> r.lpush("ips", "115.215.230.176")

3

>>> r.lpush("ips", "51.218.112.236")

4

Как здесь можно увидеть, описанный ранее метод возвращает длину списка после того, как операция push() оказалась удачной. Каждый раз, когда этот метод вызывается, IP размещается в начале списка redis, который вводится строкой ips. 

Конечно, эта симуляция упрощенная. Но что мы из нее можем узнать? Например, то, что здесь запросы технически выполняются от одного клиента. Но это не мешает рассматривать их в качестве потенциальных запросов, которые поступают от большого количеств различных клиентов. 

Теперь давайте создадим клиент для работы в вечном цикле и который выполняет блокирующий вызов BLOP. 

# Новая вкладка или окно оболочки




import datetime

import ipaddress




import redis




# Здесь мы размещаем наши вредительские IP адреса

blacklist = set()

MAXVISITS = 15




ipwatcher = redis.Redis(db=5)




while True:

    _, addr = ipwatcher.blpop("ips")

    addr = ipaddress.ip_address(addr.decode("utf-8"))

    now = datetime.datetime.utcnow()

    addrts = f"{addr}:{now.minute}"

    n = ipwatcher.incrby(addrts, 1)

    if n >= MAXVISITS:

        print(f"Hat bot detected!:  {addr}")

        blacklist.add(addr)

    else:

        print(f"{now}:  saw {addr}")

    _ = ipwatcher.expire(addrts, 60)

Давайте теперь проанализируем ключевые моменты. Так, ipwatcher играет роль покупателя. Он сидит и ждет, пока новые адреса не окажутся в списке Redis isp. Он получает их в форме байтов. Например, ”51.218.112.236″. После этого он их делает более подходящими объектами адресов с помощью модуля ipadressess: 

 _, addr = ipwatcher.blpop("ips")

addr = ipaddress.ip_address(addr.decode("utf-8"))

Здесь нами создается ключ строки Redis с использованием адреса и минуты часа, в который ipwatcher обнаружил адрес, увеличивая счетчик на 1 и получая в процессе новый счет. 

now = datetime.datetime.utcnow()

addrts = f"{addr}:{now.minute}"

n = ipwatcher.incrby(addrts, 1)

Если адрес был обнаружен чаще, чем описано в MAXVISITS, то похоже что у нас есть веб-скрапер, который пробует создать пузырь. У нас не остается других вариантов, кроме как вернуть этому пользователю что-то типа статуса 403. 

Нами используется ipwatcher.expire(addrts, 60) для периода действия комбинации в 60 секунд с того момента, как она была обнаружена. Это требуется, чтобы не допустить засорения базы ключей просмотрами, которые совершались один раз и при этом уже устарели. 

Если вы используете новую оболочку для выполнения этого блока кода, то вам сразу будет отображен следующий вывод. 

2019-03-11 15:10:41.489214:  saw 51.218.112.236

2019-03-11 15:10:41.490298:  saw 115.215.230.176

2019-03-11 15:10:41.490839:  saw 90.213.45.98

2019-03-11 15:10:41.491387:  saw 51.218.112.236

Результат будет немедленно, поскольку данные четыре IP-адреса располагались в списке (на подобии очереди) с ключом ips, ожидающем времени, когда будет осуществлено их извлечение с использованием ipwatcher. А с помощью инструкции BLPOP (или .blpop() в соответствующей библиотеке Python) даст возможность создать блок до того момента, пока объект будет находиться в списке, а потом окажется за его пределами. Простыми словами, его действия напоминают Queue.get() в Python до того момента, пока объект доступен. 

Кроме просто разделения IP-адресов, ipwatcher будет считать IP-адрес ботом, если количество GET-запросов будет превышать 15 за каждую минуту. 

Давайте теперь возвратимся к первой оболочке и создадим имитацию скрапера, который будет давать по 20 запросов в несколько миллисекунд на сайт. 

 for _ in range(20):

    r.lpush("ips", "104.174.118.18")

Наконец, переключаемся на вторую оболочку, которая содержит ipwatcher, и нам покажется такой результат в выводе. 

2019-03-11 15:15:43.041363:  saw 104.174.118.18

2019-03-11 15:15:43.042027:  saw 104.174.118.18

2019-03-11 15:15:43.042598:  saw 104.174.118.18

2019-03-11 15:15:43.043143:  saw 104.174.118.18

2019-03-11 15:15:43.043725:  saw 104.174.118.18

2019-03-11 15:15:43.044244:  saw 104.174.118.18

2019-03-11 15:15:43.044760:  saw 104.174.118.18

2019-03-11 15:15:43.045288:  saw 104.174.118.18

2019-03-11 15:15:43.045806:  saw 104.174.118.18

2019-03-11 15:15:43.046318:  saw 104.174.118.18

2019-03-11 15:15:43.046829:  saw 104.174.118.18

2019-03-11 15:15:43.047392:  saw 104.174.118.18

2019-03-11 15:15:43.047966:  saw 104.174.118.18

2019-03-11 15:15:43.048479:  saw 104.174.118.18

Hat bot detected!:  104.174.118.18

Hat bot detected!:  104.174.118.18

Hat bot detected!:  104.174.118.18

Hat bot detected!:  104.174.118.18

Hat bot detected!:  104.174.118.18

Hat bot detected!:  104.174.118.18

Теперь нажимаем на комбинацию клавиш Ctrl + C, чтобы покинуть вечный цикл while и увидим то, что опасный для нас IP-адрес оказался в черном списке.  

>>> blacklist

{IPv4Address('104.174.118.18')}

Возможно ли найти дефект в этой системе? Здесь минута проверяется, как .minute, а не 60 секунд. Как следствие, регулярно проверять количество раз, которое пользователь был в течение последних 60 секунд будет несколько труднее.

Есть интересное решение ClassDojo, которое использует отсортированные наборы Redis. 

Выводы

Мы рассмотрели ключевые аспекты работы с Redis. Но это только капля в море. Более подробно почитать о том, как использовать Redis в действии, можно в книге «Redis in Action», которая была написана Джозаи Карлсоном. 

ОфисГуру
Adblock
detector