포인트 예약 적립 배치 Job
이번에는, 포인트 예약 적립에 대한 배치를 작성해보자!
우선, Job을 하나 만들자.
ExecutePointReservationJobConfiguration
package cwchoiit.pointbatch.reservation.job;
import cwchoiit.pointbatch.point.job.validator.TodayJobParameterValidator;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ExecutePointReservationJobConfiguration {
@Bean
public Job executePointReservationJob(JobRepository jobRepository,
TodayJobParameterValidator validator,
Step executePointReservationStep) {
return new JobBuilder("executePointReservationJob", jobRepository)
.validator(validator)
.start(executePointReservationStep)
.build();
}
}
- 역시나 `today`라는 파라미터를 전달받는 Job이기 때문에 동일한 TodayJobParameterValidator를 사용한다.
이제 저기서 선언한 executePointReservationStep을 만들어보자.
ExecutePointReservationStepConfiguration
package cwchoiit.pointbatch.reservation.job;
import com.mysema.commons.lang.Pair;
import cwchoiit.pointbatch.point.entity.Point;
import cwchoiit.pointbatch.point.entity.PointWallet;
import cwchoiit.pointbatch.point.repository.PointRepository;
import cwchoiit.pointbatch.point.repository.PointWalletRepository;
import cwchoiit.pointbatch.reservation.entity.PointReservation;
import cwchoiit.pointbatch.reservation.repository.PointReservationRepository;
import jakarta.persistence.EntityManagerFactory;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import java.time.LocalDate;
import java.util.Map;
@Configuration
public class ExecutePointReservationStepConfiguration {
@Bean
@StepScope
public JpaPagingItemReader<PointReservation> executePointReservationReader(EntityManagerFactory entityManagerFactory,
@Value("#{T(java.time.LocalDate).parse(jobParameters[today])}")LocalDate today) {
return new JpaPagingItemReaderBuilder<PointReservation>()
.name("executePointReservationReader")
.entityManagerFactory(entityManagerFactory)
.queryString("SELECT pr FROM PointReservation pr WHERE pr.earnedDate = :today AND pr.executed = false")
.parameterValues(Map.of("today", today))
.pageSize(1000)
.build();
}
@Bean
@StepScope
public ItemProcessor<PointReservation, Pair<PointReservation, Point>> executePointReservationItemProcessor() {
return reservation -> {
reservation.setExecuted(true);
Point earnedPoint = new Point(
reservation.getPointWallet(),
reservation.getAmount(),
reservation.getEarnedDate(),
reservation.getExpireDate());
PointWallet wallet = reservation.getPointWallet();
wallet.setAmount(wallet.getAmount().add(earnedPoint.getAmount()));
return Pair.of(reservation, earnedPoint);
};
}
@Bean
@StepScope
public ItemWriter<Pair<PointReservation, Point>> executePointReservationItemWriter(PointReservationRepository pointReservationRepository,
PointRepository pointRepository,
PointWalletRepository pointWalletRepository) {
return reservationAndPointPair -> {
for (Pair<PointReservation, Point> pair : reservationAndPointPair) {
PointReservation reservation = pair.getFirst();
pointReservationRepository.save(reservation);
Point point = pair.getSecond();
pointRepository.save(point);
pointWalletRepository.save(reservation.getPointWallet());
}
};
}
@Bean
@JobScope
public Step executePointReservationStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
JpaPagingItemReader<PointReservation> executePointReservationReader,
ItemProcessor<PointReservation, Pair<PointReservation, Point>> executePointReservationItemProcessor,
ItemWriter<Pair<PointReservation, Point>> executePointReservationItemWriter) {
return new StepBuilder("executePointReservationStep", jobRepository)
.<PointReservation, Pair<PointReservation, Point>>chunk(1000, transactionManager)
.reader(executePointReservationReader)
.processor(executePointReservationItemProcessor)
.writer(executePointReservationItemWriter)
.build();
}
}
- Step부터 살펴보고, Reader, Processor, Writer를 하나씩 살펴보자.
@Bean
@JobScope
public Step executePointReservationStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
JpaPagingItemReader<PointReservation> executePointReservationReader,
ItemProcessor<PointReservation, Pair<PointReservation, Point>> executePointReservationItemProcessor,
ItemWriter<Pair<PointReservation, Point>> executePointReservationItemWriter) {
return new StepBuilder("executePointReservationStep", jobRepository)
.<PointReservation, Pair<PointReservation, Point>>chunk(1000, transactionManager)
.reader(executePointReservationReader)
.processor(executePointReservationItemProcessor)
.writer(executePointReservationItemWriter)
.build();
}
- Step이니까 @JobScope를 사용했다.
- 이전에 했던 포인트 만료 배치와 비슷하게 StepBuilder를 통해 Step을 만들면 된다.
- 역시 1000개씩 짤라서 작업을 수행하기 위해 chunk를 사용한다.
@Bean
@StepScope
public JpaPagingItemReader<PointReservation> executePointReservationReader(EntityManagerFactory entityManagerFactory,
@Value("#{T(java.time.LocalDate).parse(jobParameters[today])}")LocalDate today) {
return new JpaPagingItemReaderBuilder<PointReservation>()
.name("executePointReservationReader")
.entityManagerFactory(entityManagerFactory)
.queryString("SELECT pr FROM PointReservation pr WHERE pr.earnedDate = :today AND pr.executed = false")
.parameterValues(Map.of("today", today))
.pageSize(1000)
.build();
}
- 먼저, 데이터를 가져오는 Reader 부분이다.
- JPQL을 자세히 보면 된다. 매우 간단하다. PointReservation 테이블에서, earnedDate가 `today`라는 파라미터로 들어오는 값이고, 아직 예약 적립이 실행되지 않은(`executed = false`) 포인트 예약 레코드를 가져온다.
@Bean
@StepScope
public ItemProcessor<PointReservation, Pair<PointReservation, Point>> executePointReservationItemProcessor() {
return reservation -> {
reservation.setExecuted(true);
Point earnedPoint = new Point(
reservation.getPointWallet(),
reservation.getAmount(),
reservation.getEarnedDate(),
reservation.getExpireDate());
PointWallet wallet = reservation.getPointWallet();
wallet.setAmount(wallet.getAmount().add(earnedPoint.getAmount()));
return Pair.of(reservation, earnedPoint);
};
}
- 이번엔 Processor 인데 이 부분이 좀 재밌다.
- 우선, PointReservation 엔티티를 하나씩 차례로 가져오면, 이 포인트 예약 적립을 실행해야 하니까 실행 정보를 `true`로 변경한다.
- 그리고, 신규 포인트 엔티티를 하나 만들고 이 포인트 예약 적립에 대한 데이터를 추가한다.
- 이 포인트 예약 적립 엔티티는 어떤 포인트 지갑에 들어갈지에 대한 정보가 있다. 해당 정보를 통해 PointWallet 엔티티를 가져온다.
- 이 PointWallet의 포인트 금액을 현재 금액 + 포인트 예약 적립 금액을 합쳐서 저장한다.
- 그 다음, 변경 사항은 총 3개다. [PointReservation, PointWallet, Point]
- 그 말은 이 세 개의 엔티티 모두 다 Writer에서 저장을 해야 한다. 그러려면 하나의 엔티티만 넘길 수는 없기에 여러 방법이 있지만 나는 Pair를 사용했다. 새로운 DTO 클래스를 만들어서 그 녀석에 저 세개를 담고 넘겨도 좋은 방법이다.
- "어? 왜 2개만 넘겨요?" → PointWallet은 PointReservation이 참조하고 있으니 그 값을 사용하면 된다.
@Bean
@StepScope
public ItemWriter<Pair<PointReservation, Point>> executePointReservationItemWriter(PointReservationRepository pointReservationRepository,
PointRepository pointRepository,
PointWalletRepository pointWalletRepository) {
return reservationAndPointPair -> {
for (Pair<PointReservation, Point> pair : reservationAndPointPair) {
PointReservation reservation = pair.getFirst();
pointReservationRepository.save(reservation);
Point point = pair.getSecond();
pointRepository.save(point);
pointWalletRepository.save(reservation.getPointWallet());
}
};
}
- 이번엔 Writer이다. Processor를 통해 전달받은 Pair 데이터를 순회하면서 저장을 전부 다 해주면 된다.
이렇게 Job, Step을 만들었으면 역시나 테스트 코드를 통해 제대로 실행되는지 확인해보자.
ExecutePointReservationJobConfigurationTest
package cwchoiit.pointbatch.reservation.job;
import cwchoiit.pointbatch.BatchTestSupport;
import cwchoiit.pointbatch.point.entity.Point;
import cwchoiit.pointbatch.point.entity.PointWallet;
import cwchoiit.pointbatch.point.repository.PointRepository;
import cwchoiit.pointbatch.point.repository.PointWalletRepository;
import cwchoiit.pointbatch.reservation.entity.PointReservation;
import cwchoiit.pointbatch.reservation.repository.PointReservationRepository;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import java.math.BigInteger;
import java.time.LocalDate;
import java.util.List;
import static org.assertj.core.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.*;
class ExecutePointReservationJobConfigurationTest extends BatchTestSupport {
@Autowired
PointWalletRepository pointWalletRepository;
@Autowired
PointReservationRepository pointReservationRepository;
@Autowired
PointRepository pointRepository;
@Autowired
Job executePointReservationJob;
@Test
void executePointReservationJob() throws Exception {
PointWallet pointWallet = new PointWallet("user1", BigInteger.valueOf(3000));
pointWalletRepository.save(pointWallet);
LocalDate earnDate = LocalDate.of(2024, 1, 1);
PointReservation pointReservation = new PointReservation(pointWallet, BigInteger.valueOf(1000), earnDate, 10);
pointReservationRepository.save(pointReservation);
JobParameters jobParameters = new JobParametersBuilder()
.addString("today", "2024-01-01")
.toJobParameters();
JobExecution jobExecution = launchJob(executePointReservationJob, jobParameters);
// Job 이 정상적으로 실행됐는지
assertThat(jobExecution.getExitStatus()).isEqualTo(ExitStatus.COMPLETED);
List<PointReservation> pointReservations = pointReservationRepository.findAll();
// 예약 포인트는 현재 1개있는지
assertThat(pointReservations).hasSize(1);
// 예약 포인트의 실행이 true 로 변경됐는지
assertThat(pointReservations.getFirst().isExecuted()).isTrue();
List<Point> points = pointRepository.findAll();
// 포인트 레포지토리에 포인트는 한개가 있는지
assertThat(points).hasSize(1);
// 포인트의 금액은 1000원인지
assertThat(points.getFirst().getAmount()).isEqualTo(BigInteger.valueOf(1000));
// 포인트의 발행일은 2024-01-01인지
assertThat(points.getFirst().getEarnedDate()).isEqualTo(earnDate);
// 포인트의 만료일은 2024-01-11인지
assertThat(points.getFirst().getExpireDate()).isEqualTo(LocalDate.of(2024, 1, 11));
List<PointWallet> pointWallets = pointWalletRepository.findAll();
// 포인트 지갑은 한개인지
assertThat(pointWallets).hasSize(1);
// 포인트 지갑의 총 금액은 4000원인지
assertThat(pointWallets.getFirst().getAmount()).isEqualTo(BigInteger.valueOf(4000));
}
}
- 테스트 실행 결과는 정상적으로 수행될 것이다.
- 여기서 이제 BatchTestSupport의 위력이 나오는 것이다. 저 클래스로 공통으로 사용되는 부분을 따로 분리하니까 굉장히 테스트 코드 작성하기가 편해졌다.
포인트 만료 메시지 배치 Job
이번에는 포인트가 만료되면 메시지(알림)을 사용자에게 보내주는 게 일반적인 앱의 기능인데 그 부분을 배치성으로 만들어보자!
MessageExpiredPointJobConfiguration
package cwchoiit.pointbatch.message.job;
import cwchoiit.pointbatch.point.job.validator.TodayJobParameterValidator;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MessageExpiredPointJobConfiguration {
@Bean
public Job messageExpiredPointJob(JobRepository jobRepository,
TodayJobParameterValidator validator,
Step messageExpiredPointStep) {
return new JobBuilder("messageExpiredPointJob", jobRepository)
.validator(validator)
.start(messageExpiredPointStep)
.build();
}
}
- 이제 Job 부분은 더 이상 설명할 게 없다.
InputExpiredPointAlarmCriteriaDateStepListener
package cwchoiit.pointbatch.message.listener;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
@Component
public class InputExpiredPointAlarmCriteriaDateStepListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
JobParameter<?> todayParam = stepExecution.getJobParameters().getParameter("today");
if (todayParam == null) {
return;
}
LocalDate todayLocalDate = LocalDate.parse((String) todayParam.getValue());
ExecutionContext context = stepExecution.getExecutionContext();
context.put("alarmCriteriaDate", todayLocalDate.minusDays(1).format(DateTimeFormatter.ISO_DATE));
stepExecution.setExecutionContext(context);
}
}
- 이 배치작업을 할 때 지금까지는 사용해 본 적 없는 StepExecutionListener를 사용해보자.
- 이 StepExecutionListener는, 이 리스너를 등록한 스텝을 실행하기 전에 리스너를 먼저 발동시켜서 어떤 작업을 처리할 수 있게 해주는 것이다. 지금은 beforeStep만 있지만 afterStep도 구현 가능하다.
- 이 리스너는 그래서 잡 파라미터에서 `today`를 가져와서, LocalDate로 변환한 다음, `alarmCriteriaDate`라는 값을 만들어내서 ExecutionContext에 담는 리스너이다. 저 `alarmCriteriaDate` 이 녀석은 `today`라는 오늘 날짜라는 파라미터에서 하루를 뺀 날짜를 가리킨다. 그러니까, 이 배치작업이 실제 운영서버에 돌아가는 작업이라면, 하루에 한번씩 실행될텐데 만료일이 있고 오늘날짜가 있을때 오늘날짜보다 하루 뺀 날짜가 만료일인 것을 찾아내고 싶은 거다.
MessageExpiredPointStepConfiguration
package cwchoiit.pointbatch.message.job;
import cwchoiit.pointbatch.message.entity.Message;
import cwchoiit.pointbatch.message.listener.InputExpiredPointAlarmCriteriaDateStepListener;
import cwchoiit.pointbatch.point.dto.ExpiredPointSummary;
import cwchoiit.pointbatch.point.repository.PointRepository;
import jakarta.persistence.EntityManagerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.data.RepositoryItemReader;
import org.springframework.batch.item.data.builder.RepositoryItemReaderBuilder;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.builder.JpaItemWriterBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.domain.Sort;
import org.springframework.transaction.PlatformTransactionManager;
import java.time.LocalDate;
import java.util.Map;
@Slf4j
@Configuration
public class MessageExpiredPointStepConfiguration {
@Bean
@StepScope
public RepositoryItemReader<ExpiredPointSummary> messageExpiredPointItemReader(
PointRepository pointRepository,
@Value("#{T(java.time.LocalDate).parse(stepExecutionContext[alarmCriteriaDate])}") LocalDate alarmCriteriaDate
) {
return new RepositoryItemReaderBuilder<ExpiredPointSummary>()
.name("messageExpiredPointItemReader")
.repository(pointRepository)
.methodName("sumByExpiredDate")
.pageSize(1000)
.arguments(alarmCriteriaDate)
.sorts(Map.of("pointWallet", Sort.Direction.ASC))
.build();
}
@Bean
@StepScope
public ItemProcessor<ExpiredPointSummary, Message> messageExpiredPointItemProcessor(
@Value("#{T(java.time.LocalDate).parse(jobParameters[today])}") LocalDate today
) {
return summary -> Message.of(summary.getUserId(), today, summary.getAmount());
}
@Bean
@StepScope
public JpaItemWriter<Message> messageExpiredPointItemWriter(EntityManagerFactory entityManagerFactory) {
return new JpaItemWriterBuilder<Message>()
.entityManagerFactory(entityManagerFactory)
.build();
}
@Bean
@JobScope
public Step messageExpiredPointStep(JobRepository jobRepository,
InputExpiredPointAlarmCriteriaDateStepListener listener,
PlatformTransactionManager transactionManager,
RepositoryItemReader<ExpiredPointSummary> messageExpiredPointItemReader,
ItemProcessor<ExpiredPointSummary, Message> messageExpiredPointItemProcessor,
JpaItemWriter<Message> messageExpiredPointItemWriter) {
return new StepBuilder("messageExpiredPointStep", jobRepository)
.listener(listener)
.<ExpiredPointSummary, Message>chunk(1000, transactionManager)
.reader(messageExpiredPointItemReader)
.processor(messageExpiredPointItemProcessor)
.writer(messageExpiredPointItemWriter)
.build();
}
}
- Step, Reader, Processor, Writer를 구현한 클래스이다. 한번 하나씩 봐보자.
@Bean
@JobScope
public Step messageExpiredPointStep(JobRepository jobRepository,
InputExpiredPointAlarmCriteriaDateStepListener listener,
PlatformTransactionManager transactionManager,
RepositoryItemReader<ExpiredPointSummary> messageExpiredPointItemReader,
ItemProcessor<ExpiredPointSummary, Message> messageExpiredPointItemProcessor,
JpaItemWriter<Message> messageExpiredPointItemWriter) {
return new StepBuilder("messageExpiredPointStep", jobRepository)
.listener(listener)
.<ExpiredPointSummary, Message>chunk(1000, transactionManager)
.reader(messageExpiredPointItemReader)
.processor(messageExpiredPointItemProcessor)
.writer(messageExpiredPointItemWriter)
.build();
}
- 지금까지 한 것들이랑 다른게 없지만, 딱 하나, listener를 등록했다. 아까 위에서 만든 그 리스너이다.
@Bean
@StepScope
public RepositoryItemReader<ExpiredPointSummary> messageExpiredPointItemReader(
PointRepository pointRepository,
@Value("#{T(java.time.LocalDate).parse(stepExecutionContext[alarmCriteriaDate])}") LocalDate alarmCriteriaDate
) {
return new RepositoryItemReaderBuilder<ExpiredPointSummary>()
.name("messageExpiredPointItemReader")
.repository(pointRepository)
.methodName("sumByExpiredDate")
.pageSize(1000)
.arguments(alarmCriteriaDate)
.sorts(Map.of("pointWallet", Sort.Direction.ASC))
.build();
}
- 이번엔 Reader가 JpaPagingReader가 아니라 RepositoryItemReader이다. 이 Reader는 내가 제공한 레포지토리의 특정 메서드를 실행해서 결과를 얻어올 수 있다.
- 왜 이것을 사용했냐? 지금까지는 JPQL로 간단하게 데이터를 가져올 수 있었지만 이번에는 살짝 쿼리가 복잡하기 때문에 QueryDSL을 사용할 것이다. 그래서 그 QueryDSL을 사용한 메서드를 호출하게 하고 싶어서 이 RepositoryItemReader를 사용한다.
- 아직 sumByExpiredDate는 만들지 않았다.
- 또한 반환 타입이 ExpiredPointSummary이다. 이건 단순 DTO라고 생각하면 편할 것 같다. 이 역시 아직 만들지 않았다.
@Bean
@StepScope
public ItemProcessor<ExpiredPointSummary, Message> messageExpiredPointItemProcessor(
@Value("#{T(java.time.LocalDate).parse(jobParameters[today])}") LocalDate today
) {
return summary -> Message.of(summary.getUserId(), today, summary.getAmount());
}
- Processor는 간단하다. 저기 Processor를 보면, 받은 ExpiredPointSummary를 활용해서 Message.of()를 호출한 결과를 반환한다. 이게 무엇일까? 아래 코드를 보자.
Message
package cwchoiit.pointbatch.message.entity;
import jakarta.persistence.*;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.math.BigInteger;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
@Entity
@Table(name = "MESSAGE")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
@Getter
public class Message {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "user_id", nullable = false)
private String userId;
@Column(name = "title", nullable = false)
private String title;
@Column(name = "content", nullable = false, columnDefinition = "text")
private String content;
public Message(String userId, String title, String content) {
this.userId = userId;
this.title = title;
this.content = content;
}
public static Message of(String userId, LocalDate expiredDate, BigInteger expiredAmount) {
return new Message(
userId,
String.format("%s 포인트 만료", expiredAmount.toString()),
String.format("%s 기준 %s 포인트가 만료되었습니다.", expiredDate.format(DateTimeFormatter.ISO_DATE), expiredAmount)
);
}
}
- Message.of()는 Message 엔티티에서 Message를 편리하게 만들어내는 생성 메서드이다.
- 그 다음 아래 코드를 보자.
ExpiredPointSummary
package cwchoiit.pointbatch.point.dto;
import com.querydsl.core.annotations.QueryProjection;
import lombok.Getter;
import java.math.BigInteger;
@Getter
public class ExpiredPointSummary {
private String userId;
private BigInteger amount;
@QueryProjection
public ExpiredPointSummary(String userId, BigInteger amount) {
this.userId = userId;
this.amount = amount;
}
}
- 이 클래스는 단순히 만료된 포인트의 합계와, 해당 유저의 ID를 가지고 있는 DTO다. 물론, QueryDSL을 사용하기 위해 생성자를 Q클래스로 만들어 낸다.
@Bean
@StepScope
public JpaItemWriter<Message> messageExpiredPointItemWriter(EntityManagerFactory entityManagerFactory) {
return new JpaItemWriterBuilder<Message>()
.entityManagerFactory(entityManagerFactory)
.build();
}
- Writer는 JpaItemWriter를 사용한다. 타입인 Message의 엔티티가 적당히 쌓이면, 전달한 엔티티 매니저의 플러시를 호출한다.
자, 이제 QueryDSL을 사용해서 쿼리를 짜보자. 참고로, QueryDSL이 뭔지 모르면 따라오기 힘들 수 있다. 포스팅 중에 QueryDSL을 다룬 포스팅이 많으니까 해당 포스팅을 참고하자.
PointCustomRepository
package cwchoiit.pointbatch.point.repository;
import cwchoiit.pointbatch.point.dto.ExpiredPointSummary;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import java.time.LocalDate;
public interface PointCustomRepository {
Page<ExpiredPointSummary> sumByExpiredDate(LocalDate alarmCriteriaDate, Pageable pageable);
}
- 우선, QueryDSL 용으로 새로운 레포지토리 하나를 만들자.
- 만들 메서드는 아까 위에서 정의한 sumByExpiredDate이다.
PointCustomRepositoryImpl
package cwchoiit.pointbatch.point.repository;
import com.querydsl.jpa.JPQLQuery;
import cwchoiit.pointbatch.point.dto.ExpiredPointSummary;
import cwchoiit.pointbatch.point.dto.QExpiredPointSummary;
import cwchoiit.pointbatch.point.entity.Point;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.support.QuerydslRepositorySupport;
import java.math.BigInteger;
import java.time.LocalDate;
import java.util.List;
import java.util.Objects;
import static cwchoiit.pointbatch.point.entity.QPoint.point;
@Slf4j
public class PointCustomRepositoryImpl extends QuerydslRepositorySupport implements PointCustomRepository {
public PointCustomRepositoryImpl() {
super(Point.class);
}
@Override
public Page<ExpiredPointSummary> sumByExpiredDate(LocalDate alarmCriteriaDate, Pageable pageable) {
JPQLQuery<ExpiredPointSummary> query =
from(point)
.select(
new QExpiredPointSummary(
point.pointWallet.userId,
point.amount.sum().coalesce(BigInteger.ZERO))
)
.where(point.expired.eq(true))
.where(point.used.eq(false))
.where(point.expireDate.before(alarmCriteriaDate))
.groupBy(point.pointWallet);
List<ExpiredPointSummary> expiredPoints =
Objects.requireNonNull(getQuerydsl()).applyPagination(pageable, query).fetch();
long elementCount = query.fetchCount();
log.info("elementCount: {}", elementCount);
return new PageImpl<>(expiredPoints, PageRequest.of(pageable.getPageNumber(), pageable.getPageSize()), elementCount);
}
}
- 위 인터페이스를 구현한 구현 클래스이다.
- 저 생성자는 QueryDSL을 사용하려면 무조건 있어야 하는 생성자라고 생각하면 된다.
- 실질적으로 중요한 부분은 sumByExpiredDate의 구현 부분이다.
- 이렇게 구현을 했으면 원래 사용중이던 PointRepository가 이를 상속받으면 끝난다.
PointCustomRepository를 상속하게 된 PointRepository
package cwchoiit.pointbatch.point.repository;
import cwchoiit.pointbatch.point.entity.Point;
import org.springframework.data.jpa.repository.JpaRepository;
public interface PointRepository extends JpaRepository<Point, Long>, PointCustomRepository {
}
- 이렇게 해두면 이제 아래 코드인 Reader에서 PointRepository에서 sumByExpiredDate를 참조할 수 있다.
@Bean
@StepScope
public RepositoryItemReader<ExpiredPointSummary> messageExpiredPointItemReader(
PointRepository pointRepository,
@Value("#{T(java.time.LocalDate).parse(stepExecutionContext[alarmCriteriaDate])}") LocalDate alarmCriteriaDate
) {
return new RepositoryItemReaderBuilder<ExpiredPointSummary>()
.name("messageExpiredPointItemReader")
.repository(pointRepository)
.methodName("sumByExpiredDate")
.pageSize(1000)
.arguments(alarmCriteriaDate)
.sorts(Map.of("pointWallet", Sort.Direction.ASC))
.build();
}
끝났다. 이제 테스트 코드를 작성해서 실행해보자!
MessageExpiredPointJobConfigurationTest
package cwchoiit.pointbatch.message.job;
import cwchoiit.pointbatch.BatchTestSupport;
import cwchoiit.pointbatch.message.entity.Message;
import cwchoiit.pointbatch.message.repository.MessageRepository;
import cwchoiit.pointbatch.point.entity.Point;
import cwchoiit.pointbatch.point.entity.PointWallet;
import cwchoiit.pointbatch.point.repository.PointRepository;
import cwchoiit.pointbatch.point.repository.PointWalletRepository;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import java.math.BigInteger;
import java.time.LocalDate;
import java.util.List;
import static org.assertj.core.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.*;
class MessageExpiredPointJobConfigurationTest extends BatchTestSupport {
@Autowired
Job messageExpiredPointJob;
@Autowired
MessageRepository messageRepository;
@Autowired
PointRepository pointRepository;
@Autowired
PointWalletRepository pointWalletRepository;
@Test
void messageExpiredPointJob() throws Exception {
LocalDate earnDate = LocalDate.of(2020, 1, 1);
LocalDate expireDate = LocalDate.of(2020, 5, 1);
LocalDate notExpireDate = LocalDate.of(2020, 12, 31);
PointWallet pointWallet1 = new PointWallet("user1", BigInteger.valueOf(3000));
PointWallet pointWallet2 = new PointWallet("user2", BigInteger.valueOf(0));
pointWalletRepository.save(pointWallet1);
pointWalletRepository.save(pointWallet2);
// 만료된 포인트 (pointWallet2)
pointRepository.save(new Point(pointWallet2, BigInteger.valueOf(1000), earnDate, expireDate, false, true));
pointRepository.save(new Point(pointWallet2, BigInteger.valueOf(500), earnDate, expireDate, false, true));
// 만료된 포인트 (pointWallet1)
pointRepository.save(new Point(pointWallet1, BigInteger.valueOf(1000), earnDate, expireDate, false, true));
pointRepository.save(new Point(pointWallet1, BigInteger.valueOf(500), earnDate, expireDate, false, true));
pointRepository.save(new Point(pointWallet1, BigInteger.valueOf(1000), earnDate, expireDate, false, true));
// 만료되지 않은 포인트 (pointWallet1)
pointRepository.save(new Point(pointWallet1, BigInteger.valueOf(1000), earnDate, notExpireDate));
pointRepository.save(new Point(pointWallet1, BigInteger.valueOf(2000), earnDate, notExpireDate));
pointRepository.save(new Point(pointWallet1, BigInteger.valueOf(3000), earnDate, notExpireDate));
JobParameters jobParameters = new JobParametersBuilder()
.addString("today", "2020-09-12")
.toJobParameters();
// messageExpiredPointJob 실행
JobExecution jobExecution = launchJob(messageExpiredPointJob, jobParameters);
// Job 성공적으로 끝났는지 체크
assertThat(jobExecution.getExitStatus()).isEqualTo(ExitStatus.COMPLETED);
List<Message> messages = messageRepository.findAll();
// 만료 알림 메시지가 총 2개 나갔는지 체크 (user1에게 2500원 만료됐다는 메시지, user2에게 1500원 만료됐다는 메시지)
assertThat(messages).hasSize(2);
Message forUser1Message = messages.stream()
.filter(message -> message.getUserId().equals("user1"))
.findFirst()
.orElse(null);
// 메시지 중 하나는 user1을 위한 메시지인지 체크
assertThat(forUser1Message).isNotNull();
// user1에게 나간 메시지의 타이틀을 체크
assertThat(forUser1Message.getTitle()).isEqualTo("2500 포인트 만료");
// user1에게 나간 메시지의 내용 체크
assertThat(forUser1Message.getContent()).isEqualTo("2020-09-12 기준 2500 포인트가 만료되었습니다.");
Message forUser2Message = messages.stream()
.filter(message -> message.getUserId().equals("user2"))
.findFirst()
.orElse(null);
// 메시지 중 하나는 user1을 위한 메시지인지 체크
assertThat(forUser2Message).isNotNull();
// user1에게 나간 메시지의 타이틀을 체크
assertThat(forUser2Message.getTitle()).isEqualTo("1500 포인트 만료");
// user1에게 나간 메시지의 내용 체크
assertThat(forUser2Message.getContent()).isEqualTo("2020-09-12 기준 1500 포인트가 만료되었습니다.");
}
}
- 테스트 결과는 성공적으로 수행된다.
'Spring Batch' 카테고리의 다른 글
이커머스 포인트 배치를 구현해보기 (2) | 2024.11.21 |
---|---|
Listener (1) | 2024.10.09 |
ItemWriter (1) | 2024.10.09 |
ItemProcessor (2) | 2024.10.09 |
ItemReader (1) | 2024.10.09 |