Quellcode durchsuchen

分布式事务改进优化

hubin vor 6 Jahren
Ursprung
Commit
2c261a492a
18 geänderte Dateien mit 408 neuen und 244 gelöschten Zeilen
  1. 1 0
      mybatis-plus-dts/build.gradle
  2. 21 6
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/DtsConstants.java
  3. 21 4
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/DtsMeta.java
  4. 18 2
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/EnableDtsRabbit.java
  5. 19 4
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/config/DtsAutoConfiguration.java
  6. 83 0
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/config/RabbitConfiguration.java
  7. 32 0
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/listener/IDtsListener.java
  8. 49 0
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/listener/RmtMessageListener.java
  9. 45 0
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/parser/IDtsParser.java
  10. 0 39
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/parser/IRmtParser.java
  11. 18 8
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/parser/JacksonDtsParser.java
  12. 0 60
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/RabbitConfiguration.java
  13. 0 17
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/listener/IRmtListener.java
  14. 0 16
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/listener/RmtMessageListener.java
  15. 0 24
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/sender/IRmtSender.java
  16. 0 64
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/sender/RabbitRmtSender.java
  17. 39 0
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/sender/IRmtSender.java
  18. 62 0
      mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/sender/RabbitRmtSender.java

+ 1 - 0
mybatis-plus-dts/build.gradle

@@ -2,6 +2,7 @@ ext{
     springBootVersion = "2.1.2.RELEASE"
 }
 dependencies {
+    api project(":mybatis-plus-core")
     implementation(enforcedPlatform("org.springframework.boot:spring-boot-dependencies:${springBootVersion}" as String))
     annotationProcessor "org.springframework.boot:spring-boot-configuration-processor:${springBootVersion}"
     annotationProcessor "org.springframework.boot:spring-boot-autoconfigure-processor:${springBootVersion}"

+ 21 - 6
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/DtsConstants.java

@@ -1,3 +1,18 @@
+/*
+ * Copyright (c) 2011-2020, baomidou (jobob@qq.com).
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * <p>
+ * https://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
 package com.baomidou.mybatisplus.dts;
 
 /**
@@ -12,14 +27,14 @@ public interface DtsConstants {
     /**
      * 队列配置
      */
-    String EXCHANGE = "rabbit-exchange";
-    String QUEUE = "rabbit-queue";
-    String ROUTING_KEY = "rabbit-routing-key";
+    String RABBIT_EXCHANGE = "dts-rmt-exchange";
+    String RABBIT_QUEUE = "dts-rmt-queue";
+    String RABBIT_ROUTINGKEY = "dts-rmt-routingkey";
     /**
      * 死信队列配置
      */
-    String DL_EXCHANGE = "rabbit-dl-exchange";
-    String DL_QUEUE = "rabbit-dl-queue";
-    String DL_ROUTING_KEY = "rabbit-dl-routing-key";
+    String RABBIT_DEADLETTER_EXCHANGE = "dts-rmt-deadletter-exchange";
+    String RABBIT_DEADLETTER_QUEUE = "dts-rmt-deadletter-queue";
+    String RABBIT_DEADLETTER_ROUTINGKEY = "dts-rmt-deadletter-routingkey";
 
 }

+ 21 - 4
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/DtsMeta.java

@@ -1,21 +1,38 @@
+/*
+ * Copyright (c) 2011-2020, baomidou (jobob@qq.com).
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * <p>
+ * https://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
 package com.baomidou.mybatisplus.dts;
 
 import lombok.Data;
+import lombok.experimental.Accessors;
 
 /**
  * <p>
- * 可靠消息元数据
+ * 分布式事务元数据
  * </p>
  *
  * @author jobob
- * @since 2019-04-17
+ * @since 2019-04-20
  */
 @Data
+@Accessors(chain = true)
 public class DtsMeta {
     /**
-     * 业务 KEY
+     * 执行事件
      */
-    String key;
+    String event;
     /**
      * 消息内容
      */

+ 18 - 2
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/EnableDtsRabbit.java

@@ -1,6 +1,22 @@
+/*
+ * Copyright (c) 2011-2020, baomidou (jobob@qq.com).
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * <p>
+ * https://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
 package com.baomidou.mybatisplus.dts;
 
-import com.baomidou.mybatisplus.dts.rabbit.RabbitConfiguration;
+import com.baomidou.mybatisplus.dts.config.DtsAutoConfiguration;
+import com.baomidou.mybatisplus.dts.config.RabbitConfiguration;
 import org.springframework.context.annotation.Import;
 
 import java.lang.annotation.Documented;
@@ -19,7 +35,7 @@ import java.lang.annotation.Target;
 @Retention(value = java.lang.annotation.RetentionPolicy.RUNTIME)
 @Target(value = { java.lang.annotation.ElementType.TYPE })
 @Documented
-@Import({RabbitConfiguration.class})
+@Import({DtsAutoConfiguration.class, RabbitConfiguration.class})
 public @interface EnableDtsRabbit {
 
 }

+ 19 - 4
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/config/DtsAutoConfiguration.java

@@ -1,7 +1,22 @@
+/*
+ * Copyright (c) 2011-2020, baomidou (jobob@qq.com).
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * <p>
+ * https://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
 package com.baomidou.mybatisplus.dts.config;
 
-import com.baomidou.mybatisplus.dts.parser.JacksonRmtParser;
-import com.baomidou.mybatisplus.dts.rabbit.sender.RabbitRmtSender;
+import com.baomidou.mybatisplus.dts.parser.JacksonDtsParser;
+import com.baomidou.mybatisplus.dts.sender.RabbitRmtSender;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
@@ -15,8 +30,8 @@ import org.springframework.context.annotation.Configuration;
 public class DtsAutoConfiguration {
 
     @Bean
-    public JacksonRmtParser rmtParser() {
-        return new JacksonRmtParser();
+    public JacksonDtsParser rmtParser() {
+        return new JacksonDtsParser();
     }
 
     @Bean

+ 83 - 0
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/config/RabbitConfiguration.java

@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2011-2020, baomidou (jobob@qq.com).
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * <p>
+ * https://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.baomidou.mybatisplus.dts.config;
+
+import com.baomidou.mybatisplus.dts.DtsConstants;
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.DirectExchange;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.core.TopicExchange;
+import org.springframework.amqp.rabbit.annotation.EnableRabbit;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import javax.annotation.PostConstruct;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Rabbit MQ 可靠消息配置
+ *
+ * @author jobob
+ * @since 2019-04-19
+ */
+@Configuration
+@ConditionalOnClass(EnableRabbit.class)
+public class RabbitConfiguration {
+    @Autowired
+    protected RabbitTemplate rabbitTemplate;
+    @Autowired
+    protected RabbitAdmin rabbitAdmin;
+
+    @Bean
+    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
+        return new RabbitTransactionManager(connectionFactory);
+    }
+
+    @Bean
+    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
+        return new RabbitAdmin(connectionFactory);
+    }
+
+    @PostConstruct
+    protected void init() {
+        // make rmt template to support transactions
+        rabbitTemplate.setChannelTransacted(true);
+
+        // define deadletter exchange and queue
+        rabbitAdmin.declareExchange(new DirectExchange(DtsConstants.RABBIT_DEADLETTER_EXCHANGE, true, false));
+        rabbitAdmin.declareQueue(new Queue(DtsConstants.RABBIT_DEADLETTER_QUEUE, true, false, false, null));
+        rabbitAdmin.declareBinding(new Binding(DtsConstants.RABBIT_DEADLETTER_QUEUE, Binding.DestinationType.QUEUE,
+            DtsConstants.RABBIT_DEADLETTER_EXCHANGE, DtsConstants.RABBIT_DEADLETTER_ROUTINGKEY, null));
+
+        // define simple exchange, queue with deadletter support and binding
+        rabbitAdmin.declareExchange(new TopicExchange(DtsConstants.RABBIT_EXCHANGE, true, false));
+        Map<String, Object> args = new HashMap<>(2);
+        args.put("x-dead-letter-exchange", DtsConstants.RABBIT_DEADLETTER_EXCHANGE);
+        args.put("x-dead-letter-routing-key", DtsConstants.RABBIT_DEADLETTER_ROUTINGKEY);
+        rabbitAdmin.declareQueue(new Queue(DtsConstants.RABBIT_QUEUE, true, false, true, args));
+
+        // declare binding
+        rabbitAdmin.declareBinding(new Binding(DtsConstants.RABBIT_QUEUE, Binding.DestinationType.QUEUE, DtsConstants.RABBIT_EXCHANGE,
+            DtsConstants.RABBIT_ROUTINGKEY, null));
+    }
+}

+ 32 - 0
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/listener/IDtsListener.java

@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2011-2020, baomidou (jobob@qq.com).
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * <p>
+ * https://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.baomidou.mybatisplus.dts.listener;
+
+/**
+ * 分布式事务监听
+ *
+ * @author jobob
+ * @since 2019-04-18
+ */
+public interface IDtsListener<T> {
+
+    /**
+     * 处理对象
+     *
+     * @param object 待处理对象
+     */
+    void process(T object);
+}

+ 49 - 0
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/listener/RmtMessageListener.java

@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2011-2020, baomidou (jobob@qq.com).
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * <p>
+ * https://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.baomidou.mybatisplus.dts.listener;
+
+import com.baomidou.mybatisplus.dts.DtsConstants;
+import com.baomidou.mybatisplus.dts.DtsMeta;
+import com.baomidou.mybatisplus.dts.parser.IDtsParser;
+import org.springframework.amqp.rabbit.annotation.RabbitHandler;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.transaction.annotation.Transactional;
+
+/**
+ * RabbitMQ 抽象消息监听
+ *
+ * @author jobob
+ * @since 2019-04-18
+ */
+@RabbitListener(queues = {DtsConstants.RABBIT_QUEUE})
+public abstract class RmtMessageListener implements IDtsListener<String> {
+    @Autowired
+    private IDtsParser dtsParser;
+
+    @RabbitHandler
+    @Transactional(rollbackFor = Exception.class)
+    @Override
+    public void process(String event) {
+        try {
+            receive(dtsParser.readValue(event, DtsMeta.class));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public abstract void receive(DtsMeta dtsMeta);
+}

+ 45 - 0
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/parser/IDtsParser.java

@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2011-2020, baomidou (jobob@qq.com).
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * <p>
+ * https://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.baomidou.mybatisplus.dts.parser;
+
+/**
+ * 可靠消息事务解析器
+ *
+ * @author jobob
+ * @since 2019-04-18
+ */
+public interface IDtsParser {
+
+    /**
+     * JSON 字符串转为对象
+     *
+     * @param jsonStr   JSON 字符串
+     * @param valueType 转换对象类
+     * @param <T>
+     * @return
+     * @throws Exception
+     */
+    <T> T readValue(String jsonStr, Class<T> valueType) throws Exception;
+
+    /**
+     * 对象转换为 JSON 字符串
+     *
+     * @param object 转换对象
+     * @return
+     * @throws Exception
+     */
+    String toJSONString(Object object) throws Exception;
+}

+ 0 - 39
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/parser/IRmtParser.java

@@ -1,39 +0,0 @@
-package com.baomidou.mybatisplus.dts.parser;
-
-import org.springframework.amqp.support.converter.MessageConverter;
-
-/**
- * 可靠消息事务解析器
- *
- * @author jobob
- * @since 2019-04-18
- */
-public interface IRmtParser {
-
-    /**
-     * 消息转换器
-     *
-     * @return
-     */
-    MessageConverter getMessageConverter();
-
-    /**
-     * JSON 字符串转为对象
-     *
-     * @param jsonStr   JSON 字符串
-     * @param valueType 转换对象类
-     * @param <T>
-     * @return
-     * @throws Exception
-     */
-    <T> T readValue(String jsonStr, Class<T> valueType) throws Exception;
-
-    /**
-     * 对象转换为 JSON 字符串
-     *
-     * @param object 转换对象
-     * @return
-     * @throws Exception
-     */
-    String toJSONString(Object object) throws Exception;
-}

+ 18 - 8
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/parser/JacksonRmtParser.java → mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/parser/JacksonDtsParser.java

@@ -1,9 +1,23 @@
+/*
+ * Copyright (c) 2011-2020, baomidou (jobob@qq.com).
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * <p>
+ * https://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
 package com.baomidou.mybatisplus.dts.parser;
 
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
-import org.springframework.amqp.support.converter.MessageConverter;
+import org.springframework.stereotype.Component;
 
 /**
  * Jackson 可靠消息解析器
@@ -11,7 +25,8 @@ import org.springframework.amqp.support.converter.MessageConverter;
  * @author jobob
  * @since 2019-04-18
  */
-public class JacksonRmtParser implements IRmtParser {
+@Component
+public class JacksonDtsParser implements IDtsParser {
 
     private static ObjectMapper objectMapper;
 
@@ -23,11 +38,6 @@ public class JacksonRmtParser implements IRmtParser {
         return objectMapper;
     }
 
-    @Override
-    public MessageConverter getMessageConverter() {
-        return new Jackson2JsonMessageConverter();
-    }
-
     @Override
     public <T> T readValue(String jsonStr, Class<T> valueType) throws Exception {
         if (null == jsonStr || "".equals(jsonStr)) {

+ 0 - 60
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/RabbitConfiguration.java

@@ -1,60 +0,0 @@
-package com.baomidou.mybatisplus.dts.rabbit;
-
-import com.baomidou.mybatisplus.dts.DtsConstants;
-import org.springframework.amqp.core.Binding;
-import org.springframework.amqp.core.DirectExchange;
-import org.springframework.amqp.core.Queue;
-import org.springframework.amqp.core.TopicExchange;
-import org.springframework.amqp.rabbit.annotation.EnableRabbit;
-import org.springframework.amqp.rabbit.connection.ConnectionFactory;
-import org.springframework.amqp.rabbit.core.RabbitAdmin;
-import org.springframework.amqp.rabbit.core.RabbitTemplate;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import javax.annotation.PostConstruct;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Rabbit MQ 可靠消息配置
- *
- * @author jobob
- * @since 2019-04-19
- */
-@Configuration
-@ConditionalOnClass(EnableRabbit.class)
-public class RabbitConfiguration {
-    @Autowired
-    protected RabbitTemplate rabbitTemplate;
-    @Autowired
-    protected RabbitAdmin rabbitAdmin;
-
-    @Bean
-    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
-        return new RabbitAdmin(connectionFactory);
-    }
-
-    @PostConstruct
-    protected void init() {
-        // make rabbit template to support transactions
-        rabbitTemplate.setChannelTransacted(true);
-
-        // define deadletter exchange and queue
-        rabbitAdmin.declareExchange(new DirectExchange(DtsConstants.DL_EXCHANGE, true, false));
-        rabbitAdmin.declareQueue(new Queue(DtsConstants.DL_QUEUE, true, false, false, null));
-        rabbitAdmin.declareBinding(new Binding(DtsConstants.DL_QUEUE, Binding.DestinationType.QUEUE, DtsConstants.DL_EXCHANGE, DtsConstants.DL_ROUTING_KEY, null));
-
-        // define simple exchange, queue with deadletter support and binding
-        rabbitAdmin.declareExchange(new TopicExchange(DtsConstants.EXCHANGE, true, false));
-        Map<String, Object> args = new HashMap<>(2);
-        args.put("x-dead-letter-exchange", DtsConstants.DL_EXCHANGE);
-        args.put("x-dead-letter-routing-key", DtsConstants.DL_ROUTING_KEY);
-        rabbitAdmin.declareQueue(new Queue(DtsConstants.QUEUE, true, false, true, args));
-
-        // declare binding
-        rabbitAdmin.declareBinding(new Binding(DtsConstants.QUEUE, Binding.DestinationType.QUEUE, DtsConstants.EXCHANGE, DtsConstants.ROUTING_KEY, null));
-    }
-}

+ 0 - 17
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/listener/IRmtListener.java

@@ -1,17 +0,0 @@
-package com.baomidou.mybatisplus.dts.rabbit.listener;
-
-/**
- * 可靠消息事务监听
- *
- * @author jobob
- * @since 2019-04-18
- */
-public interface IRmtListener<M> {
-
-    /**
-     * 接收对象处理
-     *
-     * @param object 接收到对象
-     */
-    void receive(M object);
-}

+ 0 - 16
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/listener/RmtMessageListener.java

@@ -1,16 +0,0 @@
-
-package com.baomidou.mybatisplus.dts.rabbit.listener;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.core.Message;
-
-/**
- * RabbitMQ 抽象消息监听,所有消息消费者必须继承此类
- *
- * @author jobob
- * @since 2019-04-18
- */
-@Slf4j
-public abstract class RmtMessageListener implements IRmtListener<Message> {
-
-}

+ 0 - 24
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/sender/IRmtSender.java

@@ -1,24 +0,0 @@
-package com.baomidou.mybatisplus.dts.rabbit.sender;
-
-import com.baomidou.mybatisplus.dts.DtsMeta;
-
-/**
- * <p>
- * 可靠消息发送者
- * </p>
- *
- * @author jobob
- * @since 2019-04-17
- */
-public interface IRmtSender {
-
-    /**
-     * <p>
-     * 发送消息
-     * </p>
-     *
-     * @param rmtMeta         可靠消息元数据
-     * @return 消息ID
-     */
-    String send(DtsMeta rmtMeta);
-}

+ 0 - 64
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/rabbit/sender/RabbitRmtSender.java

@@ -1,64 +0,0 @@
-package com.baomidou.mybatisplus.dts.rabbit.sender;
-
-import com.baomidou.mybatisplus.dts.DtsConstants;
-import com.baomidou.mybatisplus.dts.DtsMeta;
-import com.baomidou.mybatisplus.dts.parser.IRmtParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.AmqpException;
-import org.springframework.amqp.core.Message;
-import org.springframework.amqp.core.MessageDeliveryMode;
-import org.springframework.amqp.core.MessagePostProcessor;
-import org.springframework.amqp.core.MessageProperties;
-import org.springframework.amqp.rabbit.connection.CorrelationData;
-import org.springframework.amqp.rabbit.core.RabbitTemplate;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.UUID;
-
-/**
- * <p>
- * RabbitMQ 消息发送者
- * </p>
- *
- * @author hubin
- * @since 2019-04-17
- */
-@Slf4j
-public class RabbitRmtSender implements IRmtSender {
-    @Autowired
-    private IRmtParser rmtParser;
-    @Autowired
-    private RabbitTemplate rabbitTemplate;
-
-    /**
-     * 发送MQ消息
-     *
-     * @param rmtMeta Rabbit元信息对象,用于存储交换器、队列名、消息体
-     * @return 消息ID
-     * @throws JsonProcessingException
-     */
-    @Override
-    public String send(DtsMeta rmtMeta) {
-        final String messageId = UUID.randomUUID().toString();
-        MessagePostProcessor messagePostProcessor = message -> {
-            message.getMessageProperties().setMessageId(messageId);
-            // 设置消息持久化
-            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
-            return message;
-        };
-        try {
-            MessageProperties messageProperties = new MessageProperties();
-            messageProperties.setContentType("application/json");
-            Message message = new Message(rmtParser.toJSONString(rmtMeta.getPayload()).getBytes(), messageProperties);
-            rabbitTemplate.convertAndSend(DtsConstants.EXCHANGE, DtsConstants.ROUTING_KEY,
-                    message, messagePostProcessor, new CorrelationData(messageId));
-            log.info("发送消息,消息ID:{}", messageId);
-            return messageId;
-        } catch (AmqpException e) {
-            throw new RuntimeException("发送RabbitMQ消息失败!", e);
-        } catch (Exception e) {
-            throw new RuntimeException("发送RabbitMQ消息失败!", e);
-        }
-    }
-}

+ 39 - 0
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/sender/IRmtSender.java

@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2011-2020, baomidou (jobob@qq.com).
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * <p>
+ * https://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.baomidou.mybatisplus.dts.sender;
+
+import com.baomidou.mybatisplus.dts.DtsMeta;
+
+/**
+ * <p>
+ * 可靠消息发送者
+ * </p>
+ *
+ * @author jobob
+ * @since 2019-04-17
+ */
+public interface IRmtSender {
+
+    /**
+     * <p>
+     * 发送消息
+     * </p>
+     *
+     * @param dtsMeta 分布式事务元数据
+     * @return
+     */
+    void send(DtsMeta dtsMeta);
+}

+ 62 - 0
mybatis-plus-dts/src/main/java/com/baomidou/mybatisplus/dts/sender/RabbitRmtSender.java

@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2011-2020, baomidou (jobob@qq.com).
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * <p>
+ * https://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.baomidou.mybatisplus.dts.sender;
+
+import com.baomidou.mybatisplus.dts.DtsConstants;
+import com.baomidou.mybatisplus.dts.DtsMeta;
+import com.baomidou.mybatisplus.dts.parser.IDtsParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.AmqpException;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * <p>
+ * RabbitMQ 消息发送者
+ * </p>
+ *
+ * @author hubin
+ * @since 2019-04-17
+ */
+@Slf4j
+@Component
+public class RabbitRmtSender implements IRmtSender {
+    @Autowired
+    private IDtsParser rmtParser;
+    @Autowired
+    private RabbitTemplate rabbitTemplate;
+
+    /**
+     * 发送MQ消息
+     *
+     * @param dtsMeta Rabbit元信息对象,用于存储交换器、队列名、消息体
+     * @return 消息ID
+     * @throws JsonProcessingException
+     */
+    @Override
+    public void send(DtsMeta dtsMeta) {
+        try {
+            rabbitTemplate.convertAndSend(DtsConstants.RABBIT_EXCHANGE, DtsConstants.RABBIT_ROUTINGKEY,
+                rmtParser.toJSONString(dtsMeta.getPayload()));
+        } catch (AmqpException e) {
+            throw new RuntimeException("发送RabbitMQ消息失败", e);
+        } catch (Exception e) {
+            throw new RuntimeException("对象解析异常", e);
+        }
+    }
+}