|
|
|
|
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
|
|
|
|
|
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
|
|
|
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
|
|
|
|
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
|
|
|
|
import com.alibaba.rocketmq.client.exception.MQClientException;
|
|
|
|
|
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
|
|
|
|
|
import com.alibaba.rocketmq.common.message.MessageExt;
|
|
|
|
|
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
|
|
|
|
|
|
|
|
|
|
import java.io.*;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
|
|
|
|
|
public class Consumer {
|
|
|
|
|
/**
|
|
|
|
|
* 内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br>
|
|
|
|
|
*/
|
|
|
|
|
public static void main(String[] args) throws InterruptedException,
|
|
|
|
|
MQClientException {
|
|
|
|
|
/**
|
|
|
|
|
* 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>
|
|
|
|
|
* 注意:ConsumerGroupName需要由应用来保证唯一。
|
|
|
|
|
*不同consumer group里的consumer即便是消费同一个topic下的同一个queue,
|
|
|
|
|
*那消费进度也是分开存储的。也就是说,不同的consumer group内的consumer的消费
|
|
|
|
|
*完全隔离,彼此不受影响。
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// mq.datasync.topic=dataSync_topic_nj
|
|
|
|
|
// ctgmq.producer.common.producerGroupName=portal-producer-group_nj
|
|
|
|
|
// ctgmq.producer.common.instanceName=dataSync_topic_nj
|
|
|
|
|
// ctgmq.producer.common.namesrvAddr=172.16.84.183:9001;172.16.84.187:9001
|
|
|
|
|
// ctgmq.producer.common.authId=rul
|
|
|
|
|
// ctgmq.producer.common.authPwd=rul
|
|
|
|
|
// ctgmq.producer.common.clusterName=CtgMQ_01
|
|
|
|
|
// ctgmq.producer.common.tenantID=100000
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
|
|
|
|
|
"cbec-consumer-group_nj_133");
|
|
|
|
|
consumer.setNamesrvAddr("172.16.84.183:9001;172.16.84.187:9001");
|
|
|
|
|
//consumer.setInstanceName("dataSync_topic_nj");
|
|
|
|
|
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//广播消费是指一个consumer只要订阅了某个topic的消息,那它就会收到该topic下的所有queue里的消息,
|
|
|
|
|
//而不管这个consumer的group是什么。所以对于广播消费来说,consumer group没什么实际意义。consumer可以在实例化时,我们可以指定是集群消费还是广播消费。
|
|
|
|
|
//consumer.setMessageModel(MessageModel.BROADCASTING);
|
|
|
|
|
|
|
|
|
|
consumer.setMessageModel(MessageModel.BROADCASTING);
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 订阅指定topic下tags分别等于TagA或TagC或TagD
|
|
|
|
|
*/
|
|
|
|
|
//consumer.subscribe("dataSync_topic_nj", "TagA || TagC || TagD");
|
|
|
|
|
/**
|
|
|
|
|
* 订阅指定topic下所有消息<br>
|
|
|
|
|
* 注意:一个consumer对象可以订阅多个topic
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
consumer.setConsumeMessageBatchMaxSize(1);
|
|
|
|
|
//关闭VIP通道,避免接收不了消息
|
|
|
|
|
|
|
|
|
|
consumer.subscribe("dataSync_topic_pto", "BPM");
|
|
|
|
|
|
|
|
|
|
int i = 0;
|
|
|
|
|
|
|
|
|
|
String path = "C:\\Users\\liuliang\\Desktop\\Demo3-liuliang\\demo21.txt";
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
consumer.registerMessageListener(new MessageListenerConcurrently() {
|
|
|
|
|
/**
|
|
|
|
|
* 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
|
|
|
|
|
// System.out.println(Thread.currentThread().getName()
|
|
|
|
|
// + " Receive New Messages: " + msgs.size());
|
|
|
|
|
MessageExt msg = msgs.get(0);
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
// System.out.println("11----"+new String(msg.getBody(), StandardCharsets.UTF_8));
|
|
|
|
|
// System.out.println("11----"+new String(msg.toString()));
|
|
|
|
|
// System.out.println("1----"+new String(msg.getBody(),"UTF-8"));
|
|
|
|
|
// System.out.println("2----"+new String(msg.getBody(),"GBK"));
|
|
|
|
|
System.out.println("3----"+new String(msg.getBody()));
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
String msgdata = new String(msg.getBody(),"UTF-8");
|
|
|
|
|
|
|
|
|
|
// System.out.println("5----"+msgdata.substring(msgdata.indexOf("{")));
|
|
|
|
|
|
|
|
|
|
if(!"".equals(msgdata)){
|
|
|
|
|
String data = msgdata.substring(msgdata.indexOf("{"));
|
|
|
|
|
//SymOrgUserData.updasteSysOrgData(data);
|
|
|
|
|
System.out.println(data);
|
|
|
|
|
System.out.println(context.getMessageQueue().toString());
|
|
|
|
|
|
|
|
|
|
BufferedWriter out = null;
|
|
|
|
|
try {
|
|
|
|
|
out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(path,true)));
|
|
|
|
|
out.write(data+"\r\n");
|
|
|
|
|
out.close();
|
|
|
|
|
} catch (FileNotFoundException e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
System.out.println(e);
|
|
|
|
|
} catch (IOException e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
System.out.println(e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} catch (UnsupportedEncodingException e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
System.out.println(e);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
|
|
|
|
|
*/
|
|
|
|
|
consumer.start();
|
|
|
|
|
System.out.println("Consumer Started.");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|