#HJ-22# 更新mq消息接口优化

feature-LeeD-20221025
shilei 2 years ago
parent b66085aa59
commit 42b01d79d2

@ -0,0 +1,40 @@
package weaver.interfaces.dito.mq;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;
import weaver.general.BaseBean;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class RocketMsgListener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
BaseBean bb = new BaseBean();
MessageExt msg = msgs.get(0);
RocketmqUtil rocketmqUtil = new RocketmqUtil();
try {
bb.writeLog("Consumer---3----"+new String(msg.getBody()));
String msgdata = new String(msg.getBody(),"UTF-8");
if(!"".equals(msgdata))
{
String data = msgdata.substring(msgdata.indexOf("{"));
int errcount = rocketmqUtil.updateOrgData(data);
bb.writeLog("Consumer---errcount---:"+errcount);
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
bb.writeLog("Consumer---UnsupportedEncodingException---e:"+e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}

@ -0,0 +1,118 @@
package weaver.interfaces.dito.mq;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
//import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
//import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
//import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
//import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import weaver.general.BaseBean;
import weaver.interfaces.dito.comInfo.PropBean;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
public class RocketmqServlet extends HttpServlet {
@Override
public void init() throws ServletException
{
BaseBean bb = new BaseBean();
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
bb.writeLog("initiated---进入时间:"+sdf.format(new Date()));
bb.writeLog("***** resource model initiated");
bb.writeLog("***** resource model initiated");
bb.writeLog("***** resource model initiated");
try{
//DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("portal-producer-group_nj");
//DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cbec-consumer-group_nj_133");
//consumer.setNamesrvAddr("172.16.84.183:9001;172.16.84.187:9001");
// consumer.subscribe("dataSync_topic_nj", "BPM");
// consumer.setInstanceName("dataSync_topic_nj");
String consumerGroup = PropBean.getUfPropValue("consumerGroup");
bb.writeLog("consumerGroup:"+consumerGroup);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
String namesrvAddr = PropBean.getUfPropValue("namesrvAddr");
bb.writeLog("namesrvAddr:"+namesrvAddr);
consumer.setNamesrvAddr(namesrvAddr);
String instanceName = PropBean.getUfPropValue("instanceName");
bb.writeLog("instanceName:"+instanceName);
consumer.setInstanceName(instanceName);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
String topic = PropBean.getUfPropValue("topic");
String subExpression = PropBean.getUfPropValue("subExpression");
bb.writeLog("topic:"+topic);
bb.writeLog("subExpression:"+subExpression);
consumer.subscribe(topic,subExpression);
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
consumer.setConsumeMessageBatchMaxSize(1);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.registerMessageListener(new RocketMsgListener());
// consumer.registerMessageListener(new RocketMsgOrderListener());
/**
* Consumer使start<br>
*/
consumer.start();
bb.writeLog("Consumer Started.");
}catch (MQClientException e){
bb.writeLog("Consumer resource model initiated--MQClientException:"+e);
}
bb.writeLog("***** resource model initiated");
bb.writeLog("***** resource model initiated");
bb.writeLog("***** resource model initiated");
}
/**
* HttpServlet doGet
* @param request
* @param response
* @throws ServletException
* @throws IOException
*/
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
doPost(request, response);
}
/**
* HttpServlet doPost
* @param request
* @param response
* @throws ServletException
* @throws IOException
*/
@Override
public void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
}
@Override
public void destroy()
{
// 什么也不做
}
}
Loading…
Cancel
Save