Skip to content
Snippets Groups Projects
Commit 9a72780b authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Refactor uc3 hzj to use new implementation

parent d4e18ab6
No related branches found
No related tags found
1 merge request!275Refactor hazelcast jet benchmarks:
Pipeline #8510 failed
package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKeySerializer;
/**
* A microservice that manages the history and, therefore, stores and aggregates incoming
* measurements.
* A microservice that aggregate incoming messages in a sliding window.
*/
public class HistoryService {
public class HistoryService extends HazelcastJetService {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);
// Hazelcast settings (default)
private static final String HZ_KUBERNETES_SERVICE_DNS_KEY = "service-dns";
private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701";
/**
* Constructs the use case logic for UC3.
* Retrieves the needed values and instantiates a pipeline factory.
*/
public HistoryService() {
super(LOGGER);
final Properties kafkaProps =
this.propsBuilder.buildReadProperties(
StringDeserializer.class.getCanonicalName(),
KafkaAvroDeserializer.class.getCanonicalName());
// Kafka settings (default)
private static final String KAFKA_BOOTSTRAP_DEFAULT = "localhost:9092";
private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
private static final String KAFKA_INPUT_TOPIC_DEFAULT = "input";
private static final String KAFKA_OUTPUT_TOPIC_DEFAULT = "output";
final Properties kafkaWriteProps =
this.propsBuilder.buildWriteProperties(
StringSerializer.class.getCanonicalName(),
StringSerializer.class.getCanonicalName());
// UC3 specific (default)
private static final String WINDOW_SIZE_IN_SECONDS_DEFAULT = "30";
private static final String HOPSIZE_IN_SEC_DEFAULT = "1";
final String kafkaOutputTopic =
config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
// Job name (default)
private static final String JOB_NAME = "uc3-hazelcastjet";
final int windowSizeInDaysNumber = Integer.parseInt(
config.getProperty(ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString());
/**
* Entrypoint for UC3 using Gradle Run.
*/
public static void main(final String[] args) {
final HistoryService uc3HistoryService = new HistoryService();
try {
uc3HistoryService.run();
} catch (final Exception e) { // NOPMD
LOGGER.error("ABORT MISSION!: {}", e);
}
}
final int hoppingSizeInDaysNumber = Integer.parseInt(
config.getProperty(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString());
/**
* Start a UC3 service.
*
* @throws Exception This Exception occurs if the Uc3HazelcastJetFactory is used in the wrong way.
* Detailed data is provided once an Exception occurs.
*/
public void run() throws Exception { // NOPMD
this.createHazelcastJetApplication();
this.pipelineFactory = new Uc3PipelineFactory(
kafkaProps,
kafkaInputTopic,
kafkaWriteProps,
kafkaOutputTopic,
windowSizeInDaysNumber,
hoppingSizeInDaysNumber);
}
/**
* Creates a Hazelcast Jet Application for UC3 using the Uc3HazelcastJetFactory.
*
* @throws Exception This Exception occurs if the Uc3HazelcastJetFactory is used in the wrong way.
* Detailed data is provided once an Exception occurs.
*/
private void createHazelcastJetApplication() throws Exception { // NOPMD
new Uc3HazelcastJetFactory()
.setReadPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT, JOB_NAME)
.setWritePropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT)
.setKafkaInputTopicFromEnv(KAFKA_INPUT_TOPIC_DEFAULT)
.setKafkaOutputTopicFromEnv(KAFKA_OUTPUT_TOPIC_DEFAULT)
.setWindowSizeInSecondsFromEnv(WINDOW_SIZE_IN_SECONDS_DEFAULT)
.setHoppingSizeInSecondsFromEnv(HOPSIZE_IN_SEC_DEFAULT)
.buildUc3Pipeline()
.buildUc3JetInstanceFromEnv(LOGGER, BOOTSTRAP_SERVER_DEFAULT, HZ_KUBERNETES_SERVICE_DNS_KEY)
.runUc3Job(JOB_NAME);
@Override
protected void registerSerializer() {
this.jobConfig.registerSerializer(HourOfDayKey.class, HourOfDayKeySerializer.class);
}
public static void main(final String[] args) {
new HistoryService().run();
}
}
package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKeySerializer;
/**
* A microservice that aggregate incoming messages in a sliding window.
*/
public class NewHistoryService extends HazelcastJetService {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);
/**
* Constructs the use case logic for UC3.
* Retrieves the needed values and instantiates a pipeline factory.
*/
public NewHistoryService() {
super(LOGGER);
final Properties kafkaProps =
this.propsBuilder.buildReadProperties(
StringDeserializer.class.getCanonicalName(),
KafkaAvroDeserializer.class.getCanonicalName());
final Properties kafkaWriteProps =
this.propsBuilder.buildWriteProperties(
StringSerializer.class.getCanonicalName(),
StringSerializer.class.getCanonicalName());
final String kafkaOutputTopic =
config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
final int windowSizeInDaysNumber = Integer.parseInt(
config.getProperty(ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString());
final int hoppingSizeInDaysNumber = Integer.parseInt(
config.getProperty(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString());
this.pipelineFactory = new Uc3PipelineFactory(
kafkaProps,
kafkaInputTopic,
kafkaWriteProps,
kafkaOutputTopic,
windowSizeInDaysNumber,
hoppingSizeInDaysNumber);
}
@Override
protected void registerSerializer() {
this.jobConfig.registerSerializer(HourOfDayKey.class, HourOfDayKeySerializer.class);
}
public static void main(final String[] args) {
new NewHistoryService().run();
}
}
package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
import rocks.theodolite.benchmarks.commons.hazelcastjet.JetInstanceBuilder;
import rocks.theodolite.benchmarks.commons.hazelcastjet.KafkaPropertiesBuilder;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKeySerializer;
/**
* A Hazelcast Jet factory which can build a Hazelcast Jet Instance and Pipeline for the UC3
* benchmark and lets you start the Hazelcast Jet job. The JetInstance can be built directly as the
* Hazelcast Config is managed internally. In order to build the Pipeline, you first have to build
* the Read and Write Properties, set the input and output topic, and set the window size in seconds
* and the hopping size in seconds. This can be done using internal functions of this factory.
* Outside data only refers to custom values or default values in case data of the environment
* cannot the fetched.
*/
public class Uc3HazelcastJetFactory { // NOPMD
// Information per History Service
private Properties kafkaReadPropsForPipeline;
private Properties kafkaWritePropsForPipeline;
private String kafkaInputTopic;
private String kafkaOutputTopic;
private JetInstance uc3JetInstance;
private Pipeline uc3JetPipeline;
// UC3 specific
private int windowSizeInSeconds;
private int hoppingSizeInSeconds;
/////////////////////////////////////
// Layer 1 - Hazelcast Jet Run Job //
/////////////////////////////////////
/**
* Needs a JetInstance and Pipeline defined in this factors. Adds the pipeline to the existing
* JetInstance as a job.
*
* @param jobName The name of the job.
* @throws Exception If either no JetInstance or Pipeline is set, a job cannot be startet.
*/
public void runUc3Job(final String jobName) throws IllegalStateException { // NOPMD
// Check if a Jet Instance for UC3 is set.
if (this.uc3JetInstance == null) {
throw new IllegalStateException("Jet Instance is not set! "
+ "Cannot start a hazelcast jet job for UC3.");
}
// Check if a Pipeline for UC3 is set.
if (this.uc3JetPipeline == null) {
throw new IllegalStateException(
"Hazelcast Pipeline is not set! Cannot start a hazelcast jet job for UC3.");
}
// Adds the job name and joins a job to the JetInstance defined in this factory
final JobConfig jobConfig = new JobConfig()
.registerSerializer(HourOfDayKey.class, HourOfDayKeySerializer.class)
.setName(jobName);
this.uc3JetInstance.newJobIfAbsent(this.uc3JetPipeline, jobConfig).join();
}
/////////////
// Layer 2 //
/////////////
/**
* Build a Hazelcast JetInstance used to run a job on.
*
* @param logger The logger specified for this JetInstance.
* @param bootstrapServerDefault Default bootstrap server in case no value can be derived from the
* environment.
* @param hzKubernetesServiceDnsKey The kubernetes service dns key.
* @return A Uc3HazelcastJetFactory containing a set JetInstance.
*/
public Uc3HazelcastJetFactory buildUc3JetInstanceFromEnv(final Logger logger,
final String bootstrapServerDefault,
final String hzKubernetesServiceDnsKey) {
this.uc3JetInstance = new JetInstanceBuilder()
.setConfigFromEnv(logger, bootstrapServerDefault, hzKubernetesServiceDnsKey)
.build();
return this;
}
/**
* Builds a Hazelcast Jet pipeline used for a JetInstance to run it as a job on. Needs the input
* topic and kafka properties defined in this factory beforehand.
*
* @return A Uc3HazelcastJetFactory containg a set pipeline.
* @throws Exception If the input topic or the kafka properties are not defined, the pipeline
* cannot be built.
*/
public Uc3HazelcastJetFactory buildUc3Pipeline() throws IllegalStateException { // NOPMD
final String defaultPipelineWarning = "Cannot build pipeline."; // NOPMD
// Check if Properties for the Kafka Input are set.
if (this.kafkaReadPropsForPipeline == null) {
throw new IllegalStateException("Kafka Read Properties for pipeline not set! "
+ defaultPipelineWarning);
}
// Check if Properties for the Kafka Output are set.
if (this.kafkaWritePropsForPipeline == null) {
throw new IllegalStateException("Kafka Write Properties for pipeline not set! "
+ defaultPipelineWarning);
}
// Check if the Kafka input topic is set.
if (this.kafkaInputTopic == null) {
throw new IllegalStateException("Kafka input topic for pipeline not set! "
+ defaultPipelineWarning);
}
// Check if the Kafka output topic is set.
if (this.kafkaOutputTopic == null) {
throw new IllegalStateException("kafka output topic for pipeline not set! "
+ defaultPipelineWarning);
}
// Check if the window size for the "sliding" window is set.
if (this.windowSizeInSeconds <= 0) {
throw new IllegalStateException(
"window size in seconds for pipeline not set or not greater than 0! "
+ defaultPipelineWarning);
}
// Check if the hopping distance for the "sliding" window is set.
if (this.hoppingSizeInSeconds <= 0) {
throw new IllegalStateException(
"hopping size in seconds for pipeline not set or not greater than 0! "
+ defaultPipelineWarning);
}
// Build Pipeline Using the pipelineBuilder
final Uc3PipelineBuilder pipeBuilder = new Uc3PipelineBuilder();
this.uc3JetPipeline =
pipeBuilder.build(this.kafkaReadPropsForPipeline,
this.kafkaWritePropsForPipeline,
this.kafkaInputTopic, this.kafkaOutputTopic, this.hoppingSizeInSeconds,
this.windowSizeInSeconds);
// Return Uc3HazelcastJetBuilder factory
return this;
}
/////////////
// Layer 3 //
/////////////
/**
* Sets kafka read properties for pipeline used in this builder.
*
* @param kafkaReadProperties A propeties object containing necessary values used for the hazelcst
* jet kafka connection to read data.
* @return The Uc3HazelcastJetBuilder factory with set kafkaReadPropsForPipeline.
*/
public Uc3HazelcastJetFactory setCustomReadProperties(// NOPMD
final Properties kafkaReadProperties) {
this.kafkaReadPropsForPipeline = kafkaReadProperties;
return this;
}
/**
* Sets kafka write properties for pipeline used in this builder.
*
* @param kafkaWriteProperties A propeties object containing necessary values used for the
* hazelcst jet kafka connection to write data.
* @return The Uc3HazelcastJetBuilder factory with set kafkaWritePropsForPipeline.
*/
public Uc3HazelcastJetFactory setCustomWriteProperties(// NOPMD
final Properties kafkaWriteProperties) {
this.kafkaWritePropsForPipeline = kafkaWriteProperties;
return this;
}
/**
* Sets kafka read properties for pipeline used in this builder using environment variables.
*
* @param bootstrapServersDefault Default Bootstrap server in the case that no bootstrap server
* can be fetched from the environment.
* @param schemaRegistryUrlDefault Default schema registry url in the case that no schema registry
* url can be fetched from the environment.
* @return The Uc3HazelcastJetBuilder factory with set kafkaReadPropertiesForPipeline.
*/
public Uc3HazelcastJetFactory setReadPropertiesFromEnv(// NOPMD
final String bootstrapServersDefault,
final String schemaRegistryUrlDefault,
final String jobName) {
// Use KafkaPropertiesBuilder to build a properties object used for kafka
final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder();
final Properties kafkaReadProps =
propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault,
schemaRegistryUrlDefault,
jobName,
StringDeserializer.class.getCanonicalName(),
KafkaAvroDeserializer.class.getCanonicalName());
this.kafkaReadPropsForPipeline = kafkaReadProps;
return this;
}
/**
* Sets kafka write properties for pipeline used in this builder using environment variables.
*
* @param bootstrapServersDefault Default Bootstrap server in the case that no bootstrap server
* can be fetched from the environment.
* @return The Uc3HazelcastJetBuilder factory with set kafkaWritePropertiesForPipeline.
*/
public Uc3HazelcastJetFactory setWritePropertiesFromEnv(// NOPMD
final String bootstrapServersDefault, final String schemaRegistryUrlDefault) {
// Use KafkaPropertiesBuilder to build a properties object used for kafka
final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder();
final Properties kafkaWriteProps =
propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault,
schemaRegistryUrlDefault,
StringSerializer.class.getCanonicalName(),
StringSerializer.class.getCanonicalName());
this.kafkaWritePropsForPipeline = kafkaWriteProps;
return this;
}
/**
* Sets the kafka input topic for the pipeline used in this builder.
*
* @param inputTopic The kafka topic used as the pipeline input.
* @return A Uc3HazelcastJetBuilder factory with a set kafkaInputTopic.
*/
public Uc3HazelcastJetFactory setCustomKafkaInputTopic(// NOPMD
final String inputTopic) {
this.kafkaInputTopic = inputTopic;
return this;
}
/**
* Sets the kafka input output for the pipeline used in this builder.
*
* @param outputTopic The kafka topic used as the pipeline output.
* @return A Uc3HazelcastJetBuilder factory with a set kafkaOutputTopic.
*/
public Uc3HazelcastJetFactory setCustomKafkaOutputTopic(final String outputTopic) { // NOPMD
this.kafkaOutputTopic = outputTopic;
return this;
}
/**
* Sets the kafka input topic for the pipeline used in this builder using environment variables.
*
* @param defaultInputTopic The default kafka input topic used if no topic is specified by the
* environment.
* @return A Uc3HazelcastJetBuilder factory with a set kafkaInputTopic.
*/
public Uc3HazelcastJetFactory setKafkaInputTopicFromEnv(// NOPMD
final String defaultInputTopic) {
this.kafkaInputTopic = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC),
defaultInputTopic);
return this;
}
/**
* Sets the kafka output topic for the pipeline used in this builder using environment variables.
*
* @param defaultOutputTopic The default kafka output topic used if no topic is specified by the
* environment.
* @return A Uc3HazelcastJetBuilder factory with a set kafkaOutputTopic.
*/
public Uc3HazelcastJetFactory setKafkaOutputTopicFromEnv(// NOPMD
final String defaultOutputTopic) {
this.kafkaOutputTopic = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_OUTPUT_TOPIC),
defaultOutputTopic);
return this;
}
/**
* Sets the window size in seconds for the pipeline used in this builder.
*
* @param windowSizeInSeconds the windowSizeInSeconds to be used for this pipeline.
* @return A Uc3HazelcastJetFactory with a set windowSizeInSeconds.
*/
public Uc3HazelcastJetFactory setCustomWindowSizeInSeconds(// NOPMD
final int windowSizeInSeconds) {
this.windowSizeInSeconds = windowSizeInSeconds;
return this;
}
/**
* Sets the window size in seconds for the pipeline used in this builder from the environment.
*
* @param defaultWindowSizeInSeconds the default window size in seconds to be used for this
* pipeline when none is set in the environment.
* @return A Uc3HazelcastJetFactory with a set windowSizeInSeconds.
*/
public Uc3HazelcastJetFactory setWindowSizeInSecondsFromEnv(// NOPMD
final String defaultWindowSizeInSeconds) {
final String windowSizeInSeconds = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.AGGREGATION_DURATION_DAYS),
defaultWindowSizeInSeconds);
final int windowSizeInSecondsNumber = Integer.parseInt(windowSizeInSeconds);
this.windowSizeInSeconds = windowSizeInSecondsNumber;
return this;
}
/**
* Sets the hopping size in seconds for the pipeline used in this builder.
*
* @param hoppingSizeInSeconds the hoppingSizeInSeconds to be used for this pipeline.
* @return A Uc3HazelcastJetFactory with a set hoppingSizeInSeconds.
*/
public Uc3HazelcastJetFactory setCustomHoppingSizeInSeconds(// NOPMD
final int hoppingSizeInSeconds) {
this.hoppingSizeInSeconds = hoppingSizeInSeconds;
return this;
}
/**
* Sets the hopping size in seconds for the pipeline used in this builder from the environment.
*
* @param defaultHoppingSizeInSeconds the default hopping size in seconds to be used for this
* pipeline when none is set in the environment.
* @return A Uc3HazelcastJetFactory with a set hoppingSizeInSeconds.
*/
public Uc3HazelcastJetFactory setHoppingSizeInSecondsFromEnv(// NOPMD
final String defaultHoppingSizeInSeconds) {
final String hoppingSizeInSeconds = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS),
defaultHoppingSizeInSeconds);
final int hoppingSizeInSecondsNumber = Integer.parseInt(hoppingSizeInSeconds);
this.hoppingSizeInSeconds = hoppingSizeInSecondsNumber;
return this;
}
}
package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.kafka.KafkaSinks;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.WindowDefinition;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HoursOfDayKeyFactory;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.StatsKeyFactory;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Builder to build a HazelcastJet Pipeline for UC3 which can be used for stream processing using
* Hazelcast Jet.
*/
public class Uc3PipelineBuilder {
/**
* Builds a pipeline which can be used for stream processing using Hazelcast Jet.
*
* @param kafkaReadPropsForPipeline Properties Object containing the necessary kafka reads
* attributes.
* @param kafkaWritePropsForPipeline Properties Object containing the necessary kafka write
* attributes.
* @param kafkaInputTopic The name of the input topic used for the pipeline.
* @param kafkaOutputTopic The name of the output topic used for the pipeline.
* @param hoppingSizeInSeconds The hop length of the sliding window used in the aggregation of
* this pipeline.
* @param windowSizeInSeconds The window length of the sliding window used in the aggregation of
* this pipeline.
* @return returns a Pipeline used which can be used in a Hazelcast Jet Instance to process data
* for UC3.
*/
public Pipeline build(final Properties kafkaReadPropsForPipeline,
final Properties kafkaWritePropsForPipeline, final String kafkaInputTopic,
final String kafkaOutputTopic,
final int hoppingSizeInSeconds, final int windowSizeInSeconds) {
// Define a new Pipeline
final Pipeline pipe = Pipeline.create();
// Define the source
final StreamSource<Entry<String, ActivePowerRecord>> kafkaSource = KafkaSources
.<String, ActivePowerRecord>kafka(
kafkaReadPropsForPipeline, kafkaInputTopic);
// Extend topology for UC3
final StreamStage<Map.Entry<String, String>> uc3Product =
this.extendUc3Topology(pipe, kafkaSource, hoppingSizeInSeconds, windowSizeInSeconds);
// Add Sink1: Logger
uc3Product.writeTo(Sinks.logger());
// Add Sink2: Write back to kafka for the final benchmark
uc3Product.writeTo(KafkaSinks.<String, String>kafka(
kafkaWritePropsForPipeline, kafkaOutputTopic));
return pipe;
}
/**
* Extends to a blank Hazelcast Jet Pipeline the UC3 topology defined by theodolite.
*
* <p>
* UC3 takes {@code ActivePowerRecord} object, groups them by keys and calculates average double
* values for a sliding window and sorts them into the hour of the day.
* </p>
*
* @param pipe The blank hazelcast jet pipeline to extend the logic to.
* @param source A streaming source to fetch data from.
* @param hoppingSizeInSeconds The jump distance of the "sliding" window.
* @param windowSizeInSeconds The size of the "sliding" window.
* @return A {@code StreamStage<Map.Entry<String,String>>} with the above definition of the key
* and value of the Entry object. It can be used to be further modified or directly be
* written into a sink.
*/
public StreamStage<Map.Entry<String, String>> extendUc3Topology(final Pipeline pipe,
final StreamSource<Entry<String, ActivePowerRecord>> source, final int hoppingSizeInSeconds,
final int windowSizeInSeconds) {
// Build the pipeline topology.
return pipe
.readFrom(source)
// use Timestamps
.withNativeTimestamps(0)
.setLocalParallelism(1)
// Map timestamp to hour of day and create new key using sensorID and
// datetime mapped to HourOfDay
.map(record -> {
final String sensorId = record.getValue().getIdentifier();
final long timestamp = record.getValue().getTimestamp();
final LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp),
TimeZone.getDefault().toZoneId());
final StatsKeyFactory<HourOfDayKey> keyFactory = new HoursOfDayKeyFactory();
final HourOfDayKey newKey = keyFactory.createKey(sensorId, dateTime);
return Map.entry(newKey, record.getValue());
})
// group by new keys
.groupingKey(Entry::getKey)
// Sliding/Hopping Window
.window(WindowDefinition.sliding(TimeUnit.DAYS.toMillis(windowSizeInSeconds),
TimeUnit.DAYS.toMillis(hoppingSizeInSeconds)))
// get average value of group (sensoreId,hourOfDay)
.aggregate(
AggregateOperations.averagingDouble(record -> record.getValue().getValueInW()))
// map to return pair (sensorID,hourOfDay) -> (averaged what value)
.map(agg -> {
final String theValue = agg.getValue().toString();
final String theKey = agg.getKey().toString();
return Map.entry(theKey, theValue);
});
}
}
......@@ -16,6 +16,7 @@ import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Map;
import java.util.Properties;
import java.util.TimeZone;
import java.util.Map.Entry;
import java.util.concurrent.CompletionException;
......@@ -24,7 +25,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.Uc3PipelineBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKeySerializer;
import titan.ccp.model.records.ActivePowerRecord;
......@@ -35,6 +37,9 @@ import titan.ccp.model.records.ActivePowerRecord;
@Category(SerialTest.class)
public class Uc3PipelineTest extends JetTestSupport {
private static final Logger LOGGER = LoggerFactory.getLogger(Uc3PipelineTest.class);
// Test Machinery
private JetInstance testInstance = null;
private Pipeline testPipeline = null;
......@@ -49,13 +54,13 @@ public class Uc3PipelineTest extends JetTestSupport {
public void buildUc3Pipeline() {
// Setup Configuration
int testItemsPerSecond = 1;
String testSensorName = "TEST-SENSOR";
Double testValueInW = 10.0;
int testHopSizeInSec = 1;
int testWindowSizeInSec = 50;
final int testItemsPerSecond = 1;
final String testSensorName = "TEST-SENSOR";
final Double testValueInW = 10.0;
final int testHopSizeInSec = 1;
final int testWindowSizeInSec = 50;
// Used to check hourOfDay
long mockTimestamp = 1632741651;
final long mockTimestamp = 1632741651;
// Create mock jet instance with configuration
......@@ -75,10 +80,13 @@ public class Uc3PipelineTest extends JetTestSupport {
});
// Create pipeline to test
Uc3PipelineBuilder pipelineBuilder = new Uc3PipelineBuilder();
this.testPipeline = Pipeline.create();
this.uc3Topology = pipelineBuilder.extendUc3Topology(testPipeline, testSource,
testHopSizeInSec, testWindowSizeInSec);
final Properties properties = new Properties();
final Uc3PipelineFactory factory = new Uc3PipelineFactory(
properties,"", properties,"", testWindowSizeInSec, testHopSizeInSec);
this.uc3Topology = factory.extendUc3Topology(testSource);
testPipeline = factory.getPipe();
}
/**
......@@ -88,44 +96,43 @@ public class Uc3PipelineTest extends JetTestSupport {
public void testOutput() {
// Assertion Configuration
int timeout = 10;
String testSensorName = "TEST-SENSOR";
Double testValueInW = 10.0;
final int timeout = 10;
final String testSensorName = "TEST-SENSOR";
final Double testValueInW = 10.0;
// Used to check hourOfDay
long mockTimestamp = 1632741651;
final long mockTimestamp = 1632741651;
// Assertion
this.uc3Topology.apply(Assertions.assertCollectedEventually(timeout,
collection -> {
// DEBUG
System.out.println("DEBUG: CHECK 1 || Entered Assertion of testOutput()");
LOGGER.info("CHECK 1 || Entered Assertion of testOutput()");
// Check all collected Items
boolean allOkay = true;
if (collection != null) {
System.out.println("DEBUG: CHECK 2 || Collection Size: " + collection.size());
for (int i = 0; i < collection.size(); i++) {
LOGGER.info("CHECK 2 || Collection Size: " + collection.size());
for (final Entry<String, String> entry : collection) {
// Build hour of day
long timestamp = mockTimestamp;
int expectedHour = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp),
final int expectedHour = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp),
TimeZone.getDefault().toZoneId()).getHour();
// Compare expected output with generated output
Entry<String, String> currentEntry = collection.get(i);
String expectedKey = testSensorName + ";" + expectedHour;
String expectedValue = testValueInW.toString();
final String expectedKey = testSensorName + ";" + expectedHour;
final String expectedValue = testValueInW.toString();
// DEBUG
System.out.println(
"DEBUG: CHECK 3 || Expected Output: '" + expectedKey + "=" + expectedValue
+ "' - Actual Output: '" + currentEntry.getKey() + "="
+ currentEntry.getValue().toString() + "'");
if (!(currentEntry.getKey().equals(expectedKey)
&& currentEntry.getValue().toString().equals(expectedValue))) {
System.out.println("DEBUG: CHECK 5 || Failed assertion!");
LOGGER.info(
"CHECK 3 || Expected Output: '" + expectedKey + "=" + expectedValue
+ "' - Actual Output: '" + entry.getKey() + "="
+ entry.getValue() + "'");
if (!(entry.getKey().equals(expectedKey)
&& entry.getValue().equals(expectedValue))) {
LOGGER.info("CHECK 5 || Failed assertion!");
allOkay = false;
}
}
......
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment