어지간한 상품을 판매하는 사이트에는 다 있는 포인트에 대해서, 직접 배치를 사용해 포인트 관련 일괄처리를 공부해보고 스프링 배치에 대해 좀 더 익숙해지는 시간을 가져보자.
프로젝트 만들기
현재 기준 Spring Boot 3.3.5 버전으로 프로젝트를 만들었다. 그리고 의존성은 다음을 참고한다.
implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
compileOnly 'org.projectlombok:lombok'
runtimeOnly 'com.h2database:h2'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.batch:spring-batch-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
testImplementation 'org.springframework.batch:spring-batch-test'
implementation 'com.querydsl:querydsl-jpa:5.0.0:jakarta'
annotationProcessor "com.querydsl:querydsl-apt:5.0.0:jakarta"
annotationProcessor "jakarta.annotation:jakarta.annotation-api"
annotationProcessor "jakarta.persistence:jakarta.persistence-api"
- Spring Batch
- Spring Data JPA
- Lombok
- QueryDSL
- H2
이렇게 5개 정도를 추가하면 된다.
build.gradle (전체)
plugins {
id 'java'
id 'org.springframework.boot' version '3.3.5'
id 'io.spring.dependency-management' version '1.1.6'
}
group = 'cwchoiit'
version = '0.0.1-SNAPSHOT'
java {
toolchain {
languageVersion = JavaLanguageVersion.of(21)
}
}
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
compileOnly 'org.projectlombok:lombok'
runtimeOnly 'com.h2database:h2'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.batch:spring-batch-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
testImplementation 'org.springframework.batch:spring-batch-test'
implementation 'com.querydsl:querydsl-jpa:5.0.0:jakarta'
annotationProcessor "com.querydsl:querydsl-apt:5.0.0:jakarta"
annotationProcessor "jakarta.annotation:jakarta.annotation-api"
annotationProcessor "jakarta.persistence:jakarta.persistence-api"
}
tasks.named('test') {
useJUnitPlatform()
}
// querydsl directory path
def querydslDir = "src/main/generated"
// querydsl directory를 자동 임포트 할 수 있게 설정
sourceSets {
main.java.srcDirs += [ querydslDir ]
}
// task 중 compileJava를 실행하면 Q파일들을 생성
tasks.withType(JavaCompile).configureEach {
options.getGeneratedSourceOutputDirectory().set(file(querydslDir))
}
// clean을 하면 querydsl directory를 삭제
clean.doLast {
file(querydslDir).deleteDir()
}
@EnableBatchProcessing
package cwchoiit.pointbatch.config;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.context.annotation.Configuration;
@EnableBatchProcessing
@Configuration
public class BatchConfig {
}
- 가장 먼저, 스프링 배치를 사용하기 위해 @EnableBatchProcessing 애노테이션을 달아준다. 그리고 이 애노테이션이 적용되기 위해 @Configuration 애노테이션도 달아준다.
application.yaml
spring:
batch:
job:
name: ${job.name:NONE}
jdbc:
initialize-schema: always
jpa:
show-sql: true
hibernate:
ddl-auto: validate
datasource:
driver-class-name: org.h2.Driver
url: jdbc:h2:tcp://localhost/~/h2/db/point
username: sa
password:
- H2 데이터베이스는 미리 다운받고, 데이터베이스를 만들면 된다. 나는 `point`라는 이름의 데이터베이스를 만들었다.
- `spring.batch.job.name` : ${job.name:NONE} → 이 스프링 배치 프로그램을 패키징하여 Jar 파일로 만들고 해당 파일을 실행할때, 어떤 Job을 실행시킬건지 명명해줘야 한다. 근데 이름이 너무 기니까 그 긴 이름을 `job.name`으로 축소한것이라고 생각하면 된다. 그리고 만약, Job을 선택하지 않은 경우 모든 Job이 다 실행되는데 그것은 싫으니 기본값으로 NONE을 명명했다.
- 나머지 데이터베이스 관련 설정은 넘어간다. 이 포스팅에서 다룰 내용은 아니라고 생각한다. 이미 이것을 다룬 포스팅이 내 블로그에 많다.
데이터베이스 테이블 만들기
우선, 데이터베이스 테이블을 만들자. DDL은 미리 준비해 두었다.
CREATE TABLE `point_wallet`
(
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
`amount` bigint NOT NULL COMMENT '보유금액',
`user_id` varchar(20) NOT NULL COMMENT '유저 ID',
PRIMARY KEY (`id`)
);
CREATE TABLE `point_reservation`
(
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
`amount` bigint NOT NULL COMMENT '적립금액',
`available_days` int NOT NULL COMMENT '유효기간',
`earned_date` date NOT NULL COMMENT '적립일자',
`is_executed` tinyint NOT NULL COMMENT '적용여부',
`point_wallet_id` bigint NOT NULL COMMENT '포인트 지갑 ID',
PRIMARY KEY (`id`)
);
CREATE TABLE `point`
(
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
`amount` bigint NOT NULL COMMENT '적립금액',
`earned_date` date NOT NULL COMMENT '적립일자',
`expire_date` date NOT NULL COMMENT '만료일자',
`is_used` tinyint NOT NULL COMMENT '사용유무',
`is_expired` tinyint NOT NULL COMMENT '만료여부',
`point_wallet_id` bigint NOT NULL COMMENT '포인트 지갑 ID',
PRIMARY KEY (`id`)
);
CREATE TABLE `message`
(
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
`user_id` varchar(20) NOT NULL COMMENT '유저 ID',
`title` varchar(200) NOT NULL COMMENT '제목',
`content` text NOT NULL COMMENT '내용',
PRIMARY KEY (`id`)
);
- 사용자의 포인트를 보관하는 지갑을 나타내는 `point_wallet` 테이블
- 사용자에게 포인트가 적립될 예정임을 나타내는 `point_reservation` 테이블
- 사용자의 포인트를 나타내는 `point` 테이블
- 포인트 소멸 또는 증액 예정에 대한 알람을 나타내는 `message` 테이블
지금까지는 비즈니스 로직에 관련된 테이블이었다. 여기서 끝나면 안된다. 왜냐하면 스프링 배치를 돌리기 위해 필요한 테이블들이 있다.
Spring Batch Meta-Data Schema
Spring Batch는 Job, Step, 및 실행 중인 Batch Job의 상태에 대한 정보를 기록하기 위해 다양한 메타데이터 테이블을 사용한다. 그래서 배치작업을 할 때 이 메타데이터 테이블들이 있어야 한다. 어떻게 만들까? `schema-*.sql` 이라는 파일명으로 된 DDL 스크립트를 제공한다. 저기서 `*`은 내가 사용하는 데이터베이스를 의미한다. 그래서 나는 H2를 사용하니까 `schema-h2.sql` 이라는 파일명을 찾으면 된다! 아래처럼 IntelliJ에서 파일을 찾아보면 스크립트 파일 하나가 나올것이다.
해당 파일을 열어보면 다음과 같이 생겼다.
-- Autogenerated: do not edit this file
CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY ,
VERSION BIGINT ,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ;
CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY ,
VERSION BIGINT ,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME TIMESTAMP(9) NOT NULL,
START_TIME TIMESTAMP(9) DEFAULT NULL ,
END_TIME TIMESTAMP(9) DEFAULT NULL ,
STATUS VARCHAR(10) ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED TIMESTAMP(9),
constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
PARAMETER_NAME VARCHAR(100) NOT NULL ,
PARAMETER_TYPE VARCHAR(100) NOT NULL ,
PARAMETER_VALUE VARCHAR(2500) ,
IDENTIFYING CHAR(1) NOT NULL ,
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
CREATE_TIME TIMESTAMP(9) NOT NULL,
START_TIME TIMESTAMP(9) DEFAULT NULL ,
END_TIME TIMESTAMP(9) DEFAULT NULL ,
STATUS VARCHAR(10) ,
COMMIT_COUNT BIGINT ,
READ_COUNT BIGINT ,
FILTER_COUNT BIGINT ,
WRITE_COUNT BIGINT ,
READ_SKIP_COUNT BIGINT ,
WRITE_SKIP_COUNT BIGINT ,
PROCESS_SKIP_COUNT BIGINT ,
ROLLBACK_COUNT BIGINT ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED TIMESTAMP(9),
constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT LONGVARCHAR ,
constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ;
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT LONGVARCHAR ,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_SEQ;
- 이 스크립트 역시 실행해서 메타데이터 테이블을 만들면 된다.
- 각각의 테이블이 정확히 어떤 내용인지는 다음 공식 문서를 참조하자. 그런데, 이름만 봐도 이게 어떤 테이블인지는 감이 대충 온다.
엔티티 만들기
데이터베이스와 테이블을 만들었으니 이제 엔티티를 만들어보자.
Message
package cwchoiit.pointbatch.message.entity;
import jakarta.persistence.*;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
@Entity
@Table(name = "MESSAGE")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
@Getter
public class Message {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "user_id", nullable = false)
private String userId;
@Column(name = "title", nullable = false)
private String title;
@Column(name = "content", nullable = false, columnDefinition = "text")
private String content;
}
MessageRepository
package cwchoiit.pointbatch.message.repository;
import cwchoiit.pointbatch.message.entity.Message;
import org.springframework.data.jpa.repository.JpaRepository;
public interface MessageRepository extends JpaRepository<Message, Long> {
}
Point
package cwchoiit.pointbatch.point.entity;
import jakarta.persistence.*;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.math.BigInteger;
import java.time.LocalDate;
@Entity
@Table(name = "POINT")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
@Getter
public class Point {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "point_wallet_id", nullable = false)
private PointWallet pointWallet;
@Column(name = "amount", nullable = false, columnDefinition = "BIGINT")
private BigInteger amount;
@Column(name = "earned_date", nullable = false)
private LocalDate earnedDate;
@Column(name = "expire_date", nullable = false)
private LocalDate expireDate;
@Column(name = "is_used", nullable = false, columnDefinition = "TINYINT", length = 1)
private boolean used;
@Column(name = "is_expired", nullable = false, columnDefinition = "TINYINT", length = 1)
private boolean expired;
public Point(PointWallet pointWallet, BigInteger amount, LocalDate earnedDate, LocalDate expireDate) {
this.pointWallet = pointWallet;
this.amount = amount;
this.earnedDate = earnedDate;
this.expireDate = expireDate;
this.used = false;
this.expired = false;
}
public void expire() {
if (!this.used) {
this.expired = true;
}
}
}
PointRepository
package cwchoiit.pointbatch.point.repository;
import cwchoiit.pointbatch.point.entity.Point;
import org.springframework.data.jpa.repository.JpaRepository;
public interface PointRepository extends JpaRepository<Point, Long> {
}
PointWallet
package cwchoiit.pointbatch.point.entity;
import jakarta.persistence.*;
import lombok.*;
import java.math.BigInteger;
@Entity
@Table(name = "POINT_WALLET")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
@Getter
public class PointWallet {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "user_id", unique = true, nullable = false)
private String userId;
@Column(name = "amount", columnDefinition = "BIGINT")
BigInteger amount;
}
PointWalletRepository
package cwchoiit.pointbatch.point.repository;
import cwchoiit.pointbatch.point.entity.PointWallet;
import org.springframework.data.jpa.repository.JpaRepository;
public interface PointWalletRepository extends JpaRepository<PointWallet, Long> {
}
PointReservation
package cwchoiit.pointbatch.reservation.entity;
import cwchoiit.pointbatch.point.entity.PointWallet;
import jakarta.persistence.*;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.math.BigInteger;
import java.time.LocalDate;
@Entity
@Table(name = "POINT_RESERVATION")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
@Getter
public class PointReservation {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "point_wallet_id", nullable = false)
private PointWallet pointWallet;
@Column(name = "amount", nullable = false, columnDefinition = "BIGINT")
private BigInteger amount;
@Column(name = "earned_date", nullable = false)
private LocalDate earnedDate;
@Column(name = "available_days", nullable = false)
private int availableDays;
@Column(name = "is_executed", columnDefinition = "TINYINT", length = 1)
private boolean executed;
public PointReservation(PointWallet pointWallet, BigInteger amount, LocalDate earnedDate, int availableDays) {
this.pointWallet = pointWallet;
this.amount = amount;
this.earnedDate = earnedDate;
this.availableDays = availableDays;
this.executed = false;
}
public void execute() {
this.executed = true;
}
public LocalDate getExpireDate() {
return this.earnedDate.plusDays(this.availableDays);
}
}
PointReservationRepository
package cwchoiit.pointbatch.reservation.repository;
import cwchoiit.pointbatch.reservation.entity.PointReservation;
import org.springframework.data.jpa.repository.JpaRepository;
public interface PointReservationRepository extends JpaRepository<PointReservation, Long> {
}
테스트 코드 작성하기
이제, 비즈니스 로직을 수행할 Job을 만들건데, 그 Job을 테스트하기 위한 테스트 코드를 같이 작성해보자. 바로 들어가보자.
우선, application.yaml은 동일하게 하나 만들어주자.
test/resources/application.yaml
spring:
batch:
job:
name: ${job.name:NONE}
jdbc:
initialize-schema: always
jpa:
show-sql: true
hibernate:
ddl-auto: validate
datasource:
url: jdbc:h2:tcp://localhost/~/h2/db/point
username: sa
password:
driver-class-name: org.h2.Driver
BatchTestSupport
package cwchoiit.pointbatch;
import cwchoiit.pointbatch.message.repository.MessageRepository;
import cwchoiit.pointbatch.point.repository.PointRepository;
import cwchoiit.pointbatch.point.repository.PointWalletRepository;
import cwchoiit.pointbatch.reservation.repository.PointReservationRepository;
import org.junit.jupiter.api.AfterEach;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
@SpringBootTest
@ActiveProfiles("test")
public abstract class BatchTestSupport {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private JobRepository jobRepository;
@Autowired
PointWalletRepository pointWalletRepository;
@Autowired
PointRepository pointRepository;
@Autowired
MessageRepository messageRepository;
@Autowired
PointReservationRepository pointReservationRepository;
protected JobExecution launchJob(Job job, JobParameters jobParameters) throws Exception {
JobLauncherTestUtils jobLauncherTestUtils = new JobLauncherTestUtils();
jobLauncherTestUtils.setJob(job);
jobLauncherTestUtils.setJobLauncher(jobLauncher);
jobLauncherTestUtils.setJobRepository(jobRepository);
return jobLauncherTestUtils.launchJob(jobParameters);
}
@AfterEach
protected void deleteAll() {
pointWalletRepository.deleteAll();
pointRepository.deleteAll();
messageRepository.deleteAll();
pointReservationRepository.deleteAll();
}
}
- 이 파일은, Job을 실행할 때 공통적으로 처리하는 부분들을 따로 빼서 유틸리티 느낌으로 계속 사용하기 위해 만든 클래스이다.
- 테스트 코드를 작성하는 클래스는 모두 이 클래스를 상속받을 것이라 @SpringBootTest, @ActiveProfiles 애노테이션을 달아주었다.
@Autowired
private JobLauncher jobLauncher;
@Autowired
private JobRepository jobRepository;
- 이 두개는 스프링 배치에서 제공해주는 빈들이다. 잡을 실행하기 위한 빈인 JobLauncher, 스프링 메타데이터 테이블에 접근하기 위한 빈인 JobRepository.
protected JobExecution launchJob(Job job, JobParameters jobParameters) throws Exception {
JobLauncherTestUtils jobLauncherTestUtils = new JobLauncherTestUtils();
jobLauncherTestUtils.setJob(job);
jobLauncherTestUtils.setJobLauncher(jobLauncher);
jobLauncherTestUtils.setJobRepository(jobRepository);
return jobLauncherTestUtils.launchJob(jobParameters);
}
- 실제로 Job을 실행하는 메서드이다. 파라미터로는 실행할 Job, Job을 실행할 때 사용되는 Parameter를 받는다.
- JobLauncherTestUtils는 스프링 배치 테스트 라이브러리에서 제공해주는 테스트를 할때 유용하게 사용할 수 있는 녀석이다. 위 코드에서처럼 Job, JobLauncher, JobRepository, JobParameters를 넘겨주고 Job을 실행할 수 있다.
@AfterEach
protected void deleteAll() {
pointWalletRepository.deleteAll();
pointRepository.deleteAll();
messageRepository.deleteAll();
pointReservationRepository.deleteAll();
}
- 이 부분은 테스트를 진행하면서 생기는 레코드들을 전부 삭제하고 싶어서 만든 부분이다. 분명, 더 좋은 방법이 있을것이라고 생각한다. 원래 스프링 부트 테스트를 실행할 때 아무런 데이터베이스 정보를 넣어주지 않으면 메모리 H2 데이터베이스를 자동으로 사용하게 해주는데 그 데이터베이스에 스프링 메타데이터 테이블을 어떻게 넣는지 모르고 있는 상태라 테스트용 데이터베이스가 아닌 실제 데이터베이스로 테스트를 진행중이다. 그래서 위 코드를 사용중에 있다.
만료된 포인트 배치 작업 처리하기
ExpirePointJobConfigurationTest
package cwchoiit.pointbatch.point.job;
import cwchoiit.pointbatch.BatchTestSupport;
import cwchoiit.pointbatch.point.entity.Point;
import cwchoiit.pointbatch.point.entity.PointWallet;
import cwchoiit.pointbatch.point.repository.PointRepository;
import cwchoiit.pointbatch.point.repository.PointWalletRepository;
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import java.math.BigInteger;
import java.time.LocalDate;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.batch.core.ExitStatus.COMPLETED;
class ExpirePointJobConfigurationTest extends BatchTestSupport {
@Autowired
Job expirePointJob;
@Autowired
PointWalletRepository pointWalletRepository;
@Autowired
PointRepository pointRepository;
@Test
void expirePointJob() throws Exception {
LocalDate earnDate = LocalDate.of(2024, 11, 1);
LocalDate expireDate = LocalDate.of(2024, 11, 3);
PointWallet savedPointWallet = pointWalletRepository.save(new PointWallet("userA", BigInteger.valueOf(6000)));
pointRepository.save(new Point(savedPointWallet, BigInteger.valueOf(1000), earnDate, expireDate));
pointRepository.save(new Point(savedPointWallet, BigInteger.valueOf(1000), earnDate, expireDate));
pointRepository.save(new Point(savedPointWallet, BigInteger.valueOf(1000), earnDate, expireDate));
JobParameters jobParameters = new JobParametersBuilder()
.addString("today", "2024-11-09")
.toJobParameters();
JobExecution jobExecution = launchJob(expirePointJob, jobParameters);
assertThat(jobExecution.getExitStatus()).isEqualTo(COMPLETED);
List<Point> points = pointRepository.findAll();
assertThat(points.stream().filter(Point::isExpired)).hasSize(3);
PointWallet changedPointWallet = pointWalletRepository.findById(savedPointWallet.getId()).orElse(null);
assertThat(changedPointWallet).isNotNull();
assertThat(changedPointWallet.getAmount()).isEqualTo(BigInteger.valueOf(3000));
}
}
- 이제 첫번째 배치성 비즈니스 로직을 작성해보자. 우선, 가장 먼저 만료된 포인트를 처리하는 배치를 작성해보자.
- 테스트 코드와 비즈니스 로직 코드를 같이 작성하자. 그래서 위 코드를 작성했다. 한 줄 한 줄 살펴보자.
LocalDate earnDate = LocalDate.of(2024, 11, 1);
LocalDate expireDate = LocalDate.of(2024, 11, 3);
PointWallet savedPointWallet = pointWalletRepository.save(new PointWallet("userA", BigInteger.valueOf(6000)));
pointRepository.save(new Point(savedPointWallet, BigInteger.valueOf(1000), earnDate, expireDate));
pointRepository.save(new Point(savedPointWallet, BigInteger.valueOf(1000), earnDate, expireDate));
pointRepository.save(new Point(savedPointWallet, BigInteger.valueOf(1000), earnDate, expireDate));
- 우선, 이 부분은 데이터베이스에 레코드를 추가하는 부분이다. 필요한 엔티티는 PointWallet 엔티티 하나, Point 엔티티 3개가 필요하다. 그러니까 한명의 유저의 포인트 지갑에 포인트가 3개 들어간다고 보면 된다.
- 포인트는 3개 전부 다 1000 포인트, 포인트 지급일은 2024-11-01, 포인트 만료일은 2024-11-03일로 동일하다.
JobParameters jobParameters = new JobParametersBuilder()
.addString("today", "2024-11-09")
.toJobParameters();
JobExecution jobExecution = launchJob(expirePointJob, jobParameters);
assertThat(jobExecution.getExitStatus()).isEqualTo(COMPLETED);
List<Point> points = pointRepository.findAll();
assertThat(points.stream().filter(Point::isExpired)).hasSize(3);
PointWallet changedPointWallet = pointWalletRepository.findById(savedPointWallet.getId()).orElse(null);
assertThat(changedPointWallet).isNotNull();
assertThat(changedPointWallet.getAmount()).isEqualTo(BigInteger.valueOf(3000));
- 그리고 Job을 실행하기 위한 JobParameters를 생성한다. today라는 key에 2024-11-09일 이라는 날짜를 입력한다.
- 아까 BatchTestSupport에서 만든 launchJob(...) 메서드를 실행한다. Job은 expirePointJob을 넘겨준다. 이 Job은 아직 만들지 않았다. 곧 만들어보자. JobParameters로는 today가 들어있는 jobParameters를 넘기자.
- 그러니까, 오늘날짜보다 만료일이 더 이전인 포인트를 전부 유효하지 않은 포인트 처리를 하는 배치 작업인 것이다.
- 그리고 배치 작업을 실행했으면 테스트 결과를 찍어봐야 하니까 실행한 JobExecution의 ExitStatus 값이 COMPLETED인지 확인하고, 모든 포인트들을 찾아서 포인트가 만료된 개수가 3개 모두 인지 확인하고, 포인트 지갑의 남아있는 포인트가 3000인지도 확인한다.
자, 이제 Job을 만들어보자!
ExpirePointJobConfiguration
package cwchoiit.pointbatch.point.job;
import cwchoiit.pointbatch.point.job.validator.TodayJobParameterValidator;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ExpirePointJobConfiguration {
@Bean
public Job expirePointJob(JobRepository jobRepository, Step expirePointStep, TodayJobParameterValidator validator) {
return new JobBuilder("expirePointJob", jobRepository)
.validator(validator)
.start(expirePointStep)
.build();
}
}
- Job, Step 모두 빈으로 등록해야 한다. 그래서 @Bean 애노테이션을 사용한다.
- 아까 만들기로 했던 expirePointJob을 만들자. 파라미터로는 JobRepository, Step, TodayJobParameterValidator가 들어간다. 아직 Step expirePointStep, TodayJobParameterValidator validator는 만들지 않았다.
- JobBuilder를 통해서 새로운 Job을 만들고 validator 적용, 실행할 Step 적용을 하고 build()를 호출한다.
얼른 TodayJobParameterValidator를 만들어보자.
TodayJobParameterValidator
package cwchoiit.pointbatch.point.job.validator;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.JobParametersValidator;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.format.DateTimeParseException;
@Component
public class TodayJobParameterValidator implements JobParametersValidator {
@Override
public void validate(JobParameters parameters) throws JobParametersInvalidException {
if (parameters == null) {
throw new JobParametersInvalidException("JobParameters is null");
}
String today = parameters.getString("today");
if (today == null) {
throw new JobParametersInvalidException("JobParameters [today] is null");
}
try {
LocalDate.parse(today);
} catch (DateTimeParseException e) {
throw new JobParametersInvalidException("JobParameters [today] is not a valid date");
}
}
}
- 스프링 배치는 JobParameters를 검증할 수 있는 JobParametersValidator를 제공한다. 이것을 구현한 클래스가 바로 TodayJobParameterValidator이다.
- 구현 로직은 간단하다. Parameter가 있는지, 있다면 today라는 파라미터가 있는지를 검증하고, today라는 파라미터의 값으로 LocalDate로 파싱이 가능한지를 검증한다. 셋 중 하나라도 문제가 발생하면 JobParametersInvalidException을 발생시킨다.
이제 expirePointStep을 만들어보자!
ExpirePointStepConfiguration
package cwchoiit.pointbatch.point.job;
import cwchoiit.pointbatch.point.entity.Point;
import cwchoiit.pointbatch.point.entity.PointWallet;
import cwchoiit.pointbatch.point.repository.PointRepository;
import cwchoiit.pointbatch.point.repository.PointWalletRepository;
import jakarta.persistence.EntityManagerFactory;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import java.time.LocalDate;
import java.util.Map;
@Configuration
public class ExpirePointStepConfiguration {
@Bean
@StepScope
public JpaPagingItemReader<Point> expirePointItemReader(EntityManagerFactory entityManagerFactory,
@Value("#{T(java.time.LocalDate).parse(jobParameters[today])}") LocalDate today) {
return new JpaPagingItemReaderBuilder<Point>()
.name("expirePointItemReader")
.entityManagerFactory(entityManagerFactory)
.queryString("SELECT p FROM Point p WHERE p.expireDate < :today and used = false and expired = false")
.parameterValues(Map.of("today", today))
.pageSize(1000)
.build();
}
@Bean
@StepScope
public ItemProcessor<Point, Point> expirePointItemProcessor() {
return point -> {
point.setExpired(true);
PointWallet wallet = point.getPointWallet();
wallet.setAmount(wallet.getAmount().subtract(point.getAmount()));
return point;
};
}
@Bean
@StepScope
public ItemWriter<Point> expirePointItemWriter(PointRepository pointRepository,
PointWalletRepository pointWalletRepository) {
return points -> {
for (Point point : points) {
if (point.isExpired()) {
pointRepository.save(point);
pointWalletRepository.save(point.getPointWallet());
}
}
};
}
@Bean
@JobScope
public Step expirePointStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
JpaPagingItemReader<Point> expirePointItemReader,
ItemProcessor<Point, Point> expirePointItemProcessor,
ItemWriter<Point> expirePointItemWriter) {
return new StepBuilder("expirePointStep", jobRepository)
.<Point, Point>chunk(1000, transactionManager)
.reader(expirePointItemReader)
.processor(expirePointItemProcessor)
.writer(expirePointItemWriter)
.build();
}
}
- Step과 Step을 구성하는 reader, processor, writer를 만들었다. 참고로 이게 어떤 내용인지는 이미 이전 포스팅에 다 작성을 했기 때문에 참고하면 된다. 가장 먼저, Step을 확인해보자.
@Bean
@JobScope
public Step expirePointStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
JpaPagingItemReader<Point> expirePointItemReader,
ItemProcessor<Point, Point> expirePointItemProcessor,
ItemWriter<Point> expirePointItemWriter) {
return new StepBuilder("expirePointStep", jobRepository)
.<Point, Point>chunk(1000, transactionManager)
.reader(expirePointItemReader)
.processor(expirePointItemProcessor)
.writer(expirePointItemWriter)
.build();
}
- 우선은 Step도 빈으로 등록되어야 하기 때문에 @Bean 애노테이션을 달았고, 이 Step을 사용하는 Job이 실행될 때 이 Step이 로딩되기를 원하기 때문에 @JobScope 애노테이션을 작성했다.
- StepBuilder를 사용해서, Step을 만들어내는데, 데이터베이스에 데이터가 너무 많은 경우 한번에 다 가져올 수 없으니 chunk를 통해 1000개씩 쪼개서 가져오도록 했고, reader, processor, writer를 집어넣었다.
- 참고로 PlatformTransactionManager는 스프링에서 제공해주는 트랜잭션 매니저이다. 그러니까 이 작업동안에 사용할 트랜잭션을 제공한다고 보면 된다.
expirePointItemReader
@Bean
@StepScope
public JpaPagingItemReader<Point> expirePointItemReader(EntityManagerFactory entityManagerFactory,
@Value("#{T(java.time.LocalDate).parse(jobParameters[today])}") LocalDate today) {
return new JpaPagingItemReaderBuilder<Point>()
.name("expirePointItemReader")
.entityManagerFactory(entityManagerFactory)
.queryString("SELECT p FROM Point p WHERE p.expireDate < :today and used = false and expired = false")
.parameterValues(Map.of("today", today))
.pageSize(1000)
.build();
}
- 역시 이 Reader도 연관된 스텝이 사용될 때 로딩되도록 @StepScope를 사용했다.
- 간단하게 JPA의 JPQL로 원하는 데이터를 읽어온다. 우리가 원하는건 "포인트 테이블에서 파라미터로 전달받은 오늘 날짜보다 만료일이 이전이고, 사용하지 않았고, 만료 체크되지도 않은 모든 포인트"를 가져오고 싶다.
- 이렇게 가져온 데이터들이 Processor로 넘어간다.
expirePointItemProcessor
@Bean
@StepScope
public ItemProcessor<Point, Point> expirePointItemProcessor() {
return point -> {
point.setExpired(true);
PointWallet wallet = point.getPointWallet();
wallet.setAmount(wallet.getAmount().subtract(point.getAmount()));
return point;
};
}
- 역시 이 Processor도 연관된 스텝이 사용될 때 로딩되도록 @StepScope를 사용했다.
- 가져온 데이터를 Point 타입에서 Point 타입으로 처리할 것이므로(다른 말로 타입의 변환은 일으키지 않을 것이라는 말이다) <Point, Point>를 사용했다.
- 각 포인트의 만료여부를 `true`로 변경한다.
- 그리고 해당 포인트를 가지고 있는 PointWallet의 포인트 가격을 지금 이 포인트가 들고있는 가격만큼 차감한다. (만료된 포인트니까)
expirePointItemWriter
@Bean
@StepScope
public ItemWriter<Point> expirePointItemWriter(PointRepository pointRepository,
PointWalletRepository pointWalletRepository) {
return points -> {
for (Point point : points) {
if (point.isExpired()) {
pointRepository.save(point);
pointWalletRepository.save(point.getPointWallet());
}
}
};
}
- 역시 이 Writer도 연관된 스텝이 사용될 때 로딩되도록 @StepScope를 사용했다.
- 이제 처리한 모든 포인트들을 순회하며, 만료여부가 `true`인 경우, 작업처리된 포인트와 포인트지갑을 반영해야 하므로 save()를 호출해준다.
이렇게 Job, Step, Reader, Processor, Writer를 전부 구현했다. 이제 실행해보자! 아까 테스트 코드로 만든 아래 코드를 실행하자!
@Test
void expirePointJob() throws Exception {
LocalDate earnDate = LocalDate.of(2024, 11, 1);
LocalDate expireDate = LocalDate.of(2024, 11, 3);
PointWallet savedPointWallet = pointWalletRepository.save(new PointWallet("userA", BigInteger.valueOf(6000)));
pointRepository.save(new Point(savedPointWallet, BigInteger.valueOf(1000), earnDate, expireDate));
pointRepository.save(new Point(savedPointWallet, BigInteger.valueOf(1000), earnDate, expireDate));
pointRepository.save(new Point(savedPointWallet, BigInteger.valueOf(1000), earnDate, expireDate));
JobParameters jobParameters = new JobParametersBuilder()
.addString("today", "2024-11-12")
.toJobParameters();
JobExecution jobExecution = launchJob(expirePointJob, jobParameters);
assertThat(jobExecution.getExitStatus()).isEqualTo(COMPLETED);
List<Point> points = pointRepository.findAll();
assertThat(points.stream().filter(Point::isExpired)).hasSize(3);
PointWallet changedPointWallet = pointWalletRepository.findById(savedPointWallet.getId()).orElse(null);
assertThat(changedPointWallet).isNotNull();
assertThat(changedPointWallet.getAmount()).isEqualTo(BigInteger.valueOf(3000));
}
실행 결과
- 테스트가 성공적으로 끝났다. 다시 한번 실행해보자!
재실행 결과
- 어? 테스트에 실패했다 왜 그럴까?
에러 내용은 다음과 같다.
A job instance already exists and is complete for identifying parameters={'today':'{value=2024-11-12, type=class java.lang.String, identifying=true}'}. If you want to run this job again, change the parameters.
org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for identifying parameters={'today':'{value=2024-11-12, type=class java.lang.String, identifying=true}'}. If you want to run this job again, change the parameters.
- 쉽게 말해, 이미 같은 파라미터로 실행한 Job Instance가 존재한다는 것이다. 스프링 배치는 설계하기를 배치성 로직을 같은 파라미터로 한번 더 수행하는 것을 잘못된 수행으로 판단했다. 그도 그럴것이 배치성이라는게 날 잡아서 큰 작업을 한번에 처리하는건데 이것을 동일한 데이터로 한번 더하면 문제가 될 수 있다고 판단하는 것이다.
- 그래서, 실행하기 전 아까 위에서 말한 메타데이터 테이블로 같은 파라미터로 실행된 Job이 있는지 먼저 확인을 한다.
정리
아직 할 게 더 남았지만, 포스팅이 길어지니까 다음 포스팅에 계속하겠다. 그래도 포인트 만료시키는 배치 작업을 해보면서 어느정도 스프링 배치에 대해 감을 좀 잡았다.
'Spring Batch' 카테고리의 다른 글
Listener (1) | 2024.10.09 |
---|---|
ItemWriter (1) | 2024.10.09 |
ItemProcessor (2) | 2024.10.09 |
ItemReader (1) | 2024.10.09 |
Chunk Processing (5) | 2024.10.09 |