Skip to content
Snippets Groups Projects
Commit ccc29ce1 authored by Lars Erik Blümke's avatar Lars Erik Blümke
Browse files

migration of AbstractLogReplayer and JMSLogReplayer

parent 68cb9a42
No related branches found
No related tags found
No related merge requests found
/***************************************************************************
* Copyright 2015 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package kieker.tools.logReplayer; package kieker.tools.logReplayer;
import kieker.analysis.AnalysisController; import java.util.concurrent.TimeUnit;
import kieker.analysis.IAnalysisController;
import kieker.analysis.exception.AnalysisConfigurationException;
import kieker.analysis.plugin.AbstractPlugin;
import kieker.analysis.plugin.filter.record.RealtimeRecordDelayFilter; import kieker.analysis.plugin.filter.record.RealtimeRecordDelayFilter;
import kieker.analysis.plugin.filter.select.TimestampFilter; import kieker.analysis.plugin.filter.select.timestamp.TimestampFilter;
import kieker.analysis.plugin.reader.AbstractReaderPlugin; import kieker.analysis.plugin.reader.AbstractReader;
import kieker.common.configuration.Configuration;
import kieker.common.logging.Log; import kieker.common.logging.Log;
import kieker.common.logging.LogFactory; import kieker.common.logging.LogFactory;
import kieker.monitoring.core.configuration.ConfigurationFactory;
import kieker.tools.logReplayer.filter.MonitoringRecordLoggerFilter; import teetime.framework.Configuration;
import teetime.framework.Execution;
import teetime.stage.logReplayer.filter.MonitoringRecordLoggerFilter;
/** /**
* Replays a monitoring log to a {@link kieker.monitoring.core.controller.IMonitoringController} with a given {@link Configuration}. * Replays a monitoring log to a {@link kieker.monitoring.core.controller.IMonitoringController} with a given {@link Configuration}.
* The {@link AbstractLogReplayer} can filter by timestamp and replay in real-time. * The {@link AbstractLogReplayer} can filter by timestamp and replay in real-time.
* *
* @author Andre van Hoorn * @author Andre van Hoorn, Lars Erik Bluemke
* *
* @since 1.6 * @since 1.6
*/ */
...@@ -50,37 +33,34 @@ public abstract class AbstractLogReplayer { ...@@ -50,37 +33,34 @@ public abstract class AbstractLogReplayer {
private final String monitoringConfigurationFile; private final String monitoringConfigurationFile;
private final boolean realtimeMode; private final boolean realtimeMode;
private final TimeUnit realtimeTimeunit;
private final double realtimeAccelerationFactor; private final double realtimeAccelerationFactor;
private final boolean keepOriginalLoggingTimestamps; private final long realtimeWarnNegativeSchedTime;
private final int numRealtimeWorkerThreads;
/** /**
* @param monitoringConfigurationFile * @param monitoringConfigurationFile
* The name of the {@code monitoring.properties} file. * The name of the {@code monitoring.properties} file.
* @param realtimeMode * @param realtimeMode
* Determines whether to use real time mode or not. * Determines whether to use real time mode or not.
* @param realtimeTimeunit
* The time unit to be used in realtime mode.
* @param realtimeAccelerationFactor * @param realtimeAccelerationFactor
* Determines whether to accelerate (value > 1.0) or slow down (<1.0) the replay in realtime mode by the given factor. * Determines whether to accelerate (value > 1.0) or slow down (<1.0) the replay in realtime mode by the given factor.
* Choose a value of 1.0 for "real" realtime mode (i.e., no acceleration/slow down) * Choose a value of 1.0 for "real" realtime mode (i.e., no acceleration/slow down)
* @param keepOriginalLoggingTimestamps * @param realtimeWarnNegativeSchedTime
* Determines whether the original logging timestamps will be used of whether the timestamps will be modified. * A time bound to configure a warning when a record is forwarded too late in realtime mode.
* @param numRealtimeWorkerThreads
* Determines how many realtime worker threads should be used.
* @param ignoreRecordsBeforeTimestamp * @param ignoreRecordsBeforeTimestamp
* The lower limit for the time stamps of the records. * The lower limit for the time stamps of the records.
* @param ignoreRecordsAfterTimestamp * @param ignoreRecordsAfterTimestamp
* The upper limit for the time stamps of the records. * The upper limit for the time stamps of the records.
*/ */
public AbstractLogReplayer(final String monitoringConfigurationFile, final boolean realtimeMode, final double realtimeAccelerationFactor, public AbstractLogReplayer(final String monitoringConfigurationFile, final boolean realtimeMode, final TimeUnit realtimeTimeunit,
final boolean keepOriginalLoggingTimestamps, final int numRealtimeWorkerThreads, final long ignoreRecordsBeforeTimestamp, final double realtimeAccelerationFactor, final long realtimeWarnNegativeSchedTime, final long ignoreRecordsBeforeTimestamp,
final long ignoreRecordsAfterTimestamp) { final long ignoreRecordsAfterTimestamp) {
this.realtimeMode = realtimeMode; this.realtimeMode = realtimeMode;
this.realtimeTimeunit = realtimeTimeunit;
this.realtimeAccelerationFactor = realtimeAccelerationFactor; // ignored if realtimeMode == false this.realtimeAccelerationFactor = realtimeAccelerationFactor; // ignored if realtimeMode == false
this.keepOriginalLoggingTimestamps = keepOriginalLoggingTimestamps; this.realtimeWarnNegativeSchedTime = realtimeWarnNegativeSchedTime; // ignored if realtimeMode == false
this.numRealtimeWorkerThreads = numRealtimeWorkerThreads;
if (this.numRealtimeWorkerThreads <= 0) {
LOG.warn("numRealtimeWorkerThreads == " + numRealtimeWorkerThreads);
}
this.ignoreRecordsBeforeTimestamp = ignoreRecordsBeforeTimestamp; this.ignoreRecordsBeforeTimestamp = ignoreRecordsBeforeTimestamp;
this.ignoreRecordsAfterTimestamp = ignoreRecordsAfterTimestamp; this.ignoreRecordsAfterTimestamp = ignoreRecordsAfterTimestamp;
this.monitoringConfigurationFile = monitoringConfigurationFile; this.monitoringConfigurationFile = monitoringConfigurationFile;
...@@ -91,111 +71,46 @@ public abstract class AbstractLogReplayer { ...@@ -91,111 +71,46 @@ public abstract class AbstractLogReplayer {
/** /**
* Replays the monitoring log terminates after the last record was passed to the configured {@link kieker.monitoring.core.controller.IMonitoringController}. * Replays the monitoring log terminates after the last record was passed to the configured {@link kieker.monitoring.core.controller.IMonitoringController}.
*
* @return true on success; false otherwise
*/ */
public boolean replay() { public void replay() {
boolean success = true; LogReplayerConfiguration configuration;
try {
final IAnalysisController analysisInstance = new AnalysisController(); AbstractReader reader = createReader();
MonitoringRecordLoggerFilter recordLogger = new MonitoringRecordLoggerFilter(null); // MonitoringRecordLoggerFilter was migrated but still takes a Kieker
// Initializing the reader // Configuration as argument which is not used anymore.
final AbstractReaderPlugin reader = this.createReader(analysisInstance); // entweder jms oder filesystem reader (je nachdem ob die konkrete Klasse ein
// FilesystemLogReplayer oder ein JMSLogReplayer ist if (isAtLeastOneTimestampGiven() && !realtimeMode) {
TimestampFilter timestampFilter = new TimestampFilter(ignoreRecordsBeforeTimestamp, ignoreRecordsAfterTimestamp);
// These two variables will be updated while plugging together the configuration configuration = new LogReplayerConfiguration(reader, timestampFilter, recordLogger);
AbstractPlugin lastFilter = reader; } else if (!isAtLeastOneTimestampGiven() && realtimeMode) {
String lastOutputPortName = this.readerOutputPortName(); RealtimeRecordDelayFilter realtimeRecordDelayFilter = new RealtimeRecordDelayFilter(realtimeTimeunit, realtimeAccelerationFactor,
realtimeWarnNegativeSchedTime);
// (Potentially) initializing the timestamp filter configuration = new LogReplayerConfiguration(reader, realtimeRecordDelayFilter, recordLogger);
{ // NOCS (nested Block) } else if (isAtLeastOneTimestampGiven() && realtimeMode) {
final Configuration timestampFilterConfiguration = new Configuration(); TimestampFilter timestampFilter = new TimestampFilter(ignoreRecordsBeforeTimestamp, ignoreRecordsAfterTimestamp);
RealtimeRecordDelayFilter realtimeRecordDelayFilter = new RealtimeRecordDelayFilter(realtimeTimeunit, realtimeAccelerationFactor,
boolean atLeastOneTimestampGiven = false; // zeigt an, dass mindestens eins von upper bzw. lower bound gültig sind realtimeWarnNegativeSchedTime);
if (this.ignoreRecordsBeforeTimestamp > MIN_TIMESTAMP) { configuration = new LogReplayerConfiguration(reader, timestampFilter, realtimeRecordDelayFilter, recordLogger);
// wenn lower bound timestamp > 0, } else {
atLeastOneTimestampGiven = true; configuration = new LogReplayerConfiguration(reader, recordLogger);
// meinen lower bound timestamp an TimestampFilter weitergeben
timestampFilterConfiguration.setProperty(TimestampFilter.CONFIG_PROPERTY_NAME_IGNORE_BEFORE_TIMESTAMP,
Long.toString(this.ignoreRecordsBeforeTimestamp));
}
if (this.ignoreRecordsAfterTimestamp < MAX_TIMESTAMP) {
// analog zu voherigem if
atLeastOneTimestampGiven = true;
timestampFilterConfiguration.setProperty(TimestampFilter.CONFIG_PROPERTY_NAME_IGNORE_AFTER_TIMESTAMP,
Long.toString(this.ignoreRecordsAfterTimestamp));
}
// wenn sinnvolle upper/lower timestamps gegeben, wird der oben initialisierte reader mit einem timestamp filter verbunden.
// der lässt dann nur records mit timestamps innerhalb der grenzen durch.
if (atLeastOneTimestampGiven) {
final TimestampFilter timestampFilter = new TimestampFilter(timestampFilterConfiguration, analysisInstance);
// --------------------- reader ---- output des readers
analysisInstance.connect(lastFilter, lastOutputPortName, timestampFilter, TimestampFilter.INPUT_PORT_NAME_ANY_RECORD);
lastFilter = timestampFilter;
lastOutputPortName = TimestampFilter.OUTPUT_PORT_NAME_WITHIN_PERIOD;
} else { // NOCS NOPMD (EmptyIfStmt)
// nothing to do; lastFilter and lastOutputPortName keep their values
}
}
// (Potentially) initializing delay filter
// erstellt wenn gewünscht realtime record delay filter, um records zeitlich an ihrem timestamps orientiert weiterzuleiten
if (this.realtimeMode) {
final Configuration delayFilterConfiguration = new Configuration();
delayFilterConfiguration.setProperty(RealtimeRecordDelayFilter.CONFIG_PROPERTY_NAME_NUM_WORKERS, Integer.toString(this.numRealtimeWorkerThreads));
delayFilterConfiguration.setProperty(RealtimeRecordDelayFilter.CONFIG_PROPERTY_NAME_ACCELERATION_FACTOR,
Double.toString(this.realtimeAccelerationFactor));
final RealtimeRecordDelayFilter rtFilter = new RealtimeRecordDelayFilter(delayFilterConfiguration, analysisInstance);
// --------------------- timestamp filter
// oder auch ----------- reader (wenn im letzten Schritt kein timestamp filter erstellt wurde)
analysisInstance.connect(lastFilter, lastOutputPortName, rtFilter, RealtimeRecordDelayFilter.INPUT_PORT_NAME_RECORDS);
lastFilter = rtFilter;
lastOutputPortName = RealtimeRecordDelayFilter.OUTPUT_PORT_NAME_RECORDS;
}
// And finally, we'll add the MonitoringRecordLoggerFilter
final Configuration recordLoggerConfig = new Configuration();
if (this.monitoringConfigurationFile != null) {
recordLoggerConfig.setProperty(MonitoringRecordLoggerFilter.CONFIG_PROPERTY_NAME_MONITORING_PROPS_FN, this.monitoringConfigurationFile);
}
recordLoggerConfig.setProperty(
ConfigurationFactory.AUTO_SET_LOGGINGTSTAMP,
Boolean.toString(!this.keepOriginalLoggingTimestamps));
final MonitoringRecordLoggerFilter recordLogger = new MonitoringRecordLoggerFilter(recordLoggerConfig, analysisInstance);
// --------------------- realtime record delay filter
// oder ---------------- timestamp filter
// oder ---------------- reader
analysisInstance.connect(lastFilter, lastOutputPortName, recordLogger, MonitoringRecordLoggerFilter.INPUT_PORT_NAME_RECORD);
analysisInstance.run();
} catch (final IllegalStateException e) {
LOG.error("An error occurred while replaying", e);
success = false;
} catch (final AnalysisConfigurationException e) {
LOG.error("An error occurred while replaying", e);
success = false;
} }
return success;
Execution<LogReplayerConfiguration> execution = new Execution<LogReplayerConfiguration>(configuration);
execution.executeBlocking();
} }
/** private boolean isAtLeastOneTimestampGiven() {
* Implementing classes returns the name of the reader's output port which provides the {@link kieker.common.record.IMonitoringRecord}s from the monitoring log. if (ignoreRecordsBeforeTimestamp > MIN_TIMESTAMP || ignoreRecordsAfterTimestamp < MAX_TIMESTAMP) {
* return true;
* @return The name of the reader's output port. } else {
*/ return false;
protected abstract String readerOutputPortName(); }
}
/** /**
* Implementing classes return the reader to be used for reading the monitoring log. * Implementing classes return the reader to be used for reading the monitoring log.
* *
* @param analysisInstance
* The analysis controller which will be the parent of the reader.
*
* @return The reader which can be used to read the monitoring log. * @return The reader which can be used to read the monitoring log.
*/ */
protected abstract AbstractReaderPlugin createReader(final IAnalysisController analysisInstance); protected abstract AbstractReader createReader();
} }
/***************************************************************************
* Copyright 2015 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package kieker.tools.logReplayer;
import java.util.concurrent.TimeUnit;
import kieker.analysis.plugin.reader.AbstractReader;
import kieker.analysis.plugin.reader.jms.JMSReader;
import kieker.common.logging.LogFactory;
/**
* An implementation of the {@link AbstractLogReplayer}, using the {@link JMSReader} to replay {@link kieker.common.record.IMonitoringRecord}s from a JMS queue.
*
* @author Andre van Hoorn
*
* @since 1.3
*/
public class JMSLogReplayer extends AbstractLogReplayer {
private final String jmsProviderUrl;
private final String jmsDestination;
private final String jmsFactoryLookupName;
/**
* Creates a new JMS log replayer.
*
* @param monitoringConfigurationFile
* The name of the {@code monitoring.properties} file.
* @param realtimeMode
* Determines whether to use real time mode or not.
* @param realtimeTimeunit
* The time unit to be used in realtime mode.
* @param realtimeAccelerationFactor
* Determines whether to accelerate (value > 1.0) or slow down (<1.0) the replay in realtime mode by the given factor.
* Choose a value of 1.0 for "real" realtime mode (i.e., no acceleration/slow down)
* @param realtimeWarnNegativeSchedTime
* A time bound to configure a warning when a record is forwarded too late in realtime mode.
* @param ignoreRecordsBeforeTimestamp
* The lower limit for the time stamps of the records.
* @param ignoreRecordsAfterTimestamp
* The upper limit for the time stamps of the records.
* @param jmsProviderUrl
* = for instance "tcp://127.0.0.1:3035/"
* @param jmsDestination
* = for instance "queue1"
* @param jmsFactoryLookupName
* = for instance "org.exolab.jms.jndi.InitialContextFactory" (OpenJMS)
* @param monitoringConfigurationFile
* The path of the monitoring.properties file.
*/
public JMSLogReplayer(final String monitoringConfigurationFile, final boolean realtimeMode, final TimeUnit realtimeTimeunit,
final double realtimeAccelerationFactor, final long realtimeWarnNegativeSchedTime, final long ignoreRecordsBeforeTimestamp,
final long ignoreRecordsAfterTimestamp, final String jmsProviderUrl, final String jmsDestination, final String jmsFactoryLookupName) {
super(monitoringConfigurationFile, realtimeMode, realtimeTimeunit, realtimeAccelerationFactor, realtimeWarnNegativeSchedTime, ignoreRecordsBeforeTimestamp,
ignoreRecordsAfterTimestamp);
this.jmsProviderUrl = jmsProviderUrl;
this.jmsDestination = jmsDestination;
this.jmsFactoryLookupName = jmsFactoryLookupName;
}
/**
* {@inheritDoc}
*/
@Override
protected AbstractReader createReader() {
return new JMSReader(jmsProviderUrl, jmsDestination, jmsFactoryLookupName, LogFactory.getLog(JMSReader.class));
}
}
package kieker.tools.logReplayer;
import kieker.analysis.plugin.filter.record.RealtimeRecordDelayFilter;
import kieker.analysis.plugin.filter.select.timestamp.TimestampFilter;
import kieker.analysis.plugin.reader.AbstractReader;
import teetime.framework.Configuration;
import teetime.stage.logReplayer.filter.MonitoringRecordLoggerFilter;
public class LogReplayerConfiguration extends Configuration {
public LogReplayerConfiguration(final AbstractReader reader, final MonitoringRecordLoggerFilter recordLogger) {
connectPorts(reader.getOutputPort(), recordLogger.getInputPort());
}
public LogReplayerConfiguration(final AbstractReader reader, final TimestampFilter timestampFilter, final MonitoringRecordLoggerFilter recordLogger) {
connectPorts(reader.getOutputPort(), timestampFilter.getMonitoringRecordsCombinedInputPort());
connectPorts(timestampFilter.getRecordsWithinTimePeriodOutputPort(), recordLogger.getInputPort());
}
public LogReplayerConfiguration(final AbstractReader reader, final RealtimeRecordDelayFilter realtimeRecordDelayFilter,
final MonitoringRecordLoggerFilter recordLogger) {
connectPorts(reader.getOutputPort(), realtimeRecordDelayFilter.getInputPort());
connectPorts(realtimeRecordDelayFilter.getOutputPort(), recordLogger.getInputPort());
}
public LogReplayerConfiguration(final AbstractReader reader, final TimestampFilter timestampFilter, final RealtimeRecordDelayFilter realtimeRecordDelayFilter,
final MonitoringRecordLoggerFilter recordLogger) {
connectPorts(reader.getOutputPort(), timestampFilter.getMonitoringRecordsCombinedInputPort());
connectPorts(timestampFilter.getRecordsWithinTimePeriodOutputPort(), realtimeRecordDelayFilter.getInputPort());
connectPorts(realtimeRecordDelayFilter.getOutputPort(), recordLogger.getInputPort());
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment