diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index f54df9c9a58f..645d7bf6b235 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -71,11 +71,13 @@ def create_recoverer(service, callback): clock, store, as_api, self.event_grouper, create_recoverer ) + @defer.inlineCallbacks def start(self): # check for any DOWN ASes and start recoverers for them. - _Recoverer.start( + recoverers = yield _Recoverer.start( self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered ) + self.txn_ctrl.add_recoverers(recoverers) self.txn_ctrl.start_polling() def submit_event_for_as(self, service, event): @@ -91,12 +93,34 @@ def __init__(self, service, id, events): self.events = events def send(self, as_api): - # TODO sends this transaction using this as_api - pass + """Sends this transaction using the provided AS API interface. + + Args: + as_api(ApplicationServiceApi): The API to use to send. + Returns: + A Deferred which resolves to True if the transaction was sent. + """ + return as_api.push_bulk( + service=self.service, + events=self.events, + txn_id=self.id + ) def complete(self, store): - # TODO increment txn id on AS and nuke txn contents from db - pass + """Completes this transaction as successful. + + Marks this transaction ID on the application service and removes the + transaction contents from the database. + + Args: + store: The database store to operate on. + Returns: + A Deferred which resolves to True if the transaction was completed. + """ + return store.complete_appservice_txn( + service=self.service, + txn_id=self.id + ) class _EventGrouper(object): @@ -125,6 +149,8 @@ def __init__(self, clock, store, as_api, event_grouper, recoverer_fn): self.as_api = as_api self.event_grouper = event_grouper self.recoverer_fn = recoverer_fn + # keep track of how many recoverers there are + self.recoverers = [] def start_polling(self): groups = self.event_grouper.drain_groups() @@ -144,6 +170,10 @@ def on_recovered(self, service): # TODO mark AS as UP pass + def add_recoverers(self, recoverers): + for r in recoverers: + self.recoverers.append(r) + def _start_recoverer(self, service): recoverer = self.recoverer_fn(service, self.on_recovered) recoverer.recover() @@ -161,9 +191,15 @@ def _store_txn(self, txn): class _Recoverer(object): @staticmethod + @defer.inlineCallbacks def start(clock, store, as_api, callback): - # TODO check for DOWN ASes and init recoverers - pass + services = yield store.get_failing_appservices() + recoverers = [ + _Recoverer(clock, store, as_api, s, callback) for s in services + ] + for r in recoverers: + r.recover() + defer.returnValue(recoverers) def __init__(self, clock, store, as_api, service, callback): self.clock = clock diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index e30265750ae2..c1762692b946 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -336,3 +336,31 @@ def _populate_cache(self): hs_token=service["hs_token"], sender=service["sender"] )) + + +class ApplicationServiceTransactionStore(SQLBaseStore): + + def __init__(self, hs): + super(ApplicationServiceTransactionStore, self).__init__(hs) + + def get_failing_appservices(self): + """Get a list of application services which are down. + + Returns: + A Deferred which resolves to a list of ApplicationServices, which + may be empty. + """ + pass + + def complete_appservice_txn(self, txn_id, service): + """Completes an application service transaction. + + Args: + txn_id(str): The transaction ID being completed. + service(ApplicationService): The application service which was sent + this transaction. + Returns: + A Deferred which resolves to True if this transaction was completed + successfully. + """ + pass