Commit 0c7b36c9 authored by Lorenz Boguhn

Refactor to use new hzj impl uc4

parent 1870a875
Merge request !275: Refactor hazelcast jet benchmarks
HistoryService.java (before):

package rocks.theodolite.benchmarks.uc4.hazelcastjet;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A microservice that manages the history and, therefore, stores and aggregates incoming
 * measurements.
 */
public class HistoryService {

  private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);

  // Hazelcast settings (default)
  private static final String HZ_KUBERNETES_SERVICE_DNS_KEY = "service-dns";
  private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701";

  // Kafka settings (default)
  private static final String KAFKA_BOOTSTRAP_DEFAULT = "localhost:9092";
  private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
  private static final String KAFKA_INPUT_TOPIC_DEFAULT = "input";
  private static final String KAFKA_CONFIG_TOPIC_DEFAULT = "configuration";
  private static final String KAFKA_FEEDBACK_TOPIC_DEFAULT = "aggregation-feedback";
  private static final String KAFKA_OUTPUT_TOPIC_DEFAULT = "output";

  // UC4 specific (default)
  private static final String WINDOW_SIZE_DEFAULT_MS = "5000";

  // Job name (default)
  private static final String JOB_NAME = "uc4-hazelcastjet";

  /**
   * Entrypoint for UC4 using Gradle Run.
   */
  public static void main(final String[] args) {
    final HistoryService uc4HistoryService = new HistoryService();
    try {
      uc4HistoryService.run();
    } catch (final Exception e) { // NOPMD
      LOGGER.error("ABORT MISSION!: {}", e);
    }
  }

  /**
   * Start a UC4 service.
   *
   * @throws Exception This Exception occurs if the Uc4HazelcastJetFactory is used in the wrong
   *         way. Detailed data is provided once an Exception occurs.
   */
  public void run() throws Exception { // NOPMD
    this.createHazelcastJetApplication();
  }

  /**
   * Creates a Hazelcast Jet Application for UC4 using the Uc4HazelcastJetFactory.
   *
   * @throws Exception This Exception occurs if the Uc4HazelcastJetFactory is used in the wrong
   *         way. Detailed data is provided once an Exception occurs.
   */
  private void createHazelcastJetApplication() throws Exception { // NOPMD
    new Uc4HazelcastJetFactory()
        .setReadPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT, JOB_NAME)
        .setWritePropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT)
        .setKafkaInputTopicFromEnv(KAFKA_INPUT_TOPIC_DEFAULT)
        .setKafkaOutputTopicFromEnv(KAFKA_OUTPUT_TOPIC_DEFAULT)
        .setKafkaConfigurationTopicFromEnv(KAFKA_CONFIG_TOPIC_DEFAULT)
        .setKafkaFeedbackTopicFromEnv(KAFKA_FEEDBACK_TOPIC_DEFAULT)
        .setWindowSizeFromEnv(WINDOW_SIZE_DEFAULT_MS)
        .buildUc4JetInstanceFromEnv(LOGGER, BOOTSTRAP_SERVER_DEFAULT, HZ_KUBERNETES_SERVICE_DNS_KEY)
        .buildUc4Pipeline()
        .runUc4Job(JOB_NAME);
  }
}

HistoryService.java (after):

package rocks.theodolite.benchmarks.uc4.hazelcastjet;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.EventDeserializer;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ImmutableSensorRegistryUc4Serializer;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKey;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKeySerializer;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroup;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroupSerializer;
import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;

/**
 * A microservice that manages the history and, therefore, stores and aggregates incoming
 * measurements.
 */
public class HistoryService extends HazelcastJetService {

  private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);

  /**
   * Constructs the use case logic for UC4.
   * Retrieves the needed values and instantiates a pipeline factory.
   */
  public HistoryService() {
    super(LOGGER);
    final Properties kafkaProps =
        this.propsBuilder.buildReadProperties(
            StringDeserializer.class.getCanonicalName(),
            KafkaAvroDeserializer.class.getCanonicalName());

    final Properties kafkaConfigReadProps =
        propsBuilder.buildReadProperties(
            EventDeserializer.class.getCanonicalName(),
            StringDeserializer.class.getCanonicalName());

    final Properties kafkaAggregationReadProps =
        propsBuilder.buildReadProperties(
            StringDeserializer.class.getCanonicalName(),
            KafkaAvroDeserializer.class.getCanonicalName());

    final Properties kafkaWriteProps =
        this.propsBuilder.buildWriteProperties(
            StringSerializer.class.getCanonicalName(),
            KafkaAvroSerializer.class.getCanonicalName());

    final String kafkaOutputTopic =
        config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();

    final String kafkaConfigurationTopic =
        config.getProperty(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC).toString();

    final String kafkaFeedbackTopic =
        config.getProperty(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC).toString();

    final int windowSize = Integer.parseInt(
        config.getProperty(ConfigurationKeys.WINDOW_SIZE_UC4).toString());

    this.pipelineFactory = new Uc4PipelineFactory(
        kafkaProps,
        kafkaConfigReadProps,
        kafkaAggregationReadProps,
        kafkaWriteProps,
        kafkaInputTopic, kafkaOutputTopic, kafkaConfigurationTopic, kafkaFeedbackTopic,
        windowSize);
  }

  @Override
  protected void registerSerializer() {
    this.jobConfig.registerSerializer(ValueGroup.class, ValueGroupSerializer.class)
        .registerSerializer(SensorGroupKey.class, SensorGroupKeySerializer.class)
        .registerSerializer(ImmutableSensorRegistry.class,
            ImmutableSensorRegistryUc4Serializer.class);
  }

  public static void main(final String[] args) {
    new HistoryService().run();
  }
}
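The commons KafkaPropertiesBuilder used by the new constructor is not part of this commit's diff. As rough orientation, the following is a minimal sketch of what a call like propsBuilder.buildReadProperties(keyDeserializer, valueDeserializer) presumably assembles, using the standard Kafka client configuration keys; the bootstrap server, Schema Registry URL, and group id are the defaults from the old HistoryService, and the offset-reset setting is an assumption. The real builder may set additional options.

package rocks.theodolite.benchmarks.uc4.hazelcastjet;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical sketch only; the actual builder lives in the commons module.
public final class ReadPropertiesSketch {

  private ReadPropertiesSketch() {}

  /** Approximates buildReadProperties(keyDeserializer, valueDeserializer). */
  public static Properties buildReadProperties(final String keyDeserializer,
      final String valueDeserializer) {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // old default
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "uc4-hazelcastjet"); // old default job name
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
    props.put("schema.registry.url", "http://localhost:8081"); // old default
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // assumption
    return props;
  }

  public static void main(final String[] args) {
    System.out.println(buildReadProperties(
        StringDeserializer.class.getCanonicalName(),
        KafkaAvroDeserializer.class.getCanonicalName()));
  }
}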
package rocks.theodolite.benchmarks.uc4.hazelcastjet;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.EventDeserializer;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ImmutableSensorRegistryUc4Serializer;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKey;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKeySerializer;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroup;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroupSerializer;
import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;
/**
* A microservice that manages the history and, therefore, stores and aggregates incoming
* measurements.
*/
public class NewHistoryService extends HazelcastJetService {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);
/**
* Constructs the use case logic for UC4.
* Retrieves the needed values and instantiates a pipeline factory.
*/
public NewHistoryService() {
super(LOGGER);
final Properties kafkaProps =
this.propsBuilder.buildReadProperties(
StringDeserializer.class.getCanonicalName(),
KafkaAvroDeserializer.class.getCanonicalName());
final Properties kafkaConfigReadProps =
propsBuilder.buildReadProperties(
EventDeserializer.class.getCanonicalName(),
StringDeserializer.class.getCanonicalName());
final Properties kafkaAggregationReadProps =
propsBuilder.buildReadProperties(
StringDeserializer.class.getCanonicalName(),
KafkaAvroDeserializer.class.getCanonicalName());
final Properties kafkaWriteProps =
this.propsBuilder.buildWriteProperties(
StringSerializer.class.getCanonicalName(),
KafkaAvroSerializer.class.getCanonicalName());
final String kafkaOutputTopic =
config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
final String kafkaConfigurationTopic =
config.getProperty(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC).toString();
final String kafkaFeedbackTopic =
config.getProperty(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC).toString();
final int windowSize = Integer.parseInt(
config.getProperty(ConfigurationKeys.WINDOW_SIZE_UC4).toString());
this.pipelineFactory = new Uc4PipelineFactory(
kafkaProps,
kafkaConfigReadProps,
kafkaAggregationReadProps,
kafkaWriteProps,
kafkaInputTopic, kafkaOutputTopic, kafkaConfigurationTopic, kafkaFeedbackTopic,
windowSize);
}
@Override
protected void registerSerializer() {
this.jobConfig.registerSerializer(ValueGroup.class, ValueGroupSerializer.class)
.registerSerializer(SensorGroupKey.class, SensorGroupKeySerializer.class)
.registerSerializer(ImmutableSensorRegistry.class,
ImmutableSensorRegistryUc4Serializer.class);
}
public static void main(final String[] args) {
new NewHistoryService().run();
}
}
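Both service classes above delegate their lifecycle to the commons HazelcastJetService superclass, which does not appear on this page. The following is a minimal sketch of the contract the subclasses appear to rely on, inferred solely from the calls they make (super(LOGGER), propsBuilder, config, pipelineFactory, jobConfig, registerSerializer(), run()); the field types and the run() sequence are assumptions, not the real implementation.

package rocks.theodolite.benchmarks.uc4.hazelcastjet;

import com.hazelcast.jet.config.JobConfig;
import org.slf4j.Logger;

// Hypothetical approximation of rocks.theodolite.benchmarks.commons.hazelcastjet
// .HazelcastJetService; the real class may differ.
public abstract class HazelcastJetServiceSketch {

  protected final Logger logger;
  protected final JobConfig jobConfig = new JobConfig();
  // Assumed collaborators, names taken from their usage in the subclasses:
  // protected final KafkaPropertiesBuilder propsBuilder = ...; // builds read/write Properties
  // protected final Configuration config = ...;                // resolves topics and window size
  // protected PipelineFactory pipelineFactory;                 // assigned by subclass constructors

  protected HazelcastJetServiceSketch(final Logger logger) {
    this.logger = logger;
  }

  /** Hook for subclasses to register job-specific serializers on {@code jobConfig}. */
  protected abstract void registerSerializer();

  /** Assumed lifecycle: register serializers, build the pipeline, submit it as a Jet job. */
  public void run() {
    this.registerSerializer();
    // final Pipeline pipeline = this.pipelineFactory.buildPipeline();
    // jetInstance.newJobIfAbsent(pipeline, this.jobConfig).join();
    this.logger.info("Pipeline built and job submitted (sketch only).");
  }
}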
package rocks.theodolite.benchmarks.uc4.hazelcastjet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
import rocks.theodolite.benchmarks.commons.hazelcastjet.JetInstanceBuilder;
import rocks.theodolite.benchmarks.commons.hazelcastjet.KafkaPropertiesBuilder;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.EventDeserializer;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ImmutableSensorRegistryUc4Serializer;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKey;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKeySerializer;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroup;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroupSerializer;
import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;
/**
* A Hazelcast Jet factory which can build a Hazelcast Jet Instance and Pipeline for the UC4
* benchmark and lets you start the Hazelcast Jet job. The JetInstance can be built directly as the
* Hazelcast Config is managed internally. In order to build the Pipeline, you first have to build
* the Read and Write Properties and set the input, output, and configuration topic. This can be
 * done using internal functions of this factory. Outside data only refers to custom values or
 * default values in case data from the environment cannot be fetched.
*/
public class Uc4HazelcastJetFactory {
// Information per History Service
private Properties kafkaInputReadPropsForPipeline;
private Properties kafkaConfigPropsForPipeline;
private Properties kafkaFeedbackPropsForPipeline;
private Properties kafkaWritePropsForPipeline;
private String kafkaInputTopic;
private String kafkaOutputTopic;
private JetInstance uc4JetInstance;
private Pipeline uc4JetPipeline;
// UC4 specific
private String kafkaConfigurationTopic;
private String kafkaFeedbackTopic;
private int windowSize;
/////////////////////////////////////
// Layer 1 - Hazelcast Jet Run Job //
/////////////////////////////////////
/**
 * Needs a JetInstance and Pipeline defined in this factory. Adds the pipeline to the existing
 * JetInstance as a job.
 *
 * @param jobName The name of the job.
 * @throws IllegalStateException If either no JetInstance or no Pipeline is set, a job cannot be
 *         started.
 */
public void runUc4Job(final String jobName) throws IllegalStateException { // NOPMD
// Check if a Jet Instance for UC4 is set.
if (this.uc4JetInstance == null) {
throw new IllegalStateException("Jet Instance is not set! "
+ "Cannot start a hazelcast jet job for UC4.");
}
// Check if a Pipeline for UC4 is set.
if (this.uc4JetPipeline == null) {
throw new IllegalStateException(
"Hazelcast Pipeline is not set! Cannot start a hazelcast jet job for UC4.");
}
// Adds the job name and joins a job to the JetInstance defined in this factory
final JobConfig jobConfig = new JobConfig()
.registerSerializer(ValueGroup.class, ValueGroupSerializer.class)
.registerSerializer(SensorGroupKey.class, SensorGroupKeySerializer.class)
.registerSerializer(ImmutableSensorRegistry.class,
ImmutableSensorRegistryUc4Serializer.class)
.setName(jobName);
this.uc4JetInstance.newJobIfAbsent(this.uc4JetPipeline, jobConfig).join();
}
/////////////
// Layer 2 //
/////////////
/**
* Build a Hazelcast JetInstance used to run a job on.
*
* @param logger The logger specified for this JetInstance.
* @param bootstrapServerDefault Default bootstrap server in case no value can be derived from the
* environment.
* @param hzKubernetesServiceDnsKey The kubernetes service dns key.
* @return A Uc4HazelcastJetFactory containing a set JetInstance.
*/
public Uc4HazelcastJetFactory buildUc4JetInstanceFromEnv(final Logger logger,
final String bootstrapServerDefault,
final String hzKubernetesServiceDnsKey) {
this.uc4JetInstance = new JetInstanceBuilder()
.setConfigFromEnv(logger, bootstrapServerDefault, hzKubernetesServiceDnsKey)
.build();
return this;
}
/**
* Builds a Hazelcast Jet pipeline used for a JetInstance to run it as a job on. Needs the input
* topic and kafka properties defined in this factory beforehand.
*
 * @return A Uc4HazelcastJetFactory containing a set pipeline.
 * @throws IllegalStateException If the input topic or the Kafka properties are not defined, the
 *         pipeline cannot be built.
*/
public Uc4HazelcastJetFactory buildUc4Pipeline() throws IllegalStateException { // NOPMD
final String defaultPipelineWarning = "Cannot build pipeline."; // NOPMD
// Check if Properties for the Kafka Input are set.
if (this.kafkaInputReadPropsForPipeline == null) {
throw new IllegalStateException("Kafka Input Read Properties for pipeline not set! "
+ defaultPipelineWarning);
}
// Check if Properties for the Kafka Output are set.
if (this.kafkaWritePropsForPipeline == null) {
throw new IllegalStateException("Kafka Write Properties for pipeline not set! "
+ defaultPipelineWarning);
}
// Check if Properties for the Kafka Config Read are set.
if (this.kafkaConfigPropsForPipeline == null) {
throw new IllegalStateException("Kafka Config Read Properties for pipeline not set! "
+ defaultPipelineWarning);
}
// Check if Properties for the Kafka Feedback Read are set.
if (this.kafkaFeedbackPropsForPipeline == null) {
throw new IllegalStateException("Kafka Feedback Read Properties for pipeline not set! "
+ defaultPipelineWarning);
}
// Check if the Kafka input topic is set.
if (this.kafkaInputTopic == null) {
throw new IllegalStateException("Kafka input topic for pipeline not set! "
+ defaultPipelineWarning);
}
// Check if the Kafka output topic is set.
if (this.kafkaOutputTopic == null) {
throw new IllegalStateException("kafka output topic for pipeline not set! "
+ defaultPipelineWarning);
}
// Check if the Kafka config topic is set.
if (this.kafkaConfigurationTopic == null) {
throw new IllegalStateException("configuratin topic for pipeline not set! "
+ defaultPipelineWarning);
}
// Check if the Kafka feedback topic is set.
if (this.kafkaFeedbackTopic == null) {
throw new IllegalStateException("Feedback topic not set! "
+ defaultPipelineWarning);
}
// Check if window size for tumbling window is set.
if (this.windowSize <= 0) {
throw new IllegalStateException("window size for pipeline not set or not greater than 0! "
+ defaultPipelineWarning);
}
// Build Pipeline Using the pipelineBuilder
final Uc4PipelineBuilder pipeBuilder = new Uc4PipelineBuilder();
this.uc4JetPipeline =
pipeBuilder.build(this.kafkaInputReadPropsForPipeline,
this.kafkaConfigPropsForPipeline,
this.kafkaFeedbackPropsForPipeline,
this.kafkaWritePropsForPipeline,
this.kafkaInputTopic, this.kafkaOutputTopic,
this.kafkaConfigurationTopic,
this.kafkaFeedbackTopic,
this.windowSize);
// Return the Uc4HazelcastJetFactory
return this;
}
/////////////
// Layer 3 //
/////////////
/**
* Sets kafka read properties for pipeline used in this builder using environment variables.
*
* @param bootstrapServersDefault Default Bootstrap server in the case that no bootstrap server
* can be fetched from the environment.
* @param schemaRegistryUrlDefault Default schema registry url in the case that no schema registry
* url can be fetched from the environment.
 * @return The Uc4HazelcastJetFactory with set kafkaReadPropertiesForPipeline.
*/
public Uc4HazelcastJetFactory setReadPropertiesFromEnv(// NOPMD
final String bootstrapServersDefault,
final String schemaRegistryUrlDefault,
final String jobName) {
// Use KafkaPropertiesBuilder to build a properties object used for kafka
final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder();
final Properties kafkaInputReadProps =
propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault,
schemaRegistryUrlDefault, jobName,
StringDeserializer.class.getCanonicalName(),
KafkaAvroDeserializer.class.getCanonicalName());
final Properties kafkaConfigReadProps =
propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault,
schemaRegistryUrlDefault,
jobName,
EventDeserializer.class.getCanonicalName(),
StringDeserializer.class.getCanonicalName());
final Properties kafkaAggregationReadProps =
propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault,
schemaRegistryUrlDefault,
jobName,
StringDeserializer.class.getCanonicalName(),
KafkaAvroDeserializer.class.getCanonicalName());
this.kafkaInputReadPropsForPipeline = kafkaInputReadProps;
this.kafkaConfigPropsForPipeline = kafkaConfigReadProps;
this.kafkaFeedbackPropsForPipeline = kafkaAggregationReadProps;
return this;
}
/**
* Sets kafka write properties for pipeline used in this builder using environment variables.
*
* @param bootstrapServersDefault Default Bootstrap server in the case that no bootstrap server
* can be fetched from the environment.
 * @return The Uc4HazelcastJetFactory with set kafkaWritePropertiesForPipeline.
*/
public Uc4HazelcastJetFactory setWritePropertiesFromEnv(// NOPMD
final String bootstrapServersDefault, final String schemaRegistryUrlDefault) {
// Use KafkaPropertiesBuilder to build a properties object used for kafka
final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder();
final Properties kafkaWriteProps =
propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault,
schemaRegistryUrlDefault,
StringSerializer.class.getCanonicalName(),
KafkaAvroSerializer.class.getCanonicalName());
this.kafkaWritePropsForPipeline = kafkaWriteProps;
return this;
}
/**
* Sets the kafka input topic for the pipeline used in this builder.
*
* @param inputTopic The kafka topic used as the pipeline input.
 * @return A Uc4HazelcastJetFactory with a set kafkaInputTopic.
*/
public Uc4HazelcastJetFactory setCustomKafkaInputTopic(// NOPMD
final String inputTopic) {
this.kafkaInputTopic = inputTopic;
return this;
}
/**
 * Sets the kafka output topic for the pipeline used in this builder.
*
* @param outputTopic The kafka topic used as the pipeline output.
 * @return A Uc4HazelcastJetFactory with a set kafkaOutputTopic.
*/
public Uc4HazelcastJetFactory setCustomKafkaOutputTopic(final String outputTopic) { // NOPMD
this.kafkaOutputTopic = outputTopic;
return this;
}
/**
* Sets the kafka input topic for the pipeline used in this builder using environment variables.
*
* @param defaultInputTopic The default kafka input topic used if no topic is specified by the
* environment.
 * @return A Uc4HazelcastJetFactory with a set kafkaInputTopic.
*/
public Uc4HazelcastJetFactory setKafkaInputTopicFromEnv(// NOPMD
final String defaultInputTopic) {
this.kafkaInputTopic = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC),
defaultInputTopic);
return this;
}
/**
* Sets the kafka output topic for the pipeline used in this builder using environment variables.
*
* @param defaultOutputTopic The default kafka output topic used if no topic is specified by the
* environment.
 * @return A Uc4HazelcastJetFactory with a set kafkaOutputTopic.
*/
public Uc4HazelcastJetFactory setKafkaOutputTopicFromEnv(// NOPMD
final String defaultOutputTopic) {
this.kafkaOutputTopic = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_OUTPUT_TOPIC),
defaultOutputTopic);
return this;
}
/**
* Sets the window size for the pipeline used in this builder.
*
* @param windowSize the window size to be used for this pipeline.
* @return A Uc4HazelcastJetFactory with a set windowSize.
*/
public Uc4HazelcastJetFactory setCustomWindowSize(// NOPMD
final int windowSize) {
this.windowSize = windowSize;
return this;
}
/**
* Sets the window size for the pipeline used in this builder from the environment.
*
* @param defaultWindowSize the default window size to be used for this pipeline when none is set
* in the environment.
* @return A Uc4HazelcastJetFactory with a set windowSize.
*/
public Uc4HazelcastJetFactory setWindowSizeFromEnv(// NOPMD
final String defaultWindowSize) {
final String windowSize = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.WINDOW_SIZE_UC4),
defaultWindowSize);
final int windowSizeNumber = Integer.parseInt(windowSize);
this.windowSize = windowSizeNumber;
return this;
}
/**
* Sets the configuration topic for the pipeline used in this builder.
*
* @param kafkaConfigurationTopic the configuration topic to be used for this pipeline.
* @return A Uc4HazelcastJetFactory with a set configuration topic.
*/
public Uc4HazelcastJetFactory setCustomKafkaConfigurationTopic(// NOPMD
final String kafkaConfigurationTopic) {
this.kafkaConfigurationTopic = kafkaConfigurationTopic;
return this;
}
/**
* Sets the configuration topic for the pipeline used in this builder from the environment.
*
* @param defaultKafkaConfigurationTopic the default configuration topic to be used for this
* pipeline when none is set in the environment.
* @return A Uc4HazelcastJetFactory with a set kafkaConfigurationTopic.
*/
public Uc4HazelcastJetFactory setKafkaConfigurationTopicFromEnv(// NOPMD
final String defaultKafkaConfigurationTopic) {
this.kafkaConfigurationTopic = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC),
defaultKafkaConfigurationTopic);
return this;
}
/**
* Sets the Feedback topic for the pipeline used in this builder.
*
* @param kafkaFeedbackTopic the Feedback topic to be used for this pipeline.
* @return A Uc4HazelcastJetFactory with a set Feedback topic.
*/
public Uc4HazelcastJetFactory setCustomKafkaFeedbackTopic(// NOPMD
final String kafkaFeedbackTopic) {
this.kafkaFeedbackTopic = kafkaFeedbackTopic;
return this;
}
/**
* Sets the Feedback topic for the pipeline used in this builder from the environment.
*
* @param defaultKafkaFeedbackTopic the default Feedback topic to be used for this pipeline when
* none is set in the environment.
* @return A Uc4HazelcastJetFactory with a set kafkaFeedbackTopic.
*/
public Uc4HazelcastJetFactory setKafkaFeedbackTopicFromEnv(// NOPMD
final String defaultKafkaFeedbackTopic) {
this.kafkaFeedbackTopic = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC),
defaultKafkaFeedbackTopic);
return this;
}
}
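For reference, the factory that the refactoring supersedes can also be driven with its setCustom* setters instead of environment variables. The following sketch wires it up using only methods defined above; the broker, registry, topic, and window-size values are the defaults from the old HistoryService, and running it assumes a reachable Kafka cluster and Schema Registry.

package rocks.theodolite.benchmarks.uc4.hazelcastjet;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative wiring only; not part of this commit.
public final class Uc4FactoryExample {

  private static final Logger LOGGER = LoggerFactory.getLogger(Uc4FactoryExample.class);

  public static void main(final String[] args) {
    new Uc4HazelcastJetFactory()
        // Read/write properties still come from the environment, with fallbacks:
        .setReadPropertiesFromEnv("localhost:9092", "http://localhost:8081", "uc4-hazelcastjet")
        .setWritePropertiesFromEnv("localhost:9092", "http://localhost:8081")
        // Topics and window size set explicitly instead of from the environment:
        .setCustomKafkaInputTopic("input")
        .setCustomKafkaOutputTopic("output")
        .setCustomKafkaConfigurationTopic("configuration")
        .setCustomKafkaFeedbackTopic("aggregation-feedback")
        .setCustomWindowSize(5000)
        // Jet instance from the environment, falling back to a local member:
        .buildUc4JetInstanceFromEnv(LOGGER, "localhost:5701", "service-dns")
        .buildUc4Pipeline()
        .runUc4Job("uc4-hazelcastjet");
  }
}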
package rocks.theodolite.benchmarks.uc4.hazelcastjet;
import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.kafka.KafkaSinks;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StageWithWindow;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.StreamStageWithKey;
import com.hazelcast.jet.pipeline.WindowDefinition;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.AggregatedActivePowerRecordAccumulator;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ChildParentsTransformer;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKey;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroup;
import titan.ccp.configuration.events.Event;
import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.records.AggregatedActivePowerRecord;
import titan.ccp.model.sensorregistry.SensorRegistry;
/**
 * Builder for the Hazelcast Jet pipeline that implements the UC4 stream processing topology.
*/
@SuppressWarnings("PMD.ExcessiveImports")
public class Uc4PipelineBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(Uc4PipelineBuilder.class);
private static final String SENSOR_PARENT_MAP_NAME = "SensorParentMap";
/**
* Builds a pipeline which can be used for stream processing using Hazelcast Jet.
*
* @param kafkaInputReadPropsForPipeline Properties Object containing the necessary kafka input
* read attributes.
* @param kafkaConfigPropsForPipeline Properties Object containing the necessary kafka config read
* attributes.
* @param kafkaFeedbackPropsForPipeline Properties Object containing the necessary kafka
* aggregation read attributes.
* @param kafkaWritePropsForPipeline Properties Object containing the necessary kafka write
* attributes.
* @param kafkaInputTopic The name of the input topic used for the pipeline.
* @param kafkaOutputTopic The name of the output topic used for the pipeline.
* @param kafkaConfigurationTopic The name of the configuration topic used for the pipeline.
* @param kafkaFeedbackTopic The name of the feedback topic used for the pipeline.
* @param windowSize The window size of the tumbling window used in this pipeline.
 * @return A Pipeline which can be used in a Hazelcast Jet Instance to process data for UC4.
*/
public Pipeline build(final Properties kafkaInputReadPropsForPipeline, // NOPMD
final Properties kafkaConfigPropsForPipeline,
final Properties kafkaFeedbackPropsForPipeline,
final Properties kafkaWritePropsForPipeline,
final String kafkaInputTopic,
final String kafkaOutputTopic,
final String kafkaConfigurationTopic,
final String kafkaFeedbackTopic,
final int windowSize) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("kafkaConfigProps: " + kafkaConfigPropsForPipeline);
LOGGER.info("kafkaFeedbackProps: " + kafkaFeedbackPropsForPipeline);
LOGGER.info("kafkaWriteProps: " + kafkaWritePropsForPipeline);
}
// The pipeline for this Use Case
final Pipeline uc4Pipeline = Pipeline.create();
// Sources for this use case
final StreamSource<Entry<Event, String>> configSource =
KafkaSources.kafka(kafkaConfigPropsForPipeline, kafkaConfigurationTopic);
final StreamSource<Entry<String, ActivePowerRecord>> inputSource =
KafkaSources.kafka(kafkaInputReadPropsForPipeline, kafkaInputTopic);
final StreamSource<Entry<String, AggregatedActivePowerRecord>> aggregationSource =
KafkaSources.kafka(kafkaFeedbackPropsForPipeline, kafkaFeedbackTopic);
// Extend UC4 topology to pipeline
final StreamStage<Entry<String, AggregatedActivePowerRecord>> uc4Aggregation =
this.extendUc4Topology(uc4Pipeline, inputSource, aggregationSource, configSource,
windowSize);
// Add Sink 1: Write back to the Kafka feedback/aggregation topic
uc4Aggregation.writeTo(KafkaSinks.kafka(
    kafkaWritePropsForPipeline, kafkaFeedbackTopic));
// Add Sink 2: Log the aggregation result
uc4Aggregation.writeTo(Sinks.logger());
// Add Sink 3: Write back to the Kafka output topic
uc4Aggregation.writeTo(KafkaSinks.kafka(
    kafkaWritePropsForPipeline, kafkaOutputTopic));
// Return the pipeline
return uc4Pipeline;
}
/**
 * Extends a blank Hazelcast Jet Pipeline with the UC4 topology defined by Theodolite.
 *
 * <p>
 * UC4 takes {@code ActivePowerRecord} events from sensors and a {@code SensorRegistry} that maps
 * sensor keys to groups, in order to assign values to their corresponding groups. A feedback
 * stream allows group keys to be mapped to values and eventually to other top-level groups
 * defined by the {@code SensorRegistry}.
 * </p>
 *
 * <p>
 * 6 Step topology: <br>
 * (1) Inputs (Config, Values, Aggregations) <br>
 * (2) Merge Input Values and Aggregations <br>
 * (3) Join Configuration with Merged Input Stream <br>
 * (4) Duplicate as flatmap per value and group <br>
 * (5) Window (preparation for possible last values) <br>
 * (6) Aggregate data over the window
 * </p>
 *
 * @param pipe The blank pipeline to extend the logic to.
 * @param inputSource A streaming source with {@code ActivePowerRecord} data.
 * @param aggregationSource A streaming source with aggregated data.
 * @param configurationSource A streaming source delivering a {@code SensorRegistry}.
 * @param windowSize The window size used to aggregate over.
 * @return A {@code StreamStage<Entry<String, AggregatedActivePowerRecord>>} with sensor keys or
 *         group keys mapped to their corresponding aggregated values. The data can be further
 *         modified or directly be linked to a Hazelcast Jet sink.
 */
public StreamStage<Entry<String, AggregatedActivePowerRecord>> extendUc4Topology(// NOPMD
final Pipeline pipe,
final StreamSource<Entry<String, ActivePowerRecord>> inputSource,
final StreamSource<Entry<String, AggregatedActivePowerRecord>> aggregationSource,
final StreamSource<Entry<Event, String>> configurationSource, final int windowSize) {
//////////////////////////////////
// (1) Configuration Stream
pipe.readFrom(configurationSource)
.withNativeTimestamps(0)
.filter(entry -> entry.getKey() == Event.SENSOR_REGISTRY_CHANGED
|| entry.getKey() == Event.SENSOR_REGISTRY_STATUS)
.map(data -> Util.entry(data.getKey(), SensorRegistry.fromJson(data.getValue())))
.flatMapStateful(HashMap::new, new ConfigFlatMap())
.writeTo(Sinks.mapWithUpdating(
SENSOR_PARENT_MAP_NAME, // The addressed IMAP
Entry::getKey, // The key to look for
(oldValue, newEntry) -> newEntry.getValue()));
//////////////////////////////////
// (1) Sensor Input Stream
final StreamStage<Entry<String, ActivePowerRecord>> inputStream = pipe
.readFrom(inputSource)
.withNativeTimestamps(0);
//////////////////////////////////
// (1) Aggregation Stream
final StreamStage<Entry<String, ActivePowerRecord>> aggregations = pipe
.readFrom(aggregationSource)
.withNativeTimestamps(0)
.map(entry -> { // Map Aggregated to ActivePowerRecord
final AggregatedActivePowerRecord agg = entry.getValue();
final ActivePowerRecord record = new ActivePowerRecord(
agg.getIdentifier(), agg.getTimestamp(), agg.getSumInW());
return Util.entry(entry.getKey(), record);
});
//////////////////////////////////
// (2) UC4 Merge Input with aggregation stream
final StreamStageWithKey<Entry<String, ActivePowerRecord>, String>
mergedInputAndAggregations = inputStream
.merge(aggregations)
.groupingKey(Entry::getKey);
//////////////////////////////////
// (3) UC4 Join Configuration and Merges Input/Aggregation Stream
// [sensorKey , (value,Set<Groups>)]
final StreamStage<Entry<String, ValueGroup>> joinedStage = mergedInputAndAggregations
.<Set<String>, Entry<String, ValueGroup>>mapUsingIMap(
SENSOR_PARENT_MAP_NAME,
(sensorEvent, sensorParentsSet) -> {
// Check whether a groupset exists for a key or not
if (sensorParentsSet == null) {
// No group set exists for this key: return valuegroup with default null group set.
final Set<String> nullSet = new HashSet<>();
nullSet.add("NULL-GROUPSET");
return Util.entry(sensorEvent.getKey(),
new ValueGroup(sensorEvent.getValue(), nullSet));
} else {
// Group set exists for this key: return valuegroup with the groupset.
final ValueGroup valueParentsPair =
new ValueGroup(sensorEvent.getValue(), sensorParentsSet);
// Return solution
return Util.entry(sensorEvent.getKey(), valueParentsPair);
}
});
//////////////////////////////////
// (4) UC4 Duplicate as flatmap joined Stream
// [(sensorKey, Group) , value]
final StreamStage<Entry<SensorGroupKey, ActivePowerRecord>> dupliAsFlatmappedStage = joinedStage
.flatMap(entry -> {
// Supplied data
final String keyGroupId = entry.getKey();
final ActivePowerRecord record = entry.getValue().getRecord();
final Set<String> groups = entry.getValue().getGroups();
// Transformed Data
final String[] groupList = groups.toArray(String[]::new);
final SensorGroupKey[] newKeyList = new SensorGroupKey[groupList.length];
final List<Entry<SensorGroupKey, ActivePowerRecord>> newEntryList = new ArrayList<>();
for (int i = 0; i < groupList.length; i++) {
newKeyList[i] = new SensorGroupKey(keyGroupId, groupList[i]);
newEntryList.add(Util.entry(newKeyList[i], record));
}
// Return traversable list of new entry elements
return Traversers.traverseIterable(newEntryList);
});
//////////////////////////////////
// (5) UC4 Last Value Map
// Table with tumbling window differentiation [ (sensorKey,Group) , value ],Time
final StageWithWindow<Entry<SensorGroupKey, ActivePowerRecord>>
windowedLastValues = dupliAsFlatmappedStage
.window(WindowDefinition.tumbling(windowSize));
final AggregateOperation1<Entry<SensorGroupKey, ActivePowerRecord>,
AggregatedActivePowerRecordAccumulator, AggregatedActivePowerRecord> aggrOp =
AggregateOperation
.withCreate(AggregatedActivePowerRecordAccumulator::new)
.<Entry<SensorGroupKey, ActivePowerRecord>>andAccumulate((acc, rec) -> {
acc.setId(rec.getKey().getGroup());
acc.addInputs(rec.getValue());
})
.andCombine((acc, acc2) ->
acc.addInputs(acc2.getId(), acc2.getSumInW(), acc2.getCount(), acc.getTimestamp()))
.andDeduct((acc, acc2) -> acc.removeInputs(acc2.getSumInW(), acc2.getCount()))
.andExportFinish(acc ->
new AggregatedActivePowerRecord(acc.getId(),
acc.getTimestamp(),
acc.getCount(),
acc.getSumInW(),
acc.getAverageInW())
);
// write aggregation back to kafka
return windowedLastValues
.groupingKey(entry -> entry.getKey().getGroup())
.aggregate(aggrOp).map(agg -> Util.entry(agg.getKey(), agg.getValue()));
}
/**
* FlatMap function used to process the configuration input for UC4.
*/
private static class ConfigFlatMap implements
BiFunctionEx<Map<String, Set<String>>, Entry<Event, SensorRegistry>, Traverser<Entry<String, Set<String>>>> { // NOCS
private static final long serialVersionUID = -6769931374907428699L;
@Override
public Traverser<Entry<String, Set<String>>> applyEx(
final Map<String, Set<String>> flatMapStage,
final Entry<Event, SensorRegistry> eventItem) {
// Transform new Input
final ChildParentsTransformer transformer = new ChildParentsTransformer("default-name");
final Map<String, Set<String>> mapFromRegistry =
transformer.constructChildParentsPairs(eventItem.getValue());
// Compare both tables
final Map<String, Set<String>> updates = new HashMap<>();
for (final String key : mapFromRegistry.keySet()) {
if (flatMapStage.containsKey(key)) {
if (!mapFromRegistry.get(key).equals(flatMapStage.get(key))) {
updates.put(key, mapFromRegistry.get(key));
}
} else {
updates.put(key, mapFromRegistry.get(key));
}
}
// Create an updates list to pass on to the next pipeline stage.
final List<Entry<String, Set<String>>> updatesList = new ArrayList<>(updates.entrySet());
// Return traverser with updates list.
return Traversers.traverseIterable(updatesList)
.map(e -> Util.entry(e.getKey(), e.getValue()));
}
}
}
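Because the builder only assembles the dataflow graph, and Kafka connections are opened at job execution time, the pipeline can be constructed and inspected without a running cluster. A hypothetical smoke test along these lines, where empty Properties stand in for real consumer/producer settings:

package rocks.theodolite.benchmarks.uc4.hazelcastjet;

import com.hazelcast.jet.pipeline.Pipeline;
import java.util.Properties;

// Illustrative check only; not part of this commit.
public final class Uc4PipelineSmokeTest {

  public static void main(final String[] args) {
    final Properties dummyProps = new Properties();
    final Pipeline pipeline = new Uc4PipelineBuilder().build(
        dummyProps, dummyProps, dummyProps, dummyProps,
        "input", "output", "configuration", "aggregation-feedback", 5000);
    // Render the six-step topology as a Jet DAG for inspection.
    System.out.println(pipeline.toDag());
  }
}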