午夜视频在线网站,日韩视频精品在线,中文字幕精品一区二区三区在线,在线播放精品,1024你懂我懂的旧版人,欧美日韩一级黄色片,一区二区三区在线观看视频

分享

RabbitMQ Golang教程(二)

 雨夜的博客 2021-05-05

RabbitMQ Golang教程(二)


任務隊列


什么是任務隊列 ?

把要執(zhí)行的任務放在隊列中。使用較多的任務隊列有machiney、Celery、goWorker、YTask。每一個任務隊列都有自己的特點,這里就不細講了。 我們將創(chuàng)建一個工作隊列,該隊列將用于在多個工人之間分配耗時的任務。

工作隊列(又稱任務隊列)的主要思想是避免立即執(zhí)行某些資源密集型任務并且不得不等待這些任務完成。相反,我們安排任務異步地同時或在當前任務之后完成。我們將任務封裝為消息并將其發(fā)送到隊列,在后臺運行的工作進程將取出消息并最終執(zhí)行任務。當你運行多個工作進程時,任務將在他們之間共享。

這個概念在Web應用中特別有用,因為在Web應用中不可能在較短的HTTP請求窗口內(nèi)處理復雜的任務,(例如:注冊時發(fā)送郵件或短信驗證碼等場景)。

首先,需要編寫發(fā)送端的程序。該程序會將任務安排到我們的工作隊列中,因此將其命名為new_task.go


package mainimport (
"fmt"
"github.com/streadway/amqp"
"log"
"os"
"strings")func main()  {
// 1. 嘗試連接RabbitMQ,建立連接
// 該連接抽象了套接字連接,并為我們處理協(xié)議版本協(xié)商和認證等。
conn,err := amqp.Dial("amqp://admin:admin@xx.xxx.xxx.xxx:xxx/")
if err != nil {
fmt.Printf("connect to RabbitMQ failed, err:%v\n", err)
return
}
defer conn.Close()

// 2. 接下來,我們創(chuàng)建一個通道,大多數(shù)API都是用過該通道操作的。
ch,err := conn.Channel()
if err != nil {
fmt.Printf("open a channel failed, err:%v\n", err)
return
}
defer ch.Close()

// 3. 要發(fā)送,我們必須聲明要發(fā)送到的隊列。
q, err := ch.QueueDeclare(
"task_queue", // name
true,         // 持久的
false,        // delete when unused
false,        // 獨有的
false,        // no-wait
nil,          // arguments
)
if err != nil {
fmt.Printf("declare a queue failed, err:%v\n", err)
return
}

// 4. 然后我們可以將消息發(fā)布到聲明的隊列
body := bodyFrom(os.Args)
err = ch.Publish(
"",     // exchange
q.Name, // routing key
false,  // 立即
false,  // 強制
amqp.Publishing{
DeliveryMode: amqp.Persistent, // 持久
ContentType:  "text/plain",
Body:         []byte(body),
})
if err != nil {
fmt.Printf("publish a message failed, err:%v\n", err)
return
}
log.Printf(" [x] Sent %s", body)}// bodyFrom 從命令行獲取將要發(fā)送的消息內(nèi)容func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s}

然后,需要編寫接收端程序,它需要為消息正文中出現(xiàn)的每個.偽造一秒鐘的工作。它將從隊列中彈出消息并執(zhí)行任務,因此將其稱為worker.go

package mainimport (
"bytes"
"fmt"
"github.com/streadway/amqp"
"log"
"time")func main()  {
conn,err := amqp.Dial("amqp://admin:admin@49.234.192.212:5672/")
if err != nil {
fmt.Printf("connect to RabbitMQ failed, err:%v\n", err)
return
}
defer conn.Close()

ch,err := conn.Channel()
if err != nil {
fmt.Printf("open a channel failed, err:%v\n", err)
return
}
defer ch.Close()

// 聲明一個queue
q, err := ch.QueueDeclare(
"task_queue", // name
true,         // 聲明為持久隊列
false,        // delete when unused
false,        // exclusive
false,        // no-wait
nil,          // arguments
)
err = ch.Qos(
1,     // prefetch count
0,     // prefetch size
false, // global
)
if err != nil {
fmt.Printf("ch.Qos() failed, err:%v\n", err)
return
}

// 立即返回一個Delivery的通道
msgs, err := ch.Consume(
q.Name, // queue
"",     // consumer
false,  // 注意這里傳false,關閉自動消息確認
false,  // exclusive
false,  // no-local
false,  // no-wait
nil,    // args
)
if err != nil {
fmt.Printf("ch.Consume failed, err:%v\n", err)
return
}

// 開啟循環(huán)不斷地消費消息
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dotCount := bytes.Count(d.Body, []byte("."))
t := time.Duration(dotCount)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false) // 手動傳遞消息確認
}
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever}

執(zhí)行結果


單個執(zhí)行


  • 打開兩個終端,分別執(zhí)行new_task.go和worker.go了。

    go run worker.go

    file

    go run new_task.go

    file

  • 查看是否鏈接成功 file

  • 查看隊列是否存在 file

  • 通過命令查看

    rabbitmqctl list_queues

    file

循環(huán)調(diào)度


使用任務隊列的優(yōu)點之一是能夠輕松并行化工作。

首先,嘗試同時運行兩個worker.go腳本。它們都將從隊列中獲取消息。

需要打開三個控制臺。其中兩個將運行worker.go腳本。這些控制臺將成為我們的兩個消費者——C1和C2。 file

  • 查看是否鏈接成功 file

  • 查看隊列是否存在 file

  • 通過命令查看

    rabbitmqctl list_queues

    file

    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多