Message Queue

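Message Queue (MQ) is a professional messaging middleware developed in-house by the Middleware Technology Department of Alibaba Group. Built on highly available distributed cluster technology, it offers a range of cloud messaging services, including publish/subscribe, message trace queries, scheduled (delayed) messages, resource statistics, and monitoring and alerting, and it is a core product for enterprise-grade Internet architectures. It gives distributed applications asynchronous decoupling and peak shaving, along with the features Internet applications need, such as massive message backlog capacity, high throughput, and reliable retries.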

Activate MQ Service -> Request MQ Resource -> Send Msg By SDK -> Consume Msg

Activate Service

1. Activate the service

Alibaba Cloud home page -> Products -> Internet Middleware -> RocketMQ -> Activate

2. Download the demo

git clone git@github.com:lollipopjin/Aliware-MQ-demo.git

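3. Import the project into IDEA and add Maven support.

4. Management console

(1) Topic Management -> Publish Topic

Define a topic: MQ_TEST_DEMO_TOPIC, remark: MQ_TEST

(2) Topic Management -> Publish Topic -> Request publishing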

Producer ID: PID_RYO_MQ_TEST

(3) Topic Management -> Publish Topic -> Request subscription

Consumer ID: CID_RYO_MQ_TEST

MQConfig

Create an Access Key ->

(1) AccessKey (2) SecretKey

Run demo

Change the values in MqConfig.java to the ones you set up above, then run the main method directly.
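As a rough illustration of what "changing the values" involves, here is a minimal sketch of such a configuration holder. The class name MqConfigSketch and its field names are hypothetical and are not taken from the demo repository, so the real MqConfig.java may be laid out differently. The topic and IDs are the ones defined in the console steps above; the two keys are deliberately left blank.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the demo's MqConfig.java; real field names may differ. */
class MqConfigSketch {
    // Values created in the management console steps above.
    static final String TOPIC = "MQ_TEST_DEMO_TOPIC";
    static final String PRODUCER_ID = "PID_RYO_MQ_TEST";
    static final String CONSUMER_ID = "CID_RYO_MQ_TEST";
    // Placeholders: fill in the AccessKey and SecretKey you created.
    static final String ACCESS_KEY = "";
    static final String SECRET_KEY = "";

    /** Collects the constants above in declaration order. */
    static Map<String, String> current() {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("TOPIC", TOPIC);
        settings.put("PRODUCER_ID", PRODUCER_ID);
        settings.put("CONSUMER_ID", CONSUMER_ID);
        settings.put("ACCESS_KEY", ACCESS_KEY);
        settings.put("SECRET_KEY", SECRET_KEY);
        return settings;
    }

    /** Returns the names of all settings that are still null or blank. */
    static List<String> missing(Map<String, String> settings) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, String> entry : settings.entrySet()) {
            String value = entry.getValue();
            if (value == null || value.trim().isEmpty()) {
                names.add(entry.getKey());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("Unset values: " + missing(current()));
    }
}
```

Checking for blank values before running the demo's main method can save a round trip to the server when a key was simply forgotten.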

Quick Start

quick start

Prerequisites

64-bit OS, preferably Linux/Unix/Mac;
64-bit JDK 1.6+;
Maven 3.x;
Git;
Screen.
  • JDK
houbinbindeMacBook-Pro:aliyun-ons-client-java houbinbin$ java -version
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)

Download and install

  • Download RocketMQ

Download a release from https://github.com/alibaba/RocketMQ/releases; the tag used here is 3.5.8.

$ ls
RocketMQ-3.5.8		RocketMQ-3.5.8.tar.gz
  • Install
$ pwd
/Users/houbinbin/it/tools/rocketMQ/RocketMQ-3.5.8

$ bash install.sh
...

The build output ends like this:

 To refactor, move this assembly into a child project and use the flag <useAllReactorProjects>true</useAllReactorProjects> in each moduleSet.
[INFO] Building tar: /Users/houbinbin/IT/tools/rocketMQ/RocketMQ-3.5.8/target/alibaba-rocketmq-broker.tar.gz
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] rocketmq-all 3.5.8 ................................. SUCCESS [ 48.565 s]
[INFO] rocketmq-remoting 3.5.8 ............................ SUCCESS [  1.772 s]
[INFO] rocketmq-common 3.5.8 .............................. SUCCESS [  1.348 s]
[INFO] rocketmq-client 3.5.8 .............................. SUCCESS [  1.522 s]
[INFO] rocketmq-store 3.5.8 ............................... SUCCESS [  0.815 s]
[INFO] rocketmq-srvutil 3.5.8 ............................. SUCCESS [  0.139 s]
[INFO] rocketmq-broker 3.5.8 .............................. SUCCESS [  0.951 s]
[INFO] rocketmq-tools 3.5.8 ............................... SUCCESS [  0.889 s]
[INFO] rocketmq-namesrv 3.5.8 ............................. SUCCESS [  0.373 s]
[INFO] rocketmq-example 3.5.8 ............................. SUCCESS [  0.392 s]
[INFO] rocketmq-filtersrv 3.5.8 ........................... SUCCESS [  0.337 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 59.323 s
[INFO] Finished at: 2017-01-02T18:21:01+08:00
[INFO] Final Memory: 52M/1042M
[INFO] ------------------------------------------------------------------------

Environment Configuration

Make sure the following environment variable is properly set up: JAVA_HOME

Now set up the ROCKETMQ_HOME environment variable:

cd devenv

echo "export ROCKETMQ_HOME=`pwd`" >> ~/.bash_profile

Apply it to the current shell right away:

source ~/.bash_profile

Launch RocketMQ Name Server and Broker

Make sure you are in the devenv directory:

$ cd bin
$ pwd
/Users/houbinbin/it/tools/rocketMQ/RocketMQ-3.5.8/devenv/bin
  • Start Name Server
screen bash mqnamesrv

In practice, you may run into this:

$ screen bash mqnamesrv
[screen is terminating]

Run the following command (to temporarily elevate privileges):

su

Enter a session with the screen command, then run:

$   bash mqnamesrv

The output looks like this:

sh-3.2# bash mqnamesrv
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=320m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON

If you see "The Name Server boot success. serializeType=JSON", the name server has started successfully.

Press Ctrl + A, then D to detach the session.

sh-3.2# screen
[detached]
  • Start Broker
sh-3.2# screen
sh-3.2# pwd
/Users/houbinbin/it/tools/rocketMQ/RocketMQ-3.5.8/devenv/bin
sh-3.2# bash mqbroker -n localhost:9876
cp: /var/root/rmq_bk_gc.log: No such file or directory
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=320m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
Java HotSpot(TM) 64-Bit Server VM warning: Cannot open file /var/root/tmpfs/logs/gc.log due to No such file or directory

The broker[houbinbindeMacBook-Pro.local, 192.168.2.102:10911] boot success. serializeType=JSON and name server is localhost:9876

If you see output like the following, your broker is running successfully:

The broker[lizhanhui-Lenovo, 172.30.30.233:10911] boot success. serializeType=JSON and name server is localhost:9876

Send & Receive MSG

Before sending or receiving messages, we need to tell clients where the name servers are located. RocketMQ provides multiple ways to do this. For simplicity, we use the environment variable NAMESRV_ADDR:

$   export NAMESRV_ADDR=localhost:9876
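To make the idea concrete, the sketch below shows one way an application could read this variable itself and fall back to a default when it is unset. It is only an illustration under assumptions: the class NamesrvAddrResolver and its methods are hypothetical helpers, not part of the RocketMQ client, and the sketch handles a single host:port pair only.

```java
import java.net.InetSocketAddress;

/** Hypothetical helper; not part of the RocketMQ client library. */
class NamesrvAddrResolver {
    // Same address as used in the export command above.
    static final String DEFAULT_ADDR = "localhost:9876";

    /** Returns the trimmed value, or the default when the value is unset or blank. */
    static String resolve(String envValue) {
        if (envValue == null || envValue.trim().isEmpty()) {
            return DEFAULT_ADDR;
        }
        return envValue.trim();
    }

    /** Splits a single "host:port" string; rejects malformed input. */
    static InetSocketAddress parse(String addr) {
        int idx = addr.lastIndexOf(':');
        if (idx <= 0 || idx == addr.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + addr);
        }
        String host = addr.substring(0, idx);
        int port = Integer.parseInt(addr.substring(idx + 1));
        // createUnresolved avoids a DNS lookup and validates the port range.
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        String addr = resolve(System.getenv("NAMESRV_ADDR"));
        InetSocketAddress parsed = parse(addr);
        System.out.println("Name server host=" + parsed.getHostString() + " port=" + parsed.getPort());
    }
}
```

The resolved string could then be handed to setNamesrvAddr, which is how the producer and consumer code later in this post points a client at the name server.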

Test In Mac

3.4.6 demo zh_CN

blog zh_CN

rocketmq console zh_CN

Distributed

  • Download

Download the archive from https://github.com/alibaba/RocketMQ/archive/v3.4.6.tar.gz; the tag used here is 3.4.6.

$ pwd
/Users/houbinbin/it/tools/rocketMQ
$ ls
RocketMQ-3.4.6		RocketMQ-3.4.6.tar.gz
  • Install
$ pwd
/Users/houbinbin/it/tools/rocketMQ/RocketMQ-3.4.6
$ bash install.sh

...
  • Add execute permission
$   pwd
/Users/houbinbin/it/tools/rocketMQ/RocketMQ-3.4.6/bin
$ chmod +x mqadmin mqbroker mqfiltersrv mqshutdown  mqnamesrv
  • NameServer

The default port is 9876. Go to the devenv/bin directory:

$ pwd
/Users/houbinbin/it/tools/rocketMQ/RocketMQ-3.4.6/devenv/bin

$   sudo nohup sh mqnamesrv > ~/logs/ons.log &

The log is written to ~/logs/ons.log.

Most likely, you will get the following error:

ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!

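This check is hard-coded for Alibaba's internal environment. You can comment out the lines shown below in runserver.sh (provided the JDK really is configured correctly) and hard-code JAVA to the java binary of your current JAVA_HOME.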

houbinbindeMacBook-Pro:bin houbinbin$ which java
/Library/Java/JavaVirtualMachines/jdk1.8.0_91.jdk/Contents/Home/bin/java
  • runserver.sh
#!/bin/sh

#===========================================================================================
# Java Environment Setting
#===========================================================================================
error_exit ()
{
    echo "ERROR: $1 !!"
    exit 1
}

#[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
#[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/opt/taobao/java
#[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"

#export JAVA_HOME
export JAVA="/Library/Java/JavaVirtualMachines/jdk1.8.0_91.jdk/Contents/Home/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}

#===========================================================================================
# JVM Configuration
#===========================================================================================
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${HOME}/rmq_srv_gc.log -XX:+PrintGCDetails"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

$JAVA ${JAVA_OPT} $@

Run it again.

If the log looks like the following, the name server started successfully.

Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=320m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
  • shutdown
 mqshutdown namesrv

mq broker

Before starting the broker, you need to specify the NameServer address:

$   export NAMESRV_ADDR=127.0.0.1:9876
  • start
$   nohup sh  mqbroker > ~/logs/ons.log &

The log looks like this:

Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=320m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The broker[houbinbindeMacBook-Pro.local, 192.168.2.102:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
  • shutdown
sudo bash mqshutdown broker

After startup, a quick look at the listening ports shows:

houbinbindeMacBook-Pro:bin houbinbin$ sudo lsof -i:9876
COMMAND  PID USER   FD   TYPE             DEVICE SIZE/OFF NODE NAME
java    1556 root   56u  IPv6 0xba83a0ce19f11035      0t0  TCP *:sd (LISTEN)
houbinbindeMacBook-Pro:bin houbinbin$ sudo lsof -i:10911
COMMAND  PID      USER   FD   TYPE             DEVICE SIZE/OFF NODE NAME
java    1591 houbinbin   88u  IPv6 0xba83a0ce19f0ea95      0t0  TCP *:10911 (LISTEN)
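The same check can be done from Java by simply trying to open a TCP connection to each port. The sketch below assumes the default ports shown above (9876 for the name server, 10911 for the broker); PortCheck is a hypothetical helper class written for this post, and it only tells you that something is listening, not that it is RocketMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: reports whether a TCP port accepts connections. */
class PortCheck {
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // connect() throws if nothing accepts the connection within the timeout.
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports taken from the lsof output above.
        System.out.println("name server (9876): " + isListening("127.0.0.1", 9876, 1000));
        System.out.println("broker (10911): " + isListening("127.0.0.1", 10911, 1000));
    }
}
```

This is handy as a quick pre-flight check before starting a producer or consumer from the IDE.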

Maven Project

You can get the whole project from here.

The project structure may look like this:

|____src
| |____main
| | |____java
| | | |____com
| | | | |____ryo
| | | | | |____rocket
| | | | | | |____demo
| | | | | | | |____common
| | | | | | | | |____consumer
| | | | | | | | | |____Consumer.java
| | | | | | | | |____productor
| | | | | | | | | |____Productor.java
| | |____resources
| | | |____log4j.properties
  • pom.xml
<dependencies>

    <!--rocketmq-->
    <dependency>
        <groupId>com.alibaba.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>${rocketmq.version}</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba.rocketmq</groupId>
        <artifactId>rocketmq-common</artifactId>
        <version>${rocketmq.version}</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba.rocketmq</groupId>
        <artifactId>rocketmq-remoting</artifactId>
        <version>${rocketmq.version}</version>
    </dependency>


    <!--log-->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>jcl-over-slf4j</artifactId>
        <version>1.7.7</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
</dependencies>
  • log4j.properties
log4j.rootLogger=warn, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%t] %d{MM-dd HH:mm:ss,SSS} - %m%n
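  • Productor.java
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

/**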
 * @author houbinbin
 * @on 17/1/2
 */
public class Productor {

    public static void main(String[] args) {

        DefaultMQProducer producer = new DefaultMQProducer("Producer");
        producer.setNamesrvAddr("127.0.0.1:9876");

        try {

            producer.start();

            Message message = new Message("PushTopic", "push", "1", "Just fot test.".getBytes());
            SendResult result = producer.send(message);
            System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());

            message = new Message("PushTopic", "push", "3", "Just fot test.".getBytes());

            result = producer.send(message);
            System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }
}

The run log looks like this:

id:C0A8026600002A9F000000000000011A result:SEND_OK
id:C0A8026600002A9F00000000000001A7 result:SEND_OK
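  • Consumer.java
import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;

/**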
 * @author houbinbin
 * @on 17/1/4
 */
public class Consumer {

    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");

        try {
            // Subscribe to messages in PushTopic whose tag is "push"
            consumer.subscribe("PushTopic", "push");
            // On first startup, consume from the head of the queue
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    // Print every message in the batch, not only the first one
                    for (Message message : msgs) {
                        System.err.println(message.toString());
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

The run log looks like this:

MessageExt [queueId=1, storeSize=141, queueOffset=1, sysFlag=0, bornTimestamp=1483460812589, bornHost=/192.168.2.102:54835, storeTimestamp=1483460812590, storeHost=/192.168.2.102:10911, msgId=C0A8026600002A9F00000000000001A7, commitLogOffset=423, bodyCRC=1329428386, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, KEYS=3, WAIT=true, TAGS=push}, body=14]]
MessageExt [queueId=0, storeSize=141, queueOffset=1, sysFlag=0, bornTimestamp=1483460812558, bornHost=/192.168.2.102:54835, storeTimestamp=1483460812575, storeHost=/192.168.2.102:10911, msgId=C0A8026600002A9F000000000000011A, commitLogOffset=282, bodyCRC=1329428386, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, KEYS=1, WAIT=true, TAGS=push}, body=14]]
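Comparing the msgId values with the other fields in this log suggests how the ID is put together: the first 8 hex digits match the storeHost IP, the next 8 match the broker port, and the last 16 match commitLogOffset. The sketch below decodes an ID under that assumption. Treat the layout as an observation from this log rather than a documented format; MsgIdDecoder is a hypothetical class written for this post.

```java
/** Hypothetical decoder for the 32-hex-digit msgId layout observed in the log above. */
class MsgIdDecoder {
    final String host;
    final int port;
    final long commitLogOffset;

    private MsgIdDecoder(String host, int port, long commitLogOffset) {
        this.host = host;
        this.port = port;
        this.commitLogOffset = commitLogOffset;
    }

    static MsgIdDecoder decode(String msgId) {
        if (msgId == null || msgId.length() != 32) {
            throw new IllegalArgumentException("Expected 32 hex digits but got: " + msgId);
        }
        // Digits 0-7: four IP octets, two hex digits each.
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) {
                ip.append('.');
            }
            ip.append(Integer.parseInt(msgId.substring(i * 2, i * 2 + 2), 16));
        }
        // Digits 8-15: port as a 4-byte integer.
        int port = (int) Long.parseLong(msgId.substring(8, 16), 16);
        // Digits 16-31: offset in the commit log as an 8-byte integer.
        long offset = Long.parseUnsignedLong(msgId.substring(16), 16);
        return new MsgIdDecoder(ip.toString(), port, offset);
    }

    public static void main(String[] args) {
        MsgIdDecoder id = decode("C0A8026600002A9F000000000000011A");
        System.out.println(id.host + ":" + id.port + " offset=" + id.commitLogOffset);
    }
}
```

For the two IDs in the log, this yields 192.168.2.102:10911 with offsets 282 and 423, which agrees with the storeHost and commitLogOffset fields printed by the consumer.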

RocketMQ Console

Download it, or use

$ git clone https://github.com/rocketmq/rocketmq-console

This fork is compatible with RocketMQ 3.2.6

  • install
$ pwd
/Users/houbinbin/it/tools/rocketMQ/rocketmq-console
$ mvn clean install
  • Get the WAR from the target directory
$ pwd
/Users/houbinbin/it/tools/rocketMQ/rocketmq-console/target
$ ls
classes					generated-sources			maven-archiver				rocketmq-console-SNAPSHOT-1.0		rocketmq-console-SNAPSHOT-1.0.war
  • Copy it into Tomcat's webapps directory, renamed to ROOT.war
$ cp rocketmq-console-SNAPSHOT-1.0.war ~/it/tools/tomcat/tomcat8/webapps/ROOT.war
  • start tomcat
$   cd ~/it/tools/tomcat/tomcat8/bin
$   ./startup.sh
  • visit in the browser
localhost:8888

You should see the console's index page.

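(1) You are lucky: you made it this far without trouble. (2) You are unlucky: development of the console is currently slow, so you will need to adapt the project yourself before using it. Good luck!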