Skip to content

Commit

Permalink
Merge pull request #341 from hashmapinc/Tempus-221
Browse files Browse the repository at this point in the history
Tempus 221
  • Loading branch information
cherrera2001 authored May 18, 2018
2 parents 0ef31fb + bba0edf commit ae023d2
Show file tree
Hide file tree
Showing 50 changed files with 1,653 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.hashmapinc.server.common.data.plugin.ComponentLifecycleEvent;
import com.hashmapinc.server.controller.plugin.PluginWebSocketMsgEndpoint;
import com.hashmapinc.server.dao.TagMetaData.TagMetaDataService;
import com.hashmapinc.server.dao.application.ApplicationService;
import com.hashmapinc.server.dao.attributes.AttributesService;
import com.hashmapinc.server.dao.cluster.NodeMetricService;
Expand Down Expand Up @@ -113,6 +114,9 @@ public class ActorSystemContext {
@Autowired
@Getter private AttributesService attributesService;

@Autowired
@Getter private TagMetaDataService tagMetaDataService;

@Autowired
@Getter private EventService eventService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.hashmapinc.server.common.data.Customer;
import com.hashmapinc.server.common.data.Device;
import com.hashmapinc.server.common.data.*;
import com.hashmapinc.server.common.data.asset.Asset;
import com.hashmapinc.server.common.data.audit.ActionType;
import com.hashmapinc.server.common.data.id.*;
Expand All @@ -34,8 +33,6 @@
import com.hashmapinc.server.extensions.api.plugins.msg.*;
import com.hashmapinc.server.extensions.api.plugins.rpc.PluginRpcMsg;
import lombok.extern.slf4j.Slf4j;
import com.hashmapinc.server.common.data.EntityType;
import com.hashmapinc.server.common.data.Tenant;
import com.hashmapinc.server.common.data.plugin.PluginMetaData;
import com.hashmapinc.server.common.data.relation.EntityRelation;
import com.hashmapinc.server.common.data.rule.RuleMetaData;
Expand Down Expand Up @@ -182,21 +179,19 @@ public void saveTsData(final TenantId tenantId, final EntityId entityId, final L

@Override
public void saveDsData(final EntityId entityId, final List<DsKvEntry> entries, long ttl, final PluginCallback<Void> callback) {
log.debug(" here in saveDsData");
validate(entityId, new ValidationCallback(callback, ctx -> {
log.debug(" inside validate");
ListenableFuture<List<Void>> rsListFuture = pluginCtx.dsService.save(entityId, entries, ttl);
ListenableFuture<List<DsKvEntry>> rsGetListFuture = pluginCtx.dsService.findAllLatest(entityId);
try {
List<DsKvEntry> list = rsGetListFuture.get();
log.debug(" DsKvList " + list);
}catch (Exception e){
log.error(" exception " + e);
}
Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor);
}));
}

@Override
public void saveTagMetaData(EntityId entityId, TagMetaData tagMetaData, final PluginCallback<Void> callback) {
validate(entityId, new ValidationCallback(callback, ctx -> {
ListenableFuture<List<Void>> rsListFuture = pluginCtx.tagMetaDataService.saveTagMetaData(tagMetaData);
Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor);
}));
}

@Override
public void loadTimeseries(final EntityId entityId, final List<TsKvQuery> queries, final PluginCallback<List<TsKvEntry>> callback) {
Expand Down Expand Up @@ -276,6 +271,18 @@ public void loadLatestTimeseries(final EntityId entityId, final Collection<Strin
}));
}

@Override
public void loadLatestTimeseries(final EntityId entityId, final List<KvEntry> kvEntries, final PluginCallback<List<TsKvEntry>> callback) {
Set<String> keys = new HashSet<>();
for (KvEntry kv : kvEntries) {
keys.add(kv.getKey());
}
validate(entityId, new ValidationCallback(callback, ctx -> {
ListenableFuture<List<TsKvEntry>> rsListFuture = pluginCtx.tsService.findLatest(entityId, keys);
Futures.addCallback(rsListFuture, getCallback(callback, v -> v), executor);
}));
}

@Override
public void loadLatestDepthSeries(final EntityId entityId, final PluginCallback<List<DsKvEntry>> callback) {
validate(entityId, new ValidationCallback(callback, ctx -> {
Expand All @@ -292,6 +299,18 @@ public void loadLatestDepthSeries(final EntityId entityId, final Collection<Stri
}));
}

@Override
public void loadLatestDepthSeries(final EntityId entityId, final List<KvEntry> kvEntries, final PluginCallback<List<DsKvEntry>> callback) {
Set<String> keys = new HashSet<>();
for (KvEntry kv : kvEntries) {
keys.add(kv.getKey());
}
validate(entityId, new ValidationCallback(callback, ctx -> {
ListenableFuture<List<DsKvEntry>> rsListFuture = pluginCtx.dsService.findLatest(entityId, keys);
Futures.addCallback(rsListFuture, getCallback(callback, v -> v), executor);
}));
}


@Override
public void reply(PluginToRuleMsg<?> msg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import akka.actor.ActorRef;
import com.hashmapinc.server.common.data.id.PluginId;
import com.hashmapinc.server.controller.plugin.PluginWebSocketMsgEndpoint;
import com.hashmapinc.server.dao.TagMetaData.TagMetaDataService;
import com.hashmapinc.server.dao.attributes.AttributesService;
import com.hashmapinc.server.extensions.api.device.DeviceTelemetryEventNotificationMsg;
import com.hashmapinc.server.extensions.api.plugins.msg.TimeoutMsg;
Expand Down Expand Up @@ -62,6 +63,7 @@ public final class SharedPluginProcessingContext {
final TimeseriesService tsService;
final DepthSeriesService dsService;
final AttributesService attributesService;
final TagMetaDataService tagMetaDataService;
final ClusterRpcService rpcService;
final ClusterRoutingService routingService;
final RelationService relationService;
Expand All @@ -81,6 +83,7 @@ public SharedPluginProcessingContext(ActorSystemContext sysContext, TenantId ten
this.tsService = sysContext.getTsService();
this.dsService = sysContext.getDsService();
this.attributesService = sysContext.getAttributesService();
this.tagMetaDataService = sysContext.getTagMetaDataService();
this.assetService = sysContext.getAssetService();
this.deviceService = sysContext.getDeviceService();
this.rpcService = sysContext.getRpcService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,10 @@ public void testBasicPostWithSyncSession() throws Exception {
Thread.sleep(1000);
actorService.process(new BasicToDeviceActorSessionMsg(device, msg));

// Check that device data was saved to DB;
List<TsKvEntry> expected = new ArrayList<>();
expected.add(new BasicTsKvEntry(ts, entry1));
expected.add(new BasicTsKvEntry(ts, entry2));
verify(tsService, Mockito.timeout(5000)).save(deviceId, expected, 0L);
Set<String> keys = new HashSet<>();
keys.add("key1");
keys.add("key2");
verify(tsService, Mockito.timeout(5000)).findLatest(deviceId, keys);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public void testDeleteApplicationAndRelatedEntities() throws Exception {
rule1.setName("My Rule1");
rule1.setPluginToken(tenantPlugin.getApiToken());
rule1.setFilters(mapper.readTree("[{\"configuration\":{\"deviceTypes\":[{\"name\":\"Motor\"},{\"name\":\"Pump\"}]},\"clazz\":\"com.hashmapinc.server.extensions.core.filter.DeviceTypeFilter\",\"name\":\"jetinder\"},{\"configuration\":{\"deviceTypes\":[{\"name\":\"Well\"},{}]},\"clazz\":\"com.hashmapinc.server.extensions.core.filter.DeviceTypeFilter\",\"name\":\"F2\"},{\"configuration\":{\"methodNames\":[{\"name\":\"sdsdsdsdsdsdsd\"}]},\"clazz\":\"com.hashmapinc.server.extensions.core.filter.MethodNameFilter\",\"name\":\"sdsdsdsdsdsdsdsdsd\"}]"));
rule1.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1}}"));
rule1.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1, \"qualityTimeWindow\":60000, \"qualityDepthWindow\":3000}}"));
RuleMetaData savedRule1 = doPost("/api/rule", rule1, RuleMetaData.class);

doGet("/api/rule/"+savedRule1.getId().getId().toString()).andExpect(status().isOk());
Expand Down Expand Up @@ -517,7 +517,7 @@ public void testAssignRulesToApplication() throws Exception{
rule1.setName("My Rule1");
rule1.setPluginToken(tenantPlugin.getApiToken());
rule1.setFilters(mapper.readTree("[{\"configuration\":{\"deviceTypes\":[{\"name\":\"Motor\"},{\"name\":\"Pump\"}]},\"clazz\":\"com.hashmapinc.server.extensions.core.filter.DeviceTypeFilter\",\"name\":\"jetinder\"},{\"configuration\":{\"deviceTypes\":[{\"name\":\"Well\"},{}]},\"clazz\":\"com.hashmapinc.server.extensions.core.filter.DeviceTypeFilter\",\"name\":\"F2\"},{\"configuration\":{\"methodNames\":[{\"name\":\"sdsdsdsdsdsdsd\"}]},\"clazz\":\"com.hashmapinc.server.extensions.core.filter.MethodNameFilter\",\"name\":\"sdsdsdsdsdsdsdsdsd\"}]"));
rule1.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1}}"));
rule1.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1, \"qualityTimeWindow\":60000, \"qualityDepthWindow\":3000}}"));
RuleMetaData savedRule1 = doPost("/api/rule", rule1, RuleMetaData.class);

RuleMetaData rule2 = new RuleMetaData();
Expand All @@ -526,7 +526,7 @@ public void testAssignRulesToApplication() throws Exception{
rule2.setFilters(mapper.readTree("[{\"clazz\":\"com.hashmapinc.server.extensions.core.filter.MsgTypeFilter\", " +
"\"name\":\"TelemetryFilter\", " +
"\"configuration\": {\"messageTypes\":[\"POST_TELEMETRY\",\"POST_ATTRIBUTES\",\"GET_ATTRIBUTES\"]}}]"));
rule2.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1}}"));
rule2.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1, \"qualityTimeWindow\":60000, \"qualityDepthWindow\":3000}}"));
RuleMetaData savedRule2 = doPost("/api/rule", rule2, RuleMetaData.class);

ApplicationFieldsWrapper applicationRulesWrapper = new ApplicationFieldsWrapper();
Expand Down Expand Up @@ -555,15 +555,15 @@ public void testUnassignRulesToApplication() throws Exception{
rule1.setPluginToken(tenantPlugin.getApiToken());
rule1.setFilters(mapper.readTree("[{\"configuration\": {\"deviceTypes\": [{\"name\": \"Motor\"}, {\"name\": \"Pump\"}]},\"clazz\": \"com.hashmapinc.server.extensions.core.filter.DeviceTypeFilter\",\"name\": \"jetinder\"\n" +
"}, {\"configuration\": {\"methodNames\": [{\"name\": \"sdsdsdsdsdsdsd\"}]},\"clazz\": \"com.hashmapinc.server.extensions.core.filter.MethodNameFilter\",\"name\": \"sdsdsdsdsdsdsdsdsd\"}]"));
rule1.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1}}"));
rule1.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1, \"qualityTimeWindow\":60000, \"qualityDepthWindow\":3000}}"));
RuleMetaData savedRule1 = doPost("/api/rule", rule1, RuleMetaData.class);

RuleMetaData rule2 = new RuleMetaData();
rule2.setName("My Rule2");
rule2.setPluginToken(tenantPlugin.getApiToken());
rule2.setFilters(mapper.readTree("[{\"configuration\": {\"deviceTypes\": [{\"name\": \"Well\"}]},\"clazz\":\"com.hashmapinc.server.extensions.core.filter.DeviceTypeFilter\",\"name\": \"jetinder\"}, {\"configuration\": {\"methodNames\": [{\"name\": \"sdsdsdsdsdsdsd\"}]},\"clazz\":\"com.hashmapinc.server.extensions.core.filter.MethodNameFilter\",\"name\":\"sdsdsdsdsdsdsdsdsd\"}]"));
rule2.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1}}"));
rule2.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1}}"));
rule2.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1, \"qualityTimeWindow\":60000, \"qualityDepthWindow\":3000}}"));
rule2.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1, \"qualityTimeWindow\":60000, \"qualityDepthWindow\":3000}}"));
RuleMetaData savedRule2 = doPost("/api/rule", rule2, RuleMetaData.class);

ApplicationFieldsWrapper applicationRulesWrapper = new ApplicationFieldsWrapper();
Expand Down Expand Up @@ -654,7 +654,7 @@ public void findApplicationsByruleId() throws Exception {
rule1.setFilters(mapper.readTree("[{\"clazz\":\"com.hashmapinc.server.extensions.core.filter.MsgTypeFilter\", " +
"\"name\":\"TelemetryFilter\", " +
"\"configuration\": {\"messageTypes\":[\"POST_TELEMETRY\",\"POST_ATTRIBUTES\",\"GET_ATTRIBUTES\"]}}]"));
rule1.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1}}"));
rule1.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1, \"qualityTimeWindow\":60000, \"qualityDepthWindow\":3000}}"));
RuleMetaData savedRule1 = doPost("/api/rule", rule1, RuleMetaData.class);

RuleMetaData rule2 = new RuleMetaData();
Expand All @@ -663,7 +663,7 @@ public void findApplicationsByruleId() throws Exception {
rule2.setFilters(mapper.readTree("[{\"clazz\":\"com.hashmapinc.server.extensions.core.filter.MsgTypeFilter\", " +
"\"name\":\"TelemetryFilter\", " +
"\"configuration\": {\"messageTypes\":[\"POST_TELEMETRY\",\"POST_ATTRIBUTES\",\"GET_ATTRIBUTES\"]}}]"));
rule2.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1}}"));
rule2.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1, \"qualityTimeWindow\":60000, \"qualityDepthWindow\":3000}}"));
RuleMetaData savedRule2 = doPost("/api/rule", rule2, RuleMetaData.class);
JSONObject obj = new JSONObject(rule2.getFilters());

Expand All @@ -673,7 +673,7 @@ public void findApplicationsByruleId() throws Exception {
rule3.setFilters(mapper.readTree("[{\"clazz\":\"com.hashmapinc.server.extensions.core.filter.MsgTypeFilter\", " +
"\"name\":\"TelemetryFilter\", " +
"\"configuration\": {\"messageTypes\":[\"POST_TELEMETRY\",\"POST_ATTRIBUTES\",\"GET_ATTRIBUTES\"]}}]"));
rule3.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1}}"));
rule3.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1, \"qualityTimeWindow\":60000, \"qualityDepthWindow\":3000}}"));
RuleMetaData savedRule3 = doPost("/api/rule", rule3, RuleMetaData.class);

ApplicationFieldsWrapper applicationRulesWrapper = new ApplicationFieldsWrapper();
Expand Down Expand Up @@ -765,7 +765,7 @@ public void testActivateApplicationFailure() throws Exception {
rule1.setFilters(mapper.readTree("[{\"clazz\":\"com.hashmapinc.server.extensions.core.filter.MsgTypeFilter\", " +
"\"name\":\"TelemetryFilter\", " +
"\"configuration\": {\"messageTypes\":[\"POST_TELEMETRY\",\"POST_ATTRIBUTES\",\"GET_ATTRIBUTES\"]}}]"));
rule1.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1}}"));
rule1.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1, \"qualityTimeWindow\":60000, \"qualityDepthWindow\":3000}}"));
RuleMetaData savedRule1 = doPost("/api/rule", rule1, RuleMetaData.class);


Expand Down Expand Up @@ -828,7 +828,7 @@ public void testActivateApplicationSuccess() throws Exception {
rule1.setFilters(mapper.readTree("[{\"clazz\":\"com.hashmapinc.server.extensions.core.filter.MsgTypeFilter\", " +
"\"name\":\"TelemetryFilter\", " +
"\"configuration\": {\"messageTypes\":[\"POST_TELEMETRY\",\"POST_ATTRIBUTES\",\"GET_ATTRIBUTES\"]}}]"));
rule1.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1}}"));
rule1.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", \"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1, \"qualityTimeWindow\":60000, \"qualityDepthWindow\":3000}}"));
RuleMetaData savedRule1 = doPost("/api/rule", rule1, RuleMetaData.class);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public static RuleMetaData createRuleMetaData(PluginMetaData plugin) throws IOEx
"\"name\":\"TelemetryFilter\", " +
"\"configuration\": {\"messageTypes\":[\"POST_TELEMETRY\",\"POST_ATTRIBUTES\",\"GET_ATTRIBUTES\"]}}]"));
rule.setAction(mapper.readTree("{\"clazz\":\"com.hashmapinc.server.extensions.core.action.telemetry.TelemetryPluginAction\", \"name\":\"TelemetryMsgConverterAction\", " +
"\"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1}}"));
"\"configuration\":{\"timeUnit\":\"DAYS\", \"ttlValue\":1, \"qualityTimeWindow\":60000, \"qualityDepthWindow\":3000}}"));
return rule;
}
}
Loading

0 comments on commit ae023d2

Please sign in to comment.