diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1HazelcastJetFactory.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1HazelcastJetFactory.java index 13576ad291515eea3e23ec099b5d9d5af36f2d4f..1a8f6344ceb7d8d575af03394ac8e03885676c4f 100644 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1HazelcastJetFactory.java +++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1HazelcastJetFactory.java @@ -34,9 +34,8 @@ public class Uc1HazelcastJetFactory { * JetInstance as a job. * * @param jobName The name of the job. - * @throws Exception If either no JetInstance or Pipeline is set, a job cannot be startet. */ - public void runUc1Job(final String jobName) throws IllegalStateException { + public void runUc1Job(final String jobName) { // Check if a Jet Instance for UC1 is set. if (this.uc1JetInstance == null) { @@ -83,10 +82,8 @@ public class Uc1HazelcastJetFactory { * topic and kafka properties defined in this factory beforehand. * * @return A Uc1HazelcastJetFactory containg a set pipeline. - * @throws Exception If the input topic or the kafka properties are not defined, the pipeline - * cannot be built. */ - public Uc1HazelcastJetFactory buildUc1Pipeline() throws IllegalStateException { + public Uc1HazelcastJetFactory buildUc1Pipeline() { // Check if Properties for the Kafka Input are set. if (this.kafkaPropertiesForPipeline == null) { diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/theodolite/uc1/application/Uc1PipelineTest.java b/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/theodolite/uc1/application/Uc1PipelineTest.java index f3fe409d035ce36540bbf9f1a17d870f93621e1e..270fe9d1ee994a6e8b83ad690dc4443d6d222311 100644 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/theodolite/uc1/application/Uc1PipelineTest.java +++ b/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/theodolite/uc1/application/Uc1PipelineTest.java @@ -1,6 +1,5 @@ package theodolite.uc1.application; -import com.google.gson.Gson; import com.hazelcast.jet.Jet; import com.hazelcast.jet.JetInstance; import com.hazelcast.jet.config.JetConfig; @@ -28,7 +27,6 @@ import titan.ccp.model.records.ActivePowerRecord; @Category(SerialTest.class) public class Uc1PipelineTest extends JetTestSupport { - private static final Gson GSON = new Gson(); private JetInstance testInstance = null; private Pipeline testPipeline = null; private StreamStage<String> uc1Topology = null; diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2HazelcastJetFactory.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2HazelcastJetFactory.java index dd741a8d63e276c043bf3db883be34666c5740ae..46f9b1e54cb2938bf4e1a5ced1e2fcedc11b96cb 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2HazelcastJetFactory.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2HazelcastJetFactory.java @@ -40,9 +40,8 @@ public class Uc2HazelcastJetFactory { * JetInstance as a job. * * @param jobName The name of the job. - * @throws Exception If either no JetInstance or Pipeline is set, a job cannot be startet. */ - public void runUc2Job(final String jobName) throws IllegalStateException { + public void runUc2Job(final String jobName) { // Check if a Jet Instance for UC2 is set. if (this.uc2JetInstance == null) { diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/uc2specifics/StatsAccumulatorSerializer.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/uc2specifics/StatsAccumulatorSerializer.java index 7257ef18388176f2fe7ee0d3525ce3cbd0a67944..8d89fb274e73a13816cf7ab431143d81b31334aa 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/uc2specifics/StatsAccumulatorSerializer.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/uc2specifics/StatsAccumulatorSerializer.java @@ -13,11 +13,10 @@ import java.io.IOException; */ public class StatsAccumulatorSerializer implements StreamSerializer<StatsAccumulator> { - private static final int TYPE_ID = 69420; + private static final int TYPE_ID = 69_420; @Override public int getTypeId() { - // TODO Auto-generated method stub return TYPE_ID; } diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/uc3specifics/HourOfDayKeySerializer.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/uc3specifics/HourOfDayKeySerializer.java index cafe50875efe279ccbbaa91e7444300e52db2d0c..cc75d438f2e1576102e3e12a714d309d9443132b 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/uc3specifics/HourOfDayKeySerializer.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/uc3specifics/HourOfDayKeySerializer.java @@ -18,12 +18,12 @@ public class HourOfDayKeySerializer implements StreamSerializer<HourOfDayKey> { @Override public void write(final ObjectDataOutput out, final HourOfDayKey key) throws IOException { out.writeInt(key.getHourOfDay()); - out.writeUTF(key.getSensorId()); + out.writeString(key.getSensorId()); } @Override public HourOfDayKey read(final ObjectDataInput in) throws IOException { - return new HourOfDayKey(in.readInt(), in.readUTF()); + return new HourOfDayKey(in.readInt(), in.readString()); } } diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java index 180e750fcb8303d568af30f806d630c04119e821..2ba3f7897a4e47394dacb043f69a6145922971cc 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java @@ -18,6 +18,7 @@ import com.hazelcast.jet.pipeline.WindowDefinition; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; @@ -120,7 +121,7 @@ public class Uc4PipelineBuilder { * according aggregated values. The data can be further modified or directly be linked to * a Hazelcast Jet sink. */ - public StreamStage<Entry<String, Double>> extendUc4Topology(final Pipeline pipe,//NOPMD + public StreamStage<Entry<String, Double>> extendUc4Topology(final Pipeline pipe, //NOPMD final StreamSource<Entry<String, ActivePowerRecord>> inputSource, final StreamSource<Entry<String, Double>> aggregationSource, final StreamSource<Entry<Event, String>> configurationSource, final int windowSize) { @@ -174,16 +175,11 @@ public class Uc4PipelineBuilder { // (3) UC4 Join Configuration and Merges Input/Aggregation Stream // [sensorKey , (value,Set<Groups>)] final StreamStage<Entry<String, ValueGroup>> joinedStage = mergedInputAndAggregations - .mapUsingIMap( + .<Set<String>, Entry<String, ValueGroup>>mapUsingIMap( SENSOR_PARENT_MAP_NAME, (sensorEvent, sensorParentsSet) -> { - - // Get Data - @SuppressWarnings("unchecked") - Set<String> sensorParentsCasted = (Set<String>) sensorParentsSet; - // Check whether a groupset exists for a key or not - if (sensorParentsCasted == null) { + if (sensorParentsSet == null) { // No group set exists for this key: return valuegroup with default null group set. Set<String> nullSet = new HashSet<String>(); nullSet.add("NULL-GROUPSET"); @@ -192,7 +188,7 @@ public class Uc4PipelineBuilder { } else { // Group set exists for this key: return valuegroup with the groupset. ValueGroup valueParentsPair = - new ValueGroup(sensorEvent.getValue(), sensorParentsCasted); + new ValueGroup(sensorEvent.getValue(), sensorParentsSet); // Return solution return Util.entry(sensorEvent.getKey(), valueParentsPair); } @@ -212,8 +208,7 @@ public class Uc4PipelineBuilder { // Transformed Data String[] groupList = groups.toArray(String[]::new); SensorGroupKey[] newKeyList = new SensorGroupKey[groupList.length]; - ArrayList<Entry<SensorGroupKey, Double>> newEntryList = - new ArrayList<Entry<SensorGroupKey, Double>>(); + List<Entry<SensorGroupKey, Double>> newEntryList = new ArrayList<Entry<SensorGroupKey, Double>>(); for (int i = 0; i < groupList.length; i++) { newKeyList[i] = new SensorGroupKey(keyGroupId, groupList[i]); newEntryList.add(Util.entry(newKeyList[i], valueInW)); @@ -269,7 +264,7 @@ public class Uc4PipelineBuilder { transformer.constructChildParentsPairs(eventItem.getValue()); // Compare both tables - final HashMap<String, Set<String>> updates = new HashMap<String, Set<String>>(); + final Map<String, Set<String>> updates = new HashMap<>(); for (final String key : mapFromRegistry.keySet()) { if (flatMapStage.containsKey(key)) { if (!mapFromRegistry.get(key).equals(flatMapStage.get(key))) { @@ -281,8 +276,7 @@ public class Uc4PipelineBuilder { } // Create a updates list to pass onto the next pipeline stage- - final ArrayList<Entry<String, Set<String>>> updatesList = - new ArrayList<Entry<String, Set<String>>>(updates.entrySet()); + final List<Entry<String, Set<String>>> updatesList = new ArrayList<>(updates.entrySet()); // Return traverser with updates list. return Traversers.traverseIterable(updatesList)