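Reading the amqp Source Code

Overview

Python amqp is an AMQP client maintained by the Celery project; see the linked project page for a detailed introduction. The code base is small, which makes it a good way to learn AMQP. We start with the package's directory structure. After a first pass over the code, the overall layered design looks roughly like the figure. We then go through the implementation of each layer from the bottom up and pick up the relevant background along the way. Once the whole source has been read, we will try re-implementing it in Go.

amqp package directory structure

amqp source layering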

Transport

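This layer sits on top of a TCP connection. It implements buffered reads and writes of the socket byte stream, and reads and writes of protocol frames.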

transport

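  • Protocol independence: socket.getaddrinfo returns the socket information supported by the target address, together with a filled-in socket address structure sa that can be passed straight to connect, so there is no need to care whether the address is IPv4 or IPv6

  • Setting close-on-exec on the descriptor: once a forked child process successfully calls an exec function, the file descriptor is closed automatically, which avoids the case where the parent exits and cannot restart because the port is still held: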

import fcntl


def set_cloexec(fd, cloexec):
    try:
        FD_CLOEXEC = fcntl.FD_CLOEXEC
    except AttributeError:
        raise NotImplementedError(
            'close-on-exec flag not supported on this platform',
        )
    # Read the current descriptor flags, then set or clear FD_CLOEXEC.
    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    if cloexec:
        flags |= FD_CLOEXEC
    else:
        flags &= ~FD_CLOEXEC
    return fcntl.fcntl(fd, fcntl.F_SETFD, flags)
  • Socket options; see chapter 7 of UNIX Network Programming, Volume 1 for details

    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    self.sock.setsockopt(SOL_TCP, socket.TCP_NODELAY, 1)
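  • Socket reads and writes: TCPTransport and SSLTransport each implement the socket read and write methods of the abstract class. SSLTransport wraps the current socket with the ssl library and uses the ssl read and write methods. Errors that can occur while reading:

errno.ENOENT: retried on the SSL read path, where an ssl read may report it when the operation could not be performed; a RST from the peer normally surfaces as errno.ECONNRESET instead
errno.EAGAIN: returned when the socket is non-blocking and there is no data to read
errno.EINTR: a slow system call was interrupted by a signal, commonly the one delivered to the parent when a child process terminates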
  • Reading and writing protocol frames: from the read method we can recover the frame format: 1-byte frame type + 2-byte channel id + 4-byte payload size + size bytes of payload + 1-byte end marker

    def read_frame(self, unpack=unpack):
        ...
        # 7-byte header: type (B), channel (H), payload size (I), big-endian
        frame_header = read(7, True)
        read_frame_buffer += frame_header
        frame_type, channel, size = unpack('>BHI', frame_header)
        payload = read(size)
        read_frame_buffer += payload
        # trailing frame-end octet
        ch = ord(read(1))
        ...

    def write_frame(self, frame_type, channel, payload):
        ...
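
The frame layout described above can be tried out without a socket. The following is a minimal sketch, not the library's code: `encode_frame` and `decode_frame` are hypothetical helper names, and the end marker value 0xCE is the frame-end octet from the AMQP 0-9-1 specification.

```python
import struct

FRAME_END = 0xCE  # frame-end octet defined by AMQP 0-9-1


def encode_frame(frame_type, channel, payload):
    """Build one frame: 7-byte header, payload, 1-byte end marker."""
    header = struct.pack('>BHI', frame_type, channel, len(payload))
    return header + payload + bytes([FRAME_END])


def decode_frame(data):
    """Parse one frame from the start of `data`; return (frame, remaining bytes)."""
    if len(data) < 7:
        raise ValueError('incomplete frame header')
    frame_type, channel, size = struct.unpack('>BHI', data[:7])
    if len(data) < 7 + size + 1:
        raise ValueError('incomplete frame')
    payload = data[7:7 + size]
    if data[7 + size] != FRAME_END:
        raise ValueError('bad frame-end octet')
    return (frame_type, channel, payload), data[8 + size:]


raw = encode_frame(8, 0, b'')        # a heartbeat frame on channel 0
frame, rest = decode_frame(raw)
```

Because the payload size is carried in the header, a reader always knows how many bytes to pull from the stream before looking for the end marker, which is why read_frame can use exact-length reads.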

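Data protocol layer

This layer mainly provides AMQPReader/AMQPWriter, tools for converting between byte streams and higher-level data types, and MethodWriter/MethodReader, tools for assembling, reading, and parsing frames of the different frame types. Let us start with a concrete use case: declaring an exchange with exchange_declare.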

class Channel(AbstractChannel):
    def exchange_declare(self, exchange, type, passive=False, durable=False,
                         auto_delete=True, nowait=False, arguments=None):
        arguments = {} if arguments is None else arguments
        args = AMQPWriter()
        args.write_short(0)
        args.write_shortstr(exchange)
        args.write_shortstr(type)
        args.write_bit(passive)
        args.write_bit(durable)
        args.write_bit(auto_delete)
        args.write_bit(False)  # internal: deprecated
        args.write_bit(nowait)
        args.write_table(arguments)
        self._send_method((40, 10), args)

        if auto_delete:
            warn(VDeprecationWarning(EXCHANGE_AUTODELETE_DEPRECATED))

        if not nowait:
            return self.wait(allowed_methods=[
                (40, 11),  # Channel.exchange_declare_ok
            ])

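AMQPWriter serializes the exchange_declare parameters into the byte stream args. Following _send_method shows that MethodWriter.write_method does the final assembly and sending; the write_frame used here is provided by the underlying transport.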

class MethodWriter(object):
    def write_method(self, channel, method_sig, args, content=None):
        write_frame = self.dest.write_frame
        payload = pack('>HH', method_sig[0], method_sig[1]) + args
        ...
        write_frame(1, channel, payload)
        ...
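
To make the serialization step concrete, here is a minimal sketch of how the exchange_declare arguments could end up as bytes. `TinyWriter` is a hypothetical, cut-down stand-in written for this article, not the library's AMQPWriter; it assumes that consecutive bits are packed into one byte starting from the lowest bit, that a short string is a length byte followed by UTF-8 data, and that an empty table is a 4-byte zero length.

```python
import struct
from io import BytesIO


class TinyWriter:
    """Hypothetical, cut-down stand-in for AMQPWriter (not the library class)."""

    def __init__(self):
        self.out = BytesIO()
        self.bits = []        # pending bytes that hold packed bit flags
        self.bitcount = 0

    def _flush_bits(self):
        # Consecutive bits share bytes; flush them before any other field.
        if self.bits:
            self.out.write(bytes(self.bits))
            self.bits = []
            self.bitcount = 0

    def write_bit(self, flag):
        shift = self.bitcount % 8
        if shift == 0:
            self.bits.append(0)
        self.bits[-1] |= (1 if flag else 0) << shift
        self.bitcount += 1

    def write_short(self, n):
        self._flush_bits()
        self.out.write(struct.pack('>H', n))

    def write_shortstr(self, s):
        self._flush_bits()
        data = s.encode('utf-8')
        if len(data) > 255:
            raise ValueError('shortstr is limited to 255 bytes')
        self.out.write(struct.pack('B', len(data)) + data)

    def getvalue(self):
        self._flush_bits()
        return self.out.getvalue()


args = TinyWriter()
args.write_short(0)                 # reserved field
args.write_shortstr('logs')         # exchange name (example value)
args.write_shortstr('fanout')       # exchange type
for flag in (False, True, False, False, False):
    args.write_bit(flag)            # passive, durable, auto_delete, internal, nowait
body = args.getvalue() + struct.pack('>I', 0)    # empty arguments table
payload = struct.pack('>HH', 40, 10) + body      # class 40, method 10
```

The resulting payload is what would be handed to write_frame with frame type 1. The five boolean arguments cost a single byte on the wire, which is the reason the writer buffers bits instead of emitting them one at a time.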

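At this point we have a first picture of how a high-level operation is turned into a byte stream for communication. The above is only a basic operation that carries arguments. Publishing and receiving operations that carry message content add two more frame types: the content header and the content body.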

class MethodWriter(object):
    def write_method(self, channel, method_sig, args, content=None):
        ...
        if content:
            body = content.body
            if isinstance(body, string):
                coding = content.properties.get('content_encoding', None)
                if coding is None:
                    coding = content.properties['content_encoding'] = 'UTF-8'

                body = body.encode(coding)
            properties = content._serialize_properties()
        ...
        if content:
            # Header payload: class id, weight 0, encoded body size, properties
            payload = pack('>HHQ', method_sig[0], 0, len(body)) + properties

            write_frame(2, channel, payload)

            # Leave room for the 7-byte frame header and the end octet
            chunk_size = self.frame_max - 8
            for i in range(0, len(body), chunk_size):
                write_frame(3, channel, body[i:i + chunk_size])
        self.bytes_sent += 1


class Message(GenericContent):
    PROPERTIES = [
        ('content_type', 'shortstr'),
        ('content_encoding', 'shortstr'),
        ('application_headers', 'table'),
        ('delivery_mode', 'octet'),
        ('priority', 'octet'),
        ('correlation_id', 'shortstr'),
        ('reply_to', 'shortstr'),
        ('expiration', 'shortstr'),
        ('message_id', 'shortstr'),
        ('timestamp', 'timestamp'),
        ('type', 'shortstr'),
        ('user_id', 'shortstr'),
        ('app_id', 'shortstr'),
        ('cluster_id', 'shortstr')
    ]

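This is the same method again; this time we focus on the content header, frame_type 2. The header needs two key pieces of information: the length of the message body and the serialized header properties. The body length is the length after encoding, in bytes.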
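
The header-then-chunks sequence can be sketched in isolation. The helper name `content_frames` is hypothetical, the two-byte `b'\x00\x00'` stands for an empty property flag word, and the tiny `frame_max` of 16 is only there to force a split; real connections negotiate a much larger value.

```python
import struct


def content_frames(class_id, body, properties, frame_max):
    """Return (frame_type, payload) pairs: one header frame, then body frames."""
    # Header payload: class id, weight (always 0), body size, then properties.
    header = struct.pack('>HHQ', class_id, 0, len(body)) + properties
    frames = [(2, header)]
    # Each frame has 8 bytes of overhead: 7-byte header plus the end octet.
    chunk_size = frame_max - 8
    for i in range(0, len(body), chunk_size):
        frames.append((3, body[i:i + chunk_size]))
    return frames


text = 'héllo wörld'
body = text.encode('utf-8')           # the size field counts encoded bytes
frames = content_frames(60, body, b'\x00\x00', frame_max=16)
```

Here the text has 11 characters but 13 bytes once encoded as UTF-8, and 13 is the number that goes into the header. With 8 usable bytes per frame the body is sent as two type-3 frames.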

class GenericContent(object):
    def _load_properties(self, raw_bytes):
        r = AMQPReader(raw_bytes)
        # Read 16-bit flag words; bit 0 set means another word follows.
        flags = []
        while 1:
            flag_bits = r.read_short()
            flags.append(flag_bits)
            if flag_bits & 1 == 0:
                break

        shift = 0
        d = {}
        for key, proptype in self.PROPERTIES:
            if shift == 0:
                if not flags:
                    break
                flag_bits, flags = flags[0], flags[1:]
                shift = 15
            # A set bit means this property's value is present in the stream.
            if flag_bits & (1 << shift):
                d[key] = getattr(r, 'read_' + proptype)()
            shift -= 1

        self.properties = d

    def _serialize_properties(self):
        shift = 15
        flag_bits = 0
        flags = []
        raw_bytes = AMQPWriter()
        for key, proptype in self.PROPERTIES:
            val = self.properties.get(key, None)
            if val is not None:
                if shift == 0:
                    flags.append(flag_bits)
                    flag_bits = 0
                    shift = 15

                flag_bits |= (1 << shift)
                if proptype != 'bit':
                    getattr(raw_bytes, 'write_' + proptype)(val)

            shift -= 1

        # Flag words first, then the property values in declaration order.
        flags.append(flag_bits)
        result = AMQPWriter()
        for flag_bits in flags:
            result.write_short(flag_bits)
        result.write(raw_bytes.getvalue())

        return result.getvalue()

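Header properties are again serialized by converting the values with AMQPWriter. Three questions need answering: how to know which header properties are present, where each property sits in the byte stream, and where each property's bytes begin and end.
On write, flag bits are set from the high bit down, one per property that is present, and each present value is written in turn, converted according to its type. On read, the flag bits say which properties are present, the fixed order of PROPERTIES gives their positions, and the reader for each type determines where the value ends.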
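
The flag-word scheme can be exercised on its own. This is a simplified sketch under stated assumptions: the `PROPERTIES` table below is a hypothetical four-entry version (so bit positions differ from the real Message table), only a single flag word is handled, and `serialize`/`load` are illustrative names rather than the library's methods.

```python
import struct

# Hypothetical, shortened property table: (name, type) in wire order.
PROPERTIES = [
    ('content_type', 'shortstr'),
    ('content_encoding', 'shortstr'),
    ('delivery_mode', 'octet'),
    ('priority', 'octet'),
]


def serialize(props):
    """Pack present properties: one 16-bit flag word, then the values in order."""
    flag_bits, values = 0, b''
    for index, (key, proptype) in enumerate(PROPERTIES):
        val = props.get(key)
        if val is None:
            continue
        flag_bits |= 1 << (15 - index)       # highest bit = first property
        if proptype == 'shortstr':
            data = val.encode('utf-8')
            values += struct.pack('B', len(data)) + data
        else:                                 # 'octet'
            values += struct.pack('B', val)
    return struct.pack('>H', flag_bits) + values


def load(raw):
    """Inverse of serialize(): the flag word says which values follow."""
    (flag_bits,) = struct.unpack('>H', raw[:2])
    pos, props = 2, {}
    for index, (key, proptype) in enumerate(PROPERTIES):
        if not flag_bits & (1 << (15 - index)):
            continue
        if proptype == 'shortstr':
            length = raw[pos]
            props[key] = raw[pos + 1:pos + 1 + length].decode('utf-8')
            pos += 1 + length
        else:
            props[key] = raw[pos]
            pos += 1
    return props


raw = serialize({'content_type': 'text/plain', 'delivery_mode': 2})
restored = load(raw)
```

Absent properties cost no bytes beyond their zero bit, and the reader never needs a separate count: walking the table in the same order as the writer is enough to find every value.
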
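The content body is simpler to handle: if the data is too large, it is split and sent in several frames. The point to watch is that the receiver must keep waiting on the same channel and assemble the pieces until the complete data has arrived.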

class _PartialMessage(object):
    def add_header(self, payload):
        ...

    def add_payload(self, payload):
        ...


class MethodReader(object):
    def read_method(self):
        self._next_method()
        m = self._quick_get()
        if isinstance(m, Exception):
            raise m
        if isinstance(m, tuple) and isinstance(m[1], AMQPError):
            raise m[1]
        return m

    def _next_method(self):
        queue = self.queue
        put = self._quick_put
        read_frame = self.source.read_frame
        while not queue:
            try:
                frame_type, channel, payload = read_frame()
            except Exception as exc:
                #
                # Connection was closed? Framing Error?
                #
                put(exc)
                break

            self.bytes_recv += 1

            if frame_type not in (self.expected_types[channel], 8):
                put((
                    channel,
                    UnexpectedFrame(
                        'Received frame {0} while expecting type: {1}'.format(
                            frame_type, self.expected_types[channel]))))
            elif frame_type == 1:
                self._process_method_frame(channel, payload)
            elif frame_type == 2:
                self._process_content_header(channel, payload)
            elif frame_type == 3:
                self._process_content_body(channel, payload)
            elif frame_type == 8:
                self._process_heartbeat(channel, payload)

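MethodReader provides read_method for upper-layer consumers. Internally it keeps a temporary queue; if the complete data has not yet arrived, it keeps doing blocking reads until the pieces have been merged into a complete message object. The temporary storage and assembly in this step are handled by _PartialMessage.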
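
The reassembly idea can be illustrated with a small, self-contained sketch. `PartialMessage` and `reassemble` are hypothetical names, not the library's classes, and the sketch assumes every method frame it sees is followed by content (as basic.deliver is); methods without content are left out to keep it short.

```python
import struct


class PartialMessage:
    """Hypothetical stand-in for _PartialMessage: collects header, then body."""

    def __init__(self, method_sig):
        self.method_sig = method_sig
        self.body_size = None
        self.chunks = []
        self.received = 0

    def add_header(self, payload):
        # class id, weight, body size; properties would follow at offset 12.
        _class_id, _weight, self.body_size = struct.unpack('>HHQ', payload[:12])

    def add_payload(self, payload):
        self.chunks.append(payload)
        self.received += len(payload)

    @property
    def complete(self):
        return self.body_size is not None and self.received >= self.body_size


def reassemble(frames):
    """Consume (frame_type, channel, payload) tuples; return finished messages."""
    partial = {}      # channel -> PartialMessage in progress
    done = []
    for frame_type, channel, payload in frames:
        if frame_type == 1:          # method frame opens a message
            partial[channel] = PartialMessage(struct.unpack('>HH', payload[:4]))
        elif frame_type == 2:        # content header
            partial[channel].add_header(payload)
        elif frame_type == 3:        # content body chunk
            partial[channel].add_payload(payload)
        msg = partial.get(channel)
        if msg is not None and msg.complete:
            done.append((channel, msg.method_sig, b''.join(msg.chunks)))
            del partial[channel]
    return done


frames = [
    (1, 1, struct.pack('>HH', 60, 60)),
    (2, 1, struct.pack('>HHQ', 60, 0, 5) + b'\x00\x00'),
    (1, 2, struct.pack('>HH', 60, 60)),
    (3, 1, b'he'),
    (2, 2, struct.pack('>HHQ', 60, 0, 2) + b'\x00\x00'),
    (3, 2, b'ok'),
    (3, 1, b'llo'),
]
messages = reassemble(frames)
```

Keeping one partial message per channel is what allows frames from different channels to interleave on the same connection: in the example, channel 2 finishes before channel 1 even though channel 1 started first.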

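Session layer

From studying Connection, this layer can be divided into four parts: connection management, state transitions for session setup and teardown, channel management, and event driving and dispatch.

Connection management

  1. Create the Transport object from the initialization parameters
  2. Check whether the connection is alive: if the socket module provides MSG_PEEK, peek at the socket buffer without consuming data to detect whether the connection is still alive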

    class Connection(object):
        def is_alive(self):
            if HAS_MSG_PEEK:
                sock = self.sock
                prev = sock.gettimeout()
                sock.settimeout(0.0001)
                try:
                    sock.recv(1, socket.MSG_PEEK)
                except socket.timeout:
                    pass
                except socket.error:
                    return False
                finally:
                    sock.settimeout(prev)
            return True
  3. Provide heartbeats to keep the connection alive and detect a dead peer

    class Connection(object):
        def send_heartbeat(self):
            self.transport.write_frame(8, 0, bytes())

        def heartbeat_tick(self, rate=2):
            if not self.heartbeat:
                return

            # Read the current sent and received counters
            sent_now = self.method_writer.bytes_sent
            recv_now = self.method_reader.bytes_recv
            # If a counter changed since the last check, record now as the
            # last time of activity in that direction
            if self.prev_sent is None or self.prev_sent != sent_now:
                self.last_heartbeat_sent = monotonic()
            if self.prev_recv is None or self.prev_recv != recv_now:
                self.last_heartbeat_received = monotonic()
            self.prev_sent, self.prev_recv = sent_now, recv_now

            # Send a heartbeat frame
            if monotonic() > self.last_heartbeat_sent + self.heartbeat:
                self.send_heartbeat()
                self.last_heartbeat_sent = monotonic()

            # The check failed: the connection is considered lost
            if (self.last_heartbeat_received and
                    self.last_heartbeat_received + 2 *
                    self.heartbeat < monotonic()):
                raise ConnectionForced('Too many heartbeats missed')
  4. Closing the connection: close the transport, then clean up and close every open channel
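
The heartbeat bookkeeping in step 3 is easier to follow with the clock and the transport taken out. The class below is a hypothetical sketch, not the library's API: `HeartbeatMonitor`, its `tick` method, and the string results are made up for illustration, and the clock is injected so the behaviour can be checked without waiting. Unlike heartbeat_tick, it reports a single outcome per call and checks for a dead peer first.

```python
class HeartbeatMonitor:
    """Hypothetical, transport-free version of the heartbeat_tick bookkeeping."""

    def __init__(self, interval, clock):
        self.interval = interval      # negotiated heartbeat, in seconds
        self.clock = clock            # callable returning monotonic seconds
        self.prev_sent = self.prev_recv = None
        self.last_sent = self.last_recv = None

    def tick(self, sent_count, recv_count):
        """Return 'send', 'ok', or 'dead' given the current traffic counters."""
        now = self.clock()
        # Any change in a counter counts as activity in that direction.
        if self.prev_sent is None or self.prev_sent != sent_count:
            self.last_sent = now
        if self.prev_recv is None or self.prev_recv != recv_count:
            self.last_recv = now
        self.prev_sent, self.prev_recv = sent_count, recv_count

        # Two silent intervals from the peer: treat the connection as lost.
        if now > self.last_recv + 2 * self.interval:
            return 'dead'
        # Nothing sent for a full interval: a heartbeat frame is due.
        if now > self.last_sent + self.interval:
            self.last_sent = now
            return 'send'
        return 'ok'


now = [0.0]                                   # fake clock for the example
mon = HeartbeatMonitor(interval=10, clock=lambda: now[0])
results = [mon.tick(0, 0)]                    # first tick initialises timestamps
now[0] = 11.0
results.append(mon.tick(0, 0))                # idle for more than one interval
now[0] = 21.0
results.append(mon.tick(1, 0))                # peer silent for more than two
```

Comparing counters instead of timestamping every read and write keeps the hot path cheap: the only cost per frame is an integer increment, and the time lookups happen once per tick.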

State transitions for session setup and teardown

The state transitions follow the grammar below.

GRAMMAR::

    connection          = open-connection *use-connection close-connection
    open-connection     = C:protocol-header
                          S:START C:START-OK
                          *challenge
                          S:TUNE C:TUNE-OK
                          C:OPEN S:OPEN-OK
    challenge           = S:SECURE C:SECURE-OK
    use-connection      = *channel
    close-connection    = C:CLOSE S:CLOSE-OK
                        / S:CLOSE C:CLOSE-OK
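
One way to read the open-connection rule is as a fixed sequence with an optional, repeatable challenge step before TUNE. The checker below is only an illustrative sketch of that reading: `is_valid_open` and the string event names are hypothetical, and the library itself does not validate the handshake this way.

```python
# Expected order for opening a connection, taken from the grammar:
# 'C:' = sent by the client, 'S:' = sent by the server.
OPEN_SEQUENCE = ['C:protocol-header', 'S:START', 'C:START-OK',
                 'S:TUNE', 'C:TUNE-OK', 'C:OPEN', 'S:OPEN-OK']
CHALLENGE = ['S:SECURE', 'C:SECURE-OK']


def is_valid_open(events):
    """Check a list of events against open-connection (with *challenge)."""
    events = list(events)
    pos = 0
    for expected in OPEN_SEQUENCE:
        if expected == 'S:TUNE':
            # Zero or more SECURE / SECURE-OK round trips may precede TUNE.
            while events[pos:pos + 2] == CHALLENGE:
                pos += 2
        if pos >= len(events) or events[pos] != expected:
            return False
        pos += 1
    return pos == len(events)


plain = list(OPEN_SEQUENCE)
with_challenge = OPEN_SEQUENCE[:3] + CHALLENGE + OPEN_SEQUENCE[3:]
plain_ok = is_valid_open(plain)
challenge_ok = is_valid_open(with_challenge)
```

Both sequences are accepted, while a handshake that skips TUNE-OK or stops halfway through a challenge is rejected.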

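Channel management

  1. Channel id management

Event driving and dispatch

Connection acts as channel 0 and is responsible for driving and dispatching events on this connection: it receives frames from the server and dispatches each one to the channel named in the frame.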
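
The dispatch idea can be sketched as follows. This is a simplified illustration, not the library's implementation: `Dispatcher`, `feed`, and `wait` are hypothetical names, and `source` is an ordinary iterator standing in for repeated calls to MethodReader.read_method.

```python
from collections import defaultdict, deque


class Dispatcher:
    """Hypothetical sketch: route incoming methods to per-channel queues."""

    def __init__(self):
        self.pending = defaultdict(deque)    # channel id -> queued methods

    def feed(self, channel_id, method_sig, args):
        """File one method read from the wire under its channel."""
        self.pending[channel_id].append((method_sig, args))

    def wait(self, channel_id, allowed_methods, source):
        """Return the next allowed method for `channel_id`.

        `source` yields (channel_id, method_sig, args) tuples.
        """
        while True:
            queue = self.pending[channel_id]
            for i, (sig, args) in enumerate(queue):
                if sig in allowed_methods:
                    del queue[i]
                    return sig, args
            # Nothing suitable queued yet: read one more method and file it
            # under whichever channel it belongs to.
            ch, sig, args = next(source)
            self.feed(ch, sig, args)


incoming = iter([
    (2, (60, 60), 'delivery for channel 2'),
    (1, (40, 11), 'exchange_declare_ok'),
])
d = Dispatcher()
reply = d.wait(1, [(40, 11)], incoming)
```

While channel 1 waits for its exchange_declare_ok, the delivery addressed to channel 2 is not lost; it stays queued until channel 2 asks for it.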