본문 바로가기
개발 이야기/Springboot

[Kotlin] Spring Batch를 사용한 배치 애플리케이션 작성

by 농개 2021. 4. 5.
반응형

언젠가 한번은 만나게 될 Spring batch라고 생각했습니다.

그간 배치 기능은 Crontab + Nodejs 또는 CI/CD 도구 + Python Script로 작성 해왔었고,

레거시 시스템에서는 Spring에서 제공하는 Scheduler를 사용해 배치 기능을 개발 했었습니다.

 

최근 새로운 배치 애플리케이션을 만들 일이 생겨 

스프링 배치를 학습하고, Kotlin + Spring batch 애플리케이션을 작성하는 방법에 대해서 정리합니다.

 

01. 기본 개념

  • Job: Spring batch의 가장 큰 작업의 단위이며, 실행의 단위가 됩니다.
  • Step: Job안에 속하게 되는 작업으로 하나의 Job은 1개 이상의 Step으로 구성 될 수 있습니다.
  • Tasklet: Step안에 속하는 작업입니다. 아래 그림에는 나와있지 않지만, ItemReader, ItemProcessor, ItemWriter를 묶어 Chunk Oriented Tasklet으로 불리기도 합니다.다음 3가지는 Chunk단위로 작업을 수행 할때 사용되는 개념입니다.
    • ItemReader: 작업의 대상 Item을 읽어드립니다.(DB 조회 or 파일읽기)
    • ItemProcessor: 특정 작업을 수행합니다.(데이터 변환)
    • ItemWriter: Item을 Chunk 단위로 쓰기 연산을 수행합니다.(주로 DB 입력이 될 것입니다.)

출처: https://www.javainuse.com/spring/bootbatch 

 

JobLauncher는 Job을 실행하는 역할을 합니다.

JobRepository는 Job, Step 등의 배치 작업에 대한 메타 정보를 처리하는 인터페이스입니다.

메타정보는 Spring Batch가 제공하는 핵심 기능 중 하나입니다. 

 

아래는 org.springframework.batch:spring-batch-core:4.3.1 내에 DB 종류 별로 제공되는 DDL Script 파일들 입니다.

 

02. 시작하기

아래 예제는 Kotlin + Spring Batch, Gradle, Mariadb(Mysql)을 사용합니다.

TaskletItemReader, Processor, Writer로 구성시킨 예제입니다.

 

전체 예제 소스코드는 아래 레포지토리에 있습니다.

github.com/kjcaway/kotlin-springbatch

 

kjcaway/kotlin-springbatch

spring batch example with kotlin. Contribute to kjcaway/kotlin-springbatch development by creating an account on GitHub.

github.com

 

 

02-1. 의존성 추가

// build.gradle.kts
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
	id("org.springframework.boot") version "2.4.1"
	id("io.spring.dependency-management") version "1.0.10.RELEASE"
	kotlin("jvm") version "1.4.21"
	kotlin("plugin.spring") version "1.4.21"
	kotlin("plugin.jpa") version "1.4.21"

	kotlin("plugin.noarg") version "1.4.21"
	kotlin("plugin.allopen") version "1.4.21"
}

group = "me.examplebatch"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_11

repositories {
	mavenCentral()
}

allOpen{
	annotation("javax.persistence.Entity")
	annotation("javax.persistence.MappedSuperclass")
	annotation("javax.persistence.Embeddable")
}

dependencies {
	// Spring
	implementation("org.springframework.boot:spring-boot-starter-batch")
	implementation("org.springframework.boot:spring-boot-starter-data-jpa")

	// Mariadb
	implementation("org.mariadb.jdbc:mariadb-java-client:2.1.2")

	implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
	implementation("org.jetbrains.kotlin:kotlin-reflect")
	implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")

	testImplementation("org.springframework.boot:spring-boot-starter-test")
	testImplementation("org.springframework.batch:spring-batch-test")
}

tasks.withType<KotlinCompile> {
	kotlinOptions {
		freeCompilerArgs = listOf("-Xjsr305=strict")
		jvmTarget = "11"
	}
}

tasks.withType<Test> {
	useJUnitPlatform()
}

spring batch, jpa, mariadb client를 추가해줍니다.

 

 

02-2. 메타 정보 테이블 만들기

schema-mysql.sql의 내용을 긁어서 DB 접속해서 실행 시켜줍니다.

위와 같이 메타정보 테이블이 생성됩니다.

 

 

02-3. application.yml

server:
  port: 8080

spring:
  batch:
    initialize-schema: never   # 1
  application:
    name: BATCH
  datasource:
    url: jdbc:mariadb://localhost:3306/book
    driver-class-name: org.mariadb.jdbc.Driver
    username: root
    password: root
    hikari:
      connection-timeout: 5000
      maximum-pool-size: 10
  jpa:
    open-in-view: false
    generate-ddl: false
    ...(중략)

위와 같이 application.yml 파일에 몇가지 설정을 추가해줍니다.

  • # 1 : never로 해주면 애플리케이션이 매번 실행 할때마다 메타 정보 테이블을 생성하지 않도록 해줍니다.

 

 

02-4. BatchApplication

package me.examplebatch.batch

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.scheduling.annotation.EnableScheduling

@SpringBootApplication
@EnableBatchProcessing  // 1
@EnableScheduling       // 2
class BatchApplication

fun main(args: Array<String>) {
	runApplication<BatchApplication>(*args)
}
  • #1 : 어노테이션 추가해줍니다. 배치를 활성화하는 것입니다.
  • #2 : @EnableScheduling은 선택 사항입니다. 만약 CI/CD 도구 없이 해당 애플리케이션을 계속해서 실행상태를 유지하며 특정 시간대에 배치작업을 수행 하길 원한다면 추가해줍니다.

 

02-5. Domain

@Entity
@Table(name = "tbl_book")
class Book(
    @Id
    var bookId: Long? = null,
    var name: String? = null,
    var author: String? = null,
): Serializable

 

@Repository
interface BookRepository: JpaRepository<Book, Long>{
}

위와 같이 도메인을 정의해줍니다.

 

 

02-6. JobConfiguration

// CustomReaderJobConfig.kt
@Configuration
class CustomReaderJobConfig(
        val jobBuilderFactory: JobBuilderFactory,  // 의존성 주입
        val stepBuilderFactory: StepBuilderFactory, 
        val bookRepository: BookRepository  // 배치 대상 도메인
) {
    private final val CUSTOM_READER_JOB = "CUSTOM_READER_JOB"
    private final val CUSTOM_READER_JOB_STEP = CUSTOM_READER_JOB +"_STEP"
    private final val CHUNK_SIZE = 10

    @Bean
    fun customReaderJob(): Job{
        return jobBuilderFactory.get(CUSTOM_READER_JOB)
                .start(customReaderStep())
                .build()
    }

    @Bean
    fun customReaderStep(): Step {
        return stepBuilderFactory.get(CUSTOM_READER_JOB_STEP)
                .chunk<Book, Book>(CHUNK_SIZE)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build()
    }

    @Bean
    @StepScope
    fun reader(): CustomItemReader{
        return CustomItemReader()
    }

    @Bean
    fun processor(): ItemProcessor<Book, Book>{
        return ItemProcessor {
            it.author = "Author. " + it.author
            it
        }
    }

    @Bean
    fun writer(): ItemWriter<Book> {
        return ItemWriter {
            bookRepository.saveAll(it)
        }
    }
}

위 코드는 하나의 Step으로 이뤄진 Job을 정의하는 코드입니다.

또한 ItemReader, ItemProcessor, ItemWriter를 코드 내에 포함 시켰습니다.(파일로 따로 분리해서 작성하기도 합니다.)

processor, writer는 각각 ItemProcessor<K,V>, ItemWriter<T> 인터페이스 구현체를 리턴합니다.

  • processor : book의 author 컬럼 변경
  • writer : saveAll 메서드로 chunk size 만큼 bulk insert/update

 

단, ItemReader는 특이하게 @StepScope가 붙어 있음을 확인 할 수 있습니다. 

이는 Step실행 시에 새로운 CustomItemReader를 만들어 주기 위함입니다. 

// CustomItemReader.kt
open class CustomItemReader : ItemReader<Book> {
    @Autowired
    private lateinit var bookRepository: BookRepository
    private lateinit var list: MutableList<Book>
    private var nextIndex: Int = 0

    @PostConstruct
    fun postConstruct(){
        list = bookRepository.findAll()  // book  조회, list에 담음
    }

    override fun read(): Book? {
        if(nextIndex < list.size){
            return list[nextIndex++]  // list에서 순차적으로 read
        }
        return null
    }
}

ItemReader<T> 인터페이스의 구현체입니다.

Kotlin에서 class 앞의 open은 다른 클래스에서 상속을 허용하기 위함입니다. 아무것도 안붙여주면 final을 의미합니다. 자바와 달리 코틀린은 더 엄격한(?) 느낌이 있네요.

만약 open을 붙이지 않으면 아래와 같은 에러를 볼수 있을 것입니다.

Caused by: org.springframework.aop.framework.AopConfigException: Could not generate CGLIB subclass of class me.examplebatch.batch.item.CustomItemReader: Common causes of this problem include using a final class or a non-visible class; nested exception is java.lang.IllegalArgumentException: Cannot subclass final class me.examplebatch.batch.item.CustomItemReader
	at org.springframework.aop.framework.CglibAopProxy.getProxy(CglibAopProxy.java:208)
	at org.springframework.aop.framework.ProxyFactory.getProxy(ProxyFactory.java:110)
	at org.springframework.aop.scope.ScopedProxyFactoryBean.setBeanFactory(ScopedProxyFactoryBean.java:117)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeAwareMethods(AbstractAutowireCapableBeanFactory.java:1810)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1775)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:609)
	... 62 common frames omitted
Caused by: java.lang.IllegalArgumentException: Cannot subclass final class me.examplebatch.batch.item.CustomItemReader
	at org.springframework.cglib.proxy.Enhancer.generateClass(Enhancer.java:660)
	at org.springframework.cglib.core.DefaultGeneratorStrategy.generate(DefaultGeneratorStrategy.java:25)

@StepScope는 내부적으로 프록시 객체를 사용합니다.

그러다보니 프록시 객체 생성시 원래 구현체를 상속받아야 하고, proxy target class가 final. 즉, 코틀린에서 open을 안붙여주면 위와 같은 에러가 나게 됩니다.

 

 

자, 이제 실행 시켜보면 Job이 실행 된 것을 확인 할 수 있습니다.

 

 

02-7.  JobParameter

위와 같이 작성된 배치 애플리케이션은 한번만 실행되고, 이후 재 실행시 아래와 같은 메시지를 띄울 것입니다.

All steps already completed or no steps configured for this job.

 

매번 실행 할 수 있게끔 하려면 어떻게 하면 될까요?

JobParameter를 사용하면 됩니다.

Job 실행단위는 JobParameter와 1:1로 매칭될 수 있습니다. 즉, JobParameter가 다르다면 항상 실행 될 수 있습니다.

 

 

 

02-8. Scheduler 추가

CI/CD 도구를 사용해서 Scheduling을 사용 할때는 다른 방법으로 JobParameter를 입력해줘야 할것입니다.

여기선 Spring에서 제공하는 @Scheduled를 사용해서 매번 다른 JobParamter를 가지고 배치가 실행 될 수 있게끔 해보겠습니다.

// SimpleJobScheduler.kt
@Component
class SimpleJobScheduler(
    val jobLauncher: JobLauncher,
    val customReaderJobConfig: CustomReaderJobConfig
) {
    private val logger: Log = LogFactory.getLog(CustomReaderJobConfig::class.java)
    private val dateFormat = SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

    @Scheduled(initialDelay = 10000, fixedDelay = 30000)
    fun runJob(){
        val jobConf = hashMapOf<String, JobParameter>()
        jobConf["time"] = JobParameter(dateFormat.format(System.currentTimeMillis()))
        val jobParameters = JobParameters(jobConf)

        try{
            jobLauncher.run(customReaderJobConfig.customReaderJob(), jobParameters)
        } catch(e: JobExecutionAlreadyCompleteException){
            logger.error(e.localizedMessage)
        } catch(e: JobExecutionAlreadyRunningException){
            logger.error(e.localizedMessage)
        } catch(e: JobParametersInvalidException){
            logger.error(e.localizedMessage)
        }
    }
}

위와 같이 Schedule Task Component를 추가해주면 특정 시간에 반복적으로 배치 작업을 돌릴 수 있습니다.

위의 경우 애플리케이션 실행 10초 뒤, 최초 실행

이후 30초 마다 배치를 실행 시키게 됩니다.

 

Crontab을 사용한다면 아래와 같이 변경 해줄수 있습니다.

@Scheduled("0 10 * * * *")  // 매 시간 10분에 실행
@Scheduled("0 0 8 10 * *")  // 매월 10일 오전 8시 실행
@Scheduled("0 0/10 * * *")  // 10분마다 실행
..
반응형