golang rabbitmq的使用(五)

先说一个实际的业务场景:
Client端有一个请求需要进行耗时处理或者查询,这个处理在Server端做。Server 端处理完后通知给请求的Client端。
这种场景可以称之为RPC(Remote Procedure Call)

有两个点说明一下:

  • <1>Client端发送请求给Server端可以简单定义一个Queue。Client作为Producer发布消息,Server端作为Consumer消费消息
  • <2>Server端处理完耗时处理后需要将处理结果返回给请求的客户端。

    • 可以在Client声明一个不指定名称的Queue,系统会自动生成一个随机名称的Queue。将Queue的名称在publish时发送给Server端
    • 因为Server端要将处理结果返回给对应的请求,所以在Client端需要生成一个CorrelationId发送给Server端

处理流程

Client端
(1)声明从Server返回消息用的queue

// Declare the client's private reply queue. The name is left empty so the
// broker picks one; the chosen name is read back from respQueue.Name.
respQueue, err := ch.QueueDeclare(
        "",    // name (empty: generated by the broker)
        false, // durable
        false, // delete when unused
        true,  // exclusive
        false, // noWait
        nil,   // arguments
    )

(2)发送请求消息到rpc_queue

// Send the request to the RPC queue. ReplyTo tells the server which queue to
// answer on; CorrelationId lets the client match the answer to this request.
err = ch.Publish(
        "",               // exchange
        config.QUEUENAME, // routing key
        false,            // mandatory
        false,            // immediate
        amqp.Publishing{
            ContentType:   "text/plain",
            CorrelationId: correlationID,
            ReplyTo:       respQueue.Name,
            Body:          []byte(msgBody),
        })

correlationID为自己随机生成的Id

Server端
(3)声明rpc_queue,从rpc_queue中消费消息

// Declare the request queue that clients publish to.
q, err := ch.QueueDeclare(
                config.QUEUENAME, // name
                false,            // durable
                false,            // delete when unused
                false,            // exclusive
                false,            // noWait
                nil,              // arguments
            )

            // Start receiving requests. Auto-ack is off, so each delivery must
            // be acknowledged explicitly once it has been handled.
            msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer tag
                false, // auto ack
                false,  // exclusive
                false,  // noLocal
                false,  // noWait
                nil,    // arguments
            )

(4)执行处理后使用msg中的ReplyTo返回处理结果给Client

// Publish the result to the queue named in the request's ReplyTo field,
// echoing the request's CorrelationId back to the client.
err = ch.Publish(
        "",          // exchange
        msg.ReplyTo, // routing key: the client's reply queue
        false,       // mandatory
        false,       // immediate
        amqp.Publishing{
            ContentType:   "text/plain",
            CorrelationId: msg.CorrelationId,
            Body:          []byte(bookName),
        })

    // Acknowledge only this delivery (multiple = false).
    msg.Ack(false)

(5)Client端从reply queue中接收从Server端来的response

// Consume replies from the client's own reply queue.
respMsgs, err := ch.Consume(
        respQueue.Name, // queue
        "",             // consumer tag
        true,  // auto-ack
        true,  // exclusive
        false, // noLocal
        false, // nowait
        nil,   // arguments
    )

详细代码如下
conf.go

package config

// RMQADDR is the AMQP URL that both client and server dial.
const RMQADDR = "amqp://guest:guest@172.17.84.205:5672/"

// QUEUENAME is the name of the queue that carries RPC requests.
const QUEUENAME = "rpc_queue"

// SERVERINSTANCESCNT is the number of consumer goroutines the server starts.
const SERVERINSTANCESCNT = 5

client.go

package main

import (
    config "binTest/rabbitmqTest/t1/l6/conf"
    "fmt"
    "log"
    "math/rand"
    "os"

    "github.com/streadway/amqp"
)

// main sends os.Args[1] as an RPC request to the queue config.QUEUENAME and
// prints the body of the first reply whose CorrelationId matches the request.
//
// Every setup step returns as soon as it fails, so the program never goes on
// to use a nil connection, channel or delivery stream. The deferred Close
// calls still run on those early returns.
func main() {
    if len(os.Args) < 2 {
        log.Println("Arguments error")
        return
    }
    msgBody := os.Args[1]

    conn, err := amqp.Dial(config.RMQADDR)
    if err != nil {
        failOnError(err, "Failed to connect to RabbitMQ")
        return
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        failOnError(err, "Failed to open a channel")
        return
    }
    defer ch.Close()

    // Private reply queue: the empty name is filled in by the broker and read
    // back through respQueue.Name.
    respQueue, err := ch.QueueDeclare(
        "",    // name
        false, // durable
        false, // delete when unused
        true,  // exclusive
        false, // noWait
        nil,   // arguments
    )
    if err != nil {
        failOnError(err, "Failed to declare a response queue")
        return
    }

    correlationID := randomID(32)

    err = ch.Publish(
        "",               // exchange
        config.QUEUENAME, // routing key
        false,            // mandatory
        false,            // immediate
        amqp.Publishing{
            ContentType:   "text/plain",
            CorrelationId: correlationID,
            ReplyTo:       respQueue.Name,
            Body:          []byte(msgBody),
        })
    if err != nil {
        failOnError(err, "Failed to publish a message")
        return
    }
    // Report the send only after the publish call has succeeded.
    log.Printf(" [x] Sent %s", msgBody)

    respMsgs, err := ch.Consume(
        respQueue.Name, // queue
        "",             // consumer tag
        true,           // auto-ack
        true,           // exclusive
        false,          // noLocal
        false,          // nowait
        nil,            // arguments
    )
    if err != nil {
        // Ranging over a nil delivery channel would block forever.
        failOnError(err, "Failed to register a consumer")
        return
    }

    for item := range respMsgs {
        // Skip anything that does not answer this particular request.
        if item.CorrelationId == correlationID {
            fmt.Println("response:", string(item.Body))
            return
        }
    }

    // The delivery channel was closed without a matching reply.
    log.Println("Reply stream closed before a matching response arrived")
}

// failOnError logs "msg: err" and panics when err is non-nil, so the caller
// never continues with the invalid result of a failed call. It does nothing
// when err is nil.
//
// A panic is used rather than an exit so that deferred cleanup in the calling
// goroutine still runs while the stack unwinds.
func failOnError(err error, msg string) {
    if err != nil {
        log.Panicf("%s: %s", msg, err)
    }
}

// randomID returns a string of length random decimal digits ('0'-'9'), or the
// empty string when length is not positive.
//
// Each byte is an ASCII digit so the ID is printable text. Storing the raw
// random number as the byte would instead produce control characters.
//
// NOTE(review): the digits come from math/rand, so they are not suitable as a
// secret. On Go toolchains older than 1.20 the default source is also not
// seeded automatically; confirm the target version or seed it at start-up.
func randomID(length int) string {
    if length <= 0 {
        return ""
    }

    digits := make([]byte, length)
    for i := range digits {
        digits[i] = byte('0' + rand.Intn(10))
    }

    return string(digits)
}

server.go

package main

import (
    config "binTest/rabbitmqTest/t1/l6/conf"
    "fmt"
    "log"
    "math/rand"
    "time"

    "github.com/streadway/amqp"
)

// main starts config.SERVERINSTANCESCNT consumer goroutines on the queue
// config.QUEUENAME. Each one answers a request by publishing the result of
// queryBookID to the request's ReplyTo queue with the same CorrelationId.
//
// Every setup step returns as soon as it fails, and main returns once all
// consumers have stopped instead of blocking forever.
func main() {
    conn, err := amqp.Dial(config.RMQADDR)
    if err != nil {
        failOnError(err, "Failed to connect to RabbitMQ")
        return
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        failOnError(err, "Failed to open a channel")
        return
    }
    defer ch.Close()

    // With global=false the prefetch limit applies to each consumer on its
    // own. A worker handles one request at a time, so a limit of 1 keeps
    // requests from queueing behind a busy worker while another is idle.
    err = ch.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    if err != nil {
        failOnError(err, "Failed to set QoS")
        return
    }

    // Declare the request queue once; every worker consumes from it.
    q, err := ch.QueueDeclare(
        config.QUEUENAME, // name
        false,            // durable
        false,            // delete when unused
        false,            // exclusive
        false,            // noWait
        nil,              // arguments
    )
    if err != nil {
        failOnError(err, "Failed to declare a queue")
        return
    }

    // Buffered so a finishing worker never blocks on reporting completion.
    done := make(chan struct{}, config.SERVERINSTANCESCNT)

    // NOTE(review): all workers share the single channel ch for Consume,
    // Publish and Ack, as in the original; confirm the amqp library allows
    // concurrent use of one Channel.
    for routine := 0; routine < config.SERVERINSTANCESCNT; routine++ {
        go func(routineNum int) {
            defer func() { done <- struct{}{} }()

            msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer tag
                false,  // auto ack
                false,  // exclusive
                false,  // noLocal
                false,  // noWait
                nil,    // arguments
            )
            if err != nil {
                // Ranging over a nil delivery channel would block forever.
                failOnError(err, "Failed to register a consumer")
                return
            }

            for msg := range msgs {
                log.Printf("In %d start consuming message: %s\n", routineNum, msg.Body)

                bookName := queryBookID(string(msg.Body))

                err := ch.Publish(
                    "",          // exchange
                    msg.ReplyTo, // routing key: the client's reply queue
                    false,       // mandatory
                    false,       // immediate
                    amqp.Publishing{
                        ContentType:   "text/plain",
                        CorrelationId: msg.CorrelationId,
                        Body:          []byte(bookName),
                    })
                if err != nil {
                    fmt.Println("Failed to reply msg to client")
                } else {
                    fmt.Println("Response to client:", bookName)
                }

                // The request is acknowledged whether or not the reply was
                // published, matching the original behaviour.
                if err := msg.Ack(false); err != nil {
                    log.Printf("In %d failed to ack message: %s\n", routineNum, err)
                }
            }
        }(routine)
    }

    // Wait for every worker to stop.
    for i := 0; i < config.SERVERINSTANCESCNT; i++ {
        <-done
    }
}

// failOnError logs "msg: err" and panics when err is non-nil, so the caller
// never continues with the invalid result of a failed call. It does nothing
// when err is nil.
//
// A panic is used rather than an exit so that deferred cleanup in the calling
// goroutine still runs while the stack unwinds.
func failOnError(err error, msg string) {
    if err != nil {
        log.Panicf("%s: %s", msg, err)
    }
}

// queryBookID stands in for a slow lookup: it sleeps for a random whole number
// of seconds in [0, 9) and then returns bookID prefixed with "QUERIED_".
func queryBookID(bookID string) string {
    result := "QUERIED_" + bookID

    delay := time.Duration(rand.Intn(9)) * time.Second
    time.Sleep(delay)

    return result
}

执行效果
Client端
golang rabbitmq的使用(五)

Server端
golang rabbitmq的使用(五)

全部代码可以在如下处取得
https://github.com/BinWang-sh...

相关推荐