728x90
반응형
SMALL

 

포인트 예약 적립 배치 Job

이번에는, 포인트 예약 적립에 대한 배치를 작성해보자!

우선, Job을 하나 만들자.

ExecutePointReservationJobConfiguration

package cwchoiit.pointbatch.reservation.job;

import cwchoiit.pointbatch.point.job.validator.TodayJobParameterValidator;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ExecutePointReservationJobConfiguration {

    @Bean
    public Job executePointReservationJob(JobRepository jobRepository,
                                          TodayJobParameterValidator validator,
                                          Step executePointReservationStep) {
        return new JobBuilder("executePointReservationJob", jobRepository)
                .validator(validator)
                .start(executePointReservationStep)
                .build();
    }
}
  • 역시나 `today`라는 파라미터를 전달받는 Job이기 때문에 동일한 TodayJobParameterValidator를 사용한다.

이제 저기서 선언한 executePointReservationStep을 만들어보자.

 

ExecutePointReservationStepConfiguration

package cwchoiit.pointbatch.reservation.job;

import com.mysema.commons.lang.Pair;
import cwchoiit.pointbatch.point.entity.Point;
import cwchoiit.pointbatch.point.entity.PointWallet;
import cwchoiit.pointbatch.point.repository.PointRepository;
import cwchoiit.pointbatch.point.repository.PointWalletRepository;
import cwchoiit.pointbatch.reservation.entity.PointReservation;
import cwchoiit.pointbatch.reservation.repository.PointReservationRepository;
import jakarta.persistence.EntityManagerFactory;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import java.time.LocalDate;
import java.util.Map;

@Configuration
public class ExecutePointReservationStepConfiguration {

    @Bean
    @StepScope
    public JpaPagingItemReader<PointReservation> executePointReservationReader(EntityManagerFactory entityManagerFactory,
                                                                               @Value("#{T(java.time.LocalDate).parse(jobParameters[today])}")LocalDate today) {
        return new JpaPagingItemReaderBuilder<PointReservation>()
                .name("executePointReservationReader")
                .entityManagerFactory(entityManagerFactory)
                .queryString("SELECT pr FROM PointReservation pr WHERE pr.earnedDate = :today AND pr.executed = false")
                .parameterValues(Map.of("today", today))
                .pageSize(1000)
                .build();
    }

    @Bean
    @StepScope
    public ItemProcessor<PointReservation, Pair<PointReservation, Point>> executePointReservationItemProcessor() {
        return reservation -> {
            reservation.setExecuted(true);
            Point earnedPoint = new Point(
                    reservation.getPointWallet(),
                    reservation.getAmount(),
                    reservation.getEarnedDate(),
                    reservation.getExpireDate());
            PointWallet wallet = reservation.getPointWallet();
            wallet.setAmount(wallet.getAmount().add(earnedPoint.getAmount()));
            return Pair.of(reservation, earnedPoint);
        };
    }

    @Bean
    @StepScope
    public ItemWriter<Pair<PointReservation, Point>> executePointReservationItemWriter(PointReservationRepository pointReservationRepository,
                                                                                       PointRepository pointRepository,
                                                                                       PointWalletRepository pointWalletRepository) {
        return reservationAndPointPair -> {
            for (Pair<PointReservation, Point> pair : reservationAndPointPair) {
                PointReservation reservation = pair.getFirst();
                pointReservationRepository.save(reservation);

                Point point = pair.getSecond();
                pointRepository.save(point);

                pointWalletRepository.save(reservation.getPointWallet());
            }
        };
    }

    @Bean
    @JobScope
    public Step executePointReservationStep(JobRepository jobRepository,
                                            PlatformTransactionManager transactionManager,
                                            JpaPagingItemReader<PointReservation> executePointReservationReader,
                                            ItemProcessor<PointReservation, Pair<PointReservation, Point>> executePointReservationItemProcessor,
                                            ItemWriter<Pair<PointReservation, Point>> executePointReservationItemWriter) {
        return new StepBuilder("executePointReservationStep", jobRepository)
                .<PointReservation, Pair<PointReservation, Point>>chunk(1000, transactionManager)
                .reader(executePointReservationReader)
                .processor(executePointReservationItemProcessor)
                .writer(executePointReservationItemWriter)
                .build();
    }
}
  • Step부터 살펴보고, Reader, Processor, Writer를 하나씩 살펴보자.
@Bean
@JobScope
public Step executePointReservationStep(JobRepository jobRepository,
                                        PlatformTransactionManager transactionManager,
                                        JpaPagingItemReader<PointReservation> executePointReservationReader,
                                        ItemProcessor<PointReservation, Pair<PointReservation, Point>> executePointReservationItemProcessor,
                                        ItemWriter<Pair<PointReservation, Point>> executePointReservationItemWriter) {
    return new StepBuilder("executePointReservationStep", jobRepository)
            .<PointReservation, Pair<PointReservation, Point>>chunk(1000, transactionManager)
            .reader(executePointReservationReader)
            .processor(executePointReservationItemProcessor)
            .writer(executePointReservationItemWriter)
            .build();
}
  • Step이니까 @JobScope를 사용했다.
  • 이전에 했던 포인트 만료 배치와 비슷하게 StepBuilder를 통해 Step을 만들면 된다.
  • 역시 1000개씩 짤라서 작업을 수행하기 위해 chunk를 사용한다.
@Bean
@StepScope
public JpaPagingItemReader<PointReservation> executePointReservationReader(EntityManagerFactory entityManagerFactory,
                                                                           @Value("#{T(java.time.LocalDate).parse(jobParameters[today])}")LocalDate today) {
    return new JpaPagingItemReaderBuilder<PointReservation>()
            .name("executePointReservationReader")
            .entityManagerFactory(entityManagerFactory)
            .queryString("SELECT pr FROM PointReservation pr WHERE pr.earnedDate = :today AND pr.executed = false")
            .parameterValues(Map.of("today", today))
            .pageSize(1000)
            .build();
}
  • 먼저, 데이터를 가져오는 Reader 부분이다.
  • JPQL을 자세히 보면 된다. 매우 간단하다. PointReservation 테이블에서, earnedDate`today`라는 파라미터로 들어오는 값이고, 아직 예약 적립이 실행되지 않은(`executed = false`) 포인트 예약 레코드를 가져온다.
@Bean
@StepScope
public ItemProcessor<PointReservation, Pair<PointReservation, Point>> executePointReservationItemProcessor() {
    return reservation -> {
        reservation.setExecuted(true);
        Point earnedPoint = new Point(
                reservation.getPointWallet(),
                reservation.getAmount(),
                reservation.getEarnedDate(),
                reservation.getExpireDate());
        PointWallet wallet = reservation.getPointWallet();
        wallet.setAmount(wallet.getAmount().add(earnedPoint.getAmount()));
        return Pair.of(reservation, earnedPoint);
    };
}
  • 이번엔 Processor 인데 이 부분이 좀 재밌다.
  • 우선, PointReservation 엔티티를 하나씩 차례로 가져오면, 이 포인트 예약 적립을 실행해야 하니까 실행 정보를 `true`로 변경한다.
  • 그리고, 신규 포인트 엔티티를 하나 만들고 이 포인트 예약 적립에 대한 데이터를 추가한다.
  • 이 포인트 예약 적립 엔티티는 어떤 포인트 지갑에 들어갈지에 대한 정보가 있다. 해당 정보를 통해 PointWallet 엔티티를 가져온다.
  • PointWallet의 포인트 금액을 현재 금액 + 포인트 예약 적립 금액을 합쳐서 저장한다.
  • 그 다음, 변경 사항은 총 3개다. [PointReservation, PointWallet, Point]
  • 그 말은 이 세 개의 엔티티 모두 다 Writer에서 저장을 해야 한다. 그러려면 하나의 엔티티만 넘길 수는 없기에 여러 방법이 있지만 나는 Pair를 사용했다. 새로운 DTO 클래스를 만들어서 그 녀석에 저 세개를 담고 넘겨도 좋은 방법이다.
  • "어? 왜 2개만 넘겨요?" → PointWalletPointReservation이 참조하고 있으니 그 값을 사용하면 된다.
@Bean
@StepScope
public ItemWriter<Pair<PointReservation, Point>> executePointReservationItemWriter(PointReservationRepository pointReservationRepository,
                                                                                   PointRepository pointRepository,
                                                                                   PointWalletRepository pointWalletRepository) {
    return reservationAndPointPair -> {
        for (Pair<PointReservation, Point> pair : reservationAndPointPair) {
            PointReservation reservation = pair.getFirst();
            pointReservationRepository.save(reservation);

            Point point = pair.getSecond();
            pointRepository.save(point);

            pointWalletRepository.save(reservation.getPointWallet());
        }
    };
}
  • 이번엔 Writer이다. Processor를 통해 전달받은 Pair 데이터를 순회하면서 저장을 전부 다 해주면 된다.

 

이렇게 Job, Step을 만들었으면 역시나 테스트 코드를 통해 제대로 실행되는지 확인해보자.

ExecutePointReservationJobConfigurationTest

package cwchoiit.pointbatch.reservation.job;

import cwchoiit.pointbatch.BatchTestSupport;
import cwchoiit.pointbatch.point.entity.Point;
import cwchoiit.pointbatch.point.entity.PointWallet;
import cwchoiit.pointbatch.point.repository.PointRepository;
import cwchoiit.pointbatch.point.repository.PointWalletRepository;
import cwchoiit.pointbatch.reservation.entity.PointReservation;
import cwchoiit.pointbatch.reservation.repository.PointReservationRepository;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.*;
import org.springframework.beans.factory.annotation.Autowired;

import java.math.BigInteger;
import java.time.LocalDate;
import java.util.List;

import static org.assertj.core.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.*;

class ExecutePointReservationJobConfigurationTest extends BatchTestSupport {

    @Autowired
    PointWalletRepository pointWalletRepository;

    @Autowired
    PointReservationRepository pointReservationRepository;

    @Autowired
    PointRepository pointRepository;

    @Autowired
    Job executePointReservationJob;

    @Test
    void executePointReservationJob() throws Exception {
        PointWallet pointWallet = new PointWallet("user1", BigInteger.valueOf(3000));
        pointWalletRepository.save(pointWallet);

        LocalDate earnDate = LocalDate.of(2024, 1, 1);
        PointReservation pointReservation = new PointReservation(pointWallet, BigInteger.valueOf(1000), earnDate, 10);
        pointReservationRepository.save(pointReservation);

        JobParameters jobParameters = new JobParametersBuilder()
                .addString("today", "2024-01-01")
                .toJobParameters();

        JobExecution jobExecution = launchJob(executePointReservationJob, jobParameters);

        // Job 이 정상적으로 실행됐는지
        assertThat(jobExecution.getExitStatus()).isEqualTo(ExitStatus.COMPLETED);

        List<PointReservation> pointReservations = pointReservationRepository.findAll();

        // 예약 포인트는 현재 1개있는지
        assertThat(pointReservations).hasSize(1);
        // 예약 포인트의 실행이 true 로 변경됐는지
        assertThat(pointReservations.getFirst().isExecuted()).isTrue();

        List<Point> points = pointRepository.findAll();

        // 포인트 레포지토리에 포인트는 한개가 있는지
        assertThat(points).hasSize(1);
        // 포인트의 금액은 1000원인지
        assertThat(points.getFirst().getAmount()).isEqualTo(BigInteger.valueOf(1000));
        // 포인트의 발행일은 2024-01-01인지
        assertThat(points.getFirst().getEarnedDate()).isEqualTo(earnDate);
        // 포인트의 만료일은 2024-01-11인지
        assertThat(points.getFirst().getExpireDate()).isEqualTo(LocalDate.of(2024, 1, 11));

        List<PointWallet> pointWallets = pointWalletRepository.findAll();

        // 포인트 지갑은 한개인지
        assertThat(pointWallets).hasSize(1);
        // 포인트 지갑의 총 금액은 4000원인지
        assertThat(pointWallets.getFirst().getAmount()).isEqualTo(BigInteger.valueOf(4000));
    }
}
  • 테스트 실행 결과는 정상적으로 수행될 것이다.
  • 여기서 이제 BatchTestSupport의 위력이 나오는 것이다. 저 클래스로 공통으로 사용되는 부분을 따로 분리하니까 굉장히 테스트 코드 작성하기가 편해졌다.

 

포인트 만료 메시지 배치 Job

이번에는 포인트가 만료되면 메시지(알림)을 사용자에게 보내주는 게 일반적인 앱의 기능인데 그 부분을 배치성으로 만들어보자!

 

MessageExpiredPointJobConfiguration

package cwchoiit.pointbatch.message.job;

import cwchoiit.pointbatch.point.job.validator.TodayJobParameterValidator;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageExpiredPointJobConfiguration {

    @Bean
    public Job messageExpiredPointJob(JobRepository jobRepository,
                                      TodayJobParameterValidator validator,
                                      Step messageExpiredPointStep) {
        return new JobBuilder("messageExpiredPointJob", jobRepository)
                .validator(validator)
                .start(messageExpiredPointStep)
                .build();
    }
}
  • 이제 Job 부분은 더 이상 설명할 게 없다.

 

InputExpiredPointAlarmCriteriaDateStepListener

package cwchoiit.pointbatch.message.listener;

import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

@Component
public class InputExpiredPointAlarmCriteriaDateStepListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        JobParameter<?> todayParam = stepExecution.getJobParameters().getParameter("today");
        if (todayParam == null) {
            return;
        }

        LocalDate todayLocalDate = LocalDate.parse((String) todayParam.getValue());
        ExecutionContext context = stepExecution.getExecutionContext();
        context.put("alarmCriteriaDate", todayLocalDate.minusDays(1).format(DateTimeFormatter.ISO_DATE));
        stepExecution.setExecutionContext(context);
    }
}
  • 이 배치작업을 할 때 지금까지는 사용해 본 적 없는 StepExecutionListener를 사용해보자.
  • StepExecutionListener는, 이 리스너를 등록한 스텝을 실행하기 전에 리스너를 먼저 발동시켜서 어떤 작업을 처리할 수 있게 해주는 것이다. 지금은 beforeStep만 있지만 afterStep도 구현 가능하다.
  • 이 리스너는 그래서 잡 파라미터에서 `today`를 가져와서, LocalDate로 변환한 다음, `alarmCriteriaDate`라는 값을 만들어내서 ExecutionContext에 담는 리스너이다. 저 `alarmCriteriaDate` 이 녀석은 `today`라는 오늘 날짜라는 파라미터에서 하루를 뺀 날짜를 가리킨다. 그러니까, 이 배치작업이 실제 운영서버에 돌아가는 작업이라면, 하루에 한번씩 실행될텐데 만료일이 있고 오늘날짜가 있을때 오늘날짜보다 하루 뺀 날짜가 만료일인 것을 찾아내고 싶은 거다.

MessageExpiredPointStepConfiguration

package cwchoiit.pointbatch.message.job;

import cwchoiit.pointbatch.message.entity.Message;
import cwchoiit.pointbatch.message.listener.InputExpiredPointAlarmCriteriaDateStepListener;
import cwchoiit.pointbatch.point.dto.ExpiredPointSummary;
import cwchoiit.pointbatch.point.repository.PointRepository;
import jakarta.persistence.EntityManagerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.data.RepositoryItemReader;
import org.springframework.batch.item.data.builder.RepositoryItemReaderBuilder;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.builder.JpaItemWriterBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.domain.Sort;
import org.springframework.transaction.PlatformTransactionManager;

import java.time.LocalDate;
import java.util.Map;

@Slf4j
@Configuration
public class MessageExpiredPointStepConfiguration {

    @Bean
    @StepScope
    public RepositoryItemReader<ExpiredPointSummary> messageExpiredPointItemReader(
            PointRepository pointRepository,
            @Value("#{T(java.time.LocalDate).parse(stepExecutionContext[alarmCriteriaDate])}") LocalDate alarmCriteriaDate
    ) {
        return new RepositoryItemReaderBuilder<ExpiredPointSummary>()
                .name("messageExpiredPointItemReader")
                .repository(pointRepository)
                .methodName("sumByExpiredDate")
                .pageSize(1000)
                .arguments(alarmCriteriaDate)
                .sorts(Map.of("pointWallet", Sort.Direction.ASC))
                .build();
    }

    @Bean
    @StepScope
    public ItemProcessor<ExpiredPointSummary, Message> messageExpiredPointItemProcessor(
            @Value("#{T(java.time.LocalDate).parse(jobParameters[today])}") LocalDate today
    ) {
        return summary -> Message.of(summary.getUserId(), today, summary.getAmount());
    }

    @Bean
    @StepScope
    public JpaItemWriter<Message> messageExpiredPointItemWriter(EntityManagerFactory entityManagerFactory) {
        return new JpaItemWriterBuilder<Message>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }

    @Bean
    @JobScope
    public Step messageExpiredPointStep(JobRepository jobRepository,
                                        InputExpiredPointAlarmCriteriaDateStepListener listener,
                                        PlatformTransactionManager transactionManager,
                                        RepositoryItemReader<ExpiredPointSummary> messageExpiredPointItemReader,
                                        ItemProcessor<ExpiredPointSummary, Message> messageExpiredPointItemProcessor,
                                        JpaItemWriter<Message> messageExpiredPointItemWriter) {
        return new StepBuilder("messageExpiredPointStep", jobRepository)
                .listener(listener)
                .<ExpiredPointSummary, Message>chunk(1000, transactionManager)
                .reader(messageExpiredPointItemReader)
                .processor(messageExpiredPointItemProcessor)
                .writer(messageExpiredPointItemWriter)
                .build();
    }
}
  • Step, Reader, Processor, Writer를 구현한 클래스이다. 한번 하나씩 봐보자.
@Bean
@JobScope
public Step messageExpiredPointStep(JobRepository jobRepository,
                                    InputExpiredPointAlarmCriteriaDateStepListener listener,
                                    PlatformTransactionManager transactionManager,
                                    RepositoryItemReader<ExpiredPointSummary> messageExpiredPointItemReader,
                                    ItemProcessor<ExpiredPointSummary, Message> messageExpiredPointItemProcessor,
                                    JpaItemWriter<Message> messageExpiredPointItemWriter) {
    return new StepBuilder("messageExpiredPointStep", jobRepository)
            .listener(listener)
            .<ExpiredPointSummary, Message>chunk(1000, transactionManager)
            .reader(messageExpiredPointItemReader)
            .processor(messageExpiredPointItemProcessor)
            .writer(messageExpiredPointItemWriter)
            .build();
}
  • 지금까지 한 것들이랑 다른게 없지만, 딱 하나, listener를 등록했다. 아까 위에서 만든 그 리스너이다.
@Bean
@StepScope
public RepositoryItemReader<ExpiredPointSummary> messageExpiredPointItemReader(
        PointRepository pointRepository,
        @Value("#{T(java.time.LocalDate).parse(stepExecutionContext[alarmCriteriaDate])}") LocalDate alarmCriteriaDate
) {
    return new RepositoryItemReaderBuilder<ExpiredPointSummary>()
            .name("messageExpiredPointItemReader")
            .repository(pointRepository)
            .methodName("sumByExpiredDate")
            .pageSize(1000)
            .arguments(alarmCriteriaDate)
            .sorts(Map.of("pointWallet", Sort.Direction.ASC))
            .build();
}
  • 이번엔 ReaderJpaPagingReader가 아니라 RepositoryItemReader이다. 이 Reader는 내가 제공한 레포지토리의 특정 메서드를 실행해서 결과를 얻어올 수 있다. 
  • 왜 이것을 사용했냐? 지금까지는 JPQL로 간단하게 데이터를 가져올 수 있었지만 이번에는 살짝 쿼리가 복잡하기 때문에 QueryDSL을 사용할 것이다. 그래서 그 QueryDSL을 사용한 메서드를 호출하게 하고 싶어서 이 RepositoryItemReader를 사용한다.
  • 아직 sumByExpiredDate는 만들지 않았다.
  • 또한 반환 타입이 ExpiredPointSummary이다. 이건 단순 DTO라고 생각하면 편할 것 같다. 이 역시 아직 만들지 않았다.
@Bean
@StepScope
public ItemProcessor<ExpiredPointSummary, Message> messageExpiredPointItemProcessor(
        @Value("#{T(java.time.LocalDate).parse(jobParameters[today])}") LocalDate today
) {
    return summary -> Message.of(summary.getUserId(), today, summary.getAmount());
}
  • Processor는 간단하다. 저기 Processor를 보면, 받은 ExpiredPointSummary를 활용해서 Message.of()를 호출한 결과를 반환한다. 이게 무엇일까? 아래 코드를 보자.

Message

package cwchoiit.pointbatch.message.entity;

import jakarta.persistence.*;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;

import java.math.BigInteger;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

@Entity
@Table(name = "MESSAGE")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
@Getter
public class Message {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(name = "user_id", nullable = false)
    private String userId;

    @Column(name = "title", nullable = false)
    private String title;

    @Column(name = "content", nullable = false, columnDefinition = "text")
    private String content;

    public Message(String userId, String title, String content) {
        this.userId = userId;
        this.title = title;
        this.content = content;
    }

    public static Message of(String userId, LocalDate expiredDate, BigInteger expiredAmount) {
        return new Message(
                userId,
                String.format("%s 포인트 만료", expiredAmount.toString()),
                String.format("%s 기준 %s 포인트가 만료되었습니다.", expiredDate.format(DateTimeFormatter.ISO_DATE), expiredAmount)
        );
    }
}
  • Message.of()Message 엔티티에서 Message를 편리하게 만들어내는 생성 메서드이다. 
  • 그 다음 아래 코드를 보자.

ExpiredPointSummary

package cwchoiit.pointbatch.point.dto;

import com.querydsl.core.annotations.QueryProjection;
import lombok.Getter;

import java.math.BigInteger;

@Getter
public class ExpiredPointSummary {
    private String userId;
    private BigInteger amount;

    @QueryProjection
    public ExpiredPointSummary(String userId, BigInteger amount) {
        this.userId = userId;
        this.amount = amount;
    }
}
  • 이 클래스는 단순히 만료된 포인트의 합계와, 해당 유저의 ID를 가지고 있는 DTO다. 물론, QueryDSL을 사용하기 위해 생성자를 Q클래스로 만들어 낸다. 
@Bean
@StepScope
public JpaItemWriter<Message> messageExpiredPointItemWriter(EntityManagerFactory entityManagerFactory) {
    return new JpaItemWriterBuilder<Message>()
            .entityManagerFactory(entityManagerFactory)
            .build();
}
  • WriterJpaItemWriter를 사용한다. 타입인 Message의 엔티티가 적당히 쌓이면, 전달한 엔티티 매니저의 플러시를 호출한다.

 

자, 이제 QueryDSL을 사용해서 쿼리를 짜보자. 참고로, QueryDSL이 뭔지 모르면 따라오기 힘들 수 있다. 포스팅 중에 QueryDSL을 다룬 포스팅이 많으니까 해당 포스팅을 참고하자.

 

PointCustomRepository

package cwchoiit.pointbatch.point.repository;

import cwchoiit.pointbatch.point.dto.ExpiredPointSummary;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;

import java.time.LocalDate;

public interface PointCustomRepository {
    Page<ExpiredPointSummary> sumByExpiredDate(LocalDate alarmCriteriaDate, Pageable pageable);
}
  • 우선, QueryDSL 용으로 새로운 레포지토리 하나를 만들자.
  • 만들 메서드는 아까 위에서 정의한 sumByExpiredDate이다.

PointCustomRepositoryImpl

package cwchoiit.pointbatch.point.repository;

import com.querydsl.jpa.JPQLQuery;
import cwchoiit.pointbatch.point.dto.ExpiredPointSummary;
import cwchoiit.pointbatch.point.dto.QExpiredPointSummary;
import cwchoiit.pointbatch.point.entity.Point;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.support.QuerydslRepositorySupport;

import java.math.BigInteger;
import java.time.LocalDate;
import java.util.List;
import java.util.Objects;

import static cwchoiit.pointbatch.point.entity.QPoint.point;

@Slf4j
public class PointCustomRepositoryImpl extends QuerydslRepositorySupport implements PointCustomRepository {
    public PointCustomRepositoryImpl() {
        super(Point.class);
    }

    @Override
    public Page<ExpiredPointSummary> sumByExpiredDate(LocalDate alarmCriteriaDate, Pageable pageable) {
        JPQLQuery<ExpiredPointSummary> query =
                from(point)
                .select(
                        new QExpiredPointSummary(
                                point.pointWallet.userId,
                                point.amount.sum().coalesce(BigInteger.ZERO))
                )
                .where(point.expired.eq(true))
                .where(point.used.eq(false))
                .where(point.expireDate.before(alarmCriteriaDate))
                .groupBy(point.pointWallet);

        List<ExpiredPointSummary> expiredPoints =
                Objects.requireNonNull(getQuerydsl()).applyPagination(pageable, query).fetch();
        long elementCount = query.fetchCount();

        log.info("elementCount: {}", elementCount);

        return new PageImpl<>(expiredPoints, PageRequest.of(pageable.getPageNumber(), pageable.getPageSize()), elementCount);
    }
}
  • 위 인터페이스를 구현한 구현 클래스이다. 
  • 저 생성자는 QueryDSL을 사용하려면 무조건 있어야 하는 생성자라고 생각하면 된다.
  • 실질적으로 중요한 부분은 sumByExpiredDate의 구현 부분이다.
  • 이렇게 구현을 했으면 원래 사용중이던 PointRepository가 이를 상속받으면 끝난다.

PointCustomRepository를 상속하게 된 PointRepository

package cwchoiit.pointbatch.point.repository;

import cwchoiit.pointbatch.point.entity.Point;
import org.springframework.data.jpa.repository.JpaRepository;

public interface PointRepository extends JpaRepository<Point, Long>, PointCustomRepository {
}
  • 이렇게 해두면 이제 아래 코드인 Reader에서 PointRepository에서 sumByExpiredDate를 참조할 수 있다.
@Bean
@StepScope
public RepositoryItemReader<ExpiredPointSummary> messageExpiredPointItemReader(
        PointRepository pointRepository,
        @Value("#{T(java.time.LocalDate).parse(stepExecutionContext[alarmCriteriaDate])}") LocalDate alarmCriteriaDate
) {
    return new RepositoryItemReaderBuilder<ExpiredPointSummary>()
            .name("messageExpiredPointItemReader")
            .repository(pointRepository)
            .methodName("sumByExpiredDate")
            .pageSize(1000)
            .arguments(alarmCriteriaDate)
            .sorts(Map.of("pointWallet", Sort.Direction.ASC))
            .build();
}

 

끝났다. 이제 테스트 코드를 작성해서 실행해보자!

MessageExpiredPointJobConfigurationTest

package cwchoiit.pointbatch.message.job;

import cwchoiit.pointbatch.BatchTestSupport;
import cwchoiit.pointbatch.message.entity.Message;
import cwchoiit.pointbatch.message.repository.MessageRepository;
import cwchoiit.pointbatch.point.entity.Point;
import cwchoiit.pointbatch.point.entity.PointWallet;
import cwchoiit.pointbatch.point.repository.PointRepository;
import cwchoiit.pointbatch.point.repository.PointWalletRepository;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.*;
import org.springframework.beans.factory.annotation.Autowired;

import java.math.BigInteger;
import java.time.LocalDate;
import java.util.List;

import static org.assertj.core.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.*;

class MessageExpiredPointJobConfigurationTest extends BatchTestSupport {

    @Autowired
    Job messageExpiredPointJob;

    @Autowired
    MessageRepository messageRepository;

    @Autowired
    PointRepository pointRepository;

    @Autowired
    PointWalletRepository pointWalletRepository;

    @Test
    void messageExpiredPointJob() throws Exception {
        LocalDate earnDate = LocalDate.of(2020, 1, 1);
        LocalDate expireDate = LocalDate.of(2020, 5, 1);
        LocalDate notExpireDate = LocalDate.of(2020, 12, 31);

        PointWallet pointWallet1 = new PointWallet("user1", BigInteger.valueOf(3000));
        PointWallet pointWallet2 = new PointWallet("user2", BigInteger.valueOf(0));

        pointWalletRepository.save(pointWallet1);
        pointWalletRepository.save(pointWallet2);

        // 만료된 포인트 (pointWallet2)
        pointRepository.save(new Point(pointWallet2, BigInteger.valueOf(1000), earnDate, expireDate, false, true));
        pointRepository.save(new Point(pointWallet2, BigInteger.valueOf(500), earnDate, expireDate, false, true));
        // 만료된 포인트 (pointWallet1)
        pointRepository.save(new Point(pointWallet1, BigInteger.valueOf(1000), earnDate, expireDate, false, true));
        pointRepository.save(new Point(pointWallet1, BigInteger.valueOf(500), earnDate, expireDate, false, true));
        pointRepository.save(new Point(pointWallet1, BigInteger.valueOf(1000), earnDate, expireDate, false, true));

        // 만료되지 않은 포인트 (pointWallet1)
        pointRepository.save(new Point(pointWallet1, BigInteger.valueOf(1000), earnDate, notExpireDate));
        pointRepository.save(new Point(pointWallet1, BigInteger.valueOf(2000), earnDate, notExpireDate));
        pointRepository.save(new Point(pointWallet1, BigInteger.valueOf(3000), earnDate, notExpireDate));

        JobParameters jobParameters = new JobParametersBuilder()
                .addString("today", "2020-09-12")
                .toJobParameters();

        // messageExpiredPointJob 실행
        JobExecution jobExecution = launchJob(messageExpiredPointJob, jobParameters);

        // Job 성공적으로 끝났는지 체크
        assertThat(jobExecution.getExitStatus()).isEqualTo(ExitStatus.COMPLETED);

        List<Message> messages = messageRepository.findAll();
        // 만료 알림 메시지가 총 2개 나갔는지 체크 (user1에게 2500원 만료됐다는 메시지, user2에게 1500원 만료됐다는 메시지)
        assertThat(messages).hasSize(2);

        Message forUser1Message = messages.stream()
                .filter(message -> message.getUserId().equals("user1"))
                .findFirst()
                .orElse(null);
        // 메시지 중 하나는 user1을 위한 메시지인지 체크
        assertThat(forUser1Message).isNotNull();
        // user1에게 나간 메시지의 타이틀을 체크
        assertThat(forUser1Message.getTitle()).isEqualTo("2500 포인트 만료");
        // user1에게 나간 메시지의 내용 체크
        assertThat(forUser1Message.getContent()).isEqualTo("2020-09-12 기준 2500 포인트가 만료되었습니다.");

        Message forUser2Message = messages.stream()
                .filter(message -> message.getUserId().equals("user2"))
                .findFirst()
                .orElse(null);
        // 메시지 중 하나는 user1을 위한 메시지인지 체크
        assertThat(forUser2Message).isNotNull();
        // user1에게 나간 메시지의 타이틀을 체크
        assertThat(forUser2Message.getTitle()).isEqualTo("1500 포인트 만료");
        // user1에게 나간 메시지의 내용 체크
        assertThat(forUser2Message.getContent()).isEqualTo("2020-09-12 기준 1500 포인트가 만료되었습니다.");
    }
}
  • 테스트 결과는 성공적으로 수행된다.

 

728x90
반응형
LIST

'Spring Batch' 카테고리의 다른 글

이커머스 포인트 배치를 구현해보기  (2) 2024.11.21
Listener  (1) 2024.10.09
ItemWriter  (1) 2024.10.09
ItemProcessor  (2) 2024.10.09
ItemReader  (1) 2024.10.09

+ Recent posts