Skip to content

Commit

Permalink
Replace throughput related methods with specific MessageCounter
Browse files Browse the repository at this point in the history
  • Loading branch information
smasset committed Dec 31, 2012
1 parent 04de434 commit 05bc0a0
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 65 deletions.
3 changes: 3 additions & 0 deletions src/main/java/org/graylog2/Core.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ public class Core implements GraylogServer {
public static final String GRAYLOG2_VERSION = "0.10.0-rc.2";

public static final String MASTER_COUNTER_NAME = "master";

public static final String THROUGHPUT_COUNTER_NAME = "throughput";

private int lastReceivedMessageTimestamp = 0;

Expand Down Expand Up @@ -153,6 +155,7 @@ public void initialize(Configuration configuration) {

messageCounterManager = new MessageCounterManagerImpl();
messageCounterManager.register(MASTER_COUNTER_NAME);
messageCounterManager.register(THROUGHPUT_COUNTER_NAME);

hostCounterCache = new HostCounterCacheImpl();

Expand Down
36 changes: 0 additions & 36 deletions src/main/java/org/graylog2/MessageCounterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ public final class MessageCounterImpl implements MessageCounter {
private final Map<String, Integer> streams = Maps.newConcurrentMap();
private final Map<String, Integer> hosts = Maps.newConcurrentMap();

private int throughput = 0;
private int highestThroughput = 0;

public int getTotalCount() {
return this.total;
}
Expand All @@ -53,14 +50,6 @@ public Map<String, Integer> getHostCounts() {
return this.hosts;
}

public int getThroughput() {
return this.throughput;
}

public int getHighestThroughput() {
return this.highestThroughput;
}

public void resetAllCounts() {
this.resetTotal();
this.resetStreamCounts();
Expand All @@ -79,24 +68,13 @@ public void resetTotal() {
this.total = 0;
}

public void resetThroughput() {
this.throughput = 0;
}

/**
* Increment total count by 1.
*/
public void incrementTotal() {
this.countUpTotal(1);
}

/**
* Increment five second throughput by 1.
*/
public void incrementThroughput() {
this.countUpThroughput(1);
}

/**
* Count up the total count.
*
Expand All @@ -106,20 +84,6 @@ public void countUpTotal(final int x) {
this.total += x;
}

/**
* Counts up the five second througput counter which is handled and reset by
* the ServerValueWriterThread.
*
* @param x The value to add on top of five second throuput.
*/
public void countUpThroughput(final int x) {
this.throughput += x;

if (this.throughput > this.highestThroughput) {
this.highestThroughput = this.throughput;
}
}

/**
* Increment stream count by 1.
*
Expand Down
12 changes: 3 additions & 9 deletions src/main/java/org/graylog2/ServerValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,8 @@

package org.graylog2;

import org.graylog2.plugin.Tools;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.graylog2.buffers.BufferWatermark;
import org.graylog2.plugin.alarms.transports.Transport;
import org.graylog2.plugin.Tools;

/**
* Filling the server_values collection
Expand Down Expand Up @@ -66,8 +60,8 @@ public void setLocalHostname(String hostname) {
set("local_hostname", hostname);
}

public void writeThroughput(int current, int highest) {
graylogServer.getMongoBridge().writeThroughput(graylogServer.getServerId(), current, highest);
public void writeThroughput(int current) {
graylogServer.getMongoBridge().writeThroughput(graylogServer.getServerId(), current);
}

public void writeBufferWatermarks(BufferWatermark outputBuffer, BufferWatermark processBuffer) {
Expand Down
28 changes: 17 additions & 11 deletions src/main/java/org/graylog2/database/MongoBridge.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,21 +84,27 @@ public void upsertHostCount(String hostname, int add) {
}
}

public void writeThroughput(String serverId, int current, int highest) {
BasicDBObject query = new BasicDBObject();
query.put("server_id", serverId);
query.put("type", "total_throughput");
public void writeThroughput(String serverId, int current) {
BasicDBObject totalQuery = new BasicDBObject();
totalQuery.put("server_id", serverId);
totalQuery.put("type", "total_throughput");

BasicDBObject update = new BasicDBObject();
update.put("server_id", serverId);
update.put("type", "total_throughput");
update.put("current", current);
update.put("highest", highest);
BasicDBObject totalUpdate = new BasicDBObject();
totalUpdate.put("$set", new BasicDBObject("current", current));

BasicDBObject highestQuery = new BasicDBObject();
highestQuery.put("server_id", serverId);
highestQuery.put("type", "total_throughput");
highestQuery.put("highest", new BasicDBObject("$lt", current));

BasicDBObject highestUpdate = new BasicDBObject();
highestUpdate.put("$set", new BasicDBObject("highest", current));

DBCollection coll = getConnection().getDatabase().getCollection("server_values");
coll.update(query, update, true, false);
coll.update(totalQuery, totalUpdate, true, false);
coll.update(highestQuery, highestUpdate, true, false);
}

public void writeBufferWatermarks(String serverId, BufferWatermark outputBuffer, BufferWatermark processBuffer) {
BasicDBObject query = new BasicDBObject();
query.put("server_id", serverId);
Expand Down
3 changes: 0 additions & 3 deletions src/main/java/org/graylog2/filters/CounterUpdateFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ public boolean filter(LogMessage msg, GraylogServer server) {

// Increment all registered message counters.
for (MessageCounter counter : serverImpl.getMessageCounterManager().getAllCounters().values()) {
// Five second throughput for health page.
counter.incrementThroughput();

// Total count.
counter.incrementTotal();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@

package org.graylog2.periodical;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.graylog2.Core;
import org.graylog2.MessageCounterImpl;
import org.graylog2.plugin.MessageCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Periodically writes server values to MongoDB.
Expand Down Expand Up @@ -54,9 +53,9 @@ public void run() {
graylogServer.getServerValues().ping();

// Current throughput.
MessageCounter c = this.graylogServer.getMessageCounterManager().get(Core.MASTER_COUNTER_NAME);
graylogServer.getServerValues().writeThroughput(c.getThroughput(), c.getHighestThroughput());
c.resetThroughput(); // Reset five second throughput count.
MessageCounter c = this.graylogServer.getMessageCounterManager().get(Core.THROUGHPUT_COUNTER_NAME);
graylogServer.getServerValues().writeThroughput(c.getTotalCount());
c.resetTotal(); // Reset five second throughput count.

/*
* Message queue size is written in BulkIndexerThread. More about the
Expand Down

0 comments on commit 05bc0a0

Please sign in to comment.