C++ Parallel Computing and Asynchronous Networking Engine



Sogou C++ Workflow


As Sogou's C++ server engine, Sogou C++ Workflow supports almost all of Sogou's back-end C++ online services, including all search services, cloud input method, online advertisements, etc., handling more than 10 billion requests every day. It is an enterprise-level programming engine with a light and elegant design that can satisfy most C++ back-end development requirements.

You can use it:

  • To quickly build an HTTP server:
#include <stdio.h>
#include "workflow/WFHttpServer.h"

int main()
{
    WFHttpServer server([](WFHttpTask *task) {
        task->get_resp()->append_output_body("Hello World!");
    });

    if (server.start(8888) == 0) { // start server on port 8888
        getchar(); // press "Enter" to end.
        server.stop();
    }

    return 0;
}
  • As a multifunctional asynchronous client, it currently supports HTTP, Redis, MySQL and Kafka protocols.
  • To implement client/server on user-defined protocol and build your own RPC system.
    • srpc, an independent open-source project built on it, supports the srpc, brpc and thrift protocols.
  • To build asynchronous workflows: common series and parallel structures are supported, as well as arbitrary DAG structures.
  • As a parallel computing tool. In addition to networking tasks, Sogou C++ Workflow also includes the scheduling of computing tasks. All types of tasks can be put into the same flow.
  • As an asynchronous file IO tool on Linux, with performance exceeding that of any system call. Disk file IO is also a task.
  • To implement any high-performance, high-concurrency back-end service with a very complex relationship between computing and networking.
  • To build a micro service system.
    • This project has built-in service governance and load balancing features.

Compiling and running environment

  • This project supports Linux, macOS, Windows, Android and other operating systems.
    • Windows version is currently released as an independent branch, using iocp to implement asynchronous networking. All user interfaces are consistent with the Linux version.
  • Supports all CPU platforms, including 32 or 64-bit x86 processors, big-endian or little-endian arm processors.
  • Relies on OpenSSL; OpenSSL 1.1 and above is recommended. If you don't need SSL, you may check out the nossl branch, but you still need to link libcrypto for md5 and sha1.
  • Uses the C++11 standard, so it must be compiled with a compiler that supports C++11. It does not rely on boost or asio.
  • No other dependencies. However, if you need the Kafka protocol, some compression libraries must be installed, including lz4, zstd and snappy.

Try it!

System design features

We believe that a typical back-end program = protocol + algorithm + workflow, and that these three parts should be developed completely independently.

  • Protocol
    • In most cases, users use built-in common network protocols, such as HTTP, Redis or various rpc.
    • Users can also easily define their own network protocols: they only need to provide serialization and deserialization functions to build their own client/server.
  • Algorithm
    • In our design, the algorithm is a concept symmetrical to the protocol.
      • If a protocol call is an RPC, then an algorithm call is an APC (Async Procedure Call).
    • We have provided some general algorithms, such as sort, merge, psort, reduce, which can be used directly.
    • Compared with a user-defined protocol, a user-defined algorithm is much more common. Any complicated computation with clear boundaries should be packaged into an algorithm.
  • Workflow
    • Workflow is the actual business logic, which puts the protocols and algorithms into the flow graph for use.
    • The typical workflow is a closed series-parallel graph. Complex business logic may be a non-closed DAG.
    • The workflow graph can be constructed directly or dynamically generated based on the results of each step. All tasks are executed asynchronously.

Basic task, task factory and complex task

  • Our system contains six basic tasks: networking, file IO, CPU, GPU, timer, and counter.
  • All tasks are generated by the task factory and automatically recycled after callback.
    • A server task is a special kind of networking task: it is generated by the framework (which calls the task factory) and handed over to the user through the process function.
  • In most cases, the task generated by the user through the task factory is a complex task, which is transparent to the user.
    • For example, an HTTP request may include many asynchronous processes (DNS, redirection), but for the user, it is just a networking task.
    • File sorting seems to be an algorithm, but it actually includes many complex interaction processes between file IO and CPU computation.
    • If you think of business logic as building circuits with well-designed electronic components, then each electronic component may be a complex circuit.

Asynchrony and encapsulation based on C++11 std::function

  • Not based on user-mode coroutines. Users need to know that they are writing asynchronous programs.
  • All calls are executed asynchronously, and almost no operation occupies a thread.
    • Although we also provide some facilities with semi-synchronous interfaces, they are not core features.
  • We try to avoid requiring users to derive from our classes, and encapsulate user behavior with std::function instead, including:
    • The callback of any task.
    • Any server's process. This conforms to the FaaS (Function as a Service) idea.
    • The implementation of an algorithm is simply a std::function, although an algorithm can also be implemented by derivation.

Memory reclamation mechanism

  • Every task will be automatically reclaimed after the callback. If a task is created but a user does not want to run it, the user needs to release it through the dismiss method.
  • Any data in the task, such as the response of a network request, is also recycled together with the task, so the user should use std::move() to move out any data that needs to outlive it.
  • SeriesWork and ParallelWork are two kinds of framework objects, which are also recycled after their callback.
    • When a series is a branch of a parallel, it will be recycled after the callback of the parallel that it belongs to.
  • This project doesn’t use std::shared_ptr to manage memory.
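The reclamation rule can be illustrated with a small standard-C++ model. ToyTask below is a hypothetical class written for this sketch, not workflow's task type: like a workflow task it destroys itself right after its callback returns, so a callback that wants to keep the response has to move it out with std::move() before returning.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Hypothetical toy task: like a workflow task, it is reclaimed right after
// its callback returns, together with any data it still owns.
class ToyTask {
public:
    using Callback = std::function<void(ToyTask *)>;

    explicit ToyTask(Callback callback) : callback_(std::move(callback)) {}

    std::string *get_resp() { return &resp_; }

    // Simulates completion: store the response, run the callback, self-destroy.
    void finish(std::string data) {
        resp_ = std::move(data);
        if (callback_)
            callback_(this);
        delete this; // resp_ is freed here unless the callback moved it out
    }

private:
    ~ToyTask() = default; // only finish() may destroy a task, so it must live on the heap

    Callback callback_;
    std::string resp_;
};
```

A callback such as `[&kept](ToyTask *t) { kept = std::move(*t->get_resp()); }` keeps the response alive after the task is gone, without any shared_ptr.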

More design documents

To be continued...

  • FAQ (continuously updated)



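    The C++ Workflow project originated as the communication engine of Sogou's distributed storage project and grew into a company-wide C++ standard at Sogou, used by most of Sogou's C++ back-end services. The project unifies communication and computation, helping users build high-performance services in which the relationship between communication and computation is very complex. At the same time, users can also simply treat it as a lightweight asynchronous networking engine or a parallel computing framework.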



    $ git clone https://github.com/sogou/workflow
    $ cd workflow
    $ make
    $ cd tutorial
    $ make


    $ make KAFKA=y
    $ cd tutorial
    $ make KAFKA=y

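    In addition, make DEBUG=y builds a debug version. With make REDIS=n MYSQL=n UPSTREAM=n CONSUL=n you can strip out one or more features, shrinking the library to as little as 400KB, which makes it better suited to embedded development.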


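    • Simple and easy to learn, no dependencies
    • Excellent performance and stability (benchmark)
    • Rich implementations of common protocols
    • Unified communication and computation
    • Task flow management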


    • Easy to use
    • Includes networking


    • Pipeline server
    • UDP server (a UDP client is supported)
    • HTTP/2
    • WebSocket (the WebSocket client is already implemented; see the websocket branch)




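    We use callbacks and process functions of the C++11 std::function type to wrap user behavior, so users need to know that they are writing asynchronous programs. We believe that callbacks make programs more efficient than futures or user-mode coroutines, and that they unify communication and computation well. Thanks to our task encapsulation and the convenience of std::function, using callbacks in our framework carries little mental burden; on the contrary, it is simple and clear.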


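    A feature of the project is that the framework manages the threads. Except in some very special cases, a callback is always invoked either in a handler thread that processes the results of network send/receive and file IO (20 by default) or in a computing thread (by default, as many as the total number of CPU cores). Whichever thread it runs in, you should not wait or perform particularly complex computation inside a callback. To wait, use a counter task, which waits without occupying a thread; complex computation should be wrapped into a computing task. Note that every resource in the framework is allocated on first use: if the user does not use network communication, none of the communication-related threads will be created.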


    int main(void)
    {
        return 0;
    }


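    WFFacilities::WaitGroup wait_group(1);
    void callback(WFHttpTask *task) {
        wait_group.done(); // signal main() after handling the response
    }
    int main(void) {
        WFHttpTask *task = WFTaskFactory::create_http_task(url, 0, 0, callback);
        task->start();
        wait_group.wait(); // block until callback() calls done()
        return 0;
    }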
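    WFFacilities::WaitGroup lets main() block until a callback signals completion. As a rough sketch of those semantics only (a count set at construction, done() to decrement it, wait() to block until it reaches zero), here is a standard-library model; SimpleWaitGroup and wait_for_workers are hypothetical names, and this is not workflow's implementation. Blocking like this belongs in main() or another non-framework thread; inside the framework, the FAQ recommends a counter task for waiting without occupying a thread.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Illustrative model of WaitGroup semantics; not workflow's implementation.
class SimpleWaitGroup {
public:
    explicit SimpleWaitGroup(int count) : count_(count) {}

    // Called once per finished unit of work (e.g. from a callback).
    void done() {
        std::lock_guard<std::mutex> lock(mutex_);
        assert(count_ > 0); // more done() calls than the initial count is a bug
        if (--count_ == 0)
            cond_.notify_all();
    }

    // Blocks the calling thread until the count reaches zero.
    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return count_ == 0; });
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    int count_;
};

// Hypothetical usage: n worker threads each record a result and call done();
// the caller blocks in wait() until all of them have finished.
int wait_for_workers(int n) {
    SimpleWaitGroup wg(n);
    std::vector<int> results(static_cast<std::size_t>(n), 0);
    std::vector<std::thread> workers;
    for (int i = 0; i < n; i++)
        workers.emplace_back([&results, &wg, i] {
            results[static_cast<std::size_t>(i)] = 1;
            wg.done();
        });
    wg.wait();
    int sum = 0;
    for (int r : results)
        sum += r; // safe: each write happened before its done()
    for (auto &w : workers)
        w.join();
    return sum;
}
```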



    void http_callback(WFHttpTask *task)
    {
        protocol::HttpResponse *resp = task->get_resp();
        protocol::HttpResponse *my_resp = new protocol::HttpResponse(std::move(*resp));
        /* or
        protocol::HttpResponse *my_resp = new protocol::HttpResponse;
        *my_resp = std::move(*resp);
        */
        // my_resp now outlives the task; delete it when it is no longer needed.
    }

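    In some cases, if a user creates a task but then decides not to start it, task->dismiss() must be called to destroy the task directly. It must be emphasized that a server's process function is not a callback: a server task's callback happens after the reply has been completed, and it is nullptr by default.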



    • A series is composed of tasks
    • A parallel is composed of series
    • A parallel is itself a kind of task



    You can use WFGraphTask, or build it yourself with WFCounterTask. Example: https://github.com/sogou/workflow/blob/master/tutorial/tutorial-11-graph_task.cc
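    To illustrate the counter idea behind building a DAG by hand, the sketch below gives every node a count of unfinished predecessors; when a node finishes it counts down each successor, and a successor runs once its count reaches zero. It is a sequential, standard-C++ model with hypothetical names (Dag, add_node, add_edge, run), not WFGraphTask's implementation; in workflow, each node would be a real task and each wait a WFCounterTask.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <string>
#include <vector>

// Toy DAG runner: pending[n] plays the role of a counter whose target is the
// number of incoming edges of node n.
struct Dag {
    std::vector<std::string> names;
    std::vector<std::vector<std::size_t>> succ; // node -> successors

    std::size_t add_node(const std::string &name) {
        names.push_back(name);
        succ.emplace_back();
        return names.size() - 1;
    }
    void add_edge(std::size_t from, std::size_t to) { succ[from].push_back(to); }

    // Returns the execution order; it is shorter than names.size() if there is a cycle.
    std::vector<std::string> run() const {
        std::vector<int> pending(names.size(), 0);
        for (const auto &s : succ)
            for (std::size_t to : s)
                pending[to]++;
        std::queue<std::size_t> ready;
        for (std::size_t i = 0; i < names.size(); i++)
            if (pending[i] == 0)
                ready.push(i);
        std::vector<std::string> order;
        while (!ready.empty()) {
            std::size_t n = ready.front();
            ready.pop();
            order.push_back(names[n]); // "execute" the node
            for (std::size_t to : succ[n])
                if (--pending[to] == 0) // its last predecessor just finished
                    ready.push(to);
        }
        return order;
    }
};
```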


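    No. The server replies to the request once there are no other tasks left in the series that contains the server task. If you do not add any task to this series, this is equivalent to replying after process returns. Do not wait for a task to complete inside process; instead, add that task to the series.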


    The wrong way is to sleep directly in process. The right way is to add a timer task to the series that contains the server task. Taking an http server as an example:

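    void process(WFHttpTask *server_task) {
        WFTimerTask *timer = WFTaskFactory::create_timer_task(100000, nullptr); // 100000 microseconds = 100 ms
        series_of(server_task)->push_back(timer); // the reply is sent after the timer finishes
    }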

    The code above implements an http server with a 100 ms delay. Everything runs asynchronously, and no thread is occupied while waiting.


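    First, a reply is defined as successful once the data has been written into the TCP buffer. So if the reply is small and the client has not closed the connection (for example, because of a timeout), the reply can be considered almost certain to succeed. To check the result of the reply, simply set a callback on the server task. The state codes and error codes in that callback are defined the same way as for client tasks, except that a server task never gets a DNS error.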


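    Yes. Call the server task's noreply() method at any time, and the connection is closed directly at the moment the reply would otherwise have been sent.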



    Why is there no need to establish a connection first when using the redis client?

    First, look at the interface for creating a redis client task:

    class WFTaskFactory {
    public:
        static WFRedisTask *create_redis_task(const std::string& url, int retry_max, redis_callback_t callback);
    };

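    The url format is redis://:password@host:port/dbnum. The default port is 6379 and the default dbnum is 0. An important feature of workflow is that the framework manages connections, which keeps the user interface extremely simple and achieves the most effective connection reuse. The framework looks for a reusable connection based on the task's user name, password and dbnum. If none is found, it initiates a new connection and performs user login, database selection and so on. If the host is new, DNS resolution is also needed, and a failed request may be retried. Each of these steps is asynchronous and transparent: the user only needs to fill in the request and start the task, and gets the result of the request in the callback. The only thing to note is that every task must be created with the password, because a login may be needed at any time. The same approach can be used to create mysql tasks. However, for mysql with transaction requirements, tasks must be created through our WFMySQLConnection; otherwise there is no guarantee that the whole transaction runs on the same connection. WFMySQLConnection still keeps the connection and authentication process asynchronous.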
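    The URL format and its defaults can be checked with a small parser. parse_redis_url is a hypothetical helper written for illustration: it is not the parser workflow uses, it ignores IPv6 literals and percent-encoding, and it only applies the documented defaults of port 6379 and dbnum 0.

```cpp
#include <cassert>
#include <string>

struct RedisUrl {
    std::string password;
    std::string host;
    int port = 6379; // documented default
    int dbnum = 0;   // documented default
};

// Hypothetical parser for redis://:password@host:port/dbnum.
// std::stoi throws std::invalid_argument on a malformed port or dbnum.
bool parse_redis_url(const std::string &url, RedisUrl &out) {
    const std::string scheme = "redis://";
    if (url.compare(0, scheme.size(), scheme) != 0)
        return false;
    std::string rest = url.substr(scheme.size());

    // Split "authority/dbnum".
    std::string::size_type slash = rest.find('/');
    std::string authority = rest.substr(0, slash);
    if (slash != std::string::npos && slash + 1 < rest.size())
        out.dbnum = std::stoi(rest.substr(slash + 1));

    // Optional ":password@" user info (the user name part is empty).
    std::string::size_type at = authority.rfind('@');
    if (at != std::string::npos) {
        std::string userinfo = authority.substr(0, at);
        std::string::size_type colon = userinfo.find(':');
        if (colon != std::string::npos)
            out.password = userinfo.substr(colon + 1);
        authority = authority.substr(at + 1);
    }

    // Optional ":port".
    std::string::size_type colon = authority.rfind(':');
    if (colon != std::string::npos) {
        out.port = std::stoi(authority.substr(colon + 1));
        authority = authority.substr(0, colon);
    }
    out.host = authority;
    return !out.host.empty();
}
```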



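    • If there are idle connections to the same address and port that satisfy the reuse conditions, the most recently released one is chosen. That is, idle connections are reused in last-in, first-out order.
    • When there is no qualifying idle connection for the address and port:
      • If the current number of concurrent connections is below the maximum (200 by default), a new connection is initiated immediately.
      • If the number of concurrent connections has reached the maximum, the task gets the system error EAGAIN.
    • Not every connection to the same target address and port qualifies for reuse. For example, database connections under different user names or passwords cannot be reused for each other.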
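    The reuse rules above can be modeled for a single address:port target as a LIFO stack of idle connections plus a connection limit. TargetPool and its key string (a stand-in for user name, password and similar reuse conditions) are hypothetical; the model reports EAGAIN as a negative return value and assumes that every open connection, busy or idle, counts toward the limit, which the real framework's bookkeeping may not match exactly.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <string>
#include <vector>

// Toy model of the connection-reuse rules for ONE address:port target.
class TargetPool {
public:
    explicit TargetPool(int max_conns = 200) : max_(max_conns) { assert(max_conns > 0); }

    // Returns a connection id (>= 0), or -EAGAIN once the limit is reached.
    int acquire(const std::string &key) {
        // Search idle connections starting from the most recently released (LIFO).
        for (std::size_t i = idle_.size(); i-- > 0;) {
            if (idle_[i].key == key) {
                int id = idle_[i].id;
                idle_.erase(idle_.begin() + static_cast<std::ptrdiff_t>(i));
                return id;
            }
        }
        if (open_ >= max_)
            return -EAGAIN; // no reusable idle connection and no room for a new one
        ++open_;
        return next_id_++;  // "connect": open a brand-new connection
    }

    // Put a connection back on the idle stack after its request completes.
    void release(int id, const std::string &key) { idle_.push_back(Idle{id, key}); }

private:
    struct Idle {
        int id;
        std::string key; // connections with different keys never substitute for each other
    };
    int max_;
    int open_ = 0;
    int next_id_ = 0;
    std::vector<Idle> idle_; // back() is the most recently released connection
};
```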






    How to access a chunked-encoded http body most efficiently


    #include "workflow/HttpUtil.h"

    void http_callback(WFHttpTask *task)
    {
        protocol::HttpResponse *resp = task->get_resp();
        protocol::HttpChunkCursor cursor(resp);
        const void *chunk;
        size_t size;

        while (cursor.next(&chunk, &size))
        {
            // each iteration yields one chunk: size bytes starting at chunk
        }
    }
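    HttpChunkCursor hands each chunk to the loop as a pointer and a size. As a rough picture of what it walks over, this standard-C++ sketch splits a complete chunked body (the HTTP/1.1 chunked transfer coding of RFC 9112, section 7.1: a hex size line, the data, CRLF, ending with a zero-size chunk) into its payloads. decode_chunks is a hypothetical helper, not the library's implementation, and unlike the pointer-based cursor it copies every chunk.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Split a complete chunked body into chunk payloads. Each chunk is
// "<hex-size>[;ext]\r\n<data>\r\n"; a zero-size chunk ends the body.
// Returns false on malformed input. Trailer fields are ignored.
bool decode_chunks(const std::string &body, std::vector<std::string> &chunks) {
    std::size_t pos = 0;
    while (true) {
        std::size_t eol = body.find("\r\n", pos);
        if (eol == std::string::npos)
            return false;
        std::string size_line = body.substr(pos, eol - pos);
        size_line = size_line.substr(0, size_line.find(';')); // drop chunk extensions
        if (size_line.empty())
            return false;
        std::size_t size = 0;
        for (char c : size_line) {
            int digit;
            if (c >= '0' && c <= '9') digit = c - '0';
            else if (c >= 'a' && c <= 'f') digit = c - 'a' + 10;
            else if (c >= 'A' && c <= 'F') digit = c - 'A' + 10;
            else return false;
            size = size * 16 + static_cast<std::size_t>(digit);
        }
        pos = eol + 2;
        if (size == 0)
            return true; // last chunk; optional trailers may follow
        if (body.size() < pos + size + 2 || body.compare(pos + size, 2, "\r\n") != 0)
            return false;
        chunks.push_back(body.substr(pos, size));
        pos += size + 2;
    }
}
```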





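    Most commonly, tasks in the same series share the series context, which is read and modified through the series' get_context() and set_context() methods. In its callback, a parallel can also obtain each series it contains through series_at() (the callbacks of these series have already been called, but the series are destroyed only after the parallel's callback) and thus access their contexts. Since a parallel is also a task, it can pass the aggregated result onward through the context of the series it belongs to. In short, a series is a coroutine, and the series context is that coroutine's local variables; a parallel runs coroutines in parallel and can aggregate all of their results.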
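    The data flow described above can be modeled in standard C++. Series and Parallel below are hypothetical stand-ins, not workflow's SeriesWork and ParallelWork: tasks in a series run in order and share one context string, and a parallel runs its series on separate threads, then passes all of them to its callback so it can aggregate their contexts, much as series_at() allows in a real parallel callback.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <thread>
#include <vector>

// A "task" here is just a function that may read and update its series context.
using Task = std::function<void(std::string &context)>;

struct Series {
    std::vector<Task> tasks;
    std::string context; // shared by every task in this series

    void run() {
        for (auto &task : tasks)
            task(context); // tasks run strictly one after another
    }
};

struct Parallel {
    std::vector<Series> branches;
    std::function<void(const std::vector<Series> &)> callback;

    void run() {
        std::vector<std::thread> threads;
        for (std::size_t i = 0; i < branches.size(); i++)
            threads.emplace_back([this, i] { branches[i].run(); }); // branches run concurrently
        for (auto &t : threads)
            t.join();
        // Every series has finished; the callback can aggregate their contexts.
        if (callback)
            callback(branches);
    }
};
```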




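    A server's stop() is a graceful shutdown, and every server must be stopped before the program ends. stop() consists of shutdown() and wait_finish(); wait_finish() waits until the series of every running server task has finished. That is, in the server task's callback, which runs after the reply has completed, you can keep appending tasks to the series, and stop() will wait for those tasks to finish. In addition, if you run several servers at the same time, the best way to stop them is: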

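    int main() {
        // A server object cannot be started more than once, so a multi-port service needs one server object per port
        WFRedisServer server1(process);
        WFRedisServer server2(process);
        server1.start(8080);
        server2.start(8888);
        getchar(); // press Enter to end
        // Shut down all servers first, then wait for them.
        server1.shutdown();
        server2.shutdown();
        server1.wait_finish();
        server2.wait_finish();
        return 0;
    }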



    #include <string.h>
    #include <atomic>
    #include "workflow/WFHttpServer.h"

    extern void process(WFHttpTask *task);
    WFHttpServer server(process);

    void process(WFHttpTask *task) {
        if (strcmp(task->get_req()->get_request_uri(), "/stop") == 0) {
            static std::atomic<int> flag;
            if (flag++ == 0)
                server.shutdown();
            task->get_resp()->append_output_body("<html>server stop</html>");
            return;
        }

        /* Server's logic */
        //  ....
    }

    int main() {
        if (server.start(8888) == 0)
            server.wait_finish();

        return 0;
    }

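    The code above implements an http server that ends the program when it receives a /stop request. The flag in process is required because process runs concurrently, and only one thread may call shutdown().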
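    Why the atomic flag matters can be shown with plain threads standing in for concurrent process() calls. simulate_concurrent_stop_requests is a hypothetical driver, and incrementing shutdown_calls stands in for server.shutdown(): because flag++ on a std::atomic<int> is a single atomic read-modify-write, exactly one thread observes 0.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical driver: n threads all "receive /stop" at once.
int simulate_concurrent_stop_requests(int n_threads) {
    std::atomic<int> flag{0};
    std::atomic<int> shutdown_calls{0};
    std::vector<std::thread> handlers;
    for (int i = 0; i < n_threads; i++) {
        handlers.emplace_back([&] {
            if (flag++ == 0)      // atomic post-increment: only one thread gets 0
                shutdown_calls++; // stands in for server.shutdown()
        });
    }
    for (auto &t : handlers)
        t.join();
    return shutdown_calls.load();
}
```

With a plain int instead, two threads could both read 0 before either wrote 1 (a data race), and shutdown() could be called twice.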



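    void other_callback(server_task, counter, ...) {
        counter->count(); // fill server_task's response first; counting to 1 lets the series continue
    }
    void process(WFHttpTask *server_task) {
        WFCounterTask *counter = WFTaskFactory::create_counter_task(1, nullptr);
        OtherAsyncTask *other_task = create_other_task(other_callback, server_task, counter); // a task from a non-workflow framework, started however that framework requires
        series_of(server_task)->push_back(counter); // the reply is sent only after the counter completes
    }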




        struct WFGlobalSettings settings = GLOBAL_SETTINGS_DEFAULT;
        settings.endpoint_params.use_tls_sni = true;
        WORKFLOW_library_init(&settings); // apply the settings before creating any task

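    Enabling this feature has a cost: SNI is turned on for every https site, and accesses to different domain names on the same IP address can no longer share SSL connections. Therefore, users can instead use the upstream feature to enable SNI only for a specific domain name: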

    #include "workflow/UpstreamManager.h"

    int main()
    {
        UpstreamManager::upstream_create_weighted_random("www.sogou.com", false);
        struct AddressParams params = ADDRESS_PARAMS_DEFAULT;
        params.endpoint_params.use_tls_sni = true;
        UpstreamManager::upstream_add_server("www.sogou.com", "www.sogou.com", &params);
        // ... create tasks that access https://www.sogou.com ...
        return 0;
    }



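    Method 1 (only for http tasks, and redirection is not possible): create the http task with the proxy server's address, then reset request_uri and the Host header. Suppose we want to access http://www.sogou.com/ through the proxy server www.proxy.com:8080: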

    task = WFTaskFactory::create_http_task("http://www.proxy.com:8080", 0, 0, callback);
    task->get_req()->set_request_uri("http://www.sogou.com/");
    task->get_req()->set_header_pair("Host", "www.sogou.com");

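    Method 2 (general-purpose, but some proxy servers only support HTTPS; for HTTP, method 1 is still recommended): create the http task through the interface that takes a proxy_url: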

    class WFTaskFactory {
    public:
        static WFHttpTask *create_http_task(const std::string& url,
                                            const std::string& proxy_url,
                                            int redirect_max, int retry_max,
                                            http_callback_t callback);
    };

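    The proxy_url format is http://user:password@host:port/. The proxy URL must start with "http://", not "https://", and the default port is 80. This method works for proxying both http and https URLs and supports redirection; redirected requests keep using the same proxy server.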

    Does the http server support RESTful interfaces?

    We recommend the wfrest project, a RESTful API development framework based on workflow. Project address: https://github.com/wfrest/wfrest

    opened by Barenboim 42
  • Abnormal behavior of the Kafka client


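    While using the Kafka client recently, I ran into the following anomaly: 1. The client creates a Kafka client task (topic=xxx&api=fetch) every 200 milliseconds. 2. After the program has run for a while, it stops receiving messages (abnormal case). 3. When debugging with GDB and breaking on the thread that creates the tasks, messages are received again (normal case).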



    opened by song-git 33
  • Can callbacks be recursive?


    I want to implement a scenario where a failed task is retried after a 60-second interval.

    void mysql_callback(WFMySQLTask *task)
    {
        ::Json json = mysql_concat_json_res(mysql_task);
        if (json.find("errcode") != json.end())
        {
            WFTimerTask *timer = WFTaskFactory::create_timer_task(60, 0, [](WFTimerTask *) {
                WFMySQLTask *mysql_task = WFTaskFactory::create_mysql_task(url, 0, mysql_callback);
            });
        }
    }

    Is it OK to write it like this? Or is there a better practice?

    opened by vincentqin-sys 32
  • benchmark: please add comparison data with a large number of connections


    I ran http_server locally. The machine has 4 cores and 8 threads, and I configured the server with 4 threads.


    ~/workflow/benchmark$ cat /etc/issue
    Ubuntu 20.04 LTS \n \l
    ~/workflow/benchmark$ cat /proc/cpuinfo | grep processor
    processor	: 0
    processor	: 1
    processor	: 2
    processor	: 3
    processor	: 4
    processor	: 5
    processor	: 6
    processor	: 7
    ~/workflow/benchmark$ ./http_server 4 9000 32


    ~/wrk$ ./wrk --latency -d10 -c200 --timeout 8 -t4
    Running 10s test @
      4 threads and 200 connections
      Thread Stats   Avg      Stdev     Max   +/- Stdev
        Latency   574.13us    1.03ms  22.70ms   97.01%
        Req/Sec    98.28k     8.84k  131.17k    73.07%
      Latency Distribution
         50%  384.00us
         75%  579.00us
         90%    0.85ms
         99%    5.90ms
      3921936 requests in 10.10s, 647.06MB read
    Requests/sec: 388303.50
    Transfer/sec:     64.06MB
    ~/workflow/benchmark$ top -d 1 | grep _server
      63161 ubuntu    20   0 1781400   5612   4548 S 276.0   0.1   0:02.76 http_server
      63161 ubuntu    20   0 1781400   5612   4548 S 465.3   0.1   0:07.46 http_server
      63161 ubuntu    20   0 1781400   5872   4548 S 467.0   0.1   0:12.13 http_server
      63161 ubuntu    20   0 1781400   6132   4548 S 458.4   0.1   0:16.76 http_server
      63161 ubuntu    20   0 1781400   6132   4548 S 469.0   0.1   0:21.45 http_server
      63161 ubuntu    20   0 1781400   6132   4548 S 457.4   0.1   0:26.07 http_server
      63161 ubuntu    20   0 1781400   6132   4548 S 468.0   0.1   0:30.75 http_server
      63161 ubuntu    20   0 1781400   6132   4548 S 461.4   0.1   0:35.41 http_server
      63161 ubuntu    20   0 1781400   6132   4548 S 463.0   0.1   0:40.04 http_server
      63161 ubuntu    20   0 1781400   6132   4548 S 465.3   0.1   0:44.74 http_server
      63161 ubuntu    20   0 1781400   6132   4548 S 194.0   0.1   0:46.68 http_server


    ~/wrk$ ./wrk --latency -d10 -c20000 --timeout 8 -t4
    Running 10s test @
      4 threads and 20000 connections
      Thread Stats   Avg      Stdev     Max   +/- Stdev
        Latency    32.63ms   18.67ms 299.99ms   71.61%
        Req/Sec     6.63k     6.71k   37.35k    89.16%
      Latency Distribution
         50%   30.52ms
         75%   42.46ms
         90%   55.49ms
         99%   86.61ms
      244113 requests in 10.13s, 40.28MB read
      Socket errors: connect 0, read 1300855, write 0, timeout 0
    Requests/sec:  24091.07
    Transfer/sec:      3.97MB
      63161 ubuntu    20   0 1781400  14740   4548 S 167.0   0.2   0:48.35 http_server
      63161 ubuntu    20   0 1781400  16456   4612 S 261.4   0.2   0:50.99 http_server
      63161 ubuntu    20   0 1781400  16468   4612 S 193.3   0.2   0:51.28 http_server
      63161 ubuntu    20   0 1781400  16468   4612 S 200.0   0.2   0:51.62 http_server
      63161 ubuntu    20   0 1781400  15200   4612 S 208.0   0.2   0:53.70 http_server
      63161 ubuntu    20   0 1781400  15200   4612 S 195.0   0.2   0:55.67 http_server
      63161 ubuntu    20   0 1781400  15308   4612 S 187.0   0.2   0:57.54 http_server
      63161 ubuntu    20   0 1781400  15696   4612 S 184.2   0.2   0:59.40 http_server
      63161 ubuntu    20   0 1781400  15960   4612 S 185.0   0.2   1:01.25 http_server
      63161 ubuntu    20   0 1781400  16024   4612 S 186.1   0.2   1:03.13 http_server
      63161 ubuntu    20   0 1781400  15984   4612 S 184.2   0.2   1:04.99 http_server
      63161 ubuntu    20   0 1781400  15984   4612 S 185.0   0.2   1:06.84 http_server
      63161 ubuntu    20   0 1781400  15984   4612 S   4.0   0.2   1:06.88 http_server


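    With a large number of connections, CPU utilization drops and the relative performance drop is quite noticeable. I used the default code without modification. Can this be optimized by changing the configuration or the code?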

    opened by lesismal 31
  • The client hangs and the server does not respond


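    A simple http API service with fewer than 10 clients, which send a few requests every second. The clients send requests from Java, and the server is implemented with workflow. After running for a long time, about 7 days, client requests hang until the maximum timeout is reached and an error is thrown, while the server's business logic receives no messages at all. Observing the ports with lsof -i, the port is still listening, but in the last entry the fd shows 1023. All connections are in the CLOSE_WAIT state, and there are no further logs. The problem does not reproduce over a short time but always reproduces over a long time, and there are no useful logs. Once it happens, the server no longer responds to any request.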

    opened by zj376879088 30
  • Occasional core dumps when a response comes back. Has anyone run into this (MySQL client)?




    ==37681==ERROR: AddressSanitizer: BUS on unknown address 0x000000000000 (pc 0x7ff8ae4d90de bp 0xbebebebebebebeae sp 0x7ff89b0feef0 T26)
        #0 0x7ff8ae4d90dd (/opt/compiler/gcc-8.2/lib/libasan.so.5+0x2f0dd)
        #1 0x7ff8ae595b8a in __interceptor_free (/opt/compiler/gcc-8.2/lib/libasan.so.5+0xebb8a)
        #2 0x9be470 in mysql_parser_deinit /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/protocol/mysql_parser.c:62
        #3 0x9bea6b in protocol::MySQLMessage::~MySQLMessage() /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/protocol/MySQLMessage.cc:43
        #4 0x9c7d70 in protocol::MySQLResponse::~MySQLResponse() /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/_include/workflow/MySQLMessage.h:88
        #5 0x9c7d70 in WFComplexClientTask<protocol::MySQLRequest, protocol::MySQLResponse, bool>::clear_resp() /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/factory/WFTaskFactory.inl:186
        #6 0x9c7d70 in WFComplexClientTask<protocol::MySQLRequest, protocol::MySQLResponse, bool>::switch_callback(WFTimerTask*) /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/factory/WFTaskFactory.inl:421
        #7 0x9c7f69 in WFComplexClientTask<protocol::MySQLRequest, protocol::MySQLResponse, bool>::done() /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/factory/WFTaskFactory.inl:486
        #8 0x9d1696 in SubTask::subtask_done() /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/kernel/SubTask.cc:31
        #9 0x9cf5b9 in Communicator::handle_incoming_reply(poller_result*) /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/kernel/Communicator.cc:691
        #10 0x9d116a in Communicator::handler_thread_routine(void*) /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/kernel/Communicator.cc:1093
        #11 0x9e148c in __thrdpool_routine /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/kernel/thrdpool.c:72
        #12 0x7ff8ae492da3 in start_thread /home/liruihao/mygcc82/glibc-2.21/nptl/pthread_create.c:333
        #13 0x7ff8ae0f932c in clone (/opt/compiler/gcc-8.2/lib/libc.so.6+0xeb32c)

    AddressSanitizer can not provide additional info.
    SUMMARY: AddressSanitizer: BUS (/opt/compiler/gcc-8.2/lib/libasan.so.5+0x2f0dd)
    Thread T26 created by T0 here:
        #0 0x7ff8ae4f6f50 in pthread_create (/opt/compiler/gcc-8.2/lib/libasan.so.5+0x4cf50)
        #1 0x9e1683 in __thrdpool_create_threads /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/kernel/thrdpool.c:156
        #2 0x9e1683 in thrdpool_create /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/kernel/thrdpool.c:192


    opened by chenlinzhong 26
  • Pipeline support


    I tried http client pipelining, sending the data of two http requests in a single write. The code is as follows:

    package main

    import (
    	"fmt"
    	"net"
    )

    func main() {
    	addr := "localhost:9000"
    	conn, err := net.Dial("tcp", addr)
    	if err != nil {
    		panic(err)
    	}
    	// data for one request
    	reqData := []byte(fmt.Sprintf("POST / HTTP/1.1\r\nHost: %v\r\nContent-Length: 5\r\n\r\n\r\nhello", addr))
    	// send both requests together
    	reqData = append(reqData, reqData...)
    	n, err := conn.Write(reqData)
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println("write:\n", string(reqData))
    	defer fmt.Println("---------------------")
    	resData := make([]byte, 1024*8)
    	fmt.Println("read:", n)
    	for {
    		n, err = conn.Read(resData)
    		if err != nil {
    			fmt.Println("read failed:", n, err)
    			return
    		}
    		fmt.Println(string(resData[:n]))
    	}
    }


    ~/workflow/benchmark# go run ./client.go
     GET / HTTP/1.1
    Host: localhost:9000
    GET / HTTP/1.1
    Host: localhost:9000
    HTTP/1.1 200 OK
    Date: Sun, 15 May 2022 06:35:29 UTC
    Content-Type: text/plain; charset=UTF-8
    Content-Length: 32
    Connection: Keep-Alive
    read failed: 0 EOF


    opened by lesismal 21
  • How does workflow guarantee execution order while avoiding starting too many threads?



    opened by willkk 20
  • kafka集群中leader重启后问题


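    I use workflow as a kafka client to consume messages. After one machine in the cluster restarts, the kafka client callback reports state 67, error code 5006. Even after restarting the client process, the callback keeps returning this status code; only after that server in the cluster has finished starting up does everything return to normal. Is this a client problem? How should the client handle this situation correctly?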

    opened by song-git 18
  • Help: workflow support for long-connection push services



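    1. Support long-lived TCP connections; supporting websocket as well would be even better;
    2. The server can actively push messages while the client is online (not the request/response round trips of a client heartbeat);
    3. When the client's network is abnormally disconnected and the server pushes a message to that client, workflow can actively detect the disconnected state (the network layer actively reads the socket fd and then asynchronously calls back into the application layer?);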
    opened by cxxjava 15
  • kafka keeps entering the callback




    Part of the source code:

    static void kafka_callback(WFKafkaTask *p_kafka)
    {
        //do something

        std::cout<<"create new task\n"<<std::endl;
        WFKafkaTask *p_next = g_kafka_client.create_kafka_task(WATCH_TOPIC, 3, kafka_callback);

    }

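    My understanding is that after a kafka task is created, kafka_callback is entered only once a message has been produced on the specified topic; the message is then processed, and afterwards a new task is created to keep listening for messages. But currently kafka_callback is entered over and over even though no message has been produced.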

    opened by MrStack 14
  • For RESTful API needs, the wfrest project is recommended


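    wfrest is a RESTful API project based on workflow, developed by a long-time fan of the workflow project. After more than half a year of evolution, it is fairly complete, covers most web development needs, and has built a stable user base. For detailed usage, see the project homepage: https://github.com/wfrest/wfrest. We also welcome you to open-source other projects based on workflow.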

    opened by Barenboim 0
  • HTTPS anomaly on Windows


    #include <stdio.h>
    #include "workflow/WFHttpServer.h"

    int main()
    {
    	WFHttpServer server([](WFHttpTask* task) {
    		task->get_resp()->append_output_body("<html>Hello World!</html>");
    	});

    	if (server.start(8888, "server.crt", "server.key") == 0) { // start server on port 8888
    		getchar(); // press "Enter" to end.
    		server.stop();
    	}

    	return 0;
    }

    Test tool: postman; OpenSSL version 1.1.1c; server.key and server.crt come from test/http_unittest.cc. SSL certificate verification is disabled in postman's global settings.

    1. Scenario 1:


    • Problem 1: the SSL version of the current request must be selected before the request succeeds and returns a response.

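    • Problem 2: (1) when the request is sent a second time with the http headers removed, there is no proper response, and postman reports "error: socket hang up"; (2) creating a new request gets a normal response; (3) after restarting the http service, the first request without http headers gets a normal response.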

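    2. Scenario 2: when the same code is compiled on Linux with v0.10.1 and the same request steps are followed, requests and responses work normally without setting the SSL protocol version. The Windows problem does not reproduce.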
    opened by xuebing1995 6
  • What are the best practices for machine-learning scenarios (practical case recommendations)?



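    The server is a CTR prediction service implemented with PyTorch C++. Before prediction, it reads features from a KV store (similar to redis) as input for CTR prediction. A single client request may read the KV store once or twice.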

    The detailed flow is roughly: receive the request body -> parse -> read KV -> generate the model input -> CTR model -> calibration model -> other logic -> return.


    After looking at the demos, I wrote roughly this logic:

    static WFFacilities::WaitGroup wait_group(1);

    void sig_handler(int signo) { wait_group.done(); }

    void process(WFHttpTask *server_task);

    int main(int argc, char *argv[]) {
      // ....   something init
      unsigned short port = 8083;
      signal(SIGINT, sig_handler);
      signal(SIGTERM, sig_handler);
      WFHttpServer server(process);
      if (server.start(port) == 0) {
        wait_group.wait();
        server.stop();
      } else {
        perror("Cannot start server");
      }
      return 0;
    }

    void process(WFHttpTask *server_task) {
      protocol::HttpRequest *req = server_task->get_req();
      protocol::HttpResponse *resp = server_task->get_resp();
      long long seq = server_task->get_task_seq();
      std::string body;
      auto uri = req->get_request_uri();
      if (std::strcmp(uri, "/cvr") == 0) {
        const void *body;
        size_t size;
        req->get_parsed_body(&body, &size);
        std::string req_body = static_cast<const char *>(body);
        std::string response_body = "";
        MODL_MANAGER->predict(std::move(req_body),  response_body);
        resp->append_output_body(response_body.data(), response_body.size());
      } else {


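    MODL_MANAGER->predict(std::move(req_body), response_body); implements both the wait for the KV read and the model prediction. But the KV read can get stuck (I have looked at this for a long time and still don't quite understand the problem). I want to read KV without occupying a thread, and I am not sure how best to implement that. After some discussion, it seems I have been using workflow wrongly from the start: the logic should have been split into separate tasks from the beginning. So I wonder whether there is a best practice for this.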

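    To add: workflow has been in production for over a year, with no problems at all before I needed to read KV. With a 50 ms budget for prediction, many approaches I tried before could not solve the timeouts well, but workflow could..... Currently p99 is 30 ms, and with the CPU fully loaded there are basically no timeouts.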

    opened by rockyzhengwu 16
  • Full-duplex communication (channel) based on wf's existing architecture


    The goal is to let wf better support full-duplex communication protocols such as websocket, quic and http2.

    Idea: https://github.com/sogou/workflow/issues/833

    Initial implementation of the channel logic: https://github.com/gnblao/workflow/tree/channel

    The two existing websocket demos: server: /tutorial/tutorial-22-ws_echo_server.cc client: /tutorial/tutorial-14-websocket_cli.cc

    wss server: ./ws_echo_server 5679 server.crt server.key
    (Note: openssl req -new -x509 -keyout server.key -out server.crt -config openssl.cnf) client: ./websocket_cli wss://



    new idea 
    opened by gnblao 22
  • The workflow framework now has a built-in json parser


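    To integrate with Consul service governance, we needed json parsing. I surveyed several mainstream json parsers, was not really satisfied with any of them, and did not want to add a third-party library dependency. So during the Chinese New Year holiday I spent a few days going through the json standard and implemented an ANSI-C json-parser that fully supports the ECMA-404 json standard. Later I also added some json building features. The main interfaces are as follows: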

    json value interfaces

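    /* Parse a json document into a json value. Returns the json value object; NULL means parsing failed (non-standard format, nesting too deep, or memory allocation failure).
       @doc: document string */
    json_value_t *json_value_parse(const char *doc);
    /* Destroy a json value.
       @val: json value object, usually produced by the parse function. */
    void json_value_destroy(json_value_t *val);
    /* Return the type of a json value.
       @val: json value object */
    int json_value_type(const json_value_t *val);
    /* Get a json string. Returns the address of the string; NULL means the value is not of type STRING.
       @val: json value object */
    const char *json_value_string(const json_value_t *val);
    /* Get a json number. Returns the numeric value; if the value is not of type NUMBER, returns NAN (not-a-number).
       @val: json value object */
    double json_value_number(const json_value_t *val);
    /* Get a json object. Returns the object; if the value is not of type OBJECT, returns NULL.
       @val: json value object
       Note that the returned json_object_t pointer is not const. */
    json_object_t *json_value_object(const json_value_t *val);
    /* Get a json array. Returns the array; if the value is not of type ARRAY, returns NULL.
       @val: json value object
       Likewise, the returned json_array_t pointer is not const. */
    json_array_t *json_value_array(const json_value_t *val);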

    json object interfaces

    /* Return the size of an object, i.e. the number of name/value pairs it contains.
       @obj: json object */
    int json_object_size(const json_object_t *obj);
    /* Look up the value under a name. Returns the json value object; NULL means the name was not found. Time complexity is O(log(size)).
       @name: the name to look up
       @obj: json object */
    const json_value_t *json_object_find(const char *name, const json_object_t *obj);
    /* Iterate over a json object.
       @name: temporary name string of type const char *
       @val: temporary json value object of type const json_value_t *
       @obj: json object
       This is not a function but a macro that expands into a for loop. */
    json_object_for_each(name, val, obj)

    json array interfaces

    /* Return the size of a json array, i.e. the number of elements.
       @arr: json array */
    int json_array_size(const json_array_t *arr);
    /* Iterate over a json array.
       @val: temporary json value object of type const json_value_t *
       @arr: json array
       Likewise, this is a macro that expands into a for loop. */
    json_array_for_each(val, arr)




    For usage examples, see this standalone repo: https://github.com/barenboim/json-parser. The master branch of workflow now includes this json parser, so you can use it directly in your code.


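    In our measurements, parsing is about 1.5 times as fast as cJSON and about 10 times as fast as jsoncpp, but far slower than rapidjson and simdjson. cJSON's parse process does not build a lookup table, so for large json objects our find performance is far higher than cJSON's.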

    documentation enhancement 
    opened by Barenboim 9
  • v0.10.2(Jun 26, 2022)

    New Features

    • Add WFModuleTask.
    • Add round-robin upstream policy.
    • Add named conditional to support observer mode.
    • Support creating a go task with running time limit.

    Improvements

    • Use sysconf() to get max open files. Reduce memory usage when it's below 65536.
    • Optimize redis parser to make it safer.
    • Optimize thread pool by using message queue. Reduce the overhead of computing task.

    Bug Fixes

    • Fix bugs to make it fully compatible with BoringSSL.
    • Fix a double-deletion bug when canceling a server task's series.
    • Fix Kafka client bugs.
    • Fix upstream UPSVNSWRRPolicy bug.
  • v0.10.1(Apr 29, 2022)

    New Features

    • Add a built-in json parser
    • Add Consul client (prepared for Consul service governance)
    • Add MySQLUtil
    • Add package wrapper


    • Refactor Kafka client and fix some bugs
    • Refactor service governance task and add 'pre_select' for service governance plugins

    Bug Fixes

    • Fix MySQLCell::as_float bug when the value is a negative number.
    • Update CMake and Bazel build files to fix Kafka task's canceling bug
  • v0.9.11(Mar 9, 2022)


    • Optimize upstream with consistent hash. Allow adding server with weight.
    • Improve http protocol compatibility of http client task.
    • Improve bazel building.
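The v0.9.11 entry above pairs consistent hashing with server weights. A common way to combine the two is to give each server a number of virtual nodes on the hash ring proportional to its weight. The C++ sketch below assumes that approach; HashRing, its 40 virtual nodes per unit of weight, and the FNV-1a hash are illustrative choices, not Workflow's upstream implementation.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// 32-bit FNV-1a, picked only because it is short and deterministic.
std::uint32_t fnv1a(const std::string& s)
{
    std::uint32_t h = 2166136261u;
    for (unsigned char c : s)
    {
        h ^= c;
        h *= 16777619u;
    }
    return h;
}

// Hypothetical weighted hash ring; not Workflow's upstream implementation.
// A server with weight w gets w * kVirtualNodes points on the ring, so it
// owns roughly a w-proportional share of the key space.
// If two points collide, the later add() wins that point.
class HashRing
{
public:
    void add(const std::string& server, int weight)
    {
        assert(weight >= 0);
        for (int i = 0; i < weight * kVirtualNodes; i++)
            ring_[fnv1a(server + "#" + std::to_string(i))] = server;
    }

    void remove(const std::string& server)
    {
        for (auto it = ring_.begin(); it != ring_.end(); )
        {
            if (it->second == server)
                it = ring_.erase(it);
            else
                ++it;
        }
    }

    // Returns the owner of the first ring point at or after the key's hash,
    // wrapping around to the start; empty string if the ring is empty.
    std::string select(const std::string& key) const
    {
        if (ring_.empty())
            return "";

        auto it = ring_.lower_bound(fnv1a(key));
        if (it == ring_.end())
            it = ring_.begin();

        return it->second;
    }

private:
    static constexpr int kVirtualNodes = 40;
    std::map<std::uint32_t, std::string> ring_;
};
```

Removing a server only remaps the keys it owned: every other key still finds the same nearest point, since no points were added and only the removed server's points disappeared.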

    Bug Fixes

    • Make UpstreamManager::upstream_delete() safe when the upstream is still in use.
    • Fix the SSL delay problem caused by Nagle's algorithm.
    • Fix a bug where the port in the URL could not be used with upstream.
    • Fix Kafka client offset timestamp overflow.
    • Fix Kafka client freezing problem when broker toggles the leader.
  • v0.9.10(Jan 21, 2022)


    • Refactor encode stream. This will improve the performance of Redis and Kafka clients.
    • Optimize SSL write. Improve the performance of HTTPS client and server a lot.
    • Optimize weighted-random upstream policy to fix some recovery problems.
    • Update CMake files to support more platforms. Build both static and dynamic libs.

    Bug Fixes

    • Fix nvswrr upstream policy bug.
    • Fix MySQL client crash on incomplete result sets.
    • Fix Kafka out of range error on fetching.
    • Fix Kafka cgroup bug.
    • Fix SSL wrapper problem on TLS 1.3 handshaking.
  • v0.9.9(Dec 3, 2021)


    • Optimize DNS cache's lookup speed and memory usage
    • Optimize route manager and DNS resolver
    • Increase server task's performance by reducing some atomic operations
    • Increase global performance by removing some singletons
    • Always use DNS resolver as name service policy when redirecting
    • Add WFServer::get_listen_addr() for servers started on a random port
    • Support Kafka KIP-329

    Bug Fixes

    • Fix service governance's ref count bug
    • Fix MySQL sequence id bug when retrying a big request
    • Fix VNSWRR upstream bug
    • Fix DNS client bug when host name has a trailing dot
    • Fix URL parser fatal bug
  • v0.9.8(Sep 30, 2021)


    • Enable creating file IO tasks with path name
    • Add server task's push() interface
    • Optimize poller speed and memory usage
    • Optimize URI parser, more than 50% faster
    • Optimize HTTP implementation

    Bug Fixes

    • Fix crash when resolv.conf is empty
    • Fix Kafka client's memory leak
    • Fix MySQL transaction checking
    • Fix bazel compiling problem
  • v0.9.7(Aug 8, 2021)


    • Implement DNS protocol and add DNS asynchronous client.
    • Use asynchronous DNS as default.
    • Optimize load balancing.
    • Add bazel support and add selective compiling.
    • Support longer timer.
    • Add WFResourcePool.

    Bug fixes

    • Fix Redis double SELECTs problem.
    • Fix upstream_replace_server() bug.
    • Fix timerfd problem on some WSL platforms.
  • v0.9.6(Jun 3, 2021)


    • Add SSLWrapper.
    • Support HTTP/HTTPS tasks with proxy.
    • Support MySQL SSL client.
    • Add vnswrr upstream policy.
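VNSWRR is generally expanded as virtual-node smooth weighted round-robin. As an assumption, the sketch below shows only the underlying smooth weighted round-robin (SWRR) selection step popularized by nginx, not Workflow's UPSVNSWRRPolicy; SwrrServer and swrr_select() are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical server entry for this sketch; not Workflow's upstream types.
struct SwrrServer
{
    int weight;   // configured weight, assumed > 0
    int current;  // running "current weight", start it at 0
};

// Smooth weighted round-robin selection. Each call adds every server's
// weight to its current weight, picks the largest current weight (ties go
// to the lowest index), then subtracts the total weight from the winner.
// Returns the chosen index, or -1 for an empty list.
int swrr_select(std::vector<SwrrServer>& servers)
{
    int total = 0;
    int best = -1;

    for (std::size_t i = 0; i < servers.size(); i++)
    {
        servers[i].current += servers[i].weight;
        total += servers[i].weight;
        if (best < 0 || servers[i].current > servers[best].current)
            best = static_cast<int>(i);
    }

    if (best >= 0)
        servers[best].current -= total;

    return best;
}
```

With weights 5, 1 and 1 for servers a, b and c, seven picks yield a, a, b, a, c, a, a: the heavy server still gets five of every seven picks, but the light servers are interleaved instead of all five picks of a arriving in a burst.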

    Bug fixes

    • Fix upstream concurrency bug.
    • Fix MySQL multi-resultset for INSERTs
    • Fix Kafka client SASL auth bug.
    • Add -fno-rtti compiling flag for Kafka to be compatible with snappy 1.1.9.
  • v0.9.5(Apr 12, 2021)


    • Support TLS SNI on both client and server sides;
    • Upstream skips select history;
    • Kafka supports sasl auth;

    Bug Fixes

    • Fix default port bug;
    • Fix MySQL decode overflow bug;
    • Fix MySQL parsing of suffixed ok_packet;
    • Modify the logic of Kafka versionapi;
  • v0.9.4(Mar 17, 2021)


    • Add WFNameService and refactor "Upstream" modules.
    • Update the definition of WFServer::stop()'s finish time.
    • Kafka client supports offset storage.
    • Redis supports cluster command MOVED and ASK.
    • Support vcpkg.

    Bug fixes

    • Crash when dismissing a named counter.
    • WFGoTask implementation.
    • MySQL int/ulonglong length overflow.
  • v0.9.3(Jan 13, 2021)


    • Add Kafka client.
    • Improve client tasks performance.

    Bug fixes:

    • Fix several MySQL parser bugs.
    • Fix iovcnt==0 problem on macOS.
  • v0.9.2(Nov 13, 2020)


    • Add WFGraphTask for building DAG.
    • Add WFDynamicTask.
    • Make SeriesWork derivable.
    • Improve MySQL client.

    Bug Fixes:

    • Fix mysql protocol parsing bug.
    • Fix EncodeStream bug.

    Last release before Kafka protocol support.

  • v0.9.1(Sep 30, 2020)


    • Complete English documents.
    • Optimize kernel codes. The message queue is a standalone module now.
    • Support MySQL character_set_results.
    • Add benchmark codes and documents.

    Bug Fixes:

    • Fix crashing of MySQL client when the local host is disallowed.
    • Fix MySQL client's problem when using short connection.
    • Fix LRU cache bug when cache is full.
    • Fix upstream division-by-zero bug.
  • v0.9.0(Aug 17, 2020)

C++14 evented IO libraries for high performance networking and media based applications

LibSourcey C++ Networking Evolved LibSourcey is a collection of cross platform C++14 modules and classes that provide developers with an arsenal for r

Sourcey 1.2k Aug 1, 2022
Idle is an asynchronous and hot-reloadable C++ dynamic component framework

Idle is an asynchronous, hot-reloadable, and highly reactive dynamic component framework similar to OSGI that is: Modular: Your program logic is en

Denis Blank 165 Jul 28, 2022
JUCE is an open-source cross-platform C++ application framework for desktop and mobile applications, including VST, VST3, AU, AUv3, RTAS and AAX audio plug-ins.

JUCE is an open-source cross-platform C++ application framework used for rapidly developing high quality desktop and mobile applications, including VS

JUCE 4.4k Aug 3, 2022
A toolkit for making real world machine learning and data analysis applications in C++

dlib C++ library Dlib is a modern C++ toolkit containing machine learning algorithms and tools for creating complex software in C++ to solve real worl

Davis E. King 11.3k Aug 6, 2022
EASTL stands for Electronic Arts Standard Template Library. It is an extensive and robust implementation that has an emphasis on high performance.

EA Standard Template Library EASTL stands for Electronic Arts Standard Template Library. It is a C++ template library of containers, algorithms, and i

Electronic Arts 6.6k Aug 10, 2022
An open-source C++ library developed and used at Facebook.

Folly: Facebook Open-source Library What is folly? Folly (acronymed loosely after Facebook Open Source Library) is a library of C++14 components desig

Facebook 22.8k Aug 5, 2022
Functional Programming Library for C++. Write concise and readable C++ code.

FunctionalPlus helps you write concise and readable C++ code. Table of contents Introduction Usage examples Type deduction and useful error messages T

Tobias Hermann 1.7k Aug 9, 2022
An eventing framework for building high performance and high scalability systems in C.

NOTE: THIS PROJECT HAS BEEN DEPRECATED AND IS NO LONGER ACTIVELY MAINTAINED As of 2019-03-08, this project will no longer be maintained and will be ar

Facebook Archive 1.7k Aug 8, 2022
Easy to use, header only, macro generated, generic and type-safe Data Structures in C

C Macro Collections Easy to use, header only, macro generated, generic and type-safe Data Structures in C. Table of Contents Installation Contributing

Leonardo Vencovsky 312 Aug 8, 2022
A collection of single-file C libraries. (generic containers, random number generation, argument parsing and other functionalities)

cauldron A collection of single-file C libraries and tools with the goal to be portable and modifiable. Libraries library description arena-allocator.

Camel Coder 32 Jul 18, 2022
C++ Parallel Computing and Asynchronous Networking Engine

As Sogou's C++ server engine, Sogou C++ Workflow supports almost all back-end C++ online services of Sogou, including all search services, cloud input method, online advertisements, etc., handling more than 10 billion requests every day. This is an enterprise-level programming engine in light and elegant design which can satisfy most C++ back-end development requirements.

Sogou-inc 8.9k Aug 8, 2022
C++ Parallel Computing and Asynchronous Networking Engine

As Sogou's C++ server engine, Sogou C++ Workflow supports almost all back-end C++ online services of Sogou, including all search services, cloud input method, online advertisements, etc., handling more than 10 billion requests every day

Sogou Open Source 8.8k Aug 5, 2022
Parallel-util - Simple header-only implementation of "parallel for" and "parallel map" for C++11

parallel-util A single-header implementation of parallel_for, parallel_map, and parallel_exec using C++11. This library is based on multi-threading on

Yuki Koyama 27 Jun 24, 2022
ParallelComputingPlayground - Shows different programming techniques for parallel computing on CPU and GPU

ParallelComputingPlayground Shows different programming techniques for parallel computing on CPU and GPU. Purpose The idea here is to compute a Mandel

Morten Nobel-Jørgensen 2 May 16, 2020
Asynchronous networking for C

Overview Dyad.c is an asynchronous networking library which aims to be lightweight, portable and easy to use. It can be used both to create small stan

null 1.3k Aug 5, 2022
SMID, Parallel computing of CNN

Parallel Computing in Deep Reference Network 1. Introduction Deep neural networks are made up of a number of layers of linked nodes, each of which imp

null 1 Dec 22, 2021
RakNet is a cross platform, open source, C++ networking engine for game programmers.

RakNet 4.081 Copyright (c) 2014, Oculus VR, Inc. Package notes The Help directory contains index.html, which is full help documentation in HTML format

Facebook Archive 3.1k Aug 2, 2022
Lightweight Peer-to-Peer networking engine for real time applications

Club A lightweight Peer-to-Peer networking engine for real time applications written in C++14. Motivation Real time applications such as Online games,

Peter 187 Mar 3, 2022
The Synapse neural networking engine rewritten in C++

This is an open-source rewrite of Synapse, the closed-source neural networking engine for C++ This project is a work-in-progress. Code is subject to c

Noah Taylor 2 Jul 24, 2022
Powerful multi-threaded coroutine dispatcher and parallel execution engine

Quantum Library: A scalable C++ coroutine framework Quantum is a full-featured and powerful C++ framework built on top of the Boost coroutine library

Bloomberg 447 Jul 25, 2022
C++-based high-performance parallel environment execution engine for general RL environments.

EnvPool is a highly parallel reinforcement learning environment execution engine which significantly outperforms existing environment executors. With

Sea AI Lab 571 Aug 5, 2022
HashLibPlus is a recommended C++11 hashing library that provides a fluent interface for computing hashes and checksums of strings, files, streams, bytearrays and untyped data to mention but a few.

HashLibPlus HashLibPlus is a recommended C++11 hashing library that provides a fluent interface for computing hashes and checksums of strings, files,

Telepati 7 Apr 11, 2022
Experimental and Comparative Performance Measurements of High Performance Computing Based on OpenMP and MPI

High-Performance-Computing-Experiments Experimental and Comparative Performance Measurements of High Performance Computing Based on OpenMP and MPI 实验结

Jiang Lu 1 Nov 27, 2021
KoanLogic 385 Aug 9, 2022
Small and fast cross-platform networking library, with support for messaging, IPv6, HTTP, SSL and WebSocket.

frnetlib Frnetlib, is a cross-platform, small and fast networking library written in C++. There are no library dependencies (unless you want to use SS

Fred Nicolson 22 May 16, 2022
Patterns and behaviors for GPU computing

moderngpu 2.0 (c) 2016 Sean Baxter You can drop me a line here Full documentation with github wiki under heavy construction. Latest update: 2.12 2016

null 1.3k Aug 8, 2022
A C library for statistical and scientific computing

Apophenia is an open statistical library for working with data sets and statistical or simulation models. It provides functions on the same level as t

null 184 Jun 17, 2022
C++ tensors with broadcasting and lazy computing

Multi-dimensional arrays with broadcasting and lazy computing. Introduction xtensor is a C++ library meant for numerical analysis with multi-dimension

Xtensor Stack 2.7k Aug 6, 2022