Message Queue
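Message Queue (MQ) is professional message middleware developed in-house by the middleware technology department of Alibaba Group. Built on highly available distributed cluster technology, it provides a suite of cloud messaging services, including publish/subscribe, message trace query, scheduled (delayed) messages, resource statistics, and monitoring alerts, and is a core product for enterprise Internet architectures. It gives distributed applications asynchronous decoupling and peak shaving, and supports the massive message backlog, high throughput, and reliable retry that Internet applications need.
1. Enable the service
Alibaba Cloud home page -> Products -> Internet Middleware -> RocketMQ -> Enable
2. Download the demo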
git clone git@github.com:lollipopjin/Aliware-MQ-demo.git
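3. Import the project into IDEA and add Maven support.
4. Management console
(1) Topic Management -> Publish Topic
Define a topic: MQ_TEST_DEMO_TOPIC, remark: MQ_TEST
(2) Topic Management -> Publish Topic -> Apply to Publish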
Producer ID: PID_RYO_MQ_TEST
(3) Topic Management -> Publish Topic -> Apply to Subscribe
Consumer ID: CID_RYO_MQ_TEST
MQConfig
Create an Access Key to obtain:
(1) AccessKey (2) SecretKey
Run demo
Change the values in MqConfig.java to the ones you configured above, then run the main method directly.
Quick Start
Requirements
64-bit OS, preferably Linux/Unix/Mac;
64-bit JDK 1.6+;
Maven 3.x
Git
Screen
- JDK
houbinbindeMacBook-Pro:aliyun-ons-client-java houbinbin$ java -version
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)
Download and install
- Download RocketMQ
Download from https://github.com/alibaba/RocketMQ/releases (the tag used here is 3.5.8).
$ ls
RocketMQ-3.5.8 RocketMQ-3.5.8.tar.gz
- Install
$ pwd
/Users/houbinbin/it/tools/rocketMQ/RocketMQ-3.5.8
$ bash install.sh
...
The build output ends with:
To refactor, move this assembly into a child project and use the flag <useAllReactorProjects>true</useAllReactorProjects> in each moduleSet.
[INFO] Building tar: /Users/houbinbin/IT/tools/rocketMQ/RocketMQ-3.5.8/target/alibaba-rocketmq-broker.tar.gz
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] rocketmq-all 3.5.8 ................................. SUCCESS [ 48.565 s]
[INFO] rocketmq-remoting 3.5.8 ............................ SUCCESS [ 1.772 s]
[INFO] rocketmq-common 3.5.8 .............................. SUCCESS [ 1.348 s]
[INFO] rocketmq-client 3.5.8 .............................. SUCCESS [ 1.522 s]
[INFO] rocketmq-store 3.5.8 ............................... SUCCESS [ 0.815 s]
[INFO] rocketmq-srvutil 3.5.8 ............................. SUCCESS [ 0.139 s]
[INFO] rocketmq-broker 3.5.8 .............................. SUCCESS [ 0.951 s]
[INFO] rocketmq-tools 3.5.8 ............................... SUCCESS [ 0.889 s]
[INFO] rocketmq-namesrv 3.5.8 ............................. SUCCESS [ 0.373 s]
[INFO] rocketmq-example 3.5.8 ............................. SUCCESS [ 0.392 s]
[INFO] rocketmq-filtersrv 3.5.8 ........................... SUCCESS [ 0.337 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 59.323 s
[INFO] Finished at: 2017-01-02T18:21:01+08:00
[INFO] Final Memory: 52M/1042M
[INFO] ------------------------------------------------------------------------
Environment Configuration
Make sure the JAVA_HOME environment variable is set properly.
Now set up the ROCKETMQ_HOME environment variable:
cd devenv
echo "export ROCKETMQ_HOME=`pwd`" >> ~/.bash_profile
Apply it to the current shell immediately:
source ~/.bash_profile
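Repeating the step above appends another copy of the line each time, and a variable written without `export` is not passed on to child processes such as the start scripts. A minimal sketch of an idempotent alternative follows; `add_export_line` is a hypothetical helper for illustration, not something that ships with RocketMQ.

```shell
# Hypothetical helper: append 'export NAME="VALUE"' to a profile file,
# but only if that exact line is not already present.
#   $1 = variable name, $2 = value, $3 = profile file
add_export_line() {
  line="export $1=\"$2\""
  touch "$3"
  # -x matches the whole line, -F treats the pattern as a fixed string
  if ! grep -qxF -- "$line" "$3"; then
    printf '%s\n' "$line" >> "$3"
  fi
}

# Usage, run from the devenv directory:
#   add_export_line ROCKETMQ_HOME "$(pwd)" ~/.bash_profile
#   source ~/.bash_profile
```

Running the helper twice with the same arguments leaves a single line in the file.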
Launch RocketMQ Name Server and Broker
Make sure you are in the devenv directory:
$ cd bin
$ pwd
/Users/houbinbin/it/tools/rocketMQ/RocketMQ-3.5.8/devenv/bin
- Start Name Server
screen bash mqnamesrv
In practice, you may run into this:
$ screen bash mqnamesrv
[screen is terminating]
Run the following command to elevate privileges temporarily:
su
Enter a session with the screen command, then run:
$ bash mqnamesrv
The output:
sh-3.2# bash mqnamesrv
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=320m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
If you see "The Name Server boot success. serializeType=JSON", the name server has started successfully.
Press Ctrl + A, then D to detach the session.
sh-3.2# screen
[detached]
- Start Broker
sh-3.2# screen
sh-3.2# pwd
/Users/houbinbin/it/tools/rocketMQ/RocketMQ-3.5.8/devenv/bin
sh-3.2# bash mqbroker -n localhost:9876
cp: /var/root/rmq_bk_gc.log: No such file or directory
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=320m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
Java HotSpot(TM) 64-Bit Server VM warning: Cannot open file /var/root/tmpfs/logs/gc.log due to No such file or directory
The broker[houbinbindeMacBook-Pro.local, 192.168.2.102:10911] boot success. serializeType=JSON and name server is localhost:9876
If you see output like the following:
The broker[lizhanhui-Lenovo, 172.30.30.233:10911] boot success. serializeType=JSON and name server is localhost:9876
then your broker is running successfully.
Send & Receive MSG
Before sending/receiving messages, we need to tell clients where name servers are located. RocketMQ provides multiple ways to achieve this.
For simplicity, we use the environment variable NAMESRV_ADDR:
$ export NAMESRV_ADDR=localhost:9876
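A mistyped address tends to surface only later as a client connection failure, so it can help to check the value before starting anything. The sketch below uses a hypothetical helper, `is_host_port` (not part of RocketMQ), and assumes a single `host:port` value with a numeric port.

```shell
# Hypothetical helper: succeed only if $1 looks like host:port
# with a non-empty host and an all-digit port.
is_host_port() {
  case "$1" in
    *:*) ;;
    *) return 1 ;;
  esac
  host="${1%:*}"    # everything before the last colon
  port="${1##*:}"   # everything after the last colon
  [ -n "$host" ] || return 1
  case "$port" in
    ''|*[!0-9]*) return 1 ;;
  esac
  return 0
}

# Fall back to the address used in this walkthrough when the variable is unset.
addr="${NAMESRV_ADDR:-localhost:9876}"
if is_host_port "$addr"; then
  echo "NAMESRV_ADDR looks valid: $addr"
else
  echo "NAMESRV_ADDR is not in host:port form: $addr" >&2
fi
```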
Test on Mac
- Download
Download from https://github.com/alibaba/RocketMQ/archive/v3.4.6.tar.gz (the tag used here is 3.4.6).
$ pwd
/Users/houbinbin/it/tools/rocketMQ
$ ls
RocketMQ-3.4.6 RocketMQ-3.4.6.tar.gz
- Install
$ pwd
/Users/houbinbin/it/tools/rocketMQ/RocketMQ-3.4.6
$ bash install.sh
...
- Add execute permission
$ pwd
/Users/houbinbin/it/tools/rocketMQ/RocketMQ-3.4.6/bin
$ chmod +x mqadmin mqbroker mqfiltersrv mqshutdown mqnamesrv
- NameServer
The default port is 9876. Go to the devenv/bin directory:
$ pwd
/Users/houbinbin/it/tools/rocketMQ/RocketMQ-3.4.6/devenv/bin
$ sudo nohup sh mqnamesrv > ~/logs/ons.log &
The log is written to ~/logs/ons.log.
Most likely, it will fail with the following error:
ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!
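This check is hard-wired for Alibaba's internal environment. Provided the JDK really is configured correctly, you can comment out the following lines in runserver.sh and hard-code JAVA to the java binary of the current JDK: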
houbinbindeMacBook-Pro:bin houbinbin$ which java
/Library/Java/JavaVirtualMachines/jdk1.8.0_91.jdk/Contents/Home/bin/java
runserver.sh
#!/bin/sh
#===========================================================================================
# Java Environment Setting
#===========================================================================================
error_exit ()
{
echo "ERROR: $1 !!"
exit 1
}
#[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
#[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/opt/taobao/java
#[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
#export JAVA_HOME
export JAVA="/Library/Java/JavaVirtualMachines/jdk1.8.0_91.jdk/Contents/Home/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}
#===========================================================================================
# JVM Configuration
#===========================================================================================
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${HOME}/rmq_srv_gc.log -XX:+PrintGCDetails"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
$JAVA ${JAVA_OPT} $@
Run the start command again.
If the log looks like the following, startup succeeded:
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=320m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
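Hard-coding the JDK path in runserver.sh works on one machine but breaks as soon as the JDK is upgraded or the script is copied elsewhere. A more portable option might look like the sketch below; `resolve_java` is a hypothetical helper, not part of the RocketMQ scripts, and it assumes that either JAVA_HOME is set or `java` is on the PATH.

```shell
# Hypothetical helper: print the path of a usable java binary.
# Prefers $JAVA_HOME/bin/java, then falls back to whatever is on PATH.
resolve_java() {
  if [ -n "${JAVA_HOME:-}" ] && [ -x "$JAVA_HOME/bin/java" ]; then
    printf '%s\n' "$JAVA_HOME/bin/java"
  elif command -v java >/dev/null 2>&1; then
    command -v java
  else
    return 1
  fi
}

# Could stand in for the hard-coded 'export JAVA=...' line.
if JAVA="$(resolve_java)"; then
  export JAVA
  echo "using java at: $JAVA"
else
  echo "ERROR: no java binary found; set JAVA_HOME" >&2
fi
```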
- shutdown
sudo bash mqshutdown namesrv
MQ Broker
Before starting the broker, specify the NameServer address:
$ export NAMESRV_ADDR=127.0.0.1:9876
- start
$ nohup sh mqbroker > ~/logs/ons.log &
The log is as follows:
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=320m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The broker[houbinbindeMacBook-Pro.local, 192.168.2.102:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
- shutdown
sudo bash mqshutdown broker
After startup, a quick check of the ports in use shows:
houbinbindeMacBook-Pro:bin houbinbin$ sudo lsof -i:9876
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 1556 root 56u IPv6 0xba83a0ce19f11035 0t0 TCP *:sd (LISTEN)
houbinbindeMacBook-Pro:bin houbinbin$ sudo lsof -i:10911
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 1591 houbinbin 88u IPv6 0xba83a0ce19f0ea95 0t0 TCP *:10911 (LISTEN)
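To use the same check in a script, the PID column can be pulled out of the lsof output. The following is a small sketch; `listening_pids` is a hypothetical helper that assumes the column layout shown above, with the PID in the second column and `(LISTEN)` as the last field.

```shell
# Hypothetical helper: read lsof output on stdin and print the PID of
# every process whose socket is in the LISTEN state.
listening_pids() {
  # NR > 1 skips the header row; $NF is the last whitespace-separated field
  awk 'NR > 1 && $NF == "(LISTEN)" { print $2 }'
}

# Usage:
#   sudo lsof -i:9876  | listening_pids   # name server
#   sudo lsof -i:10911 | listening_pids   # broker
```

An empty result means nothing is listening on that port.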
Maven Project
You can get the whole project from here.
The project structure looks like this:
|____src
| |____main
| | |____java
| | | |____com
| | | | |____ryo
| | | | | |____rocket
| | | | | | |____demo
| | | | | | | |____common
| | | | | | | | |____consumer
| | | | | | | | | |____Consumer.java
| | | | | | | | |____productor
| | | | | | | | | |____Productor.java
| | |____resources
| | | |____log4j.properties
- pom.xml
<dependencies>
<!--rocketmq-->
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-remoting</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<!--log-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
- log4j.properties
log4j.rootLogger=warn, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%t] %d{MM-dd HH:mm:ss,SSS} - %m%n
- Productor.java
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

/**
 * @author houbinbin
 * @on 17/1/2
 */
public class Productor {
    public static void main(String[] args) {
        // "Producer" is the producer group name
        DefaultMQProducer producer = new DefaultMQProducer("Producer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        try {
            producer.start();
            // Message(topic, tags, keys, body)
            Message message = new Message("PushTopic", "push", "1", "Just fot test.".getBytes());
            SendResult result = producer.send(message);
            System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());
            message = new Message("PushTopic", "push", "3", "Just fot test.".getBytes());
            result = producer.send(message);
            System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }
}
The run log is as follows:
id:C0A8026600002A9F000000000000011A result:SEND_OK
id:C0A8026600002A9F00000000000001A7 result:SEND_OK
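- Consumer.java
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @author houbinbin
 * @on 17/1/4
 */
public class Consumer {
    public static void main(String[] args) {
        // "PushConsumer" is the consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        try {
            // Subscribe to messages in PushTopic whose tag is "push"
            consumer.subscribe("PushTopic", "push");
            // On first start, consume from the head of the queue
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    // Print every message in the batch, not only the first one
                    for (MessageExt message : msgs) {
                        System.err.println(message.toString());
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}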
The run log is as follows:
MessageExt [queueId=1, storeSize=141, queueOffset=1, sysFlag=0, bornTimestamp=1483460812589, bornHost=/192.168.2.102:54835, storeTimestamp=1483460812590, storeHost=/192.168.2.102:10911, msgId=C0A8026600002A9F00000000000001A7, commitLogOffset=423, bodyCRC=1329428386, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, KEYS=3, WAIT=true, TAGS=push}, body=14]]
MessageExt [queueId=0, storeSize=141, queueOffset=1, sysFlag=0, bornTimestamp=1483460812558, bornHost=/192.168.2.102:54835, storeTimestamp=1483460812575, storeHost=/192.168.2.102:10911, msgId=C0A8026600002A9F000000000000011A, commitLogOffset=282, bodyCRC=1329428386, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, KEYS=1, WAIT=true, TAGS=push}, body=14]]
RocketMQ Console
Download it, or use
$ git clone https://github.com/rocketmq/rocketmq-console
This fork is compatible with RocketMQ 3.2.6
- install
$ pwd
/Users/houbinbin/it/tools/rocketMQ/rocketmq-console
$ mvn clean install
- Get the war from the target directory
$ pwd
/Users/houbinbin/it/tools/rocketMQ/rocketmq-console/target
$ ls
classes generated-sources maven-archiver rocketmq-console-SNAPSHOT-1.0 rocketmq-console-SNAPSHOT-1.0.war
- Copy it into Tomcat's webapps directory, renamed to ROOT.war
$ cp rocketmq-console-SNAPSHOT-1.0.war ~/it/tools/tomcat/tomcat8/webapps/ROOT.war
- start tomcat
$ cd ~/it/tools/tomcat/tomcat8/bin
$ ./startup.sh
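- Visit it in the browser
localhost:8888
One of two things will happen:
(1) You are lucky and everything works up to this point. (2) You are unlucky: development of the console is currently slow, so you will need to adapt the project yourself before using it. Good luck!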