A high performance fiber RPC network framework. 高性能协程RPC网络框架

Related tags

Database acid
Overview

ACID: 高性能协程RPC框架

学习本项目需要有一定的C++,网络,RPC知识

项目依赖

1.项目用到了大量C++17/20新特性,如constexpr if的编译期代码生成,基于c++20 coroutine的无栈协程状态机解析 URI 和 HTTP 协议等。注意,必须安装g++-11,否则不支持编译。

ubuntu可以采用以下方法升级g++,对于其他环境,可以下载源代码自行编译。

sudo apt install software-properties-common
sudo add-apt-repository ppa:ubuntu-toolchain-r/test
sudo apt install gcc-11 g++-11

2.配置模块采用了yaml-cpp,按照以下方法安装yaml-cpp

git clone https://github.com/jbeder/yaml-cpp.git
cd yaml-cpp
mkdir build
cd build
cmake ..
make #make时如果编译test错误可以忽略
sudo cp libyaml-cpp.a /usr/local/lib
cd ..
sudo cp -r include/yaml-cpp /usr/local/include
cd ..

3.构建项目 / Build

git clone https://github.com/zavier-wong/acid
cd acid
mkdir build
cd build
cmake ..
make
cd ../..

make完可在acid/bin执行example和test的例子。

RPC框架设计

本项目将从零开始搭建出一个基于协程的异步RPC框架。

通过本项目你将学习到:

  • 序列化协议
  • 通信协议
  • 服务注册
  • 服务发现
  • 负载均衡
  • 心跳机制
  • 三种异步调用方式

相信大家对RPC的相关概念都已经很熟悉了,这里不做过多介绍,直接进入重点。 本文档将简单介绍框架的设计,在最后的 examples 会给出完整的使用范例。 更多的细节需要仔细阅读源码。

本RPC框架主要有网络模块, 序列化模块,通信协议模块,客户端模块,服务端模块,服务注册中心模块,负载均衡模块

主要有以下三个角色:

role

注册中心 Registry

主要是用来完成服务注册和服务发现的工作。同时需要维护服务下线机制,管理了服务器的存活状态。

服务提供方 Service Provider

其需要对外提供服务接口,它需要在应用启动时连接注册中心,将服务名发往注册中心。服务端还需要启动Socket服务监听客户端请求。

服务消费方 Service Consumer

客户端需要有从注册中心获取服务的基本能力,它需要在发起调用时, 从注册中心拉取开放服务的服务器地址列表存入本地缓存, 然后选择一种负载均衡策略从本地缓存中筛选出一个目标地址发起调用, 并将这个连接存入连接池等待下一次调用。

序列化协议

本模块支持了基本类型以及标准库容器的序列化,包括:

  • 顺序容器:string, list, vector
  • 关联容器: set, multiset, map, multimap
  • 无序容器:unordered_set, unordered_multiset, unordered_map, unordered_multimap
  • 异构容器:tuple

以及通过以上任意组合嵌套类型的序列化

序列化有以下规则:

  1. 默认情况下序列化,8,16位类型以及浮点数不压缩,32,64位有符号/无符号数采用 zigzag 和 varints 编码压缩
  2. 针对 std::string 会将长度信息压缩序列化作为元数据,然后将原数据直接写入。char数组会先转换成 std::string 后按此规则序列化
  3. 调用 writeFint 将不会压缩数字,调用 writeRowData 不会加入长度信息

对于任意用户自定义类型,只要实现了以下的重载,即可参与传输时的序列化。

template<typename T>
Serializer &operator >> (Serializer& in, T& i){
   return *this;
}
template<typename T>
Serializer &operator << (Serializer& in, T i){
    return *this;
}

rpc调用过程:

  • 调用方发起过程调用时,自动将参数打包成tuple,然后序列化传输。
  • 被调用方收到调用请求时,先将参数包反序列回tuple,再解包转发给函数。

通信协议

封装通信协议,使RPC Server和RPC Client 可以基于同一协议通信。

采用私有通信协议,协议如下:

第一个字节是魔法数。

第二个字节代表协议版本号,以便对协议进行扩展,使用不同的协议解析器。

第三个字节是请求类型,如心跳包,请求,响应等。

第四个字节表示消息长度,占四个字节,表示后面还有多少个字节的数据。

除了协议头固定7字节,消息不定长。

目前提供了以下几种请求

enum class MsgType : uint8_t {
    HEARTBEAT_PACKET,       // 心跳包
    RPC_PROVIDER,           // 向服务中心声明为provider
    RPC_CONSUMER,           // 向服务中心声明为consumer
    
    RPC_REQUEST,            // 通用请求
    RPC_RESPONSE,           // 通用响应
    
    RPC_METHOD_REQUEST ,    // 请求方法调用
    RPC_METHOD_RESPONSE,    // 响应方法调用
    
    RPC_SERVICE_REGISTER,   // 向中心注册服务
    RPC_SERVICE_REGISTER_RESPONSE,
    
    RPC_SERVICE_DISCOVER,   // 向中心请求服务发现
    RPC_SERVICE_DISCOVER_RESPONSE
};

服务注册

每一个服务提供者对应的机器或者实例在启动运行的时候, 都去向注册中心注册自己提供的服务以及开放的端口。 注册中心维护一个服务名到服务地址的多重映射,一个服务下有多个服务地址, 同时需要维护连接状态,断开连接后移除服务。

/**
 * 维护服务名和服务地址列表的多重映射
 * serviceName -> serviceAddress1
 *             -> serviceAddress2
 *             ...
 */

服务发现

虽然服务调用是服务消费方直接发向服务提供方的,但是分布式的服务,都是集群部署的, 服务的提供者数量也是动态变化的, 所以服务的地址也就无法预先确定。 因此如何发现这些服务就需要一个统一注册中心来承载。

客户端从注册中心获取服务,它需要在发起调用时, 从注册中心拉取开放服务的服务器地址列表存入本地缓存,

RPC连接池采用LRU淘汰算法,关闭最旧未使用的连接。

负载均衡

实现通用类型负载均衡路由引擎(工厂)。 通过路由引擎获取指定枚举类型的负载均衡器, 降低了代码耦合,规范了各个负载均衡器的使用,减少出错的可能。

提供了三种路由策略(随机、轮询、哈希), 由客户端使用,在客户端实现负载均衡

/**
 * @brief: 路由均衡引擎
 */
template<class T>
class RouteEngine {
public:
    static typename RouteStrategy<T>::ptr queryStrategy(typename RouteStrategy<T>::Strategy routeStrategyEnum) {
        switch (routeStrategyEnum){
            case RouteStrategy<T>::Random:
                return s_randomRouteStrategy;
            case RouteStrategy<T>::Polling:
                return std::make_shared<impl::PollingRouteStrategyImpl<T>>();
            case RouteStrategy<T>::HashIP:
                return s_hashIPRouteStrategy ;
            default:
                return s_randomRouteStrategy ;
        }
    }
private:
    static typename RouteStrategy<T>::ptr s_randomRouteStrategy;
    static typename RouteStrategy<T>::ptr s_hashIPRouteStrategy;
};

选择客户端负载均衡策略,根据路由策略选择服务地址。默认随机策略。

acid::rpc::RouteStrategy<std::string>::ptr strategy =
        acid::rpc::RouteEngine<std::string>::queryStrategy(
                acid::rpc::RouteStrategy<std::string>::Random);

客户端同时会维护RPC连接池,以及服务发现结果缓存,减少频繁建立连接。

通过上述策略尽量消除或减少系统压力及系统中各节点负载不均衡的现象。

心跳机制

服务中心必须管理服务器的存活状态,也就是健康检查。 注册服务的这一组机器,当这个服务组的某台机器如果出现宕机或者服务死掉的时候, 就会剔除掉这台机器。这样就实现了自动监控和管理。

项目采用了心跳发送的方式来检查健康状态。

服务器端: 开启一个定时器,定期给注册中心发心跳包,注册中心会回一个心跳包。

注册中心: 开启一个定时器倒计时,每次收到一个消息就更新一次定时器。如果倒计时结束还没有收到任何消息,则判断服务掉线。

三种异步调用方式

整个框架最终都要落实在服务消费者。为了方便用户,满足用户的不同需求,项目设计了三种异步调用方式。 三种调用方式的模板参数都是返回值类型,对void类型会默认转换uint8_t 。

  1. 以同步的方式异步调用

整个框架本身基于协程池,所以在遇到阻塞时会自动调度实现以同步的方式异步调用

auto sync_call = con->call<int>("add", 123, 321);
ACID_LOG_INFO(g_logger) << sync_call.getVal();
  1. future 形式的异步调用

调用时会立即返回一个future

auto async_call_future = con->async_call<int>("add", 123, 321);
ACID_LOG_INFO(g_logger) << async_call_future.get().getVal();
  1. 异步回调

async_call的第一个参数为函数时,启用回调模式,回调参数必须是返回类型的包装。收到消息时执行回调。

con->async_call<int>([](acid::rpc::Result<int> res){
    ACID_LOG_INFO(g_logger) << res.getVal();
}, "add", 123, 321); 

对调用结果及状态的封装如下

/**
 * @brief RPC调用状态
 */
enum RpcState{
    RPC_SUCCESS = 0,    // 成功
    RPC_FAIL,           // 失败
    RPC_NO_METHOD,      // 没有找到调用函数
    RPC_CLOSED,         // RPC 连接被关闭
    RPC_TIMEOUT         // RPC 调用超时
};

/**
 * @brief 包装 RPC调用结果
 */
template<typename T = void>
class Result {
    ...
private:
    /// 调用状态
    code_type m_code = 0;
    /// 调用消息
    msg_type m_msg;
    /// 调用结果
    type m_val;
}

最后

通过以上介绍,我们粗略地了解了分布式服务的大概流程。但篇幅有限,无法面面俱到, 更多细节就需要去阅读代码来理解了。

这并不是终点,项目只是实现了简单的服务注册、发现。后续将考虑加入注册中心集群,限流,熔断,监控节点等。

示例

rpc服务注册中心

#include "acid/rpc/rpc_service_registry.h"

// 服务注册中心
void Main() {
    acid::Address::ptr address = acid::Address::LookupAny("127.0.0.1:8080");
    acid::rpc::RpcServiceRegistry::ptr server = std::make_shared<acid::rpc::RpcServiceRegistry>();
    // 服务注册中心绑定在8080端口
    while (!server->bind(address)){
        sleep(1);
    }
    server->start();
}

int main() {
    acid::IOManager::ptr loop = std::make_shared<acid::IOManager>();
    loop->submit(Main);
}

rpc 服务提供者

#include "acid/rpc/rpc_server.h"

int add(int a,int b){
    return a + b;
}

std::string getStr() {
    return  "hello world";
}

// 向服务中心注册服务,并处理客户端请求
void Main() {
    int port = 9000;
    acid::Address::ptr local = acid::IPv4Address::Create("127.0.0.1",port);
    acid::Address::ptr registry = acid::Address::LookupAny("127.0.0.1:8080");

    acid::rpc::RpcServer::ptr server = std::make_shared<acid::rpc::RpcServer>();

    // 注册服务,支持函数指针和函数对象
    server->registerMethod("add",add);
    server->registerMethod("getStr",getStr);
    server->registerMethod("echo", [](std::string str){
        return str;
    });

    // 先绑定本地地址
    while (!server->bind(local)){
        sleep(1);
    }
    // 绑定服务注册中心
    server->bindRegistry(registry);
    // 开始监听并处理服务请求
    server->start();
}

int main() {
    acid::IOManager::ptr loop = std::make_shared<acid::IOManager>();
    loop->submit(Main);
}

rpc 服务消费者,并不直接用RpcClient,而是采用更高级的封装,RpcConnectionPool。 提供了连接池和服务地址缓存。

#include "acid/log.h"
#include "acid/rpc/rpc_connection_pool.h"

static acid::Logger::ptr g_logger = ACID_LOG_ROOT();

// 连接服务中心,自动服务发现,执行负载均衡决策,同时会缓存发现的结果
void Main() {
    acid::Address::ptr registry = acid::Address::LookupAny("127.0.0.1:8080");

    // 设置连接池的数量
    acid::rpc::RpcConnectionPool::ptr con = std::make_shared<acid::rpc::RpcConnectionPool>(5);

    // 连接服务中心
    con->connect(registry);

    // 第一种调用接口,以同步的方式异步调用,原理是阻塞读时会在协程池里调度
    acid::rpc::Result<int> sync_call = con->call<int>("add", 123, 321);
    ACID_LOG_INFO(g_logger) << sync_call.getVal();

    // 第二种调用接口,future 形式的异步调用,调用时会立即返回一个future
    std::future<acid::rpc::Result<int>> async_call_future = con->async_call<int>("add", 123, 321);
    ACID_LOG_INFO(g_logger) << async_call_future.get().getVal();

    // 第三种调用接口,异步回调
    con->async_call<int>([](acid::rpc::Result<int> res){
        ACID_LOG_INFO(g_logger) << res.getVal();
    }, "add", 123, 321);

}

int main() {
    acid::IOManager::ptr loop = std::make_shared<acid::IOManager>();
    loop->submit(Main);
}

反馈和参与

致谢

感谢 sylar 项目, 跟随着sylar完成了网络部分,才下定决心写一个RPC框架。在RPC框架的设计与实现上也深度参考了sylar项目。

Issues
  • 关于c++20编译的问题

    关于c++20编译的问题

    我是ubuntu系统,更新了g++-11。 在cmake中设置 '-std=c++20' ,make时会报错 c++: error: unrecognized command line option ‘-std=c++20’; did you mean ‘-std=c++2a’? 如果改为 '-std=c++2a' 又会报 fatal error: concepts: No such file or directory

    希望更详细地说明一下项目中c++20如何通过编译,谢谢!

    opened by dracodearys 6
  • 运行测试文件 test_scheduler.cpp 时出现问题

    运行测试文件 test_scheduler.cpp 时出现问题

    运行环境

    Ubuntu 21.10

    IDE

    Clion-2021.1.3

    问题

    2022-04-08 23:46:57	[ DEBUG	]	[root]	20761	main	0		tests/test_scheduler.cpp:70	main
    2022-04-08 23:46:57	[ INFO	]	[system]	20761	main	0		source/scheduler.cpp:144	notify
    2022-04-08 23:46:57	[ DEBUG	]	[root]	20764	test_2	2		tests/test_scheduler.cpp:64	Im fiber 1
    2022-04-08 23:46:57	[ ERROR	]	[root]	20761	main	0		source/scheduler.cpp:21	ASSERTION: m_stop
    backtrace:
        /home/titto/CProjects/acid-main/bin/test/test_scheduler(_ZN4acid9SchedulerD1Ev+0x236) [0x55b92b1cd5f4]
        /home/titto/CProjects/acid-main/bin/test/test_scheduler(main+0x206) [0x55b92b1678ec]
        /lib/x86_64-linux-gnu/libc.so.6(+0x29fd0) [0x7f11eaa4cfd0]
        /lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0x7d) [0x7f11eaa4d07d]
        /home/titto/CProjects/acid-main/bin/test/test_scheduler(_start+0x25) [0x55b92b1662c5]
    
    2022-04-08 23:46:57	[ INFO	]	[system]	20763	test_1	3		source/scheduler.cpp:149	idle
    test_scheduler: source/scheduler.cpp:21: virtual acid::Scheduler::~Scheduler(): Assertion `m_stop' failed.
    
    Process finished with exit code 134 (interrupted by signal 6: SIGABRT)
    
    opened by hc-tec 1
HybridSE (Hybrid SQL Engine) is an LLVM-based, hybrid-execution and high-performance SQL engine

HybridSE (Hybrid SQL Engine) is an LLVM-based, hybrid-execution and high-performance SQL engine. It can provide fast and consistent execution on heterogeneous SQL data systems, e.g., OLAD database, HTAP system, SparkSQL, and Flink Stream SQL.

4Paradigm 45 Sep 12, 2021
YugabyteDB is a high-performance, cloud-native distributed SQL database that aims to support all PostgreSQL features

YugabyteDB is a high-performance, cloud-native distributed SQL database that aims to support all PostgreSQL features. It is best to fit for cloud-native OLTP (i.e. real-time, business-critical) applications that need absolute data correctness and require at least one of the following: scalability, high tolerance to failures, or globally-distributed deployments.

yugabyte 6.6k Jul 1, 2022
pgagroal is a high-performance protocol-native connection pool for PostgreSQL.

pgagroal is a high-performance protocol-native connection pool for PostgreSQL.

Agroal 524 Jun 15, 2022
PGSpider: High-Performance SQL Cluster Engine for distributed big data.

PGSpider: High-Performance SQL Cluster Engine for distributed big data.

PGSpider 127 Jun 24, 2022
High-performance time-series aggregation for PostgreSQL

PipelineDB has joined Confluent, read the blog post here. PipelineDB will not have new releases beyond 1.0.0, although critical bugs will still be fix

PipelineDB 2.5k Jun 24, 2022
A framework to monitor and improve the performance of PostgreSQL using Machine Learning methods.

pg_plan_inspector pg_plan_inspector is being developed as a framework to monitor and improve the performance of PostgreSQL using Machine Learning meth

suzuki hironobu 168 Jul 1, 2022
An SQLite binding for node.js with built-in encryption, focused on simplicity and (async) performance

Description An SQLite (more accurately SQLite3MultipleCiphers) binding for node.js focused on simplicity and (async) performance. When dealing with en

mscdex 14 May 15, 2022
Trilogy is a client library for MySQL-compatible database servers, designed for performance, flexibility, and ease of embedding.

Trilogy is a client library for MySQL-compatible database servers, designed for performance, flexibility, and ease of embedding.

GitHub 248 Jun 11, 2022
Nebula Graph is a distributed, fast open-source graph database featuring horizontal scalability and high availability

Nebula Graph is an open-source graph database capable of hosting super large scale graphs with dozens of billions of vertices (nodes) and trillions of edges, with milliseconds of latency.

vesoft inc. 807 Jun 30, 2022
DB Browser for SQLite (DB4S) is a high quality, visual, open source tool to create, design, and edit database files compatible with SQLite.

DB Browser for SQLite What it is DB Browser for SQLite (DB4S) is a high quality, visual, open source tool to create, design, and edit database files c

null 16.7k Jun 27, 2022
dqlite is a C library that implements an embeddable and replicated SQL database engine with high-availability and automatic failover

dqlite dqlite is a C library that implements an embeddable and replicated SQL database engine with high-availability and automatic failover. The acron

Canonical 3k Jun 27, 2022
Fiber - A header only cross platform wrapper of fiber API.

Fiber Header only cross platform wrapper of fiber API A fiber is a particularly lightweight thread of execution. Which is useful for implementing coro

Tony Wang 42 Jan 7, 2022
BingBing 53 Jun 10, 2022
Industrial-grade RPC framework used throughout Baidu, with 1,000,000+ instances and thousands kinds of services. "brpc" means "better RPC".

中文版 An industrial-grade RPC framework used throughout Baidu, with 1,000,000+ instances(not counting clients) and thousands kinds of services. "brpc" m

The Apache Software Foundation 13.3k Jul 1, 2022
c++11, high performance, cross platform, easy to use rpc framework.

modern C++(C++11), simple, easy to use rpc framework

qicosmos 1k Jun 29, 2022
Packio - An asynchronous msgpack-RPC and JSON-RPC library built on top of Boost.Asio.

Header-only | JSON-RPC | msgpack-RPC | asio | coroutines This library requires C++17 and is designed as an extension to boost.asio. It will let you bu

Quentin Chateau 46 Jun 15, 2022
RPC++ is a tool for Discord RPC (Rich Presence) to let your friends know about your Linux system

RPC++ RPC++ is a tool for Discord RPC (Rich Presence) to let your friends know about your Linux system Installing requirements Arch based systems pacm

grialion 3 Jun 13, 2022
A modern C++ network library for developing high performance network services in TCP/UDP/HTTP protocols.

evpp Introduction 中文说明 evpp is a modern C++ network library for developing high performance network services using TCP/UDP/HTTP protocols. evpp provid

Qihoo 360 3k Jun 27, 2022
KBMS is a C++11 high performance network framework based on libevent.

KBMS is a C++11 high performance network framework based on libevent. It is light and easy to deploy. At present, it mainly supports HTTP protocol.

Ke Technologies 28 Apr 12, 2022
High Performance Linux C++ Network Programming Framework based on IO Multiplexing and Thread Pool

Kingpin is a C++ network programming framework based on TCP/IP + epoll + pthread, aims to implement a library for the high concurrent servers and clie

null 14 Jun 19, 2022
ncnn is a high-performance neural network inference framework optimized for the mobile platform

ncnn ncnn is a high-performance neural network inference computing framework optimized for mobile platforms. ncnn is deeply considerate about deployme

Tencent 14.9k Jul 2, 2022
A hybrid thread / fiber task scheduler written in C++ 11

Marl Marl is a hybrid thread / fiber task scheduler written in C++ 11. About Marl is a C++ 11 library that provides a fluent interface for running tas

Google 1.4k Jun 24, 2022
Lucy job system - Fiber-based job system with extremely simple API

Lucy Job System This is outdated compared to Lumix Engine. Use that instead. Fiber-based job system with extremely simple API. It's a standalone versi

Mikulas Florek 78 Mar 11, 2022
Termite-jobs - Fast, multiplatform fiber based job dispatcher based on Naughty Dogs' GDC2015 talk.

NOTE This library is obsolete and may contain bugs. For maintained version checkout sx library. until I rip it from there and make a proper single-hea

Sepehr Taghdisian 35 Jan 9, 2022
Coroutine - C++11 single .h asymmetric coroutine implementation via ucontext / fiber

C++11 single .h asymmetric coroutine implementation API in namespace coroutine: routine_t create(std::function<void()> f); void destroy(routine_t id);

null 378 Jun 24, 2022
An eventing framework for building high performance and high scalability systems in C.

NOTE: THIS PROJECT HAS BEEN DEPRECATED AND IS NO LONGER ACTIVELY MAINTAINED As of 2019-03-08, this project will no longer be maintained and will be ar

Facebook Archive 1.7k Jun 15, 2022
An eventing framework for building high performance and high scalability systems in C.

NOTE: THIS PROJECT HAS BEEN DEPRECATED AND IS NO LONGER ACTIVELY MAINTAINED As of 2019-03-08, this project will no longer be maintained and will be ar

Meta Archive 1.7k Jun 15, 2022
RPC Network Library for Multiplayer Games

Game NET - A Simple RPC Network Library (C++11) Write a simple server / client RPC system in just 10+20 lines of code. Write a multiplayer game with c

Spacerat 73 Jun 3, 2022
Brynet - Header Only Cross platform high performance TCP network library using C++ 11.

Brynet Header Only Cross platform high performance TCP network library using C++ 11. Build status Windows : Linux/MacOS : Features Header only Cross p

IronsDu 848 Jun 27, 2022