Anarchy In the 1K

Spring BatchでPartitioningを用いて処理の多重化

背景

 Spring Batchのパフォーマンスの改善の為に、Partitioningを用いた処理の多重化を検討することになりました。今回の記事では、実際に動くものを元に解説を行います。

目次

解説

概説

 Partitioningでは、masterステップ, slaveステップの順に処理が実行され、それぞれ以下の役割があります。

  • masterステップ: partitionerを実行し、slaveステップで参照する値を用意します。(多重化に入る前の準備)
  • slaveステップ: 多重化し実行される部分がここです。

詳説

 作成したコードを元に解説を行います。コードはこちらに配置しています。

設定

 ジョブの設定を行っているsrc/resources/job-setting.xmlに関して、以下が重要な箇所になります。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    ...
    <!--  ジョブの定義  -->
    <bean id="jobParametersIncrementer" class="org.springframework.batch.core.launch.support.RunIdIncrementer"/>
    <batch:job id="job01" job-repository="jobRepository" incrementer="jobParametersIncrementer">
        <batch:step id="master"> <!-- (1) -->
            <batch:partition partitioner="samplePartitioner" step="slave"> <!-- (2) -->
                <batch:handler grid-size="4" task-executor="parallelTaskExecutor"/>  <!-- (3) -->
            </batch:partition>
        </batch:step>
    </batch:job>

    <batch:step id="slave"> <!-- (4) -->
        <batch:tasklet transaction-manager="transactionManager" ref="customerTasklet"/>
    </batch:step>

    <!--  slaveステップを処理するスレッドプール  -->
    <task:executor id="taskExecutor" pool-size="3" queue-capacity="1"/> <!-- (5) -->
    ...
</beans>
  • 1: masterステップの定義を開始します。
  • 2: partitionerに後ほど用意するbeanを設定し、slaveステップで実行するステップを設定します。
  • 3: grid-sizeに関して、partitionerで参照しslaveステップをいくつ多重化するかの決定に用いることができます。また、task-executorには、slaveステップを処理するスレッドプールを設定します。
  • 4: slaveステップで実行するステップの定義を行います。
  • 5: pool-sizeにスレッド数を、queue-capacityにスレッド処理待ちのキューの上限を設定します。

コード

slaveステップで呼び出すTasklet

 今回の解説とあまり関係ない為、処理の詳細は省きます。簡単に説明すると、LIMIT, OFFSETを設定しSELECTすることで、DBから値を取得します。

@Component
@Slf4j
public class CustomerTasklet implements Tasklet {
  @Autowired private CustomerRepository repository;

  @Override
  public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext)
      throws Exception {
    log.debug("tasklet started.");
    Map<String, Object> context = chunkContext.getStepContext().getStepExecutionContext(); // (1)
    List<Customer> customers =
        repository.findAll((int) context.get("limit"), (int) context.get("offset")); // (2)
    customers.forEach(System.out::println);
    return RepeatStatus.FINISHED;
  }

 (1)でStepExecutionContextを取得し、(2)で後述のPartitionerで設定した値を取得しています。

Partitioner

@Component
@Slf4j
public class SamplePartitioner implements Partitioner {
  @Autowired
  private CustomerRepository repository;

  @Override
  public Map<String, ExecutionContext> partition(int gridSize) {
    int totalCount = repository.takeTotalCount();
    int limit = (totalCount / gridSize) + 1;
    int offset = 0;
    Map<String, ExecutionContext> map = new HashMap<>();

    for (int i = 0; i < gridSize; i++) {
      ExecutionContext context = new ExecutionContext(); // (1)
      context.put("limit", limit); // (2)
      context.put("offset", offset); // (3)
      map.put(String.valueOf(i), context); // (4)
      offset += limit;
    }
    return map;
  }
}

 (1)でExecutionContextを用意し、(2), (3)で"key, value"の形式で値を設定します(slaveステップではこのkey値を用いて値を取得します。)。
 そして(4)で、戻り値として返却するmapにExecutionContextを設定します。slaveステップに関して、mapに設定されたExecutionContextの数だけ生成されます。