亚洲国产成人精品在线播放_日韩第一页在线观看_人人插人人摸精品在线视频_日韩黄色成人电影_国产人成无码视频在线观看_国产91热爆ts人妖系列_免费观看欧美一级_午夜影院操一操黄片_午夜大片免费爽爽爽影院_日本少妇中文三级

聯(lián)系我們

公司地址: 上海市滬宜公路1188號4號樓
     一層
聯(lián)系電話:021-31080981
電子郵箱:[email protected]
郵政編碼:201802

消息中間件解決方案JMS

1、什么是消息中間件

        消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來進行分布式系統(tǒng)的集成。通過提供消息傳遞和消息排隊模型,它可以在分布式環(huán)境下擴展進程間的通信。對于消息中間件,常見的角色大致也就有Producer(生產(chǎn)者)、Consumer(消費者)

    常見的消息中間件產(chǎn)品

     (1)ActiveMQ
        ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實現(xiàn)。我們在本次課程中介紹 ActiveMQ的使用。    
    (2)RabbitMQ
        AMQP協(xié)議的領導實現(xiàn),支持多種場景。淘寶的MySQL集群內部有使用它進行通訊,OpenStack開源云平臺的通信組件,最先在金融行業(yè)得到運用。
    (3)ZeroMQ
        史上最快的消息隊列系統(tǒng)    
    (4)Kafka
        Apache下的一個子項目 。特點:高吞吐,在一臺普通的服務器上既可以達到10W/s的吞吐速率;完全的分布式系統(tǒng)。適合處理海量數(shù)據(jù)。

2、JMS簡介

    2.1、什么是JMS    

        JMS(Java?Messaging Service)是Java平臺上有關面向消息中間件的技術規(guī)范,它便于消息系統(tǒng)中的Java應用程序進行消息交換,并且通過提供標準的產(chǎn)生、發(fā)送、接收消息的接口簡化企業(yè)應用的開發(fā)。
       JMS本身只定義了一系列的接口規(guī)范,是一種與廠商無關的 API,用來訪問消息收發(fā)系統(tǒng)。它類似于 JDBC(java?Database Connectivity):這里,JDBC 是可以用來訪問許多不同關系數(shù)據(jù)庫的 API,而 JMS 則提供同樣與廠商無關的訪問方法,以訪問消息收發(fā)服務。許多廠商目前都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,這只是幾個例子。 JMS 使您能夠通過消息收發(fā)服務(有時稱為消息中介程序或路由器)從一個 JMS 客戶機向另一個 JML 客戶機發(fā)送消息。消息是 JMS 中的一種類型對象,由兩部分組成:報頭和消息主體。報頭由路由信息以及有關該消息的元數(shù)據(jù)組成。消息主體則攜帶著應用程序的數(shù)據(jù)或有效負載。
    JMS 定義了五種不同的消息正文格式,以及調用的消息類型,允許你發(fā)送并接收以一些不同形式的數(shù)據(jù),提供
現(xiàn)有消息格式的一些級別的兼容性。
· TextMessage--一個字符串對象
· MapMessage--一套名稱-值對
· ObjectMessage--一個序列化的 Java 對象
· BytesMessage--一個字節(jié)的數(shù)據(jù)流
· StreamMessage -- Java 原始值的數(shù)據(jù)流

2.2、JMS消息傳遞類型

    對于消息的傳遞有兩種類型

        一種是點對點的,即一個生產(chǎn)者和一個消費者一一對應。

    


            另一種是發(fā)布/ 訂閱模式,即一個生產(chǎn)者產(chǎn)生消息并進行發(fā)送后,可以由多個消費者進
        行接收。
     


3、ActiveMQ下載與安裝

    3.1、官方網(wǎng)站下載

            官方網(wǎng)站下載:http://activemq.apache.org/

    3.2、安裝(Linux)

    (1)將apache-activemq-5.12.0-bin.tar.gz 上傳至服務器

    (2)解壓此文件

tar  zxvf  apache-activemq-5.12.0-bin.tar.gz
    (3)為apache-activemq-5.12.0目錄賦權
chmod 777 apache-activemq-5.12.0

    (4)進入apache-activemq-5.12.0\bin目錄

    (5)賦與執(zhí)行權限       

    (6)啟動

 ./activemq start

        出現(xiàn)下列提示表示成功!

     

假設服務器地址為192.168.25.135 ,打開瀏覽器輸入地址

            http://192.168.25.135:8161/ 即可進入ActiveMQ管理頁面

     

    點擊進入管理頁面

     

    輸入用戶名和密碼  均為 admin

    

    進入主界面

        

    點對點消息隊列

     

    列表各列含義

    Number Of Pending Messages  :等待消費的消息 這個是當前未出隊列的數(shù)量。
    Number Of Consumers  :消費者 這個是消費者端的消費者數(shù)量
    Messages Enqueued  :進入隊列的消息  進入隊列的總數(shù)量,包括出隊列的。
    Messages Dequeued  :出了隊列的消息  可以理解為是消費這消費掉的數(shù)量。

4、JMS小Demo

    4.1、點對點模式    

    點對點的模式主要建立在一個隊列上面,當連接一個列隊的時候,發(fā)送端不需要知道接收端是否正在接收,可以直接向ActiveMQ發(fā)送消息,發(fā)送的消息,將會先進入隊列中,如果有接收端在監(jiān)聽,則會發(fā)向接收端,如果沒有接收端接收,則會保存在activemq服務器,直到接收端接收消息,點對點的消息模式可以有多個發(fā)送端,多個接收端,但是一條消息,只會被一個接收端給接收到,哪個接收端先連上ActiveMQ,則會先接收到,而后來的接收端則接收不到那條消息。

    4.2、消息生產(chǎn)者       

    (1)創(chuàng)建工程jmsDemo ,引入依賴
  	<dependency>		<groupId>org.apache.activemq</groupId>		<artifactId>activemq-client</artifactId>		<version>5.13.4</version>	</dependency>

    (2)創(chuàng)建類QueueProducer  main方法代碼如下:

	//1.創(chuàng)建連接工廠	ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");	//2.獲取連接	Connection connection = connectionFactory.createConnection();	//3.啟動連接	connection.start();	//4.獲取session  (參數(shù)1:是否啟動事務,參數(shù)2:消息確認模式)	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);			//5.創(chuàng)建隊列對象	Queue queue = session.createQueue("test-queue");	//6.創(chuàng)建消息生產(chǎn)者	MessageProducer producer = session.createProducer(queue);	//7.創(chuàng)建消息	TextMessage textMessage = session.createTextMessage("歡迎來到神奇的品優(yōu)購世界");	//8.發(fā)送消息	producer.send(textMessage);	//9.關閉資源	producer.close();	session.close();	connection.close();
    上述代碼中第4步創(chuàng)建session  的兩個參數(shù):
        第1個參數(shù) 是否使用事務
        第2個參數(shù) 消息的確認模式
        ? AUTO_ACKNOWLEDGE = 1    自動確認
        ? CLIENT_ACKNOWLEDGE = 2    客戶端手動確認   
        ? DUPS_OK_ACKNOWLEDGE = 3    自動批量確認
        ? SESSION_TRANSACTED = 0    事務提交并確認
        運行后通過ActiveMQ管理界面查詢    
    4.3、消息消費者  


        創(chuàng)建類QueueConsumer ,main方法代碼如下:
	//1.創(chuàng)建連接工廠	ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");	//2.獲取連接	Connection connection = connectionFactory.createConnection();	//3.啟動連接	connection.start();	//4.獲取session  (參數(shù)1:是否啟動事務,參數(shù)2:消息確認模式)	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);	//5.創(chuàng)建隊列對象	Queue queue = session.createQueue("test-queue");	//6.創(chuàng)建消息消費	MessageConsumer consumer = session.createConsumer(queue);		//7.監(jiān)聽消息	consumer.setMessageListener(new MessageListener() {		public void onMessage(Message message) {			TextMessage textMessage=(TextMessage)message;			try {				System.out.println("接收到消息:"+textMessage.getText());			} catch (JMSException e) {				// TODO Auto-generated catch block				e.printStackTrace();			}		}	});		//8.等待鍵盤輸入	System.in.read();		//9.關閉資源	consumer.close();	session.close();	connection.close();	
    執(zhí)行后看到控制臺輸出

         

    運行測試

        同時開啟2個以上的消費者,再次運行生產(chǎn)者,觀察每個消費者控制臺的輸出,會發(fā)現(xiàn)只有一個消費者會接收到消息。

    5、發(fā)布/訂閱模式

        5.1、消息生產(chǎn)者 

        創(chuàng)建類TopicProducer ,main方法代碼如下:
        //1.創(chuàng)建連接工廠	ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");	//2.獲取連接	Connection connection = connectionFactory.createConnection();	//3.啟動連接	connection.start();	//4.獲取session  (參數(shù)1:是否啟動事務,參數(shù)2:消息確認模式)	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);	//5.創(chuàng)建主題對象	Topic topic = session.createTopic("test-topic");	//6.創(chuàng)建消息生產(chǎn)者	MessageProducer producer = session.createProducer(topic);	//7.創(chuàng)建消息	TextMessage textMessage = session.createTextMessage("歡迎來到神奇的品優(yōu)購世界");	//8.發(fā)送消息	producer.send(textMessage);	//9.關閉資源	producer.close();	session.close();	connection.close();

    運行效果    

    5.2、消息消費者    

    創(chuàng)建類TopicConsumer ,main方法代碼如下:
	//1.創(chuàng)建連接工廠	ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");	//2.獲取連接	Connection connection = connectionFactory.createConnection();	//3.啟動連接	connection.start();	//4.獲取session  (參數(shù)1:是否啟動事務,參數(shù)2:消息確認模式)	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);	//5.創(chuàng)建主題對象	//Queue queue = session.createQueue("test-queue");	Topic topic = session.createTopic("test-topic");	//6.創(chuàng)建消息消費	MessageConsumer consumer = session.createConsumer(topic);		//7.監(jiān)聽消息	consumer.setMessageListener(new MessageListener() {		public void onMessage(Message message) {			TextMessage textMessage=(TextMessage)message;			try {				System.out.println("接收到消息:"+textMessage.getText());			} catch (JMSException e) {				// TODO Auto-generated catch block				e.printStackTrace();			}		}	});	//8.等待鍵盤輸入	System.in.read();	//9.關閉資源	consumer.close();	session.close();	connection.close();	

5.3、運行測試        

            同時開啟2個以上的消費者,再次運行生產(chǎn)者,觀察每個消費者控制臺的輸出,會發(fā)現(xiàn)每個消費者會接收到消息。

6、Spring整合JMS

    6.1、點對點模式

          消息生產(chǎn)者

    (1)創(chuàng)建工程springjms_producer,在POM文件中引入SpringJms 、activeMQ以及單元測試相關依賴  
    (2)在src/main/resources下創(chuàng)建spring配置文件applicationContext-jms-producer.xml

	<context:component-scan base-package="cn.itcast.demo"></context:component-scan><!-- 真正可以產(chǎn)生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">	<property name="brokerURL" value="tcp://192.168.25.135:61616"/>	</bean><!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->	<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">	<!-- 目標ConnectionFactory對應真實的可以產(chǎn)生JMS Connection的ConnectionFactory -->	<property name="targetConnectionFactory" ref="targetConnectionFactory"/>	</bean>	   <!-- Spring提供的JMS工具類,它可以進行消息發(fā)送、接收等 -->	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">	<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 -->	<property name="connectionFactory" ref="connectionFactory"/>	</bean><!--這個是隊列目的地,點對點的  文本信息-->	<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">	<constructor-arg value="queue_text"/>	</bean>
(3)在cn.itcast.demo包下創(chuàng)建消息生產(chǎn)者類
@Componentpublic class QueueProducer {		@Autowired	private JmsTemplate jmsTemplate;		@Autowired	private Destination queueTextDestination;		/**	 * 發(fā)送文本消息	 * @param text	 */	public void sendTextMessage(final String text){		jmsTemplate.send(queueTextDestination, new MessageCreator() {						public Message createMessage(Session session) throws JMSException {				return session.createTextMessage(text);			}		});			}}

    (4)單元測試     

            在src/test/java創(chuàng)建測試類

@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations="classpath:applicationContext-jms-producer.xml")public class TestQueue {	@Autowired	private QueueProducer queueProducer;		@Test	public void testSend(){		queueProducer.sendTextMessage("SpringJms-點對點");	}	}

        消息消費者

(1)創(chuàng)建工程springjms_consumer,在POM文件中引入依賴(同上一個工程)
(2)創(chuàng)建配置文件 applicationContext-jms-consumer-queue.xml

    
<!-- 真正可以產(chǎn)生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">	<property name="brokerURL" value="tcp://192.168.25.135:61616"/>	</bean>	   <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->	<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">	<!-- 目標ConnectionFactory對應真實的可以產(chǎn)生JMS Connection的ConnectionFactory -->	<property name="targetConnectionFactory" ref="targetConnectionFactory"/>	</bean>	<!--這個是隊列目的地,點對點的  文本信息-->	<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">	<constructor-arg value="queue_text"/>	</bean>	<!-- 我的監(jiān)聽類 -->	<bean id="myMessageListener" class="cn.itcast.demo.MyMessageListener"></bean>	<!-- 消息監(jiān)聽容器 -->	<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">		<property name="connectionFactory" ref="connectionFactory" />		<property name="destination" ref="queueTextDestination" />		<property name="messageListener" ref="myMessageListener" />	</bean>	

(3)編寫監(jiān)聽類    

public class MyMessageListener implements MessageListener {	public void onMessage(Message message) {	TextMessage textMessage=(TextMessage)message;				try {			System.out.println("接收到消息:"+textMessage.getText());		} catch (JMSException e) {			e.printStackTrace();		}	}}

(4)創(chuàng)建測試類

        
@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations="classpath:applicationContext-jms-consumer-queue.xml")public class TestQueue {	@Test	public void testQueue(){		try {			System.in.read();		} catch (IOException e) {			e.printStackTrace();		}			}	}

6.2、發(fā)布/訂閱模式

        消息生產(chǎn)者    

(1)在工程springjms_producer的applicationContext-jms-producer.xml增加配置

    
	<!--這個是訂閱模式  文本信息-->	<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">	<constructor-arg value="topic_text"/>	</bean>

(2)創(chuàng)建生產(chǎn)者類

    
@Componentpublic class TopicProducer {	@Autowired	private JmsTemplate jmsTemplate;		@Autowired	private Destination topicTextDestination;		/**	 * 發(fā)送文本消息	 * @param text	 */	public void sendTextMessage(final String text){		jmsTemplate.send(topicTextDestination, new MessageCreator() {						public Message createMessage(Session session) throws JMSException {				return session.createTextMessage(text);			}		});			}}

(3)編寫測試類

import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import cn.itcast.demo.TopicProducer;@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations="classpath:applicationContext-activemq-producer.xml")public class TestTopic {	@Autowired	private TopicProducer topicProducer;	@Test	public void sendTextQueue(){				topicProducer.sendTextMessage();	}	}

    消息消費者        

(1)在activemq-spring-consumer工程中創(chuàng)建配置文件applicationContext-jms-consumer-topic.xml

<!-- 真正可以產(chǎn)生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">	<property name="brokerURL" value="tcp://192.168.25.135:61616"/>	</bean><!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->	<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">	<!-- 目標ConnectionFactory對應真實的可以產(chǎn)生JMS Connection的ConnectionFactory -->	<property name="targetConnectionFactory" ref="targetConnectionFactory"/>	</bean><!--這個是隊列目的地,點對點的  文本信息-->	<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">	<constructor-arg value="topic_text"/>	</bean>	<!-- 我的監(jiān)聽類 -->	<bean id="myMessageListener" class="cn.itcast.demo.MyMessageListener"></bean>	<!-- 消息監(jiān)聽容器 -->	<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">		<property name="connectionFactory" ref="connectionFactory" />		<property name="destination" ref="topicTextDestination" />		<property name="messageListener" ref="myMessageListener" />	</bean>

(2)編寫測試類    

@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations="classpath:applicationContext-jms-consumer-topic.xml")public class TestTopic {	@Test	public void testTopic(){		try {			System.in.read();		} catch (IOException e) {			e.printStackTrace();		}			}		}

    測試:同時運行三個消費者工程,在運行生產(chǎn)者工程,查看三個消費者工程的控制臺輸出。