Skip to content
Snippets Groups Projects
Commit 1e1c0b15 authored by Sören Henning's avatar Sören Henning
Browse files

Cleanup Hazelcast Jet implementations

parent ccbb08b3
No related branches found
No related tags found
No related merge requests found
Pipeline #10132 failed
Showing
with 71 additions and 67 deletions
...@@ -32,9 +32,6 @@ import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter; ...@@ -32,9 +32,6 @@ import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;
import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory; import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
import rocks.theodolite.benchmarks.uc1.hazelcastjet.Uc1PipelineFactory; import rocks.theodolite.benchmarks.uc1.hazelcastjet.Uc1PipelineFactory;
import static com.hazelcast.jet.pipeline.SinkBuilder.sinkBuilder;
import static com.hazelcast.logging.Logger.getLogger;
/** /**
* Test methods for the Hazelcast Jet Implementation of UC1. * Test methods for the Hazelcast Jet Implementation of UC1.
*/ */
...@@ -89,7 +86,7 @@ public class Uc1PipelineTest extends JetTestSupport { ...@@ -89,7 +86,7 @@ public class Uc1PipelineTest extends JetTestSupport {
// Create pipeline to test // Create pipeline to test
final Properties properties = new Properties(); final Properties properties = new Properties();
final Uc1PipelineFactory factory = new Uc1PipelineFactory(properties, ""); final Uc1PipelineFactory factory = new Uc1PipelineFactory(properties, "");
uc1Topology = factory.extendUc1Topology(testSource); this.uc1Topology = factory.extendUc1Topology(testSource);
this.testPipeline = factory.getPipe(); this.testPipeline = factory.getPipe();
// Create DatabaseWriter sink // Create DatabaseWriter sink
......
...@@ -11,7 +11,6 @@ import org.slf4j.Logger; ...@@ -11,7 +11,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService; import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;
import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSerializer;
/** /**
......
package rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics; package rocks.theodolite.benchmarks.uc2.hazelcastjet;
import com.google.common.math.Stats; import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator; import com.google.common.math.StatsAccumulator;
......
package rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics; package rocks.theodolite.benchmarks.uc2.hazelcastjet;
import com.google.common.math.StatsAccumulator; import com.google.common.math.StatsAccumulator;
import com.hazelcast.function.SupplierEx; import com.hazelcast.function.SupplierEx;
......
package rocks.theodolite.benchmarks.uc2.hazelcastjet;
import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import java.util.Map.Entry;
import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
/**
* Factory for creating an aggregation operator for {@link Stats} objects.
*/
public final class StatsAggregatorFactory {
/**
* Defines an AggregateOperation1 for Hazelcast Jet which is used in the Pipeline of the Hazelcast
* Jet implementation of UC2.
*
* <p>
* Takes a windowed and keyed {@code Entry<String,ActivePowerRecord>} elements and returns a
* {@link Stats} object.
* </p>
*
* @return An AggregateOperation used by Hazelcast Jet in a streaming stage which aggregates
* ActivePowerRecord Objects into Stats Objects.
*/
public static AggregateOperation1<Entry<String, ActivePowerRecord>, StatsAccumulator, Stats> // NOCS
create() {
// Aggregate Operation to Create a Stats Object from Entry<String,ActivePowerRecord> items using
// the StatsAccumulator.
return AggregateOperation
// Creates the accumulator
.withCreate(new StatsAccumulatorSupplier())
// Defines the accumulation
.<Entry<String, ActivePowerRecord>>andAccumulate((accumulator, item) -> {
accumulator.add(item.getValue().getValueInW());
})
// Defines the combination of spread out instances
.andCombine((left, right) -> {
final Stats rightStats = right.snapshot();
left.addAll(rightStats);
})
// Finishes the aggregation
.andExportFinish(StatsAccumulator::snapshot);
}
}
package rocks.theodolite.benchmarks.uc2.hazelcastjet; package rocks.theodolite.benchmarks.uc2.hazelcastjet;
import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.kafka.KafkaSinks; import com.hazelcast.jet.kafka.KafkaSinks;
import com.hazelcast.jet.kafka.KafkaSources; import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline; import com.hazelcast.jet.pipeline.Pipeline;
...@@ -13,16 +9,13 @@ import com.hazelcast.jet.pipeline.StreamStage; ...@@ -13,16 +9,13 @@ import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.WindowDefinition; import com.hazelcast.jet.pipeline.WindowDefinition;
import java.time.Duration; import java.time.Duration;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties; import java.util.Properties;
import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory; import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory;
import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSupplier;
/** /**
* PipelineFactory for use case 2. * PipelineFactory for use case 2. Allows to build and extend a pipeline.
* Allows to build and extend a pipeline.
*/ */
public class Uc2PipelineFactory extends PipelineFactory { public class Uc2PipelineFactory extends PipelineFactory {
...@@ -30,6 +23,7 @@ public class Uc2PipelineFactory extends PipelineFactory { ...@@ -30,6 +23,7 @@ public class Uc2PipelineFactory extends PipelineFactory {
/** /**
* Factory for uc2 pipelines. * Factory for uc2 pipelines.
*
* @param kafkaReadPropsForPipeline Properties Object containing the necessary kafka reads * @param kafkaReadPropsForPipeline Properties Object containing the necessary kafka reads
* attributes. * attributes.
* @param kafkaInputTopic The name of the input topic used for the pipeline. * @param kafkaInputTopic The name of the input topic used for the pipeline.
...@@ -60,17 +54,18 @@ public class Uc2PipelineFactory extends PipelineFactory { ...@@ -60,17 +54,18 @@ public class Uc2PipelineFactory extends PipelineFactory {
// Define the Kafka Source // Define the Kafka Source
final StreamSource<Map.Entry<String, ActivePowerRecord>> kafkaSource = final StreamSource<Map.Entry<String, ActivePowerRecord>> kafkaSource =
KafkaSources.<String, ActivePowerRecord>kafka(kafkaReadPropsForPipeline, kafkaInputTopic); KafkaSources.<String, ActivePowerRecord>kafka(this.kafkaReadPropsForPipeline,
this.kafkaInputTopic);
// Extend UC2 topology to the pipeline // Extend UC2 topology to the pipeline
final StreamStage<Map.Entry<String, String>> uc2TopologyProduct = final StreamStage<Map.Entry<String, String>> uc2TopologyProduct =
this.extendUc2Topology(kafkaSource); this.extendUc2Topology(kafkaSource);
// Add Sink1: Logger // Add Sink1: Logger
uc2TopologyProduct.writeTo(Sinks.logger()); uc2TopologyProduct.writeTo(Sinks.logger()); // TODO align implementations
// Add Sink2: Write back to kafka for the final benchmark // Add Sink2: Write back to kafka for the final benchmark
uc2TopologyProduct.writeTo(KafkaSinks.<String, String>kafka( uc2TopologyProduct.writeTo(KafkaSinks.<String, String>kafka(
kafkaWritePropsForPipeline, kafkaOutputTopic)); this.kafkaWritePropsForPipeline, this.kafkaOutputTopic));
return this.pipe; return this.pipe;
} }
...@@ -90,15 +85,15 @@ public class Uc2PipelineFactory extends PipelineFactory { ...@@ -90,15 +85,15 @@ public class Uc2PipelineFactory extends PipelineFactory {
* and value of the Entry object. It can be used to be further modified or directly be * and value of the Entry object. It can be used to be further modified or directly be
* written into a sink. * written into a sink.
*/ */
public StreamStage<Map.Entry<String, String>> public StreamStage<Map.Entry<String, String>> extendUc2Topology(
extendUc2Topology(final StreamSource<Map.Entry<String, ActivePowerRecord>> source) { final StreamSource<Map.Entry<String, ActivePowerRecord>> source) {
// Build the pipeline topology. // Build the pipeline topology.
return pipe.readFrom(source) return this.pipe.readFrom(source)
.withNativeTimestamps(0) .withNativeTimestamps(0)
.setLocalParallelism(1) .setLocalParallelism(1)
.groupingKey(record -> record.getValue().getIdentifier()) .groupingKey(record -> record.getValue().getIdentifier())
.window(WindowDefinition.tumbling(downsampleInterval.toMillis())) .window(WindowDefinition.tumbling(this.downsampleInterval.toMillis()))
.aggregate(this.uc2AggregateOperation()) .aggregate(StatsAggregatorFactory.create())
.map(agg -> { .map(agg -> {
final String theKey = agg.key(); final String theKey = agg.key();
final String theValue = agg.getValue().toString(); final String theValue = agg.getValue().toString();
...@@ -106,37 +101,4 @@ public class Uc2PipelineFactory extends PipelineFactory { ...@@ -106,37 +101,4 @@ public class Uc2PipelineFactory extends PipelineFactory {
}); });
} }
/**
* Defines an AggregateOperation1 for Hazelcast Jet which is used in the Pipeline of the Hazelcast
* Jet implementation of UC2.
*
* <p>
* Takes a windowed and keyed {@code Entry<String,ActivePowerRecord>} elements and returns a
* {@link Stats} object.
* </p>
*
* @return An AggregateOperation used by Hazelcast Jet in a streaming stage which aggregates
* ActivePowerRecord Objects into Stats Objects.
*/
public AggregateOperation1<Entry<String, ActivePowerRecord>,
StatsAccumulator, Stats> uc2AggregateOperation() { // NOCS
// Aggregate Operation to Create a Stats Object from Entry<String,ActivePowerRecord> items using
// the Statsaccumulator.
return AggregateOperation
// Creates the accumulator
.withCreate(new StatsAccumulatorSupplier())
// Defines the accumulation
.<Entry<String, ActivePowerRecord>>andAccumulate((accumulator, item) -> {
accumulator.add(item.getValue().getValueInW());
})
// Defines the combination of spread out instances
.andCombine((left, right) -> {
final Stats rightStats = right.snapshot();
left.addAll(rightStats);
})
// Finishes the aggregation
.andExportFinish(
StatsAccumulator::snapshot);
}
} }
...@@ -5,14 +5,12 @@ import com.hazelcast.jet.JetInstance; ...@@ -5,14 +5,12 @@ import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.config.JetConfig; import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.core.JetTestSupport; import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.pipeline.Pipeline; import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.test.AssertionCompletedException; import com.hazelcast.jet.pipeline.test.AssertionCompletedException;
import com.hazelcast.jet.pipeline.test.Assertions; import com.hazelcast.jet.pipeline.test.Assertions;
import com.hazelcast.jet.pipeline.test.TestSources; import com.hazelcast.jet.pipeline.test.TestSources;
import com.hazelcast.jet.test.SerialTest; import com.hazelcast.jet.test.SerialTest;
import java.time.Duration; import java.time.Duration;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment