Java ActiveMQ 理解JMS 和 ActiveMQ基本使用_java activemq jms header-程序员宅基地

技术标签: 开源社区  

最近的项目中用到了mq,之前自己一直在码农一样的照葫芦画瓢。最近几天研究了下,把自己所有看下来的文档和了解总结一下。

一. 认识JMS

1.概述

对于JMS,百度百科,是这样介绍的:JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

简短来说,JMS是一种与厂商无关的 API,用来访问消息收发系统消息。它类似于JDBC(java Database Connectivity),提供了应用程序之间异步通信的功能。

JMS1.0是jsr 194里规定的规范(关于jsr规范,请点击)。目前最新的规范是JSR 343,JMS2.0。

好了,说了这么多,其实只是在说,JMS只是sun公司为了统一厂商的接口规范,而定义出的一组api接口。

2. JMS体系结构

描述如下:

  • JMS提供者(JMS的实现者,比如activemq jbossmq等)
  • JMS客户(使用提供者发送消息的程序或对象,例如在12306中,负责发送一条购票消息到处理队列中,用来解决购票高峰问题,那么,发送消息到队列的程序和从队列获取消息的程序都叫做客户)
  • JMS生产者,JMS消费者(生产者及负责创建并发送消息的客户,消费者是负责接收并处理消息的客户)
  • JMS消息(在JMS客户之间传递数据的对象)
  • JMS队列(一个容纳那些被发送的等待阅读的消息的区域)
  • JMS主题(一种支持发送消息给多个订阅者的机制)

3. JMS对象模型

  • 连接工厂(connectionfactory)客户端使用JNDI查找连接工厂,然后利用连接工厂创建一个JMS连接。
  • JMS连接 表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。
  • JMS会话 session 标识JMS客户端和服务端的会话状态。会话建立在JMS连接上,标识客户与服务器之间的一个会话进程。
  • JMS目的 Destinatio 又称为消息队列,是实际的消息源
  • 生产者和消费者
  • 消息类型,分为队列类型(优先先进先出)以及订阅类型

二. ActiveMQ

1. ActiveMQ的安装

  1. 从官网下载安装包, http://activemq.apache.org/download.html
  2. 赋予运行权限 chmod +x,windows可以忽略此步
  3. 运行 ./active start | stop

启动后,activeMQ会占用两个端口,一个是负责接收发送消息的tcp端口:61616,一个是基于web负责用户界面化管理的端口:8161。这两个端口可以在conf下面的xml中找到。http服务器使用了jettry。这里有个问题是启动mq后,很长时间管理界面才可以显示出来。

2. 用Java访问ActiveMQ

先附上Bean代码:

public class MqBean implements Serializable{
	private Integer age;
	private String name;
	public Integer getAge() {
		return age;
	}
	public void setAge(Integer age) {
		this.age = age;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
}
2.1 队列消息的发送:
public static void main(String[] args) {
	ConnectionFactory connectionFactory;
	Connection connection;
	Session session;
	Destination destination;
	MessageProducer producer;
	connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
	try {
		connection = connectionFactory.createConnection();
		connection.start();
		//第一个参数是是否是事务型消息,设置为true,第二个参数无效
		//第二个参数是
		//Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。异常也会确认消息,应该是在执行之前确认的
		//Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会删除消息。可以在失败的
		//时候不确认消息,不确认的话不会移出队列,一直存在,下次启动继续接受。接收消息的连接不断开,其他的消费者也不会接受(正常情况下队列模式不存在其他消费者)
		//DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。
		//待测试
		session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
		destination = session.createQueue("test-queue");
		producer = session.createProducer(destination);
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		//优先级不能影响先进先出。。。那这个用处究竟是什么呢呢呢呢
		MqBean bean = new MqBean();
		bean.setAge(13);
		for(int i=0;i<100;i++){
			bean.setName("小黄"+i);
			producer.send(session.createObjectMessage(bean));
		}
		producer.close();
		System.out.println("呵呵");
	} catch (JMSException e) {
		e.printStackTrace();
	}
}

注:在上面的代码中,确认模式有三种,里面的DUPS_OK_ACKNOWLEDGE和AUTO_ACKNOWLEDGE一直没明白有什么区别。因为无法测试。不过大概也明白了一些。其实主要是MQ处理消息的流程决定的:

  1. 消息从生成方客户端传送到消息服务器。
  2. 消息服务器读取消息。
  3. 消息被放置到持久性存储器当中(出于可靠性的考虑)。
  4. 消息服务器确认收到消息(出于可靠性的考虑)。
  5. 消息服务器确定消息的路由。
  6. 消息服务器写出消息。
  7. 消息从消息服务器传送到使用方客户端。
  8. 使用方客户端确认收到消息(出于可靠性的考虑)。
  9. 消息服务器处理客户端确认(出于可靠性的考虑)。
  10. 消息服务器确定已经处理客户端确认。

这些步骤是连续的,所以任何步骤都可能成为消息从生成方客户端到使用方客户端的传送过程的瓶颈。这些步骤中的大多数都取决于消息传送系统的物理特征:网络带宽、计算机处理速度和消息服务器体系结构等等。但是,有一些步骤还取决于消息传送应用程序的特征和该应用程序要求的可靠性级别。其实就是基于可靠性还是性能的选择.

2.2 队列消息的接收:
public static void main(String[] args) {
	ConnectionFactory connectionFactory;
	// Connection :JMS 客户端到JMS Provider 的连接  
	Connection connection = null;
	// Session: 一个发送或接收消息的线程  
	Session session;
	// Destination :消息的目的地;消息发送给谁.  
	Destination destination;
	// 消费者,消息接收者  
	MessageConsumer consumer;
	connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
	try {
		// 构造从工厂得到连接对象  
		connection = connectionFactory.createConnection();
		// 启动  
		connection.start();
		// 获取操作连接  
		//这个最好还是有事务
		session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置  
		destination = session.createQueue("test-queue");
		consumer = session.createConsumer(destination);
		consumer.setMessageListener(new MessageListener() {
			@Override
			public void onMessage(Message message) {
				try {
					MqBean bean = (MqBean) ((ObjectMessage)message).getObject();
					System.out.println(bean);
					if (null != message) {
						System.out.println("收到消息" + bean.getName());
					}
				} catch (Exception e) {
					// TODO: handle exception
				}
			}
		});
	} catch (Exception e) {
		e.printStackTrace();
	}
}

注:对于队列来说,比较简单的优化策略,应该就是队列分载了。由于每个消费者都是单线程的,所以可以设置多个消费者来提高速度。大家可以复制个消费者自己测试下,在消费者中添加sleep测试下效果。

2.3 订阅消息的发送
public static void main(String[] args) {
	ConnectionFactory connectionFactory;
	Connection connection;
	Session session;
	Destination destination;
	MessageProducer producer;
	connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
	try {
		connection = connectionFactory.createConnection();
		connection.start();
		
		session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
		destination = session.createTopic("test-topic");
		producer = session.createProducer(destination);
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		//优先级不能影响先进先出。。。那这个用处究竟是什么呢呢呢呢
		MqBean bean = new MqBean();
		bean.setAge(13);
		for(int i=0;i<100;i++){
			Thread.sleep(1000);
			bean.setName("小黄"+i);
			producer.send(session.createObjectMessage(bean));
		}
		producer.close();
		System.out.println("呵呵");
	} catch (Exception e) {
		e.printStackTrace();
	}
}
2.4 订阅消息的接收
public static void main(String[] args) {
	ConnectionFactory connectionFactory;
	// Connection :JMS 客户端到JMS Provider 的连接  
	Connection connection = null;
	// Session: 一个发送或接收消息的线程  
	Session session;
	// Destination :消息的目的地;消息发送给谁.  
	Destination destination;
	// 消费者,消息接收者  
	MessageConsumer consumer;
	connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
	try {
		// 构造从工厂得到连接对象  
		connection = connectionFactory.createConnection();
		// 启动  
		connection.start();
		// 获取操作连接  
		//这个最好还是有事务
		session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置  
		destination = session.createQueue("test-queue");
		consumer = session.createConsumer(destination);
		consumer.setMessageListener(new MessageListener() {
			@Override
			public void onMessage(Message message) {
				try {
					MqBean bean = (MqBean) ((ObjectMessage)message).getObject();
					System.out.println(bean);
					if (null != message) {
						System.out.println("收到消息" + bean.getName());
					}
				} catch (Exception e) {
					// TODO: handle exception
				}
			}
		});
	} catch (Exception e) {
		e.printStackTrace();
	}
}

以上的消息发送后,如果没有接收到,可以登录自己的MQ管理页面: http://192.168.3.159:8161/admin/ ,默认帐号密码都是admin,查看队列中的消息

Number Of Pending Messages 等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数

Messages Enqueued 进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减

Messages Dequeued 出了队列的消息 可以理解为是消费这消费掉的数量

本文转载自: http://www.cnblogs.com/luochengqiuse/p/4678020.html

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/xiaoniaofeifei35/article/details/76063553

智能推荐

angular6使用ngx-bootstrap/modal_ngx-bootstrap modal-程序员宅基地

文章浏览阅读6.4k次。首先下载ngx-bootstrap:npm install ngx-bootstrap --save下载后,在module.ts中导入ngx-bootstrap/modal类:import { ModalModule } from 'ngx-bootstrap'; imports: [ ... ModelModule, ModalModule.forRoot(..._ngx-bootstrap modal

Centos7利用rpm升级OpenSSH到openssh-8.1p1版本_openssh8.1升级使用rpm-程序员宅基地

文章浏览阅读3.4k次。参考:https://www.cnblogs.com/fsckzy/p/10834550.html Centos 升级至 OpenSSH 8 rpm包制作RPM打包使用的是rpmbuild命令,这个命令来自rpm-build软件包,这个是必装的。yum install rpm-build -y #安装rpm-build软件,以提供rpmbuild命令 69 ssh -V 70 rpm -qa openssh 71 yum install rpm-..._openssh8.1升级使用rpm

基于51单片机的门禁系统RFID_c51 指纹rfid门禁-程序员宅基地

文章浏览阅读1.4w次,点赞13次,收藏26次。代码量:542行代码开发与仿真周期:七天硬件开发周期:一天整机调试周期:一天简介:简单的门禁系统,包含RFID,时间,温度,1602液晶,按键。收获:①多个文件的keil工程的组织方法:Main函数包含各个头文件(.h);各个头文件中包含#ifndef,#define,#endif、声明函数、位定义、extern申明变量但不赋值;在各个C文件中对变量的进行赋值,包含头文件;在工_c51 指纹rfid门禁

ImportError: No module named torch-程序员宅基地

文章浏览阅读996次。ImportError: No module named torchimage and video datasets and models for torch deep learningThe torchvision package consists of popular datasets, model architectures, and common image transformations for computer vision.pip install torchvision_importerror: no module named torch

Mac安装虚拟机Parallels Desktop,以及Windows10系统详细教程_parallel win10镜像-程序员宅基地

文章浏览阅读4.1k次。Mac的老用户想必对Parallels Desktop已经非常熟悉,一款运行快速、操作简单、功能强大的应用程序,无需重启即可在您的Intel 或 Apple M 系列Mac 上运行Windows。包含 40 多种一键式工具,可简化 Mac 和Windows上的日常任务。无需重启,即可在 Mac 上运行 Windows。_parallel win10镜像

BZOJ_P1088&Codevs_P2452 [SCOI2005]扫雷(DP)_扫雷 dp-程序员宅基地

文章浏览阅读462次。2005年省队选拔赛四川 时间限制: 1 s 空间限制: 128000 KB 题目等级 : 大师 Master 题目描述 Description 相信大家都玩过扫雷的游戏。那是在一个n*m的矩阵里面有一些雷,要你根据一些信息找出雷来。万圣节到了,“余”人国流行起了一种简单的扫雷游戏,这个游戏规则和扫雷一样,如果某个格子没有雷,那么它里面的数字表示和它8连通的格子里面雷的数目。现_扫雷 dp

随便推点

搞定“超超超难”剑桥面试数学题番外篇:ARM64汇编_a+b+c+d=63, find max(ab+bc+cd)-程序员宅基地

文章浏览阅读1.1k次,点赞7次,收藏6次。在本篇博文中,我们用 ARM64 汇编实现了之前题目的算法,并用 SIMD 指令重写了其中对应的乘法(mul)运算操作。_a+b+c+d=63, find max(ab+bc+cd)

锤子android 7,锤子正式加入安卓7.1.1阵容 一加3/3T尝鲜氢OS公测版-程序员宅基地

文章浏览阅读213次。标签:氢OS(2)一加3T(153)前些日子nubia为旗下旗舰手机Z11推送了首个安卓7.1开发版更新,这也让nubia成了国内少数率先适配安卓7.1的手机厂商,拥有刷机小王子之称号的一加也不甘落后,今日,一加正式为旗下的一加3和3T推送了基于安卓7.1.1的氢OS系统更新(公测)。此次更新这两款手机迎来了众多新功能,包括支持便签同步、支持显示拍摄时的地理位置、支持图集隐藏等等。不过经网友反馈,..._smartisan os v7.1.0基于android

Elasticsearch、Logstash、Kibana在CentOs7下部署(单机版)_centos单节点部署elasticsearch+kibana-程序员宅基地

文章浏览阅读1.1k次。1.下载elasticsearch-6.4.0.tar.gz,路径:https://www.elastic.co/cn/downloads/elasticsearch2.设置资源参数vi /etc/security/limits.conf# 修改如下 * soft nofile 65536 * hard nofile 131072 * soft nproc..._centos单节点部署elasticsearch+kibana

参数估计:最大似然估计(MLE),最大后验估计(MAP),贝叶斯估计,经验贝叶斯(Empirical Bayes)与全贝叶斯(Full Bayes)_完全贝叶斯方法 和经验贝叶斯方法-程序员宅基地

文章浏览阅读1.8w次,点赞19次,收藏77次。介绍几个常见的参数估计方法:最大似然估计(MLE),最大后验估计(MAP),贝叶斯估计,经验贝叶斯(Empirical Bayes)与全贝叶斯(Full Bayes)。_完全贝叶斯方法 和经验贝叶斯方法

【网络原理】数据链路层_载波扩展-程序员宅基地

文章浏览阅读1.4k次。一、数据链路层的基础知识:1、数据链路层主要目的将原始的、有差错的物理线路变成无差错的数据链路。2、数据链路层主要功能1.链路管理:数据链路的建立、维护、释放;2.帧同步:接收方应从收到的比特流中正确地判断出一帧的开始与结束位;3.流量控制:控制发送方的数据发送速度,使得接收方来得及接收,以致网络不发生拥塞;4.差错控制:发现传输中出现的错误;5.透明传输:使接收方分辨数据还是控制信息;6.寻址:收发双方应知道对方是谁;3、数据链路层使用的信道1.点对点信道 1vs 12._载波扩展

CentOS7搭建Kubernetes v1.20集群_centos中kubernetes-1.20.0集群搭建-程序员宅基地

文章浏览阅读1.1k次,点赞4次,收藏8次。CentOS7搭建Kubernetes v1.20.1集群环境准备基本环境yum源配置功能快捷键合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants创建一个自定义列表如何创建一个注脚注释也是必不可少的KaTeX数学公式新的甘特图功能,丰富你的文章UML 图表FLowchart流程图导出与导入导出导入环境准备基本环境项目软件及版本操作系统Mac Mojave 10.14_centos中kubernetes-1.20.0集群搭建

推荐文章

热门文章

相关标签