Reader์ Processor๋ฅผ ๊ฑฐ์ณ ์ฒ๋ฆฌ๋ Item์ Chunk ๋จ์๋งํผ ์์ ํ ์ด๋ฅผ Writer์ ์ ๋ฌํ๊ณ , ItemWriter
๋ ๋ฐฐ์น์ ์ถ๋ ฅ์ ๋ด๋นํ๋ค.
/**
 * Contract for the output side of a batch step.
 *
 * @param <T> type of the items this writer accepts
 */
public interface ItemWriter<T> {

    /**
     * Writes the given list of items.
     *
     * @param items the items to write
     * @throws Exception if the items cannot be written
     */
    void write(List<? extends T> items) throws Exception;
}
ItemWriter
์ write()
๋ ์ธ์๋ก Item List๋ฅผ ๋ฐ๋ ๊ฒ์ ๋ณผ ์ ์๋ค. Spring Batch์์๋ ๋ค์ํ Output ํ์
์ ์ฒ๋ฆฌํ ์ ์๋๋ก Writer๋ฅผ ์ ๊ณตํ๊ณ ์๋ค.
Database Writer
๋ค์ 3๊ฐ์ง Writer๊ฐ ์์ผ๋ฉฐ, Database์ ์์์ฑ๊ณผ ๊ด๋ จํด์๋ ํญ์ Flush๋ฅผ ํด์ค์ผํ๋ค. Writer๊ฐ ๋ฐ์ ๋ชจ๋ Item์ด ์ฒ๋ฆฌ ๋ ํ์ Spring Batch๋ ํ์ฌ ํธ๋์ญ์
์ ์ปค๋ฐํ๋ค.
JdbcBatchItemWriter
ORM์ ์ฌ์ฉํ์ง ์๋ ๊ฒฝ์ฐ์๋ ๋๋ถ๋ถ JdbcBatchItemWriter
๋ฅผ ์ฌ์ฉํ๋ค.
JdbcBatchItemWriter
๋ JdbcTemplate
์ ์ฌ์ฉํ๋ฉฐ, JDBC์ Batch ๊ธฐ๋ฅ์ ์ฌ์ฉํด ํ๋ฒ์ DB๋ก ์ ๋ฌํ์ฌ DB๋ด๋ถ์์ ์ฟผ๋ฆฌ๊ฐ ์คํ ๋๋๋ก ํ๋ค. ์ดํ๋ฆฌ์ผ์ด์
๊ณผ ๋ฐ์ดํฐ๋ฒ ์ด์ค ๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ์ฃผ๊ณ ๋ฐ๋ ํ์๋ฅผ ์ต์ํํ์ฌ ์ฑ๋ฅํฅ์์ ํ ์ ์๋๋ก ํ๊ธฐ ์ํด์๋ค.
Copy public void write( final List<? extends T > items) throws Exception {
if ( ! items . isEmpty ()) {
if ( logger . isDebugEnabled ()) {
logger . debug ( "Executing batch with " + items . size () + " items." );
}
int [] updateCounts;
int value;
if ( ! this . usingNamedParameters ) {
updateCounts = (int[])this.namedParameterJdbcTemplate.getJdbcOperations().execute(this.sql, new PreparedStatementCallback<int[]>() {
public int [] doInPreparedStatement( PreparedStatement ps) throws SQLException , DataAccessException {
Iterator var2 = items . iterator ();
while ( var2 . hasNext ()) {
T item = var2 . next ();
JdbcBatchItemWriter . this . itemPreparedStatementSetter . setValues (item , ps);
ps . addBatch ();
}
return ps . executeBatch ();
}
});
} else if ( items . get ( 0 ) instanceof Map && this . itemSqlParameterSourceProvider == null ) {
updateCounts = this.namedParameterJdbcTemplate.batchUpdate(this.sql, (Map[])items.toArray(new Map[items.size()]));
} else {
SqlParameterSource [] batchArgs = new SqlParameterSource [ items . size ()];
value = 0 ;
Object item;
for(Iterator var5 = items.iterator(); var5.hasNext(); batchArgs[value++] = this.itemSqlParameterSourceProvider.createSqlParameterSource(item)) {
item = var5 . next ();
}
updateCounts = this . namedParameterJdbcTemplate . batchUpdate ( this . sql , batchArgs);
}
if ( this . assertUpdates ) {
for ( int i = 0 ; i < updateCounts . length ; ++ i) {
value = updateCounts[i];
if (value == 0 ) {
throw new EmptyResultDataAccessException("Item " + i + " of " + updateCounts.length + " did not update any rows: [" + items.get(i) + "]", 1);
}
}
}
}
}
์ด๋ write()
๋ฉ์๋๋ฅผ ๋ณด๋ฉด SQL๋ฌธ์ ํ๋ฒ์ฉ ํธ์ถํ๋ ๊ฒ์ด ์๋ batchUpdate
๋ก ๋ฐ์ดํฐ๋ฅผ ์ฒญํฌ ๋จ์๋ก ์ผ๊ด์ฒ๋ฆฌ ํ๋ ๊ฒ์ ๋ณผ ์ ์๋ค. ์ด๋ ๊ฒ ํ๋ฉด ์คํ ์ฑ๋ฅ์ ํฌ๊ฒ ํฅ์ ์ํฌ ์ ์์ผ๋ฉฐ, ๋ฐ์ดํฐ ๋ณ๊ฒฝ ์คํ์ ํธ๋์ญ์
๋ด์์ ํ ์ ์๋ค.
Property Parameter Type Default ์ค๋ช
true์ด๋ฉด ๋ชจ๋ ์์ดํ
์ด ์ฝ์
์ด๋ ์์ ๋์๋์ง ๊ฒ์ฆํ๋ค.
์ฆ, ์ ์ด๋ ํ๋์ ํญ๋ชฉ์ด ํ์ ์
๋ฐ์ดํธ ํ๊ฑฐ๋ ์ญ์ ํ์ง ์์ ๊ฒฝ์ฐ ์์ธ(EmptyResultDataAccessException
)๋ฅผ throwํ ์ง ์ค์ ํ๋ค.
ํ์ํ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๋ํ ์ ๊ทผ ์ ๊ณต
๊ฐ ์์ดํ
๋น ์ํํ SQL
itemPreparedStatementSetter
ItemPreparedStatementSetter
ํ์ค PreparedState๊ฐ ์ ๊ณต๋๋ค๋ฉด(ํ๋ผ๋ฏธํฐ ์์น์ ?์ฌ์ฉ), ์ด ํด๋์ค๋ฅผ ์ฌ์ฉํด ํ๋ผ๋ฏธํฐ ๊ฐ์ ์ฑ์
itemSqlParameterSourceProvider
ItemSqlParameterSourceProvider
์ ๊ณต๋ SQL์ ๋ค์๋ ํ๋ผ๋ฏธํฐ๊ฐ ์ฌ์ฉ๋๋ค๋ฉด, ์ด ํด๋์ค๋ฅผ ์ฌ์ฉํด ํ๋ผ๋ฏธํฐ ๊ฐ ์ฑ์
SimpleJdbcOperations ์ธํฐํ์ด์ค์ ๊ตฌํ์ฒด๋ฅผ ์ฃผ์
๊ฐ๋ฅ
๊ฐ๊ฐ Writer๋ค์ด ์คํ๋๊ธฐ ์ํด ํ์ํ ํ์ ๊ฐ๋ค์ด ์ ๋๋ก ์ธํ
๋์ด ์๋์ง ํ์ธ
JdbcBatchItemWriterBuilder
JdbcBatchItemWriterBuilder๋ ๋ค์ 3๊ฐ์ง ์ค์ ๊ฐ์ ๊ฐ๊ณ ์๋ค.
Property Parameter Type Default ์ค๋ช
true์ด๋ฉด ๋ชจ๋ ์์ดํ
์ด ์ฝ์
์ด๋ ์์ ๋์๋์ง ๊ฒ์ฆํ๋ค.
์ฆ, ์ ์ด๋ ํ๋์ ํญ๋ชฉ์ด ํ์ ์
๋ฐ์ดํธ ํ๊ฑฐ๋ ์ญ์ ํ์ง ์์ ๊ฒฝ์ฐ ์์ธ(EmptyResultDataAccessException
)๋ฅผ throwํ ์ง ์ค์ ํ๋ค.
Key, Value ๊ธฐ๋ฐ์ผ๋ก Insert SQL์ Values๋ฅผ ๋งคํํ๋ค.
POJO ๊ธฐ๋ฐ์ผ๋ก Insert SQL์ Values๋ฅผ ๋งคํํ๋ค.
columnMapped
Copy @ Bean // beanMapped์ ํ์
public JdbcBatchItemWriter< Pay > jdbcBatchItemWriter() {
return new JdbcBatchItemWriterBuilder < Map < String , Object >>() // Map ์ฌ์ฉ
. columnMapped ()
. dataSource ( this . dataSource )
. sql ( "insert into pay2(amount, tx_name, tx_date_time) values (:amount, :txName, :txDateTime)" )
. build ();
}
beanMapped
Copy @ Bean // beanMapped์ ํ์
public JdbcBatchItemWriter< Pay > jdbcBatchItemWriter() {
return new JdbcBatchItemWriterBuilder < Pay >()
. dataSource (dataSource)
. sql ( "insert into pay(amount, tx_name, tx_datetime) values (:amount, :txname, :txDateTime)" )
. beanMapped ()
. build ();
}
afterPropertiesSet
이 외에 afterPropertiesSet() 메서드를 추가로 알고 있으면 좋다. 이 메서드는 InitializingBean 인터페이스에서 갖고 있으며, ItemWriter 구현체들은 모두 InitializingBean 인터페이스를 구현하고 있다.
Copy public void afterPropertiesSet() {
Assert . notNull ( this . namedParameterJdbcTemplate , "A DataSource or a NamedParameterJdbcTemplate is required." );
Assert . notNull ( this . sql , "An SQL statement is required." );
List < String > namedParameters = new ArrayList() ;
this . parameterCount = JdbcParameterUtils . countParameterPlaceholders ( this . sql , namedParameters);
if ( namedParameters . size () > 0 ) {
if ( this . parameterCount != namedParameters . size ()) {
throw new InvalidDataAccessApiUsageException("You can't use both named parameters and classic \"?\" placeholders: " + this.sql);
}
this . usingNamedParameters = true ;
}
if ( ! this . usingNamedParameters ) {
Assert.notNull(this.itemPreparedStatementSetter, "Using SQL statement with '?' placeholders requires an ItemPreparedStatementSetter");
}
}
์ด ๋ฉ์๋๋ ๊ฐ๊ฐ Writer๋ค์ด ์คํ๋๊ธฐ ์ํด ํ์ํ ํ์ ๊ฐ๋ค์ด ์ ๋๋ก ์ธํ
๋์ด ์๋์ง ํ์ธํ๋ค. Writer ์์ฑ ํ ํด๋น ๋ฉ์๋๋ฅผ ์คํํ๋ฉด ์ด๋ ๊ฐ์ด ๋๋ฝ๋์๋์ง ์ ์ ์์ด์ ๋ง์ด ์ฌ์ฉํ๋ ์ต์
์ด๋ค.
Copy @ Bean
public JdbcBatchItemWriter< Pay > jdbcBatchItemWriter() {
JdbcBatchItemWriter < Pay > jdbcBatchItemWriter = new JdbcBatchItemWriterBuilder < Pay >()
. dataSource (dataSource)
.sql("insert into pay(amount, tx_name, tx_date_time) values (:amount+1000, :txName, :txDateTime)")
. beanMapped ()
. build ();
jdbcBatchItemWriter . afterPropertiesSet ();
return jdbcBatchItemWriter;
}
HibernateItemWriter
org.springframework.batch.item.database.HibernateItemWriter
Copy public class HibernateItemWriter < T > implements ItemWriter < T > , InitializingBean {
/**
 * Delegates the items to {@code doWrite}, flushes the current Hibernate session
 * and, when {@code clearSession} is set, clears it afterwards.
 *
 * @param items the items to write
 */
@Override
public void write(List<? extends T> items) {
    doWrite(sessionFactory, items);
    sessionFactory.getCurrentSession().flush();
    if (!clearSession) {
        return;
    }
    sessionFactory.getCurrentSession().clear();
}
/**
 * Performs the actual write through Hibernate's API. Items the current session
 * does not already contain are passed to {@code saveOrUpdate}; the rest are skipped.
 * Subclasses may override this if necessary.
 *
 * @param sessionFactory Hibernate SessionFactory to be used
 * @param items the list of items to use for the write
 */
protected void doWrite(SessionFactory sessionFactory, List<? extends T> items) {
    if (logger.isDebugEnabled()) {
        logger.debug("Writing to Hibernate with " + items.size() + " items.");
    }

    Session session = sessionFactory.getCurrentSession();
    if (items.isEmpty()) {
        return;
    }

    long written = 0;
    for (T candidate : items) {
        if (session.contains(candidate)) {
            continue;
        }
        session.saveOrUpdate(candidate);
        written++;
    }

    if (logger.isDebugEnabled()) {
        logger.debug(written + " entities saved/updated.");
        logger.debug((items.size() - written) + " entities found in session.");
    }
}
HibernateItemWriter
์์ ๊ฐ ์์ดํ
์ ๋ํด session.saveOrUpdate
๋ฉ์๋๋ฅผ ํธ์ถํ๋ฉฐ, ๋ชจ๋ ์์ดํ
์ด ์ ์ฅ๋๊ฑฐ๋ ์์ ๋๋ฉด flush
๋ฉ์๋๋ฅผ ํตํด ๋ชจ๋ ๋ณ๊ฒฝ ์ฌํญ์ ํ๋ฒ์ ์คํํ๋ค.
의존성 추가
// 'implementation' keeps the JPA starter on the runtime classpath; 'compileOnly' would not.
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
프로퍼티 설정
# Sets Hibernate's current-session context class to Spring's SpringSessionContext.
spring:
  jpa:
    properties:
      hibernate:
        current_session_context_class: org.springframework.orm.hibernate5.SpringSessionContext
JPA 어노테이션 추가
Copy @ Entity // ๋งคํํ ๊ฐ์ฒด๊ฐ Entity์์ ๋ํ๋
@ Table (name = "customer" ) // Entityrใ
๋งคํ๋๋ ํ
์ด๋ธ ์ง์
public class Customer {
@ Id
@ GeneratedValue (strategy = GenerationType . IDENTITY )
private Long id; // pk
private String firstName;
private String middleInitial;
private String lastName;
private String address;
private String city;
private String state;
private String zipCode;
}
Configurer ์์ฑ
Copy @ Component
public class HibernateBatchConfigurer extends DefaultBatchConfigurer {
private DataSource dataSource;
private SessionFactory sessionFactory;
private PlatformTransactionManager transactionManager;
/**
* Datasource connection๊ณผ ํ์ด๋ฒ๋ค์ดํธ ์ธ์
์ค์
* @param dataSource
* @param entityManagerFactory
*/
public HibernateBatchConfigurer ( DataSource dataSource ,
EntityManagerFactory entityManagerFactory) {
super(dataSource);
this . dataSource = dataSource;
this . sessionFactory = entityManagerFactory . unwrap ( SessionFactory . class );
// ํ์ด๋ฒ๋ค์ดํธ ํธ๋์ญ์
์ค์
this . transactionManager = new HibernateTransactionManager( this . sessionFactory ) ;
}
@ Override
public PlatformTransactionManager getTransactionManager () {
return this . transactionManager ;
}
}
HibernateTransactionManager
๋ฅผ ํธ๋์ญ์
์ผ๋ก ์ค์ ํด์ค๋ค.
Copy @ Bean
public HibernateItemWriter< Customer > hibernateItemWriter() {
return new HibernateItemWriterBuilder < Customer >()
. sessionFactory ( entityManagerFactory . unwrap ( SessionFactory . class ))
. build ();
}
JpaItemWriter
ORM์ ์ฌ์ฉํ ๋, Writer์ ์ ๋ฌํ๋ ๋ฐ์ดํฐ๊ฐ Entity ํด๋์ค์ธ ๊ฒฝ์ฐ JpaItemWriter
๋ฅผ ์ฌ์ฉํ๋ฉด ๋๋ค. JpaItemWriter
๋ JPA๋ฅผ ์ฌ์ฉํ๊ธฐ ๋๋ฌธ์ ์์์ฑ ๊ด๋ฆฌ๋ฅผ ์ํด EntityManager
๋ฅผ ํ ๋นํด์ค์ผํ๋ค.
์ผ๋ฐ์ ์ผ๋ก spring-boot-starter-data-jpa
๋ฅผ ์์กด์ฑ์ ๋ฑ๋กํ๋ฉด EntityManager
๊ฐ Bean์ผ๋ก ์๋ ์์ฑ๋์ด DI์ฝ๋๋ง ์ถ๊ฐํ๋ฉด ๋๋ค.
// 'implementation' keeps the JPA starter on the runtime classpath; 'compileOnly' would not.
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
afterPropertiesSet
Copy public void afterPropertiesSet() throws Exception {
Assert . notNull ( this . entityManagerFactory , "An EntityManagerFactory is required" );
}
JpaItemWriter
์ afterPropertiesSet()
์์๋ EntityManagerFactory
๋ง ํ์ ๊ฐ์ผ๋ก ํ์ธํ๊ณ ์์ด ์ฒดํฌํ ์์๊ฐ ์ ๋ค. ์ฆ, setEntityManger
๋ง ํด์ฃผ๋ฉด ๋ชจ๋ ์ค์ ์ด ๋๋๋ค.
Copy @ Bean
public JpaItemWriter< Pay > jpaCursorItemWriter() {
JpaItemWriter < Pay > jpaItemWriter = new JpaItemWriter <>();
jpaItemWriter . setEntityManagerFactory (entityManagerFactory);
return jpaItemWriter;
}
write()
Copy public void write( List<? extends T > items) {
EntityManager entityManager = EntityManagerFactoryUtils.getTransactionalEntityManager(this.entityManagerFactory);
if (entityManager == null ) {
throw new DataAccessResourceFailureException( "Unable to obtain a transactional EntityManager" ) ;
} else {
this . doWrite (entityManager , items);
entityManager . flush ();
}
}
/**
 * Adds every item the persistence context does not already contain to it, using
 * {@code persist} when {@code usePersist} is set and {@code merge} otherwise.
 *
 * @param entityManager the EntityManager to write through
 * @param items the items to write; an empty list only produces the first debug message
 */
protected void doWrite(EntityManager entityManager, List<? extends T> items) {
    if (logger.isDebugEnabled()) {
        logger.debug("Writing to JPA with " + items.size() + " items.");
    }

    if (!items.isEmpty()) {
        long addedToContextCount = 0L;
        // Typed for-each instead of a raw Iterator, whose next() cannot be assigned to T.
        for (T item : items) {
            if (!entityManager.contains(item)) {
                if (this.usePersist) {
                    entityManager.persist(item);
                } else {
                    entityManager.merge(item);
                }
                ++addedToContextCount;
            }
        }

        if (logger.isDebugEnabled()) {
            logger.debug(addedToContextCount + " entities " + (this.usePersist ? " persisted." : "merged."));
            logger.debug((long) items.size() - addedToContextCount + " entities found in persistence context.");
        }
    }
}
JpaItemWriter
์ doWrite()
๋ฅผ ๋ณด๋ฉด ๋์ด์จ item ๊ทธ๋๋ก entityManager.merge(item)
๋ฅผ ์ํํ์ฌ ํ
์ด๋ธ์ ๋ฐ๋ก ๋ฐ์ํ๊ธฐ ๋๋ฌธ์, JpaItemWriter
๋ Entity ํด๋์ค๋ฅผ ์ ๋ค๋ฆญ ํ์
์ผ๋ก ๋ฐ์์ผ๋ง ํ๋ค .
MyBatisBatchItemWriter
Step์์ ์ ์ํ Chunk Size(FetchSize)๋งํผ ์ฒ๋ฆฌํด์ฃผ๋ ค๋ฉด executorType์ BATCH ๋ก ์ค์ ํด์ค์ผํ๋ค.
# MyBatis settings; executorType BATCH selects the batch executor.
mybatis:
  config-location: classpath:mybatis/mybatis-config.xml
  mapper-locations: classpath*:mybatis/**/*.sql
  executorType: BATCH
๊ทธ ๋ค์ ์ํํ ์ฟผ๋ฆฌ๋ฅผ mapper์ ์์ฑํด์ฃผ๊ณ ์ํํ๋ฉด ๋๋ค.
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="spring.batch.practice.dao.customerMapper">
    <!-- Inserts one CUSTOMER row from a Customer object; declared as <insert> because the statement modifies data. -->
    <insert id="insertCustomer" parameterType="spring.batch.practice.domain.Customer">
        INSERT INTO CUSTOMER(FIRST_NAME, MIDDLE_INITIAL, LAST_NAME, ADDRESS, CITY, STATE, ZIP_CODE)
        VALUES (#{firstName}, #{middleInitial}, #{lastName}, #{address}, #{city}, #{state}, #{zipCode})
    </insert>
</mapper>
Copy @ Bean
public MyBatisBatchItemWriter< Customer > testWriter( SqlSessionFactory sqlSessionFactory) {
return new MyBatisBatchItemWriterBuilder < Customer >()
. sqlSessionFactory (sqlSessionFactory)
. statementId ( "spring.batch.practice.dao.customerMapper.insertCustomer" )
. build ();
}
RepositoryItemWriter
์ฐ๊ธฐ ์์
์ํ์์๋ ํ์ด์ง์ด๋ ์ ๋ ฌ์ด ํ์์์ผ๋ฏ๋ก, CrudRepository
๋ฅผ ์ฌ์ฉํ๋ค.
Copy public interface CustomerRepository extends CrudRepository < Customer , Long > {
}
Copy @ Bean
public RepositoryItemWriter< Customer > repositoryItemWriter() {
return new RepositoryItemWriterBuilder < Customer >()
. repository (customerRepository)
. methodName ( "save" )
. build ();
}
์์์ ๊ตฌํํ repository๋ฅผ ์ค์ ํด์ฃผ๊ณ , ํธ์ถํ ๋ฉ์๋๋ช
๋ง ์ง์ ํด์ฃผ๋ฉด๋๋ค.
Custom ItemWriter
Reader์๋ ๋ค๋ฅด๊ฒ Writer์ ๊ฒฝ์ฐ customํ๊ฒ ๊ตฌํํด์ผํ๋ ๊ฒฝ์ฐ๊ฐ ๋ง๋ค.
Reader์์ ์ฝ์ด์จ ๋ฐ์ดํฐ๋ฅผ RestTemplate์ผ๋ก ์ธ๋ถ API๋ฅผ ์ ๋ฌํด์ผํ๋ ๊ฒฝ์ฐ
์์ ์ ์ฅ์ ํ๊ณ ๋น๊ตํ๊ธฐ ์ํด singleton ๊ฐ์ฒด์ ๊ฐ์ ๋ฃ์ด์ผํ๋ ๊ฒฝ์ฐ
์ฌ๋ฌ Entity๋ฅผ ๋์์ ์ ์ฅํด์ผํ๋ ๊ฒฝ์ฐ
๋ค์๊ณผ ๊ฐ์ด ์ฌ๋ฌ ์ํฉ์ด ์์ ์ ์๋ค. ์ด๋ฌํ ๊ฒฝ์ฐ ItemWriter
์ธํฐํ์ด์ค๋ฅผ ์ง์ ๊ตฌํํ๋ฉด ๋๋ค.
java7 이하
Copy @ Bean
public ItemWriter< Pay > customItemWriter() {
return new ItemWriter < Pay >() {
@ Override
public void write ( List < ? extends Pay > items) throws Exception {
for ( Pay item : items) {
System . out . println (item);
}
}
};
}
java8 ์ด์(ItemWriter์ ์ถ์๋ฉ์๋๊ฐ write()
ํ๊ฐ ์ด๋ฏ๋ก ๋๋ค์ ์ฌ์ฉ ๊ฐ๋ฅ)
Copy @ Bean
public ItemWriter< Pay > customItemWriter() {
return items -> {
for ( Pay item : items) {
System . out . println (item);
}
};
}
다음과 같이 write() 함수를 @Override 하면 구현체 생성은 끝난다.
참고