import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; import java.io.*; import java.util.List; public class Consumer { /** * 内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法
*/ public static void main(String[] args) throws InterruptedException, MQClientException { /** * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例
* 注意:ConsumerGroupName需要由应用来保证唯一。 *不同consumer group里的consumer即便是消费同一个topic下的同一个queue, *那消费进度也是分开存储的。也就是说,不同的consumer group内的consumer的消费 *完全隔离,彼此不受影响。 */ // mq.datasync.topic=dataSync_topic_nj // ctgmq.producer.common.producerGroupName=portal-producer-group_nj // ctgmq.producer.common.instanceName=dataSync_topic_nj // ctgmq.producer.common.namesrvAddr=172.16.84.183:9001;172.16.84.187:9001 // ctgmq.producer.common.authId=rul // ctgmq.producer.common.authPwd=rul // ctgmq.producer.common.clusterName=CtgMQ_01 // ctgmq.producer.common.tenantID=100000 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "cbec-consumer-group_nj_133"); consumer.setNamesrvAddr("172.16.84.183:9001;172.16.84.187:9001"); //consumer.setInstanceName("dataSync_topic_nj"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //广播消费是指一个consumer只要订阅了某个topic的消息,那它就会收到该topic下的所有queue里的消息, //而不管这个consumer的group是什么。所以对于广播消费来说,consumer group没什么实际意义。consumer可以在实例化时,我们可以指定是集群消费还是广播消费。 //consumer.setMessageModel(MessageModel.BROADCASTING); consumer.setMessageModel(MessageModel.BROADCASTING); /** * 订阅指定topic下tags分别等于TagA或TagC或TagD */ //consumer.subscribe("dataSync_topic_nj", "TagA || TagC || TagD"); /** * 订阅指定topic下所有消息
* 注意:一个consumer对象可以订阅多个topic */ consumer.setConsumeMessageBatchMaxSize(1); //关闭VIP通道,避免接收不了消息 consumer.subscribe("dataSync_topic_pto", "BPM"); int i = 0; String path = "C:\\Users\\liuliang\\Desktop\\Demo3-liuliang\\demo21.txt"; consumer.registerMessageListener(new MessageListenerConcurrently() { /** * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息 */ @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { // System.out.println(Thread.currentThread().getName() // + " Receive New Messages: " + msgs.size()); MessageExt msg = msgs.get(0); try { // System.out.println("11----"+new String(msg.getBody(), StandardCharsets.UTF_8)); // System.out.println("11----"+new String(msg.toString())); // System.out.println("1----"+new String(msg.getBody(),"UTF-8")); // System.out.println("2----"+new String(msg.getBody(),"GBK")); System.out.println("3----"+new String(msg.getBody())); String msgdata = new String(msg.getBody(),"UTF-8"); // System.out.println("5----"+msgdata.substring(msgdata.indexOf("{"))); if(!"".equals(msgdata)){ String data = msgdata.substring(msgdata.indexOf("{")); //SymOrgUserData.updasteSysOrgData(data); System.out.println(data); System.out.println(context.getMessageQueue().toString()); BufferedWriter out = null; try { out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(path,true))); out.write(data+"\r\n"); out.close(); } catch (FileNotFoundException e) { e.printStackTrace(); System.out.println(e); } catch (IOException e) { e.printStackTrace(); System.out.println(e); } } } catch (UnsupportedEncodingException e) { e.printStackTrace(); System.out.println(e); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer对象在使用之前必须要调用start初始化,初始化一次即可
*/ consumer.start(); System.out.println("Consumer Started."); } }