安装Canal
使用docker compose安装
services:
canal-server:
image: canal/canal-server:latest
container_name: canal-server
environment:
- canal.auto.scan=false
- canal.destinations=test
- canal.instance.master.address=mysql:3306
- canal.instance.dbUsername=canal
- canal.instance.dbPassword=canal
- canal.instance.connectionCharset=UTF-8
- canal.instance.tsdb.enable=true
- TZ=Asia/Shanghai
volumes:
- canal_conf:/home/admin/canal-server/conf
- canal_logs:/home/admin/canal-server/logs
networks:
- middleware-network
ports:
- "11111:11111"
depends_on:
- mysql
volumes:
canal_conf:
canal_logs:
Canal Server Docker Compose 配置分析
基础设置
使用 canal/canal-server:latest 镜像
容器命名为 canal-server
环境变量配置
关闭自动扫描 (canal.auto.scan=false)
设置目标实例名为 test
设置 MySQL 主库地址为 mysql:3306
配置数据库用户名和密码均为 canal
设置连接字符集为 UTF-8
启用 TSDB
设置时区为上海
网络配置
使用 middleware-network 网络
暴露端口映射 11111:11111
依赖关系
依赖于 mysql 服务
外部卷定义
canal_conf 卷
canal_logs 卷
注意事项:需要在数据库中开启Binlog,查看Canal是否成功连接不能使用docker Logs,应该去/home/admin/canal-server/logs/你创建的destinations名下查看log
FLUSH LOGS;
-- 用于刷新 MySQL 的日志文件。具体操作是关闭当前日志文件并创建一个新的日志文件。
-- 通常用于手动切换二进制日志文件,方便日志管理或备份操作。
SHOW VARIABLES LIKE '%log_bin%';
-- 查询 MySQL 是否启用了二进制日志(binary log)。
-- 返回与 `log_bin` 相关的变量及其值:
-- - `log_bin`: 表示二进制日志是否启用(ON 或 OFF)。
-- - 其他变量如 `log_bin_basename` 或 `log_bin_index`,提供二进制日志的路径或索引信息。
SHOW BINARY LOGS;
-- 列出所有的二进制日志文件及其大小。
-- 输出字段包括:
-- - `Log_name`: 二进制日志文件的名称。
-- - `File_size`: 每个日志文件的大小(字节)。
-- 该命令通常用于确认需要备份或清理的日志文件。
SHOW BINLOG EVENTS;
-- 查看二进制日志中的事件内容。
-- 输出字段包括:
-- - `Log_name`: 二进制日志的文件名。
-- - `Pos`: 事件的起始位置。
-- - `Event_type`: 事件类型(例如 `Query`、`Rotate`)。
-- - `Server_id`: 执行事件的服务器 ID。
-- - `Info`: 事件的详细信息。
-- 此命令常用于调试或检查特定事务或数据变更。
使用方法
首先创建Canal微服务,Canal服务添加以下代码
配置项
canal:
host: 127.0.0.1
port: 11111
destination: #和创建Canal时一样
username: ""
password: ""
配置类:
package com.atguigu.tingshu.config;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.InetSocketAddress;
/**
* @author Other_002
*/
@Configuration
@ConfigurationProperties("canal")
@Data
public class CanalConfig {
private String host;
private Integer port;
private String destination;
private String username;
private String password;
@Bean
public CanalConnector canalConnector() {
return CanalConnectors.newSingleConnector(
new InetSocketAddress(host, port),
destination,
username,
password
);
}
}
使用Rabbitmq接收切面类中传来的数据,监听指定表,在表发生数据变动时删除redis中键.
package com.atguigu.tingshu.listener;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.atguigu.tingshu.model.canal.TableCacheMapping;
import com.rabbitmq.client.Channel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.boot.CommandLineRunner;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author Other_002
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class CanalClientListener implements CommandLineRunner {
private final CanalConnector canalConnector;
private final StringRedisTemplate redisTemplate;
private final ConcurrentMap<String, Set<String>> tableMappings = new ConcurrentHashMap<>();
private final AtomicBoolean running = new AtomicBoolean(true);
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(
value = "canal.table.mapping",
durable = "true"
),
exchange = @Exchange(
value = "canal.exchange"
),
key = "canal.mapping.sync"
)
})
public void receiveMapping(TableCacheMapping mapping, org.springframework.amqp.core.Message message, Channel channel) throws IOException {
try {
tableMappings.put(mapping.getTableName(), mapping.getCacheKeys());
log.info("接收到表映射: 表名={}, 缓存键={}", mapping.getTableName(), mapping.getCacheKeys());
// 手动确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("处理表映射消息失败: {}", mapping, e);
// 消息处理失败,拒绝消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false);
}
}
@Override
public void run(String... args) {
try {
startListener();
} catch (Exception e) {
log.error("Canal 客户端运行失败", e);
}
}
private void startListener() {
try {
canalConnector.connect();
canalConnector.subscribe(".*\\..*");
log.info("Canal 客户端已启动并订阅变更");
while (running.get()) {
Message message = canalConnector.getWithoutAck(100);
long batchId = message.getId();
if (batchId != -1) {
try {
processEntries(message.getEntries());
canalConnector.ack(batchId);
} catch (Exception e) {
canalConnector.rollback(batchId);
log.error("处理批次 {} 失败", batchId, e);
}
} else {
Thread.sleep(1000L);
}
}
} catch (Exception e) {
log.error("Canal 监听发生错误", e);
} finally {
closeConnector();
}
}
private void processEntries(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
continue;
}
String tableName = entry.getHeader().getTableName();
Set<String> cacheKeys = tableMappings.get(tableName);
if (cacheKeys == null || cacheKeys.isEmpty()) {
continue;
}
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
handleDataChange(tableName, rowChange, cacheKeys);
} catch (Exception e) {
log.error("处理表 {} 的变更失败", tableName, e);
}
}
}
private void handleDataChange(String tableName, CanalEntry.RowChange rowChange, Set<String> cacheKeys) {
if (rowChange.getEventType() != CanalEntry.EventType.UPDATE &&
rowChange.getEventType() != CanalEntry.EventType.DELETE) {
return;
}
rowChange.getRowDatasList().stream()
.flatMap(rowData -> cacheKeys.stream())
.forEach(cacheKey -> {
try {
redisTemplate.delete(cacheKey);
log.info("成功删除缓存: 表={}, 键={}", tableName, cacheKey);
} catch (Exception e) {
log.error("删除缓存失败, 表={}, 键={}", tableName, cacheKey, e);
}
});
}
private void closeConnector() {
if (canalConnector != null) {
try {
canalConnector.disconnect();
log.info("Canal 连接已关闭");
} catch (Exception e) {
log.error("关闭 Canal 连接失败", e);
}
}
}
}
创建自定义注解
package com.atguigu.tingshu.common.annotation;
import java.lang.annotation.*;
/**
* @author Other_002
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheSync {
TableCache[] tables();
}
package com.atguigu.tingshu.common.annotation;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author Other_002
*/
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface TableCache {
String tableName(); // 表名
String[] cacheKeys(); // 缓存键模式
}
实现切面类
package com.atguigu.tingshu.common.aspects;
import com.atguigu.tingshu.common.annotation.CacheSync;
import com.atguigu.tingshu.common.annotation.TableCache;
import com.atguigu.tingshu.model.canal.TableCacheMapping;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
/**
* 缓存同步切面
* @author u
*/
@Aspect
@Component
@Slf4j
@RequiredArgsConstructor
public class CacheSyncAspect {
private final RabbitTemplate rabbitTemplate;
@After("@annotation(cacheSync)")
public void sendMapping(JoinPoint joinPoint, CacheSync cacheSync) {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
String methodName = signature.getMethod().getName();
Object[] args = joinPoint.getArgs();
String[] parameterNames = signature.getParameterNames();
log.info("开始处理缓存同步注解, 方法: {}", methodName);
for (TableCache table : cacheSync.tables()) {
String tableName = table.tableName();
try {
// 动态替换缓存键中的占位符
Set<String> cacheKeys = Arrays.stream(table.cacheKeys())
.map(cacheKey -> replacePlaceholders(cacheKey, parameterNames, args))
.collect(Collectors.toSet());
TableCacheMapping mapping = new TableCacheMapping(tableName, cacheKeys);
rabbitTemplate.convertAndSend(
"canal.exchange",
"canal.mapping.sync",
mapping
);
log.info("成功发送表 {} 的缓存映射到MQ, 映射信息: {}", tableName, mapping);
} catch (Exception e) {
log.error("处理表 {} 的缓存映射失败, 方法: {}", tableName, methodName, e);
}
}
}
private String replacePlaceholders(String cacheKey, String[] parameterNames, Object[] args) {
for (int i = 0; i < parameterNames.length; i++) {
String placeholder = "{" + parameterNames[i] + "}";
if (cacheKey.contains(placeholder)) {
cacheKey = cacheKey.replace(placeholder, args[i].toString());
}
}
return cacheKey;
}
}
使用方式
指定监听的表和redis中的cacheKeys,当数据库内容发生变动时,发送RabbitMQ消息至Canal微服务模块,Canal删除Radis中的键以确保数据一致性
@CacheSync(tables = {
@TableCache(tableName = "track_info", cacheKeys = {RedisConstant.USER_TRACK_REPEAT_STAT_PREFIX + "{albumId}_{page}_{limit}"})
})
还需优化之处
添加幂等性处理
使用UUID生成唯一标识:
生产者在发送消息时,为每条消息生成一个唯一的标识(通常使用UUID
或者业务相关的唯一ID),将这个标识作为消息的一部分传递给消费者。Redis 中的
SETNX
实现幂等性:消费者接收到消息后,首先将唯一标识存入 Redis,通过
SETNX
(Set if Not Exists)命令判断该标识是否已存在:如果返回成功(即 Redis 中不存在该标识),说明该消息是第一次处理,可以继续执行后续业务逻辑。
如果返回失败(标识已存在),说明该消息已经被处理过,直接丢弃,避免重复处理。
消息处理完成后清理标识:
处理完业务逻辑后,从 Redis 中删除该标识,释放资源,确保唯一标识不会无限制增长。
可选:为了防止处理失败导致标识无法删除,Redis 中设置一个过期时间(例如 10 分钟),以防止标识长期存在。