高并发缓存同步 RSC方案

前言

在物联网平台开发中,我遇到了一个典型的高并发难题:百万级设备实时上报状态,如何高效写入缓存并定期同步到数据库?

传统方案要么实时写库(数据库扛不住),要么异步批量写(容易丢数据或产生冲突)。经过反复思考,我从 JVM 垃圾回收的 Survivor 机制中获得灵感,设计了 RSC(Redis Survivor Cache)方案。

核心思想很简单:让每个组件做自己最擅长的事,然后巧妙组合。

  • Redis:擅长高速读写 → 接收实时更新
  • Kafka:擅长解耦分发 → 分发同步任务
  • MongoDB:擅长持久化 → 存储最终数据
  • Survivor 机制:擅长隔离冲突 → 分离读写操作

这个方案在生产环境跑了一年多,支撑了每秒数万次设备状态更新,数据库负载降低了 90%+,同步延迟控制在分钟级。本文分享完整的设计思路和实现细节。

问题分析:传统方案的痛点

场景描述

物联网平台的设备状态管理是一个典型的高并发写场景:

  • 数据量:百万级设备,每个设备有几十个属性
  • 更新频率:设备每隔几秒到几分钟上报一次状态
  • 读写比例:写多读少(大量设备上报,少量查询)
  • 一致性要求:最终一致性即可,允许分钟级延迟

传统方案的问题

方案一:实时写库

设备上报 → 直接写 MongoDB

问题:每秒数万次写入,MongoDB 直接崩溃。

方案二:先写缓存,定时同步

设备上报 → 写 Redis → 定时任务 → 批量写 MongoDB

问题:同步任务执行期间,新数据持续写入,导致:

  • 数据竞争:正在同步的数据被覆盖
  • 数据丢失:同步后清空缓存,新写入的数据一起被删
  • 性能下降:加锁影响写入性能

方案三:消息队列异步

设备上报 → Kafka → 消费者写 MongoDB

问题:

  • 消息积压时延迟不可控
  • 相同设备多次更新产生大量冗余消息
  • 无法合并更新,数据库压力仍然大

问题本质

归根结底,问题在于:更新操作和同步操作在同一份数据上竞争

如果能让它们操作不同的数据区域,问题就迎刃而解了。

灵感来源:JVM 的 Survivor 机制

JVM 新生代 GC 简介

JVM 的新生代使用了一个巧妙的设计:Eden + Survivor(S0, S1)。

新生代内存布局:
┌──────────────────────────────────────────────┐
│  Eden(新对象分配区)                          │
├─────────────────────┬────────────────────────┤
│  Survivor S0        │  Survivor S1           │
│  (交替使用)        │  (交替使用)           │
└─────────────────────┴────────────────────────┘

工作原理:

  1. 新对象在 Eden 分配
  2. Minor GC 时,存活对象复制到一个 Survivor(如 S0)
  3. 下次 GC 时,Eden + S0 的存活对象复制到 S1
  4. S0 和 S1 交替使用,始终有一个是空的

关键洞察:通过空间换时间,用两个区域交替使用,避免在同一区域同时进行分配和回收。

迁移到缓存同步场景

把这个思想应用到缓存同步:

  • Eden → 设备的实时更新数据
  • Survivor S0/S1 → 两个交替使用的缓存区域
  • Minor GC → 定时同步任务
  • 对象复制 → 数据同步到数据库
RSC 缓存布局:
┌──────────────────────────────────────────────┐
│  活跃区域(接收新更新)                        │
│  ┌─────────────────────────────────────────┐ │
│  │  Survivor S0 或 S1(交替)               │ │
│  └─────────────────────────────────────────┘ │
├──────────────────────────────────────────────┤
│  非活跃区域(正在同步)                        │
│  ┌─────────────────────────────────────────┐ │
│  │  Survivor S1 或 S0(交替)               │ │
│  └─────────────────────────────────────────┘ │
└──────────────────────────────────────────────┘

核心机制

  • 同步开始时,切换活跃区域
  • 新数据写入新的活跃区域
  • 旧数据在非活跃区域被同步和清空
  • 两个操作互不干扰

RSC 方案设计

整体架构

┌─────────────┐    ┌─────────────────────────────────────┐    ┌─────────────┐
│  设备上报   │ →  │              Redis                  │ →  │   MongoDB   │
│  (高并发)   │    │  ┌─────────┐      ┌─────────┐      │    │  (持久化)   │
└─────────────┘    │  │   S0    │ ←→   │   S1    │      │    └─────────────┘
                   │  │ (活跃)  │      │ (同步中) │      │          ↑
                   │  └─────────┘      └─────────┘      │          │
                   └─────────────────────────────────────┘          │
                                                                    │
                   ┌─────────────────────────────────────┐          │
                   │              Kafka                  │ ─────────┘
                   │  (任务分发,解耦同步处理)             │
                   └─────────────────────────────────────┘

组件职责

组件 擅长的事 在 RSC 中的角色
Redis 高速读写、原子操作 缓存层,存储临时更新数据
Survivor 机制 隔离冲突、空间换时间 双区域交替,分离读写
Kafka 解耦、分发、削峰 分发同步任务,支持并行处理
MongoDB 持久化、批量写入 最终存储,BulkWrite 高效入库

数据结构设计

为了支持高并发和高效同步,设计了以下 Redis 数据结构:

# 活跃区域标识(0 或 1)
device_doc:survivor:active = "0"  # TTL 7 天

# 槽位集合(分片存储 SN,支持并行处理)
device_doc:survivor:s0:slot:0 = {sn1, sn2, sn3...}
device_doc:survivor:s0:slot:1 = {sn4, sn5, sn6...}
...
device_doc:survivor:s0:slot:63 = {...}

# 设备更新数据(Hash 存储具体内容)
device_doc:hash:sn1 = {attr1: value1, attr2: value2...}

# 属性集合(记录本次更新了哪些属性)
device_doc:survivor:s0:attrs:sn1 = {attr1, attr2}

为什么用槽位分片?

  • 负载均衡:根据 SN 的 Hash 值分配到不同槽位
  • 并行处理:每个槽位可以独立同步,Kafka 消费者并行处理
  • 可控粒度:64 个槽位,每个槽位数据量适中

数据更新流程

def update_device_status(sn, attrs):
    # 1. 获取当前活跃区域(0 或 1)
    active = redis.get("device_doc:survivor:active") or "0"
    
    # 2. 计算槽位(根据 SN 的 Hash 值)
    slot = murmur_hash64(sn) % 64
    
    # 3. 原子批量写入(Redisson RBatch)
    batch = redis.create_batch()
    
    # 添加 SN 到槽位集合
    batch.sadd(f"device_doc:survivor:s{active}:slot:{slot}", sn)
    
    # 更新设备数据(支持部分属性更新)
    for attr, value in attrs.items():
        if value is None:
            batch.hdel(f"device_doc:hash:{sn}", attr)
        else:
            batch.hset(f"device_doc:hash:{sn}", attr, value)
    
    # 记录更新的属性名
    batch.sadd(f"device_doc:survivor:s{active}:attrs:{sn}", *attrs.keys())
    
    batch.execute()  # 原子执行,无锁高并发

关键点

  • 使用 Redis 批量操作(RBatch),原子执行
  • 无锁设计,支持高并发写入
  • 只记录变更的属性,减少数据量

同步任务流程

def minor_sync():
    # 1. 切换活跃区域(原子操作)
    old_active = redis.getset("device_doc:survivor:active", toggle(active))
    
    # 2. 非活跃区域就是要同步的区域
    sync_region = old_active
    
    # 3. 分发槽位任务到 Kafka
    for slot in range(64):
        kafka.send("device-sync-topic", {
            "region": sync_region,
            "slot": slot
        })

def process_slot(region, slot):
    # 4. 获取并清空槽位数据
    sns = redis.smembers_and_delete(f"device_doc:survivor:s{region}:slot:{slot}")
    
    # 5. 并行读取每个 SN 的数据
    updates = []
    for sn in sns:
        attrs = redis.smembers_and_delete(f"device_doc:survivor:s{region}:attrs:{sn}")
        data = redis.hmget(f"device_doc:hash:{sn}", *attrs)
        updates.append((sn, dict(zip(attrs, data))))
    
    # 6. 批量 upsert 到 MongoDB
    bulk = mongodb.bulk_write()
    for sn, data in updates:
        bulk.upsert({"sn": sn}, {"$set": data})
    bulk.execute()

关键点

  • 切换活跃区域是原子操作,瞬间完成
  • Kafka 分发任务,解耦处理逻辑
  • 每个槽位独立处理,支持并行
  • MongoDB BulkWrite,高效批量入库

时序图

时间线  设备上报                  同步任务                    MongoDB
  │
  │     写入 S0 ────────────→
  │     写入 S0 ────────────→
  │     写入 S0 ────────────→
  │                              ┌─ 切换:S0→S1 ─┐
  │     写入 S1 ────────────→   │               │
  │     写入 S1 ────────────→   │  读取 S0 数据  │────────→ 批量写入
  │     写入 S1 ────────────→   │  清空 S0      │
  │                              └───────────────┘
  │     写入 S1 ────────────→
  │     写入 S1 ────────────→
  │                              ┌─ 切换:S1→S0 ─┐
  │     写入 S0 ────────────→   │               │
  │     写入 S0 ────────────→   │  读取 S1 数据  │────────→ 批量写入
  ↓                              └───────────────┘

为什么这样设计?

设计哲学:组合各自的优势

RSC 方案的核心不是发明新轮子,而是让每个组件做自己最擅长的事

需求 选择 原因
高速缓存 Redis 内存操作,微秒级响应
原子操作 Redis RBatch 无锁并发,批量执行
读写隔离 Survivor 机制 空间换时间,零冲突
任务分发 Kafka 解耦、削峰、可重试
并行处理 槽位分片 独立处理,水平扩展
持久化 MongoDB BulkWrite 批量操作,高效入库

没有银弹,只有合适的组合。

为什么不用分布式锁?

分布式锁(如 Redisson Lock)可以解决数据竞争,但:

  • 性能损耗:锁竞争导致等待
  • 死锁风险:异常情况可能死锁
  • 复杂度高:需要处理锁超时、重入等问题

Survivor 机制通过空间隔离,从根本上消除了竞争,无需加锁。

为什么用 Kafka 而不是直接处理?

直接在同步任务中处理所有槽位:

// 不推荐:同步阻塞
for (int slot = 0; slot < 64; slot++) {
    processSlot(slot);  // 串行处理,耗时长
}

使用 Kafka 分发:

// 推荐:解耦并行
for (int slot = 0; slot < 64; slot++) {
    kafka.send(slotTask);  // 快速分发
}
// Kafka 消费者并行处理

好处

  • 解耦:同步触发和处理分离
  • 并行:多消费者同时处理不同槽位
  • 可靠:消息持久化,失败可重试
  • 削峰:突发任务不会压垮系统

生产实践与性能数据

配置参数

rsc:
  survivor-slots: 64          # 槽位数量
  sync-interval: 5m           # 同步周期
  batch-size: 1000            # MongoDB 批量大小
  active-ttl: 7d              # 活跃标识 TTL

性能测试数据

在生产环境(100 万设备,峰值 QPS 3 万)的表现:

指标 数值
更新延迟(P99) < 5ms
同步周期 5 分钟
每次同步数据量 10-50 万条更新
单槽位处理耗时 1-3 秒
MongoDB 写入 QPS 约 3000(批量后)
Redis 内存占用 约 2GB

对比传统方案

指标 实时写库 异步队列 RSC
更新延迟 高(数据库瓶颈)
数据库 QPS 3 万 3 万 3000
数据一致性 强一致 可能丢失 最终一致
复杂度
扩展性

数据库负载降低 90%,这是最直观的收益。

监控指标

生产环境需要监控以下指标:

# Grafana 仪表盘
- rsc_sync_duration_seconds      # 同步耗时
- rsc_slot_data_count            # 每槽位数据量
- rsc_sync_success_total         # 成功同步次数
- rsc_sync_failure_total         # 失败同步次数
- rsc_active_region              # 当前活跃区域

适用场景与局限性

适用场景

RSC 特别适合以下场景:

  • 物联网设备状态管理:设备数量大,更新频繁
  • 用户行为日志:写多读少,允许延迟聚合
  • 实时指标统计:高频更新,定期落库
  • 缓存预热:批量加载数据到缓存

局限性

RSC 不适合以下场景:

  • 强一致性要求:数据有分钟级延迟
  • 实时查询最新值:需要从缓存读取,可能不是最新
  • 事务性操作:跨多个设备的原子操作

改进方向

  • 增量同步:支持秒级同步,降低延迟
  • 多级缓存:本地缓存 + Redis,进一步降低延迟
  • 智能分片:根据设备活跃度动态调整槽位
  • 故障恢复:同步失败的数据重试机制

总结

RSC 方案的核心思想是:借鉴成熟的设计模式,组合各组件的优势

  1. 从 JVM Survivor 机制借鉴:双区域交替,读写隔离
  2. 让 Redis 做缓存:高速写入,原子操作
  3. 让 Kafka 做分发:解耦处理,支持并行
  4. 让 MongoDB 做持久化:批量写入,高效入库

这个方案在我们的物联网平台上稳定运行了一年多,处理了数十亿次设备状态更新。它不是最复杂的方案,但它简单、可靠、易于维护

技术选型没有银弹,关键是理解每个组件的特点,让它们各司其职,协同工作。


相关文章

参考资料

更新时间:2025年12月24日