RabbitMQ Golang教程(二)任務隊列什么是任務隊列 ?
首先,需要編寫發(fā)送端的程序。該程序會將任務安排到我們的工作隊列中,因此將其命名為new_task.gopackage mainimport ( "fmt" "github.com/streadway/amqp" "log" "os" "strings")func main() { // 1. 嘗試連接RabbitMQ,建立連接 // 該連接抽象了套接字連接,并為我們處理協(xié)議版本協(xié)商和認證等。 conn,err := amqp.Dial("amqp://admin:admin@xx.xxx.xxx.xxx:xxx/") if err != nil { fmt.Printf("connect to RabbitMQ failed, err:%v\n", err) return } defer conn.Close() // 2. 接下來,我們創(chuàng)建一個通道,大多數(shù)API都是用過該通道操作的。 ch,err := conn.Channel() if err != nil { fmt.Printf("open a channel failed, err:%v\n", err) return } defer ch.Close() // 3. 要發(fā)送,我們必須聲明要發(fā)送到的隊列。 q, err := ch.QueueDeclare( "task_queue", // name true, // 持久的 false, // delete when unused false, // 獨有的 false, // no-wait nil, // arguments ) if err != nil { fmt.Printf("declare a queue failed, err:%v\n", err) return } // 4. 然后我們可以將消息發(fā)布到聲明的隊列 body := bodyFrom(os.Args) err = ch.Publish( "", // exchange q.Name, // routing key false, // 立即 false, // 強制 amqp.Publishing{ DeliveryMode: amqp.Persistent, // 持久 ContentType: "text/plain", Body: []byte(body), }) if err != nil { fmt.Printf("publish a message failed, err:%v\n", err) return } log.Printf(" [x] Sent %s", body)}// bodyFrom 從命令行獲取將要發(fā)送的消息內(nèi)容func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") } return s} 然后,需要編寫接收端程序,它需要為消息正文中出現(xiàn)的每個.偽造一秒鐘的工作。它將從隊列中彈出消息并執(zhí)行任務,因此將其稱為worker.gopackage mainimport ( "bytes" "fmt" "github.com/streadway/amqp" "log" "time")func main() { conn,err := amqp.Dial("amqp://admin:admin@49.234.192.212:5672/") if err != nil { fmt.Printf("connect to RabbitMQ failed, err:%v\n", err) return } defer conn.Close() ch,err := conn.Channel() if err != nil { fmt.Printf("open a channel failed, err:%v\n", err) return } defer ch.Close() // 聲明一個queue q, err := ch.QueueDeclare( "task_queue", // name true, // 聲明為持久隊列 false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) err = ch.Qos( 1, // prefetch count 0, // prefetch size false, // global ) if err != nil { fmt.Printf("ch.Qos() failed, err:%v\n", err) return } // 立即返回一個Delivery的通道 msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // 注意這里傳false,關閉自動消息確認 false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { fmt.Printf("ch.Consume failed, err:%v\n", err) return } // 開啟循環(huán)不斷地消費消息 forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) dotCount := bytes.Count(d.Body, []byte(".")) t := time.Duration(dotCount) time.Sleep(t * time.Second) log.Printf("Done") d.Ack(false) // 手動傳遞消息確認 } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever} 執(zhí)行結果單個執(zhí)行
循環(huán)調(diào)度使用任務隊列的優(yōu)點之一是能夠輕松并行化工作。 首先,嘗試同時運行兩個worker.go腳本。它們都將從隊列中獲取消息。 需要打開三個控制臺。其中兩個將運行worker.go腳本。這些控制臺將成為我們的兩個消費者——C1和C2。
|
|