List — LPUSH, RPOP, 그리고 메시지 큐로 활용하기
단순한 리스트가 어떻게 메시지 큐, 작업 대기열, 타임라인 등 다양한 용도로 활용될 수 있을까요?
개념 정의
Redis List는 문자열의 순서 있는 컬렉션입니다. 양쪽 끝에서의 삽입/삭제가 O(1)이므로 큐(FIFO)나 스택(LIFO)으로 활용할 수 있습니다. 내부적으로는 quicklist(listpack 노드들의 이중 연결 리스트)로 구현되어 있습니다.
왜 필요한가
- 작업 큐: 백그라운드 작업을 큐에 넣고 워커가 소비
- 최근 항목 유지: 최근 N개의 로그, 알림, 활동 기록
- 타임라인: 소셜 미디어의 피드 구현
- 간단한 메시지 전달: 서비스 간 비동기 메시지 전달
기본 명령어
삽입
# 왼쪽(Head)에 삽입 — O(1)
LPUSH tasks "task3" "task2" "task1"
# 결과: ["task1", "task2", "task3"]
# 여러 개를 넣으면 마지막에 넣은 것이 Head
# 오른쪽(Tail)에 삽입 — O(1)
RPUSH logs "log1" "log2" "log3"
# 결과: ["log1", "log2", "log3"]
# 키가 존재할 때만 삽입
LPUSHX tasks "task0" # tasks가 있을 때만 삽입
RPUSHX tasks "task4" # tasks가 있을 때만 삽입
# 특정 원소 앞/뒤에 삽입 — O(N)
LINSERT tasks BEFORE "task2" "task1.5"
LINSERT tasks AFTER "task2" "task2.5"
삭제 및 조회
# 왼쪽에서 꺼내기 — O(1)
LPOP tasks # 하나 꺼냄
LPOP tasks 3 # 3개 꺼냄 (6.2+)
# 오른쪽에서 꺼내기 — O(1)
RPOP tasks
RPOP tasks 3 # 3개 꺼냄 (6.2+)
# 인덱스로 조회 — O(N)
LINDEX tasks 0 # 첫 번째 원소
LINDEX tasks -1 # 마지막 원소
# 범위 조회 — O(S+N)
LRANGE tasks 0 9 # 처음 10개
LRANGE tasks 0 -1 # 전체 (주의: 큰 리스트에서는 위험)
# 리스트 길이 — O(1)
LLEN tasks
수정
# 인덱스로 값 변경 — O(N)
LSET tasks 0 "updated_task1"
# 값으로 삭제 — O(N)
LREM tasks 0 "task2" # 모든 "task2" 삭제
LREM tasks 1 "task2" # 앞에서부터 1개만 삭제
LREM tasks -1 "task2" # 뒤에서부터 1개만 삭제
# 범위 밖 원소 삭제 (리스트 트리밍)
LTRIM tasks 0 99 # 처음 100개만 유지, 나머지 삭제
내부 구조 — quicklist
quicklist의 구조
quicklist
├── head → [listpack: a, b, c] ←→ [listpack: d, e, f] ←→ [listpack: g, h] ← tail
├── count: 8 (전체 원소 수)
├── len: 3 (listpack 노드 수)
└── compress: 0 (압축 깊이)
설정
# 각 listpack 노드의 최대 크기
# 양수: 최대 원소 수
# 음수: 최대 바이트 크기
# -1: 4KB, -2: 8KB(기본), -3: 16KB, -4: 32KB, -5: 64KB
list-max-listpack-size -2
# 중간 노드 LZF 압축 (양쪽 끝은 압축하지 않음)
# 0: 압축 안 함(기본)
# 1: 양쪽 각 1개 노드 제외하고 압축
# 2: 양쪽 각 2개 노드 제외하고 압축
list-compress-depth 0
왜 quicklist인가?
| 자료구조 | LPUSH/RPOP | LINDEX | 메모리 | 비고 |
|---|---|---|---|---|
| 연결 리스트 | O(1) | O(N) | 포인터 오버헤드 큼 | — |
| 배열 | O(1)/O(N) | O(1) | 최소 | 삽입 시 이동 |
| quicklist | O(1) | O(N) | 중간 | 양쪽의 장점 |
quicklist는 listpack의 메모리 효율과 연결 리스트의 양방향 O(1) 접근을 결합합니다.
BRPOP — 블로킹 연산
BRPOP(Blocking RPOP)은 리스트가 비어있을 때 원소가 추가될 때까지 블로킹하며 대기합니다.
기본 사용법
# 타임아웃 30초로 블로킹 대기
BRPOP tasks 30
# 리스트에 원소가 있으면 즉시 반환
# 없으면 최대 30초 대기 후 nil
# 타임아웃 0: 무한 대기
BRPOP tasks 0
# 여러 리스트 동시 대기 (첫 번째로 원소가 나온 리스트에서 꺼냄)
BRPOP queue:high queue:medium queue:low 0
# 우선순위 큐 패턴
블로킹 동작 흐름
워커 A: BRPOP tasks 0 → 대기 중...
워커 B: BRPOP tasks 0 → 대기 중...
프로듀서: LPUSH tasks "job1"
→ 워커 A가 "job1"을 받음 (FIFO 순서로 대기 중인 첫 워커)
프로듀서: LPUSH tasks "job2"
→ 워커 B가 "job2"을 받음
BLPOP vs BRPOP
# BLPOP: 왼쪽(Head)에서 블로킹 꺼내기
BLPOP tasks 0 # FIFO에서 가장 오래된 것
# BRPOP: 오른쪽(Tail)에서 블로킹 꺼내기
BRPOP tasks 0 # FIFO에서 가장 최근 것
# 큐(FIFO): LPUSH + BRPOP
# 스택(LIFO): LPUSH + BLPOP
LPOS — 값 검색 (6.0.6+)
RPUSH colors "red" "blue" "green" "blue" "red"
# 첫 번째 "blue"의 인덱스
LPOS colors "blue"
# (integer) 1
# 두 번째 "blue"의 인덱스
LPOS colors "blue" RANK 2
# (integer) 3
# 뒤에서부터 검색
LPOS colors "blue" RANK -1
# (integer) 3
# 모든 "blue"의 인덱스
LPOS colors "blue" COUNT 0
# 1) (integer) 1
# 2) (integer) 3
# 최대 N개까지만 검색
LPOS colors "red" COUNT 1
# 1) (integer) 0
# 검색 범위 제한 (최대 100개 원소만 탐색)
LPOS colors "blue" MAXLEN 100
LMOVE — 원자적 이동 (6.2+)
# 소스 리스트에서 꺼내서 대상 리스트에 넣기
LMOVE source destination LEFT RIGHT
# source의 왼쪽에서 꺼내서 destination의 오른쪽에 삽입
# 블로킹 버전
BLMOVE source destination LEFT RIGHT 30
# 같은 리스트에서 회전 (원형 큐)
LMOVE tasks tasks LEFT RIGHT
# Head 원소를 Tail로 이동
LMOVE는 이전의 RPOPLPUSH를 대체하는 명령어입니다.
메시지 큐 패턴
기본 큐 패턴
프로듀서 Redis 컨슈머
│ │ │
│── LPUSH tasks ────→ │ │
│ │ ←── BRPOP tasks ───│
│ │────→ "job data" ───│
# 프로듀서
def produce(task_data):
r.lpush('tasks', json.dumps(task_data))
# 컨슈머
def consume():
while True:
_, data = r.brpop('tasks', timeout=0)
task = json.loads(data)
process(task)
안정적인 큐 — 처리 중 리스트 패턴
기본 큐의 문제점: BRPOP 후 처리 중 장애가 발생하면 메시지가 유실됩니다.
프로듀서 → [tasks] → LMOVE → [processing] → 처리 완료 → LREM
↑
장애 시 복구 가능
def reliable_consume():
while True:
# tasks에서 꺼내서 processing에 보관
data = r.blmove('tasks', 'processing', 0, 'LEFT', 'RIGHT')
task = json.loads(data)
try:
process(task)
# 처리 완료 후 processing에서 삭제
r.lrem('processing', 1, data)
except Exception:
# 장애 시 processing에 남아있으므로 나중에 재처리 가능
log_error(task)
def recover_stuck_tasks():
"""주기적으로 processing에 남은 작업을 다시 tasks로 이동"""
stuck = r.lrange('processing', 0, -1)
for task_data in stuck:
r.lmove('processing', 'tasks', 'LEFT', 'RIGHT')
한계점
Redis List 기반 큐는 간단하지만 다음과 같은 한계가 있습니다.
| 기능 | List 큐 | Redis Streams | 전용 MQ (Kafka 등) |
|---|---|---|---|
| 소비자 그룹 | X | O | O |
| 메시지 ACK | 수동 구현 | 내장 | 내장 |
| 메시지 재전송 | 수동 구현 | XPENDING | 내장 |
| 메시지 영속성 | RDB/AOF | RDB/AOF | 디스크 기반 |
| 복잡도 | 낮음 | 중간 | 높음 |
간단한 작업 큐에는 List가 적합하지만, 메시지 손실이 허용되지 않는 시스템에서는 Redis Streams나 전용 메시지 큐를 사용하는 것이 좋습니다.
실전 패턴
패턴 1: 최근 N개 항목 유지
def add_recent_activity(user_id, activity):
key = f"recent:{user_id}"
pipe = r.pipeline()
pipe.lpush(key, json.dumps(activity))
pipe.ltrim(key, 0, 49) # 최근 50개만 유지
pipe.expire(key, 86400) # 24시간 TTL
pipe.execute()
def get_recent_activities(user_id, count=10):
return r.lrange(f"recent:{user_id}", 0, count - 1)
패턴 2: 우선순위 큐
def enqueue(priority, task):
"""우선순위별 큐에 삽입"""
r.lpush(f"queue:{priority}", json.dumps(task))
def dequeue(timeout=0):
"""높은 우선순위부터 소비"""
result = r.brpop(
['queue:critical', 'queue:high', 'queue:normal', 'queue:low'],
timeout=timeout
)
if result:
queue_name, data = result
return json.loads(data)
return None
패턴 3: 로그 버퍼
def buffer_log(service, message):
key = f"log:{service}"
pipe = r.pipeline()
pipe.lpush(key, f"{time.time()}|{message}")
pipe.ltrim(key, 0, 9999) # 최대 10000건
pipe.execute()
def flush_logs(service, batch_size=100):
"""로그를 배치로 꺼내서 영구 저장소에 기록"""
key = f"log:{service}"
logs = []
for _ in range(batch_size):
log = r.rpop(key)
if log is None:
break
logs.append(log)
# 영구 저장소(DB, S3 등)에 배치 저장
save_to_storage(logs)
정리
Redis List는 단순하지만 강력한 자료구조입니다.
- 내부적으로 quicklist(listpack의 이중 연결 리스트)로 구현되어 양쪽 끝 O(1) 연산을 제공합니다
- BRPOP/BLPOP으로 블로킹 큐를 구현할 수 있습니다
- LMOVE로 원자적 이동을 통해 메시지 유실을 방지할 수 있습니다
- 메시지 큐로 사용할 때는 처리 중 리스트 패턴을 적용하고, 더 복잡한 요구사항에는 Redis Streams를 고려해야 합니다
LTRIM으로 리스트 크기를 제한하여 메모리를 관리할 수 있습니다
댓글 로딩 중...