import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import java.io.*;
import java.util.List;
public class Consumer {
/**
* 内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法
*/
public static void main(String[] args) throws InterruptedException,
MQClientException {
/**
* 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例
* 注意:ConsumerGroupName需要由应用来保证唯一。
*不同consumer group里的consumer即便是消费同一个topic下的同一个queue,
*那消费进度也是分开存储的。也就是说,不同的consumer group内的consumer的消费
*完全隔离,彼此不受影响。
*/
// mq.datasync.topic=dataSync_topic_nj
// ctgmq.producer.common.producerGroupName=portal-producer-group_nj
// ctgmq.producer.common.instanceName=dataSync_topic_nj
// ctgmq.producer.common.namesrvAddr=172.16.84.183:9001;172.16.84.187:9001
// ctgmq.producer.common.authId=rul
// ctgmq.producer.common.authPwd=rul
// ctgmq.producer.common.clusterName=CtgMQ_01
// ctgmq.producer.common.tenantID=100000
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"cbec-consumer-group_nj_133");
consumer.setNamesrvAddr("172.16.84.183:9001;172.16.84.187:9001");
//consumer.setInstanceName("dataSync_topic_nj");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//广播消费是指一个consumer只要订阅了某个topic的消息,那它就会收到该topic下的所有queue里的消息,
//而不管这个consumer的group是什么。所以对于广播消费来说,consumer group没什么实际意义。consumer可以在实例化时,我们可以指定是集群消费还是广播消费。
//consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setMessageModel(MessageModel.BROADCASTING);
/**
* 订阅指定topic下tags分别等于TagA或TagC或TagD
*/
//consumer.subscribe("dataSync_topic_nj", "TagA || TagC || TagD");
/**
* 订阅指定topic下所有消息
* 注意:一个consumer对象可以订阅多个topic
*/
consumer.setConsumeMessageBatchMaxSize(1);
//关闭VIP通道,避免接收不了消息
consumer.subscribe("dataSync_topic_pto", "BPM");
int i = 0;
String path = "C:\\Users\\liuliang\\Desktop\\Demo3-liuliang\\demo21.txt";
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
// System.out.println(Thread.currentThread().getName()
// + " Receive New Messages: " + msgs.size());
MessageExt msg = msgs.get(0);
try {
// System.out.println("11----"+new String(msg.getBody(), StandardCharsets.UTF_8));
// System.out.println("11----"+new String(msg.toString()));
// System.out.println("1----"+new String(msg.getBody(),"UTF-8"));
// System.out.println("2----"+new String(msg.getBody(),"GBK"));
System.out.println("3----"+new String(msg.getBody()));
String msgdata = new String(msg.getBody(),"UTF-8");
// System.out.println("5----"+msgdata.substring(msgdata.indexOf("{")));
if(!"".equals(msgdata)){
String data = msgdata.substring(msgdata.indexOf("{"));
//SymOrgUserData.updasteSysOrgData(data);
System.out.println(data);
System.out.println(context.getMessageQueue().toString());
BufferedWriter out = null;
try {
out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(path,true)));
out.write(data+"\r\n");
out.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
System.out.println(e);
} catch (IOException e) {
e.printStackTrace();
System.out.println(e);
}
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
System.out.println(e);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/**
* Consumer对象在使用之前必须要调用start初始化,初始化一次即可
*/
consumer.start();
System.out.println("Consumer Started.");
}
}