# 六、MongoDB 复制集

如同许多关系型数据库一样,MongoDB 支持以实时或准实时的方式,将数据内容复制到另一台服务器中。

# 1. 复制目标

  • 提高可扩展性
    • 提高冗余度
    • 改善性能
  • 改善持久性/可靠性
  • 提供隔离性

# 2. 复制基础

  • 主动成员:有数据,可以投票,可能成为主服务器
  • 主服务器:负责读写
  • 辅助服务器:负责读
  • 被动成员:有数据,可以投票,不会成为主服务器
  • 仲裁成员:无数据,可以投票,不会成为主服务器

# 2.1 复制集

在 MongoDB 中,复制集由一个主节点以及许多辅助节点或仲裁节点组成。复制集需要大量的主动成员才能维护主服务器。因此最少应该有 3 个成员。通常的建议是有奇数个成员。在 MongoDB 3.0 中,复制集最多可以有 50 个被动成员和 7 个主动成员。这条规则主要是为了避免“脑裂”问题,也就是当网络出现问题时,有两台服务器称为主服务器的情况,如下图所示:

image-20211206223244525

脑裂

在 HA 集群系统中,假设有同一个整体、动作协调的节点 A 和节点 B,节点 A 和 B 之间通过 heartBeat 来检查对方的存活状态,负责协调保证整个集群服务的可用性。

正常情况下,如果节点 A 通过心跳检测不到 B 的存在的时候,就会接管 B 的资源,同理节点 B 检查不到 A 的存活状态的时候也会接管 A 的资源。如果出现网络故障,就会导致 A 和 B 同时检查不到对方的存活状态认为对方出现异常,这个时候就会导致 A 接管 B 的资源,B 也会接管 A 的资源。原来被一个节点访问的资源就会出现被多个节点同时访问的情况,这种情况就是脑裂现象。

# 2.2 主服务器

  • 复制集的数据来源;
  • 数据集中唯一用来写入的节点;

# 2.3 辅助服务器

  • 具有数据的非主服务器成员,理论上可以成为主服务器;
  • 用于读取数据;
  • 以尽可能接近于实时的方式从主服务器复制数据;
  • 默认情况下,如果连接到辅助服务器但不使用任何读取偏好,就不能执行读操作,因为复制有延迟,读到的可能是旧数据。可以使用 rs.slaveOK() 将当前连接设置为可以从辅助服务器读取数据;

# 2.4 仲裁服务器

  • 不含数据;

  • 如果复制集中的主动成员数是偶数,它就用于提供额外的主动成员。它不会投出决定性的一票或者直接决定哪个节点是主服务器,但会参与并作为主动成员中的一员,决定哪个节点成员主服务器;

  • 仲裁服务器用于帮助避免“脑裂”问题。如下图:

    image-20211206224909028

    有了站点 A 中的仲裁服务器,我们总是可以在某一边成立完成大多数服务器的投票选举。这意味着当网络出现问题时,不会出现两个主服务器。

    我们可以进一步增加冗余性,在第 3 个站点 C 中添加一个仲裁服务器。这样如果站点 A 宕机,站点 B 和 C 仍然可以成功完成投票选举。通过以这种方式使用第 3 个站点,哪怕失去任何一个站点的连接,服务器都可以正常运行。

# 3. oplog

oplog(操作日志)就是一个固定大小的集合,保存了主服务器实例对数据库做出修改的记录,目的是在辅助服务器上重做这些操作,保证数据库处于一致状态;服务器的每个成员都将维护自己的 oplog,并且辅助服务器将查询主服务器(或者通过复制链进行其他数据更新的辅助服务器)的 oplog,从而获得新条目,并应用到自己的数据副本中。

oplog 将为每个条目创建一个时间戳。通过这种方式,辅助服务器可以记录从上一次读取开始过去了多久,以及有多少 oplog 需要读取。如果终止辅助服务器并在较短的时间内重启它,它将从主服务器的 oplog 中读取在它离线的时间内所发生的所有修改。

因为具有一个无限大的 oplog 是不现实的,所以 oplog 通过具有固定的大小。

可将 oplog 看成主服务器最近活动的窗口:如果窗口太小,那么记录中的某些操作在被应用到辅助服务器之前将丢失。如果当前实例的 oplog 尚未创建,那么使用 --oplogSize 启动选项可以设置 oplog 的代下(以 MB 为单位)。在 linux 和 Window64 系统中,oplogSize 默认设置为可用磁盘空间的 5%,最小位 1GB,最大为 50GB。如果系统是写入/更新密集型的,那么可能需要增加该大小,以保证辅助服务器在合理的时间范围内处于离线状态而不会丢失数据。

例如,如果需要从辅助服务器执行日常备份,这个任务需要花费一个小时,那么 oplog 的大小将不得不被重新设置,该大小应该允许辅助服务器离线一个小时再加上一定的安全边际时间。

通过执行 db.printReplicationInfo() 命令可以得到一些 oplog 大小方面的参考,该命令将运行在主服务器上。

# 4. 复制集搭建

Docker

  • 创建复制集
  • 向复制集中添加服务器
  • 向复制集中删除服务器
  • 配置复制集
  • 向复制集中添加仲裁服务器

# 4.1 创建 docker network

docker network create mynetwork

# 4.2 启动 3 个 mongod

docker run --net mynetwork --name mongo1 -v $HOME/mymongo/data1:/data/db -p 27017:27017 -d mongo:4 --replSet myset --port 27017
docker run --net mynetwork --name mongo2 -v $HOME/mymongo/data2:/data/db -p 27018:27018 -d mongo:4 --replSet myset --port 27018
docker run --net mynetwork --name mongo3 -v $HOME/mymongo/data3:/data/db -p 27019:27019 -d mongo:4 --replSet myset --port 27019
  • --net: 指定 Docker 网络,这样不同容器才可以进行连接
  • --replSet: 声明复制集名字
  • --port: 指定 mongod 启动的端口
  • -p: 指定容器与主机的端口映射

# 4.3 创建复制集

  1. 启动 mongo1

    docker exec -it mongo1 mongo
    
  2. 使用 rs.initiate() 初始化复制集中的第一个成员,创建它的 oplog 和默认的配置文档

    rs.initiate()
    

    输出:

    {
    	"info2" : "no configuration specified. Using a default configuration for the set",
    	"me" : "2b6433ebe83c:27017",
    	"ok" : 1
    }
    

    也可以直接用 rs.initiate() 来直接将 3 个 mongod 绑定到 myset 复制集中,如下

    rs.initiate(
    	{
    		id: "myset",
    		members: [
    			{_id: 0, host: "mongo1: 27017"},
    			{_id: 1, host: "mongo2: 27018"},
    			{_id: 2, host: "mongo3: 27019"}
    		]
    	}
    )
    

    为了演示添加复制集成员,本次实验没有如此运行。

  3. 检查复制集状态

    rs.status()
    

    输出:

    {
    	"set" : "myset",
    	"heartbeatIntervalMillis" : NumberLong(2000),   # 默认心跳间隙为 2s
    	# 省略...
    	# 复制集成员
    	"members" : [
    		{
    			"_id" : 0,
    			"name" : "2b6433ebe83c:27017",
    			"health" : 1,
    			"state" : 1,
    			"stateStr" : "PRIMARY",
    			"uptime" : 535,
    		# 省略...
    		}
    	],
    	# 省略...
    }
    

    这输出表示没有问题,已经成功配置、创建和初始化一个新的复制集 myset 了。

# 4.4 向复制集中添加服务器

  1. 使用 rs.add() 命令并提供实例的主机名和端口来添加

    rs.add("mongo2:27018")
    

    输出:

    {
    	"ok" : 1,
    	"$clusterTime" : {
    		"clusterTime" : Timestamp(1638846493, 1),
    		"signature" : {
    			"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
    			"keyId" : NumberLong(0)
    		}
    	},
    	"operationTime" : Timestamp(1638846493, 1)
    }
    
  2. 再使用 rs.status() 看复制集状态

    {
    	"set" : "myset",
    	"members" : [
    		{
    			"_id" : 0,
    			"name" : "2b6433ebe83c:27017",
    			"health" : 1,
    			"state" : 1,
    			"stateStr" : "PRIMARY",
    		},
    		{
    			"_id" : 1,
    			"name" : "mongo2:27018",
    			"health" : 1,
    			"state" : 2,
    			"stateStr" : "SECONDARY",
    		}
    	],
    }
    
    • PRIMARY: 主服务器
    • SECONDARY: 辅助服务器
  3. 再添加第三个 mongod

    rs.add("mongo3:27019")
    

    输出同上。

# 4.5 向复制集中移除服务器

这里我们使用 rs.remove("hostname:port") 将 mongo3 移除

rs.remove("mongo3:27019")

# 4.6 配置复制集

  1. 使用 rs.conf() 获取复制集配置文档

    conf = rs.conf()
    

    输出:

    {
    	"_id" : "myset",
    	"version" : 4,
    	"term" : 1,
    	"protocolVersion" : NumberLong(1),
    	"writeConcernMajorityJournalDefault" : true,
    	"members" : [
    		{
    			"_id" : 0,
    			"host" : "2b6433ebe83c:27017",
    			"arbiterOnly" : false,
    			"buildIndexes" : true,
    			"hidden" : false,
    			"priority" : 1,
    			"tags" : {
    
    			},
    			"slaveDelay" : NumberLong(0),
    			"votes" : 1
    		},
    		{
    			"_id" : 1,
    			"host" : "mongo2:27018",
    			"arbiterOnly" : false,
    			"buildIndexes" : true,
    			"hidden" : false,
    			"priority" : 1,
    			"tags" : {
    
    			},
    			"slaveDelay" : NumberLong(0),
    			"votes" : 1
    		}
    	],
    	"settings" : {
    		"chainingAllowed" : true,
    		"heartbeatIntervalMillis" : 2000,
    		"heartbeatTimeoutSecs" : 10,
    		"electionTimeoutMillis" : 10000,
    		"catchUpTimeoutMillis" : -1,
    		"catchUpTakeoverDelayMillis" : 30000,
    		"getLastErrorModes" : {
    
    		},
    		"getLastErrorDefaults" : {
    			"w" : 1,
    			"wtimeout" : 0
    		},
    		"replicaSetId" : ObjectId("61aece806b7c62feba73e982")
    	}
    }
    

    memebers 就是复制集成员的详细信息了。

  2. 我们来修改 members[1], 将其设置为隐藏(hidden: true),并且优先级为 0(priority: 0),这样它就不会被选举为主服务器了,运行

    conf.members[1].hidden = true
    
    conf.members[1].priority = 0
    
  3. 使用 rs.reconfig(conf) 刷新配置

    rs.reconfig(conf)
    

# 4.7 向复制集中添加仲裁服务器

  1. 使用 rs.addArb("hostname:port") 来添加仲裁服务器

    rs.addArb("mongo3:27019")
    

    输出:

    {
    	"ok" : 1,
    }
    
  2. 查看复制集状态

    rs.status()
    

    输出:

    {
    	"set" : "myset",
    	"myState" : 1,
    	"heartbeatIntervalMillis" : NumberLong(2000),
    	"majorityVoteCount" : 2,
    	"writeMajorityCount" : 2,
    	"votingMembersCount" : 3,
    	"writableVotingMembersCount" : 2,
    	"members" : [
    		{
    			"_id" : 0,
    			"name" : "2b6433ebe83c:27017",
    			"health" : 1,
    			"state" : 1,
    			"stateStr" : "PRIMARY",
    		},
    		{
    			"_id" : 1,
    			"name" : "mongo2:27018",
    			"health" : 1,
    			"state" : 2,
    			"stateStr" : "SECONDARY",
    		},
    		{
    			"_id" : 2,
    			"name" : "mongo3:27019",
    			"health" : 1,
    			"state" : 7,
    			"stateStr" : "ARBITER",
    		}
    	],
    }
    
    • ARBITER: 仲裁服务器

# 5. 复制集命令

命令 描述
rs.help() 返回命令列表
rs.status() 返回复制集当前的状态信息。该命令列出了每个成员服务器及其状态信息,包括最后联系时间。该调用可被用于提供整个集群的简单健康检查。
rs.initiate() 使用默认参数初始化复制集。
rs.initiate(replSetCfg) 使用配置描述初始化复制集。
rs.add("host:port") 使用含有主机名和特定端口(可选)的简单字符串向复制集中添加成员服务器。
rs.add(membercfg) 使用配置描述向复制集中添加成员服务器。如果希望指定特定的属性(如设置新成员服务器的优先级),那么必须使用这种方法。
rs.addArb("host:port") 添加新的成员服务器作为仲裁者。该成员不需要使用 --replSet 选项:任何运行在可达机器上的 mongod 实例都可以执行该任务。注意该服务器必须对复制集中的所有成员可达。
rs.remove("host:port") 从复制集中删除指定成员。
rs.stepDown() 在复制集的主服务器成员中使用该命令时,将使主服务器放弃它的角色,并且在集群中重新选举新的主服务器。注意只有主动辅助服务器可用作主服务器的候选,并且在 60 秒(默认)之内如果没有出现其它可用的成员,那么原有的主服务器将重新成为主服务器。
rs.syncFrom("host:port") 使辅助服务器从指定的成员同步数据,可用于组成复制链。
rs.freeze(secs) 冻结指定的成员,并使它在指定秒数内无法成为主服务器。
rs.slaveOk() 通过该选项,可以允许从辅助服务器读取数据,默认是不允许的。
rs.conf() 重新显示当前副本集的配置结构。改配置结构可以被修改,然后用作 rs.reconfig() 的参数,从而修改结构的配置。
rs.reconfig(conf) 使用 conf 配置来刷新配置。
db.isMaster() 该函数不只可作用于复制集:它是一个通用的复制支持函数。通过它,应用或驱动可以判断出被连接的特定实例在复制拓扑结构中是否是主服务器。

# 6. 复制集数据同步原理

参考:https://www.cnblogs.com/purpleraintear/p/6035111.html

# 6.1 initial_sync

当一个节点刚加入集群时,它需要初始化数据使得 自身与集群中其它节点的数据量差距尽量少,这个过程称为 initial-sync

一个 initial-sync 包括六步:

① 删除本地除 local 库以外的所有 db;

② 选取一个源节点,将源节点中的所有 db 导入到本地(注意,此处只导入数据,不导入索引);

③ 将 ② 开始执行到执行结束中源产生的 oplog 应用到本地;

④ 将 ③ 开始执行到执行结束中源产生的 oplog 应用到本地;

⑤ 从源将所有 table 的索引在本地重建(导入索引);

⑥ 将 ⑤ 开始执行到执行结束中源产生的 oplog 应用到本地;

⑦ 当第 ⑥ 步结束后,源和本地的差距足够小,MongoDB 进入 Secondary(从节点)状态。

第 ② 步要拷贝所有数据,因此一般第 ② 步消耗时间最长,第 ③ 与第 ④ 步是一个连续逼近的过程,MongoDB 这里做了两次是因为第② 步一般耗时太长,导致第 ③ 步数据量变多,间接受到影响。

img

# 6.2 steady_sync

当节点初始化完成后,会进入 steady-sync 状态,顾名思义,正常情况下,这是一个稳定静默运行于后台的,从复制源不断同步新 oplog 的过程。该过程一般会出现这两种问题:

  1. 复制源写入过快(或者相对的,本地写入速度过慢),复制源的 oplog 覆盖了本地用于同步源 oplog 而维持在源的游标。

  2. 本节点在宕机之前是 Primary,在重启后本地 oplog 有和当前 Primary 不一致的 oplog。

    这两种情况分别如下图所示:

    img

img

这两种情况在 bgsync.cpp:_produce 函数中,虽然这两种情况很不一样,但是最终都会进入 bgsync.cpp:_rollback函数处理,

对于第二种情况,处理过程在 rs_rollback.cpp 中,具体步骤为:

  1. 维持本地与远程的两个反向游标,以线性的时间复杂度找到 LCA(最近公共祖先,上 conflict.png 中为 Record4)

    该过程与经典的两个有序链表找公共节点的过程类似,具体实现在 roll_back_local_operations.cpp:syncRollBackLocalOperations 中,读者可以自行思考这一过程如何以线性时间复杂度实现。

  2. 针对本地每个冲突的 oplog,枚举该 oplog 的类型,推断出回滚该 oplog 需要的逆操作并记录,如下:

    1. create_table -> drop_table;
    2. drop_table -> 重新同步该表;
    3. drop_index -> 重新同步并构建索引;
    4. drop_db -> 放弃 rollback,改由用户手工 init_resync;
    5. apply_ops -> 针对 apply_ops 中的每一条子 oplog,递归执行 2 )这一过程;
    6. create_index -> drop_index
    7. 普通文档的 CUD 操作 -> 从 primary 重新读取真实值并替换。

    相关函数为:rs_rollback.cpp:refetch

  3. 针对 2)中分析出的每条 oplog 的处理方式,执行处理,相关函数为 rs_rollback.cpp:syncFixUp,此处操作主要是对步骤 2)的实践,实际处理过程相当繁琐。

  4. truncate 掉本地冲突的 oplog。

    上面我们说到,对于本地失速(stale)的情况,也是走 _rollback 流程统一处理的,对于失速,走 _rollback 时会在找 LCA 这步失败,之后会尝试更换复制源。

    方法为:从当前存活的所有 secondary 和 primary 节点中找一个使自己“不处于失速”的节点。

    这里有必要解释一下,oplog 是一个有限大小的 ring-buffer,失速的唯一判断条件为:本地维护在复制源的游标被复制源的写覆盖(想象一下你和同学同时开始绕着操场跑步,当你被同学超过一圈时,你和同学相遇了)。因此如果某些节点的 oplog 设置的比较大,绕完一圈的时间就更长,利用这样的节点作为复制源,失速的可能性会更小。

    img

steady-sync 的线程模型与 oplog 指令乱序加速

与 steady-sync 相关的代码有 bgsync.cpp, sync_tail.cpp。

上面我们介绍过,steady-sync 过程从复制源读取新产生的 oplog,并应用到本地,这样的过程脱不离是一个 producer-consumer 模型。

由于 oplog 需要保证顺序性,producer 只能单线程实现。对于 consumer 端,是否有并发提速机制呢?

  1. 首先,不相干的文档之间无需保证 oplog apply 的顺序,因此可以对 oplog 按照 objid 哈希分组。每一组内必须保证严格的写入顺序性。

    572 void fillWriterVectors(OperationContext* txn,
    573                        MultiApplier::Operations* ops,
    574                        std::vector<MultiApplier::OperationPtrs>* writerVectors) {
    581     for (auto&& op : *ops) {
    582         StringMapTraits::HashedKey hashedNs(op.ns);
    583         uint32_t hash = hashedNs.hash();
    584
    585         // For doc locking engines, include the _id of the document in the hash so we get
    586         // parallelism even if all writes are to a single collection. We can't do this for capped
    587         // collections because the order of inserts is a guaranteed property, unlike for normal
    588         // collections.
    589         if (supportsDocLocking && op.isCrudOpType() && !isCapped(txn, hashedNs)) {
    590             BSONElement id = op.getIdElement();
    591             const size_t idHash = BSONElement::Hasher()(id);
    592             MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash);
    593         }
    601         auto& writer = (*writerVectors)[hash % numWriters];
    602         if (writer.empty())
    603             writer.reserve(8);  // skip a few growth rounds.
    604         writer.push_back(&op);
    605     }
    606 }
    
  2. 其次对于 command 命令,会对表或者库有全局性的影响,因此 command 命令必须在当前的 consumer 完成工作之后单独处理,而且在处理 command oplog 时,不能有其他命令同时执行。这里可以类比 SMP 体系结构下的 cpu-memory-barrior

    899     // Check for ops that must be processed one at a time.
    900     if (entry.raw.isEmpty() ||       // sentinel that network queue is drained.
    901         (entry.opType[0] == 'c') ||  // commands.
    902         // Index builds are achieved through the use of an insert op, not a command op.
    903         // The following line is the same as what the insert code uses to detect an index build.
    904         (!entry.ns.empty() && nsToCollectionSubstring(entry.ns) == "system.indexes")) {
    905         if (ops->getCount() == 1) {
    906             // apply commands one-at-a-time
    907             _networkQueue->consume(txn);
    908         } else {
    909             // This op must be processed alone, but we already had ops in the queue so we can't
    910             // include it in this batch. Since we didn't call consume(), we'll see this again next
    911             // time and process it alone.
    912             ops->pop_back();
    913         }
    
  3. 从库和主库的 oplog 顺序必须完全一致,因此不管 1、2 步写入用户数据的顺序如何,oplog 的必须保证顺序性。对于 mmap 引擎的 capped-collection,只能以顺序插入来保证,因此对 oplog 的插入是单线程进行的。对于 wiredtiger 引擎的 capped-collection,可以在 ts(时间戳字段)上加上索引,从而保证读取的顺序与插入的顺序无关。

    517     // Only doc-locking engines support parallel writes to the oplog because they are required to
    518     // ensure that oplog entries are ordered correctly, even if inserted out-of-order. Additionally,
    519     // there would be no way to take advantage of multiple threads if a storage engine doesn't
    520     // support document locking.
    521     if (!enoughToMultiThread ||
    522         !txn->getServiceContext()->getGlobalStorageEngine()->supportsDocLocking()) {
    523
    524         threadPool->schedule(makeOplogWriterForRange(0, ops.size()));
    525         return false;
    526     }
    

steady-sync 的类依赖与线程模型总结如下图:

img

# 7. 复制集选举原理

参考:https://www.cnblogs.com/purpleraintear/p/6035111.html

# 7.1 主节点降级

MongoDB 的主节点选举由心跳触发。一个复制集 N 个节点中的任意两个节点维持心跳,每个节点维护其他 N-1 个节点的状态(该状态仅是该节点的 POV,比如因为网络分区,在同一时刻 A 观察 C 处于 down 状态,B 观察 C 处于 seconary 状态)。

以任意一个节点的 POV,在每一次心跳后会企图将主节点降级(step down primary),主节点降级的理由如下:

  1. 心跳检测到有其他 primary 节点的优先级高于当前主节点,则尝试将主节点降级(stepDown) 为 secondary, primary 值的动态变更提供给了运维一个可以热变更主节点的方式;
  2. 本节点若是主节点,但是无法 ping 通集群中超过半数的节点(majority原则),则将自身降级为 secondary。

# 7.2 选举主节点

secondary 节点检测到当前集群没有存活的主节点,则尝试将自身选举为 primary。主节点选举是一个 二阶段过程+多数派协议

第一阶段

以自身 POV,检测自身是否有被选举的资格:

  1. 能 ping 通集群的过半数节点;
  2. priority 必须大于 0;
  3. 不能是 arbitor 节点(仲裁服务器);

如果检测通过,向集群中所有存活节点发送 FreshnessCheck(询问其他节点关于“我”是否有被选举的资格)

同僚仲裁

选举第一阶段中,某节点收到其他节点的选举请求后,会执行更严格的同僚仲裁:

  1. 集群中有其他节点的 primary 比发起者高;
  2. 不能是 arbitor 节点;
  3. primary 必须大于 0;
  4. 以冲裁者的 POV,发起者的 oplog 必须是集群存活节点中 oplog 最新的(可以有相等的情况,大家都是最新的);

第二阶段

发起者向集群中存活节点发送 Elect 请求,仲裁者收到请求的节点会执行一系列合法性检查,如果检查通过,则仲裁者给发起者投一票,并获得 30 秒钟“选举锁”,选举锁的作用是:在持有锁的时间内不得给其他发起者投票

发起者如果或者超过半数的投票,则选举通过,自身成为 primary 节点。获得低于半数选票的原因,除了常见的网络问题外,相同优先级的节点同时通过第一阶段的同僚仲裁并进入第二阶段也是一个原因。因此,当选票不足时,会 sleep[0,1] 秒内的随机时间,之后再次尝试选举。

上次更新: 12/8/2021, 5:20:28 PM