Kafkaコンシューマのリバランスをローカルで体験
TOC
目的
ローカルでコンシューマリバランスを発生させ、その際の挙動を確認します。
Kafkaにおいて、Topic上のPartitionは、Consumer Group上のConsumerに下図の様に紐付けされます。
リバランスが発生すると、PartitionとConsumerの紐付けが更新されます。
やってみた
概要
PartitionとConsumerの紐づけは、Brokerが管理しており、タイムアウトを契機にリバランスが発生します。
1 つのブローカーが コーディネーター に指定され、グループのメンバーおよびパーティション割り当ての管理を担います。...
セッションタイムアウトの期限までにハートビートを受信しなかった場合、コーディネーターはそのメンバーをグループから除外し、そのメンバーのパーティションを別のメンバーに割り当て直します。
Kafka コンシューマー | Confluent Documentationより
以降では、Javaでコンシューマを実装し、タイムアウトさせてみましょう。 使用したコードはこちらから参照可能です。
事前調査
リバランスが発生する契機となるタイムアウトに関して、以下の2つが存在します。今回は2つ目の方法でタイムアウトを発生させます。
- session.timeout.ms: コンシューマからのheartbeatを待つ時間
- max.poll.interval.ms: コンシューマからのメッセージ取得を待つ時間
確認手順
Kafka
リポジトリのルートフォルダで、以下手順で準備を行います。
$ docker-compose -f docker/docker-compose.yml up -d $ docker exec -it kafka /bin/bash // 以降、コンテナ内での作業 // testトピックをパーティション数1で作成 # /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --partitions 1 --replication-factor 1 --topic test // プロヂューサー側でのデータ積み込み # /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test > 1 > 2 > 3 > 4 > 5
コンシューマ(Java)
メッセージの取得に関して、以下の通り設定を行いました。
- 1度のpollメソッドで、1つのメッセージを取得
- max.poll.interval.msに、2000を設定
そして、1メッセージを受信する毎に、1秒づつ待ち時間が増える様にしました。
Ref: https://github.com/U0326/samp-impl-kafka-api/blob/3876ddd285e7e853c722b796d34401dafac17585/src/main/java/work/garaku/code/example/ConsumerExample.java#L36
あとは、mainメソッドを実行します。
結果確認
Consumer(Java)のログ
まず、Thread-1で3つ目のメッセージまで行われ、オフセットのコミット前にタイムアウトが発生します。
次に、Thread-0で3つ目のメッセージを再度取得し、5つ目のメッセージを取得した後、オフセットのコミット前にタイムアウトが発生します。
thread_name: Thread-1, message_key: null, message_value:1 thread_name: Thread-1, message_key: null, message_value:2 thread_name: Thread-1, message_key: null, message_value:3 thread_name: Thread-0, message_key: null, message_value:3 Exception in thread "Thread-1" org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1231) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1090) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1501) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1400) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1357) at work.garaku.code.example.ConsumerExample.lambda$static$0(ConsumerExample.java:37) at java.base/java.lang.Thread.run(Thread.java:834) thread_name: Thread-0, message_key: null, message_value:4 thread_name: Thread-0, message_key: null, message_value:5 Exception in thread "Thread-0" org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1231) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1090) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1501) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1400) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1357) at work.garaku.code.example.ConsumerExample.lambda$static$0(ConsumerExample.java:37) at java.base/java.lang.Thread.run(Thread.java:834)
Kafkaのログ
5行目のログで2つのコンシューマが接続したことが記録されています。
6-9行目でリバランスの実施が記録されています。
kafka_1 | [2022-05-29 07:49:55,315] INFO [GroupCoordinator 1001]: Preparing to rebalance group consumer-1 in state PreparingRebalance with old generation 210 (__consumer_offsets-6) (reason: Adding new member consumer-consumer-1-1-7bc4e636-3930-47e9-ac5d-efc96ee0f7a7 with group instance id None) (kafka.coordinator.group.GroupCoordinator) kafka_1 | [2022-05-29 07:49:55,320] INFO [GroupCoordinator 1001]: Stabilized group consumer-1 generation 211 (__consumer_offsets-6) with 1 members (kafka.coordinator.group.GroupCoordinator) kafka_1 | [2022-05-29 07:49:55,322] INFO [GroupCoordinator 1001]: Preparing to rebalance group consumer-1 in state PreparingRebalance with old generation 211 (__consumer_offsets-6) (reason: Adding new member consumer-consumer-1-2-04eb45f5-e301-45e5-b63e-60ced8dc616e with group instance id None) (kafka.coordinator.group.GroupCoordinator) kafka_1 | [2022-05-29 07:49:55,338] INFO [GroupCoordinator 1001]: Stabilized group consumer-1 generation 212 (__consumer_offsets-6) with 2 members (kafka.coordinator.group.GroupCoordinator) kafka_1 | [2022-05-29 07:49:55,344] INFO [GroupCoordinator 1001]: Assignment received from leader for group consumer-1 for generation 212. The group has 2 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator) kafka_1 | [2022-05-29 07:50:00,459] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id consumer-consumer-1-1-7bc4e636-3930-47e9-ac5d-efc96ee0f7a7] in group consumer-1 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator) kafka_1 | [2022-05-29 07:50:00,460] INFO [GroupCoordinator 1001]: Preparing to rebalance group consumer-1 in state PreparingRebalance with old generation 212 (__consumer_offsets-6) (reason: removing member consumer-consumer-1-1-7bc4e636-3930-47e9-ac5d-efc96ee0f7a7 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator) kafka_1 | [2022-05-29 07:50:01,322] INFO [GroupCoordinator 1001]: Stabilized group consumer-1 generation 213 (__consumer_offsets-6) with 1 members (kafka.coordinator.group.GroupCoordinator) kafka_1 | [2022-05-29 07:50:01,326] INFO [GroupCoordinator 1001]: Assignment received from leader for group consumer-1 for generation 213. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator) kafka_1 | [2022-05-29 07:50:06,438] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id consumer-consumer-1-2-04eb45f5-e301-45e5-b63e-60ced8dc616e] in group consumer-1 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator) kafka_1 | [2022-05-29 07:50:06,439] INFO [GroupCoordinator 1001]: Preparing to rebalance group consumer-1 in state PreparingRebalance with old generation 213 (__consumer_offsets-6) (reason: removing member consumer-consumer-1-2-04eb45f5-e301-45e5-b63e-60ced8dc616e on LeaveGroup) (kafka.coordinator.group.GroupCoordinator) kafka_1 | [2022-05-29 07:50:06,439] INFO [GroupCoordinator 1001]: Group consumer-1 with generation 214 is now empty (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)
Spring BatchでPartitioningを用いて処理の多重化
背景
Spring Batchのパフォーマンスの改善の為に、Partitioningを用いた処理の多重化を検討することになりました。今回の記事では、実際に動くものを元に解説を行います。
目次
解説
概説
Partitioningでは、masterステップ, slaveステップの順に処理が実行され、それぞれ以下の役割があります。
- masterステップ:
partitioner
を実行し、slaveステップで参照する値を用意します。(多重化に入る前の準備) - slaveステップ: 多重化し実行される部分がここです。
詳説
作成したコードを元に解説を行います。コードはこちらに配置しています。
設定
ジョブの設定を行っているsrc/resources/job-setting.xml
に関して、以下が重要な箇所になります。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" ... <!-- ジョブの定義 --> <bean id="jobParametersIncrementer" class="org.springframework.batch.core.launch.support.RunIdIncrementer"/> <batch:job id="job01" job-repository="jobRepository" incrementer="jobParametersIncrementer"> <batch:step id="master"> <!-- (1) --> <batch:partition partitioner="samplePartitioner" step="slave"> <!-- (2) --> <batch:handler grid-size="4" task-executor="parallelTaskExecutor"/> <!-- (3) --> </batch:partition> </batch:step> </batch:job> <batch:step id="slave"> <!-- (4) --> <batch:tasklet transaction-manager="transactionManager" ref="customerTasklet"/> </batch:step> <!-- slaveステップを処理するスレッドプール --> <task:executor id="taskExecutor" pool-size="3" queue-capacity="1"/> <!-- (5) --> ... </beans>
- 1: masterステップの定義を開始します。
- 2: partitionerに後ほど用意するbeanを設定し、slaveステップで実行するステップを設定します。
- 3:
grid-size
に関して、partitionerで参照しslaveステップをいくつ多重化するかの決定に用いることができます。また、task-executor
には、slaveステップを処理するスレッドプールを設定します。 - 4: slaveステップで実行するステップの定義を行います。
- 5:
pool-size
にスレッド数を、queue-capacity
にスレッド処理待ちのキューの上限を設定します。
コード
slaveステップで呼び出すTasklet
今回の解説とあまり関係ない為、処理の詳細は省きます。簡単に説明すると、LIMIT, OFFSETを設定しSELECTすることで、DBから値を取得します。
@Component @Slf4j public class CustomerTasklet implements Tasklet { @Autowired private CustomerRepository repository; @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { log.debug("tasklet started."); Map<String, Object> context = chunkContext.getStepContext().getStepExecutionContext(); // (1) List<Customer> customers = repository.findAll((int) context.get("limit"), (int) context.get("offset")); // (2) customers.forEach(System.out::println); return RepeatStatus.FINISHED; }
(1)でStepExecutionContext
を取得し、(2)で後述のPartitionerで設定した値を取得しています。
Partitioner
@Component @Slf4j public class SamplePartitioner implements Partitioner { @Autowired private CustomerRepository repository; @Override public Map<String, ExecutionContext> partition(int gridSize) { int totalCount = repository.takeTotalCount(); int limit = (totalCount / gridSize) + 1; int offset = 0; Map<String, ExecutionContext> map = new HashMap<>(); for (int i = 0; i < gridSize; i++) { ExecutionContext context = new ExecutionContext(); // (1) context.put("limit", limit); // (2) context.put("offset", offset); // (3) map.put(String.valueOf(i), context); // (4) offset += limit; } return map; } }
(1)でExecutionContext
を用意し、(2), (3)で"key, value"の形式で値を設定します(slaveステップではこのkey値を用いて値を取得します。)。
そして(4)で、戻り値として返却するmapにExecutionContext
を設定します。slaveステップに関して、mapに設定されたExecutionContext
の数だけ生成されます。
Docerを用いてコマンドラインだけでKafka入門
目次
背景
必要に迫られて、Kafkaの勉強をはじめました。今回はDockerを用いて環境を速攻で構築します。
ハンズオン
Kafka起動
docker-compose.yml
に以下の通り記載し、該当ファイルが存在するディレクトリで上で、docker-compose up --no-recreate --scale kafka=<起動したいブローカー数>
を発行します。
version: '3' services: zookeeper: image: wurstmeister/zookeeper expose: - "2181" kafka: image: wurstmeister/kafka expose: - "9092" environment: HOSTNAME_COMMAND: hostname KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 depends_on: - "zookeeper"
動作確認
トピック作成
kafkaのコンテナ上で以下を実行します。
# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --partitions <任意の数> --replication-factor <任意の数> --topic test
主要なオプションに関しては、以下の通りです。
- --partitions: パーティションの数を指定する。詳しくはこちらを参照ください。
- --replication-factor: パーティション毎に作成されるレプリカの数を指定します。起動したブローカー数以下である必要があります。詳しくは、上記リンクを参照ください。
コンシューマ起動
kafkaのコンテナ上で以下を実行します。
# /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group consumer-group
主要なオプションに関しては、以下の通りです。
プロデューサ起動
kafkaのコンテナ上で以下を実行します。入力した文字がコンシューマ上で出力されます。
# /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test > // 文字を入力
主要なオプションに関しては、以下の通りです。
オフセットの状態確認
kafkaのコンテナ上で以下を実行します。トピック, パーティション毎のオフセットが確認可能です。
# /opt/kafka/bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group consumer-group GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID consumer-group test 0 130 143 13 - - -
CommandLineJobRunnerを用いたSpring Batchの実行
目次
背景
前回こちらの記事でSpring Boot Batchを用いたバッチ実行を行いました。今回は、コマンドラインからのバッチ実行を試みます。
加えて、今回はSpring Bootを用いずに実装を行うことで、前回自動で設定されていた部分を手動で設定します。
ハンズオン
事前準備
DBの用意
こちらの記事を元に、PostgreSQLを用意します。その後、Spring Batchで用いる以下テーブルを以下の通り作成します。
CREATE TABLE BATCH_JOB_INSTANCE ( JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY , VERSION BIGINT , JOB_NAME VARCHAR(100) NOT NULL, JOB_KEY VARCHAR(32) NOT NULL, constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY) ) ; CREATE TABLE BATCH_JOB_EXECUTION ( JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY , VERSION BIGINT , JOB_INSTANCE_ID BIGINT NOT NULL, CREATE_TIME TIMESTAMP NOT NULL, START_TIME TIMESTAMP DEFAULT NULL , END_TIME TIMESTAMP DEFAULT NULL , STATUS VARCHAR(10) , EXIT_CODE VARCHAR(2500) , EXIT_MESSAGE VARCHAR(2500) , LAST_UPDATED TIMESTAMP, JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL, constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID) references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID) ) ; CREATE TABLE BATCH_JOB_EXECUTION_PARAMS ( JOB_EXECUTION_ID BIGINT NOT NULL , TYPE_CD VARCHAR(6) NOT NULL , KEY_NAME VARCHAR(100) NOT NULL , STRING_VAL VARCHAR(250) , DATE_VAL TIMESTAMP DEFAULT NULL , LONG_VAL BIGINT , DOUBLE_VAL DOUBLE PRECISION , IDENTIFYING CHAR(1) NOT NULL , constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID) references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) ) ; CREATE TABLE BATCH_STEP_EXECUTION ( STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY , VERSION BIGINT NOT NULL, STEP_NAME VARCHAR(100) NOT NULL, JOB_EXECUTION_ID BIGINT NOT NULL, START_TIME TIMESTAMP NOT NULL , END_TIME TIMESTAMP DEFAULT NULL , STATUS VARCHAR(10) , COMMIT_COUNT BIGINT , READ_COUNT BIGINT , FILTER_COUNT BIGINT , WRITE_COUNT BIGINT , READ_SKIP_COUNT BIGINT , WRITE_SKIP_COUNT BIGINT , PROCESS_SKIP_COUNT BIGINT , ROLLBACK_COUNT BIGINT , EXIT_CODE VARCHAR(2500) , EXIT_MESSAGE VARCHAR(2500) , LAST_UPDATED TIMESTAMP, constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID) references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) ) ; CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT ( STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY, SHORT_CONTEXT VARCHAR(2500) NOT NULL, SERIALIZED_CONTEXT TEXT , constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID) references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID) ) ; CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT ( JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY, SHORT_CONTEXT VARCHAR(2500) NOT NULL, SERIALIZED_CONTEXT TEXT , constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID) references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) ) ; CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE; CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE; CREATE SEQUENCE BATCH_JOB_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
プロジェクトの作成
以下コマンドを発行し、プロジェクトを作成します。
mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4
作成されたpom.xml
に以下を追記します。
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <!-- 追記箇所↓ --> <!-- https://mvnrepository.com/artifact/org.springframework.batch/spring-batch-core --> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-core</artifactId> <version>4.2.4.RELEASE</version> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework/spring-jdbc --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jdbc</artifactId> <version>5.2.8.RELEASE</version> </dependency> <!-- https://mvnrepository.com/artifact/commons-dbcp/commons-dbcp --> <dependency> <groupId>commons-dbcp</groupId> <artifactId>commons-dbcp</artifactId> <version>1.2.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.postgresql/postgresql --> <dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <version>42.2.14</version> <scope>runtime</scope> </dependency> <!-- https:/ /mvnrepository.com/artifact/org.projectlombok/lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> <scope>provided</scope> </dependency> <!-- 追記箇所↑ --> </dependencies>
コーディング
こちらのサイトのサンプルを実装します。以下では、変更点や重要な箇所を解説します。また、作成したプロジェクトに関して、こちらに配置しています。
設定ファイル
Spring Batchの設定
src/main/resources/common-batch-context.xml
上で、Spring Batchの設定を行います。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:p="http://www.springframework.org/schema/p" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jdbc="http://www.springframework.org/schema/jdbc" xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd"> <!-- Spring Batchで必要になるBeanの定義 --> <bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean" p:dataSource-ref="dataSource" p:transactionManager-ref="transactionManager" p:maxVarCharLength="2000" p:isolationLevelForCreate="ISOLATION_SERIALIZABLE" /> <bean id="jobExplorer" class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean" p:dataSource-ref="dataSource" /> <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> <property name="jobRepository" ref="jobRepository" /> </bean> <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager" lazy-init="true"> <property name="dataSource" ref="dataSource" /> </bean> <!-- DBの設定 --> <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource"> <property name="driverClassName" value="org.postgresql.Driver" /> <property name="url" value="jdbc:postgresql://localhost:5432/postgres" /> <property name="username" value="postgres" /> <property name="password" value="password" /> </bean> </beans>
各Beanの説明はこちらをご覧ください。
ジョブ個別の設定
src/main/resources/job-setting.xml
上で、ジョブ個別の設定を行う。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:p="http://www.springframework.org/schema/p" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd"> <!-- SpringBatch共通の設定 --> <import resource="classpath:/common-batch-context.xml"/> <!-- ジョブの設定 --> <job id="job1" xmlns="http://www.springframework.org/schema/batch" incrementer="jobParametersIncrementer"> <step id="step1" parent="simpleStep"> <tasklet> <chunk reader="fileItemReader" writer="fileItemWriter"/> </tasklet> </step> </job> <bean id="jobParametersIncrementer" class="org.springframework.batch.core.launch.support.RunIdIncrementer" /> <bean id="simpleStep" class="org.springframework.batch.core.step.item.SimpleStepFactoryBean" abstract="true"> <property name="jobRepository" ref="jobRepository" /> <property name="commitInterval" value="1" /> </bean> <bean id="fileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step"> <property name="resource" value="classpath:./input.csv" /> <property name="lineMapper"> <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> <property name="lineTokenizer"> <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"> <property name="names" value="num,name" /> </bean> </property> <property name="fieldSetMapper"> <bean class="work.garaku.code.example.MemberFieldSetMapper" /> </property> </bean> </property> </bean> <bean id="fileItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter"> <property name="resource" value="file:./src/main/resources/output.txt" /> <property name="lineAggregator"> <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator"> <property name="fieldExtractor"> <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor" > <property name="names" value="name,num" /> </bean> </property> </bean> </property> </bean> </beans>
- 前述の「Spring Batchの設定」で記載した設定ファイルの読み込みを行う。
- ジョブの設定を以下の通り設定を行う。
Javaソース
サンプルをまんま拝借しました。
ファイル
src/main/resources
配下に以下の通りinput.csv
を配置します。
1,山田 2,田中
実行
以下の通りコマンドを発行します。
# libフォルダにライブラリを出力します。 mvn dependency:copy-dependencies -DoutputDirectory=lib # ビルドします。 mvn clean package # 実行します。 java -cp 'target/spring_batch_commandline_sample-1.0-SNAPSHOT.jar:lib/*' org.springframework.batch.core.launch.support.CommandLineJobRunner -next job-setting.xml job1
org.springframework.batch.core.launch.support.CommandLineJobRunner
を用いた実行に関して、詳しくはこちらをご覧ください。また、引数で指定した-next
に関して、incrementer
を用いた実行の為に必要になります。詳しくはこちらもご覧ください。
Spring Boot Batchの実行とメタデータテーブル
目次
背景
必要に迫られSrping Boot Batch
の勉強をはじめました。今回は、起動方法と自動で作成されるメタデータテーブルの確認を行います。
ハンズオン
早速ですが動くものを作ります。
事前準備
プロジェクトの作成
spring initializrで以下の通りプロジェクトを作成します。
DBの用意
こちらの記事を元に、PostgreSQLの用意を行います。
コーディング
上記で作成したプロジェクトを元に以下の通りコーディングを行います。
Tasklet
Spring Batch
では、タスクレットとチャンクという2つのモデルが存在します。今回は前者を採用します。また、今回は実行時にコマンドライン引数を渡す為、受け取れる様にしておきます。
@Component @Scope("step") // (1) public class HelloTasklet implements Tasklet { @Value("#{jobParameters[name] ?: \"Nanashi\"}") // (2) private String name; @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("Hello: " + name); return RepeatStatus.FINISHED; // (3) } }
JobParameters
から値を受け取るに当たり、@Scope("step")
を付与する必要があります。詳しくは、こちらを参照ください。JobParameters
から、パラメータ名name
の値を取得します。引数が無い場合、デフォルト値として"Nanashi"が設定されます。- 戻り値に指定可能な値は、
RepeatStatus.CONTINUABLE
(処理継続)とRepeatStatus.FINISHED
(処理終了)の2値になります。今回は、終了するので後者を選択します。
JavaConfig
@Configuration // (1) @EnableBatchProcessing // (2) @AllArgsConstructor public class BatchConfig { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final HelloTasklet helloTasklet; @Bean public Step helloStep() { // (3) return stepBuilderFactory.get("helloStep").tasklet(helloTasklet).build(); } } @Bean public Job helloJob(Step helloStep) { // (4) return jobBuilderFactory.get("helloJob").flow(helloStep).end().build(); }
- Java上で設定を行う為に、
@Configuration
を付与します。 - Spring Batchで必要になるBean定義を自動で行う為に、
@EnableBatchProcessing
を付与します。 - ステップの定義を行う。
- 上記で定義したステップを保持するジョブを定義する。ジョブとステップの関係はこちらを参照ください。
エントリポイント
@SpringBootApplication public class SpringBootBatchSample01Application { public static void main(String[] args) { SpringApplication.run(SpringBootBatchSample01Application.class, args); } }
設定ファイル
src/main/resources
にapplication.yml
を作成し、以下の通り記載します。元々存在したapplication.properties
は削除します。
spring: datasource: url: jdbc:postgresql://localhost:5432/postgres driver-class-name: org.postgresql.Driver username: postgres password: password batch: initialize-schema: always # DBを初期化する為の設定
実行
1回目
実行前のDB
まず、アプリケーションを実行する前にDBの状態を確認しましょう。コンテナ上のDBにログインし*1、以下コマンドを発行します。結果、以下の通りテーブルが1つも存在しないことが確認できます。
postgres=# \dt Did not find any relations.
アプリケーションの実行
次に、プロジェクトのルートディレクトリでmvn spring-boot:run
を発行します。結果、以下の通りデフォルト値のNanashiが出力されました。
... 2020-08-01 13:56:44.433 INFO 14659 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [helloWorldStep] Hello: Nanashi 2020-08-01 13:56:44.476 INFO 14659 --- [ main] o.s.batch.core.step.AbstractStep : Step: [helloWorldStep] executed in 43ms ...
実行後のDB
アプリケーション実行後のDBの状態を再度確認してみましょう。結果、以下の通り6つのテーブルが作成されています。各テーブルの説明に関して、こちらとこちらをご覧ください。
postgres=# \dt List of relations Schema | Name | Type | Owner --------+------------------------------+-------+---------- public | batch_job_execution | table | postgres public | batch_job_execution_context | table | postgres public | batch_job_execution_params | table | postgres public | batch_job_instance | table | postgres public | batch_step_execution | table | postgres public | batch_step_execution_context | table | postgres (6 rows)
batch_job_instance
テーブルとbatch_job_execution
テーブルの中身を確認してみましょう。以下の通り実行が記録されています。
postgres=# select * from batch_job_instance; job_instance_id | version | job_name | job_key -----------------+---------+---------------+---------------------------------- 1 | 0 | helloWorldJob | d41d8cd98f00b204e9800998ecf8427e (1 row) postgres=# select * from batch_job_execution; job_execution_id | version | job_instance_id | create_time | start_time | end_time | status | exit_code | exit_message | last_updated | job_configuration_location ------------------+---------+-----------------+-------------------------+-------------------------+-------------------------+-----------+-----------+--------------+-------------------------+---------------------------- 1 | 2 | 1 | 2020-07-30 08:36:10.414 | 2020-07-30 08:36:10.445 | 2020-07-30 08:36:10.538 | COMPLETED | COMPLETED | | 2020-07-30 08:36:10.538 | (1 row)
2回目
アプリケーションの実行
再度、mvn spring-boot:run
を発行し、アプリケーションを実行してみましょう。結果、以下ログが出力されジョブが起動しませんでした。
2020-07-31 08:02:03.801 INFO 12339 --- [ main] o.s.batch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=helloWorldStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=
調査したところ、JobInstance
はジョブ名とジョブパラメータ毎に生成され、引数なしの再実行は1回目と同様のJobInstance
と見なされた様です。そして、1回目はすでにステータスがCOMPLETED
になっている為、該当のログが出力されました。
そこで、mvn spring-boot:run -Dspring-boot.run.arguments="name=Yamada"
と、コマンドライン引数を与えて実行したことろ、以下の通り無事実行することができました。
... 2020-08-01 17:36:21.093 INFO 14877 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [helloWorldStep] Hello: Yamada 2020-08-01 17:36:21.134 INFO 14877 --- [ main] o.s.batch.core.step.AbstractStep : Step: [helloWorldStep] executed in 40ms ...
実行後のDB
再度、batch_job_instanceテーブルとbatch_job_executionテーブルの中身を確認してみましょう。結果、以下の通り2回分の実行が記録されています。
postgres=# select * from batch_job_instance; job_instance_id | version | job_name | job_key -----------------+---------+---------------+---------------------------------- 1 | 0 | helloWorldJob | d41d8cd98f00b204e9800998ecf8427e 2 | 0 | helloWorldJob | e44a461448a1f43e3c7fcd84c6f17fe3 (2 rows) postgres=# select * from batch_step_execution; step_execution_id | version | step_name | job_execution_id | start_time | end_time | status | commit_count | read_count | filter_count | write_count | read_skip_count | write_skip_count | process_skip_count | rollback_count | exit_code | exit_message | last_updated -------------------+---------+----------------+------------------+-------------------------+-------------------------+-----------+--------------+------------+--------------+-------------+-----------------+------------------+--------------------+----------------+-----------+--------------+------------------------- 1 | 3 | helloWorldStep | 1 | 2020-07-30 08:36:10.486 | 2020-07-30 08:36:10.528 | COMPLETED | 1 | 0 | 0 | 0 | 0 | 0 |0 | 0 | COMPLETED | | 2020-07-30 08:36:10.528 2 | 3 | helloWorldStep | 3 | 2020-07-31 08:03:39.362 | 2020-07-31 08:03:39.407 | COMPLETED | 1 | 0 | 0 | 0 | 0 | 0 |0 | 0 | COMPLETED | | 2020-07-31 08:03:39.407 (2 rows)
*1:https://fujiu.hatenablog.com/entry/2020/06/09/201255を合わせてご参照ください。
ストリーミングレプリケーションをDocker上で
背景
上記書籍を読んでPostgreSQLについてお勉強をしています。そんな中、ストリーミングレプリケーションに関して、実際に動作確認をしたくました。
そこで今回は、Dockerを用いて環境構築を行った後、動作確認を行うまでを実施します。Dockerを用いたPostgreSQLの構築は、以前こちらに記載した為、合わせてご覧下さい。
目次
今回の記事の目次は、以下の通りです。
ハンズオン
環境構築
PostgreSQLを下図の通り、primary, standby構成で構築します。
primary
コンテナ起動(ホスト上)
docker run -d --name=primary --hostname=primary -e POSTGRES_PASSWORD=password postgres
コンテナ上でbash実行(ホスト上)
$ docker exec -it primary /bin/bash
設定変更(コンテナ上)
事前準備
以下の通り、設定ファイルの変更に用いるvimをインストールした後、postgres
ユーザで変更を実施します。
# apt-get update # apt-get install -y vim # su - postgres
ストリーミングレプリケーションの有効化
/var/lib/postgresql/data/postgresql.conf
を以下の通り変更します。
#wal_level = replica
からwal_level = replica
に#max_wal_senders = 10
からmax_wal_senders = 10
に#archive_mode = off
からarchive_mode = on
に#archive_command = ''
からarchive_command = 'cp %p /tmp/%f'
に#synchronous_stanby_names = ''
からsynchronous_stanby_names = 'stanby'
に#synchronous_commit = on
からsynchronous_commit = off
に
standbyからの接続許可
/var/lib/postgresql/data/pg_hba.conf
に以下を追記します。
host replication postgres <下記で確認する、standbyのIPアドレス>/32 trust
設定反映(コンテナ上とホスト上)
ctrl-D
を押下しコンテナから抜けた後、ホスト上で以下の通りコマンドを発行し、PostgreSQLの再起動を実施します。
$ docker restart primary
standby
コンテナ起動(ホスト上)
# docker run -it --name=standby --hostname=standby -e POSTGRES_PASSWORD=password postgres /bin/bash
IPアドレスの確認(ホスト上)
別ウィンドウを開き以下コマンドを発行する。
$ docker network inspect bridge [ { ... "Containers": { "51e8fb3a6db4851a8f09460d2ecd5a702b5a97df8c83e2e24f1f5f740fbf8442": { "Name": "primary", "EndpointID": "5ecb226b6b8383ddcb472e194f3ac0d7eabcd8bcb823f816e6b177072992794a", "MacAddress": "02:42:ac:11:00:02", "IPv4Address": "<primaryのIPアドレス>/16", "IPv6Address": "" }, "bc6606a60176209dc5b66d14ddf18a40467aac5dbdf9440b9c53e8b68b416fdf": { "Name": "standby", "EndpointID": "e5e026ba735dbe7f3284ad81754c7e0203998cb9f153fb0b5ba8306eb4204907", "MacAddress": "02:42:ac:11:00:03", "IPv4Address": "<standbyのIPアドレス>/16", "IPv6Address": "" } }, ... } ]
ベースバックアップの実施(コンテナ上)
standbyを起動したウィンドウに戻り以下コマンドを発行する。
# su - postgres $ export PGDATA=/var/lib/postgresql/data $ pg_basebackup -R -D ${PGDATA} -h <上記で確認した、primaryのIPアドレス> -p 5432 $ chmod 750 -R ${PGDATA}
PostgreSQLの起動(コンテナ上)
$ /usr/lib/postgresql/<バージョン番号>/bin/pg_ctl start
動作確認
primaryで追加したレコードをstandbyで参照する
primaryでレコード追加(ホスト上とコンテナ上)
// ホスト上で以下コマンドを発行する。 $ docker exec -it primary /bin/bash // コンテナ上で以下コマンドを発行する。 # su - postgres $ psql -U postgres postgres=# CREATE TABLE test (id integer); postgres=# INSERT INTO test values(1); postgres=# SELECT * FROM test; id ---- 1 (1 row)
standbyでレコード参照(コンテナ上)
$ psql -U postgres postgres=# SELECT * FROM test; id ---- 1 (1 row)
standbyからprimaryへ昇格
standbyでコマンド発行(コンテナ上)
// PostgreSQLにログイン $ psql -U postgres // 昇格前はINSERTできないことを確認 postgres=# insert into test values(2); 2020-06-11 12:41:13.066 UTC [37] ERROR: cannot execute INSERT in a read-only transaction 2020-06-11 12:41:13.066 UTC [37] STATEMENT: insert into test values(2); ERROR: cannot execute INSERT in a read-only transaction // PostgreSQLからログアウト postgres=# \q // standbyからprimaryへ昇格 $ /usr/lib/postgresql/<バージョン番号>/bin/pg_ctl promote // 昇格後はINSERTできることを確認 $ psql -U postgres postgres=# insert into test values(2); postgres=# select * from test; id ---- 1 2 (2 rows)
Docerを用いてPostgreSQL環境を速攻構築
背景
ひょんなことから、仕事でPostgreSQLを使うことになった為、Dockerを用いてぱっと触れる実験環境の構築手順をメモしておきます。
ハンズオン
起動(ホスト上)
$ docker run --name postgres --rm -e POSTGRES_PASSWORD=password -p 5432:5432 postgres
コンテナ上でbashを実行(ホスト上)
$ docker exec -it postgres /bin/bash
DBにログイン(コンテナ上)
$ su - postgres $ psql -U postgres postgres=# // DBにログイン完了
よく使うコマンド
データベース
- データベースの一覧を表示:
\l
- データベースを選択:
\c <データベース名>
スキーマ
テーブル
- テーブルの一覧を表示:
\dt
- テーブルの構造を表示:
\d <テーブル名>
各種ファイルのパス
- データベースクラスタ:
/var/lib/postgresql/data/
- 設定ファイル:
/var/lib/postgresql/data/postgresql.conf