# 四、MongoDB 聚合
聚合操作将来自多个文档的值组合在一起,并且可以对分组数据执行各种操作以返回单个结果。
三种聚合方法:
- 聚合管道
- map-reduce function
- 单一目的聚合方法
# 1. 聚合方法
# 1.1 聚合管道
db.collection.aggregate(pipeline, options)
聚合管道是以数据处理管道的概念为蓝本的。文档进入多阶段管道,将文档转换为聚合结果。例如:
db.orders.aggregate([
{ $match: { status: "A" } },
{ $group: { _id: "$cust_id", total: { $sum: "$amount" } } }
])
第一阶段:$match 阶段按 status 字段过滤文档,并将 status = “A” 的文档传递给下一个阶段。
第二阶段:$group 阶段按 cust_id 字段将文档分组,以计算每个唯一值 cust_id 的金额总和。
- 管道使用 MongoDB 中的原生操作提供有效的数据聚合,是 MongoDB 中数据聚合的首选方法;
- 聚合管道可以在分片集合 sharded collection 上运行;
- 聚合管道可以使用索引来改善其某些阶段的性能。此外,聚合管道具有内部优化阶段。
# 1.2 map-reduce
MongoDB 5.0 已经废除该方法。
db.collection.mapReduce()
MongoDB 还提供 map-reduce 操作来执行聚合。通常,map-reduce 操作有四个阶段:
query 阶段: 匹配文档。
map 阶段: 处理每个文档并为每个输入文档发出一个或多个对象,以及将 map 操作的输出组合在一起。
reduce 阶段: 对哪些具有多个值的键进行收集和压缩聚合数据,然后将结果存储在集合中。
output 阶段: 输出结果。(optional)可以通过 finalize 函数来进一步压缩或处理聚合的结果。
MongoDB 中的所有 map-reduce 函数都是 JavaScript,并在 mongod 进程中运行。map-reduce 操作将单个集合的文档作为输入,并且可以在开始映射阶段之前执行任何任意排序和限制。map-reduce 可以将 map-reduce 操作的结果作为文档返回,也可以将结果写入集合。
# 1.3 单一目的聚合方法
db.collection.estimatedDocumentCount()
: 估计文档数,不加条件db.collection.count()
: 获得文档数,可加条件筛选db.collection.distinct()
: 去重,获取不同值
MongoDB 提供以上所有操作,它们可以聚合来自单个集合的文档。虽然这些操作提供了对常见聚合过程的简单访问,但它们缺乏聚合管道和 map-reduce 的灵活性和功能。
以上操作对 orders 这个集合进行聚合,使用 distinct() 函数得到了不同的 cust_id。
# 2. 聚合表达式
字段路径表达式
$<field>
使用$
来指示字段路径$<field>.<sub-field>
使用$
和.
来指示内嵌文档字段路径
系统变量表达式
$$<variable>
使用$$
来指示系统变量$$CURRENT
指示管道中当前操作的文档(游标)- $$CURRENT.info === $info
常量表达式
$literal: <value>
指示常量 value$literal: "$name"
指示常量字符串 $name,此处$
仅为字符串
# 3. 聚合管道阶段
本篇笔记内容为“聚合方法 —— 聚合管道”中的各个聚合阶段。
db.collection.aggregate(pipeline, options)
$project
对输出文档进行再次投影$match
对输出文档进行筛选$limit
筛选出管道内前 N 篇文档$skip
跳过管道内前 N 篇文档$unwind
展开输入文档中的数组字段$sort
对输入文档进行排序$lookup
多文档联合查询$group
对输入文档进行分组$out
将聚合管道中的文档写入到一个新集合中
# 3.1 $project
对输出文档进行再次投影,不会影响原文档,默认输出前 20 个文档。
先插入银行账户数据
db.accounts.insertMany([ { name: {firstName: "Alice", lastName: "Wong"}, balance: 50 }, { name: {firstName: "Bob", lastName: "Yang"}, balance: 20 } ])
对银行账户文档进行重新投影,只要 balance 和 firstName,并将 firstName 输出为 clientName:
db.accounts.aggregate([ { $project: { _id: 0, balance: 1, clientName: "$name.firstName" } } ])
输出:
{ "balance" : 50, "clientName" : "Alice" } { "balance" : 20, "clientName" : "Bob" }
尝试输出不存在的字段,会发现输出为
null
db.accounts.aggregate([ { $project: { _id: 0, balance: 1, nameArray: [ "$name.firstName", "$name.middleName", "$name.lastName" ] } } ])
输出:
{ "balance" : 50, "nameArray" : [ "Alice", null, "Wong" ] } { "balance" : 20, "nameArray" : [ "Bob", null, "Yang" ] }
# 3.2 $match
对输出文档进行筛选,语法与读取文档时的筛选语法相同。
对银行账户文档进行筛选
db.accounts.aggregate([ { $match: { "name.firstName": "Alice" } } ])
输出:
{ "_id" : ObjectId("61a8cd63961ff8653992f818"), "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50 }
增加逻辑操作符,如 $or
db.accounts.aggregate([ { $match: { $or: [ { balance: { $gt: 40, $lt: 80 }}, { "name.lastName": "Yang" } ] } } ])
输出:
{ "_id" : ObjectId("61a8cd63961ff8653992f818"), "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50 } { "_id" : ObjectId("61a8cd63961ff8653992f819"), "name" : { "firstName" : "Bob", "lastName" : "Yang" }, "balance" : 20 }
结合 $project
db.accounts.aggregate([ { $match: { $or: [ { balance: { $gt: 40, $lt: 80 }}, { "name.lastName": "Yang"} ] }, }, { $project: { _id: 0, } } ])
输出:
{ "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50 } { "name" : { "firstName" : "Bob", "lastName" : "Yang" }, "balance" : 20 }
# 3.3 $limit
筛选出管道内前 N 篇文档。
只要第一条记录
db.accounts.aggregate([ { $match: { $or: [ { balance: { $gt: 40, $lt: 80 }}, { "name.lastName": "Yang"} ] }, }, { $project: { _id: 0, } }, { $limit: 1 } ])
输出:
{ "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50 }
# 3.4 $skip
跳过管道内前 N 篇文档。
不要第一条记录
db.accounts.aggregate([ { $match: { $or: [ { balance: { $gt: 40, $lt: 80 }}, { "name.lastName": "Yang"} ] }, }, { $project: { _id: 0, } }, { $skip: 1 } ])
输出:
{ "name" : { "firstName" : "Bob", "lastName" : "Yang" }, "balance" : 20 }
# 3.5 $unwind
展开输入文档中的数组字段。
向现有的银行账户文档中加入数组字段
db.accounts.update( { "name.firstName": "Alice", }, { $set: { currency: ["CNY", "USD"] } } ) db.accounts.update( { "name.firstName": "Bob", }, { $set: { currency: "GBP" } } )
查询银行账户文档
db.accounts.find({}, {_id: 0})
输出:
{ "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50, "currency" : [ "CNY", "USD" ] } { "name" : { "firstName" : "Bob", "lastName" : "Yang" }, "balance" : 20, "currency" : "GBP" }
使用 $unwind 展开 currency 数组字段
db.accounts.aggregate([ { $unwind: { path: "$currency" } }, { $project: { _id: 0 } } ])
输出:数组的 currency 会被展开为一个个字符串
{ "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50, "currency" : "CNY" } { "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50, "currency" : "USD" } { "name" : { "firstName" : "Bob", "lastName" : "Yang" }, "balance" : 20, "currency" : "GBP" }
展开数组的时候,使用
includeArrayIndex
添加元素位置信息db.accounts.aggregate([ { $unwind: { path: "$currency", includeArrayIndex: "ccyIndex" } }, { $project: { _id: 0 } } ])
输出:
{ "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50, "currency" : "CNY", "ccyIndex" : NumberLong(0) } { "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50, "currency" : "USD", "ccyIndex" : NumberLong(1) } { "name" : { "firstName" : "Bob", "lastName" : "Yang" }, "balance" : 20, "currency" : "GBP", "ccyIndex" : null }
默认情况下只展开含有值的数组,可通过
preserveNullAndEmptyArrays
来保留空数组:db.accounts.aggregate([ { $unwind: { path: "$currency", preserveNullAndEmptyArrays: true } } ])
# 3.6 $sort
对输入文档进行排序。
对 balance 进行正序排序,再对 name.lastName 做反向排序
db.accounts.aggregate([ { $sort: { balance: 1, "name.lastName": -1 } }, { $project: { _id: 0 } } ])
输出:
{ "name" : { "firstName" : "Bob", "lastName" : "Yang" }, "balance" : 20, "currency" : "GBP" } { "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50, "currency" : [ "CNY", "USD" ] }
# 3.7 $lookup
多文档联合查询。
使用单一字段值进行查询
$lookup: {
from: <collection to join>,
localField: <field from the input documents>,
foreignField: <field from the documents of the "from" collection>,
as: <output array field>
}
# 类似于 MySQL 的 join
在前面的基础,我们再增加一个集合来存储外汇数据
db.forex.insertMany([ { ccy: "USD", rate: 6.91, date: new Date("2021-01-01") }, { ccy: "GBP", rate: 8.91, date: new Date("2021-12-01") }, { ccy: "CNY", rate: 9.91, date: new Date("2021-11-01") } ])
将查询到的外汇汇率写入银行账户文档
db.accounts.aggregate([ { $lookup: { from: "forex", localField: "currency", foreignField: "ccy", as: "forexData" } } ])
输出:
{ "_id" : ObjectId("61a8cd63961ff8653992f818"), "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50, "currency" : [ "CNY", "USD" ], "forexData" : [ { "_id" : ObjectId("61a9ac4ed8df19167b7eaef9"), "ccy" : "USD", "rate" : 6.91, "date" : ISODate("2021-01-01T00:00:00Z") }, { "_id" : ObjectId("61a9ac4ed8df19167b7eaefb"), "ccy" : "CNY", "rate" : 9.91, "date" : ISODate("2021-11-01T00:00:00Z") } ] } { "_id" : ObjectId("61a8cd63961ff8653992f819"), "name" : { "firstName" : "Bob", "lastName" : "Yang" }, "balance" : 20, "currency" : "GBP", "forexData" : [ { "_id" : ObjectId("61a9ac4ed8df19167b7eaefa"), "ccy" : "GBP", "rate" : 8.91, "date" : ISODate("2021-12-01T00:00:00Z") } ] }
使用复杂条件进行查询
$lookup: {
from: <collection to join>,
let: { <var_1>: <expression>, ..., <var_n>: <expression>},
pipeline: [<pipeline to execute on the collection to join>],
as: <output array field>
}
pipeline
: 先对 from 中的文档使用聚合阶段进行处理let
: 当对 from 中的文档使用聚合阶段进行处理时,如果需要参考管道文档中的字段,则必须使用 let 参数对字段进行声明
将特定日期外汇汇率写入银行账户文档
db.accounts.aggregate([ { $lookup: { from: "forex", pipeline: [ { $match: { date: new Date("2021-12-01") } } ], as: "forexData" } } ])
输出:注意此时两个 collection 这件并没有进行关联,称为 不相关查询。
{ "_id" : ObjectId("61a8cd63961ff8653992f818"), "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50, "currency" : [ "CNY", "USD" ], "forexData" : [ { "_id" : ObjectId("61a9ac4ed8df19167b7eaefa"), "ccy" : "GBP", "rate" : 8.91, "date" : ISODate("2021-12-01T00:00:00Z") } ] } { "_id" : ObjectId("61a8cd63961ff8653992f819"), "name" : { "firstName" : "Bob", "lastName" : "Yang" }, "balance" : 20, "currency" : "GBP", "forexData" : [ { "_id" : ObjectId("61a9ac4ed8df19167b7eaefa"), "ccy" : "GBP", "rate" : 8.91, "date" : ISODate("2021-12-01T00:00:00Z") } ] }
将特定日期外汇汇率写入余额大于 40 的银行账户文档
db.accounts.aggregate([ { $lookup: { from: "forex", let: { bal: "$balance"}, pipeline: [ { $match: { $expr: { $and: [ {$eq: ["$date", new Date("2021-12-01")]}, {$gt: ["$$bal", 40]} ] } } } ], as: "forexData" } } ])
输出:
{ "_id" : ObjectId("61a8cd63961ff8653992f818"), "name" : { "firstName" : "Alice", "lastName" : "Wong" }, "balance" : 50, "currency" : [ "CNY", "USD" ], "forexData" : [ { "_id" : ObjectId("61a9ac4ed8df19167b7eaefa"), "ccy" : "GBP", "rate" : 8.91, "date" : ISODate("2021-12-01T00:00:00Z") } ] } { "_id" : ObjectId("61a8cd63961ff8653992f819"), "name" : { "firstName" : "Bob", "lastName" : "Yang" }, "balance" : 20, "currency" : "GBP", "forexData" : [ ] }
# 3.8 $group
对输入文档进行分组。
$group: {
_id: <expression>,
<field1>: {
<accumulator1>: <expression1>
},
...,
<fieldN>:{
<accumulatorN>: <expressionN>
}
}
_id
: 定义分组规则<fieldN>
: 可以使用聚合操作符来定义新字段
添加一个新集合用来存储股票交易记录
db.transactions.insertMany([ { symbol: "600519", qty: 100, price: 567.4, currency: "CNY" }, { symbol: "AMZN", qty: 1, price: 1388.4, currency: "USD" }, { symbol: "AAPL", qty: 2, price: 145.4, currency: "USD" } ])
按照交易货币 currency 来分组交易记录
db.transactions.aggregate([ { $group: { _id: "$currency" } } ])
输出:看得出不使用聚合操作符的情况下,$group 可以返回管道文档中某一字段的所有不重复的值。
{ "_id" : "USD" } { "_id" : "CNY" }
使用聚合操作符计算分组聚合值
db.transactions.aggregate([ { $group: { _id: "$currency", totalQty: { $sum: "$qty"}, totalNotional: { $sum: { $multiply: ["$price", "$qty"] }}, avgPrice: { $avg: "$price" }, count: {$sum: 1}, maxNotional: { $max: { $multiply: ["$price", "$qty"]}}, minNotional: { $min: { $multiply: ["$price", "$qty"]}} } } ])
输出:
{ "_id" : "CNY", "totalQty" : 100, "totalNotional" : 56740, "avgPrice" : 567.4, "count" : 1, "maxNotional" : 56740, "minNotional" : 56740 } { "_id" : "USD", "totalQty" : 3, "totalNotional" : 1679.2, "avgPrice" : 766.9000000000001, "count" : 2, "maxNotional" : 1388.4, "minNotional" : 290.8 }
使用聚合操作创建数组字段
db.transactions.aggregate([ { $group: { _id: "$currency", symbols: { $push: "$symbol" } } } ])
输出:
{ "_id" : "CNY", "symbols" : [ "600519" ] } { "_id" : "USD", "symbols" : [ "AMZN", "AAPL" ] }
# 3.9 $out
将聚合管道中的文档写入到一个新集合中。
将聚合管道中的文档输出到 output 集合中
db.transactions.aggregate([ { $group: { _id: "$currency" } }, { $out: "output" } ])
查看当前所有集合
show tables;
输出:
accounts forex inventory output test transactions
看 output
db.output.find()
输出:
{ "_id" : "USD" } { "_id" : "CNY" }
$out 注意点
如果 $out
指定一个已经存在的集合,它会保留之前的索引,但是会清空之前的所有数据。
如果聚合发生错误,则$out
不会执行,而是直接报错。
# 4. 聚合操作可选项
db.collection.aggregate( pipeline, options)
allowDiskUse: <boolean>
每个聚合管道阶段使用的内存不能超过 100 MB。
如果数据量较大,为了防止聚合管道阶段超出内存上限并且抛出错误,可以启用 allowDiskUse 选项。
allowDiskUse 启用之后,聚合阶段可以在内存容量不足时,将操作数据写入临时文件中。
临时文件会被写入 dbPath 下的 _tmp 文件夹,dbPath 的默认值为 /data/db
# 5. 优化
# 5.1 聚合阶段顺序优化
$project + $Match
- $match 阶段会在 $project 之前运行,这样后面就只需要处理少量文档了。
# 5.2 聚合阶段合并优化
如果两者之间没有夹杂着会改变文档数量的聚合阶段(如 $unwind 和 $match 就会改变管道中文档数量),聚合阶段可以合并。
$sort + $limit
$limit + $limit
$skip + $skip
$match + $match
$lookup + $unwind
连续排列在一起的 $lookup 和 $unwind 阶段,如果 $unwind 应用在 $lookup 阶段创建的 as 字段上,则两者可以合并。
db.accounts.aggregate([ { $lookup: { from: "forex", localField: "currency", foreignField: "ccy", as: "forexData" } }, { $unwind: "forexData" } ])