To use Luxun, you need JDK 1.6 or above installed. Luxun supports the Windows and Linux operating systems.
Step 1 : Download Luxun package
Download a recent stable release by following the instructions on the Luxun GitHub site, then extract the zip file to a local folder. The current stable release (at the time of writing) is 0.6.0.
The Luxun distribution provides scripts (in the bin folder) for both Windows and Linux. The demo below was run on Windows; if you are using Linux, use the corresponding Linux scripts instead.
Step 2 : Start the server
D:\test\luxun-0.6.0>bin\server.bat conf\server.properties
2013-04-03 16:36:18.856 INFO [LuxunServer] Starting luxun server 0.6
2013-04-03 16:36:18.872 INFO [LogManager] starting log cleaner every 600000 ms.
2013-04-03 16:36:18.888 INFO [ThriftServer] Wating server to start, time waited : 0 s.
2013-04-03 16:36:19.902 INFO [ThriftServer] Thrift server started on port : 9092
2013-04-03 16:36:19.902 INFO [LuxunServer] Server started.
Step 3 : Send some messages
Luxun comes with a command line client that will take input from standard input and send it out as messages to the Luxun server. Each line will be sent as a separate message. The topic demo is created automatically when messages are sent to it. You should see something like this:
D:\test\luxun-0.6.0>bin\producer-console.bat --broker-list 0:localhost:9092 --topic demo
Enter your message and exit with empty string.
> Welcome to Luxun
Message sent : Welcome to Luxun
> 鲁迅是一个伟大的中国作家
Message sent : 鲁迅是一个伟大的中国作家
>
Step 4 : Start a consumer
Luxun also has a command line consumer that will dump out messages to standard output.
If you have each of the above commands running in a different terminal then you should now be able to type messages into the producer terminal and see them appear in the consumer terminal.
Both of these command line tools have additional options. Running the command with no arguments will display usage information documenting them in more detail.
simple-consumer-console is built on Luxun's simple consumer. There is also a similar consumer-console, which is built on Luxun's advanced consumer. Both can be used for demos and testing.
Step 5 : Write some code
Below are some very simple examples of using Luxun to produce and consume messages. The consuming part uses SimpleConsumer, and every test case demonstrates both consuming by index and consuming by fanout id.
Setup & Cleanup
The demos are written as unit test cases. Before every test case, we set up two brokers and two simple consumers; after every test case, we clean up:
private int brokerId1 = 0;
private int brokerId2 = 1;
private int port1 = 9092;
private int port2 = 9093;
private LuxunServer server1 = null;
private LuxunServer server2 = null;
private String brokerList = brokerId1 + ":localhost:" + port1 + "," + brokerId2 + ":localhost:" + port2;
private String broker1 = brokerId1 + ":localhost:" + port1;
private SimpleConsumer simpleConsumer1 = null;
private SimpleConsumer simpleConsumer2 = null;

@Before
public void setup() {
    // set up 2 brokers
    Properties props1 = new Properties();
    props1.put("brokerid", String.valueOf(brokerId1));
    props1.put("port", String.valueOf(port1));
    props1.put("log.dir", TestUtils.createTempDir().getAbsolutePath());
    ServerConfig config1 = new ServerConfig(props1);
    server1 = new LuxunServer(config1);
    server1.startup();

    Properties props2 = new Properties();
    props2.put("brokerid", String.valueOf(brokerId2));
    props2.put("port", String.valueOf(port2));
    props2.put("log.dir", TestUtils.createTempDir().getAbsolutePath());
    ServerConfig config2 = new ServerConfig(props2);
    server2 = new LuxunServer(config2);
    server2.startup();

    // set up two simple consumers
    // create a consumer 1 to connect to the Luxun server running on localhost, port 9092, socket timeout of 60 secs
    simpleConsumer1 = new SimpleConsumer("localhost", port1, 60000);
    // create a consumer 2 to connect to the Luxun server running on localhost, port 9093, socket timeout of 60 secs
    simpleConsumer2 = new SimpleConsumer("localhost", port2, 60000);
}

@After
public void cleanup() throws Exception {
    server1.close();
    server2.close();
    simpleConsumer1.close();
    simpleConsumer2.close();
    Utils.deleteDirectory(new File(server1.config.getLogDir()));
    Utils.deleteDirectory(new File(server2.config.getLogDir()));
    Thread.sleep(500);
}
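The broker list strings built in the setup above appear to follow the pattern brokerId:host:port, with entries separated by commas. The sketch below shows how such a string could be taken apart. BrokerListParser and its Broker class are hypothetical names used only for illustration; they are not part of Luxun, and Luxun's own parsing may behave differently.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Luxun class): parses "id:host:port,id:host:port". */
final class BrokerListParser {

    /** One entry of the broker list. */
    static final class Broker {
        final int id;
        final String host;
        final int port;

        Broker(int id, String host, int port) {
            this.id = id;
            this.host = host;
            this.port = port;
        }
    }

    /** Splits the list on commas, then each entry on colons. */
    static List<Broker> parse(String brokerList) {
        List<Broker> brokers = new ArrayList<Broker>();
        for (String entry : brokerList.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length != 3) {
                throw new IllegalArgumentException("Expected id:host:port but got: " + entry);
            }
            brokers.add(new Broker(Integer.parseInt(parts[0]), parts[1], Integer.parseInt(parts[2])));
        }
        return brokers;
    }

    public static void main(String[] args) {
        // Same shape as the brokerList field in the test setup.
        for (Broker b : parse("0:localhost:9092,1:localhost:9093")) {
            System.out.println("broker " + b.id + " at " + b.host + ":" + b.port);
        }
    }
}
```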
@Test
public void sendSingleMessage() throws Exception {
    Properties props = new Properties();
    props.put("serializer.class", StringEncoder.class.getName());
    props.put("broker.list", broker1);

    ProducerConfig config = new ProducerConfig(props);
    Producer<String, String> producer = new Producer<String, String>(config);

    ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message");
    producer.send(data);
    producer.close(); // finish with the producer

    // consume by index
    List<MessageList> listOfMessageList = simpleConsumer1.consume("test-topic", 0, 10000);
    assertTrue(listOfMessageList.size() == 1);
    MessageList messageList = listOfMessageList.get(0);
    assertTrue(messageList.size() == 1);
    Message message = messageList.get(0);
    assertEquals("test-message", new String(message.getBytes()));

    // consume by fanout id
    String fanoutId = "demo";
    listOfMessageList = simpleConsumer1.consume("test-topic", fanoutId, 10000);
    assertTrue(listOfMessageList.size() == 1);
    messageList = listOfMessageList.get(0);
    assertTrue(messageList.size() == 1);
    message = messageList.get(0);
    assertEquals("test-message", new String(message.getBytes()));
}
@Test
public void sendMultipleMessages() throws Exception {
    Properties props = new Properties();
    props.put("serializer.class", StringEncoder.class.getName());
    props.put("broker.list", broker1);

    ProducerConfig config = new ProducerConfig(props);
    Producer<String, String> producer = new Producer<String, String>(config);

    List<String> messages = new ArrayList<String>();
    messages.add("test-message1");
    messages.add("test-message2");
    messages.add("test-message3");
    ProducerData<String, String> data = new ProducerData<String, String>("test-topic", messages);
    producer.send(data);
    producer.close(); // finish with the producer

    // consume by index
    List<MessageList> listOfMessageList = simpleConsumer1.consume("test-topic", 0, 10000);
    assertTrue(listOfMessageList.size() == 1);
    MessageList messageList = listOfMessageList.get(0);
    assertTrue(messageList.size() == 3);
    for (int i = 1; i <= 3; i++) {
        Message message = messageList.get(i - 1);
        assertEquals("test-message" + i, new String(message.getBytes()));
    }

    // consume by fanout id
    String fanoutId = "demo";
    listOfMessageList = simpleConsumer1.consume("test-topic", fanoutId, 10000);
    assertTrue(listOfMessageList.size() == 1);
    messageList = listOfMessageList.get(0);
    assertTrue(messageList.size() == 3);
    for (int i = 1; i <= 3; i++) {
        Message message = messageList.get(i - 1);
        assertEquals("test-message" + i, new String(message.getBytes()));
    }
}
@Test
public void sendMessagesToDifferentTopics() throws Exception {
    Properties props = new Properties();
    props.put("serializer.class", StringEncoder.class.getName());
    props.put("broker.list", broker1);

    ProducerConfig config = new ProducerConfig(props);
    Producer<String, String> producer = new Producer<String, String>(config);

    ProducerData<String, String> data1 = new ProducerData<String, String>("test-topic1", "test-message1");
    producer.send(data1);
    ProducerData<String, String> data2 = new ProducerData<String, String>("test-topic2", "test-message2");
    producer.send(data2);
    producer.close(); // finish with the producer

    // consume by index
    List<MessageList> listOfMessageList = simpleConsumer1.consume("test-topic1", 0, 10000);
    assertTrue(listOfMessageList.size() == 1);
    MessageList messageList = listOfMessageList.get(0);
    assertTrue(messageList.size() == 1);
    Message message = messageList.get(0);
    assertEquals("test-message1", new String(message.getBytes()));

    listOfMessageList = simpleConsumer1.consume("test-topic2", 0, 10000);
    assertTrue(listOfMessageList.size() == 1);
    messageList = listOfMessageList.get(0);
    assertTrue(messageList.size() == 1);
    message = messageList.get(0);
    assertEquals("test-message2", new String(message.getBytes()));

    // consume by fanoutId
    String fanoutId = "demo";
    listOfMessageList = simpleConsumer1.consume("test-topic1", fanoutId, 10000);
    assertTrue(listOfMessageList.size() == 1);
    messageList = listOfMessageList.get(0);
    assertTrue(messageList.size() == 1);
    message = messageList.get(0);
    assertEquals("test-message1", new String(message.getBytes()));

    listOfMessageList = simpleConsumer1.consume("test-topic2", fanoutId, 10000);
    assertTrue(listOfMessageList.size() == 1);
    messageList = listOfMessageList.get(0);
    assertTrue(messageList.size() == 1);
    message = messageList.get(0);
    assertEquals("test-message2", new String(message.getBytes()));
}
@Test
public void sendMessageWithGZIPCompression() throws Exception {
    Properties props = new Properties();
    props.put("serializer.class", StringEncoder.class.getName());
    props.put("broker.list", broker1);
    props.put("compression.codec", "1");

    ProducerConfig config = new ProducerConfig(props);
    Producer<String, String> producer = new Producer<String, String>(config);

    ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message");
    producer.send(data);
    producer.close(); // finish with the producer

    // consume by index
    List<MessageList> listOfMessageList = simpleConsumer1.consume("test-topic", 0, 10000);
    assertTrue(listOfMessageList.size() == 1);
    MessageList messageList = listOfMessageList.get(0);
    assertEquals(CompressionCodec.GZIP, messageList.getCompressionCodec());
    assertTrue(messageList.size() == 1);
    Message message = messageList.get(0);
    assertEquals("test-message", new String(message.getBytes()));

    // consume by fanout id
    String fanoutId = "demo";
    listOfMessageList = simpleConsumer1.consume("test-topic", fanoutId, 10000);
    assertTrue(listOfMessageList.size() == 1);
    messageList = listOfMessageList.get(0);
    assertEquals(CompressionCodec.GZIP, messageList.getCompressionCodec());
    assertTrue(messageList.size() == 1);
    message = messageList.get(0);
    assertEquals("test-message", new String(message.getBytes()));
}
@Test
public void sendMessageWithAsyncProducer() throws Exception {
    Properties props = new Properties();
    props.put("serializer.class", StringEncoder.class.getName());
    props.put("producer.type", "async");
    props.put("broker.list", broker1);

    ProducerConfig config = new ProducerConfig(props);
    Producer<String, String> producer = new Producer<String, String>(config);

    ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message");
    producer.send(data);
    producer.close(); // finish with the producer

    // consume by index
    List<MessageList> listOfMessageList = simpleConsumer1.consume("test-topic", 0, 10000);
    assertTrue(listOfMessageList.size() == 1);
    MessageList messageList = listOfMessageList.get(0);
    assertTrue(messageList.size() == 1);
    Message message = messageList.get(0);
    assertEquals("test-message", new String(message.getBytes()));

    // consume by fanout id
    String fanoutId = "demo";
    listOfMessageList = simpleConsumer1.consume("test-topic", fanoutId, 10000);
    assertTrue(listOfMessageList.size() == 1);
    messageList = listOfMessageList.get(0);
    assertTrue(messageList.size() == 1);
    message = messageList.get(0);
    assertEquals("test-message", new String(message.getBytes()));
}
@Test
public void sendMessagesWithCustomPartitioner() throws Exception {
    Properties props = new Properties();
    props.put("serializer.class", StringEncoder.class.getName());
    props.put("broker.list", brokerList);
    props.put("partitioner.class", CustomPartitioner.class.getName());

    ProducerConfig config = new ProducerConfig(props);
    Producer<String, String> producer = new Producer<String, String>(config);

    // will be sent to broker 1 since (the length of key % num of brokers) = 0
    ProducerData<String, String> data1 = new ProducerData<String, String>("test-topic1", "key1", "test-message1");
    producer.send(data1);
    // will be sent to broker 2 since (the length of key % num of brokers) = 1
    ProducerData<String, String> data2 = new ProducerData<String, String>("test-topic2", "key11", "test-message2");
    producer.send(data2);
    producer.close(); // finish with the producer

    // consume by index
    List<MessageList> listOfMessageList = simpleConsumer1.consume("test-topic1", 0, 10000);
    assertTrue(listOfMessageList.size() == 1);
    MessageList messageList = listOfMessageList.get(0);
    assertTrue(messageList.size() == 1);
    Message message = messageList.get(0);
    assertEquals("test-message1", new String(message.getBytes()));

    listOfMessageList = simpleConsumer2.consume("test-topic2", 0, 10000);
    assertTrue(listOfMessageList.size() == 1);
    messageList = listOfMessageList.get(0);
    assertTrue(messageList.size() == 1);
    message = messageList.get(0);
    assertEquals("test-message2", new String(message.getBytes()));

    // consume by fanoutId
    String fanoutId = "demo";
    listOfMessageList = simpleConsumer1.consume("test-topic1", fanoutId, 10000);
    assertTrue(listOfMessageList.size() == 1);
    messageList = listOfMessageList.get(0);
    assertTrue(messageList.size() == 1);
    message = messageList.get(0);
    assertEquals("test-message1", new String(message.getBytes()));

    listOfMessageList = simpleConsumer2.consume("test-topic2", fanoutId, 10000);
    assertTrue(listOfMessageList.size() == 1);
    messageList = listOfMessageList.get(0);
    assertTrue(messageList.size() == 1);
    message = messageList.get(0);
    assertEquals("test-message2", new String(message.getBytes()));
}
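The test above refers to a CustomPartitioner whose source is not shown. Going by the comments in the test, it picks a broker from the key length modulo the number of brokers. The following is only a sketch of that rule: the KeyPartitioner interface and the KeyLengthPartitioner class are hypothetical, and the interface Luxun actually expects a partitioner to implement may look different.

```java
/** Hypothetical interface for illustration; Luxun's real partitioner contract may differ. */
interface KeyPartitioner<K> {
    /** Returns the zero-based position of the target broker in the broker list. */
    int partition(K key, int numBrokers);
}

/** Routes a message by (length of key % number of brokers), as the test comments describe. */
final class KeyLengthPartitioner implements KeyPartitioner<String> {

    @Override
    public int partition(String key, int numBrokers) {
        return key.length() % numBrokers;
    }

    public static void main(String[] args) {
        KeyLengthPartitioner partitioner = new KeyLengthPartitioner();
        System.out.println(partitioner.partition("key1", 2));  // 4 % 2, prints 0
        System.out.println(partitioner.partition("key11", 2)); // 5 % 2, prints 1
    }
}
```

With the two brokers from the test setup, position 0 corresponds to what the test comments call broker 1 (port 9092) and position 1 to broker 2 (port 9093).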
@Test
public void sendMessageWithCustomEncoder() throws Exception {
    Properties props = new Properties();
    props.put("serializer.class", LogEventEncoder.class.getName());
    props.put("broker.list", broker1);

    ProducerConfig config = new ProducerConfig(props);
    Producer<String, LogEvent> producer = new Producer<String, LogEvent>(config);

    LogEvent logEvent = new LogEvent();
    logEvent.createdTime = System.currentTimeMillis();
    logEvent.hostId = "127.0.0.1";
    logEvent.logLevel = LogLevel.INFO;
    logEvent.message = "a test log message";
    ProducerData<String, LogEvent> data = new ProducerData<String, LogEvent>("log-topic", logEvent);
    producer.send(data);
    producer.close(); // finish with the producer

    // consume by index
    LogEventDecoder decoder = new LogEventDecoder();
    List<MessageList> listOfMessageList = simpleConsumer1.consume("log-topic", 0, 10000);
    assertTrue(listOfMessageList.size() == 1);
    MessageList messageList = listOfMessageList.get(0);
    assertTrue(messageList.size() == 1);
    Message message = messageList.get(0);
    assertEquals(logEvent, decoder.toEvent(message));

    // consume by fanout id
    String fanoutId = "demo";
    listOfMessageList = simpleConsumer1.consume("log-topic", fanoutId, 10000);
    assertTrue(listOfMessageList.size() == 1);
    messageList = listOfMessageList.get(0);
    assertTrue(messageList.size() == 1);
    message = messageList.get(0);
    assertEquals(logEvent, decoder.toEvent(message));
}
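The LogEventEncoder and LogEventDecoder used above are not shown, so their wire format is unknown. As a rough idea of what a custom encoder has to do, the sketch below turns an event with the same four fields into bytes and back using plain java.io streams. Everything here (LogEventCodecSketch, its nested LogEvent and LogLevel, the field order on the wire) is an assumption made for illustration, not Luxun's actual implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical codec sketch; the real LogEventEncoder may use a different format. */
final class LogEventCodecSketch {

    enum LogLevel { DEBUG, INFO, WARN, ERROR }

    /** Stand-in for the LogEvent used in the test, with the same four fields. */
    static final class LogEvent {
        long createdTime;
        String hostId;
        LogLevel logLevel;
        String message;
    }

    /** Serializes the fields in a fixed order; writeUTF limits each string to 65535 encoded bytes. */
    static byte[] encode(LogEvent event) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeLong(event.createdTime);
            out.writeUTF(event.hostId);
            out.writeUTF(event.logLevel.name());
            out.writeUTF(event.message);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException("In-memory write failed", e);
        }
    }

    /** Reads the fields back in the same order they were written. */
    static LogEvent decode(byte[] payload) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload));
            LogEvent event = new LogEvent();
            event.createdTime = in.readLong();
            event.hostId = in.readUTF();
            event.logLevel = LogLevel.valueOf(in.readUTF());
            event.message = in.readUTF();
            return event;
        } catch (IOException e) {
            throw new IllegalArgumentException("Malformed payload", e);
        }
    }
}
```

The producer side would call something like encode before sending, and the consumer side would call decode on the received bytes, which is the role decoder.toEvent(message) plays in the test.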
@Test
public void consumeMessageWithDifferentFanoutId() throws Exception {
    Properties props = new Properties();
    props.put("serializer.class", StringEncoder.class.getName());
    props.put("broker.list", broker1);

    ProducerConfig config = new ProducerConfig(props);
    Producer<String, String> producer = new Producer<String, String>(config);

    for (int i = 0; i < 100; i++) {
        ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message" + i);
        producer.send(data);
    }
    producer.close(); // finish with the producer

    // consume by different fanout id independently
    String fanoutId = "group-a";
    List<MessageList> listOfMessageList = simpleConsumer1.consume("test-topic", fanoutId, 10000);
    assertTrue(listOfMessageList.size() == 100);
    for (int i = 0; i < 100; i++) {
        MessageList messageList = listOfMessageList.get(i);
        assertTrue(messageList.size() == 1);
        Message message = messageList.get(0);
        assertEquals("test-message" + i, new String(message.getBytes()));
    }

    fanoutId = "group-b";
    listOfMessageList = simpleConsumer1.consume("test-topic", fanoutId, 10000);
    assertTrue(listOfMessageList.size() == 100);
    for (int i = 0; i < 100; i++) {
        MessageList messageList = listOfMessageList.get(i);
        assertTrue(messageList.size() == 1);
        Message message = messageList.get(0);
        assertEquals("test-message" + i, new String(message.getBytes()));
    }

    fanoutId = "group-c";
    listOfMessageList = simpleConsumer1.consume("test-topic", fanoutId, 10000);
    assertTrue(listOfMessageList.size() == 100);
    for (int i = 0; i < 100; i++) {
        MessageList messageList = listOfMessageList.get(i);
        assertTrue(messageList.size() == 1);
        Message message = messageList.get(0);
        assertEquals("test-message" + i, new String(message.getBytes()));
    }
}
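The tests above read the same topic in two ways: by index, where the caller says where to start, and by fanout id, where each id sees the full stream independently. One way to picture the difference is a toy in-memory log that keeps a separate read position per fanout id. FanoutLogSketch is a hypothetical model written for this explanation; it is not how Luxun stores messages, and it counts messages where the real consume call takes a size argument whose exact meaning is not documented here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory topic log; a hypothetical model, not Luxun's implementation. */
final class FanoutLogSketch {
    private final List<String> log = new ArrayList<String>();
    // One read position per fanout id, so ids never affect each other.
    private final Map<String, Integer> cursors = new HashMap<String, Integer>();

    void append(String message) {
        log.add(message);
    }

    /** Consume by index: the caller supplies the start position, the log keeps no state. */
    List<String> consumeFromIndex(int index, int maxMessages) {
        int end = Math.min(log.size(), index + maxMessages);
        int start = Math.min(index, end);
        return new ArrayList<String>(log.subList(start, end));
    }

    /** Consume by fanout id: the log remembers where this id stopped last time. */
    List<String> consumeByFanoutId(String fanoutId, int maxMessages) {
        Integer saved = cursors.get(fanoutId);
        int from = (saved == null) ? 0 : saved.intValue();
        List<String> batch = consumeFromIndex(from, maxMessages);
        cursors.put(fanoutId, from + batch.size());
        return batch;
    }

    public static void main(String[] args) {
        FanoutLogSketch topic = new FanoutLogSketch();
        for (int i = 0; i < 5; i++) {
            topic.append("test-message" + i);
        }
        System.out.println(topic.consumeByFanoutId("group-a", 3)); // first three messages
        System.out.println(topic.consumeByFanoutId("group-a", 3)); // remaining two messages
        System.out.println(topic.consumeByFanoutId("group-b", 10)); // all five, independent of group-a
    }
}
```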
Step 6 : Study advanced demo
You can find the source of an advanced demo here. This demo shows:
Multiple topics
Multiple threads producing and consuming concurrently
The use of StreamFactory to create advanced stream style consumers
Group consuming
The figure below illustrates the demo scenario:
There are two brokers.
There are two topics in each broker: topic star (marked in green) and topic moon (marked in blue).
Two topic star producer threads (marked in green) produce topic star messages to the two brokers.
Four topic moon producer threads (marked in blue) produce topic moon messages to the two brokers.
There are two consumer groups.
In consumer group A, there are two topic star consumer threads (marked in green) and one topic moon consumer thread (marked in blue).
In consumer group B, there are four topic star consumer threads (marked in green) and two topic moon consumer threads (marked in blue).
Within the same consumer group, each message for a topic is consumed by one and only one consumer. For example, in consumer group B, although there are four topic star consumers, every topic star message is consumed by exactly one of them. This is the consume-once queue semantics.
Across different consumer groups, each message for a topic is consumed by every group. For example, the star consumers in group A and those in group B each receive their own copy of every topic star message. This is the fanout queue semantics.
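The two semantics described above can be simulated without a broker. In the sketch below, all consumer threads of one group share a single cursor over the topic, so each message goes to exactly one of them, while every group has its own cursor and therefore sees every message. ConsumerGroupSketch, poll, and drain are hypothetical names for this illustration only; the advanced demo itself uses Luxun's StreamFactory, which is not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical simulation of consume-once (within a group) and fanout (across groups). */
final class ConsumerGroupSketch {
    private final List<String> topicLog; // messages of one topic, fixed up front
    private final ConcurrentMap<String, AtomicInteger> groupCursors =
            new ConcurrentHashMap<String, AtomicInteger>();

    ConsumerGroupSketch(List<String> messages) {
        this.topicLog = new ArrayList<String>(messages);
    }

    /** Returns the next message for the group, or null once the group has seen everything. */
    String poll(String group) {
        AtomicInteger cursor = groupCursors.get(group);
        if (cursor == null) {
            groupCursors.putIfAbsent(group, new AtomicInteger());
            cursor = groupCursors.get(group);
        }
        // getAndIncrement hands each index to exactly one caller in the group.
        int index = cursor.getAndIncrement();
        return index < topicLog.size() ? topicLog.get(index) : null;
    }

    /** Runs the given number of consumer threads in one group; returns the total messages received. */
    static int drain(final ConsumerGroupSketch topic, final String group, int consumers) {
        final AtomicInteger received = new AtomicInteger();
        List<Thread> threads = new ArrayList<Thread>();
        for (int i = 0; i < consumers; i++) {
            Thread t = new Thread(new Runnable() {
                public void run() {
                    while (topic.poll(group) != null) {
                        received.incrementAndGet();
                    }
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for consumers", e);
            }
        }
        return received.get();
    }

    public static void main(String[] args) {
        List<String> star = new ArrayList<String>();
        for (int i = 0; i < 100; i++) {
            star.add("star-" + i);
        }
        ConsumerGroupSketch topic = new ConsumerGroupSketch(star);
        // Two star consumers in group A and four in group B, as in the demo scenario.
        System.out.println("group A received " + drain(topic, "A", 2));
        System.out.println("group B received " + drain(topic, "B", 4));
    }
}
```

Each group reports 100 messages in total regardless of how many threads it has, which matches the description: the threads of a group split the stream between them, and the groups do not take messages away from each other.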