C++ Parallel Computing and Asynchronous Networking Engine

Overview

Chinese version entry

Sogou C++ Workflow


As Sogou's C++ server engine, Sogou C++ Workflow supports almost all back-end C++ online services of Sogou, including all search services, cloud input method, online advertisements, etc., handling more than 10 billion requests every day. It is an enterprise-level programming engine with a light and elegant design that can satisfy most C++ back-end development requirements.

You can use it:

  • To quickly build an HTTP server:
#include <stdio.h>
#include "workflow/WFHttpServer.h"

int main()
{
    WFHttpServer server([](WFHttpTask *task) {
        task->get_resp()->append_output_body("Hello World!\n");
    });

    if (server.start(8888) == 0) { // start server on port 8888
        getchar(); // press "Enter" to end.
        server.stop();
    }

    return 0;
}
  • As a multifunctional asynchronous client; it currently supports the HTTP, Redis, MySQL and Kafka protocols.
  • To implement client/server on user-defined protocol and build your own RPC system.
    • srpc, an independent open-source project based on Workflow, supports the srpc, brpc and thrift protocols.
  • To build asynchronous workflows; common series and parallel structures are supported, as well as arbitrary DAG structures (see the sketch after this list).
  • As a parallel computing tool. In addition to networking tasks, Sogou C++ Workflow also includes the scheduling of computing tasks. All types of tasks can be put into the same flow.
  • As an asynchronous file IO tool on Linux, with performance exceeding that of any synchronous system call. Disk file IO is also a task.
  • To realize any high-performance and high-concurrency back-end service with a very complex relationship between computing and networking.
  • To build a micro service system.
    • This project has built-in service governance and load balancing features.
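
For example, tasks can be chained into a series whose callback fires when all tasks are done. A minimal sketch (the URL and the lambda bodies are placeholders, not part of the project's examples):

#include <stdio.h>
#include "workflow/WFTaskFactory.h"
#include "workflow/Workflow.h"

int main()
{
    WFHttpTask *http_task = WFTaskFactory::create_http_task("http://www.sogou.com/", 2, 2,
        [](WFHttpTask *task) {
            /* handle the response; more tasks may be pushed into series_of(task) here */
        });

    SeriesWork *series = Workflow::create_series_work(http_task, [](const SeriesWork *) {
        printf("series done\n"); // runs after every task in the series has finished
    });

    series->start();
    getchar(); // wait; all calls above are non-blocking
    return 0;
}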

Compiling and running environment

  • This project supports Linux, macOS, Windows, Android and other operating systems.
    • The Windows version is currently released as an independent branch, using IOCP to implement asynchronous networking. All user interfaces are consistent with the Linux version.
  • Supports all CPU platforms, including 32-bit and 64-bit x86 processors and big-endian or little-endian ARM processors.
  • Relies on OpenSSL; OpenSSL 1.1 or above is recommended. If you don't need SSL, you may check out the nossl branch, but you still need to link libcrypto for MD5 and SHA1.
  • Uses the C++11 standard and therefore must be compiled with a compiler that supports C++11. It does not rely on boost or asio.
  • No other dependencies. However, if you need the Kafka protocol, the compression libraries lz4, zstd and snappy must be installed.

Try it!

System design features

We believe that a typical back-end program = protocol + algorithm + workflow, and that the three parts should be developed completely independently of each other.

  • Protocol
    • In most cases, users use built-in common network protocols, such as HTTP, Redis or various RPC protocols.
    • Users can also easily customize their own network protocols: they only need to provide serialization and deserialization functions to define their own client/server.
  • Algorithm
    • In our design, the algorithm is a concept symmetrical to the protocol.
      • If a protocol call is an RPC, then an algorithm call is an APC (Async Procedure Call).
    • We have provided some general algorithms, such as sort, merge, psort and reduce, which can be used directly (see the sketch after this list).
    • Compared with a user-defined protocol, a user-defined algorithm is much more common. Any complicated computation with clear boundaries should be packaged into an algorithm.
  • Workflow
    • Workflow is the actual business logic: it puts the protocols and algorithms into a flow graph for use.
    • The typical workflow is a closed series-parallel graph. Complex business logic may be a non-closed DAG.
    • The workflow graph can be constructed directly or dynamically generated based on the results of each step. All tasks are executed asynchronously.
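
For example, the built-in sort can be invoked as an asynchronous algorithm task. A minimal sketch, assuming the interfaces declared in "workflow/WFAlgoTaskFactory.h" (the queue name "sort" is arbitrary):

#include <stdio.h>
#include "workflow/WFAlgoTaskFactory.h"

int main()
{
    static int array[] = {7, 2, 9, 1, 5};

    /* The sorting runs in a computing thread; the callback fires when it finishes. */
    auto *task = WFAlgoTaskFactory::create_sort_task("sort", array, array + 5,
        [](WFSortTask<int> *task) {
            int *first = task->get_input()->first;
            int *last = task->get_input()->last;

            while (first < last)
                printf("%d ", *first++);
            printf("\n");
        });

    task->start();
    getchar(); // the sort is asynchronous
    return 0;
}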

Basic task, task factory and complex task

  • Our system contains six basic tasks: networking, file IO, CPU, GPU, timer, and counter.
  • All tasks are generated by the task factory and automatically recycled after callback.
    • A server task is a special kind of networking task, generated by the framework (which calls the task factory) and handed over to the user through the process function.
  • In most cases, the task generated by the user through the task factory is a complex task, but this is transparent to the user.
    • For example, an HTTP request may include many asynchronous processes (DNS, redirection), but for the user, it is just one networking task.
    • File sorting seems to be an algorithm, but it actually includes many complex interaction processes between file IO and CPU computation.
    • If you think of business logic as building circuits with well-designed electronic components, then each electronic component may be a complex circuit.

Asynchrony and encapsulation based on C++11 std::function

  • Not based on user mode coroutines. Users need to know that they are writing asynchronous programs.
  • All calls are executed asynchronously, and there is almost no operation that occupies a thread.
    • Although we also provide some facilities with semi-synchronous interfaces, they are not core features.
  • We try to avoid requiring users to derive classes, and encapsulate user behavior with std::function instead, including:
    • The callback of any task.
    • Any server's process. This conforms to the FaaS (Function as a Service) idea.
    • The realization of an algorithm is simply a std::function, though an algorithm can also be implemented by derivation (see the sketch after this list).
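
For example, a whole computing task can be just a plain function handed to the factory. A minimal sketch, assuming the go-task interface shown in the tutorials (the queue name "go" is arbitrary):

#include <stdio.h>
#include "workflow/WFTaskFactory.h"

void add(int a, int b, int *res)
{
    *res = a + b; // runs in a computing thread
}

int main()
{
    int result;

    WFGoTask *task = WFTaskFactory::create_go_task("go", add, 1, 2, &result);
    task->set_callback([&result](WFGoTask *) {
        printf("1 + 2 = %d\n", result);
    });
    task->start();
    getchar(); // the go task is asynchronous
    return 0;
}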

Memory reclamation mechanism

  • Every task is automatically reclaimed after its callback. If a task is created but the user does not want to run it, the user must release it through the dismiss method (see the sketch after this list).
  • Any data in the task, such as the response of the network request, will also be recycled with the task. At this time, the user can use std::move() to move the required data.
  • SeriesWork and ParallelWork are two kinds of framework objects, which are also recycled after their callback.
    • When a series is a branch of a parallel, it will be recycled after the callback of the parallel that it belongs to.
  • This project doesn’t use std::shared_ptr to manage memory.
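
A minimal sketch of the dismiss case (the URL is a placeholder):

#include "workflow/WFTaskFactory.h"

void create_and_drop()
{
    WFHttpTask *task = WFTaskFactory::create_http_task("http://www.sogou.com/", 2, 2, nullptr);

    /* The task is never started, so no callback will ever reclaim it;
       dismiss() is the only way to release it. */
    task->dismiss();
}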

More design documents

To be continued...

Comments
  • FAQ (continuously updated)

    Project background and the problems it solves

    The C++ Workflow project originated as the communication engine of Sogou's distributed storage project, and has grown into Sogou's company-wide C++ standard, applied in most of Sogou's C++ back-end services. The project unifies communication and computation, helping users build high-performance services in which the relationship between communication and computation is very complex. At the same time, users may also treat it simply as an easy asynchronous networking engine or a parallel computing framework.

    How to get started

    Taking Linux as an example:

    $ git clone https://github.com/sogou/workflow
    $ cd workflow
    $ make
    $ cd tutorial
    $ make
    

    Then you can happily run the examples. Each example has a corresponding document explaining it. If you need the kafka protocol, please install snappy and lz4 first, and:

    $ make KAFKA=y
    $ cd tutorial
    $ make KAFKA=y
    

    In addition, make DEBUG=y builds a debug version. With make REDIS=n MYSQL=n UPSTREAM=n CONSUL=n you can strip out one or more features and shrink the library to a minimum of 400KB, which is better suited for embedded development.

    What are the advantages over other networking engines and RPC projects?

    • Easy to learn, no dependencies
    • Excellent performance and stability (benchmark)
    • Rich implementations of common protocols
    • Unified communication and computation
    • Task flow management

    What are the advantages over other parallel computing frameworks?

    • Simple to use
    • Networking included

    Features the project does not support yet

    • pipeline server
    • UDP server (UDP client is supported)
    • HTTP/2
    • WebSocket (a WebSocket client has been implemented; see the websocket branch)

    Which network protocols does the project natively include?

    Currently we have implemented the HTTP, Redis, MySQL and kafka protocols. Except for kafka, which currently supports the client side only, every protocol is client+server; that is, users can build proxy servers speaking the Redis or MySQL protocol. The kafka module is a plugin and is not compiled by default.

    Why callbacks?

    We wrap user behavior with callbacks and process functions of the C++11 std::function type, so users need to know that they are writing asynchronous programs. We believe that callbacks bring higher efficiency than futures or user-mode coroutines, and that they achieve the unification of communication and computation very well. Thanks to our task encapsulation and the convenience of std::function, using callbacks in our framework carries little mental burden and is in fact simple and clear.

    In which thread is the callback invoked?

    One characteristic of the project is that threads are managed by the framework. Apart from a few very special cases, a callback always runs either in a handler thread that processes network IO and file IO results (20 by default) or in a computing thread (defaulting to the total number of CPU cores). Whichever thread it runs in, it is not recommended to wait or to perform particularly heavy computation inside a callback: if you need to wait, use a counter task to wait without occupying a thread; heavy computation should be packaged as a computing task. Note that all resources in the framework are allocated on first use; if you never use network communication, no communication-related thread is ever created.

    Why does nothing happen after my task starts?

    int main(void)
    {
        ...
        task->start();
        return 0;
    }
    

    This is a problem many new users run into. Almost all calls in the framework are non-blocking; in the code above, the main function returns immediately after the task starts and does not wait for the task to finish. The correct approach is to wake up the main process in some way, for example:

    WFFacilities::WaitGroup wait_group(1);
    
    void callback(WFHttpTask *task)
    {
        ....
        wait_group.done();
    }
    
    int main(void)
    {
        WFHttpTask *task = WFTaskFactory::create_http_task(url, 0, 0, callback);
        task->start();
        wait_group.wait();
        return 0;
    }
    

    What is the life cycle of a task object?

    Every task in the framework (as well as every SeriesWork) is handed to the user as a raw pointer. The life cycle of a task object runs from its creation to the end of its callback. In other words, after the callback, the task pointer is invalid, and the data inside the task is destroyed along with it. If you need to keep the data, move it away with std::move(); for example, to keep the resp of an http task:

    void http_callback(WFHttpTask *task)
    {
        protocol::HttpResponse *resp = task->get_resp();
        protocol::HttpResponse *my_resp = new protocol::HttpResponse(std::move(*resp));
        /* or
        protocol::HttpResponse *my_resp = new protocol::HttpResponse;
        *my_resp = std::move(*resp);
        */
    }
    

    In some cases, if the user has created a task but no longer wants to start it, task->dismiss() must be called to destroy the task directly. It must be stressed that a server's process function is not a callback: a server task's callback happens after the reply finishes, and it is nullptr by default.

    Why is SeriesWork (a series) not a kind of task?

    Our definitions of series and parallels are:

    • A series is composed of tasks
    • A parallel is composed of series
    • A parallel is a kind of task

    Clearly, from these three definitions we can recursively build series-parallel structures of arbitrary complexity. If a series were also defined as a task, a series could be composed of sub-series, and usage would easily descend into chaos. Likewise, a parallel can only be the union of several series, also to avoid confusion. In practice you will find that a series is essentially our coroutine.

    What if I need a more general directed acyclic graph?

    You can use WFGraphTask, or build one yourself with WFCounterTask. Example: https://github.com/sogou/workflow/blob/master/tutorial/tutorial-11-graph_task.cc
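
    A minimal graph sketch following the tutorial's interface (task_a, task_b and task_c stand for any tasks created beforehand; b and c run in parallel after a):

    WFGraphTask *graph = WFTaskFactory::create_graph_task([](WFGraphTask *) {
        printf("graph finished\n");
    });

    /* wrap ordinary tasks into graph nodes */
    WFGraphNode& a = graph->create_graph_node(task_a);
    WFGraphNode& b = graph->create_graph_node(task_b);
    WFGraphNode& c = graph->create_graph_node(task_c);

    /* b and c both depend on a */
    a-->b;
    a-->c;

    graph->start();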

    Does the server reply to the request when the process function ends?

    No. The server replies once there are no more tasks in the series that the server task belongs to. If you add no task to this series, the reply effectively happens when process ends. Note: do not wait for a task's completion inside process; add the task to the series instead.

    How can the server wait a short while after receiving a request before replying?

    The wrong way is to sleep directly inside process. The right way is to add a timer task to the series the server task belongs to. Taking an http server as an example:

    void process(WFHttpTask *server_task)
    {
        WFTimerTask *timer = WFTaskFactory::create_timer_task(100000, nullptr);
        server_task->get_resp()->append_output_body("hello");
        series_of(server_task)->push_back(timer);
    }
    

    The above code implements an http server that replies with a 100-millisecond delay. Everything runs asynchronously, and no thread is occupied during the wait.

    How do I know whether the reply succeeded?

    First of all, a reply counts as successful once the data has been written into the TCP buffer; so if the reply packet is small and the client has not closed the connection (because of a timeout, say), the reply can almost certainly be considered successful. To inspect the reply result, just set a callback on the server task. The state and error codes in the callback are defined the same way as for client tasks, except that a server task never gets a DNS error.
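
    A minimal sketch of checking the reply result in an http server (set_callback is the same interface as on client tasks):

    void process(WFHttpTask *server_task)
    {
        server_task->get_resp()->append_output_body("hello");

        /* the callback runs after the reply has been sent (or has failed) */
        server_task->set_callback([](WFHttpTask *task) {
            if (task->get_state() != WFT_STATE_SUCCESS)
                fprintf(stderr, "reply failed, error: %d\n", task->get_error());
        });
    }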

    Can the server choose not to reply?

    Yes. Call the server task's noreply() method at any time, and at what would have been the reply moment, the connection is closed directly instead.

    What are the scheduling rules for computing tasks?

    All computing tasks, including WFGoTask, require a compute queue name at creation; the queue name guides our internal scheduling strategy (see the sketch below). First, as long as an idle computing thread is available, the task is started immediately and the queue name has no effect. When the computing threads can no longer start every task in real time, tasks under the same queue name are started in FIFO order, and queues are treated equally with respect to one another. For example, start n tasks with queue name A in a row, then n tasks with queue name B. No matter how much CPU time each task takes, and no matter how many computing threads there are, the two queues will tend to finish at about the same time. The rule extends to any number of queues and any starting order.
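
    A minimal sketch of the queue-name effect, using go tasks (heavy_work and the names "A" and "B" are placeholders):

    void heavy_work(int i); /* some CPU-heavy function */

    /* When all computing threads are busy, tasks inside "A" keep FIFO order,
       tasks inside "B" keep FIFO order, and the two queues are treated equally. */
    for (int i = 0; i < n; i++)
        WFTaskFactory::create_go_task("A", heavy_work, i)->start();

    for (int i = 0; i < n; i++)
        WFTaskFactory::create_go_task("B", heavy_work, i)->start();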

    Why is there no need to establish a connection first when using the redis client?

    First, look at the creation interface of a redis client task:

    class WFTaskFactory
    {
    public:
        static WFRedisTask *create_redis_task(const std::string& url, int retry_max, redis_callback_t callback);
    };
    

    The url format is redis://:password@host:port/dbnum; the default port is 6379 and the default dbnum is 0. An important feature of workflow is that the framework manages connections, which keeps the user interface extremely simple and achieves the most effective connection reuse. The framework looks for a reusable connection according to the task's user name, password and dbnum; if none is found, it opens a new connection and performs the user login, database selection and so on. For a new host, DNS resolution is done as well, and failed requests may be retried. Every step is asynchronous and transparent: the user just fills in the request and starts the task, and the result arrives in the callback. The only thing to note is that every task must be created with the password, because a login may be needed at any time. mysql tasks can be created the same way, but for mysql with transaction requirements, tasks must be created through our WFMySQLConnection; otherwise there is no guarantee that the whole transaction runs on one connection. WFMySQLConnection still keeps the connecting and authentication steps asynchronous.
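
    A minimal usage sketch following the tutorial's redis interfaces (the url and the key/value are placeholders):

    WFRedisTask *task = WFTaskFactory::create_redis_task("redis://:mypass@127.0.0.1/0", 2,
        [](WFRedisTask *task) {
            protocol::RedisValue val;

            if (task->get_state() == WFT_STATE_SUCCESS) {
                task->get_resp()->get_result(val);
                /* use val */
            }
        });

    task->get_req()->set_request("SET", {"key", "value"});
    task->start(); /* connecting, login and database selection all happen transparently */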

    What are the connection reuse rules?

    In most cases, client tasks produced by the framework cannot specify a concrete connection. The framework's connection reuse strategy is:

    • If the same address and port has qualifying idle connections, pick the most recently released one; that is, idle connections are reused last-in-first-out.
    • When the current address and port has no qualifying idle connection:
      • If the current number of concurrent connections is below the maximum (200 by default), initiate a new connection immediately.
      • If the number of concurrent connections has reached the maximum, the task fails with the system error EAGAIN.
    • Not every connection to the same target address and port qualifies for reuse; for example, database connections under different user names or passwords cannot be reused.

    Although the framework does not let you choose which connection a task uses, it does support connection contexts, a feature that is very important for implementing stateful servers. See the documentation on connection contexts.

    If a domain name resolves to multiple IP addresses, is there load balancing?

    Yes. We treat all target IPs under one domain name as equivalent, with equal service capability. Every request therefore picks the target that looks least loaded from the local point of view; circuit breaking and recovery strategies are also built in. For load balancing under one domain name, all targets must serve on the same port, and different weights cannot be configured. Load balancing takes priority over connection reuse: the communication target is chosen first, and connection reuse is considered afterwards.

    How do I implement load balancing with weights, or across different ports?

    See the upstream documentation. Upstream can also fulfil many more complex service management needs.

    What is the most efficient way to access a chunked-encoded http body?

    In many cases we use HttpMessage::get_parsed_body() to obtain the http message body. But for efficiency, we do not automatically decode chunked encoding; the raw body is returned instead. Chunked encoding can be decoded with HttpChunkCursor, for example:

    #include "workflow/HttpUtil.h"
    
    void http_callback(WFHttpTask *task)
    {
        protocol::HttpResponse *resp = task->get_resp();
        protocol::HttpChunkCursor cursor(resp);
        const void *chunk;
        size_t size;
    
        while (cursor.next(&chunk, &size))
        {
            ...
        }
    }
    

    Each cursor.next call returns the start pointer and size of one chunk, with no memory copy. There is no need to check whether the message is chunked before using HttpChunkCursor, because a non-chunked body can be treated as a single chunk.

    Can I synchronously wait for a task to finish inside a callback or process?

    We do not recommend it, because any task can be chained into the task flow without occupying a thread to wait. If you really must, use the WFFuture we provide. Do not use std::future directly: all of our communication callbacks and process functions run on one group of threads, and std::future may put all of those threads into waiting and deadlock the whole program. WFFuture solves this by dynamically adding threads. When using WFFuture, also remember to move the data you want to keep (usually the resp) into the result with std::move inside the task's callback; otherwise the data is destroyed together with the task after the callback.

    How is data passed between tasks?

    Most commonly, tasks in the same series share the series context, which is read and modified through the series' get_context() and set_context() methods. In its callback, a parallel can obtain each series it contains via series_at() (those series' callbacks have already run, but they are destroyed only after the parallel's callback) and thus access their contexts. Since a parallel is itself a task, it can pass its aggregated results onward through the context of the series it resides in. In short, a series is a coroutine, the series context is the coroutine's local variables, and a parallel is a parallel composition of coroutines that can aggregate all their results.
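
    A minimal sketch of passing data through the series context (MyContext is a placeholder type; error handling omitted):

    struct MyContext { std::string body; };

    void http_callback(WFHttpTask *task)
    {
        MyContext *ctx = (MyContext *)series_of(task)->get_context();
        const void *body;
        size_t len;

        if (task->get_resp()->get_parsed_body(&body, &len))
            ctx->body.assign((const char *)body, len); /* visible to later tasks in the series */
    }

    int main()
    {
        WFHttpTask *task = WFTaskFactory::create_http_task("http://www.sogou.com/", 2, 2, http_callback);
        SeriesWork *series = Workflow::create_series_work(task, [](const SeriesWork *series) {
            delete (MyContext *)series->get_context();
        });

        series->set_context(new MyContext);
        series->start();
        getchar();
        return 0;
    }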

    The relationship between Workflow and RPC

    In our architecture, RPC is an application on top of workflow; in other words, RPC is a set of protocol implementations on workflow. If you need interface descriptions and remote interface calls, do try srpc: an RPC system that pushes workflow's capabilities to the limit, blends with workflow perfectly, is compatible with the brpc and thrift protocols, and is faster and easier to use; it will satisfy any RPC need. Address: https://github.com/sogou/srpc

    When does the server's stop() operation complete?

    The server's stop() operation is a graceful shutdown, and all servers must be closed before the program ends. stop() consists of shutdown() plus wait_finish(); wait_finish waits for the series of every running server task to end. That is, in the callback after a server task's reply completes, you may keep appending tasks to the series, and stop() will wait for those tasks to finish. Also, if you run several servers at once, the best way to close them is:

    int main()
    {
        // a server object cannot be started more than once, so a multi-port service needs multiple server objects
        WFRedisServer server1(process);
        WFRedisServer server2(process);
        server1.start(8080);
        server2.start(8888);
        getchar(); // press "Enter" to end
        // shut down all servers first, then wait
        server1.shutdown();
        server2.shutdown();
        server1.wait_finish();
        server2.wait_finish();
        return 0;
    }
    

    How do I end the server when a specific request is received?

    Since ending the server consists of shutdown() and wait_finish(), you can simply call shutdown in process and wait_finish in main(), for example:

    #include <string.h>
    #include <atomic>
    #include "workflow/WFHttpServer.h"
    
    extern void process(WFHttpTask *task);
    WFHttpServer server(process);
    
    void process(WFHttpTask *task) {
        if (strcmp(task->get_req()->get_request_uri(), "/stop") == 0) {
            static std::atomic<int> flag;
            if (flag++ == 0)
                server.shutdown();
            task->get_resp()->append_output_body("<html>server stop</html>");
            return;
        }
    
        /* Server's logic */
        //  ....
    }
    
    int main() {
        if (server.start(8888) == 0)
            server.wait_finish();
    
        return 0;
    }
    

    The above code implements an http server that ends the program upon receiving a /stop request. The flag in process is necessary because process runs concurrently, and only one thread may call the shutdown operation.

    What if the server needs to call asynchronous operations outside the Workflow framework?

    Still use a counter. In the other asynchronous framework's callback, perform the count operation on the counter.

    void other_callback(server_task, counter, ...)
    {
        server_task->get_resp()->append_output_body(result);
        counter->count();
    }
    
    void process(WFHttpTask *server_task)
    {
        WFCounterTask *counter = WFTaskFactory::create_counter_task(1, nullptr);
        OtherAsyncTask *other_task = create_other_task(other_callback, server_task, counter); // a task from outside the workflow framework
        other_task->run();
        series_of(server_task)->push_back(counter);
    }
    

    Note that in the code above, counter->count() may be called before the counter starts. Whatever the timing, the program is completely correct.

    Why do a few https sites fail to be fetched?

    If a browser can access the site but workflow fails to fetch it, the site most likely relies on the SNI extension of TLS. You can enable workflow's client-side SNI through the global configuration:

        struct WFGlobalSettings settings = GLOBAL_SETTINGS_DEFAULT;
        settings.endpoint_params.use_tls_sni = true;
        WORKFLOW_library_init(&settings);
    

    Enabling this feature has a cost: SNI is turned on for all https sites, and accesses to the same IP address under different domain names can no longer reuse SSL connections. Therefore you can also use the upstream feature to enable SNI only for one specific domain name:

    #include "workflow/UpstreamManager.h"
    
    int main()
    {
        UpstreamManager::upstream_create_weighted_random("www.sogou.com", false);
        struct AddressParams params = ADDRESS_PARAMS_DEFAULT;
        params.endpoint_params.use_tls_sni = true;
        UpstreamManager::upstream_add_server("www.sogou.com", "www.sogou.com", &params);
        ...
    }
    

    The code above sets www.sogou.com as an upstream name, adds a server with the same name, and enables SNI for it.

    How do I access http resources through a proxy server?

    Method 1 (only applicable to http tasks, and cannot redirect): create the http task with the proxy server's address, then reset the request_uri and the Host header. Suppose we want to access http://www.sogou.com/ through the proxy server www.proxy.com:8080; the method is as follows:

    task = WFTaskFactory::create_http_task("http://www.proxy.com:8080", 0, 0, callback);
    task->set_request_uri("http://www.sogou.com/");
    task->set_header_pair("Host", "www.sogou.com");
    

    Method 2 (general, but some proxy servers support HTTPS only; for HTTP, method 1 is still recommended): create the http task through the interface that takes a proxy_url:

    class WFTaskFactory
    {
    public:
        static WFHttpTask *create_http_task(const std::string& url,
                                            const std::string& proxy_url,
                                            int redirect_max, int retry_max,
                                            http_callback_t callback);
    };
    

    The format of proxy_url is http://user:passwd@host:port/. The proxy must start with "http://", not "https://"; the default port is 80. This method works for both http and https URLs, supports redirects, and keeps using the proxy server when redirecting.

    Does the http server support RESTful interfaces?

    We recommend the wfrest project, a RESTful API development framework built on workflow. Project address: https://github.com/wfrest/wfrest

    documentation 
    opened by Barenboim 42
  • Abnormal behavior of the Kafka client

    Recently, while using the Kafka client, I found the following anomaly: 1. The client creates a Kafka client task (topic=xxx&api=fetch) every 200 milliseconds. 2. After the program has run for a while, messages stop arriving — the abnormal case. 3. When debugging with GDB, after setting a breakpoint in the thread that creates the tasks, messages arrive again — the normal case.

    Comparing packet captures, I found that in the abnormal case the offset in every outgoing request is -1, while in the normal case the offset has a normal value.


    opened by song-git 33
  • Can a callback recurse?

    I want to implement the scenario where a failed task is retried after a 60-second interval:

    void mysql_callback(WFMySQLTask *task)
    {
        ::Json json = mysql_concat_json_res(mysql_task);
        if (json.find("errcode") != json.end()) {
            WFTimerTask *timer = WFTaskFactory::create_timer_task(60, 0, [](WFTimerTask *) {
                WFMySQLTask *mysql_task = WFTaskFactory::create_mysql_task(url, 0, mysql_callback);
            });
        }
    }

    Is it OK to write it like this? Or is there another best practice?

    opened by vincentqin-sys 32
  • benchmark: please add comparison data for larger connection counts

    I ran http_server locally on a 4c8t machine, giving the server 4 threads

    env

    ubuntu@ubuntu:~/workflow/benchmark$ cat /etc/issue
    Ubuntu 20.04 LTS \n \l
    
    ubuntu@ubuntu:~/workflow/benchmark$ cat /proc/cpuinfo | grep processor
    processor	: 0
    processor	: 1
    processor	: 2
    processor	: 3
    processor	: 4
    processor	: 5
    processor	: 6
    processor	: 7
    ubuntu@ubuntu:~/workflow/benchmark$ ./http_server 4 9000 32
    
    

    Performance at 200 connections is great

    ubuntu@ubuntu:~/wrk$ ./wrk --latency -d10 -c200 --timeout 8 -t4 http://127.0.0.1:9000
    Running 10s test @ http://127.0.0.1:9000
      4 threads and 200 connections
      Thread Stats   Avg      Stdev     Max   +/- Stdev
        Latency   574.13us    1.03ms  22.70ms   97.01%
        Req/Sec    98.28k     8.84k  131.17k    73.07%
      Latency Distribution
         50%  384.00us
         75%  579.00us
         90%    0.85ms
         99%    5.90ms
      3921936 requests in 10.10s, 647.06MB read
    Requests/sec: 388303.50
    Transfer/sec:     64.06MB
    
    ubuntu@ubuntu:~/workflow/benchmark$ top -d 1 | grep _server
      63161 ubuntu    20   0 1781400   5612   4548 S 276.0   0.1   0:02.76 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400   5612   4548 S 465.3   0.1   0:07.46 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400   5872   4548 S 467.0   0.1   0:12.13 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400   6132   4548 S 458.4   0.1   0:16.76 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400   6132   4548 S 469.0   0.1   0:21.45 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400   6132   4548 S 457.4   0.1   0:26.07 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400   6132   4548 S 468.0   0.1   0:30.75 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400   6132   4548 S 461.4   0.1   0:35.41 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400   6132   4548 S 463.0   0.1   0:40.04 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400   6132   4548 S 465.3   0.1   0:44.74 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400   6132   4548 S 194.0   0.1   0:46.68 http_server 
    

    Performance at 20k connections falls short of expectations

    ubuntu@ubuntu:~/wrk$ ./wrk --latency -d10 -c20000 --timeout 8 -t4 http://127.0.0.1:9000
    Running 10s test @ http://127.0.0.1:9000
      4 threads and 20000 connections
      Thread Stats   Avg      Stdev     Max   +/- Stdev
        Latency    32.63ms   18.67ms 299.99ms   71.61%
        Req/Sec     6.63k     6.71k   37.35k    89.16%
      Latency Distribution
         50%   30.52ms
         75%   42.46ms
         90%   55.49ms
         99%   86.61ms
      244113 requests in 10.13s, 40.28MB read
      Socket errors: connect 0, read 1300855, write 0, timeout 0
    Requests/sec:  24091.07
    Transfer/sec:      3.97MB
    
      63161 ubuntu    20   0 1781400  14740   4548 S 167.0   0.2   0:48.35 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  16456   4612 S 261.4   0.2   0:50.99 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  16468   4612 S 193.3   0.2   0:51.28 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  16468   4612 S 200.0   0.2   0:51.62 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  15200   4612 S 208.0   0.2   0:53.70 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  15200   4612 S 195.0   0.2   0:55.67 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  15308   4612 S 187.0   0.2   0:57.54 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  15696   4612 S 184.2   0.2   0:59.40 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  15960   4612 S 185.0   0.2   1:01.25 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  16024   4612 S 186.1   0.2   1:03.13 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  15984   4612 S 184.2   0.2   1:04.99 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  15984   4612 S 185.0   0.2   1:06.84 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  15984   4612 S   4.0   0.2   1:06.88 http_server                                                                                                                               
    

    How to optimize?

    At larger connection counts, CPU utilization drops and performance degrades noticeably. I used the default code without modification; can this be improved through configuration or code changes?

    opened by lesismal 31
  • Client hangs, server stops responding

    A simple http API service: fewer than 10 clients, each making several requests per second. The clients are Java; the server is implemented with workflow. After roughly 7 days of continuous running, client requests start hanging and error out at the maximum timeout, while the server's business logic receives nothing. Checking the port with lsof -i, the port is still listening, but the last entry shows fd 1023 and all connections are in CLOSE_WAIT state; there are no further logs. It never reproduces over short runs but always appears over long ones, with no useful logs. Once this happens, the server never responds again, no matter what is requested.

    opened by zj376879088 30
  • Can WF's DAG implement video-structuring applications?

    I see that WF supports DAGs and series-parallel flows. How can WF be used to implement video-structuring or other multimedia applications? For example: video capture ---> crop-resize-convert ---> inference ---> osd ---> video output; video streaming -> video decode -> video output. Batch inference can be ignored for now.

    Every stage of the pipeline can execute concurrently. Video capture continuously obtains input at a fixed rate such as 30fps.

    I am new to Workflow and have skimmed the tutorial, but have not yet figured out how to handle continuous input.

    opened by chris78l 26
  • Irregular core dumps when replying — has anyone seen this (MySQL client)?

    We occasionally core-dump when replying. Below is the core stack obtained from Sanitizer analysis; has anyone run into this? What could the cause be?

    AddressSanitizer:DEADLYSIGNAL

    ==37681==ERROR: AddressSanitizer: BUS on unknown address 0x000000000000 (pc 0x7ff8ae4d90de bp 0xbebebebebebebeae sp 0x7ff89b0feef0 T26)
    #0 0x7ff8ae4d90dd (/opt/compiler/gcc-8.2/lib/libasan.so.5+0x2f0dd)
    #1 0x7ff8ae595b8a in __interceptor_free (/opt/compiler/gcc-8.2/lib/libasan.so.5+0xebb8a)
    #2 0x9be470 in mysql_parser_deinit /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/protocol/mysql_parser.c:62
    #3 0x9bea6b in protocol::MySQLMessage::~MySQLMessage() /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/protocol/MySQLMessage.cc:43
    #4 0x9c7d70 in protocol::MySQLResponse::~MySQLResponse() /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/_include/workflow/MySQLMessage.h:88
    #5 0x9c7d70 in WFComplexClientTask<protocol::MySQLRequest, protocol::MySQLResponse, bool>::clear_resp() /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/factory/WFTaskFactory.inl:186
    #6 0x9c7d70 in WFComplexClientTask<protocol::MySQLRequest, protocol::MySQLResponse, bool>::switch_callback(WFTimerTask*) /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/factory/WFTaskFactory.inl:421
    #7 0x9c7f69 in WFComplexClientTask<protocol::MySQLRequest, protocol::MySQLResponse, bool>::done() /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/factory/WFTaskFactory.inl:486
    #8 0x9d1696 in SubTask::subtask_done() /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/kernel/SubTask.cc:31
    #9 0x9cf5b9 in Communicator::handle_incoming_reply(poller_result*) /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/kernel/Communicator.cc:691
    #10 0x9d116a in Communicator::handler_thread_routine(void*) /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/kernel/Communicator.cc:1093
    #11 0x9e148c in __thrdpool_routine /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/kernel/thrdpool.c:72
    #12 0x7ff8ae492da3 in start_thread /home/liruihao/mygcc82/glibc-2.21/nptl/pthread_create.c:333
    #13 0x7ff8ae0f932c in clone (/opt/compiler/gcc-8.2/lib/libc.so.6+0xeb32c)

    AddressSanitizer can not provide additional info.
    SUMMARY: AddressSanitizer: BUS (/opt/compiler/gcc-8.2/lib/libasan.so.5+0x2f0dd)
    Thread T26 created by T0 here:
    #0 0x7ff8ae4f6f50 in pthread_create (/opt/compiler/gcc-8.2/lib/libasan.so.5+0x4cf50)
    #1 0x9e1683 in __thrdpool_create_threads /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/kernel/thrdpool.c:156
    #2 0x9e1683 in thrdpool_create /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/kernel/thrdpool.c:192

    ==37681==ABORTING

    bug 
    opened by chenlinzhong 26
  • Error code 74 returned

    The following code is just my own wrapper, but the callback of create_client_task returns error code 74 with state 1:

    #include "../include/wcurl_http.h"
    #include "../include/wcurl.h"
    
    #include <workflow/WFTaskFactory.h>
    #include <workflow/WFGraphTask.h>
    #include <workflow/HttpMessage.h>
    #include <workflow/WFFacilities.h>
    #include <string>
    #include <stdlib.h>
    
    int send_request_with_result(
            const char *url, const char *version,
            const void *buf, size_t size,
            HttpHeader *headers, size_t headers_count,
            const char *action,
            int retry_max,
            http_callback_handler_t callback )
    {
        if (url == NULL || version == NULL || headers == NULL || action == NULL || callback == NULL) {
            return WCURL_ERROR;
        }
    
        //WFHttpTask *task = WFTaskFactory::create_http_task (url, 2, retry_max,
        WFHttpTask *task = WFNetworkTaskFactory<protocol::HttpRequest, protocol::HttpResponse>::create_client_task(TT_TCP_SSL, url, retry_max,
            [](WFHttpTask * task) {
                int state = task->get_state();
    
                if (state == WFT_STATE_SUCCESS) {
                    http_callback_handler_t *response_callback = (http_callback_handler_t *) task->user_data;
                    if (response_callback != NULL) {
                        protocol::HttpResponse *resp = task->get_resp();
                        const char *status_code = resp->get_status_code();
                        int code = 0;
                        if (status_code) {
                            code = atoi(status_code);
                        }
                        const char *reason = resp->get_reason_phrase();
                        const void *body = NULL;
                        size_t body_len = 0;
                        resp->get_parsed_body(&body, &body_len);
                        response_callback(code, body, body_len, reason);
                    }
                } else {
                    int error = task->get_error();
                    http_callback_handler_t *response_callback = (http_callback_handler_t *) task->user_data;
                    if (response_callback != NULL) {
                        int code = 0;
                        const void *body = NULL;
                        size_t body_len = 0;
                        char reason[128] = {0};
                        sprintf(reason, "failed with state:%d error:%d", state, error);
                        response_callback(code, body, body_len, reason);
                    }
                }
        });
    
        task->set_keep_alive(60*1000);
        task->user_data = (void *)callback;
    
        protocol::HttpRequest *req = task->get_req();
        for (size_t i =0; i < headers_count; i ++) {
            protocol::HttpMessageHeader header;
            header.name      = headers[i].name;
            header.name_len  = headers[i].name_len;
            header.value     = headers[i].value;
            header.value_len = headers[i].value_len;
            req->add_header(&header);
        }
        req->set_method(action);
        req->set_http_version(version);
        if (buf != NULL) {
            req->append_output_body(buf, size);
        }
        task->start();
    
        return WCURL_OK;
    }
    
    opened by zhiyong0804 22
  • pipeline support

    I tried http client pipelining: sending the data of two http requests in a single write. The code is as follows:

    package main
    
    import (
    	"fmt"
    	"net"
    )
    
    func main() {
    	addr := "localhost:9000"
    	conn, err := net.Dial("tcp", addr)
    	if err != nil {
    		panic(err)
    	}
    
    	// data for one request
    	reqData := []byte(fmt.Sprintf("POST / HTTP/1.1\r\nHost: %v\r\nContent-Length: 5\r\n\r\n\r\nhello", addr))
    	// send two requests together
    	reqData = append(reqData, reqData...)
    
    	fmt.Println("---------------------")
    	n, err := conn.Write(reqData)
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println("write:\n", string(reqData))
    	fmt.Println("---------------------")
    	defer fmt.Println("---------------------")
    	resData := make([]byte, 1024*8)
    	fmt.Println("read:", n)
    	for {
    		n, err = conn.Read(resData)
    		if err != nil {
    			fmt.Println("read failed:", n, err)
    			return
    		}
    		fmt.Println(string(resData[:n]))
    	}
    }
    

    output:

    root@k8s:~/workflow/benchmark# go run ./client.go 
    ---------------------
    write:
     GET / HTTP/1.1
    Host: localhost:9000
    
    GET / HTTP/1.1
    Host: localhost:9000
    
    
    ---------------------
    read:
    HTTP/1.1 200 OK
    Date: Sun, 15 May 2022 06:35:29 UTC
    Content-Type: text/plain; charset=UTF-8
    Content-Length: 32
    Connection: Keep-Alive
    
    g\g,u(VC:E!Qa\nygJ$&g.H)-(RJi+f>
    read failed: 0 EOF
    

    After one response came back, the connection was closed.

    opened by lesismal 21
  • How can workflow guarantee execution order while avoiding starting too many threads?

    For example, a serial task list composed of three subtasks A->B->C. All three subtasks inherit from WFThreadTask and override the execute method. Currently I use a WaitGroup inside execute, and the thread exits when WaitGroup::done is called from outside; but WaitGroup adds a new thread by default. Is there another approach that does not add threads yet directly reuses the existing semantics of SeriesWork and ParallelWork?

    opened by willkk 20
  • Problem after the leader in a kafka cluster restarts

    While using workflow as a kafka client to consume messages, when one machine in the cluster was restarted, the kafka client callback reported state 67, error 5006. Restarting the client process, the callback kept returning the same state code, and things only recovered once the restarted server in the cluster finished starting up. Is this a client-side problem? How should the client handle this situation correctly?

    opened by song-git 18
  • windows branch, CMakeLists.txt: suggest defaulting WORKFLOW_BUILD_STATIC_RUNTIME to OFF

    In the windows branch, I suggest changing CMakeLists.txt to:

    if (WIN32)
        option(WORKFLOW_BUILD_STATIC_RUNTIME "Use static runtime" OFF)
    endif ()

    After installing with .\vcpkg.exe install workflow:x64-windows --head, applications hit the following build error: [build] workflow.lib(HttpMessage.cc.obj) : error LNK2038: mismatch detected for 'RuntimeLibrary': value 'MTd_StaticDebug' doesn't match value 'MDd_DynamicDebug' in test.obj

    The problem can be worked around by manually editing vcpkg\ports\workflow\portfile.cmake:

    vcpkg_cmake_configure(
        SOURCE_PATH "${SOURCE_PATH}"
        DISABLE_PARALLEL_CONFIGURE
        OPTIONS "-DWORKFLOW_BUILD_STATIC_RUNTIME=OFF"
    )

    opened by zhsnew 2
  • Hit a few problems deploying a gateway with WF

    My job asked me to try building a gateway with wf. Along the way I ran into some phenomena; since some of the conversions cannot be made public, I reproduced the issue directly with the official tutorial code.

    Symptom: frequent errors: /: Fetch failed. state = 1, error = 11: Resource temporarily unavailable. In addition, at the company it also reports error 2 No such file or directory.

    Environment: open files (-n) 65535; wf config: maxconnections=10000, keepalive=-1, rest default; server spec (definitely sufficient): 64 cores

    Repro environment: open files (-n) 1048576; wf config: maxconnections=2000, keepalive=-1, rest default; spec: 20-core VM, 6GB RAM

    Load-testing tool: jmeter, 1000 threads, loop count 10000, no special messages, default "helloworld" reply

    Scenario: the proxy connects to one fixed server and forwards for users. The main difference from my repro setup is that the server the proxy connects to speaks a custom protocol, with protocol conversion on send and receive.

    Request: helloworld <-- proxy <-- user

    Reply: helloworld --> proxy --> user

    The repro simply deploys the tutorial's helloworld and proxy; in the proxy, process was slightly modified to connect to a fixed server:

    void process(WFHttpTask *proxy_task)
    {
    	auto *req = proxy_task->get_req();
    	SeriesWork *series = series_of(proxy_task);
    	WFHttpTask *http_task; /* for requesting remote webserver. */
    
    	tutorial_series_context *context = new tutorial_series_context;
    	context->url = req->get_request_uri();
    	context->proxy_task = proxy_task;
    
    	series->set_context(context);
    	series->set_callback([](const SeriesWork *series) {
    		delete (tutorial_series_context *)series->get_context();
    	});
    
    	context->is_keep_alive = req->is_keep_alive();
    	http_task = WFTaskFactory::create_http_task("http://192.168.1.7:8888", 0, 0, http_callback);									
    	const void *body;
    	size_t len;
    	
    	http_task->get_resp()->set_size_limit(200 * 1024 * 1024);
    	http_task->set_keep_alive(-1);
    
    	*series << http_task;
    }
    
    opened by modestpower 13
  • kafka_cli unresponsive and leaking memory (Windows)

    I ran into the following problems using kafka_cli:

    1. If I create the topic with kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic import_data_v6 and then start the program to fetch records, the program does not react and no callback is invoked. Even after producing records into that topic, the program still does not respond. But if I restart the program, it fetches records normally, and records inserted into the topic afterwards are also consumed normally.
    2. If I create a topic with 6 partitions with kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 6 --topic import_data_v6, and then start the program to fetch records, the program does not react, no callback is invoked, and memory keeps growing.

    Where am I using it wrong?

    The code is as follows:

    bool g_run_flag_ = true;
    std::future<void> g_consumer_thread_;
    static WFFacilities::WaitGroup wait_group_(1);
    WFKafkaClient client_;
    std::string query_ = "";
    bool no_cgroup_ = false;
    
    void kafkaCallback(WFKafkaTask* task)
    {
    	printf("-");
    	int state = task->get_state();
    	int error = task->get_error();
    
    	if (state != WFT_STATE_SUCCESS){
    		PLOG_ERR("kafka error, msg: %s", WFGlobal::get_error_string(state, error));
    		client_.deinit();
    		wait_group_.done();
    		return;
    	}
    
    	WFKafkaTask* next_task = nullptr;
    	int api_type = task->get_api_type();
    	switch (api_type)
    	{
    	case Kafka_Produce:
    	{
    		assert(0);
    	}
    	break;
    	case Kafka_Fetch:
    	{
    		std::vector<std::vector<protocol::KafkaRecord*>> records;
    		protocol::KafkaResult new_result;
    		new_result = std::move(*task->get_result());
    		new_result.fetch_records(records);
    		if (!records.empty()) {
    			std::string out;
    			for (const auto& v : records) {
    				if (v.empty())
    					continue;
    				long long offset = 0;
    				int partition = 0;
    				std::string topic;
    
    				for (const auto& rcd : v) {
    					const void* value;
    					size_t value_len;
    					rcd->get_value(&value, &value_len);
    					void* pChar = const_cast<void*>(value);
    					const char* p = static_cast<const char*>(pChar);
    					std::string message = std::string(p);
    					auto gdal_path = Singleton<SettingFileGroup>::Instance().Find(DIR_GDAL);
    					TaskConf task_conf;
    					if (!task_conf.LoadJson(message) || !task_conf.IsValid()) {
    						PLOG_ERR("failed to get task");
    						continue;
    					}
    					// no async processing here; this makes it easy to do the kafka commit after the import completes
    					if (ImportData(task_conf.GetMissionId(), gdal_path)) {
    						offset = rcd->get_offset();
    						partition = rcd->get_partition();
    						topic = rcd->get_topic();
    
    						// import succeeded; perform the commit
    						if (!no_cgroup_) {
    							next_task = client_.create_kafka_task("api=commit", 3, kafkaCallback);
    							next_task->add_commit_record(*rcd);
    						}
    					}
    				}
    
    				if (!topic.empty()) {
    					out += "topic: " + topic;
    					out += ",partition: " + std::to_string(partition);
    					out += ",offset: " + std::to_string(offset) + ";";
    				}
    			}
    
    			PLOG_DEBUG("kafka fetch: %s", out.c_str());
    
    		}
    		if ( next_task ){
    			if (!no_cgroup_) {
    				// run the commit task
    				series_of(task)->push_back(next_task);
    			}
    		}
    		else {
    			if (g_run_flag_) {
    				// no records fetched; continue with the next fetch
    				next_task = client_.create_kafka_task(query_, 3, kafkaCallback);
    			}
    			else {
    				// create_leavegroup_task must be called to create a leavegroup task,
    				// which sends the leavegroup protocol packet. If the leavegroup task
    				// is not started, the consumer group will not exit correctly and a
    				// rebalance of the group will be triggered.
    				next_task = client_.create_leavegroup_task(3, kafkaCallback);
    			}
    			series_of(task)->push_back(next_task);
    		}
    	}
    	break;
    	case Kafka_OffsetCommit:
    	{
    		std::vector<protocol::KafkaToppar*> toppars;
    		task->get_result()->fetch_toppars(toppars);
    		if (!toppars.empty())
    		{
    			for (const auto& v : toppars)
    			{
    				PLOG_DEBUG("kafka commit topic: %s, partition: %d, offset: %llu, error: %d",
    					v->get_topic(), v->get_partition(),
    					v->get_offset(), v->get_error());
    			}
    		}
    
    		// start the next fetch
    		next_task = client_.create_kafka_task(query_, 3, kafkaCallback);
    		series_of(task)->push_back(next_task);
    	}
    	break;
    	case Kafka_LeaveGroup:
    	{
    		PLOG_DEBUG("leavegroup callback");
    	}
    	break;
    	default:
    		break;
    	}
    	if (!next_task){
    		client_.deinit();
    		wait_group_.done();
    	}
    }
    
    // the code that starts consuming is as follows
    bool DataImport()
    {
    	auto url = Singleton<KafkaConf>::Instance().broker_list_[0];
    	auto topic = Singleton<KafkaConf>::Instance().topic_list_[0];
    
    	WFKafkaTask* task;
    	int ret = client_.init(url, "ImporterGroup");
    	if (ret) {
    		PLOG_ERR("failed to init wfkafka client");
    		return false;
    	}
    	query_ = se::FormatStr("api=fetch&topic=%s", topic.c_str());
    	task = client_.create_kafka_task(query_, 3, kafkaCallback);
    	task->start();
    	wait_group_.wait();
    }
    
    opened by cmoth150415 7
  • [document] Add vcpkg instruction step

    workflow is available as a port in vcpkg, a C++ library manager that simplifies installation for workflow and other project dependencies. Documenting the install process here will help users get started by providing a single set of commands to build workflow, ready to be included in their projects.

    We also test whether our library ports build in various configurations (dynamic, static) on various platforms (OSX, Linux, Windows: x86, x64) to keep a wide coverage for users.

    I'm a maintainer for vcpkg, and here is what the port script looks like. We try to keep the library maintained as close as possible to the original library. :)

    opened by JackBoosY 7
Releases(v0.10.5)
  • v0.10.5(Jan 3, 2023)

    New Features

    • Support xmake
    • More JSON building interfaces

    Improvements

    • Optimize communicator for speed
    • Replace all MD5 with SHA1 for better speed
    • Optimize SSL server verifying clients interface

    Bug Fixes

    • Fix endpoint params problem when upstreams share an endpoint
    • Fix kafka client double free bug
    • DNS request avoids zero request ID to be compatible with some DNS server
    • Fix JSON parsing bug when an object's name is illegal
    Source code(tar.gz)
    Source code(zip)
  • v0.10.4(Nov 4, 2022)

    New Features

    • Add WFTaskFactory::reset_go_task to enable capturing the go task itself
    • Support creating WFThreadTask with running time limit
    • Add WFMessageQueue

    Improvements

    • Optimize route manager
    • Remove MD5Util
    • Optimize communicator

    Bug Fixes

    • Fix named conditional bug
    • Fix Kafka consuming bug that always returns the latest record
    • Fix service governance's failure to fuse a server on DNS lookup error
    • Fix the problem of always creating network threads when using wait group
    Source code(tar.gz)
    Source code(zip)
  • v0.10.3(Aug 26, 2022)

    New Features

    • Add WFRepeaterTask
    • Redis server supports inline commands
    • Reconstruct kafka task error. Add WFKafkaTask::get_kafka_error()

    Improvements

    • Reduce new/delete times when running any series
    • Optimize communicator's locking to improve client tasks' speed
    • Add 'append_output_body' interfaces for HttpMessage

    Bug Fixes

    • Fix the bug that server does not accept any connection after fd number reached max
    • Fix dns task scheme checking bug
    • Fix kafka client bug of all SASL authentication fails
    • Fix compiling error on macOS with gcc
    Source code(tar.gz)
    Source code(zip)
  • v0.10.2(Jun 26, 2022)

    New Features

    • Add WFModuleTask.
    • Add round-robin upstream policy.
    • Add named conditional to support observer mode.
    • Support creating a go task with running time limit.

    Improvements

    • Use sysconf() to get max open files. Reduce memory usage when it's below 65536.
    • Optimize redis parser to make it safer.
    • Optimize thread pool by using message queue. Reduce the overhead of computing task.

    Bug Fixes

    • Fix bugs to make it fully compatible with BoringSSL.
    • Fix double deleting bug when canceling server task's series.
    • Fix Kafka client bugs.
    • Fix upstream UPSVNSWRRPolicy bug.
    Source code(tar.gz)
    Source code(zip)
  • v0.10.1(Apr 29, 2022)

    New Features

    • Add a built-in json parser
    • Add Consul client (prepared for Consul service governance)
    • Add MySQLUtil
    • Add package wrapper

    Improvements

    • Refactor kafka client and fixed some bugs
    • Refactor service governance task and add 'pre_select' for service governance plugins

    Bug Fixes

    • Fix MySQLCell::as_float bug when the value is a negative number.
    • Update CMake and bazel building files to fix kafka task's canceling bug
    Source code(tar.gz)
    Source code(zip)
  • v0.9.11(Mar 9, 2022)

    Improvements

    • Optimize upstream with consistent hash. Allow adding server with weight.
    • Improve http protocol compatibility of http client task.
    • Improve bazel building.

    Bug Fixes

    • Make UpstreamManager::upstream_delete() safe when the upstream is still in use.
    • Fix the SSL delaying problem on Nagle's algorithm.
    • Fix bug that cannot use the port in URL when using upstream.
    • Fix Kafka client offset timestamp overflow.
    • Fix Kafka client freezing problem when broker toggles the leader.
    Source code(tar.gz)
    Source code(zip)
  • v0.9.10(Jan 21, 2022)

    Improvements

    • Refactor encode stream. This will improve the performance of Redis and Kafka clients.
    • Optimize SSL write. Improve the performance of HTTPS client and server a lot.
    • Optimize weighted-random upstream policy to solve some recovering problem.
    • Update Cmake files to support more platforms. Build both static and dynamic libs.

    Bug Fixes

    • Fix nvswrr upstream policy bug.
    • Fix MySQL client crash on incomplete result sets.
    • Fix Kafka out of range error on fetching.
    • Fix Kafka cgroup bug.
    • Fix SSL wrapper problem on TLS 1.3 handshaking.
    Source code(tar.gz)
    Source code(zip)
  • v0.9.9(Dec 3, 2021)

    Improvements

    • Optimize Dns Cache's searching speed and memory size
    • Optimize route manager and dns resolver
    • Increase server task's performance by reducing some atomic operations
    • Increase global performance by removing some singletons
    • Always use dns resolver as name service policy when redirecting
    • Add WFServer::get_listen_addr() for server started on a random port
    • Support kafka kip 329

    Bug Fixes

    • Fix service governance's ref count bug
    • Fix mysql sequence id bug when retrying big request
    • Fix VNSWRR upstream bug
    • Fix dns client bug when host name has trailing dot
    • Fix URL parser fatal bug
    Source code(tar.gz)
    Source code(zip)
  • v0.9.8(Sep 30, 2021)

    Improvements

    • Enable creating file IO tasks with path name
    • Add server task's push() interface
    • Optimize poller speed and memory occupying
    • Optimize URI parser, more than 50% faster
    • Optimize http implementation

    Bug Fixes

    • Fix crash when resolv.conf is empty
    • Fix Kafka client's memory leak
    • Fix MySQL transaction checking
    • Fix bazel compiling problem
    Source code(tar.gz)
    Source code(zip)
  • v0.9.7(Aug 8, 2021)

    Improvements

    • Implement DNS protocol and add DNS asynchronous client.
    • Use asynchronous DNS as default.
    • Optimize load balancing.
    • Add bazel support and add selective compiling.
    • Support longer timer.
    • Add WFResourcePool.

    Bug fixes

    • Fix Redis double SELECTs problem.
    • Fix upstream_replace_server() bug.
    • Fix timerfd problem on some WSL platforms.
    Source code(tar.gz)
    Source code(zip)
  • v0.9.6(Jun 3, 2021)

    Improvements

    • Add SSLWrapper.
    • Support http/https task with proxy.
    • Support MySQL SSL client.
    • Add vnswrr upstream policy.

    Bug fixes

    • Fix upstream concurrency bug.
    • Fix MySQL multi-resultset for INSERTs
    • Fix Kafka client sasl auth bug.
    • Add -no-rtti compiling flag for kafka to be compatible with snappy 1.1.9.
    Source code(tar.gz)
    Source code(zip)
  • v0.9.5(Apr 12, 2021)

    Improvements

    • Support TLS SNI on both client and server sides;
    • Upstream skips select history;
    • Kafka supports sasl auth;

    Bug Fixes

    • Fix default port bug;
    • MySQL fix decode overflow bug;
    • MySQL fix parsing suffixed ok_packet;
    • Kafka modify logic of versionapi;
    Source code(tar.gz)
    Source code(zip)
  • v0.9.4(Mar 17, 2021)

    Improvements

    • Add WFNameService and refactor "Upstream" modules.
    • Update the definition of WFServer::stop()'s finish time.
    • Kafka client supports offset storage.
    • Redis supports cluster command MOVED and ASK.
    • Supporting VCPKG.

    Bug fixes

    • Crash when dismissing a named counter.
    • WFGoTask implementation.
    • MySQL int/ulonglong length overflow.
    Source code(tar.gz)
    Source code(zip)
  • v0.9.3(Jan 13, 2021)

    Improvements

    • Add Kafka client.
    • Improve client tasks performance.

    Bug fixes:

    • Fix several MySQL parser bugs.
    • Fix iovcnt==0 problem on macOS.
    Source code(tar.gz)
    Source code(zip)
  • v0.9.2(Nov 13, 2020)

    Improvements:

    • Add WFGraphTask for building DAG.
    • Add WFDynamicTask.
    • Make SeriesWork derivable.
    • Improve MySQL client.

    Bug Fixes:

    • Fix mysql protocol parsing bug.
    • Fix EncodeStream bug.

    Last release before kafka protocol.

    Source code(tar.gz)
    Source code(zip)
  • v0.9.1(Sep 30, 2020)

    Improvements:

    • Complete English documents.
    • Optimize kernel codes. The message queue is a standalone module now.
    • Support MySQL character_set_results.
    • Add benchmark codes and documents.

    Bug Fixes:

    • Fix crashing of MySQL client when the local host is disallowed.
    • Fix MySQL client's problem when using short connection.
    • Fix LRU cache bug when cache is full.
    • Fix upstream bug of division by zero.
    Source code(tar.gz)
    Source code(zip)
  • v0.9.0(Aug 17, 2020)
