Anarchy In the 1K

Kafkaコンシューマのリバランスをローカルで体験

TOC

目的

ローカルでコンシューマリバランスを発生させ、その際の挙動を確認します。
Kafkaにおいて、Topic上のPartitionは、Consumer Group上のConsumerに下図の様に紐付けされます。

PartitionとConsumerの関係
リバランスが発生すると、PartitionとConsumerの紐付けが更新されます。

やってみた

概要

PartitionとConsumerの紐づけは、Brokerが管理しており、タイムアウトを契機にリバランスが発生します。

1 つのブローカーが コーディネーター に指定され、グループのメンバーおよびパーティション割り当ての管理を担います。...
セッションタイムアウトの期限までにハートビートを受信しなかった場合、コーディネーターはそのメンバーをグループから除外し、そのメンバーのパーティションを別のメンバーに割り当て直します。
Kafka コンシューマー | Confluent Documentationより

以降では、Javaでコンシューマを実装し、タイムアウトさせてみましょう。 使用したコードはこちらから参照可能です。

事前調査

リバランスが発生する契機となるタイムアウトに関して、以下の2つが存在します。今回は2つ目の方法でタイムアウトを発生させます。

  1. session.timeout.ms: コンシューマからのheartbeatを待つ時間
  2. max.poll.interval.ms: コンシューマからのメッセージ取得を待つ時間

確認手順

Kafka

リポジトリのルートフォルダで、以下手順で準備を行います。

$ docker-compose -f docker/docker-compose.yml up -d
$ docker exec -it kafka /bin/bash

// 以降、コンテナ内での作業
// testトピックをパーティション数1で作成
# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --partitions 1 --replication-factor 1 --topic test
// プロヂューサー側でのデータ積み込み
# /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> 1
> 2
> 3
> 4
> 5

コンシューマ(Java)

メッセージの取得に関して、以下の通り設定を行いました。

  • 1度のpollメソッドで、1つのメッセージを取得
  • max.poll.interval.msに、2000を設定

Ref: https://github.com/U0326/samp-impl-kafka-api/blob/3876ddd285e7e853c722b796d34401dafac17585/src/main/java/work/garaku/code/example/ConsumerExample.java#L21-L22

そして、1メッセージを受信する毎に、1秒づつ待ち時間が増える様にしました。
Ref: https://github.com/U0326/samp-impl-kafka-api/blob/3876ddd285e7e853c722b796d34401dafac17585/src/main/java/work/garaku/code/example/ConsumerExample.java#L36
あとは、mainメソッドを実行します。

結果確認

Consumer(Java)のログ

まず、Thread-1で3つ目のメッセージまで行われ、オフセットのコミット前にタイムアウトが発生します。
次に、Thread-0で3つ目のメッセージを再度取得し、5つ目のメッセージを取得した後、オフセットのコミット前にタイムアウトが発生します。

thread_name: Thread-1, message_key: null, message_value:1
thread_name: Thread-1, message_key: null, message_value:2
thread_name: Thread-1, message_key: null, message_value:3
thread_name: Thread-0, message_key: null, message_value:3
Exception in thread "Thread-1" org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1231)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1090)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1501)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1400)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1357)
    at work.garaku.code.example.ConsumerExample.lambda$static$0(ConsumerExample.java:37)
    at java.base/java.lang.Thread.run(Thread.java:834)
thread_name: Thread-0, message_key: null, message_value:4
thread_name: Thread-0, message_key: null, message_value:5
Exception in thread "Thread-0" org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1231)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1090)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1501)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1400)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1357)
    at work.garaku.code.example.ConsumerExample.lambda$static$0(ConsumerExample.java:37)
    at java.base/java.lang.Thread.run(Thread.java:834)

Kafkaのログ

5行目のログで2つのコンシューマが接続したことが記録されています。
6-9行目でリバランスの実施が記録されています。

kafka_1      | [2022-05-29 07:49:55,315] INFO [GroupCoordinator 1001]: Preparing to rebalance group consumer-1 in state PreparingRebalance with old generation 210 (__consumer_offsets-6) (reason: Adding new member consumer-consumer-1-1-7bc4e636-3930-47e9-ac5d-efc96ee0f7a7 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-05-29 07:49:55,320] INFO [GroupCoordinator 1001]: Stabilized group consumer-1 generation 211 (__consumer_offsets-6) with 1 members (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-05-29 07:49:55,322] INFO [GroupCoordinator 1001]: Preparing to rebalance group consumer-1 in state PreparingRebalance with old generation 211 (__consumer_offsets-6) (reason: Adding new member consumer-consumer-1-2-04eb45f5-e301-45e5-b63e-60ced8dc616e with group instance id None) (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-05-29 07:49:55,338] INFO [GroupCoordinator 1001]: Stabilized group consumer-1 generation 212 (__consumer_offsets-6) with 2 members (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-05-29 07:49:55,344] INFO [GroupCoordinator 1001]: Assignment received from leader for group consumer-1 for generation 212. The group has 2 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-05-29 07:50:00,459] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id consumer-consumer-1-1-7bc4e636-3930-47e9-ac5d-efc96ee0f7a7] in group consumer-1 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-05-29 07:50:00,460] INFO [GroupCoordinator 1001]: Preparing to rebalance group consumer-1 in state PreparingRebalance with old generation 212 (__consumer_offsets-6) (reason: removing member consumer-consumer-1-1-7bc4e636-3930-47e9-ac5d-efc96ee0f7a7 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-05-29 07:50:01,322] INFO [GroupCoordinator 1001]: Stabilized group consumer-1 generation 213 (__consumer_offsets-6) with 1 members (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-05-29 07:50:01,326] INFO [GroupCoordinator 1001]: Assignment received from leader for group consumer-1 for generation 213. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-05-29 07:50:06,438] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id consumer-consumer-1-2-04eb45f5-e301-45e5-b63e-60ced8dc616e] in group consumer-1 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-05-29 07:50:06,439] INFO [GroupCoordinator 1001]: Preparing to rebalance group consumer-1 in state PreparingRebalance with old generation 213 (__consumer_offsets-6) (reason: removing member consumer-consumer-1-2-04eb45f5-e301-45e5-b63e-60ced8dc616e on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-05-29 07:50:06,439] INFO [GroupCoordinator 1001]: Group consumer-1 with generation 214 is now empty (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)

Spring BatchでPartitioningを用いて処理の多重化

背景

 Spring Batchのパフォーマンスの改善の為に、Partitioningを用いた処理の多重化を検討することになりました。今回の記事では、実際に動くものを元に解説を行います。

目次

解説

概説

 Partitioningでは、masterステップ, slaveステップの順に処理が実行され、それぞれ以下の役割があります。

  • masterステップ: partitionerを実行し、slaveステップで参照する値を用意します。(多重化に入る前の準備)
  • slaveステップ: 多重化し実行される部分がここです。

詳説

 作成したコードを元に解説を行います。コードはこちらに配置しています。

設定

 ジョブの設定を行っているsrc/resources/job-setting.xmlに関して、以下が重要な箇所になります。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    ...
    <!--  ジョブの定義  -->
    <bean id="jobParametersIncrementer" class="org.springframework.batch.core.launch.support.RunIdIncrementer"/>
    <batch:job id="job01" job-repository="jobRepository" incrementer="jobParametersIncrementer">
        <batch:step id="master"> <!-- (1) -->
            <batch:partition partitioner="samplePartitioner" step="slave"> <!-- (2) -->
                <batch:handler grid-size="4" task-executor="parallelTaskExecutor"/>  <!-- (3) -->
            </batch:partition>
        </batch:step>
    </batch:job>

    <batch:step id="slave"> <!-- (4) -->
        <batch:tasklet transaction-manager="transactionManager" ref="customerTasklet"/>
    </batch:step>

    <!--  slaveステップを処理するスレッドプール  -->
    <task:executor id="taskExecutor" pool-size="3" queue-capacity="1"/> <!-- (5) -->
    ...
</beans>
  • 1: masterステップの定義を開始します。
  • 2: partitionerに後ほど用意するbeanを設定し、slaveステップで実行するステップを設定します。
  • 3: grid-sizeに関して、partitionerで参照しslaveステップをいくつ多重化するかの決定に用いることができます。また、task-executorには、slaveステップを処理するスレッドプールを設定します。
  • 4: slaveステップで実行するステップの定義を行います。
  • 5: pool-sizeにスレッド数を、queue-capacityにスレッド処理待ちのキューの上限を設定します。

コード

slaveステップで呼び出すTasklet

 今回の解説とあまり関係ない為、処理の詳細は省きます。簡単に説明すると、LIMIT, OFFSETを設定しSELECTすることで、DBから値を取得します。

@Component
@Slf4j
public class CustomerTasklet implements Tasklet {
  @Autowired private CustomerRepository repository;

  @Override
  public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext)
      throws Exception {
    log.debug("tasklet started.");
    Map<String, Object> context = chunkContext.getStepContext().getStepExecutionContext(); // (1)
    List<Customer> customers =
        repository.findAll((int) context.get("limit"), (int) context.get("offset")); // (2)
    customers.forEach(System.out::println);
    return RepeatStatus.FINISHED;
  }

 (1)でStepExecutionContextを取得し、(2)で後述のPartitionerで設定した値を取得しています。

Partitioner

@Component
@Slf4j
public class SamplePartitioner implements Partitioner {
  @Autowired
  private CustomerRepository repository;

  @Override
  public Map<String, ExecutionContext> partition(int gridSize) {
    int totalCount = repository.takeTotalCount();
    int limit = (totalCount / gridSize) + 1;
    int offset = 0;
    Map<String, ExecutionContext> map = new HashMap<>();

    for (int i = 0; i < gridSize; i++) {
      ExecutionContext context = new ExecutionContext(); // (1)
      context.put("limit", limit); // (2)
      context.put("offset", offset); // (3)
      map.put(String.valueOf(i), context); // (4)
      offset += limit;
    }
    return map;
  }
}

 (1)でExecutionContextを用意し、(2), (3)で"key, value"の形式で値を設定します(slaveステップではこのkey値を用いて値を取得します。)。
 そして(4)で、戻り値として返却するmapにExecutionContextを設定します。slaveステップに関して、mapに設定されたExecutionContextの数だけ生成されます。

Docerを用いてコマンドラインだけでKafka入門

目次

背景

 必要に迫られて、Kafkaの勉強をはじめました。今回はDockerを用いて環境を速攻で構築します。

ハンズオン

Kafka起動

 docker-compose.ymlに以下の通り記載し、該当ファイルが存在するディレクトリで上で、docker-compose up --no-recreate --scale kafka=<起動したいブローカー数>を発行します。

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    expose:
      - "2181"
  kafka:
    image: wurstmeister/kafka
    expose:
      - "9092"
    environment:
      HOSTNAME_COMMAND: hostname
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - "zookeeper"

動作確認

トピック作成

 kafkaのコンテナ上で以下を実行します。

# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --partitions <任意の数> --replication-factor <任意の数> --topic test

主要なオプションに関しては、以下の通りです。

  • --partitions: パーティションの数を指定する。詳しくはこちらを参照ください。
  • --replication-factor: パーティション毎に作成されるレプリカの数を指定します。起動したブローカー数以下である必要があります。詳しくは、上記リンクを参照ください。

コンシューマ起動

 kafkaのコンテナ上で以下を実行します。

# /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group consumer-group

主要なオプションに関しては、以下の通りです。

  • --bootstrap-server: 定期的なメタデータ(どのブローカーが利用可能か)の取得先を設定する。あくまでメタデータの取得先であり、実データの読み込みは、全てのブローカーから行う。

プロデューサ起動

 kafkaのコンテナ上で以下を実行します。入力した文字がコンシューマ上で出力されます。

# /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> // 文字を入力

主要なオプションに関しては、以下の通りです。

  • --broker-list: 定期的なメタデータ(どのブローカーが利用可能か)の取得先を設定する。あくまでメタデータの取得先であり、実データの書き込みは、全てのブローカーへ行う。

オフセットの状態確認

 kafkaのコンテナ上で以下を実行します。トピック, パーティション毎のオフセットが確認可能です。

# /opt/kafka/bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group consumer-group
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
consumer-group  test            0          130             143             13              -               -               -

CommandLineJobRunnerを用いたSpring Batchの実行

目次

背景

 前回こちらの記事でSpring Boot Batchを用いたバッチ実行を行いました。今回は、コマンドラインからのバッチ実行を試みます。
 加えて、今回はSpring Bootを用いずに実装を行うことで、前回自動で設定されていた部分を手動で設定します。

ハンズオン

事前準備

DBの用意

 こちらの記事を元に、PostgreSQLを用意します。その後、Spring Batchで用いる以下テーブルを以下の通り作成します。

CREATE TABLE BATCH_JOB_INSTANCE  (
    JOB_INSTANCE_ID BIGINT  NOT NULL PRIMARY KEY ,
    VERSION BIGINT ,
    JOB_NAME VARCHAR(100) NOT NULL,
    JOB_KEY VARCHAR(32) NOT NULL,
    constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ;

CREATE TABLE BATCH_JOB_EXECUTION  (
    JOB_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
    VERSION BIGINT  ,
    JOB_INSTANCE_ID BIGINT NOT NULL,
    CREATE_TIME TIMESTAMP NOT NULL,
    START_TIME TIMESTAMP DEFAULT NULL ,
    END_TIME TIMESTAMP DEFAULT NULL ,
    STATUS VARCHAR(10) ,
    EXIT_CODE VARCHAR(2500) ,
    EXIT_MESSAGE VARCHAR(2500) ,
    LAST_UPDATED TIMESTAMP,
    JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
    constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
    references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;

CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (
    JOB_EXECUTION_ID BIGINT NOT NULL ,
    TYPE_CD VARCHAR(6) NOT NULL ,
    KEY_NAME VARCHAR(100) NOT NULL ,
    STRING_VAL VARCHAR(250) ,
    DATE_VAL TIMESTAMP DEFAULT NULL ,
    LONG_VAL BIGINT ,
    DOUBLE_VAL DOUBLE PRECISION ,
    IDENTIFYING CHAR(1) NOT NULL ,
    constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
    references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

CREATE TABLE BATCH_STEP_EXECUTION  (
    STEP_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
    VERSION BIGINT NOT NULL,
    STEP_NAME VARCHAR(100) NOT NULL,
    JOB_EXECUTION_ID BIGINT NOT NULL,
    START_TIME TIMESTAMP NOT NULL ,
    END_TIME TIMESTAMP DEFAULT NULL ,
    STATUS VARCHAR(10) ,
    COMMIT_COUNT BIGINT ,
    READ_COUNT BIGINT ,
    FILTER_COUNT BIGINT ,
    WRITE_COUNT BIGINT ,
    READ_SKIP_COUNT BIGINT ,
    WRITE_SKIP_COUNT BIGINT ,
    PROCESS_SKIP_COUNT BIGINT ,
    ROLLBACK_COUNT BIGINT ,
    EXIT_CODE VARCHAR(2500) ,
    EXIT_MESSAGE VARCHAR(2500) ,
    LAST_UPDATED TIMESTAMP,
    constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
    references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT  (
    STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    SHORT_CONTEXT VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT ,
    constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
    references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ;

CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (
    JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    SHORT_CONTEXT VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT ,
    constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
    references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ MAXVALUE 9223372036854775807 NO CYCLE;
CREATE SEQUENCE BATCH_JOB_SEQ MAXVALUE 9223372036854775807 NO CYCLE;

プロジェクトの作成

 以下コマンドを発行し、プロジェクトを作成します。

mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4

作成されたpom.xmlに以下を追記します。

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <!-- 追記箇所↓ -->
    <!-- https://mvnrepository.com/artifact/org.springframework.batch/spring-batch-core -->
    <dependency>
      <groupId>org.springframework.batch</groupId>
      <artifactId>spring-batch-core</artifactId>
      <version>4.2.4.RELEASE</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.springframework/spring-jdbc -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jdbc</artifactId>
      <version>5.2.8.RELEASE</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/commons-dbcp/commons-dbcp -->
    <dependency>
      <groupId>commons-dbcp</groupId>
      <artifactId>commons-dbcp</artifactId>
      <version>1.2.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.postgresql/postgresql -->
    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
      <version>42.2.14</version>
      <scope>runtime</scope>
    </dependency>
    <!-- https:/
    /mvnrepository.com/artifact/org.projectlombok/lombok -->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.12</version>
      <scope>provided</scope>
    </dependency>
    <!-- 追記箇所↑ -->
  </dependencies>

コーディング

 こちらのサイトのサンプルを実装します。以下では、変更点や重要な箇所を解説します。また、作成したプロジェクトに関して、こちらに配置しています。

設定ファイル

Spring Batchの設定

 src/main/resources/common-batch-context.xml上で、Spring Batchの設定を行います。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xsi:schemaLocation="http://www.springframework.org/schema/batch
  http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
  http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
    <!-- Spring Batchで必要になるBeanの定義 -->
    <bean id="jobRepository"
          class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"
          p:dataSource-ref="dataSource" p:transactionManager-ref="transactionManager"
          p:maxVarCharLength="2000" p:isolationLevelForCreate="ISOLATION_SERIALIZABLE" />
    <bean id="jobExplorer" class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean"
          p:dataSource-ref="dataSource" />
    <bean id="jobLauncher"
          class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>
    <bean id="transactionManager"
          class="org.springframework.jdbc.datasource.DataSourceTransactionManager" lazy-init="true">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <!-- DBの設定 -->
    <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource">
        <property name="driverClassName" value="org.postgresql.Driver" />
        <property name="url" value="jdbc:postgresql://localhost:5432/postgres" />
        <property name="username" value="postgres" />
        <property name="password" value="password" />
    </bean>
</beans>

各Beanの説明はこちらをご覧ください。

ジョブ個別の設定

 src/main/resources/job-setting.xml上で、ジョブ個別の設定を行う。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/batch
    http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
    <!--  SpringBatch共通の設定  -->
    <import resource="classpath:/common-batch-context.xml"/>

    <!--  ジョブの設定  -->
    <job id="job1" xmlns="http://www.springframework.org/schema/batch"
         incrementer="jobParametersIncrementer">
        <step id="step1" parent="simpleStep">
            <tasklet>
                <chunk reader="fileItemReader" writer="fileItemWriter"/>
            </tasklet>
        </step>
    </job>

    <bean id="jobParametersIncrementer"
          class="org.springframework.batch.core.launch.support.RunIdIncrementer" />
    <bean id="simpleStep"
          class="org.springframework.batch.core.step.item.SimpleStepFactoryBean" abstract="true">
        <property name="jobRepository" ref="jobRepository" />
        <property name="commitInterval" value="1" />
    </bean>

    <bean id="fileItemReader"
          class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
        <property name="resource" value="classpath:./input.csv" />
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="names" value="num,name" />
                    </bean>
                </property>
                <property name="fieldSetMapper">
                    <bean class="work.garaku.code.example.MemberFieldSetMapper" />
                </property>
            </bean>
        </property>
    </bean>

    <bean id="fileItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
        <property name="resource" value="file:./src/main/resources/output.txt" />
        <property name="lineAggregator">
            <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
                <property name="fieldExtractor">
                    <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor" >
                        <property name="names" value="name,num" />
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>
  1. 前述の「Spring Batchの設定」で記載した設定ファイルの読み込みを行う。
  2. ジョブの設定を以下の通り設定を行う。
    • incrementer: JobInstanceはジョブ名とジョブパラメータ毎に生成される為、incrementerが一意なパラメータを渡すことで、実行の都度一意なJobInstanceが生成されます。*1
    • parent: parentに指定したStepを継承し、プロパティを書き換えることが可能です。詳しくはこちらをご覧ください。
    • chunk: 後続で設定している、reader, writerの設定を行います。

Javaソース

 サンプルをまんま拝借しました。

ファイル

 src/main/resources配下に以下の通りinput.csvを配置します。

1,山田
2,田中

実行

 以下の通りコマンドを発行します。

# libフォルダにライブラリを出力します。
mvn dependency:copy-dependencies -DoutputDirectory=lib
# ビルドします。
mvn clean package
# 実行します。
java -cp 'target/spring_batch_commandline_sample-1.0-SNAPSHOT.jar:lib/*' org.springframework.batch.core.launch.support.CommandLineJobRunner -next job-setting.xml job1

 org.springframework.batch.core.launch.support.CommandLineJobRunnerを用いた実行に関して、詳しくはこちらをご覧ください。また、引数で指定した-nextに関して、incrementerを用いた実行の為に必要になります。詳しくはこちらもご覧ください。

*1:合わせて次もご覧ください。https://fujiu.hatenablog.com/entry/2020/08/01/174618#2回目

Spring Boot Batchの実行とメタデータテーブル

目次

背景

 必要に迫られSrping Boot Batchの勉強をはじめました。今回は、起動方法と自動で作成されるメタデータテーブルの確認を行います。

ハンズオン

 早速ですが動くものを作ります。

事前準備

プロジェクトの作成

 spring initializrで以下の通りプロジェクトを作成します。 f:id:fujiU:20200731085332p:plain

DBの用意

 こちらの記事を元に、PostgreSQLの用意を行います。

コーディング

 上記で作成したプロジェクトを元に以下の通りコーディングを行います。

Tasklet

 Spring Batchでは、タスクレットとチャンクという2つのモデルが存在します。今回は前者を採用します。また、今回は実行時にコマンドライン引数を渡す為、受け取れる様にしておきます。

@Component
@Scope("step") // (1)
public class HelloTasklet implements Tasklet {
  @Value("#{jobParameters[name] ?: \"Nanashi\"}") // (2)
  private String name;

  @Override
  public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext)
      throws Exception {
    System.out.println("Hello: " + name);
    return RepeatStatus.FINISHED; // (3)
  }
}
  1. JobParametersから値を受け取るに当たり、@Scope("step")を付与する必要があります。詳しくは、こちらを参照ください。
  2. JobParametersから、パラメータ名nameの値を取得します。引数が無い場合、デフォルト値として"Nanashi"が設定されます。
  3. 戻り値に指定可能な値は、RepeatStatus.CONTINUABLE(処理継続)とRepeatStatus.FINISHED(処理終了)の2値になります。今回は、終了するので後者を選択します。

JavaConfig

@Configuration // (1)
@EnableBatchProcessing // (2)
@AllArgsConstructor
public class BatchConfig {
  private final JobBuilderFactory jobBuilderFactory;
  private final StepBuilderFactory stepBuilderFactory;
  private final HelloTasklet helloTasklet;

  @Bean
  public Step helloStep() { // (3)
    return stepBuilderFactory.get("helloStep").tasklet(helloTasklet).build();
  }
}

  @Bean
  public Job helloJob(Step helloStep) { // (4)
    return jobBuilderFactory.get("helloJob").flow(helloStep).end().build();
  }
  1. Java上で設定を行う為に、@Configurationを付与します。
  2. Spring Batchで必要になるBean定義を自動で行う為に、@EnableBatchProcessingを付与します。
  3. ステップの定義を行う。
  4. 上記で定義したステップを保持するジョブを定義する。ジョブとステップの関係はこちらを参照ください。

エントリポイント

@SpringBootApplication
public class SpringBootBatchSample01Application {
  public static void main(String[] args) {
    SpringApplication.run(SpringBootBatchSample01Application.class, args);
  }
}

設定ファイル

 src/main/resourcesapplication.ymlを作成し、以下の通り記載します。元々存在したapplication.propertiesは削除します。

spring:
  datasource:
    url: jdbc:postgresql://localhost:5432/postgres
    driver-class-name: org.postgresql.Driver
    username: postgres
    password: password
  batch:
    initialize-schema: always # DBを初期化する為の設定

実行

1回目

実行前のDB

 まず、アプリケーションを実行する前にDBの状態を確認しましょう。コンテナ上のDBにログインし*1、以下コマンドを発行します。結果、以下の通りテーブルが1つも存在しないことが確認できます。

postgres=# \dt
Did not find any relations.

アプリケーションの実行

 次に、プロジェクトのルートディレクトリでmvn spring-boot:runを発行します。結果、以下の通りデフォルト値のNanashiが出力されました。

...
2020-08-01 13:56:44.433  INFO 14659 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [helloWorldStep]
Hello: Nanashi
2020-08-01 13:56:44.476  INFO 14659 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [helloWorldStep] executed in 43ms
...

実行後のDB

 アプリケーション実行後のDBの状態を再度確認してみましょう。結果、以下の通り6つのテーブルが作成されています。各テーブルの説明に関して、こちらこちらをご覧ください。

postgres=# \dt 
                    List of relations
 Schema |             Name             | Type  |  Owner   
--------+------------------------------+-------+----------
 public | batch_job_execution          | table | postgres
 public | batch_job_execution_context  | table | postgres
 public | batch_job_execution_params   | table | postgres
 public | batch_job_instance           | table | postgres
 public | batch_step_execution         | table | postgres
 public | batch_step_execution_context | table | postgres
(6 rows)

batch_job_instanceテーブルとbatch_job_executionテーブルの中身を確認してみましょう。以下の通り実行が記録されています。

postgres=# select * from batch_job_instance;
 job_instance_id | version |   job_name    |             job_key
-----------------+---------+---------------+----------------------------------
               1 |       0 | helloWorldJob | d41d8cd98f00b204e9800998ecf8427e
(1 row)

postgres=# select * from batch_job_execution;
 job_execution_id | version | job_instance_id |       create_time       |       start_time        |        end_time         |  status   | exit_code | exit_message |      last_updated       | job_configuration_location
------------------+---------+-----------------+-------------------------+-------------------------+-------------------------+-----------+-----------+--------------+-------------------------+----------------------------
                1 |       2 |               1 | 2020-07-30 08:36:10.414 | 2020-07-30 08:36:10.445 | 2020-07-30 08:36:10.538 | COMPLETED | COMPLETED |              | 2020-07-30 08:36:10.538 |
(1 row)

2回目

アプリケーションの実行

 再度、mvn spring-boot:runを発行し、アプリケーションを実行してみましょう。結果、以下ログが出力されジョブが起動しませんでした。

2020-07-31 08:02:03.801  INFO 12339 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=helloWorldStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=

 調査したところ、JobInstanceはジョブ名とジョブパラメータ毎に生成され、引数なしの再実行は1回目と同様のJobInstanceと見なされた様です。そして、1回目はすでにステータスがCOMPLETEDになっている為、該当のログが出力されました。
 そこで、mvn spring-boot:run -Dspring-boot.run.arguments="name=Yamada"と、コマンドライン引数を与えて実行したことろ、以下の通り無事実行することができました。

...
2020-08-01 17:36:21.093  INFO 14877 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [helloWorldStep]
Hello: Yamada
2020-08-01 17:36:21.134  INFO 14877 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [helloWorldStep] executed in 40ms
...

実行後のDB

 再度、batch_job_instanceテーブルとbatch_job_executionテーブルの中身を確認してみましょう。結果、以下の通り2回分の実行が記録されています。

postgres=# select * from batch_job_instance;
 job_instance_id | version |   job_name    |             job_key
-----------------+---------+---------------+----------------------------------
               1 |       0 | helloWorldJob | d41d8cd98f00b204e9800998ecf8427e
               2 |       0 | helloWorldJob | e44a461448a1f43e3c7fcd84c6f17fe3
(2 rows)

postgres=# select * from batch_step_execution;
 step_execution_id | version |   step_name    | job_execution_id |       start_time        |        end_time         |  status   | commit_count | read_count | filter_count | write_count | read_skip_count | write_skip_count | process_skip_count | rollback_count | exit_code | exit_message |      last_updated
-------------------+---------+----------------+------------------+-------------------------+-------------------------+-----------+--------------+------------+--------------+-------------+-----------------+------------------+--------------------+----------------+-----------+--------------+-------------------------
                 1 |       3 | helloWorldStep |                1 | 2020-07-30 08:36:10.486 | 2020-07-30 08:36:10.528 | COMPLETED |            1 |          0 |            0 |           0 |               0 |                0 |0 |              0 | COMPLETED |              | 2020-07-30 08:36:10.528
                 2 |       3 | helloWorldStep |                3 | 2020-07-31 08:03:39.362 | 2020-07-31 08:03:39.407 | COMPLETED |            1 |          0 |            0 |           0 |               0 |                0 |0 |              0 | COMPLETED |              | 2020-07-31 08:03:39.407
(2 rows)

*1:https://fujiu.hatenablog.com/entry/2020/06/09/201255を合わせてご参照ください。

ストリーミングレプリケーションをDocker上で

背景


 上記書籍を読んでPostgreSQLについてお勉強をしています。そんな中、ストリーミングレプリケーションに関して、実際に動作確認をしたくました。
 そこで今回は、Dockerを用いて環境構築を行った後、動作確認を行うまでを実施します。Dockerを用いたPostgreSQLの構築は、以前こちらに記載した為、合わせてご覧下さい。

目次

 今回の記事の目次は、以下の通りです。

ハンズオン

環境構築

 PostgreSQLを下図の通り、primary, standby構成で構築します。 f:id:fujiU:20200612133842p:plain

primary

コンテナ起動(ホスト上)

docker run -d --name=primary --hostname=primary -e POSTGRES_PASSWORD=password postgres

コンテナ上でbash実行(ホスト上)

$ docker exec -it primary /bin/bash

設定変更(コンテナ上)

事前準備

 以下の通り、設定ファイルの変更に用いるvimをインストールした後、postgresユーザで変更を実施します。

# apt-get update
# apt-get install -y vim
# su - postgres
ストリーミングレプリケーションの有効化

 /var/lib/postgresql/data/postgresql.confを以下の通り変更します。

  • #wal_level = replicaからwal_level = replica
  • #max_wal_senders = 10からmax_wal_senders = 10
  • #archive_mode = offからarchive_mode = on
  • #archive_command = ''からarchive_command = 'cp %p /tmp/%f'
  • #synchronous_stanby_names = ''からsynchronous_stanby_names = 'stanby'
  • #synchronous_commit = onからsynchronous_commit = off
standbyからの接続許可

 /var/lib/postgresql/data/pg_hba.confに以下を追記します。

  • host replication postgres <下記で確認する、standbyのIPアドレス>/32 trust

設定反映(コンテナ上とホスト上)

 ctrl-Dを押下しコンテナから抜けた後、ホスト上で以下の通りコマンドを発行し、PostgreSQLの再起動を実施します。

$ docker restart primary

standby

コンテナ起動(ホスト上)

# docker run -it --name=standby --hostname=standby -e POSTGRES_PASSWORD=password postgres /bin/bash

IPアドレスの確認(ホスト上)

 別ウィンドウを開き以下コマンドを発行する。

$ docker network inspect bridge
[
    {
        ...
        "Containers": {
            "51e8fb3a6db4851a8f09460d2ecd5a702b5a97df8c83e2e24f1f5f740fbf8442": {
                "Name": "primary",
                "EndpointID": "5ecb226b6b8383ddcb472e194f3ac0d7eabcd8bcb823f816e6b177072992794a",
                "MacAddress": "02:42:ac:11:00:02",
                "IPv4Address": "<primaryのIPアドレス>/16",
                "IPv6Address": ""
            },
            "bc6606a60176209dc5b66d14ddf18a40467aac5dbdf9440b9c53e8b68b416fdf": {
                "Name": "standby",
                "EndpointID": "e5e026ba735dbe7f3284ad81754c7e0203998cb9f153fb0b5ba8306eb4204907",
                "MacAddress": "02:42:ac:11:00:03",
                "IPv4Address": "<standbyのIPアドレス>/16",
                "IPv6Address": ""
            }
        },
        ...
  }
]

ベースバックアップの実施(コンテナ上)

 standbyを起動したウィンドウに戻り以下コマンドを発行する。

# su - postgres
$ export PGDATA=/var/lib/postgresql/data 
$ pg_basebackup -R -D ${PGDATA} -h <上記で確認した、primaryのIPアドレス> -p 5432
$ chmod 750 -R ${PGDATA}

PostgreSQLの起動(コンテナ上)

$ /usr/lib/postgresql/<バージョン番号>/bin/pg_ctl start

動作確認

primaryで追加したレコードをstandbyで参照する

primaryでレコード追加(ホスト上とコンテナ上)

// ホスト上で以下コマンドを発行する。
$ docker exec -it primary /bin/bash

// コンテナ上で以下コマンドを発行する。
# su - postgres
$ psql -U postgres

postgres=# CREATE TABLE test (id integer);
postgres=# INSERT INTO test values(1);
postgres=# SELECT * FROM test;
 id 
----
  1
(1 row)

standbyでレコード参照(コンテナ上)

$ psql -U postgres

postgres=# SELECT * FROM test;
 id 
----
  1
(1 row)

standbyからprimaryへ昇格

standbyでコマンド発行(コンテナ上)

// PostgreSQLにログイン
$ psql -U postgres

// 昇格前はINSERTできないことを確認
postgres=# insert into test values(2);
2020-06-11 12:41:13.066 UTC [37] ERROR:  cannot execute INSERT in a read-only transaction
2020-06-11 12:41:13.066 UTC [37] STATEMENT:  insert into test values(2);
ERROR:  cannot execute INSERT in a read-only transaction

// PostgreSQLからログアウト
postgres=# \q

// standbyからprimaryへ昇格
$ /usr/lib/postgresql/<バージョン番号>/bin/pg_ctl promote

// 昇格後はINSERTできることを確認
$ psql -U postgres
postgres=# insert into test values(2);
postgres=# select * from test;
 id 
----
  1
  2
(2 rows)

Docerを用いてPostgreSQL環境を速攻構築

背景

 ひょんなことから、仕事でPostgreSQLを使うことになった為、Dockerを用いてぱっと触れる実験環境の構築手順をメモしておきます。

ハンズオン

起動(ホスト上)

$ docker run --name postgres --rm -e POSTGRES_PASSWORD=password -p 5432:5432 postgres

コンテナ上でbashを実行(ホスト上)

$ docker exec -it postgres /bin/bash

DBにログイン(コンテナ上)

$ su - postgres
$ psql -U postgres
postgres=# // DBにログイン完了

よく使うコマンド

データベース

  • データベースの一覧を表示: \l
  • データベースを選択: \c <データベース名>

スキーマ

テーブル

  • テーブルの一覧を表示: \dt
  • テーブルの構造を表示: \d <テーブル名>

各種ファイルのパス

  • データベースクラスタ: /var/lib/postgresql/data/
  • 設定ファイル: /var/lib/postgresql/data/postgresql.conf