From 42b01d79d26bc647f14e8bd85b3a6df2e34f1d15 Mon Sep 17 00:00:00 2001 From: shilei Date: Wed, 30 Nov 2022 17:29:45 +0800 Subject: [PATCH] =?UTF-8?q?#HJ-22#=20=E6=9B=B4=E6=96=B0mq=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E6=8E=A5=E5=8F=A3=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../interfaces/dito/mq/RocketMsgListener.java | 40 ++++++ .../interfaces/dito/mq/RocketmqServlet.java | 118 ++++++++++++++++++ 2 files changed, 158 insertions(+) create mode 100644 src/weaver/interfaces/dito/mq/RocketMsgListener.java create mode 100644 src/weaver/interfaces/dito/mq/RocketmqServlet.java diff --git a/src/weaver/interfaces/dito/mq/RocketMsgListener.java b/src/weaver/interfaces/dito/mq/RocketMsgListener.java new file mode 100644 index 00000000..7e1c6680 --- /dev/null +++ b/src/weaver/interfaces/dito/mq/RocketMsgListener.java @@ -0,0 +1,40 @@ +package weaver.interfaces.dito.mq; + +import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import com.alibaba.rocketmq.common.message.MessageExt; +import weaver.general.BaseBean; + +import java.io.UnsupportedEncodingException; +import java.util.List; + + +public class RocketMsgListener implements MessageListenerConcurrently { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { + BaseBean bb = new BaseBean(); + MessageExt msg = msgs.get(0); + + RocketmqUtil rocketmqUtil = new RocketmqUtil(); + + try { + + bb.writeLog("Consumer---3----"+new String(msg.getBody())); + String msgdata = new String(msg.getBody(),"UTF-8"); + if(!"".equals(msgdata)) + { + String data = msgdata.substring(msgdata.indexOf("{")); + int errcount = rocketmqUtil.updateOrgData(data); + bb.writeLog("Consumer---errcount---:"+errcount); + } + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + bb.writeLog("Consumer---UnsupportedEncodingException---e:"+e); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + + } + +} diff --git a/src/weaver/interfaces/dito/mq/RocketmqServlet.java b/src/weaver/interfaces/dito/mq/RocketmqServlet.java new file mode 100644 index 00000000..1d4f1f7e --- /dev/null +++ b/src/weaver/interfaces/dito/mq/RocketmqServlet.java @@ -0,0 +1,118 @@ +package weaver.interfaces.dito.mq; + + +import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; +//import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +//import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +//import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import com.alibaba.rocketmq.client.exception.MQClientException; +//import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; +import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; +import weaver.general.BaseBean; +import weaver.interfaces.dito.comInfo.PropBean; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; + +public class RocketmqServlet extends HttpServlet { + + @Override + public void init() throws ServletException + { + + BaseBean bb = new BaseBean(); + SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); + + bb.writeLog("initiated---进入时间:"+sdf.format(new Date())); + bb.writeLog("***** resource model initiated"); + bb.writeLog("***** resource model initiated"); + bb.writeLog("***** resource model initiated"); + + try{ + //DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("portal-producer-group_nj"); + //DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cbec-consumer-group_nj_133"); + //consumer.setNamesrvAddr("172.16.84.183:9001;172.16.84.187:9001"); +// consumer.subscribe("dataSync_topic_nj", "BPM"); +// consumer.setInstanceName("dataSync_topic_nj"); + + String consumerGroup = PropBean.getUfPropValue("consumerGroup"); + bb.writeLog("consumerGroup:"+consumerGroup); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); + + String namesrvAddr = PropBean.getUfPropValue("namesrvAddr"); + bb.writeLog("namesrvAddr:"+namesrvAddr); + consumer.setNamesrvAddr(namesrvAddr); + + String instanceName = PropBean.getUfPropValue("instanceName"); + bb.writeLog("instanceName:"+instanceName); + consumer.setInstanceName(instanceName); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); + + String topic = PropBean.getUfPropValue("topic"); + String subExpression = PropBean.getUfPropValue("subExpression"); + + bb.writeLog("topic:"+topic); + bb.writeLog("subExpression:"+subExpression); + + consumer.subscribe(topic,subExpression); + consumer.setConsumeThreadMin(1); + consumer.setConsumeThreadMax(1); + consumer.setConsumeMessageBatchMaxSize(1); + consumer.setMessageModel(MessageModel.BROADCASTING); + consumer.registerMessageListener(new RocketMsgListener()); +// consumer.registerMessageListener(new RocketMsgOrderListener()); + + /** + * Consumer对象在使用之前必须要调用start初始化,初始化一次即可
+ */ + consumer.start(); + bb.writeLog("Consumer Started."); + + }catch (MQClientException e){ + bb.writeLog("Consumer resource model initiated--MQClientException:"+e); + } + + bb.writeLog("***** resource model initiated"); + bb.writeLog("***** resource model initiated"); + bb.writeLog("***** resource model initiated"); + } + + + /** + * 实现 HttpServlet 的 doGet 方法,不作任何操作 + * @param request + * @param response + * @throws ServletException + * @throws IOException + */ + @Override + public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + doPost(request, response); + } + + /** + * 实现 HttpServlet 的 doPost 方法,不作任何操作 + * @param request + * @param response + * @throws ServletException + * @throws IOException + */ + + @Override + public void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + } + + @Override + public void destroy() + { + // 什么也不做 + } + +}