diff --git a/cmd/relay/main.go b/cmd/relay/main.go index ac0d663..511eb7d 100644 --- a/cmd/relay/main.go +++ b/cmd/relay/main.go @@ -12,6 +12,7 @@ import ( "git.sr.ht/~whereswaldon/forest-go" "git.sr.ht/~whereswaldon/forest-go/grove" + "git.sr.ht/~whereswaldon/forest-go/store" sprout "git.sr.ht/~whereswaldon/sprout-go" "git.sr.ht/~whereswaldon/sprout-go/watch" ) @@ -71,7 +72,7 @@ and will establish Sprout connections to all addresses provided as arguments. if err != nil { log.Fatalf("Failed to create grove at %s: %v", *grovePath, err) } - messages := sprout.NewSubscriberStore(grove) + messages := store.NewArchive(grove) defer messages.Destroy() // track node ids of nodes that we've recently inserted into the grove so that diff --git a/subscriber_store.go b/subscriber_store.go deleted file mode 100644 index 7558841..0000000 --- a/subscriber_store.go +++ /dev/null @@ -1,239 +0,0 @@ -package sprout - -import ( - "git.sr.ht/~whereswaldon/forest-go" - "git.sr.ht/~whereswaldon/forest-go/fields" -) - -// Subscription is an identifier for a particular handler function within -// a SubscriberStore. It can be provided to delete a handler function or to -// suppress notifications to the corresponding handler. -type Subscription uint - -// the zero subscription is never used -const neverAssigned = 0 -const firstSubscription = 1 - -// SubscriberStore is a wrapper type that extends the forest.Store interface -// with the observer pattern. Code can subscribe for updates each time a -// node is inserted into the store using Add or AddAs. -type SubscriberStore struct { - store forest.Store - requests chan func() - nextSubscriberKey Subscription - postAddSubscribers, preAddSubscribers map[Subscription]func(forest.Node) -} - -var _ forest.Store = &SubscriberStore{} - -// NewMessageStore creates a thread-safe storage structure for -// forest nodes by wrapping an existing store implementation -func NewSubscriberStore(store forest.Store) *SubscriberStore { - m := &SubscriberStore{ - store: store, - requests: make(chan func()), - nextSubscriberKey: firstSubscription, - postAddSubscribers: make(map[Subscription]func(forest.Node)), - preAddSubscribers: make(map[Subscription]func(forest.Node)), - } - go func() { - for function := range m.requests { - function() - } - }() - return m -} - -// SubscribeToNewMessages establishes the given function as a handler to be -// invoked on each node added to the store. The returned subscription ID -// can be used to unsubscribe later, as well as to supress notifications -// with AddAs(). -// -// Handler functions are invoked synchronously on the same goroutine that invokes -// Add() or AddAs(), and should not block. If long-running code is needed in a -// handler, launch a new goroutine. -func (m *SubscriberStore) SubscribeToNewMessages(handler func(n forest.Node)) (subscriptionID Subscription) { - return m.subscribeInMap(m.postAddSubscribers, handler) -} - -// PresubscribeToNewMessages establishes the given function as a handler to be -// invoked on each node added to the store. The returned subscription ID -// can be used to unsubscribe later, as well as to supress notifications -// with AddAs(). The handler function will be invoked *before* nodes are -// inserted into the store instead of after (like a normal Subscribe). -// -// Handler functions are invoked synchronously on the same goroutine that invokes -// Add() or AddAs(), and should not block. If long-running code is needed in a -// handler, launch a new goroutine. -func (m *SubscriberStore) PresubscribeToNewMessages(handler func(n forest.Node)) (subscriptionID Subscription) { - return m.subscribeInMap(m.preAddSubscribers, handler) -} - -func (m *SubscriberStore) subscribeInMap(targetMap map[Subscription]func(forest.Node), handler func(n forest.Node)) (subscriptionID Subscription) { - done := make(chan struct{}) - m.requests <- func() { - defer close(done) - subscriptionID = m.nextSubscriberKey - m.nextSubscriberKey++ - // handler unsigned overflow - // TODO: ensure subscription reuse can't occur - if m.nextSubscriberKey == neverAssigned { - m.nextSubscriberKey = firstSubscription - } - targetMap[subscriptionID] = handler - } - <-done - return -} - -// UnsubscribeToNewMessages removes the handler for a given subscription from -// the store. -func (m *SubscriberStore) UnsubscribeToNewMessages(subscriptionID Subscription) { - m.unsubscribeInMap(m.postAddSubscribers, subscriptionID) -} - -// UnpresubscribeToNewMessages removes the handler for a given subscription from -// the store. -func (m *SubscriberStore) UnpresubscribeToNewMessages(subscriptionID Subscription) { - m.unsubscribeInMap(m.preAddSubscribers, subscriptionID) -} - -func (m *SubscriberStore) unsubscribeInMap(targetMap map[Subscription]func(forest.Node), subscriptionID Subscription) { - done := make(chan struct{}) - m.requests <- func() { - defer close(done) - if _, subscribed := targetMap[subscriptionID]; subscribed { - delete(targetMap, subscriptionID) - } - } - <-done - return -} - -func (m *SubscriberStore) CopyInto(s forest.Store) (err error) { - done := make(chan struct{}) - m.requests <- func() { - defer close(done) - err = m.store.CopyInto(s) - } - <-done - return -} - -func (m *SubscriberStore) Get(id *fields.QualifiedHash) (node forest.Node, present bool, err error) { - done := make(chan struct{}) - m.requests <- func() { - defer close(done) - node, present, err = m.store.Get(id) - } - <-done - return -} - -func (m *SubscriberStore) GetIdentity(id *fields.QualifiedHash) (node forest.Node, present bool, err error) { - done := make(chan struct{}) - m.requests <- func() { - defer close(done) - node, present, err = m.store.GetIdentity(id) - } - <-done - return -} - -func (m *SubscriberStore) GetCommunity(id *fields.QualifiedHash) (node forest.Node, present bool, err error) { - done := make(chan struct{}) - m.requests <- func() { - defer close(done) - node, present, err = m.store.GetCommunity(id) - } - <-done - return -} - -func (m *SubscriberStore) GetConversation(communityID, conversationID *fields.QualifiedHash) (node forest.Node, present bool, err error) { - done := make(chan struct{}) - m.requests <- func() { - defer close(done) - node, present, err = m.store.GetConversation(communityID, conversationID) - } - <-done - return -} - -func (m *SubscriberStore) GetReply(communityID, conversationID, replyID *fields.QualifiedHash) (node forest.Node, present bool, err error) { - done := make(chan struct{}) - m.requests <- func() { - defer close(done) - node, present, err = m.store.GetReply(communityID, conversationID, replyID) - } - <-done - return -} - -func (m *SubscriberStore) Children(id *fields.QualifiedHash) (ids []*fields.QualifiedHash, err error) { - done := make(chan struct{}) - m.requests <- func() { - defer close(done) - ids, err = m.store.Children(id) - } - <-done - return -} - -func (m *SubscriberStore) Recent(nodeType fields.NodeType, quantity int) (nodes []forest.Node, err error) { - done := make(chan struct{}) - m.requests <- func() { - defer close(done) - nodes, err = m.store.Recent(nodeType, quantity) - } - <-done - return -} - -// Add inserts a node into the underlying store. Importantly, this will send a notification -// of a new node to *all* subscribers. If the calling code is a subscriber, it will still -// be notified of the new node. To supress this, use AddAs() instead. -func (m *SubscriberStore) Add(node forest.Node) (err error) { - done := make(chan struct{}) - m.requests <- func() { - defer close(done) - m.notifySubscribed(m.preAddSubscribers, node, neverAssigned) - if err = m.store.Add(node); err == nil { - m.notifySubscribed(m.postAddSubscribers, node, neverAssigned) - } - } - <-done - return -} - -// AddAs allows adding a node to the underlying store without being notified -// of it as a new node. The addedByID (subscription id returned from SubscribeToNewMessages) -// will not be notified of the new nodes, but all other subscribers will be. -func (m *SubscriberStore) AddAs(node forest.Node, addedByID Subscription) (err error) { - done := make(chan struct{}) - m.requests <- func() { - defer close(done) - m.notifySubscribed(m.preAddSubscribers, node, addedByID) - if err = m.store.Add(node); err == nil { - m.notifySubscribed(m.postAddSubscribers, node, addedByID) - } - } - <-done - return -} - -// notifySubscribed runs all of the subscription handlers in new goroutines with -// the provided node as input to each handler. -func (m *SubscriberStore) notifySubscribed(targetMap map[Subscription]func(forest.Node), node forest.Node, ignore Subscription) { - for subscriptionID, handler := range targetMap { - if subscriptionID != ignore { - handler(node) - } - } -} - -// Shut down the worker gorountine that powers this store. Subsequent -// calls to methods on this MessageStore have undefined behavior -func (m *SubscriberStore) Destroy() { - close(m.requests) -} diff --git a/supervisor.go b/supervisor.go index 82336c9..0d1b525 100644 --- a/supervisor.go +++ b/supervisor.go @@ -5,6 +5,8 @@ import ( "fmt" "log" "time" + + "git.sr.ht/~whereswaldon/forest-go/store" ) // LaunchSupervisedWorker launches a worker in a new goroutine that will @@ -13,7 +15,7 @@ import ( // `logger`. // // BUG(whereswaldon): this interface is experimental and likely to change. -func LaunchSupervisedWorker(done <-chan struct{}, addr string, store SubscribableStore, tlsConfig *tls.Config, logger *log.Logger) { +func LaunchSupervisedWorker(done <-chan struct{}, addr string, s store.ExtendedStore, tlsConfig *tls.Config, logger *log.Logger) { go func() { firstAttempt := true for { @@ -27,7 +29,7 @@ func LaunchSupervisedWorker(done <-chan struct{}, addr string, store Subscribabl logger.Printf("Failed to connect to %s: %v", addr, err) continue } - worker, err := NewWorker(done, conn, store) + worker, err := NewWorker(done, conn, s) if err != nil { logger.Printf("Failed launching worker to connect to address %s: %v", addr, err) continue diff --git a/worker.go b/worker.go index d8a7c26..3d8ff85 100644 --- a/worker.go +++ b/worker.go @@ -13,27 +13,20 @@ import ( "git.sr.ht/~whereswaldon/forest-go/store" ) -type SubscribableStore interface { - forest.Store - SubscribeToNewMessages(handler func(n forest.Node)) Subscription - UnsubscribeToNewMessages(Subscription) - AddAs(forest.Node, Subscription) (err error) -} - type Worker struct { Done <-chan struct{} DefaultTimeout time.Duration *Conn *log.Logger *Session - SubscribableStore - subscriptionID Subscription + SubscribableStore store.ExtendedStore + subscriptionID store.Subscription } -func NewWorker(done <-chan struct{}, conn net.Conn, store SubscribableStore) (*Worker, error) { +func NewWorker(done <-chan struct{}, conn net.Conn, s store.ExtendedStore) (*Worker, error) { w := &Worker{ Done: done, - SubscribableStore: store, + SubscribableStore: s, Logger: log.New(log.Writer(), "", log.LstdFlags|log.Lshortfile), DefaultTimeout: time.Minute, } @@ -280,7 +273,7 @@ func (c *Worker) IngestNode(node forest.Node) error { if err := ancestor.ValidateDeep(c.SubscribableStore); err != nil { return fmt.Errorf("validation failed for ancestor %s: %w", ancestor.ID(), err) } - if err := c.AddAs(ancestor, c.subscriptionID); err != nil { + if err := c.SubscribableStore.AddAs(ancestor, c.subscriptionID); err != nil { return fmt.Errorf("failed inserting ancestory %s into store: %w", ancestor.ID(), err) } } @@ -362,7 +355,7 @@ func (c *Worker) BootstrapLocalStore(maxCommunities int) { c.Printf("Couldn't fetch author information for node %s: %v", community.ID().String(), err) continue } - if err := c.AddAs(community, c.subscriptionID); err != nil { + if err := c.SubscribableStore.AddAs(community, c.subscriptionID); err != nil { c.Printf("Couldn't add community %s to store: %v", community.ID().String(), err) continue } @@ -404,7 +397,7 @@ func (c *Worker) synchronizeFullTree(root forest.Node, maxNodes int, perRequestT } c.Printf("Announced local leaves of %s to peer", root.ID()) for _, leaf := range leafList.Nodes { - if _, alreadyInStore, err := c.Get(leaf.ID()); err != nil { + if _, alreadyInStore, err := c.SubscribableStore.Get(leaf.ID()); err != nil { return fmt.Errorf("failed checking if we already have leaf node %s: %w", leaf.ID().String(), err) } else if alreadyInStore { continue @@ -424,7 +417,7 @@ func (c *Worker) synchronizeFullTree(root forest.Node, maxNodes int, perRequestT if err := ancestor.ValidateDeep(c.SubscribableStore); err != nil { return fmt.Errorf("couldn't validate node %s: %w", ancestor.ID().String(), err) } - if err := c.AddAs(ancestor, c.subscriptionID); err != nil { + if err := c.SubscribableStore.AddAs(ancestor, c.subscriptionID); err != nil { return fmt.Errorf("couldn't add node %s to store: %w", ancestor.ID().String(), err) } } @@ -450,7 +443,7 @@ func (c *Worker) ensureAuthorAvailable(node forest.Node, perRequestTimeout time. default: return fmt.Errorf("unsupported type in ensureAuthorAvailable: %T", n) } - _, inStore, err := c.GetIdentity(authorID) + _, inStore, err := c.SubscribableStore.GetIdentity(authorID) if err != nil { return fmt.Errorf("failed looking for author id %s in store: %w", authorID.String(), err) } @@ -468,7 +461,7 @@ func (c *Worker) ensureAuthorAvailable(node forest.Node, perRequestTimeout time. if err := author.ValidateDeep(c.SubscribableStore); err != nil { return fmt.Errorf("unable to validate author %s: %w", author.ID().String(), err) } - if err := c.AddAs(author, c.subscriptionID); err != nil { + if err := c.SubscribableStore.AddAs(author, c.subscriptionID); err != nil { return fmt.Errorf("failed inserting new valid author %s into store: %w", author.ID().String(), err) } return nil