ThreadPoolExecutorの調査
背景
Javaでスレッドプールを扱う際に、いつもお世話になっているThreadPoolExecutor
に関して、Executors
クラスのnewXyzThreadPool
メソッドを用いて生成するだけで、その中身を意識したことはありませんでした。今回はそのThreadPoolExecutor
関して、調べた結果をまとめます。
調査結果
ThreadPoolExecutor
の構成
下図の通りタスクが格納されるTask Queueと、スレッドが格納されるThread Poolで構成されています。
Task QueueとThread Poolを用いることで、以下が実現できます。
- タスク数に対してスレッド数が少ない場合(リクエスト過多)でも、タスクがTask Queueにバッファリングされる為、(Task Queueの上限までは)タスクの依頼が可能になる。
- Thread Poolのサイズを正しく調整することで、スレッド数が少なすぎる為にプロセッサが遊んでいる状態や、スレッド数が多すぎる為にリソースが不足する状態を防ぐことができる。
ThreadPoolExecutor
のタスク依頼時の挙動
execute
メソッドを呼び出し、タスクを依頼した際の挙動に関して、Thread Poolの容量に関連する以下変数とスレッド数の関係によって異なります。
- corePoolSize: アイドル状態でもプール内に維持されるスレッド数
- maxPoolSize: プール内の最大スレッド数
具体的には、下図の3パターンに分類されます。
maxPoolSize = スレッド数
の状態でタスクが拒否された場合、デフォルトでは、RejectedExecutionException
がスローされます。
Task Queueに使用するキューの種類
ThreadPoolExecutor
のコンストラクタを呼び出す際に、引数としてTask Queueを渡します。Task Queueに使用するキューの種類を以下にまとめます。
まず、下図の通りキューが要素を保持するものと、しないもので大別されます。次に、キューが要素を保持するものに関して、キューの容量を明示的に設定するものと、しないものに分類されます。
結果として、キューは3種類に分類され、ぞれぞれ以下の違いがあります。
キューが要素を保持する。
キューの容量を明示的に設定する。
ArrayBlockingQueue
などが該当します。上限を明示的に設定する為、リクエスト過多によるリソース不足を防ぐことができます。その一方で、キューの上限としてどの様な値が妥当か、検証を行う必要があります。
キューの容量を明示的に設定しない。
LinkedBlockingQueue
などが該当します。リソースが許す限りTask Queueにタスクの格納が可能である為、スレッド数はcorePoolSize
を超えません*1。一時的にリクエスト過多な状態を処理するには便利ですが、恒常的にリクエスト過多な状態が続くとキューの容量が増え続けてしまいます。
Executors
クラスのnewFixedThreadPool
, newSingleThreadExecutor
メソッドを用いて、ThreadPoolExecutor
を生成した際のTask Queueはこちらになります。
キューが要素を保持しない。
SynchronousQueue
が該当します。Task Queueへタスクの格納を行わず、直接Thread Poolにタスクを渡します。その為、依頼できるタスクの上限は、maxPoolSize
までになります。
Executors
クラスのnewCachedThreadPool
メソッドを用いて、ThreadPoolExecutor
を生成した際のTask Queueはこちらになります。
ThreadPoolExecutor
の終了
ThreadPoolExecutor
の終了に関して、後述するshutdown
メソッドとshutdownNow
メソッドが用意されています。全てのタスクが処理された後、ThreadPoolExecutor
は、終了スタータスになります。
shutdown
メソッド
shutdown
メソッドは、以下の通り順序正しくタスクを処理して終了します。
- メソッド呼び出し時に実行中だったタスク
タスクが完了するまで待つ。 - メソッド呼び出し時に待機中だったタスク
タスクが完了するまで待つ。 - メソッド呼び出し後に依頼されたタスク
タスクは拒否され、デフォルトではRejectedExecutionException
がスローさる。
shutdownNow
メソッド
shutdownNow
メソッドは、以下の通りタスクを処理して即時終了を試みます。
- メソッド呼び出し時に実行中だったタスク
interrupt
メソッドを呼び出して、タスクのキャンセルを試みる。 - メソッド呼び出し時に待機中だったタスク
タスクを実行せず、戻り値で該当するタスクのリストを返却する。 - メソッド呼び出し後に依頼されたタスク
タスクは拒否され、デフォルトではRejectedExecutionException
がスローされる。
*1:合わせて、「タスク追加時の挙動」をご覧下さい。
Javaにおけるスレッドの状態待ち
背景
マルチスレッドの処理において、排他制御の為にロックを取得し、期待する状態を待って後続処理を続行する。この様なよくある処理に関して、詳細を以下にまとめます。
実装例
早速ですが、スレッドの状態待ちにの実装例です。概要は以下の通りです。
ready
,count
: 各スレッドで共有する値です。- タスク1:
ready
ステータスがtrue
だった際に、count
に10加算する。 - タスク2:
ready
ステータスをtrue
に変更する。
public class ThreadSleepSample { // スレッド間で共有する値 private static boolean ready = false; // スレッド間で共有する値 private static int count = 0; // 各タスク内で呼び出しを行うインスタンスを生成する。 private static ThreadSleepSample threadSleepSample = new ThreadSleepSample(); // タスク1 private static Callable<Void> addCountTask = () -> { threadSleepSample.addCount(); return null; }; // タスク2 private static Callable<Void> changeReadyTask = () -> { TimeUnit.SECONDS.sleep(1); threadSleepSample.changeReady(); return null; }; public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(3); executorService.submit(addCountTask); executorService.submit(addCountTask); executorService.submit(changeReadyTask); executorService.shutdown(); } // モニタロックを取得する。(*1) private synchronized void addCount() throws InterruptedException { while (!ready) { System.out.println(Thread.currentThread() + " is waiting."); // 状態待ちを行う。(*2) wait(); System.out.println(Thread.currentThread() + " is not waiting."); } count += 10; System.out.println(count); } // モニタロックを取得する。(*1) private synchronized void changeReady() { ready = true; // 状態変更通知を行う。(*3) notifyAll(); } }
実行結果は以下の通りです。
Thread[pool-1-thread-2,5,main] is waiting. Thread[pool-1-thread-1,5,main] is waiting. Thread[pool-1-thread-2,5,main] is not waiting. 10 Thread[pool-1-thread-1,5,main] is not waiting. 20
解説
モニタロックの取得(*1)
モニタロックとは
モニタロックとは、各インスタンスに1つ用意されているロックのことです。排他制御にモニタロックを用いることで、あるインスタンスにおける特定処理の実行を、単一スレッドのみに制限しています。
モニタロックの取得方法
以下の通り記載することで、該当のメソッドを実行する前に、モニタロックの取得を試みます。
アクセス修飾子 synchronized 戻り値の型 メソッド名(...) {
実装例においては、threadSleepSample
インスタンスのaddCount
メソッド, changeReady
メソッドの実行は、モニタロックを取得した単一スレッドのみが可能です。
状態待ち(*2)
wait
メソッドを呼び出し、期待する状態待ちを行います。
sleep
メソッドでなく、wait
メソッドを用いる理由
処理を停止するなら、sleep
メソッドでも可能と考えるかもしれません。
ですが、sleep
メソッドには以下特徴があります。
- モニタロックをスレッドが保持し続ける。(状態更新にモニタロック取得が必須の場合、デッドロックになる。)
- 期待した状態が満たされたか否かに関わらず、指定した時間停止する。
一方で、wait
メソッドには以下特徴があります。
- モニタロックをスレッドが手放す。
- 期待した状態が満たされた可能性の通知(後述の状態変更通知を御覧下さい。)を受け取るまで、停止する。
上記をまとめると、両者の使い分けは以下の通りです。
sleep
メソッド: 1つのスレッドにおける、処理間隔の調整に用いる。wait
メソッド: マルチスレッドにおける、状態待ちに用いる。
wait
をwhile
文で囲う理由
wait
で停止したスレッドは、後述の状態変更通知を受けて処理を再開します。
しかし、ここで問題になるのが、状態変更通知を受けても目的の状態が満たされたとは限らないということです。
というのも、状態変更通知はどの変数がどの様な状態になったか、という情報を含みません。例えば、異なる変数の状態待ちを行うスレッドが複数ある場合、その何れかが状態変更通知を受け取っても、それが自身が待ちを行っている変数に関する通知とは限らないわけです。
その為、wait
メソッドをwhile
文で囲うことで、処理が再開した後に再度期待する状態かチェックし、そうでない場合はもう一度待ちを行います。
状態変更通知(*3)
notifyAll
メソッドを呼び出し、状態待ちを行っているスレッドに、期待した状態が満たされた可能性を通知します。
notify
メソッドでなく、notifyAll
メソッドである理由
以下にnotify
メソッドとnotifyAll
メソッドの違いをまとめました。
説明に当たり、以下用語を用います。
- ウェイトセット:
wait
メソッドを呼び出し、状態待ちを行うスレッドの集合です。 - エントリセット: 状態変更通知を受けて、モニタロックの取得を試みるスレッドの集合です。
各メソッドの挙動の違い
notify
メソッド
下図の通りの挙動になります。notifyAll
メソッドとの違いは、1つのスレッドがウェイトセットからエントリセットに移動する点です。
notifyAll
メソッド
下図の通りの挙動になります。notify
メソッドとの違いは、全てのスレッドがウェイトセットからエントリセットに移動する点です。
notify
メソッドの問題点
状態変更通知にnotify
メソッドを利用した場合、1つのスレッドのみがウェイトセットからエントリセットに移動する為、以下の問題点があります。
- 移動したスレッドが該当の通知を待っていたスレッドではなかった場合、その通知を待っていた他のスレッドが起動する機会を永遠に失う可能性がある。
- 選ばれなかった他のスレッドにモニタロック取得の機会が与えられず、スレッド毎の処理時間のバラつきが大きくなる。
スケーラブルI/O実現方法の調査
背景
スケーラブルなI/Oに関して、以下書籍で勉強しています。
- 作者:エズモンド・ピット
- 発売日: 2007/04/28
- メディア: 大型本
スケーラブルなI/Oの実現方法に関して、こちらでは以下の通り記載があります*1。
I/Oのノンブロッキングモードと多重化によって、同時接続数が数千にも達する大規模なWebサーバとアプリケーションサーバの開発が可能になり、複数のプロセッサを有効利用できるようになります。...個々の接続に専用のスレッドを割り当てる必要がないからです。
ノンブロッキング
と多重化
がキーワードになる様です。これらに関して、調査した結果を以下にまとめます。
ノンブロッキング
とは
ノンブロッキング
に関して、カーネルがI/O処理の準備中*2だった際に、アプリケーションが準備完了を待たないことを表します。 合わせて、こちらの記事もご覧ください。
I/Oの多重化
とは
1つのスレッドで、複数の接続を管理する仕組みのこと。具体的には、状態*3が変更された接続をスレッドに通知することで実現する。つまり、イベント駆動での処理が可能になります。
疑問点
I/Oの多重化を用いれば、I/O処理の準備完了が通知される為、準備完了を待たないノンブロッキングI/Oでなく、準備完了を待つブロッキングI/Oで良いのではと疑問に思いました。
上記に関して調べたところ、ある接続を複数のスレッドが利用する場合、以下の様に状態が変更される可能性がある様です。
- スレッド1が、ある接続のI/O処理の準備完了を通知される。
- スレッド2が、上記接続を利用したI/O処理を行う。
- スレッド1が、I/O処理を試みたところ、I/O処理の準備中であり、待ちが発生する。
その為、書籍ではスケーラブルI/Oとして、I/Oの多重化とノンブロッキングI/Oが紹介されています。
まとめ
ブロッキングI/Oを用いると、I/O処理の準備完了の待ちが発生する為、1つのスレッドが1つの接続の管理に掛かりきりになります。
I/Oの多重化とノンブロッキングI/Oを用いれば、I/O準備完了の通知を受け、仮に実行前にその状態が変わっても待ちが発生しない為、1つのスレッドで複数の接続の管理が可能になります。
結果として、よりスケーラブルなI/Oが実現できるということですね。
Clean Architectureの内容を元に原則の分類
背景
プログラミングを行う中でいくつか原則を学ぶ機会があったのですが、正直多すぎて覚えられない。
一度体系的にまとめて整理しないとダメだなと思っていたら、以下の内容が求めていたものだった為、備忘も兼ねてメモしておきます。
Clean Architecture 達人に学ぶソフトウェアの構造と設計 (アスキードワンゴ)
- 作者:Robert C.Martin,角 征典,高木 正弘
- 発売日: 2018/08/01
- メディア: Kindle版
原則の分類
本書より
Clean Architectureのp.78に以下の通り記載があります。
...SOLID原則がモジュールレベルの開発に使われるものであることを意図している。コードレベルよりも上に適用するものであり、モジュールやコンポーネントで使うソフトウェア構造の定義に役立つ。
...SOLID原則の説明が終わったら、次はそれに対応するコンポーネントの原則を説明する。その後に上位レベルのアーキテクチャの原則の話に移る。
これを読むと、原則が以下の通り分類されると読み取れます。
各レベルの考察
コードレベル
コードレベルの原則に関して、本書の中では特に言及されていませんでした。しかし、モジュールレベルよりも粒度が細かいレベルであることを考えると、処理単体に関する原則が、このレベルに当たると思われます。
モジュールレベル
そもそも、モジュールとは何かに関しては、本書のp.77の以下記載より、SOLIDの原則(モジュールレベル)の対象がクラスであることから、データと処理のまとまりをモジュールと呼んでいると判断しました。
...SOLID原則は、関数やデータ構造をどの様に組み込むのか、そしてクラスの相互接続をどのようにするのかといったことを教えてくれる。
モジュールにどのデータと処理を含めるかや、モジュール間の関係に関する原則が、このレベルに当たるかと思われます。
コンポーネントレベル
本書のp.109に以下の通り記載があります。
コンポーネントとは、デプロイの単位のことである。システムの一部としてデプロイできる、最小限のまとまりを指す。
コンポーネントにどのモジュールを含めるか*1や、コンポーネント間の関係*2に関する原則が、このレベルに当たるかと思われます。
アーキテクチャレベル
本書のp.148に以下の通り記載があります。
ソフトウェアシステムのアーキテクチャは、それを構築した人がシステムに与えた「形状」である。その形状を生み出すためには、システムをコンポーネントに分割し、コンポーネントをうまく配置して、コンポーネントが相互に通信できるようにする必要がある。
コンポーネントの分割基準や、コンポーネントの実行環境への配置方法*3に関する原則が、このレベルに当たるかと思われます。
コンポーネントレベルとの違い
コンポーネントの分割基準に関して、コンポーネントレベルの原則との違いが分からず少々悩んだのですが、以下の違いがあると気が付きました。
MyBatis Dynamic SQLを使ってみた。
背景
Java + MyBatisを使って開発を行っているのですが、コード上でSQLの組み立てを行う方法が無いか調べたところ、MyBatis Dynamic SQL
を使うと実現できる様です。
以下のQuick Startを元に試してみたので、ここではその際のメモを残そうと思います。
MyBatis Dynamic SQL – MyBatis Dynamic SQL Quick Start
ハンズオン
用意するクラス, インターフェース
- Supportクラス(テーブル, カラムの定義を行う。)
- Mapperインタフェース
- Mapperを呼び出すクラス
- Entityクラス
実装例
コード全体は以下より取得可能です。
GitHub - U0326/code-example-mybatis_dynamic_sql
事前準備
CREATE TABLE friends ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(10) );
上記の通り作成したテーブルを利用します。
Supportクラス
public class FriendDynamicSqlSupport { public static final Friend friend = new Friend(); // 1 public static final SqlColumn<Integer> id = friend.id; // 1 public static final SqlColumn<String> mame = friend.name; // 1 public static final class Friend extends SqlTable { // 2 public final SqlColumn<Integer> id = column("id", JDBCType.INTEGER); public final SqlColumn<String> name = column("name", JDBCType.VARCHAR); public Friend() { super("friends"); //3 } } }
- 2で作成する内部クラスとその変数をstatic参照可能にする為に、
public static
な変数を定義する。 SqlTable
を継承した内部クラスを作成し、カラム定義を行う。[スキーマ名.]テーブル名
を渡してSqlTable
のコンストラクタを呼び出す。
Mapperインターフェース
上記で作成したSupportクラスをstatic importし、以下の通りCRUD操作を定義します。
Create
@Mapper public interface FriendMapper { // Create @InsertProvider(type = SqlProviderAdapter.class, method = "insert") // 1 int insert(InsertStatementProvider<FriendEntity> insertStatement); // 2 default int insert(FriendEntity entity) { // 3 return MyBatis3Utils.insert(this::insert, entity, friend, c -> c.map(mame).toProperty("name")); // 4 } ... }
- Createを行うメソッドであることを、アノテーションを用いて宣言する。
InsertStatementProvider<Entityクラス>
を引数に取るメソッドを定義する。MyBatis3Utils
を用いたdefaultメソッドを定義し、呼び元でEntityクラスをのみを引数にしたCreateを可能にする。- 以下の通り引数を渡し、
MyBatis3Utils.insert
を呼び出す。- 第3引数: Supportクラス内の
SqlTable
を継承したクラスのstatic参照 - 第4引数: Supportクラス内の
SqlColumn
のstatic参照と、Entityクラスの変数名を以下の通り紐付けるラムダ式
c -> c.map(SqlColumnのstatic参照).toProperty("Entityクラスの変数名")
- 第3引数: Supportクラス内の
Read
@Mapper public interface FriendMapper { ... // Read @SelectProvider(type = SqlProviderAdapter.class, method = "select") // 1 @Results( // 2 { @Result(column = "id", jdbcType = JdbcType.INTEGER, property = "id", id = true), @Result(column = "name", jdbcType = JdbcType.VARCHAR, property = "name"), }) Optional<FriendEntity> selectOne(SelectStatementProvider selectStatement); // 3 default Optional<FriendEntity> selectOne(SelectDSLCompleter completer) { // 4 return MyBatis3Utils.selectOne(this::selectOne, selectList, friend, completer); // 5 } BasicColumn[] selectList = BasicColumn.columnList(id, name); // 6 ... }
- Readを行うメソッドであることを、アノテーションを用いて宣言する。
- カラム名とEntityクラスの変数名を以下の通り紐付ける。
@Result(column = "カラム名", jdbcType = カラムの型, property = "Entityクラスの変数名")
SelectStatementProvider
を引数に取るメソッドを定義する。MyBatis3Utils
を用いたdefaultメソッドを定義し、呼び元でSELECT対象のカラムを指定せずReadを可能にする。- 以下の通り引数を渡し、
MyBatis3Utils.selectOne
を呼び出す。- 第2引数: 6で定義するSELECT対象のカラムの配列
- 第3引数: Supportクラス内のSqlTableを継承したクラスのstatic参照
- Supportクラス内の
SqlColumn
のstatic参照を用いて、SELECT対象のカラムの配列を定義する。
Update
@Mapper public interface FriendMapper { ... // Update @UpdateProvider(type = SqlProviderAdapter.class, method = "update") // 1 int update(UpdateStatementProvider updateStatement); // 2 default int update(UpdateDSLCompleter completer) { // 3 return MyBatis3Utils.update(this::update, friend, completer); // 4 } ... }
- Updateを行うメソッドであることを、アノテーションを用いて宣言する。
UpdateStatementProvider
を引数に取るメソッドを定義する。MyBatis3Utils
を用いたdefaultメソッドを定義し、呼び元でテーブル名を指定せずにUpdateを可能にする。- 以下の通り引数を渡し、
MyBatis3Utils.update
を呼び出す。- 第2引数: Supportクラス内のSqlTableを継承したクラスのstatic参照
Delete
@Mapper public interface FriendMapper { ... // Delete @DeleteProvider(type = SqlProviderAdapter.class, method = "delete") // 1 int delete(DeleteStatementProvider deleteStatement); // 2 default int delete(DeleteDSLCompleter completer) { // 3 return MyBatis3Utils.deleteFrom(this::delete, friend, completer); // 4 } ... }
- Deleteを行うメソッドであることを、アノテーションを用いて宣言する。
DeleteStatementProvider
を引数に取るメソッドを定義する。MyBatis3Utils
を用いたdefaultメソッドを定義し、呼び元でテーブル名を指定せずにDeleteを可能にする。- 以下の通り引数を渡し、
MyBatis3Utils.deleteFrom
を呼び出す。- 第2引数: Supportクラス内のSqlTableを継承したクラスのstatic参照
Mapperを呼び出すクラス
Supportクラスをstatic importし、以下の通りCRUD操作を呼び出します。
public class FriendMapperCaller { @Autowired private FriendMapper friendMapper; public void execute() { // Create FriendEntity newFriend = FriendEntity.builder().name("Jon").build(); friendMapper.insert(newFriend); // Read friendMapper.selectOne(c -> c.where(id, isEqualTo(1))) // Update friendMapper.update(c -> c.set(name).equalTo("Bob").where(id, isEqualTo(1))); // Delete friendMapper.delete(c -> c.where(id, isEqualTo(1))); } }