English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية
خلفية تقنية حوض النواة
في برمجة الموارد الموجهة للكائن، يتطلب إنشاء وإزالة الكائنات الكثير من الوقت، لأن إنشاء كائن يتطلب الحصول على موارد الذاكرة أو موارد أخرى أكثر. خاصة في Java، حيث يحاول الجهاز الافتراضي تتبع كل كائن، حتى يتمكن من إزالة النفايات بعد إزالة الكائن.
لذلك، إحدى طرق تحسين كفاءة البرنامج هو تقليل أقصى قدر ممكن من إنشاء وتدمير العناصر، خاصة إنشاء وتدمير العناصر الكثيرة التي تستخدم موارد كثيرة. كيفية استخدام العناصر الموجودة لتقديم الخدمة هي مشكلة يجب حلها، وهذا في الواقع هو سبب ظهور تقنيات 'النواة' هذه.
على سبيل المثال، العديد من المكونات الشائعة في Android، مثل مكتبات تحميل الصور، مكتبات الطلبات الشبكية، لا يمكنهم الابتعاد عن مفهوم 'النواة'، مثل مكتبات تحميل الصور، مكتبات الطلبات الشبكية، حتى في ميكانيكية نقل الرسائل في Android، عند استخدام Message.obtain()، يتم استخدام عناصر النواة في نواة الرسائل، لذا فهذا المفهوم مهم. ستقوم هذا المقال بتقديم تقنية نواة النواة التي تتوافق مع هذا المفهوم.
مزايا نواة النواة:
1. يمكنه إعادة استخدام نواة النواة الموجودة، مما يقلل من تكاليف الأداء التي تنشأ من إنشاء وتدمير العناصر;
2. يمكنه التحكم في عدد النواة القصوى المتاحة، زيادة استفادة موارد النظام، وضمان عدم وجود تنافس كبير لل موارد، وتجنب الت拥堵;
3. يمكنه إدارة متعددة النواة بسهولة، مما يجعل استخدام النواة بسيطًا ومفيدًا.
إطار العمل للنواة
java في نواة الفئات يتم تنفيذها من خلال إطار العمل Executor، يتضمن إطار العمل الفئات: Executor،Executors،ExecutorService،ThreadPoolExecutor،Callable و Future،FutureTask الاستخدام، إلخ.
Executor: واجهة جميع نواة الفئات، لديها طريقة واحدة فقط.
public interface Executor { void execute(Runnable command); }
ExecutorService: يزيد من سلوك Executor، هي أقرب واجهة تنفيذ لـ Executor.
Executors: يقدم مجموعة من طرق التصنيع للنواة، العوامل التي يتم إنشاؤها جميعها تنفذ واجهة ExecutorService.
ThreadPoolExecutor: تصنيع فئة لتنفيذ مسبق للنواة, العادة هي أن جميع نواة الفئات المستخدمة تعتمد على هذه الفئة. طريقة بناءها كالتالي:
public ThreadPoolExecutor(int النواة_حجم_النواة_الأساسية, int الحجم_القصوى_للنواة, long وقت_الإبقاء_الحياة, TimeUnit وحدة, BlockingQueue<Runnable> قائمة_العمل) { this(النواة_حجم_النواة_الأساسية, الحجم_القصوى_للنواة, وقت_الإبقاء_الحياة, وحدة, قائمة_العمل, Executors.defaultThreadFactory(), defaultHandler); }
corePoolSize: هو عدد الأنماط الأساسية لمكتب العمل، ولن يزيد عدد الأنماط التي تعمل في مكتب العمل أبدًا عن corePoolSize، ويكون قياسيًا قادرًا على البقاء على قيد الحياة إلى الأبد. يمكن تعيين allowCoreThreadTimeOut كـ True، حيث يكون عدد الأنماط الأساسية 0، ويقوم keepAliveTime بتحكم في وقت انتهاء المدة الفارغة لجميع الأنماط.
maximumPoolSize: هو عدد الأنماط الأعلى الذي يسمح به مكتب العمل;
keepAliveTime: يشير إلى وقت انتهاء المدة الفارغة للسطر;
unit: هو سمة من نوع إnum، تمثل وحدة keepAliveTime.
workQueue: يمثل حاجز منعطف من نوع BlockingQueue<Runnable> لوضع المهام.
BlockingQueue: هي حاجز منعطف (BlockingQueue) تستخدم أساسًا تحت java.util.concurrent للتحكم في التوافق بين الأنماط. إذا كان حاجز منعطف فارغًا، فإن عملية سحب العناصر منه ستتوقف وتنتظر حتى يتم إضافة شيء إلى حاجز منعطف. بنفس الطريقة، إذا كان حاجز منعطف ممتلئًا، فإن أي محاولة لوضع شيء فيه ستتوقف وتنتظر حتى يتم إضافة مساحة إلى حاجز منعطف. تُستخدم حاجز منعطف عادة في سيناريوهات المنتج والمستهلك، حيث يكون المنتج هو النمط الذي يضيف العناصر إلى الصف، والمستهلك هو النمط الذي يأخذ العناصر منه. يحتوي حاجز منعطف على حاوية لوضع العناصر من قبل المنتج، ويمكن للمستهلك أن يأخذ العناصر فقط من هذه الحاوية. وتشمل الأمثلة على الصفوف المحددة LinkedBlockingQueue، ArrayBlockingQueue، إلخ. عادةً، يتم تنفيذ الحاجز منعطف من خلال Lock و Condition (تعلم استخدامهما للقفل (Lock) والشرط (Condition)).
يتمتع مكتب العمل بالعملية التالية:
عند إنشاء مكتب العمل لأول مرة، لا يحتوي على أي أنماط. يتم نقل صف الأنماط كمعامل. ومع ذلك، حتى لو كان هناك مهام في الصف، لن يقوم مكتب العمل بتنفيذها على الفور.
عند استدعاء طريقة execute() لإضافة مهمة، يقوم مكتب العمل بالتحقق من ما يلي:
إذا كان عدد الأنماط التي تعمل حاليًا أقل من corePoolSize، فإنه سيقوم بإنشاء سطر جديد لتنفيذ هذه المهمة.
إذا كان عدد الأنماط التي تعمل حاليًا أكبر أو يساوي corePoolSize، فإنه سيضع هذه المهمة في الصف.
إذا كانت الحاوية ممتلئة في هذا الوقت، وعدد السيزارات التي تعمل حاليًا أقل من maximumPoolSize، فإن الحاوية ستقوم بإنشاء سطر غير أساسي جديد لتنفيذ هذه المهمة على الفور;}
إذا كانت الحاوية ممتلئة، وعدد السيزارات التي تعمل حاليًا أكبر أو يساوي maximumPoolSize، فإن الحاوية ستقوم بإطلاق استثناء RejectExecutionException.
إذا انتهت مهمة سطر من السيزارات، فإنه سيأخذ مهمة أخرى من الخط إلى التنفيذ.
إذا لم يكن هناك شيء يقوم به سطر من السيزارات لفترة زمنية معينة (keepAliveTime)، فإن الحاوية ستقوم بالتحقق، وإذا كان عدد السيزارات التي تعمل حاليًا أكبر من corePoolSize، فإن هذا السطر سيتم إيقافه. لذلك، بعد اكتمال جميع المهام في الحاوية، ستقوم الحاوية بال縮يد إلى حجم corePoolSize.
إنشاء واستخدام حاوية السيزارات
يتم إنشاء حاوية السيزارات باستخدام طريقة ثابتة لفئة الأدوات Executors، وتأتي أدناه أنواع عدة من حاويات السيزارات.
SingleThreadExecutor: سطر خلفي واحد (محتوياته غير محدودة الحجم)
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService ( new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
إنشاء حاوية سيزار بسيطة. تحتوي هذه الحاوية على سطر أساسي واحد يعمل فقط، مما يعني تنفيذ جميع المهام بشكل سلس على سطر واحد. إذا انتهت حياة هذا السطر الوحيد بسبب خطأ، فإن سطر جديد سيقوم بتعويضه. تضمن هذه الحاوية تنفيذ جميع المهام وفقًا لترتيب تقديم المهام.
FixedThreadPool: حاوية سيزار ثابتة تحتوي فقط على سطور أساسية (والمحتويات الخاصة بها غير محدودة الحجم).
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
إنشاء حاوية سيزار بالحجم الثابت. يتم إنشاء سطر جديد لكل مهمة يتم تقديمها حتى يصل حجم الحاوية إلى الحجم الأقصى. إذا وصل حجم الحاوية إلى الحجم الأقصى، فإنه سيبقى ثابتًا، وإذا انتهت حياة سطر بسبب خطأ في التنفيذ، فإن الحاوية ستقوم بإكمال سطر جديد.
CachedThreadPool: صندوق أعمال غير محدود، يمكنه إعادة التشغيل التلقائي للخطوط.}
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
إذا كان حجم صندوق الأعمال أكبر من عدد الأعمال المطلوبة لمعالجة الأعمال، فإنه سيقوم بإعادة التشغيل جزء من الأعمال الحرة (60 ثانية بدون تنفيذ العمل) عند زيادة عدد الأعمال. لن يضع هذا النصذيد أي حدود على حجم صندوق الأعمال، وسيكون حجم صندوق الأعمال يعتمد تمامًا على أكبر حجم لصندوق الأعمال الذي يمكن إنشاؤه من قبل نظام التشغيل (أو JVM). SynchronousQueue هو حاجز يحتوي على حاوية واحدة.
ScheduledThreadPool:核心线程池固定,大小无限的线程ص. هذا النصذيد يدعم تنفيذ المهام المزمنة والمتكررة.
public static ExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPool(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); }
创建一个周期性执行任务的线程池。如果闲置,非核心线程池会在DEFAULT_KEEPALIVEMILLIS时间内回收。
线程池最常用的提交任务的方法有两种:
تنفيذ:
ExecutorService.execute(Runnable runable);
submit:
FutureTask task = ExecutorService.submit(Runnable runnable);
FutureTask<T> task = ExecutorService.submit(Runnable runnable, T Result);
FutureTask<T> task = ExecutorService.submit(Callable<T> callable);
تنفيذ submit(Callable callable)،submit(Runnable runnable) مشابه.
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); FutureTask<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
من الواضح أن task التي يفتحها submit هي التي تحمل نتيجة، سيتم إرجاع FutureTask object، بهذا يمكن الحصول على النتيجة من خلال method get(). سيتم أيضًا إدخال execute(Runnable runable) في نهاية submit، submit هو مجرد وضع Callable object أو Runnable في object FutureTask، لأن FutureTask هو Runnable، لذا يمكن تنفيذها في execute. حول كيفية تعبئة Callable object وRunnable إلى FutureTask، راجع Callable وFuture وFutureTask.
مبدأ تنفيذ thread pool
إذا تحدثنا فقط عن استخدام thread pool، فإن هذا المقال ليس له قيمة كبيرة، لا أكثر من مجرد التعرف على API ذات الصلة بExecutor. لم يتم استخدام كلمة مفتاح Synchronized في عملية تنفيذ thread pool، بل تم استخدامه Volatile،Lock وline synchronous (حجرة الانتظار) ووحدات Atomic والكلاسات ذات الصلة، مثل FutureTask، إلخ، لأن أداءها أفضل. يمكن للعملية الفهم أن يتعلم بشكل جيد أفكار التحكم في التوازي في الكود المصدر.
في بداية المقال ذكرت مزايا thread pool يمكن تلخيصها في النقاط التالية:
تكرار thread
تحكم في عدد التشغيل المتوازي الأقصى
إدارة thread
1. عملية التكرار للthread
للتفهم مبدأ التكرار للthreads يجب أن يتم فهم دورة حياة thread أولاً.
في دورة حياة thread،هو يجب أن يمر عبر 5��态: الجديد (New)،الحالة القابلة للتنفيذ (Runnable)،الحالة التشغيلية (Running)،الحالة المتوقفة (Blocked) والحالة الميتة (Dead).
Thread من خلال new يُستخدم لبناء thread جديد، هذا العملية هو لتحديد بعض معلومات thread مثل اسم thread،id،group الذي ينتمي إليه thread،ويمكن اعتبارها مجرد object عادي. بعد إدخال start() في Thread، سينشئ Java Virtual Machine stack method والعداد البرنامج، وسيتم تعيين hasBeenStarted إلى true،ثم عند إدخال start method سيكون هناك استثناء.
threads في هذه الحالة ليست قد بدأت في التشغيل بعد، بل إنها تمثل فقط أن thread يمكن تشغيله. بالنسبة لوقت بدء تشغيل thread، يعتمد على schedule thread scheduler في JVM. عند الحصول على CPU، يتم استدعاء method run(). لا تقوم بإنشاء طلب Thread's run() نفسها. بعد ذلك، يتم التبديل بين حالة الاستعداد - التشغيل - المعطل بناءً على schedule CPU، حتى ينتهي method run() أو يتم إيقاف thread بطريقة أخرى، ويصل إلى حالة dead.
لذلك، يجب أن تكون فكرة تنفيذ تكرار الاستخدام للthreads هي الحفاظ على حالة thread في حالة الحياة (مستعد، يعمل أو معطل). الآن دعونا نرى كيف يقوم ThreadPoolExecutor بتنفيذ تكرار الاستخدام للthreads.
في ThreadPoolExecutor، يتم التحكم في تكرار الاستخدام للthreads بواسطة Worker class الرئيسية. لنظر في رمز Worker class بعد التبسيط، مما يجعل الفهم أسهل:
private final class Worker implements Runnable { final Thread thread; Runnable firstTask; Worker(Runnable firstTask) { this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } final void runWorker(Worker w) { Runnable task = w.firstTask; w.firstTask = null; while (task != null || (task = getTask()) != null){ task.run(); } }
Worker هوRunnable، يمتلك أيضًا thread، وهو thread يجب فتحه، عند إنشاء Worker object، يتم إنشاء Thread object جديد، ويتم إدخال Worker نفسه كمعامل إلى TThread، وبالتالي عند إجراء طلب start() لـ Thread، يتم تشغيل run() لـ Worker، ثم إلى runWorker()، هناك loop while، يستمر في الحصول على object Runnable من getTask()، يتم تنفيذها بالترتيب. كيف يحصل getTask() على object Runnable؟
لا تزال الكود المبسط:}
النوع العام private Runnable الحصول على مهمة() { إذا (بعض الحالات الخاصة) { إرجاع null; } Runnable r = workQueue.take(); إرجاع r; }
هذا workQueue هو مجموعة الحجوزات من BlockingQueue التي تحتوي على المهام التي سيتم تنفيذها عند إنشاء ThreadPoolExecutor، وتحتوي هذه المجموعة على المهامRunnable التي سيتم تنفيذها. لأن BlockingQueue هو مجموعة حجوزات مقيدة، إذا كان BlockingQueue.take() عائداً بالصفر، يدخل في حالة الانتظار حتى يتم إضافة كائن جديد إلى BlockingQueue مما يقوم ب��ان التسمية الموقوفة. لذلك، في العادة، لا ينتهي run() للThread، بل يستمر في تنفيذ المهام من workQueue، مما يحقق مبدأ إعادة استخدام الأطراف.
2. التحكم في عدد التوازي الأقصى
متى يتم إضافة Runnable إلى workQueue؟ ومتى يتم إنشاء Worker، وما هو وقت تفعيل start() في Thread داخل Worker للبدء في تنفيذ run() لWorker؟ من خلال التحليل أعلاه يمكن ملاحظة أن تنفيذ runWorker() في Worker ي�行راء المهام الواحدة تلو الأخرى، بترتيب متسلسل، إذن كيف يتم体现 التوازي؟
من السهل التفكير أن بعض المهام المذكورة أعلاه تتم عند تنفيذ execute(Runnable runnable). لنلق نظرة على كيفية تنفيذ ذلك داخل تنفيذ.
تنفيذ:
الكود المبسط
النوع العام void تنفيذ (Runnable الأمر) { إذا (الأمر == null) إرمز إلى NullPointerإكسception(); إعداد c = ctl.get(); // عدد الأطراف الحالية للنواة < حجم النواة القصوى إذا (عدد العمال في c < حجم النواة القصوى) { // قم ببدء نماط جديد مباشرة. if (addWorker(command, true)) return; c = ctl.get(); } // عدد الأنماط النشطة >= corePoolSize // حالة runState RUNNING والصف لم يملأ if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // تأكد مرة أخرى من حالة RUNNING // إذا كانت الحالة ليست RUNNING، قم بإزالة المهمة من workQueue ورفضها if (!isRunning(recheck) && remove(command)) reject(command);// رفض المهمة باستخدام استراتيجية حاوية الأنماط المحددة // حالتان: // رفض المهمة الجديدة عند عدم حالة RUNNING // فشل بدء نماط جديد عند ملء الصف (workCount > maximumPoolSize) } else if (!addWorker(command, false)) reject(command); }
addWorker:
الكود المبسط
private boolean addWorker(Runnable firstTask, boolean core) { int wc = workerCountOf(c); if (wc >= (core ? corePoolSize : maximumPoolSize)) { return false; } w = new Worker(firstTask); final Thread t = w.thread; t.start(); }
من خلال النظر في الكود، يمكننا رؤية كيفية إضافة المهمة في عملية عمل حاوية الأنماط المذكورة سابقًا:
* إذا كان عدد الأنماط التي تعمل حاليًا أقل من corePoolSize، فإنه يجب إنشاء نماط على الفور لتشغيل هذه المهمة;
* إذا كان عدد الأنماط التي تعمل حاليًا أكبر أو يساوي corePoolSize، فإنه يجب وضع هذه المهمة في الصف;
* إذا تم ملء الصف في هذه اللحظة وعدد الأنماط التي تعمل حاليًا أقل من maximumPoolSize، فإنه يجب إنشاء نماط غير رئيسية على الفور لتشغيل هذه المهمة;
* إذا تم ملء الصف وعدد الأنماط التي تعمل حاليًا أكبر أو يساوي maximumPoolSize، فإن حاوية الأنماط ستقوم بإطلاق استثناء RejectExecutionException.
This is the reason why Android's AsyncTask throws RejectExecutionException when executing in parallel and exceeds the maximum task number, see detailed explanation and dark side of AsyncTask based on the latest version of source code
If the new thread is successfully created through addWorker, the new thread is started through start(), and the firstTask is set as the first task to be executed in the run() of this Worker.
Although each Worker's task is processed serially, if multiple Workers are created, because they share a workQueue, they will be processed in parallel.
Therefore, the maximum concurrency is controlled according to corePoolSize and maximumPoolSize. The general process can be represented by the following figure.
The above explanation and diagram can help you understand this process very well.
If you are doing Android development and are familiar with the Handler principle, you may find this diagram quite familiar. Some processes in it are very similar to Handler, Looper, and Message usage. Handler.send(Message) is equivalent to execute(Runnable), and the Message queue maintained by Looper is equivalent to BlockingQueue, but it needs to be maintained through synchronization. The loop() function in Looper continuously takes Message from the Message queue, and the runWork() in Worker continuously takes Runnable from the BlockingQueue, which is the same principle.
3. Manage threads
Through the thread pool, thread reuse, control of concurrency, and destruction processes can be managed very well. The reuse of threads and control of concurrency have been discussed, and the management process is intertwined among them, which is also easy to understand.
In ThreadPoolExecutor, there is an AtomicInteger variable named ctl. This variable saves two contents through this variable:
The number of all threads, the state of each thread, where the lower 29 bits store the thread count, and the higher 3 bits store the runState, different values are obtained through bitwise operations.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // get thread state private static int runStateOf(int c) { return c & ~CAPACITY; } // الحصول على عدد العمال private static int workerCountOf(int c) { return c & CAPACITY; } // تحقق مما إذا كانت الخدمة تعمل private static boolean isRunning(int c) { return c < SHUTDOWN; }
هنا، سأقوم بتحليل عملية إغلاق حوض الخدمة من خلال shutdown و shutdownNow(). أولاً، لدي حوض الخدمة خمس حالات لتحكم في إضافة المهام والتنفيذ. سأقوم بشرح ثلاثة منها:
حالة RUNNING: يعمل حوض الخدمة بشكل طبيعي، يمكنه قبول المهام الجديدة وتعامل المهام في الصندوق;
حالة SHUTDOWN: لا يقبل المهام الجديدة، ولكن ستنفذ المهام في الصندوق;
حالة STOP: لا يقبل المهام الجديدة، ولا يعالج المهام في الصندوق shutdown هذه الطريقة ستبدل حالة runState إلى SHUTDOWN، وسيتوقف جميع الخدمات العاطلة، ولكن الخدمات التي تعمل لا تتأثر، لذا لن يتم تنفيذ المهام في الصندوق.
يضبط shutdownNow method حالة runState إلى STOP. الفرق بين shutdown method، هذا الأسلوب سيتوقف عن جميع الخدمات، لذا لن يتم تنفيذ المهام في الصندوق.
الخلاصة
من خلال تحليل شيفرة ThreadPoolExecutor، تعرفت بشكل عام على عملية إنشاء حوض الخدمة، وإضافة المهام، والتنفيذ، وبتعلم هذه العمليات، سيكون استخدام حوض الخدمة أسهل.
ومما تعلمته من ذلك بعض التحكم في التوازي، وكيفية استخدام نموذج المعالجة الإنتاجية-الاستهلاكية، سيكون له دور كبير في فهم أو حل مشاكل أخرى في المستقبل. مثل ميكانيكية Handler في Android، ويمكن استخدام BlookQueue لمعالجة قائمة Messager في Looper أيضًا، هذا هو ما أتعلمه من قراءة الشيفرة المصدرية.
هذا هو تجميع المعلومات المتعلقة بـ Java Thread Pool، وسنستمر في إضافة المعلومات ذات الصلة لاحقًا، شكرًا لكم على دعم هذا الموقع!