问题

小公司,2个GPS网关服器,3个业务服务器,mq用于将网关收到的gps点位数据传输到业务服务器上
从旧平台迁移近2万老用户,发现阿里RocketMq消息量从原来一天2百万左右激增到5千万以上,随着近一步迁移及新用户,消息量还会成倍增长
每天成本逼近100,小公司有点承担不起

可选方案

前提:公司没运维

  • 自建RocketMq,从阿里商用版改为自建版本
    • 如果用单机方案的话,稳定性是问题,用集群方案得3台服务器成本不低
  • 使用其他消息队列代替RocketMq
    • 同上,kafka,rabbitMq一般都是要用集群方案布署的
  • 在网关业务里做一个消息队列,缓存消息然后定时批量发送到rocketMq
    • 可行,但如果这台服务器需要升级或重启,机器上的消息将丢失
  • 使用redis list的lpush,rpop代替RocketMq
    • 可行性没问题,但需要自己做线程管理及负载均衡
  • 使用redis list的lpush,rpop缓存积压消息,定时发送到mq
    • 上一个方案的拆中方案,由于业务代码之前就支持处理列表数据,基本不需要改动

执行

  • redis,需要6.2以上的版本,才支持lpush rpop写入,弹出多个消息

code

  • redisUtil封装一下put和get,同时做了单机避免积压过多的处理put消息时先判断是否超过最大限制
public class RedisCacheUtil {
    @Value("${redisMqCacheList.key}")
    private String key;
    @Value("${redisMqCacheList.maxSize}")
    private int maxSize;

    static final int defaultGetSize = 18;
    @Autowired
    RedisTemplate<String,String> redisTemplate;
    private final AtomicLong lastGetTime = new AtomicLong(0);
    private final AtomicInteger cachedSize = new AtomicInteger(0);
    private boolean lastGetIsFull = true;



    //判断是否超过缓存池最大限制
    public boolean sizeOver(){
        return cachedSize.get()>maxSize;
    }
    //put
    public void  put(String data){
        redisTemplate.opsForList().rightPush(key,data);
        //写入缓存的条数
        cachedSize.incrementAndGet();
    }

    //get
    public List<String> getCacheMsgList(){
        //最后一次获取不是满队列的情况下,默认积压.5s
        var now = System.currentTimeMillis();
        if(!lastGetIsFull && (now - lastGetTime.get())<500) return List.of();
        var result =  redisTemplate.opsForList().leftPop(key,defaultGetSize);
        var resultSize = CollectionUtils.isEmpty(result)?0:result.size();
        lastGetIsFull = resultSize>=defaultGetSize;
        var x = lastGetIsFull?cachedSize.addAndGet(-defaultGetSize):cachedSize.getAndSet(0);
        lastGetTime.set(System.currentTimeMillis());
        return result;
    }
}
  • 然后用一个定时任务获取到一起推给消息队列
    //处理缓存的消息
    @Scheduled(fixedRate = 75)
    public void processCacheMsgTask1(){
        processCacheMsg();
    }
	 void processCacheMsg(){
			var msgList = cacheUtil.getCacheMsgList();
			producer.send(JSON.toJSONString(msgList));
	 }