Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature add subscriber service for creating/dropping subscriptions #4375

Merged
merged 1 commit into from
Oct 15, 2015

Conversation

nathanielc
Copy link
Contributor

Basic idea is to allow for forks or subscriptions of the data stream to be defined. A subscription is tied to database and retention policy and will send all data received to the subscription endpoint. Currently the only implementation is a UDP protocol. We do not want to create back-pressure within InfluxDB hence UDP.

Usage:

The following query creates a subscription called sub0 that forks all incoming data to database mydb and policy default to the hosts localhost:9090 and remotehost:9090 in a round robin fashion. If you want to send all data to both local and remote hosts create two subscriptions.

CREATE SUBSCRIPTION sub0 ON mydb.default HOSTS 'localhost:9090', 'remotehost:9090' TYPE 'udp'

The following query drops the subscriptions:

DROP SUBSCRIPTION sub0 ON mydb.default

The following query lists all subscriptions:

SHOW SUBSCRIPTIONS

These subscriptions are stored in the raft meta data so the whole cluster will fork its data at each node respectively.

TODO:

Need to update the QL docs and write some basic dos on how to use subcriptions.
Detect new or deleted subscriptions and start/stop them.

@@ -193,6 +198,13 @@ func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chu
case *influxql.ShowStatsStatement, *influxql.ShowDiagnosticsStatement:
// Send monitor-related queries to the monitor service.
res = q.MonitorStatementExecutor.ExecuteStatement(stmt)
case *influxql.CreateSubscriptionStatement, *influxql.DropSubscriptionStatement:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to trigger an event on the Subscriber service whenever a subscription is added or removed.

But I am realizing this is not the right place to tell the Subscriber to update, as only one server in a cluster will actually execute the query. Should I add a Subscriber to the MetaStore instead and have it perform this update after a save? Not entirely sure how to do this part. @pauldix

@jwilder
Copy link
Contributor

jwilder commented Oct 9, 2015

Instead of:

CREATE SUBSCRIPTION sub0 ON mydb.default HOSTS 'localhost:9090', 'remotehost:9090' TYPE 'udp'

What about something like:

CREATE SUBSCRIPTION sub0 ON mydb.default DESTINATIONS 'udp://localhost:9090', 'udp://remotehost:9090' 

I think just a TYPE 'udp' may not be flexible enough to add other types of endpoints in the future or have much flexibility to secure endpoints with authentication creds, etc..

URLs are pretty flexible for identifying the type of endpoint, it's location, specifying options, etc.. which might make it easier to expand without changing the query language each time.

For example:

CREATE SUBSCRIPTION sub0 ON mydb.default DESTINATIONS 'influxdb://datacenter2:8086?db=foo&u=&p=', 'riemann://remotehost: 5555', 'kafka://..., 

@otoolep
Copy link
Contributor

otoolep commented Oct 9, 2015

I like the URL idea.

@jwilder
Copy link
Contributor

jwilder commented Oct 9, 2015

Also, this functionality seems like it should have many of the same capabilities as the hinted handoff (durability, queue messages for failed writes, batching, back-off, node removal, etc..). Any reason not to leverage some of that underlying infrastructure?

@pauldix
Copy link
Member

pauldix commented Oct 9, 2015

@jwilder the goal for this is that it would never cause InfluxDB to back up writes. We didn't want to open up the possibility of causing InfluxDB to go down if the external system did.

Although it could be done in the future. Just doesn't make sense with UDP.

@pauldix
Copy link
Member

pauldix commented Oct 9, 2015

I like the URL syntax that @jwilder suggested

@nathanielc
Copy link
Contributor Author

+1 on the URL scheme.

Two questions:

  • How could we distinguish between multiple destinations and redundant destinations?

I might want to send data to any one of the destinations or to all of them. The use case for sending to just one is you have a backend that can scale and so you would like to balance across them, i.e. round robin balancing.

Something like this:

CREATE SUBSCRIPTION sub0 ON mydb.default DESTINATIONS ANY 'udp://localhost:9090', 'udp://remotehost:9090'

or

CREATE SUBSCRIPTION sub0 ON mydb.default DESTINATIONS ALL 'udp://localhost:9090', 'udp://remotehost:9090', 'kafka://....'

  • As for the HH idea. We were initially thinking that this should be very lightweight and if you drop data not a big deal. Rather keep the InfluxDB server happy then apply back pressure into it. And if you wanted to do something more complex then do it outside of the DB. @pauldix Thoughts? Reasons for using HH for subscriptions vs keeping it simple?

@pauldix
Copy link
Member

pauldix commented Oct 9, 2015

@nathanielc I don't think the HH stuff makes sense with this initial case since it's UDP. I'd vote to keep it simple for now.

@jwilder
Copy link
Contributor

jwilder commented Oct 9, 2015

@pauldix @nathanielc For UDP transport, I agree, HH-like functionality might not be that useful since you would never know if the receiver got the write. I was thinking more about if we did need a reliable transport for a subscription. For example, a s3 subscription that backs up all writes to S3 or a http endpoint where an application wants to ensure it always receives the post. For those use-cases, the subscriber transport would need to durably queue the writes and retry if the receiver failed. If would definitely need to shed load (drop new writes or expire old ones) if the queues filled up though. Seems like those concerns could be addressed in that specific subscriber implementation though.

@nathanielc
Copy link
Contributor Author

@jwilder I like the idea of each subscriber handling the concerns of reliability.

### CREATE SUBSCRIPTION

```
create_subscription_stmt = "CREATE SUBSCRIPTION" subscription_name "ON" db_name "." retention_policy "DESTINATIONS" ("ANY"|"ALL") host { "," host} .
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this EBNF syntax correct? Not sure the ("ANY"|"ALL") part is.

Should it be like this?

create_subscription_stmt = "CREATE SUBSCRIPTION" subscription_name "ON" db_name "." retention_policy "DESTINATIONS" all_any host { "," host} .

all_any = "ALL" | "ANY" 

@nathanielc nathanielc force-pushed the subscriptions branch 2 times, most recently from a6f7640 to f79e1e5 Compare October 9, 2015 19:28
}
writers[i] = w
tags := map[string]string{"database": se.db, "retention_policy": se.rp, "name": se.name, "mode": si.Mode}
statMaps[i] = influxdb.NewStatistics(dest, "subscriber", tags)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@otoolep Does this look good for the stats? I didn't add it to the UDP object since here we can tag the stats with more useful information.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does dest look like?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its the URL specified in the query 'udp://host:port' should be unique to a given subscription. Does it need to be globally unique, as in all statMaps?

Looking at NewStatistics code now...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so key is global to all statMaps, I updated it to be unique now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The best to be sure you're happy with the stats is fire up the system,
generate some activity and the execute the query 'SHOW STATS' at the CLI.
How does it look?

On Fri, Oct 9, 2015 at 1:04 PM, Nathaniel Cook notifications@github.com
wrote:

In services/subscriber/service.go
#4375 (comment):

  •               return fmt.Errorf("unknown balance mode %q", si.Mode)
    
  •           }
    
  •           writers := make([]PointsWriter, len(si.Destinations))
    
  •           statMaps := make([]*expvar.Map, len(writers))
    
  •           for i, dest := range si.Destinations {
    
  •               u, err := url.Parse(dest)
    
  •               if err != nil {
    
  •                   return err
    
  •               }
    
  •               w, err := s.NewPointsWriter(*u)
    
  •               if err != nil {
    
  •                   return err
    
  •               }
    
  •               writers[i] = w
    
  •               tags := map[string]string{"database": se.db, "retention_policy": se.rp, "name": se.name, "mode": si.Mode}
    
  •               statMaps[i] = influxdb.NewStatistics(dest, "subscriber", tags)
    

Ok, so key is global to all statMaps, I updated it to be unique now.


Reply to this email directly or view it on GitHub
https://github.com/influxdb/influxdb/pull/4375/files#r41672288.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good:


name: subscriber
----------------
points_written  write_failures
628             14


name: subscriber
tags: database=telegraf, destination=udp://localhost:9093, mode=ALL, name=k, retention_policy=default
points_written  write_failures
--------------  --------------
342             14

Thanks

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you feel better already? :-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely!
I am going to have to get monitoring setup on my side of things too soon. That was really easy and nice to use.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remember the stats are going straight back into the _internal database, so
you can query there too, for historical insight.

On Fri, Oct 9, 2015 at 2:37 PM, Nathaniel Cook notifications@github.com
wrote:

In services/subscriber/service.go
#4375 (comment):

  •               return fmt.Errorf("unknown balance mode %q", si.Mode)
    
  •           }
    
  •           writers := make([]PointsWriter, len(si.Destinations))
    
  •           statMaps := make([]*expvar.Map, len(writers))
    
  •           for i, dest := range si.Destinations {
    
  •               u, err := url.Parse(dest)
    
  •               if err != nil {
    
  •                   return err
    
  •               }
    
  •               w, err := s.NewPointsWriter(*u)
    
  •               if err != nil {
    
  •                   return err
    
  •               }
    
  •               writers[i] = w
    
  •               tags := map[string]string{"database": se.db, "retention_policy": se.rp, "name": se.name, "mode": si.Mode}
    
  •               statMaps[i] = influxdb.NewStatistics(dest, "subscriber", tags)
    

Absolutely!
I am going to have to get monitoring setup on my side of things too soon.
That was really easy and nice to use.


Reply to this email directly or view it on GitHub
https://github.com/influxdb/influxdb/pull/4375/files#r41680738.

@nathanielc nathanielc force-pushed the subscriptions branch 4 times, most recently from 262f5e9 to fb4486d Compare October 9, 2015 21:19
}
}

// Ensure the store can delete a continuous query.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And further on too.

@nathanielc nathanielc force-pushed the subscriptions branch 6 times, most recently from 185e58a to d4b1dcf Compare October 14, 2015 19:43
var lastErr error
for range b.writers {
i := b.i
w := b.writers[i]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the point of the modulo stuff?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The module code rotates through destinations, otherwise each write would always try the first destination first and it would get a disproportionate amount of data.

@otoolep
Copy link
Contributor

otoolep commented Oct 14, 2015

LGTM, one question.

@@ -45,6 +45,9 @@ bind-address = ":4444"
[monitoring]
enabled = true

[subscriber]
enabled = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably use true since the default for a bool is false.

@jwilder
Copy link
Contributor

jwilder commented Oct 14, 2015

LGTM. 👍

nathanielc pushed a commit that referenced this pull request Oct 15, 2015
Feature add subscriber service for creating/dropping subscriptions
@nathanielc nathanielc merged commit cb1aaa8 into master Oct 15, 2015
@nathanielc nathanielc deleted the subscriptions branch October 15, 2015 15:17
@manishjain002
Copy link

I have a use case, please see if this make sense in implementing in "Subscribe".
I have an application for monitoring of my sensors data that's getting logged. Before "Subscribe" , I had only one way, i.e. Pull the data from the DB server using "polling". But with subscription, I get data Pushed.
But there is a problem, I am getting all the data that is getting stored in the DB, I would be interested in filtering data. What I want is "selective Subscription".

@beckettsean
Copy link
Contributor

@manishjain002 what you are describing is a feature request for Kapacitor. The repo for that project will be public as of tomorrow (Dec 1st) so I encourage you to open an issue there.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants