Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Join web views #71

Merged
merged 3 commits into from
Dec 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 38 additions & 2 deletions examples/monitoring/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

var (
brokers = []string{"127.0.0.1:9092"}
brokers = []string{"localhost:9092"}
topic goka.Stream = "user-click"
group goka.Group = "mini-group"
)
Expand Down Expand Up @@ -64,7 +64,7 @@ func runEmitter() {

var i int
for range t.C {
key := fmt.Sprintf("user-%d", i%20)
key := fmt.Sprintf("user-%d", i%50)
value := fmt.Sprintf("%s", time.Now())
emitter.EmitSync(key, value)
i++
Expand Down Expand Up @@ -106,9 +106,44 @@ func runStatelessProcessor(monitor *monitor.Server) {
fmt.Println("Processor stopped without errors")
}
}

func runJoinProcessor(monitor *monitor.Server) {
g := goka.DefineGroup(group+"-join",
goka.Input(topic,
new(codec.String),
func(ctx goka.Context, msg interface{}) {
var u *user
if val := ctx.Value(); val != nil {
u = val.(*user)
} else {
u = new(user)
}

u.Clicks++
ctx.SetValue(u)
}),
goka.Persist(new(userCodec)),
)
p, err := goka.NewProcessor(brokers, g)
if err != nil {
panic(err)
}

// attach the processor to the monitor
monitor.AttachProcessor(p)

err = p.Start()
if err != nil {
panic(err)
} else {
fmt.Println("Processor stopped without errors")
}
}

func runProcessor(monitor *monitor.Server, query *query.Server) {
g := goka.DefineGroup(group,
goka.Input(topic, new(codec.String), process),
goka.Join(goka.GroupTable(goka.Group(string(group)+"-join")), new(codec.String)),
goka.Persist(new(userCodec)),
)
p, err := goka.NewProcessor(brokers, g)
Expand Down Expand Up @@ -162,5 +197,6 @@ func main() {
go runEmitter()
go runProcessor(monitorServer, queryServer)
go runStatelessProcessor(monitorServer)
go runJoinProcessor(monitorServer)
runView(root, monitorServer)
}
5 changes: 4 additions & 1 deletion kafka/simple_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,10 @@ func (c *simpleConsumer) RemovePartition(topic string, partition int32) error {
return fmt.Errorf("%s/%d was not added", topic, partition)
}
delete(c.partitions, tp)
pc.AsyncClose()

if err := pc.Close(); err != nil {
return fmt.Errorf("error closing consumer for %s/%d: %v", topic, partition, err)
}

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion kafka/simple_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func TestSimpleConsumer_RemovePartition(t *testing.T) {
err = c.AddPartition(topic, partition, offset)
ensure.NotNil(t, err)

pc.EXPECT().AsyncClose()
pc.EXPECT().Close()
err = c.RemovePartition(topic, partition)
ensure.Nil(t, err)

Expand Down
2 changes: 1 addition & 1 deletion partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (p *partition) run() error {
switch ev := ev.(type) {
case *kafka.Message:
if ev.Topic == p.topic {
return fmt.Errorf("received message from group table topic after recovery")
return fmt.Errorf("received message from group table topic after recovery: %s", p.topic)
}

updates, err := p.process(newMessage(ev), p.st, &wg, p.stats)
Expand Down
32 changes: 16 additions & 16 deletions web/templates/bindata.go

Large diffs are not rendered by default.

99 changes: 89 additions & 10 deletions web/templates/monitor/details.go.html
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,46 @@ <h3>Table statistics</h3>
</tbody>
</table>
</div>
</div>

<script type="text/javascript">
{{if eq .renderType "processor"}}
<div class="panel panel-default">
<div class="panel panel-heading">
<h3>Co-Joined Tables</h3>
</div>
<div class="panel-body">
<table class="table table-striped">
<thead>
<tr>
<th title="partition number">Partition</th>
<th title="Topic being joined">Table</th>
<th title="Status of the processor">State</th>
<th title="Number of messages lagging behind HWM">Offset-Lag</th>
<th title="High water mark (offset of the next message being consumed)" rowspan="2">HWM</th>
<th title="Rate of messages written to local state per second">Write-Rate</th>
</tr>
</thead>
<tbody id="joinView">
</tbody>
</table>
</div>
</div>
{{end}}

<script type="text/javascript">

var lastPartition = d3.local();

var lastJoinStats = d3.local();
var statusMap = {0: "recovering", 1:"preparing", 2: "running"};

var renderPartitions = function(partitions){
var renderDetails = function(partitions){

var updatePartitionPanel = function(data){
var partitionId = data[0];
var stats = data[1];

var status = statusMap[stats.Table.Status];

if(stats.Table.Stalled){
status = "stalled";
}
Expand Down Expand Up @@ -87,7 +111,6 @@ <h3>Table statistics</h3>
};
}


var groupedInput = _.mergeWith(curStats.Input, lastStats.Input, valueDiff);
inputRate = _.chain(groupedInput).flatMap("Count").sum().value() / timeDiff;
inputBytes = _.chain(groupedInput).flatMap("Bytes").sum().value() / timeDiff;
Expand Down Expand Up @@ -116,22 +139,78 @@ <h3>Table statistics</h3>
var group = partitions.Partitions;
{{end}}

// Group Table
var groupList = _.toPairs(group).sort(function(a,b){return parseInt(a[0]) - parseInt(b[0]);});
// update, enter and remove data
var d = d3.select("#partitionView").selectAll(".partitionbox").data(groupList, function(d){ return d[0]; });
d.html(updatePartitionPanel);
d3.select("#partitionView").selectAll(".partitionbox").data(groupList, function(d){ return d[0]; }).enter().append("tr").classed("partitionbox", true).html(updatePartitionPanel);
d.exit().remove()

{{if eq .renderType "processor"}}
var updateJoinPanel = function(data){
var partitionId = data[0];
var stats = data[3];
var status = statusMap[stats.Table.Status];

if(stats.Table.Stalled){
status = "stalled";
}
var offsetLag = stats.Table.Hwm - stats.Table.Offset - 1;

var writeRate = 0;

// let's check if we have a previous dataset and set them to their
// correct values
var lastStats = lastJoinStats.get(this);

if(lastStats){
// copy the stats to avoid modifying it inplace
var curStats = JSON.parse(JSON.stringify(stats));
// diff to previous stats in seconds
var timeDiff = (new Date(stats.Now) - new Date(lastStats.Now)) / 1000.0;
writeRate = (stats.Table.Offset - lastStats.Table.Offset) / timeDiff;
}
lastJoinStats.set(this, stats);

return '<td>'+partitionId+'</td>\n'+
'<td>'+data[1]+'</td>\n'+
'<td>'+status+'</td>\n'+
'<td>'+offsetLag+'</td>\n'+
'<td>'+stats.Table.Hwm+'</td>\n'+
'<td>'+writeRate.toFixed(2)+'</td>\n';
};
// CoJoined Partitions
var joinList = _.flatMap(_.toPairs(partitions.Joined), function(value) {
return _.map(value[1], function(innerValue, key) {
var idKey = new String(value[0]).concat(key);
return [parseInt(value[0]), key, idKey, innerValue];
});
}).sort(function(a, b) {
if(a[0] != b[0]) {
return a[0] - b[0];
}
return a[1].localeCompare(b[1]);
});

// update, enter and remove data
var d = d3.select("#joinView").selectAll(".partitionbox").data(joinList, function(d){ return d[2]; });
d.html(updateJoinPanel);
d3.select("#joinView").selectAll(".partitionbox").data(joinList, function(d){ return d[2]; }).enter().append("tr").classed("partitionbox", true).html(updateJoinPanel);
d.exit().remove();

{{end}} // go template end if the component is no processor

};

window.setInterval(function(){
d3.json("{{.base_path}}/data/{{.renderType}}/{{.vars.idx}}", renderPartitions);
window.setInterval(function() {
d3.json("{{.base_path}}/data/{{.renderType}}/{{.vars.idx}}", renderDetails);
}, 2000);

// call it initially
d3.json("{{.base_path}}/data/{{.renderType}}/{{.vars.idx}}", renderPartitions);
d3.json("{{.base_path}}/data/{{.renderType}}/{{.vars.idx}}", renderDetails);

</script>
</div>
</div>
</div>
{{end}}