C++ Parallel Computing and Asynchronous Networking Engine



Sogou C++ Workflow


As Sogou's C++ server engine, Sogou C++ Workflow supports almost all of Sogou's back-end C++ online services, including all search services, the cloud input method and online advertising, handling more than 10 billion requests every day. It is an enterprise-level programming engine with a light and elegant design that can satisfy most C++ back-end development requirements.

You can use it:

  • To quickly build an HTTP server:
#include <stdio.h>
#include "workflow/WFHttpServer.h"

int main()
{
    WFHttpServer server([](WFHttpTask *task) {
        task->get_resp()->append_output_body("Hello World!");
    });

    if (server.start(8888) == 0) { // start server on port 8888
        getchar(); // press "Enter" to end.
        server.stop();
    }

    return 0;
}
  • As a multifunctional asynchronous client, it currently supports HTTP, Redis, MySQL and Kafka protocols.
  • To implement client/server on user-defined protocol and build your own RPC system.
    • srpc is based on it and it is an independent open source project, which supports srpc, brpc, trpc and thrift protocols.
  • To build asynchronous workflows; it supports common series and parallel structures, as well as any DAG structure.
  • As a parallel computing tool. In addition to networking tasks, Sogou C++ Workflow also includes the scheduling of computing tasks. All types of tasks can be put into the same flow.
  • As an asynchronous file IO tool on Linux systems, with high performance exceeding any system call. Disk file IO is also a task.
  • To realize any high-performance and high-concurrency back-end service with a very complex relationship between computing and networking.
  • To build a micro service system.
    • This project has built-in service governance and load balancing features.
    • The workflow-k8s plugin enables using name service with kubernetes automated deployment.
  • Wiki link: PaaS Architecture

Compiling and running environment

  • This project supports Linux, macOS, Windows, Android and other operating systems.
    • Windows version is currently released as an independent branch, using iocp to implement asynchronous networking. All user interfaces are consistent with the Linux version.
  • Supports all CPU platforms, including 32-bit or 64-bit x86 processors, big-endian or little-endian ARM processors, and Loongson processors.
  • Relies on OpenSSL; OpenSSL 1.1 or above is recommended. If you do not want SSL, you can check out the nossl branch, but you still need to link crypto for MD5 and SHA1.
  • Uses the C++11 standard and therefore, it should be compiled with a compiler which supports C++11. Does not rely on boost or asio.
  • No other dependencies. However, if you need Kafka protocol, some compression libraries should be installed, including lz4, zstd and snappy.

Get started (Linux, macOS):

git clone https://github.com/sogou/workflow
cd workflow && make
cd tutorial && make


System design features

We believe that a typical back-end program = protocol + algorithm + workflow, and that the three should be developed completely independently.

  • Protocol
    • In most cases, users use built-in common network protocols, such as HTTP, Redis or various RPC protocols.
    • Users can also easily define their own network protocol. To do so, they only need to provide serialization and deserialization functions to define their own client/server.
  • Algorithm
    • In our design, the algorithm is a concept symmetrical to the protocol.
      • If a protocol call is an RPC, then an algorithm call is an APC (Async Procedure Call).
    • We have provided some general algorithms, such as sort, merge, psort, reduce, which can be used directly.
    • Compared with a user-defined protocol, a user-defined algorithm is much more common. Any complicated computation with clear boundaries should be packaged into an algorithm.
  • Workflow
    • Workflow is the actual business logic: it puts the protocols and algorithms into the flow graph for use.
    • The typical workflow is a closed series-parallel graph. Complex business logic may be a non-closed DAG.
    • The workflow graph can be constructed directly or dynamically generated based on the results of each step. All tasks are executed asynchronously.

Basic task, task factory and complex task

  • Our system contains six basic tasks: networking, file IO, CPU, GPU, timer, and counter.
  • All tasks are generated by the task factory and automatically recycled after callback.
    • Server task is one kind of special networking task, generated by the framework which calls the task factory, and handed over to the user through the process function.
  • In most cases, the task generated by the user through the task factory is a complex task, which is transparent to the user.
    • For example, an HTTP request may include many asynchronous processes (DNS, redirection), but for the user, it is just a networking task.
    • File sorting seems to be an algorithm, but it actually includes many complex interaction processes between file IO and CPU computation.
    • If you think of business logic as building circuits with well-designed electronic components, then each electronic component may be a complex circuit.

Asynchrony and encapsulation based on C++11 std::function

  • Not based on user mode coroutines. Users need to know that they are writing asynchronous programs.
  • All calls are executed asynchronously, and there is almost no operation that occupies a thread.
    • Although we also provide some facilities with semi-synchronous interfaces, they are not core features.
  • We try to avoid requiring users to derive classes, and encapsulate user behavior with std::function instead, including:
    • The callback of any task.
    • Any server's process. This conforms to the FaaS (Function as a Service) idea.
    • The realization of an algorithm is simply a std::function. But the algorithm can also be implemented by derivation.

Memory reclamation mechanism

  • Every task will be automatically reclaimed after the callback. If a task is created but a user does not want to run it, the user needs to release it through the dismiss method.
  • Any data in the task, such as the response of the network request, will also be recycled with the task. At this time, the user can use std::move() to move the required data.
  • SeriesWork and ParallelWork are two kinds of framework objects, which are also recycled after their callback.
    • When a series is a branch of a parallel, it will be recycled after the callback of the parallel that it belongs to.
  • This project doesn’t use std::shared_ptr to manage memory.
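
The reclamation rule above can be illustrated with a small standard-library-only sketch. FakeTask, run_and_reclaim and fetch_and_keep are hypothetical names invented for this illustration and are not part of the workflow API; the sketch only mimics the lifecycle "create, call back, reclaim" and shows why data has to be moved out inside the callback.

```cpp
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for a framework task that owns its response data.
struct FakeTask {
    std::string resp_body;
};

// Mimics the lifecycle described above: create, run the callback, reclaim.
inline void run_and_reclaim(const std::function<void(FakeTask *)>& callback)
{
    FakeTask *task = new FakeTask{"Hello World!"};
    callback(task);   // the task pointer is valid only until the callback returns
    delete task;      // reclaimed by the "framework"; no shared_ptr involved
}

// Keeps the response beyond the task's lifetime by moving it out in the callback.
inline std::string fetch_and_keep()
{
    std::string kept;
    run_and_reclaim([&kept](FakeTask *task) {
        kept = std::move(task->resp_body);
    });
    return kept;
}
```

Calling fetch_and_keep() returns the response body even though the task that produced it has already been deleted, which is the pattern the std::move() advice above is aiming at.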

Any other questions?

You may check the FAQ and issues list first to see if you can find the answer.

You are very welcome to report the problems you run into as issues, and we will answer them as soon as possible. More issues also help new users.

  • FAQ (continuously updated)



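    The C++ Workflow project originated as the communication engine of Sogou's distributed storage project, and grew into Sogou's company-wide C++ standard, used by most of Sogou's C++ back-end services. The project unifies communication and computation, helping users build high-performance services in which the relationship between communication and computation is very complex. At the same time, users can also treat it simply as a lightweight asynchronous networking engine or a parallel computing framework.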



    $ git clone https://github.com/sogou/workflow
    $ cd workflow
    $ make
    $ cd tutorial
    $ make


    $ make KAFKA=y
    $ cd tutorial
    $ make KAFKA=y

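    In addition, make DEBUG=y builds a debug version. With make REDIS=n MYSQL=n UPSTREAM=n CONSUL=n you can strip out one or more features, shrinking the library file to as little as 400KB, which makes it better suited to embedded development.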


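    • Simple and easy to pick up, with no dependencies
    • Excellent performance and stability (benchmark)
    • Rich implementations of common protocols
    • Unified communication and computation
    • Task flow management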


    • Simple to use
    • Has networking


    • pipeline server
    • UDP server (UDP client is supported)
    • HTTP/2
    • websocket (the websocket client is already implemented; see the websocket branch)




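    We use callback and process of type C++11 std::function to wrap user behavior, so users need to know that they are writing asynchronous programs. We believe the callback approach gives programs higher efficiency than futures or user-mode coroutines, and that it achieves the unification of communication and computation well. Thanks to the way we encapsulate tasks and the convenience of std::function, using callbacks in our framework carries little mental burden; on the contrary, it is very simple and clear.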


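    One feature of the project is that the framework manages threads. Except in some very special cases, the thread that calls a callback is always either a handler thread that processes network send/receive and file IO results (20 by default) or a computing thread (by default, as many as the total number of CPU cores). Whichever thread it runs in, waiting or running particularly complex computation inside a callback is not recommended. If you need to wait, use a counter task for a wait that does not occupy a thread; complex computation should be wrapped into a computing task.

    Note that every resource in the framework is allocated on use. If the user never uses network communication, none of the communication-related threads are created.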


    int main(void)
    {
        return 0;
    }


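    WFFacilities::WaitGroup wait_group(1);

    void callback(WFHttpTask *task)
    {
        wait_group.done();   // wake up main()
    }

    int main(void)
    {
        WFHttpTask *task = WFTaskFactory::create_http_task(url, 0, 0, callback);
        task->start();
        wait_group.wait();   // block until the callback has run
        return 0;
    }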



    void http_callback(WFHttpTask *task)
    {
        protocol::HttpResponse *resp = task->get_resp();
        protocol::HttpResponse *my_resp = new protocol::HttpResponse(std::move(*resp));
        /* or
        protocol::HttpResponse *my_resp = new protocol::HttpResponse;
        *my_resp = std::move(*resp);
        */
    }

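    In some cases, if the user has created a task but no longer wants to start it, task->dismiss() must be called to destroy the task directly. It must be stressed that a server's process function is not a callback; a server task's callback happens after the reply has completed, and defaults to nullptr.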



    • A series is composed of tasks
    • A parallel is composed of series
    • A parallel is a kind of task



    You can use WFGraphTask, or construct it yourself with WFCounterTask. Example: https://github.com/sogou/workflow/blob/master/tutorial/tutorial-11-graph_task.cc


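    No. The server replies to the request once the series containing the server task has no other tasks left. If you do not add any task to this series, that is equivalent to replying when process ends. Be careful not to wait for a task to complete inside process; add the task to the series instead.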


    The wrong way is to sleep directly in process. The right way is to add a timer task to the series that the server task is in. Taking an http server as an example:

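    void process(WFHttpTask *server_task)
    {
        WFTimerTask *timer = WFTaskFactory::create_timer_task(100000, nullptr);   // 100000 microseconds
        series_of(server_task)->push_back(timer);   // the reply is sent after the timer fires
    }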

    The code above implements an http server with a 100-millisecond delay. Everything runs asynchronously, and no thread is occupied while waiting.


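    First, a successful reply is defined as the data having been successfully written into the TCP buffer. So if the reply packet is small and the client has not closed the connection because of a timeout or a similar reason, the reply can almost always be considered successful. To check the result of a reply, just set a callback on the server task; the status codes and error codes in the callback are defined the same way as for a client task, except that a server task never sees DNS errors.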


    Yes. Call the server task's noreply() method at any time, and the connection is simply closed at the moment the reply would otherwise have been sent.



    Why there is no need to establish a connection first when using the redis client

    First, look at the interface for creating a redis client task:

    class WFTaskFactory
    {
    public:
        static WFRedisTask *create_redis_task(const std::string& url, int retry_max, redis_callback_t callback);
    };

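    The url format is redis://:password@host:port/dbnum. port defaults to 6379 and dbnum defaults to 0. An important feature of workflow is that the framework manages connections, which keeps the user interface extremely concise and achieves the most effective connection reuse. The framework looks for a reusable connection based on the task's username, password and dbnum. If none is found, it opens a new connection and performs user login, database selection and similar steps. If the host is new, DNS resolution is also performed. A failed request may also be retried. Every one of these steps is asynchronous and transparent; the user only needs to fill in the request and start the task, and the result of the request is available in the callback. The one thing to note is that every task must be created with the password, because a login may be needed at any time.

    The same approach can be used to create mysql tasks. For mysql with transaction requirements, however, tasks must be created through WFMySQLConnection; otherwise there is no guarantee that the whole transaction runs on the same connection. WFMySQLConnection still keeps the connection and authentication process asynchronous.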
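
To make the URL defaults concrete, here is a minimal standard-library-only sketch that splits a URL of the assumed form redis://:password@host:port/dbnum into its parts, falling back to port 6379 and dbnum 0. RedisTarget and parse_redis_url are hypothetical names for illustration; this is not how the framework itself parses URLs, and malformed numbers are not handled (std::stoi would throw).

```cpp
#include <cstddef>
#include <string>

// Hypothetical holder for the parts of a redis URL.
struct RedisTarget {
    std::string password;
    std::string host;
    int port = 6379;   // default port
    int dbnum = 0;     // default database number
};

// Parses "redis://:password@host:port/dbnum".
// Returns false on a wrong scheme or an empty host.
inline bool parse_redis_url(const std::string& url, RedisTarget& out)
{
    out = RedisTarget{};
    const std::string scheme = "redis://";
    if (url.compare(0, scheme.size(), scheme) != 0)
        return false;

    std::string rest = url.substr(scheme.size());
    std::size_t slash = rest.find('/');
    std::string authority = rest.substr(0, slash);
    if (slash != std::string::npos && slash + 1 < rest.size())
        out.dbnum = std::stoi(rest.substr(slash + 1));

    // Everything before the last '@' is "user:password"; the user part may be empty.
    std::size_t at = authority.rfind('@');
    if (at != std::string::npos) {
        std::string userinfo = authority.substr(0, at);
        std::size_t colon = userinfo.find(':');
        if (colon != std::string::npos)
            out.password = userinfo.substr(colon + 1);
        authority = authority.substr(at + 1);
    }

    std::size_t colon = authority.rfind(':');
    if (colon != std::string::npos) {
        out.port = std::stoi(authority.substr(colon + 1));
        authority = authority.substr(0, colon);
    }
    out.host = authority;
    return !out.host.empty();
}
```

For example, parsing "redis://localhost" leaves the port at 6379 and dbnum at 0, while "redis://:secret@127.0.0.1:6380/2" yields the password, host, port and database number given in the URL.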



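    • If the same address and port has idle connections that meet the conditions, the most recently released one is chosen. In other words, idle connections are reused last in, first out.
    • When the current address and port has no idle connection that meets the conditions:
      • If the current number of concurrent connections is below the maximum (200 by default), a new connection is opened immediately.
      • If the number of concurrent connections has already reached the maximum, the task gets the system error EAGAIN.
    • Not every connection to the same target address and port meets the reuse conditions. For example, database connections under different usernames or passwords cannot be reused.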
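
The reuse rules above can be modeled in a few lines of standard C++. ConnPool is a hypothetical class written only for this illustration and is not the framework's implementation: it tracks a single address and port, treats connections as plain integer ids, and ignores the credential check from the last rule.

```cpp
#include <cerrno>
#include <cstddef>
#include <vector>

// Hypothetical model of the reuse rule for one address:port target.
class ConnPool {
public:
    explicit ConnPool(std::size_t max_conns) : max_conns_(max_conns) {}

    // Returns a connection id (>= 0), or -EAGAIN when the limit is reached.
    int acquire()
    {
        if (!idle_.empty()) {
            int id = idle_.back();   // most recently released connection first
            idle_.pop_back();
            return id;
        }
        if (total_ < max_conns_)
            return static_cast<int>(total_++);   // open a new connection
        return -EAGAIN;
    }

    // Puts a connection back on top of the idle stack.
    void release(int id) { idle_.push_back(id); }

private:
    std::size_t max_conns_;
    std::size_t total_ = 0;      // connections opened so far
    std::vector<int> idle_;      // used as a stack: last in, first out
};
```

With a limit of 2, a third acquire() returns -EAGAIN; after releasing connection 0 and then connection 1, the next acquire() hands back connection 1, the one released most recently.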






    How to access a chunked-encoded http body most efficiently


    #include "workflow/HttpUtil.h"

    void http_callback(WFHttpTask *task)
    {
        protocol::HttpResponse *resp = task->get_resp();
        protocol::HttpChunkCursor cursor(resp);
        const void *chunk;
        size_t size;

        while (cursor.next(&chunk, &size))
        {
            // chunk points at the current chunk's data; size is its length
        }
    }
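
For readers unfamiliar with the wire format that such a cursor walks over, the following standard-library-only sketch splits a chunked-encoded body into its chunks. decode_chunked is a hypothetical helper written for illustration; it is not the library's HttpChunkCursor, it copies data rather than pointing into the buffer, and it ignores trailers.

```cpp
#include <cstddef>
#include <exception>
#include <string>
#include <vector>

// Splits a chunked-encoded HTTP body into its chunks.
// Returns false if the input is malformed or truncated.
inline bool decode_chunked(const std::string& raw, std::vector<std::string>& chunks)
{
    std::size_t pos = 0;
    chunks.clear();
    while (true) {
        // Each chunk starts with a hexadecimal size line terminated by CRLF.
        std::size_t eol = raw.find("\r\n", pos);
        if (eol == std::string::npos)
            return false;

        std::size_t size = 0;
        try {
            size = std::stoul(raw.substr(pos, eol - pos), nullptr, 16);
        } catch (const std::exception&) {
            return false;
        }
        if (size == 0)
            return true;   // last chunk; trailers, if any, are ignored
        if (size > raw.size())
            return false;

        // The chunk data follows, again terminated by CRLF.
        std::size_t data = eol + 2;
        if (raw.size() < data + size + 2 || raw.compare(data + size, 2, "\r\n") != 0)
            return false;
        chunks.push_back(raw.substr(data, size));
        pos = data + size + 2;
    }
}
```

Given the body "5\r\nhello\r\n6\r\n world\r\n0\r\n\r\n", the function produces the two chunks "hello" and " world".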





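    Most commonly, tasks in the same series share the series context, which is read and modified through the series' get_context() and set_context() methods. A parallel, in its callback, can also obtain each series it contains through series_at() (the callbacks of these series have already been called, but the series are destroyed only after the parallel's callback), and so read their contexts. Since a parallel is also a kind of task, it can pass the aggregated result on through the context of the series it belongs to.

    In short, a series is a coroutine, and the series context is the coroutine's local variables. A parallel is coroutines running in parallel, and can aggregate the results of all of them.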




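    A server's stop() is a graceful shutdown, and every server must be stopped before the program ends. stop() consists of shutdown() and wait_finish(); wait_finish() waits for the series of every running server task to finish. This means you can keep appending tasks to the series in the callback that runs after a server task's reply completes, and stop() will wait for those tasks to finish. In addition, if you run several servers at once, the best way to shut them down is: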

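    int main()
    {
        // A server object cannot be started more than once, so serving several ports requires several server objects.
        WFRedisServer server1(process);
        WFRedisServer server2(process);
        server1.start(8080);   // example ports
        server2.start(8888);
        getchar(); // press Enter to end
        // Shut down all servers first, then wait.
        server1.shutdown();
        server2.shutdown();
        server1.wait_finish();
        server2.wait_finish();
        return 0;
    }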



    #include <string.h>
    #include <atomic>
    #include "workflow/WFHttpServer.h"

    extern void process(WFHttpTask *task);
    WFHttpServer server(process);

    void process(WFHttpTask *task) {
        if (strcmp(task->get_req()->get_request_uri(), "/stop") == 0) {
            static std::atomic<int> flag;
            if (flag++ == 0)
                server.shutdown();
            task->get_resp()->append_output_body("<html>server stop</html>");
            return;
        }

        /* Server's logic */
        //  ....
    }

    int main() {
        if (server.start(8888) == 0)
            server.wait_finish();

        return 0;
    }

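    The code above implements an http server that ends the program when it receives a /stop request. The flag in process is necessary, because process runs concurrently and only one thread may call the shutdown operation.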
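
The role of the flag can be isolated in a tiny standard-library-only sketch. claim_once is a hypothetical helper name used only here; it shows the guard on its own, without any server.

```cpp
#include <atomic>

// Returns true for exactly one caller, no matter how many threads call it concurrently,
// because the increment and the read of the old value happen as one atomic step.
inline bool claim_once(std::atomic<int>& flag)
{
    return flag.fetch_add(1) == 0;   // same effect as `flag++ == 0`
}
```

Only the first call on a given flag returns true; every later call returns false, so an action guarded by it (such as a shutdown) runs once.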



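    void other_callback(server_task, counter, ...)
    {
        counter->count();   // wakes the counter so the series can continue and the reply is sent
    }

    void process(WFHttpTask *server_task)
    {
        WFCounterTask *counter = WFTaskFactory::create_counter_task(1, nullptr);
        OtherAsyncTask *other_task = create_other_task(other_callback, server_task, counter); // a task from outside the workflow framework
        series_of(server_task)->push_back(counter);   // holds the reply until the counter fires
    }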




        struct WFGlobalSettings settings = GLOBAL_SETTINGS_DEFAULT;
        settings.endpoint_params.use_tls_sni = true;
        WORKFLOW_library_init(&settings);

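    Enabling this feature has a cost: SNI is turned on for every https site, and accesses to the same IP address under different domain names can no longer reuse SSL connections. Users can therefore also use the upstream feature to turn on SNI only for one specific domain name: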

    #include "workflow/UpstreamManager.h"

    int main()
    {
        UpstreamManager::upstream_create_weighted_random("www.sogou.com", false);
        struct AddressParams params = ADDRESS_PARAMS_DEFAULT;
        params.endpoint_params.use_tls_sni = true;
        UpstreamManager::upstream_add_server("www.sogou.com", "www.sogou.com", &params);
        return 0;
    }



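    Method 1 (http tasks only, and redirects are not possible): create the http task with the proxy server's address, then reset request_uri and the Host header. Suppose we want to access http://www.sogou.com/ through the proxy server www.proxy.com:8080; the method is as follows: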

    task = WFTaskFactory::create_http_task("http://www.proxy.com:8080", 0, 0, callback);
    task->get_req()->set_request_uri("http://www.sogou.com/");
    task->get_req()->set_header_pair("Host", "www.sogou.com");

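    Method 2 (general purpose, but some proxy servers only support HTTPS; for HTTP, method 1 is still recommended): create the http task through the interface that takes a proxy_url: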

    class WFTaskFactory
    {
    public:
        static WFHttpTask *create_http_task(const std::string& url,
                                            const std::string& proxy_url,
                                            int redirect_max, int retry_max,
                                            http_callback_t callback);
    };

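    The format of proxy_url is http://user:password@host:port/. The proxy URL can only start with "http://", not "https://". port defaults to 80. This method works for proxying both http and https URLs and supports redirects; the same proxy server continues to be used on redirect.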

    Does the http server support RESTful interfaces

    The wfrest project is recommended. It is a RESTful API development framework built on workflow. Project address: https://github.com/wfrest/wfrest

    opened by Barenboim 42
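  • Abnormal behavior of the Kafka client


    While using the Kafka client recently, I found the following abnormal behavior: 1. The client creates a Kafka client task (topic=xxx&api=fetch) every 200 milliseconds. 2. After the program has run for a while, it stops receiving messages (abnormal case). 3. When debugging with GDB, after hitting a breakpoint in the thread that creates the tasks, messages are received again (normal case).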



    opened by song-git 33
  • Can callbacks be recursive?


    I want to implement a scenario where a failed task is retried after a 60-second interval.

    void mysql_callback(WFMySQLTask *task)
    {
        ::Json json = mysql_concat_json_res(mysql_task);
        if (json.find("errcode") != json.end()) {
            WFTimerTask *timer = WFTaskFactory::create_timer_task(60, 0, [](WFTimerTask *) {
                WFMySQLTask *mysql_task = WFTaskFactory::create_mysql_task(url, 0, mysql_callback);
            });
        }
    }

    Is it OK to write it as above? Or is there some other best practice?

    opened by vincentqin-sys 32
  • benchmark: please add comparison data for larger connection counts

    I ran http_server locally. The machine is 4c8t, and I gave the server 4 threads.


    [email protected]:~/workflow/benchmark$ cat /etc/issue
    Ubuntu 20.04 LTS \n \l
    [email protected]:~/workflow/benchmark$ cat /proc/cpuinfo | grep processor
    processor	: 0
    processor	: 1
    processor	: 2
    processor	: 3
    processor	: 4
    processor	: 5
    processor	: 6
    processor	: 7
    [email protected]:~/workflow/benchmark$ ./http_server 4 9000 32


    [email protected]:~/wrk$ ./wrk --latency -d10 -c200 --timeout 8 -t4
    Running 10s test @
      4 threads and 200 connections
      Thread Stats   Avg      Stdev     Max   +/- Stdev
        Latency   574.13us    1.03ms  22.70ms   97.01%
        Req/Sec    98.28k     8.84k  131.17k    73.07%
      Latency Distribution
         50%  384.00us
         75%  579.00us
         90%    0.85ms
         99%    5.90ms
      3921936 requests in 10.10s, 647.06MB read
    Requests/sec: 388303.50
    Transfer/sec:     64.06MB
    [email protected]:~/workflow/benchmark$ top -d 1 | grep _server
      63161 ubuntu    20   0 1781400   5612   4548 S 276.0   0.1   0:02.76 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400   5612   4548 S 465.3   0.1   0:07.46 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400   5872   4548 S 467.0   0.1   0:12.13 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400   6132   4548 S 458.4   0.1   0:16.76 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400   6132   4548 S 469.0   0.1   0:21.45 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400   6132   4548 S 457.4   0.1   0:26.07 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400   6132   4548 S 468.0   0.1   0:30.75 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400   6132   4548 S 461.4   0.1   0:35.41 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400   6132   4548 S 463.0   0.1   0:40.04 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400   6132   4548 S 465.3   0.1   0:44.74 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400   6132   4548 S 194.0   0.1   0:46.68 http_server 


    [email protected]:~/wrk$ ./wrk --latency -d10 -c20000 --timeout 8 -t4
    Running 10s test @
      4 threads and 20000 connections
      Thread Stats   Avg      Stdev     Max   +/- Stdev
        Latency    32.63ms   18.67ms 299.99ms   71.61%
        Req/Sec     6.63k     6.71k   37.35k    89.16%
      Latency Distribution
         50%   30.52ms
         75%   42.46ms
         90%   55.49ms
         99%   86.61ms
      244113 requests in 10.13s, 40.28MB read
      Socket errors: connect 0, read 1300855, write 0, timeout 0
    Requests/sec:  24091.07
    Transfer/sec:      3.97MB
      63161 ubuntu    20   0 1781400  14740   4548 S 167.0   0.2   0:48.35 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  16456   4612 S 261.4   0.2   0:50.99 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  16468   4612 S 193.3   0.2   0:51.28 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  16468   4612 S 200.0   0.2   0:51.62 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  15200   4612 S 208.0   0.2   0:53.70 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  15200   4612 S 195.0   0.2   0:55.67 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  15308   4612 S 187.0   0.2   0:57.54 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  15696   4612 S 184.2   0.2   0:59.40 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  15960   4612 S 185.0   0.2   1:01.25 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  16024   4612 S 186.1   0.2   1:03.13 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  15984   4612 S 184.2   0.2   1:04.99 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  15984   4612 S 185.0   0.2   1:06.84 http_server                                                                                                                               
      63161 ubuntu    20   0 1781400  15984   4612 S   4.0   0.2   1:06.88 http_server                                                                                                                               


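    With a larger number of connections, CPU utilization drops and the relative performance drop is also fairly obvious. I used the default code without modification. Can this be optimized by changing the configuration or the code?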

    opened by lesismal 31
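  • Client hangs, server does not respond


    A simple http API service with fewer than 10 clients, with a few requests every second. The clients send requests from Java; the server is implemented with workflow. After running for a long time, roughly 7 days, client requests start to hang and fail with an error once the maximum timeout is reached, while the business logic on the server receives no messages at all. Checking the port with lsof -i shows it is still listening, but the last entry shows fd 1023. All connections are in the close_wait state, and there are no further logs. It does not reproduce over short periods but always appears over long ones, and there are no useful logs. Once this happens, the server no longer responds to any request.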

    opened by zj376879088 30
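  • Can WF DAG implement video-structuring applications?

    I see that WF supports DAGs, series and parallel. How can WF be used to implement video-structuring or multimedia applications? For example: video capture ---> crop-resize-convert---> inference --> osd- > video output video streaming-> video decode -> video output Batch inference is not considered for now.

    Every stage of the pipeline can run concurrently. Video capture continuously acquires input at a fixed rate, for example 30fps.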


    opened by chris78l 26
  • Occasional core dumps when receiving replies; has anyone run into this (MySQL client)?



    ==37681==ERROR: AddressSanitizer: BUS on unknown address 0x000000000000 (pc 0x7ff8ae4d90de bp 0xbebebebebebebeae sp 0x7ff89b0feef0 T26)
        #0 0x7ff8ae4d90dd (/opt/compiler/gcc-8.2/lib/libasan.so.5+0x2f0dd)
        #1 0x7ff8ae595b8a in __interceptor_free (/opt/compiler/gcc-8.2/lib/libasan.so.5+0xebb8a)
        #2 0x9be470 in mysql_parser_deinit /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/protocol/mysql_parser.c:62
        #3 0x9bea6b in protocol::MySQLMessage::~MySQLMessage() /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/protocol/MySQLMessage.cc:43
        #4 0x9c7d70 in protocol::MySQLResponse::~MySQLResponse() /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/_include/workflow/MySQLMessage.h:88
        #5 0x9c7d70 in WFComplexClientTask<protocol::MySQLRequest, protocol::MySQLResponse, bool>::clear_resp() /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/factory/WFTaskFactory.inl:186
        #6 0x9c7d70 in WFComplexClientTask<protocol::MySQLRequest, protocol::MySQLResponse, bool>::switch_callback(WFTimerTask*) /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/factory/WFTaskFactory.inl:421
        #7 0x9c7f69 in WFComplexClientTask<protocol::MySQLRequest, protocol::MySQLResponse, bool>::done() /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/factory/WFTaskFactory.inl:486
        #8 0x9d1696 in SubTask::subtask_done() /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/kernel/SubTask.cc:31
        #9 0x9cf5b9 in Communicator::handle_incoming_reply(poller_result*) /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/kernel/Communicator.cc:691
        #10 0x9d116a in Communicator::handler_thread_routine(void*) /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/kernel/Communicator.cc:1093
        #11 0x9e148c in __thrdpool_routine /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/kernel/thrdpool.c:72
        #12 0x7ff8ae492da3 in start_thread /home/liruihao/mygcc82/glibc-2.21/nptl/pthread_create.c:333
        #13 0x7ff8ae0f932c in clone (/opt/compiler/gcc-8.2/lib/libc.so.6+0xeb32c)

    AddressSanitizer can not provide additional info.
    SUMMARY: AddressSanitizer: BUS (/opt/compiler/gcc-8.2/lib/libasan.so.5+0x2f0dd)
    Thread T26 created by T0 here:
        #0 0x7ff8ae4f6f50 in pthread_create (/opt/compiler/gcc-8.2/lib/libasan.so.5+0x4cf50)
        #1 0x9e1683 in __thrdpool_create_threads /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/kernel/thrdpool.c:156
        #2 0x9e1683 in thrdpool_create /home/ferry/ONLINE_SERVICE/other/ferry/task_workspace//third-party/sogou-workflow/src/kernel/thrdpool.c:192


    opened by chenlinzhong 26
  • pipeline support


    I tried http client pipelining, sending the data of two http requests in a single write. The code is as follows:

    package main

    import (
    	"fmt"
    	"net"
    )

    func main() {
    	addr := "localhost:9000"
    	conn, err := net.Dial("tcp", addr)
    	if err != nil {
    		panic(err)
    	}
    	defer conn.Close()

    	// data for one request
    	reqData := []byte(fmt.Sprintf("POST / HTTP/1.1\r\nHost: %v\r\nContent-Length: 5\r\n\r\n\r\nhello", addr))
    	// send the two requests together
    	reqData = append(reqData, reqData...)
    	n, err := conn.Write(reqData)
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println("write:\n", string(reqData))
    	defer fmt.Println("---------------------")
    	resData := make([]byte, 1024*8)
    	fmt.Println("read:", n)
    	for {
    		n, err = conn.Read(resData)
    		if err != nil {
    			fmt.Println("read failed:", n, err)
    			return
    		}
    		fmt.Println(string(resData[:n]))
    	}
    }


    [email protected]:~/workflow/benchmark# go run ./client.go 
     GET / HTTP/1.1
    Host: localhost:9000
    GET / HTTP/1.1
    Host: localhost:9000
    HTTP/1.1 200 OK
    Date: Sun, 15 May 2022 06:35:29 UTC
    Content-Type: text/plain; charset=UTF-8
    Content-Length: 32
    Connection: Keep-Alive
    read failed: 0 EOF


    opened by lesismal 21
  • How does workflow guarantee execution order while avoiding starting too many threads?



    opened by willkk 20
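  • Problem after a leader in a kafka cluster restarts


    When I use workflow as a kafka client to consume messages and one machine in the cluster restarts, the kafka client callback reports state 67, error 5006. Restarting the client process does not help; the callback keeps returning this status code. Only after that server in the cluster has finished starting does everything return to normal. Is this a problem on the client side? How should the client handle this situation correctly?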

    opened by song-git 18
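  • Help: workflow support for long-lived connection push services


    1. Support TCP long-lived connections; websocket support as well would be even better;
    2. The server can proactively push messages while the client is online (not the request-and-reply pattern of client heartbeats);
    3. When a client's network connection drops abnormally and the server pushes a message to that client, workflow can proactively detect the disconnected state (the network layer actively reads the socket fd and then asynchronously calls back to the application layer?);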
    opened by cxxjava 15
  • Ran into a small problem deploying a gateway with WF



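    Symptom: frequent errors: /: Fetch failed. state = 1, error = 11: Resource temporarily unavailable. In addition, at the company it also reports errno 2 No such file or directory

    Environment: open files (-n) 65535; WF configuration: maxconnections=10000, keepalive=-1, everything else default; server configuration (this is certainly sufficient): 64 cores

    Reproduction environment: open files (-n) 1048576; WF configuration: maxconnections=2000, keepalive=-1, everything else default; configuration: virtual machine with 20 cores and 6G of memory



    Request: helloworld<--proxy<--user

    Reply: helloworld-->proxy-->user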


    void process(WFHttpTask *proxy_task)
    {
    	auto *req = proxy_task->get_req();
    	SeriesWork *series = series_of(proxy_task);
    	WFHttpTask *http_task; /* for requesting remote webserver. */
    	tutorial_series_context *context = new tutorial_series_context;
    	context->url = req->get_request_uri();
    	context->proxy_task = proxy_task;
    	series->set_context(context);
    	series->set_callback([](const SeriesWork *series) {
    		delete (tutorial_series_context *)series->get_context();
    	});
    	context->is_keep_alive = req->is_keep_alive();
    	http_task = WFTaskFactory::create_http_task("", 0, 0, http_callback);
    	const void *body;
    	size_t len;
    	http_task->get_resp()->set_size_limit(200 * 1024 * 1024);
    	*series << http_task;
    }
    opened by modestpower 8
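  • kafka_cli unresponsive and leaking memory in use (Windows)



    1. If a topic is created with kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic import_data_v6 and the program is then started to fetch records, the program does not react and no callback is called. Even if records are then produced to the topic, the program remains unresponsive. But if the program is restarted, it fetches records normally, and records inserted into the topic afterwards are consumed normally too.
    2. If a topic with 6 partitions is created with kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 6 --topic import_data_v6 and the program is then started to fetch records, the program does not react and no callback is called either. In addition, memory keeps growing.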



    bool g_run_flag_ = true;
    std::future<void> g_consumer_thread_;
    static WFFacilities::WaitGroup wait_group_(1);
    WFKafkaClient client_;
    std::string query_ = "";
    bool no_cgroup_ = false;
    void kafkaCallback(WFKafkaTask* task)
    	int state = task->get_state();
    	int error = task->get_error();
    	if (state != WFT_STATE_SUCCESS){
    		PLOG_ERR("kafka error, msg: %s", WFGlobal::get_error_string(state, error));
    	WFKafkaTask* next_task = nullptr;
    	int api_type = task->get_api_type();
    	switch (api_type)
    	case Kafka_Produce:
    	case Kafka_Fetch:
    		std::vector<std::vector<protocol::KafkaRecord*>> records;
    		protocol::KafkaResult new_result;
    		new_result = std::move(*task->get_result());
    		if (!records.empty()) {
    			std::string out;
    			for (const auto& v : records) {
    				if (v.empty())
    				long long offset = 0;
    				int partition = 0;
    				std::string topic;
    				for (const auto& rcd : v) {
    					const void* value;
    					size_t value_len;
    					rcd->get_value(&value, &value_len);
    					void* pChar = const_cast<void*>(value);
    					const char* p = static_cast<const char*>(pChar);
    					std::string message = std::string(p);
    					auto gdal_path = Singleton<SettingFileGroup>::Instance().Find(DIR_GDAL);
    					TaskConf task_conf;
    					if (!task_conf.LoadJson(message) || !task_conf.IsValid()) {
    						PLOG_ERR("failed to get task");
    					// Not handled asynchronously here, so that the kafka commit can be done once the import completes
    					if (ImportData(task_conf.GetMissionId(), gdal_path)) {
    						offset = rcd->get_offset();
    						partition = rcd->get_partition();
    						topic = rcd->get_topic();
    						if (!no_cgroup_) {
    							next_task = client_.create_kafka_task("api=commit", 3, kafkaCallback);
    				if (!topic.empty()) {
    					out += "topic: " + topic;
    					out += ",partition: " + std::to_string(partition);
    					out += ",offset: " + std::to_string(offset) + ";";
    			PLOG_DEBUG("kafka fetch: %s", out.c_str());
    		if ( next_task ){
    			if (!no_cgroup_) {
    		else {
    			if (g_run_flag_) {
    				next_task = client_.create_kafka_task(query_, 3, kafkaCallback);
    			else {
    				next_task = client_.create_leavegroup_task(3, kafkaCallback);
    	case Kafka_OffsetCommit:
    		std::vector<protocol::KafkaToppar*> toppars;
    		if (!toppars.empty())
    			for (const auto& v : toppars)
    				PLOG_DEBUG("kafka commit topic: %s, partition: %d, offset: %llu, error: %d",
    					v->get_topic(), v->get_partition(),
    					v->get_offset(), v->get_error());
    		next_task = client_.create_kafka_task(query_, 3, kafkaCallback);
    	case Kafka_LeaveGroup:
    		PLOG_DEBUG("leavegroup callback");
    	if (!next_task){
    bool DataImport()
    	auto url = Singleton<KafkaConf>::Instance().broker_list_[0];
    	auto topic = Singleton<KafkaConf>::Instance().topic_list_[0];
    	WFKafkaTask* task;
    	int ret = client_.init(url, "ImporterGroup");
    	if (ret) {
    		PLOG_ERR("failed to init wfkafka client");
    		return false;
    	query_ = se::FormatStr("api=fetch&topic=%s", topic.c_str());
    	task = client_.create_kafka_task(query_, 3, kafkaCallback);
    opened by cmoth150415 6
  • [document] Add vcpkg instruction step


    workflow is available as a port in vcpkg, a C++ library manager that simplifies installation for workflow and other project dependencies. Documenting the install process here will help users get started by providing a single set of commands to build workflow, ready to be included in their projects.

    We also test whether our library ports build in various configurations (dynamic, static) on various platforms (OSX, Linux, Windows: x86, x64) to keep a wide coverage for users.

    I'm a maintainer for vcpkg, and here is what the port script looks like. We try to keep the library maintained as close as possible to the original library. :)

    opened by JackBoosY 7
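  • For RESTful API needs, the wfrest project is recommended

    wfrest is a RESTful API project based on workflow, developed by a long-time fan of the workflow project. After more than half a year of evolution, it is fairly complete in functionality, covers most web development needs, and has built a stable user base. For usage details, see the project home page: https://github.com/wfrest/wfrest. Everyone is also welcome to open-source other projects based on workflow.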

    opened by Barenboim 0
  • v0.10.4(Nov 4, 2022)

    New Features

    • Add WFTaskFactory::reset_go_task to enable capturing the go task itself
    • Support creating WFThreadTask with running time limit
    • Add WFMessageQueue

    Improvements

    • Optimize route manager
    • Remove MD5Util
    • Optimize communicator

    Bug Fixes

    • Fix named conditional bug
    • Fix Kafka consuming bug that always returns the latest record
    • Fix service governance's failure to fuse a server on DNS lookup errors
    • Fix the problem of network threads always being created when using a wait group
  • v0.10.3(Aug 26, 2022)

    New Features

    • Add WFRepeaterTask
    • Redis server supports inline commands
    • Reconstruct Kafka task error. Add WFKafkaTask::get_kafka_error()

    Improvements

    • Reduce the number of new/delete calls when running any series
    • Optimize communicator's locking to improve client tasks' speed
    • Add 'append_output_body' interfaces for HttpMessage

    Bug Fixes

    • Fix the bug where the server stops accepting connections after the number of fds reaches the maximum
    • Fix DNS task scheme checking bug
    • Fix Kafka client bug where all SASL authentication fails
    • Fix compiling error on macOS with gcc
  • v0.10.2(Jun 26, 2022)

    New Features

    • Add WFModuleTask.
    • Add round-robin upstream policy.
    • Add named conditional to support observer mode.
    • Support creating a go task with running time limit.

    Improvements

    • Use sysconf() to get max open files. Reduce memory usage when it's below 65536.
    • Optimize redis parser to make it safer.
    • Optimize thread pool by using message queue. Reduce the overhead of computing task.

    Bug Fixes

    • Fix bugs to make it fully compatible with BoringSSL.
    • Fix double deleting bug when canceling server task's series.
    • Fix Kafka client bugs.
    • Fix upstream UPSVNSWRRPolicy bug.
  • v0.10.1(Apr 29, 2022)

    New Features

    • Add a built-in json parser
    • Add Consul client (prepared for Consul service governance)
    • Add MySQLUtil
    • Add package wrapper

    Improvements

    • Refactor Kafka client and fix some bugs
    • Refactor service governance task and add 'pre_select' for service governance plugins

    Bug Fixes

    • Fix MySQLCell::as_float bug when the value is a negative number.
    • Update CMake and bazel building files to fix kafka task's canceling bug
  • v0.9.11(Mar 9, 2022)


    • Optimize upstream with consistent hash. Allow adding server with weight.
    • Improve http protocol compatibility of http client task.
    • Improve bazel building.

    Bug Fixes

    • Make UpstreamManager::upstream_delete() safe when the upstream is still in use.
    • Fix the SSL delaying problem on Nagle's algorithm.
    • Fix the bug where the port in a URL could not be used with upstream.
    • Fix Kafka client offset timestamp overflow.
    • Fix Kafka client freezing problem when broker toggles the leader.
  • v0.9.10(Jan 21, 2022)


    • Refactor encode stream. This will improve the performance of Redis and Kafka clients.
    • Optimize SSL write. Improve the performance of HTTPS client and server a lot.
    • Optimize weighted-random upstream policy to solve some recovery problems.
    • Update CMake files to support more platforms. Build both static and dynamic libs.

    Bug Fixes

    • Fix nvswrr upstream policy bug.
    • Fix MySQL client crash on incomplete result sets.
    • Fix Kafka out of range error on fetching.
    • Fix Kafka cgroup bug.
    • Fix SSL wrapper problem on TLS 1.3 handshaking.
  • v0.9.9(Dec 3, 2021)


    • Optimize Dns Cache's searching speed and memory size
    • Optimize route manager and dns resolver
    • Increase server task's performance by reducing some atomic operations
    • Increase global performance by removing some singletons
    • Always use dns resolver as name service policy when redirecting
    • Add WFServer::get_listen_addr() for server started on a random port
    • Support Kafka KIP-329

    Bug Fixes

    • Fix service governance's ref count bug
    • Fix mysql sequence id bug when retrying big request
    • Fix VNSWRR upstream bug
    • Fix dns client bug when host name has trailing dot
    • Fix URL parser fatal bug
  • v0.9.8(Sep 30, 2021)


    • Enable creating file IO tasks with path name
    • Add server task's push() interface
    • Optimize poller speed and memory usage
    • Optimize URI parser, more than 50% faster
    • Optimize http implementation

    Bug Fixes

    • Fix crash when resolv.conf is empty
    • Fix Kafka client's memory leak
    • Fix MySQL transaction checking
    • Fix bazel compiling problem
  • v0.9.7(Aug 8, 2021)


    • Implement DNS protocol and add DNS asynchronous client.
    • Use asynchronous DNS as default.
    • Optimize load balancing.
    • Add bazel support and add selective compiling.
    • Support longer timer.
    • Add WFResourcePool.

    Bug fixes

    • Fix Redis double SELECTs problem.
    • Fix upstream_replace_server() bug.
    • Fix timerfd problem on some WSL platforms.
  • v0.9.6(Jun 3, 2021)


    • Add SSLWrapper.
    • Support http/https task with proxy.
    • Support MySQL SSL client.
    • Add vnswrr upstream policy.

    Bug fixes

    • Fix upstream concurrency bug.
    • Fix MySQL multi-resultset for INSERTs
    • Fix Kafka client sasl auth bug.
    • Add -no-rtti compiling flag for kafka to be compatible with snappy 1.1.9.
  • v0.9.5(Apr 12, 2021)


    • Support TLS SNI on both client and server sides;
    • Upstream skips select history;
    • Kafka supports sasl auth;

    Bug Fixes

    • Fix default port bug;
    • Fix MySQL decode overflow bug;
    • Fix MySQL parsing of suffixed ok_packet;
    • Modify Kafka's versionapi logic;
  • v0.9.4(Mar 17, 2021)


    • Add WFNameService and refactor "Upstream" modules.
    • Update the definition of WFServer::stop()'s finish time.
    • Kafka client supports offset storage.
    • Redis supports cluster command MOVED and ASK.
    • Support vcpkg.

    Bug fixes

    • Crash when dismissing a named counter.
    • WFGoTask implementation.
    • MySQL int/ulonglong length overflow.
  • v0.9.3(Jan 13, 2021)


    • Add Kafka client.
    • Improve client tasks performance.

    Bug fixes:

    • Fix several MySQL parser bugs.
    • Fix iovcnt==0 problem on macOS.
  • v0.9.2(Nov 13, 2020)


    • Add WFGraphTask for building DAG.
    • Add WFDynamicTask.
    • Make SeriesWork derivable.
    • Improve MySQL client.

    Bug Fixes:

    • Fix mysql protocol parsing bug.
    • Fix EncodeStream bug.

    Last release before kafka protocol.

  • v0.9.1(Sep 30, 2020)


    • Complete English documents.
    • Optimize kernel code. The message queue is a standalone module now.
    • Support MySQL character_set_results.
    • Add benchmark code and documents.

    Bug Fixes:

    • Fix crashing of MySQL client when the local host is disallowed.
    • Fix MySQL client's problem when using short connection.
    • Fix LRU cache bug when cache is full.
    • Fix upstream bug of division by zero.
  • v0.9.0(Aug 17, 2020)

Sogou Open Source
RakNet is a cross platform, open source, C++ networking engine for game programmers.

RakNet 4.081 Copyright (c) 2014, Oculus VR, Inc. Package notes The Help directory contains index.html, which is full help documentation in HTML format

Facebook Archive 3.1k Nov 29, 2022
Small and fast cross-platform networking library, with support for messaging, IPv6, HTTP, SSL and WebSocket.

frnetlib Frnetlib, is a cross-platform, small and fast networking library written in C++. There are no library dependencies (unless you want to use SS

Fred Nicolson 22 May 16, 2022
Cross-platform, efficient, customizable, and robust asynchronous HTTP/WebSocket server C++14 library with the right balance between performance and ease of use

What Is RESTinio? RESTinio is a header-only C++14 library that gives you an embedded HTTP/Websocket server. It is based on standalone version of ASIO

Stiffstream 912 Nov 25, 2022
Ultra fast and low latency asynchronous socket server & client C++ library with support TCP, SSL, UDP, HTTP, HTTPS, WebSocket protocols and 10K connections problem solution

CppServer Ultra fast and low latency asynchronous socket server & client C++ library with support TCP, SSL, UDP, HTTP, HTTPS, WebSocket protocols and

Ivan Shynkarenka 938 Nov 29, 2022
Mongoose Embedded Web Server Library - a multi-protocol embedded networking library with TCP/UDP, HTTP, WebSocket, MQTT built-in protocols, async DNS resolver, and non-blocking API.

Mongoose - Embedded Web Server / Embedded Networking Library Mongoose is a networking library for C/C++. It implements event-driven non-blocking APIs

Cesanta Software 9k Dec 2, 2022
WAFer is a C language-based software platform for scalable server-side and networking applications. Think node.js for C programmers.

WAFer WAFer is a C language-based ultra-light scalable server-side web applications framework. Think node.js for C programmers. Because it's written i

Riolet Corporation 692 Nov 19, 2022
High-level networking API for real-time simulations with primitives for remote procedure call and object state replication

tnl2 - Torque Network Library version 2 tnl2 is a high-level networking API for real-time simulations with primitives for remote procedure call and o

Mark Frohnmayer 23 Apr 10, 2022
C++ networking library including UniConf and a convenient D-Bus API

This is wvstreams, a nominally platform-independent networking and utilities library for C++. Some documentation is in the Docs/ directory. If that

null 27 Dec 29, 2021
Socket and Networking Library using msgpack.org[C++11]

netLink C++ 11 KISS principle networking library. Features: C++ 11 IPv4, IPv6 Protocols: TCP, UDP Enable/Disable blocking mode Join/Leave UDP-Multicas

Alexander Meißner 210 Oct 18, 2022
Portable, single-file, protocol-agnostic TCP and UDP socket wrapper, primarily for game networking

Documentation This is a header-only library, as such most of its functional documentation is contained within the "header section" of the source code

null 65 Aug 29, 2022
Simple and small reliable UDP networking library for games

libquicknet Simple and small reliable UDP networking library for games ❗ libquicknet is under development and not suitable for production code ❗ The m

null 25 Oct 26, 2022
The C++ REST SDK is a Microsoft project for cloud-based client-server communication in native code using a modern asynchronous C++ API design. This project aims to help C++ developers connect to and interact with services.

Welcome! The C++ REST SDK is a Microsoft project for cloud-based client-server communication in native code using a modern asynchronous C++ API design

Microsoft 7.1k Nov 26, 2022
Enabling services on your device 79 Nov 13, 2022
A Lightweight and fully asynchronous WebSocket client library based on libev

libuwsc(中文) A Lightweight and fully asynchronous WebSocket client library based on libev for Embedded Linux. And provide Lua-binding. Why should I cho

Jianhui Zhao 285 Nov 17, 2022
Packio - An asynchronous msgpack-RPC and JSON-RPC library built on top of Boost.Asio.

Header-only | JSON-RPC | msgpack-RPC | asio | coroutines This library requires C++17 and is designed as an extension to boost.asio. It will let you bu

Quentin Chateau 54 Nov 26, 2022
ENet reliable UDP networking library

Please visit the ENet homepage at http://enet.bespin.org for installation and usage instructions. If you obtained this package from github, the quick

Lee Salzman 2.2k Nov 28, 2022
canonical libwebsockets.org networking library

Libwebsockets Libwebsockets is a simple-to-use, MIT-license, pure C library providing client and server for http/1, http/2, websockets, MQTT and other

lws-team 3.7k Nov 29, 2022
Reliable & unreliable messages over UDP. Robust message fragmentation & reassembly. P2P networking / NAT traversal. Encryption.

GameNetworkingSockets GameNetworkingSockets is a basic transport layer for games. The features are: Connection-oriented API (like TCP) ... but message

Valve Software 6.3k Nov 29, 2022