diff --git a/api-contracts/dispatcher/dispatcher.proto b/api-contracts/dispatcher/dispatcher.proto index d4759ba6c..d78265386 100644 --- a/api-contracts/dispatcher/dispatcher.proto +++ b/api-contracts/dispatcher/dispatcher.proto @@ -9,6 +9,13 @@ service Dispatcher { rpc Listen(WorkerListenRequest) returns (stream AssignedAction) {} + // ListenV2 is like listen, but implementation does not include heartbeats. This should only used by SDKs + // against engine version v0.18.1+ + rpc ListenV2(WorkerListenRequest) returns (stream AssignedAction) {} + + // Heartbeat is a method for workers to send heartbeats to the dispatcher + rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {} + rpc SubscribeToWorkflowEvents(SubscribeToWorkflowEventsRequest) returns (stream WorkflowEvent) {} rpc SendStepActionEvent(StepActionEvent) returns (ActionEventResponse) {} @@ -238,4 +245,14 @@ message OverridesData { string callerFilename = 4; } -message OverridesDataResponse {} \ No newline at end of file +message OverridesDataResponse {} + +message HeartbeatRequest { + // the id of the worker + string workerId = 1; + + // heartbeatAt is the time the worker sent the heartbeat + google.protobuf.Timestamp heartbeatAt = 2; +} + +message HeartbeatResponse {} \ No newline at end of file diff --git a/internal/services/dispatcher/contracts/dispatcher.pb.go b/internal/services/dispatcher/contracts/dispatcher.pb.go index 22eb851ec..9da0a3f4a 100644 --- a/internal/services/dispatcher/contracts/dispatcher.pb.go +++ b/internal/services/dispatcher/contracts/dispatcher.pb.go @@ -1276,6 +1276,101 @@ func (*OverridesDataResponse) Descriptor() ([]byte, []int) { return file_dispatcher_proto_rawDescGZIP(), []int{12} } +type HeartbeatRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // the id of the worker + WorkerId string `protobuf:"bytes,1,opt,name=workerId,proto3" json:"workerId,omitempty"` + // heartbeatAt is the time the worker sent the heartbeat + HeartbeatAt *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=heartbeatAt,proto3" json:"heartbeatAt,omitempty"` +} + +func (x *HeartbeatRequest) Reset() { + *x = HeartbeatRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_dispatcher_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HeartbeatRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HeartbeatRequest) ProtoMessage() {} + +func (x *HeartbeatRequest) ProtoReflect() protoreflect.Message { + mi := &file_dispatcher_proto_msgTypes[13] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HeartbeatRequest.ProtoReflect.Descriptor instead. +func (*HeartbeatRequest) Descriptor() ([]byte, []int) { + return file_dispatcher_proto_rawDescGZIP(), []int{13} +} + +func (x *HeartbeatRequest) GetWorkerId() string { + if x != nil { + return x.WorkerId + } + return "" +} + +func (x *HeartbeatRequest) GetHeartbeatAt() *timestamppb.Timestamp { + if x != nil { + return x.HeartbeatAt + } + return nil +} + +type HeartbeatResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *HeartbeatResponse) Reset() { + *x = HeartbeatResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_dispatcher_proto_msgTypes[14] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HeartbeatResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HeartbeatResponse) ProtoMessage() {} + +func (x *HeartbeatResponse) ProtoReflect() protoreflect.Message { + mi := &file_dispatcher_proto_msgTypes[14] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HeartbeatResponse.ProtoReflect.Descriptor instead. +func (*HeartbeatResponse) Descriptor() ([]byte, []int) { + return file_dispatcher_proto_rawDescGZIP(), []int{14} +} + var File_dispatcher_proto protoreflect.FileDescriptor var file_dispatcher_proto_rawDesc = []byte{ @@ -1422,60 +1517,75 @@ var file_dispatcher_proto_rawDesc = []byte{ 0x6c, 0x6c, 0x65, 0x72, 0x46, 0x69, 0x6c, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x46, 0x69, 0x6c, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x22, 0x17, 0x0a, 0x15, 0x4f, 0x76, 0x65, 0x72, 0x72, 0x69, 0x64, 0x65, 0x73, 0x44, - 0x61, 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2a, 0x4e, 0x0a, 0x0a, 0x41, - 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x0e, 0x53, 0x54, 0x41, - 0x52, 0x54, 0x5f, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x52, 0x55, 0x4e, 0x10, 0x00, 0x12, 0x13, 0x0a, - 0x0f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x5f, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x52, 0x55, 0x4e, - 0x10, 0x01, 0x12, 0x17, 0x0a, 0x13, 0x53, 0x54, 0x41, 0x52, 0x54, 0x5f, 0x47, 0x45, 0x54, 0x5f, - 0x47, 0x52, 0x4f, 0x55, 0x50, 0x5f, 0x4b, 0x45, 0x59, 0x10, 0x02, 0x2a, 0xa2, 0x01, 0x0a, 0x17, - 0x47, 0x72, 0x6f, 0x75, 0x70, 0x4b, 0x65, 0x79, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x76, - 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x20, 0x0a, 0x1c, 0x47, 0x52, 0x4f, 0x55, 0x50, - 0x5f, 0x4b, 0x45, 0x59, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, - 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x20, 0x0a, 0x1c, 0x47, 0x52, 0x4f, - 0x55, 0x50, 0x5f, 0x4b, 0x45, 0x59, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, - 0x45, 0x5f, 0x53, 0x54, 0x41, 0x52, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x22, 0x0a, 0x1e, 0x47, - 0x52, 0x4f, 0x55, 0x50, 0x5f, 0x4b, 0x45, 0x59, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, - 0x59, 0x50, 0x45, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x4c, 0x45, 0x54, 0x45, 0x44, 0x10, 0x02, 0x12, - 0x1f, 0x0a, 0x1b, 0x47, 0x52, 0x4f, 0x55, 0x50, 0x5f, 0x4b, 0x45, 0x59, 0x5f, 0x45, 0x56, 0x45, + 0x61, 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x6c, 0x0a, 0x10, 0x48, + 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x1a, 0x0a, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x12, 0x3c, 0x0a, 0x0b, 0x68, + 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x41, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x68, 0x65, + 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x41, 0x74, 0x22, 0x13, 0x0a, 0x11, 0x48, 0x65, 0x61, + 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2a, 0x4e, + 0x0a, 0x0a, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x0e, + 0x53, 0x54, 0x41, 0x52, 0x54, 0x5f, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x52, 0x55, 0x4e, 0x10, 0x00, + 0x12, 0x13, 0x0a, 0x0f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x5f, 0x53, 0x54, 0x45, 0x50, 0x5f, + 0x52, 0x55, 0x4e, 0x10, 0x01, 0x12, 0x17, 0x0a, 0x13, 0x53, 0x54, 0x41, 0x52, 0x54, 0x5f, 0x47, + 0x45, 0x54, 0x5f, 0x47, 0x52, 0x4f, 0x55, 0x50, 0x5f, 0x4b, 0x45, 0x59, 0x10, 0x02, 0x2a, 0xa2, + 0x01, 0x0a, 0x17, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x4b, 0x65, 0x79, 0x41, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x20, 0x0a, 0x1c, 0x47, 0x52, + 0x4f, 0x55, 0x50, 0x5f, 0x4b, 0x45, 0x59, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, + 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x20, 0x0a, 0x1c, + 0x47, 0x52, 0x4f, 0x55, 0x50, 0x5f, 0x4b, 0x45, 0x59, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, + 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x41, 0x52, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x22, + 0x0a, 0x1e, 0x47, 0x52, 0x4f, 0x55, 0x50, 0x5f, 0x4b, 0x45, 0x59, 0x5f, 0x45, 0x56, 0x45, 0x4e, + 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x4c, 0x45, 0x54, 0x45, 0x44, + 0x10, 0x02, 0x12, 0x1f, 0x0a, 0x1b, 0x47, 0x52, 0x4f, 0x55, 0x50, 0x5f, 0x4b, 0x45, 0x59, 0x5f, + 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x46, 0x41, 0x49, 0x4c, 0x45, + 0x44, 0x10, 0x03, 0x2a, 0x8a, 0x01, 0x0a, 0x13, 0x53, 0x74, 0x65, 0x70, 0x41, 0x63, 0x74, 0x69, + 0x6f, 0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1b, 0x0a, 0x17, 0x53, + 0x54, 0x45, 0x50, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, + 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x1b, 0x0a, 0x17, 0x53, 0x54, 0x45, 0x50, + 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x41, 0x52, + 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x1d, 0x0a, 0x19, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x45, 0x56, + 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x4c, 0x45, 0x54, + 0x45, 0x44, 0x10, 0x02, 0x12, 0x1a, 0x0a, 0x16, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x03, - 0x2a, 0x8a, 0x01, 0x0a, 0x13, 0x53, 0x74, 0x65, 0x70, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x45, - 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1b, 0x0a, 0x17, 0x53, 0x54, 0x45, 0x50, - 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, - 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x1b, 0x0a, 0x17, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x45, 0x56, - 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x41, 0x52, 0x54, 0x45, 0x44, - 0x10, 0x01, 0x12, 0x1d, 0x0a, 0x19, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, - 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x4c, 0x45, 0x54, 0x45, 0x44, 0x10, - 0x02, 0x12, 0x1a, 0x0a, 0x16, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, - 0x54, 0x59, 0x50, 0x45, 0x5f, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x03, 0x2a, 0x65, 0x0a, - 0x0c, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x19, 0x0a, - 0x15, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, - 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x1a, 0x0a, 0x16, 0x52, 0x45, 0x53, 0x4f, - 0x55, 0x52, 0x43, 0x45, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x52, - 0x55, 0x4e, 0x10, 0x01, 0x12, 0x1e, 0x0a, 0x1a, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, - 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x57, 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f, 0x57, 0x5f, 0x52, - 0x55, 0x4e, 0x10, 0x02, 0x2a, 0xde, 0x01, 0x0a, 0x11, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, - 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1f, 0x0a, 0x1b, 0x52, 0x45, - 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, - 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x1f, 0x0a, 0x1b, 0x52, - 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, - 0x50, 0x45, 0x5f, 0x53, 0x54, 0x41, 0x52, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x21, 0x0a, 0x1d, - 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, - 0x59, 0x50, 0x45, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x4c, 0x45, 0x54, 0x45, 0x44, 0x10, 0x02, 0x12, - 0x1e, 0x0a, 0x1a, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x45, 0x56, 0x45, 0x4e, - 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x03, 0x12, + 0x2a, 0x65, 0x0a, 0x0c, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x79, 0x70, 0x65, + 0x12, 0x19, 0x0a, 0x15, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x54, 0x59, 0x50, + 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x1a, 0x0a, 0x16, 0x52, + 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x45, + 0x50, 0x5f, 0x52, 0x55, 0x4e, 0x10, 0x01, 0x12, 0x1e, 0x0a, 0x1a, 0x52, 0x45, 0x53, 0x4f, 0x55, + 0x52, 0x43, 0x45, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x57, 0x4f, 0x52, 0x4b, 0x46, 0x4c, 0x4f, + 0x57, 0x5f, 0x52, 0x55, 0x4e, 0x10, 0x02, 0x2a, 0xde, 0x01, 0x0a, 0x11, 0x52, 0x65, 0x73, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1f, 0x0a, + 0x1b, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, + 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x1f, + 0x0a, 0x1b, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, + 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x41, 0x52, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x21, 0x0a, 0x1d, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x45, 0x56, 0x45, 0x4e, - 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x4c, 0x45, 0x44, - 0x10, 0x04, 0x12, 0x21, 0x0a, 0x1d, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x45, - 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x54, 0x49, 0x4d, 0x45, 0x44, 0x5f, - 0x4f, 0x55, 0x54, 0x10, 0x05, 0x32, 0xe4, 0x03, 0x0a, 0x0a, 0x44, 0x69, 0x73, 0x70, 0x61, 0x74, - 0x63, 0x68, 0x65, 0x72, 0x12, 0x3d, 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, - 0x12, 0x16, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, - 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x65, - 0x72, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x00, 0x12, 0x33, 0x0a, 0x06, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x12, 0x14, 0x2e, - 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x0f, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x41, 0x63, - 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x00, 0x30, 0x01, 0x12, 0x52, 0x0a, 0x19, 0x53, 0x75, 0x62, 0x73, + 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x4c, 0x45, 0x54, 0x45, 0x44, + 0x10, 0x02, 0x12, 0x1e, 0x0a, 0x1a, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x45, + 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, + 0x10, 0x03, 0x12, 0x21, 0x0a, 0x1d, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, 0x45, 0x5f, 0x45, + 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, + 0x4c, 0x45, 0x44, 0x10, 0x04, 0x12, 0x21, 0x0a, 0x1d, 0x52, 0x45, 0x53, 0x4f, 0x55, 0x52, 0x43, + 0x45, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x54, 0x49, 0x4d, + 0x45, 0x44, 0x5f, 0x4f, 0x55, 0x54, 0x10, 0x05, 0x32, 0xd1, 0x04, 0x0a, 0x0a, 0x44, 0x69, 0x73, + 0x70, 0x61, 0x74, 0x63, 0x68, 0x65, 0x72, 0x12, 0x3d, 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, + 0x74, 0x65, 0x72, 0x12, 0x16, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x52, 0x65, 0x67, 0x69, + 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x57, 0x6f, + 0x72, 0x6b, 0x65, 0x72, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x33, 0x0a, 0x06, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, + 0x12, 0x14, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0f, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x65, + 0x64, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x00, 0x30, 0x01, 0x12, 0x35, 0x0a, 0x08, 0x4c, + 0x69, 0x73, 0x74, 0x65, 0x6e, 0x56, 0x32, 0x12, 0x14, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, + 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0f, 0x2e, + 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x00, + 0x30, 0x01, 0x12, 0x34, 0x0a, 0x09, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x12, + 0x11, 0x2e, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x52, 0x0a, 0x19, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x54, 0x6f, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x21, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x54, 0x6f, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e, 0x74, @@ -1518,7 +1628,7 @@ func file_dispatcher_proto_rawDescGZIP() []byte { } var file_dispatcher_proto_enumTypes = make([]protoimpl.EnumInfo, 5) -var file_dispatcher_proto_msgTypes = make([]protoimpl.MessageInfo, 13) +var file_dispatcher_proto_msgTypes = make([]protoimpl.MessageInfo, 15) var file_dispatcher_proto_goTypes = []interface{}{ (ActionType)(0), // 0: ActionType (GroupKeyActionEventType)(0), // 1: GroupKeyActionEventType @@ -1538,36 +1648,43 @@ var file_dispatcher_proto_goTypes = []interface{}{ (*WorkflowEvent)(nil), // 15: WorkflowEvent (*OverridesData)(nil), // 16: OverridesData (*OverridesDataResponse)(nil), // 17: OverridesDataResponse - (*timestamppb.Timestamp)(nil), // 18: google.protobuf.Timestamp + (*HeartbeatRequest)(nil), // 18: HeartbeatRequest + (*HeartbeatResponse)(nil), // 19: HeartbeatResponse + (*timestamppb.Timestamp)(nil), // 20: google.protobuf.Timestamp } var file_dispatcher_proto_depIdxs = []int32{ 0, // 0: AssignedAction.actionType:type_name -> ActionType - 18, // 1: GroupKeyActionEvent.eventTimestamp:type_name -> google.protobuf.Timestamp + 20, // 1: GroupKeyActionEvent.eventTimestamp:type_name -> google.protobuf.Timestamp 1, // 2: GroupKeyActionEvent.eventType:type_name -> GroupKeyActionEventType - 18, // 3: StepActionEvent.eventTimestamp:type_name -> google.protobuf.Timestamp + 20, // 3: StepActionEvent.eventTimestamp:type_name -> google.protobuf.Timestamp 2, // 4: StepActionEvent.eventType:type_name -> StepActionEventType 3, // 5: WorkflowEvent.resourceType:type_name -> ResourceType 4, // 6: WorkflowEvent.eventType:type_name -> ResourceEventType - 18, // 7: WorkflowEvent.eventTimestamp:type_name -> google.protobuf.Timestamp - 5, // 8: Dispatcher.Register:input_type -> WorkerRegisterRequest - 8, // 9: Dispatcher.Listen:input_type -> WorkerListenRequest - 14, // 10: Dispatcher.SubscribeToWorkflowEvents:input_type -> SubscribeToWorkflowEventsRequest - 12, // 11: Dispatcher.SendStepActionEvent:input_type -> StepActionEvent - 11, // 12: Dispatcher.SendGroupKeyActionEvent:input_type -> GroupKeyActionEvent - 16, // 13: Dispatcher.PutOverridesData:input_type -> OverridesData - 9, // 14: Dispatcher.Unsubscribe:input_type -> WorkerUnsubscribeRequest - 6, // 15: Dispatcher.Register:output_type -> WorkerRegisterResponse - 7, // 16: Dispatcher.Listen:output_type -> AssignedAction - 15, // 17: Dispatcher.SubscribeToWorkflowEvents:output_type -> WorkflowEvent - 13, // 18: Dispatcher.SendStepActionEvent:output_type -> ActionEventResponse - 13, // 19: Dispatcher.SendGroupKeyActionEvent:output_type -> ActionEventResponse - 17, // 20: Dispatcher.PutOverridesData:output_type -> OverridesDataResponse - 10, // 21: Dispatcher.Unsubscribe:output_type -> WorkerUnsubscribeResponse - 15, // [15:22] is the sub-list for method output_type - 8, // [8:15] is the sub-list for method input_type - 8, // [8:8] is the sub-list for extension type_name - 8, // [8:8] is the sub-list for extension extendee - 0, // [0:8] is the sub-list for field type_name + 20, // 7: WorkflowEvent.eventTimestamp:type_name -> google.protobuf.Timestamp + 20, // 8: HeartbeatRequest.heartbeatAt:type_name -> google.protobuf.Timestamp + 5, // 9: Dispatcher.Register:input_type -> WorkerRegisterRequest + 8, // 10: Dispatcher.Listen:input_type -> WorkerListenRequest + 8, // 11: Dispatcher.ListenV2:input_type -> WorkerListenRequest + 18, // 12: Dispatcher.Heartbeat:input_type -> HeartbeatRequest + 14, // 13: Dispatcher.SubscribeToWorkflowEvents:input_type -> SubscribeToWorkflowEventsRequest + 12, // 14: Dispatcher.SendStepActionEvent:input_type -> StepActionEvent + 11, // 15: Dispatcher.SendGroupKeyActionEvent:input_type -> GroupKeyActionEvent + 16, // 16: Dispatcher.PutOverridesData:input_type -> OverridesData + 9, // 17: Dispatcher.Unsubscribe:input_type -> WorkerUnsubscribeRequest + 6, // 18: Dispatcher.Register:output_type -> WorkerRegisterResponse + 7, // 19: Dispatcher.Listen:output_type -> AssignedAction + 7, // 20: Dispatcher.ListenV2:output_type -> AssignedAction + 19, // 21: Dispatcher.Heartbeat:output_type -> HeartbeatResponse + 15, // 22: Dispatcher.SubscribeToWorkflowEvents:output_type -> WorkflowEvent + 13, // 23: Dispatcher.SendStepActionEvent:output_type -> ActionEventResponse + 13, // 24: Dispatcher.SendGroupKeyActionEvent:output_type -> ActionEventResponse + 17, // 25: Dispatcher.PutOverridesData:output_type -> OverridesDataResponse + 10, // 26: Dispatcher.Unsubscribe:output_type -> WorkerUnsubscribeResponse + 18, // [18:27] is the sub-list for method output_type + 9, // [9:18] is the sub-list for method input_type + 9, // [9:9] is the sub-list for extension type_name + 9, // [9:9] is the sub-list for extension extendee + 0, // [0:9] is the sub-list for field type_name } func init() { file_dispatcher_proto_init() } @@ -1732,6 +1849,30 @@ func file_dispatcher_proto_init() { return nil } } + file_dispatcher_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*HeartbeatRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_dispatcher_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*HeartbeatResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } file_dispatcher_proto_msgTypes[0].OneofWrappers = []interface{}{} file_dispatcher_proto_msgTypes[10].OneofWrappers = []interface{}{} @@ -1741,7 +1882,7 @@ func file_dispatcher_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_dispatcher_proto_rawDesc, NumEnums: 5, - NumMessages: 13, + NumMessages: 15, NumExtensions: 0, NumServices: 1, }, diff --git a/internal/services/dispatcher/contracts/dispatcher_grpc.pb.go b/internal/services/dispatcher/contracts/dispatcher_grpc.pb.go index c97e922da..e607d0d34 100644 --- a/internal/services/dispatcher/contracts/dispatcher_grpc.pb.go +++ b/internal/services/dispatcher/contracts/dispatcher_grpc.pb.go @@ -24,6 +24,11 @@ const _ = grpc.SupportPackageIsVersion7 type DispatcherClient interface { Register(ctx context.Context, in *WorkerRegisterRequest, opts ...grpc.CallOption) (*WorkerRegisterResponse, error) Listen(ctx context.Context, in *WorkerListenRequest, opts ...grpc.CallOption) (Dispatcher_ListenClient, error) + // ListenV2 is like listen, but implementation does not include heartbeats. This should only used by SDKs + // against engine version v0.18.1+ + ListenV2(ctx context.Context, in *WorkerListenRequest, opts ...grpc.CallOption) (Dispatcher_ListenV2Client, error) + // Heartbeat is a method for workers to send heartbeats to the dispatcher + Heartbeat(ctx context.Context, in *HeartbeatRequest, opts ...grpc.CallOption) (*HeartbeatResponse, error) SubscribeToWorkflowEvents(ctx context.Context, in *SubscribeToWorkflowEventsRequest, opts ...grpc.CallOption) (Dispatcher_SubscribeToWorkflowEventsClient, error) SendStepActionEvent(ctx context.Context, in *StepActionEvent, opts ...grpc.CallOption) (*ActionEventResponse, error) SendGroupKeyActionEvent(ctx context.Context, in *GroupKeyActionEvent, opts ...grpc.CallOption) (*ActionEventResponse, error) @@ -80,8 +85,49 @@ func (x *dispatcherListenClient) Recv() (*AssignedAction, error) { return m, nil } +func (c *dispatcherClient) ListenV2(ctx context.Context, in *WorkerListenRequest, opts ...grpc.CallOption) (Dispatcher_ListenV2Client, error) { + stream, err := c.cc.NewStream(ctx, &Dispatcher_ServiceDesc.Streams[1], "/Dispatcher/ListenV2", opts...) + if err != nil { + return nil, err + } + x := &dispatcherListenV2Client{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Dispatcher_ListenV2Client interface { + Recv() (*AssignedAction, error) + grpc.ClientStream +} + +type dispatcherListenV2Client struct { + grpc.ClientStream +} + +func (x *dispatcherListenV2Client) Recv() (*AssignedAction, error) { + m := new(AssignedAction) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *dispatcherClient) Heartbeat(ctx context.Context, in *HeartbeatRequest, opts ...grpc.CallOption) (*HeartbeatResponse, error) { + out := new(HeartbeatResponse) + err := c.cc.Invoke(ctx, "/Dispatcher/Heartbeat", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *dispatcherClient) SubscribeToWorkflowEvents(ctx context.Context, in *SubscribeToWorkflowEventsRequest, opts ...grpc.CallOption) (Dispatcher_SubscribeToWorkflowEventsClient, error) { - stream, err := c.cc.NewStream(ctx, &Dispatcher_ServiceDesc.Streams[1], "/Dispatcher/SubscribeToWorkflowEvents", opts...) + stream, err := c.cc.NewStream(ctx, &Dispatcher_ServiceDesc.Streams[2], "/Dispatcher/SubscribeToWorkflowEvents", opts...) if err != nil { return nil, err } @@ -154,6 +200,11 @@ func (c *dispatcherClient) Unsubscribe(ctx context.Context, in *WorkerUnsubscrib type DispatcherServer interface { Register(context.Context, *WorkerRegisterRequest) (*WorkerRegisterResponse, error) Listen(*WorkerListenRequest, Dispatcher_ListenServer) error + // ListenV2 is like listen, but implementation does not include heartbeats. This should only used by SDKs + // against engine version v0.18.1+ + ListenV2(*WorkerListenRequest, Dispatcher_ListenV2Server) error + // Heartbeat is a method for workers to send heartbeats to the dispatcher + Heartbeat(context.Context, *HeartbeatRequest) (*HeartbeatResponse, error) SubscribeToWorkflowEvents(*SubscribeToWorkflowEventsRequest, Dispatcher_SubscribeToWorkflowEventsServer) error SendStepActionEvent(context.Context, *StepActionEvent) (*ActionEventResponse, error) SendGroupKeyActionEvent(context.Context, *GroupKeyActionEvent) (*ActionEventResponse, error) @@ -172,6 +223,12 @@ func (UnimplementedDispatcherServer) Register(context.Context, *WorkerRegisterRe func (UnimplementedDispatcherServer) Listen(*WorkerListenRequest, Dispatcher_ListenServer) error { return status.Errorf(codes.Unimplemented, "method Listen not implemented") } +func (UnimplementedDispatcherServer) ListenV2(*WorkerListenRequest, Dispatcher_ListenV2Server) error { + return status.Errorf(codes.Unimplemented, "method ListenV2 not implemented") +} +func (UnimplementedDispatcherServer) Heartbeat(context.Context, *HeartbeatRequest) (*HeartbeatResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Heartbeat not implemented") +} func (UnimplementedDispatcherServer) SubscribeToWorkflowEvents(*SubscribeToWorkflowEventsRequest, Dispatcher_SubscribeToWorkflowEventsServer) error { return status.Errorf(codes.Unimplemented, "method SubscribeToWorkflowEvents not implemented") } @@ -239,6 +296,45 @@ func (x *dispatcherListenServer) Send(m *AssignedAction) error { return x.ServerStream.SendMsg(m) } +func _Dispatcher_ListenV2_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(WorkerListenRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(DispatcherServer).ListenV2(m, &dispatcherListenV2Server{stream}) +} + +type Dispatcher_ListenV2Server interface { + Send(*AssignedAction) error + grpc.ServerStream +} + +type dispatcherListenV2Server struct { + grpc.ServerStream +} + +func (x *dispatcherListenV2Server) Send(m *AssignedAction) error { + return x.ServerStream.SendMsg(m) +} + +func _Dispatcher_Heartbeat_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(HeartbeatRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DispatcherServer).Heartbeat(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/Dispatcher/Heartbeat", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DispatcherServer).Heartbeat(ctx, req.(*HeartbeatRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _Dispatcher_SubscribeToWorkflowEvents_Handler(srv interface{}, stream grpc.ServerStream) error { m := new(SubscribeToWorkflowEventsRequest) if err := stream.RecvMsg(m); err != nil { @@ -343,6 +439,10 @@ var Dispatcher_ServiceDesc = grpc.ServiceDesc{ MethodName: "Register", Handler: _Dispatcher_Register_Handler, }, + { + MethodName: "Heartbeat", + Handler: _Dispatcher_Heartbeat_Handler, + }, { MethodName: "SendStepActionEvent", Handler: _Dispatcher_SendStepActionEvent_Handler, @@ -366,6 +466,11 @@ var Dispatcher_ServiceDesc = grpc.ServiceDesc{ Handler: _Dispatcher_Listen_Handler, ServerStreams: true, }, + { + StreamName: "ListenV2", + Handler: _Dispatcher_ListenV2_Handler, + ServerStreams: true, + }, { StreamName: "SubscribeToWorkflowEvents", Handler: _Dispatcher_SubscribeToWorkflowEvents_Handler, diff --git a/internal/services/dispatcher/server.go b/internal/services/dispatcher/server.go index d600fdcd3..a9d789dbe 100644 --- a/internal/services/dispatcher/server.go +++ b/internal/services/dispatcher/server.go @@ -258,6 +258,100 @@ func (s *DispatcherImpl) Listen(request *contracts.WorkerListenRequest, stream c } } +// ListenV2 is like Listen, but implementation does not include heartbeats. This should only used by SDKs +// against engine version v0.18.1+ +func (s *DispatcherImpl) ListenV2(request *contracts.WorkerListenRequest, stream contracts.Dispatcher_ListenV2Server) error { + tenant := stream.Context().Value("tenant").(*dbsqlc.Tenant) + tenantId := sqlchelpers.UUIDToStr(tenant.ID) + + s.l.Debug().Msgf("Received subscribe request from ID: %s", request.WorkerId) + + worker, err := s.repo.Worker().GetWorkerForEngine(tenantId, request.WorkerId) + + if err != nil { + s.l.Error().Err(err).Msgf("could not get worker %s", request.WorkerId) + return err + } + + // check the worker's dispatcher against the current dispatcher. if they don't match, then update the worker + if worker.DispatcherId.Valid && sqlchelpers.UUIDToStr(worker.DispatcherId) != s.dispatcherId { + _, err = s.repo.Worker().UpdateWorker(tenantId, request.WorkerId, &repository.UpdateWorkerOpts{ + DispatcherId: &s.dispatcherId, + }) + + if err != nil { + s.l.Error().Err(err).Msgf("could not update worker %s dispatcher", request.WorkerId) + return err + } + } + + fin := make(chan bool) + + s.workers.Store(request.WorkerId, subscribedWorker{stream: stream, finished: fin}) + + defer func() { + // non-blocking send + select { + case fin <- true: + default: + } + + s.workers.Delete(request.WorkerId) + + inactive := db.WorkerStatusInactive + + _, err := s.repo.Worker().UpdateWorker(tenantId, request.WorkerId, &repository.UpdateWorkerOpts{ + Status: &inactive, + }) + + if err != nil { + s.l.Error().Err(err).Msgf("could not update worker %s status to inactive", request.WorkerId) + } + }() + + ctx := stream.Context() + + // Keep the connection alive for sending messages + for { + select { + case <-fin: + s.l.Debug().Msgf("closing stream for worker id: %s", request.WorkerId) + return nil + case <-ctx.Done(): + s.l.Debug().Msgf("worker id %s has disconnected", request.WorkerId) + return nil + } + } +} + +const HeartbeatInterval = 4 * time.Second + +// Heartbeat is used to update the last heartbeat time for a worker +func (s *DispatcherImpl) Heartbeat(ctx context.Context, req *contracts.HeartbeatRequest) (*contracts.HeartbeatResponse, error) { + tenant := ctx.Value("tenant").(*dbsqlc.Tenant) + tenantId := sqlchelpers.UUIDToStr(tenant.ID) + + heartbeatAt := time.Now().UTC() + + s.l.Debug().Msgf("Received heartbeat request from ID: %s", req.WorkerId) + + // if heartbeat time is greater than expected heartbeat interval, show a warning + if req.HeartbeatAt.AsTime().Before(heartbeatAt.Add(-1 * HeartbeatInterval)) { + s.l.Warn().Msgf("heartbeat time is greater than expected heartbeat interval") + } + + _, err := s.repo.Worker().UpdateWorker(tenantId, req.WorkerId, &repository.UpdateWorkerOpts{ + // use the system time for heartbeat + LastHeartbeatAt: &heartbeatAt, + }) + + if err != nil { + return nil, err + } + + return &contracts.HeartbeatResponse{}, nil +} + // SubscribeToWorkflowEvents registers workflow events with the dispatcher func (s *DispatcherImpl) SubscribeToWorkflowEvents(request *contracts.SubscribeToWorkflowEventsRequest, stream contracts.Dispatcher_SubscribeToWorkflowEventsServer) error { tenant := stream.Context().Value("tenant").(*dbsqlc.Tenant) diff --git a/pkg/client/dispatcher.go b/pkg/client/dispatcher.go index 182c73405..bb2c64e69 100644 --- a/pkg/client/dispatcher.go +++ b/pkg/client/dispatcher.go @@ -8,6 +8,8 @@ import ( "github.com/rs/zerolog" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" dispatchercontracts "github.com/hatchet-dev/hatchet/internal/services/dispatcher/contracts" @@ -145,6 +147,13 @@ func newDispatcher(conn *grpc.ClientConn, opts *sharedClientOpts) DispatcherClie } } +type ListenerStrategy string + +const ( + ListenerStrategyV1 ListenerStrategy = "v1" + ListenerStrategyV2 ListenerStrategy = "v2" +) + type actionListenerImpl struct { client dispatchercontracts.DispatcherClient @@ -159,6 +168,8 @@ type actionListenerImpl struct { v validator.Validator ctx *contextLoader + + listenerStrategy ListenerStrategy } func (d *dispatcherClientImpl) newActionListener(ctx context.Context, req *GetActionListenerRequest) (*actionListenerImpl, error) { @@ -188,7 +199,7 @@ func (d *dispatcherClientImpl) newActionListener(ctx context.Context, req *GetAc d.l.Debug().Msgf("Registered worker with id: %s", resp.WorkerId) // subscribe to the worker - listener, err := d.client.Listen(d.ctx.newContext(ctx), &dispatchercontracts.WorkerListenRequest{ + listener, err := d.client.ListenV2(d.ctx.newContext(ctx), &dispatchercontracts.WorkerListenRequest{ WorkerId: resp.WorkerId, }) @@ -197,13 +208,14 @@ func (d *dispatcherClientImpl) newActionListener(ctx context.Context, req *GetAc } return &actionListenerImpl{ - client: d.client, - listenClient: listener, - workerId: resp.WorkerId, - l: d.l, - v: d.v, - tenantId: d.tenantId, - ctx: d.ctx, + client: d.client, + listenClient: listener, + workerId: resp.WorkerId, + l: d.l, + v: d.v, + tenantId: d.tenantId, + ctx: d.ctx, + listenerStrategy: ListenerStrategyV2, }, nil } @@ -212,6 +224,42 @@ func (a *actionListenerImpl) Actions(ctx context.Context) (<-chan *Action, error a.l.Debug().Msgf("Starting to listen for actions") + // update the worker with a last heartbeat time every 4 seconds as long as the worker is connected + go func() { + timer := time.NewTicker(100 * time.Millisecond) + defer timer.Stop() + + // set last heartbeat to 5 seconds ago so that the first heartbeat is sent immediately + lastHeartbeat := time.Now().Add(-5 * time.Second) + + for { + select { + case <-ctx.Done(): + return + case <-timer.C: + if now := time.Now().UTC(); lastHeartbeat.Add(4 * time.Second).Before(now) { + a.l.Debug().Msgf("updating worker %s heartbeat", a.workerId) + + _, err := a.client.Heartbeat(a.ctx.newContext(ctx), &dispatchercontracts.HeartbeatRequest{ + WorkerId: a.workerId, + HeartbeatAt: timestamppb.New(now), + }) + + if err != nil { + a.l.Error().Err(err).Msgf("could not update worker %s heartbeat", a.workerId) + + // if the heartbeat method is unimplemented, don't continue to send heartbeats + if status.Code(err) == codes.Unimplemented { + return + } + } + + lastHeartbeat = time.Now().UTC() + } + } + } + }() + go func() { for { assignedAction, err := a.listenClient.Recv() @@ -232,6 +280,12 @@ func (a *actionListenerImpl) Actions(ctx context.Context) (<-chan *Action, error return } + // if this is an unimplemented error, default to v1 + if a.listenerStrategy == ListenerStrategyV2 && status.Code(err) == codes.Unimplemented { + a.l.Debug().Msgf("Falling back to v1 listener strategy") + a.listenerStrategy = ListenerStrategyV1 + } + err = a.retrySubscribe(ctx) if err != nil { @@ -287,9 +341,18 @@ func (a *actionListenerImpl) retrySubscribe(ctx context.Context) error { for retries < DefaultActionListenerRetryCount { time.Sleep(DefaultActionListenerRetryInterval) - listenClient, err := a.client.Listen(a.ctx.newContext(ctx), &dispatchercontracts.WorkerListenRequest{ - WorkerId: a.workerId, - }) + var err error + var listenClient dispatchercontracts.Dispatcher_ListenClient + + if a.listenerStrategy == ListenerStrategyV1 { + listenClient, err = a.client.Listen(a.ctx.newContext(ctx), &dispatchercontracts.WorkerListenRequest{ + WorkerId: a.workerId, + }) + } else if a.listenerStrategy == ListenerStrategyV2 { + listenClient, err = a.client.ListenV2(a.ctx.newContext(ctx), &dispatchercontracts.WorkerListenRequest{ + WorkerId: a.workerId, + }) + } if err != nil { retries++ diff --git a/python-sdk/examples/dag/worker.py b/python-sdk/examples/dag/worker.py index 525aa90ca..006265321 100644 --- a/python-sdk/examples/dag/worker.py +++ b/python-sdk/examples/dag/worker.py @@ -4,7 +4,7 @@ load_dotenv() -hatchet = Hatchet() +hatchet = Hatchet(debug=True) @hatchet.workflow(on_events=["user:create"],schedule_timeout="10m") class MyWorkflow: diff --git a/python-sdk/hatchet_sdk/clients/dispatcher.py b/python-sdk/hatchet_sdk/clients/dispatcher.py index 9d3f96acf..5463930d7 100644 --- a/python-sdk/hatchet_sdk/clients/dispatcher.py +++ b/python-sdk/hatchet_sdk/clients/dispatcher.py @@ -1,5 +1,6 @@ # relative imports -from ..dispatcher_pb2 import GroupKeyActionEvent, StepActionEvent, ActionEventResponse, ActionType, AssignedAction, WorkerListenRequest, WorkerRegisterRequest, WorkerUnsubscribeRequest, WorkerRegisterResponse, OverridesData +import threading +from ..dispatcher_pb2 import GroupKeyActionEvent, StepActionEvent, ActionEventResponse, ActionType, AssignedAction, WorkerListenRequest, WorkerRegisterRequest, WorkerUnsubscribeRequest, WorkerRegisterResponse, OverridesData, HeartbeatRequest from ..dispatcher_pb2_grpc import DispatcherStub import time @@ -9,6 +10,7 @@ import grpc from typing import Callable, List, Union from ..metadata import get_metadata +from .events import proto_timestamp_now import time @@ -72,8 +74,43 @@ def __init__(self, client : DispatcherStub, token, worker_id): self.worker_id = worker_id self.retries = 0 self.last_connection_attempt = 0 - # self.logger = logger - # self.validator = validator + self.heartbeat_thread = None + self.run_heartbeat = True + self.listen_strategy = "v2" + + def heartbeat(self): + # send a heartbeat every 4 seconds + while True: + if not self.run_heartbeat: + break + + try: + self.client.Heartbeat( + HeartbeatRequest( + workerId=self.worker_id, + heartbeatAt=proto_timestamp_now(), + ), + timeout=DEFAULT_REGISTER_TIMEOUT, + metadata=get_metadata(self.token), + ) + except grpc.RpcError as e: + # we don't reraise the error here, as we don't want to stop the heartbeat thread + logger.error(f"Failed to send heartbeat: {e}") + + if e.code() == grpc.StatusCode.UNIMPLEMENTED: + break + + time.sleep(4) + + def start_heartbeater(self): + if self.heartbeat_thread is not None: + return + + # create a new thread to send heartbeats + heartbeat_thread = threading.Thread(target=self.heartbeat) + heartbeat_thread.start() + + self.heartbeat_thread = heartbeat_thread def actions(self): while True: @@ -119,6 +156,12 @@ def actions(self): elif e.code() == grpc.StatusCode.DEADLINE_EXCEEDED: logger.info("Deadline exceeded, retrying subscription") continue + elif self.listen_strategy == "v2" and e.code() == grpc.StatusCode.UNIMPLEMENTED: + # ListenV2 is not available, fallback to Listen + self.listen_strategy = "v1" + self.run_heartbeat = False + logger.info("ListenV2 not available, falling back to Listen") + continue else: # Unknown error, report and break # self.logger.error(f"Failed to receive message: {e}") @@ -160,13 +203,23 @@ def get_listen_client(self): time.sleep(DEFAULT_ACTION_LISTENER_RETRY_INTERVAL) logger.info( f"Could not connect to Hatchet, retrying... {self.retries}/{DEFAULT_ACTION_LISTENER_RETRY_COUNT}") + + if self.listen_strategy == "v2": + listener = self.client.ListenV2(WorkerListenRequest( + workerId=self.worker_id + ), + metadata=get_metadata(self.token), + ) - listener = self.client.Listen(WorkerListenRequest( - workerId=self.worker_id - ), - timeout=DEFAULT_ACTION_TIMEOUT, - metadata=get_metadata(self.token), - ) + self.start_heartbeater() + else: + # if ListenV2 is not available, fallback to Listen + listener = self.client.Listen(WorkerListenRequest( + workerId=self.worker_id + ), + timeout=DEFAULT_ACTION_TIMEOUT, + metadata=get_metadata(self.token), + ) self.last_connection_attempt = current_time @@ -174,6 +227,8 @@ def get_listen_client(self): return listener def unregister(self): + self.run_heartbeat = False + try: self.client.Unsubscribe( WorkerUnsubscribeRequest( diff --git a/python-sdk/hatchet_sdk/dispatcher_pb2.py b/python-sdk/hatchet_sdk/dispatcher_pb2.py index 90fbc0d93..1777e1c61 100644 --- a/python-sdk/hatchet_sdk/dispatcher_pb2.py +++ b/python-sdk/hatchet_sdk/dispatcher_pb2.py @@ -15,7 +15,7 @@ from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x64ispatcher.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"p\n\x15WorkerRegisterRequest\x12\x12\n\nworkerName\x18\x01 \x01(\t\x12\x0f\n\x07\x61\x63tions\x18\x02 \x03(\t\x12\x10\n\x08services\x18\x03 \x03(\t\x12\x14\n\x07maxRuns\x18\x04 \x01(\x05H\x00\x88\x01\x01\x42\n\n\x08_maxRuns\"P\n\x16WorkerRegisterResponse\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x10\n\x08workerId\x18\x02 \x01(\t\x12\x12\n\nworkerName\x18\x03 \x01(\t\"\x84\x02\n\x0e\x41ssignedAction\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x15\n\rworkflowRunId\x18\x02 \x01(\t\x12\x18\n\x10getGroupKeyRunId\x18\x03 \x01(\t\x12\r\n\x05jobId\x18\x04 \x01(\t\x12\x0f\n\x07jobName\x18\x05 \x01(\t\x12\x10\n\x08jobRunId\x18\x06 \x01(\t\x12\x0e\n\x06stepId\x18\x07 \x01(\t\x12\x11\n\tstepRunId\x18\x08 \x01(\t\x12\x10\n\x08\x61\x63tionId\x18\t \x01(\t\x12\x1f\n\nactionType\x18\n \x01(\x0e\x32\x0b.ActionType\x12\x15\n\ractionPayload\x18\x0b \x01(\t\x12\x10\n\x08stepName\x18\x0c \x01(\t\"\'\n\x13WorkerListenRequest\x12\x10\n\x08workerId\x18\x01 \x01(\t\",\n\x18WorkerUnsubscribeRequest\x12\x10\n\x08workerId\x18\x01 \x01(\t\"?\n\x19WorkerUnsubscribeResponse\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x10\n\x08workerId\x18\x02 \x01(\t\"\xe1\x01\n\x13GroupKeyActionEvent\x12\x10\n\x08workerId\x18\x01 \x01(\t\x12\x15\n\rworkflowRunId\x18\x02 \x01(\t\x12\x18\n\x10getGroupKeyRunId\x18\x03 \x01(\t\x12\x10\n\x08\x61\x63tionId\x18\x04 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12+\n\teventType\x18\x06 \x01(\x0e\x32\x18.GroupKeyActionEventType\x12\x14\n\x0c\x65ventPayload\x18\x07 \x01(\t\"\xec\x01\n\x0fStepActionEvent\x12\x10\n\x08workerId\x18\x01 \x01(\t\x12\r\n\x05jobId\x18\x02 \x01(\t\x12\x10\n\x08jobRunId\x18\x03 \x01(\t\x12\x0e\n\x06stepId\x18\x04 \x01(\t\x12\x11\n\tstepRunId\x18\x05 \x01(\t\x12\x10\n\x08\x61\x63tionId\x18\x06 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x07 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\'\n\teventType\x18\x08 \x01(\x0e\x32\x14.StepActionEventType\x12\x14\n\x0c\x65ventPayload\x18\t \x01(\t\"9\n\x13\x41\x63tionEventResponse\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x10\n\x08workerId\x18\x02 \x01(\t\"9\n SubscribeToWorkflowEventsRequest\x12\x15\n\rworkflowRunId\x18\x01 \x01(\t\"\xb2\x02\n\rWorkflowEvent\x12\x15\n\rworkflowRunId\x18\x01 \x01(\t\x12#\n\x0cresourceType\x18\x02 \x01(\x0e\x32\r.ResourceType\x12%\n\teventType\x18\x03 \x01(\x0e\x32\x12.ResourceEventType\x12\x12\n\nresourceId\x18\x04 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x14\n\x0c\x65ventPayload\x18\x06 \x01(\t\x12\x0e\n\x06hangup\x18\x07 \x01(\x08\x12\x18\n\x0bstepRetries\x18\x08 \x01(\x05H\x00\x88\x01\x01\x12\x17\n\nretryCount\x18\t \x01(\x05H\x01\x88\x01\x01\x42\x0e\n\x0c_stepRetriesB\r\n\x0b_retryCount\"W\n\rOverridesData\x12\x11\n\tstepRunId\x18\x01 \x01(\t\x12\x0c\n\x04path\x18\x02 \x01(\t\x12\r\n\x05value\x18\x03 \x01(\t\x12\x16\n\x0e\x63\x61llerFilename\x18\x04 \x01(\t\"\x17\n\x15OverridesDataResponse*N\n\nActionType\x12\x12\n\x0eSTART_STEP_RUN\x10\x00\x12\x13\n\x0f\x43\x41NCEL_STEP_RUN\x10\x01\x12\x17\n\x13START_GET_GROUP_KEY\x10\x02*\xa2\x01\n\x17GroupKeyActionEventType\x12 \n\x1cGROUP_KEY_EVENT_TYPE_UNKNOWN\x10\x00\x12 \n\x1cGROUP_KEY_EVENT_TYPE_STARTED\x10\x01\x12\"\n\x1eGROUP_KEY_EVENT_TYPE_COMPLETED\x10\x02\x12\x1f\n\x1bGROUP_KEY_EVENT_TYPE_FAILED\x10\x03*\x8a\x01\n\x13StepActionEventType\x12\x1b\n\x17STEP_EVENT_TYPE_UNKNOWN\x10\x00\x12\x1b\n\x17STEP_EVENT_TYPE_STARTED\x10\x01\x12\x1d\n\x19STEP_EVENT_TYPE_COMPLETED\x10\x02\x12\x1a\n\x16STEP_EVENT_TYPE_FAILED\x10\x03*e\n\x0cResourceType\x12\x19\n\x15RESOURCE_TYPE_UNKNOWN\x10\x00\x12\x1a\n\x16RESOURCE_TYPE_STEP_RUN\x10\x01\x12\x1e\n\x1aRESOURCE_TYPE_WORKFLOW_RUN\x10\x02*\xde\x01\n\x11ResourceEventType\x12\x1f\n\x1bRESOURCE_EVENT_TYPE_UNKNOWN\x10\x00\x12\x1f\n\x1bRESOURCE_EVENT_TYPE_STARTED\x10\x01\x12!\n\x1dRESOURCE_EVENT_TYPE_COMPLETED\x10\x02\x12\x1e\n\x1aRESOURCE_EVENT_TYPE_FAILED\x10\x03\x12!\n\x1dRESOURCE_EVENT_TYPE_CANCELLED\x10\x04\x12!\n\x1dRESOURCE_EVENT_TYPE_TIMED_OUT\x10\x05\x32\xe4\x03\n\nDispatcher\x12=\n\x08Register\x12\x16.WorkerRegisterRequest\x1a\x17.WorkerRegisterResponse\"\x00\x12\x33\n\x06Listen\x12\x14.WorkerListenRequest\x1a\x0f.AssignedAction\"\x00\x30\x01\x12R\n\x19SubscribeToWorkflowEvents\x12!.SubscribeToWorkflowEventsRequest\x1a\x0e.WorkflowEvent\"\x00\x30\x01\x12?\n\x13SendStepActionEvent\x12\x10.StepActionEvent\x1a\x14.ActionEventResponse\"\x00\x12G\n\x17SendGroupKeyActionEvent\x12\x14.GroupKeyActionEvent\x1a\x14.ActionEventResponse\"\x00\x12<\n\x10PutOverridesData\x12\x0e.OverridesData\x1a\x16.OverridesDataResponse\"\x00\x12\x46\n\x0bUnsubscribe\x12\x19.WorkerUnsubscribeRequest\x1a\x1a.WorkerUnsubscribeResponse\"\x00\x42GZEgithub.com/hatchet-dev/hatchet/internal/services/dispatcher/contractsb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x64ispatcher.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"p\n\x15WorkerRegisterRequest\x12\x12\n\nworkerName\x18\x01 \x01(\t\x12\x0f\n\x07\x61\x63tions\x18\x02 \x03(\t\x12\x10\n\x08services\x18\x03 \x03(\t\x12\x14\n\x07maxRuns\x18\x04 \x01(\x05H\x00\x88\x01\x01\x42\n\n\x08_maxRuns\"P\n\x16WorkerRegisterResponse\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x10\n\x08workerId\x18\x02 \x01(\t\x12\x12\n\nworkerName\x18\x03 \x01(\t\"\x84\x02\n\x0e\x41ssignedAction\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x15\n\rworkflowRunId\x18\x02 \x01(\t\x12\x18\n\x10getGroupKeyRunId\x18\x03 \x01(\t\x12\r\n\x05jobId\x18\x04 \x01(\t\x12\x0f\n\x07jobName\x18\x05 \x01(\t\x12\x10\n\x08jobRunId\x18\x06 \x01(\t\x12\x0e\n\x06stepId\x18\x07 \x01(\t\x12\x11\n\tstepRunId\x18\x08 \x01(\t\x12\x10\n\x08\x61\x63tionId\x18\t \x01(\t\x12\x1f\n\nactionType\x18\n \x01(\x0e\x32\x0b.ActionType\x12\x15\n\ractionPayload\x18\x0b \x01(\t\x12\x10\n\x08stepName\x18\x0c \x01(\t\"\'\n\x13WorkerListenRequest\x12\x10\n\x08workerId\x18\x01 \x01(\t\",\n\x18WorkerUnsubscribeRequest\x12\x10\n\x08workerId\x18\x01 \x01(\t\"?\n\x19WorkerUnsubscribeResponse\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x10\n\x08workerId\x18\x02 \x01(\t\"\xe1\x01\n\x13GroupKeyActionEvent\x12\x10\n\x08workerId\x18\x01 \x01(\t\x12\x15\n\rworkflowRunId\x18\x02 \x01(\t\x12\x18\n\x10getGroupKeyRunId\x18\x03 \x01(\t\x12\x10\n\x08\x61\x63tionId\x18\x04 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12+\n\teventType\x18\x06 \x01(\x0e\x32\x18.GroupKeyActionEventType\x12\x14\n\x0c\x65ventPayload\x18\x07 \x01(\t\"\xec\x01\n\x0fStepActionEvent\x12\x10\n\x08workerId\x18\x01 \x01(\t\x12\r\n\x05jobId\x18\x02 \x01(\t\x12\x10\n\x08jobRunId\x18\x03 \x01(\t\x12\x0e\n\x06stepId\x18\x04 \x01(\t\x12\x11\n\tstepRunId\x18\x05 \x01(\t\x12\x10\n\x08\x61\x63tionId\x18\x06 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x07 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\'\n\teventType\x18\x08 \x01(\x0e\x32\x14.StepActionEventType\x12\x14\n\x0c\x65ventPayload\x18\t \x01(\t\"9\n\x13\x41\x63tionEventResponse\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x10\n\x08workerId\x18\x02 \x01(\t\"9\n SubscribeToWorkflowEventsRequest\x12\x15\n\rworkflowRunId\x18\x01 \x01(\t\"\xb2\x02\n\rWorkflowEvent\x12\x15\n\rworkflowRunId\x18\x01 \x01(\t\x12#\n\x0cresourceType\x18\x02 \x01(\x0e\x32\r.ResourceType\x12%\n\teventType\x18\x03 \x01(\x0e\x32\x12.ResourceEventType\x12\x12\n\nresourceId\x18\x04 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x14\n\x0c\x65ventPayload\x18\x06 \x01(\t\x12\x0e\n\x06hangup\x18\x07 \x01(\x08\x12\x18\n\x0bstepRetries\x18\x08 \x01(\x05H\x00\x88\x01\x01\x12\x17\n\nretryCount\x18\t \x01(\x05H\x01\x88\x01\x01\x42\x0e\n\x0c_stepRetriesB\r\n\x0b_retryCount\"W\n\rOverridesData\x12\x11\n\tstepRunId\x18\x01 \x01(\t\x12\x0c\n\x04path\x18\x02 \x01(\t\x12\r\n\x05value\x18\x03 \x01(\t\x12\x16\n\x0e\x63\x61llerFilename\x18\x04 \x01(\t\"\x17\n\x15OverridesDataResponse\"U\n\x10HeartbeatRequest\x12\x10\n\x08workerId\x18\x01 \x01(\t\x12/\n\x0bheartbeatAt\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\x13\n\x11HeartbeatResponse*N\n\nActionType\x12\x12\n\x0eSTART_STEP_RUN\x10\x00\x12\x13\n\x0f\x43\x41NCEL_STEP_RUN\x10\x01\x12\x17\n\x13START_GET_GROUP_KEY\x10\x02*\xa2\x01\n\x17GroupKeyActionEventType\x12 \n\x1cGROUP_KEY_EVENT_TYPE_UNKNOWN\x10\x00\x12 \n\x1cGROUP_KEY_EVENT_TYPE_STARTED\x10\x01\x12\"\n\x1eGROUP_KEY_EVENT_TYPE_COMPLETED\x10\x02\x12\x1f\n\x1bGROUP_KEY_EVENT_TYPE_FAILED\x10\x03*\x8a\x01\n\x13StepActionEventType\x12\x1b\n\x17STEP_EVENT_TYPE_UNKNOWN\x10\x00\x12\x1b\n\x17STEP_EVENT_TYPE_STARTED\x10\x01\x12\x1d\n\x19STEP_EVENT_TYPE_COMPLETED\x10\x02\x12\x1a\n\x16STEP_EVENT_TYPE_FAILED\x10\x03*e\n\x0cResourceType\x12\x19\n\x15RESOURCE_TYPE_UNKNOWN\x10\x00\x12\x1a\n\x16RESOURCE_TYPE_STEP_RUN\x10\x01\x12\x1e\n\x1aRESOURCE_TYPE_WORKFLOW_RUN\x10\x02*\xde\x01\n\x11ResourceEventType\x12\x1f\n\x1bRESOURCE_EVENT_TYPE_UNKNOWN\x10\x00\x12\x1f\n\x1bRESOURCE_EVENT_TYPE_STARTED\x10\x01\x12!\n\x1dRESOURCE_EVENT_TYPE_COMPLETED\x10\x02\x12\x1e\n\x1aRESOURCE_EVENT_TYPE_FAILED\x10\x03\x12!\n\x1dRESOURCE_EVENT_TYPE_CANCELLED\x10\x04\x12!\n\x1dRESOURCE_EVENT_TYPE_TIMED_OUT\x10\x05\x32\xd1\x04\n\nDispatcher\x12=\n\x08Register\x12\x16.WorkerRegisterRequest\x1a\x17.WorkerRegisterResponse\"\x00\x12\x33\n\x06Listen\x12\x14.WorkerListenRequest\x1a\x0f.AssignedAction\"\x00\x30\x01\x12\x35\n\x08ListenV2\x12\x14.WorkerListenRequest\x1a\x0f.AssignedAction\"\x00\x30\x01\x12\x34\n\tHeartbeat\x12\x11.HeartbeatRequest\x1a\x12.HeartbeatResponse\"\x00\x12R\n\x19SubscribeToWorkflowEvents\x12!.SubscribeToWorkflowEventsRequest\x1a\x0e.WorkflowEvent\"\x00\x30\x01\x12?\n\x13SendStepActionEvent\x12\x10.StepActionEvent\x1a\x14.ActionEventResponse\"\x00\x12G\n\x17SendGroupKeyActionEvent\x12\x14.GroupKeyActionEvent\x1a\x14.ActionEventResponse\"\x00\x12<\n\x10PutOverridesData\x12\x0e.OverridesData\x1a\x16.OverridesDataResponse\"\x00\x12\x46\n\x0bUnsubscribe\x12\x19.WorkerUnsubscribeRequest\x1a\x1a.WorkerUnsubscribeResponse\"\x00\x42GZEgithub.com/hatchet-dev/hatchet/internal/services/dispatcher/contractsb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -23,16 +23,16 @@ if _descriptor._USE_C_DESCRIPTORS == False: _globals['DESCRIPTOR']._options = None _globals['DESCRIPTOR']._serialized_options = b'ZEgithub.com/hatchet-dev/hatchet/internal/services/dispatcher/contracts' - _globals['_ACTIONTYPE']._serialized_start=1672 - _globals['_ACTIONTYPE']._serialized_end=1750 - _globals['_GROUPKEYACTIONEVENTTYPE']._serialized_start=1753 - _globals['_GROUPKEYACTIONEVENTTYPE']._serialized_end=1915 - _globals['_STEPACTIONEVENTTYPE']._serialized_start=1918 - _globals['_STEPACTIONEVENTTYPE']._serialized_end=2056 - _globals['_RESOURCETYPE']._serialized_start=2058 - _globals['_RESOURCETYPE']._serialized_end=2159 - _globals['_RESOURCEEVENTTYPE']._serialized_start=2162 - _globals['_RESOURCEEVENTTYPE']._serialized_end=2384 + _globals['_ACTIONTYPE']._serialized_start=1780 + _globals['_ACTIONTYPE']._serialized_end=1858 + _globals['_GROUPKEYACTIONEVENTTYPE']._serialized_start=1861 + _globals['_GROUPKEYACTIONEVENTTYPE']._serialized_end=2023 + _globals['_STEPACTIONEVENTTYPE']._serialized_start=2026 + _globals['_STEPACTIONEVENTTYPE']._serialized_end=2164 + _globals['_RESOURCETYPE']._serialized_start=2166 + _globals['_RESOURCETYPE']._serialized_end=2267 + _globals['_RESOURCEEVENTTYPE']._serialized_start=2270 + _globals['_RESOURCEEVENTTYPE']._serialized_end=2492 _globals['_WORKERREGISTERREQUEST']._serialized_start=53 _globals['_WORKERREGISTERREQUEST']._serialized_end=165 _globals['_WORKERREGISTERRESPONSE']._serialized_start=167 @@ -59,6 +59,10 @@ _globals['_OVERRIDESDATA']._serialized_end=1645 _globals['_OVERRIDESDATARESPONSE']._serialized_start=1647 _globals['_OVERRIDESDATARESPONSE']._serialized_end=1670 - _globals['_DISPATCHER']._serialized_start=2387 - _globals['_DISPATCHER']._serialized_end=2871 + _globals['_HEARTBEATREQUEST']._serialized_start=1672 + _globals['_HEARTBEATREQUEST']._serialized_end=1757 + _globals['_HEARTBEATRESPONSE']._serialized_start=1759 + _globals['_HEARTBEATRESPONSE']._serialized_end=1778 + _globals['_DISPATCHER']._serialized_start=2495 + _globals['_DISPATCHER']._serialized_end=3088 # @@protoc_insertion_point(module_scope) diff --git a/python-sdk/hatchet_sdk/dispatcher_pb2.pyi b/python-sdk/hatchet_sdk/dispatcher_pb2.pyi index 8e519953e..70c09eb34 100644 --- a/python-sdk/hatchet_sdk/dispatcher_pb2.pyi +++ b/python-sdk/hatchet_sdk/dispatcher_pb2.pyi @@ -223,3 +223,15 @@ class OverridesData(_message.Message): class OverridesDataResponse(_message.Message): __slots__ = () def __init__(self) -> None: ... + +class HeartbeatRequest(_message.Message): + __slots__ = ("workerId", "heartbeatAt") + WORKERID_FIELD_NUMBER: _ClassVar[int] + HEARTBEATAT_FIELD_NUMBER: _ClassVar[int] + workerId: str + heartbeatAt: _timestamp_pb2.Timestamp + def __init__(self, workerId: _Optional[str] = ..., heartbeatAt: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ...) -> None: ... + +class HeartbeatResponse(_message.Message): + __slots__ = () + def __init__(self) -> None: ... diff --git a/python-sdk/hatchet_sdk/dispatcher_pb2_grpc.py b/python-sdk/hatchet_sdk/dispatcher_pb2_grpc.py index 856af8d05..e9fe31235 100644 --- a/python-sdk/hatchet_sdk/dispatcher_pb2_grpc.py +++ b/python-sdk/hatchet_sdk/dispatcher_pb2_grpc.py @@ -23,6 +23,16 @@ def __init__(self, channel): request_serializer=dispatcher__pb2.WorkerListenRequest.SerializeToString, response_deserializer=dispatcher__pb2.AssignedAction.FromString, ) + self.ListenV2 = channel.unary_stream( + '/Dispatcher/ListenV2', + request_serializer=dispatcher__pb2.WorkerListenRequest.SerializeToString, + response_deserializer=dispatcher__pb2.AssignedAction.FromString, + ) + self.Heartbeat = channel.unary_unary( + '/Dispatcher/Heartbeat', + request_serializer=dispatcher__pb2.HeartbeatRequest.SerializeToString, + response_deserializer=dispatcher__pb2.HeartbeatResponse.FromString, + ) self.SubscribeToWorkflowEvents = channel.unary_stream( '/Dispatcher/SubscribeToWorkflowEvents', request_serializer=dispatcher__pb2.SubscribeToWorkflowEventsRequest.SerializeToString, @@ -65,6 +75,21 @@ def Listen(self, request, context): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') + def ListenV2(self, request, context): + """ListenV2 is like listen, but implementation does not include heartbeats. This should only used by SDKs + against engine version v0.18.1+ + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def Heartbeat(self, request, context): + """Heartbeat is a method for workers to send heartbeats to the dispatcher + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + def SubscribeToWorkflowEvents(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) @@ -108,6 +133,16 @@ def add_DispatcherServicer_to_server(servicer, server): request_deserializer=dispatcher__pb2.WorkerListenRequest.FromString, response_serializer=dispatcher__pb2.AssignedAction.SerializeToString, ), + 'ListenV2': grpc.unary_stream_rpc_method_handler( + servicer.ListenV2, + request_deserializer=dispatcher__pb2.WorkerListenRequest.FromString, + response_serializer=dispatcher__pb2.AssignedAction.SerializeToString, + ), + 'Heartbeat': grpc.unary_unary_rpc_method_handler( + servicer.Heartbeat, + request_deserializer=dispatcher__pb2.HeartbeatRequest.FromString, + response_serializer=dispatcher__pb2.HeartbeatResponse.SerializeToString, + ), 'SubscribeToWorkflowEvents': grpc.unary_stream_rpc_method_handler( servicer.SubscribeToWorkflowEvents, request_deserializer=dispatcher__pb2.SubscribeToWorkflowEventsRequest.FromString, @@ -177,6 +212,40 @@ def Listen(request, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + @staticmethod + def ListenV2(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_stream(request, target, '/Dispatcher/ListenV2', + dispatcher__pb2.WorkerListenRequest.SerializeToString, + dispatcher__pb2.AssignedAction.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def Heartbeat(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/Dispatcher/Heartbeat', + dispatcher__pb2.HeartbeatRequest.SerializeToString, + dispatcher__pb2.HeartbeatResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + @staticmethod def SubscribeToWorkflowEvents(request, target,