Pārlūkot izejas kodu

分布式事务优化

hubin 6 gadi atpakaļ
vecāks
revīzija
b024e067f7
16 mainītis faili ar 99 papildinājumiem un 475 dzēšanām
  1. 25 0
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/DtsConstants.java
  2. 24 0
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/DtsMeta.java
  3. 3 4
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/EnableDtsRabbit.java
  4. 0 86
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/RmtConstants.java
  5. 27 0
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/config/DtsAutoConfiguration.java
  6. 1 1
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/parser/IRmtParser.java
  7. 1 1
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/parser/JacksonRmtParser.java
  8. 10 10
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/RabbitConfiguration.java
  9. 0 32
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/RmtMeta.java
  10. 0 42
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/config/RmtAutoConfiguration.java
  11. 0 83
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/coordinator/IRmtCoordinator.java
  12. 0 98
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/coordinator/RedisRmtCoordinator.java
  13. 1 38
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/listener/RmtMessageListener.java
  14. 0 74
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/mq/RabbitTransactionalAspect.java
  15. 2 2
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/sender/IRmtSender.java
  16. 5 4
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/sender/RabbitRmtSender.java

+ 25 - 0
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/DtsConstants.java

@@ -0,0 +1,25 @@
+package com.baomidou.mybatisplus.dts;
+
+/**
+ * <p>
+ * 常量类
+ * </p>
+ *
+ * @author hubin
+ * @since 2019-04-20
+ */
+public interface DtsConstants {
+    /**
+     * 队列配置
+     */
+    String EXCHANGE = "rabbit-exchange";
+    String QUEUE = "rabbit-queue";
+    String ROUTING_KEY = "rabbit-routing-key";
+    /**
+     * 死信队列配置
+     */
+    String DL_EXCHANGE = "rabbit-dl-exchange";
+    String DL_QUEUE = "rabbit-dl-queue";
+    String DL_ROUTING_KEY = "rabbit-dl-routing-key";
+
+}

+ 24 - 0
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/DtsMeta.java

@@ -0,0 +1,24 @@
+package com.baomidou.mybatisplus.dts;
+
+import lombok.Data;
+
+/**
+ * <p>
+ * 可靠消息元数据
+ * </p>
+ *
+ * @author jobob
+ * @since 2019-04-17
+ */
+@Data
+public class DtsMeta {
+    /**
+     * 业务 KEY
+     */
+    String key;
+    /**
+     * 消息内容
+     */
+    Object payload;
+
+}

+ 3 - 4
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/EnableRmtRabbit.java → mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/EnableDtsRabbit.java

@@ -1,7 +1,6 @@
 package com.baomidou.mybatisplus.dts;
 
-import com.baomidou.mybatisplus.dts.rabbit.config.RmtAutoConfiguration;
-import com.baomidou.mybatisplus.dts.rabbit.mq.RabbitConfiguration;
+import com.baomidou.mybatisplus.dts.rabbit.RabbitConfiguration;
 import org.springframework.context.annotation.Import;
 
 import java.lang.annotation.Documented;
@@ -20,7 +19,7 @@ import java.lang.annotation.Target;
 @Retention(value = java.lang.annotation.RetentionPolicy.RUNTIME)
 @Target(value = { java.lang.annotation.ElementType.TYPE })
 @Documented
-@Import({RabbitConfiguration.class, RmtAutoConfiguration.class})
-public @interface EnableRmtRabbit {
+@Import({RabbitConfiguration.class})
+public @interface EnableDtsRabbit {
 
 }

+ 0 - 86
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/RmtConstants.java

@@ -1,86 +0,0 @@
-package com.baomidou.mybatisplus.dts;
-
-/**
- * <p>
- * 常量类
- * </p>
- *
- * @author hubin
- * @since 2019-04-18
- */
-public interface RmtConstants {
-    /**
-     * 队列配置
-     */
-    String EXCHANGE = "rabbit-exchange";
-    String QUEUE = "rabbit-queue";
-    String ROUTING_KEY = "rabbit-routing-key";
-    /**
-     * 死信队列配置
-     */
-    String DL_EXCHANGE = "rabbit-dl-exchange";
-    String DL_QUEUE = "rabbit-dl-queue";
-    String DL_ROUTING_KEY = "rabbit-dl-routing-key";
-
-
-    /**
-     * 默认 KEY
-     */
-    String KEY = "rabbit.key";
-    /**
-     * 消息重发计数
-     */
-    String MQ_RESEND_COUNTER = "mq.resend.counter";
-
-    /**
-     * 消息最大重发次数
-     */
-    long MAX_RETRY_COUNT = 3;
-
-    /**
-     * 分隔符
-     */
-    String DB_SPLIT = ",";
-
-    /**
-     * 缓存超时时间,超时进行重发
-     */
-    long TIME_GAP = 2000;
-
-    /**
-     * 处于ready状态消息
-     */
-    Object MQ_MSG_READY = "mq.msg.ready";
-
-    /**
-     * 处于prepare状态消息
-     */
-    Object MQ_MSG_PREPARE = "mq.msg.prepare";
-
-
-    String MQ_PRODUCER_RETRY_KEY = "mq.producer.retry.key";
-    String MQ_CONSUMER_RETRY_COUNT_KEY = "mq.consumer.retry.count.key";
-
-    /**
-     * 发送端重试乘数(ms)
-     */
-    int MUTIPLIER_TIME = 500;
-    /**
-     * 发送端最大重试时时间(s)
-     */
-    int MAX_RETRY_TIME = 10;
-    /**
-     * 消费端最大重试次数
-     */
-    int MAX_CONSUMER_COUNT = 5;
-    /**
-     * 递增时的基本常量
-     */
-    int BASE_NUM = 2;
-    /**
-     * 空的字符串
-     */
-    String BLANK_STR = "";
-
-
-}

+ 27 - 0
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/config/DtsAutoConfiguration.java

@@ -0,0 +1,27 @@
+package com.baomidou.mybatisplus.dts.config;
+
+import com.baomidou.mybatisplus.dts.parser.JacksonRmtParser;
+import com.baomidou.mybatisplus.dts.rabbit.sender.RabbitRmtSender;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Rabbit MQ 可靠消息配置
+ *
+ * @author jobob
+ * @since 2019-04-19
+ */
+@Configuration
+public class DtsAutoConfiguration {
+
+    @Bean
+    public JacksonRmtParser rmtParser() {
+        return new JacksonRmtParser();
+    }
+
+    @Bean
+    public RabbitRmtSender rmtSender() {
+        return new RabbitRmtSender();
+    }
+
+}

+ 1 - 1
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/parser/IRmtParser.java → mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/parser/IRmtParser.java

@@ -1,4 +1,4 @@
-package com.baomidou.mybatisplus.dts.rabbit.parser;
+package com.baomidou.mybatisplus.dts.parser;
 
 import org.springframework.amqp.support.converter.MessageConverter;
 

+ 1 - 1
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/parser/JacksonRmtParser.java → mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/parser/JacksonRmtParser.java

@@ -1,4 +1,4 @@
-package com.baomidou.mybatisplus.dts.rabbit.parser;
+package com.baomidou.mybatisplus.dts.parser;
 
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;

+ 10 - 10
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/mq/RabbitConfiguration.java → mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/RabbitConfiguration.java

@@ -1,6 +1,6 @@
-package com.baomidou.mybatisplus.dts.rabbit.mq;
+package com.baomidou.mybatisplus.dts.rabbit;
 
-import com.baomidou.mybatisplus.dts.rabbit.RmtConstants;
+import com.baomidou.mybatisplus.dts.DtsConstants;
 import org.springframework.amqp.core.Binding;
 import org.springframework.amqp.core.DirectExchange;
 import org.springframework.amqp.core.Queue;
@@ -43,18 +43,18 @@ public class RabbitConfiguration {
         rabbitTemplate.setChannelTransacted(true);
 
         // define deadletter exchange and queue
-        rabbitAdmin.declareExchange(new DirectExchange(RmtConstants.DL_EXCHANGE, true, false));
-        rabbitAdmin.declareQueue(new Queue(RmtConstants.DL_QUEUE, true, false, false, null));
-        rabbitAdmin.declareBinding(new Binding(RmtConstants.DL_QUEUE, Binding.DestinationType.QUEUE, RmtConstants.DL_EXCHANGE, RmtConstants.DL_ROUTING_KEY, null));
+        rabbitAdmin.declareExchange(new DirectExchange(DtsConstants.DL_EXCHANGE, true, false));
+        rabbitAdmin.declareQueue(new Queue(DtsConstants.DL_QUEUE, true, false, false, null));
+        rabbitAdmin.declareBinding(new Binding(DtsConstants.DL_QUEUE, Binding.DestinationType.QUEUE, DtsConstants.DL_EXCHANGE, DtsConstants.DL_ROUTING_KEY, null));
 
         // define simple exchange, queue with deadletter support and binding
-        rabbitAdmin.declareExchange(new TopicExchange(RmtConstants.EXCHANGE, true, false));
+        rabbitAdmin.declareExchange(new TopicExchange(DtsConstants.EXCHANGE, true, false));
         Map<String, Object> args = new HashMap<>(2);
-        args.put("x-dead-letter-exchange", RmtConstants.DL_EXCHANGE);
-        args.put("x-dead-letter-routing-key", RmtConstants.DL_ROUTING_KEY);
-        rabbitAdmin.declareQueue(new Queue(RmtConstants.QUEUE, true, false, true, args));
+        args.put("x-dead-letter-exchange", DtsConstants.DL_EXCHANGE);
+        args.put("x-dead-letter-routing-key", DtsConstants.DL_ROUTING_KEY);
+        rabbitAdmin.declareQueue(new Queue(DtsConstants.QUEUE, true, false, true, args));
 
         // declare binding
-        rabbitAdmin.declareBinding(new Binding(RmtConstants.QUEUE, Binding.DestinationType.QUEUE, RmtConstants.EXCHANGE, RmtConstants.ROUTING_KEY, null));
+        rabbitAdmin.declareBinding(new Binding(DtsConstants.QUEUE, Binding.DestinationType.QUEUE, DtsConstants.EXCHANGE, DtsConstants.ROUTING_KEY, null));
     }
 }

+ 0 - 32
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/RmtMeta.java

@@ -1,32 +0,0 @@
-package com.baomidou.mybatisplus.dts.rabbit;
-
-import lombok.Data;
-
-/**
- * <p>
- * 可靠消息元数据
- * </p>
- *
- * @author jobob
- * @since 2019-04-17
- */
-@Data
-public class RmtMeta {
-    /**
-     * 消息 ID
-     */
-    String messageId;
-    /**
-     * 交换器
-     */
-    String exchange;
-    /**
-     * 路由 KEY
-     */
-    String routingKey;
-    /**
-     * 消息内容
-     */
-    Object payload;
-
-}

+ 0 - 42
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/config/RmtAutoConfiguration.java

@@ -1,42 +0,0 @@
-package com.baomidou.mybatisplus.dts.rabbit.config;
-
-import com.baomidou.mybatisplus.dts.rabbit.coordinator.RedisRmtCoordinator;
-import com.baomidou.mybatisplus.dts.rabbit.mq.RabbitTransactionalAspect;
-import com.baomidou.mybatisplus.dts.rabbit.parser.JacksonRmtParser;
-import com.baomidou.mybatisplus.dts.rabbit.sender.RabbitRmtSender;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-/**
- * Rabbit MQ 可靠消息配置
- *
- * @author jobob
- * @since 2019-04-19
- */
-@Configuration
-public class RmtAutoConfiguration {
-
-    @Bean
-    public RedisRmtCoordinator redisRmtCoordinator() {
-        return new RedisRmtCoordinator();
-    }
-
-    @Bean
-    public JacksonRmtParser rmtParser() {
-        return new JacksonRmtParser();
-    }
-
-    @Bean
-    public RabbitRmtSender rmtSender() {
-        return new RabbitRmtSender();
-    }
-
-    /**
-     * 配置可靠消息事务发送者
-     */
-    @Bean
-    public RabbitTransactionalAspect rabbitTransactionalAspect() {
-        return new RabbitTransactionalAspect();
-    }
-
-}

+ 0 - 83
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/coordinator/IRmtCoordinator.java

@@ -1,83 +0,0 @@
-package com.baomidou.mybatisplus.dts.rabbit.coordinator;
-
-import com.baomidou.mybatisplus.dts.rabbit.RmtMeta;
-
-import java.util.Collection;
-
-/**
- * <p>
- * RMT 协调器
- * </p>
- *
- * @author jobob
- * @since 2019-04-19
- */
-public interface IRmtCoordinator {
-
-    /**
-     * 设置消息为prepare状态
-     *
-     * @param messageId 消息 ID
-     */
-    void setPrepare(String messageId);
-
-    /**
-     * 设置消息为ready状态,删除prepare状态
-     *
-     * @param messageId 消息 ID
-     * @param rmtMeta   可靠消息元数据
-     */
-    void setReady(String messageId, RmtMeta rmtMeta);
-
-    /**
-     * 消息发送成功,删除ready状态消息
-     *
-     * @param messageId 消息 ID
-     */
-    void setSuccess(String messageId);
-
-    /**
-     * 获取消息实体
-     *
-     * @param messageId 消息 ID
-     * @return
-     */
-    RmtMeta getRmtMeta(String messageId);
-
-    /**
-     * 获取prepare状态消息
-     *
-     * @param <T>
-     * @return
-     * @throws Exception
-     */
-    <T extends Collection> T getPrepare() throws Exception;
-
-    /**
-     * 获取ready状态消息T
-     *
-     * @param <T>
-     * @return
-     * @throws Exception
-     */
-    <T extends Collection> T getReady() throws Exception;
-
-    /**
-     * 消息重发次数 +1
-     *
-     * @param key
-     * @param hashKey
-     * @return
-     */
-    Long incrResendKey(String key, String hashKey);
-
-    /**
-     * 获取重发次数值
-     *
-     * @param key
-     * @param hashKey
-     * @return
-     */
-    Long getResendValue(String key, String hashKey);
-
-}

+ 0 - 98
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/coordinator/RedisRmtCoordinator.java

@@ -1,98 +0,0 @@
-package com.baomidou.mybatisplus.dts.rabbit.coordinator;
-
-import com.baomidou.mybatisplus.dts.rabbit.RmtConstants;
-import com.baomidou.mybatisplus.dts.rabbit.RmtMeta;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.core.HashOperations;
-import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.core.SetOperations;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-/**
- * <p>
- * Redis 协调器
- * </p>
- *
- * @author jobob
- * @since 2019-04-18
- */
-@Slf4j
-public class RedisRmtCoordinator implements IRmtCoordinator {
-
-    @Autowired
-    private RedisTemplate redisTemplate;
-
-    @Override
-    public void setPrepare(String messageId) {
-        redisTemplate.opsForSet().add(RmtConstants.MQ_MSG_PREPARE, messageId);
-    }
-
-    @Override
-    public void setReady(String messageId, RmtMeta rmtMeta) {
-        redisTemplate.opsForHash().put(RmtConstants.MQ_MSG_READY, messageId, rmtMeta);
-        redisTemplate.opsForSet().remove(RmtConstants.MQ_MSG_PREPARE, messageId);
-    }
-
-    @Override
-    public void setSuccess(String messageId) {
-        redisTemplate.opsForHash().delete(RmtConstants.MQ_MSG_READY, messageId);
-    }
-
-    @Override
-    public RmtMeta getRmtMeta(String messageId) {
-        return (RmtMeta) redisTemplate.opsForHash().get(RmtConstants.MQ_MSG_READY, messageId);
-    }
-
-    @Override
-    public List<String> getPrepare() throws Exception {
-        SetOperations setOperations = redisTemplate.opsForSet();
-        Set<String> messageIds = setOperations.members(RmtConstants.MQ_MSG_PREPARE);
-        List<String> messageAlert = new ArrayList();
-        for (String messageId : messageIds) {
-            // 如果超时加入、超时消息队列
-            if (messageTimeOut(messageId)) {
-                messageAlert.add(messageId);
-            }
-        }
-        // 删除已超时的消息
-        setOperations.remove(RmtConstants.MQ_MSG_READY, messageAlert);
-        return messageAlert;
-    }
-
-    @Override
-    public List<RmtMeta> getReady() throws Exception {
-        HashOperations hashOperations = redisTemplate.opsForHash();
-        List<RmtMeta> messages = hashOperations.values(RmtConstants.MQ_MSG_READY);
-        List<RmtMeta> messageAlert = new ArrayList();
-        List<String> messageIds = new ArrayList<>();
-        for (RmtMeta message : messages) {
-            // 如果超时加入、超时消息队列
-            if (messageTimeOut(message.getMessageId())) {
-                messageIds.add(message.getMessageId());
-                messageAlert.add(message);
-            }
-        }
-        // 删除已超时的消息
-        hashOperations.delete(RmtConstants.MQ_MSG_READY, messageIds);
-        return messageAlert;
-    }
-
-    @Override
-    public Long incrResendKey(String key, String hashKey) {
-        return redisTemplate.opsForHash().increment(key, hashKey, 1);
-    }
-
-    @Override
-    public Long getResendValue(String key, String hashKey) {
-        return (Long) redisTemplate.opsForHash().get(key, hashKey);
-    }
-
-    protected boolean messageTimeOut(String messageId) {
-        return (System.currentTimeMillis() - Long.parseLong((messageId
-                .split(RmtConstants.DB_SPLIT))[1])) > RmtConstants.TIME_GAP;
-    }
-}

+ 1 - 38
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/listener/RmtMessageListener.java

@@ -1,14 +1,8 @@
 
 package com.baomidou.mybatisplus.dts.rabbit.listener;
 
-import com.baomidou.mybatisplus.dts.rabbit.RmtConstants;
-import com.rabbitmq.client.Channel;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.Message;
-import org.springframework.amqp.core.MessageProperties;
-import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.core.RedisTemplate;
 
 /**
  * RabbitMQ 抽象消息监听,所有消息消费者必须继承此类
@@ -17,37 +11,6 @@ import org.springframework.data.redis.core.RedisTemplate;
  * @since 2019-04-18
  */
 @Slf4j
-public abstract class RmtMessageListener implements IRmtListener<Message>, ChannelAwareMessageListener {
-    @Autowired
-    private RedisTemplate<String, Object> redisTemplate;
+public abstract class RmtMessageListener implements IRmtListener<Message> {
 
-    @Override
-    public void onMessage(Message message, Channel channel) throws Exception {
-        MessageProperties messageProperties = message.getMessageProperties();
-        Long deliveryTag = messageProperties.getDeliveryTag();
-        Long consumerCount = redisTemplate.opsForHash().increment(RmtConstants.MQ_CONSUMER_RETRY_COUNT_KEY,
-                messageProperties.getMessageId(), 1);
-        log.debug("收到消息,当前消息ID:{} 消费次数:{}", messageProperties.getMessageId(), consumerCount);
-        try {
-            // 处理接收消息对象
-            this.receive(message);
-            // 成功的回执
-            channel.basicAck(deliveryTag, false);
-            // 如果消费成功,将Redis中统计消息消费次数的缓存删除
-            redisTemplate.opsForHash().delete(RmtConstants.MQ_CONSUMER_RETRY_COUNT_KEY,
-                    messageProperties.getMessageId());
-        } catch (Exception e) {
-            log.error("RabbitMQ 消息消费失败," + e.getMessage(), e);
-            if (consumerCount >= RmtConstants.MAX_CONSUMER_COUNT) {
-                // 入死信队列
-                channel.basicReject(deliveryTag, false);
-            } else {
-                // 重回到队列,重新消费, 按照2的指数级递增
-                Thread.sleep((long) (Math.pow(RmtConstants.BASE_NUM, consumerCount) * 1000));
-                redisTemplate.opsForHash().increment(RmtConstants.MQ_CONSUMER_RETRY_COUNT_KEY,
-                        messageProperties.getMessageId(), 1);
-                channel.basicNack(deliveryTag, false, true);
-            }
-        }
-    }
 }

+ 0 - 74
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/mq/RabbitTransactionalAspect.java

@@ -1,74 +0,0 @@
-package com.baomidou.mybatisplus.dts.rabbit.mq;
-
-import com.baomidou.mybatisplus.dts.rabbit.RmtConstants;
-import com.baomidou.mybatisplus.dts.rabbit.RmtMeta;
-import com.baomidou.mybatisplus.dts.annotation.RmTransactional;
-import com.baomidou.mybatisplus.dts.rabbit.coordinator.IRmtCoordinator;
-import com.baomidou.mybatisplus.dts.rabbit.sender.IRmtSender;
-import lombok.extern.slf4j.Slf4j;
-import org.aspectj.lang.ProceedingJoinPoint;
-import org.aspectj.lang.annotation.Around;
-import org.aspectj.lang.annotation.Aspect;
-import org.springframework.beans.factory.annotation.Autowired;
-
-
-/**
- * <p>
- * 可靠消息事务切面
- * </p>
- *
- * @author jobob
- * @since 2019-04-18
- */
-@Slf4j
-@Aspect
-public class RabbitTransactionalAspect {
-    @Autowired
-    private IRmtCoordinator rmtCoordinator;
-    @Autowired
-    private IRmtSender rmtSender;
-
-    @Around(value = "@annotation(rmTransactional)")
-    public void around(ProceedingJoinPoint joinPoint, RmTransactional rmTransactional) throws Throwable {
-        // 消息 ID
-        String messageId = rmTransactional.value() + RmtConstants.DB_SPLIT + System.currentTimeMillis();
-
-        /**
-         * 发送前暂存消息
-         */
-        rmtCoordinator.setPrepare(messageId);
-
-        Object returnObj;
-        try {
-            /**
-             * 执行业务函数
-             */
-            returnObj = joinPoint.proceed();
-        } catch (Exception e) {
-            log.error("joinPoint proceed error! messageId: {}", messageId);
-            throw e;
-        }
-
-        if (returnObj == null) {
-            returnObj = RmtConstants.BLANK_STR;
-        }
-
-        // 生成可靠消息元数据
-        RmtMeta rmtMeta = new RmtMeta();
-        rmtMeta.setMessageId(messageId);
-        rmtMeta.setExchange(rmTransactional.exchange());
-        rmtMeta.setRoutingKey(rmTransactional.routingKey());
-        rmtMeta.setPayload(returnObj);
-
-        // 将消息设置为ready状态
-        rmtCoordinator.setReady(messageId, rmtMeta);
-
-        try {
-            rmtSender.send(rmtMeta);
-        } catch (Exception e) {
-            log.error("The first stage message is sent error. messageId: {} , {}",
-                    messageId, e.getMessage());
-            throw e;
-        }
-    }
-}

+ 2 - 2
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/sender/IRmtSender.java

@@ -1,6 +1,6 @@
 package com.baomidou.mybatisplus.dts.rabbit.sender;
 
-import com.baomidou.mybatisplus.dts.rabbit.RmtMeta;
+import com.baomidou.mybatisplus.dts.DtsMeta;
 
 /**
  * <p>
@@ -20,5 +20,5 @@ public interface IRmtSender {
      * @param rmtMeta         可靠消息元数据
      * @return 消息ID
      */
-    String send(RmtMeta rmtMeta);
+    String send(DtsMeta rmtMeta);
 }

+ 5 - 4
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/sender/RabbitRmtSender.java

@@ -1,7 +1,8 @@
 package com.baomidou.mybatisplus.dts.rabbit.sender;
 
-import com.baomidou.mybatisplus.dts.rabbit.RmtMeta;
-import com.baomidou.mybatisplus.dts.rabbit.parser.IRmtParser;
+import com.baomidou.mybatisplus.dts.DtsConstants;
+import com.baomidou.mybatisplus.dts.DtsMeta;
+import com.baomidou.mybatisplus.dts.parser.IRmtParser;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.AmqpException;
@@ -38,7 +39,7 @@ public class RabbitRmtSender implements IRmtSender {
      * @throws JsonProcessingException
      */
     @Override
-    public String send(RmtMeta rmtMeta) {
+    public String send(DtsMeta rmtMeta) {
         final String messageId = UUID.randomUUID().toString();
         MessagePostProcessor messagePostProcessor = message -> {
             message.getMessageProperties().setMessageId(messageId);
@@ -50,7 +51,7 @@ public class RabbitRmtSender implements IRmtSender {
             MessageProperties messageProperties = new MessageProperties();
             messageProperties.setContentType("application/json");
             Message message = new Message(rmtParser.toJSONString(rmtMeta.getPayload()).getBytes(), messageProperties);
-            rabbitTemplate.convertAndSend(rmtMeta.getExchange(), rmtMeta.getRoutingKey(),
+            rabbitTemplate.convertAndSend(DtsConstants.EXCHANGE, DtsConstants.ROUTING_KEY,
                     message, messagePostProcessor, new CorrelationData(messageId));
             log.info("发送消息,消息ID:{}", messageId);
             return messageId;