Skip to content

Commit

Permalink
Merge pull request #71 from lovoo/join-web-views
Browse files Browse the repository at this point in the history
Join web views
  • Loading branch information
SamiHiltunen committed Dec 20, 2017
2 parents ca3dd7c + 9591977 commit 263656d
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 31 deletions.
40 changes: 38 additions & 2 deletions examples/monitoring/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

var (
brokers = []string{"127.0.0.1:9092"}
brokers = []string{"localhost:9092"}
topic goka.Stream = "user-click"
group goka.Group = "mini-group"
)
Expand Down Expand Up @@ -64,7 +64,7 @@ func runEmitter() {

var i int
for range t.C {
key := fmt.Sprintf("user-%d", i%20)
key := fmt.Sprintf("user-%d", i%50)
value := fmt.Sprintf("%s", time.Now())
emitter.EmitSync(key, value)
i++
Expand Down Expand Up @@ -106,9 +106,44 @@ func runStatelessProcessor(monitor *monitor.Server) {
fmt.Println("Processor stopped without errors")
}
}

func runJoinProcessor(monitor *monitor.Server) {
g := goka.DefineGroup(group+"-join",
goka.Input(topic,
new(codec.String),
func(ctx goka.Context, msg interface{}) {
var u *user
if val := ctx.Value(); val != nil {
u = val.(*user)
} else {
u = new(user)
}

u.Clicks++
ctx.SetValue(u)
}),
goka.Persist(new(userCodec)),
)
p, err := goka.NewProcessor(brokers, g)
if err != nil {
panic(err)
}

// attach the processor to the monitor
monitor.AttachProcessor(p)

err = p.Start()
if err != nil {
panic(err)
} else {
fmt.Println("Processor stopped without errors")
}
}

func runProcessor(monitor *monitor.Server, query *query.Server) {
g := goka.DefineGroup(group,
goka.Input(topic, new(codec.String), process),
goka.Join(goka.GroupTable(goka.Group(string(group)+"-join")), new(codec.String)),
goka.Persist(new(userCodec)),
)
p, err := goka.NewProcessor(brokers, g)
Expand Down Expand Up @@ -162,5 +197,6 @@ func main() {
go runEmitter()
go runProcessor(monitorServer, queryServer)
go runStatelessProcessor(monitorServer)
go runJoinProcessor(monitorServer)
runView(root, monitorServer)
}
5 changes: 4 additions & 1 deletion kafka/simple_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,10 @@ func (c *simpleConsumer) RemovePartition(topic string, partition int32) error {
return fmt.Errorf("%s/%d was not added", topic, partition)
}
delete(c.partitions, tp)
pc.AsyncClose()

if err := pc.Close(); err != nil {
return fmt.Errorf("error closing consumer for %s/%d: %v", topic, partition, err)
}

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion kafka/simple_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func TestSimpleConsumer_RemovePartition(t *testing.T) {
err = c.AddPartition(topic, partition, offset)
ensure.NotNil(t, err)

pc.EXPECT().AsyncClose()
pc.EXPECT().Close()
err = c.RemovePartition(topic, partition)
ensure.Nil(t, err)

Expand Down
2 changes: 1 addition & 1 deletion partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (p *partition) run() error {
switch ev := ev.(type) {
case *kafka.Message:
if ev.Topic == p.topic {
return fmt.Errorf("received message from group table topic after recovery")
return fmt.Errorf("received message from group table topic after recovery: %s", p.topic)
}

updates, err := p.process(newMessage(ev), p.st, &wg, p.stats)
Expand Down
32 changes: 16 additions & 16 deletions web/templates/bindata.go

Large diffs are not rendered by default.

99 changes: 89 additions & 10 deletions web/templates/monitor/details.go.html
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,46 @@ <h3>Table statistics</h3>
</tbody>
</table>
</div>
</div>

<script type="text/javascript">
{{if eq .renderType "processor"}}
<div class="panel panel-default">
<div class="panel panel-heading">
<h3>Co-Joined Tables</h3>
</div>
<div class="panel-body">
<table class="table table-striped">
<thead>
<tr>
<th title="partition number">Partition</th>
<th title="Topic being joined">Table</th>
<th title="Status of the processor">State</th>
<th title="Number of messages lagging behind HWM">Offset-Lag</th>
<th title="High water mark (offset of the next message being consumed)" rowspan="2">HWM</th>
<th title="Rate of messages written to local state per second">Write-Rate</th>
</tr>
</thead>
<tbody id="joinView">
</tbody>
</table>
</div>
</div>
{{end}}

<script type="text/javascript">

var lastPartition = d3.local();

var lastJoinStats = d3.local();
var statusMap = {0: "recovering", 1:"preparing", 2: "running"};

var renderPartitions = function(partitions){
var renderDetails = function(partitions){

var updatePartitionPanel = function(data){
var partitionId = data[0];
var stats = data[1];

var status = statusMap[stats.Table.Status];

if(stats.Table.Stalled){
status = "stalled";
}
Expand Down Expand Up @@ -87,7 +111,6 @@ <h3>Table statistics</h3>
};
}


var groupedInput = _.mergeWith(curStats.Input, lastStats.Input, valueDiff);
inputRate = _.chain(groupedInput).flatMap("Count").sum().value() / timeDiff;
inputBytes = _.chain(groupedInput).flatMap("Bytes").sum().value() / timeDiff;
Expand Down Expand Up @@ -116,22 +139,78 @@ <h3>Table statistics</h3>
var group = partitions.Partitions;
{{end}}

// Group Table
var groupList = _.toPairs(group).sort(function(a,b){return parseInt(a[0]) - parseInt(b[0]);});
// update, enter and remove data
var d = d3.select("#partitionView").selectAll(".partitionbox").data(groupList, function(d){ return d[0]; });
d.html(updatePartitionPanel);
d3.select("#partitionView").selectAll(".partitionbox").data(groupList, function(d){ return d[0]; }).enter().append("tr").classed("partitionbox", true).html(updatePartitionPanel);
d.exit().remove()

{{if eq .renderType "processor"}}
var updateJoinPanel = function(data){
var partitionId = data[0];
var stats = data[3];
var status = statusMap[stats.Table.Status];

if(stats.Table.Stalled){
status = "stalled";
}
var offsetLag = stats.Table.Hwm - stats.Table.Offset - 1;

var writeRate = 0;

// let's check if we have a previous dataset and set them to their
// correct values
var lastStats = lastJoinStats.get(this);

if(lastStats){
// copy the stats to avoid modifying it inplace
var curStats = JSON.parse(JSON.stringify(stats));
// diff to previous stats in seconds
var timeDiff = (new Date(stats.Now) - new Date(lastStats.Now)) / 1000.0;
writeRate = (stats.Table.Offset - lastStats.Table.Offset) / timeDiff;
}
lastJoinStats.set(this, stats);

return '<td>'+partitionId+'</td>\n'+
'<td>'+data[1]+'</td>\n'+
'<td>'+status+'</td>\n'+
'<td>'+offsetLag+'</td>\n'+
'<td>'+stats.Table.Hwm+'</td>\n'+
'<td>'+writeRate.toFixed(2)+'</td>\n';
};
// CoJoined Partitions
var joinList = _.flatMap(_.toPairs(partitions.Joined), function(value) {
return _.map(value[1], function(innerValue, key) {
var idKey = new String(value[0]).concat(key);
return [parseInt(value[0]), key, idKey, innerValue];
});
}).sort(function(a, b) {
if(a[0] != b[0]) {
return a[0] - b[0];
}
return a[1].localeCompare(b[1]);
});

// update, enter and remove data
var d = d3.select("#joinView").selectAll(".partitionbox").data(joinList, function(d){ return d[2]; });
d.html(updateJoinPanel);
d3.select("#joinView").selectAll(".partitionbox").data(joinList, function(d){ return d[2]; }).enter().append("tr").classed("partitionbox", true).html(updateJoinPanel);
d.exit().remove();

{{end}} // go template end if the component is no processor

};

window.setInterval(function(){
d3.json("{{.base_path}}/data/{{.renderType}}/{{.vars.idx}}", renderPartitions);
window.setInterval(function() {
d3.json("{{.base_path}}/data/{{.renderType}}/{{.vars.idx}}", renderDetails);
}, 2000);

// call it initially
d3.json("{{.base_path}}/data/{{.renderType}}/{{.vars.idx}}", renderPartitions);
d3.json("{{.base_path}}/data/{{.renderType}}/{{.vars.idx}}", renderDetails);

</script>
</div>
</div>
</div>
{{end}}

0 comments on commit 263656d

Please sign in to comment.