#EC_HJ5# RocketMq修改消息广播模式、角色唯一判断字段

main
shilei 2 years ago
parent 52cfeef35f
commit d634f1e5cb

@ -22,15 +22,18 @@ public class HrmRocketMsgListener implements MessageListenerConcurrently {
try {
bb.writeLog("Consumer---6----"+new String(msg.getBody()));
String msgdata = new String(msg.getBody(),"UTF-8");
if(StringUtils.isBlank(msgdata))
if(StringUtils.isNotBlank(msgdata))
{
int errcount = hrmRocketmqUtil.updateOrgData(msgdata);
bb.writeLog("Consumer---errcount---:"+errcount);
bb.writeLog("Consumer1---errcount---:"+errcount);
}else{
bb.writeLog("Consumer1---msgdata---is null ");
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
bb.writeLog("Consumer---UnsupportedEncodingException---e:"+e);
bb.writeLog("Consumer1---UnsupportedEncodingException---e:"+e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

@ -0,0 +1,43 @@
package weaver.interfaces.dito.mq;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import weaver.general.BaseBean;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class HrmRocketMsgListener3 implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
BaseBean bb = new BaseBean();
HrmRocketmqUtil hrmRocketmqUtil = new HrmRocketmqUtil();
for (MessageExt msg : msgs) {
bb.writeLog("consumeThread3=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
String msgdata = "";
try {
String msgdata2 = new String(msg.getBody(),"UTF-8");
bb.writeLog("msgdata2:"+msgdata2);
msgdata = new String(msg.getBody());
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
bb.writeLog("Consumer3---UnsupportedEncodingException---e:"+e);
}
if(StringUtils.isNotBlank(msgdata))
{
bb.writeLog("Consumer3---msgdataisnotnull");
int errcount = hrmRocketmqUtil.updateOrgData(msgdata);
bb.writeLog("Consumer3---errcount---:"+errcount);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}

@ -43,8 +43,8 @@ public class HrmRocketmqServlet extends HttpServlet {
try{
isMainIp = isMainIp();//计算是否为集群中的主节点
isSampleMode = isSampleMode();//判断是否为单机
bb.writeLog("isMainIp:"+isMainIp);
bb.writeLog("isSampleMode:"+isSampleMode);
bb.writeLog("isMainIp3:"+isMainIp);
bb.writeLog("isSampleMode3:"+isSampleMode);
if(isSampleMode){
initData();
@ -70,8 +70,6 @@ public class HrmRocketmqServlet extends HttpServlet {
PropBean propBean = new PropBean();
try {
// String consumerGroup = propBean.getUfPropValueStatic("consumerGroup");
// bb.writeLog("consumerGroup:" + consumerGroup);
String hrmConsumerGroup = PropBean.getUfPropValue("hrmConsumerGroup");
String hrmConsumerAddr = PropBean.getUfPropValue("hrmConsumerAddr");
String hrmInstanceName = PropBean.getUfPropValue("hrmInstanceName");
@ -80,14 +78,24 @@ public class HrmRocketmqServlet extends HttpServlet {
String hrmMQAuthPWD = PropBean.getUfPropValue("hrmMQAuthPWD");
String hrmMQClusterName = PropBean.getUfPropValue("hrmMQClusterName");
String hrmMQTenantID = PropBean.getUfPropValue("hrmMQTenantID");
bb.writeLog("hrmConsumerGroup:" + hrmConsumerGroup);
bb.writeLog("hrmConsumerAddr:" + hrmConsumerAddr);
bb.writeLog("hrmInstanceName:" + hrmInstanceName);
bb.writeLog("hrmSubExpr:" + hrmSubExpr);
bb.writeLog("hrmMQAuthID:" + hrmMQAuthID);
bb.writeLog("hrmMQAuthPWD:" + hrmMQAuthPWD);
bb.writeLog("hrmMQClusterName:" + hrmMQClusterName);
bb.writeLog("hrmMQTenantID:" + hrmMQTenantID);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(hrmConsumerGroup);
// String namesrvAddr = propBean.getUfPropValueStatic("namesrvAddr");
// bb.writeLog("namesrvAddr:" + namesrvAddr);
consumer.setNamesrvAddr(hrmConsumerAddr);
// String instanceName = propBean.getUfPropValueStatic("instanceName");
// bb.writeLog("instanceName:" + instanceName);
consumer.setInstanceName(hrmInstanceName);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// String topic = propBean.getUfPropValueStatic("topic");
// String subExpression = propBean.getUfPropValueStatic("subExpression");
// bb.writeLog("topic:" + topic);
@ -100,26 +108,59 @@ public class HrmRocketmqServlet extends HttpServlet {
// bb.writeLog("authpwd:" + authpwd);
// bb.writeLog("clustername:" + clustername);
// bb.writeLog("tenantid:" + tenantid);
consumer.subscribe(hrmInstanceName, hrmSubExpr);
consumer.setAuthID(hrmMQAuthID);
consumer.setAuthPWD(hrmMQAuthPWD);
consumer.setClusterName(hrmMQClusterName);
consumer.setTenantID(hrmMQTenantID);
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
consumer.setConsumeMessageBatchMaxSize(1);
// consumer.setConsumeThreadMin(1);
// consumer.setConsumeThreadMax(1);
// consumer.setConsumeMessageBatchMaxSize(1);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.registerMessageListener(new HrmRocketMsgListener());
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.registerMessageListener(new HrmRocketMsgListener3());
consumer.start();
bb.writeLog("Consumer88Started.");
} catch (MQClientException var13) {
bb.writeLog("Consumer resource model initiated--MQClientException:" + var13);
} catch (MQClientException e) {
e.printStackTrace();
bb.writeLog("Consumer resource model initiated--MQClientException:" + e);
}
}
// public void initData(){
// BaseBean bb = new BaseBean();
// PropBean propBean = new PropBean();
//
// try {
//
// DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("");
// consumer.setNamesrvAddr("");
// consumer.setInstanceName("");
// consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// consumer.subscribe("", "");
// consumer.setAuthID("");
// consumer.setAuthPWD("");
// consumer.setClusterName("");
// consumer.setTenantID("");
//
//// consumer.setConsumeThreadMin(1);
//// consumer.setConsumeThreadMax(1);
//// consumer.setConsumeMessageBatchMaxSize(1);
//
// consumer.setMessageModel(MessageModel.CLUSTERING);
// consumer.registerMessageListener(new HrmRocketMsgListener3());
// consumer.start();
// bb.writeLog("Consumer9999Started.");
//
// } catch (MQClientException var13) {
// bb.writeLog("Consumer resource model initiated--MQClientException:" + var13);
// }
// }
/**
* HttpServlet doGet
* @param request

@ -24,6 +24,7 @@ public class HrmRocketmqUtil {
public int updateOrgData(String data)
{
BaseBean bb = new BaseBean();
bb.writeLog("updateOrgData---data:"+data);
int errcount = 0;
lock.lock();
try{
@ -35,9 +36,11 @@ public class HrmRocketmqUtil {
String nowDateTime = sdf.format(new Date());
JSONObject jsonObject = JSONObject.parseObject(data);
bb.writeLog("updateOrgData---jsonObject:"+jsonObject.toJSONString());
if(jsonObject.containsKey("masterInfo"))
{
JSONObject masterInfoObject = jsonObject.getJSONObject("masterInfo");
bb.writeLog("updateOrgData---masterInfoObject:"+masterInfoObject.toJSONString());
if(masterInfoObject.containsKey("datas")){
JSONArray datasArray = jsonObject.getJSONArray("datas");
for(int i=0;i<datasArray.size();i++)
@ -46,21 +49,32 @@ public class HrmRocketmqUtil {
String personno = datasObject.getString("personno");
String managerno = datasObject.getString("managerno");
String status = datasObject.getString("status");
bb.writeLog("updateOrgData---personno:"+personno);
bb.writeLog("updateOrgData---managerno:"+managerno);
bb.writeLog("updateOrgData---status:"+status);
if("1".equals(status))
{
if(StringUtils.isNotEmpty(personno) && StringUtils.isNotEmpty(managerno)) {
String personid = "" ;
String managerid = "" ;
String sql = "select id from cus_fielddata where "+cus_staff+" = ? and scope='HrmCustomFieldByInfoType' and scopeid=-1 " ;
bb.writeLog("sql:"+sql);
rs.executeQuery(sql,new Object[]{personno});
if(rs.next()) {
personid = Util.null2String(rs.getString("id"));
}
bb.writeLog("personid:"+personid);
sql = "select id from cus_fielddata where "+cus_staff+" = ? and scope='HrmCustomFieldByInfoType' and scopeid=-1 " ;
bb.writeLog("sql:"+sql);
rs.executeQuery(sql,new Object[]{managerno});
if(rs.next()) {
managerid = Util.null2String(rs.getString("id"));
}
bb.writeLog("managerid:"+managerid);
if(StringUtils.isNotEmpty(personid) && StringUtils.isNotEmpty(managerid)) {
Map<String,String> dataMap = new HashMap<String,String>();
@ -125,7 +139,6 @@ public class HrmRocketmqUtil {
dataMap.put("systable","hrmresource");
dataMap.put("errmessage","入参参数缺少masterInfo");
recordErrorData(dataMap);
}
}catch (Exception e){
bb.writeLog("e:"+e);

@ -29,7 +29,7 @@ public class RocketMsgListener implements MessageListenerConcurrently {
int errcount = rocketmqUtil.updateOrgData(data);
bb.writeLog("Consumer---errcount---:"+errcount);
}
} catch (UnsupportedEncodingException e) {
} catch (Exception e) {
e.printStackTrace();
bb.writeLog("Consumer---UnsupportedEncodingException---e:"+e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;

@ -145,7 +145,7 @@ public class RocketmqServlet extends HttpServlet {
consumer.setConsumeThreadMax(1);
consumer.setConsumeMessageBatchMaxSize(1);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.registerMessageListener(new RocketMsgListener());
consumer.start();
bb.writeLog("Consumer66Started.");

@ -541,7 +541,7 @@ public class RocketmqUtil {
if(StringUtils.isNotEmpty(roleCode)) {
String queryRoleIdSql = " select id from hrmroles where ecology_pinyin_search=?";
String queryRoleIdSql = " select id from hrmroles where rolesmark=?";
rs.executeQuery(queryRoleIdSql, new Object[]{roleCode});
if (rs.next()) {
roleId = rs.getString("id");
@ -818,7 +818,7 @@ public class RocketmqUtil {
// String rolesmark = "~`~`7 "+sysRoleName+"`~`8 "+sysRoleName+"`~`~" ;
// String rolesname = "~`~`7 "+sysRoleName+"`~`8 "+sysRoleName+"`~`~" ;
String rolesmark = sysRoleName ;
String rolesmark = sysRoleCode ;
String rolesname = sysRoleName ;
bb.writeLog("rolesmark:"+rolesmark);
@ -830,7 +830,7 @@ public class RocketmqUtil {
String rolesid = "";
try{
if(StringUtils.isNotEmpty(sysRoleCode)){
String queryRoleSql = "select id from hrmroles where ecology_pinyin_search=?";
String queryRoleSql = "select id from hrmroles where rolesmark=?";
rs.executeQuery(queryRoleSql,new Object[]{sysRoleCode});
bb.writeLog(queryRoleSql);
if (rs.next()){
@ -841,7 +841,7 @@ public class RocketmqUtil {
if (StringUtils.isNotEmpty(rolesid)){
if ("M".equals(actType)) { //修改
String updateHrmrolesSql = " update hrmroles set rolesmark=?,rolesname=? where id=?";
boolean flag = rs.executeUpdate(updateHrmrolesSql, new Object[]{sysRoleName, sysRoleName, rolesid});
boolean flag = rs.executeUpdate(updateHrmrolesSql, new Object[]{sysRoleCode, sysRoleName, rolesid});
bb.writeLog("updateHrmroles : " + flag);
Map<String,String> dataMap = new HashMap<String,String>();
dataMap.put("syndate",nowDateTime);

Loading…
Cancel
Save