728x90
반응형
SMALL

 

포인트 예약 적립 배치 Job

이번에는, 포인트 예약 적립에 대한 배치를 작성해보자!

우선, Job을 하나 만들자.

ExecutePointReservationJobConfiguration

package cwchoiit.pointbatch.reservation.job;

import cwchoiit.pointbatch.point.job.validator.TodayJobParameterValidator;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ExecutePointReservationJobConfiguration {

    @Bean
    public Job executePointReservationJob(JobRepository jobRepository,
                                          TodayJobParameterValidator validator,
                                          Step executePointReservationStep) {
        return new JobBuilder("executePointReservationJob", jobRepository)
                .validator(validator)
                .start(executePointReservationStep)
                .build();
    }
}
  • 역시나 `today`라는 파라미터를 전달받는 Job이기 때문에 동일한 TodayJobParameterValidator를 사용한다.

이제 저기서 선언한 executePointReservationStep을 만들어보자.

 

ExecutePointReservationStepConfiguration

package cwchoiit.pointbatch.reservation.job;

import com.mysema.commons.lang.Pair;
import cwchoiit.pointbatch.point.entity.Point;
import cwchoiit.pointbatch.point.entity.PointWallet;
import cwchoiit.pointbatch.point.repository.PointRepository;
import cwchoiit.pointbatch.point.repository.PointWalletRepository;
import cwchoiit.pointbatch.reservation.entity.PointReservation;
import cwchoiit.pointbatch.reservation.repository.PointReservationRepository;
import jakarta.persistence.EntityManagerFactory;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import java.time.LocalDate;
import java.util.Map;

@Configuration
public class ExecutePointReservationStepConfiguration {

    @Bean
    @StepScope
    public JpaPagingItemReader<PointReservation> executePointReservationReader(EntityManagerFactory entityManagerFactory,
                                                                               @Value("#{T(java.time.LocalDate).parse(jobParameters[today])}")LocalDate today) {
        return new JpaPagingItemReaderBuilder<PointReservation>()
                .name("executePointReservationReader")
                .entityManagerFactory(entityManagerFactory)
                .queryString("SELECT pr FROM PointReservation pr WHERE pr.earnedDate = :today AND pr.executed = false")
                .parameterValues(Map.of("today", today))
                .pageSize(1000)
                .build();
    }

    @Bean
    @StepScope
    public ItemProcessor<PointReservation, Pair<PointReservation, Point>> executePointReservationItemProcessor() {
        return reservation -> {
            reservation.setExecuted(true);
            Point earnedPoint = new Point(
                    reservation.getPointWallet(),
                    reservation.getAmount(),
                    reservation.getEarnedDate(),
                    reservation.getExpireDate());
            PointWallet wallet = reservation.getPointWallet();
            wallet.setAmount(wallet.getAmount().add(earnedPoint.getAmount()));
            return Pair.of(reservation, earnedPoint);
        };
    }

    @Bean
    @StepScope
    public ItemWriter<Pair<PointReservation, Point>> executePointReservationItemWriter(PointReservationRepository pointReservationRepository,
                                                                                       PointRepository pointRepository,
                                                                                       PointWalletRepository pointWalletRepository) {
        return reservationAndPointPair -> {
            for (Pair<PointReservation, Point> pair : reservationAndPointPair) {
                PointReservation reservation = pair.getFirst();
                pointReservationRepository.save(reservation);

                Point point = pair.getSecond();
                pointRepository.save(point);

                pointWalletRepository.save(reservation.getPointWallet());
            }
        };
    }

    @Bean
    @JobScope
    public Step executePointReservationStep(JobRepository jobRepository,
                                            PlatformTransactionManager transactionManager,
                                            JpaPagingItemReader<PointReservation> executePointReservationReader,
                                            ItemProcessor<PointReservation, Pair<PointReservation, Point>> executePointReservationItemProcessor,
                                            ItemWriter<Pair<PointReservation, Point>> executePointReservationItemWriter) {
        return new StepBuilder("executePointReservationStep", jobRepository)
                .<PointReservation, Pair<PointReservation, Point>>chunk(1000, transactionManager)
                .reader(executePointReservationReader)
                .processor(executePointReservationItemProcessor)
                .writer(executePointReservationItemWriter)
                .build();
    }
}
  • Step부터 살펴보고, Reader, Processor, Writer를 하나씩 살펴보자.
@Bean
@JobScope
public Step executePointReservationStep(JobRepository jobRepository,
                                        PlatformTransactionManager transactionManager,
                                        JpaPagingItemReader<PointReservation> executePointReservationReader,
                                        ItemProcessor<PointReservation, Pair<PointReservation, Point>> executePointReservationItemProcessor,
                                        ItemWriter<Pair<PointReservation, Point>> executePointReservationItemWriter) {
    return new StepBuilder("executePointReservationStep", jobRepository)
            .<PointReservation, Pair<PointReservation, Point>>chunk(1000, transactionManager)
            .reader(executePointReservationReader)
            .processor(executePointReservationItemProcessor)
            .writer(executePointReservationItemWriter)
            .build();
}
  • Step이니까 @JobScope를 사용했다.
  • 이전에 했던 포인트 만료 배치와 비슷하게 StepBuilder를 통해 Step을 만들면 된다.
  • 역시 1000개씩 짤라서 작업을 수행하기 위해 chunk를 사용한다.
@Bean
@StepScope
public JpaPagingItemReader<PointReservation> executePointReservationReader(EntityManagerFactory entityManagerFactory,
                                                                           @Value("#{T(java.time.LocalDate).parse(jobParameters[today])}")LocalDate today) {
    return new JpaPagingItemReaderBuilder<PointReservation>()
            .name("executePointReservationReader")
            .entityManagerFactory(entityManagerFactory)
            .queryString("SELECT pr FROM PointReservation pr WHERE pr.earnedDate = :today AND pr.executed = false")
            .parameterValues(Map.of("today", today))
            .pageSize(1000)
            .build();
}
  • 먼저, 데이터를 가져오는 Reader 부분이다.
  • JPQL을 자세히 보면 된다. 매우 간단하다. PointReservation 테이블에서, earnedDate`today`라는 파라미터로 들어오는 값이고, 아직 예약 적립이 실행되지 않은(`executed = false`) 포인트 예약 레코드를 가져온다.
@Bean
@StepScope
public ItemProcessor<PointReservation, Pair<PointReservation, Point>> executePointReservationItemProcessor() {
    return reservation -> {
        reservation.setExecuted(true);
        Point earnedPoint = new Point(
                reservation.getPointWallet(),
                reservation.getAmount(),
                reservation.getEarnedDate(),
                reservation.getExpireDate());
        PointWallet wallet = reservation.getPointWallet();
        wallet.setAmount(wallet.getAmount().add(earnedPoint.getAmount()));
        return Pair.of(reservation, earnedPoint);
    };
}
  • 이번엔 Processor 인데 이 부분이 좀 재밌다.
  • 우선, PointReservation 엔티티를 하나씩 차례로 가져오면, 이 포인트 예약 적립을 실행해야 하니까 실행 정보를 `true`로 변경한다.
  • 그리고, 신규 포인트 엔티티를 하나 만들고 이 포인트 예약 적립에 대한 데이터를 추가한다.
  • 이 포인트 예약 적립 엔티티는 어떤 포인트 지갑에 들어갈지에 대한 정보가 있다. 해당 정보를 통해 PointWallet 엔티티를 가져온다.
  • PointWallet의 포인트 금액을 현재 금액 + 포인트 예약 적립 금액을 합쳐서 저장한다.
  • 그 다음, 변경 사항은 총 3개다. [PointReservation, PointWallet, Point]
  • 그 말은 이 세 개의 엔티티 모두 다 Writer에서 저장을 해야 한다. 그러려면 하나의 엔티티만 넘길 수는 없기에 여러 방법이 있지만 나는 Pair를 사용했다. 새로운 DTO 클래스를 만들어서 그 녀석에 저 세개를 담고 넘겨도 좋은 방법이다.
  • "어? 왜 2개만 넘겨요?" → PointWalletPointReservation이 참조하고 있으니 그 값을 사용하면 된다.
@Bean
@StepScope
public ItemWriter<Pair<PointReservation, Point>> executePointReservationItemWriter(PointReservationRepository pointReservationRepository,
                                                                                   PointRepository pointRepository,
                                                                                   PointWalletRepository pointWalletRepository) {
    return reservationAndPointPair -> {
        for (Pair<PointReservation, Point> pair : reservationAndPointPair) {
            PointReservation reservation = pair.getFirst();
            pointReservationRepository.save(reservation);

            Point point = pair.getSecond();
            pointRepository.save(point);

            pointWalletRepository.save(reservation.getPointWallet());
        }
    };
}
  • 이번엔 Writer이다. Processor를 통해 전달받은 Pair 데이터를 순회하면서 저장을 전부 다 해주면 된다.

 

이렇게 Job, Step을 만들었으면 역시나 테스트 코드를 통해 제대로 실행되는지 확인해보자.

ExecutePointReservationJobConfigurationTest

package cwchoiit.pointbatch.reservation.job;

import cwchoiit.pointbatch.BatchTestSupport;
import cwchoiit.pointbatch.point.entity.Point;
import cwchoiit.pointbatch.point.entity.PointWallet;
import cwchoiit.pointbatch.point.repository.PointRepository;
import cwchoiit.pointbatch.point.repository.PointWalletRepository;
import cwchoiit.pointbatch.reservation.entity.PointReservation;
import cwchoiit.pointbatch.reservation.repository.PointReservationRepository;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.*;
import org.springframework.beans.factory.annotation.Autowired;

import java.math.BigInteger;
import java.time.LocalDate;
import java.util.List;

import static org.assertj.core.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.*;

class ExecutePointReservationJobConfigurationTest extends BatchTestSupport {

    @Autowired
    PointWalletRepository pointWalletRepository;

    @Autowired
    PointReservationRepository pointReservationRepository;

    @Autowired
    PointRepository pointRepository;

    @Autowired
    Job executePointReservationJob;

    @Test
    void executePointReservationJob() throws Exception {
        PointWallet pointWallet = new PointWallet("user1", BigInteger.valueOf(3000));
        pointWalletRepository.save(pointWallet);

        LocalDate earnDate = LocalDate.of(2024, 1, 1);
        PointReservation pointReservation = new PointReservation(pointWallet, BigInteger.valueOf(1000), earnDate, 10);
        pointReservationRepository.save(pointReservation);

        JobParameters jobParameters = new JobParametersBuilder()
                .addString("today", "2024-01-01")
                .toJobParameters();

        JobExecution jobExecution = launchJob(executePointReservationJob, jobParameters);

        // Job 이 정상적으로 실행됐는지
        assertThat(jobExecution.getExitStatus()).isEqualTo(ExitStatus.COMPLETED);

        List<PointReservation> pointReservations = pointReservationRepository.findAll();

        // 예약 포인트는 현재 1개있는지
        assertThat(pointReservations).hasSize(1);
        // 예약 포인트의 실행이 true 로 변경됐는지
        assertThat(pointReservations.getFirst().isExecuted()).isTrue();

        List<Point> points = pointRepository.findAll();

        // 포인트 레포지토리에 포인트는 한개가 있는지
        assertThat(points).hasSize(1);
        // 포인트의 금액은 1000원인지
        assertThat(points.getFirst().getAmount()).isEqualTo(BigInteger.valueOf(1000));
        // 포인트의 발행일은 2024-01-01인지
        assertThat(points.getFirst().getEarnedDate()).isEqualTo(earnDate);
        // 포인트의 만료일은 2024-01-11인지
        assertThat(points.getFirst().getExpireDate()).isEqualTo(LocalDate.of(2024, 1, 11));

        List<PointWallet> pointWallets = pointWalletRepository.findAll();

        // 포인트 지갑은 한개인지
        assertThat(pointWallets).hasSize(1);
        // 포인트 지갑의 총 금액은 4000원인지
        assertThat(pointWallets.getFirst().getAmount()).isEqualTo(BigInteger.valueOf(4000));
    }
}
  • 테스트 실행 결과는 정상적으로 수행될 것이다.
  • 여기서 이제 BatchTestSupport의 위력이 나오는 것이다. 저 클래스로 공통으로 사용되는 부분을 따로 분리하니까 굉장히 테스트 코드 작성하기가 편해졌다.

 

포인트 만료 메시지 배치 Job

이번에는 포인트가 만료되면 메시지(알림)을 사용자에게 보내주는 게 일반적인 앱의 기능인데 그 부분을 배치성으로 만들어보자!

 

MessageExpiredPointJobConfiguration

package cwchoiit.pointbatch.message.job;

import cwchoiit.pointbatch.point.job.validator.TodayJobParameterValidator;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageExpiredPointJobConfiguration {

    @Bean
    public Job messageExpiredPointJob(JobRepository jobRepository,
                                      TodayJobParameterValidator validator,
                                      Step messageExpiredPointStep) {
        return new JobBuilder("messageExpiredPointJob", jobRepository)
                .validator(validator)
                .start(messageExpiredPointStep)
                .build();
    }
}
  • 이제 Job 부분은 더 이상 설명할 게 없다.

 

InputExpiredPointAlarmCriteriaDateStepListener

package cwchoiit.pointbatch.message.listener;

import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

@Component
public class InputExpiredPointAlarmCriteriaDateStepListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        JobParameter<?> todayParam = stepExecution.getJobParameters().getParameter("today");
        if (todayParam == null) {
            return;
        }

        LocalDate todayLocalDate = LocalDate.parse((String) todayParam.getValue());
        ExecutionContext context = stepExecution.getExecutionContext();
        context.put("alarmCriteriaDate", todayLocalDate.minusDays(1).format(DateTimeFormatter.ISO_DATE));
        stepExecution.setExecutionContext(context);
    }
}
  • 이 배치작업을 할 때 지금까지는 사용해 본 적 없는 StepExecutionListener를 사용해보자.
  • StepExecutionListener는, 이 리스너를 등록한 스텝을 실행하기 전에 리스너를 먼저 발동시켜서 어떤 작업을 처리할 수 있게 해주는 것이다. 지금은 beforeStep만 있지만 afterStep도 구현 가능하다.
  • 이 리스너는 그래서 잡 파라미터에서 `today`를 가져와서, LocalDate로 변환한 다음, `alarmCriteriaDate`라는 값을 만들어내서 ExecutionContext에 담는 리스너이다. 저 `alarmCriteriaDate` 이 녀석은 `today`라는 오늘 날짜라는 파라미터에서 하루를 뺀 날짜를 가리킨다. 그러니까, 이 배치작업이 실제 운영서버에 돌아가는 작업이라면, 하루에 한번씩 실행될텐데 만료일이 있고 오늘날짜가 있을때 오늘날짜보다 하루 뺀 날짜가 만료일인 것을 찾아내고 싶은 거다.

MessageExpiredPointStepConfiguration

package cwchoiit.pointbatch.message.job;

import cwchoiit.pointbatch.message.entity.Message;
import cwchoiit.pointbatch.message.listener.InputExpiredPointAlarmCriteriaDateStepListener;
import cwchoiit.pointbatch.point.dto.ExpiredPointSummary;
import cwchoiit.pointbatch.point.repository.PointRepository;
import jakarta.persistence.EntityManagerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.data.RepositoryItemReader;
import org.springframework.batch.item.data.builder.RepositoryItemReaderBuilder;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.builder.JpaItemWriterBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.domain.Sort;
import org.springframework.transaction.PlatformTransactionManager;

import java.time.LocalDate;
import java.util.Map;

@Slf4j
@Configuration
public class MessageExpiredPointStepConfiguration {

    @Bean
    @StepScope
    public RepositoryItemReader<ExpiredPointSummary> messageExpiredPointItemReader(
            PointRepository pointRepository,
            @Value("#{T(java.time.LocalDate).parse(stepExecutionContext[alarmCriteriaDate])}") LocalDate alarmCriteriaDate
    ) {
        return new RepositoryItemReaderBuilder<ExpiredPointSummary>()
                .name("messageExpiredPointItemReader")
                .repository(pointRepository)
                .methodName("sumByExpiredDate")
                .pageSize(1000)
                .arguments(alarmCriteriaDate)
                .sorts(Map.of("pointWallet", Sort.Direction.ASC))
                .build();
    }

    @Bean
    @StepScope
    public ItemProcessor<ExpiredPointSummary, Message> messageExpiredPointItemProcessor(
            @Value("#{T(java.time.LocalDate).parse(jobParameters[today])}") LocalDate today
    ) {
        return summary -> Message.of(summary.getUserId(), today, summary.getAmount());
    }

    @Bean
    @StepScope
    public JpaItemWriter<Message> messageExpiredPointItemWriter(EntityManagerFactory entityManagerFactory) {
        return new JpaItemWriterBuilder<Message>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }

    @Bean
    @JobScope
    public Step messageExpiredPointStep(JobRepository jobRepository,
                                        InputExpiredPointAlarmCriteriaDateStepListener listener,
                                        PlatformTransactionManager transactionManager,
                                        RepositoryItemReader<ExpiredPointSummary> messageExpiredPointItemReader,
                                        ItemProcessor<ExpiredPointSummary, Message> messageExpiredPointItemProcessor,
                                        JpaItemWriter<Message> messageExpiredPointItemWriter) {
        return new StepBuilder("messageExpiredPointStep", jobRepository)
                .listener(listener)
                .<ExpiredPointSummary, Message>chunk(1000, transactionManager)
                .reader(messageExpiredPointItemReader)
                .processor(messageExpiredPointItemProcessor)
                .writer(messageExpiredPointItemWriter)
                .build();
    }
}
  • Step, Reader, Processor, Writer를 구현한 클래스이다. 한번 하나씩 봐보자.
@Bean
@JobScope
public Step messageExpiredPointStep(JobRepository jobRepository,
                                    InputExpiredPointAlarmCriteriaDateStepListener listener,
                                    PlatformTransactionManager transactionManager,
                                    RepositoryItemReader<ExpiredPointSummary> messageExpiredPointItemReader,
                                    ItemProcessor<ExpiredPointSummary, Message> messageExpiredPointItemProcessor,
                                    JpaItemWriter<Message> messageExpiredPointItemWriter) {
    return new StepBuilder("messageExpiredPointStep", jobRepository)
            .listener(listener)
            .<ExpiredPointSummary, Message>chunk(1000, transactionManager)
            .reader(messageExpiredPointItemReader)
            .processor(messageExpiredPointItemProcessor)
            .writer(messageExpiredPointItemWriter)
            .build();
}
  • 지금까지 한 것들이랑 다른게 없지만, 딱 하나, listener를 등록했다. 아까 위에서 만든 그 리스너이다.
@Bean
@StepScope
public RepositoryItemReader<ExpiredPointSummary> messageExpiredPointItemReader(
        PointRepository pointRepository,
        @Value("#{T(java.time.LocalDate).parse(stepExecutionContext[alarmCriteriaDate])}") LocalDate alarmCriteriaDate
) {
    return new RepositoryItemReaderBuilder<ExpiredPointSummary>()
            .name("messageExpiredPointItemReader")
            .repository(pointRepository)
            .methodName("sumByExpiredDate")
            .pageSize(1000)
            .arguments(alarmCriteriaDate)
            .sorts(Map.of("pointWallet", Sort.Direction.ASC))
            .build();
}
  • 이번엔 ReaderJpaPagingReader가 아니라 RepositoryItemReader이다. 이 Reader는 내가 제공한 레포지토리의 특정 메서드를 실행해서 결과를 얻어올 수 있다. 
  • 왜 이것을 사용했냐? 지금까지는 JPQL로 간단하게 데이터를 가져올 수 있었지만 이번에는 살짝 쿼리가 복잡하기 때문에 QueryDSL을 사용할 것이다. 그래서 그 QueryDSL을 사용한 메서드를 호출하게 하고 싶어서 이 RepositoryItemReader를 사용한다.
  • 아직 sumByExpiredDate는 만들지 않았다.
  • 또한 반환 타입이 ExpiredPointSummary이다. 이건 단순 DTO라고 생각하면 편할 것 같다. 이 역시 아직 만들지 않았다.
@Bean
@StepScope
public ItemProcessor<ExpiredPointSummary, Message> messageExpiredPointItemProcessor(
        @Value("#{T(java.time.LocalDate).parse(jobParameters[today])}") LocalDate today
) {
    return summary -> Message.of(summary.getUserId(), today, summary.getAmount());
}
  • Processor는 간단하다. 저기 Processor를 보면, 받은 ExpiredPointSummary를 활용해서 Message.of()를 호출한 결과를 반환한다. 이게 무엇일까? 아래 코드를 보자.

Message

package cwchoiit.pointbatch.message.entity;

import jakarta.persistence.*;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;

import java.math.BigInteger;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

@Entity
@Table(name = "MESSAGE")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
@Getter
public class Message {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(name = "user_id", nullable = false)
    private String userId;

    @Column(name = "title", nullable = false)
    private String title;

    @Column(name = "content", nullable = false, columnDefinition = "text")
    private String content;

    public Message(String userId, String title, String content) {
        this.userId = userId;
        this.title = title;
        this.content = content;
    }

    public static Message of(String userId, LocalDate expiredDate, BigInteger expiredAmount) {
        return new Message(
                userId,
                String.format("%s 포인트 만료", expiredAmount.toString()),
                String.format("%s 기준 %s 포인트가 만료되었습니다.", expiredDate.format(DateTimeFormatter.ISO_DATE), expiredAmount)
        );
    }
}
  • Message.of()Message 엔티티에서 Message를 편리하게 만들어내는 생성 메서드이다. 
  • 그 다음 아래 코드를 보자.

ExpiredPointSummary

package cwchoiit.pointbatch.point.dto;

import com.querydsl.core.annotations.QueryProjection;
import lombok.Getter;

import java.math.BigInteger;

@Getter
public class ExpiredPointSummary {
    private String userId;
    private BigInteger amount;

    @QueryProjection
    public ExpiredPointSummary(String userId, BigInteger amount) {
        this.userId = userId;
        this.amount = amount;
    }
}
  • 이 클래스는 단순히 만료된 포인트의 합계와, 해당 유저의 ID를 가지고 있는 DTO다. 물론, QueryDSL을 사용하기 위해 생성자를 Q클래스로 만들어 낸다. 
@Bean
@StepScope
public JpaItemWriter<Message> messageExpiredPointItemWriter(EntityManagerFactory entityManagerFactory) {
    return new JpaItemWriterBuilder<Message>()
            .entityManagerFactory(entityManagerFactory)
            .build();
}
  • WriterJpaItemWriter를 사용한다. 타입인 Message의 엔티티가 적당히 쌓이면, 전달한 엔티티 매니저의 플러시를 호출한다.

 

자, 이제 QueryDSL을 사용해서 쿼리를 짜보자. 참고로, QueryDSL이 뭔지 모르면 따라오기 힘들 수 있다. 포스팅 중에 QueryDSL을 다룬 포스팅이 많으니까 해당 포스팅을 참고하자.

 

PointCustomRepository

package cwchoiit.pointbatch.point.repository;

import cwchoiit.pointbatch.point.dto.ExpiredPointSummary;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;

import java.time.LocalDate;

public interface PointCustomRepository {
    Page<ExpiredPointSummary> sumByExpiredDate(LocalDate alarmCriteriaDate, Pageable pageable);
}
  • 우선, QueryDSL 용으로 새로운 레포지토리 하나를 만들자.
  • 만들 메서드는 아까 위에서 정의한 sumByExpiredDate이다.

PointCustomRepositoryImpl

package cwchoiit.pointbatch.point.repository;

import com.querydsl.jpa.JPQLQuery;
import cwchoiit.pointbatch.point.dto.ExpiredPointSummary;
import cwchoiit.pointbatch.point.dto.QExpiredPointSummary;
import cwchoiit.pointbatch.point.entity.Point;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.support.QuerydslRepositorySupport;

import java.math.BigInteger;
import java.time.LocalDate;
import java.util.List;
import java.util.Objects;

import static cwchoiit.pointbatch.point.entity.QPoint.point;

@Slf4j
public class PointCustomRepositoryImpl extends QuerydslRepositorySupport implements PointCustomRepository {
    public PointCustomRepositoryImpl() {
        super(Point.class);
    }

    @Override
    public Page<ExpiredPointSummary> sumByExpiredDate(LocalDate alarmCriteriaDate, Pageable pageable) {
        JPQLQuery<ExpiredPointSummary> query =
                from(point)
                .select(
                        new QExpiredPointSummary(
                                point.pointWallet.userId,
                                point.amount.sum().coalesce(BigInteger.ZERO))
                )
                .where(point.expired.eq(true))
                .where(point.used.eq(false))
                .where(point.expireDate.before(alarmCriteriaDate))
                .groupBy(point.pointWallet);

        List<ExpiredPointSummary> expiredPoints =
                Objects.requireNonNull(getQuerydsl()).applyPagination(pageable, query).fetch();
        long elementCount = query.fetchCount();

        log.info("elementCount: {}", elementCount);

        return new PageImpl<>(expiredPoints, PageRequest.of(pageable.getPageNumber(), pageable.getPageSize()), elementCount);
    }
}
  • 위 인터페이스를 구현한 구현 클래스이다. 
  • 저 생성자는 QueryDSL을 사용하려면 무조건 있어야 하는 생성자라고 생각하면 된다.
  • 실질적으로 중요한 부분은 sumByExpiredDate의 구현 부분이다.
  • 이렇게 구현을 했으면 원래 사용중이던 PointRepository가 이를 상속받으면 끝난다.

PointCustomRepository를 상속하게 된 PointRepository

package cwchoiit.pointbatch.point.repository;

import cwchoiit.pointbatch.point.entity.Point;
import org.springframework.data.jpa.repository.JpaRepository;

public interface PointRepository extends JpaRepository<Point, Long>, PointCustomRepository {
}
  • 이렇게 해두면 이제 아래 코드인 Reader에서 PointRepository에서 sumByExpiredDate를 참조할 수 있다.
@Bean
@StepScope
public RepositoryItemReader<ExpiredPointSummary> messageExpiredPointItemReader(
        PointRepository pointRepository,
        @Value("#{T(java.time.LocalDate).parse(stepExecutionContext[alarmCriteriaDate])}") LocalDate alarmCriteriaDate
) {
    return new RepositoryItemReaderBuilder<ExpiredPointSummary>()
            .name("messageExpiredPointItemReader")
            .repository(pointRepository)
            .methodName("sumByExpiredDate")
            .pageSize(1000)
            .arguments(alarmCriteriaDate)
            .sorts(Map.of("pointWallet", Sort.Direction.ASC))
            .build();
}

 

끝났다. 이제 테스트 코드를 작성해서 실행해보자!

MessageExpiredPointJobConfigurationTest

package cwchoiit.pointbatch.message.job;

import cwchoiit.pointbatch.BatchTestSupport;
import cwchoiit.pointbatch.message.entity.Message;
import cwchoiit.pointbatch.message.repository.MessageRepository;
import cwchoiit.pointbatch.point.entity.Point;
import cwchoiit.pointbatch.point.entity.PointWallet;
import cwchoiit.pointbatch.point.repository.PointRepository;
import cwchoiit.pointbatch.point.repository.PointWalletRepository;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.*;
import org.springframework.beans.factory.annotation.Autowired;

import java.math.BigInteger;
import java.time.LocalDate;
import java.util.List;

import static org.assertj.core.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.*;

class MessageExpiredPointJobConfigurationTest extends BatchTestSupport {

    @Autowired
    Job messageExpiredPointJob;

    @Autowired
    MessageRepository messageRepository;

    @Autowired
    PointRepository pointRepository;

    @Autowired
    PointWalletRepository pointWalletRepository;

    @Test
    void messageExpiredPointJob() throws Exception {
        LocalDate earnDate = LocalDate.of(2020, 1, 1);
        LocalDate expireDate = LocalDate.of(2020, 5, 1);
        LocalDate notExpireDate = LocalDate.of(2020, 12, 31);

        PointWallet pointWallet1 = new PointWallet("user1", BigInteger.valueOf(3000));
        PointWallet pointWallet2 = new PointWallet("user2", BigInteger.valueOf(0));

        pointWalletRepository.save(pointWallet1);
        pointWalletRepository.save(pointWallet2);

        // 만료된 포인트 (pointWallet2)
        pointRepository.save(new Point(pointWallet2, BigInteger.valueOf(1000), earnDate, expireDate, false, true));
        pointRepository.save(new Point(pointWallet2, BigInteger.valueOf(500), earnDate, expireDate, false, true));
        // 만료된 포인트 (pointWallet1)
        pointRepository.save(new Point(pointWallet1, BigInteger.valueOf(1000), earnDate, expireDate, false, true));
        pointRepository.save(new Point(pointWallet1, BigInteger.valueOf(500), earnDate, expireDate, false, true));
        pointRepository.save(new Point(pointWallet1, BigInteger.valueOf(1000), earnDate, expireDate, false, true));

        // 만료되지 않은 포인트 (pointWallet1)
        pointRepository.save(new Point(pointWallet1, BigInteger.valueOf(1000), earnDate, notExpireDate));
        pointRepository.save(new Point(pointWallet1, BigInteger.valueOf(2000), earnDate, notExpireDate));
        pointRepository.save(new Point(pointWallet1, BigInteger.valueOf(3000), earnDate, notExpireDate));

        JobParameters jobParameters = new JobParametersBuilder()
                .addString("today", "2020-09-12")
                .toJobParameters();

        // messageExpiredPointJob 실행
        JobExecution jobExecution = launchJob(messageExpiredPointJob, jobParameters);

        // Job 성공적으로 끝났는지 체크
        assertThat(jobExecution.getExitStatus()).isEqualTo(ExitStatus.COMPLETED);

        List<Message> messages = messageRepository.findAll();
        // 만료 알림 메시지가 총 2개 나갔는지 체크 (user1에게 2500원 만료됐다는 메시지, user2에게 1500원 만료됐다는 메시지)
        assertThat(messages).hasSize(2);

        Message forUser1Message = messages.stream()
                .filter(message -> message.getUserId().equals("user1"))
                .findFirst()
                .orElse(null);
        // 메시지 중 하나는 user1을 위한 메시지인지 체크
        assertThat(forUser1Message).isNotNull();
        // user1에게 나간 메시지의 타이틀을 체크
        assertThat(forUser1Message.getTitle()).isEqualTo("2500 포인트 만료");
        // user1에게 나간 메시지의 내용 체크
        assertThat(forUser1Message.getContent()).isEqualTo("2020-09-12 기준 2500 포인트가 만료되었습니다.");

        Message forUser2Message = messages.stream()
                .filter(message -> message.getUserId().equals("user2"))
                .findFirst()
                .orElse(null);
        // 메시지 중 하나는 user1을 위한 메시지인지 체크
        assertThat(forUser2Message).isNotNull();
        // user1에게 나간 메시지의 타이틀을 체크
        assertThat(forUser2Message.getTitle()).isEqualTo("1500 포인트 만료");
        // user1에게 나간 메시지의 내용 체크
        assertThat(forUser2Message.getContent()).isEqualTo("2020-09-12 기준 1500 포인트가 만료되었습니다.");
    }
}
  • 테스트 결과는 성공적으로 수행된다.

 

728x90
반응형
LIST

'Spring Batch' 카테고리의 다른 글

이커머스 포인트 배치를 구현해보기  (2) 2024.11.21
Listener  (1) 2024.10.09
ItemWriter  (1) 2024.10.09
ItemProcessor  (2) 2024.10.09
ItemReader  (1) 2024.10.09
728x90
반응형
SMALL

어지간한 상품을 판매하는 사이트에는 다 있는 포인트에 대해서, 직접 배치를 사용해 포인트 관련 일괄처리를 공부해보고 스프링 배치에 대해 좀 더 익숙해지는 시간을 가져보자.

 

프로젝트 만들기

현재 기준 Spring Boot 3.3.5 버전으로 프로젝트를 만들었다. 그리고 의존성은 다음을 참고한다.

implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
compileOnly 'org.projectlombok:lombok'
runtimeOnly 'com.h2database:h2'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.batch:spring-batch-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'

testImplementation 'org.springframework.batch:spring-batch-test'

implementation 'com.querydsl:querydsl-jpa:5.0.0:jakarta'
annotationProcessor "com.querydsl:querydsl-apt:5.0.0:jakarta"
annotationProcessor "jakarta.annotation:jakarta.annotation-api"
annotationProcessor "jakarta.persistence:jakarta.persistence-api"
  • Spring Batch
  • Spring Data JPA
  • Lombok
  • QueryDSL
  • H2

이렇게 5개 정도를 추가하면 된다. 

 

build.gradle (전체)

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.3.5'
    id 'io.spring.dependency-management' version '1.1.6'
}

group = 'cwchoiit'
version = '0.0.1-SNAPSHOT'

java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(21)
    }
}

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    compileOnly 'org.projectlombok:lombok'
    runtimeOnly 'com.h2database:h2'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.batch:spring-batch-test'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'

    testImplementation 'org.springframework.batch:spring-batch-test'

    implementation 'com.querydsl:querydsl-jpa:5.0.0:jakarta'
    annotationProcessor "com.querydsl:querydsl-apt:5.0.0:jakarta"
    annotationProcessor "jakarta.annotation:jakarta.annotation-api"
    annotationProcessor "jakarta.persistence:jakarta.persistence-api"
}

tasks.named('test') {
    useJUnitPlatform()
}

// querydsl directory path
def querydslDir = "src/main/generated"

// querydsl directory를 자동 임포트 할 수 있게 설정
sourceSets {
    main.java.srcDirs += [ querydslDir ]
}

// task 중 compileJava를 실행하면 Q파일들을 생성
tasks.withType(JavaCompile).configureEach {
    options.getGeneratedSourceOutputDirectory().set(file(querydslDir))
}

// clean을 하면 querydsl directory를 삭제
clean.doLast {
    file(querydslDir).deleteDir()
}

 

 

@EnableBatchProcessing

package cwchoiit.pointbatch.config;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.context.annotation.Configuration;

@EnableBatchProcessing
@Configuration
public class BatchConfig {

}
  • 가장 먼저, 스프링 배치를 사용하기 위해 @EnableBatchProcessing 애노테이션을 달아준다. 그리고 이 애노테이션이 적용되기 위해 @Configuration 애노테이션도 달아준다.

 

application.yaml

spring:
  batch:
    job:
      name: ${job.name:NONE}
    jdbc:
      initialize-schema: always
  jpa:
    show-sql: true
    hibernate:
      ddl-auto: validate
  datasource:
    driver-class-name: org.h2.Driver
    url: jdbc:h2:tcp://localhost/~/h2/db/point
    username: sa
    password:
  • H2 데이터베이스는 미리 다운받고, 데이터베이스를 만들면 된다. 나는 `point`라는 이름의 데이터베이스를 만들었다.
  • `spring.batch.job.name` : ${job.name:NONE} → 이 스프링 배치 프로그램을 패키징하여 Jar 파일로 만들고 해당 파일을 실행할때, 어떤 Job을 실행시킬건지 명명해줘야 한다. 근데 이름이 너무 기니까 그 긴 이름을 `job.name`으로 축소한것이라고 생각하면 된다. 그리고 만약, Job을 선택하지 않은 경우 모든 Job이 다 실행되는데 그것은 싫으니 기본값으로 NONE을 명명했다.
  • 나머지 데이터베이스 관련 설정은 넘어간다. 이 포스팅에서 다룰 내용은 아니라고 생각한다. 이미 이것을 다룬 포스팅이 내 블로그에 많다. 

 

데이터베이스 테이블 만들기

우선, 데이터베이스 테이블을 만들자. DDL은 미리 준비해 두었다.

CREATE TABLE `point_wallet`
(
    `id`      bigint      NOT NULL AUTO_INCREMENT COMMENT 'ID',
    `amount`  bigint      NOT NULL COMMENT '보유금액',
    `user_id` varchar(20) NOT NULL COMMENT '유저 ID',
    PRIMARY KEY (`id`)
);

CREATE TABLE `point_reservation`
(
    `id`              bigint  NOT NULL AUTO_INCREMENT COMMENT 'ID',
    `amount`          bigint  NOT NULL COMMENT '적립금액',
    `available_days`  int     NOT NULL COMMENT '유효기간',
    `earned_date`     date    NOT NULL COMMENT '적립일자',
    `is_executed`     tinyint NOT NULL COMMENT '적용여부',
    `point_wallet_id` bigint  NOT NULL COMMENT '포인트 지갑 ID',
    PRIMARY KEY (`id`)
);

CREATE TABLE `point`
(
    `id`              bigint  NOT NULL AUTO_INCREMENT COMMENT 'ID',
    `amount`          bigint  NOT NULL COMMENT '적립금액',
    `earned_date`     date    NOT NULL COMMENT '적립일자',
    `expire_date`     date    NOT NULL COMMENT '만료일자',
    `is_used`         tinyint NOT NULL COMMENT '사용유무',
    `is_expired`      tinyint NOT NULL COMMENT '만료여부',
    `point_wallet_id` bigint  NOT NULL COMMENT '포인트 지갑 ID',
    PRIMARY KEY (`id`)
);

CREATE TABLE `message`
(
    `id`      bigint       NOT NULL AUTO_INCREMENT COMMENT 'ID',
    `user_id` varchar(20)  NOT NULL COMMENT '유저 ID',
    `title`   varchar(200) NOT NULL COMMENT '제목',
    `content` text         NOT NULL COMMENT '내용',
    PRIMARY KEY (`id`)
);
  • 사용자의 포인트를 보관하는 지갑을 나타내는 `point_wallet` 테이블
  • 사용자에게 포인트가 적립될 예정임을 나타내는 `point_reservation` 테이블
  • 사용자의 포인트를 나타내는 `point` 테이블
  • 포인트 소멸 또는 증액 예정에 대한 알람을 나타내는 `message` 테이블

지금까지는 비즈니스 로직에 관련된 테이블이었다. 여기서 끝나면 안된다. 왜냐하면 스프링 배치를 돌리기 위해 필요한 테이블들이 있다.

Spring Batch Meta-Data Schema

Spring BatchJob, Step, 및 실행 중인 Batch Job의 상태에 대한 정보를 기록하기 위해 다양한 메타데이터 테이블을 사용한다. 그래서 배치작업을 할 때 이 메타데이터 테이블들이 있어야 한다. 어떻게 만들까? `schema-*.sql` 이라는 파일명으로 된 DDL 스크립트를 제공한다. 저기서 `*`은 내가 사용하는 데이터베이스를 의미한다. 그래서 나는 H2를 사용하니까 `schema-h2.sql` 이라는 파일명을 찾으면 된다! 아래처럼 IntelliJ에서 파일을 찾아보면 스크립트 파일 하나가 나올것이다.

 

해당 파일을 열어보면 다음과 같이 생겼다.

-- Autogenerated: do not edit this file

CREATE TABLE BATCH_JOB_INSTANCE  (
	JOB_INSTANCE_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY ,
	VERSION BIGINT ,
	JOB_NAME VARCHAR(100) NOT NULL,
	JOB_KEY VARCHAR(32) NOT NULL,
	constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ;

CREATE TABLE BATCH_JOB_EXECUTION  (
	JOB_EXECUTION_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY ,
	VERSION BIGINT  ,
	JOB_INSTANCE_ID BIGINT NOT NULL,
	CREATE_TIME TIMESTAMP(9) NOT NULL,
	START_TIME TIMESTAMP(9) DEFAULT NULL ,
	END_TIME TIMESTAMP(9) DEFAULT NULL ,
	STATUS VARCHAR(10) ,
	EXIT_CODE VARCHAR(2500) ,
	EXIT_MESSAGE VARCHAR(2500) ,
	LAST_UPDATED TIMESTAMP(9),
	constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
	references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;

CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (
	JOB_EXECUTION_ID BIGINT NOT NULL ,
	PARAMETER_NAME VARCHAR(100) NOT NULL ,
	PARAMETER_TYPE VARCHAR(100) NOT NULL ,
	PARAMETER_VALUE VARCHAR(2500) ,
	IDENTIFYING CHAR(1) NOT NULL ,
	constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

CREATE TABLE BATCH_STEP_EXECUTION  (
	STEP_EXECUTION_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY ,
	VERSION BIGINT NOT NULL,
	STEP_NAME VARCHAR(100) NOT NULL,
	JOB_EXECUTION_ID BIGINT NOT NULL,
	CREATE_TIME TIMESTAMP(9) NOT NULL,
	START_TIME TIMESTAMP(9) DEFAULT NULL ,
	END_TIME TIMESTAMP(9) DEFAULT NULL ,
	STATUS VARCHAR(10) ,
	COMMIT_COUNT BIGINT ,
	READ_COUNT BIGINT ,
	FILTER_COUNT BIGINT ,
	WRITE_COUNT BIGINT ,
	READ_SKIP_COUNT BIGINT ,
	WRITE_SKIP_COUNT BIGINT ,
	PROCESS_SKIP_COUNT BIGINT ,
	ROLLBACK_COUNT BIGINT ,
	EXIT_CODE VARCHAR(2500) ,
	EXIT_MESSAGE VARCHAR(2500) ,
	LAST_UPDATED TIMESTAMP(9),
	constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT  (
	STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
	SHORT_CONTEXT VARCHAR(2500) NOT NULL,
	SERIALIZED_CONTEXT LONGVARCHAR ,
	constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
	references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ;

CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (
	JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
	SHORT_CONTEXT VARCHAR(2500) NOT NULL,
	SERIALIZED_CONTEXT LONGVARCHAR ,
	constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_SEQ;
  • 이 스크립트 역시 실행해서 메타데이터 테이블을 만들면 된다.
  • 각각의 테이블이 정확히 어떤 내용인지는 다음 공식 문서를 참조하자. 그런데, 이름만 봐도 이게 어떤 테이블인지는 감이 대충 온다.
 

Meta-Data Schema :: Spring Batch

The Spring Batch Metadata tables closely match the domain objects that represent them in Java. For example, JobInstance, JobExecution, JobParameters, and StepExecution map to BATCH_JOB_INSTANCE, BATCH_JOB_EXECUTION, BATCH_JOB_EXECUTION_PARAMS, and BATCH_ST

docs.spring.io

 

 

엔티티 만들기

데이터베이스와 테이블을 만들었으니 이제 엔티티를 만들어보자.

 

Message

package cwchoiit.pointbatch.message.entity;

import jakarta.persistence.*;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;

@Entity
@Table(name = "MESSAGE")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
@Getter
public class Message {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(name = "user_id", nullable = false)
    private String userId;

    @Column(name = "title", nullable = false)
    private String title;

    @Column(name = "content", nullable = false, columnDefinition = "text")
    private String content;
}

MessageRepository

package cwchoiit.pointbatch.message.repository;

import cwchoiit.pointbatch.message.entity.Message;
import org.springframework.data.jpa.repository.JpaRepository;

public interface MessageRepository extends JpaRepository<Message, Long> {
}

 

Point

package cwchoiit.pointbatch.point.entity;

import jakarta.persistence.*;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;

import java.math.BigInteger;
import java.time.LocalDate;

@Entity
@Table(name = "POINT")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
@Getter
public class Point {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(name = "point_wallet_id", nullable = false)
    private PointWallet pointWallet;

    @Column(name = "amount", nullable = false, columnDefinition = "BIGINT")
    private BigInteger amount;

    @Column(name = "earned_date", nullable = false)
    private LocalDate earnedDate;

    @Column(name = "expire_date", nullable = false)
    private LocalDate expireDate;

    @Column(name = "is_used", nullable = false, columnDefinition = "TINYINT", length = 1)
    private boolean used;

    @Column(name = "is_expired", nullable = false, columnDefinition = "TINYINT", length = 1)
    private boolean expired;

    public Point(PointWallet pointWallet, BigInteger amount, LocalDate earnedDate, LocalDate expireDate) {
        this.pointWallet = pointWallet;
        this.amount = amount;
        this.earnedDate = earnedDate;
        this.expireDate = expireDate;
        this.used = false;
        this.expired = false;
    }

    public void expire() {
        if (!this.used) {
            this.expired = true;
        }
    }
}

PointRepository

package cwchoiit.pointbatch.point.repository;

import cwchoiit.pointbatch.point.entity.Point;
import org.springframework.data.jpa.repository.JpaRepository;

public interface PointRepository extends JpaRepository<Point, Long> {
}

 

PointWallet

package cwchoiit.pointbatch.point.entity;

import jakarta.persistence.*;
import lombok.*;

import java.math.BigInteger;

@Entity
@Table(name = "POINT_WALLET")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
@Getter
public class PointWallet {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(name = "user_id", unique = true, nullable = false)
    private String userId;

    @Column(name = "amount", columnDefinition = "BIGINT")
    BigInteger amount;
}

PointWalletRepository

package cwchoiit.pointbatch.point.repository;

import cwchoiit.pointbatch.point.entity.PointWallet;
import org.springframework.data.jpa.repository.JpaRepository;

public interface PointWalletRepository extends JpaRepository<PointWallet, Long> {
}

 

PointReservation

package cwchoiit.pointbatch.reservation.entity;

import cwchoiit.pointbatch.point.entity.PointWallet;
import jakarta.persistence.*;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;

import java.math.BigInteger;
import java.time.LocalDate;

@Entity
@Table(name = "POINT_RESERVATION")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
@Getter
public class PointReservation {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(name = "point_wallet_id", nullable = false)
    private PointWallet pointWallet;

    @Column(name = "amount", nullable = false, columnDefinition = "BIGINT")
    private BigInteger amount;

    @Column(name = "earned_date", nullable = false)
    private LocalDate earnedDate;

    @Column(name = "available_days", nullable = false)
    private int availableDays;

    @Column(name = "is_executed", columnDefinition = "TINYINT", length = 1)
    private boolean executed;

    public PointReservation(PointWallet pointWallet, BigInteger amount, LocalDate earnedDate, int availableDays) {
        this.pointWallet = pointWallet;
        this.amount = amount;
        this.earnedDate = earnedDate;
        this.availableDays = availableDays;
        this.executed = false;
    }

    public void execute() {
        this.executed = true;
    }

    public LocalDate getExpireDate() {
        return this.earnedDate.plusDays(this.availableDays);
    }
}

PointReservationRepository

package cwchoiit.pointbatch.reservation.repository;

import cwchoiit.pointbatch.reservation.entity.PointReservation;
import org.springframework.data.jpa.repository.JpaRepository;

public interface PointReservationRepository extends JpaRepository<PointReservation, Long> {
}

 

 

테스트 코드 작성하기

이제, 비즈니스 로직을 수행할 Job을 만들건데, 그 Job을 테스트하기 위한 테스트 코드를 같이 작성해보자. 바로 들어가보자.

우선, application.yaml은 동일하게 하나 만들어주자.

 

test/resources/application.yaml

spring:
  batch:
    job:
      name: ${job.name:NONE}
    jdbc:
      initialize-schema: always
  jpa:
    show-sql: true
    hibernate:
      ddl-auto: validate
  datasource:
    url: jdbc:h2:tcp://localhost/~/h2/db/point
    username: sa
    password:
    driver-class-name: org.h2.Driver

 

BatchTestSupport

package cwchoiit.pointbatch;

import cwchoiit.pointbatch.message.repository.MessageRepository;
import cwchoiit.pointbatch.point.repository.PointRepository;
import cwchoiit.pointbatch.point.repository.PointWalletRepository;
import cwchoiit.pointbatch.reservation.repository.PointReservationRepository;
import org.junit.jupiter.api.AfterEach;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;

@SpringBootTest
@ActiveProfiles("test")
public abstract class BatchTestSupport {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    PointWalletRepository pointWalletRepository;

    @Autowired
    PointRepository pointRepository;

    @Autowired
    MessageRepository messageRepository;

    @Autowired
    PointReservationRepository pointReservationRepository;

    protected JobExecution launchJob(Job job, JobParameters jobParameters) throws Exception {
        JobLauncherTestUtils jobLauncherTestUtils = new JobLauncherTestUtils();
        jobLauncherTestUtils.setJob(job);
        jobLauncherTestUtils.setJobLauncher(jobLauncher);
        jobLauncherTestUtils.setJobRepository(jobRepository);
        return jobLauncherTestUtils.launchJob(jobParameters);
    }

    @AfterEach
    protected void deleteAll() {
        pointWalletRepository.deleteAll();
        pointRepository.deleteAll();
        messageRepository.deleteAll();
        pointReservationRepository.deleteAll();
    }
}
  • 이 파일은, Job을 실행할 때 공통적으로 처리하는 부분들을 따로 빼서 유틸리티 느낌으로 계속 사용하기 위해 만든 클래스이다.
  • 테스트 코드를 작성하는 클래스는 모두 이 클래스를 상속받을 것이라 @SpringBootTest, @ActiveProfiles 애노테이션을 달아주었다. 
@Autowired
private JobLauncher jobLauncher;

@Autowired
private JobRepository jobRepository;
  • 이 두개는 스프링 배치에서 제공해주는 빈들이다. 잡을 실행하기 위한 빈인 JobLauncher, 스프링 메타데이터 테이블에 접근하기 위한 빈인 JobRepository.
protected JobExecution launchJob(Job job, JobParameters jobParameters) throws Exception {
    JobLauncherTestUtils jobLauncherTestUtils = new JobLauncherTestUtils();
    jobLauncherTestUtils.setJob(job);
    jobLauncherTestUtils.setJobLauncher(jobLauncher);
    jobLauncherTestUtils.setJobRepository(jobRepository);
    return jobLauncherTestUtils.launchJob(jobParameters);
}
  • 실제로 Job을 실행하는 메서드이다. 파라미터로는 실행할 Job, Job을 실행할 때 사용되는 Parameter를 받는다.
  • JobLauncherTestUtils는 스프링 배치 테스트 라이브러리에서 제공해주는 테스트를 할때 유용하게 사용할 수 있는 녀석이다. 위 코드에서처럼 Job, JobLauncher, JobRepository, JobParameters를 넘겨주고 Job을 실행할 수 있다.
@AfterEach
protected void deleteAll() {
    pointWalletRepository.deleteAll();
    pointRepository.deleteAll();
    messageRepository.deleteAll();
    pointReservationRepository.deleteAll();
}
  • 이 부분은 테스트를 진행하면서 생기는 레코드들을 전부 삭제하고 싶어서 만든 부분이다. 분명, 더 좋은 방법이 있을것이라고 생각한다. 원래 스프링 부트 테스트를 실행할 때 아무런 데이터베이스 정보를 넣어주지 않으면 메모리 H2 데이터베이스를 자동으로 사용하게 해주는데 그 데이터베이스에 스프링 메타데이터 테이블을 어떻게 넣는지 모르고 있는 상태라 테스트용 데이터베이스가 아닌 실제 데이터베이스로 테스트를 진행중이다. 그래서 위 코드를 사용중에 있다.

만료된 포인트 배치 작업 처리하기

ExpirePointJobConfigurationTest

package cwchoiit.pointbatch.point.job;

import cwchoiit.pointbatch.BatchTestSupport;
import cwchoiit.pointbatch.point.entity.Point;
import cwchoiit.pointbatch.point.entity.PointWallet;
import cwchoiit.pointbatch.point.repository.PointRepository;
import cwchoiit.pointbatch.point.repository.PointWalletRepository;
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.beans.factory.annotation.Autowired;

import java.math.BigInteger;
import java.time.LocalDate;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.batch.core.ExitStatus.COMPLETED;

class ExpirePointJobConfigurationTest extends BatchTestSupport {

    @Autowired
    Job expirePointJob;

    @Autowired
    PointWalletRepository pointWalletRepository;

    @Autowired
    PointRepository pointRepository;

    @Test
    void expirePointJob() throws Exception {

        LocalDate earnDate = LocalDate.of(2024, 11, 1);
        LocalDate expireDate = LocalDate.of(2024, 11, 3);

        PointWallet savedPointWallet = pointWalletRepository.save(new PointWallet("userA", BigInteger.valueOf(6000)));

        pointRepository.save(new Point(savedPointWallet, BigInteger.valueOf(1000), earnDate, expireDate));
        pointRepository.save(new Point(savedPointWallet, BigInteger.valueOf(1000), earnDate, expireDate));
        pointRepository.save(new Point(savedPointWallet, BigInteger.valueOf(1000), earnDate, expireDate));

        JobParameters jobParameters = new JobParametersBuilder()
                .addString("today", "2024-11-09")
                .toJobParameters();
        JobExecution jobExecution = launchJob(expirePointJob, jobParameters);
        assertThat(jobExecution.getExitStatus()).isEqualTo(COMPLETED);

        List<Point> points = pointRepository.findAll();
        assertThat(points.stream().filter(Point::isExpired)).hasSize(3);

        PointWallet changedPointWallet = pointWalletRepository.findById(savedPointWallet.getId()).orElse(null);
        assertThat(changedPointWallet).isNotNull();
        assertThat(changedPointWallet.getAmount()).isEqualTo(BigInteger.valueOf(3000));
    }
}
  • 이제 첫번째 배치성 비즈니스 로직을 작성해보자. 우선, 가장 먼저 만료된 포인트를 처리하는 배치를 작성해보자.
  • 테스트 코드와 비즈니스 로직 코드를 같이 작성하자. 그래서 위 코드를 작성했다. 한 줄 한 줄 살펴보자.
LocalDate earnDate = LocalDate.of(2024, 11, 1);
LocalDate expireDate = LocalDate.of(2024, 11, 3);

PointWallet savedPointWallet = pointWalletRepository.save(new PointWallet("userA", BigInteger.valueOf(6000)));

pointRepository.save(new Point(savedPointWallet, BigInteger.valueOf(1000), earnDate, expireDate));
pointRepository.save(new Point(savedPointWallet, BigInteger.valueOf(1000), earnDate, expireDate));
pointRepository.save(new Point(savedPointWallet, BigInteger.valueOf(1000), earnDate, expireDate));
  • 우선, 이 부분은 데이터베이스에 레코드를 추가하는 부분이다. 필요한 엔티티는 PointWallet 엔티티 하나, Point 엔티티 3개가 필요하다. 그러니까 한명의 유저의 포인트 지갑에 포인트가 3개 들어간다고 보면 된다. 
  • 포인트는 3개 전부 다 1000 포인트, 포인트 지급일은 2024-11-01, 포인트 만료일은 2024-11-03일로 동일하다.
JobParameters jobParameters = new JobParametersBuilder()
                .addString("today", "2024-11-09")
                .toJobParameters();
JobExecution jobExecution = launchJob(expirePointJob, jobParameters);
assertThat(jobExecution.getExitStatus()).isEqualTo(COMPLETED);

List<Point> points = pointRepository.findAll();
assertThat(points.stream().filter(Point::isExpired)).hasSize(3);

PointWallet changedPointWallet = pointWalletRepository.findById(savedPointWallet.getId()).orElse(null);
assertThat(changedPointWallet).isNotNull();
assertThat(changedPointWallet.getAmount()).isEqualTo(BigInteger.valueOf(3000));
  • 그리고 Job을 실행하기 위한 JobParameters를 생성한다. today라는 key에 2024-11-09일 이라는 날짜를 입력한다.
  • 아까 BatchTestSupport에서 만든 launchJob(...) 메서드를 실행한다. JobexpirePointJob을 넘겨준다. 이 Job은 아직 만들지 않았다. 곧 만들어보자. JobParameters로는 today가 들어있는 jobParameters를 넘기자.
  • 그러니까, 오늘날짜보다 만료일이 더 이전인 포인트를 전부 유효하지 않은 포인트 처리를 하는 배치 작업인 것이다.
  • 그리고 배치 작업을 실행했으면 테스트 결과를 찍어봐야 하니까 실행한 JobExecutionExitStatus 값이 COMPLETED인지 확인하고, 모든 포인트들을 찾아서 포인트가 만료된 개수가 3개 모두 인지 확인하고, 포인트 지갑의 남아있는 포인트가 3000인지도 확인한다. 

 

자, 이제 Job을 만들어보자!

 

ExpirePointJobConfiguration

package cwchoiit.pointbatch.point.job;

import cwchoiit.pointbatch.point.job.validator.TodayJobParameterValidator;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ExpirePointJobConfiguration {

    @Bean
    public Job expirePointJob(JobRepository jobRepository, Step expirePointStep, TodayJobParameterValidator validator) {
        return new JobBuilder("expirePointJob", jobRepository)
                .validator(validator)
                .start(expirePointStep)
                .build();
    }
}
  • Job, Step 모두 빈으로 등록해야 한다. 그래서 @Bean 애노테이션을 사용한다.
  • 아까 만들기로 했던 expirePointJob을 만들자. 파라미터로는 JobRepository, Step, TodayJobParameterValidator가 들어간다. 아직 Step expirePointStep, TodayJobParameterValidator validator는 만들지 않았다.
  • JobBuilder를 통해서 새로운 Job을 만들고 validator 적용, 실행할 Step 적용을 하고 build()를 호출한다.

얼른 TodayJobParameterValidator를 만들어보자.

TodayJobParameterValidator

package cwchoiit.pointbatch.point.job.validator;

import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.JobParametersValidator;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.format.DateTimeParseException;

@Component
public class TodayJobParameterValidator implements JobParametersValidator {

    @Override
    public void validate(JobParameters parameters) throws JobParametersInvalidException {
        if (parameters == null) {
            throw new JobParametersInvalidException("JobParameters is null");
        }
        String today = parameters.getString("today");
        if (today == null) {
            throw new JobParametersInvalidException("JobParameters [today] is null");
        }

        try {
            LocalDate.parse(today);
        } catch (DateTimeParseException e) {
            throw new JobParametersInvalidException("JobParameters [today] is not a valid date");
        }
    }
}
  • 스프링 배치는 JobParameters를 검증할 수 있는 JobParametersValidator를 제공한다. 이것을 구현한 클래스가 바로 TodayJobParameterValidator이다.
  • 구현 로직은 간단하다. Parameter가 있는지, 있다면 today라는 파라미터가 있는지를 검증하고, today라는 파라미터의 값으로 LocalDate로 파싱이 가능한지를 검증한다. 셋 중 하나라도 문제가 발생하면 JobParametersInvalidException을 발생시킨다.

 

이제 expirePointStep을 만들어보자!

 

ExpirePointStepConfiguration

package cwchoiit.pointbatch.point.job;

import cwchoiit.pointbatch.point.entity.Point;
import cwchoiit.pointbatch.point.entity.PointWallet;
import cwchoiit.pointbatch.point.repository.PointRepository;
import cwchoiit.pointbatch.point.repository.PointWalletRepository;
import jakarta.persistence.EntityManagerFactory;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import java.time.LocalDate;
import java.util.Map;

@Configuration
public class ExpirePointStepConfiguration {

    @Bean
    @StepScope
    public JpaPagingItemReader<Point> expirePointItemReader(EntityManagerFactory entityManagerFactory,
                                                            @Value("#{T(java.time.LocalDate).parse(jobParameters[today])}") LocalDate today) {
        return new JpaPagingItemReaderBuilder<Point>()
                .name("expirePointItemReader")
                .entityManagerFactory(entityManagerFactory)
                .queryString("SELECT p FROM Point p WHERE p.expireDate < :today and used = false and expired = false")
                .parameterValues(Map.of("today", today))
                .pageSize(1000)
                .build();
    }

    @Bean
    @StepScope
    public ItemProcessor<Point, Point> expirePointItemProcessor() {
        return point -> {
            point.setExpired(true);
            PointWallet wallet = point.getPointWallet();
            wallet.setAmount(wallet.getAmount().subtract(point.getAmount()));
            return point;
        };
    }

    @Bean
    @StepScope
    public ItemWriter<Point> expirePointItemWriter(PointRepository pointRepository,
                                                   PointWalletRepository pointWalletRepository) {
        return points -> {
            for (Point point : points) {
                if (point.isExpired()) {
                    pointRepository.save(point);
                    pointWalletRepository.save(point.getPointWallet());
                }
            }
        };
    }

    @Bean
    @JobScope
    public Step expirePointStep(JobRepository jobRepository,
                                PlatformTransactionManager transactionManager,
                                JpaPagingItemReader<Point> expirePointItemReader,
                                ItemProcessor<Point, Point> expirePointItemProcessor,
                                ItemWriter<Point> expirePointItemWriter) {
        return new StepBuilder("expirePointStep", jobRepository)
                .<Point, Point>chunk(1000, transactionManager)
                .reader(expirePointItemReader)
                .processor(expirePointItemProcessor)
                .writer(expirePointItemWriter)
                .build();
    }
}
  • StepStep을 구성하는 reader, processor, writer를 만들었다. 참고로 이게 어떤 내용인지는 이미 이전 포스팅에 다 작성을 했기 때문에 참고하면 된다. 가장 먼저, Step을 확인해보자.
@Bean
@JobScope
public Step expirePointStep(JobRepository jobRepository,
                            PlatformTransactionManager transactionManager,
                            JpaPagingItemReader<Point> expirePointItemReader,
                            ItemProcessor<Point, Point> expirePointItemProcessor,
                            ItemWriter<Point> expirePointItemWriter) {
    return new StepBuilder("expirePointStep", jobRepository)
            .<Point, Point>chunk(1000, transactionManager)
            .reader(expirePointItemReader)
            .processor(expirePointItemProcessor)
            .writer(expirePointItemWriter)
            .build();
}
  • 우선은 Step도 빈으로 등록되어야 하기 때문에 @Bean 애노테이션을 달았고, 이 Step을 사용하는 Job이 실행될 때 이 Step이 로딩되기를 원하기 때문에 @JobScope 애노테이션을 작성했다. 
  • StepBuilder를 사용해서, Step을 만들어내는데, 데이터베이스에 데이터가 너무 많은 경우 한번에 다 가져올 수 없으니 chunk를 통해 1000개씩 쪼개서 가져오도록 했고, reader, processor, writer를 집어넣었다.
  • 참고로 PlatformTransactionManager는 스프링에서 제공해주는 트랜잭션 매니저이다. 그러니까 이 작업동안에 사용할 트랜잭션을 제공한다고 보면 된다.

expirePointItemReader

@Bean
@StepScope
public JpaPagingItemReader<Point> expirePointItemReader(EntityManagerFactory entityManagerFactory,
                                                        @Value("#{T(java.time.LocalDate).parse(jobParameters[today])}") LocalDate today) {
    return new JpaPagingItemReaderBuilder<Point>()
            .name("expirePointItemReader")
            .entityManagerFactory(entityManagerFactory)
            .queryString("SELECT p FROM Point p WHERE p.expireDate < :today and used = false and expired = false")
            .parameterValues(Map.of("today", today))
            .pageSize(1000)
            .build();
}
  • 역시 이 Reader도 연관된 스텝이 사용될 때 로딩되도록 @StepScope를 사용했다.
  • 간단하게 JPAJPQL로 원하는 데이터를 읽어온다. 우리가 원하는건 "포인트 테이블에서 파라미터로 전달받은 오늘 날짜보다 만료일이 이전이고, 사용하지 않았고, 만료 체크되지도 않은 모든 포인트"를 가져오고 싶다.
  • 이렇게 가져온 데이터들이 Processor로 넘어간다.

expirePointItemProcessor

@Bean
@StepScope
public ItemProcessor<Point, Point> expirePointItemProcessor() {
    return point -> {
        point.setExpired(true);
        PointWallet wallet = point.getPointWallet();
        wallet.setAmount(wallet.getAmount().subtract(point.getAmount()));
        return point;
    };
}
  • 역시 이 Processor도 연관된 스텝이 사용될 때 로딩되도록 @StepScope를 사용했다.
  • 가져온 데이터를 Point 타입에서 Point 타입으로 처리할 것이므로(다른 말로 타입의 변환은 일으키지 않을 것이라는 말이다) <Point, Point>를 사용했다.
  • 각 포인트의 만료여부를 `true`로 변경한다.
  • 그리고 해당 포인트를 가지고 있는 PointWallet의 포인트 가격을 지금 이 포인트가 들고있는 가격만큼 차감한다. (만료된 포인트니까)

expirePointItemWriter

@Bean
@StepScope
public ItemWriter<Point> expirePointItemWriter(PointRepository pointRepository,
                                               PointWalletRepository pointWalletRepository) {
    return points -> {
        for (Point point : points) {
            if (point.isExpired()) {
                pointRepository.save(point);
                pointWalletRepository.save(point.getPointWallet());
            }
        }
    };
}
  • 역시 이 Writer도 연관된 스텝이 사용될 때 로딩되도록 @StepScope를 사용했다.
  • 이제 처리한 모든 포인트들을 순회하며, 만료여부가 `true`인 경우, 작업처리된 포인트와 포인트지갑을 반영해야 하므로 save()를 호출해준다.

 

이렇게 Job, Step, Reader, Processor, Writer를 전부 구현했다. 이제 실행해보자! 아까 테스트 코드로 만든 아래 코드를 실행하자!

@Test
void expirePointJob() throws Exception {

    LocalDate earnDate = LocalDate.of(2024, 11, 1);
    LocalDate expireDate = LocalDate.of(2024, 11, 3);

    PointWallet savedPointWallet = pointWalletRepository.save(new PointWallet("userA", BigInteger.valueOf(6000)));

    pointRepository.save(new Point(savedPointWallet, BigInteger.valueOf(1000), earnDate, expireDate));
    pointRepository.save(new Point(savedPointWallet, BigInteger.valueOf(1000), earnDate, expireDate));
    pointRepository.save(new Point(savedPointWallet, BigInteger.valueOf(1000), earnDate, expireDate));

    JobParameters jobParameters = new JobParametersBuilder()
            .addString("today", "2024-11-12")
            .toJobParameters();
    JobExecution jobExecution = launchJob(expirePointJob, jobParameters);
    assertThat(jobExecution.getExitStatus()).isEqualTo(COMPLETED);

    List<Point> points = pointRepository.findAll();
    assertThat(points.stream().filter(Point::isExpired)).hasSize(3);

    PointWallet changedPointWallet = pointWalletRepository.findById(savedPointWallet.getId()).orElse(null);
    assertThat(changedPointWallet).isNotNull();
    assertThat(changedPointWallet.getAmount()).isEqualTo(BigInteger.valueOf(3000));
}

실행 결과

  • 테스트가 성공적으로 끝났다. 다시 한번 실행해보자!

재실행 결과

  • 어? 테스트에 실패했다 왜 그럴까?

에러 내용은 다음과 같다.

A job instance already exists and is complete for identifying parameters={'today':'{value=2024-11-12, type=class java.lang.String, identifying=true}'}.  If you want to run this job again, change the parameters.
org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for identifying parameters={'today':'{value=2024-11-12, type=class java.lang.String, identifying=true}'}.  If you want to run this job again, change the parameters.
  • 쉽게 말해, 이미 같은 파라미터로 실행한 Job Instance가 존재한다는 것이다. 스프링 배치는 설계하기를 배치성 로직을 같은 파라미터로 한번 더 수행하는 것을 잘못된 수행으로 판단했다. 그도 그럴것이 배치성이라는게 날 잡아서 큰 작업을 한번에 처리하는건데 이것을 동일한 데이터로 한번 더하면 문제가 될 수 있다고 판단하는 것이다.
  • 그래서, 실행하기 전 아까 위에서 말한 메타데이터 테이블로 같은 파라미터로 실행된 Job이 있는지 먼저 확인을 한다. 

 

정리

아직 할 게 더 남았지만, 포스팅이 길어지니까 다음 포스팅에 계속하겠다. 그래도 포인트 만료시키는 배치 작업을 해보면서 어느정도 스프링 배치에 대해 감을 좀 잡았다.

728x90
반응형
LIST

'Spring Batch' 카테고리의 다른 글

이커머스 포인트 배치를 구현해보기 2  (0) 2024.11.22
Listener  (1) 2024.10.09
ItemWriter  (1) 2024.10.09
ItemProcessor  (2) 2024.10.09
ItemReader  (1) 2024.10.09
728x90
반응형
SMALL

Spring Batch에서는 메인 로직 외에 구간 사이사이에 어떤 일을 처리하고자 할 때 Listener를 사용한다. 예를 들면, Job, Step, Chunk, ItemReader, ItemProcessor, ItemWriter의 실행 직전과 직후에 어떤 행위를 할지 정의할 수 있다.

 

Listener의 종류

  • JobExecutionListener
  • StepExecutionListener
  • ChunkListener
  • ItemReadListener
  • ItemProcessListener
  • ItemWriteListener

 

JobExecutionListener

public interface JobExecutionListener {
    default void beforeJob(JobExecution jobExecution) {
    }

    default void afterJob(JobExecution jobExecution) {
    }
}
  • Job의 실행 전에 실행하는 beforeJob.
  • Job의 실행 후에 실행하는 afterJob.
  • 두 메서드 모두 인자로 JobExecution을 넘겨준다.
  • 구현한 JobExecutionListener는 아래처럼 JobBuilder에서 Job을 만들 때 listener()에 넣어주면 된다.

 

나만의 JobListener

package cwchoiit.springbatchmaster.tasklet;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

@Component
public class JobListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {
        ExecutionContext executionContext = jobExecution.getExecutionContext();
        Map<String, Object> executionContextMap = new HashMap<>();
        executionContextMap.put("name", "홍길동");
        executionContextMap.put("age", 30);
        jobExecution.setExecutionContext(new ExecutionContext(executionContextMap));
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println(jobExecution.getJobId());
    }
}

listener 등록

@Bean
public Job reserveRestaurantJob(JobRepository jobRepository,
                                JobListener jobListener,
                                Step searchAvailableKoreanRestaurantStep,
                                Step reserveRestaurantStep,
                                Step sendDepositStep) {
    return new JobBuilder("reserveRestaurantJob", jobRepository)
            .listener(jobListener)
            .start(searchAvailableKoreanRestaurantStep)
            .next(reserveRestaurantStep)
            .next(sendDepositStep)
            .build();
}

 

StepExecutionListener

public interface StepExecutionListener extends StepListener {
    default void beforeStep(StepExecution stepExecution) {
    }

    @Nullable
    default ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }
}
  • Step의 실행 전 실행하는 beforeStep
  • Step의 실행 후 실행하는 afterStep
  • 두 메서드 모두 인자로 StepExecution을 넘겨준다.
  • 구현된 StepExecutionListener는 아래처럼 StepBuilder에서 Step을 만들 때 listener()에 넣어주면 된다.

나만의 StepListener

package cwchoiit.springbatchmaster.tasklet;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.stereotype.Component;

@Component
public class StepListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        StepExecutionListener.super.beforeStep(stepExecution);
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return StepExecutionListener.super.afterStep(stepExecution);
    }
}

listener 등록

@Bean
@JobScope
public Step sampleStep(JobRepository jobRepository,
                       StepListener stepListener,
                       PlatformTransactionManager platformTransactionManager) {
    return new StepBuilder("sampleStep", jobRepository)
            .tasklet(sampleTasklet(), platformTransactionManager)
            .listener(stepListener)
            .build();
}

 

 

ChunkListener

public interface ChunkListener extends StepListener {
    String ROLLBACK_EXCEPTION_KEY = "sb_rollback_exception";

    default void beforeChunk(ChunkContext context) {
    }

    default void afterChunk(ChunkContext context) {
    }

    default void afterChunkError(ChunkContext context) {
    }
}
  • 맥락은 위 Listener들과 똑같다. 다만, 여기서는 Chunk를 했을 때 에러가 발생하면 호출되는 메서드도 있다. 

ItemReadListener

public interface ItemReadListener<T> extends StepListener {
    default void beforeRead() {
    }

    default void afterRead(T item) {
    }

    default void onReadError(Exception ex) {
    }
}
  • 마찬가지다.

ItemProcessListener

public interface ItemProcessListener<T, S> extends StepListener {
    default void beforeProcess(T item) {
    }

    default void afterProcess(T item, @Nullable S result) {
    }

    default void onProcessError(T item, Exception e) {
    }
}

ItemWriteListener

public interface ItemWriteListener<S> extends StepListener {
    default void beforeWrite(Chunk<? extends S> items) {
    }

    default void afterWrite(Chunk<? extends S> items) {
    }

    default void onWriteError(Exception exception, Chunk<? extends S> items) {
    }
}

 

 

정리를 하자면

이번 포스팅에서는, 리스너들을 알아보았다. 각 작업 전후로 어떤 행위가 필요할때, 또는 Chunk, Read, Process, Write의 경우 에러가 발생했을 때 어떤 행위가 필요하면 정의하면 좋을듯하다.

728x90
반응형
LIST

'Spring Batch' 카테고리의 다른 글

이커머스 포인트 배치를 구현해보기 2  (0) 2024.11.22
이커머스 포인트 배치를 구현해보기  (2) 2024.11.21
ItemWriter  (1) 2024.10.09
ItemProcessor  (2) 2024.10.09
ItemReader  (1) 2024.10.09
728x90
반응형
SMALL

Chunk Processing의 마지막 단계로 Item을 쓰는 단계이다. ItemReaderItemProcessor를 거쳐 처리된 Item을 Chunk 단위만큼 처리한 뒤 이를 ItemWriter에 전달한다. ItemWriterItem 1개가 아니라, 데이터의 묶음인 Item List를 처리한다. ItemWriter가 쓰기를 하는 대상은 그 어떤 것도 될 수 있다. 파일이나, RDBMS, NoSQL에 데이터를 쓸 수도 있고, 다른 API를 호출할 수도 있다.

@FunctionalInterface
public interface ItemWriter<T> {
    void write(@NonNull Chunk<? extends T> chunk) throws Exception;
}

 

ItemWriter 또한, Spring Batch에서 개발자들이 많이 사용할 것들을 미리 만들어 줬다. 대표적인 것들은 다음과 같다.

  • JdbcBatchItemWriter
  • JpaItemWriter

 

ItemWriterList로 처리할까?

ItemReaderread()ItemProcessorprocess()는 아래와 같이 데이터 1개를 반환한다.

O process(@NonNull I item) throws Exception;
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;

 

근데 왜 ItemWriterList로 처리할까? ItemWriter는 대부분 쓰기 작업이 일어난다. 이런 쓰기 작업을 건별로 처리하면 효율이 떨어지고 성능에 문제가 되는 경우가 많다. 예를 들어, ItemWriter에서 DatabaseINSERT를 한다고 하면, 1000개의 데이터를 한번에 저장하는 것과 개별 트랜잭션으로 1개씩 저장하는 것은 많은 성능 차이를 만든다. 그렇기 때문에 ItemWriterList로 처리한다고 보면 된다.

 

그럼에도 불구하고, 건별로 처리하고자 할 경우가 있으면 List를 받아서, 요소마다 건별로 처리하도록 구현할 수도 있다.

 

정리를 하자면

이번에는 ItemWriter에 대해 알아보았다. Chunk Processing의 마지막 단계인 ItemWriter. 다음에는 Listener라는 것을 알아보자!

728x90
반응형
LIST

'Spring Batch' 카테고리의 다른 글

이커머스 포인트 배치를 구현해보기  (2) 2024.11.21
Listener  (1) 2024.10.09
ItemProcessor  (2) 2024.10.09
ItemReader  (1) 2024.10.09
Chunk Processing  (5) 2024.10.09
728x90
반응형
SMALL

ItemProcessorItemReader에서 read()로 넘겨준 데이터를 개별로 가공한다. ItemProcessor는 언제 사용할까?

  • ItemReader가 넘겨준 데이터를 가공하려고 할 때
  • 데이터를 ItemWriter로 넘길지 말지 결정할 때 (null을 반환하면, ItemWriter에 전달하지 않음)

ItemProcessor는 필수가 아니다. 필요가 없는 경우는 어떤 경우일까?

  • 정말 ItemProcessor가 필요없는 경우. 예를 들면, ItemReader가 데이터베이스에서 데이터를 읽은 뒤 수정없이 그대로 ItemWriter에서 FileWrite하는 경우에는 ItemProcessor가 굳이 필요하지 않다.
  • ItemReaderItemWriter에서 데이터를 직접 가공까지 하는 경우인데, 이 방법은 추천하지는 않지만 상황에 따라 어쩔 수 없이 ItemReader에서 read할 때, 수정한 데이터를 넘겨주는 경우도 있고, ItemWriter에서 쓰기 전 데이터를 수정해서 write하는 경우가 있다.

 

ItemProcessor는 어떻게 구현할까?

ItemProcessorprocess를 구현하면 된다. ItemProcessorInputOutput이 있고, Input을 받아서 Output으로 변환한 뒤 반환해야 한다. 이때, InputOutput의 타입은 같을 수도 있고 다를 수도 있다.

@FunctionalInterface
public interface ItemProcessor<I, O> {
    @Nullable
    O process(@NonNull I item) throws Exception;
}

 

다음은, ItemProcessor를 입맛에 맞게 구현한 코드이다.

@Bean
@StepScope
public ItemProcessor<Order, Price> findExpensivePriceProcessor(PriceRepository priceRepository) {
    return order -> {
        Price price = priceRepository.findByProductId(order.getProductId());
        if (price.amount > 1000000L) {
            return price;
        } else {
            return null;
        }
    };
}

 

 

CompositeItemProcessor

자주 사용하지는 않지만, ItemProcessor 여러개를 체인처럼 연결할 때 사용한다. read를 통해 나온 아이템은 processor1processor2를 연속적으로 통과한다.

@Bean
@StepScope
public CompositeItemProcessor compositeItemProcessor(ItemProcessor<Order, Order> findExpensivePriceProcessor,
                                                     ItemProcessor<Order, Price> priceToPointProcessor) {
    List<ItemProcessor> delegates = List.of(findExpensivePriceProcessor, priceToPointProcessor);
    CompositeItemProcessor processor = new CompositeItemProcessor<>();
    processor.setDelegates(delegates);
    return processor;
}
  • findExpensivePriceProcessor, priceToPointProcessor를 연속적으로 사용하고자 할 때 이렇게 만들 수 있다.

 

정리를 하자면

데이터를 받아 가공할 때 사용하는 ItemProcessor를 알아보았다. 이 녀석은 필수는 아니다. 데이터를 가공할 필요가 있을 때 사용하면 된다. 이제 ItemWriter를 알아보자!

728x90
반응형
LIST

'Spring Batch' 카테고리의 다른 글

Listener  (1) 2024.10.09
ItemWriter  (1) 2024.10.09
ItemReader  (1) 2024.10.09
Chunk Processing  (5) 2024.10.09
Tasklet  (0) 2024.10.09
728x90
반응형
SMALL

ItemReaderChunk Processing에서 데이터를 제공하는 인터페이스이다. ItemReader는 반드시 read 메서드를 구현해야 한다. read 메서드를 통해서, ItemProcessor 또는 ItemWriter에게 데이터를 제공한다. read 메서드가 null을 반환하면 더이상 데이터가 없고 Step을 끝내겠다고 판단한다. 그렇기 때문에 처음부터 null을 반환했다고 하더라도 에러가 나지는 않는다. 단, 데이터가 없으니 바로 Step이 종료될 것이다.

 

@FunctionalInterface
public interface ItemReader<T> {
    @Nullable
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
  • ItemReader를 보면 T 타입의 단일 데이터 1개를 반환한다. 더 이상 읽을 수 없을 때까지(null이 나올 때까지) 반복한다. 
  • ItemReader의 데이터 조회 방식은 크게 두 가지로 나눌 수 있다.
    • 정말 1개씩 데이터를 가져와서 결과로 주는 방식
    • 한번에 대량으로 가져오고 가져온 데이터에서 하나씩 빼주는 방식 (단 대량으로 가져올 때 최대 가져올 수 있는 개수는 정해져 있어야 한다)

ItemReader가 가져오는 데이터는 정말 다양하게 있을 수 있지만 대개는 File, DB 데이터 정도이다. 그래서 Spring Batch는 우리를 위해 자주 사용될 법한 ItemReader들을 미리 만들어 두었다.

 

  • FlatFileItemReader → 보통 구분자로 나누어져 있는 파일을 읽는다. 대표적인 예로 CSV 파일
  • JdbcCursorItemReader → JDBC Cursor로 조회하여, 결과를 ObjectMapping해서 넣어주는 방식
  • JdbcPagingItemReader → 페이징해서 데이터베이스에서 데이터를 가져온다.
  • JpaPagingItemReader → 위에랑 똑같은데 JPA를 사용해서 페이징해서 데이터베이스에서 데이터를 가져온다.
  • RepositoryItemReader → Repository에서 구현한 메서드의 이름을 넣는 방식이다. 그래서 해당 메서드를 통해 데이터를 가져오는 것이다. 이때 주의할 점은, Repository에서 구현한 메서드의 인자에 Pageable이 포함되어 있어서, Pagination을 지원하는 상황이어야 한다. 아래가 그 예시다.
@Repository
public interface PointRepository extends JpaRepository<Point, Long>  {
    Page<Point> findByAmountGreaterThan(Long amount, Pageable pageable);
}
@Bean
@StepScope
public RepositoryItemReader<Point> pointRepositoryItemReader(PointRepository pointRepository) {
    return new RepositoryItemReaderBuilder<Point>()
            .repository(pointRepository)
            .methodName("findByAmountGreaterThan")
            .pageSize(1000)
            .maxItemCount(1000)
            .arguments(List.of(BigInteger.valueOf(100)))
            .sorts(Collections.singletonMap("id", Sort.Direction.DESC))
            .build();
}

 

정리를 하자면

ItemReaderChunk Processing을 구현할 때 데이터를 가져오는 녀석이다. 그리고 스프링 배치는 자주 사용되는 ItemReader들을 미리 우리를 위해 구현해 두었다. 이제 ItemProcessor를 알아보자!

728x90
반응형
LIST

'Spring Batch' 카테고리의 다른 글

ItemWriter  (1) 2024.10.09
ItemProcessor  (2) 2024.10.09
Chunk Processing  (5) 2024.10.09
Tasklet  (0) 2024.10.09
Step  (2) 2024.10.09
728x90
반응형
SMALL

Chunk Processing의 필요성

Batch 프로세싱의 가장 큰 특징이 일괄 처리이면서 동시에 가장 큰 문제가 일괄 처리이다. 일괄로 한번에 데이터를 처리한다는 것은 시스템의 리소스가 한순간에 많이 필요하다는 것을 말한다.  포인트 관리 배치 프로그램이 있는데 오늘 만료 시켜야 할 포인트가 십만개라면 어떨까? 서비스가 대성공해서 백만개, 천만개라면 어떨까? 그 어떤 서버도 한번에 천만개를 처리하기 쉽지 않을 것이다. 이를 해결하기 위해서 Spring Batch에서는 Chunk라는 개념을 만들었다. Chunk는 일정 개수만큼 잘라서 처리하겠다는 의미로, Chunk Size가 1000이면 한번에 1000개씩 처리하고 완료하고 그 다음 1000개 처리하고 완료하겠다는 의미이다. 이렇게 하면 한순간에는 1000개에 해당하는 리소스만 있으면 된다. 

 

 

일반적인 Chunk 기반 Step 흐름

  • 1 → 트랜잭션 시작
  • 2 → ItemReader가 데이터 1개 제공하기
  • 3 → ItemProcessor를 통해 데이터 1개를 가공하기
  • 4 → Chunk Size만큼 데이터가 쌓일때까지 2-3번을 반복
  • 5 → ItemWriter에게 데이터 전달하기 (보통의 경우, DB에 저장)
  • 6 → 트랜잭션 종료
  • 7 → 2번이 더이상 진행할 수 없을때까지, 1-6번을 계속해서 반복

 

ChunkProcessing 구현방법

StepChunk방식으로 구현하기 위해서는 다음과 같이 <A, B>chunk(...)를 사용하면 된다.

@Bean
@JobScope
public Step saveOrderedPriceStep(JobRepository jobRepository,
                                 PlatformTransactionManager transactionManager,
                                 JpaPagingItemReader<Order> orderReader,
                                 ItemProcessor<Order, Price> orderToPriceProcessor,
                                 ItemWriter<Price> priceWriter) {
    return new StepBuilder("saveOrderedPriceStep", jobRepository)
            .<Order, Price>chunk(1000, transactionManager)
            .reader(orderReader)
            .processor(orderToPriceProcessor)
            .writer(priceWriter)
            .build();
}
  • Chunk Processing을 구현할 때는, ItemReader, ItemWriter만 필수이고 ItemProcessor는 필요없다면 없어도 된다.
  • 그리고 ItemReader<T>, ItemProcessor<T, G>, ItemWriter<T> <T,G>chunk(...) 형식이 맞아야 한다.

 

그럼 Chunk Size는 얼마가 적당할까?

정답은 없다. 업무의 종류, 코드의 로직, 환경등에 따라 다르다. Chunk Size가 너무 작으면, 일괄처리 효율이 떨어지고, 또 반대로 Chunk Size가 너무 커도 리소스 문제나 처리량의 한계 등 문제가 있을 수 있다. 적당한 크기의 사이즈를 찾는것도 Batch 성능에 큰 도움이 된다.

 

다시 보는 StepExecutionContext

StepExecutionContext를 사용하면, 1개의 Step안에서 공유하는 공간을 만들 수 있다고 했다. 즉, 1개의 Step안에 있는 ItemReader, ItemProcessor, ItemWriter가 같은 공간을 접근할 수 있다. 

 

다시 보는 PlatformTransactionManager

@EnableBatchProcessing을 달면, 기본 트랜잭션 매니저를 가져올 수 있고 이걸 StepBuilder에서 등록할 수 있다고 했다. 이 트랜잭션 매니저를 StepBuilder에서 사용하게 되면 1개 Chunk 단위로 트랜잭션이 생기게 된다. 따라서 1개 Chunk가 끝나면 일괄적으로 트랜잭션이 끝나게 되고 ItemWriter에서 저장한 모든 대상들의 CommitChunk가 끝나면 발생한다.

 

 

정리를 하자면

Chunk Processing에 대해 알아보았다. 정해진 크기만큼 쪼개서 여러번 일괄처리를 하는 방법이다. 이제는 그 방법을 사용하기 위한 ItemReader, ItemProcessor, ItemWriter에 대해 알아보자!

728x90
반응형
LIST

'Spring Batch' 카테고리의 다른 글

ItemProcessor  (2) 2024.10.09
ItemReader  (1) 2024.10.09
Tasklet  (0) 2024.10.09
Step  (2) 2024.10.09
Job  (7) 2024.10.09
728x90
반응형
SMALL

Step을 구현할 때 구현 방법에는 크게 Tasklet 방식이 있고 Chunk 방식이 있다. 아래 그림을 보자.

  • Step 1Tasklet으로 구현했고, Step 2Chunk 방식으로 구현했음을 알 수 있다.

그럼 Tasklet은 무엇일까?

 

Tasklet

Step을 구현하는 방법 중 하나인 TaskletChunk보다 단순한 방식으로 단일 작업을 구현한다. 아래는 Taskletinterface이다. execute를 구현해야 함을 알 수 있다.

@FunctionalInterface
public interface Tasklet {
    @Nullable
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;
}

 

StepBuilderStep을 만들때, tasklet()안에다가 구현한 Tasklet을 넣어주면 된다. 아래 예시의 경우에는 sampleStep은 넣어준 sampleTasklet으로 동작하게 된다.

@Bean
@JobScope
public Step sampleStep(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager) {
    return new StepBuilder("sampleStep", jobRepository)
            .tasklet(sampleTasklet(), platformTransactionManager)
            .build();
}

 

그럼 sampleTasklet은 어떻게 생겼을까? 아래 예시를 보자. 

@Bean
@StepScope
public Tasklet sampleTasklet() {
	return (contribution, chunkContext) -> {
    	log.info("never ending tasklet");
        return RepeatStatus.CONTINUABLE:
    }
}

@Bean
@StepScope
public Tasklet sampleTasklet() {
	return (contribution, chunkContext) -> {
    	log.info("finish");
        return RepeatStatus.FINISHED:
    }
}
  • 두 가지 예시를 가져와봤다. Tasklet의 상태는 계속 진행할지, 끝낼지 두가지로만 표현된다.
  • RepeatStatus.FINISHED가 반환되면 tasklet이 바로 끝나고, RepeatStatus.CONTINUABLE이 반환되면, Tasklet을 다시 실행한다. 따라서, RepeatStatus.CONTINUABLE을 반환한 예시는 영구적으로 끝나지 않고 계속해서 로그를 남기게 된다.
  • 참고로, null을 반환하면 FINISHED와 동일하게 동작한다.

 

Tasklet을 만들 때 주의할 점

@Bean
@StepScope
public Tasklet hugeReadTasklet(PriceRepository priceRepository) {
    return (contribution, chunkContext) -> {
        List<Price> prices = priceRepository.findByDate(LocalDateTime.now());
        prices.forEach(price -> price.setAmount(0L));
        priceRepository.saveAll(prices);
        return RepeatStatus.FINISHED;
    };
}
  • 이 코드는 문제가 발생할 수 있는 포인트가 있다.
  • PriceRepository.findByDate()를 통해서 얼마나 많은 데이터를 가져올지 예측이 불가하다. 데이터가 너무 많다면 천만개의 데이터를 조회할 수도 있다. 천만개의 데이터를 한번에 가져온다면 메모리 이슈로 인해 처리가 불가해지고 OOM이 발생할 수 있다.
  • Tasklet 형식의 SteptransactionManager를 추가하게 되면 해당 TaskletTransaction에 묶이게 된다. 이때 Tasklet에서 너무 많은 데이터를 불러오고 쓰게 되면, TaskletTransaction은 너무 거대해진다. 그 말은 DatabaseTransaction 1개가 처리해야 할 일이 너무 많아지게 된다고도 할 수 있다.

 

이런 문제를 해결하기 위해, Chunk Processing을 사용하게 된다. Step을 구현하는 방법 중 대표적인 방법 하나인 Chunk Processing.

 

정리를 하자면

Tasklet에 대해 알아보았다. 간단한 스텝을 구현할 땐 유용하게 사용할 수 있지만, 조금 복잡하거나 처리할 데이터의 양이 많아지고 예측할 수 없다면 주의해야 한다. 그래서 이럴땐 Chunk Processing을 사용하면 된다. 다음 포스팅에서 알아보자!

728x90
반응형
LIST

'Spring Batch' 카테고리의 다른 글

ItemReader  (1) 2024.10.09
Chunk Processing  (5) 2024.10.09
Step  (2) 2024.10.09
Job  (7) 2024.10.09
Spring Batch 구조와 핵심 키워드  (2) 2024.10.09
728x90
반응형
SMALL

 

Job에는 여러 Step이 있다. Step은 실질적으로 요청을 처리하는 객체이다. StepJob과 마찬가지로 행위에 대한 명세서이다. 1개의 Job에는 여러개의 Step을 포함할 수 있고 따라서 1개의 JobExecution에는 여러개의 StepExecution을 포함할 수 있다.

 

예시)

  • Job: 식당을 예약한다.
    • Step 1: 식당에 전화한다.
    • Step 2: 예약한다.
    • Step 3: 예약금을 송금한다.

 

StepExecution

Step이라는 명세서를 실행시켜 실행된 기록을 StepExecution이라고 한다. JobExecutionJob의 실행 정보를 가지고 있는것처럼, StepExecutionStep의 실행 정보를 가지고 있다. 다음과 같은 메서드들을 StepExecution은 가지고 있다.

  • getStepName() → Step의 이름
  • getJobExecution() → JobExecution
  • getStartTime() → Step의 시작 시간
  • getEndTime() → Step의 종료 시간
  • getExecutionContext() → ExecutionContext
  • getExitStatus() → Step의 실행 결과
  • getStatus() → Step의 현재 실행 상태(Batch Status)

StepExecutionContext

JobExecutionContext가 1개의 Job에서 공유하는 공간이면, StepExecutionContext는 1개의 Step 내에서 공유하는 공간(Context)이다. 

 

PlatformTransactionManager

StepBuilderStep을 정의할 때, transactionManager를 받을 수 있다. @EnableBatchProcessing을 추가하면 아래와 같이 기본 transactionManager를 사용할 수 있다. (다만, 프로젝트에서 datasource가 여러개인 경우에는, 다른 말로, 데이터베이스를 여러개 사용한다면, 직접 별도로 transactionManager를 만들어서 사용해야 한다)

@Autowired
PlatformTransactionManager transactionManager;

transactionManagerStepBuilderFactory에 추가하면, 해당 SteptransactionManager를 사용해서 내부의 transaction을 관리한다.

TransactionManager란, 데이터베이스 트랜잭션은 데이터베이스의 데이터가 변하는 과정이 독립적이며, 일관되고 믿을 수 있는걸 보장하는 것을 말한다. 예를 들면, A가 B에게 송금을 했을 때, A는 돈이 보내져서 계좌에서 돈이 빠졌는데, B는 어떤 에러가 발생해서 돈을 받지 못했다. 만약, 이것이 트랜잭션으로 관리가 됐다면 이런일은 일어나지 않았을 것이다. 두 과정을 1개의 트랜잭션으로 묶었다면, B가 돈을 받지 못하면 A의 돈도 빠져나가지 않았던 것이 된다. 바로 이런 트랜잭션을 관리해주는 것이 바로 트랜잭션 매니저이고 @EnableBatchProcessing을 통해 자동으로 기본 트랜잭션 매니저를 만들어 준다.

 

@JobScope

일반적으로, Scope를 지정하지 않는다면 처음 스프링부트가 시작될 때 모든 Bean이 생성된다. Step을 생성하는 코드에 @JobScope 애노테이션을 달면, Step을 스프링 부트가 시작될 때 바로 만드는 것이 아니라 연관된 JobStep을 실행하는 시점에 만들어준다. Lazy Loading과 비슷한 개념이라고 보면 된다. 

 

1개의 스프링 배치 애플리케이션에 Job1, Job2, Job3이 있고, 3개의 Job들에 연결된 Step도 많이 만들어 두었는데, Job1만 실행시킨다면 Job1과 연관되지 않는 Step들은 굳이 만들 필요가 없다. 굳이 필요도 없는 Step을 만드는데 리소스나 시간을 낭비할 필요가 없기 때문에 이 @JobScope는 꽤나 중요하다. 또 다른 이유로는, Job이 실행된 다음에 나중에 결정되는 값이 있다면 늦게 Step을 생성하고 싶을 것이다. 예를 들면 다음과 같은 JobStep이 있다고 해보자.

  • Job1 → Step1.1 - Step1.2 - Step1.3

Job1은 세 개의 스텝으로 구성되어 있는데 Step1.2Step1.1의 결과를 통해 얻어낸 데이터를 가지고 Step1.2를 만들어야 한다면, Step1.2는 미리 만들수가 없다. Step1.1을 실행하고 나서 Step1.2을 만들어야 하기 때문에도 @JobScope를 사용하면 좋다.

 

결론은, 일반적으로는 @JobScope를 사용하면 좋다.

 

정리를 하자면

이번 포스팅에선 Step에 대해 알아보았다. 다음 포스팅은 Tasklet에 대해 알아보자!

728x90
반응형
LIST

'Spring Batch' 카테고리의 다른 글

ItemReader  (1) 2024.10.09
Chunk Processing  (5) 2024.10.09
Tasklet  (0) 2024.10.09
Job  (7) 2024.10.09
Spring Batch 구조와 핵심 키워드  (2) 2024.10.09
728x90
반응형
SMALL

Job이란?

1개의 작업에 대한 명세서를 의미한다. 어디까지가 1개 작업으로 봐야할지 기준이 애매할 수 있다. 그 기준은 상황별로 판단하면 된다.

다음 예시를 보자.

  • Job: 식당을 예약한다.
    • Step 1: 전화를 건다.
    • Step 2: 예약을 한다.
    • Step 3: 예약금을 송금한다.

이게 하나의 Job이라고 볼 수 있다. 이처럼,

  • 1개의 Job은 여러개의 Step을 포함할 수 있다.
  • Job name을 통해 Job을 구분할 수 있다. 
  • Job name으로 Job을 실행시킬 수 있다.
  • Job을 만드는 빌더는 많지만, JobBuilder로 쉽게 Job을 만들 수 있다.

 

아래 그림을 보자.

 

Job을 만들고 실행하는 과정에 대한 그림이다. 그럼 저기서 JobJob Instance는 어떤 차이가 있을까? 그리고 Job Execution, Job Parameter는 무엇일까?

 

JobInstance

Job이 명세서라면, JobInstanceJob이 실행되어 실체화된 것이다. JobInstance는 배치 처리에서 Job이 실행될 때 하나의 Job 실행 단위이다. 같은 Job에 같은 조건(Job Parameters)이면, JobInstance는 동일하다고 판단한다. 혹시, Job이 실패해서 다시 같은 조건으로 Job을 실행한다면 같은 JobInstance라고 할 수 있다. 그럼 당연히 같은 Job에 다른 조건(Job Parameters)이면, JobInstance는 다를 것이다.

 

JobExecution

JobExecutionJobInstance의 한번 실행을 뜻한다. 어떤 Job같은 조건으로 1번 실패하고, 1번 성공한다면 JobInstance는 1개이고, JobExecution은 2개이다. JobExecution은 실패했든지 성공했든지 간에 실제로 실행시킨 사실과 동일한 의미이기 때문에 배치 실행과 관련된 정보를 포함하고 있다. 

 

JobExecution에는 들어있는 게 꽤 많다. 예를 들면 다음과 같은 메서드들을 사용할 수 있다.

  • getJobInstance() → JobExecutionJobInstance
  • getJobParameters()JobExecution에서 사용한 Job Parameters
  • getStartTime()Job의 시작시간
  • getEndTime()Job의 종료시간
  • getExitStatus()Job의 실행 결과(Exit Code)
  • getStatus()Job의 현재 상태(Batch Status)
  • getExecutionContext() → Job Execution Context

어떤 것들을 하는 메서드인지 대충봐도 감이 오는데 마지막 ExecutionContext는 무엇일까?

JobExecutionContext

1개의 Job내에서 공유하는 공간(Context)이다. 1개의 Job의 여러 Step들이 있다면, 그 Step들은 해당 공간을 공유할 수 있다.

 

JobParameters

Job이 시작할 때, 필요한 시작 조건을 JobParameters에 넣는다. 동일한 JobJobParameters까지 동일하면, 같은 JobInstance이다. 예를 들면, 다음과 같은 상황이 있다고 해보자.

  • 대한민국의 편의점 리스트를 가져와서 저장하는 Job
    • 매일 폐업하고, 새로 오픈하는 편의점들 리스트를 갱신하는데 목적이 있다.
    • 밤 10시에는 C편의점 리스트 정보 최신화
    • 밤 11시에는 G편의점 리스트 정보 최신화
    • 조건 1: 밤 10시에는 'JobParameters = C편의점' 으로 구동
    • 조건 2: 밤 11시에는 'JobParameters = G편의점' 으로 구동

 

Job 구현 예시

 

@Bean
public Job reserveRestaurantJob(JobRepository jobRepository,
                                Step searchAvailableKoreanRestaurantStep,
                                Step reserveRestaurantStep,
                                Step sendDepositStep) {
    return new JobBuilder("reserveRestaurantJob", jobRepository)
            .start(searchAvailableKoreanRestaurantStep)
            .next(reserveRestaurantStep)
            .next(sendDepositStep)
            .build();
}

 

  • JobBuilderJob을 만든다. 이때 Job을 가져와야 하는데 Job의 이름(Bean의 이름)을 통해 가져온다.
  • 시작 스텝과 다음 스텝들을 지정한다.
  • 여기서, 한개의 스텝이라도 실패하면 다음 스텝으로 진행하지 않는다.
  • build()를 통해 Job을 생성한다.

 

조금 더 복잡한 Job의 예시를 보면 다음과 같다.

 

@Bean
public Job reserveRestaurantJob(JobRepository jobRepository,
                                Step searchAvailableKoreanRestaurantStep,
                                Step searchAvailableAsianRestaurantStep,
                                Step reserveRestaurantStep,
                                Step sendDepositStep) {
    return new JobBuilder("reserveRestaurantJob", jobRepository)
            .start(searchAvailableKoreanRestaurantStep)
                .on("FAILED") // searchAvailableKoreanRestaurantStep 가 FAILED 인 경우
                .to(searchAvailableAsianRestaurantStep) // searchAvailableAsianRestaurantStep 실행
                .on("FAILED") // searchAvailableAsianRestaurantStep 가 FAILED 인 경우
                .end() // 아무것도 하지 않고 FLOW 종료
            .from(searchAvailableKoreanRestaurantStep)
                .on("*") // searchAvailableKoreanRestaurantStep 가 FAILED 가 아닌 경우
                .to(reserveRestaurantStep) // reserveRestaurantStep 실행
                .next(sendDepositStep) // sendDepositStep 실행
            .from(searchAvailableAsianRestaurantStep)
                .on("*") // searchAvailableAsianRestaurantStep 가 FAILED 가 아닌 경우
                .to(reserveRestaurantStep) // reserveRestaurantStep 실행
                .next(sendDepositStep) // sendDepositStep 실행
            .end() // Job 종료
            .build();
}
  • 이번에는 한식 레스토랑 예약이 실패했을 경우 다른 방법을 찾기 위해 아시안 레스토랑을 예약하는 과정까지 넣었다.
  • 그래서 위 코드처럼, 먼저 한식 레스토랑 예약 스텝을 실행하는데 그 스텝이 FAILED인 경우, 아시안 레스토랑 예약 스텝을 실행한다. 그런데 그마저도 실패할 경우가 있으므로, 그땐 이 Job을 종료한다.
  • 한식 레스토랑 예약 스텝 또는 아시안 레스토랑 예약 스텝 둘 중 하나라도 실패가 아닌 경우가 생겼으면 다음 스텝들을 쭉쭉 진행한다.

 

저기서 JobBuilder는 어떻게 가져올까? 다음과 같이 @EnableBatchProcessing 애노테이션을 Spring Boot 엔트리 포인트 클래스에 붙여주면 된다.

@SpringBootApplication
@EnableBatchProcessing
public class DMakerApplication {
    public static void main(String[] args) {
        SpringApplication.run(DMakerApplication.class, args);
    }
}

이렇게 애노테이션을 달면 다음과 같은 빈들을 자동으로 등록해 준다.

  • JobRepository
  • JobLauncher
  • JobRegistry
  • JobExplorer
  • PlatformTransactionManager
  • JobBuilder
  • StepBuilder

이렇게 사용할 수 있는 빈들 중 보이는 JobBuilder를 주입받아 사용하면 되는 것이다. 원래는 JobBuilderFactory가 있었는데 5.0 이후부터 Deprecated 됐다. 그래서 JobBuilder를 사용하면 된다.

 

정리를 하자면

Job에 대해 알아보았다. 다음 포스팅에는 Step에 대해 알아보자!

728x90
반응형
LIST

'Spring Batch' 카테고리의 다른 글

ItemReader  (1) 2024.10.09
Chunk Processing  (5) 2024.10.09
Tasklet  (0) 2024.10.09
Step  (2) 2024.10.09
Spring Batch 구조와 핵심 키워드  (2) 2024.10.09

+ Recent posts