高并发缓存同步 RSC方案
前言
在物联网平台开发中,我遇到了一个典型的高并发难题:百万级设备实时上报状态,如何高效写入缓存并定期同步到数据库?
传统方案要么实时写库(数据库扛不住),要么异步批量写(容易丢数据或产生冲突)。经过反复思考,我从 JVM 垃圾回收的 Survivor 机制中获得灵感,设计了 RSC(Redis Survivor Cache)方案。
核心思想很简单:让每个组件做自己最擅长的事,然后巧妙组合。
- Redis:擅长高速读写 → 接收实时更新
- Kafka:擅长解耦分发 → 分发同步任务
- MongoDB:擅长持久化 → 存储最终数据
- Survivor 机制:擅长隔离冲突 → 分离读写操作
这个方案在生产环境跑了一年多,支撑了每秒数万次设备状态更新,数据库负载降低了 90%+,同步延迟控制在分钟级。本文分享完整的设计思路和实现细节。
问题分析:传统方案的痛点
场景描述
物联网平台的设备状态管理是一个典型的高并发写场景:
- 数据量:百万级设备,每个设备有几十个属性
- 更新频率:设备每隔几秒到几分钟上报一次状态
- 读写比例:写多读少(大量设备上报,少量查询)
- 一致性要求:最终一致性即可,允许分钟级延迟
传统方案的问题
方案一:实时写库
设备上报 → 直接写 MongoDB
问题:每秒数万次写入,MongoDB 直接崩溃。
方案二:先写缓存,定时同步
设备上报 → 写 Redis → 定时任务 → 批量写 MongoDB
问题:同步任务执行期间,新数据持续写入,导致:
- 数据竞争:正在同步的数据被覆盖
- 数据丢失:同步后清空缓存,新写入的数据一起被删
- 性能下降:加锁影响写入性能
方案三:消息队列异步
设备上报 → Kafka → 消费者写 MongoDB
问题:
- 消息积压时延迟不可控
- 相同设备多次更新产生大量冗余消息
- 无法合并更新,数据库压力仍然大
问题本质
归根结底,问题在于:更新操作和同步操作在同一份数据上竞争。
如果能让它们操作不同的数据区域,问题就迎刃而解了。
灵感来源:JVM 的 Survivor 机制
JVM 新生代 GC 简介
JVM 的新生代使用了一个巧妙的设计:Eden + Survivor(S0, S1)。
新生代内存布局:
┌──────────────────────────────────────────────┐
│ Eden(新对象分配区) │
├─────────────────────┬────────────────────────┤
│ Survivor S0 │ Survivor S1 │
│ (交替使用) │ (交替使用) │
└─────────────────────┴────────────────────────┘
工作原理:
- 新对象在 Eden 分配
- Minor GC 时,存活对象复制到一个 Survivor(如 S0)
- 下次 GC 时,Eden + S0 的存活对象复制到 S1
- S0 和 S1 交替使用,始终有一个是空的
关键洞察:通过空间换时间,用两个区域交替使用,避免在同一区域同时进行分配和回收。
迁移到缓存同步场景
把这个思想应用到缓存同步:
- Eden → 设备的实时更新数据
- Survivor S0/S1 → 两个交替使用的缓存区域
- Minor GC → 定时同步任务
- 对象复制 → 数据同步到数据库
RSC 缓存布局:
┌──────────────────────────────────────────────┐
│ 活跃区域(接收新更新) │
│ ┌─────────────────────────────────────────┐ │
│ │ Survivor S0 或 S1(交替) │ │
│ └─────────────────────────────────────────┘ │
├──────────────────────────────────────────────┤
│ 非活跃区域(正在同步) │
│ ┌─────────────────────────────────────────┐ │
│ │ Survivor S1 或 S0(交替) │ │
│ └─────────────────────────────────────────┘ │
└──────────────────────────────────────────────┘
核心机制:
- 同步开始时,切换活跃区域
- 新数据写入新的活跃区域
- 旧数据在非活跃区域被同步和清空
- 两个操作互不干扰
RSC 方案设计
整体架构
┌─────────────┐ ┌─────────────────────────────────────┐ ┌─────────────┐
│ 设备上报 │ → │ Redis │ → │ MongoDB │
│ (高并发) │ │ ┌─────────┐ ┌─────────┐ │ │ (持久化) │
└─────────────┘ │ │ S0 │ ←→ │ S1 │ │ └─────────────┘
│ │ (活跃) │ │ (同步中) │ │ ↑
│ └─────────┘ └─────────┘ │ │
└─────────────────────────────────────┘ │
│
┌─────────────────────────────────────┐ │
│ Kafka │ ─────────┘
│ (任务分发,解耦同步处理) │
└─────────────────────────────────────┘
组件职责
| 组件 | 擅长的事 | 在 RSC 中的角色 |
|---|---|---|
| Redis | 高速读写、原子操作 | 缓存层,存储临时更新数据 |
| Survivor 机制 | 隔离冲突、空间换时间 | 双区域交替,分离读写 |
| Kafka | 解耦、分发、削峰 | 分发同步任务,支持并行处理 |
| MongoDB | 持久化、批量写入 | 最终存储,BulkWrite 高效入库 |
数据结构设计
为了支持高并发和高效同步,设计了以下 Redis 数据结构:
# 活跃区域标识(0 或 1)
device_doc:survivor:active = "0" # TTL 7 天
# 槽位集合(分片存储 SN,支持并行处理)
device_doc:survivor:s0:slot:0 = {sn1, sn2, sn3...}
device_doc:survivor:s0:slot:1 = {sn4, sn5, sn6...}
...
device_doc:survivor:s0:slot:63 = {...}
# 设备更新数据(Hash 存储具体内容)
device_doc:hash:sn1 = {attr1: value1, attr2: value2...}
# 属性集合(记录本次更新了哪些属性)
device_doc:survivor:s0:attrs:sn1 = {attr1, attr2}
为什么用槽位分片?
- 负载均衡:根据 SN 的 Hash 值分配到不同槽位
- 并行处理:每个槽位可以独立同步,Kafka 消费者并行处理
- 可控粒度:64 个槽位,每个槽位数据量适中
数据更新流程
def update_device_status(sn, attrs):
# 1. 获取当前活跃区域(0 或 1)
active = redis.get("device_doc:survivor:active") or "0"
# 2. 计算槽位(根据 SN 的 Hash 值)
slot = murmur_hash64(sn) % 64
# 3. 原子批量写入(Redisson RBatch)
batch = redis.create_batch()
# 添加 SN 到槽位集合
batch.sadd(f"device_doc:survivor:s{active}:slot:{slot}", sn)
# 更新设备数据(支持部分属性更新)
for attr, value in attrs.items():
if value is None:
batch.hdel(f"device_doc:hash:{sn}", attr)
else:
batch.hset(f"device_doc:hash:{sn}", attr, value)
# 记录更新的属性名
batch.sadd(f"device_doc:survivor:s{active}:attrs:{sn}", *attrs.keys())
batch.execute() # 原子执行,无锁高并发
关键点:
- 使用 Redis 批量操作(RBatch),原子执行
- 无锁设计,支持高并发写入
- 只记录变更的属性,减少数据量
同步任务流程
def minor_sync():
# 1. 切换活跃区域(原子操作)
old_active = redis.getset("device_doc:survivor:active", toggle(active))
# 2. 非活跃区域就是要同步的区域
sync_region = old_active
# 3. 分发槽位任务到 Kafka
for slot in range(64):
kafka.send("device-sync-topic", {
"region": sync_region,
"slot": slot
})
def process_slot(region, slot):
# 4. 获取并清空槽位数据
sns = redis.smembers_and_delete(f"device_doc:survivor:s{region}:slot:{slot}")
# 5. 并行读取每个 SN 的数据
updates = []
for sn in sns:
attrs = redis.smembers_and_delete(f"device_doc:survivor:s{region}:attrs:{sn}")
data = redis.hmget(f"device_doc:hash:{sn}", *attrs)
updates.append((sn, dict(zip(attrs, data))))
# 6. 批量 upsert 到 MongoDB
bulk = mongodb.bulk_write()
for sn, data in updates:
bulk.upsert({"sn": sn}, {"$set": data})
bulk.execute()
关键点:
- 切换活跃区域是原子操作,瞬间完成
- Kafka 分发任务,解耦处理逻辑
- 每个槽位独立处理,支持并行
- MongoDB BulkWrite,高效批量入库
时序图
时间线 设备上报 同步任务 MongoDB
│
│ 写入 S0 ────────────→
│ 写入 S0 ────────────→
│ 写入 S0 ────────────→
│ ┌─ 切换:S0→S1 ─┐
│ 写入 S1 ────────────→ │ │
│ 写入 S1 ────────────→ │ 读取 S0 数据 │────────→ 批量写入
│ 写入 S1 ────────────→ │ 清空 S0 │
│ └───────────────┘
│ 写入 S1 ────────────→
│ 写入 S1 ────────────→
│ ┌─ 切换:S1→S0 ─┐
│ 写入 S0 ────────────→ │ │
│ 写入 S0 ────────────→ │ 读取 S1 数据 │────────→ 批量写入
↓ └───────────────┘
为什么这样设计?
设计哲学:组合各自的优势
RSC 方案的核心不是发明新轮子,而是让每个组件做自己最擅长的事:
| 需求 | 选择 | 原因 |
|---|---|---|
| 高速缓存 | Redis | 内存操作,微秒级响应 |
| 原子操作 | Redis RBatch | 无锁并发,批量执行 |
| 读写隔离 | Survivor 机制 | 空间换时间,零冲突 |
| 任务分发 | Kafka | 解耦、削峰、可重试 |
| 并行处理 | 槽位分片 | 独立处理,水平扩展 |
| 持久化 | MongoDB BulkWrite | 批量操作,高效入库 |
没有银弹,只有合适的组合。
为什么不用分布式锁?
分布式锁(如 Redisson Lock)可以解决数据竞争,但:
- 性能损耗:锁竞争导致等待
- 死锁风险:异常情况可能死锁
- 复杂度高:需要处理锁超时、重入等问题
Survivor 机制通过空间隔离,从根本上消除了竞争,无需加锁。
为什么用 Kafka 而不是直接处理?
直接在同步任务中处理所有槽位:
// 不推荐:同步阻塞
for (int slot = 0; slot < 64; slot++) {
processSlot(slot); // 串行处理,耗时长
}
使用 Kafka 分发:
// 推荐:解耦并行
for (int slot = 0; slot < 64; slot++) {
kafka.send(slotTask); // 快速分发
}
// Kafka 消费者并行处理
好处:
- 解耦:同步触发和处理分离
- 并行:多消费者同时处理不同槽位
- 可靠:消息持久化,失败可重试
- 削峰:突发任务不会压垮系统
生产实践与性能数据
配置参数
rsc:
survivor-slots: 64 # 槽位数量
sync-interval: 5m # 同步周期
batch-size: 1000 # MongoDB 批量大小
active-ttl: 7d # 活跃标识 TTL
性能测试数据
在生产环境(100 万设备,峰值 QPS 3 万)的表现:
| 指标 | 数值 |
|---|---|
| 更新延迟(P99) | < 5ms |
| 同步周期 | 5 分钟 |
| 每次同步数据量 | 10-50 万条更新 |
| 单槽位处理耗时 | 1-3 秒 |
| MongoDB 写入 QPS | 约 3000(批量后) |
| Redis 内存占用 | 约 2GB |
对比传统方案
| 指标 | 实时写库 | 异步队列 | RSC |
|---|---|---|---|
| 更新延迟 | 高(数据库瓶颈) | 低 | 低 |
| 数据库 QPS | 3 万 | 3 万 | 3000 |
| 数据一致性 | 强一致 | 可能丢失 | 最终一致 |
| 复杂度 | 低 | 中 | 中 |
| 扩展性 | 差 | 中 | 好 |
数据库负载降低 90%,这是最直观的收益。
监控指标
生产环境需要监控以下指标:
# Grafana 仪表盘
- rsc_sync_duration_seconds # 同步耗时
- rsc_slot_data_count # 每槽位数据量
- rsc_sync_success_total # 成功同步次数
- rsc_sync_failure_total # 失败同步次数
- rsc_active_region # 当前活跃区域
适用场景与局限性
适用场景
RSC 特别适合以下场景:
- 物联网设备状态管理:设备数量大,更新频繁
- 用户行为日志:写多读少,允许延迟聚合
- 实时指标统计:高频更新,定期落库
- 缓存预热:批量加载数据到缓存
局限性
RSC 不适合以下场景:
- 强一致性要求:数据有分钟级延迟
- 实时查询最新值:需要从缓存读取,可能不是最新
- 事务性操作:跨多个设备的原子操作
改进方向
- 增量同步:支持秒级同步,降低延迟
- 多级缓存:本地缓存 + Redis,进一步降低延迟
- 智能分片:根据设备活跃度动态调整槽位
- 故障恢复:同步失败的数据重试机制
总结
RSC 方案的核心思想是:借鉴成熟的设计模式,组合各组件的优势。
- 从 JVM Survivor 机制借鉴:双区域交替,读写隔离
- 让 Redis 做缓存:高速写入,原子操作
- 让 Kafka 做分发:解耦处理,支持并行
- 让 MongoDB 做持久化:批量写入,高效入库
这个方案在我们的物联网平台上稳定运行了一年多,处理了数十亿次设备状态更新。它不是最复杂的方案,但它简单、可靠、易于维护。
技术选型没有银弹,关键是理解每个组件的特点,让它们各司其职,协同工作。
相关文章:
参考资料: