Skip to content
Snippets Groups Projects
Commit 3d7c3072 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Adjust uc3 new impl names

parent 7a452042
Branches
Tags
1 merge request!275Refactor hazelcast jet benchmarks:
This commit is part of merge request !275. Comments created here will be created in the context of that merge request.
...@@ -37,10 +37,10 @@ public class NewHistoryService extends HazelcastJetService { ...@@ -37,10 +37,10 @@ public class NewHistoryService extends HazelcastJetService {
final String kafkaOutputTopic = final String kafkaOutputTopic =
config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
final int windowSizeInSecondsNumber = Integer.parseInt( final int windowSizeInDaysNumber = Integer.parseInt(
config.getProperty(ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString()); config.getProperty(ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString());
final int hoppingSizeInSecondsNumber = Integer.parseInt( final int hoppingSizeInDaysNumber = Integer.parseInt(
config.getProperty(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString()); config.getProperty(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString());
this.pipelineFactory = new Uc3PipelineFactory( this.pipelineFactory = new Uc3PipelineFactory(
...@@ -48,8 +48,8 @@ public class NewHistoryService extends HazelcastJetService { ...@@ -48,8 +48,8 @@ public class NewHistoryService extends HazelcastJetService {
kafkaInputTopic, kafkaInputTopic,
kafkaWriteProps, kafkaWriteProps,
kafkaOutputTopic, kafkaOutputTopic,
windowSizeInSecondsNumber, windowSizeInDaysNumber,
hoppingSizeInSecondsNumber); hoppingSizeInDaysNumber);
} }
@Override @Override
......
...@@ -23,8 +23,8 @@ import titan.ccp.model.records.ActivePowerRecord; ...@@ -23,8 +23,8 @@ import titan.ccp.model.records.ActivePowerRecord;
public class Uc3PipelineFactory extends PipelineFactory { public class Uc3PipelineFactory extends PipelineFactory {
private final int hoppingSizeInSeconds; private final int hoppingSizeInDays;
private final int windowSizeInSeconds; private final int windowSizeInDays;
/** /**
* Build a new Pipeline. * Build a new Pipeline.
...@@ -34,21 +34,21 @@ public class Uc3PipelineFactory extends PipelineFactory { ...@@ -34,21 +34,21 @@ public class Uc3PipelineFactory extends PipelineFactory {
* attributes. * attributes.
* @param kafkaInputTopic The name of the input topic used for the pipeline. * @param kafkaInputTopic The name of the input topic used for the pipeline.
* @param kafkaOutputTopic The name of the output topic used for the pipeline. * @param kafkaOutputTopic The name of the output topic used for the pipeline.
* @param hoppingSizeInSeconds The hop length of the sliding window used in the aggregation of * @param hoppingSizeInDays The hop length of the sliding window used in the aggregation of
* this pipeline. * this pipeline.
* @param windowSizeInSeconds The window length of the sliding window used in the aggregation of * @param windowSizeInDays The window length of the sliding window used in the aggregation of
* this pipeline. * this pipeline.
*/ */
public Uc3PipelineFactory(final Properties kafkaReadPropsForPipeline, public Uc3PipelineFactory(final Properties kafkaReadPropsForPipeline,
final String kafkaInputTopic, final String kafkaInputTopic,
final Properties kafkaWritePropsForPipeline, final Properties kafkaWritePropsForPipeline,
final String kafkaOutputTopic, final String kafkaOutputTopic,
final int windowSizeInSeconds, final int windowSizeInDays,
final int hoppingSizeInSeconds) { final int hoppingSizeInDays) {
super(kafkaReadPropsForPipeline, kafkaInputTopic, super(kafkaReadPropsForPipeline, kafkaInputTopic,
kafkaWritePropsForPipeline,kafkaOutputTopic); kafkaWritePropsForPipeline,kafkaOutputTopic);
this.windowSizeInSeconds = windowSizeInSeconds; this.windowSizeInDays = windowSizeInDays;
this.hoppingSizeInSeconds = hoppingSizeInSeconds; this.hoppingSizeInDays = hoppingSizeInDays;
} }
...@@ -116,8 +116,8 @@ public class Uc3PipelineFactory extends PipelineFactory { ...@@ -116,8 +116,8 @@ public class Uc3PipelineFactory extends PipelineFactory {
// group by new keys // group by new keys
.groupingKey(Map.Entry::getKey) .groupingKey(Map.Entry::getKey)
// Sliding/Hopping Window // Sliding/Hopping Window
.window(WindowDefinition.sliding(TimeUnit.DAYS.toMillis(windowSizeInSeconds), .window(WindowDefinition.sliding(TimeUnit.DAYS.toMillis(windowSizeInDays),
TimeUnit.DAYS.toMillis(hoppingSizeInSeconds))) TimeUnit.DAYS.toMillis(hoppingSizeInDays)))
// get average value of group (sensoreId,hourOfDay) // get average value of group (sensoreId,hourOfDay)
.aggregate( .aggregate(
AggregateOperations.averagingDouble(record -> record.getValue().getValueInW())) AggregateOperations.averagingDouble(record -> record.getValue().getValueInW()))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment