From 9a2a11b5733cf96911ee4673836328b9435794e3 Mon Sep 17 00:00:00 2001 From: Nils Christian Ehmke <nie@informatik.uni-kiel.de> Date: Fri, 27 Jun 2014 14:12:30 +0200 Subject: [PATCH] Experiments --- .../kieker/analysis/stage/TimeReader.java | 185 ++++++++++++++++++ .../KiekerTraceReconstructionAnalysis.java | 152 ++++++++++++++ ...KiekerTraceReconstructionAnalysisTest.java | 136 +++++++++++++ 3 files changed, 473 insertions(+) create mode 100644 src/main/java/kieker/analysis/stage/TimeReader.java create mode 100644 src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/KiekerTraceReconstructionAnalysis.java create mode 100644 src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/NieWorkKiekerTraceReconstructionAnalysisTest.java diff --git a/src/main/java/kieker/analysis/stage/TimeReader.java b/src/main/java/kieker/analysis/stage/TimeReader.java new file mode 100644 index 0000000..30ec2b5 --- /dev/null +++ b/src/main/java/kieker/analysis/stage/TimeReader.java @@ -0,0 +1,185 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package kieker.analysis.stage; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import kieker.analysis.IProjectContext; +import kieker.analysis.plugin.annotation.OutputPort; +import kieker.analysis.plugin.annotation.Plugin; +import kieker.analysis.plugin.annotation.Property; +import kieker.analysis.plugin.reader.AbstractReaderPlugin; +import kieker.common.configuration.Configuration; +import kieker.common.record.misc.TimestampRecord; + +@Plugin( + description = "Delivers the current (system) time in regular intervals", + outputPorts = { + @OutputPort(name = TimeReader.OUTPUT_PORT_NAME_TIMESTAMPS, eventTypes = Long.class), + @OutputPort(name = TimeReader.OUTPUT_PORT_NAME_TIMESTAMP_RECORDS, eventTypes = TimestampRecord.class) + }, + configuration = { + @Property(name = TimeReader.CONFIG_PROPERTY_NAME_UPDATE_INTERVAL_NS, defaultValue = TimeReader.CONFIG_PROPERTY_VALUE_UPDATE_INTERVAL_NS, + description = "Determines the update interval in nano seconds."), + @Property(name = TimeReader.CONFIG_PROPERTY_NAME_DELAY_NS, defaultValue = TimeReader.CONFIG_PROPERTY_VALUE_DELAY_NS, + description = "Determines the initial delay in nano seconds."), + @Property(name = TimeReader.CONFIG_PROPERTY_NAME_NUMBER_IMPULSES, defaultValue = TimeReader.CONFIG_PROPERTY_VALUE_NUMBER_IMPULSES, + description = "Determines the number of impulses to emit (0 = infinite).") + }) +public final class TimeReader extends AbstractReaderPlugin { + + /** The name of the output port for the timestamps. */ + public static final String OUTPUT_PORT_NAME_TIMESTAMPS = "timestamps"; + /** The name of the output port for the timestamp records. */ + public static final String OUTPUT_PORT_NAME_TIMESTAMP_RECORDS = "timestampRecords"; + + /** The name of the property determining the update interval in nanoseconds. */ + public static final String CONFIG_PROPERTY_NAME_UPDATE_INTERVAL_NS = "updateIntervalNS"; + /** The default value for the update interval (1 second). */ + public static final String CONFIG_PROPERTY_VALUE_UPDATE_INTERVAL_NS = "1000000000"; + + /** The name of the property determining the initial delay in nanoseconds. */ + public static final String CONFIG_PROPERTY_NAME_DELAY_NS = "delayNS"; + /** The default value for the initial delay (0 seconds). */ + public static final String CONFIG_PROPERTY_VALUE_DELAY_NS = "0"; + + /** The name of the property determining the number of impulses to emit. */ + public static final String CONFIG_PROPERTY_NAME_NUMBER_IMPULSES = "numberImpulses"; + /** The default value for number of impulses (infinite). */ + public static final String CONFIG_PROPERTY_VALUE_NUMBER_IMPULSES = "0"; + + /** A value for the number of impulses. It makes sure that the reader emits an infinite amount of signals. */ + public static final long INFINITE_EMITS = 0L; + + final CountDownLatch impulseEmitLatch = new CountDownLatch(1); // NOCS NOPMD (package visible) + + private volatile boolean terminated; + + private final ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1); + private volatile ScheduledFuture<?> result; + + private final long initialDelay; + private final long period; + private final long numberImpulses; + + /** + * Creates a new timer using the given configuration. + * + * @param configuration + * The configuration containing the properties to initialize this timer. + * @param projectContext + * The project context. + */ + public TimeReader(final Configuration configuration, final IProjectContext projectContext) { + super(configuration, projectContext); + this.initialDelay = configuration.getLongProperty(CONFIG_PROPERTY_NAME_DELAY_NS); + this.period = configuration.getLongProperty(CONFIG_PROPERTY_NAME_UPDATE_INTERVAL_NS); + this.numberImpulses = configuration.getLongProperty(CONFIG_PROPERTY_NAME_NUMBER_IMPULSES); + } + + /** + * {@inheritDoc} + */ + @Override + public void terminate(final boolean error) { + if (!this.terminated) { + this.log.info("Shutdown of TimeReader requested."); + this.executorService.shutdown(); + try { + this.terminated = this.executorService.awaitTermination(5, TimeUnit.SECONDS); + } catch (final InterruptedException ex) { + // ignore + } + if (!this.terminated && (this.result != null)) { + // problems shutting down + this.result.cancel(true); + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean read() { + this.result = this.executorService.scheduleAtFixedRate(new TimestampEventTask(this.numberImpulses), this.initialDelay, this.period, TimeUnit.NANOSECONDS); + + return true; + } + + @Override + public Configuration getCurrentConfiguration() { + final Configuration configuration = new Configuration(); + configuration.setProperty(CONFIG_PROPERTY_NAME_DELAY_NS, Long.toString(this.initialDelay)); + configuration.setProperty(CONFIG_PROPERTY_NAME_UPDATE_INTERVAL_NS, Long.toString(this.period)); + configuration.setProperty(CONFIG_PROPERTY_NAME_NUMBER_IMPULSES, Long.toString(this.numberImpulses)); + return configuration; + } + + /** + * Sends the current system time as a new timestamp event. + */ + protected void sendTimestampEvent() { + final long timestamp = super.recordsTimeUnitFromProjectContext.convert(System.nanoTime(), TimeUnit.NANOSECONDS); + super.deliver(OUTPUT_PORT_NAME_TIMESTAMPS, timestamp); + super.deliver(OUTPUT_PORT_NAME_TIMESTAMP_RECORDS, new TimestampRecord(timestamp)); + } + + /** + * A simple helper class used to send the current system time. + * + * @author Nils Christian Ehmke + * + * @since 1.8 + */ + protected class TimestampEventTask implements Runnable { + private final boolean infinite; + private volatile long numberImpulses; + + /** + * Creates a new task. + * + * @param numberImpulses + * 0 = infinite + */ + public TimestampEventTask(final long numberImpulses) { + this.numberImpulses = numberImpulses; + if (numberImpulses == 0) { + this.infinite = true; + } else { + this.infinite = false; + } + } + + /** + * Executes the task. + */ + @Override + public void run() { + if (this.infinite || (this.numberImpulses > 0)) { + TimeReader.this.sendTimestampEvent(); + if (!this.infinite && (0 == --this.numberImpulses)) { // NOPMD + TimeReader.this.impulseEmitLatch.countDown(); + } + } + } + } +} diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/KiekerTraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/KiekerTraceReconstructionAnalysis.java new file mode 100644 index 0000000..8bd6ea9 --- /dev/null +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/KiekerTraceReconstructionAnalysis.java @@ -0,0 +1,152 @@ +package teetime.variant.methodcallWithPorts.examples.traceReconstruction; + +import java.io.File; +import java.util.LinkedList; +import java.util.List; + +import teetime.variant.explicitScheduling.framework.core.Analysis; + +import kieker.analysis.AnalysisController; +import kieker.analysis.IAnalysisController; +import kieker.analysis.IProjectContext; +import kieker.analysis.exception.AnalysisConfigurationException; +import kieker.analysis.plugin.filter.AbstractFilterPlugin; +import kieker.analysis.plugin.filter.flow.EventRecordTraceReconstructionFilter; +import kieker.analysis.plugin.filter.flow.TraceEventRecords; +import kieker.analysis.plugin.filter.forward.AnalysisThroughputFilter; +import kieker.analysis.plugin.filter.forward.CountingFilter; +import kieker.analysis.plugin.filter.forward.StringBufferFilter; +import kieker.analysis.plugin.filter.select.TypeFilter; +import kieker.analysis.plugin.reader.AbstractReaderPlugin; +import kieker.analysis.plugin.reader.filesystem.FSReader; +import kieker.analysis.stage.CacheFilter; +import kieker.analysis.stage.CollectorSink; +import kieker.analysis.stage.TimeReader; +import kieker.common.configuration.Configuration; +import kieker.common.record.flow.IFlowRecord; + +public class KiekerTraceReconstructionAnalysis extends Analysis { + + private final IAnalysisController analysisController = new AnalysisController(); + + private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); + private final List<Long> throughputCollection = new LinkedList<Long>(); + + private CountingFilter recordCounter; + private CountingFilter traceCounter; + private AnalysisThroughputFilter throughputFilter; + + private final File inputDir; + + public KiekerTraceReconstructionAnalysis(final File inputDir) { + this.inputDir = inputDir; + } + + @Override + public void init() { + super.init(); + + final Configuration clockConfiguration = new Configuration(); + clockConfiguration.setProperty(TimeReader.CONFIG_PROPERTY_VALUE_UPDATE_INTERVAL_NS, Integer.toString(50 * 1000 * 1000)); + final AbstractReaderPlugin clock = new TimeReader(clockConfiguration, this.analysisController); + + final Configuration readerConfiguration = new Configuration(); + readerConfiguration.setProperty(FSReader.CONFIG_PROPERTY_NAME_INPUTDIRS, this.inputDir.getAbsolutePath()); + final AbstractReaderPlugin reader = new FinalTerminationReader(readerConfiguration, this.analysisController, clock); + + this.recordCounter = new CountingFilter(new Configuration(), this.analysisController); + + final AbstractFilterPlugin cache = new CacheFilter(new Configuration(), this.analysisController); + + final AbstractFilterPlugin stringBufferFilter = new StringBufferFilter(new Configuration(), this.analysisController); + + final Configuration typeFilterConfiguration = new Configuration(); + typeFilterConfiguration.setProperty(TypeFilter.CONFIG_PROPERTY_NAME_TYPES, IFlowRecord.class.getCanonicalName()); + final AbstractFilterPlugin typeFilter = new TypeFilter(typeFilterConfiguration, this.analysisController); + + this.throughputFilter = new AnalysisThroughputFilter(new Configuration(), this.analysisController); + final EventRecordTraceReconstructionFilter traceReconstructionFilter = new EventRecordTraceReconstructionFilter(new Configuration(), this.analysisController); + this.traceCounter = new CountingFilter(new Configuration(), this.analysisController); + final CollectorSink<TraceEventRecords> collector = new CollectorSink<TraceEventRecords>(new Configuration(), this.analysisController, this.elementCollection); + + final CollectorSink<Long> throughputCollector = new CollectorSink<Long>(new Configuration(), this.analysisController, this.throughputCollection); + + try { + this.analysisController.connect(reader, FSReader.OUTPUT_PORT_NAME_RECORDS, this.recordCounter, CountingFilter.INPUT_PORT_NAME_EVENTS); + this.analysisController.connect(this.recordCounter, CountingFilter.OUTPUT_PORT_NAME_RELAYED_EVENTS, cache, CacheFilter.INPUT_PORT_NAME); + + this.analysisController.connect(cache, CacheFilter.OUTPUT_PORT_NAME, stringBufferFilter, StringBufferFilter.INPUT_PORT_NAME_EVENTS); + + this.analysisController.connect(stringBufferFilter, StringBufferFilter.OUTPUT_PORT_NAME_RELAYED_EVENTS, typeFilter, TypeFilter.INPUT_PORT_NAME_EVENTS); + this.analysisController.connect(typeFilter, TypeFilter.OUTPUT_PORT_NAME_TYPE_MATCH, this.throughputFilter, + AnalysisThroughputFilter.INPUT_PORT_NAME_OBJECTS); + this.analysisController.connect(this.throughputFilter, AnalysisThroughputFilter.OUTPUT_PORT_NAME_RELAYED_OBJECTS, traceReconstructionFilter, + EventRecordTraceReconstructionFilter.INPUT_PORT_NAME_TRACE_RECORDS); + this.analysisController.connect(traceReconstructionFilter, EventRecordTraceReconstructionFilter.OUTPUT_PORT_NAME_TRACE_VALID, this.traceCounter, + CountingFilter.INPUT_PORT_NAME_EVENTS); + this.analysisController.connect(traceReconstructionFilter, EventRecordTraceReconstructionFilter.OUTPUT_PORT_NAME_TRACE_INVALID, this.traceCounter, + CountingFilter.INPUT_PORT_NAME_EVENTS); + this.analysisController.connect(this.traceCounter, CountingFilter.OUTPUT_PORT_NAME_RELAYED_EVENTS, collector, CollectorSink.INPUT_PORT_NAME); + this.analysisController.connect(this.throughputFilter, AnalysisThroughputFilter.OUTPUT_PORT_NAME_THROUGHPUT, throughputCollector, + CollectorSink.INPUT_PORT_NAME); + + this.analysisController.connect(clock, TimeReader.OUTPUT_PORT_NAME_TIMESTAMPS, this.throughputFilter, AnalysisThroughputFilter.INPUT_PORT_NAME_TIME); + } catch (IllegalStateException e) { + e.printStackTrace(); + } catch (AnalysisConfigurationException e) { + e.printStackTrace(); + } + + } + + @Override + public void start() { + super.start(); + + try { + this.analysisController.run(); + } catch (IllegalStateException e) { + e.printStackTrace(); + } catch (AnalysisConfigurationException e) { + e.printStackTrace(); + } + } + + public List<TraceEventRecords> getElementCollection() { + return this.elementCollection; + } + + public long getNumRecords() { + return this.recordCounter.getMessageCount(); + } + + public long getNumTraces() { + return this.traceCounter.getMessageCount(); + } + + public List<Long> getThroughputs() { + return this.throughputCollection; + } + + private static class FinalTerminationReader extends FSReader { + + private final AbstractReaderPlugin clock; + + public FinalTerminationReader(final Configuration configuration, final IProjectContext projectContext, final AbstractReaderPlugin clock) { + super(configuration, projectContext); + + this.clock = clock; + } + + @Override + public boolean read() { + final boolean result = super.read(); + + this.clock.terminate(result); + + return result; + } + + } + +} diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/NieWorkKiekerTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/NieWorkKiekerTraceReconstructionAnalysisTest.java new file mode 100644 index 0000000..4a9f9fc --- /dev/null +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/NieWorkKiekerTraceReconstructionAnalysisTest.java @@ -0,0 +1,136 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.examples.traceReconstruction; + +import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import java.io.File; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import teetime.util.StatisticsUtil; +import teetime.util.StopWatch; + +import kieker.analysis.plugin.filter.flow.TraceEventRecords; + +/** + * @author Nils Christian Ehmke + */ +public class NieWorkKiekerTraceReconstructionAnalysisTest { + + private StopWatch stopWatch; + + @Before + public void before() { + this.stopWatch = new StopWatch(); + } + + @After + public void after() { + long overallDurationInNs = this.stopWatch.getDurationInNs(); + System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms"); + } + + @Test + public void performAnalysisWithEprintsLogs() { + final KiekerTraceReconstructionAnalysis analysis = new KiekerTraceReconstructionAnalysis(new File("src/test/data/Eprints-logs")); + analysis.init(); + + this.stopWatch.start(); + try { + analysis.start(); + } finally { + this.stopWatch.end(); + analysis.onTerminate(); + } + + assertEquals(50002, analysis.getNumRecords()); + assertEquals(2, analysis.getNumTraces()); + + TraceEventRecords trace6884 = analysis.getElementCollection().get(0); + assertEquals(6884, trace6884.getTraceMetadata().getTraceId()); + + TraceEventRecords trace6886 = analysis.getElementCollection().get(1); + assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); + + Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); + System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms"); + } + + @Test + public void performAnalysisWithKiekerLogs() { + final KiekerTraceReconstructionAnalysis analysis = new KiekerTraceReconstructionAnalysis(new File("src/test/data/kieker-logs")); + analysis.init(); + + this.stopWatch.start(); + try { + analysis.start(); + } finally { + this.stopWatch.end(); + analysis.onTerminate(); + } + + assertEquals(1489901, analysis.getNumRecords()); + assertEquals(24013, analysis.getNumTraces()); + + TraceEventRecords trace0 = analysis.getElementCollection().get(0); + assertEquals(8974347286117089280l, trace0.getTraceMetadata().getTraceId()); + + TraceEventRecords trace1 = analysis.getElementCollection().get(1); + assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId()); + + Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); + System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms"); + + assertThat(quintiles.get(0.5), is(both(greaterThan(1100l)).and(lessThan(1400l)))); + } + + @Test + public void performAnalysisWithKieker2Logs() { + final KiekerTraceReconstructionAnalysis analysis = new KiekerTraceReconstructionAnalysis(new File("src/test/data/kieker2-logs")); + analysis.init(); + + this.stopWatch.start(); + try { + analysis.start(); + } finally { + this.stopWatch.end(); + analysis.onTerminate(); + } + + assertEquals(17371, analysis.getNumRecords()); + assertEquals(22, analysis.getNumTraces()); + + TraceEventRecords trace0 = analysis.getElementCollection().get(0); + assertEquals(0, trace0.getTraceMetadata().getTraceId()); + + TraceEventRecords trace1 = analysis.getElementCollection().get(1); + assertEquals(1, trace1.getTraceMetadata().getTraceId()); + + Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); + System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms"); + } + +} -- GitLab