package weaver.interfaces.dito.mq; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import weaver.file.Prop; import weaver.general.BaseBean; import weaver.general.GCONST; import weaver.general.InitServer; import weaver.interfaces.dito.comInfo.PropBean; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; public class RocketmqServlet extends HttpServlet { private boolean isMainIp = false;//是否为主节点 private boolean isSampleMode = false;//是否为单机(启用ridis并且配置了主节点的,则不是单机,否则认为是集群环境) @Override public void init() throws ServletException { BaseBean bb = new BaseBean(); SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); bb.writeLog("initiated---进入时间:"+sdf.format(new Date())); bb.writeLog("***** resource model initiated"); bb.writeLog("***** resource model initiated"); bb.writeLog("***** resource model initiated"); try{ isMainIp = isMainIp();//计算是否为集群中的主节点 isSampleMode = isSampleMode();//判断是否为单机 bb.writeLog("isMainIp:"+isMainIp); bb.writeLog("isSampleMode:"+isSampleMode); if(isSampleMode){ initData(); }else{ if (isMainIp) { //则调用service方法加载设置,重新获取workflowid相关超时设置OvertimeEntity initData(); } } //DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("portal-producer-group_nj"); //DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cbec-consumer-group_nj_133"); //consumer.setNamesrvAddr("172.16.84.183:9001;172.16.84.187:9001"); // consumer.subscribe("dataSync_topic_nj", "BPM"); // consumer.setInstanceName("dataSync_topic_nj"); // String consumerGroup = PropBean.getUfPropValue("consumerGroup"); // bb.writeLog("consumerGroup:"+consumerGroup); // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); // // String namesrvAddr = PropBean.getUfPropValue("namesrvAddr"); // bb.writeLog("namesrvAddr:"+namesrvAddr); // consumer.setNamesrvAddr(namesrvAddr); // // String instanceName = PropBean.getUfPropValue("instanceName"); // bb.writeLog("instanceName:"+instanceName); // consumer.setInstanceName(instanceName); // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // // String topic = PropBean.getUfPropValue("topic"); // String subExpression = PropBean.getUfPropValue("subExpression"); // // bb.writeLog("topic:"+topic); // bb.writeLog("subExpression:"+subExpression); // // consumer.subscribe(topic,subExpression); // consumer.setConsumeThreadMin(1); // consumer.setConsumeThreadMax(1); // consumer.setConsumeMessageBatchMaxSize(1); // consumer.setMessageModel(MessageModel.BROADCASTING); // consumer.registerMessageListener(new RocketMsgListener()); // consumer.registerMessageListener(new RocketMsgOrderListener()); /** * Consumer对象在使用之前必须要调用start初始化,初始化一次即可
*/ // consumer.start(); // bb.writeLog("Consumer Started."); }catch (Exception e){ bb.writeLog("Consumer resource model initiated--MQClientException:"+e); } bb.writeLog("***** resource model initiated"); bb.writeLog("***** resource model initiated"); bb.writeLog("***** resource model initiated"); } public void initData(){ BaseBean bb = new BaseBean(); PropBean propBean = new PropBean(); try { String consumerGroup = propBean.getUfPropValueStatic("consumerGroup"); bb.writeLog("consumerGroup:" + consumerGroup); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); String namesrvAddr = propBean.getUfPropValueStatic("namesrvAddr"); bb.writeLog("namesrvAddr:" + namesrvAddr); consumer.setNamesrvAddr(namesrvAddr); String instanceName = propBean.getUfPropValueStatic("instanceName"); bb.writeLog("instanceName:" + instanceName); consumer.setInstanceName(instanceName); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); String topic = propBean.getUfPropValueStatic("topic"); String subExpression = propBean.getUfPropValueStatic("subExpression"); bb.writeLog("topic:" + topic); bb.writeLog("subExpression:" + subExpression); String authid = propBean.getUfPropValueStatic("authid"); String authpwd = propBean.getUfPropValueStatic("authpwd"); String clustername = propBean.getUfPropValueStatic("clustername"); String tenantid = propBean.getUfPropValueStatic("tenantid"); bb.writeLog("authid:" + authid); bb.writeLog("authpwd:" + authpwd); bb.writeLog("clustername:" + clustername); bb.writeLog("tenantid:" + tenantid); consumer.subscribe(topic, subExpression); consumer.setAuthID(authid); consumer.setAuthPWD(authpwd); consumer.setClusterName(clustername); consumer.setTenantID(tenantid); consumer.setConsumeThreadMin(1); consumer.setConsumeThreadMax(1); consumer.setConsumeMessageBatchMaxSize(1); consumer.setMessageModel(MessageModel.BROADCASTING); consumer.registerMessageListener(new RocketMsgListener()); consumer.start(); bb.writeLog("Consumer66Started."); } catch (MQClientException var13) { bb.writeLog("Consumer resource model initiated--MQClientException:" + var13); } } /** * 实现 HttpServlet 的 doGet 方法,不作任何操作 * @param request * @param response * @throws ServletException * @throws IOException */ @Override public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { doPost(request, response); } /** * 实现 HttpServlet 的 doPost 方法,不作任何操作 * @param request * @param response * @throws ServletException * @throws IOException */ @Override public void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { } @Override public void destroy() { // 什么也不做 } private boolean isMainIp() { new BaseBean().writeLog("超时判断主节点"); String mainControlIp = ""; ArrayList hostIps = new InitServer().getRealIp(); Prop prop = Prop.getInstance(); mainControlIp = prop.getPropValue(GCONST.getConfigFile(), "MainControlIP"); if (hostIps == null || hostIps.size() == 0) { new BaseBean().writeLog("System Init Error:Cannot get local Ip address,This may cause scripts or Timed task not run! "); } else { new BaseBean().writeLog("System Init Message:mainControlIp=" + mainControlIp + " localIp:" + hostIps.toString()); } if ((!"".equals(mainControlIp) && hostIps.contains(mainControlIp)) || "".equals(mainControlIp)) { return true; } return false; } public boolean isSampleMode() { BaseBean base = new BaseBean(); boolean redis_flag = "1".equals(base.getPropValue("weaver_new_session", "status")); boolean mainIp_flag = !"".equals(base.getPropValue("weaver", "MainControlIP")); base.writeLog("超时判断是否为集群环境:redis_flag======"+redis_flag); base.writeLog("超时判断是否为集群环境:mainIp_flag======="+mainIp_flag); return !(redis_flag && mainIp_flag); } }