🗄️ 데이터베이스

MongoDB

The Developer Data Platform

MongoDB는 가장 인기 있는 문서 지향(Document-Oriented) NoSQL 데이터베이스입니다. JSON 형태의 BSON 문서를 저장하며, 스키마리스(Schema-less) 특성으로 유연한 데이터 모델링이 가능합니다. 레플리카셋과 샤딩으로 고가용성과 수평 확장을 지원합니다.

📖 상세 설명

문서 지향 데이터 모델 - MongoDB는 테이블/행 대신 컬렉션/문서 구조를 사용합니다. 문서는 JSON과 유사한 BSON(Binary JSON) 형태로 저장되며, 중첩 객체와 배열을 자연스럽게 표현합니다. 관계형 DB에서 여러 테이블로 분산될 데이터를 하나의 문서에 임베딩하여 JOIN 없이 조회할 수 있습니다.

스키마 유연성 - 컬렉션에 미리 스키마를 정의할 필요가 없습니다. 같은 컬렉션 내 문서가 서로 다른 필드를 가질 수 있어, 빠르게 변화하는 요구사항에 대응하기 좋습니다. MongoDB 3.6부터는 JSON Schema로 선택적 스키마 검증도 가능합니다.

레플리카셋(Replica Set) - MongoDB의 고가용성 솔루션입니다. Primary 1대와 Secondary 여러 대로 구성되며, Primary 장애 시 자동 페일오버가 발생합니다. Secondary에서 읽기 부하를 분산하고, 데이터 복제로 내구성을 보장합니다.

샤딩(Sharding) - 대용량 데이터를 여러 서버에 분산 저장하는 수평 확장 방식입니다. Shard Key를 기준으로 데이터가 분할되며, mongos 라우터가 쿼리를 적절한 샤드로 전달합니다. 페타바이트급 데이터 처리가 가능합니다.

Aggregation Pipeline - 데이터 변환과 분석을 위한 강력한 프레임워크입니다. $match, $group, $sort, $lookup(JOIN) 등의 스테이지를 연결하여 복잡한 데이터 처리가 가능합니다. Map-Reduce보다 성능이 우수하며, 대부분의 분석 작업에 권장됩니다.

💻 코드 예제

# MongoDB 연결 및 CRUD - Python + pymongo
from pymongo import MongoClient, ASCENDING, DESCENDING
from pymongo.errors import DuplicateKeyError, BulkWriteError
from bson import ObjectId
from datetime import datetime

# 연결 설정 (레플리카셋)
client = MongoClient(
    "mongodb://localhost:27017,localhost:27018,localhost:27019",
    replicaSet="rs0",
    readPreference="secondaryPreferred"  # 읽기는 Secondary 우선
)

db = client["myapp"]
products = db["products"]


# 문서 삽입
def create_product(product_data: dict):
    """문서 삽입 - 자동 _id 생성"""
    product = {
        "name": product_data["name"],
        "price": product_data["price"],
        "category": product_data["category"],
        "specs": product_data.get("specs", {}),  # 중첩 객체
        "tags": product_data.get("tags", []),    # 배열
        "created_at": datetime.utcnow(),
        "updated_at": datetime.utcnow()
    }
    result = products.insert_one(product)
    print(f"✅ 삽입됨: {result.inserted_id}")
    return result.inserted_id


# 문서 조회 (다양한 쿼리)
def find_products():
    """다양한 쿼리 패턴"""

    # 기본 조회
    product = products.find_one({"name": "MacBook Pro"})

    # 비교 연산자
    expensive = products.find({"price": {"$gte": 1000, "$lte": 2000}})

    # 배열 검색 - 태그에 'flagship' 포함
    flagships = products.find({"tags": "flagship"})

    # 중첩 객체 필드 검색 (점 표기법)
    apple_products = products.find({"specs.brand": "Apple"})

    # 정규식 검색
    iphones = products.find({"name": {"$regex": "iPhone", "$options": "i"}})

    # 프로젝션 (필드 선택)
    names_only = products.find(
        {"category": "smartphone"},
        {"name": 1, "price": 1, "_id": 0}  # 1=포함, 0=제외
    )

    # 정렬, 제한, 스킵
    recent = products.find().sort("created_at", DESCENDING).limit(10).skip(0)

    return list(recent)


# 문서 업데이트
def update_product(product_id: str, updates: dict):
    """문서 업데이트 - 다양한 연산자"""

    result = products.update_one(
        {"_id": ObjectId(product_id)},
        {
            "$set": {
                "price": updates.get("price"),
                "updated_at": datetime.utcnow()
            },
            "$push": {"tags": "sale"},           # 배열에 추가
            "$inc": {"view_count": 1},           # 숫자 증가
            "$addToSet": {"categories": "new"}   # 중복 없이 추가
        }
    )
    print(f"수정됨: {result.modified_count}개")


# Aggregation Pipeline
def get_category_stats():
    """Aggregation으로 카테고리별 통계"""

    pipeline = [
        # 1. 필터링
        {"$match": {"price": {"$gt": 0}}},

        # 2. 그룹핑 및 집계
        {"$group": {
            "_id": "$category",
            "count": {"$sum": 1},
            "avg_price": {"$avg": "$price"},
            "max_price": {"$max": "$price"},
            "products": {"$push": "$name"}  # 배열로 수집
        }},

        # 3. 정렬
        {"$sort": {"count": -1}},

        # 4. 결과 형태 변환
        {"$project": {
            "category": "$_id",
            "count": 1,
            "avg_price": {"$round": ["$avg_price", 2]},
            "max_price": 1,
            "top_3_products": {"$slice": ["$products", 3]},
            "_id": 0
        }}
    ]

    return list(products.aggregate(pipeline))


# $lookup으로 JOIN
def get_products_with_reviews():
    """$lookup으로 컬렉션 조인"""

    pipeline = [
        {"$lookup": {
            "from": "reviews",              # 조인할 컬렉션
            "localField": "_id",            # products의 필드
            "foreignField": "product_id",   # reviews의 필드
            "as": "reviews"                 # 결과 필드명
        }},
        {"$addFields": {
            "review_count": {"$size": "$reviews"},
            "avg_rating": {"$avg": "$reviews.rating"}
        }},
        {"$project": {
            "reviews": 0  # 상세 리뷰는 제외
        }}
    ]

    return list(products.aggregate(pipeline))


# 트랜잭션 (MongoDB 4.0+)
def transfer_with_transaction(from_account: str, to_account: str, amount: float):
    """Multi-document 트랜잭션"""

    accounts = db["accounts"]

    with client.start_session() as session:
        with session.start_transaction():
            # 출금
            accounts.update_one(
                {"_id": from_account, "balance": {"$gte": amount}},
                {"$inc": {"balance": -amount}},
                session=session
            )

            # 입금
            accounts.update_one(
                {"_id": to_account},
                {"$inc": {"balance": amount}},
                session=session
            )

            # 트랜잭션 커밋 (자동)
            print(f"✅ 이체 완료: {from_account} → {to_account}, {amount}")


# 인덱스 생성
def create_indexes():
    """인덱스 생성 - 쿼리 성능 최적화"""

    # 단일 필드 인덱스
    products.create_index("name")

    # 복합 인덱스
    products.create_index([("category", ASCENDING), ("price", DESCENDING)])

    # 고유 인덱스
    products.create_index("sku", unique=True)

    # 텍스트 인덱스 (전문검색)
    products.create_index([("name", "text"), ("description", "text")])

    # TTL 인덱스 (자동 삭제)
    db["sessions"].create_index("created_at", expireAfterSeconds=3600)

    print("✅ 인덱스 생성 완료")


if __name__ == "__main__":
    # 상품 생성
    product_id = create_product({
        "name": "iPhone 15 Pro",
        "price": 1299,
        "category": "smartphone",
        "specs": {"brand": "Apple", "storage": "256GB"},
        "tags": ["flagship", "5G"]
    })

    # 통계 조회
    stats = get_category_stats()
    print(f"카테고리 통계: {stats}")
// MongoDB 연결 및 CRUD - Node.js + mongodb
const { MongoClient, ObjectId } = require('mongodb');

// 연결 설정
const uri = 'mongodb://localhost:27017';
const client = new MongoClient(uri, {
    maxPoolSize: 50,
    wtimeoutMS: 2500,
});

let db, products;

async function connect() {
    await client.connect();
    db = client.db('myapp');
    products = db.collection('products');
    console.log('✅ MongoDB 연결됨');
}

// 문서 삽입
async function createProduct(productData) {
    const product = {
        ...productData,
        createdAt: new Date(),
        updatedAt: new Date(),
    };

    const result = await products.insertOne(product);
    console.log(`삽입됨: ${result.insertedId}`);
    return result.insertedId;
}

// 배치 삽입
async function bulkInsert(items) {
    const operations = items.map(item => ({
        updateOne: {
            filter: { sku: item.sku },
            update: { $set: item, $setOnInsert: { createdAt: new Date() } },
            upsert: true
        }
    }));

    const result = await products.bulkWrite(operations);
    console.log(`삽입: ${result.upsertedCount}, 수정: ${result.modifiedCount}`);
}

// 문서 조회
async function findProducts(filters = {}) {
    const cursor = products.find(filters)
        .sort({ createdAt: -1 })
        .limit(20)
        .project({ name: 1, price: 1, category: 1 });

    return await cursor.toArray();
}

// Aggregation Pipeline
async function getCategoryStats() {
    const pipeline = [
        { $match: { price: { $gt: 0 } } },
        {
            $group: {
                _id: '$category',
                count: { $sum: 1 },
                avgPrice: { $avg: '$price' },
                maxPrice: { $max: '$price' },
                products: { $push: '$name' }
            }
        },
        { $sort: { count: -1 } },
        {
            $project: {
                category: '$_id',
                count: 1,
                avgPrice: { $round: ['$avgPrice', 2] },
                top3: { $slice: ['$products', 3] },
                _id: 0
            }
        }
    ];

    return await products.aggregate(pipeline).toArray();
}

// $lookup (JOIN)
async function getProductsWithReviews() {
    const pipeline = [
        {
            $lookup: {
                from: 'reviews',
                localField: '_id',
                foreignField: 'productId',
                as: 'reviews'
            }
        },
        {
            $addFields: {
                reviewCount: { $size: '$reviews' },
                avgRating: { $avg: '$reviews.rating' }
            }
        },
        {
            $project: { reviews: 0 }
        }
    ];

    return await products.aggregate(pipeline).toArray();
}

// 트랜잭션 (MongoDB 4.0+)
async function transferWithTransaction(fromId, toId, amount) {
    const session = client.startSession();
    const accounts = db.collection('accounts');

    try {
        session.startTransaction();

        // 출금
        const fromResult = await accounts.updateOne(
            { _id: fromId, balance: { $gte: amount } },
            { $inc: { balance: -amount } },
            { session }
        );

        if (fromResult.modifiedCount === 0) {
            throw new Error('잔액 부족');
        }

        // 입금
        await accounts.updateOne(
            { _id: toId },
            { $inc: { balance: amount } },
            { session }
        );

        await session.commitTransaction();
        console.log(`✅ 이체 완료: ${fromId} → ${toId}, ${amount}`);

    } catch (error) {
        await session.abortTransaction();
        console.error(`❌ 이체 실패: ${error.message}`);
        throw error;

    } finally {
        session.endSession();
    }
}

// Change Streams (실시간 변경 감지)
async function watchChanges() {
    const changeStream = products.watch([
        { $match: { operationType: { $in: ['insert', 'update'] } } }
    ]);

    changeStream.on('change', (change) => {
        console.log('변경 감지:', change.operationType, change.documentKey);
    });
}

// 인덱스 생성
async function createIndexes() {
    await products.createIndex({ name: 1 });
    await products.createIndex({ category: 1, price: -1 });
    await products.createIndex({ name: 'text', description: 'text' });
    console.log('✅ 인덱스 생성 완료');
}

// 실행
(async () => {
    try {
        await connect();
        await createIndexes();

        const stats = await getCategoryStats();
        console.log('카테고리 통계:', stats);

    } finally {
        await client.close();
    }
})();
// MongoDB Shell (mongosh) 명령어

// ============================================
// 1. 기본 CRUD
// ============================================
// 데이터베이스 선택
use myapp

// 문서 삽입
db.products.insertOne({
    name: "iPhone 15 Pro",
    price: 1299,
    category: "smartphone",
    specs: { brand: "Apple", storage: "256GB" },
    tags: ["flagship", "5G"],
    createdAt: new Date()
})

// 여러 문서 삽입
db.products.insertMany([
    { name: "Galaxy S24", price: 999, category: "smartphone" },
    { name: "Pixel 8", price: 699, category: "smartphone" }
])

// 조회
db.products.find({ category: "smartphone" })
db.products.findOne({ name: "iPhone 15 Pro" })

// 조건 연산자
db.products.find({ price: { $gte: 500, $lte: 1000 } })
db.products.find({ tags: { $in: ["flagship", "premium"] } })
db.products.find({ "specs.brand": "Apple" })

// 프로젝션
db.products.find({}, { name: 1, price: 1, _id: 0 })

// 정렬, 제한
db.products.find().sort({ price: -1 }).limit(5)


// ============================================
// 2. 업데이트
// ============================================
// 단일 문서 업데이트
db.products.updateOne(
    { name: "iPhone 15 Pro" },
    {
        $set: { price: 1199 },
        $push: { tags: "sale" },
        $inc: { viewCount: 1 }
    }
)

// 여러 문서 업데이트
db.products.updateMany(
    { category: "smartphone" },
    { $set: { inStock: true } }
)

// Upsert (없으면 삽입)
db.products.updateOne(
    { sku: "IPHONE15PRO" },
    { $set: { name: "iPhone 15 Pro", price: 1299 } },
    { upsert: true }
)


// ============================================
// 3. Aggregation Pipeline
// ============================================
// 카테고리별 통계
db.products.aggregate([
    { $match: { price: { $gt: 0 } } },
    { $group: {
        _id: "$category",
        count: { $sum: 1 },
        avgPrice: { $avg: "$price" },
        maxPrice: { $max: "$price" }
    }},
    { $sort: { count: -1 } }
])

// $lookup (JOIN)
db.products.aggregate([
    { $lookup: {
        from: "reviews",
        localField: "_id",
        foreignField: "productId",
        as: "reviews"
    }},
    { $addFields: {
        reviewCount: { $size: "$reviews" },
        avgRating: { $avg: "$reviews.rating" }
    }}
])

// $facet (다중 파이프라인)
db.products.aggregate([
    { $facet: {
        byCategory: [
            { $group: { _id: "$category", count: { $sum: 1 } } }
        ],
        priceRanges: [
            { $bucket: {
                groupBy: "$price",
                boundaries: [0, 500, 1000, 2000, Infinity],
                output: { count: { $sum: 1 } }
            }}
        ],
        totalCount: [
            { $count: "total" }
        ]
    }}
])


// ============================================
// 4. 인덱스
// ============================================
// 인덱스 생성
db.products.createIndex({ name: 1 })
db.products.createIndex({ category: 1, price: -1 })
db.products.createIndex({ name: "text", description: "text" })

// 고유 인덱스
db.products.createIndex({ sku: 1 }, { unique: true })

// 인덱스 확인
db.products.getIndexes()

// 쿼리 실행 계획
db.products.find({ category: "smartphone" }).explain("executionStats")


// ============================================
// 5. 관리 명령어
// ============================================
// 컬렉션 통계
db.products.stats()

// 현재 작업
db.currentOp()

// 레플리카셋 상태
rs.status()

// 샤딩 상태
sh.status()

// 사용자 생성
db.createUser({
    user: "appUser",
    pwd: "securePassword",
    roles: [{ role: "readWrite", db: "myapp" }]
})

🗣️ 실무에서 이렇게 말하세요

💬 데이터 모델링 논의에서
"상품과 리뷰 데이터를 어떻게 저장할지 고민인데요, 리뷰가 상품당 평균 10개 이하라면 상품 문서에 임베딩하는 게 좋고, 그 이상이면 별도 컬렉션으로 분리해서 $lookup으로 조인하는 게 낫습니다. 문서 크기 16MB 제한도 고려해야 해요."
💬 성능 이슈 대응에서
"explain() 결과를 보니 COLLSCAN이 발생하고 있네요. category와 createdAt에 복합 인덱스를 걸어야 합니다. 그리고 find()에 projection을 추가해서 필요한 필드만 가져오면 네트워크 비용도 줄일 수 있어요."
💬 확장성 설계 회의에서
"데이터가 TB 단위로 늘어나면 샤딩이 필요합니다. Shard Key 선택이 중요한데, user_id처럼 카디널리티가 높고 쿼리에 자주 쓰이는 필드를 선택해야 해요. 잘못 선택하면 핫스팟이 생겨서 나중에 재샤딩해야 합니다."

⚠️ 주의사항 & 베스트 프랙티스

무분별한 임베딩 금지

문서 크기는 16MB로 제한됩니다. 무한히 증가할 수 있는 배열(댓글, 로그 등)은 별도 컬렉션으로 분리하세요.

인덱스 없는 쿼리 주의

인덱스 없이 대량 데이터 조회 시 COLLSCAN이 발생합니다. explain()으로 실행 계획을 확인하고 적절한 인덱스를 생성하세요.

트랜잭션 과용 금지

MongoDB 트랜잭션은 RDBMS보다 오버헤드가 큽니다. 가능하면 단일 문서 작업으로 설계하고, 트랜잭션은 꼭 필요한 경우만 사용하세요.

MongoDB 베스트 프랙티스

적절한 데이터 모델링(임베딩 vs 참조), 인덱스 전략 수립, 레플리카셋으로 고가용성, Change Streams 활용, Atlas 사용 시 관리 부담 감소.

🔗 관련 용어

📚 더 배우기