Skip to content

xiaoquanjie/async

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

c++异步io库
目标:
    1)提供异步访问的通用底层支持(类似node语言里的promise机制)及内置一些常用的io接口
       目前已支持的io接口:redis、mysql、http、mongo、rabbitmq、zookeeper
       未来将支持的io接口: websocket
    2)提供类似服务器进程脚手架的功能,供快速搭建服务器架构

编译:
1、 如需使用redis模块,则cmake时需定义宏USE_ASYNC_REDIS,如需使用集群则需要定义宏USE_ASYNC_CLUSTER_REDIS。如:cmake .. -DUSE_ASYNC_REDIS=1 -DUSE_ASYNC_CLUSTER_REDIS=1
    redis模块依赖:libevent、hiredis_cluster、hiredis库

2、 如需使用mongo模块,则cmake时需定义宏USE_ASYNC_MONGO。如:cmake .. -DUSE_ASYNC_MONGO=1
    monogo模块依赖: bson-1.0、mongoc-1.0库

3、 如需使用libcurl模块,则cmake时需定义宏USE_ASYNC_CURL。如:cmake .. -DUSE_ASYNC_CURL=1
    libcurl模块依赖: libcurl库

4、 如需使用mysql模块,则cmake时需定义宏USE_ASYNC_MYSQL。如:cmake .. -DUSE_ASYNC_MYSQL=1
    mysql模块依赖:libmysqlclient8.0库

5、 如需使用ipc模块,则cmake时需定义宏USE_IPC。如:cmake .. -DUSE_IPC=1
    ipc模块依赖:zeromq库

6、 如需使用transaction模块,则cmake时需定义宏USE_TRANS。如:cmake .. -DUSE_TRANS=1
    transaction模块依赖:无依赖

7、 如需使用net模块,则cmake时需定义宏USE_NET。如: cmake .. -DUSE_NET=1
    net模块依赖:libevent

8、 如需使用rabbitmq模块,则cmake时需要定义宏USE_ASYNC_RABBITMQ。如:cmake .. -DUSE_ASYNC_RABBITMQ=1
    rabbitmq模块依赖:librabbitmq库
   
9、 如需使用zookeeper模块,则cmake时需要定义宏USE_ASYNC_ZOOKEEPER。如:cmake .. -DUSE_ASYNC_ZOOKEEPER=1
    zookeeper模块依赖:zookeeper-c client的单线程版本

20、serve模块,分三子模块
    1)libserve模块,提供基础服务操作。提供了默认的后端进程间通信架构及消息处理样式
    2)router模块,提供默认的后端进程通信需用到的路由功能
    3)gate模块,一个网关服务进程的模板

21、cmake例子:
    1)根目录下执行:mkdir -p build
    2)cd build
    3)cmake .. -DUSE_ASYNC_CURL=1 -DUSE_ASYNC_REDIS=1 -DUSE_ASYNC_CLUSTER_REDIS=1 -DUSE_ASYNC_MONGO=1 \
        -DUSE_ASYNC_MYSQL=1 -DUSE_ASYNC_RABBITMQ=1 -DUSE_ASYNC_ZOOKEEPER=1 -DUSE_IPC=1 -DUSE_TRANS=1 -DUSE_NET=1 -DCMAKE_BUILD_TYPE=Debug
    4)输出文件:
        库文件:build/bin/libasync.so
        头文件: build/include/

依赖库安装:
    如果库安装路径、链接版本、头文件等出现报错,可以根据编译环境调整 common/CMakeLists.txt文件
    centos安装依赖库:
        1、libevent: 
            1)wget https://github.com/libevent/libevent/releases/download/release-2.1.8-stable/libevent-2.1.8-stable.tar.gz
            2)tar zxvf libevent-2.1.8-stable.tar.gz
            3)cd libevent-2.1.8-stable
            4)./configure –prefix=/usr --libdir=/usr/local/lib64
            5)make
            6)make install

        2、redis-server:
            1)wget http://download.redis.io/releases/redis-5.0.9.tar.gz
            2)tar -zxvf redis-5.0.9.tar.gz
            3)cd redis-5.0.9
            4)make
            5)make install
            
        3、hiredis:
            1)https://github.com/redis/hiredis/releases下载源码
            2)tar zxvf hiredis-1.1.0.tar.gz
            3)cd hiredis-1.1.0
            4)make
            5)make install
            
        4、安装hiredis_vip:
            1)https://github.com/vipshop/hiredis-vip 下载源码
            2)unzip hiredis-vip-master.zip
            3)cd hiredis-vip-master
            4)make
            5)make install
            
        4-2、安装hiredis-cluster:
            1)https://github.com/Nordix/hiredis-cluster 下载源码
            2)unzip hiredis-cluster-master.zip
            3)cd hiredis-cluster-master
            4)mkdir build
            5)cd build
            6)cmake .. // (需要cmake3.2版本)  
            
        5、zeromq:
            1)https://github.com/zeromq/libzmq 下载源码
            2)unzip libzmq-master.zip
            3)cd libzmq-master
            4)yum install -y gettext
            5)./autogen.sh (如果报错,就安装yum install -y libtool)
            6)./configure
            7)make
            8)make install
            
        6、安装libmysqlclient8.0:
            1)它可以随8.0的服务器一起安装
            2)https://downloads.mysql.com/archives/community/ 在这个地址下载下来,然后分别拷贝头文件和库到/usr/include/mysql/、/usr/lib64/mysql/
            1)yum install -y mariadb-devel.x86_64
           
        7、mysql-server8.0:
            1)wget http://dev.mysql.com/get/mysql80-community-release-el7-5.noarch.rpm
            2)rpm -ivh mysql80-community-release-el7-5.noarch.rpm
            3)yum install mysql-community-server -y
            4)systemctl restart mysqld
            5)查看默认密码:cat /var/log/mysqld.log | grep password
            6)登录修改密码:
                (1)mysql -uroot -p
                (2)alter user 'root'@'localhost' identified by 'Root@123456'

        8、libcurl-c:
            1)wget https://curl.se/download/curl-7.82.0.tar.gz
            2)tar zxvf curl-7.82.0.tar.gz
            3)./configure --prefix=/usr/local/ --without-ssl
            4)make
            5)make install
            
        9、mongoc安装:
            1)下载 wget https://github.com/mongodb/mongo-c-driver/releases/download/1.23.2/mongo-c-driver-1.23.2.tar.gz 
            2)mkdir cmake-build
            3)cd cmake-build
            4)cmake -DENABLE_AUTOMATIC_INIT_AND_CLEANUP=OFF ..
            5)make
            6)make install
            
        10、rabbitmq-c安装:
            1)下载 wget https://github.com/alanxz/rabbitmq-c/archive/refs/tags/v0.13.0.tar.gz
            2)tar zxvf rabbitmq-c-0.13.0.tar.gz
            3)mkdir build && cd build
            4)cmake -DCMAKE_INSTALL_PREFIX=/usr/local -DENABLE_SSL_SUPPORT=OFF ..
            5)make && make install
            
        11、rabbitmq-server安装:
            1)安装erlang:
                (1) wget https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpm
                (2) rpm -Uvh erlang-solutions-2.0-1.noarch.rpm
                (3) rpm --import https://packages.erlang-solutions.com/rpm/erlang_solutions.asc
                (4) vi /etc/yum.repos.d/erlang.repo
                    [erlang-solutions]
                    name=CentOS $releasever - $basearch - Erlang Solutions
                    baseurl=https://packages.erlang-solutions.com/rpm/centos/$releasever/$basearch
                    gpgcheck=1
                    gpgkey=https://packages.erlang-solutions.com/rpm/erlang_solutions.asc
                    enabled=1                    
                (5) yum install erlang
                
            2)下载: rabbitmq-server-3.9.29-1.el8.noarch.rpm 在:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.9.29
            3)yum install -y socat
            4)rpm -ivh rabbitmq-server-3.9.29-1.el8.noarch.rpm
            5)systemctl start rabbitmq-server
            6)systemctl enable rabbitmq-server
            7)启动管理插件:rabbitmq-plugins enable rabbitmq_management, systemctl stop firewalld, http://localhost:15672
            8)创建用户,授权用户:
                rabbitmqctl add_user admin admin 
                rabbitmqctl set_user_tags admin administrator
                
        12、 zookeeper安装(依赖java):
            1)下载:https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz
            2)解压:tar zxvf apache-zookeeper-3.7.1-bin.tar.gz
            3)cp -r apache-zookeeper-3.7.1-bin /usr/local/zookeeper
            4)cd /usr/local/zookeeper
            5)mkdir logs
            6)mkdir data
            7)vi ./conf/zoo.cfg
                tickTime = 2000
                dataDir = /usr/local/zookeeper/data
                dataLogDir = /usr/local/zookeeper/logs
                clientPort = 2181
                initLimit = 5
                syncLimit = 2
            8)服务命令:
                cd ./bin
                ./zkServer.sh start
                ./zkServer.sh stop
                ./zkServer.sh restart
                ./zkServer.sh status
            9)客户端命令:
                ./zkCli.sh 

        13、zookeeper-c安装:
            依赖工具:
                yum install -y cppunit-devel
                yum install -y ant

            1)下载:https://github.com/apache/zookeeper
            2)进入top目录,运行ant compile_jute, 如果报错没有build.xml文件,需要去3。5.*的版本将build.xml和ivy开头的文件拷贝过来
            3)进入zookeeper-client/zookeeper-client-c目录
            4)autoreconf -if 
            5)./configure
            6)make && make install
目录结构:
example
    example.cpp         ----> 测试用例
    CMakeLists.txt      ----> 编译文件

async                   ----> 库目录
    threads             ----> 线程池目录
        thread_pool.h   ----> 线程池类
    
    coroutine           ----> 协程目录
        coroutine_task.h/cpp ----> 协程池类文件
        coroutine.h/cpp ----> 协程
    
    async               ----> 异步库目录
        cpu             ----> 异步cpu运算实现
        curl            ----> 异步http实现
        mongo           ----> 异步mongo实现
        mysql           ----> 异步mysql实现
        redis           ----> 异步redis实现
        rabbitmq        ----> 异步rabbitmq实现
        zookeeper       ----> 异步zookeeper实现

    co_async            ----> 协程与异步io的结合
        promise         ----> 支持将异步操作变成同步操作,有点类似于js中的promise
        cpu             ----> async::cpu与promise的结合
        curl            ----> async::curl与promise的结合
        mongo           ----> async::mongo与promise的结合
        mysql           ----> async::mysql与promise的结合
        redis           ----> async::redis与promise的结合
        rabbitmq        ----> async::rabbitmq与promise的结合
        zookeeper       ----> async::zookeeper与promise的结合
        ipc             ----> 用以提供进程间通信时协程式的send接口

    ipc                 ----> 提供进程间的通信,适合用于后端服务之间的通信。使用zeromq库作为实现者

    transaction         ----> 提供以协程方式运行的包处理接口

    net                 ----> 提供tcp server/client接口,udp server/client接口,http server接口

serve                   ----> 目标是提供一个服务器进程脚手架,可以快速搭建后端服务,还提供一个通用的route进程。
                              它会依赖common库,且common库需要使用-DUSE_TRANS编译。已实现大部分功能
    serve               ----> 编译输出libserve.so库
    route               ----> ./router -c router.json 启动路由,不同世界的路由互不通信
    gate                ----> 一个网关服案例工程,可以对它进行修改实现自定义网关服
    test                ----> 测试案例,起两个进程,这两个进程使用router做转发相互通信。
    testhttp            ----> 演绎如何监听http服务,并注册httptransaction处理业务
                              
层次结构:co表示coroutine协程

                              co_async
            _______________________________________________________________________
            |      |        |        |         |          |            |           |
        ____|      |        |        |         |          |_           |           |
       |           |        |        |         |           |           |           |
     co_redis   co_mongo   co_curl  co_cpu   co_mysql   parallel接口  co_rabbitmq  co_zookeeper
       |           |        |        |         |           |           |           |
       |           |        |        |         |           |           |           |
       |           |        |        |         |           |           |           |
       -----------------------------------------------------------------------------
       |                                   promise                                 |          
       ----------------------------------------------------------------------------- 
       |           |        |        |         |                  |              |
async_redis  async_mongo async_curl async_cpu async_mysql      async_rabbitmq  async_zookeeper
       |           |        |        |         |                  |              |
       |           |        |        |         |                  |              |
       ---------------------------------------------------------------------------
                                    |
                                    |
                             thread_pool(*可选用)      

线程池是选用模块,建议使用,能使异步io运行速度变快。
如果不使用,则由调用线程来驱动异步io的运行

异步io模型的实现原理描述:

 第一种:同步core,如:async_cpu、async_mongo、async_rabbitmq
 第二种:异步串行core,如:aysnc_mysql
 第三中:异步并行core,如: aysnc_curl、aysnc_redis、aysnc_zookeeper

 同步core:
  1、work_thread 投递一个请求,将请求带上work_thread的信息进行包装,然后请求被放入本地请求队列
  2、work_thread运行loop
      (1)localStatistics打印统计信息		
      (2)localRespond回调回复		
      (3)从本地队列中弹出一个请求,将请求包装成一个任务抛入io任务队列
  3、io线程池
      (1)从io任务队列中取出一个任务
      (2)处理任务:根据请求信息在全局core队列中取出一个合适的core
       (3) 如果core队列中没有合适的core,则创建一个新的core
       (4) core处理完请求拿到请求结果,将结果放入work_thread的回复队列
       (5) core失效则关闭掉,core有效,则放回全局core队列
  
 异步串行core:
  1、维护一个全局的globaldata,globaldata有一张连接池hash表,连接池中的每个连接有一个待处理请求列表
  2、work_thread 投递一个请求,将请求带上work_thread的信息进行包装,然后请求被放入本地请求队列
  3、work_thread运行loop
      (1)localStatistics打印统计信息
      (2)localRespond回调回复
      (3)遍历本地队列
          1)处理请求超时的情况
          2)从valid core列表取出一个有效的core,且core并发处理任务没有超过1个,则将请求放入core的待处理队列
          3)没有有效的core,且core的数量没有超过supposeIothread限制,则创建一个新core放入conning core列表
          4)如果core的数量已达到上线,则将任务均衡挂载到每个core上
      (4)利用原子操作抢占dispatchTask标识
          1)抢占成功
          2)遍历globaldata连接池
          3)将core平均分发到每个线程中去执行任务
  4、io线程池
      (1)从io任务队列取出一个任务
      (2)处理任务:
          1)处理core的连接状态
          2)处理core的待处理任务列表
          
 并行异步core:
  1、维护一个全局的globaldata,globaldata有一张连接池hash表,连接池中的每个连接有一个待处理请求列表
  2、work_thread 投递一个请求,将请求带上work_thread的信息进行包装,然后请求被放入本地请求队列
  3、work_thread运行loop
      (1)localStatistics打印统计信息
      (2)localRespond回调回复
      (3)遍历本地队列
          1)处理请求超时的情况
          2)从valid core列表取出一个有效的core,且core并发处理任务没有超过上限,则将请求放入core的待处理队列
          3)没有有效的core,且core的数量没有超过supposeIothread限制,则创建一个新core放入conning core列表
          4)如果core的数量已达到上线,则将任务均衡挂载到每个core上
      (4)利用原子操作抢占dispatchTask标识
          1)抢占成功
          2)遍历globaldata连接池
          3)将core平均分发到每个线程中去执行任务
  4、io线程池
      (1)从io任务队列取出一个任务
      (2)处理任务:
          1)处理core的连接状态
          2)处理core的待处理任务列表


example:
// 下面代码的输出结果是:
promise begin
异步执行结束
promise over:0

void promise_test() {
    // 起一个协程任务,回调将在协程里运行
    CoroutineTask::doTask([](void*){
        printf("promise begin\n");
        auto ret = co_async::promise([](co_async::Resolve resolve, co_async::Reject reject) {
            // 两秒后执行下面的回调
            co_async::setTimeout([resolve]() {
                printf("异步执行结束\n");
                // 报告promise异步执行结束,并传递结果集,这里传nullptr
                resolve(nullptr);
            }, 2*1000);
        }, 10 * 1000);
        // promise返回,往下继续走
        printf("promise over:%d\n", ret.first);
    }, 0);
}

int main() {
    promise_test();
    //cpu_test(true);
    //co_parallel_test();
    //curl_test(true);
    //mongo_test(true);
    //redis_test(true);
    //co_mysql_test();
    //ipc_test();

    while (true) {
        co_async::loop();
        usleep(100);
        //printf("loop\n");
    }
    return 0;
}

///////////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////////////////

serve模块的使用案例步骤:

1、启动router进程: ./router -c router.json
router.json的配置如下:
{
    "comment": "本配置文件描述的是进程监听及连接端口的信息",

    "@zm_listen": "zeromq的监听信息列表, 一般是给路由使用",
    "zm_listen": [
        {
            "world": 1,
            "protocol": "tcp",
            "ip": "0.0.0.0",
            "port": 5001
        }
    ],

    "@routes": "描述的是将会连接到本进程的连接信息,以便做路由选择使用",
    "routes": [
        {
            "world": 1,
            "type": 1,
            "id": 1
        },
        {
            "world": 1,
            "type": 2,
            "id": 1
        }
    ]
}

2、启动testServe1进程:./testServe -c serve1.json
serve1.json的配置如下:
{
    "comment": "本配置文件描述的是进程监听及连接端口的信息",

    "@self": "标识进程身份的信息",
    "self": {
        "world": 1,
        "type": 1,
        "id": 1
    },

    "@zm_connect": "zeromq的连接信息列表",
    "zm_connect": [
        {
            "protocol": "tcp",
            "ip": "0.0.0.0",
            "port": 5001
        }
    ]
}

3、启动testServe2进程:./testServe -c serve2.json
serve1.json的配置如下:
{
    "comment": "本配置文件描述的是进程监听及连接端口的信息",

    "@self": "标识进程身份的信息",
    "self": {
        "world": 1,
        "type": 2,
        "id": 1
    },

    "@zm_connect": "zeromq的连接信息列表",
    "zm_connect": [
        {
            "protocol": "tcp",
            "ip": "0.0.0.0",
            "port": 5001
        }
    ]
}


About

c++异步io

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors 3

  •  
  •  
  •