hubin 6 лет назад
Родитель
Сommit
86a1308f09

+ 0 - 6
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/config/DtsAutoConfiguration.java

@@ -16,7 +16,6 @@
 package com.baomidou.mybatisplus.dts.config;
 
 import com.baomidou.mybatisplus.dts.parser.JacksonDtsParser;
-import com.baomidou.mybatisplus.dts.sender.RabbitRmtSender;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
@@ -34,9 +33,4 @@ public class DtsAutoConfiguration {
         return new JacksonDtsParser();
     }
 
-    @Bean
-    public RabbitRmtSender rmtSender() {
-        return new RabbitRmtSender();
-    }
-
 }

+ 14 - 0
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/config/RabbitConfiguration.java

@@ -16,6 +16,8 @@
 package com.baomidou.mybatisplus.dts.config;
 
 import com.baomidou.mybatisplus.dts.DtsConstants;
+import com.baomidou.mybatisplus.dts.listener.RabbitRmtListener;
+import com.baomidou.mybatisplus.dts.sender.RabbitRmtSender;
 import org.springframework.amqp.core.Binding;
 import org.springframework.amqp.core.DirectExchange;
 import org.springframework.amqp.core.Queue;
@@ -27,6 +29,7 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
@@ -49,6 +52,17 @@ public class RabbitConfiguration {
     protected RabbitAdmin rabbitAdmin;
 
     @Bean
+    public RabbitRmtSender rmtSender() {
+        return new RabbitRmtSender();
+    }
+
+    @Bean
+    public RabbitRmtListener rabbitRmtListener() {
+        return new RabbitRmtListener();
+    }
+
+    @Bean
+    @ConditionalOnMissingClass("org.springframework.jdbc.datasource.DataSourceTransactionManager")
     public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
         return new RabbitTransactionManager(connectionFactory);
     }

+ 6 - 4
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/listener/IDtsListener.java

@@ -15,18 +15,20 @@
  */
 package com.baomidou.mybatisplus.dts.listener;
 
+import com.baomidou.mybatisplus.dts.DtsMeta;
+
 /**
  * 分布式事务监听
  *
  * @author jobob
  * @since 2019-04-18
  */
-public interface IDtsListener<T> {
+public interface IDtsListener {
 
     /**
-     * 处理对象
+     * 监听处理
      *
-     * @param object 待处理对象
+     * @param dtsMeta 分布式元数据对象
      */
-    void process(T object);
+    void process(DtsMeta dtsMeta);
 }

+ 18 - 8
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/listener/RmtMessageListener.java → mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/listener/RabbitRmtListener.java

@@ -15,35 +15,45 @@
  */
 package com.baomidou.mybatisplus.dts.listener;
 
+import com.baomidou.mybatisplus.core.toolkit.ExceptionUtils;
 import com.baomidou.mybatisplus.dts.DtsConstants;
 import com.baomidou.mybatisplus.dts.DtsMeta;
 import com.baomidou.mybatisplus.dts.parser.IDtsParser;
 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
 
+import java.util.List;
+
 /**
- * RabbitMQ 抽象消息监听
+ * Rabbit 可靠消息事务监听
  *
  * @author jobob
  * @since 2019-04-18
  */
+@Component
 @RabbitListener(queues = {DtsConstants.RABBIT_QUEUE})
-public abstract class RmtMessageListener implements IDtsListener<String> {
+public class RabbitRmtListener {
     @Autowired
     private IDtsParser dtsParser;
+    @Autowired
+    private List<IDtsListener> dtsListenerList;
 
+    /**
+     * 解析处理,接收消息对象
+     *
+     * @param event rabbit 消息
+     */
     @RabbitHandler
     @Transactional(rollbackFor = Exception.class)
-    @Override
-    public void process(String event) {
+    public void receive(String event) {
         try {
-            receive(dtsParser.readValue(event, DtsMeta.class));
+            DtsMeta dtsMeta = dtsParser.readValue(event, DtsMeta.class);
+            dtsListenerList.forEach(d -> d.process(dtsMeta));
         } catch (Exception e) {
-            e.printStackTrace();
+            ExceptionUtils.mpe("rmt parser error, event: %s", e, event);
         }
     }
-
-    public abstract void receive(DtsMeta dtsMeta);
 }

+ 7 - 6
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/sender/RabbitRmtSender.java

@@ -15,6 +15,7 @@
  */
 package com.baomidou.mybatisplus.dts.sender;
 
+import com.baomidou.mybatisplus.core.toolkit.ExceptionUtils;
 import com.baomidou.mybatisplus.dts.DtsConstants;
 import com.baomidou.mybatisplus.dts.DtsMeta;
 import com.baomidou.mybatisplus.dts.parser.IDtsParser;
@@ -45,18 +46,18 @@ public class RabbitRmtSender implements IRmtSender {
      * 发送MQ消息
      *
      * @param dtsMeta Rabbit元信息对象,用于存储交换器、队列名、消息体
-     * @return 消息ID
-     * @throws JsonProcessingException
      */
     @Override
     public void send(DtsMeta dtsMeta) {
+        String object = null;
         try {
-            rabbitTemplate.convertAndSend(DtsConstants.RABBIT_EXCHANGE, DtsConstants.RABBIT_ROUTINGKEY,
-                rmtParser.toJSONString(dtsMeta.getPayload()));
+            object = rmtParser.toJSONString(dtsMeta);
+            rabbitTemplate.convertAndSend(DtsConstants.RABBIT_EXCHANGE,
+                DtsConstants.RABBIT_ROUTINGKEY, object);
         } catch (AmqpException e) {
-            throw new RuntimeException("发送RabbitMQ消息失败", e);
+            ExceptionUtils.mpe("rabbit send error, dtsMeta: %s", e, object);
         } catch (Exception e) {
-            throw new RuntimeException("对象解析异常", e);
+            ExceptionUtils.mpe("rmt parser error, dtsMeta.event: %s", e, dtsMeta.getEvent());
         }
     }
 }