You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
haojing/test/Consumer.java

132 lines
5.6 KiB
Java

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
 * Stand-alone RocketMQ push consumer used as a manual test harness.
 *
 * <p>It subscribes to {@code dataSync_topic_pto} with tag expression {@code BPM} in
 * broadcasting mode, prints every received message to standard output and appends the
 * part of the payload starting at the first <code>{</code> to a local text file.
 *
 * <p>Per the original author's notes: the push consumer internally long-polls the broker
 * and then invokes the registered listener.
 */
public class Consumer {

    /**
     * Consumer group name. Per the original author's notes, the application must keep it
     * unique; consumers in different groups store their consumption progress separately,
     * even when reading the same queue of the same topic.
     */
    private static final String CONSUMER_GROUP = "cbec-consumer-group_nj_133";

    /** Semicolon-separated name server addresses. */
    private static final String NAMESRV_ADDR = "172.16.84.183:9001;172.16.84.187:9001";

    /** Topic to subscribe to. */
    private static final String TOPIC = "dataSync_topic_pto";

    /** Tag filter expression passed to {@code subscribe}. */
    private static final String TAG_EXPRESSION = "BPM";

    /** File that receives one line per consumed message (opened in append mode). */
    private static final String OUTPUT_PATH =
            "C:\\Users\\liuliang\\Desktop\\Demo3-liuliang\\demo21.txt";

    /**
     * Configures the consumer, registers the message listener and starts consuming.
     *
     * @param args ignored
     * @throws InterruptedException declared for compatibility with existing callers
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws InterruptedException,
            MQClientException {
        // Producer-side settings kept for reference (from the original file):
        // mq.datasync.topic=dataSync_topic_nj
        // ctgmq.producer.common.producerGroupName=portal-producer-group_nj
        // ctgmq.producer.common.instanceName=dataSync_topic_nj
        // ctgmq.producer.common.namesrvAddr=172.16.84.183:9001;172.16.84.187:9001
        // ctgmq.producer.common.authId=rul
        // ctgmq.producer.common.authPwd=rul
        // ctgmq.producer.common.clusterName=CtgMQ_01
        // ctgmq.producer.common.tenantID=100000

        // One consumer object per application; it can be kept as a global or a singleton.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr(NAMESRV_ADDR);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // Broadcasting: per the original author's notes, every consumer subscribed to the
        // topic receives the messages of all its queues regardless of its consumer group,
        // so the group has little practical meaning in this mode.
        consumer.setMessageModel(MessageModel.BROADCASTING);

        // Deliver one message per listener invocation. The listener below nevertheless
        // iterates over the whole list, so raising this value does not drop messages.
        consumer.setConsumeMessageBatchMaxSize(1);

        // NOTE(review): the original comment at this point mentioned disabling the VIP
        // channel so that messages can be received, but no such call is made here.

        // A tag expression such as "TagA || TagC || TagD" selects several tags; a single
        // consumer object may subscribe to more than one topic.
        consumer.subscribe(TOPIC, TAG_EXPRESSION);

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    handleMessage(msg, context);
                }
                // Always acknowledged, even when a message was skipped or the file write
                // failed; this matches the original behaviour.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // start() must be called once before the consumer is used.
        consumer.start();
        System.out.println("Consumer Started.");
    }

    /**
     * Prints one message and appends its JSON part to {@link #OUTPUT_PATH}.
     *
     * <p>Messages with an empty (or {@code null}) body, and messages that contain no
     * <code>{</code>, are not written to the file.
     */
    private static void handleMessage(MessageExt msg, ConsumeConcurrentlyContext context) {
        byte[] body = msg.getBody();
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        String text = body == null ? "" : new String(body, StandardCharsets.UTF_8);
        System.out.println("3----" + text);
        if (text.isEmpty()) {
            return;
        }

        String json = extractJson(text);
        if (json == null) {
            System.out.println("Skipping message: no '{' found in body");
            return;
        }

        System.out.println(json);
        System.out.println(context.getMessageQueue().toString());
        appendLine(OUTPUT_PATH, json);
    }

    /**
     * Returns the substring of {@code text} starting at its first <code>{</code>,
     * or {@code null} when the text contains none.
     */
    private static String extractJson(String text) {
        int start = text.indexOf('{');
        return start < 0 ? null : text.substring(start);
    }

    /**
     * Appends {@code line} followed by CRLF to the file at {@code path}, encoded as UTF-8.
     *
     * <p>The writer is closed even when the write fails. I/O errors are reported on the
     * console and otherwise ignored so that consumption continues.
     *
     * <p>NOTE(review): the listener type suggests this may be called from several threads;
     * no synchronization is applied here, as in the original - confirm whether interleaved
     * appends are a concern.
     */
    private static void appendLine(String path, String line) {
        try (BufferedWriter out = new BufferedWriter(new OutputStreamWriter(
                new FileOutputStream(path, true), StandardCharsets.UTF_8))) {
            out.write(line + "\r\n");
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println(e);
        }
    }
}