Spring BatchでPartitioningを用いて処理の多重化
背景
Spring Batchのパフォーマンスの改善の為に、Partitioningを用いた処理の多重化を検討することになりました。今回の記事では、実際に動くものを元に解説を行います。
目次
解説
概説
Partitioningでは、masterステップ, slaveステップの順に処理が実行され、それぞれ以下の役割があります。
- masterステップ:
partitioner
を実行し、slaveステップで参照する値を用意します。(多重化に入る前の準備) - slaveステップ: 多重化し実行される部分がここです。
詳説
作成したコードを元に解説を行います。コードはこちらに配置しています。
設定
ジョブの設定を行っているsrc/resources/job-setting.xml
に関して、以下が重要な箇所になります。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" ... <!-- ジョブの定義 --> <bean id="jobParametersIncrementer" class="org.springframework.batch.core.launch.support.RunIdIncrementer"/> <batch:job id="job01" job-repository="jobRepository" incrementer="jobParametersIncrementer"> <batch:step id="master"> <!-- (1) --> <batch:partition partitioner="samplePartitioner" step="slave"> <!-- (2) --> <batch:handler grid-size="4" task-executor="parallelTaskExecutor"/> <!-- (3) --> </batch:partition> </batch:step> </batch:job> <batch:step id="slave"> <!-- (4) --> <batch:tasklet transaction-manager="transactionManager" ref="customerTasklet"/> </batch:step> <!-- slaveステップを処理するスレッドプール --> <task:executor id="taskExecutor" pool-size="3" queue-capacity="1"/> <!-- (5) --> ... </beans>
- 1: masterステップの定義を開始します。
- 2: partitionerに後ほど用意するbeanを設定し、slaveステップで実行するステップを設定します。
- 3:
grid-size
に関して、partitionerで参照しslaveステップをいくつ多重化するかの決定に用いることができます。また、task-executor
には、slaveステップを処理するスレッドプールを設定します。 - 4: slaveステップで実行するステップの定義を行います。
- 5:
pool-size
にスレッド数を、queue-capacity
にスレッド処理待ちのキューの上限を設定します。
コード
slaveステップで呼び出すTasklet
今回の解説とあまり関係ない為、処理の詳細は省きます。簡単に説明すると、LIMIT, OFFSETを設定しSELECTすることで、DBから値を取得します。
@Component @Slf4j public class CustomerTasklet implements Tasklet { @Autowired private CustomerRepository repository; @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { log.debug("tasklet started."); Map<String, Object> context = chunkContext.getStepContext().getStepExecutionContext(); // (1) List<Customer> customers = repository.findAll((int) context.get("limit"), (int) context.get("offset")); // (2) customers.forEach(System.out::println); return RepeatStatus.FINISHED; }
(1)でStepExecutionContext
を取得し、(2)で後述のPartitionerで設定した値を取得しています。
Partitioner
@Component @Slf4j public class SamplePartitioner implements Partitioner { @Autowired private CustomerRepository repository; @Override public Map<String, ExecutionContext> partition(int gridSize) { int totalCount = repository.takeTotalCount(); int limit = (totalCount / gridSize) + 1; int offset = 0; Map<String, ExecutionContext> map = new HashMap<>(); for (int i = 0; i < gridSize; i++) { ExecutionContext context = new ExecutionContext(); // (1) context.put("limit", limit); // (2) context.put("offset", offset); // (3) map.put(String.valueOf(i), context); // (4) offset += limit; } return map; } }
(1)でExecutionContext
を用意し、(2), (3)で"key, value"の形式で値を設定します(slaveステップではこのkey値を用いて値を取得します。)。
そして(4)で、戻り値として返却するmapにExecutionContext
を設定します。slaveステップに関して、mapに設定されたExecutionContext
の数だけ生成されます。