
MongoDB Aggregation Pipeline: Data Processing & $lookup


Prerequisites: Understanding of MongoDB CRUD. See NOSQL 02 MongoDB Basics.

The Aggregation Framework is MongoDB’s answer to SQL’s GROUP BY, JOIN, and complex analytics queries. Think of it as a data processing factory.


Part A: Pipeline Concept

1. What is a Pipeline?

Data flows through a series of stages, each transforming the data:

graph LR
    INPUT[Collection<br/>Documents] --> S1[$match<br/>Filter]
    S1 --> S2[$group<br/>Aggregate]
    S2 --> S3[$sort<br/>Order]
    S3 --> S4[$project<br/>Shape]
    S4 --> OUTPUT[Result<br/>Documents]
    
    style INPUT fill:#3498db,color:#fff
    style OUTPUT fill:#27ae60,color:#fff

Factory Analogy:

| Stage | Factory Station | Action |
|---|---|---|
| $match | Quality Control | Filter out defective items |
| $group | Assembly Line | Combine parts into products |
| $sort | Sorting Machine | Arrange by criteria |
| $project | Packaging | Select what goes in the box |

2. Basic Syntax

db.collection.aggregate([
  { $stage1: { ... } },
  { $stage2: { ... } },
  { $stage3: { ... } }
]);
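Conceptually, a pipeline is a left-to-right chain: each stage receives the previous stage's output. The plain JavaScript sketch below models that flow with ordinary array functions standing in for $match, $sort and $project. It is an illustration only (the helper name `runPipeline` is hypothetical), not how the MongoDB server executes pipelines.

```javascript
// Conceptual sketch: a pipeline as an ordered list of stage functions.
// Each stage takes an array of documents and returns a new array.
// runPipeline is a hypothetical helper, not a MongoDB API.
function runPipeline(docs, stages) {
  // Feed the output of each stage into the next one, in order
  return stages.reduce((current, stage) => stage(current), docs);
}

const orders = [
  { _id: 1, status: "completed", total: 120 },
  { _id: 2, status: "pending", total: 80 },
  { _id: 3, status: "completed", total: 40 },
];

const result = runPipeline(orders, [
  (docs) => docs.filter((d) => d.status === "completed"),   // like $match
  (docs) => [...docs].sort((a, b) => b.total - a.total),     // like $sort: { total: -1 }
  (docs) => docs.map(({ _id, total }) => ({ _id, total })),  // like $project
]);
// result: _id 1 (total 120), then _id 3 (total 40), without the status field
```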

Part B: Essential Stages

3. $match — Filter Documents (WHERE)

Filter documents early to reduce processing.

// SQL: SELECT * FROM orders WHERE status = 'completed'
db.orders.aggregate([
  { $match: { status: "completed" } }
]);

// Multiple conditions
db.orders.aggregate([
  { $match: { 
    status: "completed",
    total: { $gte: 100 }
  }}
]);
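Multiple conditions in one $match are combined with an implicit AND. The sketch below evaluates such a filter against in-memory documents; it deliberately models only plain equality, $gte and $lt (the function name `matchesFilter` is hypothetical), whereas real MongoDB supports many more operators and BSON comparison rules.

```javascript
// Simplified sketch of $match evaluation: every top-level condition must hold.
// Only equality, $gte and $lt are modeled; anything else throws.
function matchesFilter(doc, filter) {
  return Object.entries(filter).every(([field, condition]) => {
    const value = doc[field];
    const isOperatorObject =
      condition !== null && typeof condition === "object" && !Array.isArray(condition);
    if (!isOperatorObject) return value === condition; // e.g. { status: "completed" }
    return Object.entries(condition).every(([op, operand]) => {
      if (op === "$gte") return value >= operand;       // e.g. { total: { $gte: 100 } }
      if (op === "$lt") return value < operand;
      throw new Error(`operator ${op} is not modeled in this sketch`);
    });
  });
}

const orders = [
  { _id: 1, status: "completed", total: 150 },
  { _id: 2, status: "completed", total: 60 },
  { _id: 3, status: "pending", total: 300 },
];

const filter = { status: "completed", total: { $gte: 100 } };
const matched = orders.filter((doc) => matchesFilter(doc, filter));
// Only _id 1 passes: _id 2 fails the total check, _id 3 fails the status check
```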

Best Practice: Always put $match first when possible — it can use indexes!

4. $project — Select Fields (SELECT)

Choose which fields to include/exclude or create new ones.

// SQL: SELECT name, email, total FROM orders
db.orders.aggregate([
  { $project: {
    name: 1,
    email: 1,
    total: 1,
    _id: 0              // Exclude _id
  }}
]);

// Create computed field
db.orders.aggregate([
  { $project: {
    name: 1,
    totalWithTax: { $multiply: ["$total", 1.1] }  // Add 10% tax
  }}
]);

[!TIP] $addFields vs $project: If you only want to add a field without listing every existing field, use $addFields instead of $project. It preserves all original fields automatically:

// $project requires listing all fields you want to keep
{ $project: { name: 1, email: 1, total: 1, discount: { $multiply: ["$total", 0.1] } } }

// $addFields just adds/modifies, keeps everything else
{ $addFields: { discount: { $multiply: ["$total", 0.1] } } }
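The difference can be seen on a single in-memory document. This plain JavaScript sketch mimics the two stages above by hand (it is not MongoDB code): inclusion-mode $project keeps only the listed fields plus _id, while $addFields keeps everything and adds the new field.

```javascript
// Hand-written sketch of the two stages applied to one document.
const order = { _id: 7, name: "Alice", email: "alice@example.com", total: 200, status: "completed" };

// Like $project in inclusion mode: only listed fields survive (plus _id unless excluded)
const projected = {
  _id: order._id,
  name: order.name,
  email: order.email,
  total: order.total,
  discount: order.total * 0.1,
};

// Like $addFields: all existing fields survive and the new field is added
const withDiscount = { ...order, discount: order.total * 0.1 };

// projected has no `status` field; withDiscount still has it
```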

5. $group — Aggregate Data (GROUP BY)

Group documents and compute aggregates.

// SQL: SELECT category, SUM(amount) AS totalSales, AVG(amount) AS avgSales, COUNT(*) AS count
//      FROM sales GROUP BY category
db.sales.aggregate([
  { $group: {
    _id: "$category",           // Group by category
    totalSales: { $sum: "$amount" },
    avgSales: { $avg: "$amount" },
    count: { $sum: 1 }
  }}
]);
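To make the semantics concrete, here is an in-memory sketch of what this $group stage computes: one output document per distinct category, with a running sum and count, and the average derived at the end. It assumes every `amount` is numeric (MongoDB's $avg ignores non-numeric values, which this sketch does not model); `groupByCategory` is a hypothetical name.

```javascript
// In-memory sketch of { $group: { _id: "$category", totalSales, avgSales, count } }.
// Assumes numeric amounts; not MongoDB's implementation.
function groupByCategory(sales) {
  const groups = new Map();
  for (const sale of sales) {
    const g = groups.get(sale.category) ?? { _id: sale.category, totalSales: 0, count: 0 };
    g.totalSales += sale.amount; // $sum: "$amount"
    g.count += 1;                // $sum: 1
    groups.set(sale.category, g);
  }
  // $avg: "$amount" equals totalSales / count when all amounts are numeric
  return [...groups.values()].map((g) => ({ ...g, avgSales: g.totalSales / g.count }));
}

const sales = [
  { category: "books", amount: 10 },
  { category: "games", amount: 50 },
  { category: "books", amount: 30 },
];

const grouped = groupByCategory(sales);
// books: totalSales 40, count 2, avgSales 20; games: totalSales 50, count 1, avgSales 50
```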

Common Accumulators

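| Accumulator | SQL Equivalent | Example |
|---|---|---|
| $sum | SUM() | { $sum: "$amount" } |
| $avg | AVG() | { $avg: "$price" } |
| $min | MIN() | { $min: "$date" } |
| $max | MAX() | { $max: "$score" } |
| $first | (subquery or FIRST_VALUE() window function) | { $first: "$name" } |
| $last | (subquery or LAST_VALUE() window function) | { $last: "$timestamp" } |
| $push | ARRAY_AGG() (e.g. PostgreSQL) | { $push: "$item" } |
| $addToSet | ARRAY_AGG(DISTINCT ...) (e.g. PostgreSQL) | { $addToSet: "$tag" } |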
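$first and $last simply take the first or last document in the order documents reach $group, so their result depends on that order; this is why they are normally preceded by a $sort. The sketch below applies four accumulators to the documents of one group, once unsorted and once sorted by timestamp. It is plain JavaScript with a hypothetical `accumulate` helper; note that MongoDB does not guarantee the element order of an $addToSet result.

```javascript
// Sketch of $first, $last, $push and $addToSet over the documents of ONE group,
// in the order they arrive at $group.
const groupDocs = [
  { name: "b", tag: "x", timestamp: 2 },
  { name: "c", tag: "x", timestamp: 3 },
  { name: "a", tag: "y", timestamp: 1 },
];

const accumulate = (docs) => ({
  first: docs[0].name,                          // $first: "$name"
  last: docs[docs.length - 1].timestamp,        // $last: "$timestamp"
  pushed: docs.map((d) => d.tag),               // $push: keeps duplicates, in arrival order
  tagSet: [...new Set(docs.map((d) => d.tag))], // $addToSet: no duplicates (order not guaranteed in MongoDB)
});

const unsorted = accumulate(groupDocs);
const sorted = accumulate([...groupDocs].sort((a, b) => a.timestamp - b.timestamp));
// unsorted: first "b", last 1; sorted by timestamp: first "a", last 3
```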

Group by Multiple Fields

// SQL: GROUP BY year, month
db.sales.aggregate([
  { $group: {
    _id: {
      year: { $year: "$date" },
      month: { $month: "$date" }
    },
    monthlySales: { $sum: "$amount" }
  }}
]);

6. $sort — Order Results (ORDER BY)

// SQL: ORDER BY total DESC
db.orders.aggregate([
  { $sort: { total: -1 } }   // -1 = descending, 1 = ascending
]);

// Sort by multiple fields
db.orders.aggregate([
  { $sort: { status: 1, total: -1 } }
]);
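A multi-field sort compares by the first key and only falls back to the next key on ties. The comparator below sketches { $sort: { status: 1, total: -1 } } in plain JavaScript. It assumes `status` is always a string and `total` always a number; MongoDB additionally defines an ordering across different BSON types, which is not modeled here.

```javascript
// Sketch of { $sort: { status: 1, total: -1 } } as an Array.prototype.sort comparator.
function compareStatusThenTotal(a, b) {
  if (a.status < b.status) return -1; // status: 1 (ascending)
  if (a.status > b.status) return 1;
  return b.total - a.total;           // total: -1 (descending), only used on ties
}

const orders = [
  { status: "pending", total: 20 },
  { status: "completed", total: 50 },
  { status: "completed", total: 90 },
];

const sortedOrders = [...orders].sort(compareStatusThenTotal);
// completed/90, completed/50, pending/20
```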

7. $limit and $skip — Pagination

// SQL: LIMIT 10 OFFSET 20
db.orders.aggregate([
  { $sort: { date: -1 } },
  { $skip: 20 },
  { $limit: 10 }
]);
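For page-based APIs, the stage values follow from the page number: $skip is (page − 1) × pageSize. The sketch below builds the stage list with a hypothetical helper `paginationStages` (not part of any MongoDB driver); page 3 with 10 documents per page reproduces the $skip: 20, $limit: 10 example above. It also adds _id as a tie-breaker to the sort, an assumption on our part, because sorting only on a non-unique field such as date can return ties in an inconsistent order across pages.

```javascript
// Hypothetical helper: builds pagination stages for a 1-based page number.
function paginationStages(page, pageSize) {
  if (!Number.isInteger(page) || page < 1) throw new RangeError("page must be an integer >= 1");
  if (!Number.isInteger(pageSize) || pageSize < 1) throw new RangeError("pageSize must be an integer >= 1");
  return [
    { $sort: { date: -1, _id: -1 } }, // _id breaks ties so equal dates keep a fixed order
    { $skip: (page - 1) * pageSize },
    { $limit: pageSize },
  ];
}

const stages = paginationStages(3, 10);
// stages could then be passed to db.orders.aggregate(stages) in the shell
```

Because $skip still has to walk past every skipped document, deep pages get progressively slower; filtering on the last seen sort key (range-based pagination) is a common alternative.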

Part C: $lookup — The JOIN

8. $lookup Basics (LEFT JOIN)

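$lookup performs a left outer join: every input document is kept, and the matching documents from the other collection are added to it as an array (an empty array when nothing matches).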

// SQL: SELECT * FROM orders LEFT JOIN customers ON orders.customerId = customers._id
db.orders.aggregate([
  { $lookup: {
    from: "customers",              // Join with this collection
    localField: "customerId",       // Field in orders
    foreignField: "_id",            // Field in customers
    as: "customerDetails"           // Output array name
  }}
]);

Output:

{
  "_id": ObjectId("order123"),
  "total": 500,
  "customerId": ObjectId("cust456"),
  "customerDetails": [                // Array with matched documents
    {
      "_id": ObjectId("cust456"),
      "name": "Alice",
      "email": "alice@example.com"
    }
  ]
}
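The basic localField/foreignField form can be sketched on in-memory arrays: each order is kept and receives an array of all matching customers. Plain strings stand in for ObjectIds, and the `lookup` function name is hypothetical. Real $lookup also has rules this sketch skips (for example, an array in localField matches any of its elements).

```javascript
// Sketch of equality-match $lookup semantics on plain arrays (left outer join).
function lookup(localDocs, foreignDocs, { localField, foreignField, as }) {
  return localDocs.map((doc) => ({
    ...doc,
    // All matching foreign documents; [] when there is no match
    [as]: foreignDocs.filter((f) => f[foreignField] === doc[localField]),
  }));
}

const orders = [
  { _id: "order123", total: 500, customerId: "cust456" },
  { _id: "order124", total: 75, customerId: "cust999" }, // no such customer
];
const customers = [{ _id: "cust456", name: "Alice", email: "alice@example.com" }];

const joined = lookup(orders, customers, {
  localField: "customerId",
  foreignField: "_id",
  as: "customerDetails",
});
// joined[0].customerDetails holds Alice; joined[1].customerDetails is []
```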

9. $lookup with $unwind

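Since $lookup always returns an array, use $unwind to turn it into one document per element; with exactly one matching customer, customer becomes a plain embedded object. By default, $unwind drops documents whose array is empty, so orders without a matching customer disappear (effectively an INNER JOIN). Use { $unwind: { path: "$customer", preserveNullAndEmptyArrays: true } } to keep them: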

db.orders.aggregate([
  { $lookup: {
    from: "customers",
    localField: "customerId",
    foreignField: "_id",
    as: "customer"
  }},
  { $unwind: "$customer" },          // Flatten the array
  { $project: {
    orderId: "$_id",
    total: 1,
    customerName: "$customer.name",  // Now accessible as object
    customerEmail: "$customer.email"
  }}
]);
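The default dropping behavior matters after a $lookup. This sketch models $unwind on in-memory documents, including the preserveNullAndEmptyArrays option; the `unwind` function name is hypothetical, and only the common cases (non-empty array, empty array, null, missing, non-array value) are modeled.

```javascript
// Sketch of $unwind: one output document per array element.
function unwind(docs, field, { preserveNullAndEmptyArrays = false } = {}) {
  return docs.flatMap((doc) => {
    const value = doc[field];
    if (Array.isArray(value) && value.length > 0) {
      return value.map((element) => ({ ...doc, [field]: element })); // one doc per element
    }
    if (value !== undefined && value !== null && !Array.isArray(value)) {
      return [doc]; // a non-array value behaves like a one-element array
    }
    if (!preserveNullAndEmptyArrays) return []; // empty, null or missing: dropped by default
    if (Array.isArray(value)) {
      const { [field]: _removed, ...rest } = doc; // empty array: field omitted in this sketch
      return [rest];
    }
    return [doc]; // null or missing: kept as-is
  });
}

const joined = [
  { _id: 1, customer: [{ name: "Alice" }] },
  { _id: 2, customer: [] }, // order whose customer was not found
];

const inner = unwind(joined, "customer");
const left = unwind(joined, "customer", { preserveNullAndEmptyArrays: true });
// inner keeps only order 1; left keeps both orders
```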

10. Advanced $lookup (Pipeline)

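When a single field equality is not enough (for example, matching against an array of IDs, adding extra filters, or reshaping the joined documents), give $lookup a sub-pipeline instead. Variables declared in let are referenced as $$name inside it, and comparing them with fields of the joined collection requires $expr: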

db.orders.aggregate([
  { $lookup: {
    from: "products",
    let: { orderItems: "$items" },   // Pass variables
    pipeline: [
      { $match: {
        $expr: { $in: ["$_id", "$$orderItems.productId"] }
      }},
      { $project: { name: 1, price: 1 } }
    ],
    as: "productDetails"
  }}
]);
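For one order, the sub-pipeline's $match works roughly as sketched below: "$$orderItems.productId", applied to an array of embedded documents, resolves to the array of their productId values, and $in then tests membership. This is an in-memory illustration with made-up sample data. One caveat worth checking in your own data: the aggregation $in requires an array, so if some orders lack an items field, binding the variable as { $ifNull: ["$items", []] } in let avoids an error.

```javascript
// In-memory sketch of the sub-pipeline for a single order.
const order = { _id: 1, items: [{ productId: "p1", qty: 2 }, { productId: "p3", qty: 1 }] };
const products = [
  { _id: "p1", name: "Pen", price: 2 },
  { _id: "p2", name: "Ink", price: 5 },
  { _id: "p3", name: "Pad", price: 4 },
];

// Value of "$$orderItems.productId": one productId per embedded item
const orderItemIds = order.items.map((item) => item.productId);

const productDetails = products
  .filter((p) => orderItemIds.includes(p._id))             // $match: { $expr: { $in: [...] } }
  .map(({ _id, name, price }) => ({ _id, name, price }));  // $project: { name: 1, price: 1 }
// productDetails: Pen and Pad
```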

Part D: Real-World Examples

11. Sales Report by Category

Task: Get total sales and average order value per category for 2024.

db.orders.aggregate([
  // Stage 1: Filter 2024 orders
  { $match: {
    orderDate: {
      $gte: ISODate("2024-01-01"),
      $lt: ISODate("2025-01-01")
    }
  }},
  
  // Stage 2: Unwind items array
  { $unwind: "$items" },
  
  // Stage 3: Group by category
  { $group: {
    _id: "$items.category",
    totalRevenue: { $sum: { $multiply: ["$items.price", "$items.qty"] } },
    orderIds: { $addToSet: "$_id" }    // Distinct orders; after $unwind, { $sum: 1 } would count line items
  }},
  
  // Stage 4: Sort by revenue
  { $sort: { totalRevenue: -1 } },
  
  // Stage 5: Format output
  { $project: {
    category: "$_id",
    totalRevenue: { $round: ["$totalRevenue", 2] },
    totalOrders: { $size: "$orderIds" },
    avgOrderValue: {                   // Category revenue per order containing that category
      $round: [{ $divide: ["$totalRevenue", { $size: "$orderIds" }] }, 2]
    },
    _id: 0
  }}
]);

SQL Equivalent:

SELECT 
  oi.category,
  SUM(oi.price * oi.qty) AS totalRevenue,
  COUNT(DISTINCT o.id) AS totalOrders,
  SUM(oi.price * oi.qty) / COUNT(DISTINCT o.id) AS avgOrderValue
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
WHERE o.order_date >= '2024-01-01' AND o.order_date < '2025-01-01'
GROUP BY oi.category
ORDER BY totalRevenue DESC;
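$unwind multiplies documents: an order with two items in the same category becomes two documents, so counting after $unwind counts line items, while collecting distinct order _id values counts orders. The small in-memory sketch below (made-up sample data, plain JavaScript) shows the two numbers diverging.

```javascript
// Two orders; order A has two "books" line items.
const orders = [
  { _id: "A", items: [{ category: "books", price: 10, qty: 1 }, { category: "books", price: 5, qty: 2 }] },
  { _id: "B", items: [{ category: "books", price: 20, qty: 1 }] },
];

// Like { $unwind: "$items" }: one document per line item, _id repeated
const unwound = orders.flatMap((o) => o.items.map((item) => ({ _id: o._id, items: item })));
const books = unwound.filter((d) => d.items.category === "books");

const lineItemCount = books.length;                       // like { $sum: 1 }: 3
const orderCount = new Set(books.map((d) => d._id)).size; // like $addToSet on _id, then $size: 2
const revenue = books.reduce((sum, d) => sum + d.items.price * d.items.qty, 0); // 10 + 10 + 20 = 40
const avgOrderValue = revenue / orderCount;               // 40 / 2 = 20
```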

12. Customer Order Summary with $lookup

Task: Find the 10 customers who spent the most, with each one's order count, total spending, and average order value.

db.customers.aggregate([
  // Join with orders
  { $lookup: {
    from: "orders",
    localField: "_id",
    foreignField: "customerId",
    as: "orders"
  }},
  
  // Calculate metrics
  { $project: {
    name: 1,
    email: 1,
    orderCount: { $size: "$orders" },
    totalSpent: { $sum: "$orders.total" },
    avgOrderValue: { $avg: "$orders.total" }
  }},
  
  // Filter active customers
  { $match: { orderCount: { $gt: 0 } } },
  
  // Sort by spending
  { $sort: { totalSpent: -1 } },
  
  // Top 10
  { $limit: 10 }
]);

13. Time-Series Aggregation

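Task: Daily order count and revenue for the past 7 days. The filter is a rolling 7 × 24-hour window, so the oldest bucket may be a partial day, and days are bucketed in UTC.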

db.orders.aggregate([
  { $match: {
    orderDate: { $gte: new Date(Date.now() - 7*24*60*60*1000) }
  }},
  
  { $group: {
    _id: {
      year: { $year: "$orderDate" },
      month: { $month: "$orderDate" },
      day: { $dayOfMonth: "$orderDate" }
    },
    orderCount: { $sum: 1 },
    revenue: { $sum: "$total" }
  }},
  
  { $sort: { "_id.year": 1, "_id.month": 1, "_id.day": 1 } },
  
  { $project: {
    date: {
      $dateFromParts: {
        year: "$_id.year",
        month: "$_id.month",
        day: "$_id.day"
      }
    },
    orderCount: 1,
    revenue: 1,
    _id: 0
  }}
]);
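$year, $month and $dayOfMonth use UTC unless a timezone is supplied, so the JavaScript UTC getters are the matching choice when reproducing this bucketing in application code. The sketch below groups in-memory orders by UTC calendar day and builds a midnight-UTC Date per bucket, as $dateFromParts does here. `dailyTotals` is a hypothetical name; note that getUTCMonth() and Date.UTC both use 0-based months, while $month returns 1 to 12.

```javascript
// Sketch: group orders by UTC calendar day, oldest day first.
function dailyTotals(orders) {
  const byDay = new Map();
  for (const { orderDate, total } of orders) {
    // Midnight UTC of the order's UTC day (0-based month on both sides)
    const dayStart = Date.UTC(orderDate.getUTCFullYear(), orderDate.getUTCMonth(), orderDate.getUTCDate());
    const bucket = byDay.get(dayStart) ?? { date: new Date(dayStart), orderCount: 0, revenue: 0 };
    bucket.orderCount += 1;  // orderCount: { $sum: 1 }
    bucket.revenue += total; // revenue: { $sum: "$total" }
    byDay.set(dayStart, bucket);
  }
  // Ascending by day, like the $sort on _id.year, _id.month, _id.day
  return [...byDay.entries()].sort(([a], [b]) => a - b).map(([, bucket]) => bucket);
}

const orders = [
  { orderDate: new Date("2024-03-05T23:30:00Z"), total: 10 },
  { orderDate: new Date("2024-03-04T08:00:00Z"), total: 25 },
  { orderDate: new Date("2024-03-05T01:15:00Z"), total: 5 },
];

const days = dailyTotals(orders);
// days[0]: 2024-03-04 (1 order, revenue 25); days[1]: 2024-03-05 (2 orders, revenue 15)
```

For local-time days, the date operators accept an object form such as { $dayOfMonth: { date: "$orderDate", timezone: "America/New_York" } }; on MongoDB 5.0 and later, $dateTrunc with unit "day" can replace the three-part _id.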

Part E: Performance Tips

14. Pipeline Optimization

graph TD
    subgraph "❌ Slow Pipeline"
        A1[$project] --> A2[$sort]
        A2 --> A3[$match]
    end
    
    subgraph "✅ Fast Pipeline"
        B1[$match] --> B2[$sort]
        B2 --> B3[$project]
    end
    
    style A3 fill:#e74c3c,color:#fff
    style B1 fill:#27ae60,color:#fff

Optimization Rules

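| Rule | Reason |
|---|---|
| Put $match first | Can use indexes and shrinks the number of documents every later stage processes |
| Put $project last | Earlier stages keep access to every field; MongoDB already avoids passing along fields the pipeline never uses |
| Limit $lookup results | The joined array is stored in the output document, which must stay within the 16 MB BSON document limit |
| Use allowDiskUse: true | Lets memory-heavy stages such as $group and $sort spill to temporary files instead of failing at the 100 MB per-stage memory limit (the default behavior since MongoDB 6.0) |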

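[!WARNING] Index Usage Limitation: Only stages at the very start of the pipeline can use the collection's indexes: a leading $match and/or $sort (and $geoNear, which must be the first stage). Once documents pass through $group, $project, or $unwind, later stages work on transformed documents and cannot use the original collection's indexes. The optimizer can sometimes move a $match ahead of a $project or $addFields when the filter only references original fields, but writing $match first does not depend on that. (A $lookup can still use an index on the foreign collection's foreignField.)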

// Allow disk use for large aggregations
db.orders.aggregate([
  { $group: { _id: "$category", total: { $sum: "$amount" } } }
], { allowDiskUse: true });

15. Stage-to-SQL Mapping Reference

| MongoDB Stage | SQL Clause |
|---|---|
| $match | WHERE / HAVING |
| $project | SELECT |
| $group | GROUP BY |
| $sort | ORDER BY |
| $limit | LIMIT |
| $skip | OFFSET |
| $lookup | LEFT JOIN |
| $unwind | (unnest array) |
| $count | COUNT(*) |
| $out | SELECT INTO |

Summary

Pipeline Stage Cheat Sheet

| Stage | Purpose | Example |
|---|---|---|
| $match | Filter | { status: "active" } |
| $project | Shape | { name: 1, total: 1 } |
| $group | Aggregate | { _id: "$category", sum: { $sum: "$price" } } |
| $sort | Order | { date: -1 } |
| $limit | Cap results | 10 |
| $lookup | Join | { from: "users", ... } |
| $unwind | Flatten array | "$items" |

Key Takeaways

  1. Pipeline = Factory Line — data flows through stages
  2. $match first — filter early for performance
  3. $lookup = LEFT JOIN — connect collections
  4. $unwind — flatten arrays for processing
  5. Use indexes — $match and $sort at the start of a pipeline can use indexes

💡 Practice Questions

Conceptual

  1. What is the MongoDB aggregation pipeline? How is it different from find()?

  2. Explain the ESR (Equality, Sort, Range) rule for ordering fields in a compound index that supports a $match followed by a $sort. Why does putting $match first improve performance?

  3. What does $lookup do and how is it similar to SQL JOIN?

  4. When would you use $unwind and what does it do?

Hands-on

// Write an aggregation pipeline to:
// 1. Filter orders from 2024
// 2. Group by customer email
// 3. Calculate total spent and order count per customer
// 4. Sort by total spent descending
// 5. Return top 5 customers
Answer:
db.orders.aggregate([
  { $match: { orderDate: { $gte: ISODate("2024-01-01"), $lt: ISODate("2025-01-01") } } },
  { $group: { _id: "$customerEmail", totalSpent: { $sum: "$total" }, orderCount: { $sum: 1 } } },
  { $sort: { totalSpent: -1 } },
  { $limit: 5 },
  { $project: { email: "$_id", totalSpent: 1, orderCount: 1, _id: 0 } }
]);

Scenario

  1. Performance: An aggregation pipeline is slow. The first stage is $project, then $match. What’s wrong and how would you fix it?
Answer:

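Problem: As written, $project runs before the filter, which forces MongoDB to:

  1. Scan the full collection: the filter is not the first stage, so it cannot use an index
  2. Transform every document: a new document structure is created in memory for each one
  3. Lose index access: the following $match operates on transformed data, so it cannot use any collection indexes

The optimizer can rescue this particular pipeline by moving the $match ahead of the $project (the filter only uses status, which the projection passes through unchanged), but that does not work once the filter depends on computed fields, so do not rely on it.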

Fix: Swap the order — put $match first:

// ❌ Slow: $project first
db.orders.aggregate([
  { $project: { status: 1, total: 1 } },
  { $match: { status: "completed" } }    // Cannot use index!
]);

// ✅ Fast: $match first
db.orders.aggregate([
  { $match: { status: "completed" } },   // Can use an index on status, if one exists
  { $project: { status: 1, total: 1 } }
]);

Rule of Thumb: Always filter ($match) before transforming ($project/$group).