|
| 1 | + |
| 2 | +相对于普通的push, 使用者需要自己编解码协议, 本文介绍tarscpp中另外一种push callback的方式, 这种方式可以大幅度降低服务间push的开发难度. |
| 3 | + |
| 4 | +## 背景 |
| 5 | +当服务A和服务B间有交互, 当A主动发起连接B, 此时A作为客户端, B作为服务端, 当B希望主动给A push消息时, 我们可以采取tarscpp中的第三方协议来完成, 但是这种模式下, 主要有两个缺点: |
| 6 | + |
| 7 | +服务的协议完全自定义的, 提高了开发难度 |
| 8 | +服务协议非普通的tars协议, 如果一个普通的tars协议服务希望具备这个功能就比较难做到 |
| 9 | +因此tarscpp框架提供了一种全新的push callback模式来解决这两个问题. |
| 10 | + |
| 11 | +## 接口定义 |
| 12 | +我们先看看具体的使用案例, 首先定义服务B的接口如下: |
| 13 | +```c++ |
| 14 | +interface Hello |
| 15 | +{ |
| 16 | + int registerPush(); |
| 17 | +}; |
| 18 | +``` |
| 19 | +可以看到这个接口比较简单, 就是提供A调用, 用于注册push. |
| 20 | + |
| 21 | +在定义服务A和B之间的push接口, 这个定义非常重要, 是整个push callback的精髓, 需要仔细理解. |
| 22 | + |
| 23 | +```c++ |
| 24 | +interface Push |
| 25 | +{ |
| 26 | + int pushMsg(out string sRsp); |
| 27 | +}; |
| 28 | +``` |
| 29 | +### 服务端实现 |
| 30 | + |
| 31 | +完成接口定义后, 我们再来看服务B的核心实现, 首先服务B在Hello的实现中, 实现registerPush, 头文件HelloImp核心定义如下: |
| 32 | + |
| 33 | +```c++ |
| 34 | + /** |
| 35 | + * 连接关闭 |
| 36 | + * @param current |
| 37 | + * @return |
| 38 | + */ |
| 39 | + virtual int doClose(tars::TarsCurrentPtr current); |
| 40 | + |
| 41 | + /** |
| 42 | + * 客户端请求注册push通知 |
| 43 | + */ |
| 44 | + virtual int registerPush(tars::TarsCurrentPtr current); |
| 45 | +``` |
| 46 | +实现文件如下: |
| 47 | +
|
| 48 | +```c++ |
| 49 | +int HelloImp::doClose(tars::TarsCurrentPtr current) |
| 50 | +{ |
| 51 | + g_app._pushThread.delCurrent(current); |
| 52 | +} |
| 53 | +int HelloImp::registerPush(tars::TarsCurrentPtr current) |
| 54 | +{ |
| 55 | + g_app._pushThread.addCurrent(current); |
| 56 | + return 0; |
| 57 | +} |
| 58 | +``` |
| 59 | + |
| 60 | +说明: |
| 61 | +- push是个长连接,需要关注连接关闭事件, 因此可以看到实现了doClose方法, 将current去掉; |
| 62 | +- 而registerPush是服务A主动调用, 此时服务B将current保存起来, 用于后续push; |
| 63 | +- current表示了请求的上下文, 如果是同一个连接上来的请求, current->getUId()的值相同的, 框架层也是利用这个Id给对应的连接回消息 |
| 64 | +- _pushThread是一个自己实现的push线程类, 这个你可以根据实际情况去实现 |
| 65 | + |
| 66 | +接下来我们看看_pushThread 到底做了哪些工作. |
| 67 | + |
| 68 | +_pushThread是自己实现的一个对象, 由于示例比较简单, 它会启动一个线程然后使用保存下来的current给上来的请求发送push消息, 核心示例代码如下: |
| 69 | + |
| 70 | +```c++ |
| 71 | +void PushThread::addCurrent(CurrentPtr ¤t) |
| 72 | +{ |
| 73 | + std::lock_guard<std::mutex> lock(_mutex); |
| 74 | + |
| 75 | + _currents[current->getUId()] = current; |
| 76 | +} |
| 77 | + |
| 78 | +void PushThread::delCurrent(CurrentPtr ¤t) |
| 79 | +{ |
| 80 | + std::lock_guard<std::mutex> lock(_mutex); |
| 81 | + |
| 82 | + _currents.erase(current->getUId()); |
| 83 | +} |
| 84 | + |
| 85 | +void PushThread::run() |
| 86 | +{ |
| 87 | + while(!_terminate) |
| 88 | + { |
| 89 | + std::unique_lock<std::mutex> lock(_mutex); |
| 90 | + |
| 91 | + for(auto it : _currents) |
| 92 | + { |
| 93 | + TestApp::Push::async_response_push_pushMsg(it.second, 0, "push message"); |
| 94 | + } |
| 95 | + |
| 96 | + _cond.wait_for(lock, std::chrono::milliseconds(1000)); |
| 97 | + } |
| 98 | + |
| 99 | +} |
| 100 | + |
| 101 | +``` |
| 102 | +其中最核心的一行代码: |
| 103 | +```c++ |
| 104 | +TestApp::Push::async_response_push_pushMsg(it.second, 0, "push message"); |
| 105 | +``` |
| 106 | + |
| 107 | +分析: |
| 108 | + |
| 109 | +- async_response_push_pushMsg函数是根据前面Push接口自动生成的, 匹配了Push接口的pushMsg函数 |
| 110 | +- 第一参数是current, 代表了是哪个连接上来的, 第二个参数是pushMsg的返回值, 第三个参数是pushMsg的out返回参数, 可以有多个返回参数, 这种push方式, 不需要有输入参数. |
| 111 | + |
| 112 | +可以看到利用这个方法就可以给客户端返回想要的数据. |
| 113 | + |
| 114 | +- 客户端实现 |
| 115 | +对于客户端而言, 获取服务端的通知也非常简单, 代码示例如下: |
| 116 | + |
| 117 | +```c++ |
| 118 | +class PushCallbackImp : public PushPrxCallback |
| 119 | +{ |
| 120 | +public: |
| 121 | + virtual void callback_pushMsg(tars::Int32 ret, const std::string& sRsp) |
| 122 | + { |
| 123 | + LOG_CONSOLE_DEBUG << ret << ", " << sRsp << endl; |
| 124 | + } |
| 125 | +}; |
| 126 | + |
| 127 | +_comm = new Communicator(); |
| 128 | + |
| 129 | +HelloPrx pPrx = _comm->stringToProxy<HelloPrx>(helloObj); |
| 130 | +pPrx->tars_set_push_callback(new PushCallbackImp()); |
| 131 | +pPrx->registerPush(); |
| 132 | +``` |
| 133 | +分析: |
| 134 | +
|
| 135 | +- 客户端为了收取push消息, 核心就是要注册tars_set_push_callback一个回调; |
| 136 | +- PushPrxCallback就是代码生成中的类, 实现对应的callback_pushMsg 即可完成push回调! |
| 137 | +
|
| 138 | +可以看到, 客户端的操作也非常简单! |
| 139 | +
|
| 140 | +## 其他说明 |
| 141 | +利用push callback机制, 可以非常方便完成服务间的push机制, 但是需要注意几点 |
| 142 | +
|
| 143 | +- registerPush可以定时调用, 保持A和B之间连接的活跃度 |
| 144 | +- 需要注意B有多台服务器的场景, 每次调用registerPush都是轮训的, 这个时候就需要考虑清楚push机制该如何设计, 这个本文就不展开,使用者可以自己仔细思考一下 |
| 145 | +- tarsgo也已经支持该模式, 其他语言暂时未实现该机制. |
0 commit comments