언젠가 한번은 만나게 될 Spring batch라고 생각했습니다.
그간 배치 기능은 Crontab + Nodejs 또는 CI/CD 도구 + Python Script로 작성 해왔었고,
레거시 시스템에서는 Spring에서 제공하는 Scheduler를 사용해 배치 기능을 개발 했었습니다.
최근 새로운 배치 애플리케이션을 만들 일이 생겨
스프링 배치를 학습하고, Kotlin + Spring batch 애플리케이션을 작성하는 방법에 대해서 정리합니다.
01. 기본 개념
- Job: Spring batch의 가장 큰 작업의 단위이며, 실행의 단위가 됩니다.
- Step: Job안에 속하게 되는 작업으로 하나의 Job은 1개 이상의 Step으로 구성 될 수 있습니다.
- Tasklet: Step안에 속하는 작업입니다. 아래 그림에는 나와있지 않지만, ItemReader, ItemProcessor, ItemWriter를 묶어 Chunk Oriented Tasklet으로 불리기도 합니다.다음 3가지는 Chunk단위로 작업을 수행 할때 사용되는 개념입니다.
- ItemReader: 작업의 대상 Item을 읽어드립니다.(DB 조회 or 파일읽기)
- ItemProcessor: 특정 작업을 수행합니다.(데이터 변환)
- ItemWriter: Item을 Chunk 단위로 쓰기 연산을 수행합니다.(주로 DB 입력이 될 것입니다.)
JobLauncher는 Job을 실행하는 역할을 합니다.
JobRepository는 Job, Step 등의 배치 작업에 대한 메타 정보를 처리하는 인터페이스입니다.
메타정보는 Spring Batch가 제공하는 핵심 기능 중 하나입니다.
아래는 org.springframework.batch:spring-batch-core:4.3.1 내에 DB 종류 별로 제공되는 DDL Script 파일들 입니다.
02. 시작하기
아래 예제는 Kotlin + Spring Batch, Gradle, Mariadb(Mysql)을 사용합니다.
Tasklet은 ItemReader, Processor, Writer로 구성시킨 예제입니다.
전체 예제 소스코드는 아래 레포지토리에 있습니다.
github.com/kjcaway/kotlin-springbatch
02-1. 의존성 추가
// build.gradle.kts
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
id("org.springframework.boot") version "2.4.1"
id("io.spring.dependency-management") version "1.0.10.RELEASE"
kotlin("jvm") version "1.4.21"
kotlin("plugin.spring") version "1.4.21"
kotlin("plugin.jpa") version "1.4.21"
kotlin("plugin.noarg") version "1.4.21"
kotlin("plugin.allopen") version "1.4.21"
}
group = "me.examplebatch"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_11
repositories {
mavenCentral()
}
allOpen{
annotation("javax.persistence.Entity")
annotation("javax.persistence.MappedSuperclass")
annotation("javax.persistence.Embeddable")
}
dependencies {
// Spring
implementation("org.springframework.boot:spring-boot-starter-batch")
implementation("org.springframework.boot:spring-boot-starter-data-jpa")
// Mariadb
implementation("org.mariadb.jdbc:mariadb-java-client:2.1.2")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.batch:spring-batch-test")
}
tasks.withType<KotlinCompile> {
kotlinOptions {
freeCompilerArgs = listOf("-Xjsr305=strict")
jvmTarget = "11"
}
}
tasks.withType<Test> {
useJUnitPlatform()
}
spring batch, jpa, mariadb client를 추가해줍니다.
02-2. 메타 정보 테이블 만들기
schema-mysql.sql의 내용을 긁어서 DB 접속해서 실행 시켜줍니다.
위와 같이 메타정보 테이블이 생성됩니다.
02-3. application.yml
server:
port: 8080
spring:
batch:
initialize-schema: never # 1
application:
name: BATCH
datasource:
url: jdbc:mariadb://localhost:3306/book
driver-class-name: org.mariadb.jdbc.Driver
username: root
password: root
hikari:
connection-timeout: 5000
maximum-pool-size: 10
jpa:
open-in-view: false
generate-ddl: false
...(중략)
위와 같이 application.yml 파일에 몇가지 설정을 추가해줍니다.
- # 1 : never로 해주면 애플리케이션이 매번 실행 할때마다 메타 정보 테이블을 생성하지 않도록 해줍니다.
02-4. BatchApplication
package me.examplebatch.batch
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.scheduling.annotation.EnableScheduling
@SpringBootApplication
@EnableBatchProcessing // 1
@EnableScheduling // 2
class BatchApplication
fun main(args: Array<String>) {
runApplication<BatchApplication>(*args)
}
- #1 : 어노테이션 추가해줍니다. 배치를 활성화하는 것입니다.
- #2 : @EnableScheduling은 선택 사항입니다. 만약 CI/CD 도구 없이 해당 애플리케이션을 계속해서 실행상태를 유지하며 특정 시간대에 배치작업을 수행 하길 원한다면 추가해줍니다.
02-5. Domain
@Entity
@Table(name = "tbl_book")
class Book(
@Id
var bookId: Long? = null,
var name: String? = null,
var author: String? = null,
): Serializable
@Repository
interface BookRepository: JpaRepository<Book, Long>{
}
위와 같이 도메인을 정의해줍니다.
02-6. JobConfiguration
// CustomReaderJobConfig.kt
@Configuration
class CustomReaderJobConfig(
val jobBuilderFactory: JobBuilderFactory, // 의존성 주입
val stepBuilderFactory: StepBuilderFactory,
val bookRepository: BookRepository // 배치 대상 도메인
) {
private final val CUSTOM_READER_JOB = "CUSTOM_READER_JOB"
private final val CUSTOM_READER_JOB_STEP = CUSTOM_READER_JOB +"_STEP"
private final val CHUNK_SIZE = 10
@Bean
fun customReaderJob(): Job{
return jobBuilderFactory.get(CUSTOM_READER_JOB)
.start(customReaderStep())
.build()
}
@Bean
fun customReaderStep(): Step {
return stepBuilderFactory.get(CUSTOM_READER_JOB_STEP)
.chunk<Book, Book>(CHUNK_SIZE)
.reader(reader())
.processor(processor())
.writer(writer())
.build()
}
@Bean
@StepScope
fun reader(): CustomItemReader{
return CustomItemReader()
}
@Bean
fun processor(): ItemProcessor<Book, Book>{
return ItemProcessor {
it.author = "Author. " + it.author
it
}
}
@Bean
fun writer(): ItemWriter<Book> {
return ItemWriter {
bookRepository.saveAll(it)
}
}
}
위 코드는 하나의 Step으로 이뤄진 Job을 정의하는 코드입니다.
또한 ItemReader, ItemProcessor, ItemWriter를 코드 내에 포함 시켰습니다.(파일로 따로 분리해서 작성하기도 합니다.)
processor, writer는 각각 ItemProcessor<K,V>, ItemWriter<T> 인터페이스 구현체를 리턴합니다.
- processor : book의 author 컬럼 변경
- writer : saveAll 메서드로 chunk size 만큼 bulk insert/update
단, ItemReader는 특이하게 @StepScope가 붙어 있음을 확인 할 수 있습니다.
이는 Step실행 시에 새로운 CustomItemReader를 만들어 주기 위함입니다.
// CustomItemReader.kt
open class CustomItemReader : ItemReader<Book> {
@Autowired
private lateinit var bookRepository: BookRepository
private lateinit var list: MutableList<Book>
private var nextIndex: Int = 0
@PostConstruct
fun postConstruct(){
list = bookRepository.findAll() // book 조회, list에 담음
}
override fun read(): Book? {
if(nextIndex < list.size){
return list[nextIndex++] // list에서 순차적으로 read
}
return null
}
}
ItemReader<T> 인터페이스의 구현체입니다.
Kotlin에서 class 앞의 open은 다른 클래스에서 상속을 허용하기 위함입니다. 아무것도 안붙여주면 final을 의미합니다. 자바와 달리 코틀린은 더 엄격한(?) 느낌이 있네요.
만약 open을 붙이지 않으면 아래와 같은 에러를 볼수 있을 것입니다.
Caused by: org.springframework.aop.framework.AopConfigException: Could not generate CGLIB subclass of class me.examplebatch.batch.item.CustomItemReader: Common causes of this problem include using a final class or a non-visible class; nested exception is java.lang.IllegalArgumentException: Cannot subclass final class me.examplebatch.batch.item.CustomItemReader
at org.springframework.aop.framework.CglibAopProxy.getProxy(CglibAopProxy.java:208)
at org.springframework.aop.framework.ProxyFactory.getProxy(ProxyFactory.java:110)
at org.springframework.aop.scope.ScopedProxyFactoryBean.setBeanFactory(ScopedProxyFactoryBean.java:117)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeAwareMethods(AbstractAutowireCapableBeanFactory.java:1810)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1775)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:609)
... 62 common frames omitted
Caused by: java.lang.IllegalArgumentException: Cannot subclass final class me.examplebatch.batch.item.CustomItemReader
at org.springframework.cglib.proxy.Enhancer.generateClass(Enhancer.java:660)
at org.springframework.cglib.core.DefaultGeneratorStrategy.generate(DefaultGeneratorStrategy.java:25)
@StepScope는 내부적으로 프록시 객체를 사용합니다.
그러다보니 프록시 객체 생성시 원래 구현체를 상속받아야 하고, proxy target class가 final. 즉, 코틀린에서 open을 안붙여주면 위와 같은 에러가 나게 됩니다.
자, 이제 실행 시켜보면 Job이 실행 된 것을 확인 할 수 있습니다.
02-7. JobParameter
위와 같이 작성된 배치 애플리케이션은 한번만 실행되고, 이후 재 실행시 아래와 같은 메시지를 띄울 것입니다.
All steps already completed or no steps configured for this job.
매번 실행 할 수 있게끔 하려면 어떻게 하면 될까요?
JobParameter를 사용하면 됩니다.
Job 실행단위는 JobParameter와 1:1로 매칭될 수 있습니다. 즉, JobParameter가 다르다면 항상 실행 될 수 있습니다.
02-8. Scheduler 추가
CI/CD 도구를 사용해서 Scheduling을 사용 할때는 다른 방법으로 JobParameter를 입력해줘야 할것입니다.
여기선 Spring에서 제공하는 @Scheduled를 사용해서 매번 다른 JobParamter를 가지고 배치가 실행 될 수 있게끔 해보겠습니다.
// SimpleJobScheduler.kt
@Component
class SimpleJobScheduler(
val jobLauncher: JobLauncher,
val customReaderJobConfig: CustomReaderJobConfig
) {
private val logger: Log = LogFactory.getLog(CustomReaderJobConfig::class.java)
private val dateFormat = SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
@Scheduled(initialDelay = 10000, fixedDelay = 30000)
fun runJob(){
val jobConf = hashMapOf<String, JobParameter>()
jobConf["time"] = JobParameter(dateFormat.format(System.currentTimeMillis()))
val jobParameters = JobParameters(jobConf)
try{
jobLauncher.run(customReaderJobConfig.customReaderJob(), jobParameters)
} catch(e: JobExecutionAlreadyCompleteException){
logger.error(e.localizedMessage)
} catch(e: JobExecutionAlreadyRunningException){
logger.error(e.localizedMessage)
} catch(e: JobParametersInvalidException){
logger.error(e.localizedMessage)
}
}
}
위와 같이 Schedule Task Component를 추가해주면 특정 시간에 반복적으로 배치 작업을 돌릴 수 있습니다.
위의 경우 애플리케이션 실행 10초 뒤, 최초 실행
이후 30초 마다 배치를 실행 시키게 됩니다.
Crontab을 사용한다면 아래와 같이 변경 해줄수 있습니다.
@Scheduled("0 10 * * * *") // 매 시간 10분에 실행
@Scheduled("0 0 8 10 * *") // 매월 10일 오전 8시 실행
@Scheduled("0 0/10 * * *") // 10분마다 실행
..
'개발 이야기 > Springboot' 카테고리의 다른 글
Springboot Application 로드 시 외부 파일 읽기 & 환경 설정 (0) | 2024.01.07 |
---|---|
Springboot에서 ResourceTransformer 적용하여 뷰 페이지 렌더링 (0) | 2024.01.07 |
[Kotlin] Springboot, Mock을 활용한 유닛 테스트(feat. Method Stub) (0) | 2021.02.27 |
[Kotlin] Springboot, AOP를 활용한 요청 데이터 조작(feat. Reflection) (0) | 2021.02.09 |
[Kotlin] Springboot + WebSocket 사용법 (3) | 2020.06.07 |