Browse Source

新增可靠消息分布式事务模块

hubin 6 years ago
parent
commit
8df12bded9
20 changed files with 962 additions and 0 deletions
  1. 4 0
      build.gradle
  2. 6 0
      mybatis-plus-rmt/build.gradle
  3. 89 0
      mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/RmtConstants.java
  4. 32 0
      mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/RmtMeta.java
  5. 35 0
      mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/annotation/RmTransactional.java
  6. 26 0
      mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/config/EnableRmtRabbit.java
  7. 83 0
      mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/coordinator/IRmtCoordinator.java
  8. 98 0
      mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/coordinator/RedisRmtCoordinator.java
  9. 11 0
      mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/daemon/RmtResendProcess.java
  10. 37 0
      mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/listener/DeadLetterMessageListener.java
  11. 17 0
      mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/listener/IRmtMessageListener.java
  12. 53 0
      mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/listener/RmtAbstractMessageListener.java
  13. 154 0
      mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/mq/RabbitConfiguration.java
  14. 73 0
      mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/mq/RabbitQueueConfiguration.java
  15. 74 0
      mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/mq/RabbitTransactionalAspect.java
  16. 39 0
      mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/parser/IRmtParser.java
  17. 43 0
      mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/parser/JacksonRmtParser.java
  18. 24 0
      mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/sender/IRmtSender.java
  19. 63 0
      mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/sender/RabbitRmtSender.java
  20. 1 0
      settings.gradle

+ 4 - 0
build.gradle

@@ -37,6 +37,10 @@ ext {
         "spring-tx"             : "org.springframework:spring-tx:${springVersion}",
         "spring-web"            : "org.springframework:spring-web:${springVersion}",
         "spring-aop"            : "org.springframework:spring-aop:${springVersion}",
+        "spring-rabbit"         : "org.springframework.amqp:spring-rabbit:2.0.3.RELEASE",
+        "spring-data-redis"     : "org.springframework.data:spring-data-redis:2.0.2.RELEASE",
+        "jackson-databind"      : "com.fasterxml.jackson.core:jackson-databind:2.8.9",
+        "aspectjrt"             : "org.aspectj:aspectjrt:1.8.13",
         "cglib"                 : "cglib:cglib:3.2.6",
         "lombok"                : "org.projectlombok:lombok:1.18.4",
 

+ 6 - 0
mybatis-plus-rmt/build.gradle

@@ -0,0 +1,6 @@
+dependencies {
+    implementation "${lib.'spring-rabbit'}"
+    implementation "${lib.'spring-data-redis'}"
+    implementation "${lib.'jackson-databind'}"
+    implementation "${lib.aspectjrt}"
+}

+ 89 - 0
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/RmtConstants.java

@@ -0,0 +1,89 @@
+package com.baomidou.mybatisplus.rmt;
+
+/**
+ * <p>
+ * 常量类
+ * </p>
+ *
+ * @author hubin
+ * @since 2019-04-18
+ */
+public interface RmtConstants {
+
+    /**
+     * 消息重发计数
+     */
+    String MQ_RESEND_COUNTER = "mq.resend.counter";
+
+    /**
+     * 消息最大重发次数
+     */
+    long MAX_RETRY_COUNT = 3;
+
+    /**
+     * 分隔符
+     */
+    String DB_SPLIT = ",";
+
+    /**
+     * 缓存超时时间,超时进行重发
+     */
+    long TIME_GAP = 2000;
+
+    /**
+     * 处于ready状态消息
+     */
+    Object MQ_MSG_READY = "mq.msg.ready";
+
+    /**
+     * 处于prepare状态消息
+     */
+    Object MQ_MSG_PREPARE = "mq.msg.prepare";
+
+    /**
+     * 默认交换机名称
+     */
+    String EXCHANGE = "rmt.exchange";
+
+    /**
+     * 默认队列名称
+     */
+    String QUEUE = "rmt.queue";
+
+    /**
+     * 默认 KEY
+     */
+    String KEY = "rmt.key";
+
+
+    String MQ_PRODUCER_RETRY_KEY = "mq.producer.retry.key";
+    String MQ_CONSUMER_RETRY_COUNT_KEY = "mq.consumer.retry.count.key";
+    /**
+     * 死信队列配置
+     */
+    String DLX_EXCHANGE = "dlx.exchange";
+    String DLX_QUEUE = "dlx.queue";
+    String DLX_ROUTING_KEY = "dlx.routing.key";
+    /**
+     * 发送端重试乘数(ms)
+     */
+    int MUTIPLIER_TIME = 500;
+    /**
+     * 发送端最大重试时时间(s)
+     */
+    int MAX_RETRY_TIME = 10;
+    /**
+     * 消费端最大重试次数
+     */
+    int MAX_CONSUMER_COUNT = 5;
+    /**
+     * 递增时的基本常量
+     */
+    int BASE_NUM = 2;
+    /**
+     * 空的字符串
+     */
+    String BLANK_STR = "";
+
+
+}

+ 32 - 0
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/RmtMeta.java

@@ -0,0 +1,32 @@
+package com.baomidou.mybatisplus.rmt;
+
+import lombok.Data;
+
+/**
+ * <p>
+ * 可靠消息元数据
+ * </p>
+ *
+ * @author jobob
+ * @since 2019-04-17
+ */
+@Data
+public class RmtMeta {
+    /**
+     * 消息 ID
+     */
+    String messageId;
+    /**
+     * 交换器
+     */
+    String exchange;
+    /**
+     * 路由 KEY
+     */
+    String routingKey;
+    /**
+     * 消息内容
+     */
+    Object payload;
+
+}

+ 35 - 0
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/annotation/RmTransactional.java

@@ -0,0 +1,35 @@
+package com.baomidou.mybatisplus.rmt.annotation;
+
+import com.baomidou.mybatisplus.rmt.RmtConstants;
+
+import java.lang.annotation.*;
+
+/**
+ * <p>
+ * 可靠消息事务 reliable message transactional
+ * </p>
+ *
+ * @author jobob
+ * @since 2019-04-17
+ */
+@Inherited
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ ElementType.METHOD, ElementType.TYPE })
+@Documented
+public @interface RmTransactional {
+
+    /**
+     * 业务值
+     */
+    String value() default "";
+
+    /**
+     * 交换器
+     */
+    String exchange() default RmtConstants.EXCHANGE;
+
+    /**
+     * 路由 KEY
+     */
+    String routingKey() default RmtConstants.KEY;
+}

+ 26 - 0
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/config/EnableRmtRabbit.java

@@ -0,0 +1,26 @@
+package com.baomidou.mybatisplus.rmt.config;
+
+import com.baomidou.mybatisplus.rmt.mq.RabbitConfiguration;
+import com.baomidou.mybatisplus.rmt.mq.RabbitQueueConfiguration;
+import org.springframework.context.annotation.Import;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+
+/**
+ * <p>
+ * 可靠消息事务自动配置
+ * </p>
+ *
+ * @author jobob
+ * @since 2019-04-18
+ */
+@Retention(value = java.lang.annotation.RetentionPolicy.RUNTIME)
+@Target(value = { java.lang.annotation.ElementType.TYPE })
+@Documented
+@Import({RabbitConfiguration.class, RabbitQueueConfiguration.class})
+public @interface EnableRmtRabbit {
+
+}

+ 83 - 0
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/coordinator/IRmtCoordinator.java

@@ -0,0 +1,83 @@
+package com.baomidou.mybatisplus.rmt.coordinator;
+
+import com.baomidou.mybatisplus.rmt.RmtMeta;
+
+import java.util.Collection;
+
+/**
+ * <p>
+ * RMT 协调器
+ * </p>
+ *
+ * @author jobob
+ * @since 2019-04-19
+ */
+public interface IRmtCoordinator {
+
+    /**
+     * 设置消息为prepare状态
+     *
+     * @param messageId 消息 ID
+     */
+    void setPrepare(String messageId);
+
+    /**
+     * 设置消息为ready状态,删除prepare状态
+     *
+     * @param messageId 消息 ID
+     * @param rmtMeta   可靠消息元数据
+     */
+    void setReady(String messageId, RmtMeta rmtMeta);
+
+    /**
+     * 消息发送成功,删除ready状态消息
+     *
+     * @param messageId 消息 ID
+     */
+    void setSuccess(String messageId);
+
+    /**
+     * 获取消息实体
+     *
+     * @param messageId 消息 ID
+     * @return
+     */
+    RmtMeta getRmtMeta(String messageId);
+
+    /**
+     * 获取prepare状态消息
+     *
+     * @param <T>
+     * @return
+     * @throws Exception
+     */
+    <T extends Collection> T getPrepare() throws Exception;
+
+    /**
+     * 获取ready状态消息T
+     *
+     * @param <T>
+     * @return
+     * @throws Exception
+     */
+    <T extends Collection> T getReady() throws Exception;
+
+    /**
+     * 消息重发次数 +1
+     *
+     * @param key
+     * @param hashKey
+     * @return
+     */
+    Long incrResendKey(String key, String hashKey);
+
+    /**
+     * 获取重发次数值
+     *
+     * @param key
+     * @param hashKey
+     * @return
+     */
+    Long getResendValue(String key, String hashKey);
+
+}

+ 98 - 0
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/coordinator/RedisRmtCoordinator.java

@@ -0,0 +1,98 @@
+package com.baomidou.mybatisplus.rmt.coordinator;
+
+import com.baomidou.mybatisplus.rmt.RmtConstants;
+import com.baomidou.mybatisplus.rmt.RmtMeta;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.HashOperations;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.SetOperations;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * <p>
+ * Redis 协调器
+ * </p>
+ *
+ * @author jobob
+ * @since 2019-04-18
+ */
+@Slf4j
+public class RedisRmtCoordinator implements IRmtCoordinator {
+
+    @Autowired
+    private RedisTemplate redisTemplate;
+
+    @Override
+    public void setPrepare(String messageId) {
+        redisTemplate.opsForSet().add(RmtConstants.MQ_MSG_PREPARE, messageId);
+    }
+
+    @Override
+    public void setReady(String messageId, RmtMeta rmtMeta) {
+        redisTemplate.opsForHash().put(RmtConstants.MQ_MSG_READY, messageId, rmtMeta);
+        redisTemplate.opsForSet().remove(RmtConstants.MQ_MSG_PREPARE, messageId);
+    }
+
+    @Override
+    public void setSuccess(String messageId) {
+        redisTemplate.opsForHash().delete(RmtConstants.MQ_MSG_READY, messageId);
+    }
+
+    @Override
+    public RmtMeta getRmtMeta(String messageId) {
+        return (RmtMeta) redisTemplate.opsForHash().get(RmtConstants.MQ_MSG_READY, messageId);
+    }
+
+    @Override
+    public List<String> getPrepare() throws Exception {
+        SetOperations setOperations = redisTemplate.opsForSet();
+        Set<String> messageIds = setOperations.members(RmtConstants.MQ_MSG_PREPARE);
+        List<String> messageAlert = new ArrayList();
+        for (String messageId : messageIds) {
+            // 如果超时加入、超时消息队列
+            if (messageTimeOut(messageId)) {
+                messageAlert.add(messageId);
+            }
+        }
+        // 删除已超时的消息
+        setOperations.remove(RmtConstants.MQ_MSG_READY, messageAlert);
+        return messageAlert;
+    }
+
+    @Override
+    public List<RmtMeta> getReady() throws Exception {
+        HashOperations hashOperations = redisTemplate.opsForHash();
+        List<RmtMeta> messages = hashOperations.values(RmtConstants.MQ_MSG_READY);
+        List<RmtMeta> messageAlert = new ArrayList();
+        List<String> messageIds = new ArrayList<>();
+        for (RmtMeta message : messages) {
+            // 如果超时加入、超时消息队列
+            if (messageTimeOut(message.getMessageId())) {
+                messageIds.add(message.getMessageId());
+                messageAlert.add(message);
+            }
+        }
+        // 删除已超时的消息
+        hashOperations.delete(RmtConstants.MQ_MSG_READY, messageIds);
+        return messageAlert;
+    }
+
+    @Override
+    public Long incrResendKey(String key, String hashKey) {
+        return redisTemplate.opsForHash().increment(key, hashKey, 1);
+    }
+
+    @Override
+    public Long getResendValue(String key, String hashKey) {
+        return (Long) redisTemplate.opsForHash().get(key, hashKey);
+    }
+
+    protected boolean messageTimeOut(String messageId) {
+        return (System.currentTimeMillis() - Long.parseLong((messageId
+                .split(RmtConstants.DB_SPLIT))[1])) > RmtConstants.TIME_GAP;
+    }
+}

+ 11 - 0
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/daemon/RmtResendProcess.java

@@ -0,0 +1,11 @@
+package com.baomidou.mybatisplus.rmt.daemon;
+
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class RmtResendProcess {
+
+    // 处理任务重发
+
+}

+ 37 - 0
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/listener/DeadLetterMessageListener.java

@@ -0,0 +1,37 @@
+package com.baomidou.mybatisplus.rmt.listener;
+
+import com.baomidou.mybatisplus.rmt.RmtConstants;
+import com.rabbitmq.client.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessageProperties;
+import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+@Component
+public class DeadLetterMessageListener implements ChannelAwareMessageListener {
+    private Logger logger = LoggerFactory.getLogger(DeadLetterMessageListener.class);
+
+    @Autowired
+    private RedisTemplate<String, Object> redisTemplate;
+
+    @Override
+    public void onMessage(Message message, Channel channel) throws Exception {
+        MessageProperties messageProperties = message.getMessageProperties();
+        // 消息体
+        String messageBody = new String(message.getBody());
+
+        logger.warn("dead letter message:{} | tag:{}", messageBody, message.getMessageProperties().getDeliveryTag());
+			/*// 入库
+			insertRecord(logKey, message);
+			// 发邮件
+			sendEmail(logKey, messageProperties.getMessageId(), messageBody);*/
+
+        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+
+        redisTemplate.opsForHash().delete(RmtConstants.MQ_CONSUMER_RETRY_COUNT_KEY, messageProperties.getMessageId());
+    }
+}

+ 17 - 0
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/listener/IRmtMessageListener.java

@@ -0,0 +1,17 @@
+package com.baomidou.mybatisplus.rmt.listener;
+
+/**
+ * 可靠消息事务,消息监听
+ *
+ * @author jobob
+ * @since 2019-04-18
+ */
+public interface IRmtMessageListener<M> {
+
+    /**
+     * 接收消息对象处理
+     *
+     * @param object 接收到消息对象
+     */
+    void receive(M object);
+}

+ 53 - 0
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/listener/RmtAbstractMessageListener.java

@@ -0,0 +1,53 @@
+
+package com.baomidou.mybatisplus.rmt.listener;
+
+import com.baomidou.mybatisplus.rmt.RmtConstants;
+import com.rabbitmq.client.Channel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessageProperties;
+import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+
+/**
+ * RabbitMQ 抽象消息监听,所有消息消费者必须继承此类
+ *
+ * @author jobob
+ * @since 2019-04-18
+ */
+@Slf4j
+public abstract class RmtAbstractMessageListener implements IRmtMessageListener<Message>, ChannelAwareMessageListener {
+    @Autowired
+    private RedisTemplate<String, Object> redisTemplate;
+
+    @Override
+    public void onMessage(Message message, Channel channel) throws Exception {
+        MessageProperties messageProperties = message.getMessageProperties();
+        Long deliveryTag = messageProperties.getDeliveryTag();
+        Long consumerCount = redisTemplate.opsForHash().increment(RmtConstants.MQ_CONSUMER_RETRY_COUNT_KEY,
+                messageProperties.getMessageId(), 1);
+        log.debug("收到消息,当前消息ID:{} 消费次数:{}", messageProperties.getMessageId(), consumerCount);
+        try {
+            // 处理接收消息对象
+            this.receive(message);
+            // 成功的回执
+            channel.basicAck(deliveryTag, false);
+            // 如果消费成功,将Redis中统计消息消费次数的缓存删除
+            redisTemplate.opsForHash().delete(RmtConstants.MQ_CONSUMER_RETRY_COUNT_KEY,
+                    messageProperties.getMessageId());
+        } catch (Exception e) {
+            log.error("RabbitMQ 消息消费失败," + e.getMessage(), e);
+            if (consumerCount >= RmtConstants.MAX_CONSUMER_COUNT) {
+                // 入死信队列
+                channel.basicReject(deliveryTag, false);
+            } else {
+                // 重回到队列,重新消费, 按照2的指数级递增
+                Thread.sleep((long) (Math.pow(RmtConstants.BASE_NUM, consumerCount) * 1000));
+                redisTemplate.opsForHash().increment(RmtConstants.MQ_CONSUMER_RETRY_COUNT_KEY,
+                        messageProperties.getMessageId(), 1);
+                channel.basicNack(deliveryTag, false, true);
+            }
+        }
+    }
+}

+ 154 - 0
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/mq/RabbitConfiguration.java

@@ -0,0 +1,154 @@
+package com.baomidou.mybatisplus.rmt.mq;
+
+import com.baomidou.mybatisplus.rmt.coordinator.IRmtCoordinator;
+import com.baomidou.mybatisplus.rmt.coordinator.RedisRmtCoordinator;
+import com.baomidou.mybatisplus.rmt.parser.IRmtParser;
+import com.baomidou.mybatisplus.rmt.parser.JacksonRmtParser;
+import com.baomidou.mybatisplus.rmt.sender.RabbitRmtSender;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.AcknowledgeMode;
+import org.springframework.amqp.rabbit.annotation.EnableRabbit;
+import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
+import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+
+/**
+ * Rabbit MQ 连接工厂配置
+ *
+ * @author jobob
+ * @since 2019-04-18
+ */
+@Slf4j
+@EnableRabbit
+@Configuration
+public class RabbitConfiguration {
+    @Value("${spring.rabbitmq.host}")
+    String host;
+    @Value("${spring.rabbitmq.port}")
+    int port;
+    @Value("${spring.rabbitmq.username}")
+    String username;
+    @Value("${spring.rabbitmq.password}")
+    String password;
+    @Value("${spring.rabbitmq.virtual.host}")
+    String virtualHost;
+    @Value("${spring.rabbitmq.cache.channel.size}")
+    int cacheSize;
+
+    /**
+     * 创建RabbitMQ连接工厂
+     *
+     * @param
+     * @return CachingConnectionFactory
+     * @throws Exception 异常
+     */
+    @Bean
+    public CachingConnectionFactory rabbitConnectionFactory() throws Exception {
+        log.debug("custom rabbitmq connection factory");
+        RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
+        factory.setHost(host);
+        factory.setPort(port);
+        factory.setUsername(username);
+        factory.setPassword(password);
+        factory.setVirtualHost(virtualHost);
+        factory.setConnectionTimeout(60000);
+        factory.setAutomaticRecoveryEnabled(true);
+        factory.afterPropertiesSet();
+        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory.getObject());
+        connectionFactory.setPublisherReturns(true);
+        connectionFactory.setPublisherConfirms(true);
+        connectionFactory.setChannelCacheSize(cacheSize);
+        return connectionFactory;
+    }
+
+    @Bean
+    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
+        log.debug("custom rabbitmq Listener factory: {}", connectionFactory);
+        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
+        factory.setConnectionFactory(connectionFactory);
+        factory.setConcurrentConsumers(3);
+        factory.setMaxConcurrentConsumers(10);
+        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
+        return factory;
+    }
+
+    /**
+     * <p>
+     * 配置可靠消息事务发送者
+     * </p>
+     *
+     * @return
+     */
+    @Bean
+    public RabbitTransactionalAspect rabbitTransactionalAspect() {
+        return new RabbitTransactionalAspect();
+    }
+
+    @Bean
+    public RedisRmtCoordinator redisRmtCoordinator() {
+        return new RedisRmtCoordinator();
+    }
+
+    @Bean
+    public JacksonRmtParser rmtParser() {
+        return new JacksonRmtParser();
+    }
+
+    @Bean
+    public RabbitRmtSender rmtSender() {
+        return new RabbitRmtSender();
+    }
+
+    @Autowired
+    private IRmtCoordinator rmtCoordinator;
+
+    @Autowired
+    private IRmtParser rmtParser;
+
+    boolean returnFlag = false;
+
+    @Bean
+    public RabbitTemplate customRabbitTemplate(ConnectionFactory connectionFactory) {
+        log.debug("==> custom rabbitTemplate, connectionFactory:" + connectionFactory);
+        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
+        rabbitTemplate.setMessageConverter(rmtParser.getMessageConverter());
+        rabbitTemplate.setMandatory(true);
+        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
+            if (returnFlag) {
+                log.error("mq发送错误,无对应的的交换机,confirm回掉,ack={},correlationData={} cause={}",
+                        ack, correlationData, cause);
+            }
+
+            log.debug("confirm回调,ack={} correlationData={} cause={}", ack, correlationData, cause);
+            String msgId = correlationData.getId();
+
+            /** 只要消息能投入正确的消息队列,并持久化,就返回ack为true*/
+            if (ack) {
+                log.debug("消息已正确投递到队列, correlationData:{}", correlationData);
+                rmtCoordinator.setSuccess(msgId);
+            } else {
+                log.error("消息投递至交换机失败,业务号:{},原因:{}", correlationData.getId(), cause);
+            }
+
+        });
+
+        //消息发送到RabbitMQ交换器,但无相应Exchange时的回调
+        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
+            String messageId = message.getMessageProperties().getMessageId();
+            log.error("return回调,没有找到任何匹配的队列!message id:{},replyCode{},replyText:{},"
+                    + "exchange:{},routingKey{}", messageId, replyCode, replyText, exchange, routingKey);
+            returnFlag = true;
+        });
+
+        // rabbitTemplate.waitForConfirms(RmtConstants.TIME_GAP);
+        return rabbitTemplate;
+    }
+}
+

+ 73 - 0
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/mq/RabbitQueueConfiguration.java

@@ -0,0 +1,73 @@
+package com.baomidou.mybatisplus.rmt.mq;
+
+import com.baomidou.mybatisplus.rmt.RmtConstants;
+import com.baomidou.mybatisplus.rmt.listener.DeadLetterMessageListener;
+import org.springframework.amqp.core.*;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+
+/**
+ * RabbitMQ交换机、队列的配置类.定义交换机、key、queue并做好绑定。
+ * 同时定义每个队列的ttl,队列最大长度,Qos等等
+ *
+ * @author jobob
+ * @since 2019-04-18
+ */
+@Configuration
+public class RabbitQueueConfiguration {
+
+
+    /**
+     * 死信交换机
+     */
+    @Bean
+    public DirectExchange dlxExchange() {
+        return new DirectExchange(RmtConstants.DLX_EXCHANGE);
+    }
+
+    /**
+     * 死信队列
+     */
+    @Bean
+    public Queue dlxQueue() {
+        return new Queue(RmtConstants.DLX_QUEUE,true,false,false);
+    }
+
+    /**
+     * 通过死信路由key绑定死信交换机和死信队列
+     */
+    @Bean
+    public Binding dlxBinding() {
+        return BindingBuilder.bind(dlxQueue()).to(dlxExchange())
+                .with(RmtConstants.DLX_ROUTING_KEY);
+    }
+
+    @Bean
+    public DeadLetterMessageListener deadLetterMessageListener() {
+        return new DeadLetterMessageListener();
+    }
+
+    /**
+     *
+     * 死信队列的监听
+     *
+     * @param connectionFactory RabbitMQ连接工厂
+     * @param deadLetterMessageListener  死信队列监听
+     * @return 监听容器对象
+     */
+    @Bean
+    public SimpleMessageListenerContainer deadLetterListenerContainer(ConnectionFactory connectionFactory,
+    		DeadLetterMessageListener deadLetterMessageListener) {
+        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
+        container.setQueues(dlxQueue());
+        container.setExposeListenerChannel(true);
+        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
+        container.setMessageListener(deadLetterMessageListener);
+        // 设置消费者能处理消息的最大个数
+        container.setPrefetchCount(100);
+        return container;
+    }
+}

+ 74 - 0
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/mq/RabbitTransactionalAspect.java

@@ -0,0 +1,74 @@
+package com.baomidou.mybatisplus.rmt.mq;
+
+import com.baomidou.mybatisplus.rmt.RmtConstants;
+import com.baomidou.mybatisplus.rmt.RmtMeta;
+import com.baomidou.mybatisplus.rmt.annotation.RmTransactional;
+import com.baomidou.mybatisplus.rmt.coordinator.IRmtCoordinator;
+import com.baomidou.mybatisplus.rmt.sender.IRmtSender;
+import lombok.extern.slf4j.Slf4j;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.springframework.beans.factory.annotation.Autowired;
+
+
+/**
+ * <p>
+ * 可靠消息事务切面
+ * </p>
+ *
+ * @author jobob
+ * @since 2019-04-18
+ */
+@Slf4j
+@Aspect
+public class RabbitTransactionalAspect {
+    @Autowired
+    private IRmtCoordinator rmtCoordinator;
+    @Autowired
+    private IRmtSender rmtSender;
+
+    @Around(value = "@annotation(rmTransactional)")
+    public void around(ProceedingJoinPoint joinPoint, RmTransactional rmTransactional) throws Throwable {
+        // 消息 ID
+        String messageId = rmTransactional.value() + RmtConstants.DB_SPLIT + System.currentTimeMillis();
+
+        /**
+         * 发送前暂存消息
+         */
+        rmtCoordinator.setPrepare(messageId);
+
+        Object returnObj;
+        try {
+            /**
+             * 执行业务函数
+             */
+            returnObj = joinPoint.proceed();
+        } catch (Exception e) {
+            log.error("joinPoint proceed error! messageId: {}", messageId);
+            throw e;
+        }
+
+        if (returnObj == null) {
+            returnObj = RmtConstants.BLANK_STR;
+        }
+
+        // 生成可靠消息元数据
+        RmtMeta rmtMeta = new RmtMeta();
+        rmtMeta.setMessageId(messageId);
+        rmtMeta.setExchange(rmTransactional.exchange());
+        rmtMeta.setRoutingKey(rmTransactional.routingKey());
+        rmtMeta.setPayload(returnObj);
+
+        // 将消息设置为ready状态
+        rmtCoordinator.setReady(messageId, rmtMeta);
+
+        try {
+            rmtSender.send(rmtMeta);
+        } catch (Exception e) {
+            log.error("The first stage message is sent error. messageId: {} , {}",
+                    messageId, e.getMessage());
+            throw e;
+        }
+    }
+}

+ 39 - 0
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/parser/IRmtParser.java

@@ -0,0 +1,39 @@
+package com.baomidou.mybatisplus.rmt.parser;
+
+import org.springframework.amqp.support.converter.MessageConverter;
+
+/**
+ * 可靠消息事务解析器
+ *
+ * @author jobob
+ * @since 2019-04-18
+ */
+public interface IRmtParser {
+
+    /**
+     * 消息转换器
+     *
+     * @return
+     */
+    MessageConverter getMessageConverter();
+
+    /**
+     * JSON 字符串转为对象
+     *
+     * @param jsonStr   JSON 字符串
+     * @param valueType 转换对象类
+     * @param <T>
+     * @return
+     * @throws Exception
+     */
+    <T> T readValue(String jsonStr, Class<T> valueType) throws Exception;
+
+    /**
+     * 对象转换为 JSON 字符串
+     *
+     * @param object 转换对象
+     * @return
+     * @throws Exception
+     */
+    String toJSONString(Object object) throws Exception;
+}

+ 43 - 0
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/parser/JacksonRmtParser.java

@@ -0,0 +1,43 @@
+package com.baomidou.mybatisplus.rmt.parser;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
+import org.springframework.amqp.support.converter.MessageConverter;
+
+/**
+ * Jackson 可靠消息解析器
+ *
+ * @author jobob
+ * @since 2019-04-18
+ */
+public class JacksonRmtParser implements IRmtParser {
+
+    private static ObjectMapper objectMapper;
+
+    public static ObjectMapper getObjectMapper() {
+        if (objectMapper == null) {
+            objectMapper = new ObjectMapper();
+            objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        }
+        return objectMapper;
+    }
+
+    @Override
+    public MessageConverter getMessageConverter() {
+        return new Jackson2JsonMessageConverter();
+    }
+
+    @Override
+    public <T> T readValue(String jsonStr, Class<T> valueType) throws Exception {
+        if (null == jsonStr || "".equals(jsonStr)) {
+            return null;
+        }
+        return getObjectMapper().readValue(jsonStr, valueType);
+    }
+
+    @Override
+    public String toJSONString(Object object) throws Exception {
+        return getObjectMapper().writeValueAsString(object);
+    }
+}

+ 24 - 0
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/sender/IRmtSender.java

@@ -0,0 +1,24 @@
+package com.baomidou.mybatisplus.rmt.sender;
+
+import com.baomidou.mybatisplus.rmt.RmtMeta;
+
+/**
+ * <p>
+ * 可靠消息发送者
+ * </p>
+ *
+ * @author jobob
+ * @since 2019-04-17
+ */
+public interface IRmtSender {
+
+    /**
+     * <p>
+     * 发送消息
+     * </p>
+     *
+     * @param rmtMeta         可靠消息元数据
+     * @return 消息ID
+     */
+    String send(RmtMeta rmtMeta);
+}

+ 63 - 0
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/sender/RabbitRmtSender.java

@@ -0,0 +1,63 @@
+package com.baomidou.mybatisplus.rmt.sender;
+
+import com.baomidou.mybatisplus.rmt.RmtMeta;
+import com.baomidou.mybatisplus.rmt.parser.IRmtParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.AmqpException;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessageDeliveryMode;
+import org.springframework.amqp.core.MessagePostProcessor;
+import org.springframework.amqp.core.MessageProperties;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.rabbit.support.CorrelationData;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.UUID;
+
+/**
+ * <p>
+ * RabbitMQ 消息发送者
+ * </p>
+ *
+ * @author hubin
+ * @since 2019-04-17
+ */
+@Slf4j
+public class RabbitRmtSender implements IRmtSender {
+    @Autowired
+    private IRmtParser rmtParser;
+    @Autowired
+    private RabbitTemplate rabbitTemplate;
+
+    /**
+     * 发送MQ消息
+     *
+     * @param rmtMeta Rabbit元信息对象,用于存储交换器、队列名、消息体
+     * @return 消息ID
+     * @throws JsonProcessingException
+     */
+    @Override
+    public String send(RmtMeta rmtMeta) {
+        final String messageId = UUID.randomUUID().toString();
+        MessagePostProcessor messagePostProcessor = message -> {
+            message.getMessageProperties().setMessageId(messageId);
+            // 设置消息持久化
+            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
+            return message;
+        };
+        try {
+            MessageProperties messageProperties = new MessageProperties();
+            messageProperties.setContentType("application/json");
+            Message message = new Message(rmtParser.toJSONString(rmtMeta.getPayload()).getBytes(), messageProperties);
+            rabbitTemplate.convertAndSend(rmtMeta.getExchange(), rmtMeta.getRoutingKey(),
+                    message, messagePostProcessor, new CorrelationData(messageId));
+            log.info("发送消息,消息ID:{}", messageId);
+            return messageId;
+        } catch (AmqpException e) {
+            throw new RuntimeException("发送RabbitMQ消息失败!", e);
+        } catch (Exception e) {
+            throw new RuntimeException("发送RabbitMQ消息失败!", e);
+        }
+    }
+}

+ 1 - 0
settings.gradle

@@ -4,4 +4,5 @@ include 'mybatis-plus-core'
 include 'mybatis-plus-annotation'
 include 'mybatis-plus-extension'
 include 'mybatis-plus-generator'
+include 'mybatis-plus-rmt'
 include 'mybatis-plus-boot-starter'