Anarchy In the 1K

ThreadPoolExecutorの調査

背景

 Javaでスレッドプールを扱う際に、いつもお世話になっているThreadPoolExecutorに関して、ExecutorsクラスのnewXyzThreadPoolメソッドを用いて生成するだけで、その中身を意識したことはありませんでした。今回はそのThreadPoolExecutor関して、調べた結果をまとめます。

調査結果

ThreadPoolExecutorの構成

 下図の通りタスクが格納されるTask Queueと、スレッドが格納されるThread Poolで構成されています。

f:id:fujiU:20200204171514p:plain

Task QueueとThread Poolを用いることで、以下が実現できます。

  • タスク数に対してスレッド数が少ない場合(リクエスト過多)でも、タスクがTask Queueにバッファリングされる為、(Task Queueの上限までは)タスクの依頼が可能になる。
  • Thread Poolのサイズを正しく調整することで、スレッド数が少なすぎる為にプロセッサが遊んでいる状態や、スレッド数が多すぎる為にリソースが不足する状態を防ぐことができる。

ThreadPoolExecutorのタスク依頼時の挙動

 executeメソッドを呼び出し、タスクを依頼した際の挙動に関して、Thread Poolの容量に関連する以下変数とスレッド数の関係によって異なります。

  • corePoolSize: アイドル状態でもプール内に維持されるスレッド数
  • maxPoolSize: プール内の最大スレッド数

具体的には、下図の3パターンに分類されます。 f:id:fujiU:20200207165802p:plain

maxPoolSize = スレッド数の状態でタスクが拒否された場合、デフォルトでは、RejectedExecutionExceptionがスローされます。

Task Queueに使用するキューの種類

 ThreadPoolExecutorのコンストラクタを呼び出す際に、引数としてTask Queueを渡します。Task Queueに使用するキューの種類を以下にまとめます。

 まず、下図の通りキューが要素を保持するものと、しないもので大別されます。次に、キューが要素を保持するものに関して、キューの容量を明示的に設定するものと、しないものに分類されます。 f:id:fujiU:20200207182814p:plain

結果として、キューは3種類に分類され、ぞれぞれ以下の違いがあります。

キューが要素を保持する。

キューの容量を明示的に設定する。

 ArrayBlockingQueueなどが該当します。上限を明示的に設定する為、リクエスト過多によるリソース不足を防ぐことができます。その一方で、キューの上限としてどの様な値が妥当か、検証を行う必要があります。

キューの容量を明示的に設定しない。

 LinkedBlockingQueueなどが該当します。リソースが許す限りTask Queueにタスクの格納が可能である為、スレッド数はcorePoolSizeを超えません*1。一時的にリクエスト過多な状態を処理するには便利ですが、恒常的にリクエスト過多な状態が続くとキューの容量が増え続けてしまいます。

 ExecutorsクラスのnewFixedThreadPool, newSingleThreadExecutorメソッドを用いて、ThreadPoolExecutorを生成した際のTask Queueはこちらになります。

キューが要素を保持しない。

 SynchronousQueueが該当します。Task Queueへタスクの格納を行わず、直接Thread Poolにタスクを渡します。その為、依頼できるタスクの上限は、maxPoolSizeまでになります。

 ExecutorsクラスのnewCachedThreadPoolメソッドを用いて、ThreadPoolExecutorを生成した際のTask Queueはこちらになります。

ThreadPoolExecutorの終了

 ThreadPoolExecutorの終了に関して、後述するshutdownメソッドとshutdownNowメソッドが用意されています。全てのタスクが処理された後、ThreadPoolExecutorは、終了スタータスになります。

shutdownメソッド

 shutdownメソッドは、以下の通り順序正しくタスクを処理して終了します。

  • メソッド呼び出し時に実行中だったタスク
    タスクが完了するまで待つ。
  • メソッド呼び出し時に待機中だったタスク
    タスクが完了するまで待つ。
  • メソッド呼び出し後に依頼されたタスク
    タスクは拒否され、デフォルトではRejectedExecutionExceptionがスローさる。

shutdownNowメソッド

 shutdownNowメソッドは、以下の通りタスクを処理して即時終了を試みます。

  • メソッド呼び出し時に実行中だったタスク
    interruptメソッドを呼び出して、タスクのキャンセルを試みる。
  • メソッド呼び出し時に待機中だったタスク
    タスクを実行せず、戻り値で該当するタスクのリストを返却する。
  • メソッド呼び出し後に依頼されたタスク
    タスクは拒否され、デフォルトではRejectedExecutionExceptionがスローされる。

*1:合わせて、「タスク追加時の挙動」をご覧下さい。

Javaにおけるスレッドの状態待ち

背景

マルチスレッドの処理において、排他制御の為にロックを取得し、期待する状態を待って後続処理を続行する。この様なよくある処理に関して、詳細を以下にまとめます。

実装例

早速ですが、スレッドの状態待ちにの実装例です。概要は以下の通りです。

  • ready, count: 各スレッドで共有する値です。
  • タスク1: readyステータスがtrueだった際に、countに10加算する。
  • タスク2: readyステータスをtrueに変更する。
public class ThreadSleepSample {
  // スレッド間で共有する値
  private static boolean ready = false;
  // スレッド間で共有する値
  private static int count = 0;
  // 各タスク内で呼び出しを行うインスタンスを生成する。
  private static ThreadSleepSample threadSleepSample = new ThreadSleepSample();
  // タスク1
  private static Callable<Void> addCountTask =
      () -> {
        threadSleepSample.addCount();
        return null;
      };
  // タスク2
  private static Callable<Void> changeReadyTask =
      () -> {
        TimeUnit.SECONDS.sleep(1);
        threadSleepSample.changeReady();
        return null;
      };

  public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(3);
    executorService.submit(addCountTask);
    executorService.submit(addCountTask);
    executorService.submit(changeReadyTask);
    executorService.shutdown();
  }

  // モニタロックを取得する。(*1)
  private synchronized void addCount() throws InterruptedException {
    while (!ready) {
      System.out.println(Thread.currentThread() + " is waiting.");
      // 状態待ちを行う。(*2)
      wait();
      System.out.println(Thread.currentThread() + " is not waiting.");
    }
    count += 10;
    System.out.println(count);
  }

  // モニタロックを取得する。(*1)
  private synchronized void changeReady() {
    ready = true;
    // 状態変更通知を行う。(*3)
    notifyAll();
  }
}

実行結果は以下の通りです。

Thread[pool-1-thread-2,5,main] is waiting.
Thread[pool-1-thread-1,5,main] is waiting.
Thread[pool-1-thread-2,5,main] is not waiting.
10
Thread[pool-1-thread-1,5,main] is not waiting.
20

解説

モニタロックの取得(*1)

モニタロックとは

モニタロックとは、各インスタンスに1つ用意されているロックのことです。排他制御にモニタロックを用いることで、あるインスタンスにおける特定処理の実行を、単一スレッドのみに制限しています。

モニタロックの取得方法

以下の通り記載することで、該当のメソッドを実行する前に、モニタロックの取得を試みます。

アクセス修飾子 synchronized 戻り値の型 メソッド名(...) {

実装例においては、threadSleepSampleインスタンスaddCountメソッド, changeReadyメソッドの実行は、モニタロックを取得した単一スレッドのみが可能です。

状態待ち(*2)

waitメソッドを呼び出し、期待する状態待ちを行います。

sleepメソッドでなく、waitメソッドを用いる理由

処理を停止するなら、sleepメソッドでも可能と考えるかもしれません。
ですが、sleepメソッドには以下特徴があります。

  • モニタロックをスレッドが保持し続ける。(状態更新にモニタロック取得が必須の場合、デッドロックになる。)
  • 期待した状態が満たされたか否かに関わらず、指定した時間停止する。

一方で、waitメソッドには以下特徴があります。

  • モニタロックをスレッドが手放す。
  • 期待した状態が満たされた可能性の通知(後述の状態変更通知を御覧下さい。)を受け取るまで、停止する。

上記をまとめると、両者の使い分けは以下の通りです。

  • sleepメソッド: 1つのスレッドにおける、処理間隔の調整に用いる。
  • waitメソッド: マルチスレッドにおける、状態待ちに用いる。

waitwhile文で囲う理由

waitで停止したスレッドは、後述の状態変更通知を受けて処理を再開します。 しかし、ここで問題になるのが、状態変更通知を受けても目的の状態が満たされたとは限らないということです。

というのも、状態変更通知はどの変数がどの様な状態になったか、という情報を含みません。例えば、異なる変数の状態待ちを行うスレッドが複数ある場合、その何れかが状態変更通知を受け取っても、それが自身が待ちを行っている変数に関する通知とは限らないわけです。

その為、waitメソッドをwhile文で囲うことで、処理が再開した後に再度期待する状態かチェックし、そうでない場合はもう一度待ちを行います。

状態変更通知(*3)

notifyAllメソッドを呼び出し、状態待ちを行っているスレッドに、期待した状態が満たされた可能性を通知します。

notifyメソッドでなく、notifyAllメソッドである理由

以下にnotifyメソッドとnotifyAllメソッドの違いをまとめました。 説明に当たり、以下用語を用います。

  • ウェイトセット: waitメソッドを呼び出し、状態待ちを行うスレッドの集合です。
  • エントリセット: 状態変更通知を受けて、モニタロックの取得を試みるスレッドの集合です。

各メソッドの挙動の違い

notifyメソッド

下図の通りの挙動になります。notifyAllメソッドとの違いは、1つのスレッドがウェイトセットからエントリセットに移動する点です。

f:id:fujiU:20200202163514p:plain
notifyメソッドの挙動

notifyAllメソッド

下図の通りの挙動になります。notifyメソッドとの違いは、全てのスレッドがウェイトセットからエントリセットに移動する点です。

f:id:fujiU:20200202164513p:plain
notifyAllメソッドの挙動

notifyメソッドの問題点

状態変更通知にnotifyメソッドを利用した場合、1つのスレッドのみがウェイトセットからエントリセットに移動する為、以下の問題点があります。

  • 移動したスレッドが該当の通知を待っていたスレッドではなかった場合、その通知を待っていた他のスレッドが起動する機会を永遠に失う可能性がある。
  • 選ばれなかった他のスレッドにモニタロック取得の機会が与えられず、スレッド毎の処理時間のバラつきが大きくなる。

スケーラブルI/O実現方法の調査

背景

 スケーラブルなI/Oに関して、以下書籍で勉強しています。


スケーラブルなI/Oの実現方法に関して、こちらでは以下の通り記載があります*1

I/Oのノンブロッキングモードと多重化によって、同時接続数が数千にも達する大規模なWebサーバとアプリケーションサーバの開発が可能になり、複数のプロセッサを有効利用できるようになります。...個々の接続に専用のスレッドを割り当てる必要がないからです。

 ノンブロッキング多重化がキーワードになる様です。これらに関して、調査した結果を以下にまとめます。

ノンブロッキングとは

 ノンブロッキングに関して、カーネルがI/O処理の準備中*2だった際に、アプリケーションが準備完了を待たないことを表します。 合わせて、こちらの記事もご覧ください。

I/Oの多重化とは

 1つのスレッドで、複数の接続を管理する仕組みのこと。具体的には、状態*3が変更された接続をスレッドに通知することで実現する。つまり、イベント駆動での処理が可能になります。

疑問点

 I/Oの多重化を用いれば、I/O処理の準備完了が通知される為、準備完了を待たないノンブロッキングI/Oでなく、準備完了を待つブロッキングI/Oで良いのではと疑問に思いました。
 上記に関して調べたところ、ある接続を複数のスレッドが利用する場合、以下の様に状態が変更される可能性がある様です。

  1. スレッド1が、ある接続のI/O処理の準備完了を通知される。
  2. スレッド2が、上記接続を利用したI/O処理を行う。
  3. スレッド1が、I/O処理を試みたところ、I/O処理の準備中であり、待ちが発生する。

 その為、書籍ではスケーラブルI/Oとして、I/Oの多重化とノンブロッキングI/Oが紹介されています。

まとめ

 ブロッキングI/Oを用いると、I/O処理の準備完了の待ちが発生する為、1つのスレッドが1つの接続の管理に掛かりきりになります。
 I/Oの多重化とノンブロッキングI/Oを用いれば、I/O準備完了の通知を受け、仮に実行前にその状態が変わっても待ちが発生しない為、1つのスレッドで複数の接続の管理が可能になります。
 結果として、よりスケーラブルなI/Oが実現できるということですね。

*1:該当箇所はp.75

*2:ソケット送信バッファに空きがない場合や、ソケット受信バッファにデータがない場合

*3:I/O処理の準備完了や、タイムアウトの発生など

Clean Architectureの内容を元に原則の分類

背景

プログラミングを行う中でいくつか原則を学ぶ機会があったのですが、正直多すぎて覚えられない。

一度体系的にまとめて整理しないとダメだなと思っていたら、以下の内容が求めていたものだった為、備忘も兼ねてメモしておきます。

原則の分類

本書より

Clean Architectureのp.78に以下の通り記載があります。

...SOLID原則がモジュールレベルの開発に使われるものであることを意図している。コードレベルよりも上に適用するものであり、モジュールやコンポーネントで使うソフトウェア構造の定義に役立つ。
...SOLID原則の説明が終わったら、次はそれに対応するコンポーネントの原則を説明する。その後に上位レベルのアーキテクチャの原則の話に移る。

これを読むと、原則が以下の通り分類されると読み取れます。

各レベルの考察

コードレベル

コードレベルの原則に関して、本書の中では特に言及されていませんでした。しかし、モジュールレベルよりも粒度が細かいレベルであることを考えると、処理単体に関する原則が、このレベルに当たると思われます。

モジュールレベル

そもそも、モジュールとは何かに関しては、本書のp.77の以下記載より、SOLIDの原則(モジュールレベル)の対象がクラスであることから、データと処理のまとまりをモジュールと呼んでいると判断しました。

...SOLID原則は、関数やデータ構造をどの様に組み込むのか、そしてクラスの相互接続をどのようにするのかといったことを教えてくれる。

モジュールにどのデータと処理を含めるかや、モジュール間の関係に関する原則が、このレベルに当たるかと思われます。

コンポーネントレベル

本書のp.109に以下の通り記載があります。

コンポーネントとは、デプロイの単位のことである。システムの一部としてデプロイできる、最小限のまとまりを指す。

コンポーネントにどのモジュールを含めるか*1や、コンポーネント間の関係*2に関する原則が、このレベルに当たるかと思われます。

アーキテクチャレベル

本書のp.148に以下の通り記載があります。

ソフトウェアシステムのアーキテクチャは、それを構築した人がシステムに与えた「形状」である。その形状を生み出すためには、システムをコンポーネントに分割し、コンポーネントをうまく配置して、コンポーネントが相互に通信できるようにする必要がある。

コンポーネントの分割基準や、コンポーネントの実行環境への配置方法*3に関する原則が、このレベルに当たるかと思われます。

コンポーネントレベルとの違い

コンポーネントの分割基準に関して、コンポーネントレベルの原則との違いが分からず少々悩んだのですが、以下の違いがあると気が付きました。

*1:本書の言葉では、コンポーネント凝集性

*2:本書の言葉では、コンポーネントの結合

*3:コンポーネントをモノリシックなプロセスとして起動するか、同一サーバ上に別プロセスとして起動するか、別サーバ上に別プロセス(サービス)として起動するかなど。

MyBatis Dynamic SQLを使ってみた。

背景

 Java + MyBatisを使って開発を行っているのですが、コード上でSQLの組み立てを行う方法が無いか調べたところ、MyBatis Dynamic SQLを使うと実現できる様です。

 以下のQuick Startを元に試してみたので、ここではその際のメモを残そうと思います。
MyBatis Dynamic SQL – MyBatis Dynamic SQL Quick Start

ハンズオン

用意するクラス, インターフェース

  • Supportクラス(テーブル, カラムの定義を行う。)
  • Mapperインタフェース
  • Mapperを呼び出すクラス
  • Entityクラス

実装例

コード全体は以下より取得可能です。
GitHub - U0326/code-example-mybatis_dynamic_sql

事前準備

CREATE TABLE friends (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(10)
);

上記の通り作成したテーブルを利用します。

Supportクラス

public class FriendDynamicSqlSupport {
  public static final Friend friend = new Friend(); // 1
  public static final SqlColumn<Integer> id = friend.id; // 1
  public static final SqlColumn<String> mame = friend.name; // 1

  public static final class Friend extends SqlTable { // 2
    public final SqlColumn<Integer> id = column("id", JDBCType.INTEGER);
    public final SqlColumn<String> name = column("name", JDBCType.VARCHAR);

    public Friend() {
      super("friends"); //3
    }
  }
}
  1. 2で作成する内部クラスとその変数をstatic参照可能にする為に、public staticな変数を定義する。
  2. SqlTableを継承した内部クラスを作成し、カラム定義を行う。
  3. [スキーマ名.]テーブル名を渡してSqlTableのコンストラクタを呼び出す。

Mapperインターフェース

上記で作成したSupportクラスをstatic importし、以下の通りCRUD操作を定義します。

Create

@Mapper
public interface FriendMapper {
  // Create
  @InsertProvider(type = SqlProviderAdapter.class, method = "insert") // 1
  int insert(InsertStatementProvider<FriendEntity> insertStatement); // 2
  default int insert(FriendEntity entity) { // 3
    return MyBatis3Utils.insert(this::insert, entity, friend, c -> c.map(mame).toProperty("name")); // 4
  }
  ...
}
  1. Createを行うメソッドであることを、アノテーションを用いて宣言する。
  2. InsertStatementProvider<Entityクラス>を引数に取るメソッドを定義する。
  3. MyBatis3Utilsを用いたdefaultメソッドを定義し、呼び元でEntityクラスをのみを引数にしたCreateを可能にする。
  4. 以下の通り引数を渡し、MyBatis3Utils.insertを呼び出す。
    • 第3引数: Supportクラス内のSqlTableを継承したクラスのstatic参照
    • 第4引数: Supportクラス内のSqlColumnのstatic参照と、Entityクラスの変数名を以下の通り紐付けるラムダ式
      c -> c.map(SqlColumnのstatic参照).toProperty("Entityクラスの変数名")

Read

@Mapper
public interface FriendMapper {
  ...
  // Read
  @SelectProvider(type = SqlProviderAdapter.class, method = "select") // 1
  @Results( // 2
      {
        @Result(column = "id", jdbcType = JdbcType.INTEGER, property = "id", id = true),
        @Result(column = "name", jdbcType = JdbcType.VARCHAR, property = "name"),
      })
  Optional<FriendEntity> selectOne(SelectStatementProvider selectStatement); // 3
  default Optional<FriendEntity> selectOne(SelectDSLCompleter completer) { // 4
    return MyBatis3Utils.selectOne(this::selectOne, selectList, friend, completer); // 5
  }
  BasicColumn[] selectList = BasicColumn.columnList(id, name); // 6
  ...
}
  1. Readを行うメソッドであることを、アノテーションを用いて宣言する。
  2. カラム名とEntityクラスの変数名を以下の通り紐付ける。
    @Result(column = "カラム名", jdbcType = カラムの型, property = "Entityクラスの変数名")
  3. SelectStatementProviderを引数に取るメソッドを定義する。
  4. MyBatis3Utilsを用いたdefaultメソッドを定義し、呼び元でSELECT対象のカラムを指定せずReadを可能にする。
  5. 以下の通り引数を渡し、MyBatis3Utils.selectOneを呼び出す。
    • 第2引数: 6で定義するSELECT対象のカラムの配列
    • 第3引数: Supportクラス内のSqlTableを継承したクラスのstatic参照
  6. Supportクラス内のSqlColumnのstatic参照を用いて、SELECT対象のカラムの配列を定義する。

Update

@Mapper
public interface FriendMapper {
  ...
  // Update
  @UpdateProvider(type = SqlProviderAdapter.class, method = "update") // 1
  int update(UpdateStatementProvider updateStatement); // 2
  default int update(UpdateDSLCompleter completer) { // 3
    return MyBatis3Utils.update(this::update, friend, completer); // 4
  }
  ...
}
  1. Updateを行うメソッドであることを、アノテーションを用いて宣言する。
  2. UpdateStatementProviderを引数に取るメソッドを定義する。
  3. MyBatis3Utilsを用いたdefaultメソッドを定義し、呼び元でテーブル名を指定せずにUpdateを可能にする。
  4. 以下の通り引数を渡し、MyBatis3Utils.updateを呼び出す。
    • 第2引数: Supportクラス内のSqlTableを継承したクラスのstatic参照

Delete

@Mapper
public interface FriendMapper {
  ...
  // Delete
  @DeleteProvider(type = SqlProviderAdapter.class, method = "delete") // 1
  int delete(DeleteStatementProvider deleteStatement); // 2
  default int delete(DeleteDSLCompleter completer) { // 3
    return MyBatis3Utils.deleteFrom(this::delete, friend, completer); // 4
  }
  ...
}
  1. Deleteを行うメソッドであることを、アノテーションを用いて宣言する。
  2. DeleteStatementProviderを引数に取るメソッドを定義する。
  3. MyBatis3Utilsを用いたdefaultメソッドを定義し、呼び元でテーブル名を指定せずにDeleteを可能にする。
  4. 以下の通り引数を渡し、MyBatis3Utils.deleteFromを呼び出す。
    • 第2引数: Supportクラス内のSqlTableを継承したクラスのstatic参照

Mapperを呼び出すクラス

Supportクラスをstatic importし、以下の通りCRUD操作を呼び出します。

public class FriendMapperCaller {
  @Autowired
  private FriendMapper friendMapper;

  public void execute() {
    // Create
    FriendEntity newFriend = FriendEntity.builder().name("Jon").build();
    friendMapper.insert(newFriend);

    // Read
    friendMapper.selectOne(c -> c.where(id, isEqualTo(1)))

    // Update
    friendMapper.update(c -> c.set(name).equalTo("Bob").where(id, isEqualTo(1)));

    // Delete
    friendMapper.delete(c -> c.where(id, isEqualTo(1)));
  }
}

参考資料