📊 데이터공학

MapReduce

맵리듀스

분산 데이터 처리 프로그래밍 모델. Map과 Reduce 함수로 병렬 처리.

상세 설명

MapReduce는 Google에서 발표한 분산 데이터 처리 프로그래밍 모델로, 대규모 데이터셋을 여러 컴퓨터에서 병렬로 처리할 수 있게 합니다. Map과 Reduce 두 단계로 구성되며, Hadoop의 핵심 처리 엔진으로 사용됩니다.

MapReduce 처리 단계

  • Input Split: 입력 데이터를 청크로 분할
  • Map: 각 청크를 처리하여 키-값 쌍 생성
  • Shuffle & Sort: 같은 키의 값들을 그룹화
  • Reduce: 그룹화된 값들을 집계하여 최종 결과 생성
  • Output: 결과를 파일 시스템에 저장

MapReduce 동작 원리

단계입력출력설명
Map(key1, value1)list(key2, value2)데이터 변환 및 필터링
Shufflelist(key2, value2)(key2, list(value2))키별 그룹화 및 정렬
Reduce(key2, list(value2))(key3, value3)집계 및 결과 생성

MapReduce vs Spark

특성MapReduceSpark
처리 방식디스크 기반인메모리
속도느림 (디스크 I/O)빠름 (10~100배)
반복 처리비효율적효율적
API저수준고수준 (DataFrame, SQL)
스트리밍미지원지원

코드 예제

Python WordCount (Hadoop Streaming)

# mapper.py - Map 단계
#!/usr/bin/env python3
"""
WordCount Mapper
입력: 텍스트 라인
출력: (단어, 1) 쌍
"""
import sys
import re

def mapper():
    for line in sys.stdin:
        # 소문자 변환 및 특수문자 제거
        line = line.strip().lower()
        words = re.findall(r'\b[a-z]+\b', line)

        for word in words:
            # 탭으로 구분된 키-값 출력
            print(f"{word}\t1")

if __name__ == "__main__":
    mapper()
# reducer.py - Reduce 단계
#!/usr/bin/env python3
"""
WordCount Reducer
입력: 정렬된 (단어, 1) 쌍
출력: (단어, 총 카운트)
"""
import sys

def reducer():
    current_word = None
    current_count = 0

    for line in sys.stdin:
        line = line.strip()
        try:
            word, count = line.split('\t')
            count = int(count)
        except ValueError:
            continue

        # 같은 단어면 카운트 누적
        if current_word == word:
            current_count += count
        else:
            # 이전 단어 결과 출력
            if current_word:
                print(f"{current_word}\t{current_count}")
            current_word = word
            current_count = count

    # 마지막 단어 출력
    if current_word:
        print(f"{current_word}\t{current_count}")

if __name__ == "__main__":
    reducer()

Hadoop Streaming 실행

# Hadoop Streaming으로 MapReduce 작업 실행

# 입력 데이터 HDFS에 업로드
hdfs dfs -mkdir -p /input/wordcount
hdfs dfs -put sample_text.txt /input/wordcount/

# MapReduce 작업 실행
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -files mapper.py,reducer.py \
    -mapper "python3 mapper.py" \
    -reducer "python3 reducer.py" \
    -input /input/wordcount/ \
    -output /output/wordcount

# 결과 확인
hdfs dfs -cat /output/wordcount/part-00000 | head -20

# 로컬 테스트 (Unix 파이프라인)
cat sample_text.txt | python3 mapper.py | sort | python3 reducer.py

Java MapReduce (전통적 방식)

// Java MapReduce WordCount
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // Mapper 클래스
    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // 단어 토큰화
            StringTokenizer itr = new StringTokenizer(
                value.toString().toLowerCase()
            );
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken().replaceAll("[^a-z]", ""));
                if (word.getLength() > 0) {
                    context.write(word, one);
                }
            }
        }
    }

    // Combiner/Reducer 클래스
    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");

        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);  // 로컬 집계
        job.setReducerClass(IntSumReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

MRJob (Python 래퍼 라이브러리)

# MRJob을 사용한 Python MapReduce
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

class WordCount(MRJob):
    """MRJob으로 구현한 WordCount"""

    # 정규식 패턴
    WORD_RE = re.compile(r"[a-z]+")

    def mapper(self, _, line):
        """Map: 라인을 단어로 분할"""
        for word in self.WORD_RE.findall(line.lower()):
            yield word, 1

    def combiner(self, word, counts):
        """Combiner: 로컬 집계 (선택적)"""
        yield word, sum(counts)

    def reducer(self, word, counts):
        """Reduce: 최종 집계"""
        yield word, sum(counts)


class TopNWords(MRJob):
    """상위 N개 단어 찾기 (다단계 MapReduce)"""

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_top_n)
        ]

    def mapper_get_words(self, _, line):
        for word in re.findall(r'[a-z]+', line.lower()):
            yield word, 1

    def combiner_count_words(self, word, counts):
        yield word, sum(counts)

    def reducer_count_words(self, word, counts):
        # 모든 결과를 하나의 Reducer로
        yield None, (sum(counts), word)

    def reducer_find_top_n(self, _, word_count_pairs):
        # 상위 10개 추출
        top_n = sorted(word_count_pairs, reverse=True)[:10]
        for count, word in top_n:
            yield word, count


if __name__ == '__main__':
    WordCount.run()

# 실행 방법
# 로컬: python wordcount.py input.txt
# Hadoop: python wordcount.py -r hadoop hdfs:///input/text.txt

실무 대화 예시

주니어 개발자: "대용량 로그 분석해야 하는데 MapReduce로 해야 하나요?"
시니어 개발자: "요즘은 Spark를 더 많이 써. MapReduce는 매 단계마다 디스크에 쓰기 때문에 느리거든. Spark는 인메모리라 반복 작업에 훨씬 빨라."
주니어 개발자: "그럼 MapReduce는 이제 안 쓰나요?"
시니어 개발자: "개념은 알아야 해. Spark도 내부적으로 Map/Reduce 같은 변환을 하거든. 그리고 레거시 시스템이나 Hive 쿼리 실행 엔진으로 아직 쓰이기도 해."
주니어 개발자: "Map과 Reduce 사이의 Shuffle이 병목이라던데요?"
시니어 개발자: "맞아. Shuffle은 네트워크 전송이 많아서 느려. Combiner로 Map 단계에서 미리 로컬 집계하면 Shuffle 데이터량을 줄일 수 있어."

주의사항

성능 최적화

  • Combiner 사용으로 Shuffle 데이터량 감소
  • 적절한 파티션 수 설정 (너무 많으면 오버헤드)
  • 데이터 스큐(Skew) 방지 - 키 분포 확인

MapReduce 한계

  • 반복 알고리즘(ML)에 비효율적 - 매번 디스크 I/O
  • 실시간 처리 불가 - 배치 전용
  • 복잡한 DAG 워크플로우 구현 어려움

개발 시 주의점

  • Reducer 출력 키 타입 = Map 출력 키 타입 확인
  • Reducer에서 Iterator는 한 번만 순회 가능
  • 대용량 데이터는 로컬 테스트 후 클러스터 실행

관련 용어

더 배우기