Spring Batch Scaling

When the volume of data to process grows beyond a certain size, batch jobs need scaling too. Spring Batch provides the following scaling features.

Feature (process model, location): description

  • Multi-threaded Step (single process, local): when running a single Step, each chunk of that Step is executed on a separate thread.

  • Parallel Steps (single process, local): runs several Steps in parallel; it does not speed up any individual Step.

  • Remote Chunking (multi-process, remote): the Step's processing is split across multiple processes and sent to other, external servers. Because nothing tracks which server is processing which data, the messaging layer must guarantee that no message is ever lost (a message queue such as AWS SQS or Kafka is recommended).

  • Partitioning (single or multi-process, local or remote): a manager (master) divides the data into smaller partitions, and a worker (slave) processes each partition independently.

  • AsyncItemProcessor and AsyncItemWriter (local): the ItemProcessor and ItemWriter run on separate threads. Provided by the spring-batch-integration dependency.

This post implements some of them.

Multi-threaded Step

The thread model | Spring Batch Essentials

A multi-threaded Step in Spring Batch uses a TaskExecutor so that each thread executes one chunk at a time.

  • SimpleAsyncTaskExecutor: creates a new thread for every chunk.

  • ThreadPoolTaskExecutor: reuses a fixed number of threads from a thread pool (using a thread pool is recommended in production).

When setting up a multi-threaded environment, first check whether the Reader and Writer you intend to use support multi-threading.

For example, JpaPagingItemReader is documented as thread-safe, whereas JpaCursorItemReader is documented as not thread-safe.

There is one more thing to be aware of. When chunks run independently on multiple threads, restarting from the point of failure is not possible. Chunks 1 to n run at the same time, so if the 5th chunk fails, there is no guarantee that chunks 1 to 4 have all succeeded.

For that reason, the ItemReader's saveState option is usually set to false when multi-threading is used.

saveState: determines whether the reader's state is stored in the ExecutionContext through ItemStream#update(ExecutionContext). (Default: true)

Thread-safe

PagingItemReader

์œ„์—์„œ ๋ดค๋“ฏ์ด PagingItemReader๋Š” thread-safeํ•œ ๊ฒƒ์„ ์•Œ ์ˆ˜ ์žˆ๋‹ค. ๋ฉ€ํ‹ฐ์“ฐ๋ ˆ๋“œ๋กœ ์ˆ˜ํ–‰ํ•˜๋Š” ๋ฐฐ์น˜๊ฐ€ ์žˆ๋‹ค๋ฉด, DB์ ‘๊ทผ์‹œ PagingItemReader๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์„ ๊ถŒ์žฅํ•œ๋‹ค.

  • application.yml

  • main() method setup

  • Job implementation

    The key points here are the TaskExecutor configuration and setting .saveState(false) for the Step, so that the Reader does not store the point of failure and the Step starts over from the beginning after a failure (because the success of the other threads is not guaranteed).

    When configuring the TaskExecutor, set allowCoreThreadTimeOut so that threads that stay idle longer than a given time (keepAliveSeconds) are terminated.
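    As a rough illustration of what allowCoreThreadTimeOut and the keep-alive time mean, the sketch below uses the JDK's java.util.concurrent.ThreadPoolExecutor directly, which is the class Spring's ThreadPoolTaskExecutor wraps. The class name ChunkPoolSketch, the pool size of 4, the 30-second keep-alive and the "one task per chunk" loop are assumptions chosen for the example, not the original job's configuration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a thread pool that executes chunks.
final class ChunkPoolSketch {

    // Fixed-size pool whose idle threads, core threads included, exit after keepAliveSeconds.
    static ThreadPoolExecutor newChunkPool(int poolSize, long keepAliveSeconds) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, keepAliveSeconds, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true); // only legal when keepAliveSeconds > 0
        return pool;
    }

    // Submits one task per chunk and returns the names of the threads that ran them.
    static Set<String> runChunks(ThreadPoolExecutor pool, int chunkCount) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < chunkCount; i++) {
            pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("chunks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadNames;
    }

    public static void main(String[] args) {
        Set<String> names = runChunks(newChunkPool(4, 30), 20);
        System.out.println("20 chunks ran on " + names.size() + " thread(s): " + names);
    }
}
```

    However many chunks are submitted, at most four threads are ever created, and they are reused rather than created per chunk, which is the difference from SimpleAsyncTaskExecutor.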

    ๋‹ค์Œ Job์„ ์ˆ˜ํ–‰ํ•˜๋ฉด ๊ฐ thread๋ณ„๋กœ ๋ณ‘๋ ฌ๋กœ ์ˆ˜ํ–‰๋˜๋Š” ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

Not thread-safe

CursorItemReader

A CursorItemReader does not guarantee thread safety. It can be made thread-safe by wrapping the reader in a SynchronizedItemStreamReader.

Register the CursorItemReader you want to run as the delegate of the SynchronizedItemStreamReader.

The read() method of SynchronizedItemStreamReader is a synchronized method that wraps the delegate call, so reads are synchronized.

The output shows the threads reading items one at a time and then processing them independently.

If server resources such as network, disk I/O, CPU, or memory are already saturated with a single thread, switching to multiple threads will not improve performance. Test thoroughly before applying it in production.

Parallel Steps

๊ฐ ์—ญํ• ์„ ์—ฌ๋Ÿฌ step(Flow)์œผ๋กœ ๋‚˜๋ˆ„์–ด ๋ณ‘๋ ฌํ™” ํ•  ์ˆ˜ ์žˆ๋‹ค.

Flow๋Š” ์ž์ฒด ์Šค๋ ˆ๋“œ์—์„œ ์ง„ํ–‰๋˜๋ฉฐ, ์—ฌ๋Ÿฌ Flow๋ฅผ ๋ณ‘๋ ฌ๋กœ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค. FlowBuilder์˜ .split() ๋ฉ”์„œ๋“œ๋Š” TaskExecutor๋ฅผ ๋ฐ›์•„ ๊ฐ Flow๋Š” ์ž์ฒด ์Šค๋ ˆ๋“œ์—์„œ ์‹คํ–‰๋œ๋‹ค. ์œ„์™€ ๊ฐ™์ด ๊ตฌํ˜„ํ•˜๋ฉด ๊ฐ step์ด๋‚˜ step์˜ flow๋ฅผ ๋ณ‘๋ ฌ๋กœ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค.

๊ฐ Step๋“ค์ด ๋ณ‘๋ ฌ๋กœ ์ˆ˜ํ–‰๋˜๋Š” ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ์œผ๋ฉฐ, ์ด๋•Œ ๋ชจ๋“  Flow๊ฐ€ ์ข…๋ฃŒ๋˜๋ฉด Job์ด ์ข…๋ฃŒ๋œ๋‹ค.

AsyncItemProcessor and AsyncItemWriter

Sometimes the bottleneck of a batch job is in the ItemProcessor. In that case, performance can be improved by running only the ItemProcessor part of the Step on separate threads. AsyncItemProcessor is a decorator that wraps an ItemProcessor implementation: it runs the ItemProcessor logic on a new thread and passes the resulting Future to the AsyncItemWriter. AsyncItemWriter is likewise a decorator that wraps an ItemWriter implementation: it resolves each Future and passes the result to the wrapped ItemWriter. AsyncItemProcessor and AsyncItemWriter must therefore always be used together.
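The hand-off between the two decorators can be sketched with JDK types only. The helper names asyncProcessor, asyncWriter and runChunk are hypothetical and stand in for the roles of AsyncItemProcessor and AsyncItemWriter; this is an illustration of the mechanism under those assumptions, not the Spring Batch implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;

final class AsyncProcessingSketch {

    // Processor decorator: submits the delegate's work and immediately returns a Future.
    static <I, O> Function<I, Future<O>> asyncProcessor(Function<I, O> delegate, ExecutorService executor) {
        return item -> {
            Callable<O> task = () -> delegate.apply(item);
            return executor.submit(task);
        };
    }

    // Writer decorator: resolves each Future, then hands the plain items to the delegate.
    static <O> Consumer<List<Future<O>>> asyncWriter(Consumer<List<O>> delegate) {
        return futures -> {
            List<O> items = new ArrayList<>();
            for (Future<O> future : futures) {
                try {
                    items.add(future.get()); // blocks until that item is processed
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException(e.getCause());
                }
            }
            delegate.accept(items);
        };
    }

    // Processes one chunk: items are processed concurrently, then written once as a list.
    static List<String> runChunk(List<Integer> chunk) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            Function<Integer, String> delegateProcessor = i -> "item-" + i;
            List<String> written = new ArrayList<>();
            Consumer<List<String>> delegateWriter = written::addAll;

            Function<Integer, Future<String>> processor = asyncProcessor(delegateProcessor, executor);
            Consumer<List<Future<String>>> writer = asyncWriter(delegateWriter);

            List<Future<String>> futures = new ArrayList<>();
            for (Integer item : chunk) {
                futures.add(processor.apply(item));
            }
            writer.accept(futures);
            return written;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runChunk(Arrays.asList(1, 2, 3)));
    }
}
```

Because the processor's output type is Future<O> rather than O, a plain writer cannot consume it, which is why the two decorators only make sense as a pair.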

dependency

์œ„ ์˜์กด์„ฑ์„ ์ถ”๊ฐ€ํ•ด์ฃผ์–ด์•ผ AsyncItemProcessor์™€ AsyncItemWriter๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

example

To simulate a processor bottleneck, the processor was made to sleep for 5 milliseconds per item. Running the batch this way takes about 9 seconds from start to finish.

Now let's improve performance with asynchronous processing.

Pass the ItemProcessor to wrap to AsyncItemProcessor's setDelegate() method, and set the TaskExecutor that will run it on separate threads. Then configure an AsyncItemWriter so that it receives the Futures and performs the write. Pass the ItemWriter to run to its setDelegate() method.

When calling the chunk method, change the output type to Future<T>, the type that AsyncItemProcessor returns. Running the batch again completes in about 4 seconds.

Partitioning

Most of the cost of a batch workload is I/O. Spring Batch provides a way for multiple workers to each execute a complete step.

https://docs.spring.io/spring-batch/docs/current/reference/html/images/partitioning-overview.png

ํŒŒํ‹ฐ์…”๋‹์€ Master Step์ด ๋Œ€๋Ÿ‰์˜ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•ด ์ง€์ •๋œ ์ˆ˜์˜ Worker Step์œผ๋กœ ์ผ์„ ๋ถ„ํ• ํ•˜์—ฌ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐฉ์‹์„ ๋งํ•œ๋‹ค. ๋Œ€๋Ÿ‰์˜ ๋ฐ์ดํ„ฐ๋ฅผ ๋” ์ž‘์€ ํŒŒํ‹ฐ์…˜์œผ๋กœ ๋‚˜๋ˆ„์–ด ๊ฐ ์›Œ์ปค๊ฐ€ ๋‚˜๋ˆ ์ง„ ํŒŒํ‹ฐ์…˜์„ ๋ณ‘๋ ฌ๋กœ ์ฒ˜๋ฆฌํ•˜๋Š” ๊ฒƒ์ด๋‹ค. ๊ฐ ์›Œ์ปค๋Š” ์ž์ฒด์ ์œผ๋กœ ์ฝ๊ธฐ(ItemReader), ์ฒ˜๋ฆฌ(ItemProcessor), ์“ฐ๊ธฐ(ItemWriter) ๋“ฑ์„ ๋‹ด๋‹นํ•˜๋Š” ์˜จ์ „ํ•œ ๋ฐฐ์น˜ Step์ด๋ฉฐ, ์žฌ์‹œ์ž‘๊ณผ ๊ฐ™์ด ์Šคํ”„๋ง ๋ฐฐ์น˜๊ฐ€ ๊ธฐ๋ณธ์ ์œผ๋กœ ์ œ๊ณตํ•˜๋Š” ๋ชจ๋“  ๊ธฐ๋Šฅ์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

Multi-threaded Step vs Partitioning

  • A multi-threaded Step splits a single Step by creating threads per chunk.

    • There is no fine-grained control over which thread processes which data.

    • Whether the Step's ItemReader, ItemWriter, and so on support a multi-threaded environment matters.

  • Partitioning sets up independent Steps (Worker Steps), each of which runs with its own StepExecution parameters.

    • It also runs on multiple threads, but whether the ItemReader and ItemWriter support a multi-threaded environment does not matter.

Main interfaces

Partitioner

The Partitioner divides the data to be processed into partitions. In other words, it is the interface that creates the input (an ExecutionContext per partition) for the StepExecution of each partitioned Worker Step.

It has a single method, partition(int gridSize), where gridSize is the setting that determines how many workers the work is split across. Calculating or choosing gridSize is entirely up to the developer.

  • org.springframework.batch.core.partition.support.SimplePartitioner

    • The default implementation; creates an empty ExecutionContext for each partition

  • org.springframework.batch.core.partition.support.MultiResourcePartitioner

    • Looks at an array of resources and creates one partition per resource

PartitionHandler

PartitionHandler is the interface that defines how the Master Step deals with the Worker Steps. In other words, it is the interface used to communicate with the workers.

It covers how each worker is told what to work on, how the thread pool is managed when the workers run in parallel, and how to determine that all the work has completed.

When using Spring Batch you may write your own Partitioner implementation, but you will rarely, if ever, need to implement a PartitionHandler yourself.

  • org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler

    • Brings partitioning to a single JVM by running the workers on multiple threads within the same JVM

  • org.springframework.batch.integration.partition.MessageChannelPartitionHandler

    • Sends the metadata to remote JVMs

  • org.springframework.cloud.task.batch.partition.DeployerPartitionHandler

    • Provided by the Spring Cloud Task project

    • Launches workers on demand on a supported platform

TaskExecutorPartitionHandler (single JVM) example

๋‹จ์ผ JVM๋‚ด์—์„œ ์—ฌ๋Ÿฌ ์Šค๋ ˆ๋“œ๋ฅผ ์‚ฌ์šฉํ•ด ์›Œ์ปค๋ฅผ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ๊ฒŒ ํ•ด์ฃผ๋Š” ์ปดํฌ๋„ŒํŠธ๋กœ, ๋‹จ์ผ ์žฅ๋น„๋‚ด์—์„œ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ๋Š” ์ž‘์—…์—๋Š” ์ œ์•ฝ(๋„คํŠธ์›Œํฌ/DISK IO/CPU/Memory)์ด ์กด์žฌํ•œ๋‹ค.

๋‹ค์Œ๊ณผ ๊ฐ™์ด TaskExecutorPartitionHandler์— worker๋กœ ์ˆ˜ํ–‰ํ•  step, ๋ฉ€ํ‹ฐ์“ฐ๋ ˆ๋“œ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•œ TaskExecutor ์„ค์ •, ํŒŒํ‹ฐ์…”๋‹ ์ˆ˜(gridSize)๋ฅผ ์ง€์ •ํ•ด์ค€๋‹ค.

Partitioner implementation

  • Decide how many Worker Steps to create

    • long targetSize = (max - min) / gridSize + 1

  • Decide which variables each Worker Step receives

    • value.putLong("minId", start)

    • value.putLong("maxId", end)
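The range calculation in the list above can be tried out in plain Java. RangePartitionSketch is a hypothetical class written for this example: it uses the same targetSize formula, but returns each partition as a {minId, maxId} pair in a map instead of putting the values into a Spring Batch ExecutionContext, and the "partitionN" key names are an assumption.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

final class RangePartitionSketch {

    // Splits the id range [min, max] into consecutive ranges of targetSize ids each.
    static Map<String, long[]> partition(long min, long max, int gridSize) {
        long targetSize = (max - min) / gridSize + 1;

        Map<String, long[]> partitions = new LinkedHashMap<>();
        int number = 0;
        long start = min;
        long end = start + targetSize - 1;

        while (start <= max) {
            // The last range is clipped so it never goes past max.
            partitions.put("partition" + number, new long[] {start, Math.min(end, max)});
            start += targetSize;
            end += targetSize;
            number++;
        }
        return partitions;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, long[]> entry : partition(1, 10, 3).entrySet()) {
            System.out.println(entry.getKey() + " -> " + Arrays.toString(entry.getValue()));
        }
    }
}
```

With min = 1, max = 10 and gridSize = 3, targetSize is 4 and the ranges are 1 to 4, 5 to 8 and 9 to 10. Note that with this formula the number of partitions can come out lower than gridSize for small ranges (for example, ids 1 to 4 with gridSize = 3 yield only two partitions).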

๋‹ค์Œ๊ณผ ๊ฐ™์ด ๊ฐ ํŒŒํ‹ฐ์…˜๋ณ„๋กœ ์ˆ˜ํ–‰ํ•  StepExecution์ด ์ •์ƒ์ ์œผ๋กœ ์ž˜ ๊ตฌํ˜„๋œ ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

Step implementation

Register the Partitioner implementation for the step with the partitioner() method, and register the Step to be partitioned with the .step() method. Then register the handler to use with partitionHandler(), and the master step is complete.

ItemReader/ItemWriter

๋‹ค์Œ๊ณผ ๊ฐ™์ด #{stepExecutionContext['๋ณ€์ˆ˜๋ช…']}์œผ๋กœ ์œ„ Partitioner์—์„œ ๊ฐ ํŒŒํ‹ฐ์…˜ ๋ณ„๋กœ ์ง€์ •ํ•œ StepExecution ๋ณ€์ˆ˜๋ฅผ ๊ฐ€์ ธ์˜ฌ ์ˆ˜ ์žˆ๋‹ค. ํ•ด๋‹น ๋ณ€์ˆ˜๋ฅผ ์ „๋‹ฌ๋ฐ›์•„ ๊ฐ ํŒŒํ‹ฐ์…˜ ๋ณ„๋กœ ์ฒ˜๋ฆฌํ•˜๋„๋ก ๊ตฌํ˜„ํ•˜๋ฉด ๋œ๋‹ค.

Job

Finally, register the Master Step in the Job, and the implementation is complete.
