Skip to content
Snippets Groups Projects
Commit 6c90c194 authored by Sören Henning's avatar Sören Henning
Browse files

Align Hazelcast jet implementation with others

parent af09b9e9
No related branches found
No related tags found
No related merge requests found
Pipeline #8596 passed
......@@ -57,18 +57,8 @@ public class Uc4HazelcastJetFactory {
* @throws Exception If either no JetInstance or Pipeline is set, a job cannot be startet.
*/
public void runUc4Job(final String jobName) throws IllegalStateException { // NOPMD
// Check if a Jet Instance for UC4 is set.
if (this.uc4JetInstance == null) {
throw new IllegalStateException("Jet Instance is not set! "
+ "Cannot start a hazelcast jet job for UC4.");
}
// Check if a Pipeline for UC3 is set.
if (this.uc4JetPipeline == null) {
throw new IllegalStateException(
"Hazelcast Pipeline is not set! Cannot start a hazelcast jet job for UC4.");
}
Objects.requireNonNull(this.uc4JetInstance, "Jet instance is not set.");
Objects.requireNonNull(this.uc4JetPipeline, "Jet pipeline is not set.");
// Adds the job name and joins a job to the JetInstance defined in this factory
final JobConfig jobConfig = new JobConfig()
......
......@@ -17,7 +17,6 @@ import com.hazelcast.jet.pipeline.StreamStageWithKey;
import com.hazelcast.jet.pipeline.WindowDefinition;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
......@@ -191,20 +190,10 @@ public class Uc4PipelineBuilder {
.<Set<String>, Entry<String, ValueGroup>>mapUsingIMap(
SENSOR_PARENT_MAP_NAME,
(sensorEvent, sensorParentsSet) -> {
// Check whether a groupset exists for a key or not
if (sensorParentsSet == null) {
// No group set exists for this key: return valuegroup with default null group set.
final Set<String> nullSet = new HashSet<>();
nullSet.add("NULL-GROUPSET");
return Util.entry(sensorEvent.getKey(),
new ValueGroup(sensorEvent.getValue(), nullSet));
} else {
// Group set exists for this key: return valuegroup with the groupset.
final ValueGroup valueParentsPair =
new ValueGroup(sensorEvent.getValue(), sensorParentsSet);
// Return solution
return Util.entry(sensorEvent.getKey(), valueParentsPair);
}
final ValueGroup valueParentsPair = new ValueGroup(
sensorEvent.getValue(),
sensorParentsSet == null ? Set.of() : sensorParentsSet);
return Util.entry(sensorEvent.getKey(), valueParentsPair);
});
//////////////////////////////////
......
......@@ -39,7 +39,6 @@ import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroupSeria
@Category(SerialTest.class)
public class Uc4PipelineTest extends JetTestSupport {
// TEst Machinery
JetInstance testInstance = null;
Pipeline testPipeline = null;
StreamStage<Entry<String, AggregatedActivePowerRecord>> uc4Topology = null;
......@@ -55,7 +54,7 @@ public class Uc4PipelineTest extends JetTestSupport {
final Double testValueInW = 10.0;
final int testWindowSize = 5000; // As window size is bugged, not necessary.
// Create mock jet instance with configuration
// Create mocked Hazelcast Jet instance with configuration
final String testClusterName = randomName();
final JetConfig testJetConfig = new JetConfig();
testJetConfig.getHazelcastConfig().setClusterName(testClusterName);
......@@ -218,7 +217,6 @@ public class Uc4PipelineTest extends JetTestSupport {
@After
public void after() {
System.out.println("Shutting down");
// Shuts down all running Jet Instances
Jet.shutdownAll();
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment