English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

Подробное объяснение и пример кода Java Thread Pool

Технический фон线程-пools

В面向对象编程中,创建和销毁对象是很费时的,因为创建一个对象需要获取内存资源或其他更多资源。在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。

Таким образом, одним из средств повышения эффективности служебных программ является минимизация次数 создания и уничтожения объектов, особенно создания и уничтожения объектов, которые занимают много ресурсов. Как использовать существующие объекты для обслуживания - это ключевая проблема, которую необходимо решить, и это именно причина возникновения таких технологий, как "пул ресурсов".

например, многие общие компоненты, часто встречающиеся в Android, не могут обойтись без концепции "пула", такие как различные библиотеки для загрузки изображений, библиотеки для сетевых запросов, даже в механизме сообщения Android Message, когда используется Message.obtain(), это использование объектов из Message- пула, поэтому эта концепция очень важна. В этой статье介绍的 технология пулов также соответствует этому мышлению.

преимущества пулов线程:

1. повторно использует потоки в пуле, уменьшает производственные издержки, связанные с созданием и уничтожением объектов;

2. эффективно контролирует максимальное количество параллельных потоков, повышает использование системных ресурсов, одновременно избегает избыточной конкуренции за ресурсы и блокировки;

3. позволяет многоthread- manage просто, делая использование потоков простым и эффективным.

фреймворк пулов线程 Executor

java- thread pool реализуется через фреймворк Executor, фреймворк Executor включает классы: Executor, Executors, ExecutorService, ThreadPoolExecutor, Callable и Future, FutureTask и т.д.

Executor: это интерфейс всех пулов线程, у него есть только один метод.

public interface Executor {  
 void execute(Runnable command);  
}

ExecutorService: добавляет поведение Executor, это наиболее прямой интерфейс для класса реализации Executor.

Executors: предоставляет一系列 фабричных методов для создания池ов线程, возвращаемые пулы всегда реализуют интерфейс ExecutorService.

ThreadPoolExecutor: это конкретный класс реализации池а线程, обычно все используемые различные池ы реализованы на основе этой класса. Метод конструктора следующий:

public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,)
Executors.defaultThreadFactory(), defaultHandler);
}

corePoolSize: это количество ядерных потоков пул потоков, количество выполняющихся потоков в пул потоков никогда не должно превышать corePoolSize, по умолчанию они могут жить вечно. Их можно изменить, установив allowCoreThreadTimeOut в True, в этом случае количество ядерных потоков будет равно 0, а keepAliveTime будет контролировать время ожидания всех потоков.

maximumPoolSize: это максимальное количество потоков, которое может разрешить пул потоков;

keepAliveTime: это время ожидания для завершения空闲线程的超时时间;

unit: это энум, который представляет собой единицу измерения keepAliveTime;

workQueue: это BlockingQueue<Runnable>, который используется для хранения задач.

BlockingQueue: блокирующая очередь (BlockingQueue) - это основное средство для управления синхронизацией потоков в java.util.concurrent. Если BlockQueue пуст, операции по извлечению данных из BlockingQueue будут блокированы и перейти в состояние ожидания, пока BlockingQueue не получит данные, и только после этого будет вызван к ним переход. Точно так же, если BlockingQueue полна, любая попытка добавить данные в нее также будет блокирована и перейти в состояние ожидания, пока в BlockingQueue не появится место, и только после этого можно продолжить операции. Блокирующая очередь часто используется в сценариях производителя-потребителя, где производителем являются потоки, добавляющие элементы в очередь, а потребителем - потоки, извлекающие элементы из очереди. Блокирующая очередь является контейнером для хранения элементов производителем, а потребители также только извлекают элементы из контейнера. Конкретные реализации классов включают LinkedBlockingQueue, ArrayBlockingQueued и т.д. Внутренне они реализуются через Lock и Condition (изучение и использование явных замков (Lock) и Condition).

Процесс работы пул потоков выглядит следующим образом:

При создании пул потоков внутри него нет потоков. КwargsQueue передается в качестве параметра. Однако, даже если в очереди есть задачи, пул потоков не будет их сразу выполнять.

Когда вызывается метод execute() для добавления задачи, пул потоков выполняет следующие действия:

Если количество выполняющихся потоков меньше corePoolSize, то немедленно создается поток для выполнения этой задачи;

Если количество выполняющихся потоков больше или равняется corePoolSize, то эта задача будет добавлена в очередь;

Если в этот момент очередь заполнена, и количество работающих потоков меньше maximumPoolSize, то все же создается неядерный поток для немедленного выполнения этой задачи;

Если очередь заполнена и количество работающих потоков больше или равно maximumPoolSize, то пул потоков выбрасывает исключение RejectExecutionException.

Когда поток выполняет задачу, он берет следующую задачу из очереди для выполнения.

Когда поток больше не имеет задач для выполнения, и это продолжается в течение определенного времени (keepAliveTime), пул потоков определяет, если текущее количество работающих потоков больше corePoolSize, то этот поток отключается. Таким образом, после выполнения всех задач пул потоков в конечном итоге сжимается до размера corePoolSize.

Создание и использование пулов потоков

Создание пулов потоков использует статические методы класса инструментов Executors. Вот несколько общих типов пулов потоков.

SingleThreadExecutor: один фоновый поток (его буферная очередь неограничена)

public static ExecutorService newSingleThreadExecutor() {  
 return new FinalizableDelegatedExecutorService (
  new ThreadPoolExecutor(1, 1,         
  0L, TimeUnit.MILLISECONDS,         
  new LinkedBlockingQueue<Runnable>())); 
}

Создание пула потоков с одним потоком. Этот пул потоков имеет только один ядренный поток, который работает, что эквивалентно однопотоковому параллельному выполнению всех задач. Если единственный поток завершается по причине исключительной ситуации, то вместо него будет создан новый поток. Этот пул потоков гарантирует, что все задачи выполняются в порядке их提交а.

FixedThreadPool: пул потоков с только ядреными потоками, размер фиксирован (его буферная очередь неограничена).

public static ExecutorService newFixedThreadPool(int nThreads) {        
        return new ThreadPoolExecutor(nThreads, nThreads,                                      
            0L, TimeUnit.MILLISECONDS,                                        
            new LinkedBlockingQueue<Runnable>());    
}
Создание фиксированного大小的 пула потоков. При каждом提交е задачи создается новый поток, пока количество потоков не достигнет максимального размера пулов. Размер пулов сохраняется неизменным, если поток завершается по причине выполнения исключительной ситуации, то пул добавляет новый поток.

CachedThreadPool:无界线程池,可以进行自动线程回收。

public static ExecutorService newCachedThreadPool() {   
 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,           
   60L, TimeUnit.SECONDS,          
   new SynchronousQueue<Runnable>());  
}

如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能地添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。SynchronousQueue是一个缓冲区为1的阻塞队列。

ScheduledThreadPool:核心线程池固定,大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

public static ExecutorService newScheduledThreadPool(int corePoolSize) {   
 return new ScheduledThreadPool(corePoolSize, 
    Integer.MAX_VALUE,             
    DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,             
    new DelayedWorkQueue()); 
}

创建一个周期性执行任务的线程池。如果闲置,非核心线程池会在DEFAULT_KEEPALIVEMILLIS时间内回收。

线程池最常用的提交任务的方法有两种:

execute:

ExecutorService.execute(Runnable runable);

submit:

FutureTask task = ExecutorService.submit(Runnable runnable);
FutureTask<T> task = ExecutorService.submit(Runnable runnable, T Result);

FutureTask<T> task = ExecutorService.submit(Callable<T> callable);

Реализация submit(Callable callable), submit(Runnable runnable) аналогична.

public <T> Future<T> submit(Callable<T> task) {
 if (task == null) throw new NullPointerException();
 FutureTask<T> ftask = newTaskFor(task);
 execute(ftask);
 return ftask;
}

可以看出submit запускает задачу с возвращаемым результатом, которая возвращает объект FutureTask, таким образом, можно получить результат через метод get(). В конечном итоге submit также вызывает execute(Runnable runable), submit просто упаковывает объект Callable или Runnable в объект FutureTask, так как FutureTask implements Runnable, поэтому его можно выполнить в execute. О том, как упаковывать объект Callable и Runnable в объект FutureTask, см. в разделе использования Callable и Future, FutureTask.

Принцип реализации пулов нитей

Если говорить только о использовании пулов нитей, то эта статья не имеет большой ценности, она всего лишь процесс знакомства с API Executor. Реализация пулов нитей не использует ключевое слово Synchronized, а использует Volatile, Lock и синхронные (блокирующие) очереди, классы Atomic и FutureTask и т.д., потому что последние имеют лучшую производительность. Процесс понимания может хорошо изучить思想 управления параллельностью в исходном коде.

В начале упоминалось, что преимущества пула нитей можно резюмировать в следующие три точки:

Рециклирование нитей

Контроль максимального количества параллельных запросов

Управление нитями

1. Процесс рециклирования нитей

Чтобы понять принцип рециклирования нитей, сначала нужно понять жизнь нити.

В жизни нити она проходит через 5 состояния:新建(New), готовность(Runnable), выполнение(Running), блокирование(Blocked) и смерть(Dead).

Thread через new создает новую нить, этот процесс initializes некоторые информации о нити, такие как имя нити, идентификатор, группа, к которой принадлежит нить, и можно считать, что это просто обычный объект. После вызова start() виртуальная машина Java создает метод для вызова стека и счетчик программы, а также устанавливает hasBeenStarted в true, после чего вызов start метода вызовет исключение.

Потоки, находящиеся в этом состоянии, еще не начали работать, они просто указывают, что поток может быть запущен. Когда线程 начнет работать, это зависит от планировщика потоков JVM. После того как поток получит CPU, будет вызван метод run(). Не нужно самому вызывать метод run() Thread. Затем, в зависимости от планирования CPU, поток будет переключаться между состояниями готовности, выполнения и блокировки, до тех пор, пока метод run() не завершится или поток не будет остановлен другим способом, после чего он перейдет в состояние dead.

Таким образом, принцип реализации повторного использования потоков должен заключаться в поддержании состояния жизни потока (готовность, выполнение или блокировка). Далее рассмотрим, как ThreadPoolExecutor реализует повторное использование потоков.

В ThreadPoolExecutor используется класс Worker для управления повторным использованием потоков. Давайте посмотрим на упрощенный код класса Worker, чтобы лучше понять это:

private final class Worker implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
while (task != null || (task = getTask()) != null){
task.run();
}
}

Worker является Runnable, у него есть один поток, который нужно запустить. При создании объекта Worker создается также объект Thread, который будет запущен, и сам Worker передается в TThread в качестве параметра. Таким образом, при вызове метода start() Thread,运行的实际上是Worker метод run(). Затем к runWorker(), где есть цикл while, который постоянно получает объект Runnable из getTask(), и выполняет их по порядку. Как же getTask() получает объект Runnable?

依旧是简化后的代码:

private Runnable getTask() {
 if(一些特殊情况) {
  return null;
 }
Runnable r = workQueue.take();
return r;
}

这个workQueue就是初始化ThreadPoolExecutor时存放任务的BlockingQueue队列,这个队列里的存放的都是将要执行的Runnable任务。因为BlockingQueue是个阻塞队列,BlockingQueue.take()得到如果是空,则进入等待状态直到BlockingQueue有新的对象被加入时唤醒阻塞的线程。所以一般情况Thread的run()方法就不会结束,而是不断执行从workQueue里的Runnable任务,这就达到了线程复用的原理了。

2.控制最大并发数

那Runnable是什么时候放入workQueue?Worker又是什么时候创建,Worker里的Thread的又是什么时候调用start()开启新线程来执行Worker的run()方法的呢?有上面的分析看出Worker里的runWorker()执行任务时是一个接一个,串行进行的,那并发是怎么体现的呢?

很容易想到是在execute(Runnable runnable)时会做上面的一些任务。看下execute里是怎么做的。

execute:

Упрощенный код

public void execute(Runnable command) {
 if (command == null)
  throw new NullPointerException();
int c = ctl.get();
// 当前线程数 < corePoolSize
if (workerCountOf(c) < corePoolSize) {
// Прямо запустить новый поток.
if (addWorker(command, true))
return;
c = ctl.get();
}
// Количество активных потоков >= corePoolSize
// runState равен RUNNING и очередь не заполнена
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// Проверяем снова, является ли состояние RUNNING
// В состоянии NONRUNNING удаляем задачу из workQueue и отказываем
if (!isRunning(recheck) && remove(command))
reject(command); // Отказ в соответствии с политикой ThreadPool
// Два случая:
// Отказ в состоянии NONRUNNING для новых задач
// Не удалось запустить новый поток, когда очередь заполнена (workCount > maximumPoolSize)
} else if (!addWorker(command, false))
reject(command);
}

addWorker:

Упрощенный код

private boolean addWorker(Runnable firstTask, boolean core) {
int wc = workerCountOf(c);
if (wc >= (core ? corePoolSize : maximumPoolSize)) {
return false;
}
w = new Worker(firstTask);
final Thread t = w.thread;
t.start();
}

Теперь посмотрим на добавление задачи в процессе работы ThreadPool, основываясь на коде:

* Если количество выполняющихся потоков меньше corePoolSize, то немедленно создают поток для выполнения этой задачи;  
* Если количество выполняющихся потоков больше или равно corePoolSize, то把这个 задачу ставят в очередь;
* Если в этот момент очередь заполнена и количество выполняющихся потоков меньше maximumPoolSize, то все равно нужно создать некритический поток для немедленного выполнения этой задачи;
* Если очередь заполнена и количество выполняющихся потоков больше или равно maximumPoolSize, то ThreadPool будет выбрасывать исключение RejectExecutionException.

Вот почему Android AsyncTask выбрасывает RejectExecutionException, когда параллельное выполнение выходит за пределы максимального количества задач, см. также интерпретацию исходного кода AsyncTask на основе последней версии и темные стороны AsyncTask.

Если создание нового потока через addWorker успешно, то через start() запускается новый поток, и firstTask выполняется как первая задача в run() этого Worker.

Хотя задачи каждого Worker обрабатываются последовательно, если создается несколько Worker, так как они делят один workQueue, они обрабатываются параллельно.

Таким образом, можно контролировать максимальное количество параллельных потоков, основываясь на corePoolSize и maximumPoolSize. Общий процесс можно представить на следующем рисунке.

Объяснение и схема могут помочь лучше понять этот процесс.

Если вы занимаетесь разработкой Android и хорошо знакомы с принципами работы Handler, то вам может показаться, что эта схема знакома, некоторые процессы и Handler, Looper, Message очень похожи. send(Message) Handler аналогичен execute(Runnable), Message-очередь, которую поддерживает Looper, аналогична BlockingQueue, только нужно самим поддерживать эту очередь через синхронизацию, функция loop() Looper повторно извлекает Message из очереди Message, и Worker, который постоянно извлекает Runnable из BlockingQueue, делает то же самое.

3. Управление потоками

с помощью线程-пул можно很好地 управлять повторным использованием потоков, контролем параллелизма и процессом разрушения, о повторном использовании потоков и контроле параллелизма уже было сказано, а процесс управления потоками уже включен в это, и он также легко понимается.

в ThreadPoolExecutor есть переменная AtomicInteger ctl. Через эту переменную сохраняются два содержимого:

количество всех потоков, состояние каждого потока, где низшие 29 бит хранят количество потоков, а высоко 3 бита хранят runState, чтобы получить различные значения, используется битовая арифметика.

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// получить состояние потока
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
// Получение количества Worker
private static int workerCountOf(int c) {
return c & CAPACITY;
}
// Определение того, находится ли поток в рабочем состоянии
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}

Здесь主要通过 shutdown и shutdownNow() анализируется процесс закрытия потока. Поток имеет пять состояний для управления добавлением и выполнением задач. Основные介绍了 следующие три:

Статус RUNNING: Поток正常运行, принимает новые задачи и обрабатывает задачи в очереди;

Статус SHUTDOWN: Новые задачи не принимаются, но задачи в очереди выполняются;

Статус STOP: Новые задачи не принимаются, задачи в очереди не обрабатываются; метод shutdown устанавливает runState в SHUTDOWN, останавливает все空闲 потоки, а потоки, которые работают, не受到影响, поэтому задачи в очереди будут выполнены.

Метод shutdownNow устанавливает runState в STOP. Разница с методом shutdown, этот метод останавливает все потоки, поэтому задачи в очереди также не будут выполняться.

Резюме
Анализируя исходный код ThreadPoolExecutor, можно в общем понять процесс создания потока, добавления задач и их выполнения,熟悉这些过程, использование потока будет более легким.

И что вы узнали из этого, например, о контроле параллельного выполнения и использовании модели производителя-потребителя для обработки задач, это будет очень полезно для понимания или решения других相关问题. Например, в Android механизме Handler, а в очереди Messager Looper можно использовать BlookQueue для обработки, это и есть результат чтения исходного кода.

Вот и заканчивается сбор материалов о Java потоковом池е, продолжим пополнять相关信息, спасибо всем за поддержку нашего сайта!

Вам может понравиться