Reader์ Processor๋ฅผ ๊ฑฐ์ณ ์ฒ๋ฆฌ๋ Item์ Chunk ๋จ์๋งํผ ์์ ํ ์ด๋ฅผ Writer์ ์ ๋ฌํ๊ณ , ItemWriter
๋ ๋ฐฐ์น์ ์ถ๋ ฅ์ ๋ด๋นํ๋ค.
/**
 * Output side of a chunk-oriented step: receives the items that were read and
 * processed for one chunk and writes them out.
 *
 * @param <T> type of the items handed to the writer
 */
public interface ItemWriter<T> {

    /**
     * Writes one chunk of items.
     *
     * @param items the items of the current chunk
     * @throws Exception if the chunk cannot be written
     */
    void write(List<? extends T> items) throws Exception;
}
ItemWriter
์ write()
๋ ์ธ์๋ก Item List๋ฅผ ๋ฐ๋ ๊ฒ์ ๋ณผ ์ ์๋ค. Spring Batch์์๋ ๋ค์ํ Output ํ์
์ ์ฒ๋ฆฌํ ์ ์๋๋ก Writer๋ฅผ ์ ๊ณตํ๊ณ ์๋ค.
Database Writer
๋ค์ 3๊ฐ์ง Writer๊ฐ ์์ผ๋ฉฐ, Database์ ์์์ฑ๊ณผ ๊ด๋ จํด์๋ ํญ์ Flush๋ฅผ ํด์ค์ผํ๋ค. Writer๊ฐ ๋ฐ์ ๋ชจ๋ Item์ด ์ฒ๋ฆฌ ๋ ํ์ Spring Batch๋ ํ์ฌ ํธ๋์ญ์
์ ์ปค๋ฐํ๋ค.
JdbcBatchItemWriter
ORM์ ์ฌ์ฉํ์ง ์๋ ๊ฒฝ์ฐ์๋ ๋๋ถ๋ถ JdbcBatchItemWriter
๋ฅผ ์ฌ์ฉํ๋ค.
JdbcBatchItemWriter
๋ JdbcTemplate
์ ์ฌ์ฉํ๋ฉฐ, JDBC์ Batch ๊ธฐ๋ฅ์ ์ฌ์ฉํด ํ๋ฒ์ DB๋ก ์ ๋ฌํ์ฌ DB๋ด๋ถ์์ ์ฟผ๋ฆฌ๊ฐ ์คํ ๋๋๋ก ํ๋ค. ์ดํ๋ฆฌ์ผ์ด์
๊ณผ ๋ฐ์ดํฐ๋ฒ ์ด์ค ๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ์ฃผ๊ณ ๋ฐ๋ ํ์๋ฅผ ์ต์ํํ์ฌ ์ฑ๋ฅํฅ์์ ํ ์ ์๋๋ก ํ๊ธฐ ์ํด์๋ค.
Copy public void write(final List<? extends T> items) throws Exception {
if (!items.isEmpty()) {
if (logger.isDebugEnabled()) {
logger.debug("Executing batch with " + items.size() + " items.");
}
int[] updateCounts;
int value;
if (!this.usingNamedParameters) {
updateCounts = (int[])this.namedParameterJdbcTemplate.getJdbcOperations().execute(this.sql, new PreparedStatementCallback<int[]>() {
public int[] doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
Iterator var2 = items.iterator();
while(var2.hasNext()) {
T item = var2.next();
JdbcBatchItemWriter.this.itemPreparedStatementSetter.setValues(item, ps);
ps.addBatch();
}
return ps.executeBatch();
}
});
} else if (items.get(0) instanceof Map && this.itemSqlParameterSourceProvider == null) {
updateCounts = this.namedParameterJdbcTemplate.batchUpdate(this.sql, (Map[])items.toArray(new Map[items.size()]));
} else {
SqlParameterSource[] batchArgs = new SqlParameterSource[items.size()];
value = 0;
Object item;
for(Iterator var5 = items.iterator(); var5.hasNext(); batchArgs[value++] = this.itemSqlParameterSourceProvider.createSqlParameterSource(item)) {
item = var5.next();
}
updateCounts = this.namedParameterJdbcTemplate.batchUpdate(this.sql, batchArgs);
}
if (this.assertUpdates) {
for(int i = 0; i < updateCounts.length; ++i) {
value = updateCounts[i];
if (value == 0) {
throw new EmptyResultDataAccessException("Item " + i + " of " + updateCounts.length + " did not update any rows: [" + items.get(i) + "]", 1);
}
}
}
}
}
์ด๋ write()
๋ฉ์๋๋ฅผ ๋ณด๋ฉด SQL๋ฌธ์ ํ๋ฒ์ฉ ํธ์ถํ๋ ๊ฒ์ด ์๋ batchUpdate
๋ก ๋ฐ์ดํฐ๋ฅผ ์ฒญํฌ ๋จ์๋ก ์ผ๊ด์ฒ๋ฆฌ ํ๋ ๊ฒ์ ๋ณผ ์ ์๋ค. ์ด๋ ๊ฒ ํ๋ฉด ์คํ ์ฑ๋ฅ์ ํฌ๊ฒ ํฅ์ ์ํฌ ์ ์์ผ๋ฉฐ, ๋ฐ์ดํฐ ๋ณ๊ฒฝ ์คํ์ ํธ๋์ญ์
๋ด์์ ํ ์ ์๋ค.
Property
Parameter Type
Default
설명
true์ด๋ฉด ๋ชจ๋ ์์ดํ
์ด ์ฝ์
์ด๋ ์์ ๋์๋์ง ๊ฒ์ฆํ๋ค.
์ฆ, ์ ์ด๋ ํ๋์ ํญ๋ชฉ์ด ํ์ ์
๋ฐ์ดํธ ํ๊ฑฐ๋ ์ญ์ ํ์ง ์์ ๊ฒฝ์ฐ ์์ธ(EmptyResultDataAccessException
)๋ฅผ throwํ ์ง ์ค์ ํ๋ค.
필요한 데이터베이스에 대한 접근 제공
각 아이템당 수행할 SQL
itemPreparedStatementSetter
ItemPreparedStatementSetter
ํ์ค PreparedState๊ฐ ์ ๊ณต๋๋ค๋ฉด(ํ๋ผ๋ฏธํฐ ์์น์ ?์ฌ์ฉ), ์ด ํด๋์ค๋ฅผ ์ฌ์ฉํด ํ๋ผ๋ฏธํฐ ๊ฐ์ ์ฑ์
itemSqlParameterSourceProvider
ItemSqlParameterSourceProvider
์ ๊ณต๋ SQL์ ๋ค์๋ ํ๋ผ๋ฏธํฐ๊ฐ ์ฌ์ฉ๋๋ค๋ฉด, ์ด ํด๋์ค๋ฅผ ์ฌ์ฉํด ํ๋ผ๋ฏธํฐ ๊ฐ ์ฑ์
SimpleJdbcOperations 인터페이스의 구현체를 주입 가능
๊ฐ๊ฐ Writer๋ค์ด ์คํ๋๊ธฐ ์ํด ํ์ํ ํ์ ๊ฐ๋ค์ด ์ ๋๋ก ์ธํ
๋์ด ์๋์ง ํ์ธ
JdbcBatchItemWriterBuilder
JdbcBatchItemWriterBuilder๋ ๋ค์ 3๊ฐ์ง ์ค์ ๊ฐ์ ๊ฐ๊ณ ์๋ค.
Property
Parameter Type
Default
설명
true์ด๋ฉด ๋ชจ๋ ์์ดํ
์ด ์ฝ์
์ด๋ ์์ ๋์๋์ง ๊ฒ์ฆํ๋ค.
์ฆ, ์ ์ด๋ ํ๋์ ํญ๋ชฉ์ด ํ์ ์
๋ฐ์ดํธ ํ๊ฑฐ๋ ์ญ์ ํ์ง ์์ ๊ฒฝ์ฐ ์์ธ(EmptyResultDataAccessException
)๋ฅผ throwํ ์ง ์ค์ ํ๋ค.
Key, Value ๊ธฐ๋ฐ์ผ๋ก Insert SQL์ Values๋ฅผ ๋งคํํ๋ค.
POJO ๊ธฐ๋ฐ์ผ๋ก Insert SQL์ Values๋ฅผ ๋งคํํ๋ค.
columnMapped
Copy @Bean // beanMapped์ ํ์
public JdbcBatchItemWriter<Pay> jdbcBatchItemWriter(){
return new JdbcBatchItemWriterBuilder<Map<String, Object>>() // Map ์ฌ์ฉ
.columnMapped()
.dataSource(this.dataSource)
.sql("insert into pay2(amount, tx_name, tx_date_time) values (:amount, :txName, :txDateTime)")
.build();
}
beanMapped
Copy @Bean // beanMapped์ ํ์
public JdbcBatchItemWriter<Pay> jdbcBatchItemWriter(){
return new JdbcBatchItemWriterBuilder<Pay>()
.dataSource(dataSource)
.sql("insert into pay(amount, tx_name, tx_datetime) values (:amount, :txname, :txDateTime)")
.beanMapped()
.build();
}
afterPropertiesSet
이 외에 afterPropertiesSet() 메서드를 추가로 알고 있으면 좋다. 이 메서드는 InitializingBean 인터페이스에서 갖고 있으며, ItemWriter 구현체들은 모두 InitializingBean 인터페이스를 구현하고 있다.
Copy public void afterPropertiesSet() {
Assert.notNull(this.namedParameterJdbcTemplate, "A DataSource or a NamedParameterJdbcTemplate is required.");
Assert.notNull(this.sql, "An SQL statement is required.");
List<String> namedParameters = new ArrayList();
this.parameterCount = JdbcParameterUtils.countParameterPlaceholders(this.sql, namedParameters);
if (namedParameters.size() > 0) {
if (this.parameterCount != namedParameters.size()) {
throw new InvalidDataAccessApiUsageException("You can't use both named parameters and classic \"?\" placeholders: " + this.sql);
}
this.usingNamedParameters = true;
}
if (!this.usingNamedParameters) {
Assert.notNull(this.itemPreparedStatementSetter, "Using SQL statement with '?' placeholders requires an ItemPreparedStatementSetter");
}
}
์ด ๋ฉ์๋๋ ๊ฐ๊ฐ Writer๋ค์ด ์คํ๋๊ธฐ ์ํด ํ์ํ ํ์ ๊ฐ๋ค์ด ์ ๋๋ก ์ธํ
๋์ด ์๋์ง ํ์ธํ๋ค. Writer ์์ฑ ํ ํด๋น ๋ฉ์๋๋ฅผ ์คํํ๋ฉด ์ด๋ ๊ฐ์ด ๋๋ฝ๋์๋์ง ์ ์ ์์ด์ ๋ง์ด ์ฌ์ฉํ๋ ์ต์
์ด๋ค.
Copy @Bean
public JdbcBatchItemWriter<Pay> jdbcBatchItemWriter(){
JdbcBatchItemWriter<Pay> jdbcBatchItemWriter = new JdbcBatchItemWriterBuilder<Pay>()
.dataSource(dataSource)
.sql("insert into pay(amount, tx_name, tx_date_time) values (:amount+1000, :txName, :txDateTime)")
.beanMapped()
.build();
jdbcBatchItemWriter.afterPropertiesSet();
return jdbcBatchItemWriter;
}
HibernateItemWriter
org.springframework.batch.item.database.HibernateItemWriter
Copy public class HibernateItemWriter<T> implements ItemWriter<T>, InitializingBean {
@Override
public void write(List<? extends T> items) {
doWrite(sessionFactory, items);
sessionFactory.getCurrentSession().flush();
if(clearSession) {
sessionFactory.getCurrentSession().clear();
}
}
/**
* Do perform the actual write operation using Hibernate's API.
* This can be overridden in a subclass if necessary.
*
* @param sessionFactory Hibernate SessionFactory to be used
* @param items the list of items to use for the write
*/
protected void doWrite(SessionFactory sessionFactory, List<? extends T> items) {
if (logger.isDebugEnabled()) {
logger.debug("Writing to Hibernate with " + items.size()
+ " items.");
}
Session currentSession = sessionFactory.getCurrentSession();
if (!items.isEmpty()) {
long saveOrUpdateCount = 0;
for (T item : items) {
if (!currentSession.contains(item)) {
currentSession.saveOrUpdate(item);
saveOrUpdateCount++;
}
}
if (logger.isDebugEnabled()) {
logger.debug(saveOrUpdateCount + " entities saved/updated.");
logger.debug((items.size() - saveOrUpdateCount)
+ " entities found in session.");
}
}
}
HibernateItemWriter
์์ ๊ฐ ์์ดํ
์ ๋ํด session.saveOrUpdate
๋ฉ์๋๋ฅผ ํธ์ถํ๋ฉฐ, ๋ชจ๋ ์์ดํ
์ด ์ ์ฅ๋๊ฑฐ๋ ์์ ๋๋ฉด flush
๋ฉ์๋๋ฅผ ํตํด ๋ชจ๋ ๋ณ๊ฒฝ ์ฌํญ์ ํ๋ฒ์ ์คํํ๋ค.
의존성 추가
// JPA/Hibernate classes are needed at runtime too, so this must not be compileOnly.
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
프로퍼티 설정
# Makes Hibernate's getCurrentSession() resolve through Spring's session context.
spring:
  jpa:
    properties:
      hibernate:
        current_session_context_class: org.springframework.orm.hibernate5.SpringSessionContext
JPA 어노테이션 추가
Copy @Entity // ๋งคํํ ๊ฐ์ฒด๊ฐ Entity์์ ๋ํ๋
@Table(name = "customer") // Entityrใ
๋งคํ๋๋ ํ
์ด๋ธ ์ง์
public class Customer {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id; // pk
private String firstName;
private String middleInitial;
private String lastName;
private String address;
private String city;
private String state;
private String zipCode;
}
Configurer 생성
Copy @Component
public class HibernateBatchConfigurer extends DefaultBatchConfigurer {
private DataSource dataSource;
private SessionFactory sessionFactory;
private PlatformTransactionManager transactionManager;
/**
* Datasource connection๊ณผ ํ์ด๋ฒ๋ค์ดํธ ์ธ์
์ค์
* @param dataSource
* @param entityManagerFactory
*/
public HibernateBatchConfigurer(DataSource dataSource,
EntityManagerFactory entityManagerFactory) {
super(dataSource);
this.dataSource = dataSource;
this.sessionFactory = entityManagerFactory.unwrap(SessionFactory.class);
// ํ์ด๋ฒ๋ค์ดํธ ํธ๋์ญ์
์ค์
this.transactionManager = new HibernateTransactionManager(this.sessionFactory);
}
@Override
public PlatformTransactionManager getTransactionManager() {
return this.transactionManager;
}
}
HibernateTransactionManager
๋ฅผ ํธ๋์ญ์
์ผ๋ก ์ค์ ํด์ค๋ค.
Copy @Bean
public HibernateItemWriter<Customer> hibernateItemWriter() {
return new HibernateItemWriterBuilder<Customer>()
.sessionFactory(entityManagerFactory.unwrap(SessionFactory.class))
.build();
}
JpaItemWriter
ORM์ ์ฌ์ฉํ ๋, Writer์ ์ ๋ฌํ๋ ๋ฐ์ดํฐ๊ฐ Entity ํด๋์ค์ธ ๊ฒฝ์ฐ JpaItemWriter
๋ฅผ ์ฌ์ฉํ๋ฉด ๋๋ค. JpaItemWriter
๋ JPA๋ฅผ ์ฌ์ฉํ๊ธฐ ๋๋ฌธ์ ์์์ฑ ๊ด๋ฆฌ๋ฅผ ์ํด EntityManager
๋ฅผ ํ ๋นํด์ค์ผํ๋ค.
์ผ๋ฐ์ ์ผ๋ก spring-boot-starter-data-jpa
๋ฅผ ์์กด์ฑ์ ๋ฑ๋กํ๋ฉด EntityManager
๊ฐ Bean์ผ๋ก ์๋ ์์ฑ๋์ด DI์ฝ๋๋ง ์ถ๊ฐํ๋ฉด ๋๋ค.
// JPA/Hibernate classes are needed at runtime too, so this must not be compileOnly.
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
afterPropertiesSet
Copy public void afterPropertiesSet() throws Exception {
Assert.notNull(this.entityManagerFactory, "An EntityManagerFactory is required");
}
JpaItemWriter
์ afterPropertiesSet()
์์๋ EntityManagerFactory
๋ง ํ์ ๊ฐ์ผ๋ก ํ์ธํ๊ณ ์์ด ์ฒดํฌํ ์์๊ฐ ์ ๋ค. ์ฆ, setEntityManger
๋ง ํด์ฃผ๋ฉด ๋ชจ๋ ์ค์ ์ด ๋๋๋ค.
Copy @Bean
public JpaItemWriter<Pay> jpaCursorItemWriter() {
JpaItemWriter<Pay> jpaItemWriter = new JpaItemWriter<>();
jpaItemWriter.setEntityManagerFactory(entityManagerFactory);
return jpaItemWriter;
}
write()
Copy public void write(List<? extends T> items) {
EntityManager entityManager = EntityManagerFactoryUtils.getTransactionalEntityManager(this.entityManagerFactory);
if (entityManager == null) {
throw new DataAccessResourceFailureException("Unable to obtain a transactional EntityManager");
} else {
this.doWrite(entityManager, items);
entityManager.flush();
}
}
/**
 * Adds every item that is not yet contained in the persistence context,
 * through {@code persist} when {@code usePersist} is set and {@code merge}
 * otherwise.
 *
 * @param entityManager the EntityManager to write through
 * @param items the items of the current chunk
 */
protected void doWrite(EntityManager entityManager, List<? extends T> items) {
    if (logger.isDebugEnabled()) {
        logger.debug("Writing to JPA with " + items.size() + " items.");
    }
    if (items.isEmpty()) {
        return;
    }

    long addedToContextCount = 0L;
    for (T item : items) {
        // Items the persistence context already contains are skipped.
        if (!entityManager.contains(item)) {
            if (this.usePersist) {
                entityManager.persist(item);
            } else {
                entityManager.merge(item);
            }
            ++addedToContextCount;
        }
    }

    if (logger.isDebugEnabled()) {
        logger.debug(addedToContextCount + " entities " + (this.usePersist ? " persisted." : "merged."));
        logger.debug((long)items.size() - addedToContextCount + " entities found in persistence context.");
    }
}
JpaItemWriter
์ doWrite()
๋ฅผ ๋ณด๋ฉด ๋์ด์จ item ๊ทธ๋๋ก entityManager.merge(item)
๋ฅผ ์ํํ์ฌ ํ
์ด๋ธ์ ๋ฐ๋ก ๋ฐ์ํ๊ธฐ ๋๋ฌธ์, JpaItemWriter
๋ Entity ํด๋์ค๋ฅผ ์ ๋ค๋ฆญ ํ์
์ผ๋ก ๋ฐ์์ผ๋ง ํ๋ค .
MyBatisBatchItemWriter
Step์์ ์ ์ํ Chunk Size(FetchSize)๋งํผ ์ฒ๋ฆฌํด์ฃผ๋ ค๋ฉด executorType์ BATCH ๋ก ์ค์ ํด์ค์ผํ๋ค.
# BATCH executor: statements are queued and sent to the database together.
mybatis:
  config-location: classpath:mybatis/mybatis-config.xml
  mapper-locations: classpath*:mybatis/**/*.sql
  executorType: BATCH
๊ทธ ๋ค์ ์ํํ ์ฟผ๋ฆฌ๋ฅผ mapper์ ์์ฑํด์ฃผ๊ณ ์ํํ๋ฉด ๋๋ค.
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="spring.batch.practice.dao.customerMapper">
    <!-- Data-modifying statement: declared as <insert>, not <select>. -->
    <insert id="insertCustomer" parameterType="spring.batch.practice.domain.Customer">
        INSERT INTO CUSTOMER(FIRST_NAME, MIDDLE_INITIAL, LAST_NAME, ADDRESS, CITY, STATE, ZIP_CODE)
        VALUES (#{firstName}, #{middleInitial}, #{lastName}, #{address}, #{city}, #{state}, #{zipCode})
    </insert>
</mapper>
Copy @Bean
public MyBatisBatchItemWriter<Customer> testWriter(SqlSessionFactory sqlSessionFactory) {
return new MyBatisBatchItemWriterBuilder<Customer>()
.sqlSessionFactory(sqlSessionFactory)
.statementId("spring.batch.practice.dao.customerMapper.insertCustomer")
.build();
}
RepositoryItemWriter
์ฐ๊ธฐ ์์
์ํ์์๋ ํ์ด์ง์ด๋ ์ ๋ ฌ์ด ํ์์์ผ๋ฏ๋ก, CrudRepository
๋ฅผ ์ฌ์ฉํ๋ค.
Copy public interface CustomerRepository extends CrudRepository<Customer, Long> {
}
Copy @Bean
public RepositoryItemWriter<Customer> repositoryItemWriter() {
return new RepositoryItemWriterBuilder<Customer>()
.repository(customerRepository)
.methodName("save")
.build();
}
์์์ ๊ตฌํํ repository๋ฅผ ์ค์ ํด์ฃผ๊ณ , ํธ์ถํ ๋ฉ์๋๋ช
๋ง ์ง์ ํด์ฃผ๋ฉด๋๋ค.
Custom ItemWriter
Reader์๋ ๋ค๋ฅด๊ฒ Writer์ ๊ฒฝ์ฐ customํ๊ฒ ๊ตฌํํด์ผํ๋ ๊ฒฝ์ฐ๊ฐ ๋ง๋ค.
Reader์์ ์ฝ์ด์จ ๋ฐ์ดํฐ๋ฅผ RestTemplate์ผ๋ก ์ธ๋ถ API๋ฅผ ์ ๋ฌํด์ผํ๋ ๊ฒฝ์ฐ
์์ ์ ์ฅ์ ํ๊ณ ๋น๊ตํ๊ธฐ ์ํด singleton ๊ฐ์ฒด์ ๊ฐ์ ๋ฃ์ด์ผํ๋ ๊ฒฝ์ฐ
์ฌ๋ฌ Entity๋ฅผ ๋์์ ์ ์ฅํด์ผํ๋ ๊ฒฝ์ฐ
๋ค์๊ณผ ๊ฐ์ด ์ฌ๋ฌ ์ํฉ์ด ์์ ์ ์๋ค. ์ด๋ฌํ ๊ฒฝ์ฐ ItemWriter
์ธํฐํ์ด์ค๋ฅผ ์ง์ ๊ตฌํํ๋ฉด ๋๋ค.
java7 이하
Copy @Bean
public ItemWriter<Pay> customItemWriter() {
return new ItemWriter<Pay>() {
@Override
public void write(List<? extends Pay> items) throws Exception {
for (Pay item : items) {
System.out.println(item);
}
}
};
}
java8 ์ด์(ItemWriter์ ์ถ์๋ฉ์๋๊ฐ write()
ํ๊ฐ ์ด๋ฏ๋ก ๋๋ค์ ์ฌ์ฉ ๊ฐ๋ฅ)
Copy @Bean
public ItemWriter<Pay> customItemWriter() {
return items -> {
for (Pay item : items) {
System.out.println(item);
}
};
}
다음과 같이 write() 함수를 @Override 하면 구현체 생성은 끝난다.
참고