问题
小公司,2个GPS网关服器,3个业务服务器,mq用于将网关收到的gps点位数据传输到业务服务器上
从旧平台迁移近2万老用户,发现阿里RocketMq消息量从原来一天2百万左右激增到5千万以上,随着近一步迁移及新用户,消息量还会成倍增长
每天成本逼近100,小公司有点承担不起
可选方案
前提:公司没运维
- 自建RocketMq,从阿里商用版改为自建版本
- 如果用单机方案的话,稳定性是问题,用集群方案得3台服务器成本不低
- 使用其他消息队列代替RocketMq
- 同上,kafka,rabbitMq一般都是要用集群方案布署的
- 在网关业务里做一个消息队列,缓存消息然后定时批量发送到rocketMq
- 可行,但如果这台服务器需要升级或重启,机器上的消息将丢失
- 使用redis list的lpush,rpop代替RocketMq
- 可行性没问题,但需要自己做线程管理及负载均衡
- 使用redis list的lpush,rpop缓存积压消息,定时发送到mq
- 上一个方案的拆中方案,由于业务代码之前就支持处理列表数据,基本不需要改动
执行
- redis,需要6.2以上的版本,才支持lpush rpop写入,弹出多个消息
code
- redisUtil封装一下put和get,同时做了单机避免积压过多的处理put消息时先判断是否超过最大限制
public class RedisCacheUtil {
@Value("${redisMqCacheList.key}")
private String key;
@Value("${redisMqCacheList.maxSize}")
private int maxSize;
static final int defaultGetSize = 18;
@Autowired
RedisTemplate<String,String> redisTemplate;
private final AtomicLong lastGetTime = new AtomicLong(0);
private final AtomicInteger cachedSize = new AtomicInteger(0);
private boolean lastGetIsFull = true;
//判断是否超过缓存池最大限制
public boolean sizeOver(){
return cachedSize.get()>maxSize;
}
//put
public void put(String data){
redisTemplate.opsForList().rightPush(key,data);
//写入缓存的条数
cachedSize.incrementAndGet();
}
//get
public List<String> getCacheMsgList(){
//最后一次获取不是满队列的情况下,默认积压.5s
var now = System.currentTimeMillis();
if(!lastGetIsFull && (now - lastGetTime.get())<500) return List.of();
var result = redisTemplate.opsForList().leftPop(key,defaultGetSize);
var resultSize = CollectionUtils.isEmpty(result)?0:result.size();
lastGetIsFull = resultSize>=defaultGetSize;
var x = lastGetIsFull?cachedSize.addAndGet(-defaultGetSize):cachedSize.getAndSet(0);
lastGetTime.set(System.currentTimeMillis());
return result;
}
}
- 然后用一个定时任务获取到一起推给消息队列
//处理缓存的消息
@Scheduled(fixedRate = 75)
public void processCacheMsgTask1(){
processCacheMsg();
}
void processCacheMsg(){
var msgList = cacheUtil.getCacheMsgList();
producer.send(JSON.toJSONString(msgList));
}