Commit 1ab16534, authored by MaxEmerold, committed by Sören Henning

Fix UC4 Hazelcast Jet implementation

- Added a StreamSerializer for ImmutableSensorRegistry
- Changed Map.Entry to Util.Entry for serialization support (see the sketch below)
- The pipeline now works with multiple running JetInstances
parent 8b6f32b1
Merge request !208: Add benchmark implementations for Hazelcast Jet
Pipeline #5481 failed
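For context, a minimal sketch (not part of the commit) of why swapping Map.entry for com.hazelcast.jet.Util.entry fixes serialization: the JDK's Map.entry returns a java.util.KeyValueHolder, which does not implement java.io.Serializable, so such entries cannot be shipped between JetInstances; Util.entry returns an AbstractMap.SimpleImmutableEntry, which does.

import java.io.Serializable;
import java.util.AbstractMap;
import java.util.Map;

// Sketch only: demonstrates the serializability difference behind the fix.
public final class EntrySerializabilityDemo {
  public static void main(final String[] args) {
    // JDK factory: returns java.util.KeyValueHolder (not Serializable).
    final Map.Entry<String, Double> jdkEntry = Map.entry("sensor1", 42.0);
    // Equivalent of what com.hazelcast.jet.Util.entry creates: a Serializable entry.
    final Map.Entry<String, Double> jetStyleEntry =
        new AbstractMap.SimpleImmutableEntry<>("sensor1", 42.0);

    System.out.println(jdkEntry instanceof Serializable); // false
    System.out.println(jetStyleEntry instanceof Serializable); // true
  }
}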

ClusterConfig.java

package theodolite.uc3.applicationold;
/**
* Configuration of a load generator cluster.
*/
public final class ClusterConfig {
private static final int PORT_DEFAULT = 5701;
private static final String CLUSTER_NAME_PREFIX_DEFAULT = "theodolite-load-generation";
private final String bootstrapServer;
private final String kubernetesDnsName;
private int port = PORT_DEFAULT;
private boolean portAutoIncrement = true;
private String clusterNamePrefix = CLUSTER_NAME_PREFIX_DEFAULT;
/**
* Create a new {@link ClusterConfig} with the given parameter values.
*/
private ClusterConfig(final String bootstrapServer, final String kubernetesDnsName) {
this.bootstrapServer = bootstrapServer;
this.kubernetesDnsName = kubernetesDnsName;
}
public boolean hasBootstrapServer() {
return this.bootstrapServer != null;
}
public String getBootstrapServer() {
return this.bootstrapServer;
}
public boolean hasKubernetesDnsName() {
return this.kubernetesDnsName != null;
}
public String getKubernetesDnsName() {
return this.kubernetesDnsName;
}
public int getPort() {
return this.port;
}
public boolean isPortAutoIncrement() {
return this.portAutoIncrement;
}
public ClusterConfig setPortAutoIncrement(final boolean portAutoIncrement) { // NOPMD
this.portAutoIncrement = portAutoIncrement;
return this;
}
public ClusterConfig setPort(final int port) { // NOPMD
this.port = port;
return this;
}
public String getClusterNamePrefix() {
return this.clusterNamePrefix;
}
public ClusterConfig setClusterNamePrefix(final String clusterNamePrefix) { // NOPMD
this.clusterNamePrefix = clusterNamePrefix;
return this;
}
public static ClusterConfig fromBootstrapServer(final String bootstrapServer) {
return new ClusterConfig(bootstrapServer, null);
}
public static ClusterConfig fromKubernetesDnsName(final String kubernetesDnsName) {
return new ClusterConfig(null, kubernetesDnsName);
}
}
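A hypothetical usage sketch of the fluent API above (the cluster name prefix and port values are assumptions for illustration):

// Illustration only: not part of the repository.
public final class ClusterConfigExample {
  public static void main(final String[] args) {
    final ClusterConfig config = ClusterConfig
        .fromBootstrapServer("localhost:5701") // or fromKubernetesDnsName(...)
        .setPort(5702)
        .setPortAutoIncrement(false)
        .setClusterNamePrefix("theodolite-history"); // assumed prefix
    System.out.println(config.getClusterNamePrefix() + ":" + config.getPort());
  }
}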

ConfigurationKeys.java

package theodolite.uc3.applicationold;
/**
* Keys to access configuration parameters.
*/
public final class ConfigurationKeys {
public static final String BOOTSTRAP_SERVER = "BOOTSTRAP_SERVER";
public static final String KUBERNETES_DNS_NAME = "KUBERNETES_DNS_NAME";
public static final String PORT = "PORT";
public static final String PORT_AUTO_INCREMENT = "PORT_AUTO_INCREMENT";
public static final String CLUSTER_NAME_PREFIX = "CLUSTER_NAME_PREFIX";
public static final String NUM_SENSORS = "NUM_SENSORS";
public static final String PERIOD_MS = "PERIOD_MS";
public static final String DOWNSAMPLE_INTERVAL = "DOWNSAMPLE_INTERVAL";
public static final String WINDOW_SIZE_IN_SECONDS = "WINDOW_SIZE_IN_SECONDS";
public static final String HOPPING_SIZE_IN_SECONDS = "HOPPING_SIZE_IN_SECONDS";
public static final String VALUE = "VALUE";
public static final String THREADS = "THREADS";
public static final String KAFKA_BOOTSTRAP_SERVERS = "KAFKA_BOOTSTRAP_SERVERS";
public static final String SCHEMA_REGISTRY_URL = "SCHEMA_REGISTRY_URL";
public static final String KAFKA_INPUT_TOPIC = "KAFKA_INPUT_TOPIC";
public static final String KAFKA_OUTPUT_TOPIC = "KAFKA_OUTPUT_TOPIC";
public static final String KAFKA_BATCH_SIZE = "KAFKA_BATCH_SIZE";
public static final String KAFKA_LINGER_MS = "KAFKA_LINGER_MS";
public static final String KAFKA_BUFFER_MEMORY = "KAFKA_BUFFER_MEMORY";
private ConfigurationKeys() {}
}

HistoryService.java

package theodolite.uc3.applicationold;
import com.hazelcast.config.Config;
import com.hazelcast.config.JoinConfig;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.kafka.KafkaSinks;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.WindowDefinition;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.uc3.application.uc3specifics.HourOfDayKey;
import theodolite.uc3.application.uc3specifics.HourOfDayKeySerializer;
import theodolite.uc3.application.uc3specifics.HoursOfDayKeyFactory;
import theodolite.uc3.application.uc3specifics.StatsKeyFactory;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A microservice that manages the history and, therefore, stores and aggregates incoming
* measurements.
*/
public class HistoryService {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);
// General Information
private static final String HZ_KUBERNETES_SERVICE_DNS_KEY = "service-dns";
private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701";
private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
private static final String KAFKA_INPUT_TOPIC_DEFAULT = "input";
private static final String KAFKA_OUTPUT_TOPIC_DEFAULT = "output";
private static final String KAFKA_BSERVERS_DEFAULT = "localhost:19092";
// UC3 specific
private static final String WINDOW_SIZE_IN_SECONDS_DEFAULT = "50";
private static final String HOPSIZE_IN_SEC_DEFAULT = "1";
// Information per History Service
private ClusterConfig clusterConfig;
private Properties kafkaReadPropsForPipeline;
private Properties kafkaWritePropsForPipeline;
private String kafkaInputTopic;
private String kafkaOutputTopic;
// UC3 specific
private int windowSizeInSeconds;
private int hoppingSizeInSeconds;
/**
* Entrypoint for UC3 using Gradle Run.
*/
public static void main(final String[] args) {
HistoryService.loadHistoryService().run();
}
/** Build a history service object to run. */
public static HistoryService loadHistoryService() {
final String bootstrapServer = System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER);
final String kubernetesDnsName = System.getenv(ConfigurationKeys.KUBERNETES_DNS_NAME);
ClusterConfig clusterConfig;
if (bootstrapServer != null) { // NOPMD
clusterConfig = ClusterConfig.fromBootstrapServer(bootstrapServer);
LOGGER.info("Use bootstrap server '{}'.", bootstrapServer);
} else if (kubernetesDnsName != null) { // NOPMD
clusterConfig = ClusterConfig.fromKubernetesDnsName(kubernetesDnsName);
LOGGER.info("Use Kubernetes DNC name '{}'", kubernetesDnsName);
} else {
clusterConfig = ClusterConfig.fromBootstrapServer(BOOTSTRAP_SERVER_DEFAULT);
LOGGER.info( // NOPMD
"Neither a bootstrap server nor a Kubernetes DNS name was provided. "
+ "Use default bootstrap server '{}'.",
BOOTSTRAP_SERVER_DEFAULT);
}
final String port = System.getenv(ConfigurationKeys.PORT);
if (port != null) {
clusterConfig.setPort(Integer.parseInt(port));
}
final String portAutoIncrement = System.getenv(ConfigurationKeys.PORT_AUTO_INCREMENT);
if (portAutoIncrement != null) {
clusterConfig.setPortAutoIncrement(Boolean.parseBoolean(portAutoIncrement));
}
final String clusterNamePrefix = System.getenv(ConfigurationKeys.CLUSTER_NAME_PREFIX);
if (clusterNamePrefix != null) {
clusterConfig.setClusterNamePrefix(clusterNamePrefix);
}
final String kafkaBootstrapServers = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
KAFKA_BSERVERS_DEFAULT);
final String schemaRegistryUrl = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
SCHEMA_REGISTRY_URL_DEFAULT);
final Properties kafkaReadPropsForPipeline =
buildKafkaReadProps(kafkaBootstrapServers, schemaRegistryUrl);
final Properties kafkaWritePropsForPipeline =
buildKafkaWriteProps(kafkaBootstrapServers);
final String kafkaInputTopic = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC),
KAFKA_INPUT_TOPIC_DEFAULT);
final String kafkaOutputTopic = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_OUTPUT_TOPIC),
KAFKA_OUTPUT_TOPIC_DEFAULT);
final String windowSizeInSeconds = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.WINDOW_SIZE_IN_SECONDS),
WINDOW_SIZE_IN_SECONDS_DEFAULT);
final int windowSizeInSecondsNumber = Integer.parseInt(windowSizeInSeconds);
final String hoppingSizeInSeconds = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.HOPPING_SIZE_IN_SECONDS),
HOPSIZE_IN_SEC_DEFAULT);
final int hoppingSizeInSecondsNumber = Integer.parseInt(hoppingSizeInSeconds);
return new HistoryService()
.setClusterConfig(clusterConfig)
.setKafkaReadPropertiesForPipeline(kafkaReadPropsForPipeline)
.setKafkaWritePropertiesForPipeline(kafkaWritePropsForPipeline)
.setKafkaInputTopic(kafkaInputTopic)
.setKafkaOutputTopic(kafkaOutputTopic)
.setWindowSizeInSeconds(windowSizeInSecondsNumber)
.setHoppingSizeInSeconds(hoppingSizeInSecondsNumber);
}
/** Set Cluster Config when creating History Service. */
private HistoryService setClusterConfig(final ClusterConfig clusterConfig) { // NOPMD
this.clusterConfig = clusterConfig;
return this;
}
/** Set Pipeline Kafka Read Properties. */
private HistoryService setKafkaReadPropertiesForPipeline(// NOPMD
final Properties kafkaReadPropsForPipeline) {
this.kafkaReadPropsForPipeline = kafkaReadPropsForPipeline;
return this;
}
/** Set Pipeline Kafka Write Properties. */
private HistoryService setKafkaWritePropertiesForPipeline(// NOPMD
final Properties kafkaWritePropsForPipeline) {
this.kafkaWritePropsForPipeline = kafkaWritePropsForPipeline;
return this;
}
/** Set Kafka Input topic used to build the pipeline. */
private HistoryService setKafkaInputTopic(final String kafkaInputTopic) { // NOPMD
this.kafkaInputTopic = kafkaInputTopic;
return this;
}
/** Set Kafka Output topic used to build the pipeline. */
private HistoryService setKafkaOutputTopic(final String kafkaOutputTopic) { // NOPMD
this.kafkaOutputTopic = kafkaOutputTopic;
return this;
}
/** Set the window size used in this history service (given in seconds). */
private HistoryService setWindowSizeInSeconds(final int windowSizeInSeconds) { // NOPMD
this.windowSizeInSeconds = windowSizeInSeconds;
return this;
}
/** Set the hopping size used in this history service (given in seconds). */
private HistoryService setHoppingSizeInSeconds(final int hoppingSizeInSeconds) { // NOPMD
this.hoppingSizeInSeconds = hoppingSizeInSeconds;
return this;
}
/**
* Defines kafka properties used to fetch data from kafka using a Hazelcast Jet pipeline.
*
* @return properties used to fetch data from kafka using a Hazelcast Jet pipeline.
*/
private static Properties buildKafkaReadProps(final String kafkaBootstrapServer,
final String schemaRegistryUrl) {
final Properties props = new Properties();
props.put("bootstrap.servers", kafkaBootstrapServer); // NOCS
props.put("key.deserializer", StringDeserializer.class.getCanonicalName());
props.put("value.deserializer", KafkaAvroDeserializer.class);
props.put("specific.avro.reader", true);
props.put("schema.registry.url", schemaRegistryUrl);
props.setProperty("auto.offset.reset", "earliest");
return props;
}
/**
* Defines kafka properties used to write data to kafka using a Hazelcast Jet pipeline.
*
* @return properties used to write data to kafka using a Hazelcast Jet pipeline.
*/
private static Properties buildKafkaWriteProps(final String kafkaBootstrapServer) {
final Properties props = new Properties();
props.put("bootstrap.servers", kafkaBootstrapServer); // NOCS
props.put("key.serializer", StringSerializer.class.getCanonicalName());
props.put("value.serializer", StringSerializer.class.getCanonicalName());
return props;
}
/**
* Start the UC3 service.
*/
public void run() {
Objects.requireNonNull(this.clusterConfig, "No cluster config set.");
this.createHazelcastJetApplication();
}
/**
* Build the pipeline, start a Hazelcast Jet instance, and submit a job that runs the pipeline.
*/
private void createHazelcastJetApplication() {
// Build Pipeline for the History Service of UC3
final Pipeline pipeline = Pipeline.create();
final StreamStage<Map.Entry<String, String>> mapProduct =
pipeline
.readFrom(KafkaSources
.<String, ActivePowerRecord>kafka(
this.kafkaReadPropsForPipeline, this.kafkaInputTopic))
// use Timestamps
.withNativeTimestamps(0)
// Map the record timestamp to the hour of day and build a new key from the
// sensor ID and that hour
.map(record -> {
final String sensorId = record.getValue().getIdentifier();
final long timestamp = record.getValue().getTimestamp();
final LocalDateTime dateTime = LocalDateTime.ofInstant(
Instant.ofEpochMilli(timestamp), TimeZone.getDefault().toZoneId());
final StatsKeyFactory<HourOfDayKey> keyFactory = new HoursOfDayKeyFactory();
final HourOfDayKey newKey = keyFactory.createKey(sensorId, dateTime);
return Map.entry(newKey, record.getValue());
})
// group by new keys
.groupingKey(newRecord -> newRecord.getKey())
// Sliding/Hopping Window
.window(WindowDefinition.sliding(TimeUnit.SECONDS.toMillis(this.windowSizeInSeconds),
TimeUnit.SECONDS.toMillis(this.hoppingSizeInSeconds)))
// get average value of group (sensorId, hourOfDay)
.aggregate(
AggregateOperations.averagingDouble(record -> record.getValue().getValueInW()))
// map to return pair (sensorId, hourOfDay) -> (averaged value in watts)
.map(agg -> {
final String theValue = agg.getValue().toString();
final String theKey = agg.getKey().toString();
return Map.entry(theKey, theValue);
});
// Add Sink1: Logger
mapProduct.writeTo(Sinks.logger());
// Add Sink2: Write back to kafka for the final benchmark
mapProduct.writeTo(KafkaSinks.<String, String>kafka(
this.kafkaWritePropsForPipeline, this.kafkaOutputTopic));
// Create the Hazelcast config and set the network settings for this Jet instance
final Config config = new Config().setClusterName(this.clusterConfig.getClusterNamePrefix());
final JoinConfig joinConfig = config.getNetworkConfig()
.setPort(this.clusterConfig.getPort())
.setPortAutoIncrement(this.clusterConfig.isPortAutoIncrement())
.getJoin();
// Either add the bootstrap server as a TCP/IP member or enable the Kubernetes join
joinConfig.getMulticastConfig().setEnabled(false);
if (this.clusterConfig.hasBootstrapServer()) {
joinConfig.getTcpIpConfig().addMember(this.clusterConfig.getBootstrapServer());
} else if (this.clusterConfig.hasKubernetesDnsName()) {
joinConfig.getKubernetesConfig()
.setEnabled(true)
.setProperty(HZ_KUBERNETES_SERVICE_DNS_KEY, this.clusterConfig.getKubernetesDnsName());
}
// Create Hazelcast Jet Instance
// Add config for jet instance, config for the job and add pipeline as the job
// The Hazelcast config must be supplied at creation time to take effect.
final JetConfig jetConfig = new JetConfig().setHazelcastConfig(config);
final JetInstance jet = Jet.newJetInstance(jetConfig);
final JobConfig pipelineConfig = new JobConfig()
.registerSerializer(HourOfDayKey.class, HourOfDayKeySerializer.class)
.setName("uc3-hazelcastjet");
jet.newJobIfAbsent(pipeline, pipelineConfig).join();
}
}
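The HourOfDayKey and HoursOfDayKeyFactory used above are not part of this commit. A hypothetical sketch, assuming the key simply pairs the hour of day with the sensor ID:

import java.time.LocalDateTime;

// Hypothetical sketch; the real HourOfDayKey/HoursOfDayKeyFactory live elsewhere in the
// repository. Shown only to illustrate the grouping key built in the pipeline above.
public final class HourOfDayKeySketch {
  private final int hourOfDay;
  private final String sensorId;

  public HourOfDayKeySketch(final int hourOfDay, final String sensorId) {
    this.hourOfDay = hourOfDay;
    this.sensorId = sensorId;
  }

  // Mirrors StatsKeyFactory#createKey(sensorId, dateTime) as called in HistoryService.
  public static HourOfDayKeySketch createKey(final String sensorId, final LocalDateTime dateTime) {
    return new HourOfDayKeySketch(dateTime.getHour(), sensorId);
  }

  @Override
  public String toString() {
    return this.sensorId + ";" + this.hourOfDay;
  }
}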

Uc4HazelcastJetFactory.java

@@ -8,10 +8,12 @@ import java.util.Properties;
 import org.slf4j.Logger;
 import theodolite.commons.hazelcastjet.ConfigurationKeys;
 import theodolite.commons.hazelcastjet.JetInstanceBuilder;
+import theodolite.uc4.application.uc4specifics.ImmutableSensorRegistryUc4Serializer;
 import theodolite.uc4.application.uc4specifics.SensorGroupKey;
 import theodolite.uc4.application.uc4specifics.SensorGroupKeySerializer;
 import theodolite.uc4.application.uc4specifics.ValueGroup;
 import theodolite.uc4.application.uc4specifics.ValueGroupSerializer;
+import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;

 /**
  * A Hazelcast Jet factory which can build a Hazelcast Jet Instance and Pipeline for the UC4

@@ -66,6 +68,8 @@ public class Uc4HazelcastJetFactory {
     final JobConfig jobConfig = new JobConfig()
         .registerSerializer(ValueGroup.class, ValueGroupSerializer.class)
         .registerSerializer(SensorGroupKey.class, SensorGroupKeySerializer.class)
+        .registerSerializer(ImmutableSensorRegistry.class,
+            ImmutableSensorRegistryUc4Serializer.class)
         .setName(jobName);
     this.uc4JetInstance.newJobIfAbsent(this.uc4JetPipeline, jobConfig).join();
   }

@@ -121,7 +125,7 @@ public class Uc4HazelcastJetFactory {
       throw new IllegalStateException("Kafka Config Read Properties for pipeline not set! "
           + defaultPipelineWarning);
     }
     // Check if Properties for the Kafka Feedback Read are set.
     if (this.kafkaFeedbackPropsForPipeline == null) {
       throw new IllegalStateException("Kafka Feedback Read Properties for pipeline not set! "

Uc4PipelineBuilder.java

@@ -95,7 +95,7 @@ public class Uc4PipelineBuilder {
           // DEBUG
           // System.out.println("D E B U G: It passed through the filter");
-          return Map.entry(data.getKey(), SensorRegistry.fromJson(data.getValue()));
+          return Util.entry(data.getKey(), SensorRegistry.fromJson(data.getValue()));
         });

     // Builds a new HashMap //

@@ -173,7 +173,7 @@ public class Uc4PipelineBuilder {
           // System.out.println("INPUT D E B U G: Got an input Stream Element!");
           // System.out.println("[SensorId=" + sensorId + "//valueinW=" + valueInW.toString());
-          return Map.entry(sensorId, valueInW);
+          return Util.entry(sensorId, valueInW);
         });

     // (1) Aggregation Stream

@@ -210,13 +210,13 @@ public class Uc4PipelineBuilder {
           if (sensorParentsCasted == null) {
             Set<String> nullSet = new HashSet<String>();
             nullSet.add("NULL-GROUPSET");
-            return Map.entry(sensorEvent.getKey(),
+            return Util.entry(sensorEvent.getKey(),
                 new ValueGroup(sensorEvent.getValue(), nullSet));
           } else {
             ValueGroup valueParentsPair =
                 new ValueGroup(sensorEvent.getValue(), sensorParentsCasted);
             // Return solution
-            return Map.entry(sensorEvent.getKey(), valueParentsPair);
+            return Util.entry(sensorEvent.getKey(), valueParentsPair);
           }

@@ -242,7 +242,7 @@ public class Uc4PipelineBuilder {
             new ArrayList<Entry<SensorGroupKey, Double>>();
         for (int i = 0; i < groupList.length; i++) {
           newKeyList[i] = new SensorGroupKey(keyGroupId, groupList[i]);
-          newEntryList.add(Map.entry(newKeyList[i], valueInW));
+          newEntryList.add(Util.entry(newKeyList[i], valueInW));
           // DEBUG
           // System.out.println("Added new Entry to list: [(" + newKeyList[i].getSensorId() + ","
           // + newKeyList[i].getGroup() + ")," + valueInW.toString());

ImmutableSensorRegistryUc4Serializer.java

package theodolite.uc4.application.uc4specifics;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.StreamSerializer;
import java.io.IOException;
import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;
/**
* StreamSerializer for Hazelcast Jet to serialize and deserialize an ImmutableSensorRegistry.
*/
public class ImmutableSensorRegistryUc4Serializer
implements StreamSerializer<ImmutableSensorRegistry> {
private static final int TYPE_ID = 3;
@Override
public int getTypeId() {
return TYPE_ID;
}
@Override
public void write(final ObjectDataOutput out, final ImmutableSensorRegistry object)
throws IOException {
final String sensorRegistryJson = object.toJson();
out.writeString(sensorRegistryJson);
}
@Override
public ImmutableSensorRegistry read(final ObjectDataInput in) throws IOException {
final String sensorRegistryJson = in.readString();
return (ImmutableSensorRegistry) ImmutableSensorRegistry.fromJson(sensorRegistryJson);
}
}
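For reference, the serializer above takes effect once it is registered with the job, as the Uc4HazelcastJetFactory diff earlier shows. A condensed sketch (the job name is an assumption for illustration):

import com.hazelcast.jet.config.JobConfig;
import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;

// Sketch of the registration performed in Uc4HazelcastJetFactory (see diff above).
public final class SerializerRegistrationExample {
  public static JobConfig buildJobConfig() {
    return new JobConfig()
        .registerSerializer(ImmutableSensorRegistry.class,
            ImmutableSensorRegistryUc4Serializer.class)
        .setName("uc4-hazelcastjet"); // assumed job name
  }
}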