Преглед изворни кода

优化可靠消息分布式事务

hubin пре 6 година
родитељ
комит
d1510d1314

+ 16 - 3
mybatis-plus-rmt/build.gradle

@@ -1,6 +1,19 @@
+ext{
+    springBootVersion = "2.1.2.RELEASE"
+}
 dependencies {
-    implementation "${lib.'spring-rabbit'}"
-    implementation "${lib.'spring-data-redis'}"
-    implementation "${lib.'jackson-databind'}"
+    implementation(enforcedPlatform("org.springframework.boot:spring-boot-dependencies:${springBootVersion}" as String))
+    annotationProcessor "org.springframework.boot:spring-boot-configuration-processor:${springBootVersion}"
+    annotationProcessor "org.springframework.boot:spring-boot-autoconfigure-processor:${springBootVersion}"
+    api 'org.springframework.boot:spring-boot-autoconfigure'
+    api 'org.springframework.boot:spring-boot-starter-jdbc'
     implementation "${lib.aspectjrt}"
+    implementation 'org.springframework.boot:spring-boot-starter-web'
+    implementation 'org.springframework.boot:spring-boot-starter-amqp'
+    implementation 'org.springframework.boot:spring-boot-starter-data-redis'
+    implementation 'org.springframework.boot:spring-boot-configuration-processor'
+    implementation 'org.springframework.boot:spring-boot-autoconfigure-processor'
+    testImplementation 'org.springframework.boot:spring-boot-starter-test'
 }
+
+compileJava.dependsOn(processResources)

+ 18 - 21
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/RmtConstants.java

@@ -9,7 +9,24 @@ package com.baomidou.mybatisplus.rmt;
  * @since 2019-04-18
  */
 public interface RmtConstants {
+    /**
+     * 队列配置
+     */
+    String EXCHANGE = "rmt.exchange";
+    String QUEUE = "rmt.queue";
+    String ROUTING_KEY = "rmt.routing.key";
+    /**
+     * 死信队列配置
+     */
+    String DL_EXCHANGE = "rmt.dl.exchange";
+    String DL_QUEUE = "rmt.dl.queue";
+    String DL_ROUTING_KEY = "dlx.routing.key";
+
 
+    /**
+     * 默认 KEY
+     */
+    String KEY = "rmt.key";
     /**
      * 消息重发计数
      */
@@ -40,30 +57,10 @@ public interface RmtConstants {
      */
     Object MQ_MSG_PREPARE = "mq.msg.prepare";
 
-    /**
-     * 默认交换机名称
-     */
-    String EXCHANGE = "rmt.exchange";
-
-    /**
-     * 默认队列名称
-     */
-    String QUEUE = "rmt.queue";
-
-    /**
-     * 默认 KEY
-     */
-    String KEY = "rmt.key";
-
 
     String MQ_PRODUCER_RETRY_KEY = "mq.producer.retry.key";
     String MQ_CONSUMER_RETRY_COUNT_KEY = "mq.consumer.retry.count.key";
-    /**
-     * 死信队列配置
-     */
-    String DLX_EXCHANGE = "dlx.exchange";
-    String DLX_QUEUE = "dlx.queue";
-    String DLX_ROUTING_KEY = "dlx.routing.key";
+
     /**
      * 发送端重试乘数(ms)
      */

+ 1 - 2
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/config/EnableRmtRabbit.java

@@ -1,7 +1,6 @@
 package com.baomidou.mybatisplus.rmt.config;
 
 import com.baomidou.mybatisplus.rmt.mq.RabbitConfiguration;
-import com.baomidou.mybatisplus.rmt.mq.RabbitQueueConfiguration;
 import org.springframework.context.annotation.Import;
 
 import java.lang.annotation.Documented;
@@ -20,7 +19,7 @@ import java.lang.annotation.Target;
 @Retention(value = java.lang.annotation.RetentionPolicy.RUNTIME)
 @Target(value = { java.lang.annotation.ElementType.TYPE })
 @Documented
-@Import({RabbitConfiguration.class, RabbitQueueConfiguration.class})
+@Import({RabbitConfiguration.class})
 public @interface EnableRmtRabbit {
 
 }

+ 0 - 37
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/listener/DeadLetterMessageListener.java

@@ -1,37 +0,0 @@
-package com.baomidou.mybatisplus.rmt.listener;
-
-import com.baomidou.mybatisplus.rmt.RmtConstants;
-import com.rabbitmq.client.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.amqp.core.Message;
-import org.springframework.amqp.core.MessageProperties;
-import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.stereotype.Component;
-
-@Component
-public class DeadLetterMessageListener implements ChannelAwareMessageListener {
-    private Logger logger = LoggerFactory.getLogger(DeadLetterMessageListener.class);
-
-    @Autowired
-    private RedisTemplate<String, Object> redisTemplate;
-
-    @Override
-    public void onMessage(Message message, Channel channel) throws Exception {
-        MessageProperties messageProperties = message.getMessageProperties();
-        // 消息体
-        String messageBody = new String(message.getBody());
-
-        logger.warn("dead letter message:{} | tag:{}", messageBody, message.getMessageProperties().getDeliveryTag());
-			/*// 入库
-			insertRecord(logKey, message);
-			// 发邮件
-			sendEmail(logKey, messageProperties.getMessageId(), messageBody);*/
-
-        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
-
-        redisTemplate.opsForHash().delete(RmtConstants.MQ_CONSUMER_RETRY_COUNT_KEY, messageProperties.getMessageId());
-    }
-}

+ 17 - 0
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/listener/IRmtListener.java

@@ -0,0 +1,17 @@
+package com.baomidou.mybatisplus.rmt.listener;
+
+/**
+ * 可靠消息事务监听
+ *
+ * @author jobob
+ * @since 2019-04-18
+ */
+public interface IRmtListener<M> {
+
+    /**
+     * 接收对象处理
+     *
+     * @param object 接收到对象
+     */
+    void receive(M object);
+}

+ 0 - 17
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/listener/IRmtMessageListener.java

@@ -1,17 +0,0 @@
-package com.baomidou.mybatisplus.rmt.listener;
-
-/**
- * 可靠消息事务,消息监听
- *
- * @author jobob
- * @since 2019-04-18
- */
-public interface IRmtMessageListener<M> {
-
-    /**
-     * 接收消息对象处理
-     *
-     * @param object 接收到消息对象
-     */
-    void receive(M object);
-}

+ 2 - 2
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/listener/RmtAbstractMessageListener.java → mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/listener/RmtMessageListener.java

@@ -6,7 +6,7 @@ import com.rabbitmq.client.Channel;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.core.MessageProperties;
-import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
+import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.redis.core.RedisTemplate;
 
@@ -17,7 +17,7 @@ import org.springframework.data.redis.core.RedisTemplate;
  * @since 2019-04-18
  */
 @Slf4j
-public abstract class RmtAbstractMessageListener implements IRmtMessageListener<Message>, ChannelAwareMessageListener {
+public abstract class RmtMessageListener implements IRmtListener<Message>, ChannelAwareMessageListener {
     @Autowired
     private RedisTemplate<String, Object> redisTemplate;
 

+ 51 - 101
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/mq/RabbitConfiguration.java

@@ -1,82 +1,78 @@
 package com.baomidou.mybatisplus.rmt.mq;
 
-import com.baomidou.mybatisplus.rmt.coordinator.IRmtCoordinator;
+import com.baomidou.mybatisplus.rmt.RmtConstants;
 import com.baomidou.mybatisplus.rmt.coordinator.RedisRmtCoordinator;
-import com.baomidou.mybatisplus.rmt.parser.IRmtParser;
 import com.baomidou.mybatisplus.rmt.parser.JacksonRmtParser;
 import com.baomidou.mybatisplus.rmt.sender.RabbitRmtSender;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.core.AcknowledgeMode;
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.DirectExchange;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.core.TopicExchange;
 import org.springframework.amqp.rabbit.annotation.EnableRabbit;
-import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
-import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
-import org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean;
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.transaction.PlatformTransactionManager;
 
+import javax.annotation.PostConstruct;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
- * Rabbit MQ 连接工厂配置
+ * Rabbit MQ 可靠消息配置
  *
  * @author jobob
- * @since 2019-04-18
+ * @since 2019-04-19
  */
-@Slf4j
-@EnableRabbit
 @Configuration
+@ConditionalOnClass(EnableRabbit.class)
 public class RabbitConfiguration {
-    @Value("${spring.rabbitmq.host}")
-    String host;
-    @Value("${spring.rabbitmq.port}")
-    int port;
-    @Value("${spring.rabbitmq.username}")
-    String username;
-    @Value("${spring.rabbitmq.password}")
-    String password;
-    @Value("${spring.rabbitmq.virtual.host}")
-    String virtualHost;
-    @Value("${spring.rabbitmq.cache.channel.size}")
-    int cacheSize;
+    @Autowired
+    protected PlatformTransactionManager transactionManager;
+    @Autowired
+    protected RabbitTemplate rabbitTemplate;
+    @Autowired
+    protected RabbitAdmin rabbitAdmin;
 
-    /**
-     * 创建RabbitMQ连接工厂
-     *
-     * @param
-     * @return CachingConnectionFactory
-     * @throws Exception 异常
-     */
     @Bean
-    public CachingConnectionFactory rabbitConnectionFactory() throws Exception {
-        log.debug("custom rabbitmq connection factory");
-        RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
-        factory.setHost(host);
-        factory.setPort(port);
-        factory.setUsername(username);
-        factory.setPassword(password);
-        factory.setVirtualHost(virtualHost);
-        factory.setConnectionTimeout(60000);
-        factory.setAutomaticRecoveryEnabled(true);
-        factory.afterPropertiesSet();
-        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory.getObject());
-        connectionFactory.setPublisherReturns(true);
-        connectionFactory.setPublisherConfirms(true);
-        connectionFactory.setChannelCacheSize(cacheSize);
-        return connectionFactory;
+    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
+        return new RabbitTransactionManager(connectionFactory);
     }
 
     @Bean
-    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
-        log.debug("custom rabbitmq Listener factory: {}", connectionFactory);
-        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
-        factory.setConnectionFactory(connectionFactory);
-        factory.setConcurrentConsumers(3);
-        factory.setMaxConcurrentConsumers(10);
-        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
-        return factory;
+    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
+        return new RabbitAdmin(connectionFactory);
+    }
+
+    @PostConstruct
+    protected void init() {
+        /**
+         * make rabbit template to support transactions
+         */
+        rabbitTemplate.setChannelTransacted(true);
+
+        /**
+         * init queue
+         */
+        // define deadletter exchange and queue
+        rabbitAdmin.declareExchange(new DirectExchange(RmtConstants.DL_EXCHANGE, true, false));
+        rabbitAdmin.declareQueue(new Queue(RmtConstants.DL_QUEUE, true, false, false, null));
+        rabbitAdmin.declareBinding(new Binding(RmtConstants.DL_QUEUE, Binding.DestinationType.QUEUE, RmtConstants.DL_EXCHANGE, RmtConstants.DL_ROUTING_KEY, null));
+
+        // define simple exchange, queue with deadletter support and binding
+        rabbitAdmin.declareExchange(new TopicExchange(RmtConstants.EXCHANGE, true, false));
+        Map<String, Object> args = new HashMap<>(2);
+        args.put("x-dead-letter-exchange", RmtConstants.DL_EXCHANGE);
+        args.put("x-dead-letter-routing-key", RmtConstants.DL_ROUTING_KEY);
+        rabbitAdmin.declareQueue(new Queue(RmtConstants.QUEUE, true, false, true, args));
+
+        // declare binding
+        rabbitAdmin.declareBinding(new Binding(RmtConstants.QUEUE, Binding.DestinationType.QUEUE, RmtConstants.EXCHANGE, RmtConstants.ROUTING_KEY, null));
     }
 
     /**
@@ -105,50 +101,4 @@ public class RabbitConfiguration {
     public RabbitRmtSender rmtSender() {
         return new RabbitRmtSender();
     }
-
-    @Autowired
-    private IRmtCoordinator rmtCoordinator;
-
-    @Autowired
-    private IRmtParser rmtParser;
-
-    boolean returnFlag = false;
-
-    @Bean
-    public RabbitTemplate customRabbitTemplate(ConnectionFactory connectionFactory) {
-        log.debug("==> custom rabbitTemplate, connectionFactory:" + connectionFactory);
-        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
-        rabbitTemplate.setMessageConverter(rmtParser.getMessageConverter());
-        rabbitTemplate.setMandatory(true);
-        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
-            if (returnFlag) {
-                log.error("mq发送错误,无对应的的交换机,confirm回掉,ack={},correlationData={} cause={}",
-                        ack, correlationData, cause);
-            }
-
-            log.debug("confirm回调,ack={} correlationData={} cause={}", ack, correlationData, cause);
-            String msgId = correlationData.getId();
-
-            /** 只要消息能投入正确的消息队列,并持久化,就返回ack为true*/
-            if (ack) {
-                log.debug("消息已正确投递到队列, correlationData:{}", correlationData);
-                rmtCoordinator.setSuccess(msgId);
-            } else {
-                log.error("消息投递至交换机失败,业务号:{},原因:{}", correlationData.getId(), cause);
-            }
-
-        });
-
-        //消息发送到RabbitMQ交换器,但无相应Exchange时的回调
-        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
-            String messageId = message.getMessageProperties().getMessageId();
-            log.error("return回调,没有找到任何匹配的队列!message id:{},replyCode{},replyText:{},"
-                    + "exchange:{},routingKey{}", messageId, replyCode, replyText, exchange, routingKey);
-            returnFlag = true;
-        });
-
-        // rabbitTemplate.waitForConfirms(RmtConstants.TIME_GAP);
-        return rabbitTemplate;
-    }
 }
-

+ 0 - 73
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/mq/RabbitQueueConfiguration.java

@@ -1,73 +0,0 @@
-package com.baomidou.mybatisplus.rmt.mq;
-
-import com.baomidou.mybatisplus.rmt.RmtConstants;
-import com.baomidou.mybatisplus.rmt.listener.DeadLetterMessageListener;
-import org.springframework.amqp.core.*;
-import org.springframework.amqp.rabbit.connection.ConnectionFactory;
-import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-
-/**
- * RabbitMQ交换机、队列的配置类.定义交换机、key、queue并做好绑定。
- * 同时定义每个队列的ttl,队列最大长度,Qos等等
- *
- * @author jobob
- * @since 2019-04-18
- */
-@Configuration
-public class RabbitQueueConfiguration {
-
-
-    /**
-     * 死信交换机
-     */
-    @Bean
-    public DirectExchange dlxExchange() {
-        return new DirectExchange(RmtConstants.DLX_EXCHANGE);
-    }
-
-    /**
-     * 死信队列
-     */
-    @Bean
-    public Queue dlxQueue() {
-        return new Queue(RmtConstants.DLX_QUEUE,true,false,false);
-    }
-
-    /**
-     * 通过死信路由key绑定死信交换机和死信队列
-     */
-    @Bean
-    public Binding dlxBinding() {
-        return BindingBuilder.bind(dlxQueue()).to(dlxExchange())
-                .with(RmtConstants.DLX_ROUTING_KEY);
-    }
-
-    @Bean
-    public DeadLetterMessageListener deadLetterMessageListener() {
-        return new DeadLetterMessageListener();
-    }
-
-    /**
-     *
-     * 死信队列的监听
-     *
-     * @param connectionFactory RabbitMQ连接工厂
-     * @param deadLetterMessageListener  死信队列监听
-     * @return 监听容器对象
-     */
-    @Bean
-    public SimpleMessageListenerContainer deadLetterListenerContainer(ConnectionFactory connectionFactory,
-    		DeadLetterMessageListener deadLetterMessageListener) {
-        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
-        container.setQueues(dlxQueue());
-        container.setExposeListenerChannel(true);
-        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
-        container.setMessageListener(deadLetterMessageListener);
-        // 设置消费者能处理消息的最大个数
-        container.setPrefetchCount(100);
-        return container;
-    }
-}

+ 1 - 1
mybatis-plus-rmt/src/main/java/com/baomidou/mybatisplus/rmt/sender/RabbitRmtSender.java

@@ -9,8 +9,8 @@ import org.springframework.amqp.core.Message;
 import org.springframework.amqp.core.MessageDeliveryMode;
 import org.springframework.amqp.core.MessagePostProcessor;
 import org.springframework.amqp.core.MessageProperties;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
-import org.springframework.amqp.rabbit.support.CorrelationData;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.UUID;