Ezreal 书架 Ezreal 书架
Home
  • 《Go程序员面试笔试宝典》
  • 《RabbitMQ 实战指南》
  • 《深入理解kafka》
  • MySQL45讲
  • 透视HTTP协议
  • 结构化数据的分布式存储系统
  • Raft 共识算法
Home
  • 《Go程序员面试笔试宝典》
  • 《RabbitMQ 实战指南》
  • 《深入理解kafka》
  • MySQL45讲
  • 透视HTTP协议
  • 结构化数据的分布式存储系统
  • Raft 共识算法
  • RabbitMQ简介

  • RabbitMQ 入门

  • 客户端开发向导

  • RabbitMQ 进阶

    • 消息何去何从
    • 过期时间 (TTL)
    • 死信队列
    • 延迟队列
    • 优先级队列
    • RPC 实现
    • 持久化
    • 生产者确认
    • 消费端要点介绍
    • 消息传输保障
    • 小结
  • RabbitMQ 管理

  • RabbitMQ实战指南
  • RabbitMQ 进阶
ezreal_rao
2023-04-06

RPC 实现

RPC , 是 Remote Procedure Call 的简称,即远程过程调用。它是一种通过网络从远程计算机上请求服务,而不需要了解底层网络的技术。 RPC 的主要功用是让构建分布式计算更容易, 在提供强大的远程调用能力时不损失本地调用的语义简洁性。

通俗点来说,假设有两台服务器 A 和 B , 一个应用部署在 A 服务器上,想要调用 B 服务器上应用提供的函数或者方法,由于不在同一个内存空间 ,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。

RPC 的协议有很多,比如最早的 CORBA、 Java RMI、WebService 的 RPC 风格、 Hessian、 Thrift 甚至还有 Restful API 。

一般在 RabbitMQ 中进行 RPC 是很简单。客户端发送请求消息,服务端回复响应的消息。为了接收响应的消息,我们需要在请求消息中发送一个回调队列 (参考下面代码中的 replyTo)。 可以使用默认的队列, 具体示例代码如代码清单 4-11 所示。

代码清单 4-11

String callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new
    BasicProperties.Builder().replyTo(callbackQueueName).build();
channel.basicPublish("", "rpc queue", props, message.getBytes());
//then code to read a response message from the callback_queue...
1
2
3
4
5

对于代码中涉及的 BasicProperties 这个类 , 在 3.3 节中我们在阐述发送消息的时候讲解过,其包含 14 个属性,这里就用到两个属性。

  • replyTo: 通常用来设置一个回调队列。
  • correlationId: 用来关联请求 (request) 和其调用 RPC 之后的回复 (response)。

如果像上面的代码中一样,为每个 RPC 请求创建一个回调队列,则是非常低效的。但是幸运的是这里有一个通用的解决方案一一可以为每个客户端创建一个单一 的回调队列。

这样就产生了一个新的问题,对于回调队列而言,在其接收到一条回复的消息之后,它并不知道这条消息应该和哪一个请求匹配。这里就用到 correlationld 这个属性了 , 我们应该为每一个请求设置一个唯一的 correlationld 。之后在回调队列接收到回复的消息时,可以根据这个属性匹配到相应的请求。如果回调队列接收到一条未知 correlationld 的回复消息,可以简单地将其丢弃。

你有可能会问,为什么要将回调队列中的未知消息丢弃而不是仅仅将其看作失败?这样可以针对这个失败做一些弥补措施。参考图 4-7, 考虑这样一种情况, RPC 服务器可能在发送给回调队列 (amq.gen-LhQzlgv3GhDOv8PIDabOXA) 并且在确认接收到请求的消息 (rpc_queue 中的消息) 之后挂掉了,那么只需重启下 RPC 服务器即可, RPC 服务会重新消费 rpc_queue 队列中的请求,这样就不会出现 RPC 服务端未处理请求的情况。这里的回调队列可能会收到重复消息的情况,这需要客户端能够优雅地处理这种情况,并且 RPC 请求也需要保证其本身是幂等的 (补充:根据 3.5 节的介绍,消费者消费消息一般是先处理业务逻辑, 再使用 Basic.Ack 确认己接收到消息以防止消息不必要地丢失)。

RPC 示意图

根据图 4-7 所示,RPC 的处理流程如下:

  1. 当客户端启动时,创建一个匿名的回调队列 (名称由 RabbitMQ 自动创建,图 4-7 中 的回调队列为 amq.gen-LhQzlgv3GhDOv8PIDabOXA) 。
  2. 客户端为 RPC 请求设置 2 个属性 : replyTo 用来告知 RPC 服务端回复请求时的目的队列,即回调队列;correlationld 用来标记一个请求。
  3. 请求被发送到 rpc_queue 队 列中。
  4. RPC 服务端监听 rpc_queue 队列中的请求,当请求到来时, 服务端会处理并且把带有结果的消息发送给客户端。 接收的队列就是 replyTo 设定的回调队列。
  5. 客户端监昕回调队列,当有消息时,检查 correlationld 属性,如果与请求匹配,那就是结果了。

首先是服务端的关键代码 ,代码清单 4-12 所示。

代码清单 4-12

package main

import (
	"context"
	"fmt"
	"strconv"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	RpcServer()
}

func RpcServer() {
	conn, _ := amqp.Dial("amqp://root:root@localhost:5672/")
	ch, _ := conn.Channel()

	q, _ := ch.QueueDeclare(
		"rpc_queue",
		false,
		false,
		false,
		false,
		nil,
	)
	_ = ch.Qos(1, 0, false)

	msgs, _ := ch.Consume(
		q.Name,
		"",
		false,
		false,
		false,
		false,
		nil,
	)

	fmt.Printf(" [*] Awaiting RPC requests \n")
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	for d := range msgs {
		n, _ := strconv.Atoi(string(d.Body))
		fmt.Printf(" [.] fib(%d)\n", n)
		response := fib(n)

		_ = ch.PublishWithContext(ctx,
			"",        // exchange
			d.ReplyTo, // routing key
			false,     // mandatory
			false,     // immediate
			amqp.Publishing{
				ContentType:   "text/plain",
				CorrelationId: d.CorrelationId,
				Body:          []byte(strconv.Itoa(response)),
			})
		d.Ack(false)
	}
}

func fib(n int) int {
	if n == 0 {
		return 0
	} else if n == 1 {
		return 1
	} else {
		return fib(n-1) + fib(n-2)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70

RPC 客户端的关键代码如代码清单 4-13 所示。

代码清单 4-13

package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"strconv"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	RpcClient()
}

func RpcClient() {
	rand.Seed(time.Now().UTC().UnixNano())
	n := rand.Intn(20)
	log.Printf(" [x] Requesting fib(%d)", n)
	var res int
	conn, _ := amqp.Dial("amqp://root:root@localhost:5672/")
	ch, _ := conn.Channel()

	q, _ := ch.QueueDeclare("", false, false, true, false, nil)

	msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil)

	corrId := randomString(32)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	_ = ch.PublishWithContext(ctx,
		"",          // exchange
		"rpc_queue", // routing key
		false,       // mandatory
		false,       // immediate
		amqp.Publishing{
			ContentType:   "text/plain",
			CorrelationId: corrId,
			ReplyTo:       q.Name,
			Body:          []byte(strconv.Itoa(n)),
		})

	for d := range msgs {
		if corrId == d.CorrelationId {
			res, _ = strconv.Atoi(string(d.Body))
			break
		}
	}

	log.Printf(" [.] Got %d \n", res)

}

func randomString(l int) string {
	bytes := make([]byte, l)
	for i := 0; i < l; i++ {
		bytes[i] = byte(randInt(65, 90))
	}
	return string(bytes)
}

func randInt(min int, max int) int {
	return min + rand.Intn(max-min)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#mq#rabbitmq#amq
上次更新: 5/9/2023, 10:58:32 AM
优先级队列
持久化

← 优先级队列 持久化→

最近更新
01
为什么我的MySQL会抖一下
07-15
02
HTTP 性能优化面面观
07-12
03
WebSocket:沙盒里的 TCP
07-12
更多文章>
Theme by Vdoing | Copyright © 2022-2024 Ezreal Rao | CC BY-NC-SA 4.0
豫ICP备2023001810号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式