細談 Java 線程池
1. 線程池相關基本概念
1) 任務(Task):任務是線程池中要執行的工作單元。任務可以是實現了 Runnable 接口或 Callable 接口的對象。Runnable 任務沒有返回值,而 Callable 任務可以返回一個結果。
2) 線程池管理器(ThreadPool Manager):線程池管理器是用于創建和管理線程池的組件。它負責創建線程池,控制線程的創建和銷毀,并調度任務的執行。
3) 工作線程(Worker Threads):工作線程是線程池中實際執行任務的線程。線程池中可以有多個工作線程,它們并行地從任務隊列中獲取任務并執行。
4) 任務隊列(Task Queue):任務隊列是用于存儲待執行的任務的數據結構。當線程池中的工作線程空閑時,它們會從任務隊列中獲取任務并執行。
5) 拒絕策略(Rejection Policy):拒絕策略定義了當任務隊列已滿且無法繼續接收新的任務時,線程池應該如何處理新的任務。常見的拒絕策略包括丟棄任務、丟棄最早的任務、拋出異常等。
6) 線程池大?。≒ool Size):線程池大小指定了線程池中工作線程的數量。線程池的大小可以是固定的,也可以是根據需要自動調整的。
7) 核心線程數(Core Pool Size):核心線程數是線程池中保持活動狀態的最小工作線程數量。即使線程處于空閑狀態,核心線程也不會被銷毀。
8) 最大線程數(Maximum Pool Size):最大線程數是線程池中允許的最大工作線程數量。當任務隊列已滿且活動線程數達到最大線程數時,線程池可能會創建新的線程來執行任務。
9) 閑置線程回收時間(Keep-Alive Time):閑置線程回收時間是指當線程池中的線程數超過核心線程數,并且空閑一段時間后,多余的線程會被銷毀。
10) 線程工廠(Thread Factory):線程工廠用于創建線程池中的工作線程。它負責創建線程,并可以自定義線程的屬性和命名方式。
線程池的設計目的是提高系統的性能和資源利用率。通過重用線程和控制并發線程的數量,線程池可以減少線程創建和銷毀的開銷,避免資源耗盡,并提供更好的任務調度和執行控制。
在使用線程池時,我們可以根據任務的類型和系統的需求來選擇適當的線程池大小、拒絕策略和其他參數,以實現最佳的性能和可擴展性。
2. 線程池主要處理流程
1) 判斷核心線程池是否已滿,如果不是,則創建線程執行任務
2) 如果核心線程池滿了,判斷隊列是否滿了,如果隊列沒滿,將任務放在隊列中
3) 如果隊列滿了,則判斷線程池是否已滿,如果沒滿,創建線程執行任務
4) 如果線程池也滿了,則按照拒絕策略對任務進行處理
更進一步的里層核心類處理流程:
當我們使用 Java 中的 ThreadPoolExecutor 類來創建線程池時,其處理流程如下:
1) 任務提交:外部調用者通過調用 ThreadPoolExecutor 的 execute() 或 submit() 方法將任務提交給線程池。
2) 任務接收:ThreadPoolExecutor 接收到任務后,首先檢查線程池的狀態。如果線程池已經關閉,就不再接收新的任務。
3) 任務排隊:線程池將接收到的任務放入內部的任務隊列中等待執行。任務隊列可以是有界隊列(如 ArrayBlockingQueue)或無界隊列(如 LinkedBlockingQueue 或 SynchronousQueue)。
4) 工作線程獲取任務:線程池中的工作線程從任務隊列中獲取任務。如果任務隊列為空,工作線程可能會阻塞等待新任務的到來,或者在等待一定時間后退出。
5) 任務執行:工作線程獲取到任務后,調用任務對象的 run() 方法執行任務的具體邏輯。
6) 任務完成:任務執行完成后,可以返回一個結果(對于 Callable 任務),或者不返回任何結果(對于 Runnable 任務)。
7) 任務狀態更新:ThreadPoolExecutor 會更新任務的狀態,包括任務的執行進度、執行結果等信息。
8) 結果返回:如果任務是 Callable 任務,ThreadPoolExecutor 會將任務的執行結果封裝在 Future 對象中返回給調用者。調用者可以通過 Future 對象獲取任務的執行結果。
9) 繼續處理下一個任務:工作線程完成當前任務后,會繼續從任務隊列中獲取下一個任務進行處理。
10) 線程回收:如果線程池中的線程處于空閑狀態,并且空閑時間超過一定閾值,ThreadPoolExecutor 可能會回收這些空閑線程,以避免資源的浪費。
11) 異常處理:ThreadPoolExecutor 會捕獲任務執行過程中的異常,并根據預定義的異常處理策略進行處理,比如記錄日志、統計異常次數等。
12) 線程池關閉:當不再需要線程池時,調用 ThreadPoolExecutor 的 shutdown() 或 shutdownNow() 方法來停止線程池的運行。關閉線程池的過程包括不再接收新任務、等待已提交的任務執行完成、銷毀工作線程等操作。
ThreadPoolExecutor 是 Java 中用于創建和管理線程池的核心類,通過其靈活的配置參數,可以實現對線程池的各種行為和特性進行定制。這個類提供了豐富的方法和選項,用于控制線程池的大小、任務隊列類型、拒絕策略、線程工廠等,以滿足不同場景下的需求。
3. 線程池實現的主要步驟
3.1 創建線程池
在 Java 中,可以使用 Executors 類提供的靜態方法創建線程池。以下是幾種常見的創建線程池的方法:
3.1.1 創建固定大小的線程池
固定大小的線程池將在初始化時創建指定數量的線程,并且不會增加或減少線程的數量。
ExecutorService executorService = Executors.newFixedThreadPool(10);//創建一個固定大小為 10 的線程池
3.1.2 創建單個線程的線程池
單個線程的線程池只會創建一個工作線程來執行任務。
ExecutorService executorService = Executors.newSingleThreadExecutor();
3.1.3 創建可根據需要自動調整大小的線程池
可根據需要自動調整大小的線程池將根據任務的數量動態地增加或減少線程的數量。
ExecutorService executorService = Executors.newCachedThreadPool();
3.1.4 手動按自己需求創建線程池
在 Java 中,可以手動創建線程池,而不僅僅依賴于內置的線程池實現。手動創建線程池的主要原因是為了更好地控制線程池的行為、特性和參數配置,以滿足特定的需求,比如特定的任務隊列類型需求,特定的拒絕策略需求
根據ThreadPoolExecutor構造方法可知,需要準備以下參數:
1) 核心線程數(corePoolSize):核心線程數是線程池中保持活動狀態的線程數。即使這些線程處于空閑狀態,它們也不會被回收。線程池會根據任務的數量和任務隊列的狀態來動態調整線程池中的線程數量。
2) 最大線程數(maximumPoolSize):最大線程數指定了線程池中允許存在的最大線程數量。當任務數量超過核心線程數并且任務隊列已滿時,線程池會創建新的線程來處理任務,直到達到最大線程數。如果達到最大線程數后仍有任務到來,采用拒絕策略處理新任務。
3) 空閑線程存活時間(keepAliveTime):當線程池中的線程數超過核心線程數,并且處于空閑狀態時,空閑線程存活時間指定了它們在沒有接收到新任務時的存活時間。超過存活時間后,空閑線程將被回收,直到線程池中的線程數不超過核心線程數。
4) 時間單位(unit):用于指定時間參數的單位,可以是秒、毫秒、微秒等。
5) 任務隊列(workQueue):任務隊列用于存儲等待執行的任務??梢赃x擇合適的隊列類型,如有界隊列(如 ArrayBlockingQueue)或無界隊列(如 LinkedBlockingQueue)。
6) 線程工廠(threadFactory):線程工廠用于創建線程對象??梢宰远x線程工廠類,實現創建線程的邏輯。
7) 拒絕策略(rejectedExecutionHandler):當任務隊列已滿并且線程池中的線程數達到最大線程數時,拒絕策略指定了如何處理新的任務??梢赃x擇預定義的拒絕策略,如拋出異常、丟棄任務等,或者自定義拒絕策略。
import java.util.concurrent.*; // 自定義拒絕策略類 class CustomRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { // 自定義拒絕策略的邏輯 System.out.println("Task Rejected: " + runnable.toString()); // 可根據需求進行不同的處理方式,如拋出異常、丟棄任務、調用者執行等 } } // 自定義線程工廠類 class CustomThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable runnable) { // 自定義線程工廠的邏輯 Thread thread = new Thread(runnable); // 可以進行一些線程屬性的配置,如設置線程名稱、優先級等 thread.setName("CustomThread"); thread.setPriority(Thread.NORM_PRIORITY); return thread; } } public class ManualThreadPoolCreationExample { public static void main(String[] args) { // 創建自定義的拒絕策略實例 RejectedExecutionHandler rejectionHandler = new CustomRejectedExecutionHandler(); // 創建自定義的線程工廠實例 ThreadFactory threadFactory = new CustomThreadFactory(); // 創建任務隊列(這里使用無界隊列) BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(); // 創建線程池并進行手動配置 int corePoolSize = 10; int maxPoolSize = 20; long keepAliveTime = 60; TimeUnit timeUnit = TimeUnit.SECONDS; ThreadPoolExecutor threadPool = new ThreadPoolExecutor( corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue, threadFactory, rejectionHandler ); } }
3.2 提交任務給線程池執行
創建線程池之后,可以將任務提交給線程池執行。任務可以是實現了 Runnable 接口的對象,也可以是實現了 Callable 接口的對象。
3.2.1 提交 Runnable 任務
executorService.execute(new Runnable() { @Override public void run() { // 任務邏輯 } }); 或者使用 Lambda 表達式: executorService.execute(() -> { // 任務邏輯 });
3.2.2 提交 Callable 任務
Callable 任務可以返回一個結果
Future<SomeResult> future = executorService.submit(new Callable<SomeResult>() { @Override public SomeResult call() throws Exception { // 任務邏輯 return someResult; } }); 或者使用 Lambda 表達式: Future<SomeResult> future = executorService.submit(() -> { // 任務邏輯 return someResult; });
3.3 關閉線程池
當不再需要線程池時,應該顯式地關閉它,以釋放資源。關閉線程池兩種方式
executorService.shutdown(); // 不再接受新的任務,但會等待已提交的任務執行完成。 executorService.shutdownNow(); //希望立即關閉線程池,并嘗試中斷正在執行的任務
3.4 處理任務執行結果
當提交任務給線程池執行后,可以通過 Future 對象來獲取任務的執行結果。
Future<SomeResult> future = executorService.submit(...); try { SomeResult result = future.get(); // 處理結果 } catch (InterruptedException e) { // 處理中斷異常 } catch (ExecutionException e) { // 處理執行異常 }
使用 future.get() 方法可以阻塞當前線程,直到任務執行完成并返回結果。get() 方法可能會拋出 InterruptedException 和 ExecutionException 異常,需要進行適當的異常處理。