Feature add subscriber service for creating/dropping subscriptions #4375
d451a43
to
f9563bf
Compare
@@ -193,6 +198,13 @@ func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chu | |||
case *influxql.ShowStatsStatement, *influxql.ShowDiagnosticsStatement: | |||
// Send monitor-related queries to the monitor service. | |||
res = q.MonitorStatementExecutor.ExecuteStatement(stmt) | |||
case *influxql.CreateSubscriptionStatement, *influxql.DropSubscriptionStatement: |
I need to trigger an event on the Subscriber service whenever a subscription is added or removed.
But I am realizing this is not the right place to tell the Subscriber to update, as only one server in a cluster will actually execute the query. Should I add a Subscriber to the MetaStore instead and have it perform this update after a save? Not entirely sure how to do this part. @pauldix
0594c2f
to
32b914e
Compare
Instead of:
What about something like:
I think URLs are pretty flexible for identifying the type of endpoint, its location, specifying options, etc., which might make it easier to expand without changing the query language each time. For example:
|
I like the URL idea.
Also, this functionality seems like it should have many of the same capabilities as the hinted handoff (durability, queue messages for failed writes, batching, back-off, node removal, etc.). Any reason not to leverage some of that underlying infrastructure?
@jwilder the goal for this is that it would never cause InfluxDB to back up writes. We didn't want to open up the possibility of causing InfluxDB to go down if the external system did. Although it could be done in the future, it just doesn't make sense with UDP.
I like the URL syntax that @jwilder suggested.
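To illustrate why a URL carries enough information on its own, here is a minimal sketch using only Go's standard `net/url` package. The `Destination` type, the `parseDestination` helper, and the `batch` option are hypothetical names for illustration; they are not taken from this PR.

```go
package main

import (
	"fmt"
	"net/url"
)

// Destination is a hypothetical parsed form of a subscription destination.
type Destination struct {
	Scheme  string     // transport type, e.g. "udp"
	Host    string     // host:port of the receiver
	Options url.Values // optional settings carried in the query string
}

// parseDestination splits a destination URL into transport, address and options.
func parseDestination(raw string) (Destination, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return Destination{}, err
	}
	if u.Scheme == "" || u.Host == "" {
		return Destination{}, fmt.Errorf("destination %q must look like scheme://host:port", raw)
	}
	return Destination{Scheme: u.Scheme, Host: u.Host, Options: u.Query()}, nil
}

func main() {
	// "batch" is an invented option, used only to show how options could ride along.
	d, err := parseDestination("udp://localhost:9090?batch=100")
	if err != nil {
		panic(err)
	}
	fmt.Println(d.Scheme, d.Host, d.Options.Get("batch"))
}
```

With this shape, a new transport or option would only change how the URL is interpreted, not the query grammar.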
+1 on the URL scheme. Two questions:
I might want to send data to any one of the destinations or to all of them. The use case for sending to just one is you have a backend that can scale and so you would like to balance across them, i.e. round robin balancing. Something like this:
or
|
@nathanielc I don't think the hinted handoff (HH) stuff makes sense with this initial case since it's UDP. I'd vote to keep it simple for now.
@pauldix @nathanielc For UDP transport, I agree, HH-like functionality might not be that useful since you would never know if the receiver got the write. I was thinking more about if we did need a reliable transport for a subscription. For example, a |
@jwilder I like the idea of each subscriber handling the concerns of reliability.
32b914e
to
dc067df
Compare
### CREATE SUBSCRIPTION | ||
|
||
``` | ||
create_subscription_stmt = "CREATE SUBSCRIPTION" subscription_name "ON" db_name "." retention_policy "DESTINATIONS" ("ANY"|"ALL") host { "," host} . |
Is this EBNF syntax correct? Not sure the ("ANY"|"ALL") part is.
Should it be like this?
create_subscription_stmt = "CREATE SUBSCRIPTION" subscription_name "ON" db_name "." retention_policy "DESTINATIONS" all_any host { "," host} .
all_any = "ALL" | "ANY"
a6f7640
to
f79e1e5
Compare
} | ||
writers[i] = w | ||
tags := map[string]string{"database": se.db, "retention_policy": se.rp, "name": se.name, "mode": si.Mode} | ||
statMaps[i] = influxdb.NewStatistics(dest, "subscriber", tags) |
@otoolep Does this look good for the stats? I didn't add it to the UDP object since here we can tag the stats with more useful information.
What does dest look like?
It's the URL specified in the query, e.g. 'udp://host:port', so it should be unique to a given subscription. Does it need to be globally unique, as in across all statMaps?
Looking at NewStatistics code now...
OK, so the key is global to all statMaps; I updated it to be unique now.
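As a rough illustration of the uniqueness concern: two subscriptions can point at the same destination URL, so the URL alone can collide as a key. One way to avoid that is to fold the subscription's identity into the key. The `statKey` helper and its key layout below are assumptions for illustration, not the format this PR actually uses.

```go
package main

import (
	"fmt"
	"strings"
)

// statKey is a hypothetical helper that builds a key from everything that
// identifies a subscription destination, so that two subscriptions sharing
// a destination URL still get distinct keys.
func statKey(db, rp, name, dest string) string {
	return strings.Join([]string{"subscriber", db, rp, name, dest}, ":")
}

func main() {
	// Same database, policy and destination, but different subscription names.
	a := statKey("telegraf", "default", "k", "udp://localhost:9093")
	b := statKey("telegraf", "default", "j", "udp://localhost:9093")
	fmt.Println(a)
	fmt.Println(b)
}
```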
The best way to be sure you're happy with the stats is to fire up the system, generate some activity, and then execute the query 'SHOW STATS' at the CLI.
How does it look?
On Fri, Oct 9, 2015 at 1:04 PM, Nathaniel Cook notifications@github.com
wrote:
In services/subscriber/service.go
#4375 (comment):
return fmt.Errorf("unknown balance mode %q", si.Mode)
}
writers := make([]PointsWriter, len(si.Destinations))
statMaps := make([]*expvar.Map, len(writers))
for i, dest := range si.Destinations {
u, err := url.Parse(dest)
if err != nil {
return err
}
w, err := s.NewPointsWriter(*u)
if err != nil {
return err
}
writers[i] = w
tags := map[string]string{"database": se.db, "retention_policy": se.rp, "name": se.name, "mode": si.Mode}
statMaps[i] = influxdb.NewStatistics(dest, "subscriber", tags)
Looks good:
name: subscriber
----------------
points_written write_failures
628 14
name: subscriber
tags: database=telegraf, destination=udp://localhost:9093, mode=ALL, name=k, retention_policy=default
points_written write_failures
-------------- --------------
342 14
Thanks
Don't you feel better already? :-)
Absolutely!
I am going to have to get monitoring set up on my side of things soon too. That was really easy and nice to use.
Remember the stats are going straight back into the _internal database, so
you can query there too, for historical insight.
262f5e9
to
fb4486d
Compare
} | ||
} | ||
|
||
// Ensure the store can delete a continuous query. |
Ditto.
And further on too.
185e58a
to
d4b1dcf
Compare
var lastErr error | ||
for range b.writers { | ||
i := b.i | ||
w := b.writers[i] |
What is the point of the modulo stuff?
The modulo code rotates through the destinations; otherwise each write would always try the first destination first, and it would get a disproportionate amount of data.
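A minimal sketch of that rotation, assuming a simplified writer interface. The `writer`, `balancer`, and `recorder` names are illustrative stand-ins, not the PR's actual types, and the sketch is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// writer is a simplified stand-in for a destination writer.
type writer interface {
	Write(p string) error
}

// balancer sends each write to one destination, rotating the starting point.
type balancer struct {
	writers []writer
	i       int // index of the destination to try next
}

// Write tries each destination at most once, starting at b.i, and stops at
// the first success. The modulo step advances b.i so the next call starts
// from the following destination instead of always from index 0.
func (b *balancer) Write(p string) error {
	var lastErr error
	for range b.writers {
		w := b.writers[b.i]
		b.i = (b.i + 1) % len(b.writers)
		if err := w.Write(p); err != nil {
			lastErr = err
			continue
		}
		return nil
	}
	return lastErr
}

// recorder is a test double that remembers what it was asked to write.
type recorder struct {
	fail bool
	got  []string
}

func (r *recorder) Write(p string) error {
	if r.fail {
		return errors.New("write failed")
	}
	r.got = append(r.got, p)
	return nil
}

func main() {
	a, b := &recorder{}, &recorder{}
	bal := &balancer{writers: []writer{a, b}}
	for _, p := range []string{"p1", "p2", "p3", "p4"} {
		if err := bal.Write(p); err != nil {
			panic(err)
		}
	}
	fmt.Println(a.got, b.got) // prints "[p1 p3] [p2 p4]"
}
```

Without the modulo step, `a` would receive all four points as long as it kept accepting writes.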
LGTM, one question. |
d4b1dcf
to
5cededa
Compare
@@ -45,6 +45,9 @@ bind-address = ":4444" | |||
[monitoring] | |||
enabled = true | |||
|
|||
[subscriber] | |||
enabled = false |
Should probably use true since the default for a bool is false.
LGTM. 👍 |
InfluxDB data stream.
5cededa
to
8b31007
Compare
Feature add subscriber service for creating/dropping subscriptions
I have a use case; please see if it makes sense to implement it in "Subscribe".
@manishjain002 what you are describing is a feature request for Kapacitor. The repo for that project will be public as of tomorrow (Dec 1st) so I encourage you to open an issue there. |
The basic idea is to allow forks, or subscriptions, of the data stream to be defined. A subscription is tied to a database and retention policy and sends all data received there to the subscription endpoint. Currently the only implementation uses UDP; we chose UDP because we do not want to create back-pressure within InfluxDB.
Usage:
The following query creates a subscription called sub0 that forks all incoming data to database mydb and policy default to the hosts localhost:9090 and remotehost:9090 in a round robin fashion. If you want to send all data to both local and remote hosts create two subscriptions.
CREATE SUBSCRIPTION sub0 ON mydb.default HOSTS 'localhost:9090', 'remotehost:9090' TYPE 'udp'
The following query drops the subscriptions:
DROP SUBSCRIPTION sub0 ON mydb.default
The following query lists all subscriptions:
SHOW SUBSCRIPTIONS
These subscriptions are stored in the Raft metadata, so every node in the cluster forks the data it receives.
TODO:
Need to update the QL docs and write some basic docs on how to use subscriptions.
Detect new or deleted subscriptions and start/stop them.