Skip to content
Snippets Groups Projects
Commit 229fe495 authored by Nils Christian Ehmke's avatar Nils Christian Ehmke
Browse files

Experiments

parent 09591c24
No related branches found
No related tags found
No related merge requests found
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package kieker.analysis.stage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import kieker.analysis.IProjectContext;
import kieker.analysis.plugin.annotation.OutputPort;
import kieker.analysis.plugin.annotation.Plugin;
import kieker.analysis.plugin.annotation.Property;
import kieker.analysis.plugin.reader.AbstractReaderPlugin;
import kieker.common.configuration.Configuration;
import kieker.common.record.misc.TimestampRecord;
@Plugin(
description = "Delivers the current (system) time in regular intervals",
outputPorts = {
@OutputPort(name = TimeReader.OUTPUT_PORT_NAME_TIMESTAMPS, eventTypes = Long.class),
@OutputPort(name = TimeReader.OUTPUT_PORT_NAME_TIMESTAMP_RECORDS, eventTypes = TimestampRecord.class)
},
configuration = {
@Property(name = TimeReader.CONFIG_PROPERTY_NAME_UPDATE_INTERVAL_NS, defaultValue = TimeReader.CONFIG_PROPERTY_VALUE_UPDATE_INTERVAL_NS,
description = "Determines the update interval in nano seconds."),
@Property(name = TimeReader.CONFIG_PROPERTY_NAME_DELAY_NS, defaultValue = TimeReader.CONFIG_PROPERTY_VALUE_DELAY_NS,
description = "Determines the initial delay in nano seconds."),
@Property(name = TimeReader.CONFIG_PROPERTY_NAME_NUMBER_IMPULSES, defaultValue = TimeReader.CONFIG_PROPERTY_VALUE_NUMBER_IMPULSES,
description = "Determines the number of impulses to emit (0 = infinite).")
})
public final class TimeReader extends AbstractReaderPlugin {
/** The name of the output port for the timestamps. */
public static final String OUTPUT_PORT_NAME_TIMESTAMPS = "timestamps";
/** The name of the output port for the timestamp records. */
public static final String OUTPUT_PORT_NAME_TIMESTAMP_RECORDS = "timestampRecords";
/** The name of the property determining the update interval in nanoseconds. */
public static final String CONFIG_PROPERTY_NAME_UPDATE_INTERVAL_NS = "updateIntervalNS";
/** The default value for the update interval (1 second). */
public static final String CONFIG_PROPERTY_VALUE_UPDATE_INTERVAL_NS = "1000000000";
/** The name of the property determining the initial delay in nanoseconds. */
public static final String CONFIG_PROPERTY_NAME_DELAY_NS = "delayNS";
/** The default value for the initial delay (0 seconds). */
public static final String CONFIG_PROPERTY_VALUE_DELAY_NS = "0";
/** The name of the property determining the number of impulses to emit. */
public static final String CONFIG_PROPERTY_NAME_NUMBER_IMPULSES = "numberImpulses";
/** The default value for number of impulses (infinite). */
public static final String CONFIG_PROPERTY_VALUE_NUMBER_IMPULSES = "0";
/** A value for the number of impulses. It makes sure that the reader emits an infinite amount of signals. */
public static final long INFINITE_EMITS = 0L;
final CountDownLatch impulseEmitLatch = new CountDownLatch(1); // NOCS NOPMD (package visible)
private volatile boolean terminated;
private final ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
private volatile ScheduledFuture<?> result;
private final long initialDelay;
private final long period;
private final long numberImpulses;
/**
* Creates a new timer using the given configuration.
*
* @param configuration
* The configuration containing the properties to initialize this timer.
* @param projectContext
* The project context.
*/
public TimeReader(final Configuration configuration, final IProjectContext projectContext) {
super(configuration, projectContext);
this.initialDelay = configuration.getLongProperty(CONFIG_PROPERTY_NAME_DELAY_NS);
this.period = configuration.getLongProperty(CONFIG_PROPERTY_NAME_UPDATE_INTERVAL_NS);
this.numberImpulses = configuration.getLongProperty(CONFIG_PROPERTY_NAME_NUMBER_IMPULSES);
}
/**
* {@inheritDoc}
*/
@Override
public void terminate(final boolean error) {
if (!this.terminated) {
this.log.info("Shutdown of TimeReader requested.");
this.executorService.shutdown();
try {
this.terminated = this.executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (final InterruptedException ex) {
// ignore
}
if (!this.terminated && (this.result != null)) {
// problems shutting down
this.result.cancel(true);
}
}
}
/**
* {@inheritDoc}
*/
@Override
public boolean read() {
this.result = this.executorService.scheduleAtFixedRate(new TimestampEventTask(this.numberImpulses), this.initialDelay, this.period, TimeUnit.NANOSECONDS);
return true;
}
@Override
public Configuration getCurrentConfiguration() {
final Configuration configuration = new Configuration();
configuration.setProperty(CONFIG_PROPERTY_NAME_DELAY_NS, Long.toString(this.initialDelay));
configuration.setProperty(CONFIG_PROPERTY_NAME_UPDATE_INTERVAL_NS, Long.toString(this.period));
configuration.setProperty(CONFIG_PROPERTY_NAME_NUMBER_IMPULSES, Long.toString(this.numberImpulses));
return configuration;
}
/**
* Sends the current system time as a new timestamp event.
*/
protected void sendTimestampEvent() {
final long timestamp = super.recordsTimeUnitFromProjectContext.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
super.deliver(OUTPUT_PORT_NAME_TIMESTAMPS, timestamp);
super.deliver(OUTPUT_PORT_NAME_TIMESTAMP_RECORDS, new TimestampRecord(timestamp));
}
/**
* A simple helper class used to send the current system time.
*
* @author Nils Christian Ehmke
*
* @since 1.8
*/
protected class TimestampEventTask implements Runnable {
private final boolean infinite;
private volatile long numberImpulses;
/**
* Creates a new task.
*
* @param numberImpulses
* 0 = infinite
*/
public TimestampEventTask(final long numberImpulses) {
this.numberImpulses = numberImpulses;
if (numberImpulses == 0) {
this.infinite = true;
} else {
this.infinite = false;
}
}
/**
* Executes the task.
*/
@Override
public void run() {
if (this.infinite || (this.numberImpulses > 0)) {
TimeReader.this.sendTimestampEvent();
if (!this.infinite && (0 == --this.numberImpulses)) { // NOPMD
TimeReader.this.impulseEmitLatch.countDown();
}
}
}
}
}
package teetime.variant.methodcallWithPorts.examples.traceReconstruction;
import java.io.File;
import java.util.LinkedList;
import java.util.List;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import kieker.analysis.AnalysisController;
import kieker.analysis.IAnalysisController;
import kieker.analysis.IProjectContext;
import kieker.analysis.exception.AnalysisConfigurationException;
import kieker.analysis.plugin.filter.AbstractFilterPlugin;
import kieker.analysis.plugin.filter.flow.EventRecordTraceReconstructionFilter;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kieker.analysis.plugin.filter.forward.AnalysisThroughputFilter;
import kieker.analysis.plugin.filter.forward.CountingFilter;
import kieker.analysis.plugin.filter.forward.StringBufferFilter;
import kieker.analysis.plugin.filter.select.TypeFilter;
import kieker.analysis.plugin.reader.AbstractReaderPlugin;
import kieker.analysis.plugin.reader.filesystem.FSReader;
import kieker.analysis.stage.CacheFilter;
import kieker.analysis.stage.CollectorSink;
import kieker.analysis.stage.TimeReader;
import kieker.common.configuration.Configuration;
import kieker.common.record.flow.IFlowRecord;
public class KiekerTraceReconstructionAnalysis extends Analysis {
private final IAnalysisController analysisController = new AnalysisController();
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private final List<Long> throughputCollection = new LinkedList<Long>();
private CountingFilter recordCounter;
private CountingFilter traceCounter;
private AnalysisThroughputFilter throughputFilter;
private final File inputDir;
public KiekerTraceReconstructionAnalysis(final File inputDir) {
this.inputDir = inputDir;
}
@Override
public void init() {
super.init();
final Configuration clockConfiguration = new Configuration();
clockConfiguration.setProperty(TimeReader.CONFIG_PROPERTY_VALUE_UPDATE_INTERVAL_NS, Integer.toString(50 * 1000 * 1000));
final AbstractReaderPlugin clock = new TimeReader(clockConfiguration, this.analysisController);
final Configuration readerConfiguration = new Configuration();
readerConfiguration.setProperty(FSReader.CONFIG_PROPERTY_NAME_INPUTDIRS, this.inputDir.getAbsolutePath());
final AbstractReaderPlugin reader = new FinalTerminationReader(readerConfiguration, this.analysisController, clock);
this.recordCounter = new CountingFilter(new Configuration(), this.analysisController);
final AbstractFilterPlugin cache = new CacheFilter(new Configuration(), this.analysisController);
final AbstractFilterPlugin stringBufferFilter = new StringBufferFilter(new Configuration(), this.analysisController);
final Configuration typeFilterConfiguration = new Configuration();
typeFilterConfiguration.setProperty(TypeFilter.CONFIG_PROPERTY_NAME_TYPES, IFlowRecord.class.getCanonicalName());
final AbstractFilterPlugin typeFilter = new TypeFilter(typeFilterConfiguration, this.analysisController);
this.throughputFilter = new AnalysisThroughputFilter(new Configuration(), this.analysisController);
final EventRecordTraceReconstructionFilter traceReconstructionFilter = new EventRecordTraceReconstructionFilter(new Configuration(), this.analysisController);
this.traceCounter = new CountingFilter(new Configuration(), this.analysisController);
final CollectorSink<TraceEventRecords> collector = new CollectorSink<TraceEventRecords>(new Configuration(), this.analysisController, this.elementCollection);
final CollectorSink<Long> throughputCollector = new CollectorSink<Long>(new Configuration(), this.analysisController, this.throughputCollection);
try {
this.analysisController.connect(reader, FSReader.OUTPUT_PORT_NAME_RECORDS, this.recordCounter, CountingFilter.INPUT_PORT_NAME_EVENTS);
this.analysisController.connect(this.recordCounter, CountingFilter.OUTPUT_PORT_NAME_RELAYED_EVENTS, cache, CacheFilter.INPUT_PORT_NAME);
this.analysisController.connect(cache, CacheFilter.OUTPUT_PORT_NAME, stringBufferFilter, StringBufferFilter.INPUT_PORT_NAME_EVENTS);
this.analysisController.connect(stringBufferFilter, StringBufferFilter.OUTPUT_PORT_NAME_RELAYED_EVENTS, typeFilter, TypeFilter.INPUT_PORT_NAME_EVENTS);
this.analysisController.connect(typeFilter, TypeFilter.OUTPUT_PORT_NAME_TYPE_MATCH, this.throughputFilter,
AnalysisThroughputFilter.INPUT_PORT_NAME_OBJECTS);
this.analysisController.connect(this.throughputFilter, AnalysisThroughputFilter.OUTPUT_PORT_NAME_RELAYED_OBJECTS, traceReconstructionFilter,
EventRecordTraceReconstructionFilter.INPUT_PORT_NAME_TRACE_RECORDS);
this.analysisController.connect(traceReconstructionFilter, EventRecordTraceReconstructionFilter.OUTPUT_PORT_NAME_TRACE_VALID, this.traceCounter,
CountingFilter.INPUT_PORT_NAME_EVENTS);
this.analysisController.connect(traceReconstructionFilter, EventRecordTraceReconstructionFilter.OUTPUT_PORT_NAME_TRACE_INVALID, this.traceCounter,
CountingFilter.INPUT_PORT_NAME_EVENTS);
this.analysisController.connect(this.traceCounter, CountingFilter.OUTPUT_PORT_NAME_RELAYED_EVENTS, collector, CollectorSink.INPUT_PORT_NAME);
this.analysisController.connect(this.throughputFilter, AnalysisThroughputFilter.OUTPUT_PORT_NAME_THROUGHPUT, throughputCollector,
CollectorSink.INPUT_PORT_NAME);
this.analysisController.connect(clock, TimeReader.OUTPUT_PORT_NAME_TIMESTAMPS, this.throughputFilter, AnalysisThroughputFilter.INPUT_PORT_NAME_TIME);
} catch (IllegalStateException e) {
e.printStackTrace();
} catch (AnalysisConfigurationException e) {
e.printStackTrace();
}
}
@Override
public void start() {
super.start();
try {
this.analysisController.run();
} catch (IllegalStateException e) {
e.printStackTrace();
} catch (AnalysisConfigurationException e) {
e.printStackTrace();
}
}
public List<TraceEventRecords> getElementCollection() {
return this.elementCollection;
}
public long getNumRecords() {
return this.recordCounter.getMessageCount();
}
public long getNumTraces() {
return this.traceCounter.getMessageCount();
}
public List<Long> getThroughputs() {
return this.throughputCollection;
}
private static class FinalTerminationReader extends FSReader {
private final AbstractReaderPlugin clock;
public FinalTerminationReader(final Configuration configuration, final IProjectContext projectContext, final AbstractReaderPlugin clock) {
super(configuration, projectContext);
this.clock = clock;
}
@Override
public boolean read() {
final boolean result = super.read();
this.clock.terminate(result);
return result;
}
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.examples.traceReconstruction;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.io.File;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
/**
* @author Nils Christian Ehmke
*/
public class NieWorkKiekerTraceReconstructionAnalysisTest {
private StopWatch stopWatch;
@Before
public void before() {
this.stopWatch = new StopWatch();
}
@After
public void after() {
long overallDurationInNs = this.stopWatch.getDurationInNs();
System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms");
}
@Test
public void performAnalysisWithEprintsLogs() {
final KiekerTraceReconstructionAnalysis analysis = new KiekerTraceReconstructionAnalysis(new File("src/test/data/Eprints-logs"));
analysis.init();
this.stopWatch.start();
try {
analysis.start();
} finally {
this.stopWatch.end();
analysis.onTerminate();
}
assertEquals(50002, analysis.getNumRecords());
assertEquals(2, analysis.getNumTraces());
TraceEventRecords trace6884 = analysis.getElementCollection().get(0);
assertEquals(6884, trace6884.getTraceMetadata().getTraceId());
TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
}
@Test
public void performAnalysisWithKiekerLogs() {
final KiekerTraceReconstructionAnalysis analysis = new KiekerTraceReconstructionAnalysis(new File("src/test/data/kieker-logs"));
analysis.init();
this.stopWatch.start();
try {
analysis.start();
} finally {
this.stopWatch.end();
analysis.onTerminate();
}
assertEquals(1489901, analysis.getNumRecords());
assertEquals(24013, analysis.getNumTraces());
TraceEventRecords trace0 = analysis.getElementCollection().get(0);
assertEquals(8974347286117089280l, trace0.getTraceMetadata().getTraceId());
TraceEventRecords trace1 = analysis.getElementCollection().get(1);
assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
assertThat(quintiles.get(0.5), is(both(greaterThan(1100l)).and(lessThan(1400l))));
}
@Test
public void performAnalysisWithKieker2Logs() {
final KiekerTraceReconstructionAnalysis analysis = new KiekerTraceReconstructionAnalysis(new File("src/test/data/kieker2-logs"));
analysis.init();
this.stopWatch.start();
try {
analysis.start();
} finally {
this.stopWatch.end();
analysis.onTerminate();
}
assertEquals(17371, analysis.getNumRecords());
assertEquals(22, analysis.getNumTraces());
TraceEventRecords trace0 = analysis.getElementCollection().get(0);
assertEquals(0, trace0.getTraceMetadata().getTraceId());
TraceEventRecords trace1 = analysis.getElementCollection().get(1);
assertEquals(1, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment