Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Runner simplification refactoring #265

Merged
merged 21 commits into from
Jun 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ All notable changes to AET will be documented in this file.

- [PR-260](https://github.com/Cognifide/aet/pull/260) Upgrade to Karaf 4.2.0
- [PR-261](https://github.com/Cognifide/aet/pull/261) AET artifacts folders watched for new files

- [PR-265](https://github.com/Cognifide/aet/pull/265) Runner simplification refactor

## Version 2.1.5

Expand Down
8 changes: 0 additions & 8 deletions core/runner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,6 @@
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-osgi</artifactId>
</dependency>
<dependency>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome! Thank you @Skejven !

Actually it looks like guice is referenced also in worker module, but is not used at all there (or am I wrong?).
So we could remove Guice from the worker pom and then also from top-level pom, isn't it?

Then we could remove guice from feature.xml file. That would b great, right?

(there will be only some transitive dependencies to guice from our functional tests that use Bobcat).

Could be done here or in another pull request if you prefer...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @wiiitek for the review.
I will remove Guice dependency in the separate PR.

<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
Expand All @@ -81,10 +77,6 @@
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-assistedinject</artifactId>
</dependency>
<dependency>
<groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.commons.osgi</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.cognifide.aet.runner.util;
package com.cognifide.aet.runner;

import com.cognifide.aet.communication.api.exceptions.AETException;
import java.io.IOException;
Expand All @@ -34,9 +34,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Created by tomasz.misiewicz on 2015-01-13.
*/
@Service(MessagesManager.class)
@Component(immediate = true, metatype = true, description = "AET Messages Manager", label = "AET Messages Manager")
public class MessagesManager {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,26 @@
* or implied. See the License for the specific language governing permissions and limitations under
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good changes. thanks!

* the License.
*/
package com.cognifide.aet.runner.main;

import com.cognifide.aet.communication.api.queues.JmsConnection;
import com.cognifide.aet.runner.distribution.RunnerMessageListener;
import com.cognifide.aet.runner.distribution.RunnerMode;
import com.cognifide.aet.runner.modules.SimpleRunnerModule;
import com.cognifide.aet.runner.util.MessagesManager;
import com.cognifide.aet.vs.MetadataDAO;
import com.google.inject.Guice;
import com.google.inject.Injector;
package com.cognifide.aet.runner;

import java.util.Map;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.PropertyOption;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Runner is an entry point for whole application, it main goal is to coordinate JMS communication
* between workers.
* between workers. This class contains all necessary configuration for the runner bundle.
*/
@Component(immediate = true, metatype = true, description = "AET Runner", label = "AET Runner")
public class Runner {
@Service(RunnerConfiguration.class)
@Component(metatype = true, description = "AET Runner Configuration", label = "AET Runner Configuration")
public class RunnerConfiguration {

private static final Logger LOG = LoggerFactory.getLogger(Runner.class);
private static final Logger LOGGER = LoggerFactory.getLogger(RunnerConfiguration.class);

private static final int DEFAULT_TASK_RUN_FAILURE_TIMEOUT_SECONDS = 120;

Expand All @@ -53,10 +44,12 @@ public class Runner {

private static final String PARAM_MAX_MESSAGES_IN_COLLECTOR_QUEUE = "maxMessagesInCollectorQueue";

private static final String RUNNER_MODE = "runnerMode";
private static final String PARAM_MAX_CONCURRENT_SUITES_PROCESSED_COUNT = "maxConcurrentSuitesCount";

private static final int DEFAULT_MAX_CONCURRENT_SUITES_PROCESSED_COUNT = 5;

@Property(name = PARAM_FAILURE_TIMEOUT, label = "Failure timeout", description =
"Time in seconds, test run will be interrupted if no response was received in duration of this parameter. Default: "
"Time in seconds, after which suite processing will be interrupted if no notification was received in duration of this parameter. Default: "
+ DEFAULT_TASK_RUN_FAILURE_TIMEOUT_SECONDS
+ " sec", longValue = DEFAULT_TASK_RUN_FAILURE_TIMEOUT_SECONDS)
private long ft = DEFAULT_TASK_RUN_FAILURE_TIMEOUT_SECONDS;
Expand Down Expand Up @@ -84,24 +77,14 @@ public class Runner {
+ " messages", intValue = DEFAULT_MAX_MESSAGES_IN_COLLECTOR_QUEUE)
private int maxMessagesInCollectorQueue = DEFAULT_MAX_MESSAGES_IN_COLLECTOR_QUEUE;

@Property(name = RUNNER_MODE, label = "Runner mode", options = {
@PropertyOption(name = "online", value = "online"),
@PropertyOption(name = "maintenance", value = "maintenance"),
@PropertyOption(name = "offline", value = "offline")}, value = "online", description = "Runner mode: online - listening to AET.runner-in queue only, maintenance - listening to AET.runner-in and AET.maintenance-in queues (running only from AET.maintenance-in queue), offline - listening only to AET-maintenance-in queue.")
private RunnerMode runnerMode;
@Reference
private JmsConnection jmsConnection;

@Reference
private MessagesManager messagesManager;

@Reference
private MetadataDAO metadataDAO;

private RunnerMessageListener messageListener;
@Property(name = PARAM_MAX_CONCURRENT_SUITES_PROCESSED_COUNT, label = "Max Concurrent Suites Count", description =
"Defines the maximum number of suites processed concurrently byt the Runner. Default: "
+ DEFAULT_MAX_CONCURRENT_SUITES_PROCESSED_COUNT
+ " messages", intValue = DEFAULT_MAX_CONCURRENT_SUITES_PROCESSED_COUNT)
private int maxConcurrentSuitesCount = DEFAULT_MAX_CONCURRENT_SUITES_PROCESSED_COUNT;

@Activate
public void activate(Map properties) {
public void activate(Map<String, String> properties) {
ft = PropertiesUtil.toLong(properties.get(PARAM_FAILURE_TIMEOUT),
DEFAULT_TASK_RUN_FAILURE_TIMEOUT_SECONDS);
mttl = PropertiesUtil
Expand All @@ -111,26 +94,52 @@ public void activate(Map properties) {
maxMessagesInCollectorQueue = PropertiesUtil.toInteger(
properties.get(PARAM_MAX_MESSAGES_IN_COLLECTOR_QUEUE),
DEFAULT_MAX_MESSAGES_IN_COLLECTOR_QUEUE);
maxConcurrentSuitesCount = PropertiesUtil.toInteger(
properties.get(PARAM_MAX_CONCURRENT_SUITES_PROCESSED_COUNT),
DEFAULT_MAX_CONCURRENT_SUITES_PROCESSED_COUNT);

runnerMode = RunnerMode.valueOf(PropertiesUtil.toString(properties.get(RUNNER_MODE), "online")
.toUpperCase());
LOGGER.info(
"Runner configured with parameters: [ft: {} sec ; mttl: {} ; urlPackageSize: {} ; maxMessagesInCollectorQueue: {}; maxConcurrentSuitesCount: {}.]",
ft, mttl, urlPackageSize, maxMessagesInCollectorQueue, maxConcurrentSuitesCount);
}

LOG.info(
"Running with parameters: [ft: {} sec ; mttl: {} ; urlPackageSize: {} ; maxMessagesInCollectorQueue: {}; runnerMode: {}.]",
ft, mttl, urlPackageSize, maxMessagesInCollectorQueue, runnerMode);

Injector injector = Guice
.createInjector(
new SimpleRunnerModule(ft, mttl, urlPackageSize, maxMessagesInCollectorQueue,
jmsConnection, messagesManager, runnerMode, metadataDAO));
messageListener = injector.getInstance(RunnerMessageListener.class);
/**
* @return time in seconds, test run will be interrupted if no response was received in duration
* of this parameter.
*/
public long getFt() {
return ft;
}

@Deactivate
public void deactivate() {
if (messageListener != null) {
messageListener.close();
}

/**
* @return time in seconds after which messages will be thrown out of queues.
*/
public long getMttl() {
return mttl;
}


/**
* @return the maximum amount of messages in the collector queue.
*/
public int getMaxMessagesInCollectorQueue() {
return maxMessagesInCollectorQueue;
}

/**
* @return how many links are being sent in one message. Each message is being processed by single
* CollectorListener.
*/
public int getUrlPackageSize() {
return urlPackageSize;
}

/**
* @return how many suites can be processed concurrently byt the Runner.
*/
public int getMaxConcurrentSuitesCount() {
return maxConcurrentSuitesCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,68 +13,83 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.cognifide.aet.runner.distribution;
package com.cognifide.aet.runner;

import com.cognifide.aet.communication.api.messages.BasicMessage;
import com.cognifide.aet.communication.api.messages.FatalErrorMessage;
import com.cognifide.aet.communication.api.messages.TaskMessage;
import com.cognifide.aet.communication.api.metadata.Suite;
import com.cognifide.aet.communication.api.queues.JmsConnection;
import com.cognifide.aet.queues.JmsUtils;
import com.cognifide.aet.vs.StorageException;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import com.cognifide.aet.runner.scheduler.CollectorJobSchedulerService;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.commons.lang3.StringUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Listens to incoming runner queue messages. When message received, starts suite process.
* Listens to incoming runner queue messages. When message received, schedules suite processing.
*/
@Singleton
@Component(immediate = true, description = "Runner Messages Listener", label = "Runner Messages Listener")
public class RunnerMessageListener implements MessageListener {

private static final Logger LOGGER = LoggerFactory.getLogger(RunnerMessageListener.class);

private final String maintenanceDestinationName;
private static final String API_QUEUE_IN = MessagesManager
.createFullQueueName("runner-in");

private final TestRunProcessor testRunProcessor;
private static final String MAINTENANCE_QUEUE_IN = MessagesManager
.createFullQueueName("maintenance-in");

private final MessageConsumer inConsumer;
private MessageConsumer inConsumer;

private final MessageConsumer maintenanceConsumer;
private MessageConsumer maintenanceConsumer;

private final Session session;
private Session session;

@Inject
public RunnerMessageListener(JmsConnection jmsConnection,
@Named("API in") String inDestinationName,
@Named("MAINTENANCE in") String maintenanceDestinationName, TestRunProcessor testRunProcessor,
RunnerMode runnerMode) throws JMSException {
this.maintenanceDestinationName = maintenanceDestinationName;
this.testRunProcessor = testRunProcessor;
session = jmsConnection.getJmsSession();
if (runnerMode != RunnerMode.OFFLINE) {
LOGGER.info("Start listening on {} queue.", inDestinationName);
inConsumer = session.createConsumer(session.createQueue(inDestinationName));
@Reference
private JmsConnection jmsConnection;

@Reference
private RunnerConfiguration runnerConfiguration;

@Reference
private SuiteExecutorService suiteExecutorService;

@Reference
private CollectorJobSchedulerService collectorJobSchedulerService;


@Activate
public void activate(Map<String, String> properties) {

LOGGER.debug("Activating RunnerMessageListener");
try {
session = jmsConnection.getJmsSession();
LOGGER.info("Start listening on {} queue.", API_QUEUE_IN);
inConsumer = session.createConsumer(session.createQueue(API_QUEUE_IN));
inConsumer.setMessageListener(this);
} else {
inConsumer = null;
LOGGER.info("Start listening on {} queue.", MAINTENANCE_QUEUE_IN);
maintenanceConsumer = session.createConsumer(session.createQueue(MAINTENANCE_QUEUE_IN));
maintenanceConsumer.setMessageListener(this);
} catch (JMSException e) {
LOGGER.error("Failed to connect to messages broker ", e);
}
LOGGER.info("Start listening on {} queue.", maintenanceDestinationName);
maintenanceConsumer = session.createConsumer(session.createQueue(maintenanceDestinationName));
maintenanceConsumer.setMessageListener(this);
}

public void close() {
@Deactivate
public void deactivate() {
LOGGER.debug("Deactivating RunnerMessageListener");
JmsUtils.closeQuietly(maintenanceConsumer);
JmsUtils.closeQuietly(inConsumer);
JmsUtils.closeQuietly(session);
Expand All @@ -99,7 +114,7 @@ public void onMessage(Message wrapperMessage) {
case CANCEL:
//This step is not implemented on client
Suite suiteToCancel = (Suite) ((TaskMessage) message).getData();
cancelSuiteExecution(suiteToCancel.getCorrelationId());
collectorJobSchedulerService.cancel(suiteToCancel.getCorrelationId());
break;
default:
LOGGER.error("Unknown message type: {}!", message.getMessageType());
Expand All @@ -110,15 +125,10 @@ public void onMessage(Message wrapperMessage) {
private void processTestSuite(Message wrapperMessage, TaskMessage message) {
Suite suite = (Suite) message.getData();
try {
boolean isMaintenanceMessage = StringUtils.endsWith(
wrapperMessage.getJMSDestination().toString(), maintenanceDestinationName);
testRunProcessor.process(suite, wrapperMessage.getJMSReplyTo(), isMaintenanceMessage);
suiteExecutorService.scheduleSuite(suite, wrapperMessage.getJMSReplyTo());
} catch (JMSException e) {
LOGGER.error("Error wile processing RUN {}: ", suite.getCorrelationId(), e);
sendFatalMessage(wrapperMessage, e.getMessage());
} catch (StorageException e) {
LOGGER.error("Failed to process test suite", e);
sendFatalMessage(wrapperMessage, e.getMessage());
}
}

Expand All @@ -137,9 +147,4 @@ private void sendFatalMessage(Message wrapperMessage, String message) {
LOGGER.error("Fatal exception, can't deliver message {}!", message, e);
}
}

private void cancelSuiteExecution(String correlationID) {
testRunProcessor.cancel(correlationID);
}

}
Loading