
Improving Asio Read Performance

· 12 min read
Bruce
Back End Engineer

I recently wrote moon_rs, the Rust version of moon, on top of the networking library provided by tokio. Benchmarks showed that, even with an extra layer of channel communication, it still delivers solid performance. The numbers:

cargo run --release assets/benchmark_socket.lua

====== PING_INLINE ======
100000 requests completed in 1.04 seconds
100 parallel clients
3 bytes payload
keep alive: 1
host configuration "save": 3600 1 300 100 60 10000
host configuration "appendonly": yes
multi-thread: no

0.00% <= 0.2 milliseconds
0.00% <= 0.3 milliseconds
0.01% <= 0.4 milliseconds
17.11% <= 0.5 milliseconds
86.85% <= 0.6 milliseconds
94.76% <= 0.7 milliseconds
98.26% <= 0.8 milliseconds
99.27% <= 0.9 milliseconds
99.70% <= 1.0 milliseconds
99.88% <= 1.1 milliseconds
99.92% <= 1.2 milliseconds
99.95% <= 1.3 milliseconds
99.97% <= 1.4 milliseconds
99.99% <= 1.5 milliseconds
100.00% <= 1.8 milliseconds
100.00% <= 2 milliseconds
96153.85 requests per second

====== PING_BULK ======
100000 requests completed in 1.03 seconds
100 parallel clients
3 bytes payload
keep alive: 1
host configuration "save": 3600 1 300 100 60 10000
host configuration "appendonly": yes
multi-thread: no

94.53% <= 1 milliseconds
100.00% <= 1 milliseconds
96805.42 requests per second

Performance test of the C++ version of moon:

./moon example/tcp_coroutine_benchmark.lua

====== PING_INLINE ======
100000 requests completed in 1.00 seconds
100 parallel clients
3 bytes payload
keep alive: 1
host configuration "save": 3600 1 300 100 60 10000
host configuration "appendonly": yes
multi-thread: no

0.00% <= 0.1 milliseconds
0.00% <= 0.4 milliseconds
61.49% <= 0.5 milliseconds
90.44% <= 0.6 milliseconds
97.07% <= 0.7 milliseconds
98.94% <= 0.8 milliseconds
99.44% <= 0.9 milliseconds
99.61% <= 1.0 milliseconds
99.74% <= 1.1 milliseconds
99.81% <= 1.2 milliseconds
99.88% <= 1.3 milliseconds
99.94% <= 1.4 milliseconds
99.97% <= 1.5 milliseconds
99.99% <= 1.6 milliseconds
100.00% <= 1.6 milliseconds
99700.90 requests per second

====== PING_BULK ======
100000 requests completed in 1.12 seconds
100 parallel clients
3 bytes payload
keep alive: 1
host configuration "save": 3600 1 300 100 60 10000
host configuration "appendonly": yes
multi-thread: no

0.00% <= 0.4 milliseconds
0.03% <= 0.5 milliseconds
0.44% <= 0.6 milliseconds
1.15% <= 0.7 milliseconds
3.00% <= 0.8 milliseconds
10.83% <= 0.9 milliseconds
34.09% <= 1.0 milliseconds
81.97% <= 1.1 milliseconds
94.04% <= 1.2 milliseconds
96.93% <= 1.3 milliseconds
98.23% <= 1.4 milliseconds
98.97% <= 1.5 milliseconds
99.40% <= 1.6 milliseconds
99.63% <= 1.7 milliseconds
99.68% <= 1.8 milliseconds
99.72% <= 1.9 milliseconds
99.77% <= 2 milliseconds
99.90% <= 6 milliseconds
99.98% <= 7 milliseconds
100.00% <= 7 milliseconds
89445.44 requests per second

In the PING_BULK test the C++ version turned out to be even slower than moon_rs. Since moon handles network messages on the current thread while moon_rs routes them through an asynchronous channel switch, this clearly did not make sense. After some research I learned what characterizes PING_BULK traffic:

PING_INLINE and PING_BULK are two different network communication patterns commonly seen in network programming. The terms are not specific to Asio; they are general networking concepts.

  1. PING_INLINE: each request is sent and a response is awaited before the next one. The advantage is immediate feedback; the disadvantage is that overall communication efficiency drops when network latency is high.

  2. PING_BULK: requests are sent in batches and then all responses are awaited. This improves network utilization and throughput, especially under high latency, but if one request fails it may affect the whole batch (see the sketch below).
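
To make the PING_BULK pattern concrete, here is a minimal pipelining client sketch. It is only an illustration: the 127.0.0.1:6379 endpoint and the PING/+PONG framing are assumptions, not moon code. The client writes several commands in one send, so on the server side a single TCP read delivers multiple complete requests, and everything after the first one can be served from data that is already buffered.

// Minimal pipelining sketch (assumed endpoint and framing; not part of moon).
#include <asio.hpp>
#include <iostream>
#include <string>

int main()
{
    asio::io_context io;
    asio::ip::tcp::socket socket(io);
    // Assumption: a server on 127.0.0.1:6379 that replies "+PONG\r\n" to every "PING\r\n".
    socket.connect({ asio::ip::make_address("127.0.0.1"), 6379 });

    // PING_BULK style: batch three requests into a single write ...
    const std::string batch = "PING\r\nPING\r\nPING\r\n";
    asio::write(socket, asio::buffer(batch));

    // ... then collect all three replies. On the server, one IO read now holds
    // three complete requests, so the 2nd and 3rd already sit in its buffer.
    std::string replies(3 * 7, '\0'); // "+PONG\r\n" is 7 bytes
    asio::read(socket, asio::buffer(replies));
    std::cout << replies;
}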

Then it occurred to me that when moon's stream_connection processes batched messages, it still goes through an Asio completion handler even when the data is already sitting in the buffer. That hurts responsiveness, and it explains the higher latency and lower requests-per-second numbers in the test above.

#pragma once
#include "base_connection.hpp"
#include "streambuf.hpp"

namespace moon
{
    class stream_connection : public base_connection
    {
    public:
        .....
    protected:
        void read_until(size_t count)
        {
            // The data already in response_.as_buffer() may be sufficient. In that case no
            // IO call is made, but the read still goes through one Completion Handler invocation.
            asio::async_read_until(socket_, moon::streambuf(response_.as_buffer(), count), delim_,
                [this, self = shared_from_this()](const asio::error_code& e, std::size_t bytes_transferred)
                {
                    if (e)
                    {
                        error(e);
                        return;
                    }
                    response(bytes_transferred);
                });
        }

        void read(size_t count)
        {
            // Same issue as above
            std::size_t size = (response_.size() >= count ? 0 : (count - response_.size()));
            asio::async_read(socket_, moon::streambuf(response_.as_buffer(), count), asio::transfer_exactly(size),
                [this, self = shared_from_this(), count](const asio::error_code& e, std::size_t)
                {
                    if (e)
                    {
                        error(e);
                        return;
                    }
                    response(count);
                });
        }
        .....
    protected:
        size_t revert_ = 0;
        int64_t sessionid_ = 0;
        std::string delim_;
        message response_;
    };
}

The fix is to add a fast path that returns immediately when the buffer already holds enough data. The modified code:

#pragma once
#include "common/static_string.hpp"
#include "base_connection.hpp"
#include "streambuf.hpp"

namespace moon
{
    class stream_connection : public base_connection
    {
    public:
        .....

    protected:
        std::optional<std::string_view> read(read_until op)
        {
            // First check whether the read cache already has enough data; if so, return it directly.
            size_t delim_size = op.delim.size();
            if (read_cache_.size() >= delim_size) {
                std::string_view data{ read_cache_.data(), read_cache_.size() };
                std::default_searcher searcher{ op.delim.data(), op.delim.data() + delim_size };
                auto it = std::search(data.begin(), data.end(), searcher);
                if (it != data.end()) {
                    read_in_progress_ = false;
                    auto count = std::distance(data.begin(), it);
                    read_cache_.as_buffer()->consume(count + delim_size);
                    return std::make_optional<std::string_view>(data.data(), count);
                }
            }

            asio::async_read_until(socket_, moon::streambuf(read_cache_.as_buffer(), op.max_size), op.delim.to_string_view(),
                [this, self = shared_from_this(), op](const asio::error_code& e, std::size_t bytes_transferred)
                {
                    if (!e)
                    {
                        response(op.session, bytes_transferred, op.delim.size());
                        return;
                    }
                    error(e, op.session);
                });
            return std::nullopt;
        }

        std::optional<std::string_view> read(read_exactly op)
        {
            if (read_cache_.size() >= op.size)
            {
                read_in_progress_ = false;
                consume_ = op.size;
                return std::make_optional<std::string_view>(read_cache_.data(), op.size);
            }

            std::size_t size = op.size - read_cache_.size();
            asio::async_read(socket_, moon::streambuf(read_cache_.as_buffer(), op.size), asio::transfer_exactly(size),
                [this, self = shared_from_this(), op](const asio::error_code& e, std::size_t)
                {
                    if (!e)
                    {
                        response(op.session, op.size, 0);
                        return;
                    }
                    error(e, op.session);
                });
            return std::nullopt;
        }
        ......
    protected:
        size_t more_bytes_ = 0;
        size_t consume_ = 0;
        message read_cache_;
    };
}

The Lua layer also saves one coroutine switch:

--- NOTE:  used only when protocol == moon.PTYPE_SOCKET_TCP
---@async
---@param delim string @Read until reach the specified delim string from the socket. Max length is 7 bytes.
---@param maxcount? integer
---@overload fun(fd: integer, count: integer) @ read a specified number of bytes from the socket.
function socket.read(fd, delim, maxcount)
    local session, data = read(fd, delim, maxcount)
    if data then -- if the data is already available, return it directly
        return data
    end
    return moon.wait(session)
end

Performance after the modification, now slightly ahead of the moon_rs version:

====== PING_INLINE ======
100000 requests completed in 1.00 seconds
100 parallel clients
3 bytes payload
keep alive: 1
host configuration "save": 3600 1 300 100 60 10000
host configuration "appendonly": yes
multi-thread: no

0.00% <= 0.4 milliseconds
62.82% <= 0.5 milliseconds
91.15% <= 0.6 milliseconds
98.10% <= 0.7 milliseconds
99.22% <= 0.8 milliseconds
99.76% <= 0.9 milliseconds
99.95% <= 1.0 milliseconds
99.95% <= 1.2 milliseconds
99.97% <= 1.3 milliseconds
99.97% <= 1.4 milliseconds
99.98% <= 1.5 milliseconds
99.99% <= 1.6 milliseconds
100.00% <= 1.6 milliseconds
100200.40 requests per second

====== PING_BULK ======
100000 requests completed in 1.00 seconds
100 parallel clients
3 bytes payload
keep alive: 1
host configuration "save": 3600 1 300 100 60 10000
host configuration "appendonly": yes
multi-thread: no

0.00% <= 0.1 milliseconds
0.00% <= 0.3 milliseconds
0.00% <= 0.4 milliseconds
53.94% <= 0.5 milliseconds
87.86% <= 0.6 milliseconds
95.57% <= 0.7 milliseconds
97.74% <= 0.8 milliseconds
98.51% <= 0.9 milliseconds
98.91% <= 1.0 milliseconds
99.12% <= 1.1 milliseconds
99.30% <= 1.2 milliseconds
99.43% <= 1.3 milliseconds
99.57% <= 1.4 milliseconds
99.68% <= 1.5 milliseconds
99.92% <= 1.6 milliseconds
100.00% <= 1.7 milliseconds
100.00% <= 1.7 milliseconds
99601.60 requests per second

In the original moon_connection, the protocol is parsed by reading directly from IO, which has the same problem: reading one protocol message takes at least two completion handler invocations. The upside of this approach is that the code is very concise and buffer allocations are reduced:

#pragma once
#include "base_connection.hpp"
#include "common/byte_convert.hpp"

namespace moon
{
    class moon_connection : public base_connection
    {
    public:
        .....

        void read_header()
        {
            // Read directly from IO
            header_ = 0;
            asio::async_read(socket_, asio::buffer(&header_, sizeof(header_)),
                [this, self = shared_from_this()](const asio::error_code& e, std::size_t)
                {
                    if (e)
                    {
                        error(e);
                        return;
                    }

                    net2host(header_);

                    bool fin = (header_ != MESSAGE_CONTINUED_FLAG);
                    if (!fin && !enum_has_any_bitmask(flag_, enable_chunked::receive)) {
                        error(make_error_code(moon::error::read_message_too_big));
                        return;
                    }

                    read_body(header_, fin);
                });
        }

        void read_body(message_size_t size, bool fin)
        {
            if (nullptr == buf_)
            {
                buf_ = buffer::make_unique((fin ? size : static_cast<size_t>(5) * size) + BUFFER_OPTION_CHEAP_PREPEND);
                buf_->commit(BUFFER_OPTION_CHEAP_PREPEND);
            }

            auto space = buf_->prepare(size);

            // Read directly from IO
            asio::async_read(socket_, asio::buffer(space.first, space.second),
                [this, self = shared_from_this(), fin](const asio::error_code& e, std::size_t bytes_transferred)
                {
                    if (e)
                    {
                        error(e);
                        return;
                    }

                    buf_->commit(static_cast<int>(bytes_transferred));
                    if (fin)
                    {
                        buf_->seek(BUFFER_OPTION_CHEAP_PREPEND);
                        auto m = message{ std::move(buf_) };
                        m.set_receiver(static_cast<uint8_t>(socket_data_type::socket_recv));
                        handle_message(std::move(m));
                    }

                    read_header();
                });
        }

    protected:
        enable_chunked flag_;
        message_size_t header_;
        buffer_ptr_t buf_;
    };
}

Performance test:

./moon example/tcp_benchmark.lua

0.00% <= 2 milliseconds
0.00% <= 3 milliseconds
0.01% <= 4 milliseconds
0.01% <= 5 milliseconds
0.01% <= 6 milliseconds
0.02% <= 7 milliseconds
0.16% <= 8 milliseconds
38.44% <= 9 milliseconds
93.09% <= 10 milliseconds
97.96% <= 11 milliseconds
98.96% <= 12 milliseconds
99.48% <= 13 milliseconds
99.70% <= 14 milliseconds
99.78% <= 15 milliseconds
99.83% <= 16 milliseconds
99.88% <= 17 milliseconds
99.95% <= 18 milliseconds
99.97% <= 19 milliseconds
100.00% <= 20 milliseconds
100.00% <= 21 milliseconds
102785.49 requests per second

The fix is to add a read buffer. However, moon_connection is typically used for client-facing long-lived connections and supports packets of up to 64KB by default, so allocating a 64KB buffer per connection up front could waste memory. In game workloads, most upstream packets are only tens to hundreds of bytes, so I decided to use a 512-byte resident buffer plus a dynamic buffer of up to 64KB. This reduces completion handler invocations while avoiding oversized allocations:

#pragma once
#include "base_connection.hpp"
#include "common/byte_convert.hpp"
#include "streambuf.hpp"

namespace moon
{
    class moon_connection : public base_connection
    {
    public:
        ...
        void read_header()
        {
            if (cache_.size() >= sizeof(message_size_t))
            {
                hanlde_header();
                return;
            }

            asio::async_read(socket_, moon::streambuf(&cache_, cache_.capacity()), asio::transfer_at_least(sizeof(message_size_t)),
                [this, self = shared_from_this()](const asio::error_code& e, std::size_t)
                {
                    if (!e)
                    {
                        hanlde_header();
                        return;
                    }
                    error(e);
                });
        }

        void hanlde_header()
        {
            message_size_t header = 0;
            cache_.read(&header, 1);
            net2host(header);

            bool fin = (header != MESSAGE_CONTINUED_FLAG);
            if (!fin && !enum_has_any_bitmask(flag_, enable_chunked::receive)) {
                error(make_error_code(moon::error::read_message_too_big));
                return;
            }

            read_body(header, fin);
        }

        void read_body(message_size_t size, bool fin)
        {
            if (nullptr == data_)
            {
                // Reserve space at the head so the logic layer can forward the message,
                // e.g. to store a player's 64-bit UID.
                data_ = buffer::make_unique((fin ? size : static_cast<size_t>(5) * size) + BUFFER_OPTION_CHEAP_PREPEND);
                data_->commit(BUFFER_OPTION_CHEAP_PREPEND);
            }

            // Calculate the difference between the cache size and the expected size
            ssize_t diff = static_cast<ssize_t>(cache_.size()) - static_cast<ssize_t>(size);
            // Determine the amount of data to consume from the cache:
            // if the cache holds at least the expected size, consume the expected size;
            // otherwise, consume the entire cache.
            size_t consume_size = (diff >= 0 ? size : cache_.size());
            data_->write_back(cache_.data(), consume_size);
            cache_.consume(consume_size);

            if (diff >= 0)
            {
                handle_body(fin);
                return;
            }

            cache_.clear();

            // Not enough data yet: read the rest into the larger dynamic buffer.
            asio::async_read(socket_, moon::streambuf(data_.get()), asio::transfer_exactly(static_cast<size_t>(-diff)),
                [this, self = shared_from_this(), size, fin](const asio::error_code& e, std::size_t)
                {
                    if (!e)
                    {
                        handle_body(fin);
                        return;
                    }
                    error(e);
                });
        }

        void handle_body(bool fin)
        {
            if (fin)
            {
                data_->seek(BUFFER_OPTION_CHEAP_PREPEND);
                auto m = message{ std::move(data_) };
                m.set_type(type_);
                m.set_receiver(static_cast<uint8_t>(socket_data_type::socket_recv));
                handle_message(std::move(m));
            }
            read_header();
        }

    protected:
        enable_chunked flag_;
        buffer cache_;       // resident read buffer
        buffer_ptr_t data_;  // dynamic read buffer
    };
}

Performance after adding the read buffer and calling back into the Lua layer directly:

0.00% <= 0 milliseconds
0.02% <= 1 milliseconds
0.02% <= 2 milliseconds
0.02% <= 3 milliseconds
0.02% <= 6 milliseconds
4.07% <= 7 milliseconds
80.33% <= 8 milliseconds
97.40% <= 9 milliseconds
98.95% <= 10 milliseconds
99.50% <= 11 milliseconds
99.71% <= 12 milliseconds
99.76% <= 13 milliseconds
99.80% <= 14 milliseconds
99.85% <= 15 milliseconds
99.91% <= 16 milliseconds
99.97% <= 17 milliseconds
100.00% <= 18 milliseconds
121817.52 requests per second

The conclusion: when using Asio, prefer an application-level buffer and minimize the number of Asio completion handler invocations; this is what yields the biggest gains in Asio read performance. That said, rust-tokio's performance is also excellent, and users can reach very high performance without any deliberate optimization.
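
To summarize the technique in isolation, here is a minimal generic sketch of the pattern, assuming standalone Asio 1.14+. The line_reader class and its callback API are illustrative only, not moon's actual API: keep an application-level cache, serve reads from it whenever possible, and only issue an Asio read (and thus pay for a completion handler) when the cache cannot satisfy the request.

// Generic sketch of "drain the application buffer first" (illustrative API).
#include <asio.hpp>
#include <functional>
#include <optional>
#include <string>
#include <string_view>
#include <utility>

class line_reader
{
public:
    explicit line_reader(asio::ip::tcp::socket socket) : socket_(std::move(socket)) {}

    // Returns a line immediately if the cache already holds one; otherwise
    // starts one async_read_until and delivers the result through the callback.
    std::optional<std::string> read_line(std::function<void(std::string)> on_line)
    {
        if (auto line = try_pop_line())   // served from the cache: no IO call,
            return line;                  // no completion handler, no scheduling hop

        asio::async_read_until(socket_, cache_, "\r\n",
            [this, on_line = std::move(on_line)](const asio::error_code& e, std::size_t)
            {
                if (!e)
                    on_line(*try_pop_line()); // one IO call may buffer many lines;
            });                               // later calls then return synchronously
        return std::nullopt;
    }

private:
    // Searches the application-level cache for a complete "\r\n"-terminated line.
    std::optional<std::string> try_pop_line()
    {
        std::string_view data{ static_cast<const char*>(cache_.data().data()), cache_.size() };
        auto pos = data.find("\r\n");
        if (pos == std::string_view::npos)
            return std::nullopt;
        std::string line{ data.substr(0, pos) };
        cache_.consume(pos + 2); // drop the line and its delimiter from the cache
        return line;
    }

    asio::ip::tcp::socket socket_;
    asio::streambuf cache_; // application-level buffer that outlives each read
};

For brevity the sketch captures this directly in the lambda; real connection objects should capture shared_from_this(), as moon's code above does, to keep the connection alive until the handler runs.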