<strike id="rtjff"><noframes id="rtjff"><strike id="rtjff"></strike>
<progress id="rtjff"><address id="rtjff"></address></progress>
<th id="rtjff"></th>
<span id="rtjff"><video id="rtjff"></video></span><th id="rtjff"><video id="rtjff"><span id="rtjff"></span></video></th><progress id="rtjff"><noframes id="rtjff"><strike id="rtjff"><video id="rtjff"><strike id="rtjff"></strike></video></strike>
<th id="rtjff"></th><strike id="rtjff"><noframes id="rtjff"><strike id="rtjff"></strike>
<span id="rtjff"><video id="rtjff"></video></span>
<span id="rtjff"></span><span id="rtjff"><noframes id="rtjff">
<span id="rtjff"></span>
<th id="rtjff"><noframes id="rtjff"><th id="rtjff"></th>
<th id="rtjff"><noframes id="rtjff">
<span id="rtjff"><noframes id="rtjff">
  1. 首頁 > 汽車知識網 > 汽車知識

FOXQ1,FOXQ1自噬

什么是 RabbitMQ

MQ(Message Queue)消息隊列

消息隊列中間件,是分布式系統中的重要組件;主要解決異步處理、應用解耦、流量削峰等問題,從而實現高性能,高可用,可伸縮和最終一致性的架構。

使用較多的消息隊列產品:RabbitMQ,RocketMQ,ActiveMQ,ZeroMQ,Kafka 等。

異步處理

用戶注冊后,需要發送驗證郵箱和手機驗證碼。

將注冊信息寫入數據庫,發送驗證郵件,發送手機,三個步驟全部完成后,返回給客戶端。

傳統:

客戶端 <-> 注冊信息寫入數據庫 -> 發送注冊郵件 -> 發送注冊短信

現在:

客戶端 <-> 注冊信息寫入數據庫 -> 寫入消息隊列 -> 異步 [發送注冊郵件,發送注冊短信]

應用解耦

場景:訂單系統需要通知庫存系統。

如果庫存系統異常,則訂單調用庫存失敗,導致下單失敗。

原因:訂單系統和庫存系統耦合度太高。

傳統:

用戶 <-> 訂單系統 - 調用庫存接口 -> 庫存系統

現在:

用戶 <-> 訂單系統 - 寫入 -> 消息隊列 <- 訂閱 - 庫存系統

訂單系統:用戶下單后,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶,下單成功。

庫存系統:訂閱下單的消息,獲取下單信息,庫存系統根據下單信息,再進行庫存操作。

假如:下單的時候,庫存系統不能正常運行,也不會影響下單,因為下單后,訂單系統寫入消息隊列就不再關心其他的后續操作了,實現了訂單系統和庫存系統的應用解耦。

所以,消息隊列是典型的“生產者-消費者“模型。

生產者不斷的向消息隊列中生產消息,消費者不斷的從隊列中獲取消息。

因為消息的生產和消費都是異步的,而且只關心消息的發送和接收,沒有業務邏輯的入侵,這樣就實現了生產者和消費者的解耦。

流量削峰

搶購,秒殺等業務,針對高并發的場景。

因為流量過大,暴增會導致應用掛掉,為解決這個問題,在前端加入消息隊列。

用戶的請求,服務器接收后,首先寫入消息隊列,如果超過隊列的長度,就拋棄,發送一個結束的頁面;而請求成功的就是進入隊列的用戶。

背景知識介紹

AMQP 高級消息隊列協議

Advanced Message Queuing Protocol 是一個提供統一消息服務的應用層標準高級消息隊列協議。

協議:數據在傳輸的過程中必須要遵守的規則。

基于此協議的客戶端可以與消息中間件傳遞消息。

并不受產品、開發語言等條件的限制。

JMS

Java Message Server 是 Java 消息服務應用程序接口,一種規范,和 JDBC 擔任的角色類似。

JMS 是一個 Java 平臺中關于面向消息中間件的 API,用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。

二者的聯系

JMS 是定義了統一接口,統一消息操作;AMQP 通過協議統一數據交互格式。

JMS 必須是 Java 語言;AMQP 只是協議,與語言無關。

Erlang 語言

Erlang 是一種通用的面向并發的編程語言,目的是創造一種可以應對大規模并發活動的編程語言和運行環境。

最初是專門為通信應用設計的,比如控制交換機或者變換協議等,因此非常適合構建分布式,實時軟并行計算系統。

Erlang 運行時環境是一個虛擬機,有點像 Java 的虛擬機,這樣代碼一經編譯,同樣可以隨處運行。

為什么選擇 RabbitMQ

RabbitMQ 由 Erlang 開發,AMQP 的最佳搭檔,安裝部署簡單,上手門檻低。

企業級消息隊列,經過大量實踐考驗的高可靠,大量成功的應用案例,例如阿里、網易等一線大廠都有使用。

有強大的 WEB 管理頁面。

強大的社區支持,為技術進步提供動力。

FOXQ1

支持消息持久化、支持消息確認機制、靈活的任務分發機制等,支持功能非常豐富。

集群擴展很容易,并且可以通過增加節點實現成倍的性能提升。

總結:如果希望使用一個可靠性高、功能強大、易于管理的消息隊列系統那么就選擇 RabbitMQ;如果想用一個性能高,但偶爾丟點數據,可以使用 Kafka 或者 ZeroMQ。

1.和男/女朋友進行到哪一步了 2.最喜歡在座哪位異性 3.內衣/褲顏色(這個,如果不太熟要慎用啊~)4.初吻年齡 5.自己最丟人的事 6.最后一次發自內心的笑是什么時候?7.愿意為愛情犧牲到什么程度 8.朋友和男/女朋。

Kafka 和 ZeroMQ 的性能比 RabbitMQ 好很多。

RabbitMQ 各組件功能

Publisher --> Exchange --banding--> Queue --> Connection --> Consumer|-------------------------------|| |-------------------------| || | |------------------|| || | |Exchange --> Queue|| || | |------------------|| || | Virtual Host| || |-------------------------| ||Broker ||-------------------------------|?Broker 包含 Virtual HostVirtual Host 包含 Exchange 和 Queue?Connection 包含多個 Channel

Broker - 消息隊列服務器實體。

Virtual Host - 虛擬主機:

標識一批交換機、消息隊列和相關對象,形成的整體。

虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。

每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。

VHost 是 AMQP 概念的基礎,RabbitMQ 默認的 vhost 是 ,必須在鏈接時指定。

Exchange - 交換器(路由):用來接收生產者發送的消息并將這些消息通過路由發給服務器中的隊列。

Banding - 綁定。

Queue - 消息隊列:

用來保存消息直到發送給消費者。

它是消息的容器,也是消息的終點。

一個消息可投入一個或多個隊列。

消息一直在隊列里面,等待消費者連接到這個隊列將其取走。

Banding - 綁定:用于消息隊列和交換機之間的關聯。

Channel - 通道(信道):

多路復用連接中的一條獨立的雙向數據流通道。

信道是建立在真實的 TCP 連接內的虛擬鏈接。

AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,都是通過信道完成的。

因為對于操作系統來說,建立和銷毀 TCP 連接都是非常昂貴的開銷,所以引入了信道的概 念,用來復用 TCP 連接。

Connection - 網絡連接,比如一個 TCP 連接。

Publisher - 消息的生產者,也是一個向交換器發布消息的客戶端應用程序。

Consumer - 消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。

Message - 消息:

消息是不具名的,它是由消息頭和消息體組成。

消息體是不透明的,而消息頭則是由一系列的可選屬性組成,這些屬性包括 routing-key(路由鍵)、priority(優先級)、delivery-mode(消息可能需要持久性存儲[消息的路由模式])等。

使用 RabbitMQ

想要安裝 RabbitMQ,必須先安裝 erlang 語言環境;類似安裝 tomcat,必須先安裝 JDK。

查看匹配的版本:

RabbitMQ 安裝啟動

Erlang 下載:

Socat 下載:

RabbitMQ 下載:

安裝

啟動 Linux 系統(192.168.186.128),傳輸相關的三個 rpm 到 opt 目錄下,然后在 opt 目錄下按順序執行安裝命令:

rpm -ivh erlang-21.3.8.16-1.el7.x86_64.rpmrpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpmrpm -ivh rabbitmq-server-3.8.6-1.el7.noarch.rpm

啟動后臺管理插件

rabbitmq-plugins enable rabbitmq_management

啟動 RabbitMQ

systemctl start rabbitmq-server.servicesystemctl status rabbitmq-server.servicesystemctl restart rabbitmq-server.servicesystemctl stop rabbitmq-server.service

查看進程

ps -ef | grep rabbitmq

測試

1) 防火墻開放對應的端口號

firewall-cmd --zone=public --add-port=15672/tcp --permanentfirewall-cmd --zone=public --add-port=5671/tcp --permanentfirewall-cmd --zone=public --add-port=5672/tcp --permanentfirewall-cmd --zone=public --add-port=25672/tcp --permanentfirewall-cmd --reload

3)默認帳號和密碼是 guest,而 guest 用戶默認不允許遠程連接

創建賬號:

rabbitmqctl add_user renda 123456

設置用戶角色:

rabbitmqctl set_user_tags renda administrator

設置用戶權限:

rabbitmqctl set_permissions -p &34; renda &34; &34; &34;

查看當前用戶和角色:

rabbitmqctl list_users

修改用戶密碼:

rabbitmqctl change_password renda NewPassword

管理界面介紹:

Overview - 概覽

Connections - 查看鏈接情況

Channels - 信道(通道)情況

Exchanges - 交換機(路由)情況,默認4類7個

Queues - 消息隊列情況

Admin - 管理員列表

RabbitMQ 提供給編程語言客戶端鏈接的端口 - 5672;RabbitMQ 管理界面的端口 15672;RabbitMQ 集群的端口 - 25672。

RabbitMQ 快速入門

依賴

UTF-8UTF-81.111111com.rabbitmqamqp-client5.7.3org.slf4jslf4j-log4j121.7.25compileorg.apache.commonscommons-lang33.9

日志依賴 log4j(可選項)

log4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.Target=System.outlog4j.appender.stdout.layout=org.apache.log4j.PatternLayoutlog4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%nlog4j.appender.file=org.apache.log4j.FileAppenderlog4j.appender.file.File=rebbitmq.loglog4j.appender.file.layout=org.apache.log4j.PatternLayoutlog4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%nlog4j.rootLogger=debug,stdout,file

創建連接

先在 RabbitMQ 管理界面 Admin -> Virtual Hosts -> Add a new virtual host 創建虛擬主機 (Name: renda,Description: 張人大,Tags: administrator);

然后編寫連接的代碼:

public class ConnectionUtil {public static Connection getConnection() throwsException{// 1.創建連接工廠ConnectionFactory factory = new ConnectionFactory();// 2.在工廠對象中設置 MQ 的連接信息(ip,port,vhost,username,password)factory.setHost(&34;);factory.setPort(5672);factory.setVirtualHost(&34;);factory.setUsername(&34;);factory.setPassword(&34;);// 3.通過工廠獲得與 MQ 的連接return factory.newConnection();}public static void main(String[] args) throws Exception {Connection connection = getConnection();System.out.println(&34; + connection);connection.close();}}

RabbitMQ 模式

RabbitMQ 提供了 6 種消息模型,但是第 6 種其實是 RPC,并不是 MQ。

在線手冊:

5 種消息模型,大體分為兩類:

1 和 2 屬于點對點。

3、4、5 屬于發布訂閱模式(一對多)。

點對點模式 - P2P(Point to Point)模式:

包含三個角色:消息隊列 queue,發送者 sender,接收者 receiver。

每個消息發送到一個特定的隊列中,接收者從中獲得消息。

隊列中保留這些消息,直到他們被消費或超時。

如果希望發送的每個消息都會被成功處理,那需要 P2P。

點對點模式特點:

每個消息只有一個消費者,一旦消費,消息就不在隊列中了。

發送者和接收者之間沒有依賴性,發送者發送完成,不管接收者是否運行,都不會影響消息發送到隊列中。

接收者成功接收消息之后需向對象應答成功(確認)。

發布訂閱模式 - publish / subscribe 模式:

Pub / Sub 模式包含三個角色:交換機 exchange,發布者 publisher,訂閱者 subcriber。

多個發布者將消息發送交換機,系統將這些消息傳遞給多個訂閱者。

如果希望發送的消息被多個消費者處理,可采用 Pub / Sub。

發布訂閱模式特點:

每個消息可以有多個訂閱者。

發布者和訂閱者之間在時間上有依賴,對于某個交換機的訂閱者,必須創建一個訂閱后,才能消費發布者的消息。

為了消費消息,訂閱者必須保持運行狀態。

FOXQ1

簡單模式

RabbitMQ 本身只是接收,存儲和轉發消息,并不會對信息進行處理;類似郵局,處理信件的應該是收件人而不是郵局。

生產者 P

public class Sender {?public static void main(String[] args) throws Exception {String msg = &34;;?// 1.獲得連接Connection connection = ConnectionUtil.getConnection();// 2.在連接中創建通道(信道)Channel channel = connection.createChannel();// 3.創建消息隊列 (1,2,3,4,5)/*參數 1: 隊列的名稱參數 2: 隊列中的數據是否持久化參數 3: 是否排外(是否支持擴展,當前隊列只能自己用,不能給別人用)參數 4: 是否自動刪除(當隊列的連接數為 0 時,隊列會銷毀,不管隊列是否還存保存數據)參數 5: 隊列參數(沒有參數為 null) */channel.queueDeclare(&34;,false,false,false,null);// 4.向指定的隊列發送消息 (1,2,3,4)/*參數 1: 交換機名稱,當前是簡單模式,也就是 P2P 模式,沒有交換機,所以名稱為 &34;參數 2: 目標隊列的名稱參數 3: 設置消息的屬性(沒有屬性則為 null)參數 4: 消息的內容 (只接收字節數組) */channel.basicPublish(&34;,&34;,null,msg.getBytes());System.out.println(&34; + msg);// 5.釋放資源channel.close();connection.close();}?}

啟動生產者,即可前往管理端查看隊列中的信息,會有一條信息沒有處理。

消費者 C

public class Receiver {public static void main(String[] args) throws Exception {// 1.獲得連接Connection connection = ConnectionUtil.getConnection();// 2.獲得通道(信道)Channel channel = connection.createChannel();// 3.從信道中獲得消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 交付處理(收件人信息,包裹上的快遞標簽,協議的配置,FOXQ1自噬,消息)@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {// body 就是從隊列中獲取的消息String s = new String(body);System.out.println(&34; + s);}};// 4.監聽隊列 true: 自動消息確認channel.basicConsume(&34;,true,consumer);}}

啟動消費者,前往管理端查看隊列中的信息,所有信息都已經處理和確認,顯示 0。

消息確認機制 ACK

通過剛才的案例可以看出,消息一旦被消費,消息就會立刻從隊列中移除。

如果消費者接收消息后,還沒執行操作就拋異常宕機導致消費失敗,但是 RabbitMQ 無從得知,這樣消息就丟失了。

因此,RabbitMQ 有一個 ACK 機制,當消費者獲取消息后,會向 RabbitMQ 發送回執 ACK,告知消息已經被接收。

ACK - Acknowledge character 即是確認字符,在數據通信中,接收站發給發送站的一種傳輸類控制字符,表示發來的數據已確認接收無誤。在使用 http 請求時,http 的狀態碼 200 就是表示服務器執行成功。

整個過程就像快遞員將包裹送到你手里,并且需要你的簽字,并拍照回執。

不過這種回執 ACK 分為兩種情況:

自動 ACK - 消息接收后,消費者立刻自動發送 ACK,類似快遞放在快遞柜。

手動 ACK - 消息接收后,不會發送 ACK,需要手動調用,類似快遞必須本人簽收。

兩種情況如何選擇,需要看消息的重要性:

如果消息不太重要,丟失也沒有影響,自動 ACK 會比較方便。

如果消息非常重要,最好消費完成手動 ACK;如果自動 ACK 消費后,RabbitMQ 就會把消息從隊列中刪除,而此時消費者拋異常宕機,那么消息就永久丟失了。

修改啟動手動 ACK 消息確認:

/ 監聽隊列 false: 手動消息確認channel.basicConsume(&34;,false,consumer);

啟動生產者和消費者,前往管理端查看隊列中的信息,會有一條信息沒有確認(Unacked)。

手動 ACK 消息確認解決問題:

public class ReceiverAck {?public static void main(String[] args) throws Exception {// 1.獲得連接Connection connection = ConnectionUtil.getConnection();// 2.獲得通道(信道)final Channel channel = connection.createChannel();?// 3.從信道中獲得消息DefaultConsumer consumer = new DefaultConsumer(channel) {// 交付處理(收件人信息,包裹上的快遞標簽,協議的配置,消息)@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {// body就是從隊列中獲取的消息String s = new String(body);System.out.println(&34; + s);// 手動確認(收件人信息,是否同時確認多個消息)channel.basicAck(envelope.getDeliveryTag(),false);}};// 4.監聽隊列 false: 手動消息確認channel.basicConsume(&34;,false,consumer);}?}

工作隊列模式

當運行許多消費者程序時,消息隊列中的任務會被眾多消費者共享,但其中某一個消息只會被一個消費者獲取(100 支肉串 20 個人吃,但是其中的某支肉串只能被一個人吃)。

生產者 P

public class Sender {?public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();?channel.queueDeclare(&34;,false,false,false,null);?for(int i = 1;i<=100;i++) {String msg = &34; + i;channel.basicPublish(&34;,&34;,null,msg.getBytes());System.out.println(msg);}?channel.close();connection.close();}?}

消費者 1

public class Receiver1 {?// 統計獲取的信息的數量static int counter = 1;?public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();final Channel channel = connection.createChannel();?// queueDeclare() 此方法有雙重作用,如果隊列不存在,就創建;如果隊列存在,則獲取channel.queueDeclare(&34;,false,false,false,null);?DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String s = new String(body);System.out.println(&34; + s + &34; + counter++);// 模擬網絡延遲 200 毫秒try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}// 手動確認(收件人信息,是否同時確認多個消息)channel.basicAck(envelope.getDeliveryTag(),false);}};// 4.監聽隊列 false:手動消息確認channel.basicConsume(&34;,false,consumer);}?}

消費者 2

public class Receiver2 {?// 統計獲取的信息的數量static int counter = 1;?public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();final Channel channel = connection.createChannel();?// queueDeclare() 此方法有雙重作用,如果隊列不存在,就創建;如果隊列存在,則獲取channel.queueDeclare(&34;,false,false,false,null);?DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String s = new String(body);System.out.println(&34; + s + &34; + counter++);// 模擬網絡延遲 900 毫秒try {Thread.sleep(900);} catch (InterruptedException e) {e.printStackTrace();}// 手動確認(收件人信息,是否同時確認多個消息)channel.basicAck(envelope.getDeliveryTag(),false);}};// 4.監聽隊列 false:手動消息確認channel.basicConsume(&34;,false,consumer);}?}

能者多勞

先運行 2 個消費者,排隊等候消費(取餐),再運行生產者開始生產消息(烤肉串)。

由運行結果可以看到,雖然兩個消費者的消費速度不一致(線程休眠時間),但是消費的數量卻是一致的,各消費 50 個消息。

效率高的多干點,效率低的少干點。

為了克服這個問題,可以使用設置為 prefetchCount = 1 的 basicQos 方法。這告訴RabbitMQ 一次不要給一個 worker 發送一條以上的消息。或者,換句話說,在 worker 處理并確認前一個消息之前,不要向它發送新消息。相反,它將把它分派到下一個不繁忙的 worker。

在消費者 1 和消費者 2 中加上 channel.basicQos(1):

...// queueDeclare() 此方法有雙重作用,如果隊列不存在,就創建;如果隊列存在,則獲取channel.queueDeclare(&34;,false,false,false,null);// 開啟一次接受一條消息。可以理解為:快遞一個一個送,送完一個再送下一個,速度快的送件就多channel.basicQos(1);...

能者多勞必須要配合手動的 ACK 機制才生效。

如何避免消息堆積?

Workqueue,多個消費者監聽同一個隊列。

接收到消息后,通過線程池,異步消費。

發布/訂閱模式

工作隊列背后的假設是,每個任務都被準確地交付給一個工作者;“發布/訂閱”模式將一個消息傳遞給多個消費者。

生活中的案例:眾多粉絲關注一個視頻主,視頻主發布視頻,所有粉絲都可以得到視頻通知。

生產者 P 發送信息給路由 X,路由 X 將信息轉發給綁定路由 X 的隊列;隊列將信息通過信道發送給消費者,最后消費者進行消費。整個過程,必須先創建路由。

路由在生產者程序中創建。

路由沒有存儲消息的能力,當生產者將信息發送給路由后,消費者還沒有運行,所以沒有隊列,路由并不知道將信息發送給誰。

運行程序的順序:

執行一次 MessageSender。

執行 MessageReceiver1 和 MessageReceiver2,綁定到路由。

再次執行 MessageSender,發送消息給路由。

生產者

public class Sender {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//

public class Receiver1 {private static final String RECEIVER_QUEUE = &34;;public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//

public class Receiver2 {private static final String RECEIVER_QUEUE = &34;;public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//

路由會根據類型進行定向(direct)分發消息給不同的隊列;每種類型可以對應多個消費者。

運行程序的順序:

先運行一次 Sender(創建路由器)。

有了路由器之后,在創建兩個 Receiver1 和 Receiver2,進行隊列綁定。

再次運行 Sender,發出消息。

生產者

林肯和索菲亞最后在一起了(林肯和邁克不是親兄弟,林肯是以前公司一對特工的孩子,因在執行任務的過程中死了,被邁克的父親收養)馬洪重新獲得了愛情,和以前的同事那個黑人女的在一起了 T-BAG進了FOX監獄 DON-SELF因為沒。

public class Sender {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//

public class Receiver1 {private static final String RECEIVER_QUEUE = &34;;public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//

public class Receiver2 {private static final String RECEIVER_QUEUE = &34;;public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//

通配符模式是和路由模式差不多,唯獨的區別就是路由鍵支持模糊匹配。

匹配符號:

* - 只能匹配一個詞(正好一個詞,多一個不行,少一個也不行)。

案例:

Q1 綁定了路由鍵 `*.orange.*`Q2 綁定了路由鍵 `*.*.rabbit` 和 `lazy. Q1Q2lazy.orange.elephant Q1lazy.brown.fox Q2quick.brown.fox無quick.orange.male.rabbit34;test_exchange_topic&34;topic&34;price-off promotion&34;test_exchange_topic&34;product.price&34;Provider: &34;test_exchange_topic_queue_1&34;test_exchange_topic&34;user.34;);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String s = new String(body);System.out.println(&34; + s);}};// 4.監聽隊列 true: 自動消息確認channel.basicConsume(RECEIVER_QUEUE,true,consumer);}}

消費者 2

public class Receiver2 {private static final String RECEIVER_QUEUE = &34;;public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(RECEIVER_QUEUE,false,false,false,null);// 綁定路由(綁定用戶相關的消息)channel.queueBind(RECEIVER_QUEUE,&34;,&&34;test_exchange_topic&34;order.34;);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String s = new String(body);System.out.println(&34; + s);}};// 4.監聽隊列 true: 自動消息確認channel.basicConsume(RECEIVER_QUEUE,true,consumer);}}

持久化

消息的可靠性是 RabbitMQ 的一大特色,那么 RabbitMQ 是如何避免消息丟失?

消費者的 ACK 確認機制,可以防止消費者丟失消息。

萬一在消費者消費之前,RabbitMQ 服務器宕機了,那消息也會丟失。

想要將消息持久化,那么路由和隊列都要持久化才可以。

生產者

public class Sender {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//

public class Receiver1 {private static final String RECEIVER_QUEUE = &34;;public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明隊列 (第二個參數為 true:支持持久化)channel.queueDeclare(RECEIVER_QUEUE,true,false,false,null);// 綁定路由(綁定用戶相關的消息)channel.queueBind(RECEIVER_QUEUE,&34;,&&34;Consumer 1: " + s);}};// 4.監聽隊列 true: 自動消息確認channel.basicConsume(RECEIVER_QUEUE,true,consumer);}}

Spring 整合 RabbitMQ

五種消息模型,在企業中應用最廣泛的就是定向匹配 topics。

Spring AMQP 是基于 Spring 框架的 AMQP 消息解決方案,提供模板化的發送和接收消息的抽象層,提供基于消息驅動的 POJO 的消息監聽等,簡化了對于 RabbitMQ 相關程序的開發。

生產端工程

依賴 pom.xml

org.springframework.amqpspring-rabbit2.0.1.RELEASEorg.slf4jslf4j-log4j121.7.25compileorg.apache.commonscommons-lang33.9

spring-rabbitmq-producer.xml

發消息 com.renda.test.Sender:

public class Sender {public static void main(String[] args) {// 1.創建 spring 容器ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(&34;);// 2.從 spring 容器中獲得 rabbit 模版對象RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);// 3.發消息Map map = new HashMap();map.put(&34;,&34;);map.put(&34;,&34;);rabbitTemplate.convertAndSend(&34;,map);System.out.println(&34;);context.close();}}

消費端工程

依賴與生產者一致

spring-rabbitmq-consumer.xml

?

消費者:

MessageListener 接口用于 spring 容器接收到消息后處理消息;

如果需要使用自己定義的類型來實現處理消息時,必須實現該接口,并重寫 onMessage() 方法;

當 spring 容器接收消息后,會自動交由 onMessage 進行處理。

com.renda.listener.ConsumerListener:

@Componentpublic class ConsumerListener implements MessageListener {?/** * jackson 提供序列化和反序列中使用最多的類,用來轉換 json 的 */private static final ObjectMapper MAPPER = new ObjectMapper();?@Overridepublic void onMessage(Message message) {// 將 message對象轉換成 jsonJsonNode jsonNode = null;try {jsonNode = MAPPER.readTree(message.getBody());String name = jsonNode.get(&34;).asText();String email = jsonNode.get(&34;).asText();System.out.println(&34; + name + &34; + email + &34;);} catch (IOException e) {e.printStackTrace();}}?}

啟動項目 com.renda.test.TestRunner:

public class TestRunner {?public static void main(String[] args) throws IOException {// 獲得容器ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(&34;);// 讓程序一直運行,別終止System.in.read();}?}

消息成功確認機制

在實際場景下,有的生產者發送的消息是必須保證成功發送到消息隊列中,需要事務機制和發布確認機制。

FOXQ1

事務機制

AMQP 協議提供的一種保證消息成功投遞的方式,通過信道開啟 transactional 模式;

利用信道的三個方法來實現以事務方式發送消息,若發送失敗,通過異常處理回滾事務,確保消息成功投遞

channel.txSelect() - 開啟事務

channel.txCommit() - 提交事務

channel.txRollback() - 回滾事務

Spring 已經對上面三個方法進行了封裝,所以這里使用原始的代碼演示。

生產者

public class Sender {?public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();?channel.exchangeDeclare(&34;,&34;);// 開啟事務channel.txSelect();try {channel.basicPublish(&34;,&34;,null,&34;.getBytes());// 模擬出錯// System.out.println(1 / 0);channel.basicPublish(&34;,&34;,null,&34;.getBytes());// 提交事務(一起成功)channel.txCommit();System.out.println(&34;);} catch (Exception e) {System.out.println(&34;);// 事務回滾(一起失敗)channel.txRollback();e.printStackTrace();} finally {channel.close();connection.close();}}?}

消費者

public class Receiver {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(&34;,false,false,false,null);channel.queueBind(&34;,&34;,&&34;Consumer: &34;test_transaction_queue",true,consumer);}}

Confirm 發布確認機制

RabbitMQ 為了保證消息的成功投遞,采用通過 AMQP 協議層面提供事務機制的方案,但是采用事務會大大降低消息的吞吐量。

開啟事務性能最大損失超過 250 倍。

事務效率低下原因:100 條消息,前 99 條成功,如果第 100 條失敗,那么 99 條消息要全部撤銷回滾。

更加高效的解決方式是采用 Confirm 模式,而 Confirm 模式則采用補發第 100 條的措施來完成 100 條消息的送達。

在 Spring 中應用

resources\spring\spring-rabbitmq-producer.xml

......

消息確認處理類 com.renda.confirm.MessageConfirm:

public class MessageConfirm implements RabbitTemplate.ConfirmCallback {/** * @param correlationData 消息相關的數據對象(封裝了消息的唯一 id) * @param b 消息是否確認成功 * @param s 異常信息 */@Overridepublic void confirm(CorrelationData correlationData,boolean b,String s) {if (b) {System.out.println(&34;);} else {System.out.println(&34; + s);// 如果本條消息一定要發送到隊列中,例如下訂單消息,可以采用補發// 1.采用遞歸(限制遞歸的次數)// 2.redis + 定時任務(jdk 的 timer,或者定時任務框架 Quartz)}}}

resources\log4j.properties

log4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.Target=System.outlog4j.appender.stdout.layout=org.apache.log4j.PatternLayoutlog4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%nlog4j.appender.file=org.apache.log4j.FileAppenderlog4j.appender.file.File=rabbitmq.loglog4j.appender.file.layout=org.apache.log4j.PatternLayoutlog4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l%m%nlog4j.rootLogger=debug,stdout,file

發送消息 com.renda.test.Sender:

...// 3.發消息Map map = new HashMap();map.put(&34;,&34;);map.put(&34;,&34;);// 模擬發送消息失敗// rabbitTemplate.convertAndSend(&34;,&34;,map);rabbitTemplate.convertAndSend(&34;,map);System.out.println(&34;);...

1、華為 華為最近出的藍牙耳機還不錯,外觀設計簡約,佩戴舒適,不容易掉,雖然對于學生黨來說有點貴,但品質好啊又是國產的,值得推薦。2、小米 小米最先是做手機起家的,但現在小米會做各種電子配件,甚至有智能家居,在。

消費端限流

RabbitMQ 服務器積壓了成千上萬條未處理的消息,然后隨便打開一個消費者客戶端,就會出現這樣的情況:巨量的消息瞬間全部噴涌推送過來,但是單個客戶端無法同時處理這么多數據,就會被壓垮崩潰。

所以,當數據量特別大的時候,對生產端限流肯定是不科學的,因為有時候并發量就是特別大,有時候并發量又特別少,這是用戶的行為 - 是無法約束的。

應該對消費端限流,用于保持消費端的穩定。

RabbitMQ 提供了一種 QoS(Quality of Service,服務質量)服務質量保證功能;

即在非自動確認消息的前提下,如果一定數目的消息未被確認前,不再進行消費新的消息。

生產者 com.renda.test.Sender 使用循環發出多條消息:

...for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend(&34;,map);System.out.println(&34;);}...

RabbitMQ 的管理頁面可以看到生產了 10 條堆積未處理的消息。

消費者進行限流處理:

resources\spring\spring-rabbitmq-consumer.xml

...5.配置監聽 -->...

com.renda.listener.ConsumerListener

@Componentpublic class ConsumerListener extends AbstractAdaptableMessageListener {?/** * jackson 提供序列化和反序列中使用最多的類,用來轉換 json 的 */private static final ObjectMapper MAPPER = new ObjectMapper();?@Overridepublic void onMessage(Message message,Channel channel) throws Exception {// 將 message對象轉換成 json//JsonNode jsonNode = MAPPER.readTree(message.getBody());//String name = jsonNode.get(&34;).asText();//String email = jsonNode.get(&34;).asText();//System.out.println(&34; + name + &34; + email + &34;);?String str = new String(message.getBody());System.out.println(&34; + str);?/** * 手動確認消息(參數1,參數2) * 參數 1:RabbitMQ 想該 channel 投遞的這條消息的唯一標識 ID,此 ID 是一個單調遞增的正整數。 * 參數 2:為了減少網絡流量,手動確認可以被批量處理;當該參數為 true 時,則可以一次性確認小于等于 msgId 值的所有消息。 */long msgId = message.getMessageProperties().getDeliveryTag();channel.basicAck(msgId,true);?Thread.sleep(3000);System.out.println(&34;);}}

每次最多只確認接收 3 條消息,直到消息隊列為空。

過期時間 TTL

Time To Live - 生存時間、還能活多久,單位毫秒。

在這個周期內,消息可以被消費者正常消費,超過這個時間,則自動刪除(其實是被稱為 dead message 并投入到死信隊列,無法消費該消息)。

RabbitMQ 可以對消息和隊列設置 TTL:

通過隊列設置,隊列中所有消息都有相同的過期時間。

對消息單獨設置,每條消息的 TTL 可以不同(更顆粒化)。

設置隊列 TTL

RabbitMQ 管理端刪除掉 test_spring_queue_1 隊列。

resources\spring\spring-rabbitmq-producer.xml

5 秒之后,消息自動刪除。

設置消息 TTL

RabbitMQ 管理端刪除掉 test_spring_queue_1 隊列。

設置某條消息的 TTL,只需要在創建發送消息時指定即可。

resources\spring\spring-rabbitmq-producer.xml

com.renda.test.Sender2

public class Sender2 {?public static void main(String[] args) {ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(&34;);RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);?// 創建消息配置對象MessageProperties messageProperties = new MessageProperties();// 設置消息過期時間messageProperties.setExpiration(&34;);// 創建消息Message message = new Message(&34;.getBytes(),messageProperties);// 發消息rabbitTemplate.convertAndSend(&34;,message);System.out.println(&34;);?context.close();}?}

如果同時設置了 queue 和 message 的 TTL 值,則只有二者中較小的才會起作用。

死信隊列

DLX(Dead Letter Exchanges)死信交換機 / 死信郵箱,當消息在隊列中由于某些原因沒有被及時消費而變成死信(dead message)后,這些消息就會被分發到 DLX 交換機中,而綁定 DLX 交換機的隊列,稱之為:“死信隊列”。

消息沒有被及時消費的原因:

消息被拒絕(basic.reject / basic.nack)并且不再重新投遞 requeue=false。

消息超時未消費。

達到最大隊列長度。

my_exchange 交換機 --- 沒有及時消費的消息 ---> dlx_exchange 死信交換機my_exchange -- 路由鍵 dlx_ttl --> test_ttl_queue 消息過期my_exchange -- 路由鍵 dlx_max --> test_max_queue 達到最大隊列長度沒有及時消費的消息:[test_ttl_queue,test_max_queue]test_ttl_queue -- 過期的消息 --> dlx_exchangetest_max_queue -- 被擠出的消息 --> dlx_exchangedlx_exchange -- 路由鍵 dlx_ttl --> dlx_queue 死信隊列dlx_exchange -- 路由鍵 dlx_max --> dlx_queue 死信隊列

resources\spring\spring-rabbitmq-producer-dlx.xml

?????????

發消息進行測試

public class SendDLX {?public static void main(String[] args) {ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(&34;);RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);// 測試超時// rabbitTemplate.convertAndSend(&34;,&34;.getBytes());?// 測試超過最大長度rabbitTemplate.convertAndSend(&34;,&34;.getBytes());rabbitTemplate.convertAndSend(&34;,&34;.getBytes());rabbitTemplate.convertAndSend(&34;,&34;.getBytes());?System.out.println(&34;);?context.close();}?}

延遲隊列

延遲隊列 = TTL + 死信隊列的合體。

死信隊列只是一種特殊的隊列,里面的消息仍然可以消費。

在電商開發部分中,都會涉及到延時關閉訂單,此時延遲隊列正好可以解決這個問題。

我在讀到伊蘭這個角色時,心里總是不由自主的浮現出FOX的科幻劇Fringe,女主角Olivia Dunham,的樣子。奧莉維亞,也是一名軍人,早年喪父喪母,父親很巧地也是一名軍官,在軍事基地任要職;劇中的她給人的印象也是獨立干練且自制地,對待自己的。

生產者

沿用上面死信隊列案例的超時測試,超時時間改為訂單關閉時間即可。

消費者

resources\spring\spring-rabbitmq-consumer.xml

......

版權聲明:本站文章均來源于網絡,如有侵權請聯系刪除!

聯系我們

在線咨詢:點擊這里給我發消息

QQ:

工作日:9:30-18:30,節假日休息

<strike id="rtjff"><noframes id="rtjff"><strike id="rtjff"></strike>
<progress id="rtjff"><address id="rtjff"></address></progress>
<th id="rtjff"></th>
<span id="rtjff"><video id="rtjff"></video></span><th id="rtjff"><video id="rtjff"><span id="rtjff"></span></video></th><progress id="rtjff"><noframes id="rtjff"><strike id="rtjff"><video id="rtjff"><strike id="rtjff"></strike></video></strike>
<th id="rtjff"></th><strike id="rtjff"><noframes id="rtjff"><strike id="rtjff"></strike>
<span id="rtjff"><video id="rtjff"></video></span>
<span id="rtjff"></span><span id="rtjff"><noframes id="rtjff">
<span id="rtjff"></span>
<th id="rtjff"><noframes id="rtjff"><th id="rtjff"></th>
<th id="rtjff"><noframes id="rtjff">
<span id="rtjff"><noframes id="rtjff">
一二三四视频社区在线7