diff --git a/cluster/build.sh b/cluster/build.sh
index 2a5ee9594..c3dd13e14 100755
--- a/cluster/build.sh
+++ b/cluster/build.sh
@@ -1,4 +1,5 @@
 protoc -I=../actor --go_out=. --go_opt=paths=source_relative --proto_path=. cluster.proto
 protoc -I=../actor --go_out=. --go_opt=paths=source_relative --proto_path=. gossip.proto
 protoc -I=../actor --go_out=. --go_opt=paths=source_relative --proto_path=. grain.proto
+protoc -I=../actor --go_out=. --go_opt=paths=source_relative --proto_path=. pubsub.proto
diff --git a/cluster/cluster.go b/cluster/cluster.go
index f22bcd3ee..c7d2f0e37 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -17,6 +17,7 @@ type Cluster struct {
 	ActorSystem *actor.ActorSystem
 	Config      *Config
 	Gossip      Gossiper
+	PubSub      *PubSub
 	Remote      *remote.Remote
 	PidCache    *PidCacheValue
 	MemberList  *MemberList
@@ -44,6 +45,7 @@ func New(actorSystem *actor.ActorSystem, config *Config) *Cluster {
 	var err error
 	c.Gossip, err = newGossiper(c)
+	c.PubSub = NewPubSub(c)
 	if err != nil {
 		panic(err)
@@ -97,6 +99,7 @@ func (c *Cluster) StartMember() {
 	if err := c.Gossip.StartGossiping(); err != nil {
 		panic(err)
 	}
+	c.PubSub.Start()
 	c.MemberList.InitializeTopologyConsensus()
 	if err := cfg.ClusterProvider.StartMember(c); err != nil {
@@ -130,6 +133,7 @@ func (c *Cluster) StartClient() {
 	if err := cfg.ClusterProvider.StartClient(c); err != nil {
 		panic(err)
 	}
+	c.PubSub.Start()
 }
 func (c *Cluster) Shutdown(graceful bool) {
diff --git a/cluster/config.go b/cluster/config.go
index 476ca64b7..b4ac4b456 100644
--- a/cluster/config.go
+++ b/cluster/config.go
@@ -26,6 +26,7 @@ type Config struct {
 	GossipRequestTimeout time.Duration
 	GossipFanOut         int
 	GossipMaxSend        int
+	PubSubConfig         *PubSubConfig
 }
 func Configure(clusterName string, clusterProvider ClusterProvider, identityLookup IdentityLookup, remoteConfig *remote.Config, options ...ConfigOption) *Config {
@@ -45,6 +46,7 @@ func Configure(clusterName string, clusterProvider ClusterProvider, identityLook
 		GossipRequestTimeout: time.Millisecond * 500,
 		GossipFanOut:         3,
 		GossipMaxSend:        50,
+		PubSubConfig:         newPubSubConfig(),
 	}
 	for _, option := range options {
diff --git a/cluster/config_opts.go b/cluster/config_opts.go
index 7cebbd011..ebb919ac2 100644
--- a/cluster/config_opts.go
+++ b/cluster/config_opts.go
@@ -39,3 +39,11 @@ func WithKinds(kinds ...*Kind) ConfigOption {
 		}
 	}
 }
+
+// WithPubSubSubscriberTimeout sets a timeout used when delivering a message batch to a subscriber.
+// Default is 5s.
+func WithPubSubSubscriberTimeout(timeout time.Duration) ConfigOption {
+	return func(c *Config) {
+		c.PubSubConfig.SubscriberTimeout = timeout
+	}
+}
diff --git a/cluster/key_value_store.go b/cluster/key_value_store.go
new file mode 100644
index 000000000..1d90ab377
--- /dev/null
+++ b/cluster/key_value_store.go
@@ -0,0 +1,25 @@
+package cluster
+
+import "golang.org/x/net/context"
+
+// KeyValueStore is a distributed key value store
+type KeyValueStore[T any] interface {
+	// Set the value for the given key.
+	Set(ctx context.Context, key string, value T) error
+	// Get the value for the given key.
+	Get(ctx context.Context, key string) (T, error)
+	// Clear the value for the given key.
+	Clear(ctx context.Context, key string) error
+}
+
+// EmptyKeyValueStore is a key value store that does nothing.
+type EmptyKeyValueStore[T any] struct{} + +func (e *EmptyKeyValueStore[T]) Set(_ context.Context, _ string, _ T) error { return nil } + +func (e *EmptyKeyValueStore[T]) Get(_ context.Context, _ string) (T, error) { + var r T + return r, nil +} + +func (e *EmptyKeyValueStore[T]) Clear(_ context.Context, _ string) error { return nil } diff --git a/cluster/pubsub.go b/cluster/pubsub.go new file mode 100644 index 000000000..fac34a67e --- /dev/null +++ b/cluster/pubsub.go @@ -0,0 +1,57 @@ +package cluster + +import ( + "github.com/asynkron/protoactor-go/actor" + "github.com/asynkron/protoactor-go/extensions" + "time" +) + +const PubSubDeliveryName = "$pubsub-delivery" + +var pubsubExtensionID = extensions.NextExtensionID() + +type PubSub struct { + cluster *Cluster +} + +func NewPubSub(cluster *Cluster) *PubSub { + p := &PubSub{ + cluster: cluster, + } + cluster.ActorSystem.Extensions.Register(p) + return p +} + +// Start the PubSubMemberDeliveryActor +func (p *PubSub) Start() { + props := actor.PropsFromProducer(func() actor.Actor { + return NewPubSubMemberDeliveryActor(p.cluster.Config.PubSubConfig.SubscriberTimeout) + }) + _, err := p.cluster.ActorSystem.Root.SpawnNamed(props, PubSubDeliveryName) + if err != nil { + panic(err) // let it crash + } +} + +func (p *PubSub) ExtensionID() extensions.ExtensionID { + return pubsubExtensionID +} + +type PubSubConfig struct { + // SubscriberTimeout is a timeout used when delivering a message batch to a subscriber. Default is 5s. + // + // This value gets rounded to seconds for optimization of cancellation token creation. Note that internally, + // cluster request is used to deliver messages to ClusterIdentity subscribers. + SubscriberTimeout time.Duration +} + +func newPubSubConfig() *PubSubConfig { + return &PubSubConfig{ + SubscriberTimeout: 5 * time.Second, + } +} + +// GetPubSub returns the PubSub extension from the actor system +func GetPubSub(system *actor.ActorSystem) *PubSub { + return system.Extensions.Get(pubsubExtensionID).(*PubSub) +} diff --git a/cluster/pubsub.pb.go b/cluster/pubsub.pb.go new file mode 100644 index 000000000..95e2f2744 --- /dev/null +++ b/cluster/pubsub.pb.go @@ -0,0 +1,1346 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.6 +// source: pubsub.proto + +package cluster + +import ( + actor "github.com/asynkron/protoactor-go/actor" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + durationpb "google.golang.org/protobuf/types/known/durationpb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Delivery status as seen by the delivery actor +type DeliveryStatus int32 + +const ( + // Message was put in the queue of the subscriber + DeliveryStatus_Delivered DeliveryStatus = 0 + // Message did not reach subscriber, because it was dead + DeliveryStatus_SubscriberNoLongerReachable DeliveryStatus = 1 + // Delivery timed out + DeliveryStatus_Timeout DeliveryStatus = 2 + // Some other problem happened + DeliveryStatus_OtherError DeliveryStatus = 127 +) + +// Enum value maps for DeliveryStatus. 
+var ( + DeliveryStatus_name = map[int32]string{ + 0: "Delivered", + 1: "SubscriberNoLongerReachable", + 2: "Timeout", + 127: "OtherError", + } + DeliveryStatus_value = map[string]int32{ + "Delivered": 0, + "SubscriberNoLongerReachable": 1, + "Timeout": 2, + "OtherError": 127, + } +) + +func (x DeliveryStatus) Enum() *DeliveryStatus { + p := new(DeliveryStatus) + *p = x + return p +} + +func (x DeliveryStatus) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (DeliveryStatus) Descriptor() protoreflect.EnumDescriptor { + return file_pubsub_proto_enumTypes[0].Descriptor() +} + +func (DeliveryStatus) Type() protoreflect.EnumType { + return &file_pubsub_proto_enumTypes[0] +} + +func (x DeliveryStatus) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use DeliveryStatus.Descriptor instead. +func (DeliveryStatus) EnumDescriptor() ([]byte, []int) { + return file_pubsub_proto_rawDescGZIP(), []int{0} +} + +// Status of the whole published batch or single message +type PublishStatus int32 + +const ( + // Batch or message was successfully published according to the delivery guarantees + PublishStatus_Ok PublishStatus = 0 + // Topic failed to forward the message + PublishStatus_Failed PublishStatus = 1 +) + +// Enum value maps for PublishStatus. +var ( + PublishStatus_name = map[int32]string{ + 0: "Ok", + 1: "Failed", + } + PublishStatus_value = map[string]int32{ + "Ok": 0, + "Failed": 1, + } +) + +func (x PublishStatus) Enum() *PublishStatus { + p := new(PublishStatus) + *p = x + return p +} + +func (x PublishStatus) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (PublishStatus) Descriptor() protoreflect.EnumDescriptor { + return file_pubsub_proto_enumTypes[1].Descriptor() +} + +func (PublishStatus) Type() protoreflect.EnumType { + return &file_pubsub_proto_enumTypes[1] +} + +func (x PublishStatus) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use PublishStatus.Descriptor instead. +func (PublishStatus) EnumDescriptor() ([]byte, []int) { + return file_pubsub_proto_rawDescGZIP(), []int{1} +} + +// Identifies a subscriber by either ClusterIdentity or PID +type SubscriberIdentity struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to Identity: + // + // *SubscriberIdentity_Pid + // *SubscriberIdentity_ClusterIdentity + Identity isSubscriberIdentity_Identity `protobuf_oneof:"Identity"` +} + +func (x *SubscriberIdentity) Reset() { + *x = SubscriberIdentity{} + if protoimpl.UnsafeEnabled { + mi := &file_pubsub_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscriberIdentity) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscriberIdentity) ProtoMessage() {} + +func (x *SubscriberIdentity) ProtoReflect() protoreflect.Message { + mi := &file_pubsub_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscriberIdentity.ProtoReflect.Descriptor instead. 
+func (*SubscriberIdentity) Descriptor() ([]byte, []int) { + return file_pubsub_proto_rawDescGZIP(), []int{0} +} + +func (m *SubscriberIdentity) GetIdentity() isSubscriberIdentity_Identity { + if m != nil { + return m.Identity + } + return nil +} + +func (x *SubscriberIdentity) GetPid() *actor.PID { + if x, ok := x.GetIdentity().(*SubscriberIdentity_Pid); ok { + return x.Pid + } + return nil +} + +func (x *SubscriberIdentity) GetClusterIdentity() *ClusterIdentity { + if x, ok := x.GetIdentity().(*SubscriberIdentity_ClusterIdentity); ok { + return x.ClusterIdentity + } + return nil +} + +type isSubscriberIdentity_Identity interface { + isSubscriberIdentity_Identity() +} + +type SubscriberIdentity_Pid struct { + Pid *actor.PID `protobuf:"bytes,1,opt,name=pid,proto3,oneof"` +} + +type SubscriberIdentity_ClusterIdentity struct { + ClusterIdentity *ClusterIdentity `protobuf:"bytes,2,opt,name=cluster_identity,json=clusterIdentity,proto3,oneof"` +} + +func (*SubscriberIdentity_Pid) isSubscriberIdentity_Identity() {} + +func (*SubscriberIdentity_ClusterIdentity) isSubscriberIdentity_Identity() {} + +// First request to initialize the actor. +type Initialize struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + IdleTimeout *durationpb.Duration `protobuf:"bytes,1,opt,name=idleTimeout,proto3" json:"idleTimeout,omitempty"` +} + +func (x *Initialize) Reset() { + *x = Initialize{} + if protoimpl.UnsafeEnabled { + mi := &file_pubsub_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Initialize) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Initialize) ProtoMessage() {} + +func (x *Initialize) ProtoReflect() protoreflect.Message { + mi := &file_pubsub_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Initialize.ProtoReflect.Descriptor instead. +func (*Initialize) Descriptor() ([]byte, []int) { + return file_pubsub_proto_rawDescGZIP(), []int{1} +} + +func (x *Initialize) GetIdleTimeout() *durationpb.Duration { + if x != nil { + return x.IdleTimeout + } + return nil +} + +type Acknowledge struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *Acknowledge) Reset() { + *x = Acknowledge{} + if protoimpl.UnsafeEnabled { + mi := &file_pubsub_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Acknowledge) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Acknowledge) ProtoMessage() {} + +func (x *Acknowledge) ProtoReflect() protoreflect.Message { + mi := &file_pubsub_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Acknowledge.ProtoReflect.Descriptor instead. 
+func (*Acknowledge) Descriptor() ([]byte, []int) { + return file_pubsub_proto_rawDescGZIP(), []int{2} +} + +// A list of subscribers +type Subscribers struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Subscribers []*SubscriberIdentity `protobuf:"bytes,1,rep,name=subscribers,proto3" json:"subscribers,omitempty"` +} + +func (x *Subscribers) Reset() { + *x = Subscribers{} + if protoimpl.UnsafeEnabled { + mi := &file_pubsub_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Subscribers) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Subscribers) ProtoMessage() {} + +func (x *Subscribers) ProtoReflect() protoreflect.Message { + mi := &file_pubsub_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Subscribers.ProtoReflect.Descriptor instead. +func (*Subscribers) Descriptor() ([]byte, []int) { + return file_pubsub_proto_rawDescGZIP(), []int{3} +} + +func (x *Subscribers) GetSubscribers() []*SubscriberIdentity { + if x != nil { + return x.Subscribers + } + return nil +} + +// Sent to topic actor to add a subscriber +type SubscribeRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Subscriber *SubscriberIdentity `protobuf:"bytes,1,opt,name=subscriber,proto3" json:"subscriber,omitempty"` +} + +func (x *SubscribeRequest) Reset() { + *x = SubscribeRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pubsub_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeRequest) ProtoMessage() {} + +func (x *SubscribeRequest) ProtoReflect() protoreflect.Message { + mi := &file_pubsub_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeRequest.ProtoReflect.Descriptor instead. +func (*SubscribeRequest) Descriptor() ([]byte, []int) { + return file_pubsub_proto_rawDescGZIP(), []int{4} +} + +func (x *SubscribeRequest) GetSubscriber() *SubscriberIdentity { + if x != nil { + return x.Subscriber + } + return nil +} + +// Subscribe acknowledgement +type SubscribeResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *SubscribeResponse) Reset() { + *x = SubscribeResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_pubsub_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeResponse) ProtoMessage() {} + +func (x *SubscribeResponse) ProtoReflect() protoreflect.Message { + mi := &file_pubsub_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeResponse.ProtoReflect.Descriptor instead. 
+func (*SubscribeResponse) Descriptor() ([]byte, []int) { + return file_pubsub_proto_rawDescGZIP(), []int{5} +} + +// Sent to topic actor to remove a subscriber +type UnsubscribeRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Subscriber *SubscriberIdentity `protobuf:"bytes,1,opt,name=subscriber,proto3" json:"subscriber,omitempty"` +} + +func (x *UnsubscribeRequest) Reset() { + *x = UnsubscribeRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pubsub_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *UnsubscribeRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UnsubscribeRequest) ProtoMessage() {} + +func (x *UnsubscribeRequest) ProtoReflect() protoreflect.Message { + mi := &file_pubsub_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UnsubscribeRequest.ProtoReflect.Descriptor instead. +func (*UnsubscribeRequest) Descriptor() ([]byte, []int) { + return file_pubsub_proto_rawDescGZIP(), []int{6} +} + +func (x *UnsubscribeRequest) GetSubscriber() *SubscriberIdentity { + if x != nil { + return x.Subscriber + } + return nil +} + +// Unsubscribe acknowledgement +type UnsubscribeResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *UnsubscribeResponse) Reset() { + *x = UnsubscribeResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_pubsub_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *UnsubscribeResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UnsubscribeResponse) ProtoMessage() {} + +func (x *UnsubscribeResponse) ProtoReflect() protoreflect.Message { + mi := &file_pubsub_proto_msgTypes[7] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UnsubscribeResponse.ProtoReflect.Descriptor instead. 
+func (*UnsubscribeResponse) Descriptor() ([]byte, []int) { + return file_pubsub_proto_rawDescGZIP(), []int{7} +} + +// Message sent from publisher to topic actor +// See also PubSubBatch +type PubSubBatchTransport struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + TypeNames []string `protobuf:"bytes,1,rep,name=type_names,json=typeNames,proto3" json:"type_names,omitempty"` + Envelopes []*PubSubEnvelope `protobuf:"bytes,2,rep,name=envelopes,proto3" json:"envelopes,omitempty"` +} + +func (x *PubSubBatchTransport) Reset() { + *x = PubSubBatchTransport{} + if protoimpl.UnsafeEnabled { + mi := &file_pubsub_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PubSubBatchTransport) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PubSubBatchTransport) ProtoMessage() {} + +func (x *PubSubBatchTransport) ProtoReflect() protoreflect.Message { + mi := &file_pubsub_proto_msgTypes[8] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PubSubBatchTransport.ProtoReflect.Descriptor instead. +func (*PubSubBatchTransport) Descriptor() ([]byte, []int) { + return file_pubsub_proto_rawDescGZIP(), []int{8} +} + +func (x *PubSubBatchTransport) GetTypeNames() []string { + if x != nil { + return x.TypeNames + } + return nil +} + +func (x *PubSubBatchTransport) GetEnvelopes() []*PubSubEnvelope { + if x != nil { + return x.Envelopes + } + return nil +} + +// Contains message byte representation and type reference +type PubSubEnvelope struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + TypeId int32 `protobuf:"varint,1,opt,name=type_id,json=typeId,proto3" json:"type_id,omitempty"` + MessageData []byte `protobuf:"bytes,2,opt,name=message_data,json=messageData,proto3" json:"message_data,omitempty"` + SerializerId int32 `protobuf:"varint,3,opt,name=serializer_id,json=serializerId,proto3" json:"serializer_id,omitempty"` +} + +func (x *PubSubEnvelope) Reset() { + *x = PubSubEnvelope{} + if protoimpl.UnsafeEnabled { + mi := &file_pubsub_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PubSubEnvelope) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PubSubEnvelope) ProtoMessage() {} + +func (x *PubSubEnvelope) ProtoReflect() protoreflect.Message { + mi := &file_pubsub_proto_msgTypes[9] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PubSubEnvelope.ProtoReflect.Descriptor instead. 
+func (*PubSubEnvelope) Descriptor() ([]byte, []int) { + return file_pubsub_proto_rawDescGZIP(), []int{9} +} + +func (x *PubSubEnvelope) GetTypeId() int32 { + if x != nil { + return x.TypeId + } + return 0 +} + +func (x *PubSubEnvelope) GetMessageData() []byte { + if x != nil { + return x.MessageData + } + return nil +} + +func (x *PubSubEnvelope) GetSerializerId() int32 { + if x != nil { + return x.SerializerId + } + return 0 +} + +// Message sent from topic to delivery actor +type DeliverBatchRequestTransport struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Subscribers *Subscribers `protobuf:"bytes,1,opt,name=subscribers,proto3" json:"subscribers,omitempty"` + Batch *PubSubBatchTransport `protobuf:"bytes,2,opt,name=batch,proto3" json:"batch,omitempty"` + Topic string `protobuf:"bytes,3,opt,name=topic,proto3" json:"topic,omitempty"` +} + +func (x *DeliverBatchRequestTransport) Reset() { + *x = DeliverBatchRequestTransport{} + if protoimpl.UnsafeEnabled { + mi := &file_pubsub_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DeliverBatchRequestTransport) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DeliverBatchRequestTransport) ProtoMessage() {} + +func (x *DeliverBatchRequestTransport) ProtoReflect() protoreflect.Message { + mi := &file_pubsub_proto_msgTypes[10] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DeliverBatchRequestTransport.ProtoReflect.Descriptor instead. +func (*DeliverBatchRequestTransport) Descriptor() ([]byte, []int) { + return file_pubsub_proto_rawDescGZIP(), []int{10} +} + +func (x *DeliverBatchRequestTransport) GetSubscribers() *Subscribers { + if x != nil { + return x.Subscribers + } + return nil +} + +func (x *DeliverBatchRequestTransport) GetBatch() *PubSubBatchTransport { + if x != nil { + return x.Batch + } + return nil +} + +func (x *DeliverBatchRequestTransport) GetTopic() string { + if x != nil { + return x.Topic + } + return "" +} + +// Message sent from delivery actor to topic to notify of subscribers that fail to process the messages +type NotifyAboutFailingSubscribersRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + InvalidDeliveries []*SubscriberDeliveryReport `protobuf:"bytes,1,rep,name=invalid_deliveries,json=invalidDeliveries,proto3" json:"invalid_deliveries,omitempty"` +} + +func (x *NotifyAboutFailingSubscribersRequest) Reset() { + *x = NotifyAboutFailingSubscribersRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pubsub_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *NotifyAboutFailingSubscribersRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*NotifyAboutFailingSubscribersRequest) ProtoMessage() {} + +func (x *NotifyAboutFailingSubscribersRequest) ProtoReflect() protoreflect.Message { + mi := &file_pubsub_proto_msgTypes[11] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use NotifyAboutFailingSubscribersRequest.ProtoReflect.Descriptor instead. 
+func (*NotifyAboutFailingSubscribersRequest) Descriptor() ([]byte, []int) { + return file_pubsub_proto_rawDescGZIP(), []int{11} +} + +func (x *NotifyAboutFailingSubscribersRequest) GetInvalidDeliveries() []*SubscriberDeliveryReport { + if x != nil { + return x.InvalidDeliveries + } + return nil +} + +// Ack to the delivery actor after notification of subscribers that fail to process the messages +type NotifyAboutFailingSubscribersResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *NotifyAboutFailingSubscribersResponse) Reset() { + *x = NotifyAboutFailingSubscribersResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_pubsub_proto_msgTypes[12] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *NotifyAboutFailingSubscribersResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*NotifyAboutFailingSubscribersResponse) ProtoMessage() {} + +func (x *NotifyAboutFailingSubscribersResponse) ProtoReflect() protoreflect.Message { + mi := &file_pubsub_proto_msgTypes[12] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use NotifyAboutFailingSubscribersResponse.ProtoReflect.Descriptor instead. +func (*NotifyAboutFailingSubscribersResponse) Descriptor() ([]byte, []int) { + return file_pubsub_proto_rawDescGZIP(), []int{12} +} + +// Contains information about a failed delivery +type SubscriberDeliveryReport struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Subscriber *SubscriberIdentity `protobuf:"bytes,1,opt,name=subscriber,proto3" json:"subscriber,omitempty"` + Status DeliveryStatus `protobuf:"varint,2,opt,name=status,proto3,enum=clusterIdentity.DeliveryStatus" json:"status,omitempty"` +} + +func (x *SubscriberDeliveryReport) Reset() { + *x = SubscriberDeliveryReport{} + if protoimpl.UnsafeEnabled { + mi := &file_pubsub_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscriberDeliveryReport) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscriberDeliveryReport) ProtoMessage() {} + +func (x *SubscriberDeliveryReport) ProtoReflect() protoreflect.Message { + mi := &file_pubsub_proto_msgTypes[13] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscriberDeliveryReport.ProtoReflect.Descriptor instead. 
+func (*SubscriberDeliveryReport) Descriptor() ([]byte, []int) { + return file_pubsub_proto_rawDescGZIP(), []int{13} +} + +func (x *SubscriberDeliveryReport) GetSubscriber() *SubscriberIdentity { + if x != nil { + return x.Subscriber + } + return nil +} + +func (x *SubscriberDeliveryReport) GetStatus() DeliveryStatus { + if x != nil { + return x.Status + } + return DeliveryStatus_Delivered +} + +// Message posted to subscriber's mailbox, that is then unrolled to single messages, and has ability to auto respond +// See also PubSubAutoRespondBatch +type PubSubAutoRespondBatchTransport struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + TypeNames []string `protobuf:"bytes,1,rep,name=type_names,json=typeNames,proto3" json:"type_names,omitempty"` + Envelopes []*PubSubEnvelope `protobuf:"bytes,2,rep,name=envelopes,proto3" json:"envelopes,omitempty"` +} + +func (x *PubSubAutoRespondBatchTransport) Reset() { + *x = PubSubAutoRespondBatchTransport{} + if protoimpl.UnsafeEnabled { + mi := &file_pubsub_proto_msgTypes[14] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PubSubAutoRespondBatchTransport) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PubSubAutoRespondBatchTransport) ProtoMessage() {} + +func (x *PubSubAutoRespondBatchTransport) ProtoReflect() protoreflect.Message { + mi := &file_pubsub_proto_msgTypes[14] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PubSubAutoRespondBatchTransport.ProtoReflect.Descriptor instead. +func (*PubSubAutoRespondBatchTransport) Descriptor() ([]byte, []int) { + return file_pubsub_proto_rawDescGZIP(), []int{14} +} + +func (x *PubSubAutoRespondBatchTransport) GetTypeNames() []string { + if x != nil { + return x.TypeNames + } + return nil +} + +func (x *PubSubAutoRespondBatchTransport) GetEnvelopes() []*PubSubEnvelope { + if x != nil { + return x.Envelopes + } + return nil +} + +// Publish ack/nack response +type PublishResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Status of the whole published batch or single message + Status PublishStatus `protobuf:"varint,1,opt,name=status,proto3,enum=clusterIdentity.PublishStatus" json:"status,omitempty"` +} + +func (x *PublishResponse) Reset() { + *x = PublishResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_pubsub_proto_msgTypes[15] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PublishResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PublishResponse) ProtoMessage() {} + +func (x *PublishResponse) ProtoReflect() protoreflect.Message { + mi := &file_pubsub_proto_msgTypes[15] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PublishResponse.ProtoReflect.Descriptor instead. 
+func (*PublishResponse) Descriptor() ([]byte, []int) { + return file_pubsub_proto_rawDescGZIP(), []int{15} +} + +func (x *PublishResponse) GetStatus() PublishStatus { + if x != nil { + return x.Status + } + return PublishStatus_Ok +} + +var File_pubsub_proto protoreflect.FileDescriptor + +var file_pubsub_proto_rawDesc = []byte{ + 0x0a, 0x0c, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, + 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x1a, 0x0d, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x0b, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x22, 0x87, 0x01, 0x0a, 0x12, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, + 0x65, 0x72, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x12, 0x1e, 0x0a, 0x03, 0x70, 0x69, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2e, + 0x50, 0x49, 0x44, 0x48, 0x00, 0x52, 0x03, 0x70, 0x69, 0x64, 0x12, 0x45, 0x0a, 0x10, 0x63, 0x6c, + 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, + 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x48, 0x00, + 0x52, 0x0f, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x74, + 0x79, 0x42, 0x0a, 0x0a, 0x08, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x22, 0x49, 0x0a, + 0x0a, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x12, 0x3b, 0x0a, 0x0b, 0x69, + 0x64, 0x6c, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0b, 0x69, 0x64, 0x6c, + 0x65, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x22, 0x0d, 0x0a, 0x0b, 0x41, 0x63, 0x6b, 0x6e, + 0x6f, 0x77, 0x6c, 0x65, 0x64, 0x67, 0x65, 0x22, 0x4c, 0x0a, 0x0b, 0x53, 0x75, 0x62, 0x73, 0x63, + 0x72, 0x69, 0x62, 0x65, 0x72, 0x73, 0x12, 0x3d, 0x0a, 0x0b, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, + 0x69, 0x62, 0x65, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x63, 0x6c, + 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, + 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x52, 0x0b, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, + 0x69, 0x62, 0x65, 0x72, 0x73, 0x22, 0x4f, 0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, + 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x3b, 0x0a, 0x0a, 0x73, 0x75, 0x62, + 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, + 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, + 0x65, 0x72, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x52, 0x0a, 0x73, 0x75, 0x62, 0x73, + 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x22, 0x13, 0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, + 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x51, 0x0a, 0x12, 0x55, + 0x6e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x3b, 0x0a, 0x0a, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 
0x72, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, + 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, + 0x74, 0x79, 0x52, 0x0a, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x22, 0x15, + 0x0a, 0x13, 0x55, 0x6e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x6c, 0x0a, 0x14, 0x50, 0x75, 0x62, 0x53, 0x75, 0x62, 0x42, + 0x61, 0x74, 0x63, 0x68, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x1d, 0x0a, + 0x0a, 0x74, 0x79, 0x70, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, + 0x09, 0x52, 0x09, 0x74, 0x79, 0x70, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x12, 0x35, 0x0a, 0x09, + 0x65, 0x6e, 0x76, 0x65, 0x6c, 0x6f, 0x70, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, + 0x17, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x50, 0x75, 0x62, 0x53, 0x75, 0x62, + 0x45, 0x6e, 0x76, 0x65, 0x6c, 0x6f, 0x70, 0x65, 0x52, 0x09, 0x65, 0x6e, 0x76, 0x65, 0x6c, 0x6f, + 0x70, 0x65, 0x73, 0x22, 0x71, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x53, 0x75, 0x62, 0x45, 0x6e, 0x76, + 0x65, 0x6c, 0x6f, 0x70, 0x65, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x79, 0x70, 0x65, 0x5f, 0x69, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x74, 0x79, 0x70, 0x65, 0x49, 0x64, 0x12, 0x21, + 0x0a, 0x0c, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x44, 0x61, 0x74, + 0x61, 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x72, 0x5f, + 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0c, 0x73, 0x65, 0x72, 0x69, 0x61, 0x6c, + 0x69, 0x7a, 0x65, 0x72, 0x49, 0x64, 0x22, 0xa1, 0x01, 0x0a, 0x1c, 0x44, 0x65, 0x6c, 0x69, 0x76, + 0x65, 0x72, 0x42, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x72, + 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x36, 0x0a, 0x0b, 0x73, 0x75, 0x62, 0x73, 0x63, + 0x72, 0x69, 0x62, 0x65, 0x72, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x63, + 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, + 0x72, 0x73, 0x52, 0x0b, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x73, 0x12, + 0x33, 0x0a, 0x05, 0x62, 0x61, 0x74, 0x63, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, + 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x50, 0x75, 0x62, 0x53, 0x75, 0x62, 0x42, + 0x61, 0x74, 0x63, 0x68, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x52, 0x05, 0x62, + 0x61, 0x74, 0x63, 0x68, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x22, 0x78, 0x0a, 0x24, 0x4e, 0x6f, + 0x74, 0x69, 0x66, 0x79, 0x41, 0x62, 0x6f, 0x75, 0x74, 0x46, 0x61, 0x69, 0x6c, 0x69, 0x6e, 0x67, + 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x50, 0x0a, 0x12, 0x69, 0x6e, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x5f, 0x64, 0x65, + 0x6c, 0x69, 0x76, 0x65, 0x72, 0x69, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x21, + 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, + 0x62, 0x65, 0x72, 0x44, 0x65, 0x6c, 0x69, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x70, 0x6f, 0x72, + 0x74, 0x52, 0x11, 0x69, 0x6e, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x44, 0x65, 0x6c, 0x69, 0x76, 0x65, + 0x72, 0x69, 
0x65, 0x73, 0x22, 0x27, 0x0a, 0x25, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x41, 0x62, + 0x6f, 0x75, 0x74, 0x46, 0x61, 0x69, 0x6c, 0x69, 0x6e, 0x67, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, + 0x69, 0x62, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x88, 0x01, + 0x0a, 0x18, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x44, 0x65, 0x6c, 0x69, + 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x3b, 0x0a, 0x0a, 0x73, 0x75, + 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, + 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, + 0x62, 0x65, 0x72, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x52, 0x0a, 0x73, 0x75, 0x62, + 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x12, 0x2f, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x17, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, + 0x72, 0x2e, 0x44, 0x65, 0x6c, 0x69, 0x76, 0x65, 0x72, 0x79, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, + 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x77, 0x0a, 0x1f, 0x50, 0x75, 0x62, 0x53, + 0x75, 0x62, 0x41, 0x75, 0x74, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x64, 0x42, 0x61, 0x74, + 0x63, 0x68, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x74, + 0x79, 0x70, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, + 0x09, 0x74, 0x79, 0x70, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x12, 0x35, 0x0a, 0x09, 0x65, 0x6e, + 0x76, 0x65, 0x6c, 0x6f, 0x70, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, + 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x50, 0x75, 0x62, 0x53, 0x75, 0x62, 0x45, 0x6e, + 0x76, 0x65, 0x6c, 0x6f, 0x70, 0x65, 0x52, 0x09, 0x65, 0x6e, 0x76, 0x65, 0x6c, 0x6f, 0x70, 0x65, + 0x73, 0x22, 0x41, 0x0a, 0x0f, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2e, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0e, 0x32, 0x16, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x50, + 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x2a, 0x5d, 0x0a, 0x0e, 0x44, 0x65, 0x6c, 0x69, 0x76, 0x65, 0x72, 0x79, + 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0d, 0x0a, 0x09, 0x44, 0x65, 0x6c, 0x69, 0x76, 0x65, + 0x72, 0x65, 0x64, 0x10, 0x00, 0x12, 0x1f, 0x0a, 0x1b, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, + 0x62, 0x65, 0x72, 0x4e, 0x6f, 0x4c, 0x6f, 0x6e, 0x67, 0x65, 0x72, 0x52, 0x65, 0x61, 0x63, 0x68, + 0x61, 0x62, 0x6c, 0x65, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, + 0x74, 0x10, 0x02, 0x12, 0x0e, 0x0a, 0x0a, 0x4f, 0x74, 0x68, 0x65, 0x72, 0x45, 0x72, 0x72, 0x6f, + 0x72, 0x10, 0x7f, 0x2a, 0x23, 0x0a, 0x0d, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x53, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x12, 0x06, 0x0a, 0x02, 0x4f, 0x6b, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, + 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x10, 0x01, 0x42, 0x2c, 0x5a, 0x2a, 0x2f, 0x67, 0x69, 0x74, + 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x73, 0x79, 0x6e, 0x6b, 0x72, 0x6f, 0x6e, + 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2d, 0x67, 0x6f, 0x2f, 0x63, + 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_pubsub_proto_rawDescOnce sync.Once + file_pubsub_proto_rawDescData = file_pubsub_proto_rawDesc +) + +func 
file_pubsub_proto_rawDescGZIP() []byte { + file_pubsub_proto_rawDescOnce.Do(func() { + file_pubsub_proto_rawDescData = protoimpl.X.CompressGZIP(file_pubsub_proto_rawDescData) + }) + return file_pubsub_proto_rawDescData +} + +var file_pubsub_proto_enumTypes = make([]protoimpl.EnumInfo, 2) +var file_pubsub_proto_msgTypes = make([]protoimpl.MessageInfo, 16) +var file_pubsub_proto_goTypes = []interface{}{ + (DeliveryStatus)(0), // 0: clusterIdentity.DeliveryStatus + (PublishStatus)(0), // 1: clusterIdentity.PublishStatus + (*SubscriberIdentity)(nil), // 2: clusterIdentity.SubscriberIdentity + (*Initialize)(nil), // 3: clusterIdentity.Initialize + (*Acknowledge)(nil), // 4: clusterIdentity.Acknowledge + (*Subscribers)(nil), // 5: clusterIdentity.Subscribers + (*SubscribeRequest)(nil), // 6: clusterIdentity.SubscribeRequest + (*SubscribeResponse)(nil), // 7: clusterIdentity.SubscribeResponse + (*UnsubscribeRequest)(nil), // 8: clusterIdentity.UnsubscribeRequest + (*UnsubscribeResponse)(nil), // 9: clusterIdentity.UnsubscribeResponse + (*PubSubBatchTransport)(nil), // 10: clusterIdentity.PubSubBatchTransport + (*PubSubEnvelope)(nil), // 11: clusterIdentity.PubSubEnvelope + (*DeliverBatchRequestTransport)(nil), // 12: clusterIdentity.DeliverBatchRequestTransport + (*NotifyAboutFailingSubscribersRequest)(nil), // 13: clusterIdentity.NotifyAboutFailingSubscribersRequest + (*NotifyAboutFailingSubscribersResponse)(nil), // 14: clusterIdentity.NotifyAboutFailingSubscribersResponse + (*SubscriberDeliveryReport)(nil), // 15: clusterIdentity.SubscriberDeliveryReport + (*PubSubAutoRespondBatchTransport)(nil), // 16: clusterIdentity.PubSubAutoRespondBatchTransport + (*PublishResponse)(nil), // 17: clusterIdentity.PublishResponse + (*actor.PID)(nil), // 18: actor.PID + (*ClusterIdentity)(nil), // 19: clusterIdentity.ClusterIdentity + (*durationpb.Duration)(nil), // 20: google.protobuf.Duration +} +var file_pubsub_proto_depIdxs = []int32{ + 18, // 0: clusterIdentity.SubscriberIdentity.pid:type_name -> actor.PID + 19, // 1: clusterIdentity.SubscriberIdentity.cluster_identity:type_name -> clusterIdentity.ClusterIdentity + 20, // 2: clusterIdentity.Initialize.idleTimeout:type_name -> google.protobuf.Duration + 2, // 3: clusterIdentity.Subscribers.subscribers:type_name -> clusterIdentity.SubscriberIdentity + 2, // 4: clusterIdentity.SubscribeRequest.subscriber:type_name -> clusterIdentity.SubscriberIdentity + 2, // 5: clusterIdentity.UnsubscribeRequest.subscriber:type_name -> clusterIdentity.SubscriberIdentity + 11, // 6: clusterIdentity.PubSubBatchTransport.envelopes:type_name -> clusterIdentity.PubSubEnvelope + 5, // 7: clusterIdentity.DeliverBatchRequestTransport.subscribers:type_name -> clusterIdentity.Subscribers + 10, // 8: clusterIdentity.DeliverBatchRequestTransport.batch:type_name -> clusterIdentity.PubSubBatchTransport + 15, // 9: clusterIdentity.NotifyAboutFailingSubscribersRequest.invalid_deliveries:type_name -> clusterIdentity.SubscriberDeliveryReport + 2, // 10: clusterIdentity.SubscriberDeliveryReport.subscriber:type_name -> clusterIdentity.SubscriberIdentity + 0, // 11: clusterIdentity.SubscriberDeliveryReport.status:type_name -> clusterIdentity.DeliveryStatus + 11, // 12: clusterIdentity.PubSubAutoRespondBatchTransport.envelopes:type_name -> clusterIdentity.PubSubEnvelope + 1, // 13: clusterIdentity.PublishResponse.status:type_name -> clusterIdentity.PublishStatus + 14, // [14:14] is the sub-list for method output_type + 14, // [14:14] is the sub-list for method input_type + 14, // [14:14] is the 
sub-list for extension type_name + 14, // [14:14] is the sub-list for extension extendee + 0, // [0:14] is the sub-list for field type_name +} + +func init() { file_pubsub_proto_init() } +func file_pubsub_proto_init() { + if File_pubsub_proto != nil { + return + } + file_cluster_proto_init() + if !protoimpl.UnsafeEnabled { + file_pubsub_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscriberIdentity); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pubsub_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Initialize); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pubsub_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Acknowledge); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pubsub_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Subscribers); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pubsub_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pubsub_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pubsub_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*UnsubscribeRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pubsub_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*UnsubscribeResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pubsub_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PubSubBatchTransport); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pubsub_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PubSubEnvelope); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pubsub_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DeliverBatchRequestTransport); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pubsub_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*NotifyAboutFailingSubscribersRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pubsub_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*NotifyAboutFailingSubscribersResponse); i { + case 0: + return 
&v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pubsub_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscriberDeliveryReport); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pubsub_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PubSubAutoRespondBatchTransport); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pubsub_proto_msgTypes[15].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PublishResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_pubsub_proto_msgTypes[0].OneofWrappers = []interface{}{ + (*SubscriberIdentity_Pid)(nil), + (*SubscriberIdentity_ClusterIdentity)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_pubsub_proto_rawDesc, + NumEnums: 2, + NumMessages: 16, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_pubsub_proto_goTypes, + DependencyIndexes: file_pubsub_proto_depIdxs, + EnumInfos: file_pubsub_proto_enumTypes, + MessageInfos: file_pubsub_proto_msgTypes, + }.Build() + File_pubsub_proto = out.File + file_pubsub_proto_rawDesc = nil + file_pubsub_proto_goTypes = nil + file_pubsub_proto_depIdxs = nil +} diff --git a/cluster/pubsub.proto b/cluster/pubsub.proto new file mode 100644 index 000000000..319627270 --- /dev/null +++ b/cluster/pubsub.proto @@ -0,0 +1,115 @@ +syntax = "proto3"; +package cluster; +option go_package = "/github.com/asynkron/protoactor-go/cluster"; + +import "cluster.proto"; +import "google/protobuf/duration.proto"; +import "actor.proto"; + +// Identifies a subscriber by either ClusterIdentity or PID +message SubscriberIdentity { + oneof Identity { + actor.PID pid = 1; + cluster.ClusterIdentity cluster_identity = 2; + } +} + +// First request to initialize the actor. 
+message Initialize { + google.protobuf.Duration idleTimeout = 1; +} + +message Acknowledge {} + +// A list of subscribers +message Subscribers { + repeated SubscriberIdentity subscribers = 1; +} + +// Sent to topic actor to add a subscriber +message SubscribeRequest { + SubscriberIdentity subscriber = 1; +} + +// Subscribe acknowledgement +message SubscribeResponse {} + +// Sent to topic actor to remove a subscriber +message UnsubscribeRequest { + SubscriberIdentity subscriber = 1; +} + +// Unsubscribe acknowledgement +message UnsubscribeResponse {} + +// Message sent from publisher to topic actor +// See also PubSubBatch +message PubSubBatchTransport { + repeated string type_names = 1; + repeated PubSubEnvelope envelopes = 2; +} + +// Contains message byte representation and type reference +message PubSubEnvelope { + int32 type_id = 1; + bytes message_data = 2; + int32 serializer_id = 3; +} + +// Message sent from topic to delivery actor +message DeliverBatchRequestTransport { + Subscribers subscribers = 1; + PubSubBatchTransport batch = 2; + string topic = 3; +} + +// Message sent from delivery actor to topic to notify of subscribers that fail to process the messages +message NotifyAboutFailingSubscribersRequest { + repeated SubscriberDeliveryReport invalid_deliveries = 1; +} + +// Ack to the delivery actor after notification of subscribers that fail to process the messages +message NotifyAboutFailingSubscribersResponse {} + +// Contains information about a failed delivery +message SubscriberDeliveryReport { + SubscriberIdentity subscriber = 1; + DeliveryStatus status = 2; +} + +// Delivery status as seen by the delivery actor +enum DeliveryStatus { + // Message was put in the queue of the subscriber + Delivered = 0; + + // Message did not reach subscriber, because it was dead + SubscriberNoLongerReachable = 1; + + // Delivery timed out + Timeout = 2; + + // Some other problem happened + OtherError = 127; +} + +// Message posted to subscriber's mailbox, that is then unrolled to single messages, and has ability to auto respond +// See also PubSubAutoRespondBatch +message PubSubAutoRespondBatchTransport { + repeated string type_names = 1; + repeated PubSubEnvelope envelopes = 2; +} + +// Status of the whole published batch or single message +enum PublishStatus { + // Batch or message was successfully published according to the delivery guarantees + Ok = 0; + + // Topic failed to forward the message + Failed = 1; +} + +// Publish ack/nack response +message PublishResponse { + // Status of the whole published batch or single message + PublishStatus status = 1; +} diff --git a/cluster/pubsub_batch.go b/cluster/pubsub_batch.go new file mode 100644 index 000000000..0871ec99d --- /dev/null +++ b/cluster/pubsub_batch.go @@ -0,0 +1,118 @@ +package cluster + +import ( + "github.com/asynkron/protoactor-go/remote" +) + +type PubSubBatch struct { + Envelopes []interface{} +} + +// Serialize converts a PubSubBatch to a PubSubBatchTransport. 
+func (b *PubSubBatch) Serialize() remote.RootSerialized {
+	batch := &PubSubBatchTransport{
+		TypeNames: make([]string, 0),
+		Envelopes: make([]*PubSubEnvelope, 0),
+	}
+
+	for _, envelope := range b.Envelopes {
+		var serializerId int32
+		messageData, typeName, err := remote.Serialize(envelope, serializerId)
+		if err != nil {
+			panic(err)
+		}
+		// look up typeName in batch.TypeNames, appending it if it is not there yet
+		typeIndex := -1
+		for i, t := range batch.TypeNames {
+			if t == typeName {
+				typeIndex = i
+				break
+			}
+		}
+		if typeIndex == -1 {
+			batch.TypeNames = append(batch.TypeNames, typeName)
+			typeIndex = len(batch.TypeNames) - 1
+		}
+		batch.Envelopes = append(batch.Envelopes, &PubSubEnvelope{
+			MessageData:  messageData,
+			TypeId:       int32(typeIndex),
+			SerializerId: serializerId,
+		})
+	}
+	return batch
+}
+
+// Deserialize converts a PubSubBatchTransport to a PubSubBatch.
+func (t *PubSubBatchTransport) Deserialize() remote.RootSerializable {
+	b := &PubSubBatch{
+		Envelopes: make([]interface{}, 0),
+	}
+
+	for _, envelope := range t.Envelopes {
+		message, err := remote.Deserialize(envelope.MessageData, t.TypeNames[envelope.TypeId], envelope.SerializerId)
+		if err != nil {
+			panic(err)
+		}
+		b.Envelopes = append(b.Envelopes, message)
+	}
+	return b
+}
+
+type DeliverBatchRequest struct {
+	Subscribers *Subscribers
+	PubSubBatch *PubSubBatch
+	Topic       string
+}
+
+func (d *DeliverBatchRequest) Serialize() remote.RootSerialized {
+	return &DeliverBatchRequestTransport{
+		Subscribers: d.Subscribers,
+		Batch:       d.PubSubBatch.Serialize().(*PubSubBatchTransport),
+		Topic:       d.Topic,
+	}
+}
+
+func (t *DeliverBatchRequestTransport) Deserialize() remote.RootSerializable {
+	return &DeliverBatchRequest{
+		Subscribers: t.Subscribers,
+		PubSubBatch: t.Batch.Deserialize().(*PubSubBatch),
+		Topic:       t.Topic,
+	}
+}
+
+type PubSubAutoRespondBatch struct {
+	Envelopes []interface{}
+}
+
+// Serialize converts a PubSubAutoRespondBatch to a PubSubAutoRespondBatchTransport.
+func (b *PubSubAutoRespondBatch) Serialize() remote.RootSerialized {
+	batch := &PubSubBatch{Envelopes: b.Envelopes}
+	transport := batch.Serialize().(*PubSubBatchTransport)
+	return &PubSubAutoRespondBatchTransport{
+		TypeNames: transport.TypeNames,
+		Envelopes: transport.Envelopes,
+	}
+}
+
+// GetAutoResponse returns a PublishResponse with status Ok.
+func (b *PubSubAutoRespondBatch) GetAutoResponse() *PublishResponse {
+	return &PublishResponse{
+		Status: PublishStatus_Ok,
+	}
+}
+
+// GetMessages returns the batch's messages.
+func (b *PubSubAutoRespondBatch) GetMessages() []interface{} {
+	return b.Envelopes
+}
+
+// Deserialize converts a PubSubAutoRespondBatchTransport to a PubSubAutoRespondBatch.
+func (t *PubSubAutoRespondBatchTransport) Deserialize() remote.RootSerializable {
+	batch := &PubSubBatchTransport{
+		TypeNames: t.TypeNames,
+		Envelopes: t.Envelopes,
+	}
+	return &PubSubAutoRespondBatch{
+		Envelopes: batch.Deserialize().(*PubSubBatch).Envelopes,
+	}
+}
diff --git a/cluster/pubsub_delivery.go b/cluster/pubsub_delivery.go
new file mode 100644
index 000000000..cdd9c9011
--- /dev/null
+++ b/cluster/pubsub_delivery.go
@@ -0,0 +1,128 @@
+package cluster
+
+import (
+	"github.com/asynkron/protoactor-go/actor"
+	"github.com/asynkron/protoactor-go/log"
+	"github.com/asynkron/protoactor-go/remote"
+	"sync"
+	"time"
+)
+
+var pubsubMemberDeliveryLogThrottle = actor.NewThrottle(10, time.Second, func(i int32) {
+	plog.Warn("[PubSubMemberDeliveryActor] Throttled logs", log.Int("count", int(i)))
+})
+
+type PubSubMemberDeliveryActor struct {
+	subscriberTimeout time.Duration
+}
+
+func NewPubSubMemberDeliveryActor(subscriberTimeout time.Duration) *PubSubMemberDeliveryActor {
+	return &PubSubMemberDeliveryActor{
+		subscriberTimeout: subscriberTimeout,
+	}
+}
+
+func (p *PubSubMemberDeliveryActor) Receive(c actor.Context) {
+	if batch, ok := c.Message().(*DeliverBatchRequest); ok {
+		topicBatch := &PubSubAutoRespondBatch{Envelopes: batch.PubSubBatch.Envelopes}
+		siList := batch.Subscribers.Subscribers
+
+		invalidDeliveries := make([]*SubscriberDeliveryReport, 0, len(siList))
+		var lock sync.Mutex
+
+		var wg sync.WaitGroup
+		for _, identity := range siList {
+			wg.Add(1)
+			go func(identity *SubscriberIdentity) {
+				defer wg.Done()
+				report := p.DeliverBatch(c, topicBatch, identity) // generally concurrent safe, depends on the implementation of cluster.Call and actor.RequestFuture
+				if report.Status != DeliveryStatus_Delivered {
+					lock.Lock()
+					invalidDeliveries = append(invalidDeliveries, report)
+					lock.Unlock()
+				}
+			}(identity)
+		}
+		// wait for all deliveries to complete before inspecting the reports
+		wg.Wait()
+
+		if len(invalidDeliveries) > 0 {
+			cluster := GetCluster(c.ActorSystem())
+			// we use cluster.Call to locate the topic actor in the cluster
+			_, _ = cluster.Call(batch.Topic, TopicActorKind, &NotifyAboutFailingSubscribersRequest{InvalidDeliveries: invalidDeliveries})
+		}
+	}
+}
+
+// DeliverBatch delivers PubSubAutoRespondBatch to SubscriberIdentity.
+func (p *PubSubMemberDeliveryActor) DeliverBatch(c actor.Context, batch *PubSubAutoRespondBatch, s *SubscriberIdentity) *SubscriberDeliveryReport {
+	status := DeliveryStatus_OtherError
+	if pid := s.GetPid(); pid != nil {
+		status = p.DeliverToPid(c, batch, pid)
+	}
+	if ci := s.GetClusterIdentity(); ci != nil {
+		status = p.DeliverToClusterIdentity(c, batch, ci)
+	}
+	return &SubscriberDeliveryReport{
+		Subscriber: s,
+		Status:     status,
+	}
+}
+
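
The DeliverBatchRequest handled above crosses member boundaries in its transport form, using the RootSerializable/RootSerialized hooks added to the remote package later in this diff. A minimal round-trip sketch (illustrative only, not part of the change; it reuses PublishResponse purely as a stand-in for any protobuf payload that the default remote serializer can handle):

package example

import (
    "fmt"

    "github.com/asynkron/protoactor-go/cluster"
)

// deliverBatchRoundTrip is a hypothetical helper: it shows how a DeliverBatchRequest
// is flattened to its transport form before leaving a member and restored on arrival.
func deliverBatchRoundTrip() {
    req := &cluster.DeliverBatchRequest{
        Topic:       "chat",
        Subscribers: &cluster.Subscribers{},
        PubSubBatch: &cluster.PubSubBatch{
            Envelopes: []interface{}{
                &cluster.PublishResponse{Status: cluster.PublishStatus_Ok},
                &cluster.PublishResponse{Status: cluster.PublishStatus_Failed},
            },
        },
    }

    // on the wire: serialized message data plus a de-duplicated list of type names
    transport := req.Serialize().(*cluster.DeliverBatchRequestTransport)
    fmt.Println(len(transport.Batch.TypeNames), len(transport.Batch.Envelopes)) // 1 2

    // back to the in-process representation on the receiving member
    restored := transport.Deserialize().(*cluster.DeliverBatchRequest)
    fmt.Println(len(restored.PubSubBatch.Envelopes)) // 2
}

+// DeliverToPid delivers PubSubAutoRespondBatch to PID.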
+func (p *PubSubMemberDeliveryActor) DeliverToPid(c actor.Context, batch *PubSubAutoRespondBatch, pid *actor.PID) DeliveryStatus { + _, err := c.RequestFuture(pid, batch, p.subscriberTimeout).Result() + if err != nil { + switch err { + case actor.ErrTimeout, remote.ErrTimeout: + if pubsubMemberDeliveryLogThrottle() == actor.Open { + plog.Warn("Pub-sub message delivered to pid timed out", log.String("pid", pid.String())) + } + return DeliveryStatus_Timeout + case actor.ErrDeadLetter, remote.ErrDeadLetter: + if pubsubMemberDeliveryLogThrottle() == actor.Open { + plog.Warn("Pub-sub message cannot be delivered to pid as it is no longer available", log.String("pid", pid.String())) + } + return DeliveryStatus_SubscriberNoLongerReachable + default: + if pubsubMemberDeliveryLogThrottle() == actor.Open { + plog.Warn("Error while delivering pub-sub message to pid", log.String("pid", pid.String()), log.Error(err)) + } + return DeliveryStatus_OtherError + } + } + return DeliveryStatus_Delivered +} + +// DeliverToClusterIdentity delivers PubSubAutoRespondBatch to ClusterIdentity. +func (p *PubSubMemberDeliveryActor) DeliverToClusterIdentity(c actor.Context, batch *PubSubAutoRespondBatch, ci *ClusterIdentity) DeliveryStatus { + cluster := GetCluster(c.ActorSystem()) + // deliver to virtual actor + // delivery should always be possible, since a virtual actor always exists + response, err := cluster.Call(ci.Identity, ci.Kind, batch, WithTimeout(p.subscriberTimeout)) + if err != nil { + switch err { + case actor.ErrTimeout, remote.ErrTimeout: + if pubsubMemberDeliveryLogThrottle() == actor.Open { + plog.Warn("Pub-sub message delivered to cluster identity timed out", log.String("cluster identity", ci.String())) + } + return DeliveryStatus_Timeout + case actor.ErrDeadLetter, remote.ErrDeadLetter: + if pubsubMemberDeliveryLogThrottle() == actor.Open { + plog.Warn("Pub-sub message cannot be delivered to cluster identity as it is no longer available", log.String("cluster identity", ci.String())) + } + return DeliveryStatus_SubscriberNoLongerReachable + default: + if pubsubMemberDeliveryLogThrottle() == actor.Open { + plog.Warn("Error while delivering pub-sub message to cluster identity", log.String("cluster identity", ci.String()), log.Error(err)) + } + return DeliveryStatus_OtherError + } + } + if response == nil { + if pubsubMemberDeliveryLogThrottle() == actor.Open { + plog.Warn("Pub-sub message delivered to cluster identity timed out", log.String("cluster identity", ci.String())) + } + return DeliveryStatus_Timeout + } + return DeliveryStatus_Delivered +} diff --git a/cluster/pubsub_extensions.go b/cluster/pubsub_extensions.go new file mode 100644 index 000000000..3b57fcb01 --- /dev/null +++ b/cluster/pubsub_extensions.go @@ -0,0 +1,62 @@ +package cluster + +import ( + "github.com/asynkron/protoactor-go/actor" +) + +// Publisher creates a new PubSub publisher that publishes messages directly to the TopicActor +func (c *Cluster) Publisher() Publisher { + return NewPublisher(c) +} + +// BatchingProducer create a new PubSub batching producer for specified topic, that publishes directly to the topic actor +func (c *Cluster) BatchingProducer(topic string, opts ...BatchProducerConfigOption) *BatchingProducer { + return NewBatchingProducer(c.Publisher(), topic, opts...) 
+} + +// SubscribeByPid subscribes to a PubSub topic by subscriber PID +func (c *Cluster) SubscribeByPid(topic string, pid *actor.PID, opts ...GrainCallOption) error { + _, err := c.Call(topic, TopicActorKind, &SubscribeRequest{ + Subscriber: &SubscriberIdentity{Identity: &SubscriberIdentity_Pid{Pid: pid}}, + }, opts...) + return err +} + +// SubscribeByClusterIdentity subscribes to a PubSub topic by cluster identity +func (c *Cluster) SubscribeByClusterIdentity(topic string, identity ClusterIdentity, opts ...GrainCallOption) error { + _, err := c.Call(topic, TopicActorKind, &SubscribeRequest{ + Subscriber: &SubscriberIdentity{Identity: &SubscriberIdentity_ClusterIdentity{ClusterIdentity: &identity}}, + }, opts...) + return err +} + +// SubscribeWithReceive subscribe to a PubSub topic by providing a Receive function, that will be used to spawn a subscriber actor +func (c *Cluster) SubscribeWithReceive(topic string, receive actor.ReceiveFunc, opts ...GrainCallOption) error { + props := actor.PropsFromFunc(receive) + pid := c.ActorSystem.Root.Spawn(props) + return c.SubscribeByPid(topic, pid, opts...) +} + +// UnsubscribeByPid unsubscribes from a PubSub topic by subscriber PID +func (c *Cluster) UnsubscribeByPid(topic string, pid *actor.PID, opts ...GrainCallOption) error { + _, err := c.Call(topic, TopicActorKind, &UnsubscribeRequest{ + Subscriber: &SubscriberIdentity{Identity: &SubscriberIdentity_Pid{Pid: pid}}, + }, opts...) + return err +} + +// UnsubscribeByClusterIdentity unsubscribes from a PubSub topic by cluster identity +func (c *Cluster) UnsubscribeByClusterIdentity(topic string, identity ClusterIdentity, opts ...GrainCallOption) error { + _, err := c.Call(topic, TopicActorKind, &UnsubscribeRequest{ + Subscriber: &SubscriberIdentity{Identity: &SubscriberIdentity_ClusterIdentity{ClusterIdentity: &identity}}, + }, opts...) + return err +} + +// UnsubscribeByIdentityAndKind unsubscribes from a PubSub topic by cluster identity +func (c *Cluster) UnsubscribeByIdentityAndKind(topic string, identity string, kind string, opts ...GrainCallOption) error { + _, err := c.Call(topic, TopicActorKind, &UnsubscribeRequest{ + Subscriber: &SubscriberIdentity{Identity: &SubscriberIdentity_ClusterIdentity{ClusterIdentity: &ClusterIdentity{Identity: identity, Kind: kind}}}, + }, opts...) + return err +} diff --git a/cluster/pubsub_producer.go b/cluster/pubsub_producer.go new file mode 100644 index 000000000..5ffd5fb35 --- /dev/null +++ b/cluster/pubsub_producer.go @@ -0,0 +1,527 @@ +package cluster + +import ( + "errors" + "github.com/asynkron/protoactor-go/actor" + "github.com/asynkron/protoactor-go/internal/queue/mpsc" + "github.com/asynkron/protoactor-go/log" + "golang.org/x/net/context" + "sync" + "time" +) + +// PublishingErrorHandler decides what to do with a publishing error in BatchingProducer +type PublishingErrorHandler func(retries int, e error, batch PubSubBatch) PublishingErrorDecision + +type BatchingProducerConfig struct { + // Maximum size of the published batch. Default: 2000. + BatchSize int + // Max size of the requests waiting in queue. If value is provided, the producer will throw + // ProducerQueueFullException when queue size is exceeded. If 0 or unset, the queue is unbounded + // Note that bounded queue has better performance than unbounded queue. + // Default: 0 (unbounded) + MaxQueueSize int + + // How long to wait for the publishing to complete. 
+ // Default: 5s + PublishTimeout time.Duration + + // Error handler that can decide what to do with an error when publishing a batch. + // Default: Fail and stop the BatchingProducer + OnPublishingError PublishingErrorHandler + + // A throttle for logging from this producer. By default, a throttle shared between all instances of + // BatchingProducer is used, that allows for 10 events in 1 second. + LogThrottle actor.ShouldThrottle + + // Optional idle timeout which will specify to the `IPublisher` how long it should wait before invoking clean + // up code to recover resources. + PublisherIdleTimeout time.Duration +} + +var defaultBatchingProducerLogThrottle = actor.NewThrottle(10, time.Second, func(i int32) { + plog.Info("[BatchingProducer] Throttled logs", log.Int("count", int(i))) +}) + +func newBatchingProducerConfig(opts ...BatchProducerConfigOption) *BatchingProducerConfig { + config := &BatchingProducerConfig{ + BatchSize: 2000, + PublishTimeout: 5 * time.Second, + OnPublishingError: func(retries int, e error, batch PubSubBatch) PublishingErrorDecision { + return FailBatchAndStop + }, + LogThrottle: defaultBatchingProducerLogThrottle, + } + + for _, opt := range opts { + opt(config) + } + + return config +} + +type BatchingProducer struct { + config *BatchingProducerConfig + topic string + publisher Publisher + publisherChannel channel[produceMessage] + loopCancel context.CancelFunc + msgLeft uint32 +} + +func NewBatchingProducer(publisher Publisher, topic string, opts ...BatchProducerConfigOption) *BatchingProducer { + config := newBatchingProducerConfig(opts...) + p := &BatchingProducer{ + config: config, + topic: topic, + publisher: publisher, + msgLeft: 0, + } + if config.MaxQueueSize > 0 { + p.publisherChannel = newBoundedChannel[produceMessage](config.MaxQueueSize) + } else { + p.publisherChannel = newUnboundedChannel[produceMessage]() + } + ctx, cancelFunc := context.WithCancel(context.Background()) + p.loopCancel = cancelFunc + go p.publishLoop(ctx) + + return p +} + +type pubsubBatchWithReceipts struct { + batch PubSubBatch + ctxArr []context.Context +} + +// newPubSubBatchWithReceipts creates a new pubsubBatchWithReceipts +func newPubSubBatchWithReceipts() *pubsubBatchWithReceipts { + return &pubsubBatchWithReceipts{ + batch: PubSubBatch{Envelopes: make([]interface{}, 0, 10)}, + ctxArr: make([]context.Context, 0, 10), + } +} + +type produceMessage struct { + message interface{} + ctx context.Context +} + +// Dispose stops the producer and releases all resources. 
+func (p *BatchingProducer) Dispose() {
+	p.loopCancel()
+}
+
+// ProduceProcessInfo is the context for a Produce call
+type ProduceProcessInfo struct {
+	Finished   chan struct{}
+	Err        error
+	cancelFunc context.CancelFunc
+	ctx        context.Context
+}
+
+// IsCancelled returns true if the context has been cancelled
+func (p *ProduceProcessInfo) IsCancelled() bool {
+	select {
+	case <-p.ctx.Done():
+		return p.ctx.Err() == context.Canceled
+	default:
+		return false
+	}
+}
+
+// setErr sets the error for the ProduceProcessInfo
+func (p *ProduceProcessInfo) setErr(err error) {
+	p.Err = err
+	close(p.Finished)
+}
+
+// cancel cancels the ProduceProcessInfo context
+func (p *ProduceProcessInfo) cancel() {
+	p.cancelFunc()
+	close(p.Finished)
+}
+
+// success closes the ProduceProcessInfo Finished channel
+func (p *ProduceProcessInfo) success() {
+	close(p.Finished)
+}
+
+type produceProcessInfoKey struct{}
+
+// getProduceProcessInfo retrieves the produce info stored in the BatchingProducer.Produce context
+func (p *BatchingProducer) getProduceProcessInfo(ctx context.Context) *ProduceProcessInfo {
+	return ctx.Value(produceProcessInfoKey{}).(*ProduceProcessInfo)
+}
+
+// Produce pushes a message onto the producer queue. The returned ProduceProcessInfo.Finished channel is closed when the message is actually published (or the attempt fails or is cancelled).
+func (p *BatchingProducer) Produce(ctx context.Context, message interface{}) (*ProduceProcessInfo, error) {
+	ctx, cancel := context.WithCancel(ctx)
+	info := &ProduceProcessInfo{
+		Finished:   make(chan struct{}),
+		cancelFunc: cancel,
+		ctx:        ctx,
+	}
+	ctx = context.WithValue(ctx, produceProcessInfoKey{}, info)
+	if !p.publisherChannel.tryWrite(produceMessage{
+		message: message,
+		ctx:     ctx,
+	}) {
+		if p.publisherChannel.isComplete() {
+			return info, &InvalidOperationException{topic: p.topic}
+		}
+		return info, &ProducerQueueFullException{topic: p.topic}
+	}
+	return info, nil
+}
+
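
A usage sketch for the producer API above (illustrative only, not part of the change). It combines Produce with the option helpers from pubsub_producer_opts.go further down; the topic name is a placeholder, `c` is assumed to be a started cluster with the topic kind registered, and msg must be a protobuf message the remote serializer can handle:

package example

import (
    "context"
    "time"

    "github.com/asynkron/protoactor-go/cluster"
)

// produceOne is a hypothetical helper: it builds a configured producer for a topic,
// publishes a single message and waits for its delivery report.
func produceOne(c *cluster.Cluster, msg interface{}) error {
    producer := c.BatchingProducer("chat",
        cluster.WithBatchProducerBatchSize(500),
        cluster.WithBatchProducerMaxQueueSize(10000), // Produce fails fast once the bounded queue is full
        cluster.WithBatchProducerOnPublishingError(func(retries int, e error, batch cluster.PubSubBatch) cluster.PublishingErrorDecision {
            if retries < 3 {
                return cluster.RetryBatchAfter(time.Second) // retry a few times before giving up
            }
            return cluster.FailBatchAndContinue // drop this batch but keep the producer alive
        }),
    )
    defer producer.Dispose()

    info, err := producer.Produce(context.Background(), msg)
    if err != nil {
        return err // queue full, or the producer is already stopped
    }

    <-info.Finished // closed once the batch containing this message was published, failed or was cancelled
    if info.IsCancelled() {
        return context.Canceled
    }
    return info.Err
}

+// publishLoop is the main loop of the producer. It reads messages from the queue and publishes them in batches.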
+func (p *BatchingProducer) publishLoop(ctx context.Context) { + plog.Debug("Producer is starting the publisher loop for topic", log.String("topic", p.topic)) + batchWrapper := newPubSubBatchWithReceipts() + + handleUnrecoverableError := func(err error) { + p.stopAcceptingNewMessages() + if p.config.LogThrottle() == actor.Open { + plog.Error("Error in the publisher loop of Producer for topic", log.String("topic", p.topic), log.Error(err)) + } + p.failBatch(batchWrapper, err) + p.failPendingMessages(err) + } + + _, err := p.publisher.Initialize(ctx, p.topic, PublisherConfig{IdleTimeout: p.config.PublisherIdleTimeout}) + if err != nil && err != context.Canceled { + handleUnrecoverableError(err) + } + +loop: + for { + select { + case <-ctx.Done(): + p.stopAcceptingNewMessages() + break loop + default: + if msg, ok := p.publisherChannel.tryRead(); ok { + + // if msg ctx not done + if _, done := <-msg.ctx.Done(); !done { + batchWrapper.batch.Envelopes = append(batchWrapper.batch.Envelopes, msg.message) + batchWrapper.ctxArr = append(batchWrapper.ctxArr, msg.ctx) + } + if len(batchWrapper.batch.Envelopes) < p.config.BatchSize { + continue + } + + err := p.publishBatch(ctx, batchWrapper) + if err != nil { + handleUnrecoverableError(err) + break loop + } + batchWrapper = newPubSubBatchWithReceipts() + } else { + if len(batchWrapper.batch.Envelopes) > 0 { + err := p.publishBatch(ctx, batchWrapper) + if err != nil { + handleUnrecoverableError(err) + break loop + } + batchWrapper = newPubSubBatchWithReceipts() + } + p.publisherChannel.waitToRead() + } + } + } + p.cancelBatch(batchWrapper) + p.cancelPendingMessages() +} + +// cancelPendingMessages cancels all pending messages +func (p *BatchingProducer) cancelPendingMessages() { + for { + if msg, ok := p.publisherChannel.tryRead(); ok { + p.getProduceProcessInfo(msg.ctx).cancel() + } else { + break + } + } +} + +// cancelBatch cancels all contexts in the batch wrapper +func (p *BatchingProducer) cancelBatch(batchWrapper *pubsubBatchWithReceipts) { + for _, ctx := range batchWrapper.ctxArr { + p.getProduceProcessInfo(ctx).cancel() + } + + // ensure once cancelled, we won't touch the batch anymore + p.clearBatch(batchWrapper) +} + +// failPendingMessages fails all pending messages +func (p *BatchingProducer) failPendingMessages(err error) { + for { + if msg, ok := p.publisherChannel.tryRead(); ok { + p.getProduceProcessInfo(msg.ctx).setErr(err) + } else { + break + } + } +} + +// failBatch marks all contexts in the batch wrapper as failed +func (p *BatchingProducer) failBatch(batchWrapper *pubsubBatchWithReceipts, err error) { + for _, ctx := range batchWrapper.ctxArr { + p.getProduceProcessInfo(ctx).setErr(err) + } + + // ensure once failed, we won't touch the batch anymore + p.clearBatch(batchWrapper) +} + +// clearBatch clears the batch wrapper +func (p *BatchingProducer) clearBatch(batchWrapper *pubsubBatchWithReceipts) { + batchWrapper.batch = PubSubBatch{Envelopes: make([]interface{}, 0, 10)} + batchWrapper.ctxArr = batchWrapper.ctxArr[:0] +} + +// completeBatch marks all contexts in the batch wrapper as completed +func (p *BatchingProducer) completeBatch(batchWrapper *pubsubBatchWithReceipts) { + for _, ctx := range batchWrapper.ctxArr { + p.getProduceProcessInfo(ctx).success() + } + + // ensure once completed, we won't touch the batch anymore + p.clearBatch(batchWrapper) +} + +// removeCancelledFromBatch removes all cancelled contexts from the batch wrapper +func (p *BatchingProducer) removeCancelledFromBatch(batchWrapper 
*pubsubBatchWithReceipts) { + for i := 0; i < len(batchWrapper.ctxArr); i++ { + if _, done := <-batchWrapper.ctxArr[i].Done(); done { + batchWrapper.batch.Envelopes = append(batchWrapper.batch.Envelopes[:i], batchWrapper.batch.Envelopes[i+1:]...) + batchWrapper.ctxArr = append(batchWrapper.ctxArr[:i], batchWrapper.ctxArr[i+1:]...) + i-- + } + } +} + +// stopAcceptingNewMessages stops accepting new messages into the channel. +func (p *BatchingProducer) stopAcceptingNewMessages() { + p.publisherChannel.complete() +} + +// publishBatch publishes a batch of messages using Publisher. +func (p *BatchingProducer) publishBatch(ctx context.Context, batchWrapper *pubsubBatchWithReceipts) error { + retries := 0 + retry := true + + for retry { + select { + case <-ctx.Done(): + p.cancelBatch(batchWrapper) + break + default: + retries++ + + timeoutCtx, _ := context.WithTimeout(ctx, p.config.PublishTimeout) + resp, err := p.publisher.PublishBatch(timeoutCtx, p.topic, &batchWrapper.batch) + if err != nil { + decision := p.config.OnPublishingError(retries, err, batchWrapper.batch) + if decision == FailBatchAndStop { + p.stopAcceptingNewMessages() + p.failBatch(batchWrapper, err) + return err // let the main producer loop exit + } + + if p.config.LogThrottle() == actor.Open { + plog.Warn("Error while publishing batch", log.Error(err)) + } + + if decision == FailBatchAndContinue { + p.failBatch(batchWrapper, err) + return nil + } + + // the decision is to retry + // if any of the messages have been canceled in the meantime, remove them and cancel the delivery report + p.removeCancelledFromBatch(batchWrapper) + + if len(batchWrapper.batch.Envelopes) == 0 { + retry = false + } else if decision.Delay > 0 { + time.Sleep(decision.Delay) + } + + continue + } + + if resp == nil { + return errors.New("timeout when publishing message batch") + } + + retry = false + p.completeBatch(batchWrapper) + } + } + + return nil +} + +type ProducerQueueFullException struct { + topic string +} + +func (p *ProducerQueueFullException) Error() string { + return "Producer for topic " + p.topic + " has full queue" +} + +type InvalidOperationException struct { + topic string +} + +func (i *InvalidOperationException) Error() string { + return "Producer for topic " + i.topic + " is stopped, cannot produce more messages." +} + +// channel is a wrapper around a channel that can be used to read and write messages. +// messages must be pointers. +type channel[T any] interface { + tryWrite(msg T) bool + tryRead() (T, bool) + isComplete() bool + complete() + empty() bool + waitToRead() +} + +// BoundedChannel is a bounded channel with the given capacity. 
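
From the caller's side, the two error types defined above can be told apart to decide between backpressure and giving up. A hypothetical sketch (not part of the change):

package example

import (
    "context"
    "errors"

    "github.com/asynkron/protoactor-go/cluster"
)

// tryProduce is a hypothetical helper showing how callers can react to the two
// producer error types: back off when the bounded queue is full, and stop using
// the producer once it has been disposed or its publish loop has failed.
func tryProduce(producer *cluster.BatchingProducer, msg interface{}) (*cluster.ProduceProcessInfo, error) {
    info, err := producer.Produce(context.Background(), msg)

    var queueFull *cluster.ProducerQueueFullException
    if errors.As(err, &queueFull) {
        // bounded queue is full: apply backpressure, e.g. retry after the queue drains
        return nil, err
    }

    var stopped *cluster.InvalidOperationException
    if errors.As(err, &stopped) {
        // producer is stopped; a new producer instance is needed
        return nil, err
    }

    return info, err
}
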
+type boundedChannel[T any] struct { + capacity int + c chan T + quit chan struct{} + once *sync.Once + cond *sync.Cond +} + +func (b boundedChannel[T]) tryWrite(msg T) bool { + select { + case b.c <- msg: + b.cond.Broadcast() + return true + case <-b.quit: + return false + default: + return false + } +} + +func (b boundedChannel[T]) tryRead() (msg T, ok bool) { + msg, ok = <-b.c + if ok { + b.cond.Broadcast() + } + return +} + +func (b boundedChannel[T]) isComplete() bool { + select { + case <-b.quit: + return true + default: + return false + } +} + +func (b boundedChannel[T]) complete() { + b.once.Do(func() { + close(b.quit) + }) +} + +func (b boundedChannel[T]) empty() bool { + return len(b.c) == 0 +} + +func (b boundedChannel[T]) waitToRead() { + b.cond.L.Lock() + defer b.cond.L.Unlock() + for b.empty() { + b.cond.Wait() + } +} + +// newBoundedChannel creates a new bounded channel with the given capacity. +func newBoundedChannel[T any](capacity int) channel[T] { + return &boundedChannel[T]{ + capacity: capacity, + c: make(chan T, capacity), + quit: make(chan struct{}), + cond: sync.NewCond(&sync.Mutex{}), + once: &sync.Once{}, + } +} + +// UnboundedChannel is an unbounded channel. +type unboundedChannel[T any] struct { + queue *mpsc.Queue + quit chan struct{} + once *sync.Once + cond *sync.Cond +} + +func (u unboundedChannel[T]) tryWrite(msg T) bool { + select { + case <-u.quit: + return false + default: + u.queue.Push(msg) + u.cond.Broadcast() + return true + } +} + +func (u unboundedChannel[T]) tryRead() (T, bool) { + msg := u.queue.Pop() + if msg == nil { + return msg, false + } else { + u.cond.Broadcast() + return msg.(T), true + } +} + +func (u unboundedChannel[T]) complete() { + u.once.Do(func() { + close(u.quit) + }) +} + +func (u unboundedChannel[T]) isComplete() bool { + select { + case <-u.quit: + return true + default: + return false + } +} + +func (u unboundedChannel[T]) empty() bool { + return u.queue.Empty() +} + +func (u unboundedChannel[T]) waitToRead() { + u.cond.L.Lock() + defer u.cond.L.Unlock() + for u.empty() { + u.cond.Wait() + } +} + +// newUnboundedChannel creates a new unbounded channel. +func newUnboundedChannel[T any]() channel[T] { + return unboundedChannel[T]{ + queue: mpsc.New(), + quit: make(chan struct{}), + cond: sync.NewCond(&sync.Mutex{}), + once: &sync.Once{}, + } +} diff --git a/cluster/pubsub_producer_opts.go b/cluster/pubsub_producer_opts.go new file mode 100644 index 000000000..520f75362 --- /dev/null +++ b/cluster/pubsub_producer_opts.go @@ -0,0 +1,81 @@ +package cluster + +import ( + "github.com/asynkron/protoactor-go/actor" + "time" +) + +type BatchProducerConfigOption func(config *BatchingProducerConfig) + +// WithBatchProducerBatchSize sets maximum size of the published batch. Default: 2000. +func WithBatchProducerBatchSize(batchSize int) BatchProducerConfigOption { + return func(config *BatchingProducerConfig) { + config.BatchSize = batchSize + } +} + +// WithBatchProducerMaxQueueSize set max size of the requests waiting in queue. If value is provided, the producer will throw +// ProducerQueueFullException when queue size is exceeded. If 0 or unset, the queue is unbounded +// Note that bounded queue has better performance than unbounded queue. +// Default: 0 (unbounded) +func WithBatchProducerMaxQueueSize(maxQueueSize int) BatchProducerConfigOption { + return func(config *BatchingProducerConfig) { + config.MaxQueueSize = maxQueueSize + } +} + +// WithBatchProducerPublishTimeout sets how long to wait for the publishing to complete. 
+// Default: 5s +func WithBatchProducerPublishTimeout(publishTimeout time.Duration) BatchProducerConfigOption { + return func(config *BatchingProducerConfig) { + config.PublishTimeout = publishTimeout + } +} + +// WithBatchProducerOnPublishingError sets error handler that can decide what to do with an error when publishing a batch. +// Default: Fail and stop the BatchingProducer +func WithBatchProducerOnPublishingError(onPublishingError PublishingErrorHandler) BatchProducerConfigOption { + return func(config *BatchingProducerConfig) { + config.OnPublishingError = onPublishingError + } +} + +// WithBatchProducerLogThrottle sets a throttle for logging from this producer. By default, a throttle shared between all instances of +// BatchingProducer is used, that allows for 10 events in 10 seconds. +func WithBatchProducerLogThrottle(logThrottle actor.ShouldThrottle) BatchProducerConfigOption { + return func(config *BatchingProducerConfig) { + config.LogThrottle = logThrottle + } +} + +// WithBatchProducerPublisherIdleTimeout sets an optional idle timeout which will specify to the `IPublisher` how long it should wait before invoking clean +// up code to recover resources. +func WithBatchProducerPublisherIdleTimeout(publisherIdleTimeout time.Duration) BatchProducerConfigOption { + return func(config *BatchingProducerConfig) { + config.PublisherIdleTimeout = publisherIdleTimeout + } +} + +type PublishingErrorDecision struct { + Delay time.Duration +} + +// NewPublishingErrorDecision creates a new PublishingErrorDecision +func NewPublishingErrorDecision(delay time.Duration) PublishingErrorDecision { + return PublishingErrorDecision{Delay: delay} +} + +// RetryBatchAfter returns a new PublishingErrorDecision with the Delay set to the given duration +func RetryBatchAfter(delay time.Duration) PublishingErrorDecision { + return NewPublishingErrorDecision(delay) +} + +// FailBatchAndStop causes the BatchingProducer to stop and fail the pending messages +var FailBatchAndStop = NewPublishingErrorDecision(0) + +// FailBatchAndContinue skips the current batch and proceeds to the next one. The delivery reports (tasks) related to that batch are still +// failed with the exception that triggered the error handling. +var FailBatchAndContinue = NewPublishingErrorDecision(0) + +// RetryBatchImmediately retries the current batch immediately +var RetryBatchImmediately = NewPublishingErrorDecision(0) diff --git a/cluster/pubsub_publisher.go b/cluster/pubsub_publisher.go new file mode 100644 index 000000000..e781e9c4b --- /dev/null +++ b/cluster/pubsub_publisher.go @@ -0,0 +1,51 @@ +package cluster + +import ( + "context" + "google.golang.org/protobuf/types/known/durationpb" + "time" +) + +type PublisherConfig struct { + IdleTimeout time.Duration +} + +type Publisher interface { + // Initialize the internal mechanisms of this publisher. + Initialize(ctx context.Context, topic string, config PublisherConfig) (*Acknowledge, error) + + // PublishBatch publishes a batch of messages to the topic. 
+ PublishBatch(ctx context.Context, topic string, batch *PubSubBatch) (*PublishResponse, error) +} + +type defaultPublisher struct { + cluster *Cluster +} + +func NewPublisher(cluster *Cluster) Publisher { + return &defaultPublisher{ + cluster: cluster, + } +} + +func (p *defaultPublisher) Initialize(ctx context.Context, topic string, config PublisherConfig) (*Acknowledge, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + res, err := p.cluster.Call(topic, TopicActorKind, &Initialize{ + IdleTimeout: durationpb.New(config.IdleTimeout), + }) + return res.(*Acknowledge), err + } +} + +func (p *defaultPublisher) PublishBatch(ctx context.Context, topic string, batch *PubSubBatch) (*PublishResponse, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + res, err := p.cluster.Call(topic, TopicActorKind, batch) + return res.(*PublishResponse), err + } +} diff --git a/cluster/pubsub_topic.go b/cluster/pubsub_topic.go new file mode 100644 index 000000000..c323190f8 --- /dev/null +++ b/cluster/pubsub_topic.go @@ -0,0 +1,354 @@ +package cluster + +import ( + "context" + "github.com/asynkron/protoactor-go/actor" + "github.com/asynkron/protoactor-go/eventstream" + "github.com/asynkron/protoactor-go/log" + "golang.org/x/exp/maps" + "strings" + "time" +) + +const TopicActorKind = "prototopic" + +var topicLogThrottle = actor.NewThrottle(10, time.Second, func(count int32) { + plog.Info("[TopicActor] Throttled logs", log.Int("count", int(count))) +}) + +type TopicActor struct { + topic string + subscribers map[subscribeIdentityStruct]*SubscriberIdentity + subscriptionStore KeyValueStore[*Subscribers] + topologySubscription *eventstream.Subscription +} + +func NewTopicActor(store KeyValueStore[*Subscribers]) TopicActor { + return TopicActor{ + subscriptionStore: store, + subscribers: make(map[subscribeIdentityStruct]*SubscriberIdentity), + } +} + +func (t *TopicActor) Receive(c actor.Context) { + switch msg := c.Message().(type) { + case *actor.Started: + t.onStarted(c) + case *actor.Stopping: + t.onStopping(c) + case *actor.ReceiveTimeout: + t.onReceiveTimeout(c) + case *Initialize: + t.onInitialize(c, msg) + case *SubscribeRequest: + t.onSubscribe(c, msg) + case *UnsubscribeRequest: + t.onUnsubscribe(c, msg) + case *PubSubBatch: + t.onPubSubBatch(c, msg) + case *NotifyAboutFailingSubscribersRequest: + t.onNotifyAboutFailingSubscribers(c, msg) + case *ClusterTopology: + t.onClusterTopologyChanged(c, msg) + } + +} + +func (t *TopicActor) onStarted(c actor.Context) { + t.topic = GetClusterIdentity(c).Identity + t.topologySubscription = c.ActorSystem().EventStream.Subscribe(func(evt interface{}) { + if clusterTopology, ok := evt.(*ClusterTopology); ok { + c.Send(c.Self(), clusterTopology) + } + }) + + sub := t.loadSubscriptions(t.topic) + if sub.Subscribers != nil { + for _, subscriber := range sub.Subscribers { + t.subscribers[newSubscribeIdentityStruct(subscriber)] = subscriber + } + } + t.unsubscribeSubscribersOnMembersThatLeft(c) + + plog.Debug("Topic started", log.String("topic", t.topic)) +} + +func (t *TopicActor) onStopping(c actor.Context) { + if t.topologySubscription != nil { + c.ActorSystem().EventStream.Unsubscribe(t.topologySubscription) + t.topologySubscription = nil + } +} + +func (t *TopicActor) onReceiveTimeout(c actor.Context) { + c.Stop(c.Self()) +} + +func (t *TopicActor) onInitialize(c actor.Context, msg *Initialize) { + if msg.IdleTimeout != nil { + c.SetReceiveTimeout(msg.IdleTimeout.AsDuration()) + } + c.Respond(&Acknowledge{}) +} + 
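
NewTopicActor above takes a KeyValueStore[*Subscribers] so that a topic's subscriber list survives topic actor restarts and re-activations. A minimal, map-backed sketch of such a store (illustrative only; a real deployment would use a durable store, and the wiring that registers the prototopic kind with a store is not shown in this diff):

package example

import (
    "context"
    "sync"

    "github.com/asynkron/protoactor-go/cluster"
)

// inMemorySubscribersStore is a hypothetical, map-backed implementation of
// cluster.KeyValueStore[*cluster.Subscribers], keyed by topic name.
type inMemorySubscribersStore struct {
    mu   sync.Mutex
    data map[string]*cluster.Subscribers
}

func newInMemorySubscribersStore() *inMemorySubscribersStore {
    return &inMemorySubscribersStore{data: map[string]*cluster.Subscribers{}}
}

func (s *inMemorySubscribersStore) Set(_ context.Context, key string, value *cluster.Subscribers) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.data[key] = value
    return nil
}

func (s *inMemorySubscribersStore) Get(_ context.Context, key string) (*cluster.Subscribers, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if subs, ok := s.data[key]; ok {
        return subs, nil
    }
    // return an empty set rather than nil so callers can read .Subscribers safely
    return &cluster.Subscribers{}, nil
}

func (s *inMemorySubscribersStore) Clear(_ context.Context, key string) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    delete(s.data, key)
    return nil
}
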
+type pidAndSubscriber struct { + pid *actor.PID + subscriber *SubscriberIdentity +} + +// onPubSubBatch handles a PubSubBatch message, sends the message to all subscribers +func (t *TopicActor) onPubSubBatch(c actor.Context, batch *PubSubBatch) { + // map subscribers to map[address][](pid, subscriber) + members := make(map[string][]pidAndSubscriber) + for _, identity := range t.subscribers { + pid := t.getPID(c, identity) + if pid != nil { + members[pid.Address] = append(members[pid.Address], pidAndSubscriber{pid: pid, subscriber: identity}) + } + } + + // send message to each member + for address, member := range members { + subscribersOnMember := t.getSubscribersForAddress(member) + deliveryMessage := &DeliverBatchRequest{ + Subscribers: subscribersOnMember, + PubSubBatch: batch, + Topic: t.topic, + } + deliveryPid := actor.NewPID(address, PubSubDeliveryName) + c.Send(deliveryPid, deliveryMessage) + } + c.Respond(&PublishResponse{}) +} + +// getSubscribersForAddress returns the subscribers for the given member list +func (t *TopicActor) getSubscribersForAddress(members []pidAndSubscriber) *Subscribers { + subscribers := make([]*SubscriberIdentity, len(members)) + for i, member := range members { + subscribers[i] = member.subscriber + } + return &Subscribers{Subscribers: subscribers} +} + +// getPID returns the PID of the subscriber +func (t *TopicActor) getPID(c actor.Context, subscriber *SubscriberIdentity) *actor.PID { + if pid := subscriber.GetPid(); pid != nil { + return pid + } + + return t.getClusterIdentityPid(c, subscriber.GetClusterIdentity()) +} + +// getClusterIdentityPid returns the PID of the clusterIdentity actor +func (t *TopicActor) getClusterIdentityPid(c actor.Context, identity *ClusterIdentity) *actor.PID { + if identity == nil { + return nil + } + + return GetCluster(c.ActorSystem()).Get(identity.Identity, identity.Kind) +} + +// onNotifyAboutFailingSubscribers handles a NotifyAboutFailingSubscribersRequest message +func (t *TopicActor) onNotifyAboutFailingSubscribers(c actor.Context, msg *NotifyAboutFailingSubscribersRequest) { + t.unsubscribeUnreachablePidSubscribers(c, msg.InvalidDeliveries) + t.logDeliveryErrors(msg.InvalidDeliveries) + c.Respond(&NotifyAboutFailingSubscribersResponse{}) +} + +// logDeliveryErrors logs the delivery errors in one log line +func (t *TopicActor) logDeliveryErrors(reports []*SubscriberDeliveryReport) { + if len(reports) > 0 || topicLogThrottle() == actor.Open { + subscribers := make([]string, len(reports)) + for i, report := range reports { + subscribers[i] = report.Subscriber.String() + } + plog.Error("Topic following subscribers could not process the batch", log.String("topic", t.topic), log.String("subscribers", strings.Join(subscribers, ","))) + } +} + +// unsubscribeUnreachablePidSubscribers deletes all subscribers that have a PID that is unreachable +func (t *TopicActor) unsubscribeUnreachablePidSubscribers(_ actor.Context, allInvalidDeliveryReports []*SubscriberDeliveryReport) { + subscribers := make([]subscribeIdentityStruct, 0, len(allInvalidDeliveryReports)) + for _, r := range allInvalidDeliveryReports { + if r.Subscriber.GetPid() != nil && r.Status == DeliveryStatus_SubscriberNoLongerReachable { + subscribers = append(subscribers, newSubscribeIdentityStruct(r.Subscriber)) + } + } + t.removeSubscribers(subscribers) +} + +// onClusterTopologyChanged handles a ClusterTopology message +func (t *TopicActor) onClusterTopologyChanged(_ actor.Context, msg *ClusterTopology) { + if len(msg.Left) > 0 { + addressMap := 
make(map[string]struct{}) + for _, member := range msg.Left { + addressMap[member.Address()] = struct{}{} + } + + subscribersThatLeft := make([]subscribeIdentityStruct, 0, len(msg.Left)) + + for identityStruct, identity := range t.subscribers { + if pid := identity.GetPid(); pid != nil { + if _, ok := addressMap[pid.Address]; ok { + subscribersThatLeft = append(subscribersThatLeft, identityStruct) + } + } + } + t.removeSubscribers(subscribersThatLeft) + } +} + +// unsubscribeSubscribersOnMembersThatLeft removes subscribers that are on members that left the clusterIdentity +func (t *TopicActor) unsubscribeSubscribersOnMembersThatLeft(c actor.Context) { + members := GetCluster(c.ActorSystem()).MemberList.Members() + activeMemberAddresses := make(map[string]struct{}) + for _, member := range members.Members() { + activeMemberAddresses[member.Address()] = struct{}{} + } + + subscribersThatLeft := make([]subscribeIdentityStruct, 0) + for s := range t.subscribers { + if s.isPID { + if _, ok := activeMemberAddresses[s.pid.address]; !ok { + subscribersThatLeft = append(subscribersThatLeft, s) + } + } + } + t.removeSubscribers(subscribersThatLeft) +} + +// removeSubscribers remove subscribers from the topic +func (t *TopicActor) removeSubscribers(subscribersThatLeft []subscribeIdentityStruct) { + if len(subscribersThatLeft) > 0 { + for _, subscriber := range subscribersThatLeft { + delete(t.subscribers, subscriber) + } + if topicLogThrottle() == actor.Open { + plog.Warn("Topic removed subscribers, because they are dead or they are on members that left the clusterIdentity:", log.String("topic", t.topic), log.Object("subscribers", subscribersThatLeft)) + } + t.saveSubscriptionsInTopicActor() + } +} + +// loadSubscriptions loads the subscriptions for the topic from the subscription store +func (t *TopicActor) loadSubscriptions(topic string) *Subscribers { + //TODO: cancellation logic config? + state, err := t.subscriptionStore.Get(context.Background(), topic) + if err != nil { + if topicLogThrottle() == actor.Open { + plog.Error("Error when loading subscriptions", log.String("topic", topic), log.Error(err)) + } + return &Subscribers{} + } + plog.Debug("Loaded subscriptions for topic", log.String("topic", topic), log.Object("subscriptions", state)) + return state +} + +// saveSubscriptionsInTopicActor saves the TopicActor.subscribers for the TopicActor.topic to the subscription store +func (t *TopicActor) saveSubscriptionsInTopicActor() { + t.saveSubscriptions(t.topic, &Subscribers{Subscribers: maps.Values(t.subscribers)}) +} + +// saveSubscriptions saves the subscribers for the topic to the subscription store +func (t *TopicActor) saveSubscriptions(topic string, subscribers *Subscribers) { + //TODO: cancellation logic config? 
+ plog.Debug("Saving subscriptions for topic", log.String("topic", topic), log.Object("subscriptions", subscribers)) + err := t.subscriptionStore.Set(context.Background(), topic, subscribers) + if err != nil && topicLogThrottle() == actor.Open { + plog.Error("Error when saving subscriptions", log.String("topic", topic), log.Error(err)) + } +} + +func (t *TopicActor) onUnsubscribe(c actor.Context, msg *UnsubscribeRequest) { + delete(t.subscribers, newSubscribeIdentityStruct(msg.Subscriber)) + t.saveSubscriptionsInTopicActor() + c.Respond(&Acknowledge{}) +} + +func (t *TopicActor) onSubscribe(c actor.Context, msg *SubscribeRequest) { + t.subscribers[newSubscribeIdentityStruct(msg.Subscriber)] = msg.Subscriber + plog.Debug("Topic subscribed", log.String("topic", t.topic), log.Object("subscriber", msg.Subscriber)) + t.saveSubscriptionsInTopicActor() + c.Respond(&SubscribeResponse{}) +} + +// pidStruct is a struct that represents a PID +// It is used to implement the comparison interface +type pidStruct struct { + address string + id string + requestId uint32 +} + +// newPIDStruct creates a new pidStruct from a *actor.PID +func newPidStruct(pid *actor.PID) pidStruct { + return pidStruct{ + address: pid.Address, + id: pid.Id, + requestId: pid.RequestId, + } +} + +// toPID converts a pidStruct to a *actor.PID +func (p pidStruct) toPID() *actor.PID { + return &actor.PID{ + Address: p.address, + Id: p.id, + RequestId: p.requestId, + } +} + +type clusterIdentityStruct struct { + identity string + kind string +} + +// newClusterIdentityStruct creates a new clusterIdentityStruct from a *ClusterIdentity +func newClusterIdentityStruct(clusterIdentity *ClusterIdentity) clusterIdentityStruct { + return clusterIdentityStruct{ + identity: clusterIdentity.Identity, + kind: clusterIdentity.Kind, + } +} + +// toClusterIdentity converts a clusterIdentityStruct to a *ClusterIdentity +func (c clusterIdentityStruct) toClusterIdentity() *ClusterIdentity { + return &ClusterIdentity{ + Identity: c.identity, + Kind: c.kind, + } +} + +// subscriberIdentityStruct is a struct that represents a SubscriberIdentity +// It is used to implement the comparison interface +type subscribeIdentityStruct struct { + isPID bool + pid pidStruct + clusterIdentity clusterIdentityStruct +} + +// newSubscriberIdentityStruct creates a new subscriberIdentityStruct from a *SubscriberIdentity +func newSubscribeIdentityStruct(subscriberIdentity *SubscriberIdentity) subscribeIdentityStruct { + if subscriberIdentity.GetPid() != nil { + return subscribeIdentityStruct{ + isPID: true, + pid: newPidStruct(subscriberIdentity.GetPid()), + } + } + return subscribeIdentityStruct{ + isPID: false, + clusterIdentity: newClusterIdentityStruct(subscriberIdentity.GetClusterIdentity()), + } +} + +// toSubscriberIdentity converts a subscribeIdentityStruct to a *SubscriberIdentity +func (s subscribeIdentityStruct) toSubscriberIdentity() *SubscriberIdentity { + if s.isPID { + return &SubscriberIdentity{ + Identity: &SubscriberIdentity_Pid{Pid: s.pid.toPID()}, + } + } + return &SubscriberIdentity{ + Identity: &SubscriberIdentity_ClusterIdentity{ClusterIdentity: s.clusterIdentity.toClusterIdentity()}, + } +} diff --git a/go.mod b/go.mod index ddd93ff83..0fdb23999 100644 --- a/go.mod +++ b/go.mod @@ -29,10 +29,10 @@ require ( require ( github.com/go-zookeeper/zk v1.0.2 github.com/golang/mock v1.5.0 - github.com/golang/protobuf v1.5.2 github.com/labstack/echo v3.3.10+incompatible github.com/lithammer/shortuuid/v4 v4.0.0 go.etcd.io/etcd/client/v3 v3.5.4 + 
golang.org/x/exp v0.0.0-20220518171630-0b5c67f07fdf golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 k8s.io/api v0.24.1 k8s.io/apimachinery v0.24.1 @@ -53,6 +53,7 @@ require ( github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.20.0 // indirect github.com/go-openapi/swag v0.21.1 // indirect + github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/gnostic v0.6.9 // indirect github.com/google/gofuzz v1.2.0 // indirect @@ -92,7 +93,6 @@ require ( go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.21.0 // indirect golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect - golang.org/x/exp v0.0.0-20220518171630-0b5c67f07fdf // indirect golang.org/x/oauth2 v0.0.0-20220524215830-622c5d57e401 // indirect golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect golang.org/x/term v0.0.0-20220526004731-065cf7ba2467 // indirect diff --git a/internal/queue/mpsc/mpsc_test.go b/internal/queue/mpsc/mpsc_test.go index 22f542a8f..92082b241 100644 --- a/internal/queue/mpsc/mpsc_test.go +++ b/internal/queue/mpsc/mpsc_test.go @@ -147,6 +147,34 @@ func benchmarkPushPop(count, c int) { wg.Wait() } +func benchmarkChannelPushPop(count, c int) { + var wg sync.WaitGroup + wg.Add(1) + ch := make(chan interface{}, 100) + go func() { + i := 0 + for { + <-ch + i++ + if i == count { + wg.Done() + return + } + } + }() + + var val interface{} = "foo" + + for i := 0; i < c; i++ { + go func(n int) { + for n > 0 { + ch <- val + n-- + } + }(count / c) + } +} + func BenchmarkPushPop(b *testing.B) { benchmarks := []struct { count int @@ -164,6 +192,10 @@ func BenchmarkPushPop(b *testing.B) { count: 10000, concurrency: 4, }, + { + count: 10000, + concurrency: 8, + }, } for _, bm := range benchmarks { b.Run(fmt.Sprintf("%d_%d", bm.count, bm.concurrency), func(b *testing.B) { @@ -173,3 +205,34 @@ func BenchmarkPushPop(b *testing.B) { }) } } + +func BenchmarkChannelPushPop(b *testing.B) { + benchmarks := []struct { + count int + concurrency int + }{ + { + count: 10000, + concurrency: 1, + }, + { + count: 10000, + concurrency: 2, + }, + { + count: 10000, + concurrency: 4, + }, + { + count: 10000, + concurrency: 8, + }, + } + for _, bm := range benchmarks { + b.Run(fmt.Sprintf("%d_%d", bm.count, bm.concurrency), func(b *testing.B) { + for i := 0; i < b.N; i++ { + benchmarkChannelPushPop(bm.count, bm.concurrency) + } + }) + } +} diff --git a/remote/endpoint_reader.go b/remote/endpoint_reader.go index a51cbff25..eeb2d9bed 100644 --- a/remote/endpoint_reader.go +++ b/remote/endpoint_reader.go @@ -138,6 +138,12 @@ func (s *endpointReader) onMessageBatch(m *MessageBatch) error { return err } + // translate from on-the-wire representation to in-process representation + // this only applies to root level messages, and never on nested child messages + if v, ok := message.(RootSerialized); ok { + message = v.Deserialize() + } + switch msg := message.(type) { case *actor.Terminated: rt := &remoteTerminate{ diff --git a/remote/endpoint_writer.go b/remote/endpoint_writer.go index 442a08cd7..e34ecbe86 100644 --- a/remote/endpoint_writer.go +++ b/remote/endpoint_writer.go @@ -61,16 +61,16 @@ func (state *endpointWriter) initialize(ctx actor.Context) { return - // plog.Error("EndpointWriter failed to connect", log.String("address", state.address), log.Error(err)) + // plog.Error("EndpointWriter failed to connect", log.String("address", state.address), log.Error(err)) // Wait 2 seconds to restart and retry // TODO: Replace with 
Exponential Backoff // send this as a message to self - do not block the mailbox processing // if in the meantime the actor is stopped (EndpointTerminated event), the message will be ignored (deadlettered) // TODO: would it be a better idea to just publish EndpointTerminatedEvent here? to use the same path as when the connection is lost? - // time.AfterFunc(2*time.Second, func() { - // ctx.Send(ctx.Self(), &restartAfterConnectFailure{err}) - // }) + // time.AfterFunc(2*time.Second, func() { + // ctx.Send(ctx.Self(), &restartAfterConnectFailure{err}) + // }) } @@ -201,7 +201,14 @@ func (state *endpointWriter) sendEnvelopes(msg []interface{}, ctx actor.Context) } } - bytes, typeName, err := Serialize(rd.message, serializerID) + // if the message can be translated to a serialization representation, we do this here + // this only apply to root level messages and never to nested child objects inside the message + message := rd.message + if v, ok := message.(RootSerializable); ok { + message = v.Serialize() + } + + bytes, typeName, err := Serialize(message, serializerID) if err != nil { panic(err) } diff --git a/remote/serializer.go b/remote/serializer.go index c39a6c406..144bad502 100644 --- a/remote/serializer.go +++ b/remote/serializer.go @@ -29,3 +29,17 @@ func Serialize(message interface{}, serializerID int32) ([]byte, string, error) func Deserialize(message []byte, typeName string, serializerID int32) (interface{}, error) { return serializers[serializerID].Deserialize(typeName, message) } + +// RootSerializable is the root level in-process representation of a message +type RootSerializable interface { + // Serialize returns the on-the-wire representation of the message + // Message -> IRootSerialized -> ByteString + Serialize() RootSerialized +} + +// RootSerialized is the root level on-the-wire representation of a message +type RootSerialized interface { + //Deserialize returns the in-process representation of a message + // ByteString -> IRootSerialized -> Message + Deserialize() RootSerializable +}
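
Putting the pieces together, a hedged end-to-end sketch of the new API (illustrative only, not part of the change). It assumes a started cluster in which the prototopic kind is registered with a subscription store, and an opaque protobuf message already known to the remote serializer, since none of that setup appears in this diff; batches travel between members via the RootSerializable/RootSerialized hooks above.

package example

import (
    "context"
    "fmt"

    "github.com/asynkron/protoactor-go/actor"
    "github.com/asynkron/protoactor-go/cluster"
)

// runChat is a hypothetical helper: subscribe with an ad-hoc actor, then publish
// through a batching producer on the same topic.
func runChat(c *cluster.Cluster, msg interface{}) error {
    // the topic actor records the spawned subscriber PID and persists it through its KeyValueStore
    err := c.SubscribeWithReceive("chat", func(ctx actor.Context) {
        switch m := ctx.Message().(type) {
        case *actor.Started, *actor.Stopping, *actor.Stopped:
            // lifecycle messages of the ad-hoc subscriber actor
        default:
            fmt.Printf("received %T: %v\n", m, m)
        }
    })
    if err != nil {
        return err
    }

    // messages are grouped into PubSubBatch envelopes and fanned out to the
    // pub-sub delivery actor on each member that hosts subscribers
    producer := c.BatchingProducer("chat")
    defer producer.Dispose()

    info, err := producer.Produce(context.Background(), msg)
    if err != nil {
        return err
    }
    <-info.Finished
    return info.Err
}
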