Commit cbf172a2 authored by Sören Henning

Merge branch 'align-uc4-among-engines' into 'main'

Align UC4 implementations among engines

See merge request !307
parents 62dc3ce4 75441855
1 merge request: !307 Align UC4 implementations among engines
Pipeline #10197 passed
Showing changed files with 52 additions and 73 deletions
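In short, the merge replaces the engine-specific UC4 window settings with a common pair of keys: an emission period and a grace period, both in milliseconds. A minimal sketch of the aligned constants, grouped here for illustration only (the actual declarations live in the per-engine ConfigurationKeys hunks below):

// Aligned UC4 configuration keys (illustrative grouping; see the hunks below).
public final class ConfigurationKeys {
  // Emission period of the aggregation results, in milliseconds.
  public static final String EMIT_PERIOD_MS = "emit.period.ms";
  // Grace period for late records, in milliseconds.
  public static final String GRACE_PERIOD_MS = "grace.period.ms";
}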
......@@ -29,8 +29,7 @@ public final class ConfigurationKeys {
public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";
// UC4
public static final String EMIT_PERIOD_SECONDS = "kafka.window.duration.minutes";
// public static final String EMIT_PERIOD_MS = "emit.period.ms";
public static final String EMIT_PERIOD_MS = "emit.period.ms";
public static final String GRACE_PERIOD_MS = "grace.period.ms";
......
......@@ -21,8 +21,6 @@ spec:
value: "theodolite-kafka-kafka-bootstrap:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://theodolite-kafka-schema-registry:8081"
- name: WINDOW_SIZE
value: "5000"
#- name: KUBERNETES_DNS_NAME
# value: "titan-ccp-aggregation"
- name: KUBERNETES_NAMESPACE
......
......@@ -50,7 +50,6 @@ services:
BOOTSTRAP_SERVER: benchmark:5701
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_URL: http://schema-registry:8081
WINDOW_SIZE_UC4: 5000
load-generator:
image: ghcr.io/cau-se/theodolite-uc4-workload-generator:${THEODOLITE_TAG:-latest}
depends_on:
......
......@@ -19,10 +19,6 @@ public final class ConfigurationKeys {
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
public static final String WINDOW_SIZE_MS = "window.size.ms";
public static final String WINDOW_GRACE_MS = "window.grace.ms";
public static final String CHECKPOINTING = "checkpointing";
public static final String CHECKPOINTING_INTERVAL_MS = "checkpointing.interval.ms";
......
......@@ -32,6 +32,7 @@ public class ConfigurationKeys {
// UC4
public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic";
public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic";
public static final String WINDOW_SIZE_UC4 = "window.size";
public static final String EMIT_PERIOD_MS = "emit.period.ms";
// public static final String GRACE_PERIOD_MS = "grace.period.ms";
}
......@@ -36,16 +36,14 @@ public abstract class HazelcastJetService {
* build a new jet instance.
*/
public HazelcastJetService(final Logger logger) {
this.jobName = this.config.getProperty(ConfigurationKeys.APPLICATION_NAME).toString();
this.jobName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
this.kafkaBootstrapServer = this.config.getProperty(
ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS).toString();
this.schemaRegistryUrl =
this.config.getProperty(ConfigurationKeys.SCHEMA_REGISTRY_URL).toString();
this.kafkaBootstrapServer = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
this.schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
this.propsBuilder =
new KafkaPropertiesBuilder(this.kafkaBootstrapServer, this.schemaRegistryUrl, this.jobName);
this.kafkaInputTopic = this.config.getProperty(ConfigurationKeys.KAFKA_INPUT_TOPIC).toString();
this.kafkaInputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
final JetInstanceBuilder jetInstance = new JetInstanceBuilder()
.setConfigFromEnv(logger, this.kafkaBootstrapServer, HZ_KUBERNETES_SERVICE_DNS_KEY);
......
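The same accessor change recurs across the Hazelcast Jet commons and services: raw getProperty(...).toString() lookups are replaced by typed getString(...)/getInt(...) calls. A minimal, self-contained sketch of the pattern, assuming the config object is an Apache Commons Configuration instance (the typed accessors suggest this, but the diff does not show it):

import java.time.Duration;
import org.apache.commons.configuration2.BaseConfiguration;
import org.apache.commons.configuration2.Configuration;

public class TypedConfigSketch {
  public static void main(String[] args) {
    final Configuration config = new BaseConfiguration();
    config.setProperty("application.name", "theodolite-uc4-application");
    config.setProperty("emit.period.ms", 5000);

    // Old style: untyped lookup plus manual toString(); NullPointerException if the key is missing.
    final String jobNameOld = config.getProperty("application.name").toString();

    // New style used throughout the services: typed accessors, e.g. for building Durations.
    final String jobName = config.getString("application.name");
    final Duration emitPeriod = Duration.ofMillis(config.getInt("emit.period.ms"));

    System.out.println(jobNameOld + " / " + jobName + " / " + emitPeriod);
  }
}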
......@@ -19,7 +19,6 @@ public class HistoryService extends HazelcastJetService {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);
/**
* Constructs the use case logic for UC2. Retrieves the needed values and instantiates a pipeline
* factory.
......@@ -36,8 +35,7 @@ public class HistoryService extends HazelcastJetService {
StringSerializer.class.getCanonicalName(),
StringSerializer.class.getCanonicalName());
final String kafkaOutputTopic =
this.config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
final String kafkaOutputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final Duration downsampleInterval = Duration.ofMinutes(
this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES));
......
......@@ -35,16 +35,16 @@ public class HistoryService extends HazelcastJetService {
StringSerializer.class.getCanonicalName());
final String kafkaOutputTopic =
this.config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final Duration windowSize = Duration.ofDays(Integer.parseInt(
this.config.getProperty(ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString()));
final Duration windowSize = Duration.ofDays(
this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
final Duration hoppingSize = Duration.ofDays(Integer.parseInt(
this.config.getProperty(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString()));
final Duration hoppingSize = Duration.ofDays(
this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
final Duration emitPeriod = Duration.ofSeconds(Integer.parseInt(
this.config.getProperty(ConfigurationKeys.AGGREGATION_EMIT_PERIOD_SECONDS).toString()));
final Duration emitPeriod = Duration.ofSeconds(
this.config.getInt(ConfigurationKeys.AGGREGATION_EMIT_PERIOD_SECONDS));
this.pipelineFactory = new Uc3PipelineFactory(
kafkaProps,
......
......@@ -72,11 +72,11 @@ public class PipelineFactory extends AbstractPipelineFactory {
final String configurationTopic =
this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC);
final Duration duration = Duration.standardSeconds(
this.config.getInt(ConfigurationKeys.EMIT_PERIOD_SECONDS));
final Duration duration = Duration.millis(
this.config.getInt(ConfigurationKeys.EMIT_PERIOD_MS));
final Duration triggerDelay = Duration.standardSeconds(
this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL_SECONDS));
final Duration gracePeriod = Duration.standardSeconds(
final Duration gracePeriod = Duration.standardSeconds(// TODO this is wrong
this.config.getInt(ConfigurationKeys.GRACE_PERIOD_MS));
// Read from Kafka
......@@ -115,9 +115,11 @@ public class PipelineFactory extends AbstractPipelineFactory {
.apply("Read Windows", Window.into(FixedWindows.of(duration)))
.apply("Set trigger for input", Window
.<KV<String, ActivePowerRecord>>configure()
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(triggerDelay)))
.triggering(Repeatedly
.forever(
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(triggerDelay)))
.withAllowedLateness(gracePeriod)
.discardingFiredPanes());
......@@ -204,7 +206,8 @@ public class PipelineFactory extends AbstractPipelineFactory {
.apply("Reset trigger for aggregations", Window
.<KV<String, ActivePowerRecord>>configure()
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(triggerDelay)))
.withAllowedLateness(gracePeriod)
.discardingFiredPanes())
......
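Taken together, the Beam pipeline for UC4 now windows by the millisecond emit period and applies the millisecond grace period as allowed lateness, while keeping the processing-time trigger. A self-contained sketch of that window/trigger setup, using hypothetical values mirroring the UC4 Beam properties hunk that follows (5000 ms emit period, 15 s trigger interval, 270 ms grace period) and a dummy input instead of Kafka:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

public class Uc4WindowingSketch {
  public static void main(String[] args) {
    // Hypothetical values taken from the properties hunk below.
    final Duration emitPeriod = Duration.millis(5000);           // emit.period.ms
    final Duration triggerDelay = Duration.standardSeconds(15);  // trigger.interval
    final Duration gracePeriod = Duration.millis(270);           // grace.period.ms

    final Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
    pipeline
        .apply("Dummy input instead of Kafka", Create.of(KV.of("sensor1", 42.0)))
        .apply("Fixed windows of one emit period", Window.into(FixedWindows.of(emitPeriod)))
        .apply("Set processing-time trigger", Window
            .<KV<String, Double>>configure()
            .triggering(Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(triggerDelay)))
            .withAllowedLateness(gracePeriod)
            .discardingFiredPanes());
    pipeline.run().waitUntilFinish();
  }
}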
......@@ -6,13 +6,10 @@ kafka.input.topic=input
kafka.output.topic=output
kafka.configuration.topic=configuration
kafka.feedback.topic=aggregation-feedback
kafka.window.duration.minutes=1
schema.registry.url=http://localhost:8081
aggregation.duration.days=30
aggregation.advance.days=1
emit.period.ms=5000
trigger.interval=15
grace.period.ms=270
......
......@@ -67,9 +67,9 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService {
final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final Time windowSize =
Time.milliseconds(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS));
Time.milliseconds(this.config.getLong(ConfigurationKeys.EMIT_PERIOD_MS));
final Duration windowGrace =
Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS));
Duration.ofMillis(this.config.getLong(ConfigurationKeys.GRACE_PERIOD_MS));
final String configurationTopic =
this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC);
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
......
......@@ -19,9 +19,9 @@ public final class ConfigurationKeys {
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
public static final String WINDOW_SIZE_MS = "window.size.ms";
public static final String EMIT_PERIOD_MS = "emit.period.ms";
public static final String WINDOW_GRACE_MS = "window.grace.ms";
public static final String GRACE_PERIOD_MS = "grace.period.ms";
public static final String FLINK_STATE_BACKEND = "flink.state.backend";
......
application.name=theodolite-uc2-application
application.name=theodolite-uc4-application
application.version=0.0.1
configuration.host=localhost
......@@ -9,8 +9,9 @@ kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
schema.registry.url=http://localhost:8081
window.size.ms=1000
window.grace.ms=0
emit.period.ms=5000
grace.period.ms=0
# Flink configuration
checkpointing.interval.ms=1000
package rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics;
package rocks.theodolite.benchmarks.uc4.hazelcastjet;
import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
......@@ -20,7 +20,6 @@ public class AggregatedActivePowerRecordAccumulator {
// This constructor is intentionally empty. Nothing special is needed here.
}
/**
* Creates an AggregationObject.
*/
......
package rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics;
package rocks.theodolite.benchmarks.uc4.hazelcastjet;
import java.util.Map;
import java.util.Optional;
......
package rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics;
package rocks.theodolite.benchmarks.uc4.hazelcastjet;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
......
package rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics;
package rocks.theodolite.benchmarks.uc4.hazelcastjet;
import com.hazelcast.function.SupplierEx;
import java.util.HashMap;
......@@ -21,6 +21,4 @@ public class HashMapSupplier implements SupplierEx<HashMap<String, Set<String>>>
return this.get();
}
}
......@@ -2,6 +2,7 @@ package rocks.theodolite.benchmarks.uc4.hazelcastjet;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
......@@ -10,12 +11,6 @@ import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;
import rocks.theodolite.benchmarks.commons.model.sensorregistry.ImmutableSensorRegistry;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.EventDeserializer;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ImmutableSensorRegistryUc4Serializer;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKey;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKeySerializer;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroup;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroupSerializer;
/**
......@@ -27,8 +22,8 @@ public class HistoryService extends HazelcastJetService {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);
/**
* Constructs the use case logic for UC4.
* Retrieves the needed values and instantiates a pipeline factory.
* Constructs the use case logic for UC4. Retrieves the needed values and instantiates a pipeline
* factory.
*/
public HistoryService() {
super(LOGGER);
......@@ -38,12 +33,12 @@ public class HistoryService extends HazelcastJetService {
KafkaAvroDeserializer.class.getCanonicalName());
final Properties kafkaConfigReadProps =
propsBuilder.buildReadProperties(
this.propsBuilder.buildReadProperties(
EventDeserializer.class.getCanonicalName(),
StringDeserializer.class.getCanonicalName());
final Properties kafkaAggregationReadProps =
propsBuilder.buildReadProperties(
this.propsBuilder.buildReadProperties(
StringDeserializer.class.getCanonicalName(),
KafkaAvroDeserializer.class.getCanonicalName());
......@@ -52,28 +47,25 @@ public class HistoryService extends HazelcastJetService {
StringSerializer.class.getCanonicalName(),
KafkaAvroSerializer.class.getCanonicalName());
final String kafkaOutputTopic =
config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final String kafkaConfigurationTopic =
config.getProperty(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC).toString();
final String configurationTopic =
this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC);
final String kafkaFeedbackTopic =
config.getProperty(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC).toString();
final String feedbackTopic = this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC);
final int windowSize = Integer.parseInt(
config.getProperty(ConfigurationKeys.WINDOW_SIZE_UC4).toString());
final Duration windowSize = Duration.ofMillis(
this.config.getInt(ConfigurationKeys.EMIT_PERIOD_MS));
this.pipelineFactory = new Uc4PipelineFactory(
kafkaProps,
kafkaConfigReadProps,
kafkaAggregationReadProps,
kafkaWriteProps,
kafkaInputTopic, kafkaOutputTopic, kafkaConfigurationTopic, kafkaFeedbackTopic,
this.kafkaInputTopic, outputTopic, configurationTopic, feedbackTopic,
windowSize);
}
@Override
protected void registerSerializer() {
this.jobConfig.registerSerializer(ValueGroup.class, ValueGroupSerializer.class)
......
package rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics;
package rocks.theodolite.benchmarks.uc4.hazelcastjet;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
......
package rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics;
package rocks.theodolite.benchmarks.uc4.hazelcastjet;
import java.util.Objects;
......