MongoDB
The Developer Data Platform
MongoDB는 가장 인기 있는 문서 지향(Document-Oriented) NoSQL 데이터베이스입니다. JSON 형태의 BSON 문서를 저장하며, 스키마리스(Schema-less) 특성으로 유연한 데이터 모델링이 가능합니다. 레플리카셋과 샤딩으로 고가용성과 수평 확장을 지원합니다.
The Developer Data Platform
MongoDB는 가장 인기 있는 문서 지향(Document-Oriented) NoSQL 데이터베이스입니다. JSON 형태의 BSON 문서를 저장하며, 스키마리스(Schema-less) 특성으로 유연한 데이터 모델링이 가능합니다. 레플리카셋과 샤딩으로 고가용성과 수평 확장을 지원합니다.
문서 지향 데이터 모델 - MongoDB는 테이블/행 대신 컬렉션/문서 구조를 사용합니다. 문서는 JSON과 유사한 BSON(Binary JSON) 형태로 저장되며, 중첩 객체와 배열을 자연스럽게 표현합니다. 관계형 DB에서 여러 테이블로 분산될 데이터를 하나의 문서에 임베딩하여 JOIN 없이 조회할 수 있습니다.
스키마 유연성 - 컬렉션에 미리 스키마를 정의할 필요가 없습니다. 같은 컬렉션 내 문서가 서로 다른 필드를 가질 수 있어, 빠르게 변화하는 요구사항에 대응하기 좋습니다. MongoDB 3.6부터는 JSON Schema로 선택적 스키마 검증도 가능합니다.
레플리카셋(Replica Set) - MongoDB의 고가용성 솔루션입니다. Primary 1대와 Secondary 여러 대로 구성되며, Primary 장애 시 자동 페일오버가 발생합니다. Secondary에서 읽기 부하를 분산하고, 데이터 복제로 내구성을 보장합니다.
샤딩(Sharding) - 대용량 데이터를 여러 서버에 분산 저장하는 수평 확장 방식입니다. Shard Key를 기준으로 데이터가 분할되며, mongos 라우터가 쿼리를 적절한 샤드로 전달합니다. 페타바이트급 데이터 처리가 가능합니다.
Aggregation Pipeline - 데이터 변환과 분석을 위한 강력한 프레임워크입니다. $match, $group, $sort, $lookup(JOIN) 등의 스테이지를 연결하여 복잡한 데이터 처리가 가능합니다. Map-Reduce보다 성능이 우수하며, 대부분의 분석 작업에 권장됩니다.
# MongoDB 연결 및 CRUD - Python + pymongo
from pymongo import MongoClient, ASCENDING, DESCENDING
from pymongo.errors import DuplicateKeyError, BulkWriteError
from bson import ObjectId
from datetime import datetime
# 연결 설정 (레플리카셋)
client = MongoClient(
"mongodb://localhost:27017,localhost:27018,localhost:27019",
replicaSet="rs0",
readPreference="secondaryPreferred" # 읽기는 Secondary 우선
)
db = client["myapp"]
products = db["products"]
# 문서 삽입
def create_product(product_data: dict):
"""문서 삽입 - 자동 _id 생성"""
product = {
"name": product_data["name"],
"price": product_data["price"],
"category": product_data["category"],
"specs": product_data.get("specs", {}), # 중첩 객체
"tags": product_data.get("tags", []), # 배열
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
result = products.insert_one(product)
print(f"✅ 삽입됨: {result.inserted_id}")
return result.inserted_id
# 문서 조회 (다양한 쿼리)
def find_products():
"""다양한 쿼리 패턴"""
# 기본 조회
product = products.find_one({"name": "MacBook Pro"})
# 비교 연산자
expensive = products.find({"price": {"$gte": 1000, "$lte": 2000}})
# 배열 검색 - 태그에 'flagship' 포함
flagships = products.find({"tags": "flagship"})
# 중첩 객체 필드 검색 (점 표기법)
apple_products = products.find({"specs.brand": "Apple"})
# 정규식 검색
iphones = products.find({"name": {"$regex": "iPhone", "$options": "i"}})
# 프로젝션 (필드 선택)
names_only = products.find(
{"category": "smartphone"},
{"name": 1, "price": 1, "_id": 0} # 1=포함, 0=제외
)
# 정렬, 제한, 스킵
recent = products.find().sort("created_at", DESCENDING).limit(10).skip(0)
return list(recent)
# 문서 업데이트
def update_product(product_id: str, updates: dict):
"""문서 업데이트 - 다양한 연산자"""
result = products.update_one(
{"_id": ObjectId(product_id)},
{
"$set": {
"price": updates.get("price"),
"updated_at": datetime.utcnow()
},
"$push": {"tags": "sale"}, # 배열에 추가
"$inc": {"view_count": 1}, # 숫자 증가
"$addToSet": {"categories": "new"} # 중복 없이 추가
}
)
print(f"수정됨: {result.modified_count}개")
# Aggregation Pipeline
def get_category_stats():
"""Aggregation으로 카테고리별 통계"""
pipeline = [
# 1. 필터링
{"$match": {"price": {"$gt": 0}}},
# 2. 그룹핑 및 집계
{"$group": {
"_id": "$category",
"count": {"$sum": 1},
"avg_price": {"$avg": "$price"},
"max_price": {"$max": "$price"},
"products": {"$push": "$name"} # 배열로 수집
}},
# 3. 정렬
{"$sort": {"count": -1}},
# 4. 결과 형태 변환
{"$project": {
"category": "$_id",
"count": 1,
"avg_price": {"$round": ["$avg_price", 2]},
"max_price": 1,
"top_3_products": {"$slice": ["$products", 3]},
"_id": 0
}}
]
return list(products.aggregate(pipeline))
# $lookup으로 JOIN
def get_products_with_reviews():
"""$lookup으로 컬렉션 조인"""
pipeline = [
{"$lookup": {
"from": "reviews", # 조인할 컬렉션
"localField": "_id", # products의 필드
"foreignField": "product_id", # reviews의 필드
"as": "reviews" # 결과 필드명
}},
{"$addFields": {
"review_count": {"$size": "$reviews"},
"avg_rating": {"$avg": "$reviews.rating"}
}},
{"$project": {
"reviews": 0 # 상세 리뷰는 제외
}}
]
return list(products.aggregate(pipeline))
# 트랜잭션 (MongoDB 4.0+)
def transfer_with_transaction(from_account: str, to_account: str, amount: float):
"""Multi-document 트랜잭션"""
accounts = db["accounts"]
with client.start_session() as session:
with session.start_transaction():
# 출금
accounts.update_one(
{"_id": from_account, "balance": {"$gte": amount}},
{"$inc": {"balance": -amount}},
session=session
)
# 입금
accounts.update_one(
{"_id": to_account},
{"$inc": {"balance": amount}},
session=session
)
# 트랜잭션 커밋 (자동)
print(f"✅ 이체 완료: {from_account} → {to_account}, {amount}")
# 인덱스 생성
def create_indexes():
"""인덱스 생성 - 쿼리 성능 최적화"""
# 단일 필드 인덱스
products.create_index("name")
# 복합 인덱스
products.create_index([("category", ASCENDING), ("price", DESCENDING)])
# 고유 인덱스
products.create_index("sku", unique=True)
# 텍스트 인덱스 (전문검색)
products.create_index([("name", "text"), ("description", "text")])
# TTL 인덱스 (자동 삭제)
db["sessions"].create_index("created_at", expireAfterSeconds=3600)
print("✅ 인덱스 생성 완료")
if __name__ == "__main__":
# 상품 생성
product_id = create_product({
"name": "iPhone 15 Pro",
"price": 1299,
"category": "smartphone",
"specs": {"brand": "Apple", "storage": "256GB"},
"tags": ["flagship", "5G"]
})
# 통계 조회
stats = get_category_stats()
print(f"카테고리 통계: {stats}")
// MongoDB 연결 및 CRUD - Node.js + mongodb
const { MongoClient, ObjectId } = require('mongodb');
// 연결 설정
const uri = 'mongodb://localhost:27017';
const client = new MongoClient(uri, {
maxPoolSize: 50,
wtimeoutMS: 2500,
});
let db, products;
async function connect() {
await client.connect();
db = client.db('myapp');
products = db.collection('products');
console.log('✅ MongoDB 연결됨');
}
// 문서 삽입
async function createProduct(productData) {
const product = {
...productData,
createdAt: new Date(),
updatedAt: new Date(),
};
const result = await products.insertOne(product);
console.log(`삽입됨: ${result.insertedId}`);
return result.insertedId;
}
// 배치 삽입
async function bulkInsert(items) {
const operations = items.map(item => ({
updateOne: {
filter: { sku: item.sku },
update: { $set: item, $setOnInsert: { createdAt: new Date() } },
upsert: true
}
}));
const result = await products.bulkWrite(operations);
console.log(`삽입: ${result.upsertedCount}, 수정: ${result.modifiedCount}`);
}
// 문서 조회
async function findProducts(filters = {}) {
const cursor = products.find(filters)
.sort({ createdAt: -1 })
.limit(20)
.project({ name: 1, price: 1, category: 1 });
return await cursor.toArray();
}
// Aggregation Pipeline
async function getCategoryStats() {
const pipeline = [
{ $match: { price: { $gt: 0 } } },
{
$group: {
_id: '$category',
count: { $sum: 1 },
avgPrice: { $avg: '$price' },
maxPrice: { $max: '$price' },
products: { $push: '$name' }
}
},
{ $sort: { count: -1 } },
{
$project: {
category: '$_id',
count: 1,
avgPrice: { $round: ['$avgPrice', 2] },
top3: { $slice: ['$products', 3] },
_id: 0
}
}
];
return await products.aggregate(pipeline).toArray();
}
// $lookup (JOIN)
async function getProductsWithReviews() {
const pipeline = [
{
$lookup: {
from: 'reviews',
localField: '_id',
foreignField: 'productId',
as: 'reviews'
}
},
{
$addFields: {
reviewCount: { $size: '$reviews' },
avgRating: { $avg: '$reviews.rating' }
}
},
{
$project: { reviews: 0 }
}
];
return await products.aggregate(pipeline).toArray();
}
// 트랜잭션 (MongoDB 4.0+)
async function transferWithTransaction(fromId, toId, amount) {
const session = client.startSession();
const accounts = db.collection('accounts');
try {
session.startTransaction();
// 출금
const fromResult = await accounts.updateOne(
{ _id: fromId, balance: { $gte: amount } },
{ $inc: { balance: -amount } },
{ session }
);
if (fromResult.modifiedCount === 0) {
throw new Error('잔액 부족');
}
// 입금
await accounts.updateOne(
{ _id: toId },
{ $inc: { balance: amount } },
{ session }
);
await session.commitTransaction();
console.log(`✅ 이체 완료: ${fromId} → ${toId}, ${amount}`);
} catch (error) {
await session.abortTransaction();
console.error(`❌ 이체 실패: ${error.message}`);
throw error;
} finally {
session.endSession();
}
}
// Change Streams (실시간 변경 감지)
async function watchChanges() {
const changeStream = products.watch([
{ $match: { operationType: { $in: ['insert', 'update'] } } }
]);
changeStream.on('change', (change) => {
console.log('변경 감지:', change.operationType, change.documentKey);
});
}
// 인덱스 생성
async function createIndexes() {
await products.createIndex({ name: 1 });
await products.createIndex({ category: 1, price: -1 });
await products.createIndex({ name: 'text', description: 'text' });
console.log('✅ 인덱스 생성 완료');
}
// 실행
(async () => {
try {
await connect();
await createIndexes();
const stats = await getCategoryStats();
console.log('카테고리 통계:', stats);
} finally {
await client.close();
}
})();
// MongoDB Shell (mongosh) 명령어
// ============================================
// 1. 기본 CRUD
// ============================================
// 데이터베이스 선택
use myapp
// 문서 삽입
db.products.insertOne({
name: "iPhone 15 Pro",
price: 1299,
category: "smartphone",
specs: { brand: "Apple", storage: "256GB" },
tags: ["flagship", "5G"],
createdAt: new Date()
})
// 여러 문서 삽입
db.products.insertMany([
{ name: "Galaxy S24", price: 999, category: "smartphone" },
{ name: "Pixel 8", price: 699, category: "smartphone" }
])
// 조회
db.products.find({ category: "smartphone" })
db.products.findOne({ name: "iPhone 15 Pro" })
// 조건 연산자
db.products.find({ price: { $gte: 500, $lte: 1000 } })
db.products.find({ tags: { $in: ["flagship", "premium"] } })
db.products.find({ "specs.brand": "Apple" })
// 프로젝션
db.products.find({}, { name: 1, price: 1, _id: 0 })
// 정렬, 제한
db.products.find().sort({ price: -1 }).limit(5)
// ============================================
// 2. 업데이트
// ============================================
// 단일 문서 업데이트
db.products.updateOne(
{ name: "iPhone 15 Pro" },
{
$set: { price: 1199 },
$push: { tags: "sale" },
$inc: { viewCount: 1 }
}
)
// 여러 문서 업데이트
db.products.updateMany(
{ category: "smartphone" },
{ $set: { inStock: true } }
)
// Upsert (없으면 삽입)
db.products.updateOne(
{ sku: "IPHONE15PRO" },
{ $set: { name: "iPhone 15 Pro", price: 1299 } },
{ upsert: true }
)
// ============================================
// 3. Aggregation Pipeline
// ============================================
// 카테고리별 통계
db.products.aggregate([
{ $match: { price: { $gt: 0 } } },
{ $group: {
_id: "$category",
count: { $sum: 1 },
avgPrice: { $avg: "$price" },
maxPrice: { $max: "$price" }
}},
{ $sort: { count: -1 } }
])
// $lookup (JOIN)
db.products.aggregate([
{ $lookup: {
from: "reviews",
localField: "_id",
foreignField: "productId",
as: "reviews"
}},
{ $addFields: {
reviewCount: { $size: "$reviews" },
avgRating: { $avg: "$reviews.rating" }
}}
])
// $facet (다중 파이프라인)
db.products.aggregate([
{ $facet: {
byCategory: [
{ $group: { _id: "$category", count: { $sum: 1 } } }
],
priceRanges: [
{ $bucket: {
groupBy: "$price",
boundaries: [0, 500, 1000, 2000, Infinity],
output: { count: { $sum: 1 } }
}}
],
totalCount: [
{ $count: "total" }
]
}}
])
// ============================================
// 4. 인덱스
// ============================================
// 인덱스 생성
db.products.createIndex({ name: 1 })
db.products.createIndex({ category: 1, price: -1 })
db.products.createIndex({ name: "text", description: "text" })
// 고유 인덱스
db.products.createIndex({ sku: 1 }, { unique: true })
// 인덱스 확인
db.products.getIndexes()
// 쿼리 실행 계획
db.products.find({ category: "smartphone" }).explain("executionStats")
// ============================================
// 5. 관리 명령어
// ============================================
// 컬렉션 통계
db.products.stats()
// 현재 작업
db.currentOp()
// 레플리카셋 상태
rs.status()
// 샤딩 상태
sh.status()
// 사용자 생성
db.createUser({
user: "appUser",
pwd: "securePassword",
roles: [{ role: "readWrite", db: "myapp" }]
})
"상품과 리뷰 데이터를 어떻게 저장할지 고민인데요, 리뷰가 상품당 평균 10개 이하라면 상품 문서에 임베딩하는 게 좋고, 그 이상이면 별도 컬렉션으로 분리해서 $lookup으로 조인하는 게 낫습니다. 문서 크기 16MB 제한도 고려해야 해요."
"explain() 결과를 보니 COLLSCAN이 발생하고 있네요. category와 createdAt에 복합 인덱스를 걸어야 합니다. 그리고 find()에 projection을 추가해서 필요한 필드만 가져오면 네트워크 비용도 줄일 수 있어요."
"데이터가 TB 단위로 늘어나면 샤딩이 필요합니다. Shard Key 선택이 중요한데, user_id처럼 카디널리티가 높고 쿼리에 자주 쓰이는 필드를 선택해야 해요. 잘못 선택하면 핫스팟이 생겨서 나중에 재샤딩해야 합니다."
문서 크기는 16MB로 제한됩니다. 무한히 증가할 수 있는 배열(댓글, 로그 등)은 별도 컬렉션으로 분리하세요.
인덱스 없이 대량 데이터 조회 시 COLLSCAN이 발생합니다. explain()으로 실행 계획을 확인하고 적절한 인덱스를 생성하세요.
MongoDB 트랜잭션은 RDBMS보다 오버헤드가 큽니다. 가능하면 단일 문서 작업으로 설계하고, 트랜잭션은 꼭 필요한 경우만 사용하세요.
적절한 데이터 모델링(임베딩 vs 참조), 인덱스 전략 수립, 레플리카셋으로 고가용성, Change Streams 활용, Atlas 사용 시 관리 부담 감소.