📊
데이터공학
MapReduce
맵리듀스
분산 데이터 처리 프로그래밍 모델. Map과 Reduce 함수로 병렬 처리.
맵리듀스
분산 데이터 처리 프로그래밍 모델. Map과 Reduce 함수로 병렬 처리.
MapReduce는 Google에서 발표한 분산 데이터 처리 프로그래밍 모델로, 대규모 데이터셋을 여러 컴퓨터에서 병렬로 처리할 수 있게 합니다. Map과 Reduce 두 단계로 구성되며, Hadoop의 핵심 처리 엔진으로 사용됩니다.
| 단계 | 입력 | 출력 | 설명 |
|---|---|---|---|
| Map | (key1, value1) | list(key2, value2) | 데이터 변환 및 필터링 |
| Shuffle | list(key2, value2) | (key2, list(value2)) | 키별 그룹화 및 정렬 |
| Reduce | (key2, list(value2)) | (key3, value3) | 집계 및 결과 생성 |
| 특성 | MapReduce | Spark |
|---|---|---|
| 처리 방식 | 디스크 기반 | 인메모리 |
| 속도 | 느림 (디스크 I/O) | 빠름 (10~100배) |
| 반복 처리 | 비효율적 | 효율적 |
| API | 저수준 | 고수준 (DataFrame, SQL) |
| 스트리밍 | 미지원 | 지원 |
# mapper.py - Map 단계
#!/usr/bin/env python3
"""
WordCount Mapper
입력: 텍스트 라인
출력: (단어, 1) 쌍
"""
import sys
import re
def mapper():
for line in sys.stdin:
# 소문자 변환 및 특수문자 제거
line = line.strip().lower()
words = re.findall(r'\b[a-z]+\b', line)
for word in words:
# 탭으로 구분된 키-값 출력
print(f"{word}\t1")
if __name__ == "__main__":
mapper()
# reducer.py - Reduce 단계
#!/usr/bin/env python3
"""
WordCount Reducer
입력: 정렬된 (단어, 1) 쌍
출력: (단어, 총 카운트)
"""
import sys
def reducer():
current_word = None
current_count = 0
for line in sys.stdin:
line = line.strip()
try:
word, count = line.split('\t')
count = int(count)
except ValueError:
continue
# 같은 단어면 카운트 누적
if current_word == word:
current_count += count
else:
# 이전 단어 결과 출력
if current_word:
print(f"{current_word}\t{current_count}")
current_word = word
current_count = count
# 마지막 단어 출력
if current_word:
print(f"{current_word}\t{current_count}")
if __name__ == "__main__":
reducer()
# Hadoop Streaming으로 MapReduce 작업 실행
# 입력 데이터 HDFS에 업로드
hdfs dfs -mkdir -p /input/wordcount
hdfs dfs -put sample_text.txt /input/wordcount/
# MapReduce 작업 실행
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files mapper.py,reducer.py \
-mapper "python3 mapper.py" \
-reducer "python3 reducer.py" \
-input /input/wordcount/ \
-output /output/wordcount
# 결과 확인
hdfs dfs -cat /output/wordcount/part-00000 | head -20
# 로컬 테스트 (Unix 파이프라인)
cat sample_text.txt | python3 mapper.py | sort | python3 reducer.py
// Java MapReduce WordCount
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
// Mapper 클래스
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// 단어 토큰화
StringTokenizer itr = new StringTokenizer(
value.toString().toLowerCase()
);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken().replaceAll("[^a-z]", ""));
if (word.getLength() > 0) {
context.write(word, one);
}
}
}
}
// Combiner/Reducer 클래스
public static class IntSumReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class); // 로컬 집계
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
# MRJob을 사용한 Python MapReduce
from mrjob.job import MRJob
from mrjob.step import MRStep
import re
class WordCount(MRJob):
"""MRJob으로 구현한 WordCount"""
# 정규식 패턴
WORD_RE = re.compile(r"[a-z]+")
def mapper(self, _, line):
"""Map: 라인을 단어로 분할"""
for word in self.WORD_RE.findall(line.lower()):
yield word, 1
def combiner(self, word, counts):
"""Combiner: 로컬 집계 (선택적)"""
yield word, sum(counts)
def reducer(self, word, counts):
"""Reduce: 최종 집계"""
yield word, sum(counts)
class TopNWords(MRJob):
"""상위 N개 단어 찾기 (다단계 MapReduce)"""
def steps(self):
return [
MRStep(mapper=self.mapper_get_words,
combiner=self.combiner_count_words,
reducer=self.reducer_count_words),
MRStep(reducer=self.reducer_find_top_n)
]
def mapper_get_words(self, _, line):
for word in re.findall(r'[a-z]+', line.lower()):
yield word, 1
def combiner_count_words(self, word, counts):
yield word, sum(counts)
def reducer_count_words(self, word, counts):
# 모든 결과를 하나의 Reducer로
yield None, (sum(counts), word)
def reducer_find_top_n(self, _, word_count_pairs):
# 상위 10개 추출
top_n = sorted(word_count_pairs, reverse=True)[:10]
for count, word in top_n:
yield word, count
if __name__ == '__main__':
WordCount.run()
# 실행 방법
# 로컬: python wordcount.py input.txt
# Hadoop: python wordcount.py -r hadoop hdfs:///input/text.txt