Commit ce0daa37 authored by Sören Henning's avatar Sören Henning
Merge branch 'master' into issue-259

parents 9531bbaf f849ef81
Merge request !215: Redesign Strategy, Load, and Resources data types
Pipeline #8465 passed
Showing 1564 additions and 3 deletions
package rocks.theodolite.benchmarks.uc2.hazelcastjet;
import com.google.common.math.StatsAccumulator;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
import rocks.theodolite.benchmarks.commons.hazelcastjet.JetInstanceBuilder;
import rocks.theodolite.benchmarks.commons.hazelcastjet.KafkaPropertiesBuilder;
import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSerializer;
/**
* A Hazelcast Jet factory which can build a Hazelcast Jet Instance and Pipeline for the UC2
* benchmark and lets you start the Hazelcast Jet job. The JetInstance can be built directly as the
* Hazelcast Config is managed internally. In order to build the Pipeline, you first have to build
* the Read and Write Properties, set the input and output topic, and set the downsample interval
* which can be done using the methods of this factory. Outside data is only used for custom
* values or as default values in case data from the environment cannot be fetched.
*/
public class Uc2HazelcastJetFactory {
// Information per History Service
private Properties kafkaReadPropsForPipeline;
private Properties kafkaWritePropsForPipeline;
private String kafkaInputTopic;
private String kafkaOutputTopic;
private JetInstance uc2JetInstance;
private Pipeline uc2JetPipeline;
// UC2 specific
private int downsampleInterval;
/////////////////////////////////////
// Layer 1 - Hazelcast Jet Run Job //
/////////////////////////////////////
/**
* Needs a JetInstance and Pipeline defined in this factory. Adds the pipeline to the existing
* JetInstance as a job.
*
* @param jobName The name of the job.
*/
public void runUc2Job(final String jobName) {
// Check if a Jet Instance for UC2 is set.
if (this.uc2JetInstance == null) {
throw new IllegalStateException("Jet Instance is not set! "
+ "Cannot start a hazelcast jet job for UC2.");
}
// Check if a Pipeline for UC2 is set.
if (this.uc2JetPipeline == null) {
throw new IllegalStateException(
"Hazelcast Pipeline is not set! Cannot start a hazelcast jet job for UC2.");
}
// Registers the serializer, sets the job name, and submits the job to the JetInstance
// defined in this factory
final JobConfig jobConfig = new JobConfig();
jobConfig.registerSerializer(StatsAccumulator.class, StatsAccumulatorSerializer.class);
jobConfig.setName(jobName);
this.uc2JetInstance.newJobIfAbsent(this.uc2JetPipeline, jobConfig).join();
}
/////////////
// Layer 2 //
/////////////
/**
* Builds a Hazelcast JetInstance used to run jobs on.
*
* @param logger The logger specified for this JetInstance.
* @param bootstrapServerDefault Default bootstrap server in case no value can be derived from the
* environment.
* @param hzKubernetesServiceDnsKey The kubernetes service dns key.
* @return A Uc2HazelcastJetFactory containing a set JetInstance.
*/
public Uc2HazelcastJetFactory buildUc2JetInstanceFromEnv(final Logger logger,
final String bootstrapServerDefault,
final String hzKubernetesServiceDnsKey) {
this.uc2JetInstance = new JetInstanceBuilder()
.setConfigFromEnv(logger, bootstrapServerDefault, hzKubernetesServiceDnsKey)
.build();
return this;
}
/**
* Builds a Hazelcast Jet pipeline that a JetInstance can run as a job. Needs the input
* topic and kafka properties defined in this factory beforehand.
*
* @return A Uc2HazelcastJetFactory containing a set pipeline.
* @throws IllegalStateException If the input topic or the kafka properties are not defined, the
*         pipeline cannot be built.
*/
public Uc2HazelcastJetFactory buildUc2Pipeline() throws IllegalStateException { // NOPMD
final String defaultPipelineWarning = "Cannot build pipeline."; // NOPMD
// Check if Properties for the Kafka Input are set.
if (this.kafkaReadPropsForPipeline == null) {
throw new IllegalStateException("Kafka Read Properties for pipeline not set! "
+ defaultPipelineWarning);
}
// Check if Properties for the Kafka Output are set.
if (this.kafkaWritePropsForPipeline == null) {
throw new IllegalStateException("Kafka Write Properties for pipeline not set! "
+ defaultPipelineWarning);
}
// Check if the Kafka input topic is set.
if (this.kafkaInputTopic == null) {
throw new IllegalStateException("Kafka input topic for pipeline not set! "
+ defaultPipelineWarning);
}
// Check if the Kafka output topic is set.
if (this.kafkaOutputTopic == null) {
throw new IllegalStateException("kafka output topic for pipeline not set! "
+ defaultPipelineWarning);
}
// Check if the downsampleInterval (tumbling window time) is set.
if (this.downsampleInterval <= 0) {
throw new IllegalStateException(
"downsample interval for pipeline not set or not bigger than 0! "
+ defaultPipelineWarning);
}
// Build Pipeline Using the pipelineBuilder
final Uc2PipelineBuilder pipeBuilder = new Uc2PipelineBuilder();
this.uc2JetPipeline =
pipeBuilder.build(this.kafkaReadPropsForPipeline, this.kafkaWritePropsForPipeline,
this.kafkaInputTopic, this.kafkaOutputTopic, this.downsampleInterval);
// Return this Uc2HazelcastJetFactory
return this;
}
/////////////
// Layer 3 //
/////////////
/**
* Sets kafka read properties for pipeline used in this builder.
*
* @param kafkaReadProperties A Properties object containing the necessary values for the
*        Hazelcast Jet Kafka connection to read data.
* @return The Uc2HazelcastJetFactory with set kafkaReadPropsForPipeline.
*/
public Uc2HazelcastJetFactory setCustomReadProperties(// NOPMD
final Properties kafkaReadProperties) {
this.kafkaReadPropsForPipeline = kafkaReadProperties;
return this;
}
/**
* Sets kafka write properties for pipeline used in this builder.
*
* @param kafkaWriteProperties A Properties object containing the necessary values for the
*        Hazelcast Jet Kafka connection to write data.
* @return The Uc2HazelcastJetFactory with set kafkaWritePropsForPipeline.
*/
public Uc2HazelcastJetFactory setCustomWriteProperties(// NOPMD
final Properties kafkaWriteProperties) {
this.kafkaWritePropsForPipeline = kafkaWriteProperties;
return this;
}
/**
* Sets kafka read properties for pipeline used in this builder using environment variables.
*
* @param bootstrapServersDefault Default Bootstrap server in the case that no bootstrap server
* can be fetched from the environment.
* @param schemaRegistryUrlDefault Default schema registry url in the case that no schema registry
* url can be fetched from the environment.
* @param jobName The name of the job, used to build the Kafka read properties.
* @return The Uc2HazelcastJetFactory with set kafkaReadPropsForPipeline.
*/
public Uc2HazelcastJetFactory setReadPropertiesFromEnv(// NOPMD
final String bootstrapServersDefault,
final String schemaRegistryUrlDefault,
final String jobName) {
// Use KafkaPropertiesBuilder to build a properties object used for kafka
final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder();
final Properties kafkaReadProps =
propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault,
schemaRegistryUrlDefault,
jobName,
StringDeserializer.class.getCanonicalName(),
KafkaAvroDeserializer.class.getCanonicalName());
this.kafkaReadPropsForPipeline = kafkaReadProps;
return this;
}
/**
* Sets kafka write properties for pipeline used in this builder using environment variables.
*
* @param bootstrapServersDefault Default Bootstrap server in the case that no bootstrap server
* can be fetched from the environment.
* @param schemaRegistryUrlDefault Default schema registry url in the case that no schema
*        registry url can be fetched from the environment.
* @return The Uc2HazelcastJetFactory with set kafkaWritePropsForPipeline.
*/
public Uc2HazelcastJetFactory setWritePropertiesFromEnv(// NOPMD
final String bootstrapServersDefault, final String schemaRegistryUrlDefault) {
// Use KafkaPropertiesBuilder to build a properties object used for kafka
final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder();
final Properties kafkaWriteProps =
propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault,
schemaRegistryUrlDefault,
StringSerializer.class.getCanonicalName(),
StringSerializer.class.getCanonicalName());
this.kafkaWritePropsForPipeline = kafkaWriteProps;
return this;
}
/**
* Sets the kafka input topic for the pipeline used in this builder.
*
* @param inputTopic The kafka topic used as the pipeline input.
* @return A Uc2HazelcastJetFactory with a set kafkaInputTopic.
*/
public Uc2HazelcastJetFactory setCustomKafkaInputTopic(// NOPMD
final String inputTopic) {
this.kafkaInputTopic = inputTopic;
return this;
}
/**
* Sets the kafka output topic for the pipeline used in this builder.
*
* @param outputTopic The kafka topic used as the pipeline output.
* @return A Uc2HazelcastJetFactory with a set kafkaOutputTopic.
*/
public Uc2HazelcastJetFactory setCustomKafkaOutputTopic(final String outputTopic) { // NOPMD
this.kafkaOutputTopic = outputTopic;
return this;
}
/**
* Sets the kafka input topic for the pipeline used in this builder using environment variables.
*
* @param defaultInputTopic The default kafka input topic used if no topic is specified by the
* environment.
* @return A Uc2HazelcastJetFactory with a set kafkaInputTopic.
*/
public Uc2HazelcastJetFactory setKafkaInputTopicFromEnv(// NOPMD
final String defaultInputTopic) {
this.kafkaInputTopic = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC),
defaultInputTopic);
return this;
}
/**
* Sets the kafka output topic for the pipeline used in this builder using environment variables.
*
* @param defaultOutputTopic The default kafka output topic used if no topic is specified by the
* environment.
* @return A Uc2HazelcastJetFactory with a set kafkaOutputTopic.
*/
public Uc2HazelcastJetFactory setKafkaOutputTopicFromEnv(// NOPMD
final String defaultOutputTopic) {
this.kafkaOutputTopic = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_OUTPUT_TOPIC),
defaultOutputTopic);
return this;
}
/**
* Sets the downsample interval for the pipeline used in this builder.
*
* @param downsampleInterval the downsample interval to be used for this pipeline.
* @return A Uc2HazelcastJetFactory with a set downsampleInterval.
*/
public Uc2HazelcastJetFactory setCustomDownsampleInterval(// NOPMD
final int downsampleInterval) {
this.downsampleInterval = downsampleInterval;
return this;
}
/**
* Sets the downsample interval for the pipeline used in this builder from the environment.
*
* @param defaultDownsampleInterval the default downsample interval to be used for this pipeline
* when none is set in the environment.
* @return A Uc2HazelcastJetFactory with a set downsampleInterval.
*/
public Uc2HazelcastJetFactory setDownsampleIntervalFromEnv(// NOPMD
final String defaultDownsampleInterval) {
final String downsampleInterval = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.DOWNSAMPLE_INTERVAL),
defaultDownsampleInterval);
final int downsampleIntervalNumber = Integer.parseInt(downsampleInterval);
this.downsampleInterval = downsampleIntervalNumber;
return this;
}
}
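For orientation, the three layers above are meant to be chained bottom-up: set properties and topics (layer 3), build the pipeline and instance (layer 2), then run the job (layer 1). Below is a minimal usage sketch of this factory, mirroring the UC3 HistoryService included later in this commit; the default values, the job name, and the wrapper class are hypothetical placeholders, not part of this commit.

package rocks.theodolite.benchmarks.uc2.hazelcastjet;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical usage sketch of Uc2HazelcastJetFactory; defaults and job name
// are placeholders chosen by analogy with the UC3 HistoryService below.
public final class Uc2FactoryUsageSketch {
  private static final Logger LOGGER = LoggerFactory.getLogger(Uc2FactoryUsageSketch.class);

  public static void main(final String[] args) {
    new Uc2HazelcastJetFactory()
        .setReadPropertiesFromEnv("localhost:9092", "http://localhost:8081", "uc2-hazelcastjet")
        .setWritePropertiesFromEnv("localhost:9092", "http://localhost:8081")
        .setKafkaInputTopicFromEnv("input")
        .setKafkaOutputTopicFromEnv("output")
        .setDownsampleIntervalFromEnv("5000")
        .buildUc2Pipeline()
        .buildUc2JetInstanceFromEnv(LOGGER, "localhost:5701", "service-dns")
        .runUc2Job("uc2-hazelcastjet");
  }
}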
package rocks.theodolite.benchmarks.uc2.hazelcastjet;
import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.kafka.KafkaSinks;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.WindowDefinition;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSupplier;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Builder to build a Hazelcast Jet Pipeline for UC2 which can be used for stream processing using
* Hazelcast Jet.
*/
public class Uc2PipelineBuilder {
/**
* Builds a pipeline which can be used for stream processing using Hazelcast Jet.
*
* @param kafkaReadPropsForPipeline Properties Object containing the necessary kafka read
* attributes.
* @param kafkaWritePropsForPipeline Properties Object containing the necessary kafka write
* attributes.
* @param kafkaInputTopic The name of the input topic used for the pipeline.
* @param kafkaOutputTopic The name of the output topic used for the pipeline.
* @param downsampleIntervalInMs The window length of the tumbling window used in the aggregation
* of this pipeline.
* @return A Pipeline which can be used in a Hazelcast Jet Instance to process data for UC2.
*/
public Pipeline build(final Properties kafkaReadPropsForPipeline,
final Properties kafkaWritePropsForPipeline, final String kafkaInputTopic,
final String kafkaOutputTopic,
final int downsampleIntervalInMs) {
// Define a new pipeline
final Pipeline pipe = Pipeline.create();
// Define the Kafka Source
final StreamSource<Entry<String, ActivePowerRecord>> kafkaSource =
KafkaSources.<String, ActivePowerRecord>kafka(kafkaReadPropsForPipeline, kafkaInputTopic);
// Extend UC2 topology to the pipeline
final StreamStage<Map.Entry<String, String>> uc2TopologyProduct =
this.extendUc2Topology(pipe, kafkaSource, downsampleIntervalInMs);
// Add Sink1: Logger
uc2TopologyProduct.writeTo(Sinks.logger());
// Add Sink2: Write back to kafka for the final benchmark
uc2TopologyProduct.writeTo(KafkaSinks.<String, String>kafka(
kafkaWritePropsForPipeline, kafkaOutputTopic));
return pipe;
}
/**
* Extends a blank Hazelcast Jet Pipeline with the UC2 topology defined by Theodolite.
*
* <p>
* UC2 takes {@code ActivePowerRecord} objects, groups them by keys, windows them in a tumbling
* window and aggregates them into {@code Stats} objects. The final map returns an
* {@code Entry<String,String>} where the key is the key of the group and the String is the
* {@code .toString()} representation of the {@code Stats} object.
* </p>
*
* @param pipe The blank hazelcast jet pipeline to extend the logic to.
* @param source A streaming source to fetch data from.
* @param downsampleIntervalInMs The size of the tumbling window.
* @return A {@code StreamStage<Map.Entry<String,String>>} with the above definition of the key
* and value of the Entry object. It can be used to be further modified or directly be
* written into a sink.
*/
public StreamStage<Map.Entry<String, String>> extendUc2Topology(final Pipeline pipe,
final StreamSource<Entry<String, ActivePowerRecord>> source,
final int downsampleIntervalInMs) {
// Build the pipeline topology.
return pipe.readFrom(source)
.withNativeTimestamps(0)
.setLocalParallelism(1)
.groupingKey(record -> record.getValue().getIdentifier())
.window(WindowDefinition.tumbling(downsampleIntervalInMs))
.aggregate(this.uc2AggregateOperation())
.map(agg -> {
final String theKey = agg.key();
final String theValue = agg.getValue().toString();
return Map.entry(theKey, theValue);
});
}
/**
* Defines an AggregateOperation1 for Hazelcast Jet which is used in the Pipeline of the Hazelcast
* Jet implementation of UC2.
*
* <p>
* Takes windowed and keyed {@code Entry<String, ActivePowerRecord>} elements and returns a
* {@code Stats} object.
* </p>
*
* @return An AggregateOperation used by Hazelcast Jet in a streaming stage which aggregates
* ActivePowerRecord Objects into Stats Objects.
*/
public AggregateOperation1<Entry<String, ActivePowerRecord>,
StatsAccumulator, Stats> uc2AggregateOperation() {
// Aggregate operation to create a Stats object from Entry<String, ActivePowerRecord> items using
// the StatsAccumulator.
return AggregateOperation
// Creates the accumulator
.withCreate(new StatsAccumulatorSupplier())
// Defines the accumulation
.<Entry<String, ActivePowerRecord>>andAccumulate((accumulator, item) -> {
accumulator.add(item.getValue().getValueInW());
})
// Defines the combination of spread out instances
.andCombine((left, right) -> {
final Stats rightStats = right.snapshot();
left.addAll(rightStats);
})
// Finishes the aggregation
.andExportFinish(
(accumulator) -> {
return accumulator.snapshot();
});
}
}
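To make the aggregation semantics concrete: the operation above feeds each record's watt value into a Guava StatsAccumulator per key and window. The following self-contained sketch (the class name is hypothetical) shows what a window of five identical 10 W readings collapses to; the printed string is exactly the value the pipeline test below expects.

import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;

// Minimal sketch of the windowed aggregation semantics: five readings of
// 10.0 W collapse into one Stats object, whose toString() is the pipeline's
// output value.
public final class StatsSemanticsSketch {
  public static void main(final String[] args) {
    final StatsAccumulator accumulator = new StatsAccumulator();
    for (int i = 0; i < 5; i++) {
      accumulator.add(10.0);
    }
    final Stats stats = accumulator.snapshot();
    // Prints: Stats{count=5, mean=10.0, populationStandardDeviation=0.0, min=10.0, max=10.0}
    System.out.println(stats);
  }
}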
package rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics;
import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.StreamSerializer;
import java.io.IOException;
/**
* A serializer and deserializer for the StatsAccumulator which is used in the UC2 implementation
* using Hazelcast Jet.
*/
public class StatsAccumulatorSerializer implements StreamSerializer<StatsAccumulator> {
private static final int TYPE_ID = 69_420;
@Override
public int getTypeId() {
return TYPE_ID;
}
@Override
public void write(final ObjectDataOutput out, final StatsAccumulator object) throws IOException {
final byte[] byteArray = object.snapshot().toByteArray();
out.writeByteArray(byteArray);
}
@Override
public StatsAccumulator read(final ObjectDataInput in) throws IOException {
final byte[] byteArray = in.readByteArray();
final Stats deserializedStats = Stats.fromByteArray(byteArray);
final StatsAccumulator accumulator = new StatsAccumulator();
accumulator.addAll(deserializedStats);
return accumulator;
}
}
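The serializer round-trips the accumulator through its immutable Stats snapshot, which is lossless since Stats captures count, mean, min, max, and the sum of squared deltas. A brief sketch of that round trip outside Hazelcast (the class name is hypothetical):

import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;

// Sketch of the snapshot-based round trip used by StatsAccumulatorSerializer.
public final class StatsRoundTripSketch {
  public static void main(final String[] args) {
    final StatsAccumulator original = new StatsAccumulator();
    original.add(10.0);
    original.add(20.0);
    final byte[] bytes = original.snapshot().toByteArray(); // as in write()
    final StatsAccumulator restored = new StatsAccumulator();
    restored.addAll(Stats.fromByteArray(bytes)); // as in read()
    System.out.println(restored.snapshot()); // same statistics as the original
  }
}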
package rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics;
import com.google.common.math.StatsAccumulator;
import com.hazelcast.function.SupplierEx;
/**
* Supplies a StatsAccumulator. Is used in the aggregation operation of the Hazelcast Jet
* implementation for UC2.
*/
public class StatsAccumulatorSupplier implements SupplierEx<StatsAccumulator> {
private static final long serialVersionUID = -656395626316842910L; // NOPMD
/**
* Gets a StatsAccumulator.
*/
@Override
public StatsAccumulator getEx() throws Exception {
return new StatsAccumulator();
}
}
application.name=theodolite-uc1-application
application.version=0.0.1
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
schema.registry.url=http://localhost:8081
package rocks.theodolite.benchmarks.uc2.hazelcastjet;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.test.AssertionCompletedException;
import com.hazelcast.jet.pipeline.test.Assertions;
import com.hazelcast.jet.pipeline.test.TestSources;
import com.hazelcast.jet.test.SerialTest;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletionException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Test methods for the Hazelcast Jet Implementation of UC2.
*/
@Category(SerialTest.class)
public class Uc2PipelineTest extends JetTestSupport {
JetInstance testInstance = null;
Pipeline testPipeline = null;
StreamStage<Entry<String, String>> uc2Topology = null;
/*
* Creates the JetInstance, defines a new Hazelcast Jet Pipeline and extends the UC2 topology.
* Allows for quick extension of tests.
*/
@Before
public void buildUc2Pipeline() {
// Setup Configuration
int testItemsPerSecond = 1;
String testSensorName = "TEST-SENSOR";
Double testValueInW = 10.0;
int testWindowInMs = 5000;
// Create mock jet instance with configuration
final String testClusterName = randomName();
final JetConfig testJetConfig = new JetConfig();
testJetConfig.getHazelcastConfig().setClusterName(testClusterName);
this.testInstance = this.createJetMember(testJetConfig);
// Create a test source
final StreamSource<Entry<String, ActivePowerRecord>> testSource =
TestSources.itemStream(testItemsPerSecond, (timestamp, item) -> {
final ActivePowerRecord testRecord =
new ActivePowerRecord(testSensorName, timestamp, testValueInW);
final Entry<String, ActivePowerRecord> testEntry =
Map.entry(testSensorName, testRecord);
return testEntry;
});
// Create pipeline to test
Uc2PipelineBuilder pipelineBuilder = new Uc2PipelineBuilder();
this.testPipeline = Pipeline.create();
this.uc2Topology =
pipelineBuilder.extendUc2Topology(this.testPipeline, testSource, testWindowInMs);
}
/**
* Tests whether the pipeline aggregates the first complete window into the expected Stats output.
*/
@Test
public void testOutput() {
// Assertion Configuration
int timeout = 14;
String expectedOutput = "Stats{count=5, mean=10.0, populationStandardDeviation=0.0, min=10.0, max=10.0}";
// Assertion
this.uc2Topology.apply(Assertions.assertCollectedEventually(timeout,
collection -> Assert.assertTrue(
"Not the right amount items in Stats Object!",
collection.get(collection.size()-1).getValue().equals(expectedOutput))));
// Run the test!
try {
this.testInstance.newJob(this.testPipeline).join();
Assert.fail("Job should have completed with an AssertionCompletedException, "
+ "but completed normally!");
} catch (final CompletionException e) {
final String errorMsg = e.getCause().getMessage();
Assert.assertTrue(
"Job was expected to complete with AssertionCompletedException, but completed with: "
+ e.getCause(),
errorMsg.contains(AssertionCompletedException.class.getName()));
}
}
@After
public void after() {
// Shuts down all running Jet Instances
Jet.shutdownAll();
}
}
@@ -21,7 +21,7 @@ public final class Uc3BeamFlink {
   * Start running this microservice.
   */
  public static void main(final String[] args) {
-    new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run();
+    new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone();
  }
}
@@ -21,7 +21,7 @@ public final class Uc3BeamSamza {
   * Start running this microservice.
   */
  public static void main(final String[] args) {
-    new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run();
+    new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone();
  }
}
@@ -91,7 +91,8 @@ public class PipelineFactory extends AbstractPipelineFactory {
  protected void registerCoders(final CoderRegistry registry) {
    registry.registerCoderForClass(
        ActivePowerRecord.class,
-        AvroCoder.of(ActivePowerRecord.SCHEMA$));
+        // AvroCoder.of(ActivePowerRecord.SCHEMA$));
+        AvroCoder.of(ActivePowerRecord.class, false));
    registry.registerCoderForClass(
        HourOfDayKey.class,
        new HourOfDayKeyCoder());
cleanup.add_all=false
cleanup.add_default_serial_version_id=true
cleanup.add_generated_serial_version_id=false
cleanup.add_missing_annotations=true
cleanup.add_missing_deprecated_annotations=true
cleanup.add_missing_methods=false
cleanup.add_missing_nls_tags=false
cleanup.add_missing_override_annotations=true
cleanup.add_missing_override_annotations_interface_methods=true
cleanup.add_serial_version_id=false
cleanup.always_use_blocks=true
cleanup.always_use_parentheses_in_expressions=false
cleanup.always_use_this_for_non_static_field_access=true
cleanup.always_use_this_for_non_static_method_access=true
cleanup.array_with_curly=false
cleanup.arrays_fill=false
cleanup.bitwise_conditional_expression=false
cleanup.boolean_literal=false
cleanup.boolean_value_rather_than_comparison=true
cleanup.break_loop=false
cleanup.collection_cloning=false
cleanup.comparing_on_criteria=false
cleanup.comparison_statement=false
cleanup.controlflow_merge=false
cleanup.convert_functional_interfaces=false
cleanup.convert_to_enhanced_for_loop=true
cleanup.convert_to_enhanced_for_loop_if_loop_var_used=true
cleanup.convert_to_switch_expressions=false
cleanup.correct_indentation=true
cleanup.do_while_rather_than_while=true
cleanup.double_negation=false
cleanup.else_if=false
cleanup.embedded_if=false
cleanup.evaluate_nullable=false
cleanup.extract_increment=false
cleanup.format_source_code=true
cleanup.format_source_code_changes_only=false
cleanup.hash=false
cleanup.if_condition=false
cleanup.insert_inferred_type_arguments=false
cleanup.instanceof=false
cleanup.instanceof_keyword=false
cleanup.invert_equals=false
cleanup.join=false
cleanup.lazy_logical_operator=false
cleanup.make_local_variable_final=true
cleanup.make_parameters_final=true
cleanup.make_private_fields_final=true
cleanup.make_type_abstract_if_missing_method=false
cleanup.make_variable_declarations_final=true
cleanup.map_cloning=false
cleanup.merge_conditional_blocks=false
cleanup.multi_catch=false
cleanup.never_use_blocks=false
cleanup.never_use_parentheses_in_expressions=true
cleanup.no_string_creation=false
cleanup.no_super=false
cleanup.number_suffix=false
cleanup.objects_equals=false
cleanup.one_if_rather_than_duplicate_blocks_that_fall_through=true
cleanup.operand_factorization=false
cleanup.organize_imports=true
cleanup.overridden_assignment=false
cleanup.plain_replacement=false
cleanup.precompile_regex=false
cleanup.primitive_comparison=false
cleanup.primitive_parsing=false
cleanup.primitive_rather_than_wrapper=true
cleanup.primitive_serialization=false
cleanup.pull_out_if_from_if_else=false
cleanup.pull_up_assignment=false
cleanup.push_down_negation=false
cleanup.qualify_static_field_accesses_with_declaring_class=false
cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
cleanup.qualify_static_member_accesses_with_declaring_class=true
cleanup.qualify_static_method_accesses_with_declaring_class=false
cleanup.reduce_indentation=false
cleanup.redundant_comparator=false
cleanup.redundant_falling_through_block_end=false
cleanup.remove_private_constructors=true
cleanup.remove_redundant_modifiers=false
cleanup.remove_redundant_semicolons=true
cleanup.remove_redundant_type_arguments=true
cleanup.remove_trailing_whitespaces=true
cleanup.remove_trailing_whitespaces_all=true
cleanup.remove_trailing_whitespaces_ignore_empty=false
cleanup.remove_unnecessary_array_creation=false
cleanup.remove_unnecessary_casts=true
cleanup.remove_unnecessary_nls_tags=true
cleanup.remove_unused_imports=true
cleanup.remove_unused_local_variables=false
cleanup.remove_unused_private_fields=true
cleanup.remove_unused_private_members=false
cleanup.remove_unused_private_methods=true
cleanup.remove_unused_private_types=true
cleanup.return_expression=false
cleanup.simplify_lambda_expression_and_method_ref=false
cleanup.single_used_field=false
cleanup.sort_members=false
cleanup.sort_members_all=false
cleanup.standard_comparison=false
cleanup.static_inner_class=false
cleanup.strictly_equal_or_different=false
cleanup.stringbuffer_to_stringbuilder=false
cleanup.stringbuilder=false
cleanup.stringbuilder_for_local_vars=true
cleanup.stringconcat_to_textblock=false
cleanup.substring=false
cleanup.switch=false
cleanup.system_property=false
cleanup.system_property_boolean=false
cleanup.system_property_file_encoding=false
cleanup.system_property_file_separator=false
cleanup.system_property_line_separator=false
cleanup.system_property_path_separator=false
cleanup.ternary_operator=false
cleanup.try_with_resource=false
cleanup.unlooped_while=false
cleanup.unreachable_block=false
cleanup.use_anonymous_class_creation=false
cleanup.use_autoboxing=false
cleanup.use_blocks=true
cleanup.use_blocks_only_for_return_and_throw=false
cleanup.use_directly_map_method=false
cleanup.use_lambda=true
cleanup.use_parentheses_in_expressions=true
cleanup.use_string_is_blank=false
cleanup.use_this_for_non_static_field_access=true
cleanup.use_this_for_non_static_field_access_only_if_necessary=false
cleanup.use_this_for_non_static_method_access=true
cleanup.use_this_for_non_static_method_access_only_if_necessary=false
cleanup.use_unboxing=false
cleanup.use_var=false
cleanup.useless_continue=false
cleanup.useless_return=false
cleanup.valueof_rather_than_instantiation=false
cleanup_profile=_CAU-SE-Style
cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=
org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99
org.eclipse.jdt.ui.text.custom_code_templates=
sp_cleanup.add_all=false
sp_cleanup.add_default_serial_version_id=true
sp_cleanup.add_generated_serial_version_id=false
sp_cleanup.add_missing_annotations=true
sp_cleanup.add_missing_deprecated_annotations=true
sp_cleanup.add_missing_methods=false
sp_cleanup.add_missing_nls_tags=false
sp_cleanup.add_missing_override_annotations=true
sp_cleanup.add_missing_override_annotations_interface_methods=true
sp_cleanup.add_serial_version_id=false
sp_cleanup.always_use_blocks=true
sp_cleanup.always_use_parentheses_in_expressions=false
sp_cleanup.always_use_this_for_non_static_field_access=true
sp_cleanup.always_use_this_for_non_static_method_access=true
sp_cleanup.array_with_curly=false
sp_cleanup.arrays_fill=false
sp_cleanup.bitwise_conditional_expression=false
sp_cleanup.boolean_literal=false
sp_cleanup.boolean_value_rather_than_comparison=false
sp_cleanup.break_loop=false
sp_cleanup.collection_cloning=false
sp_cleanup.comparing_on_criteria=true
sp_cleanup.comparison_statement=false
sp_cleanup.controlflow_merge=false
sp_cleanup.convert_functional_interfaces=false
sp_cleanup.convert_to_enhanced_for_loop=true
sp_cleanup.convert_to_enhanced_for_loop_if_loop_var_used=false
sp_cleanup.convert_to_switch_expressions=false
sp_cleanup.correct_indentation=true
sp_cleanup.do_while_rather_than_while=false
sp_cleanup.double_negation=false
sp_cleanup.else_if=false
sp_cleanup.embedded_if=false
sp_cleanup.evaluate_nullable=false
sp_cleanup.extract_increment=false
sp_cleanup.format_source_code=true
sp_cleanup.format_source_code_changes_only=false
sp_cleanup.hash=false
sp_cleanup.if_condition=false
sp_cleanup.insert_inferred_type_arguments=false
sp_cleanup.instanceof=false
sp_cleanup.instanceof_keyword=false
sp_cleanup.invert_equals=false
sp_cleanup.join=false
sp_cleanup.lazy_logical_operator=false
sp_cleanup.make_local_variable_final=true
sp_cleanup.make_parameters_final=true
sp_cleanup.make_private_fields_final=true
sp_cleanup.make_type_abstract_if_missing_method=false
sp_cleanup.make_variable_declarations_final=true
sp_cleanup.map_cloning=false
sp_cleanup.merge_conditional_blocks=false
sp_cleanup.multi_catch=false
sp_cleanup.never_use_blocks=false
sp_cleanup.never_use_parentheses_in_expressions=true
sp_cleanup.no_string_creation=false
sp_cleanup.no_super=false
sp_cleanup.number_suffix=false
sp_cleanup.objects_equals=false
sp_cleanup.on_save_use_additional_actions=true
sp_cleanup.one_if_rather_than_duplicate_blocks_that_fall_through=false
sp_cleanup.operand_factorization=false
sp_cleanup.organize_imports=true
sp_cleanup.overridden_assignment=false
sp_cleanup.plain_replacement=false
sp_cleanup.precompile_regex=false
sp_cleanup.primitive_comparison=false
sp_cleanup.primitive_parsing=false
sp_cleanup.primitive_rather_than_wrapper=false
sp_cleanup.primitive_serialization=false
sp_cleanup.pull_out_if_from_if_else=false
sp_cleanup.pull_up_assignment=false
sp_cleanup.push_down_negation=false
sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
sp_cleanup.reduce_indentation=false
sp_cleanup.redundant_comparator=false
sp_cleanup.redundant_falling_through_block_end=false
sp_cleanup.remove_private_constructors=true
sp_cleanup.remove_redundant_modifiers=false
sp_cleanup.remove_redundant_semicolons=false
sp_cleanup.remove_redundant_type_arguments=false
sp_cleanup.remove_trailing_whitespaces=true
sp_cleanup.remove_trailing_whitespaces_all=true
sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
sp_cleanup.remove_unnecessary_array_creation=false
sp_cleanup.remove_unnecessary_casts=true
sp_cleanup.remove_unnecessary_nls_tags=true
sp_cleanup.remove_unused_imports=true
sp_cleanup.remove_unused_local_variables=false
sp_cleanup.remove_unused_private_fields=true
sp_cleanup.remove_unused_private_members=false
sp_cleanup.remove_unused_private_methods=true
sp_cleanup.remove_unused_private_types=true
sp_cleanup.return_expression=false
sp_cleanup.simplify_lambda_expression_and_method_ref=false
sp_cleanup.single_used_field=false
sp_cleanup.sort_members=false
sp_cleanup.sort_members_all=false
sp_cleanup.standard_comparison=false
sp_cleanup.static_inner_class=false
sp_cleanup.strictly_equal_or_different=false
sp_cleanup.stringbuffer_to_stringbuilder=false
sp_cleanup.stringbuilder=false
sp_cleanup.stringbuilder_for_local_vars=false
sp_cleanup.stringconcat_to_textblock=false
sp_cleanup.substring=false
sp_cleanup.switch=false
sp_cleanup.system_property=false
sp_cleanup.system_property_boolean=false
sp_cleanup.system_property_file_encoding=false
sp_cleanup.system_property_file_separator=false
sp_cleanup.system_property_line_separator=false
sp_cleanup.system_property_path_separator=false
sp_cleanup.ternary_operator=false
sp_cleanup.try_with_resource=false
sp_cleanup.unlooped_while=false
sp_cleanup.unreachable_block=false
sp_cleanup.use_anonymous_class_creation=false
sp_cleanup.use_autoboxing=false
sp_cleanup.use_blocks=true
sp_cleanup.use_blocks_only_for_return_and_throw=false
sp_cleanup.use_directly_map_method=false
sp_cleanup.use_lambda=true
sp_cleanup.use_parentheses_in_expressions=true
sp_cleanup.use_string_is_blank=false
sp_cleanup.use_this_for_non_static_field_access=true
sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false
sp_cleanup.use_this_for_non_static_method_access=true
sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false
sp_cleanup.use_unboxing=false
sp_cleanup.use_var=false
sp_cleanup.useless_continue=true
sp_cleanup.useless_return=true
sp_cleanup.valueof_rather_than_instantiation=false
configFilePath=../config/checkstyle.xml
customModulesJarPaths=
eclipse.preferences.version=1
enabled=false
customRulesJars=
eclipse.preferences.version=1
enabled=false
ruleSetFilePath=../config/pmd.xml
FROM openjdk:11-slim
ADD build/distributions/uc3-hazelcastjet.tar /
CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \
/uc3-hazelcastjet/bin/uc3-hazelcastjet
plugins {
id 'theodolite.hazelcastjet'
}
mainClassName = "rocks.theodolite.benchmarks.uc3.hazelcastjet.HistoryService"
package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A microservice that manages the history and, therefore, stores and aggregates incoming
* measurements.
*/
public class HistoryService {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);
// Hazelcast settings (default)
private static final String HZ_KUBERNETES_SERVICE_DNS_KEY = "service-dns";
private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701";
// Kafka settings (default)
private static final String KAFKA_BOOTSTRAP_DEFAULT = "localhost:9092";
private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
private static final String KAFKA_INPUT_TOPIC_DEFAULT = "input";
private static final String KAFKA_OUTPUT_TOPIC_DEFAULT = "output";
// UC3 specific (default)
private static final String WINDOW_SIZE_IN_SECONDS_DEFAULT = "2629800";
private static final String HOPSIZE_IN_SEC_DEFAULT = "86400";
// Job name (default)
private static final String JOB_NAME = "uc3-hazelcastjet";
/**
* Entrypoint for UC3 using Gradle Run.
*/
public static void main(final String[] args) {
final HistoryService uc3HistoryService = new HistoryService();
try {
uc3HistoryService.run();
} catch (final Exception e) { // NOPMD
LOGGER.error("ABORT MISSION!: {}", e);
}
}
/**
* Start a UC3 service.
*
* @throws Exception This Exception occurs if the Uc3HazelcastJetFactory is used in the wrong way.
* Detailed data is provided once an Exception occurs.
*/
public void run() throws Exception { // NOPMD
this.createHazelcastJetApplication();
}
/**
* Creates a Hazelcast Jet Application for UC3 using the Uc3HazelcastJetFactory.
*
* @throws Exception This Exception occurs if the Uc3HazelcastJetFactory is used in the wrong way.
* Detailed data is provided once an Exception occurs.
*/
private void createHazelcastJetApplication() throws Exception { // NOPMD
new Uc3HazelcastJetFactory()
.setReadPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT, JOB_NAME)
.setWritePropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT)
.setKafkaInputTopicFromEnv(KAFKA_INPUT_TOPIC_DEFAULT)
.setKafkaOutputTopicFromEnv(KAFKA_OUTPUT_TOPIC_DEFAULT)
.setWindowSizeInSecondsFromEnv(WINDOW_SIZE_IN_SECONDS_DEFAULT)
.setHoppingSizeInSecondsFromEnv(HOPSIZE_IN_SEC_DEFAULT)
.buildUc3Pipeline()
.buildUc3JetInstanceFromEnv(LOGGER, BOOTSTRAP_SERVER_DEFAULT, HZ_KUBERNETES_SERVICE_DNS_KEY)
.runUc3Job(JOB_NAME);
}
}
package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.kafka.KafkaSinks;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.WindowDefinition;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HoursOfDayKeyFactory;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.StatsKeyFactory;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Builder to build a Hazelcast Jet Pipeline for UC3 which can be used for stream processing using
* Hazelcast Jet.
*/
public class Uc3PipelineBuilder {
/**
* Builds a pipeline which can be used for stream processing using Hazelcast Jet.
*
* @param kafkaReadPropsForPipeline Properties Object containing the necessary kafka read
* attributes.
* @param kafkaWritePropsForPipeline Properties Object containing the necessary kafka write
* attributes.
* @param kafkaInputTopic The name of the input topic used for the pipeline.
* @param kafkaOutputTopic The name of the output topic used for the pipeline.
* @param hoppingSizeInSeconds The hop length of the sliding window used in the aggregation of
* this pipeline.
* @param windowSizeInSeconds The window length of the sliding window used in the aggregation of
* this pipeline.
* @return A Pipeline which can be used in a Hazelcast Jet Instance to process data for UC3.
*/
public Pipeline build(final Properties kafkaReadPropsForPipeline,
final Properties kafkaWritePropsForPipeline, final String kafkaInputTopic,
final String kafkaOutputTopic,
final int hoppingSizeInSeconds, final int windowSizeInSeconds) {
// Define a new Pipeline
final Pipeline pipe = Pipeline.create();
// Define the source
final StreamSource<Entry<String, ActivePowerRecord>> kafkaSource = KafkaSources
.<String, ActivePowerRecord>kafka(
kafkaReadPropsForPipeline, kafkaInputTopic);
// Extend topology for UC3
final StreamStage<Map.Entry<String, String>> uc3Product =
this.extendUc3Topology(pipe, kafkaSource, hoppingSizeInSeconds, windowSizeInSeconds);
// Add Sink1: Logger
uc3Product.writeTo(Sinks.logger());
// Add Sink2: Write back to kafka for the final benchmark
uc3Product.writeTo(KafkaSinks.<String, String>kafka(
kafkaWritePropsForPipeline, kafkaOutputTopic));
return pipe;
}
/**
* Extends a blank Hazelcast Jet Pipeline with the UC3 topology defined by Theodolite.
*
* <p>
* UC3 takes {@code ActivePowerRecord} objects, groups them by key and hour of the day, and
* calculates the average value within a sliding window.
* </p>
*
* @param pipe The blank hazelcast jet pipeline to extend the logic to.
* @param source A streaming source to fetch data from.
* @param hoppingSizeInSeconds The jump distance of the "sliding" window.
* @param windowSizeInSeconds The size of the "sliding" window.
* @return A {@code StreamStage<Map.Entry<String,String>>} with the above definition of the key
* and value of the Entry object. It can be used to be further modified or directly be
* written into a sink.
*/
public StreamStage<Map.Entry<String, String>> extendUc3Topology(final Pipeline pipe,
final StreamSource<Entry<String, ActivePowerRecord>> source, final int hoppingSizeInSeconds,
final int windowSizeInSeconds) {
// Build the pipeline topology.
return pipe
.readFrom(source)
// use Timestamps
.withNativeTimestamps(0)
.setLocalParallelism(1)
// Map timestamp to hour of day and create new key using sensorID and
// datetime mapped to HourOfDay
.map(record -> {
final String sensorId = record.getValue().getIdentifier();
final long timestamp = record.getValue().getTimestamp();
final LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp),
TimeZone.getDefault().toZoneId());
final StatsKeyFactory<HourOfDayKey> keyFactory = new HoursOfDayKeyFactory();
final HourOfDayKey newKey = keyFactory.createKey(sensorId, dateTime);
return Map.entry(newKey, record.getValue());
})
// group by new keys
.groupingKey(Entry::getKey)
// Sliding/Hopping Window
.window(WindowDefinition.sliding(TimeUnit.SECONDS.toMillis(windowSizeInSeconds),
TimeUnit.SECONDS.toMillis(hoppingSizeInSeconds)))
// get average value of group (sensorId, hourOfDay)
.aggregate(
AggregateOperations.averagingDouble(record -> record.getValue().getValueInW()))
// map to return pair (sensorId, hourOfDay) -> (averaged watt value)
.map(agg -> {
final String theValue = agg.getValue().toString();
final String theKey = agg.getKey().toString();
return Map.entry(theKey, theValue);
});
}
}
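The mapping stage above converts each record's epoch-millisecond timestamp to a local hour of day before regrouping. The following sketch isolates that conversion; that HoursOfDayKeyFactory simply extracts LocalDateTime.getHour() is an assumption of mine, as the factory's source is not part of this diff.

import java.time.Instant;
import java.time.LocalDateTime;
import java.util.TimeZone;

// Sketch of the timestamp-to-hour mapping used when building HourOfDayKey
// instances; assumes the key factory extracts LocalDateTime.getHour().
public final class HourOfDayMappingSketch {
  public static void main(final String[] args) {
    final long timestampMs = 1_600_000_000_000L; // e.g. ActivePowerRecord.getTimestamp()
    final LocalDateTime dateTime = LocalDateTime.ofInstant(
        Instant.ofEpochMilli(timestampMs), TimeZone.getDefault().toZoneId());
    final int hourOfDay = dateTime.getHour(); // assumed key factory behavior
    // Matches HourOfDayKey.toString(): "<sensorId>;<hourOfDay>"
    System.out.println("test-sensor" + ";" + hourOfDay);
  }
}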
package rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics;
import java.util.Objects;
/**
* A key consisting of an hour of the day and a sensor ID.
*
*/
public class HourOfDayKey {
private final int hourOfDay;
private final String sensorId;
public HourOfDayKey(final int hourOfDay, final String sensorId) {
this.hourOfDay = hourOfDay;
this.sensorId = sensorId;
}
public int getHourOfDay() {
return this.hourOfDay;
}
public String getSensorId() {
return this.sensorId;
}
@Override
public String toString() {
return this.sensorId + ";" + this.hourOfDay;
}
@Override
public int hashCode() {
return Objects.hash(this.hourOfDay, this.sensorId);
}
@Override
public boolean equals(final Object obj) {
if (obj == this) {
return true;
}
if (obj instanceof HourOfDayKey) {
final HourOfDayKey other = (HourOfDayKey) obj;
return this.hourOfDay == other.hourOfDay
&& Objects.equals(this.sensorId, other.sensorId);
}
return false;
}
}
package rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.StreamSerializer;
import java.io.IOException;
/**
* A pipeline serializer for the HourOfDayKey to allow for parallelization.
*
*/
public class HourOfDayKeySerializer implements StreamSerializer<HourOfDayKey> {
private static final int TYPE_ID = 1;
@Override
public int getTypeId() {
return TYPE_ID;
}
@Override
public void write(final ObjectDataOutput out, final HourOfDayKey key) throws IOException {
out.writeInt(key.getHourOfDay());
out.writeString(key.getSensorId());
}
@Override
public HourOfDayKey read(final ObjectDataInput in) throws IOException {
return new HourOfDayKey(in.readInt(), in.readString());
}
}
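To take effect, this serializer must be registered with the Jet job, analogous to the StatsAccumulator registration in Uc2HazelcastJetFactory#runUc2Job above. A sketch of such a registration follows; where UC3 actually wires this is not visible in this diff, so the wrapper class and method are hypothetical.

package rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics;

import com.hazelcast.jet.config.JobConfig;

// Sketch of registering HourOfDayKeySerializer with a Jet job, mirroring the
// StatsAccumulatorSerializer registration in Uc2HazelcastJetFactory#runUc2Job.
public final class Uc3SerializerRegistrationSketch {
  public static JobConfig configuredJob(final String jobName) {
    final JobConfig jobConfig = new JobConfig();
    jobConfig.registerSerializer(HourOfDayKey.class, HourOfDayKeySerializer.class);
    jobConfig.setName(jobName);
    return jobConfig;
  }
}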