安装Canal

使用docker compose安装

services:
  canal-server:
    image: canal/canal-server:latest
    container_name: canal-server
    environment:
      - canal.auto.scan=false
      - canal.destinations=test
      - canal.instance.master.address=mysql:3306
      - canal.instance.dbUsername=canal
      - canal.instance.dbPassword=canal
      - canal.instance.connectionCharset=UTF-8
      - canal.instance.tsdb.enable=true
      - TZ=Asia/Shanghai
    volumes:
      - canal_conf:/home/admin/canal-server/conf
      - canal_logs:/home/admin/canal-server/logs
    networks:
      - middleware-network
    ports:
      - "11111:11111"
    depends_on:
      - mysql
volumes:
  canal_conf:
  canal_logs:

Canal Server Docker Compose 配置分析

  • 基础设置

    • 使用 canal/canal-server:latest 镜像

    • 容器命名为 canal-server

  • 环境变量配置

    • 关闭自动扫描 (canal.auto.scan=false)

    • 设置目标实例名为 test

    • 设置 MySQL 主库地址为 mysql:3306

    • 配置数据库用户名和密码均为 canal

    • 设置连接字符集为 UTF-8

    • 启用 TSDB

    • 设置时区为上海

  • 网络配置

    • 使用 middleware-network 网络

    • 暴露端口映射 11111:11111

  • 依赖关系

    • 依赖于 mysql 服务

  • 外部卷定义

    • canal_conf 卷

    • canal_logs 卷

注意事项:需要在数据库中开启Binlog,查看Canal是否成功连接不能使用docker Logs,应该去/home/admin/canal-server/logs/你创建的destinations名下查看log

FLUSH LOGS;
-- 用于刷新 MySQL 的日志文件。具体操作是关闭当前日志文件并创建一个新的日志文件。
-- 通常用于手动切换二进制日志文件,方便日志管理或备份操作。

SHOW VARIABLES LIKE '%log_bin%';
-- 查询 MySQL 是否启用了二进制日志(binary log)。
-- 返回与 `log_bin` 相关的变量及其值:
-- - `log_bin`: 表示二进制日志是否启用(ON 或 OFF)。
-- - 其他变量如 `log_bin_basename` 或 `log_bin_index`,提供二进制日志的路径或索引信息。

SHOW BINARY LOGS;
-- 列出所有的二进制日志文件及其大小。
-- 输出字段包括:
-- - `Log_name`: 二进制日志文件的名称。
-- - `File_size`: 每个日志文件的大小(字节)。
-- 该命令通常用于确认需要备份或清理的日志文件。

SHOW BINLOG EVENTS;
-- 查看二进制日志中的事件内容。
-- 输出字段包括:
-- - `Log_name`: 二进制日志的文件名。
-- - `Pos`: 事件的起始位置。
-- - `Event_type`: 事件类型(例如 `Query`、`Rotate`)。
-- - `Server_id`: 执行事件的服务器 ID。
-- - `Info`: 事件的详细信息。
-- 此命令常用于调试或检查特定事务或数据变更。

使用方法

首先创建Canal微服务,Canal服务添加以下代码

配置项

canal:
  host: 127.0.0.1
  port: 11111
  destination: #和创建Canal时一样 
  username: ""
  password: ""

配置类:

package com.atguigu.tingshu.config;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.InetSocketAddress;

/**
 * @author Other_002
 */
@Configuration
@ConfigurationProperties("canal")
@Data
public class CanalConfig {

    private String host;

    private Integer port;

    private String destination;

    private String username;

    private String password;

    @Bean
    public CanalConnector canalConnector() {
        return CanalConnectors.newSingleConnector(
            new InetSocketAddress(host, port),
            destination,
            username,
            password
        );
    }
}

使用Rabbitmq接收切面类中传来的数据,监听指定表,在表发生数据变动时删除redis中键.

package com.atguigu.tingshu.listener;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.atguigu.tingshu.model.canal.TableCacheMapping;
import com.rabbitmq.client.Channel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.boot.CommandLineRunner;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author Other_002
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class CanalClientListener implements CommandLineRunner {

    private final CanalConnector canalConnector;
    private final StringRedisTemplate redisTemplate;
    private final ConcurrentMap<String, Set<String>> tableMappings = new ConcurrentHashMap<>();
    private final AtomicBoolean running = new AtomicBoolean(true);

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(
                            value = "canal.table.mapping",
                            durable = "true"
                    ),
                    exchange = @Exchange(
                            value = "canal.exchange"
                    ),
                    key = "canal.mapping.sync"
            )
    })
    public void receiveMapping(TableCacheMapping mapping, org.springframework.amqp.core.Message message, Channel channel) throws IOException {
        try {
            tableMappings.put(mapping.getTableName(), mapping.getCacheKeys());
            log.info("接收到表映射: 表名={}, 缓存键={}", mapping.getTableName(), mapping.getCacheKeys());
            // 手动确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.error("处理表映射消息失败: {}", mapping, e);
            // 消息处理失败,拒绝消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false);
        }
    }

    @Override
    public void run(String... args) {
        try {
            startListener();
        } catch (Exception e) {
            log.error("Canal 客户端运行失败", e);
        }
    }

    private void startListener() {
        try {
            canalConnector.connect();
            canalConnector.subscribe(".*\\..*");
            log.info("Canal 客户端已启动并订阅变更");

            while (running.get()) {
                Message message = canalConnector.getWithoutAck(100);
                long batchId = message.getId();

                if (batchId != -1) {
                    try {
                        processEntries(message.getEntries());
                        canalConnector.ack(batchId);
                    } catch (Exception e) {
                        canalConnector.rollback(batchId);
                        log.error("处理批次 {} 失败", batchId, e);
                    }
                } else {
                    Thread.sleep(1000L);
                }
            }
        } catch (Exception e) {
            log.error("Canal 监听发生错误", e);
        } finally {
            closeConnector();
        }
    }

    private void processEntries(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                continue;
            }

            String tableName = entry.getHeader().getTableName();
            Set<String> cacheKeys = tableMappings.get(tableName);

            if (cacheKeys == null || cacheKeys.isEmpty()) {
                continue;
            }

            try {
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                handleDataChange(tableName, rowChange, cacheKeys);
            } catch (Exception e) {
                log.error("处理表 {} 的变更失败", tableName, e);
            }
        }
    }

    private void handleDataChange(String tableName, CanalEntry.RowChange rowChange, Set<String> cacheKeys) {
        if (rowChange.getEventType() != CanalEntry.EventType.UPDATE &&
                rowChange.getEventType() != CanalEntry.EventType.DELETE) {
            return;
        }

        rowChange.getRowDatasList().stream()
                .flatMap(rowData -> cacheKeys.stream())
                .forEach(cacheKey -> {
                    try {
                        redisTemplate.delete(cacheKey);
                        log.info("成功删除缓存: 表={}, 键={}", tableName, cacheKey);
                    } catch (Exception e) {
                        log.error("删除缓存失败, 表={}, 键={}", tableName, cacheKey, e);
                    }
                });
    }

    private void closeConnector() {
        if (canalConnector != null) {
            try {
                canalConnector.disconnect();
                log.info("Canal 连接已关闭");
            } catch (Exception e) {
                log.error("关闭 Canal 连接失败", e);
            }
        }
    }
}

创建自定义注解

package com.atguigu.tingshu.common.annotation;

import java.lang.annotation.*;

/**
 * @author Other_002
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheSync {
    TableCache[] tables();
}
package com.atguigu.tingshu.common.annotation;

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * @author Other_002
 */
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface TableCache {
    String tableName();    // 表名
    String[] cacheKeys();  // 缓存键模式
}

实现切面类

package com.atguigu.tingshu.common.aspects;

import com.atguigu.tingshu.common.annotation.CacheSync;
import com.atguigu.tingshu.common.annotation.TableCache;
import com.atguigu.tingshu.model.canal.TableCacheMapping;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * 缓存同步切面
 * @author u
 */
@Aspect
@Component
@Slf4j
@RequiredArgsConstructor
public class CacheSyncAspect {

    private final RabbitTemplate rabbitTemplate;

    @After("@annotation(cacheSync)")
    public void sendMapping(JoinPoint joinPoint, CacheSync cacheSync) {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        String methodName = signature.getMethod().getName();
        Object[] args = joinPoint.getArgs();
        String[] parameterNames = signature.getParameterNames();

        log.info("开始处理缓存同步注解, 方法: {}", methodName);

        for (TableCache table : cacheSync.tables()) {
            String tableName = table.tableName();
            try {
                // 动态替换缓存键中的占位符
                Set<String> cacheKeys = Arrays.stream(table.cacheKeys())
                        .map(cacheKey -> replacePlaceholders(cacheKey, parameterNames, args))
                        .collect(Collectors.toSet());

                TableCacheMapping mapping = new TableCacheMapping(tableName, cacheKeys);

                rabbitTemplate.convertAndSend(
                        "canal.exchange",
                        "canal.mapping.sync",
                        mapping
                );
                log.info("成功发送表 {} 的缓存映射到MQ, 映射信息: {}", tableName, mapping);
            } catch (Exception e) {
                log.error("处理表 {} 的缓存映射失败, 方法: {}", tableName, methodName, e);
            }
        }
    }

    private String replacePlaceholders(String cacheKey, String[] parameterNames, Object[] args) {
        for (int i = 0; i < parameterNames.length; i++) {
            String placeholder = "{" + parameterNames[i] + "}";
            if (cacheKey.contains(placeholder)) {
                cacheKey = cacheKey.replace(placeholder, args[i].toString());
            }
        }
        return cacheKey;
    }
}

使用方式

指定监听的表和redis中的cacheKeys,当数据库内容发生变动时,发送RabbitMQ消息至Canal微服务模块,Canal删除Radis中的键以确保数据一致性

@CacheSync(tables = {
        @TableCache(tableName = "track_info", cacheKeys = {RedisConstant.USER_TRACK_REPEAT_STAT_PREFIX + "{albumId}_{page}_{limit}"})
})

还需优化之处

添加幂等性处理

  • 使用UUID生成唯一标识
    生产者在发送消息时,为每条消息生成一个唯一的标识(通常使用UUID或者业务相关的唯一ID),将这个标识作为消息的一部分传递给消费者。

  • Redis 中的 SETNX 实现幂等性

    • 消费者接收到消息后,首先将唯一标识存入 Redis,通过 SETNX(Set if Not Exists)命令判断该标识是否已存在:

      • 如果返回成功(即 Redis 中不存在该标识),说明该消息是第一次处理,可以继续执行后续业务逻辑。

      • 如果返回失败(标识已存在),说明该消息已经被处理过,直接丢弃,避免重复处理。

  • 消息处理完成后清理标识

    • 处理完业务逻辑后,从 Redis 中删除该标识,释放资源,确保唯一标识不会无限制增长。

    • 可选:为了防止处理失败导致标识无法删除,Redis 中设置一个过期时间(例如 10 分钟),以防止标识长期存在。