Skip to content
Snippets Groups Projects
Commit 9a2a11b5 authored by Nils Christian Ehmke's avatar Nils Christian Ehmke
Browse files

Experiments

parent a7da2c1f
Branches
Tags
No related merge requests found
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package kieker.analysis.stage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import kieker.analysis.IProjectContext;
import kieker.analysis.plugin.annotation.OutputPort;
import kieker.analysis.plugin.annotation.Plugin;
import kieker.analysis.plugin.annotation.Property;
import kieker.analysis.plugin.reader.AbstractReaderPlugin;
import kieker.common.configuration.Configuration;
import kieker.common.record.misc.TimestampRecord;
@Plugin(
description = "Delivers the current (system) time in regular intervals",
outputPorts = {
@OutputPort(name = TimeReader.OUTPUT_PORT_NAME_TIMESTAMPS, eventTypes = Long.class),
@OutputPort(name = TimeReader.OUTPUT_PORT_NAME_TIMESTAMP_RECORDS, eventTypes = TimestampRecord.class)
},
configuration = {
@Property(name = TimeReader.CONFIG_PROPERTY_NAME_UPDATE_INTERVAL_NS, defaultValue = TimeReader.CONFIG_PROPERTY_VALUE_UPDATE_INTERVAL_NS,
description = "Determines the update interval in nano seconds."),
@Property(name = TimeReader.CONFIG_PROPERTY_NAME_DELAY_NS, defaultValue = TimeReader.CONFIG_PROPERTY_VALUE_DELAY_NS,
description = "Determines the initial delay in nano seconds."),
@Property(name = TimeReader.CONFIG_PROPERTY_NAME_NUMBER_IMPULSES, defaultValue = TimeReader.CONFIG_PROPERTY_VALUE_NUMBER_IMPULSES,
description = "Determines the number of impulses to emit (0 = infinite).")
})
public final class TimeReader extends AbstractReaderPlugin {
/** The name of the output port for the timestamps. */
public static final String OUTPUT_PORT_NAME_TIMESTAMPS = "timestamps";
/** The name of the output port for the timestamp records. */
public static final String OUTPUT_PORT_NAME_TIMESTAMP_RECORDS = "timestampRecords";
/** The name of the property determining the update interval in nanoseconds. */
public static final String CONFIG_PROPERTY_NAME_UPDATE_INTERVAL_NS = "updateIntervalNS";
/** The default value for the update interval (1 second). */
public static final String CONFIG_PROPERTY_VALUE_UPDATE_INTERVAL_NS = "1000000000";
/** The name of the property determining the initial delay in nanoseconds. */
public static final String CONFIG_PROPERTY_NAME_DELAY_NS = "delayNS";
/** The default value for the initial delay (0 seconds). */
public static final String CONFIG_PROPERTY_VALUE_DELAY_NS = "0";
/** The name of the property determining the number of impulses to emit. */
public static final String CONFIG_PROPERTY_NAME_NUMBER_IMPULSES = "numberImpulses";
/** The default value for number of impulses (infinite). */
public static final String CONFIG_PROPERTY_VALUE_NUMBER_IMPULSES = "0";
/** A value for the number of impulses. It makes sure that the reader emits an infinite amount of signals. */
public static final long INFINITE_EMITS = 0L;
final CountDownLatch impulseEmitLatch = new CountDownLatch(1); // NOCS NOPMD (package visible)
private volatile boolean terminated;
private final ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
private volatile ScheduledFuture<?> result;
private final long initialDelay;
private final long period;
private final long numberImpulses;
/**
* Creates a new timer using the given configuration.
*
* @param configuration
* The configuration containing the properties to initialize this timer.
* @param projectContext
* The project context.
*/
public TimeReader(final Configuration configuration, final IProjectContext projectContext) {
super(configuration, projectContext);
this.initialDelay = configuration.getLongProperty(CONFIG_PROPERTY_NAME_DELAY_NS);
this.period = configuration.getLongProperty(CONFIG_PROPERTY_NAME_UPDATE_INTERVAL_NS);
this.numberImpulses = configuration.getLongProperty(CONFIG_PROPERTY_NAME_NUMBER_IMPULSES);
}
/**
* {@inheritDoc}
*/
@Override
public void terminate(final boolean error) {
if (!this.terminated) {
this.log.info("Shutdown of TimeReader requested.");
this.executorService.shutdown();
try {
this.terminated = this.executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (final InterruptedException ex) {
// ignore
}
if (!this.terminated && (this.result != null)) {
// problems shutting down
this.result.cancel(true);
}
}
}
/**
* {@inheritDoc}
*/
@Override
public boolean read() {
this.result = this.executorService.scheduleAtFixedRate(new TimestampEventTask(this.numberImpulses), this.initialDelay, this.period, TimeUnit.NANOSECONDS);
return true;
}
@Override
public Configuration getCurrentConfiguration() {
final Configuration configuration = new Configuration();
configuration.setProperty(CONFIG_PROPERTY_NAME_DELAY_NS, Long.toString(this.initialDelay));
configuration.setProperty(CONFIG_PROPERTY_NAME_UPDATE_INTERVAL_NS, Long.toString(this.period));
configuration.setProperty(CONFIG_PROPERTY_NAME_NUMBER_IMPULSES, Long.toString(this.numberImpulses));
return configuration;
}
/**
* Sends the current system time as a new timestamp event.
*/
protected void sendTimestampEvent() {
final long timestamp = super.recordsTimeUnitFromProjectContext.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
super.deliver(OUTPUT_PORT_NAME_TIMESTAMPS, timestamp);
super.deliver(OUTPUT_PORT_NAME_TIMESTAMP_RECORDS, new TimestampRecord(timestamp));
}
/**
* A simple helper class used to send the current system time.
*
* @author Nils Christian Ehmke
*
* @since 1.8
*/
protected class TimestampEventTask implements Runnable {
private final boolean infinite;
private volatile long numberImpulses;
/**
* Creates a new task.
*
* @param numberImpulses
* 0 = infinite
*/
public TimestampEventTask(final long numberImpulses) {
this.numberImpulses = numberImpulses;
if (numberImpulses == 0) {
this.infinite = true;
} else {
this.infinite = false;
}
}
/**
* Executes the task.
*/
@Override
public void run() {
if (this.infinite || (this.numberImpulses > 0)) {
TimeReader.this.sendTimestampEvent();
if (!this.infinite && (0 == --this.numberImpulses)) { // NOPMD
TimeReader.this.impulseEmitLatch.countDown();
}
}
}
}
}
package teetime.variant.methodcallWithPorts.examples.traceReconstruction;
import java.io.File;
import java.util.LinkedList;
import java.util.List;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import kieker.analysis.AnalysisController;
import kieker.analysis.IAnalysisController;
import kieker.analysis.IProjectContext;
import kieker.analysis.exception.AnalysisConfigurationException;
import kieker.analysis.plugin.filter.AbstractFilterPlugin;
import kieker.analysis.plugin.filter.flow.EventRecordTraceReconstructionFilter;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kieker.analysis.plugin.filter.forward.AnalysisThroughputFilter;
import kieker.analysis.plugin.filter.forward.CountingFilter;
import kieker.analysis.plugin.filter.forward.StringBufferFilter;
import kieker.analysis.plugin.filter.select.TypeFilter;
import kieker.analysis.plugin.reader.AbstractReaderPlugin;
import kieker.analysis.plugin.reader.filesystem.FSReader;
import kieker.analysis.stage.CacheFilter;
import kieker.analysis.stage.CollectorSink;
import kieker.analysis.stage.TimeReader;
import kieker.common.configuration.Configuration;
import kieker.common.record.flow.IFlowRecord;
public class KiekerTraceReconstructionAnalysis extends Analysis {
private final IAnalysisController analysisController = new AnalysisController();
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private final List<Long> throughputCollection = new LinkedList<Long>();
private CountingFilter recordCounter;
private CountingFilter traceCounter;
private AnalysisThroughputFilter throughputFilter;
private final File inputDir;
public KiekerTraceReconstructionAnalysis(final File inputDir) {
this.inputDir = inputDir;
}
@Override
public void init() {
super.init();
final Configuration clockConfiguration = new Configuration();
clockConfiguration.setProperty(TimeReader.CONFIG_PROPERTY_VALUE_UPDATE_INTERVAL_NS, Integer.toString(50 * 1000 * 1000));
final AbstractReaderPlugin clock = new TimeReader(clockConfiguration, this.analysisController);
final Configuration readerConfiguration = new Configuration();
readerConfiguration.setProperty(FSReader.CONFIG_PROPERTY_NAME_INPUTDIRS, this.inputDir.getAbsolutePath());
final AbstractReaderPlugin reader = new FinalTerminationReader(readerConfiguration, this.analysisController, clock);
this.recordCounter = new CountingFilter(new Configuration(), this.analysisController);
final AbstractFilterPlugin cache = new CacheFilter(new Configuration(), this.analysisController);
final AbstractFilterPlugin stringBufferFilter = new StringBufferFilter(new Configuration(), this.analysisController);
final Configuration typeFilterConfiguration = new Configuration();
typeFilterConfiguration.setProperty(TypeFilter.CONFIG_PROPERTY_NAME_TYPES, IFlowRecord.class.getCanonicalName());
final AbstractFilterPlugin typeFilter = new TypeFilter(typeFilterConfiguration, this.analysisController);
this.throughputFilter = new AnalysisThroughputFilter(new Configuration(), this.analysisController);
final EventRecordTraceReconstructionFilter traceReconstructionFilter = new EventRecordTraceReconstructionFilter(new Configuration(), this.analysisController);
this.traceCounter = new CountingFilter(new Configuration(), this.analysisController);
final CollectorSink<TraceEventRecords> collector = new CollectorSink<TraceEventRecords>(new Configuration(), this.analysisController, this.elementCollection);
final CollectorSink<Long> throughputCollector = new CollectorSink<Long>(new Configuration(), this.analysisController, this.throughputCollection);
try {
this.analysisController.connect(reader, FSReader.OUTPUT_PORT_NAME_RECORDS, this.recordCounter, CountingFilter.INPUT_PORT_NAME_EVENTS);
this.analysisController.connect(this.recordCounter, CountingFilter.OUTPUT_PORT_NAME_RELAYED_EVENTS, cache, CacheFilter.INPUT_PORT_NAME);
this.analysisController.connect(cache, CacheFilter.OUTPUT_PORT_NAME, stringBufferFilter, StringBufferFilter.INPUT_PORT_NAME_EVENTS);
this.analysisController.connect(stringBufferFilter, StringBufferFilter.OUTPUT_PORT_NAME_RELAYED_EVENTS, typeFilter, TypeFilter.INPUT_PORT_NAME_EVENTS);
this.analysisController.connect(typeFilter, TypeFilter.OUTPUT_PORT_NAME_TYPE_MATCH, this.throughputFilter,
AnalysisThroughputFilter.INPUT_PORT_NAME_OBJECTS);
this.analysisController.connect(this.throughputFilter, AnalysisThroughputFilter.OUTPUT_PORT_NAME_RELAYED_OBJECTS, traceReconstructionFilter,
EventRecordTraceReconstructionFilter.INPUT_PORT_NAME_TRACE_RECORDS);
this.analysisController.connect(traceReconstructionFilter, EventRecordTraceReconstructionFilter.OUTPUT_PORT_NAME_TRACE_VALID, this.traceCounter,
CountingFilter.INPUT_PORT_NAME_EVENTS);
this.analysisController.connect(traceReconstructionFilter, EventRecordTraceReconstructionFilter.OUTPUT_PORT_NAME_TRACE_INVALID, this.traceCounter,
CountingFilter.INPUT_PORT_NAME_EVENTS);
this.analysisController.connect(this.traceCounter, CountingFilter.OUTPUT_PORT_NAME_RELAYED_EVENTS, collector, CollectorSink.INPUT_PORT_NAME);
this.analysisController.connect(this.throughputFilter, AnalysisThroughputFilter.OUTPUT_PORT_NAME_THROUGHPUT, throughputCollector,
CollectorSink.INPUT_PORT_NAME);
this.analysisController.connect(clock, TimeReader.OUTPUT_PORT_NAME_TIMESTAMPS, this.throughputFilter, AnalysisThroughputFilter.INPUT_PORT_NAME_TIME);
} catch (IllegalStateException e) {
e.printStackTrace();
} catch (AnalysisConfigurationException e) {
e.printStackTrace();
}
}
@Override
public void start() {
super.start();
try {
this.analysisController.run();
} catch (IllegalStateException e) {
e.printStackTrace();
} catch (AnalysisConfigurationException e) {
e.printStackTrace();
}
}
public List<TraceEventRecords> getElementCollection() {
return this.elementCollection;
}
public long getNumRecords() {
return this.recordCounter.getMessageCount();
}
public long getNumTraces() {
return this.traceCounter.getMessageCount();
}
public List<Long> getThroughputs() {
return this.throughputCollection;
}
private static class FinalTerminationReader extends FSReader {
private final AbstractReaderPlugin clock;
public FinalTerminationReader(final Configuration configuration, final IProjectContext projectContext, final AbstractReaderPlugin clock) {
super(configuration, projectContext);
this.clock = clock;
}
@Override
public boolean read() {
final boolean result = super.read();
this.clock.terminate(result);
return result;
}
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.examples.traceReconstruction;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.io.File;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
/**
* @author Nils Christian Ehmke
*/
public class NieWorkKiekerTraceReconstructionAnalysisTest {
private StopWatch stopWatch;
@Before
public void before() {
this.stopWatch = new StopWatch();
}
@After
public void after() {
long overallDurationInNs = this.stopWatch.getDurationInNs();
System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms");
}
@Test
public void performAnalysisWithEprintsLogs() {
final KiekerTraceReconstructionAnalysis analysis = new KiekerTraceReconstructionAnalysis(new File("src/test/data/Eprints-logs"));
analysis.init();
this.stopWatch.start();
try {
analysis.start();
} finally {
this.stopWatch.end();
analysis.onTerminate();
}
assertEquals(50002, analysis.getNumRecords());
assertEquals(2, analysis.getNumTraces());
TraceEventRecords trace6884 = analysis.getElementCollection().get(0);
assertEquals(6884, trace6884.getTraceMetadata().getTraceId());
TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
}
@Test
public void performAnalysisWithKiekerLogs() {
final KiekerTraceReconstructionAnalysis analysis = new KiekerTraceReconstructionAnalysis(new File("src/test/data/kieker-logs"));
analysis.init();
this.stopWatch.start();
try {
analysis.start();
} finally {
this.stopWatch.end();
analysis.onTerminate();
}
assertEquals(1489901, analysis.getNumRecords());
assertEquals(24013, analysis.getNumTraces());
TraceEventRecords trace0 = analysis.getElementCollection().get(0);
assertEquals(8974347286117089280l, trace0.getTraceMetadata().getTraceId());
TraceEventRecords trace1 = analysis.getElementCollection().get(1);
assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
assertThat(quintiles.get(0.5), is(both(greaterThan(1100l)).and(lessThan(1400l))));
}
@Test
public void performAnalysisWithKieker2Logs() {
final KiekerTraceReconstructionAnalysis analysis = new KiekerTraceReconstructionAnalysis(new File("src/test/data/kieker2-logs"));
analysis.init();
this.stopWatch.start();
try {
analysis.start();
} finally {
this.stopWatch.end();
analysis.onTerminate();
}
assertEquals(17371, analysis.getNumRecords());
assertEquals(22, analysis.getNumTraces());
TraceEventRecords trace0 = analysis.getElementCollection().get(0);
assertEquals(0, trace0.getTraceMetadata().getTraceId());
TraceEventRecords trace1 = analysis.getElementCollection().get(1);
assertEquals(1, trace1.getTraceMetadata().getTraceId());
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms");
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment