ItemPorcessor는 데이터를 가공하거나 필터링하는 역할을 하며, 필수가 아니다. 이 역할은 ItemWriter에서도 구현이 가능하지만, 분리함으로써 비즈니스 코드가 섞이는 것을 방지할 수 있다.
public interface ItemProcessor<I, O> {
@Nullable
O process(@NonNull I var1) throws Exception;
}
I는 ItemReader에서 받을 데이터 타입이며, O는 ItemWriter에 보낼 데이터 타입이다. 즉, Reader에서 읽은 데이터가 ItemProcessor의 process()를 통과한 후 Writer에 전달된다. 구현해야할 메소드는 process하나이며, Java 8부터는 인터페이스의 추상 메서드가 1개인 경우 람다식을 사용할 수 있다.
위 예제와 같이 고유한 메세지를 지정할 수 있으며, 필드 값의 길이가 잘못됐는지 형식이 잘못됐는지 식별할 수 있다.
@Bean
public Step validationDelimitedFileStep() {
return this.stepBuilderFactory.get("validationDelimitedFileStep")
.<Customer, Customer>chunk(10)
.reader(validationDelimitedCustomerItemReader(null))
.processor(validationCustomerProcessor()) // processor
.writer(validationDelimitedCustomerItemWriter())
.build();
}
/**
* BeanValidationItemProcessor 설정
* @return
*/
@Bean
public BeanValidatingItemProcessor<Customer> validationCustomerProcessor() {
return new BeanValidatingItemProcessor<>();
}
Field error in object 'item' on field 'middleInitial': rejected value [YS]; codes [Size.item.middleInitial,Size.middleInitial,Size.java.lang.String,Size]; arguments [org.springframework.context.support.DefaultMessageSourceResolvable: codes [item.middleInitial,middleInitial]; arguments []; default message [middleInitial],1,1]; default message [크기가 1에서 1 사이여야 합니다]
Field error in object 'item' on field 'middleInitial': rejected value [YS]; codes [Pattern.item.middleInitial,Pattern.middleInitial,Pattern.java.lang.String,Pattern]; arguments [org.springframework.context.support.DefaultMessageSourceResolvable: codes [item.middleInitial,middleInitial]; arguments []; default message [middleInitial],[Ljavax.validation.constraints.Pattern$Flag;@5c7a06ec,[a-zA-Z]]; default message [middleInitial는 반드시 영어여야합니다.]
다음과 같이 지정한 validation에 맞지 않으면 예외가 발생하는 것을 볼 수 있다.
ValidatingItemProcessor
데이터셋 내에서 한개의 필드의 값이 고유해야하는 경우가 있을 수 있다. 고유한 값의 필드를 ItemStream 인터페이스를 구현하여, 각 커밋과 필드 값을 ExecutionContext에 저장해 상태를 유지할 수 있다.
/**
* JobExecution 간의 상태를 저장하기 위해 ItemStreamSupport 상속
*/
public class UniqueLastNameValidator extends ItemStreamSupport implements Validator<Customer> {
private Set<String> lastNames = new HashSet<>();
@Override
public void validate(Customer value) throws ValidationException {
if (lastNames.contains(value.getLastName())) {
throw new ValidationException(value.getLastName() + " lastName이 중복됩니다.");
}
this.lastNames.add(value.getLastName());
}
@Override
public void open(ExecutionContext executionContext) {
String lastNames = getExecutionContextKey("lastNames");
// lastNames가 Execution에 저장되어있는지 확인 후 저장되어있다면, 스텝 처리 이전에 해당값으로 원복
if (executionContext.containsKey(lastNames)) {
this.lastNames = (Set<String>) executionContext.get(lastNames);
}
}
/**
* 청크 단위로 수행되는데, 오류가 발생할 경우 현재 상태를 ExecutionContext에 저장
* @param executionContext
*/
@Override
public void update(ExecutionContext executionContext) {
Iterator<String> itr = lastNames.iterator();
Set<String> copiedLastNames = new HashSet<>();
while (itr.hasNext()) {
copiedLastNames.add(itr.next());
}
executionContext.put(getExecutionContextKey("lastNames"), copiedLastNames);
}
}
Validator를 구현한 후 Step을 다음과 같이 구현하면 된다.
@Bean
public Step validationDelimitedFileStep() {
return this.stepBuilderFactory.get("validationDelimitedFileStep")
.<Customer, Customer>chunk(10)
.reader(validationDelimitedCustomerItemReader(null))
.processor(customerValidatingItemProcessor()) // 프로세서
.writer(validationDelimitedCustomerItemWriter())
.stream(uniqueLastNameValidator()) // stream 설정
.build();
}
@Bean
public ValidatingItemProcessor<Customer> customerValidatingItemProcessor() {
return new ValidatingItemProcessor<>(uniqueLastNameValidator());
}
@Bean
public UniqueLastNameValidator uniqueLastNameValidator() {
UniqueLastNameValidator uniqueLastNameValidator = new UniqueLastNameValidator();
uniqueLastNameValidator.setName("uniqueLastNameValidator");
return uniqueLastNameValidator;
}
하지만, 여기서 제네릭 타입은 사용하지 못하며, 만약 제네릭타입을 사용하게 되면 delegates에 포함된 ItemProcessor는 모두 같은 제네릭 타입을 가져야한다. 만약 같은 제네릭 타입을 사용할 수 있는 ItemProcessor간 체이닝이라면 제네릭을 선언하는 것이 더 안전한 코드가 될 수 있다.