From 3e37c9c204cc3e2c2473de37bafd6a42b490d6de Mon Sep 17 00:00:00 2001 From: liuliang <401809302@qq.com> Date: Tue, 28 Feb 2023 09:42:02 +0800 Subject: [PATCH] =?UTF-8?q?=E9=80=9A=E8=BF=87mq=E6=B6=88=E6=81=AF=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E7=AD=BE=E5=90=8D=E7=AD=BE=E7=AB=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../interfaces/dito/mq/ConsumerBase.java | 22 + .../dito/mq/HrmRocketMsgListener.java | 80 ++-- .../dito/mq/HrmRocketmqServlet.java | 364 ++++++++-------- .../interfaces/dito/mq/RocketMsgListener.java | 81 ++-- .../interfaces/dito/mq/RocketmqServlet.java | 410 +++++++++--------- .../interfaces/dito/mq/RocketmqUtil.java | 17 +- .../interfaces/dito/mq/SignatureConsumer.java | 32 ++ 7 files changed, 535 insertions(+), 471 deletions(-) create mode 100644 src/weaver/interfaces/dito/mq/ConsumerBase.java create mode 100644 src/weaver/interfaces/dito/mq/SignatureConsumer.java diff --git a/src/weaver/interfaces/dito/mq/ConsumerBase.java b/src/weaver/interfaces/dito/mq/ConsumerBase.java new file mode 100644 index 00000000..a16c8db1 --- /dev/null +++ b/src/weaver/interfaces/dito/mq/ConsumerBase.java @@ -0,0 +1,22 @@ +package weaver.interfaces.dito.mq; + +import com.alibaba.fastjson.JSONArray; + +public interface ConsumerBase { + + /** + * + * 当为ele_signature时为同步签名签章功能 + * @param tableName :ele_signature + * @return + */ + boolean support(String tableName); + + + /** + * + * @param jsonArray + * @param tableName + */ + void consumer(JSONArray jsonArray, String tableName); +} diff --git a/src/weaver/interfaces/dito/mq/HrmRocketMsgListener.java b/src/weaver/interfaces/dito/mq/HrmRocketMsgListener.java index 2f68681a..8dabdc14 100644 --- a/src/weaver/interfaces/dito/mq/HrmRocketMsgListener.java +++ b/src/weaver/interfaces/dito/mq/HrmRocketMsgListener.java @@ -1,40 +1,40 @@ -//package weaver.interfaces.dito.mq; -// -//import org.apache.commons.lang3.StringUtils; -//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -//import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; -//import org.apache.rocketmq.common.message.MessageExt; -//import weaver.general.BaseBean; -// -//import java.io.UnsupportedEncodingException; -//import java.util.List; -// -// -//public class HrmRocketMsgListener implements MessageListenerConcurrently { -// @Override -// public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { -// BaseBean bb = new BaseBean(); -// MessageExt msg = msgs.get(0); -// -// HrmRocketmqUtil hrmRocketmqUtil = new HrmRocketmqUtil(); -// -// try { -// -// bb.writeLog("Consumer---6----"+new String(msg.getBody())); -// String msgdata = new String(msg.getBody(),"UTF-8"); -// if(StringUtils.isBlank(msgdata)) -// { -// int errcount = hrmRocketmqUtil.updateOrgData(msgdata); -// bb.writeLog("Consumer---errcount---:"+errcount); -// } -// } catch (UnsupportedEncodingException e) { -// e.printStackTrace(); -// bb.writeLog("Consumer---UnsupportedEncodingException---e:"+e); -// return ConsumeConcurrentlyStatus.RECONSUME_LATER; -// } -// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; -// -// } -// -//} +package weaver.interfaces.dito.mq; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import weaver.general.BaseBean; + +import java.io.UnsupportedEncodingException; +import java.util.List; + + +public class HrmRocketMsgListener implements MessageListenerConcurrently { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { + BaseBean bb = new BaseBean(); + MessageExt msg = msgs.get(0); + + HrmRocketmqUtil hrmRocketmqUtil = new HrmRocketmqUtil(); + + try { + + bb.writeLog("Consumer---6----"+new String(msg.getBody())); + String msgdata = new String(msg.getBody(),"UTF-8"); + if(StringUtils.isBlank(msgdata)) + { + int errcount = hrmRocketmqUtil.updateOrgData(msgdata); + bb.writeLog("Consumer---errcount---:"+errcount); + } + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + bb.writeLog("Consumer---UnsupportedEncodingException---e:"+e); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + + } + +} diff --git a/src/weaver/interfaces/dito/mq/HrmRocketmqServlet.java b/src/weaver/interfaces/dito/mq/HrmRocketmqServlet.java index 5346ef27..970ba0c1 100644 --- a/src/weaver/interfaces/dito/mq/HrmRocketmqServlet.java +++ b/src/weaver/interfaces/dito/mq/HrmRocketmqServlet.java @@ -1,182 +1,182 @@ -//package weaver.interfaces.dito.mq; -// -// -//import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -//import org.apache.rocketmq.client.exception.MQClientException; -//import org.apache.rocketmq.common.consumer.ConsumeFromWhere; -//import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -//import weaver.file.Prop; -//import weaver.general.BaseBean; -//import weaver.general.GCONST; -//import weaver.general.InitServer; -//import weaver.interfaces.dito.comInfo.PropBean; -// -//import javax.servlet.ServletException; -//import javax.servlet.http.HttpServlet; -//import javax.servlet.http.HttpServletRequest; -//import javax.servlet.http.HttpServletResponse; -//import java.io.IOException; -//import java.text.SimpleDateFormat; -//import java.util.ArrayList; -//import java.util.Date; -// -// -//public class HrmRocketmqServlet extends HttpServlet { -// -// -// private boolean isMainIp = false;//是否为主节点 -// -// private boolean isSampleMode = false;//是否为单机(启用ridis并且配置了主节点的,则不是单机,否则认为是集群环境) -// -// @Override -// public void init() throws ServletException -// { -// -// BaseBean bb = new BaseBean(); -// SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); -// -// bb.writeLog("initiated---进入时间:"+sdf.format(new Date())); -// bb.writeLog("***** resource model initiated"); -// bb.writeLog("***** resource model initiated"); -// bb.writeLog("***** resource model initiated"); -// -// try{ -// isMainIp = isMainIp();//计算是否为集群中的主节点 -// isSampleMode = isSampleMode();//判断是否为单机 -// bb.writeLog("isMainIp:"+isMainIp); -// bb.writeLog("isSampleMode:"+isSampleMode); -// -// if(isSampleMode){ -// initData(); -// }else{ -// if (isMainIp) { -// //则调用service方法加载设置,重新获取workflowid相关超时设置OvertimeEntity -// initData(); -// } -// } -// -// }catch (Exception e){ -// bb.writeLog("Consumer resource model initiated--MQClientException:"+e); -// } -// -// bb.writeLog("***** resource model initiated"); -// bb.writeLog("***** resource model initiated"); -// bb.writeLog("***** resource model initiated"); -// } -// -// -// public void initData(){ -// BaseBean bb = new BaseBean(); -// PropBean propBean = new PropBean(); -// -// try { -//// String consumerGroup = propBean.getUfPropValueStatic("consumerGroup"); -//// bb.writeLog("consumerGroup:" + consumerGroup); -// String hrmConsumerGroup = PropBean.getUfPropValue("hrmConsumerGroup"); -// String hrmConsumerAddr = PropBean.getUfPropValue("hrmConsumerAddr"); -// String hrmInstanceName = PropBean.getUfPropValue("hrmInstanceName"); -// String hrmSubExpr = PropBean.getUfPropValue("hrmSubExpr"); -// String hrmMQAuthID = PropBean.getUfPropValue("hrmMQAuthID"); -// String hrmMQAuthPWD = PropBean.getUfPropValue("hrmMQAuthPWD"); -// String hrmMQClusterName = PropBean.getUfPropValue("hrmMQClusterName"); -// String hrmMQTenantID = PropBean.getUfPropValue("hrmMQTenantID"); -// DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(hrmConsumerGroup); -//// String namesrvAddr = propBean.getUfPropValueStatic("namesrvAddr"); -//// bb.writeLog("namesrvAddr:" + namesrvAddr); -// consumer.setNamesrvAddr(hrmConsumerAddr); -//// String instanceName = propBean.getUfPropValueStatic("instanceName"); -//// bb.writeLog("instanceName:" + instanceName); -// consumer.setInstanceName(hrmInstanceName); -// consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); -//// String topic = propBean.getUfPropValueStatic("topic"); -//// String subExpression = propBean.getUfPropValueStatic("subExpression"); -//// bb.writeLog("topic:" + topic); -//// bb.writeLog("subExpression:" + subExpression); -//// String authid = propBean.getUfPropValueStatic("authid"); -//// String authpwd = propBean.getUfPropValueStatic("authpwd"); -//// String clustername = propBean.getUfPropValueStatic("clustername"); -//// String tenantid = propBean.getUfPropValueStatic("tenantid"); -//// bb.writeLog("authid:" + authid); -//// bb.writeLog("authpwd:" + authpwd); -//// bb.writeLog("clustername:" + clustername); -//// bb.writeLog("tenantid:" + tenantid); -// consumer.subscribe(hrmInstanceName, hrmSubExpr); -// consumer.setAuthID(hrmMQAuthID); -// consumer.setAuthPWD(hrmMQAuthPWD); -// consumer.setClusterName(hrmMQClusterName); -// consumer.setTenantID(hrmMQTenantID); -// -// consumer.setConsumeThreadMin(1); -// consumer.setConsumeThreadMax(1); -// consumer.setConsumeMessageBatchMaxSize(1); -// -// consumer.setMessageModel(MessageModel.BROADCASTING); -// consumer.registerMessageListener(new HrmRocketMsgListener()); -// consumer.start(); -// bb.writeLog("Consumer88Started."); -// -// } catch (MQClientException var13) { -// bb.writeLog("Consumer resource model initiated--MQClientException:" + var13); -// } -// } -// -// /** -// * 实现 HttpServlet 的 doGet 方法,不作任何操作 -// * @param request -// * @param response -// * @throws ServletException -// * @throws IOException -// */ -// @Override -// public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { -// doPost(request, response); -// } -// -// /** -// * 实现 HttpServlet 的 doPost 方法,不作任何操作 -// * @param request -// * @param response -// * @throws ServletException -// * @throws IOException -// */ -// -// @Override -// public void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { -// } -// -// @Override -// public void destroy() -// { -// // 什么也不做 -// } -// -// -// private boolean isMainIp() { -// new BaseBean().writeLog("超时判断主节点"); -// String mainControlIp = ""; -// ArrayList hostIps = new InitServer().getRealIp(); -// Prop prop = Prop.getInstance(); -// mainControlIp = prop.getPropValue(GCONST.getConfigFile(), "MainControlIP"); -// if (hostIps == null || hostIps.size() == 0) { -// new BaseBean().writeLog("System Init Error:Cannot get local Ip address,This may cause scripts or Timed task not run! "); -// } else { -// new BaseBean().writeLog("System Init Message:mainControlIp=" + mainControlIp + " localIp:" + hostIps.toString()); -// } -// if ((!"".equals(mainControlIp) && hostIps.contains(mainControlIp)) || "".equals(mainControlIp)) { -// return true; -// } -// return false; -// } -// -// public boolean isSampleMode() { -// BaseBean base = new BaseBean(); -// boolean redis_flag = "1".equals(base.getPropValue("weaver_new_session", "status")); -// boolean mainIp_flag = !"".equals(base.getPropValue("weaver", "MainControlIP")); -// base.writeLog("超时判断是否为集群环境:redis_flag======"+redis_flag); -// base.writeLog("超时判断是否为集群环境:mainIp_flag======="+mainIp_flag); -// return !(redis_flag && mainIp_flag); -// } -// -// -// -//} +package weaver.interfaces.dito.mq; + + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import weaver.file.Prop; +import weaver.general.BaseBean; +import weaver.general.GCONST; +import weaver.general.InitServer; +import weaver.interfaces.dito.comInfo.PropBean; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; + + +public class HrmRocketmqServlet extends HttpServlet { + + + private boolean isMainIp = false;//是否为主节点 + + private boolean isSampleMode = false;//是否为单机(启用ridis并且配置了主节点的,则不是单机,否则认为是集群环境) + + @Override + public void init() throws ServletException + { + + BaseBean bb = new BaseBean(); + SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); + + bb.writeLog("initiated---进入时间:"+sdf.format(new Date())); + bb.writeLog("***** resource model initiated"); + bb.writeLog("***** resource model initiated"); + bb.writeLog("***** resource model initiated"); + + try{ + isMainIp = isMainIp();//计算是否为集群中的主节点 + isSampleMode = isSampleMode();//判断是否为单机 + bb.writeLog("isMainIp:"+isMainIp); + bb.writeLog("isSampleMode:"+isSampleMode); + + if(isSampleMode){ + initData(); + }else{ + if (isMainIp) { + //则调用service方法加载设置,重新获取workflowid相关超时设置OvertimeEntity + initData(); + } + } + + }catch (Exception e){ + bb.writeLog("Consumer resource model initiated--MQClientException:"+e); + } + + bb.writeLog("***** resource model initiated"); + bb.writeLog("***** resource model initiated"); + bb.writeLog("***** resource model initiated"); + } + + + public void initData(){ + BaseBean bb = new BaseBean(); + PropBean propBean = new PropBean(); + + try { +// String consumerGroup = propBean.getUfPropValueStatic("consumerGroup"); +// bb.writeLog("consumerGroup:" + consumerGroup); + String hrmConsumerGroup = PropBean.getUfPropValue("hrmConsumerGroup"); + String hrmConsumerAddr = PropBean.getUfPropValue("hrmConsumerAddr"); + String hrmInstanceName = PropBean.getUfPropValue("hrmInstanceName"); + String hrmSubExpr = PropBean.getUfPropValue("hrmSubExpr"); + String hrmMQAuthID = PropBean.getUfPropValue("hrmMQAuthID"); + String hrmMQAuthPWD = PropBean.getUfPropValue("hrmMQAuthPWD"); + String hrmMQClusterName = PropBean.getUfPropValue("hrmMQClusterName"); + String hrmMQTenantID = PropBean.getUfPropValue("hrmMQTenantID"); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(hrmConsumerGroup); +// String namesrvAddr = propBean.getUfPropValueStatic("namesrvAddr"); +// bb.writeLog("namesrvAddr:" + namesrvAddr); + consumer.setNamesrvAddr(hrmConsumerAddr); +// String instanceName = propBean.getUfPropValueStatic("instanceName"); +// bb.writeLog("instanceName:" + instanceName); + consumer.setInstanceName(hrmInstanceName); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); +// String topic = propBean.getUfPropValueStatic("topic"); +// String subExpression = propBean.getUfPropValueStatic("subExpression"); +// bb.writeLog("topic:" + topic); +// bb.writeLog("subExpression:" + subExpression); +// String authid = propBean.getUfPropValueStatic("authid"); +// String authpwd = propBean.getUfPropValueStatic("authpwd"); +// String clustername = propBean.getUfPropValueStatic("clustername"); +// String tenantid = propBean.getUfPropValueStatic("tenantid"); +// bb.writeLog("authid:" + authid); +// bb.writeLog("authpwd:" + authpwd); +// bb.writeLog("clustername:" + clustername); +// bb.writeLog("tenantid:" + tenantid); + consumer.subscribe(hrmInstanceName, hrmSubExpr); + consumer.setAuthID(hrmMQAuthID); + consumer.setAuthPWD(hrmMQAuthPWD); + consumer.setClusterName(hrmMQClusterName); + consumer.setTenantID(hrmMQTenantID); + + consumer.setConsumeThreadMin(1); + consumer.setConsumeThreadMax(1); + consumer.setConsumeMessageBatchMaxSize(1); + + consumer.setMessageModel(MessageModel.BROADCASTING); + consumer.registerMessageListener(new HrmRocketMsgListener()); + consumer.start(); + bb.writeLog("Consumer88Started."); + + } catch (MQClientException var13) { + bb.writeLog("Consumer resource model initiated--MQClientException:" + var13); + } + } + + /** + * 实现 HttpServlet 的 doGet 方法,不作任何操作 + * @param request + * @param response + * @throws ServletException + * @throws IOException + */ + @Override + public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + doPost(request, response); + } + + /** + * 实现 HttpServlet 的 doPost 方法,不作任何操作 + * @param request + * @param response + * @throws ServletException + * @throws IOException + */ + + @Override + public void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + } + + @Override + public void destroy() + { + // 什么也不做 + } + + + private boolean isMainIp() { + new BaseBean().writeLog("超时判断主节点"); + String mainControlIp = ""; + ArrayList hostIps = new InitServer().getRealIp(); + Prop prop = Prop.getInstance(); + mainControlIp = prop.getPropValue(GCONST.getConfigFile(), "MainControlIP"); + if (hostIps == null || hostIps.size() == 0) { + new BaseBean().writeLog("System Init Error:Cannot get local Ip address,This may cause scripts or Timed task not run! "); + } else { + new BaseBean().writeLog("System Init Message:mainControlIp=" + mainControlIp + " localIp:" + hostIps.toString()); + } + if ((!"".equals(mainControlIp) && hostIps.contains(mainControlIp)) || "".equals(mainControlIp)) { + return true; + } + return false; + } + + public boolean isSampleMode() { + BaseBean base = new BaseBean(); + boolean redis_flag = "1".equals(base.getPropValue("weaver_new_session", "status")); + boolean mainIp_flag = !"".equals(base.getPropValue("weaver", "MainControlIP")); + base.writeLog("超时判断是否为集群环境:redis_flag======"+redis_flag); + base.writeLog("超时判断是否为集群环境:mainIp_flag======="+mainIp_flag); + return !(redis_flag && mainIp_flag); + } + + + +} diff --git a/src/weaver/interfaces/dito/mq/RocketMsgListener.java b/src/weaver/interfaces/dito/mq/RocketMsgListener.java index b0936f7e..15d88fa4 100644 --- a/src/weaver/interfaces/dito/mq/RocketMsgListener.java +++ b/src/weaver/interfaces/dito/mq/RocketMsgListener.java @@ -1,40 +1,41 @@ -//package weaver.interfaces.dito.mq; -// -//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -//import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; -//import org.apache.rocketmq.common.message.MessageExt; -//import weaver.general.BaseBean; -// -//import java.io.UnsupportedEncodingException; -//import java.util.List; -// -// -//public class RocketMsgListener implements MessageListenerConcurrently { -// @Override -// public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { -// BaseBean bb = new BaseBean(); -// MessageExt msg = msgs.get(0); -// -// RocketmqUtil rocketmqUtil = new RocketmqUtil(); -// -// try { -// -// bb.writeLog("Consumer---3----"+new String(msg.getBody())); -// String msgdata = new String(msg.getBody(),"UTF-8"); -// if(!"".equals(msgdata)) -// { -// String data = msgdata.substring(msgdata.indexOf("{")); -// int errcount = rocketmqUtil.updateOrgData(data); -// bb.writeLog("Consumer---errcount---:"+errcount); -// } -// } catch (UnsupportedEncodingException e) { -// e.printStackTrace(); -// bb.writeLog("Consumer---UnsupportedEncodingException---e:"+e); -// return ConsumeConcurrentlyStatus.RECONSUME_LATER; -// } -// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; -// -// } -// -//} +package weaver.interfaces.dito.mq; + +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import weaver.general.BaseBean; + +import java.io.UnsupportedEncodingException; +import java.util.List; + + +public class RocketMsgListener implements MessageListenerConcurrently { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { + BaseBean bb = new BaseBean(); + MessageExt msg = msgs.get(0); + + RocketmqUtil rocketmqUtil = new RocketmqUtil(); + + try { + + bb.writeLog("Consumer---3----"+new String(msg.getBody())); + + String msgdata = new String(msg.getBody(),"UTF-8"); + if(!"".equals(msgdata)) + { + String data = msgdata.substring(msgdata.indexOf("{")); + int errcount = rocketmqUtil.updateOrgData(data); + bb.writeLog("Consumer---errcount---:"+errcount); + } + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + bb.writeLog("Consumer---UnsupportedEncodingException---e:"+e); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + + } + +} diff --git a/src/weaver/interfaces/dito/mq/RocketmqServlet.java b/src/weaver/interfaces/dito/mq/RocketmqServlet.java index ae87864e..63db5ad7 100644 --- a/src/weaver/interfaces/dito/mq/RocketmqServlet.java +++ b/src/weaver/interfaces/dito/mq/RocketmqServlet.java @@ -1,216 +1,216 @@ -//package weaver.interfaces.dito.mq; -// -// -//import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -//import org.apache.rocketmq.client.exception.MQClientException; -//import org.apache.rocketmq.common.consumer.ConsumeFromWhere; -//import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -//import weaver.file.Prop; -//import weaver.general.BaseBean; -//import weaver.general.GCONST; -//import weaver.general.InitServer; -//import weaver.interfaces.dito.comInfo.PropBean; -// -//import javax.servlet.ServletException; -//import javax.servlet.http.HttpServlet; -//import javax.servlet.http.HttpServletRequest; -//import javax.servlet.http.HttpServletResponse; -//import java.io.IOException; -//import java.text.SimpleDateFormat; -//import java.util.ArrayList; -//import java.util.Date; -// -// -// -//public class RocketmqServlet extends HttpServlet { -// -// -// private boolean isMainIp = false;//是否为主节点 -// -// private boolean isSampleMode = false;//是否为单机(启用ridis并且配置了主节点的,则不是单机,否则认为是集群环境) -// -// @Override -// public void init() throws ServletException -// { -// -// BaseBean bb = new BaseBean(); -// SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); -// -// bb.writeLog("initiated---进入时间:"+sdf.format(new Date())); -// bb.writeLog("***** resource model initiated"); -// bb.writeLog("***** resource model initiated"); -// bb.writeLog("***** resource model initiated"); -// -// try{ -// isMainIp = isMainIp();//计算是否为集群中的主节点 -// isSampleMode = isSampleMode();//判断是否为单机 -// bb.writeLog("isMainIp:"+isMainIp); -// bb.writeLog("isSampleMode:"+isSampleMode); -// -// if(isSampleMode){ -// initData(); -// }else{ -// if (isMainIp) { -// //则调用service方法加载设置,重新获取workflowid相关超时设置OvertimeEntity -// initData(); -// } -// } -// -// -// -// //DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("portal-producer-group_nj"); -// //DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cbec-consumer-group_nj_133"); -// //consumer.setNamesrvAddr("172.16.84.183:9001;172.16.84.187:9001"); -//// consumer.subscribe("dataSync_topic_nj", "BPM"); -//// consumer.setInstanceName("dataSync_topic_nj"); -// -//// String consumerGroup = PropBean.getUfPropValue("consumerGroup"); -//// bb.writeLog("consumerGroup:"+consumerGroup); -//// DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); -//// -//// String namesrvAddr = PropBean.getUfPropValue("namesrvAddr"); -//// bb.writeLog("namesrvAddr:"+namesrvAddr); -//// consumer.setNamesrvAddr(namesrvAddr); -//// -//// String instanceName = PropBean.getUfPropValue("instanceName"); -//// bb.writeLog("instanceName:"+instanceName); -//// consumer.setInstanceName(instanceName); -//// consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); -//// //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); -//// -//// String topic = PropBean.getUfPropValue("topic"); -//// String subExpression = PropBean.getUfPropValue("subExpression"); -//// -//// bb.writeLog("topic:"+topic); -//// bb.writeLog("subExpression:"+subExpression); -//// -//// consumer.subscribe(topic,subExpression); -//// consumer.setConsumeThreadMin(1); -//// consumer.setConsumeThreadMax(1); -//// consumer.setConsumeMessageBatchMaxSize(1); -//// consumer.setMessageModel(MessageModel.BROADCASTING); -//// consumer.registerMessageListener(new RocketMsgListener()); -//// consumer.registerMessageListener(new RocketMsgOrderListener()); -// -// /** -// * Consumer对象在使用之前必须要调用start初始化,初始化一次即可
-// */ -//// consumer.start(); -//// bb.writeLog("Consumer Started."); -// -// }catch (Exception e){ -// bb.writeLog("Consumer resource model initiated--MQClientException:"+e); -// } -// -// bb.writeLog("***** resource model initiated"); -// bb.writeLog("***** resource model initiated"); -// bb.writeLog("***** resource model initiated"); -// } -// -// -// public void initData(){ -// BaseBean bb = new BaseBean(); -// PropBean propBean = new PropBean(); -// -// try { -// String consumerGroup = propBean.getUfPropValueStatic("consumerGroup"); -// bb.writeLog("consumerGroup:" + consumerGroup); +package weaver.interfaces.dito.mq; + + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import weaver.file.Prop; +import weaver.general.BaseBean; +import weaver.general.GCONST; +import weaver.general.InitServer; +import weaver.interfaces.dito.comInfo.PropBean; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; + + + +public class RocketmqServlet extends HttpServlet { + + + private boolean isMainIp = false;//是否为主节点 + + private boolean isSampleMode = false;//是否为单机(启用ridis并且配置了主节点的,则不是单机,否则认为是集群环境) + + @Override + public void init() throws ServletException + { + + BaseBean bb = new BaseBean(); + SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); + + bb.writeLog("initiated---进入时间:"+sdf.format(new Date())); + bb.writeLog("***** resource model initiated"); + bb.writeLog("***** resource model initiated"); + bb.writeLog("***** resource model initiated"); + + try{ + isMainIp = isMainIp();//计算是否为集群中的主节点 + isSampleMode = isSampleMode();//判断是否为单机 + bb.writeLog("isMainIp:"+isMainIp); + bb.writeLog("isSampleMode:"+isSampleMode); + + if(isSampleMode){ + initData(); + }else{ + if (isMainIp) { + //则调用service方法加载设置,重新获取workflowid相关超时设置OvertimeEntity + initData(); + } + } + + + + //DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("portal-producer-group_nj"); + //DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cbec-consumer-group_nj_133"); + //consumer.setNamesrvAddr("172.16.84.183:9001;172.16.84.187:9001"); +// consumer.subscribe("dataSync_topic_nj", "BPM"); +// consumer.setInstanceName("dataSync_topic_nj"); + +// String consumerGroup = PropBean.getUfPropValue("consumerGroup"); +// bb.writeLog("consumerGroup:"+consumerGroup); // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); -// String namesrvAddr = propBean.getUfPropValueStatic("namesrvAddr"); -// bb.writeLog("namesrvAddr:" + namesrvAddr); +// +// String namesrvAddr = PropBean.getUfPropValue("namesrvAddr"); +// bb.writeLog("namesrvAddr:"+namesrvAddr); // consumer.setNamesrvAddr(namesrvAddr); -// String instanceName = propBean.getUfPropValueStatic("instanceName"); -// bb.writeLog("instanceName:" + instanceName); +// +// String instanceName = PropBean.getUfPropValue("instanceName"); +// bb.writeLog("instanceName:"+instanceName); // consumer.setInstanceName(instanceName); // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); -// String topic = propBean.getUfPropValueStatic("topic"); -// String subExpression = propBean.getUfPropValueStatic("subExpression"); -// bb.writeLog("topic:" + topic); -// bb.writeLog("subExpression:" + subExpression); -// String authid = propBean.getUfPropValueStatic("authid"); -// String authpwd = propBean.getUfPropValueStatic("authpwd"); -// String clustername = propBean.getUfPropValueStatic("clustername"); -// String tenantid = propBean.getUfPropValueStatic("tenantid"); -// bb.writeLog("authid:" + authid); -// bb.writeLog("authpwd:" + authpwd); -// bb.writeLog("clustername:" + clustername); -// bb.writeLog("tenantid:" + tenantid); -// consumer.subscribe(topic, subExpression); -// consumer.setAuthID(authid); -// consumer.setAuthPWD(authpwd); -// consumer.setClusterName(clustername); -// consumer.setTenantID(tenantid); +// //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); +// +// String topic = PropBean.getUfPropValue("topic"); +// String subExpression = PropBean.getUfPropValue("subExpression"); +// +// bb.writeLog("topic:"+topic); +// bb.writeLog("subExpression:"+subExpression); // +// consumer.subscribe(topic,subExpression); // consumer.setConsumeThreadMin(1); // consumer.setConsumeThreadMax(1); // consumer.setConsumeMessageBatchMaxSize(1); -// // consumer.setMessageModel(MessageModel.BROADCASTING); // consumer.registerMessageListener(new RocketMsgListener()); +// consumer.registerMessageListener(new RocketMsgOrderListener()); + + /** + * Consumer对象在使用之前必须要调用start初始化,初始化一次即可
+ */ // consumer.start(); -// bb.writeLog("Consumer66Started."); -// } catch (MQClientException var13) { -// bb.writeLog("Consumer resource model initiated--MQClientException:" + var13); -// } -// } -// -// /** -// * 实现 HttpServlet 的 doGet 方法,不作任何操作 -// * @param request -// * @param response -// * @throws ServletException -// * @throws IOException -// */ -// @Override -// public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { -// doPost(request, response); -// } -// -// /** -// * 实现 HttpServlet 的 doPost 方法,不作任何操作 -// * @param request -// * @param response -// * @throws ServletException -// * @throws IOException -// */ -// -// @Override -// public void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { -// } -// -// @Override -// public void destroy() -// { -// // 什么也不做 -// } -// -// -// private boolean isMainIp() { -// new BaseBean().writeLog("超时判断主节点"); -// String mainControlIp = ""; -// ArrayList hostIps = new InitServer().getRealIp(); -// Prop prop = Prop.getInstance(); -// mainControlIp = prop.getPropValue(GCONST.getConfigFile(), "MainControlIP"); -// if (hostIps == null || hostIps.size() == 0) { -// new BaseBean().writeLog("System Init Error:Cannot get local Ip address,This may cause scripts or Timed task not run! "); -// } else { -// new BaseBean().writeLog("System Init Message:mainControlIp=" + mainControlIp + " localIp:" + hostIps.toString()); -// } -// if ((!"".equals(mainControlIp) && hostIps.contains(mainControlIp)) || "".equals(mainControlIp)) { -// return true; -// } -// return false; -// } -// -// public boolean isSampleMode() { -// BaseBean base = new BaseBean(); -// boolean redis_flag = "1".equals(base.getPropValue("weaver_new_session", "status")); -// boolean mainIp_flag = !"".equals(base.getPropValue("weaver", "MainControlIP")); -// base.writeLog("超时判断是否为集群环境:redis_flag======"+redis_flag); -// base.writeLog("超时判断是否为集群环境:mainIp_flag======="+mainIp_flag); -// return !(redis_flag && mainIp_flag); -// } -// -// -// -//} +// bb.writeLog("Consumer Started."); + + }catch (Exception e){ + bb.writeLog("Consumer resource model initiated--MQClientException:"+e); + } + + bb.writeLog("***** resource model initiated"); + bb.writeLog("***** resource model initiated"); + bb.writeLog("***** resource model initiated"); + } + + + public void initData(){ + BaseBean bb = new BaseBean(); + PropBean propBean = new PropBean(); + + try { + String consumerGroup = propBean.getUfPropValueStatic("consumerGroup"); + bb.writeLog("consumerGroup:" + consumerGroup); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); + String namesrvAddr = propBean.getUfPropValueStatic("namesrvAddr"); + bb.writeLog("namesrvAddr:" + namesrvAddr); + consumer.setNamesrvAddr(namesrvAddr); + String instanceName = propBean.getUfPropValueStatic("instanceName"); + bb.writeLog("instanceName:" + instanceName); + consumer.setInstanceName(instanceName); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + String topic = propBean.getUfPropValueStatic("topic"); + String subExpression = propBean.getUfPropValueStatic("subExpression"); + bb.writeLog("topic:" + topic); + bb.writeLog("subExpression:" + subExpression); + String authid = propBean.getUfPropValueStatic("authid"); + String authpwd = propBean.getUfPropValueStatic("authpwd"); + String clustername = propBean.getUfPropValueStatic("clustername"); + String tenantid = propBean.getUfPropValueStatic("tenantid"); + bb.writeLog("authid:" + authid); + bb.writeLog("authpwd:" + authpwd); + bb.writeLog("clustername:" + clustername); + bb.writeLog("tenantid:" + tenantid); + consumer.subscribe(topic, subExpression); + consumer.setAuthID(authid); + consumer.setAuthPWD(authpwd); + consumer.setClusterName(clustername); + consumer.setTenantID(tenantid); + + consumer.setConsumeThreadMin(1); + consumer.setConsumeThreadMax(1); + consumer.setConsumeMessageBatchMaxSize(1); + + consumer.setMessageModel(MessageModel.BROADCASTING); + consumer.registerMessageListener(new RocketMsgListener()); + consumer.start(); + bb.writeLog("Consumer66Started."); + } catch (MQClientException var13) { + bb.writeLog("Consumer resource model initiated--MQClientException:" + var13); + } + } + + /** + * 实现 HttpServlet 的 doGet 方法,不作任何操作 + * @param request + * @param response + * @throws ServletException + * @throws IOException + */ + @Override + public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + doPost(request, response); + } + + /** + * 实现 HttpServlet 的 doPost 方法,不作任何操作 + * @param request + * @param response + * @throws ServletException + * @throws IOException + */ + + @Override + public void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + } + + @Override + public void destroy() + { + // 什么也不做 + } + + + private boolean isMainIp() { + new BaseBean().writeLog("超时判断主节点"); + String mainControlIp = ""; + ArrayList hostIps = new InitServer().getRealIp(); + Prop prop = Prop.getInstance(); + mainControlIp = prop.getPropValue(GCONST.getConfigFile(), "MainControlIP"); + if (hostIps == null || hostIps.size() == 0) { + new BaseBean().writeLog("System Init Error:Cannot get local Ip address,This may cause scripts or Timed task not run! "); + } else { + new BaseBean().writeLog("System Init Message:mainControlIp=" + mainControlIp + " localIp:" + hostIps.toString()); + } + if ((!"".equals(mainControlIp) && hostIps.contains(mainControlIp)) || "".equals(mainControlIp)) { + return true; + } + return false; + } + + public boolean isSampleMode() { + BaseBean base = new BaseBean(); + boolean redis_flag = "1".equals(base.getPropValue("weaver_new_session", "status")); + boolean mainIp_flag = !"".equals(base.getPropValue("weaver", "MainControlIP")); + base.writeLog("超时判断是否为集群环境:redis_flag======"+redis_flag); + base.writeLog("超时判断是否为集群环境:mainIp_flag======="+mainIp_flag); + return !(redis_flag && mainIp_flag); + } + + + +} diff --git a/src/weaver/interfaces/dito/mq/RocketmqUtil.java b/src/weaver/interfaces/dito/mq/RocketmqUtil.java index 7cdd0438..f654961f 100644 --- a/src/weaver/interfaces/dito/mq/RocketmqUtil.java +++ b/src/weaver/interfaces/dito/mq/RocketmqUtil.java @@ -14,14 +14,18 @@ import weaver.hrm.resource.ResourceComInfo; import weaver.interfaces.dito.comInfo.PropBean; import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class RocketmqUtil { + private List consumerBases; + + { + consumerBases = new ArrayList<>(); + consumerBases.add(new SignatureConsumer()); + } + private Lock lock = new ReentrantLock(); public int updateOrgData(String data) { @@ -58,6 +62,11 @@ public class RocketmqUtil { }else if("staff".equals(tableName)){ updateStaffData(jsonArray,tableName); } + for (ConsumerBase consumerBase :consumerBases){ + if (consumerBase.support(tableName)){ + consumerBase.consumer(jsonArray,tableName); + } + } } } } diff --git a/src/weaver/interfaces/dito/mq/SignatureConsumer.java b/src/weaver/interfaces/dito/mq/SignatureConsumer.java new file mode 100644 index 00000000..aee7fffd --- /dev/null +++ b/src/weaver/interfaces/dito/mq/SignatureConsumer.java @@ -0,0 +1,32 @@ +package weaver.interfaces.dito.mq; + +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import weaver.general.BaseBean; +import weaver.interfaces.dito.job.ESiginsCronJob; + +import java.util.Arrays; + +public class SignatureConsumer implements ConsumerBase{ + + @Override + public boolean support(String tableName) { + + return "ele_signature".equals(tableName); + } + + @Override + public void consumer(JSONArray jsonArray, String tableName) { + BaseBean bb = new BaseBean(); + bb.writeLog("********** consumer SignatureConsumer start **********"); + for (int i = 0; i < jsonArray.size(); i++){ + JSONObject jsonObject = jsonArray.getJSONObject(i); + String sysUserCode = jsonObject.getString("sysUserCode"); + ESiginsCronJob eSiginsCronJob = new ESiginsCronJob(); + eSiginsCronJob.setUsercode(sysUserCode); + eSiginsCronJob.execute(); + + } + bb.writeLog("********** consumer SignatureConsumer end **********"); + } +}