Stream
Last updated
Last updated
Stream은 Java8부터 추가된 Collection의 저장 요소를 하나씩 참조해서 Lambda식으로 처리할 수 있도록 해주는 반복자이다.
Stream은 Iterator와 비슷한 역할을 하는 반복자 이지만 다음과 같은 특징을 가진다.
람다식으로 요소 처리 코드를 제공한다.
내부 반복자를 사용해 병렬 처리가 쉽다.
외부 반복자(external iterator)는 개발자가 코드로 직접 Collection의 요소를 반복해서 가져오는 패턴을 말한다. for문, Iterator를 이용하는 whlie문 모두 외부 반복자를 이용하는 것이다.
내부 반복자(internal iterator)는 Collection 내부에서 요소들을 반복시키고, 개발자는 요소당 처리해야할 코드만 제공하는 패턴을 말한다. 내부 반복자를 사용해 개발자는 요소 처리 코드에만 집중할 수 있으며, 내부 반복자는 병렬 작업을 할 수 있도록 도와주기 때문에 효율적으로 요소를 반복시킬 수 있다. 스트림을 이용하면 코드도 간결해지지만, 요소의 병렬 처리가 Collecton 내부에서 처리되는것이 더 좋은 효과를 가져온다.
다음과 같이 병렬처리 되는 것을 확인할 수 있다.
스트림은 중간 처리와 최종 처리를 할 수 있다.
중간 처리 : 매핑, 필터링, 정렬
최종 처리 : 반복, 카운팅, 평균, 총합 등의 집계 처리
BaseStream
인터페이스에는 모든 스트림에서 사용할 수 있는 공통 메소드들이 정의 되어있으며, 직접 사용되지는 않는다.
Stream<T>
java.util.Collection.stream() java.util.Collection.parallelStream()
Collection
Stream<T> IntStream LongStream DoubleStream
Arrays.stream(T[]) Stream.of(T[]) Arrays.stream(int[]) IntStream.of(int[]) Arrays.stream(long[]) LongStream.of(long[]) Arrays.stream(double[]) DoubleStream.of(double[])
배열
IntStream
IntStream.range(int, int) IntStream.rangeClosed(int, int)
int 범위
LongStream
LongStream.range(long, long) LongStream.rangeClosed(long, long)
long 범위
Stream<Path>
Files.find(Path, int, BiPredicate, FileVisitOption) Files.list(Path)
디렉토리
Stream<String>
Files.lines(Path, Charset) Files.list(Path)
파일
DoubleStream IntStream LongStream
Random.doubles() Random.ints() Random.longs()
랜덤수
rangeClosed는 두번째 인자를 포함하며, range는 두번째 인자를 포함하지 않는다.
대량의 데이터를 가공해 축소하는 것을 일반적으로 Reduction
이라고 한다. 데이터의 합계, 평균값, 카운팅, 최대, 최소값 등이 대표적인 리덕션의 결과물이라고 볼 수 있다. 이때 결과물로 바로 집계할 수 없는 경우에는 중간처리(필터링, 매핑, 정렬, 그룹핑)이 필요하다.
파이프라인은 여러개의 스트림이 연결되어 있는 구조를 말하며, 최종처리를 제외하고는 모두 중간 처리 스트림이다.
중간 스트림이 생성될 때 요소들이 바로 처리되는 것이 아닌, 최종 처리가 시작되기 전까지 중간처리는 지연(lazy)되며, 최종 처리가 시작되면 중간 스트림에서 처리를 시작한다.
Java Platform SE8 에서 중간처리, 최정처리 관련 메소드 예시를 살펴볼 수 있다.
리턴타입이 스트림이라면 중간 처리 메소드이고, 기본타입 이거나 OptionalXXX
라면 최종 처리 메소드이다.
Optional
클래스는 저장하는 값의 타입만 다를 뿐 제공하는 기능은 거의 동일하다. Optional
클래스는 단순히 집계 값만 저장하는 것이 아니라, 집계 값이 존재하지 않을 경우 디폴트 값을 설정할 수 있고, 집계 값을 처리하는 Consumer
도 등록할 수 있다.
boolean
isPresent()
값이 저장되어 있는지 여부
T double int long
orElse(T) orElse(double) orElse(int) orElse(long)
값이 저장되어 있지 않을 경우 디폴트 값 지정
void
ifPresent(Consumer) ifPresent(DoubleConsumer) ifPresent(IntConsumer) ifPresent(LongConsumer)
값이 저장되어 있을 경우 Consumer에서 처리
reduce는 스트림의 모든 원소들을 하나의 결과로 합친다.
Stream
Optional<T>
reduce(BinaryOperator<T> accumulator)
T
reduce(T identity, BinaryOperator<T> accumulator)
IntStream
OptionalInt
reduce(IntBinaryOperator op)
int
reduce(int identity, IntBinaryOperator op)
LongStream
OptionalLong
reduce(LongBinaryOperator op)
long
reduce(long identity, LongBinaryOperator op)
DoubleStream
OptionalDouble
reduce(DoubleBinaryOperator op)
double
reduce(double identity, DoubleBinaryOperator op)
학생들의 성적 총점을 구하는 예시이다. 여기서 스트림에 요소가 없을 경우에 NoSuchElementException
이 발생한다.
identity(디폴트 값)을 주어, 스트림에 요소가 없을 경우에는 0을 리턴하는 예시이다.
다음은 3개의 매개변수를 받은 예시이다.
다음과 같이 병렬로 처리를 하면 어떻게 실행되는지 확인할 수 있다.
collect 메소드는 필요한 요소만 Collection
으로 담을 수 있고, 요소들을 그룹핑한 후 집계할 수 있다.
R
collect(Collector<T,A,R> collertor)
Stream
매개값인 Collector는 어떤 요소를 어떤 Collection에 수집할 것인지를 결정한다. Collectors 구현 객체는 아래 Collectors 클래스의 다양한 정적 메소드를 이용해서 얻을 수 있다.
Collector<T, ?, List<T>>
toList()
T를 List에 저장
Collector<T, ?, Set<T>>
toSet()
T를 Set에 저장
Collector<T, ?, Collection<T>>
toCollection(Supplier<Collection<T>>)
T를 Supplier가 제공한 Collection에 저장
Collertor<T, ?, Map<K,U>>
toMap( Function<T,K> keyMapper, Function<T,U> valueMapper)
T를 K와 U로 매핑해 K를 키로, U를 값으로 Map에 저장
Collector<T, ?, ConcurrentMap<K,U>>
toConcurrentMap( Function<T,K> keyMapper, Function<T,U> valueMapper)
T를 K와 U로 매핑해 K를 키로, U를 값으로 ConcurrentMap에 저장
다음과 같이 구현할 수 있다.
collect는 단순히 요소를 수집하는 기능 외에 Collection의 요소를 그룹핑해서 Map 객체를 생성하는 기능도 제공한다.
Collertor<T, ?, Map<K,List<T>>>
groupingBy(Function<T,K> classfier)
T를 K로 매핑하고 K키에 저장된 List에 T를 저장한 Map 생성
Collertor<T, ?, ConcurrentMap<K,List<T>>>
groupingByConcurrent(Function<T,K> classfier)
Collertor<T, ?, Map<K, D>>
groupingBy( Function<T,K> classfier, Collector<T,A,D> collector)
T를 K에 매핑하고 K키에 저장된 D객체에 T를 누적한 Map 생성
Collertor<T, ?, ConcurrentMap<K, D>>
groupingByConcurrent( Function<T,K> classfier, Collector<T,A,D> collector)
Collertor<T, ?, Map<K, D>>
groupingBy( Function<T,K> classfier, Supplier<Map<K,D>> mapFactory, Collector<T,A,D> collector)
T를 K로 매핑하고 Supplier가 제공하는 Map에서 K키에 저장된 D객체에 T를 누적
Collertor<T, ?, ConcurrentMap<K, D>>
groupingBy( Function<T,K> classfier, Supplier<ConcurrentMap<K,D>> mapFactory, Collector<T,A,D> collector)
다음 예제는 성적별로 해당 Student List를 구할 수 있다.
다음은 성별로 평균 점수를 저장한 Map을 구현한 예제이다. 위의 예제 처럼 그룹핑 후에 매핑이나 집계(평균, 카운팅, 연결, 최대, 최소, 합계)를 할 수 있다.
병렬 처리란 멀티 코어 CPU 환경에서 하나의 작업을 분할해서 각각의 코어가 병렬적으로 처리하는 것을 말하며, 작업 처리 시간을 줄이기 위해 사용한다. Java8부터 병렬 스트림을 제공하고 있다.
멀티 쓰레드는 동시성(Concurrency)와 병렬성(Parallelism)으로 실행되고 있다.
동시성 : 멀티 스레드가 번갈아가며 실행하는 성질
병렬성 : 멀티 코어를 이용해 동시에 실행하는 성질
데이터 병렬성 : 전체 데이터를 쪼개어 서브 데이터들로 만들고 이 데이터들을 병렬 처리해 작업을 빨리 끝내는 것(parallelStream)
작업 병렬성 : 서로 다른 작업을 병렬 처리하는 것(Web Server : 각각 브라우저에서 요청한 내용을 개별 스레드에서 병렬로 처리)
싱글 코어 CPU를 이용한 멀티 작업은 병렬적으로 실행되는 것처럼 보이지만, 번갈아가며 실행하는 동시성 작업이다.