리액티브 프로그래밍 — Reactor와 Reactive Streams 이해하기
만 명이 동시에 API를 호출하면, 스레드 만 개를 만들어야 할까? 전통적인 서블릿 모델에서는 요청 하나당 스레드 하나를 할당한다. 그런데 스레드는 무한히 만들 수 있는 게 아니다. 메모리를 잡아먹고, 컨텍스트 스위칭 비용도 만만찮다. 리액티브 프로그래밍은 이 문제에 대한 하나의 답이다.
▸ TIP 이 글의 코드 예제를 직접 실행해보고 싶다면 Java 기본기 핸드북을 확인해보세요.
1. 리액티브가 왜 필요한가
동기 블로킹의 한계
[요청 1] → [스레드 1] → DB 쿼리 (200ms 대기) → 응답
[요청 2] → [스레드 2] → 외부 API (500ms 대기) → 응답
...
[요청 201] → 스레드 풀 고갈 → 대기열에서 기다림
Tomcat의 기본 스레드 풀은 200개다. 201번째 요청부터는 앞선 요청이 끝나기를 기다려야 한다. 스레드가 실제로 CPU를 쓰는 시간은 얼마 안 되는데, 대부분 I/O 대기 상태로 놀고 있다.
thread-per-request 모델의 문제
- 메모리: 스레드 하나당 약 1MB의 스택 메모리. 1,000개면 1GB
- 컨텍스트 스위칭: 스레드가 많아지면 OS의 전환 비용이 급격히 증가
- 확장성 한계: 동시 접속자가 수만 명이면 스레드를 수만 개 만들 수 없다
리액티브 프로그래밍은 적은 수의 스레드로 많은 요청을 처리하는 방식이다. I/O 대기 동안 스레드를 점유하지 않고, 데이터가 준비되면 콜백으로 이어서 처리한다.
2. Reactive Streams 스펙
리액티브 프로그래밍의 표준 인터페이스가 Reactive Streams다. Java 9에서 java.util.concurrent.Flow로 편입되었고, 핵심은 네 개의 인터페이스다.
public interface Publisher<T> {
void subscribe(Subscriber<? super T> s); // 구독 등록
}
public interface Subscriber<T> {
void onSubscribe(Subscription s); // 구독 시작
void onNext(T t); // 데이터 수신
void onError(Throwable t); // 에러 발생
void onComplete(); // 완료 신호
}
public interface Subscription {
void request(long n); // n개의 데이터를 요청 (배압의 핵심)
void cancel(); // 구독 취소
}
// Publisher이자 Subscriber — 중간 처리 단계
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
동작 흐름
Publisher Subscriber
│◄── subscribe() ────────────│
│── onSubscribe(Subscription)──►│
│◄── request(3) ─────────────│ ← 3개 요청
│── onNext(data1) ──────────►│
│── onNext(data2) ──────────►│
│── onNext(data3) ──────────►│
│◄── request(2) ─────────────│ ← 추가 2개 요청
│── onNext(data4) ──────────►│
│── onNext(data5) ──────────►│
│── onComplete() ───────────►│
핵심은 Subscriber가 먼저 request(n)을 호출해야 Publisher가 데이터를 보낸다는 것이다. 이것이 배압(Backpressure)의 기본 원리다.
3. 배압(Backpressure)
Publisher가 초당 10,000건을 발행하는데 Subscriber가 초당 100건만 처리할 수 있으면? 배압이 없으면 메모리에 데이터가 쌓여서 OutOfMemoryError가 발생한다.
request(n)을 통해 Subscriber가 처리 가능한 만큼만 요청하면, 소비 속도에 맞춰 발행 속도가 자연스럽게 조절된다.
| 전략 | 설명 | 사용 시나리오 |
|---|---|---|
| Buffer | 초과 데이터를 버퍼에 저장 | 일시적인 속도 차이 |
| Drop | 초과 데이터를 버림 | 최신 데이터만 중요한 경우 |
| Latest | 가장 최신 데이터만 유지 | 실시간 가격, 센서 데이터 |
| Error | 초과 시 에러 발생 | 데이터 유실 불가 |
Flux.range(1, 1000)
.onBackpressureBuffer(100) // 최대 100개까지 버퍼링
.subscribe(data -> slowProcess(data));
4. Reactor 소개 — Mono와 Flux
Reactor는 Reactive Streams 스펙의 구현체이자 Spring WebFlux의 기반 라이브러리다. 핵심 타입은 딱 두 개.
Mono<T>: 0 또는 1개의 데이터를 발행Flux<T>: 0 ~ N개의 데이터를 발행
중요한 원칙: 구독하기 전까지 아무 일도 일어나지 않는다. subscribe()를 호출해야 파이프라인이 실행된다.
Mono<String> mono = Mono.just("안녕하세요");
// 여기서는 아무 일도 안 일어남
mono.subscribe(System.out::println);
// 이제 "안녕하세요" 출력
5. Mono — 0 또는 1개의 데이터
// 생성
Mono<String> mono = Mono.just("데이터");
Mono<String> empty = Mono.empty();
Mono<String> error = Mono.error(new RuntimeException("실패"));
// 지연 생성 — 구독 시점에 실행
Mono<String> deferred = Mono.defer(() ->
Mono.just("현재 시간: " + System.currentTimeMillis())
);
// 블로킹 호출을 감싸기
Mono<String> fromCallable = Mono.fromCallable(() -> fetchFromDatabase());
변환과 에러 처리
Mono.just("hello")
.map(s -> s.toUpperCase()) // 동기 변환: "HELLO"
.flatMap(s -> saveToDb(s)) // 비동기 변환: Mono<Result> 반환
.onErrorResume(e -> { // 에러 시 대체 Mono
log.warn("실패: {}", e.getMessage());
return fetchFromCache();
})
.subscribe(result -> log.info("완료: {}", result));
실무 패턴 — 있으면 반환, 없으면 생성
public Mono<User> findOrCreate(String email) {
return userRepository.findByEmail(email)
.switchIfEmpty(Mono.defer(() -> { // 비어있으면 실행
User newUser = new User(email);
return userRepository.save(newUser);
}));
}
Mono.defer()로 감싸는 이유는 구독 시점에 실행되도록 지연하기 위해서다. 감싸지 않으면 switchIfEmpty와 관계없이 즉시 실행된다.
6. Flux — 0~N개의 데이터
Flux<String> flux = Flux.just("A", "B", "C");
Flux<Integer> range = Flux.range(1, 10);
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
// 필터와 변환
Flux.range(1, 100)
.filter(n -> n % 2 == 0) // 짝수만
.map(n -> n * 10) // 10배
.take(5) // 처음 5개만
.subscribe(System.out::println); // 20, 40, 60, 80, 100
// 여러 Flux 결합
Flux.zip(flux1, flux2, (s, n) -> s + n); // 쌍으로 묶기: A1, B2, C3
Flux.concat(flux1, flux2); // 순서대로 이어 붙이기
Flux.merge(flux1, flux2); // 도착 순서대로 (순서 미보장)
7. 핵심 연산자
map vs flatMap
이 둘의 차이를 확실히 알아야 한다. 면접에서도 자주 나온다.
// map: 동기 1:1 변환 (T → R)
Flux.just(1, 2, 3)
.map(n -> n * 2)
.subscribe(System.out::println); // 2, 4, 6
// flatMap: 비동기 1:N 변환 (T → Publisher<R>)
Flux.just(1, 2, 3)
.flatMap(n -> fetchData(n)) // 각 값마다 비동기 호출
.subscribe(System.out::println); // 결과 순서 보장 안 됨
| 구분 | map | flatMap |
|---|---|---|
| 변환 타입 | T → R | T → Publisher<R> |
| 동기/비동기 | 동기 | 비동기 |
| 순서 보장 | 보장 | 보장 안 됨 |
| 사용 시점 | 단순 값 변환 | DB 조회, API 호출 등 |
flatMap vs concatMap
flatMap은 순서를 보장하지 않는다. 순서가 중요하면 concatMap을 쓴다.
// flatMap: 병렬 실행, 순서 미보장 (빠름)
Flux.just(3, 1, 2)
.flatMap(n -> Mono.just(n).delayElement(Duration.ofMillis(n * 100)))
.subscribe(System.out::println); // 1, 2, 3
// concatMap: 순차 실행, 순서 보장 (느림)
Flux.just(3, 1, 2)
.concatMap(n -> Mono.just(n).delayElement(Duration.ofMillis(n * 100)))
.subscribe(System.out::println); // 3, 1, 2
switchIfEmpty vs defaultIfEmpty
Mono.empty().defaultIfEmpty("기본값"); // 고정 값 반환
Mono.empty().switchIfEmpty(fetchDefaultFromDb()); // 대체 Publisher 실행
zip — 여러 비동기 작업 동시 실행
Mono<User> user = userService.findById(userId);
Mono<List<Order>> orders = orderService.findByUserId(userId);
Mono<Point> point = pointService.getBalance(userId);
Mono.zip(user, orders, point)
.map(tuple -> new UserProfile(tuple.getT1(), tuple.getT2(), tuple.getT3()))
.subscribe(profile -> log.info("프로필 조회 완료"));
세 API가 동시에 실행되므로, 가장 느린 API의 응답 시간이 전체 응답 시간이 된다.
8. 스케줄러
| 스케줄러 | 용도 | 스레드 특성 |
|---|---|---|
Schedulers.immediate() | 현재 스레드 | 스레드 전환 없음 |
Schedulers.single() | 단일 스레드 | 순차 처리 |
Schedulers.parallel() | CPU 집약적 작업 | CPU 코어 수만큼 |
Schedulers.boundedElastic() | 블로킹 I/O | 최대 10 × CPU 코어 |
publishOn vs subscribeOn
Flux.range(1, 3)
.map(i -> {
log.info("[map1] {} on {}", i, Thread.currentThread().getName());
return i * 10;
})
.publishOn(Schedulers.parallel()) // 여기부터 parallel 스레드
.map(i -> {
log.info("[map2] {} on {}", i, Thread.currentThread().getName());
return i + 1;
})
.subscribeOn(Schedulers.boundedElastic()) // 소스 구독은 elastic 스레드
.subscribe();
// [map1] 1 on boundedElastic-1 ← subscribeOn의 영향
// [map2] 10 on parallel-1 ← publishOn의 영향
기억할 포인트:
subscribeOn은 체인 어디에 있든 소스의 실행 스레드를 변경한다publishOn은 선언 위치 이후의 연산자에만 영향을 준다- 블로킹 작업은 반드시
Schedulers.boundedElastic()에서 실행한다
// 블로킹 DB 호출을 리액티브로 감싸기
Mono<User> user = Mono.fromCallable(() ->
jdbcTemplate.queryForObject("SELECT * FROM users WHERE id = ?", userRowMapper, userId)
)
.subscribeOn(Schedulers.boundedElastic());
절대로 Schedulers.parallel()에서 블로킹 호출을 하면 안 된다. parallel 스레드가 고갈되면 전체 파이프라인이 멈춘다.
9. 에러 처리
리액티브에서는 try-catch를 쓸 수 없다. 파이프라인 안에서 에러를 처리해야 한다.
// onErrorReturn — 기본값 반환
Mono.just("abc")
.map(s -> Integer.parseInt(s))
.onErrorReturn(0)
.subscribe(System.out::println); // 0
// onErrorResume — 대체 로직 실행 (특정 예외 타입 지정 가능)
Mono.just("abc")
.map(s -> Integer.parseInt(s))
.onErrorResume(NumberFormatException.class, e -> Mono.just(-1))
.subscribe(System.out::println); // -1
// retry — 재시도
webClient.get().uri("/api/data").retrieve()
.bodyToMono(String.class)
.retry(3) // 최대 3회 재시도
.subscribe(System.out::println);
// retryWhen — 지수 백오프로 세밀한 재시도 제어
webClient.get().uri("/api/data").retrieve()
.bodyToMono(String.class)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(10)))
.subscribe(System.out::println);
doOnError는 에러를 소비하지 않는다. 로깅용으로 쓰고, 실제 복구는 onErrorReturn이나 onErrorResume에서 한다.
10. WebFlux와의 관계
Spring MVC (동기 블로킹)
요청 → 스레드 → DB 쿼리 ── 대기 ── 응답 (스레드가 대기하는 동안 아무것도 못 함)
Spring WebFlux (비동기 논블로킹)
요청 → 이벤트 루프 → DB 쿼리 ── 콜백 등록 (스레드가 다른 요청 처리)
WebFlux는 Reactor를 기반으로 동작한다. 컨트롤러에서 Mono와 Flux를 반환하면 된다.
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable Long id) {
return userService.findById(id);
}
@GetMapping("/users")
public Flux<User> getAllUsers() {
return userService.findAll();
}
언제 리액티브를 쓰고, 언제 쓰지 않는가
적합한 경우: 높은 동시성(채팅, 알림, 스트리밍), I/O 대기가 긴 서비스, MSA 게이트웨이
불필요하거나 오히려 안 좋은 경우: CRUD 위주 서비스, 팀이 리액티브에 익숙하지 않은 경우, JPA·MyBatis 같은 블로킹 라이브러리 중심, CPU 집약적 작업
면접에서 "리액티브를 무조건 써야 한다"고 답하면 감점 요인이다. 트레이드오프를 설명할 수 있어야 한다.
11. Virtual Threads vs Reactive
Java 21에서 가상 스레드가 정식 출시되면서 선택지가 하나 더 생겼다.
// 가상 스레드: 기존 동기 코드를 그대로 쓰면서 확장성 확보
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 10_000; i++) {
executor.submit(() -> {
// 블로킹 코드도 OK — 가상 스레드가 알아서 양보
return httpClient.send(request, BodyHandlers.ofString()).body();
});
}
}
| 구분 | Reactive (Reactor) | Virtual Threads |
|---|---|---|
| 프로그래밍 모델 | 선언적 파이프라인 | 기존 명령형 코드 |
| 학습 곡선 | 높음 | 낮음 |
| 디버깅 | 어려움 (스택 트레이스 복잡) | 쉬움 (일반 스레드와 동일) |
| 배압 지원 | 내장 | 직접 구현 필요 |
| 생태계 | WebFlux, R2DBC, WebClient | 기존 JDBC, JPA 그대로 사용 |
| 최소 Java 버전 | Java 8+ | Java 21+ |
방향성:
- 새 프로젝트 + Java 21+이라면 가상 스레드부터 고려
- 기존 WebFlux 프로젝트는 굳이 마이그레이션할 필요 없음
- 실시간 스트리밍/이벤트 기반이면 Reactor가 더 적합
- 면접에서는 "둘 다 알고 있고, 상황에 따라 선택한다"고 답하는 게 가장 좋다
12. 정리 테이블
| 개념 | 설명 |
|---|---|
| Reactive Streams | 비동기 스트림 처리의 표준 인터페이스 (Publisher, Subscriber, Subscription) |
| 배압 (Backpressure) | Subscriber가 처리 가능한 속도로 데이터를 요청하는 메커니즘 |
| Mono | 0~1개의 데이터를 발행하는 Publisher |
| Flux | 0~N개의 데이터를 발행하는 Publisher |
| Cold Sequence | 구독 시점에 데이터 발행 시작 (subscribe 전까지 아무 일도 안 일어남) |
| Hot Sequence | 구독 여부와 관계없이 데이터 발행 (실시간 이벤트) |
| 연산자 | 용도 |
|---|---|
map | 동기 1:1 변환 |
flatMap | 비동기 1:N 변환 (순서 미보장) |
concatMap | 비동기 1:N 변환 (순서 보장) |
zip | 여러 Publisher를 조합 |
switchIfEmpty | 빈 결과 시 대체 Publisher |
publishOn | 다운스트림 실행 스레드 변경 |
subscribeOn | 소스 구독 스레드 변경 |
onErrorReturn | 에러 시 기본값 |
onErrorResume | 에러 시 대체 Publisher |
retry / retryWhen | 에러 시 재시도 |
마무리
리액티브 프로그래밍은 "적은 스레드로 많은 요청을 처리"하기 위한 패러다임이다. 핵심을 정리하면:
- Reactive Streams는 Publisher-Subscriber 패턴에 배압을 추가한 표준이다
- Reactor의 Mono(0
1개)와 Flux(0N개)가 이 스펙의 구현체다 - 구독 전에는 아무 일도 일어나지 않는다 — cold 시퀀스의 원칙
- map은 동기, flatMap은 비동기 — 이 차이를 코드로 설명할 수 있어야 한다
- publishOn은 다운스트림, subscribeOn은 소스 — 스레드 제어의 두 축
- Virtual Threads와 비교할 수 있으면 면접에서 큰 플러스다
공부하면서 느낀 건데, 리액티브는 머리로 이해하는 것과 실제로 디버깅하는 것 사이의 갭이 크다. log() 연산자로 흐름을 직접 확인해보는 게 가장 좋은 학습법이다.
// log()로 파이프라인 흐름 확인
Flux.range(1, 5)
.log("range") // 구독, request, onNext 등 로그 출력
.map(n -> n * 2)
.log("mapped")
.subscribe();
다음 글에서는 자바 디자인 패턴을 다룬다. 면접에서 "싱글톤 패턴을 구현해보세요"라고 했을 때, 코드로 설명할 수 있어야 하는 패턴들을 정리할 예정이다.