Skip to content

Commit

Permalink
Merge pull request #58 from unixliang/master
Browse files Browse the repository at this point in the history
fix consumer retry
  • Loading branch information
unixliang authored Aug 2, 2018
2 parents 0032bde + 97fab5d commit a779693
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 25 deletions.
2 changes: 1 addition & 1 deletion phxqueue/consumer/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class Consumer : public comm::MultiProc {
// Add items to Store.
// Handling failed items requires retry.
// Need to implement an RPC that corresponds to Store::Add().
virtual comm::RetCode Add(const comm::proto::AddRequest &req, comm::proto::AddResponse &resp) = 0;
virtual comm::RetCode Add(comm::proto::AddRequest &req, comm::proto::AddResponse &resp) = 0;

// Consumer reports its own machine load to the Scheduler, which, after statistics, returns the dynamic weight to the Consumer.
// Depending on the dynamic weight, each Consumer uses a same algorithm to calculate which queues should be handled by themselves.
Expand Down
3 changes: 2 additions & 1 deletion phxqueue/producer/batchhelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ comm::RetCode BatchTask::Process(bool is_timeout) {
uint64_t time_wait_ms = now_timestamp_ms - start_timestamp_ms_;

comm::proto::AddRequest batch_req;
comm::proto::AddResponse batch_resp;

for (auto &task : tasks_) {
auto req = task->GetReq();
Expand All @@ -164,7 +165,7 @@ comm::RetCode BatchTask::Process(bool is_timeout) {
}
}

auto retcode = producer_->RawAdd(batch_req);
auto retcode = producer_->RawAdd(batch_req, batch_resp);

comm::ProducerBP::GetThreadInstance()->OnBatchStat(batch_req, retcode, time_wait_ms, is_timeout);
//printf("batch %d time_wait_ms %" PRIu64 " is_timeout %d\n", batch_req.items_size(), time_wait_ms, is_timeout);
Expand Down
11 changes: 5 additions & 6 deletions phxqueue/producer/producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ comm::RetCode Producer::Enqueue(const int topic_id, const uint64_t uin, const in
}

for (auto &&req : reqs) {
if (comm::RetCode::RET_OK != (ret = SelectAndAdd(*req, nullptr, nullptr))) {
comm::proto::AddResponse resp;
if (comm::RetCode::RET_OK != (ret = SelectAndAdd(*req, resp, nullptr, nullptr))) {
comm::ProducerBP::GetThreadInstance()->OnSelectAndAddFail(topic_id, pub_id, handle_id, uin);
QLErr("SelectAndAdd client_id %s ret %d", client_id.c_str(), as_integer(ret));
return ret;
Expand Down Expand Up @@ -297,7 +298,7 @@ comm::RetCode Producer::MakeAddRequests(const int topic_id,
}


comm::RetCode Producer::SelectAndAdd(comm::proto::AddRequest &req, StoreSelector *ss, QueueSelector *qs) {
comm::RetCode Producer::SelectAndAdd(comm::proto::AddRequest &req, comm::proto::AddResponse &resp, StoreSelector *ss, QueueSelector *qs) {
QLVerb("SelectAndAdd");

comm::ProducerBP::GetThreadInstance()->OnSelectAndAdd(req);
Expand Down Expand Up @@ -359,7 +360,7 @@ comm::RetCode Producer::SelectAndAdd(comm::proto::AddRequest &req, StoreSelector
QLErr("BatchRawEnqueue ret %d store %d queue %d uin %" PRIu64, as_integer(ret), store_id, queue_id, uin);
}
} else {
if (comm::RetCode::RET_OK != (ret = RawAdd(req))) {
if (comm::RetCode::RET_OK != (ret = RawAdd(req, resp))) {
comm::ProducerBP::GetThreadInstance()->OnRawAddFail(req);
QLErr("RawEnqueue ret %d store %d queue %d uin %" PRIu64, as_integer(ret), store_id, queue_id, uin);
}
Expand All @@ -374,7 +375,7 @@ comm::RetCode Producer::SelectAndAdd(comm::proto::AddRequest &req, StoreSelector
return ret;
}

comm::RetCode Producer::RawAdd(comm::proto::AddRequest &req) {
comm::RetCode Producer::RawAdd(comm::proto::AddRequest &req, comm::proto::AddResponse &resp) {
QLVerb("RawEnqueue");

comm::ProducerBP::GetThreadInstance()->OnRawAdd(req);
Expand All @@ -396,8 +397,6 @@ comm::RetCode Producer::RawAdd(comm::proto::AddRequest &req) {
}
if (queue_info->drop_all()) return comm::RetCode::RET_OK;

comm::proto::AddResponse resp;

BeforeAdd(req);

store::StoreMasterClient<comm::proto::AddRequest, comm::proto::AddResponse> store_master_client;
Expand Down
4 changes: 2 additions & 2 deletions phxqueue/producer/producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ class Producer {

// Process a batch add to Store.
// Customize StoreSelector/QueueSelector can be specified to determine which store/queue to add.
comm::RetCode SelectAndAdd(comm::proto::AddRequest &req,
comm::RetCode SelectAndAdd(comm::proto::AddRequest &req, comm::proto::AddResponse &resp,
StoreSelector *ss, QueueSelector *qs);

// Process a batch add to Store.
comm::RetCode RawAdd(comm::proto::AddRequest &req);
comm::RetCode RawAdd(comm::proto::AddRequest &req, comm::proto::AddResponse &resp);


// ------------------------ Interfaces MUST be overrided ------------------------
Expand Down
2 changes: 1 addition & 1 deletion phxqueue/test/simpleconsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ comm::RetCode SimpleConsumer::Get(const comm::proto::GetRequest &req,
return comm::RetCode::RET_OK;
}

comm::RetCode SimpleConsumer::Add(const comm::proto::AddRequest &req,
comm::RetCode SimpleConsumer::Add(comm::proto::AddRequest &req,
comm::proto::AddResponse &resp) {
return comm::RetCode::RET_OK;
}
Expand Down
2 changes: 1 addition & 1 deletion phxqueue/test/simpleconsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class SimpleConsumer : public consumer::Consumer {

virtual comm::RetCode Get(const comm::proto::GetRequest &req,
comm::proto::GetResponse &resp) override;
virtual comm::RetCode Add(const comm::proto::AddRequest &req,
virtual comm::RetCode Add(comm::proto::AddRequest &req,
comm::proto::AddResponse &resp) override;

virtual comm::RetCode UncompressBuffer(const std::string &buffer,
Expand Down
11 changes: 6 additions & 5 deletions phxqueue_phxrpc/consumer/consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Unless required by applicable law or agreed to in writing, software distributed
#include "phxqueue_phxrpc/app/lock/lock_client.h"
#include "phxqueue_phxrpc/app/scheduler/scheduler_client.h"
#include "phxqueue_phxrpc/app/store/store_client.h"
#include "phxqueue_phxrpc/producer.h"


namespace phxqueue_phxrpc {
Expand Down Expand Up @@ -61,13 +62,13 @@ Consumer::Get(const phxqueue::comm::proto::GetRequest &req,
}

phxqueue::comm::RetCode
Consumer::Add(const phxqueue::comm::proto::AddRequest &req,
Consumer::Add(phxqueue::comm::proto::AddRequest &req,
phxqueue::comm::proto::AddResponse &resp) {

static __thread StoreClient store_client;
auto ret = store_client.ProtoAdd(req, resp);
phxqueue::producer::ProducerOption opt;
phxqueue_phxrpc::producer::Producer producer(opt);
auto ret = producer.SelectAndAdd(req, resp, nullptr, nullptr);
if (phxqueue::comm::RetCode::RET_OK != ret) {
QLErr("ProtoAdd ret %d", phxqueue::comm::as_integer(ret));
QLErr("Producer::SelectAndAdd ret %d", phxqueue::comm::as_integer(ret));
}
return ret;
}
Expand Down
14 changes: 7 additions & 7 deletions phxqueue_phxrpc/consumer/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,22 @@ class Consumer : public phxqueue::consumer::Consumer {

virtual phxqueue::comm::RetCode
UncompressBuffer(const std::string &buffer, const int buffer_type,
std::string &uncompressed_buffer);
std::string &uncompressed_buffer) override;
virtual void CompressBuffer(const std::string &buffer,
std::string &compress_buffer, const int buffer_type);
std::string &compress_buffer, const int buffer_type) override;
virtual phxqueue::comm::RetCode
Get(const phxqueue::comm::proto::GetRequest &req, phxqueue::comm::proto::GetResponse &resp);
Get(const phxqueue::comm::proto::GetRequest &req, phxqueue::comm::proto::GetResponse &resp) override;
virtual phxqueue::comm::RetCode
Add(const phxqueue::comm::proto::AddRequest &req, phxqueue::comm::proto::AddResponse &resp);
Add(phxqueue::comm::proto::AddRequest &req, phxqueue::comm::proto::AddResponse &resp) override;
virtual phxqueue::comm::RetCode
GetAddrScale(const phxqueue::comm::proto::GetAddrScaleRequest &req,
phxqueue::comm::proto::GetAddrScaleResponse &resp);
phxqueue::comm::proto::GetAddrScaleResponse &resp) override;
virtual phxqueue::comm::RetCode
GetLockInfo(const phxqueue::comm::proto::GetLockInfoRequest &req,
phxqueue::comm::proto::GetLockInfoResponse &resp);
phxqueue::comm::proto::GetLockInfoResponse &resp) override;
virtual phxqueue::comm::RetCode
AcquireLock(const phxqueue::comm::proto::AcquireLockRequest &req,
phxqueue::comm::proto::AcquireLockResponse &resp);
phxqueue::comm::proto::AcquireLockResponse &resp) override;

private:
virtual void RestoreUserCookies(const phxqueue::comm::proto::Cookies &user_cookies) {}
Expand Down
2 changes: 1 addition & 1 deletion phxqueue_phxrpc/test/consumer_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ int main(int argc, char ** argv) {
opt.proc_pid_path = config.GetProto().consumer().proc_pid_path();
opt.lock_path_base = config.GetProto().consumer().lock_path_base();
opt.use_store_master_client_on_get = 1;
opt.use_store_master_client_on_add = 1;
opt.use_store_master_client_on_add = 0;
opt.shm_key_base = config.GetProto().consumer().shm_key_base();


Expand Down

0 comments on commit a779693

Please sign in to comment.