English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية
asyncio
في عصر Python 2، كانت البرمجة الشبكية عالية الأداء تعتمد بشكل رئيسي على مكتبات Twisted و Tornado و Gevent هذه الثلاثة، ولكن لم تكن كوداتها المتوازية متوافقة مع بعضها البعض ولا قابلة للنقل. كما ذكرنا في الفصل السابق، كان Gvanrossum يأمل في تحقيق مكتبة جروة قياسية مبنية على مولدات في Python 3، حيث تم تضمين دعم IO المتوازي مباشرة، وهذا هو asyncio، الذي تم إدراجه في مكتبة القياسية في Python 3.4.
يستخدم حزمة asyncio الجراءات التي تدفعها دائرة الحدث لتحقيق التوازي.
تم تسمية حزمة asyncio بـ "Tulip" (الزنبق) قبل إدراجها في مكتبة القياسية، لذا ستشاهد هذا الاسم غالبًا عند البحث عن المعلومات عبر الإنترنت.
ما هو دائرة الحدث?
يقول ويكيبيديا: دائرة الحدث هي "أرختة برمجة تنتظر توزيع أحداث أو رسائل". بشكل أساسي، دائرة الحدث هي: "عندما يحدث A، يتم تنفيذ B". أو يمكن تفسير هذه الفكرة بأبسط شكل وهي دائرة الحدث الخاصة بـ JavaScript الموجودة في كل متصفح. عندما تضغط على شيء ما ("عندما يحدث A")، يتم إرسال هذا الضغط إلى دائرة الحدث الخاصة بـ JavaScript، ويتم التحقق من وجود أي استدعاءات onclick مسجلة للتعامل مع هذا الضغط (تنفيذ B). يتم تنفيذ أي استدعاءات مسجلة مصحوبة بتفاصيل الضغط. يعتبر دائرة الحدث وهمية لأنها تتبع أحداث وتقوم بالاستدعاءات بشكل دوري لمعالجة هذه الأحداث.
بالنسبة لـ Python، تم إضافة asyncio، الذي يقدم دائرة الحدث، إلى مكتبة القياسية. يركز asyncio على حل المشاكل في خدمات الشبكة، حيث تأتي دائرة الحدث من جهاز التوصيل (السوكت) عند استعداد I/O للقراءة والكتابة كـ "عندما يحدث A" (من خلال مكتبة selectors). بالإضافة إلى واجهة المستخدم الرسومية والI/O، تستخدم دائرة الحدث أيضًا غالبًا لتنفيذ الكود في نواة أخرى أو عملية فرعية، وتستخدم كآلية تنظيم (مثل، التعاونية في المهمات المتعددة). إذا كنت تفهم GIL الخاصة بـ Python، فإن دائرة الحدث مفيدة جدًا للأماكن التي تحتاج إلى إطلاق GIL.
الموضوعات والغورورات
دعنا نبدأ بمراجعة قطعتين من الكود، واحدة باستخدام مكتبة threading وآخرى باستخدام مكتبة asyncio.
# sinner_thread.py import threading import itertools import time import sys class Signal: # هذه الفئة تعرف عنصر متغير، يتم استخدامه للتحكم في الخيط من الخارج go = True def spin(msg, signal): # هذه الوظيفة ستتم تشغيلها في خيط منفرد، وهو parameter signal هو مثال على Signal class الذي تم تعريفه مسبقًا write, flush = sys.stdout.write, sys.stdout.flush for char in itertools.cycle('|/-\\'): # itertools.cycle 函数从指定的序列中反复不断地生成元素 status = char + ' ' + msg write(status) flush() write('\x08' * len(status)) # 使用退格符把光标移回行首 time.sleep(.1) # كل 0.1 ثانية تحديث if not signal.go: # إذا لم يكن خاصية go ليست True، يخرج من الدائرة break write(' ' * len(status) + '\x08' * len(status)) # 使用空格清除状态消息,把光标移回开头 def slow_function(): # لتقليد العملية الطويلة # 假装等待I/O一段时间 time.sleep(3) # سيقوم blockage بالتوقف عن العمل، ويتم القيام بهذا لتحرير GIL، وإقامة خيط فرعي return 42 def supervisor(): # هذه الوظيفة تضبط الخيط الفرعي، تظهر موضوع الخيط، تتحقق من وقت التشغيل، ثم تنهي العملية signal = Signal() spinner = threading.Thread(target=spin, args=('thinking!', signal)) print('spinner object:', spinner) # عرض موضوع spinner، يتم عرض spinner object: <Thread(Thread-1, initial)> spinner.start() # تشغيل عملية الفرعية result = slow_function() # تشغيل slow_function، سيفقده الخيط الرئيسي، وسيقوم الخيط丛书 بلف النقاط بشكل متحرك signal.go = False spinner.join() # ينتظر انتهاء خيط spinner return result def main(): result = supervisor() print('Answer', result) إذا كان __name__ == '__main__': main()
أ�行رها، النتيجة ستكون تقريباً مثل هذا الشكل:
هذه رسمة متحركة، الخط الذي يسبق "thinking" يتحرك (لتحقيق تسجيل الشاشة، قمت بزيادة وقت النوم)
Python لا يقدم API للإنهاء الموضوع، لذا إذا كنت ترغب في إغلاق الموضوع، يجب على الموضوع استقبال رسالة. هنا نستخدم خاصية signal.go: في الخيط الرئيسي، عند ضبطها على False، سيحصل الخيط spinner على إشارة، ثم يخرج
الآن دعونا نرى النسخة التي تستخدم حزمة asyncio:
# spinner_asyncio.py # 通过协程以动画的形式显示文本式旋转指针 import asyncio import itertools import sys @asyncio.coroutine # 打算交给asyncio 处理的协程要使用 @asyncio.coroutine 装饰 def spin(msg): write, flush = sys.stdout.write, sys.stdout.flush for char in itertools.cycle('|/-\\'): # itertools.cycle 函数从指定的序列中反复不断地生成元素 status = char + ' ' + msg write(status) flush() write('\x08' * len(status)) # 使用退格符把光标移回行首 try: yield from asyncio.sleep(0.1) # 使用 yield from asyncio.sleep(0.1) 代替 time.sleep(.1), 这样的休眠不会阻塞事件循环 except asyncio.CancelledError: # 如果 spin 函数苏醒后抛出 asyncio.CancelledError 异常,其原因是发出了取消请求 break write(' ' * len(status) + '\x08' * len(status)) # 使用空格清除状态消息,把光标移回开头 @asyncio.coroutine def slow_function(): # 5 现在此函数是协�程,使用休眠假装进行I/O 操作时,使用 yield from 继续执行事件循环 # 假装等待I/O一段时间 yield from asyncio.sleep(3) # 此表达式把控制权交给主循环,在休眠结束后回复这个协程 return 42 @asyncio.coroutine def supervisor(): #这个函数也是协程,因此可以使用 yield from 驱动 slow_function spinner = asyncio.async(spin('thinking!')) # asyncio.async() 函数排定协程的运行时间,使用一个 Task 对象包装spin 协程,并立即返回 print('spinner object:', spinner) # تعريف التعاون، يتم طباعة شيء مثل spinner object: <Task pending coro=<spin() running at spinner_asyncio.py:6>> # تدوير func_slow()، للحصول على قيمته عند اكتمال عمله. في نفس الوقت، تستمر حلقة الحدث في العمل # لأن func_slow يستخدم آخر مرة yield from asyncio.sleep(3) لتسليم السيطرة إلى حلقة الحدث الرئيسية result = yield from slow_function() # يمكن للتعاون أن يتم إلغاؤه؛ عند إلغائه، سيتم إلقاء استثناء asyncio.CancelledError في نقطة التوقف الحالية للتعاون # يمكن للتعاون أن يلتقط هذا الاستثناء، أو تأجيل الإلغاء، أو رفض الإلغاء spinner.cancel() return result def main(): loop = asyncio.get_event_loop() # الحصول على مرجع حلقة الحدث # تدوير supervisor协程، لتشغيله حتى اكتمال عمله؛ قيمة العودة لهذا النداء هي قيمة العودة result = loop.run_until_complete(supervisor()) إغلاق loop print('Answer', result) إذا كان __name__ == '__main__': main()
إلا إذا كنت ترغب في إيقاف الطرف الرئيسي، مما يؤدي إلى تجمد حلقة الحدث أو التطبيق بأكمله، لا تستخدم time.sleep() في协رون asyncio.
إذا كان على协رون أن لا يفعل أي شيء لفترة من الزمن، يجب استخدامه yield from asyncio.sleep(DELAY)
استخدام ميزة التزيين @asyncio.coroutine ليس إلزاميًا، ولكن يُنصح به لأنه يمكن أن يجعل协رون يتألق في الكود، إذا لم يتم إنتاج قيمة من قبل، يقوم协رون بإزالة النفايات (ما يعني أن العمل لم يكمل بعد، قد يكون هناك عيوب)، يمكن أن يرسل تحذيرًا. لن يبدأ هذا المزين في تشغيل协رون.
نتائج تنفيذ هذه الق两م متشابهة بشكل عام، دعونا نرى الفرق الأساسي بين الكودين supervisor:
من بين المزايا الرئيسية للدورات مقارنة بالثغرات، أن الثغرات يجب أن تذكر الحفاظ على المفتاح، لضمان حماية الأجزاء المهمة من البرنامج، لتجنب انقطاع العمليات المتعددة عند تنفيذها، لتجنب أن تكون الظروف مثل الطقس في تشاوكسيا، وتكون الدورات تتحفظ على الحماية بشكل افتراضي، يجب علينا إنتاجها بشكل واضح (باستخدام yield أو yield from لفقدان السيطرة) حتى يمكن تشغيل باقي البرنامج.
asyncio.Future: لا يتوقف عن العمل عن قصد
يكون واجهة asyncio.Future مشابهة جدًا لـ concurrent.futures.Future، ولكن طريقة التنفيذ مختلفة، ولا يمكن تبديلها.
في المقال السابق [python المشاركة 1: استخدام futures للمعالجة المتوازية]() قمنا بشرح future في concurrent.futures.Future، في concurrent.futures.Future، future هو مجرد نتائج التخطيط لتنفيذ شيء ما. في مكتبة asyncio، يأخذ أسلوب BaseEventLoop.create_task(...) مكتبة، ويقوم بترتيب وقت تشغيله، ثم يعود بمعينة asyncio.Task (وهي أيضًا معينة asyncio.Future، لأن Task هي فرع Future، تستخدم لتغليف الدورات. (في concurrent.futures.Future، العملية المشابهة هي Executor.submit(...)).
مثل فئة concurrent.futures.Future، توفر فئة asyncio.Future أيضًا
عند استدعاء result() بعد إكمال Future من فئة concurrent.futures.Future، سيتم إرجاع نتيجة القابل للإجراء أو إلقاء الاستثناء الذي تم إلقاؤه أثناء تنفيذ القابل للإجراء، وإذا تم استدعاء f.result() قبل إكمال Future، سيتم إحباط سلسلة التشغيل للوظيفة التي يتم استدعاؤها حتى يتم إرجاع النتيجة. يمكن أن يستقبل هذا الطريقة أيضًا معامل timeout، وإذا لم يكمل Future تنفيذية في الوقت المحدد، سيتم إلقاء استثناء TimeoutError.
عند استخدام asyncio.Future، نستخدم عادة yield from للحصول على النتيجة، بدلاً من استخدام طريقة result()، حيث يولد تعبير yield from قيمة عائدة في الجيل المتوقف، يستأنف العملية.
يهدف فئة asyncio.Future إلى استخدامها مع yield from، لذا عادة لا تحتاج إلى استخدام الطرق التالية:
在 asyncio 包中,可以使用yield from 从asyncio.Future 对象中产出结果。这也就意味着我们可以这么写:
res = yield from foo() # foo 可以是协程函数,也可以是返回 Future 或 task 实例的普通函数
asyncio.async(...)* 函数
asyncio.async(coro_or_future, *, loop=None)
这个函数统一了协程和Future: 第一个参数可以是二者中的任意一个。如果是Future 或者 Task 对象,就直接返回,如果是协程,那么async 函数会自动调用 loop.create_task(...) 方法创建 Task 对象。 loop 参数是可选的,用于传入事件循环; 如果没有传入,那么async函数会通过调用asyncio.get_event_loop() 函数获取循环对象。
BaseEventLoop.create_task(coro)
这个方法排定协程的执行时间,返回一个 asyncio.Task 对象。如果在自定义的BaseEventLoop 子类上调用,返回的对象可能是外部库中与Task类兼容的某个类的实例。
BaseEventLoop.create_task() 方法只在Python3.4.2 及以上版本可用。 Python3.3 只能使用 asyncio.async(...)函数。
如果想在Python控制台或者小型测试脚本中实验future和协程,可以使用下面的片段:
import asyncio def run_sync(coro_or_future): loop = asyncio.get_event_loop() return loop.run_until_complete(coro_or_future) a = run_sync(some_coroutine())
使用asyncio 和 aiohttp 包下载
现在,我们了解了asyncio 的基础知识,是时候使用asyncio 来重写我们 上一篇 [python并发 1:使用 futures 处理并发]() 下载国旗的脚本了。
先看一下代码:
import asyncio import aiohttp # 需要pip install aiohttp from flags import save_flag, show, main, BASE_URL #asyncio.coroutine 我们知道,协程应该使用 asyncio.coroutine 装饰 def get_flag(cc): url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) # يتم تنفيذ العمليات القابلة للإيقاف بواسطة الدورات، ويقوم الكود العملاء بتحويل المسؤولية إلى الدورات من خلال yield from لتنفيذ العمليات غير السلسة resp = yield from aiohttp.request('GET', url): # القراءة هي عملية غير سلسة أيضًا image = yield from resp.read() return image @asyncio.coroutine def download_one(cc): # يجب أن تكون هذه الدالة أيضًا دورة، لأنها تستخدم yield from image = yield from get_flag(cc) show(cc) save_flag(image, cc.lower() + '.gif') تعود cc دالة download_many(cc_list): loop = asyncio.get_event_loop() # الحصول على مرجع إلى التنفيذ الأساسي للحلقة التشغيل to_do = [download_one(cc) for cc in sorted(cc_list)] # استدعاء download_one للحصول على جميع الأعلام، بناء قائمة من كائنات الجينرات # على الرغم من أن اسم الدالة هو wait، إلا أنها ليست دالة قابلة للإيقاف، wait هي دورة، تنتهي عند إكمال جميع الدورات التي يتم تمريرها إليها wait_coro = asyncio.wait(to_do) res, _ = loop.run_until_complete(wait_coro) # تشغيل حلقة التشغيل حتى يتم إكمال wait_coro؛ خلال تشغيل حلقة التشغيل، سيتم إيقاف هذا الكود هنا. loop.close() # إغلاق حلقة التشغيل تعود len(res) إذا كان __name__ == '__main__': main(download_many)
يمكن تلخيص تشغيل هذا الكود كما يلي:
في وظيفة download_many، استخدمنا دالة asyncio.wait(...). هذه الدالة هي دورة، وسيكون المعامل لها هو مجموعة من المستقبلات أو الدورات القابلة للتشغيل كقابل للتحويل؛ ستعالج wait كل دورة في شكل عنصر داخل كائن Task. النتيجة النهائية هي أن كل عنصر معالج بواسطة wait يصبح مثالاً من فئة Future.
وظيفة wait هي وظيفة دورة، لذا فإنها تعود كائن دورة أو مولد؛ والذي يحتوي عليه waite_coro هو هذا النوع من الكائنات.
يأخذ طريقة loop.run_until_complete معامل future أو دورة. إذا كانت الدورة، فإن طريقة run_until_complete تشبه وظيفة wait، وتقوم بتغليف الدورة في كائن Task. هنا، تقوم طريقة run_until_complete بتغليف wait_coro في كائن Task، ويقودها yield from. بعد انتهاء wait_coro، تعود إثنين من المعاملين، المعامل الأول هو future المنتهي والمعامل الثاني هو future غير المنتهي.
<section class="caption">wait</section> لديه كلمتان مفتوحتان، timeout و return_when، إذا تم تعيينهما قد يعود future غير المنتهي.
لقد أعدنا كتابة وظيفة get_flags لأن مكتبة requests التي استخدمناها كانت تقوم بأداء عمليات مُؤجلة. من أجل استخدام حزمة asyncio، يجب علينا تحويل هذه الوظيفة إلى نسخة مُؤجلة.
نصيحة سريعة
إذا شعرت بأن الكود صعب الفهم بعد استخدام الدورات، يمكنك اتباع نصيحة بابا Python (Guido van Rossum) وتجاهل yield from.
كما في هذا الجزء من الكود المكتوب:
@asyncio.coroutine def get_flag(cc): url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) resp = yield from aiohttp.request('GET', url): image = yield from resp.read() return image # إزالة yield from def get_flag(cc): url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) resp = aiohttp.request('GET', url) image = resp.read() return image # الآن هل أصبحت أكثر وضوحًا؟
نقطة معرفة
عند استخدام yield from في API حزمة asyncio، يجب الانتباه إلى تفصيل مهم:
عند استخدام حزمة asyncio، يأخذ الكود المتسلسل الذي نكتبه دورة من قبل asyncio نفسها (مولد موكول)، ويقوم المولد في النهاية بتقديم المسؤولية إلى دورة asyncio أو مكتبة ثالثة. هذا الأسلوب يمثل بناء أنبوب، يسمح لجولة asyncio بالتحكم في تنفيذ الدوال المُؤجلة في الوقت الحقيقي.
تجنب الاستدعاءات المُؤجلة
دعونا نبدأ بمراجعة رسم بياني، يوضح تأخير قراءة البيانات من مختلف الأجهزة التخزينية للكمبيوتر:
من خلال هذا الرسم البياني، يمكننا رؤية أن الاستدعاءات المُؤجلة تمثل إهدارًا كبيرًا لموارد المعالج. هل هناك طريقة لتجنب توقف الاستدعاءات المُؤجلة عن التطبيق بأكمله؟
هناك طريقتان:
بالطبع، نوصي بالطريقة الثانية، لأن تكلفة الطريقة الأولى تكون عالية جدًا إذا تم استخدام thread لكل اتصال.
الطريقة الثانية التي يمكننا استخدامها لتحقيق البرمجة المتزامنة هي استخدام generators كcoroutines. بالنسبة للحلقة الحدث، يبدو استدعاء الدالة callback مثل استدعاء method .send() على coroutine معتقلة. استهلاك الذاكرة من قبل جميع coroutines المعتقلة أقل بكثير من استهلاك الذاكرة من قبل threads.
الآن، يجب أن تكون قادرًا على فهم لماذا script flags_asyncio.py أسرع بكثير من flags.py.
لأنflags.py تنزيل متسلسل، ويجب على كل تنزيل أن يستخدم عدة مليارات دورة معالجة مركزية لنتائج، بينما في flags_asyncio.py، عند استدعاء method loop.run_until_complete في function download_many، يقود حلقة الحدث جميع coroutines download_one، حتى reaches expression yield from، حيث يقود expression yield from جميع coroutines get_flag، حتى reaches first expression yield from، ويتم استدعاء function aiohttp.request(). هذه الاستدعاءات لا تؤدي إلى إزعاج، لذا يمكن بدء جميع الطلبات في ثوانٍ قليلة.
تحسين سكربت asyncio التنزيل
الآن نحسن من flags_asyncio.py السابق، بإضافة معالجة الاستثناءات، والمحتسب
import asyncio import collections from collections import namedtuple from enum import Enum import aiohttp from aiohttp import web from flags import save_flag, show, main, BASE_URL DEFAULT_CONCUR_REQ = 5 MAX_CONCUR_REQ = 1000 Result = namedtuple('Result', 'status data') HTTPStatus = Enum('Status', 'ok not_found error') #自定义异常用于包装其他HTTP或网络异常,并获取country_code,以便报告错误 class FetchError(Exception): def __init__(self, country_code): self.country_code = country_code @asyncio.coroutine def get_flag(cc): # لهذا coroutine ثلاثة أنواع من النتائج العودة # عند العودة بناءً على رد الحصول على الصورة # عند العودة بناءً على رد HTTP 404، يتم رفع استثناء web.HTTPNotFound # عند العودة بناءً على رمز حالة HTTP آخر، يتم رفع استثناء aiohttp.HttpProcessingError url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) resp = yield from aiohttp.request('GET', url): إذا كان resp.status == 200: image = yield from resp.read() return image elif resp.status == 404: raise web.HttpNotFound() else: raise aiohttp.HttpProcessionError( code=resp.status, message=resp.reason, headers=resp.headers ) @asyncio.coroutine def download_one(cc, semaphore): # معامل semaphore هو مثيل لـ asyncio.Semaphore # Semaphore هو جهاز سينكروني، يستخدم للتحكم في الطلبات المتوازية try: with (yield from semaphore): # استخدامه semaphore كمدخل إداري في تعبير yield from، لمنع حجب النظام بأكمله # إذا كانت قيمة مقياس semaphore هي القيمة القصوى المسموح بها، فإنه سيتم حجب هذه coroutine فقط image = yield from get_flag(cc) # بعد مغادرة جملة with، سيُنخفض قيمة مقياس semaphore # إزالة الحجب قد تكون قيد الانتظار على نفس كائن semaphore except web.HTTPNotFound: status = HTTPStatus.not_found msg = 'not found' except Exception as exc: raise FetchError(cc) from exc else: save_flag(image, cc.lower() + '.gif') status = HTTPStatus.ok msg = 'ok' return Result(status, cc) @asyncio.coroutine def downloader_coro(cc_list): counter = collections.Counter() # إنشاء مثيل لـ asyncio.Semaphore، يسمح بتنشيط ما يصل إلى MAX_CONCUR_REQ من coroutines باستخدام هذا المقياس semaphore = asyncio.Semaphore(MAX_CONCUR_REQ) # استدعاء download_one مرة أخرى مكررة، لإنشاء قائمة بأحداث البرمجة المتوازية to_do = [download_one(cc, semaphore) for cc in sorted(cc_list)] # الحصول على م迭代ر، هذا الم迭代ر سيقوم بالعودة إلى future عند انتهاء future to_do_iter = asyncio.as_completed(to_do) for future in to_do_iter: # التكرار على future المسموح بالانتهاء try: res = yield from future # للحصول على نتيجة كائن asyncio.Future (يمكن أيضًا تفعيل future.result) except FetchError as exc: # جميع الاستثناءات التي يتم رفعها تُغلف في كائن FetchError country_code = exc.country_code try: # تحاول الحصول على رسالة الخطأ من الاستثناء الأصلي (__cause__) error_msg = exc.__cause__.args[0] استثناء IndexError: # إذا لم يتم العثور على رسالة الخطأ في الاستثناء الأصلي، استخدم اسم فئة الاستثناء المربوط كرسالة خطأ error_msg = exc.__cause__.__class__.__name__ إذا كان error_msg: msg = '*** Error for {}: {}' اطبع msg.format(country_code, error_msg) status = HTTPStatus.error else: status = res.status counter[status] += 1 عدد الطلبات دالة download_many(cc_list): loop = asyncio.get_event_loop() coro = downloader_coro(cc_list) عدد الطلبات = loop.run_until_complete(coro) إغلاق loop عدد الطلبات إذا كان __name__ == '__main__': main(download_many)
بسبب سرعة طلبات الجريدة، لمنع إرسال الكثير من الطلبات المتوازية إلى الخادم مما يؤدي إلى تحميل الخادم، قمنا بإنشاء instance من asyncio.Semaphore في دالة download_coro، ثم نقلناه إلى دالة download_one.
<secion class="caption">Semaphore</section> يحتفظ بمعامل داخلي، إذا تم استدعاء .acquire() coroutine method على هذا الوسيط، فإن المعامل ينخفض؛ وإذا تم استدعاء .release() coroutine method على هذا الوسيط، فإن المعامل يزيد. قيمة المعامل يتم تحديدها عند التأسيس.
إذا كان المعامل أكبر من 0، فإن استدعاء .acquire() لن يسبب التبكير، وإذا كان المعامل يساوي 0، فإن استدعاء .acquire() سيؤدي إلى تبكير من يؤدي هذا الاستدعاء حتى تنفذ协يرات أخرى استدعاء .release() على نفس Semantic object، مما يؤدي إلى زيادة المعامل.
في الكود الموجود أعلاه، لم نستدعي يدويًا .acquire() أو .release()، بل استخدمنا semaphore كمدارة سياق في دالة download_one:
with (yield from semaphore): image = yield from get_flag(cc)
这段代码保证,任何时候都不会有超过 MAX_CONCUR_REQ 个 get_flag 协程启动。
这段代码保证,任何时候都不会有超过 MAX_CONCUR_REQ 个 get_flag 协程启动。
使用 asyncio.as_completed 函数
因为要使用 yield from 获取 asyncio.as_completed 函数产出的future的结果,所以 as_completed 函数秩序在协程中调用。由于 download_many 要作为参数传给非协程的main 函数,我已我们添加了一个新的 downloader_coro 协程,让download_many 函数只用于设置事件循环。
使用Executor 对象,防止阻塞事件循环
现在我们回去看下上边关于电脑从不同存储介质读取数据的延迟情况图,有一个实时需要注意,那就是访问本地文件系统也会阻塞。
上边的代码中,save_flag 函数阻塞了客户代码与 asyncio 事件循环公用的唯一线程,因此保存文件时,整个应用程序都会暂停。为了避免这个问题,可以使用事件循环对象的 run_in_executor 方法。
asyncio 的事件循环在后台维护着一个ThreadPoolExecutor 对象,我们可以调用 run_in_executor 方法,把可调用的对象发给它执行。
@asyncio.coroutine def download_one(cc, semaphore): try: with (yield from semaphore): image = yield from get_flag(cc) except web.HTTPNotFound: status = HTTPStatus.not_found msg = 'not found' except Exception as exc: raise FetchError(cc) from exc else: 下边是我们改动后的代码: # 这里是改动部分 loop = asyncio.get_event_loop() # 获取事件循环的引用 status = HTTPStatus.ok msg = 'ok' return Result(status, cc)
loop.run_in_executor(None, save_flag, image, cc.lower() + '.gif')
从回调到future到协程
在接触协程之前,我们可能对回调有一定的认识,那么和回调相比,协程有什么改进呢?
python中的回调代码样式:
def stage1(response1): request2 = step1(response1) api_call2(request2, stage2) def stage2(response2): request3 = step3(response3) api_call3(request3, stage3) def stage3(response3): step3(response3) api_call1(request1, stage1)
عيوب الكود السابق:
في هذه المشكلة، يمكن للكوروتينات أن تلعب دوراً كبيراً. إذا تم تغيير الكود إلى كود متعلق بالكوروتينات و yield from، يمكن أن يكون الكود كالتالي:
@asyncio.coroutine def three_stages(request1): response1 = yield from api_call1(request1) request2 = step1(response1) response2 = yield from api_call2(requests) request3 = step2(response2) response3 = yield from api_call3(requests) step3(response3) loop.create_task(three_stages(request1)
من مقارنة هذا الكود مع الكود السابق، فإن هذا الكود أكثر وضوحاً. إذا كان سيتم إلقاء استثناء عند الت طلبات المتعلقة بال دعوات ال异步 api_call1,api_call2,api_call3، يمكن وضع تعبيرات yield from المناسبة في كتل try/except لمعالجة الاستثناءات.
عند استخدام الكوروتينات، يجب أن تعتاد على تعبير yield from، و لا يمكن للكوروتينات الت 호출 مباشرة، بل يجب تحديد وقت تنفيذ الكوروتينات بشكل واضح، أو استخدام تعبير yield from في كوروتينات أخرى التي تم تحديد وقت تنفيذها لتشغيلها. إذا لم يتم استخدام loop.create_task(three_stages(request1))، فإن لا شيء سيحدث.
سنستعرض مثالاً عملياً فيما يلي:
كل مرة يتم فيها تنزيل إرسال عدة طلبات
نعدل بعض من كود تنزيل الأعلام السابقة، ليتيح تنزيل الأعلام والاسماء في نفس الوقت عند حفظ الصورة.
نستخدم الكوروتينات و yield from لحل هذه المشكلة:
@asyncio.coroutine def http_get(url): resp = yield from aiohttp.request('GET', url): إذا كان resp.status == 200: ctype = resp.headers.get('Content-type', '').lower(): إذا كان 'json' في ctype أو ينتهي url ب 'json': data = yield from resp.json() else: data = yield from resp.read() return data elif resp.status == 404: raise web.HttpNotFound() else: raise aiohttp.HttpProcessionError( code=resp.status, message=resp.reason, headers=resp.headers) @asyncio.coroutine def get_country(cc): url = "{}/{cc}/metadata.json".format(BASE_URL, cc=cc.lower()) metadata = yield from http_get(url) return metadata['country'] @asyncio.coroutine def get_flag(cc): url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) return (yield from http_get(url)) @asyncio.coroutine def download_one(cc, semaphore): try: with (yield from semaphore): image = yield from get_flag(cc) with (yield from semaphore): country = yield from get_country(cc) except web.HTTPNotFound: status = HTTPStatus.not_found msg = 'not found' except Exception as exc: raise FetchError(cc) from exc else: country = country.replace(' ', '_') filename = '{}--{}.gif'.format(country, cc) print(filename) loop = asyncio.get_event_loop() loop.run_in_executor(None, save_flag, image, filename) status = HTTPStatus.ok msg = 'ok' return Result(status, cc)
في هذا الفقاع، في دالة download_one، نستدعي get_flag و get_country في كلا من كلاسيكيتم semaphore، مع تقليل الوقت المستغرق
يتم إضافة عبارات return لـ get_flag في طبقة خارجية لأن () لديه أولوية عالية، سيتم تنفيذ عبارات yield from الموجودة داخل الباريتة أولاً. إذا لم يتم إضافتها، سيتم إصدار خطأ لغة النص
إضافة ()، يشبه
image = yield from http_get(url) return image
إذا لم يكن هناك ()، فإن البرنامج سيتم إيقافه عند yield from، ويتم تسليم السيطرة، عند هذا الوقت، إذا تم استخدام return، سيتم إصدار خطأ لغة النص
النهاية
في هذا المقال نناقش:
هذا هو نهاية محتويات هذا المقال، نأمل أن تكون قد ساعدتكم في التعلم، ونأمل أن تشجعوا دروس التعبير.
بيان: محتويات هذا المقال تم جمعها من الإنترنت، حقوق الطبع والتأليف مملوكة للكاتب الأصلي، محتويات الموقع تم إضافتها من قبل مستخدمي الإنترنت بشكل متعاون، ويتم إضافة محتويات الموقع بشكل تلقائي دون تدخل بشري، ولا يتحمل الموقع أي مسؤولية قانونية متعلقة بذلك. إذا اكتشفتم محتوى مخالف للحقوق الطبعية، يرجى إرسال بريد إلكتروني إلى: notice#oldtoolbag.com (عند إرسال البريد الإلكتروني، يرجى استبدال #بـ @) لإبلاغنا، وقدموا الأدلة ذات الصلة، وسيتم حذف المحتوى المزعوم عن الفصل إذا تم التحقق منه.