Skip to content
Snippets Groups Projects
Commit 05c84d2b authored by Sören Henning's avatar Sören Henning
Browse files

Align Hazelcast UC4 intervals with others

additionally, do some minor code cleanup
parent 0d4131dc
Branches
Tags
No related merge requests found
Pipeline #10438 canceled
package rocks.theodolite.benchmarks.uc4.hazelcastjet;
import com.hazelcast.function.SupplierEx;
import java.util.HashMap;
import java.util.Set;
/**
* Supplies a {@link HashMap} and implements {@link SupplierEx}.
*/
public class HashMapSupplier implements SupplierEx<HashMap<String, Set<String>>> {
private static final long serialVersionUID = -6247504592403610702L; // NOPMD
@Override
public HashMap<String, Set<String>> get() {
return new HashMap<>();
}
@Override
public HashMap<String, Set<String>> getEx() throws Exception {
return this.get();
}
}
......@@ -59,6 +59,9 @@ public class HistoryService extends HazelcastJetService {
final Duration gracePeriod = Duration.ofMillis(
this.config.getInt(Uc4ConfigurationKeys.GRACE_PERIOD_MS));
final Duration triggerPeriod = Duration.ofSeconds(
this.config.getInt(Uc4ConfigurationKeys.TRIGGER_INTERVAL_SECONDS));
this.pipelineFactory = new Uc4PipelineFactory(
kafkaProps,
kafkaConfigReadProps,
......@@ -66,7 +69,8 @@ public class HistoryService extends HazelcastJetService {
kafkaWriteProps,
this.kafkaInputTopic, outputTopic, configurationTopic, feedbackTopic,
emirPeriod,
gracePeriod);
gracePeriod,
triggerPeriod);
}
@Override
......
......@@ -15,4 +15,6 @@ public class Uc4ConfigurationKeys {
public static final String GRACE_PERIOD_MS = "grace.period.ms";
public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds";
}
......@@ -16,9 +16,7 @@ import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.StreamStageWithKey;
import com.hazelcast.jet.pipeline.WindowDefinition;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
......@@ -47,6 +45,8 @@ public class Uc4PipelineFactory extends PipelineFactory {
private final Duration gracePeriod;
private final Duration triggerPeriod;
/**
* Builds a pipeline which can be used for stream processing using Hazelcast Jet.
......@@ -74,7 +74,8 @@ public class Uc4PipelineFactory extends PipelineFactory {
final String kafkaConfigurationTopic,
final String kafkaFeedbackTopic,
final Duration emitPeriod,
final Duration gracePeriod) {
final Duration gracePeriod,
final Duration triggerPeriod) {
super(kafkaInputReadPropsForPipeline, kafkaInputTopic,
kafkaWritePropsForPipeline, kafkaOutputTopic);
......@@ -84,6 +85,7 @@ public class Uc4PipelineFactory extends PipelineFactory {
this.kafkaFeedbackTopic = kafkaFeedbackTopic;
this.emitPeriod = emitPeriod;
this.gracePeriod = gracePeriod;
this.triggerPeriod = triggerPeriod;
}
/**
......@@ -229,7 +231,9 @@ public class Uc4PipelineFactory extends PipelineFactory {
// (5) UC4 Last Value Map
// Table with tumbling window differentiation [ (sensorKey,Group) , value ],Time
final StageWithWindow<Entry<SensorGroupKey, ActivePowerRecord>> windowedLastValues =
dupliAsFlatmappedStage.window(WindowDefinition.tumbling(this.emitPeriod.toMillis()));
dupliAsFlatmappedStage.window(WindowDefinition
.tumbling(this.emitPeriod.toMillis())
.setEarlyResultsPeriod(this.triggerPeriod.toMillis()));
final AggregateOperation1<Entry<SensorGroupKey, ActivePowerRecord>, AggregatedActivePowerRecordAccumulator, AggregatedActivePowerRecord> aggrOp = // NOCS
AggregateOperation
......@@ -285,11 +289,8 @@ public class Uc4PipelineFactory extends PipelineFactory {
}
}
// Create a updates list to pass onto the next pipeline stage-
final List<Entry<String, Set<String>>> updatesList = new ArrayList<>(updates.entrySet());
// Return traverser with updates list.
return Traversers.traverseIterable(updatesList)
return Traversers.traverseIterable(updates.entrySet())
.map(e -> Util.entry(e.getKey(), e.getValue()));
}
......
......@@ -10,4 +10,5 @@ kafka.feedback.topic=aggregation-feedback
schema.registry.url=http://localhost:8081
emit.period.ms=5000
grace.period.ms=0
trigger.interval.seconds=1
grace.period.ms=5000
......@@ -120,7 +120,7 @@ public class Uc4PipelineTest extends JetTestSupport {
final Properties properties = new Properties();
final Uc4PipelineFactory factory = new Uc4PipelineFactory(
properties, properties, properties, properties, "", "",
"", "", testWindowSize, Duration.ofMillis(0));
"", "", testWindowSize, Duration.ofSeconds(1), Duration.ofMillis(0));
this.uc4Topology =
factory.extendUc4Topology(testInputSource, testAggregationSource, testConfigSource);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment