一个基于C++11的轻量级网络框架,基于线程池技术可以实现大并发网络IO

Overview

一个基于C++11简单易用的轻量级网络编程框架

Build Status

项目特点

  • 基于C++11开发,避免使用裸指针,代码稳定可靠;同时跨平台移植简单方便,代码清晰简洁。
  • 使用epoll+线程池+异步网络IO模式开发,并发性能优越。
  • 代码经过大量的稳定性、性能测试,可满足商用服务器项目。
  • 支持linux、macos、ios、android、windows平台
  • 了解更多:ZLMediaKit

特性

  • 网络库
    • tcp/udp客户端,接口简单易用并且是线程安全的,用户不必关心具体的socket api操作。
    • tcp服务器,使用非常简单,只要实现具体的tcp会话(TcpSession类)逻辑,使用模板的方式可以快速的构建高性能的服务器。
    • 对套接字多种操作的封装。
  • 线程库
    • 使用线程实现的简单易用的定时器。
    • 信号量。
    • 线程组。
    • 简单易用的线程池,可以异步或同步执行任务,支持functional 和 lambad表达式。
  • 工具库
    • 文件操作。
    • std::cout风格的日志库,支持颜色高亮、代码定位、异步打印。
    • INI配置文件的读写。
    • 监听者模式的消息广播器。
    • 基于智能指针的循环池,不需要显式手动释放。
    • 环形缓冲,支持主动读取和读取事件两种模式。
    • mysql链接池,使用占位符(?)方式生成sql语句,支持同步异步操作。
    • 简单易用的ssl加解密黑盒,支持多线程。
    • 其他一些有用的工具。
    • 命令行解析工具,可以很便捷的实现可配置应用程序

编译(Linux)

  • 我的编译环境

    • Ubuntu16.04 64 bit + gcc5.4(最低gcc4.7)
    • cmake 3.5.1
  • 编译

    cd ZLToolKit
    ./build_for_linux.sh
    

编译(macOS)

  • 我的编译环境

    • macOS Sierra(10.12.1) + xcode8.3.1
    • Homebrew 1.1.3
    • cmake 3.8.0
  • 编译

    cd ZLToolKit
    ./build_for_mac.sh
    

编译(iOS)

  • 编译环境:请参考macOS的编译指导。

  • 编译

    cd ZLToolKit
    ./build_for_ios.sh
    
  • 你也可以生成Xcode工程再编译:

    cd ZLToolKit
    mkdir -p build
    cd build
    # 生成Xcode工程,工程文件在build目录下
    cmake .. -DCMAKE_TOOLCHAIN_FILE=../cmake/iOS.cmake -DIOS_PLATFORM=SIMULATOR64 -G "Xcode"
    

编译(Android)

  • 我的编译环境

    • macOS Sierra(10.12.1) + xcode8.3.1
    • Homebrew 1.1.3
    • cmake 3.8.0
    • android-ndk-r14b
  • 编译

    cd ZLToolKit
    export ANDROID_NDK_ROOT=/path/to/ndk
    ./build_for_android.sh
    

编译(Windows)

   1 使用cmake-gui打开工程并生成vs工程文件.
   2 找到工程文件(ZLToolKit.sln),双击用vs2017打开.
   3 选择编译Release 版本.
   4 依次编译 ZLToolKit_static、ZLToolKit_shared、ALL_BUILD、INSTALL.
   5 找到目标文件并运行测试用例.
   6 找到安装的头文件及库文件(在源码所在分区根目录).

授权协议

本项目自有代码使用宽松的MIT协议,在保留版权信息的情况下可以自由应用于各自商用、非商业的项目。 但是本项目也零碎的使用了一些其他的开源代码,在商用的情况下请自行替代或剔除; 由于使用本项目而产生的商业纠纷或侵权行为一概与本项项目及开发者无关,请自行承担法律风险。

QA

  • 该库性能怎么样?

基于ZLToolKit,我实现了一个流媒体服务器ZLMediaKit;作者已经对其进行了性能测试,可以查看benchmark.md了解详情。

  • 该库稳定性怎么样?

该库经过作者严格的valgrind测试,长时间大负荷的测试;作者也使用该库进行了多个线上项目的开发。实践证明该库稳定性很好;可以无看门狗脚本的方式连续运行几个月。

  • 在windows下编译很多错误?

由于本项目主体代码在macOS/linux下开发,部分源码采用的是无bom头的UTF-8编码;由于windows对于utf-8支持不甚友好,所以如果发现编译错误请先尝试添加bom头再编译。

联系方式

  • 邮箱:[email protected](本项目相关或网络编程相关问题请走issue流程,否则恕不邮件答复)
  • QQ群:542509000
Comments
  • 这年头怎么是个人就敢写自旋锁了?

    这年头怎么是个人就敢写自旋锁了?

    https://github.com/xiongziliang/ZLToolKit/blob/7353a131b46ab6af46d9de23078076d1932f33fb/src/Thread/spin_mutex.h#L44

    且不说用户态根本无法屏蔽 context switch, 就你这渣渣实现,和 其它实现 比较过吗? 有做过 性能测试 吗?

    opened by lhmouse 14
  • 关于ThreadLoadCounter统计问题

    关于ThreadLoadCounter统计问题

    ThreadPool(int num = 1, Priority priority = PRIORITY_HIGHEST, bool autoRun = true) ;

    ThreadPool允许创建多条线程. 在ThreadPool::run()中调用startSleep()与sleepWakeUp()进行load统计,当线程数大于1时,load统计是否不准确?

    opened by lam2003 11
  • fixed win32 std namespace bug

    fixed win32 std namespace bug

    用vs2017编译最新的toolkit报这错: 1>e:\zlmediakit\3rdpart\zltoolkit\src\util\List.h(188): error C2039: “list”: 不是“toolkit::std”的成员 1>C:\Program Files (x86)\Microsoft Visual Studio\2017\Community\VC\Tools\MSVC\14.16.27023\include\cctype(33): note: 参见“toolkit::std”的声明 1>E:\ZLMediaKit\3rdpart\ZLToolKit\src\Util\logger.cpp(214): note: 参见对正在编译的函数 模板 实例化“toolkit::List<std::pair<toolkit::LogContextPtr,toolkit::Logger *>>::List<>(void)”的引用 1>E:\ZLMediaKit\3rdpart\ZLToolKit\src\Util\logger.cpp(214): note: 参见对正在编译的函数 模板 实例化“toolkit::List<std::pair<toolkit::LogContextPtr,toolkit::Logger *>>::List<>(void)”的引用 1>e:\zlmediakit\3rdpart\zltoolkit\src\util\List.h(188): error C2059: 语法错误:“<” 1>e:\zlmediakit\3rdpart\zltoolkit\src\util\NoticeCenter.h(102): error C2039: “forward”: 不是“toolkit::std”的成员

    经检查代码应该是这样导致的,做个小修复,应尽力避免在名字空间里面包含头文件 namespace toolkit{ #include std_namespace header }

    opened by mtdxc 9
  • 持续读会撑爆内存

    持续读会撑爆内存

    当一个连接来的速度很快,多到超过实际处理速度。比如一个server接受上传文件。客户端以1G的速度上传文件,但是该server只能以每秒100M的速度写文件。这种情况下,server会持续的通过epoll读,最终撑爆内存。 当server发现自己缓存的内存已经超过一定阈值,它不能持续read直到EAGAIN。如果是EDGE触发,它得在自己消费完了缓存的内存之后再read,如果是LEVEL触发,它得调用epoll_ctl删除这个fd,并且在自己消费完了缓存的内存之后再加入这个fd。 当server不再read时,tcp协议栈会降低tcp wnd,从而抑制发送方的发送速度。

    你这个库里似乎并没有这个逻辑。

    opened by huyuguang 8
  • 请教TCP拆包和粘包的处理解决办法

    请教TCP拆包和粘包的处理解决办法

    我仔细看了您流媒体服务器中HTTP中关于TCP合包的代码,但是无论才疏学浅,没有很明白,希望作者可以大概讲解下TCP处理拆包粘包的思路,现在基于ZLToolKit要实现一个大文件收发,出现拆包粘包问题. 另外我现在需要实现http上传文件,您的http里面有实现好的吗?或者告诉我一个解决思路,非常感谢

    opened by lixinhan2019 7
  • 关于用EventPollerPool::Instance().getPoller()->doDelayTask() EventPollerPool::Instance().getPoller()->async()做心跳线程的问题

    关于用EventPollerPool::Instance().getPoller()->doDelayTask() EventPollerPool::Instance().getPoller()->async()做心跳线程的问题

    现在要实现一个http短链接心跳接口,

    方法1:失败

    url.append("/acd/api/v1/0/agent/heartbeat");
    weak_ptr<WebClient> weakSelf = dynamic_pointer_cast<WebClient>(_client);
    EventPollerPool::Instance().getPoller()->async([weakSelf, url]() {
        while (true) {
            sleep(10);
            auto strongSelf=weakSelf.lock();
            if(strongSelf){
                strongSelf->heartbeat(str);
            }
        }
    }, true);
    

    方法2:失败

    EventPollerPool::Instance().getPoller()->doDelayTask(10 * 1000, [weakSelf, url]() {
                    auto strongSelf=weakSelf.lock();
                    if(strongSelf){
                        strongSelf->heartbeat(str);
                    }
                    sleep(10);
                    return true;
                });
    

    这是heartbeat()的实现方法

    void WebClient::heartbeat(const string &url){
        //TraceL << url;
        _heartbeatPost.reset(new HttpRequester);
        _heartbeatPost->setMethod("POST");
        //设置http请求头
        _heartbeatPost->addHeader("Content-Type", "application/json");
        //设置POST参数列表
        Value val;
        val["groupId"] = _groupId;
        val["agentId"] = _agentId;
        _heartbeatPost->setBody(val.toStyledString());
        //InfoL << val.toStyledString();
        //开启请求
        _heartbeatPost->startRequester(url,//url地址
                                       [](const SockException &ex,                          //网络相关的失败信息,如果为空就代表成功
                                          const string &status,                             //http回复的状态码,比如说200/404
                                          const HttpClient::HttpHeader &header,             //http回复头
                                          const string &strRecvBody) {                       //http回复body
                                           //DebugL << "=====================HttpRequester POST==========================";
                                           if (ex) {
                                               //网络相关的错误
                                               WarnL << "heartbeat network err:" << ex.getErrCode() << " " << ex.what();
                                           } else {
                                               TraceL << "heartbeat body:" << strRecvBody;
                                           }
                                       });
    }
    

    _heartbeatPost申请的是全局变量,但是http包都发不出去. 现在用的方法是申请一个thread,放进去实现,但是这样写太难看了,请问有什么好方法用EventPollerPool::Instance().getPoller()发http的心跳请求,

    opened by lixinhan2019 5
  • Windows下关于 g_defaultLogger 的编译问题

    Windows下关于 g_defaultLogger 的编译问题

    VS2017,到编译 ALL_BUILD 时,出现如下问题:

    test_logger.obj : error LNK2019: 无法解析的外部符号 "class toolkit::Logger * toolkit::g_defaultLogger" (?g_defaultLogger@toolkit@@3PAVLogger@1@A),该符号在函数 _main 中被引用

    test_delayTask.obj : error LNK2019: 无法解析的外部符号 "class toolkit::Logger * toolkit::g_defaultLogger" (?g_defaultLogger@toolkit@@3PAVLogger@1@A),该符号在函数 "private: virtual void __thiscall std::_Func_impl_no_alloc<class <lambda_1f860e808c4732d9497d62aaa8d648d8>,void>::_Do_call(void)" (?_Do_call@?$_Func_impl_no_alloc@V<lambda_1f860e808c4732d9497d62aaa8d648d8>@@X$$V@std@@EAEXXZ) 中被引用

    ...

    请问是test项目的引用有问题吗

    opened by yantanglife 5
  • 如何使用ZLToolKit构建高性能的媒体转发服务器

    如何使用ZLToolKit构建高性能的媒体转发服务器

    背景需求: 应用场景类似互动聊天直播室,可能有1-8个同时说话(视频)的嘉宾,还有n个观众。服务器将嘉宾上传的udp媒体数据包转发给房间其余人。 还有一点简单信令,比如进房间,离开房间,上嘉宾位,离开嘉宾位(变成观众)等,这部分打算用TCP做。 房间之间是相互独立的,一个服务器需要同时支持很多个房间,所以对UDP的性能要求会高,tcp只是信令以及长连接维持。UDP媒体数据包头会包含房间号和用户uid。 问题如下: 1、UDP的使用参考test_udpSock.cpp即可吗,对于我们这种场景需求,zltoolkit的使用上有没什么其他的注意,比如避免内存拷贝等事项? 2、将一个包转发给房间其他人,是自己维护一个房间所有人的udp地址列表,然后遍历发送吗,zltoolkit有没什么其他更高端高效率的办法? 3、关于房间数据结构的查找问题,目前是打算用一个udp端口进行监听,udp数据包头带上房间号,解析包头得到房间号,然后通过map查找到这个房间的数据结构;另外一种方式是,一个房间使用一个udp端口进行监听,这样这个udp端口收到的数据天然就是这个房间的,避免了我们使用map查找房间。请问这两种方式,哪种房间数量多了性能会好些呢?

    opened by airx 5
  • 请问下AsyncLogWriter出现corrupted size vs. prev_size异常导致signal SIGABRT,要怎么解决。

    请问下AsyncLogWriter出现corrupted size vs. prev_size异常导致signal SIGABRT,要怎么解决。

    @xia-chu 老大,我使用ZLToolKit打印日志 以下初始化代码

            auto channel = std::make_shared<ConsoleChannel>("ConsoleChn", (LogLevel)log_level);
            Logger::Instance().add(channel);
            Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());
    

    程序启动后出现以下异常:

    corrupted size vs. prev_size
    

    堆栈截图

    image

    opened by jiangjxuan 4
  • Patch 12

    Patch 12

    简单封装了redis的自有实现的无锁的localtime

    localtime_r是线程安全的,但是,对如下两种情况并不安全,甚至会引发死锁。

    (1)信号处理函数调用localtime: 假如进程调用localtime,已经获取全局锁,且并没有释放。此时,如果进程接收到信号,在信号处理函数也调用了localtime,就会造成死锁

    (2)多线程下fork: 在多线程下,若线程A调用localtime,已经获取全局锁,尚未释放锁。此时,假如线程B调用了fork,并且在子进程也调用localtime,也会造成死锁,导致子进程一直被hang住

    opened by alexliyu7352 4
  • makeRandStr获取的随机值问题

    makeRandStr获取的随机值问题

    平台:Linux, ARMv7 问题: 连续执行makeRandStr获取的随机字符串不随机。

    能否修改为以下实现,谢谢!

    string makeRandStr(int sz, bool printable) {
        char *tmp = new char[sz + 1];
        static const char CCH[] =
            "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
        int i;
        std::mt19937 rng(std::random_device{}());
        for (i = 0; i < sz; i++) {
            if (printable) {
                uint32_t x = rng() % (sizeof(CCH) - 1);
                tmp[i] = CCH[x];
            }
            else {
                tmp[i] = rng() % 0xFF;
            }
        }
        tmp[i] = 0;
        string ret = tmp;
        delete[] tmp;
        return ret;
    }
    
    opened by bzd2132 4
  • 在Socket::onAccept中 peer_sock与peer_sock_fd的生命周期太长 可能会有问题

    在Socket::onAccept中 peer_sock与peer_sock_fd的生命周期太长 可能会有问题

    https://github.com/ZLMediaKit/ZLToolKit/blob/350ba7fccea277977496424996979aa18a1eeb27/src/Network/Socket.cpp#L589 在Socket::onAccept中 peer_sock与peer_sock_fd的生命周期太长 可能会有问题 eg: 有poller1 poller2两个线程同时监听某端口

    1. poller1 accept一个新链接 并将其绑定到poller2线程(_on_before_accept)
    2. 将新链接加入到poller2上进行监听(attachEvent); 且poller2立即触发epoll_wait并回调上层 上层业务清除该链接(_session_map.erase)
    3. 由于系统原因poller1此时被调度执行 然后poller1 析构peer_sock与peer_sock_fd 并调用close

    peer_sock与peer_sock_fd应该在poller2中析构并close 但是步骤3有很小的几率发生 导致跨线程关闭

    opened by hankai17 10
  • 您好,请教一个问题createsocket 创建后,关闭程序时需要释放么

    您好,请教一个问题createsocket 创建后,关闭程序时需要释放么

    QWeakPointer<MainWindow> weakSelf = QWeakPointer<MainWindow>(sharedFromThis());
    sockRecv  = Socket::createSocket();//创建一个UDP数据接收端口
    sockSend  = Socket::createSocket();//创建一个UDP数据发送端口
    
    sockRecv->bindUdpSock(9001);//接收UDP绑定9001端口
    sockSend->bindUdpSock(0, "0.0.0.0");//发送UDP随机端口
    
    sockRecv->setOnRead([weakSelf](const Buffer::Ptr &buf, struct sockaddr *addr , int){
            //接收到数据回调
           // DebugL << "recv data form " << getIP(addr) << ":" << buf->data();
        auto strongSelf = weakSelf.lock();
        if(!strongSelf) {
            return ;
        }
        strongSelf->handleOneRtp((uint8_t *) buf->data(), buf->size());
    });
    

    我创建了以后,关闭程序的时候会报如下错误, the inferior stopped because it received a signal from operating system

    Signal name :SIGSEGV

    opened by downloadproject 8
  • 建议套接字创建和连接分两步进行

    建议套接字创建和连接分两步进行

    目前SockUtil::connect函数内完成套接字创建和端口绑定,然后立即连接到远程目的ip和端口,但有时候这两件事需要分开进行,比如在gb28181播放协商的时候,下级在收到上级invite请求,返回200ok的sdp报文中附带发流源ip和端口,有些上级加了安全边界设备,需要核对下级的源ip端口才允许下级连接并发送码流数据,这种情况就需要做三个步骤:1、创建并绑定本地端口;2、向上级发送发流ip和端口并等待上级ack;3、连接远程目的ip和端口。具体实现代码:

    bool Socket::createAndBindSock(bool udpOrTcp, uint16_t port, const string &local_ip) {
        closeSock();
        int fd = SockUtil::createAndBindSock(udpOrTcp, port, local_ip.data());
        if (fd == -1)
            return false;
    
        SockFD::Ptr sock;
        if (udpOrTcp) {
            sock = makeSock(fd, SockNum::Sock_UDP);
            if (!attachEvent(sock, true))
                return false;
        }
        else
        {
            // 注意tcp在连接目标成功后才会监听事件
            sock = makeSock(fd, SockNum::Sock_TCP);
        }
        LOCK_GUARD(_mtx_sock_fd);
        _sock_fd = sock;
        return true;
    }
    void Socket::connect(const string &url, uint16_t port, onErrCB con_cb_in,
        float timeout_sec, const string &local_ip, uint16_t local_port, bool newSock) {
        if (newSock) {
            // 重置当前socket
            closeSock();
        }
        weak_ptr<Socket> weak_self = shared_from_this();
        auto con_cb = [con_cb_in, weak_self](const SockException &err) {
            auto strong_self = weak_self.lock();
            if (!strong_self) {
                return;
            }
            strong_self->_async_con_cb = nullptr;
            strong_self->_con_timer = nullptr;
            if (err) {
                LOCK_GUARD(strong_self->_mtx_sock_fd);
                strong_self->_sock_fd = nullptr;
            }
            con_cb_in(err);
        };
    
        auto async_con_cb = std::make_shared<function<void(int)> >([weak_self, con_cb, newSock](int sock) {
            auto strong_self = weak_self.lock();
            if (sock == -1 || !strong_self) {
                if (!strong_self) {
                    CLOSE_SOCK(sock);
                } else {
                    con_cb(SockException(Err_dns, get_uv_errmsg(true)));
                }
                return;
            }
            bool test = false;
            SockFD::Ptr sock_fd = strong_self->getSock();
            if (newSock || !sock_fd) {
                test = true;
                sock_fd = strong_self->makeSock(sock, SockNum::Sock_TCP);
            }
            weak_ptr<SockFD> weak_sock_fd = sock_fd;
            // 监听该socket是否可写,可写表明已经连接服务器成功
            int result = strong_self->_poller->addEvent(sock, Event_Write, [weak_self, weak_sock_fd, con_cb](int event) {
                auto strong_sock_fd = weak_sock_fd.lock();
                auto strong_self = weak_self.lock();
                if (strong_sock_fd && strong_self) {
                    //socket可写事件,说明已经连接服务器成功
                    strong_self->onConnected(strong_sock_fd, con_cb);
                }
            });
    
            if (result == -1) {
                con_cb(SockException(Err_other, "add event to poller failed when start connect"));
                return;
            }
            if (test) {
                // 保存fd
                strong_self->setSock(sock_fd);
            }
        });
    
        auto poller = _poller;
        weak_ptr<function<void(int)> > weak_task = async_con_cb;
        int sockfd_in = -1;
        if (!newSock) {
            SockFD::Ptr sock_fd = getSock();
            if (sock_fd)
                sockfd_in = sock_fd->rawFd();
        }
        WorkThreadPool::Instance().getExecutor()->async([url, port, local_ip, local_port, sockfd_in, weak_task, poller]() {
            // 阻塞式dns解析放在后台线程执行
            int sock = SockUtil::connect(url.data(), port, true, local_ip.data(), local_port, sockfd_in);
            poller->async([sock, weak_task]() {
                auto strong_task = weak_task.lock();
                if (strong_task) {
                    (*strong_task)(sock);
                } else {
                    CLOSE_SOCK(sock);
                }
            });
        });
    
        //连接超时定时器
        _con_timer = std::make_shared<Timer>(timeout_sec, [weak_self, con_cb]() {
            con_cb(SockException(Err_timeout, uv_strerror(UV_ETIMEDOUT)));
            return false;
        }, _poller);
    
        _async_con_cb = async_con_cb;
    }
    int SockUtil::createAndBindSock(bool udpOrTcp, const uint16_t port, const char* localIp, int af, bool bAsync, bool reusePort) {
        int sockfd = -1;
        if (udpOrTcp)
            sockfd = (int)socket(af, SOCK_DGRAM, IPPROTO_UDP);
        else
            sockfd = (int)socket(af, SOCK_STREAM, IPPROTO_TCP);
        if (sockfd == -1) {
            WarnL << "创建套接字失败:" << get_uv_errmsg(true);
            return -1;
        }
        setReuseable(sockfd, reusePort);
        setNoSigpipe(sockfd);
        setNoBlocked(sockfd, bAsync);
        if (!udpOrTcp)
            setNoDelay(sockfd);
        setSendBuf(sockfd);
        setRecvBuf(sockfd);
        setCloseWait(sockfd);
        setCloExec(sockfd);
        if (bindSock(sockfd, localIp, port) == -1) {
            close(sockfd);
            return -1;
        }
        return sockfd;
    }
    int SockUtil::connect(const char *host, uint16_t port, bool bAsync, const char* localIp, uint16_t localPort, int sockfd_in) {
        sockaddr addr;
        if(!DnsCache::Instance().getDomainIP(host,addr)){
            //dns解析失败
            return -1;
        }
        //设置端口号
        ((sockaddr_in *)&addr)->sin_port = htons(port);
    
        int sockfd = sockfd_in;
        if (sockfd < 0)
            sockfd = createAndBindSock(false, localPort, localIp, addr.sa_family, bAsync);
        if (sockfd < 0) {
            WarnL << "创建套接字失败:" << host;
            return -1;
        }
        if (::connect(sockfd, &addr, sizeof(struct sockaddr)) == 0) {
            //同步连接成功
            return sockfd;
        }
        if (bAsync &&  get_uv_error(true) == UV_EAGAIN) {
            //异步连接成功
            return sockfd;
        }
        WarnL << "连接主机失败:" << host << " " << port << " " << get_uv_errmsg(true);
        close(sockfd);
        return -1;
    }
    
    opened by wujianGit123 1
Owner
zlmediakit开源
zlmediakit开源