Skip to content

SivanCache 实现 AOF 持久化

Sivan_Xin edited this page Jan 2, 2024 · 2 revisions

AOF持久化原理详解以及实现

我们的信息都是直接放在内存中的,如果断电或者应用重启,那么内容就全部丢失了。有时候我们希望这些信息重启之后还在,就像 redis 重启一样。

Redis的数据都是存储在内存中的,如果我们不对Redis做持久化,当Redis故障重启后,内存中的数据就会丢失.

但是Redis实现了数据持久化的方式。主要通过AOF日志和RDB日志来实现。

  • AOF日志采用追加写的方式来记录每一条数据,故障发生时,将AOF日志的所有操作回溯,也就恢复了数据。
  • RDB日志采用全量快照的方式来记录数据,故障发生时,直接拿全量快照的数据进行恢复。
  • 混合持久化的方式是汲两者优势与一身,在Redis4.0之后才提出。

我们这里仿照AOF日志的思路,来对我们的cache进行持久化操作。

利用AOP思想写入List

定义持久化注释

这里主要为了通过注释控制不同的方法,为了和耗时统计,刷新等特性保持一致,对于操作类的动作才添加到文件中(append to file)我们也基于注解属性来指定,而不是固定写死在代码中,便于后期拓展调整。

/**
 * 缓存拦截器
 */
@Documented
@Inherited
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CacheInterceptor {

    /**
     * 操作是否需要 append to file,默认为 false
     * 主要针对 cache 内容有变更的操作,不包括查询操作。
     * 包括删除,添加,过期等操作。
     * @return 是否
     */
    boolean aof() default false;

}

我们在原来的 @CacheInterceptor 注解中添加 aof 属性,用于指定是否对操作开启 aof 模式。

AOF模式的方法

类似于 spring 的事务拦截器,我们使用代理类调用 expireAt。

/**
 * 设置过期时间
 * @param key         key
 * @param timeInMills 毫秒时间之后过期
 * @return this
 */
@Override
@CacheInterceptor
public ICache<K, V> expire(K key, long timeInMills) {
    long expireTime = System.currentTimeMillis() + timeInMills;
    // 使用代理调用
    Cache<K,V> cachePoxy = (Cache<K, V>) CacheProxy.getProxy(this);
    return cachePoxy.expireAt(key, expireTime);
}

/**
 * 指定过期信息
 * @param key key
 * @param timeInMills 时间戳
 * @return this
 */
@Override
@CacheInterceptor(aof = true)
public ICache<K, V> expireAt(K key, long timeInMills) {
    this.expire.expire(key, timeInMills);
    return this;
}

put、expire方法也需要定义:

@Override
@CacheInterceptor(aof = true)
public V put(K key, V value) {
    //1.1 尝试驱除
    CacheEvictContext<K,V> context = new CacheEvictContext<>();
    context.key(key).size(sizeLimit).cache(this);
    boolean evictResult = evict.evict(context);
    if(evictResult) {
        // 执行淘汰监听器
        ICacheRemoveListenerContext<K,V> removeListenerContext = CacheRemoveListenerContext.<K,V>newInstance().key(key).value(value).type(CacheRemoveType.EVICT.code());
        for(ICacheRemoveListener<K,V> listener : this.removeListeners) {
            listener.listen(removeListenerContext);
        }
    }
    //2. 判断驱除后的信息
    if(isSizeLimit()) {
        throw new CacheRuntimeException("当前队列已满,数据添加失败!");
    }
    //3. 执行添加
    return map.put(key, value);
}

@Override
@CacheInterceptor(aof = true)
public V remove(Object key) {
    return map.remove(key);
}

@Override
@CacheInterceptor(aof = true)
public void putAll(Map<? extends K, ? extends V> m) {
    map.putAll(m);
}

@Override
@CacheInterceptor(refresh = true, aof = true)
public void clear() {
    map.clear();
}

定义持久化对象

/**
 * AOF 持久化明细
 */
public class PersistAofEntry {

    /**
     * 参数信息
     */
    private Object[] params;

    /**
     * 方法名称
     */
    private String methodName;

    //getter & setter &toString
}

这里我们只需要方法名,和参数对象。

暂时实现的简单一些即可。

定义持久化拦截器(重点)

我们定义拦截器,当 cache 中定义的持久化类为 CachePersistAof 时,将操作的信息放入到 CachePersistAof 的 buffer 列表中。

public class CacheInterceptorAof<K,V> implements ICacheInterceptor<K, V> {

    private static final Log log = LogFactory.getLog(CacheInterceptorAof.class);

    @Override
    public void before(ICacheInterceptorContext<K,V> context) {
    }

    @Override
    public void after(ICacheInterceptorContext<K,V> context) {
        // 持久化类
        ICache<K,V> cache = context.cache();
        ICachePersist<K,V> persist = cache.persist();

        if(persist instanceof CachePersistAof) {
            CachePersistAof<K,V> cachePersistAof = (CachePersistAof<K,V>) persist;

            String methodName = context.method().getName();
            PersistAofEntry aofEntry = PersistAofEntry.newInstance();
            aofEntry.setMethodName(methodName);
            aofEntry.setParams(context.params());

            String json = JSON.toJSONString(aofEntry);

            // 直接持久化
            log.debug("AOF 开始追加文件内容:{}", json);
            cachePersistAof.append(json);
            log.debug("AOF 完成追加文件内容:{}", json);
        }
    }

}

调用拦截器

这里判断:只有当持久化类为 AOF 模式时,才进行调用。

//3. AOF 追加
final ICachePersist cachePersist = cache.persist();
if(cacheInterceptor.aof() && (cachePersist instanceof CachePersistAof)) {
    if(before) {
        persistInterceptors.before(interceptorContext);
    } else {
        persistInterceptors.after(interceptorContext);
    }
}

到这里,我们就做完了put ——> 加入buffer的流程,接下来将buffer写入到文件。

List写入AOF文件

持久化接口

public interface ICachePersist<K, V> {

    /**
     * 持久化缓存信息
     * @param cache 缓存
     */
    void persist(final ICache<K, V> cache);

    /**
     * 延迟时间
     * @return 延迟
     */
    long delay();

    /**
     * 时间间隔
     * @return 间隔
     */
    long period();

    /**
     * 时间单位
     * @return 时间单位
     */
    TimeUnit timeUnit();
}

实现持久化类

这里主要关注persist方法即可。

/**
 * 缓存持久化-AOF 持久化模式
 */
public class CachePersistAof<K,V> extends CachePersistAdaptor<K,V> {

    private static final Log log = LogFactory.getLog(CachePersistAof.class);

    /**
     * 缓存列表
     */
    private final List<String> bufferList = new ArrayList<>();

    /**
     * 数据持久化路径
     */
    private final String dbPath;

    public CachePersistAof(String dbPath) {
        this.dbPath = dbPath;
    }

    /**
     * 持久化
     * key长度 key+value
     * 第一个空格,获取 key 的长度,然后截取
     */
    @Override
    public void persist(ICache<K, V> cache) {
        log.info("开始 AOF 持久化到文件");
        // 1. 创建文件
        if(!FileUtil.exists(dbPath)) {
            FileUtil.createFile(dbPath);
        }
        // 2. 持久化追加到文件中
        FileUtil.append(dbPath, bufferList);

        // 3. 清空 buffer 列表
        bufferList.clear();
        log.info("完成 AOF 持久化到文件");
    }

    @Override
    public long delay() {
        return 1;
    }

    @Override
    public long period() {
        return 1;
    }

    @Override
    public TimeUnit timeUnit() {
        return TimeUnit.SECONDS;
    }

    /**
     * 添加文件内容到 buffer 列表中
     * @param json json 信息
     */
    public void append(final String json) {
        if(StringUtil.isNotEmpty(json)) {
            bufferList.add(json);
        }
    }

}

测试AOF写入

ICache<String, String> cache = CacheBs.<String,String>newInstance()
        .persist(CachePersists.<String, String>aof("1.aof"))
        .build();
cache.put("1", "1");
cache.expire("1", 10);
cache.remove("2");
TimeUnit.SECONDS.sleep(1);

1.aof 的文件内容如下

{"methodName":"put","params":["1","1"]}
{"methodName":"expireAt","params":["1",1601612441990]}
{"methodName":"remove","params":["2"]}

将每一次的操作,简单的存储到文件中。

AOF加载实现

我们需要根据文件的内容,还原以前的缓存的内容。

实现思路:遍历文件内容,反射调用原来的方法。

解析文件

@Override
public void load(ICache<K, V> cache) {
    List<String> lines = FileUtil.readAllLines(dbPath);
    log.info("[load] 开始处理 path: {}", dbPath);
    if(CollectionUtil.isEmpty(lines)) {
        log.info("[load] path: {} 文件内容为空,直接返回", dbPath);
        return;
    }

    for(String line : lines) {
        if(StringUtil.isEmpty(line)) {
            continue;
        }
        // 执行
        // 简单的类型还行,复杂的这种反序列化会失败
        PersistAofEntry entry = JSON.parseObject(line, PersistAofEntry.class);
        final String methodName = entry.getMethodName();
        final Object[] objects = entry.getParams();
        final Method method = METHOD_MAP.get(methodName);
        // 反射调用
        ReflectMethodUtil.invoke(cache, method, objects);
    }
}

方法映射的预加载

Method 反射是固定的,为了提升性能,我们做一下预处理。

/**
 * 方法缓存
 *
 * 暂时比较简单,直接通过方法判断即可,不必引入参数类型增加复杂度。
 */
private static final Map<String, Method> METHOD_MAP = new HashMap<>();
static {
    Method[] methods = Cache.class.getMethods();
    for(Method method : methods){
        CacheInterceptor cacheInterceptor = method.getAnnotation(CacheInterceptor.class);
        if(cacheInterceptor != null) {
            // 暂时
            if(cacheInterceptor.aof()) {
                String methodName = method.getName();
                METHOD_MAP.put(methodName, method);
            }
        }
    }
}

测试AOF加载

  • default.aof
{"methodName":"put","params":["1","1"]}
ICache<String, String> cache = CacheBs.<String,String>newInstance()
        .load(CacheLoads.<String, String>aof("default.aof"))
        .build();

Assert.assertEquals(1, cache.size());
System.out.println(cache.keySet());

直接将 default.aof 文件加载到 cache 缓存中。

总结

redis 的文件持久化,实际上更加丰富。

可以支持 rdb 和 aof 两种模式混合使用。

aof 模式的文件体积会非常大,redis 为了解决这个问题,会定时对命令进行压缩处理。

可以理解为 aof 就是一个操作流水表,我们实际上关心的只是一个终态,不论中间经过了多少步骤,我们只关心最后的值。