English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

python concurrency 2: استخدام asyncio لمعالجة التزامن

asyncio

في عصر Python 2، كانت البرمجة الشبكية عالية الأداء تعتمد بشكل رئيسي على مكتبات Twisted و Tornado و Gevent هذه الثلاثة، ولكن لم تكن كوداتها المتوازية متوافقة مع بعضها البعض ولا قابلة للنقل. كما ذكرنا في الفصل السابق، كان Gvanrossum يأمل في تحقيق مكتبة جروة قياسية مبنية على مولدات في Python 3، حيث تم تضمين دعم IO المتوازي مباشرة، وهذا هو asyncio، الذي تم إدراجه في مكتبة القياسية في Python 3.4.

يستخدم حزمة asyncio الجراءات التي تدفعها دائرة الحدث لتحقيق التوازي.

تم تسمية حزمة asyncio بـ "Tulip" (الزنبق) قبل إدراجها في مكتبة القياسية، لذا ستشاهد هذا الاسم غالبًا عند البحث عن المعلومات عبر الإنترنت.

ما هو دائرة الحدث?

يقول ويكيبيديا: دائرة الحدث هي "أرختة برمجة تنتظر توزيع أحداث أو رسائل". بشكل أساسي، دائرة الحدث هي: "عندما يحدث A، يتم تنفيذ B". أو يمكن تفسير هذه الفكرة بأبسط شكل وهي دائرة الحدث الخاصة بـ JavaScript الموجودة في كل متصفح. عندما تضغط على شيء ما ("عندما يحدث A")، يتم إرسال هذا الضغط إلى دائرة الحدث الخاصة بـ JavaScript، ويتم التحقق من وجود أي استدعاءات onclick مسجلة للتعامل مع هذا الضغط (تنفيذ B). يتم تنفيذ أي استدعاءات مسجلة مصحوبة بتفاصيل الضغط. يعتبر دائرة الحدث وهمية لأنها تتبع أحداث وتقوم بالاستدعاءات بشكل دوري لمعالجة هذه الأحداث.

بالنسبة لـ Python، تم إضافة asyncio، الذي يقدم دائرة الحدث، إلى مكتبة القياسية. يركز asyncio على حل المشاكل في خدمات الشبكة، حيث تأتي دائرة الحدث من جهاز التوصيل (السوكت) عند استعداد I/O للقراءة والكتابة كـ "عندما يحدث A" (من خلال مكتبة selectors). بالإضافة إلى واجهة المستخدم الرسومية والI/O، تستخدم دائرة الحدث أيضًا غالبًا لتنفيذ الكود في نواة أخرى أو عملية فرعية، وتستخدم كآلية تنظيم (مثل، التعاونية في المهمات المتعددة). إذا كنت تفهم GIL الخاصة بـ Python، فإن دائرة الحدث مفيدة جدًا للأماكن التي تحتاج إلى إطلاق GIL.

الموضوعات والغورورات

دعنا نبدأ بمراجعة قطعتين من الكود، واحدة باستخدام مكتبة threading وآخرى باستخدام مكتبة asyncio.

# sinner_thread.py
import threading
import itertools
import time
import sys
class Signal: # هذه الفئة تعرف عنصر متغير، يتم استخدامه للتحكم في الخيط من الخارج
 go = True
def spin(msg, signal): # هذه الوظيفة ستتم تشغيلها في خيط منفرد، وهو parameter signal هو مثال على Signal class الذي تم تعريفه مسبقًا
 write, flush = sys.stdout.write, sys.stdout.flush
 for char in itertools.cycle('|/-\\'): # itertools.cycle 函数从指定的序列中反复不断地生成元素
  status = char + ' ' + msg
  write(status)
  flush()
  write('\x08' * len(status)) # 使用退格符把光标移回行首
  time.sleep(.1) # كل 0.1 ثانية تحديث
  if not signal.go: # إذا لم يكن خاصية go ليست True، يخرج من الدائرة
   break
 write(' ' * len(status) + '\x08' * len(status)) # 使用空格清除状态消息,把光标移回开头
def slow_function(): # لتقليد العملية الطويلة
 # 假装等待I/O一段时间
 time.sleep(3) # سيقوم blockage بالتوقف عن العمل، ويتم القيام بهذا لتحرير GIL، وإقامة خيط فرعي
 return 42
def supervisor(): # هذه الوظيفة تضبط الخيط الفرعي، تظهر موضوع الخيط، تتحقق من وقت التشغيل، ثم تنهي العملية
 signal = Signal()
 spinner = threading.Thread(target=spin,
        args=('thinking!', signal))
 print('spinner object:', spinner) # عرض موضوع spinner، يتم عرض spinner object: <Thread(Thread-1, initial)>
 spinner.start() # تشغيل عملية الفرعية
 result = slow_function() # تشغيل slow_function، سيفقده الخيط الرئيسي، وسيقوم الخيط丛书 بلف النقاط بشكل متحرك
 signal.go = False
 spinner.join() # ينتظر انتهاء خيط spinner
 return result
def main():
 result = supervisor() 
 print('Answer', result)
إذا كان __name__ == '__main__':
 main()

أ�行رها، النتيجة ستكون تقريباً مثل هذا الشكل:

هذه رسمة متحركة، الخط الذي يسبق "thinking" يتحرك (لتحقيق تسجيل الشاشة، قمت بزيادة وقت النوم)

Python لا يقدم API للإنهاء الموضوع، لذا إذا كنت ترغب في إغلاق الموضوع، يجب على الموضوع استقبال رسالة. هنا نستخدم خاصية signal.go: في الخيط الرئيسي، عند ضبطها على False، سيحصل الخيط spinner على إشارة، ثم يخرج

الآن دعونا نرى النسخة التي تستخدم حزمة asyncio:

# spinner_asyncio.py
# 通过协程以动画的形式显示文本式旋转指针
import asyncio
import itertools
import sys
@asyncio.coroutine # 打算交给asyncio 处理的协程要使用 @asyncio.coroutine 装饰
def spin(msg):
 write, flush = sys.stdout.write, sys.stdout.flush
 for char in itertools.cycle('|/-\\'): # itertools.cycle 函数从指定的序列中反复不断地生成元素
  status = char + ' ' + msg
  write(status)
  flush()
  write('\x08' * len(status)) # 使用退格符把光标移回行首
  try:
   yield from asyncio.sleep(0.1) # 使用 yield from asyncio.sleep(0.1) 代替 time.sleep(.1), 这样的休眠不会阻塞事件循环
  except asyncio.CancelledError: # 如果 spin 函数苏醒后抛出 asyncio.CancelledError 异常,其原因是发出了取消请求
   break
 write(' ' * len(status) + '\x08' * len(status)) # 使用空格清除状态消息,把光标移回开头
@asyncio.coroutine
def slow_function(): # 5 现在此函数是协�程,使用休眠假装进行I/O 操作时,使用 yield from 继续执行事件循环
 # 假装等待I/O一段时间
 yield from asyncio.sleep(3) # 此表达式把控制权交给主循环,在休眠结束后回复这个协程
 return 42
@asyncio.coroutine
def supervisor(): #这个函数也是协程,因此可以使用 yield from 驱动 slow_function
 spinner = asyncio.async(spin('thinking!')) # asyncio.async() 函数排定协程的运行时间,使用一个 Task 对象包装spin 协程,并立即返回
 print('spinner object:', spinner) # تعريف التعاون، يتم طباعة شيء مثل spinner object: <Task pending coro=<spin() running at spinner_asyncio.py:6>>
 # تدوير func_slow()، للحصول على قيمته عند اكتمال عمله. في نفس الوقت، تستمر حلقة الحدث في العمل
 # لأن func_slow يستخدم آخر مرة yield from asyncio.sleep(3) لتسليم السيطرة إلى حلقة الحدث الرئيسية
 result = yield from slow_function()
 # يمكن للتعاون أن يتم إلغاؤه؛ عند إلغائه، سيتم إلقاء استثناء asyncio.CancelledError في نقطة التوقف الحالية للتعاون
 # يمكن للتعاون أن يلتقط هذا الاستثناء، أو تأجيل الإلغاء، أو رفض الإلغاء
 spinner.cancel()
 return result
def main():
 loop = asyncio.get_event_loop() # الحصول على مرجع حلقة الحدث
 # تدوير supervisor协程، لتشغيله حتى اكتمال عمله؛ قيمة العودة لهذا النداء هي قيمة العودة
 result = loop.run_until_complete(supervisor())
 إغلاق loop
 print('Answer', result)
إذا كان __name__ == '__main__':
 main()

إلا إذا كنت ترغب في إيقاف الطرف الرئيسي، مما يؤدي إلى تجمد حلقة الحدث أو التطبيق بأكمله، لا تستخدم time.sleep() في协رون asyncio.

إذا كان على协رون أن لا يفعل أي شيء لفترة من الزمن، يجب استخدامه yield from asyncio.sleep(DELAY)

استخدام ميزة التزيين @asyncio.coroutine ليس إلزاميًا، ولكن يُنصح به لأنه يمكن أن يجعل协رون يتألق في الكود، إذا لم يتم إنتاج قيمة من قبل، يقوم协رون بإزالة النفايات (ما يعني أن العمل لم يكمل بعد، قد يكون هناك عيوب)، يمكن أن يرسل تحذيرًا. لن يبدأ هذا المزين في تشغيل协رون.

نتائج تنفيذ هذه الق两م متشابهة بشكل عام، دعونا نرى الفرق الأساسي بين الكودين supervisor:

  1. مثل asyncio.Task يبدو تقريباً متطابق مع threading.Thread (Task يشبه النواة الخضراء في مكتبة تنفيذ المهام المتعددة في الكتابة)
  2. تستخدم معينة Task لتشغيل الدورات، وتستخدم معينة Thread لتشغيل العناصر القابلة للتشغيل
  3. لا يتم إنشاء معينة Task بنفسها، بل يتم الحصول عليها عن طريق إرسال الدورات إلى دالة asyncio.async(...) أو أسلوب loop.create_task(...)
  4. يكون المعينة Task التي تم الحصول عليها قد تم تحديد وقت تشغيلها؛ يجب على معينة Thread تفعيل أسلوب start، لتخبرها بوضوح أنها ستشغلها
  5. في دالة supervisor الإضافية للثغرات، تكون slow_function وظيفة عادية، وتُتصل مباشرة بالثغرة، ولكن في النسخة المتوازية للـ slow_function، تكون الوظيفة دورة، وتُتولى من قبل yield from.
  6. لا توجد واجهة يمكن استخدامها لإنهاء الثغرات من الخارج، لأن الثغرات قد تُؤجّل في أي وقت. ولكن إذا كنت ترغب في إنهاء المهمة، يمكنك استخدام أسلوب Task.cancel() في المثال، لإطلاق استثناء CancelledError في الدورات. يمكن للدورات التقاط هذا الاستثناء في مكان التوقف عند yield، وتعامل الطلب على إنهاء.
  7. يجب تشغيل الدورات supervisor في الدالة main باستخدام أسلوب loop.run_until_complete.

من بين المزايا الرئيسية للدورات مقارنة بالثغرات، أن الثغرات يجب أن تذكر الحفاظ على المفتاح، لضمان حماية الأجزاء المهمة من البرنامج، لتجنب انقطاع العمليات المتعددة عند تنفيذها، لتجنب أن تكون الظروف مثل الطقس في تشاوكسيا، وتكون الدورات تتحفظ على الحماية بشكل افتراضي، يجب علينا إنتاجها بشكل واضح (باستخدام yield أو yield from لفقدان السيطرة) حتى يمكن تشغيل باقي البرنامج.

asyncio.Future: لا يتوقف عن العمل عن قصد

يكون واجهة asyncio.Future مشابهة جدًا لـ concurrent.futures.Future، ولكن طريقة التنفيذ مختلفة، ولا يمكن تبديلها.

في المقال السابق [python المشاركة 1: استخدام futures للمعالجة المتوازية]() قمنا بشرح future في concurrent.futures.Future، في concurrent.futures.Future، future هو مجرد نتائج التخطيط لتنفيذ شيء ما. في مكتبة asyncio، يأخذ أسلوب BaseEventLoop.create_task(...) مكتبة، ويقوم بترتيب وقت تشغيله، ثم يعود بمعينة asyncio.Task (وهي أيضًا معينة asyncio.Future، لأن Task هي فرع Future، تستخدم لتغليف الدورات. (في concurrent.futures.Future، العملية المشابهة هي Executor.submit(...)).

مثل فئة concurrent.futures.Future، توفر فئة asyncio.Future أيضًا

  1. يُرجع .done() قيمة بولية تعبر عن ما إذا كان Future قد تم تنفيذه أم لا
  2. طريقة .add_done_callback() تأخذ معاملًا واحدًا، وهو قابل للإجراء، ويتم استدعاء هذا العنصر عند إكمال Future.
  3. طريقة .result() لا تأخذ أي معاملات، لذا لا يمكنك تحديد وقت انتهاء الوقت. إذا تم استدعاء .result() قبل أن ينتهي التشغيل، سيتم إلقاء استثناء asyncio.InvalidStateError.

عند استدعاء result() بعد إكمال Future من فئة concurrent.futures.Future، سيتم إرجاع نتيجة القابل للإجراء أو إلقاء الاستثناء الذي تم إلقاؤه أثناء تنفيذ القابل للإجراء، وإذا تم استدعاء f.result() قبل إكمال Future، سيتم إحباط سلسلة التشغيل للوظيفة التي يتم استدعاؤها حتى يتم إرجاع النتيجة. يمكن أن يستقبل هذا الطريقة أيضًا معامل timeout، وإذا لم يكمل Future تنفيذية في الوقت المحدد، سيتم إلقاء استثناء TimeoutError.

عند استخدام asyncio.Future، نستخدم عادة yield from للحصول على النتيجة، بدلاً من استخدام طريقة result()، حيث يولد تعبير yield from قيمة عائدة في الجيل المتوقف، يستأنف العملية.

يهدف فئة asyncio.Future إلى استخدامها مع yield from، لذا عادة لا تحتاج إلى استخدام الطرق التالية:

  1. ليس من الضروري استدعاء my_future.add_down_callback(...)، لأنه يمكنك ببساطة وضع العمليات التي تريد تنفيذها بعد انتهاء future في الجانب الآخر من تعبير الجيل من my_future yield from. (بما أن الجيل يمكنه التوقف والإعادة إلى الحياة للوظائف)
  2. ليس من الضروري استدعاء my_future.result()، لأن النتيجة التي يولد منها yield from هي (result = yield from my_future)

在 asyncio 包中,可以使用yield from 从asyncio.Future 对象中产出结果。这也就意味着我们可以这么写:

res = yield from foo() # foo 可以是协程函数,也可以是返回 Future 或 task 实例的普通函数

asyncio.async(...)* 函数

asyncio.async(coro_or_future, *, loop=None)

这个函数统一了协程和Future: 第一个参数可以是二者中的任意一个。如果是Future 或者 Task 对象,就直接返回,如果是协程,那么async 函数会自动调用 loop.create_task(...) 方法创建 Task 对象。 loop 参数是可选的,用于传入事件循环; 如果没有传入,那么async函数会通过调用asyncio.get_event_loop() 函数获取循环对象。

BaseEventLoop.create_task(coro)

这个方法排定协程的执行时间,返回一个 asyncio.Task 对象。如果在自定义的BaseEventLoop 子类上调用,返回的对象可能是外部库中与Task类兼容的某个类的实例。

BaseEventLoop.create_task() 方法只在Python3.4.2 及以上版本可用。 Python3.3 只能使用 asyncio.async(...)函数。
如果想在Python控制台或者小型测试脚本中实验future和协程,可以使用下面的片段:

import asyncio
def run_sync(coro_or_future):
 loop = asyncio.get_event_loop()
 return loop.run_until_complete(coro_or_future)
a = run_sync(some_coroutine())

使用asyncio 和 aiohttp 包下载

现在,我们了解了asyncio 的基础知识,是时候使用asyncio 来重写我们 上一篇 [python并发 1:使用 futures 处理并发]() 下载国旗的脚本了。

先看一下代码:

import asyncio
import aiohttp # 需要pip install aiohttp
from flags import save_flag, show, main, BASE_URL
#asyncio.coroutine 我们知道,协程应该使用 asyncio.coroutine 装饰
def get_flag(cc):
 url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
  # يتم تنفيذ العمليات القابلة للإيقاف بواسطة الدورات، ويقوم الكود العملاء بتحويل المسؤولية إلى الدورات من خلال yield from لتنفيذ العمليات غير السلسة
 resp = yield from aiohttp.request('GET', url): 
 # القراءة هي عملية غير سلسة أيضًا
 image = yield from resp.read()
 return image
@asyncio.coroutine
def download_one(cc): # يجب أن تكون هذه الدالة أيضًا دورة، لأنها تستخدم yield from
 image = yield from get_flag(cc) 
 show(cc)
 save_flag(image, cc.lower() + '.gif')
 تعود cc
دالة download_many(cc_list):
 loop = asyncio.get_event_loop() # الحصول على مرجع إلى التنفيذ الأساسي للحلقة التشغيل
 to_do = [download_one(cc) for cc in sorted(cc_list)] # استدعاء download_one للحصول على جميع الأعلام، بناء قائمة من كائنات الجينرات
 # على الرغم من أن اسم الدالة هو wait، إلا أنها ليست دالة قابلة للإيقاف، wait هي دورة، تنتهي عند إكمال جميع الدورات التي يتم تمريرها إليها
 wait_coro = asyncio.wait(to_do)
 res, _ = loop.run_until_complete(wait_coro) # تشغيل حلقة التشغيل حتى يتم إكمال wait_coro؛ خلال تشغيل حلقة التشغيل، سيتم إيقاف هذا الكود هنا.
 loop.close() # إغلاق حلقة التشغيل
 تعود len(res)
إذا كان __name__ == '__main__':
 main(download_many)

يمكن تلخيص تشغيل هذا الكود كما يلي:

  1. يحصل download_many على حلقة التشغيل في وظيفة download_many، ويُعالج عدة كائنات من الدورات التي تم إنشاؤها بواسطة وظيفة download_one
  2. تُشغل حلقة asyncio دورات مختلفة مرة واحدة
  3. عندما يستخدم الكود العملاء دورة (get_flag) yield from لتحويل المسؤولية إلى دورة من مكتبة (aiohttp.request)، يتم إرجاع السيطرة إلى حلقة التشغيل، وتنفيذ الدورة المحددة مسبقًا
  4. تحصل الحلقة التشغيل على الإشعار بعد إكمال العمليات القابلة للإيقاف عن طريق واجهة API القاعدة القائمة على الإشعارات.
  5. بعد الحصول على الإشعار، يرسل الدوران النتيجة إلى الدورة الموقوفة
  6. تتحرك الدورة إلى الأمام إلى التعبير التالي من yield from، مثل وظيفة get_flag yield from resp.read(). يستعيد الحلقة التشغيل السيطرة مرة أخرى، وتكرر الخطوات 4 إلى 6 حتى يتم إنهاء الحلقة.

في وظيفة download_many، استخدمنا دالة asyncio.wait(...). هذه الدالة هي دورة، وسيكون المعامل لها هو مجموعة من المستقبلات أو الدورات القابلة للتشغيل كقابل للتحويل؛ ستعالج wait كل دورة في شكل عنصر داخل كائن Task. النتيجة النهائية هي أن كل عنصر معالج بواسطة wait يصبح مثالاً من فئة Future.

وظيفة wait هي وظيفة دورة، لذا فإنها تعود كائن دورة أو مولد؛ والذي يحتوي عليه waite_coro هو هذا النوع من الكائنات.

يأخذ طريقة loop.run_until_complete معامل future أو دورة. إذا كانت الدورة، فإن طريقة run_until_complete تشبه وظيفة wait، وتقوم بتغليف الدورة في كائن Task. هنا، تقوم طريقة run_until_complete بتغليف wait_coro في كائن Task، ويقودها yield from. بعد انتهاء wait_coro، تعود إثنين من المعاملين، المعامل الأول هو future المنتهي والمعامل الثاني هو future غير المنتهي.

<section class="caption">wait</section> لديه كلمتان مفتوحتان، timeout و return_when، إذا تم تعيينهما قد يعود future غير المنتهي.

لقد أعدنا كتابة وظيفة get_flags لأن مكتبة requests التي استخدمناها كانت تقوم بأداء عمليات مُؤجلة. من أجل استخدام حزمة asyncio، يجب علينا تحويل هذه الوظيفة إلى نسخة مُؤجلة.

نصيحة سريعة

إذا شعرت بأن الكود صعب الفهم بعد استخدام الدورات، يمكنك اتباع نصيحة بابا Python (Guido van Rossum) وتجاهل yield from.

كما في هذا الجزء من الكود المكتوب:

@asyncio.coroutine
def get_flag(cc):
 url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 resp = yield from aiohttp.request('GET', url): 
 image = yield from resp.read()
 return image
# إزالة yield from
def get_flag(cc):
 url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 resp = aiohttp.request('GET', url) 
 image = resp.read()
 return image
# الآن هل أصبحت أكثر وضوحًا؟

نقطة معرفة

عند استخدام yield from في API حزمة asyncio، يجب الانتباه إلى تفصيل مهم:

عند استخدام حزمة asyncio، يأخذ الكود المتسلسل الذي نكتبه دورة من قبل asyncio نفسها (مولد موكول)، ويقوم المولد في النهاية بتقديم المسؤولية إلى دورة asyncio أو مكتبة ثالثة. هذا الأسلوب يمثل بناء أنبوب، يسمح لجولة asyncio بالتحكم في تنفيذ الدوال المُؤجلة في الوقت الحقيقي.

تجنب الاستدعاءات المُؤجلة

دعونا نبدأ بمراجعة رسم بياني، يوضح تأخير قراءة البيانات من مختلف الأجهزة التخزينية للكمبيوتر:

من خلال هذا الرسم البياني، يمكننا رؤية أن الاستدعاءات المُؤجلة تمثل إهدارًا كبيرًا لموارد المعالج. هل هناك طريقة لتجنب توقف الاستدعاءات المُؤجلة عن التطبيق بأكمله؟

هناك طريقتان:

  1. العمليات المُؤجلة تعمل في نواة منفردة
  2. تحويل كل عملية معطلة إلى استدعاءات متزامنة غير معطلة

بالطبع، نوصي بالطريقة الثانية، لأن تكلفة الطريقة الأولى تكون عالية جدًا إذا تم استخدام thread لكل اتصال.

الطريقة الثانية التي يمكننا استخدامها لتحقيق البرمجة المتزامنة هي استخدام generators كcoroutines. بالنسبة للحلقة الحدث، يبدو استدعاء الدالة callback مثل استدعاء method .send() على coroutine معتقلة. استهلاك الذاكرة من قبل جميع coroutines المعتقلة أقل بكثير من استهلاك الذاكرة من قبل threads.

الآن، يجب أن تكون قادرًا على فهم لماذا script flags_asyncio.py أسرع بكثير من flags.py.

لأنflags.py تنزيل متسلسل، ويجب على كل تنزيل أن يستخدم عدة مليارات دورة معالجة مركزية لنتائج، بينما في flags_asyncio.py، عند استدعاء method loop.run_until_complete في function download_many، يقود حلقة الحدث جميع coroutines download_one، حتى reaches expression yield from، حيث يقود expression yield from جميع coroutines get_flag، حتى reaches first expression yield from، ويتم استدعاء function aiohttp.request(). هذه الاستدعاءات لا تؤدي إلى إزعاج، لذا يمكن بدء جميع الطلبات في ثوانٍ قليلة.

تحسين سكربت asyncio التنزيل

الآن نحسن من flags_asyncio.py السابق، بإضافة معالجة الاستثناءات، والمحتسب

import asyncio
import collections
from collections import namedtuple
from enum import Enum
import aiohttp
from aiohttp import web
from flags import save_flag, show, main, BASE_URL
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000
Result = namedtuple('Result', 'status data')
HTTPStatus = Enum('Status', 'ok not_found error')
#自定义异常用于包装其他HTTP或网络异常,并获取country_code,以便报告错误
class FetchError(Exception):
 def __init__(self, country_code):
  self.country_code = country_code
@asyncio.coroutine
def get_flag(cc):
 # لهذا coroutine ثلاثة أنواع من النتائج العودة
 # عند العودة بناءً على رد الحصول على الصورة
 # عند العودة بناءً على رد HTTP 404، يتم رفع استثناء web.HTTPNotFound
 # عند العودة بناءً على رمز حالة HTTP آخر، يتم رفع استثناء aiohttp.HttpProcessingError
 url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 resp = yield from aiohttp.request('GET', url):
 إذا كان resp.status == 200:
  image = yield from resp.read()
  return image
 elif resp.status == 404:
  raise web.HttpNotFound()
 else:
  raise aiohttp.HttpProcessionError(
   code=resp.status, message=resp.reason,
   headers=resp.headers
  )
@asyncio.coroutine
def download_one(cc, semaphore):
 # معامل semaphore هو مثيل لـ asyncio.Semaphore
 # Semaphore هو جهاز سينكروني، يستخدم للتحكم في الطلبات المتوازية
 try:
  with (yield from semaphore):
    # استخدامه semaphore كمدخل إداري في تعبير yield from، لمنع حجب النظام بأكمله
    # إذا كانت قيمة مقياس semaphore هي القيمة القصوى المسموح بها، فإنه سيتم حجب هذه coroutine فقط
    image = yield from get_flag(cc)
    # بعد مغادرة جملة with، سيُنخفض قيمة مقياس semaphore
    # إزالة الحجب قد تكون قيد الانتظار على نفس كائن semaphore
 except web.HTTPNotFound:
  status = HTTPStatus.not_found
  msg = 'not found'
 except Exception as exc:
  raise FetchError(cc) from exc
 else:
  save_flag(image, cc.lower() + '.gif')
  status = HTTPStatus.ok
  msg = 'ok'
 return Result(status, cc)
@asyncio.coroutine
def downloader_coro(cc_list):
 counter = collections.Counter()
 # إنشاء مثيل لـ asyncio.Semaphore، يسمح بتنشيط ما يصل إلى MAX_CONCUR_REQ من coroutines باستخدام هذا المقياس
 semaphore = asyncio.Semaphore(MAX_CONCUR_REQ)
 # استدعاء download_one مرة أخرى مكررة، لإنشاء قائمة بأحداث البرمجة المتوازية
 to_do = [download_one(cc, semaphore) for cc in sorted(cc_list)]
 # الحصول على م迭代ر، هذا الم迭代ر سيقوم بالعودة إلى future عند انتهاء future
 to_do_iter = asyncio.as_completed(to_do)
 for future in to_do_iter:
  # التكرار على future المسموح بالانتهاء 
  try:
   res = yield from future # للحصول على نتيجة كائن asyncio.Future (يمكن أيضًا تفعيل future.result)
  except FetchError as exc:
   # جميع الاستثناءات التي يتم رفعها تُغلف في كائن FetchError
   country_code = exc.country_code
   try:
    # تحاول الحصول على رسالة الخطأ من الاستثناء الأصلي (__cause__)
    error_msg = exc.__cause__.args[0]
   استثناء IndexError:
    # إذا لم يتم العثور على رسالة الخطأ في الاستثناء الأصلي، استخدم اسم فئة الاستثناء المربوط كرسالة خطأ
    error_msg = exc.__cause__.__class__.__name__
   إذا كان error_msg:
    msg = '*** Error for {}: {}'
    اطبع msg.format(country_code, error_msg)
   status = HTTPStatus.error
  else:
   status = res.status
  counter[status] += 1
 عدد الطلبات
دالة download_many(cc_list):
 loop = asyncio.get_event_loop()
 coro = downloader_coro(cc_list)
 عدد الطلبات = loop.run_until_complete(coro)
 إغلاق loop
 عدد الطلبات
إذا كان __name__ == '__main__':
 main(download_many)

بسبب سرعة طلبات الجريدة، لمنع إرسال الكثير من الطلبات المتوازية إلى الخادم مما يؤدي إلى تحميل الخادم، قمنا بإنشاء instance من asyncio.Semaphore في دالة download_coro، ثم نقلناه إلى دالة download_one.

<secion class="caption">Semaphore</section> يحتفظ بمعامل داخلي، إذا تم استدعاء .acquire() coroutine method على هذا الوسيط، فإن المعامل ينخفض؛ وإذا تم استدعاء .release() coroutine method على هذا الوسيط، فإن المعامل يزيد. قيمة المعامل يتم تحديدها عند التأسيس.

إذا كان المعامل أكبر من 0، فإن استدعاء .acquire() لن يسبب التبكير، وإذا كان المعامل يساوي 0، فإن استدعاء .acquire() سيؤدي إلى تبكير من يؤدي هذا الاستدعاء حتى تنفذ协يرات أخرى استدعاء .release() على نفس Semantic object، مما يؤدي إلى زيادة المعامل.

في الكود الموجود أعلاه، لم نستدعي يدويًا .acquire() أو .release()، بل استخدمنا semaphore كمدارة سياق في دالة download_one:

with (yield from semaphore):
 image = yield from get_flag(cc)

这段代码保证,任何时候都不会有超过 MAX_CONCUR_REQ 个 get_flag 协程启动。

这段代码保证,任何时候都不会有超过 MAX_CONCUR_REQ 个 get_flag 协程启动。

使用 asyncio.as_completed 函数

因为要使用 yield from 获取 asyncio.as_completed 函数产出的future的结果,所以 as_completed 函数秩序在协程中调用。由于 download_many 要作为参数传给非协程的main 函数,我已我们添加了一个新的 downloader_coro 协程,让download_many 函数只用于设置事件循环。

使用Executor 对象,防止阻塞事件循环

现在我们回去看下上边关于电脑从不同存储介质读取数据的延迟情况图,有一个实时需要注意,那就是访问本地文件系统也会阻塞。

上边的代码中,save_flag 函数阻塞了客户代码与 asyncio 事件循环公用的唯一线程,因此保存文件时,整个应用程序都会暂停。为了避免这个问题,可以使用事件循环对象的 run_in_executor 方法。

asyncio 的事件循环在后台维护着一个ThreadPoolExecutor 对象,我们可以调用 run_in_executor 方法,把可调用的对象发给它执行。

@asyncio.coroutine
def download_one(cc, semaphore):
 try:
  with (yield from semaphore):
   image = yield from get_flag(cc)
 except web.HTTPNotFound:
  status = HTTPStatus.not_found
  msg = 'not found'
 except Exception as exc:
  raise FetchError(cc) from exc
 else:
  下边是我们改动后的代码:
  # 这里是改动部分
  loop = asyncio.get_event_loop() # 获取事件循环的引用
  status = HTTPStatus.ok
  msg = 'ok'
 return Result(status, cc)

loop.run_in_executor(None, save_flag, image, cc.lower() + '.gif')

从回调到future到协程

在接触协程之前,我们可能对回调有一定的认识,那么和回调相比,协程有什么改进呢?

python中的回调代码样式:

def stage1(response1):
 request2 = step1(response1)
 api_call2(request2, stage2)
def stage2(response2):
 request3 = step3(response3)
 api_call3(request3, stage3) 
 def stage3(response3):
  step3(response3) 
api_call1(request1, stage1)

عيوب الكود السابق:

  1. من المحتمل أن يحدث عادة الكالbacks
  2. يصبح الكود صعب القراءة

في هذه المشكلة، يمكن للكوروتينات أن تلعب دوراً كبيراً. إذا تم تغيير الكود إلى كود متعلق بالكوروتينات و yield from، يمكن أن يكون الكود كالتالي:

@asyncio.coroutine
def three_stages(request1):
 response1 = yield from api_call1(request1)
 request2 = step1(response1)
 response2 = yield from api_call2(requests)
 request3 = step2(response2)
 response3 = yield from api_call3(requests)
 step3(response3) 
loop.create_task(three_stages(request1)

من مقارنة هذا الكود مع الكود السابق، فإن هذا الكود أكثر وضوحاً. إذا كان سيتم إلقاء استثناء عند الت طلبات المتعلقة بال دعوات ال异步 api_call1,api_call2,api_call3، يمكن وضع تعبيرات yield from المناسبة في كتل try/except لمعالجة الاستثناءات.

عند استخدام الكوروتينات، يجب أن تعتاد على تعبير yield from، و لا يمكن للكوروتينات الت 호출 مباشرة، بل يجب تحديد وقت تنفيذ الكوروتينات بشكل واضح، أو استخدام تعبير yield from في كوروتينات أخرى التي تم تحديد وقت تنفيذها لتشغيلها. إذا لم يتم استخدام loop.create_task(three_stages(request1))، فإن لا شيء سيحدث.

سنستعرض مثالاً عملياً فيما يلي:

كل مرة يتم فيها تنزيل إرسال عدة طلبات

نعدل بعض من كود تنزيل الأعلام السابقة، ليتيح تنزيل الأعلام والاسماء في نفس الوقت عند حفظ الصورة.
نستخدم الكوروتينات و yield from لحل هذه المشكلة:

@asyncio.coroutine
def http_get(url):
 resp = yield from aiohttp.request('GET', url):
 إذا كان resp.status == 200:
  ctype = resp.headers.get('Content-type', '').lower():
  إذا كان 'json' في ctype أو ينتهي url ب 'json':
   data = yield from resp.json()
  else:
   data = yield from resp.read()
  return data
 elif resp.status == 404:
  raise web.HttpNotFound()
 else:
  raise aiohttp.HttpProcessionError(
   code=resp.status, message=resp.reason,
   headers=resp.headers)
@asyncio.coroutine
def get_country(cc):
 url = "{}/{cc}/metadata.json".format(BASE_URL, cc=cc.lower())
 metadata = yield from http_get(url)
 return metadata['country']
@asyncio.coroutine
def get_flag(cc):
 url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 return (yield from http_get(url))
@asyncio.coroutine
def download_one(cc, semaphore):
 try:
  with (yield from semaphore):
   image = yield from get_flag(cc)
  with (yield from semaphore):
   country = yield from get_country(cc)
 except web.HTTPNotFound:
  status = HTTPStatus.not_found
  msg = 'not found'
 except Exception as exc:
  raise FetchError(cc) from exc
 else:
  country = country.replace(' ', '_')
  filename = '{}--{}.gif'.format(country, cc)
  print(filename)
  loop = asyncio.get_event_loop()
  loop.run_in_executor(None, save_flag, image, filename)
  status = HTTPStatus.ok
  msg = 'ok'
 return Result(status, cc)

في هذا الفقاع، في دالة download_one، نستدعي get_flag و get_country في كلا من كلاسيكيتم semaphore، مع تقليل الوقت المستغرق

يتم إضافة عبارات return لـ get_flag في طبقة خارجية لأن () لديه أولوية عالية، سيتم تنفيذ عبارات yield from الموجودة داخل الباريتة أولاً. إذا لم يتم إضافتها، سيتم إصدار خطأ لغة النص

إضافة ()، يشبه

image = yield from http_get(url)
return image

إذا لم يكن هناك ()، فإن البرنامج سيتم إيقافه عند yield from، ويتم تسليم السيطرة، عند هذا الوقت، إذا تم استخدام return، سيتم إصدار خطأ لغة النص

النهاية

في هذا المقال نناقش:

  1. مقارنة بين برنامج متعدد الأنماط ومسار asyncio، يوضح علاقة الأنماط المتعددة والأعمال المتوازية
  2. مقارنة بين فئة asyncio.Future و فئة concurrent.futures.Future
  3. كيفية استخدام برمجة المتغيرات المتوازية لإدارة التدفق العالي في التطبيقات الشبكية
  4. في برمجة المتغيرات المتوازية، مقارنة بال回调، يرفع السكوريت المكتسبة من قبل النسخة المتوازية بشكل ملحوظ

هذا هو نهاية محتويات هذا المقال، نأمل أن تكون قد ساعدتكم في التعلم، ونأمل أن تشجعوا دروس التعبير.

بيان: محتويات هذا المقال تم جمعها من الإنترنت، حقوق الطبع والتأليف مملوكة للكاتب الأصلي، محتويات الموقع تم إضافتها من قبل مستخدمي الإنترنت بشكل متعاون، ويتم إضافة محتويات الموقع بشكل تلقائي دون تدخل بشري، ولا يتحمل الموقع أي مسؤولية قانونية متعلقة بذلك. إذا اكتشفتم محتوى مخالف للحقوق الطبعية، يرجى إرسال بريد إلكتروني إلى: notice#oldtoolbag.com (عند إرسال البريد الإلكتروني، يرجى استبدال #بـ @) لإبلاغنا، وقدموا الأدلة ذات الصلة، وسيتم حذف المحتوى المزعوم عن الفصل إذا تم التحقق منه.

التي قد تهمك