# 四、MongoDB 聚合

聚合操作将来自多个文档的值组合在一起,并且可以对分组数据执行各种操作以返回单个结果。

三种聚合方法:

  • 聚合管道
  • map-reduce function
  • 单一目的聚合方法

# 1. 聚合方法

# 1.1 聚合管道

  • db.collection.aggregate(pipeline, options)

聚合管道是以数据处理管道的概念为蓝本的。文档进入多阶段管道,将文档转换为聚合结果。例如:

db.orders.aggregate([
   { $match: { status: "A" } },
   { $group: { _id: "$cust_id", total: { $sum: "$amount" } } }
])

第一阶段:$match 阶段按 status 字段过滤文档,并将 status = “A” 的文档传递给下一个阶段。

第二阶段:$group 阶段按 cust_id 字段将文档分组,以计算每个唯一值 cust_id 的金额总和。

  • 管道使用 MongoDB 中的原生操作提供有效的数据聚合,是 MongoDB 中数据聚合的首选方法;
  • 聚合管道可以在分片集合 sharded collection 上运行;
  • 聚合管道可以使用索引来改善其某些阶段的性能。此外,聚合管道具有内部优化阶段。

# 1.2 map-reduce

MongoDB 5.0 已经废除该方法。

  • db.collection.mapReduce()

Diagram of the annotated map-reduce operation.

MongoDB 还提供 map-reduce 操作来执行聚合。通常,map-reduce 操作有四个阶段:

query 阶段: 匹配文档。

map 阶段: 处理每个文档并为每个输入文档发出一个或多个对象,以及将 map 操作的输出组合在一起。

reduce 阶段: 对哪些具有多个值的键进行收集和压缩聚合数据,然后将结果存储在集合中。

output 阶段: 输出结果。(optional)可以通过 finalize 函数来进一步压缩或处理聚合的结果。

MongoDB 中的所有 map-reduce 函数都是 JavaScript,并在 mongod 进程中运行。map-reduce 操作将单个集合的文档作为输入,并且可以在开始映射阶段之前执行任何任意排序和限制。map-reduce 可以将 map-reduce 操作的结果作为文档返回,也可以将结果写入集合。

# 1.3 单一目的聚合方法

  • db.collection.estimatedDocumentCount(): 估计文档数,不加条件
  • db.collection.count(): 获得文档数,可加条件筛选
  • db.collection.distinct(): 去重,获取不同值

MongoDB 提供以上所有操作,它们可以聚合来自单个集合的文档。虽然这些操作提供了对常见聚合过程的简单访问,但它们缺乏聚合管道和 map-reduce 的灵活性和功能。

Diagram of the annotated distinct operation.

以上操作对 orders 这个集合进行聚合,使用 distinct() 函数得到了不同的 cust_id。

# 2. 聚合表达式

字段路径表达式

  • $<field> 使用 $ 来指示字段路径
  • $<field>.<sub-field> 使用 $. 来指示内嵌文档字段路径

系统变量表达式

  • $$<variable> 使用 $$ 来指示系统变量
  • $$CURRENT 指示管道中当前操作的文档(游标)
    • $$CURRENT.info === $info

常量表达式

  • $literal: <value> 指示常量 value
  • $literal: "$name" 指示常量字符串 $name,此处 $ 仅为字符串

# 3. 聚合管道阶段

本篇笔记内容为“聚合方法 —— 聚合管道”中的各个聚合阶段。

db.collection.aggregate(pipeline, options)

  • $project 对输出文档进行再次投影
  • $match 对输出文档进行筛选
  • $limit 筛选出管道内前 N 篇文档
  • $skip 跳过管道内前 N 篇文档
  • $unwind 展开输入文档中的数组字段
  • $sort 对输入文档进行排序
  • $lookup 多文档联合查询
  • $group 对输入文档进行分组
  • $out 将聚合管道中的文档写入到一个新集合中

# 3.1 $project

对输出文档进行再次投影,不会影响原文档,默认输出前 20 个文档。

  1. 先插入银行账户数据

    db.accounts.insertMany([
      {
         name: {firstName: "Alice", lastName: "Wong"},
         balance: 50
      },
      {  
         name: {firstName: "Bob", lastName: "Yang"},
         balance: 20
      }
    ])
    
  2. 对银行账户文档进行重新投影,只要 balance 和 firstName,并将 firstName 输出为 clientName:

    db.accounts.aggregate([
    		{
    				$project: {
    					_id: 0,
    					balance: 1,
    					clientName: "$name.firstName"
    				}
    		}
    ])
    

    输出:

    { "balance" : 50, "clientName" : "Alice" }
    { "balance" : 20, "clientName" : "Bob" }
    
  3. 尝试输出不存在的字段,会发现输出为 null

    db.accounts.aggregate([
       {
          $project: {
            _id: 0,
            balance: 1,
            nameArray: [
              "$name.firstName",
              "$name.middleName",
              "$name.lastName"
            ]
          }
       }
    ])
    

    输出:

    { "balance" : 50, "nameArray" : [ "Alice", null, "Wong" ] }
    { "balance" : 20, "nameArray" : [ "Bob", null, "Yang" ] }
    

# 3.2 $match

对输出文档进行筛选,语法与读取文档时的筛选语法相同。

  1. 对银行账户文档进行筛选

    db.accounts.aggregate([
    	{
    		$match: {
    			"name.firstName": "Alice"
    		}
    	}
    ])
    

    输出:

    { "_id" : ObjectId("61a8cd63961ff8653992f818"), "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50 }
    
  2. 增加逻辑操作符,如 $or

    db.accounts.aggregate([
    	{
    		$match: {
    			$or: [
    				{	balance: {	$gt: 40, $lt: 80	}},
    				{	"name.lastName": "Yang"	}
    			]
    		}
    	}
    ])
    

    输出:

    { "_id" : ObjectId("61a8cd63961ff8653992f818"), "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50 }
    { "_id" : ObjectId("61a8cd63961ff8653992f819"), "name" : { "firstName" : "Bob", "lastName" : "Yang" }, "balance" : 20 }
    
  3. 结合 $project

    db.accounts.aggregate([
    	{
    		$match: {
    			$or: [
    				{ balance: {	$gt: 40,  $lt: 80	}},
    				{ "name.lastName": "Yang"}
    			]
    		},
    	},
    	{
        $project: {
        	_id: 0,
        }
    	}
    ])
    

    输出:

    { "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50 }
    { "name" : { "firstName" : "Bob", "lastName" : "Yang" }, "balance" : 20 }
    

# 3.3 $limit

筛选出管道内前 N 篇文档。

  1. 只要第一条记录

    db.accounts.aggregate([
    	{
    		$match: {
    			$or: [
    				{ balance: {	$gt: 40,  $lt: 80	}},
    				{ "name.lastName": "Yang"}
    			]
    		},
    	},
    	{
        $project: {
        	_id: 0,
        }
    	},
    	{
    		$limit: 1
    	}
    ])
    

    输出:

    { "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50 }
    

# 3.4 $skip

跳过管道内前 N 篇文档。

  1. 不要第一条记录

    db.accounts.aggregate([
    	{
    		$match: {
    			$or: [
    				{ balance: {	$gt: 40,  $lt: 80	}},
    				{ "name.lastName": "Yang"}
    			]
    		},
    	},
    	{
        $project: {
        	_id: 0,
        }
    	},
    	{
    		$skip: 1
    	}
    ])
    

    输出:

    { "name" : { "firstName" : "Bob", "lastName" : "Yang" }, "balance" : 20 }
    

# 3.5 $unwind

展开输入文档中的数组字段。

  1. 向现有的银行账户文档中加入数组字段

    db.accounts.update(
    	{
    		"name.firstName": "Alice",
    	},
    	{
    		$set: {
    			currency: ["CNY", "USD"]
    		}
    	}
    )
    
    db.accounts.update(
    	{
    		"name.firstName": "Bob",
    	},
    	{
    		$set: {
    			currency: "GBP"
    		}
    	}
    )
    
  2. 查询银行账户文档

    db.accounts.find({}, {_id: 0})
    

    输出:

    { "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50, "currency" : [ "CNY", "USD" ] }
    { "name" : { "firstName" : "Bob", "lastName" : "Yang" }, "balance" : 20, "currency" : "GBP" }
    
  3. 使用 $unwind 展开 currency 数组字段

    db.accounts.aggregate([
    	{
    			$unwind: {
    				path: "$currency"
    			}
    	},
    	{
    		$project: {
    			_id: 0
    		}
    	}
    ])
    

    输出:数组的 currency 会被展开为一个个字符串

    { "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50, "currency" : "CNY" }
    { "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50, "currency" : "USD" }
    { "name" : { "firstName" : "Bob", "lastName" : "Yang" }, "balance" : 20, "currency" : "GBP" }
    
  4. 展开数组的时候,使用 includeArrayIndex 添加元素位置信息

    db.accounts.aggregate([
    	{
    		$unwind: {
    			path: "$currency",
    			includeArrayIndex: "ccyIndex"
    		}
    	},
    	{
    		$project: {
    			_id: 0
    		}
    	}
    ])
    

    输出:

    { "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50, "currency" : "CNY", "ccyIndex" : NumberLong(0) }
    { "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50, "currency" : "USD", "ccyIndex" : NumberLong(1) }
    { "name" : { "firstName" : "Bob", "lastName" : "Yang" }, "balance" : 20, "currency" : "GBP", "ccyIndex" : null }
    
  5. 默认情况下只展开含有值的数组,可通过 preserveNullAndEmptyArrays 来保留空数组:

    db.accounts.aggregate([
    	{
    		$unwind: {
    			path: "$currency",
    			preserveNullAndEmptyArrays: true
    		}
    	}
    ])
    

# 3.6 $sort

对输入文档进行排序。

  1. 对 balance 进行正序排序,再对 name.lastName 做反向排序

    db.accounts.aggregate([
    	{
    		$sort: {
    			balance: 1,
    			"name.lastName": -1
    		}
    	},
    	{
    		$project: {
    			_id: 0
    		}
    	}
    ])
    

    输出:

    { "name" : { "firstName" : "Bob", "lastName" : "Yang" }, "balance" : 20, "currency" : "GBP" }
    { "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50, "currency" : [ "CNY", "USD" ] }
    

# 3.7 $lookup

多文档联合查询。

使用单一字段值进行查询

$lookup: {
	from: <collection to join>,
	localField: <field from the input documents>,
	foreignField: <field from the documents of the "from" collection>,
	as: <output array field>
}

# 类似于 MySQL 的 join
  1. 在前面的基础,我们再增加一个集合来存储外汇数据

    db.forex.insertMany([
    	{
    		ccy: "USD",
    		rate: 6.91,
    		date: new Date("2021-01-01")
    	},
    	{
    		ccy: "GBP",
    		rate: 8.91,
    		date: new Date("2021-12-01")
    	},
    	{
    		ccy: "CNY",
    		rate: 9.91,
    		date: new Date("2021-11-01")
    	}
    ])
    
  2. 将查询到的外汇汇率写入银行账户文档

    db.accounts.aggregate([
    	{
    		$lookup: {
    			from: "forex",
    			localField: "currency",
    			foreignField: "ccy",
    			as: "forexData"
    		}
    	}
    ])
    

    输出:

    { "_id" : ObjectId("61a8cd63961ff8653992f818"), "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50, "currency" : [ "CNY", "USD" ], "forexData" : [ { "_id" : ObjectId("61a9ac4ed8df19167b7eaef9"), "ccy" : "USD", "rate" : 6.91, "date" : ISODate("2021-01-01T00:00:00Z") }, { "_id" : ObjectId("61a9ac4ed8df19167b7eaefb"), "ccy" : "CNY", "rate" : 9.91, "date" : ISODate("2021-11-01T00:00:00Z") } ] }
    { "_id" : ObjectId("61a8cd63961ff8653992f819"), "name" : { "firstName" : "Bob", "lastName" : "Yang" }, "balance" : 20, "currency" : "GBP", "forexData" : [ { "_id" : ObjectId("61a9ac4ed8df19167b7eaefa"), "ccy" : "GBP", "rate" : 8.91, "date" : ISODate("2021-12-01T00:00:00Z") } ] }
    

使用复杂条件进行查询

$lookup: {
	from: <collection to join>,
	let: { <var_1>: <expression>, ..., <var_n>: <expression>},
	pipeline: [<pipeline to execute on the collection to join>],
	as: <output array field>
}
  • pipeline : 先对 from 中的文档使用聚合阶段进行处理
  • let : 当对 from 中的文档使用聚合阶段进行处理时,如果需要参考管道文档中的字段,则必须使用 let 参数对字段进行声明
  1. 将特定日期外汇汇率写入银行账户文档

    db.accounts.aggregate([
    	{
    		$lookup: {
    			from: "forex",
    			pipeline: [
    				{
    					$match: {
    						date: new Date("2021-12-01")
    					}
    				}
    			],
    			as: "forexData"
    		}
    	}
    ])
    

    输出:注意此时两个 collection 这件并没有进行关联,称为 不相关查询

    { "_id" : ObjectId("61a8cd63961ff8653992f818"), "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50, "currency" : [ "CNY", "USD" ], "forexData" : [ { "_id" : ObjectId("61a9ac4ed8df19167b7eaefa"), "ccy" : "GBP", "rate" : 8.91, "date" : ISODate("2021-12-01T00:00:00Z") } ] }
    { "_id" : ObjectId("61a8cd63961ff8653992f819"), "name" : { "firstName" : "Bob", "lastName" : "Yang" }, "balance" : 20, "currency" : "GBP", "forexData" : [ { "_id" : ObjectId("61a9ac4ed8df19167b7eaefa"), "ccy" : "GBP", "rate" : 8.91, "date" : ISODate("2021-12-01T00:00:00Z") } ] }
    
  2. 将特定日期外汇汇率写入余额大于 40 的银行账户文档

    db.accounts.aggregate([
    	{
    			$lookup: {
            from: "forex",
            let: { bal: "$balance"},
            pipeline: [
              { $match: 
                  {	$expr: 
                      {	$and:
                        [
                          {$eq: ["$date", new Date("2021-12-01")]},
                          {$gt: ["$$bal", 40]}
                        ]
                      }
                  }
               }
            ],
            as: "forexData"
          }
    	}
    ])
    

    输出:

    { "_id" : ObjectId("61a8cd63961ff8653992f818"), "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50, "currency" : [ "CNY", "USD" ], "forexData" : [ { "_id" : ObjectId("61a9ac4ed8df19167b7eaefa"), "ccy" : "GBP", "rate" : 8.91, "date" : ISODate("2021-12-01T00:00:00Z") } ] }
    { "_id" : ObjectId("61a8cd63961ff8653992f819"), "name" : { "firstName" : "Bob", "lastName" : "Yang" }, "balance" : 20, "currency" : "GBP", "forexData" : [ ] }
    

# 3.8 $group

对输入文档进行分组。

$group: {
	_id: <expression>,
	<field1>: {
		<accumulator1>: <expression1>
	},
	...,
	<fieldN>:{
		<accumulatorN>: <expressionN>
	}
}
  • _id: 定义分组规则
  • <fieldN>: 可以使用聚合操作符来定义新字段
  1. 添加一个新集合用来存储股票交易记录

    db.transactions.insertMany([
    	{
    		symbol: "600519",
    		qty: 100,
    		price: 567.4,
    		currency: "CNY"
    	},
    	{
    		symbol: "AMZN",
    		qty: 1,
    		price: 1388.4,
    		currency: "USD"
    	},
    	{
    		symbol: "AAPL",
    		qty: 2,
    		price: 145.4,
    		currency: "USD"
    	}
    ])
    
  2. 按照交易货币 currency 来分组交易记录

    db.transactions.aggregate([
    	{
    		$group: {
    			_id: "$currency"
    		}
    	}
    ])
    

    输出:看得出不使用聚合操作符的情况下,$group 可以返回管道文档中某一字段的所有不重复的值。

    { "_id" : "USD" }
    { "_id" : "CNY" }
    
  3. 使用聚合操作符计算分组聚合值

    db.transactions.aggregate([
    	{
    		$group: {
    			_id: "$currency",
    			totalQty: { $sum: "$qty"},
    			totalNotional: { $sum: { $multiply: ["$price", "$qty"] }},
    			avgPrice: { $avg: "$price" },
    			count: {$sum: 1},
    			maxNotional: { $max: { $multiply: ["$price", "$qty"]}},
    			minNotional: { $min: { $multiply: ["$price", "$qty"]}}
    		}
    	}
    ])
    

    输出:

    { "_id" : "CNY", "totalQty" : 100, "totalNotional" : 56740, "avgPrice" : 567.4, "count" : 1, "maxNotional" : 56740, "minNotional" : 56740 }
    { "_id" : "USD", "totalQty" : 3, "totalNotional" : 1679.2, "avgPrice" : 766.9000000000001, "count" : 2, "maxNotional" : 1388.4, "minNotional" : 290.8 }
    
  4. 使用聚合操作创建数组字段

    db.transactions.aggregate([
    	{
    		$group: {
    			_id: "$currency",
    			symbols: { $push: "$symbol"	}
    		}
    	}
    ])
    

    输出:

    { "_id" : "CNY", "symbols" : [ "600519" ] }
    { "_id" : "USD", "symbols" : [ "AMZN", "AAPL" ] }
    

# 3.9 $out

将聚合管道中的文档写入到一个新集合中。

  1. 将聚合管道中的文档输出到 output 集合中

    db.transactions.aggregate([
    	{
    		$group: {
    			_id: "$currency"
    		}
    	},
    	{
    		$out: "output"
    	}
    ])
    
  2. 查看当前所有集合

    show tables;
    

    输出:

    accounts
    forex
    inventory
    output
    test
    transactions
    
  3. 看 output

    db.output.find()
    

    输出:

    { "_id" : "USD" }
    { "_id" : "CNY" }
    

$out 注意点

如果 $out 指定一个已经存在的集合,它会保留之前的索引,但是会清空之前的所有数据。

如果聚合发生错误,则$out 不会执行,而是直接报错。

# 4. 聚合操作可选项

db.collection.aggregate( pipeline, options)

  • allowDiskUse: <boolean>

    每个聚合管道阶段使用的内存不能超过 100 MB。

    如果数据量较大,为了防止聚合管道阶段超出内存上限并且抛出错误,可以启用 allowDiskUse 选项。

    allowDiskUse 启用之后,聚合阶段可以在内存容量不足时,将操作数据写入临时文件中。

    临时文件会被写入 dbPath 下的 _tmp 文件夹,dbPath 的默认值为 /data/db

# 5. 优化

# 5.1 聚合阶段顺序优化

$project + $Match

  • $match 阶段会在 $project 之前运行,这样后面就只需要处理少量文档了。

# 5.2 聚合阶段合并优化

如果两者之间没有夹杂着会改变文档数量的聚合阶段(如 $unwind 和 $match 就会改变管道中文档数量),聚合阶段可以合并。

  • $sort + $limit

  • $limit + $limit

  • $skip + $skip

  • $match + $match

  • $lookup + $unwind

    连续排列在一起的 $lookup 和 $unwind 阶段,如果 $unwind 应用在 $lookup 阶段创建的 as 字段上,则两者可以合并。

    db.accounts.aggregate([
    	{
    		$lookup: {
    			from: "forex",
    			localField: "currency",
    			foreignField: "ccy",
    			as: "forexData"
    		}
    	},
    	{
    		$unwind: "forexData"
    	}
    ])
    
上次更新: 12/3/2021, 10:36:18 PM