๊ด€๋ฆฌ ๋ฉ”๋‰ด

๐‘†๐‘ข๐‘›๐‘ โ„Ž๐‘–๐‘›๐‘’ ๐‘Ž๐‘“๐‘ก๐‘’๐‘Ÿ ๐‘Ÿ๐‘Ž๐‘–๐‘›โœง

[Spring Batch] Tasklet, Chunk ๋ณธ๋ฌธ

์Šคํ”„๋ง ๋ฐฐ์น˜ ๋™์ž‘ ๊ตฌ์กฐ

step์—์„œ tasklet ๋ฐฉ์‹์œผ๋กœ ๊ตฌ์„ฑํ•˜๋Š” ๋ฐฉ๋ฒ•๊ณผ ItemReader/ItemProcessor/ItemWriter ๋‹จ์œ„๋กœ Chunk ์ง€ํ–ฅ ํ”„๋กœ์„ธ์‹ฑ์„ ๊ตฌ์„ฑํ•˜๋Š” ๋ฐฉ๋ฒ•์ด ์žˆ๋‹ค.

 

Tasklet ๋ฐฉ์‹

 

  • ๋‹จ๊ณ„ ๋‚ด์—์„œ ๋‹จ์ผ ํƒœ์Šคํฌ๋ฅผ ์ˆ˜ํ–‰ํ•˜๊ธฐ ์œ„ํ•œ ๊ฒƒ์œผ๋กœ ์ž„์˜์˜ Step์„ ์‹คํ–‰ํ•  ๋•Œ ์ฝ๊ธฐ/์ฒ˜๋ฆฌ/์“ฐ๊ธฐ๋ฅผ ํ•˜๋‚˜์˜ ์ž‘์—…์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐฉ์‹
  • ์ฆ‰ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ๊ณผ์ •์ด tasklet์•ˆ์—์„œ ํ•œ๋ฒˆ์— ์ด๋ค„์ง„๋‹ค
  • Job ๊ตฌ์„ฑ ํด๋ž˜์Šค ์™ธ๋ถ€์— tasklet ํด๋ž˜์Šค๋ฅผ ๋งŒ๋“ค์–ด์„œ ์ฐธ์กฐํ•˜๋Š” ๋ฐฉ์‹์ด๋‹ค.
    1. Step์—์„œ chunk() ๋Œ€์‹  tasklet()์„ ์‚ฌ์šฉํ•œ๋‹ค.
    2. tasklet์€ ์™ธ๋ถ€ ํด๋ž˜์Šค๋ฅผ ๋งŒ๋“ค๊ณ  tasklet ์ž‘์—…์„ ์ •์˜ํ•œ๋‹ค.
    3. tasklet ์ž‘์—…์„ ์ •์˜ํ•œ ์™ธ๋ถ€ ํด๋ž˜์Šค์—์„œ๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ Tasklet์„ implements ๋ฐ›์•„์„œ ๊ตฌํ˜„์„ ํ•ด์•ผํ•œ๋‹ค. ๊ทธ์— ๋”ฐ๋ผ execute ๋ฉ”์„œ๋“œ๋ฅผ ๊ตฌํ˜„ํ•ด์•ผ ํ•œ๋‹ค.
      • execute ๋ฉ”์„œ๋“œ ์ธ์ž contribution/chunkContext
      • contribution - ํ˜„์žฌ ๋‹จ๊ณ„ ์‹คํ–‰์„ ์—…๋ฐ์ดํŠธํ•˜๊ธฐ ์œ„ํ•ด ๋‹ค์‹œ ์ „๋‹ฌ๋˜๋Š” ๋ณ€๊ฒฝ ๊ฐ€๋Šฅํ•œ ์ƒํƒœ
      • chunkContext - ์‹คํ–‰ ์‹œ์ ์˜ Job ์ƒํƒœ ์ œ๊ณต. ํ˜ธ์ถœ ๊ฐ„์—๋Š” ๊ณต์œ ๋˜์ง€๋งŒ ์žฌ์‹œ์ž‘ ๊ฐ„์—๋Š” ๊ณต์œ ๋˜์ง€ ์•Š๋Š”๋‹ค.
        *tasklet ํด๋ž˜์Šค๊ฐ€ ์žฌ์‚ฌ์šฉ์ด ๋  ์ˆ˜ ์žˆ๋Š” ์ƒํ™ฉ์— ์ ํ•ฉํ•˜๋‹ค.

  • .๋ฐฐ์น˜ ์ฒ˜๋ฆฌ๊ณผ์ •์ด ์‰ฌ์šด ๊ฒฝ์šฐ ์‰ฝ๊ฒŒ ์‚ฌ์šฉ๋˜๋ฉฐ, ๋Œ€๋Ÿ‰์ฒ˜๋ฆฌ ๊ฒฝ์šฐ ๋” ๋ณต์žกํ•ด์งˆ ์ˆ˜ ์žˆ๋‹ค.

์ •๋ฆฌํ•˜์ž๋ฉด tasklet์€ Job ๊ตฌ์„ฑ ํด๋ž˜์Šค ๋‚ด๋ถ€์— tasklet ๊ตฌํ˜„๋ถ€๋ฅผ ๋„ฃ์–ด์„œ ํ•˜๋‚˜์˜ ํด๋ž˜์Šค๋กœ ํ•˜๋‚˜์˜ ์—…๋ฌด๋ฅผ ์ •์˜ํ•˜๋Š” ๋ฐฉ์‹์ด๋‹ค.

 

 

 

Chunk ๋ฐฉ์‹

 

  • Chunk ๊ธฐ๋ฐ˜ : ํ•˜๋‚˜์˜ ํฐ ๋ฉ์–ด๋ฆฌ๋ฅผ n๊ฐœ์”ฉ ๋‚˜๋ˆ ์„œ ์‹คํ–‰
  • Chunk ๊ธฐ๋ฐ˜ Step์€ ItemReader, ItemProcessor, ItemWriter๊ฐ€ ์žˆ๋‹ค.
  • ItemReader๋Š” ๋ฐฐ์น˜ ์ฒ˜๋ฆฌ ๋Œ€์ƒ ๊ฐ์ฒด๋ฅผ ์ฝ์–ด ItemProcessor ๋˜๋Š” ItemWriter์—๊ฒŒ ์ „๋‹ฌํ•œ๋‹ค.
    • ์˜ˆ๋ฅผ ๋“ค๋ฉด, ํŒŒ์ผ ๋˜๋Š” DB์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ๋Š”๋‹ค.
  • ItemProcessor๋Š” input ๊ฐ์ฒด๋ฅผ output ๊ฐ์ฒด๋กœ filtering ๋˜๋Š” processing ํ•ด ItemWriter์—๊ฒŒ ์ „๋‹ฌํ•œ๋‹ค.
    • ์˜ˆ๋ฅผ ๋“ค๋ฉด, ItemReader์—์„œ ์ฝ์€ ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์ • ๋˜๋Š” ItemWriter ๋Œ€์ƒ์ธ์ง€ filtering ํ•œ๋‹ค.
    • ItemProcessor๋Š” optional ํ•˜๋‹ค.
    • ItemProcessor๊ฐ€ ํ•˜๋Š” ์ผ์„ ItemReader ๋˜๋Š” ItemWriter๊ฐ€ ๋Œ€์‹ ํ•  ์ˆ˜ ์žˆ๋‹ค.
  • ItemWriter๋Š” ๋ฐฐ์น˜ ์ฒ˜๋ฆฌ ๋Œ€์ƒ ๊ฐ์ฒด๋ฅผ ์ฒ˜๋ฆฌํ•œ๋‹ค.
    • ์˜ˆ๋ฅผ ๋“ค๋ฉด, DB update๋ฅผ ํ•˜๊ฑฐ๋‚˜, ์ฒ˜๋ฆฌ ๋Œ€์ƒ ์‚ฌ์šฉ์ž์—๊ฒŒ ์•Œ๋ฆผ์„ ๋ณด๋‚ธ๋‹ค.

 

 

Chunk ๋ฐฉ์‹์œผ๋กœ ๋ฐ์ดํ„ฐ์ฒ˜๋ฆฌ

// step๋ฅผ chunk๋กœ ์ƒ์„ฑํ•ด๋ณด๊ธฐ
@Bean
public Step chunkBaseStep(){
    return stepBuilderFactory.get("chunkBaseStep")
            .<String, String> chunk(10)
            .reader(itemReader())
            .processor(itemProcessor())
            .writer(itemWriter())
            .build();
}

private ItemWriter<String> itemWriter() {
    return items -> log.info("chunk item size: {}", items.size());
}

private ItemProcessor<String, String> itemProcessor() {
    //return ๊ฐ’์ด null์ด๋ฉด wrtier๋กœ ๋„˜์–ด๊ฐ€์ง€์•Š๋Š”๋‹ค.
    return item -> item + "spring batch";
}

private ItemReader<String> itemReader() {
    //ListItemReader๋Š” ์Šคํ”„๋ง์—์„œ ๊ธฐ๋ณธ์ ์œผ๋กœ ์ œ๊ณต.  ๋ฆฌ์ŠคํŠธ๋ฅผ ๋ฐ›์•„์„œ100๊ฐœ์˜ readํ•œ๋‹ค.
    return new ListItemReader<>(getItems());
}


chunksize๋กœ ์ฒ˜๋ฆฌ ์‹œ, Step์— .chunksize(ํฌ๊ธฐ) ๋กœ ์ •์˜ํ•˜๋ฉฐ ์•ž์— ์ฒ˜๋ฆฌ๋  ํƒ€์ž…์„ ์ •์˜ํ•œ๋‹ค.

Processor์—์„œ๋Š” ์•ž์„œ ๋งํ•œ๊ฒƒ๊ณผ ๊ฐ™์ด <reader์—์„œ ์ฝ์€ ํƒ€์ž…, writer์— ๋ณด๋‚ผ ํƒ€์ž…> ์œผ๋กœ ์ •์˜ํ•ด ์ฒ˜๋ฆฌํ•œ๋‹ค.
<String,String>
์•ž์˜ String์€ reader์—์„œ ์ฝ์€ ๋ฐ์ดํ„ฐ์˜ ํƒ€์ž…์ด๊ณ 
๋’ค์˜ String์€ writer์—์„œ ๋ฐ›์„ ๋ฐ์ดํ„ฐ์˜ ํƒ€์ž…์ด๋‹ค.
๋”ฐ๋ผ์„œ processor ์ฒ˜๋ฆฌ ํ•จ์ˆ˜์—์„œ๋Š” ์•ž/๋’ค ํƒ€์ž…์„ ๋ชจ๋‘ ์„ ์–ธํ•ด์„œ ํ•„ํ„ฐ๋‚˜ ๋ฐ์ดํ„ฐ ๋ณ€ํ˜• ์ฒ˜๋ฆฌ๊ฐ€ ๊ฐ€๋Šฅํ•˜๋‹ค.

Reader์—์„œ๋Š” ๋ฆฌํ„ด์œผ๋กœ ์„ ์–ธํ•œ ListItemReader๋Š” spring batch์—์„œ ์ œ๊ณตํ•˜๋Š” item์ฒ˜๋ฆฌ ํด๋ž˜์Šค์ด๋‹ค.
getItems์—์„œ ๊ฐ€์ ธ์˜จ List<String>์„ LIstItemReader์—์„œ ์•Œ์•„์„œ item๋ฐ˜ํ™˜ํ•ด์ค€๋‹ค.

Writer์—์„œ๋Š”  <String>์ด์ง€๋งŒ, List<String>๋‹จ์œ„๋กœ ์ฒ˜๋ฆฌ๋˜๋ฉฐ,
items๋ฅผ ๋ฐ›์•„ log๋ฅผ ์ถœ๋ ฅํ•˜๋ฉด chunksize=10๊ฐœ์”ฉ ์ฝ์€ String์ด List๋กœ ์ถœ๋ ฅ๋œ๋‹ค.

 

 

Tasklet ๋ฐฉ์‹์œผ๋กœ ๋ฐ์ดํ„ฐ์ฒ˜๋ฆฌ

 

@Bean
public Step taskBaseStep(){
    return stepBuilderFactory.get("taskBaseStep")
            .tasklet(this.tasklet())
            .build();
}

// chunk ๋ฐฉ์‹์„ tasklet ๋ฐฉ์‹์œผ๋กœ ๊ตฌํ˜„
private Tasklet tasklet() { //Tasklet ํด๋ž˜์Šค ์„ค์ •, TaskletStepBuidler๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค.
    List<String> items = getItems();
    return (contribution, chunkContext) -> {
        StepExecution stepExecution = contribution.getStepExecution();

        int chunkSize = 10;
        int fromIndex = stepExecution.getReadCount();
        int toIndex = fromIndex + chunkSize;
        if (fromIndex >= items.size()){
            return RepeatStatus.FINISHED;
        }
        List<String> subList = items.subList(fromIndex, toIndex);

        log.info("chunk item size: {}", subList.size());

        stepExecution.setReadCount(toIndex);

        return RepeatStatus.CONTINUABLE;
        //๋งŒ์•ฝ fromIndex ๊ฐ€ 10์ด๊ณ , toIndex๊ฐ€ 10์ด๋ฉด ์ธ๋ฑ์Šค10๋ฒˆ๋ถ€ํ„ฐ 10๊ฐœ์˜ item๋ฅผ ๊บผ๋‚ผ์ˆ˜ ์žˆ๋‹ค๋Š”๋œป

    };
}

 

tasklet์˜ contributution์—์„œ StepExecution ์œผ๋กœ step์ด ์–ผ๋งˆ๋‚˜ ์ฒ˜๋ฆฌ๋˜๊ณ  ์žˆ๋Š”์ง€์˜ ์ •๋ณด๋ฅผ ์•Œ ์ˆ˜ ์žˆ๋‹ค.

(chunksize๋ฅผ 10์œผ๋กœ ์ •์˜ํ•˜๊ณ  readCount๋กœ ํŽ˜์ด์ง•์„ ์ฒ˜๋ฆฌ)

 

๋น„๊ต์  ๋Œ€๋Ÿ‰ ๋ฐ์ดํ„ฐ๋Š” chunk๋ฐฉ์‹์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๋Š”๊ฒŒ ๋” ๊ฐ„๋‹จํ•˜๋‹ค.

 

 

์ฐธ์กฐ

https://choisblog.tistory.com/80
https://velog.io/@gkskaks1004/%EC%8A%A4%ED%94%84%EB%A7%81-%EB%B0%B0%EC%B9%98%EC%9D%98-Tasklet-%EB%B0%A9%EC%8B%9D%EA%B3%BC-Chunk-Tasklet-%EB%B0%A9%EC%8B%9D