Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WAL replay hangs if the response wasn't received #535

Merged
merged 1 commit into from
May 13, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/cluster/cluster_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ClusterServer struct {
// ServerConnection is the transport abstraction a ClusterServer uses to talk
// to a remote server.
type ServerConnection interface {
	// Connection lifecycle.
	// NOTE(review): the exact semantics of Connect and Close are defined by
	// the implementations, which are not visible here.
	Connect()
	Close()

	// MakeRequest sends request over the connection; responses are delivered
	// on responseStream. ClusterServer.MakeRequest treats a non-nil error as
	// the server being down.
	MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) error

	// ClearRequests is invoked when the server is marked as down.
	// NOTE(review): presumably it terminates every in-flight request so that
	// callers waiting on a response do not hang; confirm against implementers.
	ClearRequests()
}

Expand Down Expand Up @@ -94,12 +95,12 @@ func (self *ClusterServer) Connect() {
// MakeRequest forwards request to the underlying connection. When the
// connection reports an error the server is flagged as down, an end-of-stream
// response carrying the error text is offered to responseStream, and the
// connection's outstanding requests are cleared.
func (self *ClusterServer) MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) {
	err := self.connection.MakeRequest(request, responseStream)
	if err == nil {
		return
	}

	self.isUp = false

	reason := err.Error()
	failure := &protocol.Response{Type: &endStreamResponse, ErrorMessage: &reason}

	// Non-blocking send: if nobody can take the response right now it is
	// dropped rather than stalling the caller.
	select {
	case responseStream <- failure:
	default:
	}

	// Done after the specific error has been offered, so the requester gets
	// the real failure reason before the pending requests are cleared.
	self.markServerAsDown()
}

Expand Down Expand Up @@ -178,11 +179,16 @@ func (self *ClusterServer) getHeartbeatResponse(responseChan <-chan *protocol.Re
return nil
}

// markServerAsDown records that the server is unavailable and tells the
// connection to clear its requests.
func (self *ClusterServer) markServerAsDown() {
	conn := self.connection
	self.isUp = false
	conn.ClearRequests()
}

func (self *ClusterServer) handleHeartbeatError(err error) {
if self.isUp {
log.Warn("Server marked as down. Hearbeat error for server: %d - %s: %s", self.Id, self.ProtobufConnectionString, err)
}
self.isUp = false
self.markServerAsDown()
self.Backoff *= 2
if self.Backoff > self.MaxBackoff {
self.Backoff = self.MaxBackoff
Expand Down
17 changes: 17 additions & 0 deletions src/coordinator/protobuf_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (self *ProtobufClient) Close() {
self.stopped = true
self.conn = nil
}
self.ClearRequests()
}

func (self *ProtobufClient) getConnection() net.Conn {
Expand All @@ -81,6 +82,22 @@ func (self *ProtobufClient) getConnection() net.Conn {
return self.conn
}

// ClearRequests offers an end-of-stream response (with the error message
// "clearing all requests") to every request currently held in the request
// buffer and then replaces the buffer with an empty one. The whole operation
// runs under requestBufferLock.
func (self *ProtobufClient) ClearRequests() {
	self.requestBufferLock.Lock()
	defer self.requestBufferLock.Unlock()

	message := "clearing all requests"
	for id := range self.requestBuffer {
		pending := self.requestBuffer[id]
		// Each request gets its own Response value.
		endOfStream := &protocol.Response{Type: &endStreamResponse, ErrorMessage: &message}

		// Never block while holding the lock: if the channel cannot accept
		// the response right now, log it and move on.
		select {
		case pending.responseChan <- endOfStream:
		default:
			log.Debug("Cannot send response on channel")
		}
	}

	self.requestBuffer = make(map[uint32]*runningRequest)
}

// Makes a request to the server. If the responseStream chan is not nil, it will expect a response from the server
// with a matching request.Id. The REQUEST_RETRY_ATTEMPTS constant of 3 and the RECONNECT_RETRY_WAIT of 100ms mean
// that an attempt to make a request to a downed server will take 300ms to time out.
Expand Down
25 changes: 16 additions & 9 deletions src/coordinator/protobuf_request_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,7 @@ func NewProtobufRequestHandler(coordinator Coordinator, clusterConfig *cluster.C

func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, conn net.Conn) error {
if *request.Type == protocol.Request_WRITE {
shard := self.clusterConfig.GetLocalShardById(*request.ShardId)
log.Debug("HANDLE: (%d):%d:%v", self.clusterConfig.LocalServer.Id, request.GetId(), shard)
err := shard.WriteLocalOnly(request)
if err != nil {
log.Error("ProtobufRequestHandler: error writing local shard: ", err)
return err
}
response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk}
return self.WriteResponse(conn, response)
go self.handleWrites(request, conn)
} else if *request.Type == protocol.Request_DROP_DATABASE {
go self.handleDropDatabase(request, conn)
return nil
Expand All @@ -55,6 +47,21 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con
return nil
}

// handleWrites applies a write request to the local shard identified by
// request.ShardId and sends the outcome back over conn. It returns nothing;
// failures are logged, and a shard write failure is also reported to the
// requester through the response's ErrorMessage field.
//
// NOTE(review): the response Type is self.writeOk even when the shard write
// failed; the failure is only visible through ErrorMessage. Confirm that the
// receiving side checks ErrorMessage.
func (self *ProtobufRequestHandler) handleWrites(request *protocol.Request, conn net.Conn) {
	shard := self.clusterConfig.GetLocalShardById(*request.ShardId)
	log.Debug("HANDLE: (%d):%d:%v", self.clusterConfig.LocalServer.Id, request.GetId(), shard)

	var errorMsg *string
	if err := shard.WriteLocalOnly(request); err != nil {
		log.Error("ProtobufRequestHandler: error writing local shard: %s", err)
		errorMsg = protocol.String(err.Error())
	}

	response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk, ErrorMessage: errorMsg}
	if writeErr := self.WriteResponse(conn, response); writeErr != nil {
		// This is a failure to deliver the response, not a shard write
		// failure, so it gets its own message.
		log.Error("ProtobufRequestHandler: error writing response: %s", writeErr)
	}
}

func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn net.Conn) {
// the query should always parse correctly since it was parsed at the originating server.
queries, err := parser.ParseQuery(*request.Query)
Expand Down
1 change: 1 addition & 0 deletions src/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (self *Server) ListenAndServe() error {
if err != nil {
panic(err)
}
log.Info("Connection string changed successfully")
}

go self.ProtobufServer.ListenAndServe()
Expand Down