Commit 75a3a3d3 authored by Lorenz Boguhn

Add NewHistoryServices

parent c80df733
Merge request !275: Refactor hazelcast jet benchmarks
Pipeline #8394 failed

package rocks.theodolite.benchmarks.uc2.hazelcastjet;

import com.google.common.math.StatsAccumulator;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;
import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSerializer;

/**
 * A microservice that aggregates incoming messages in a tumbling window.
 */
public class NewHistoryService extends HazelcastJetService {

  private static final Logger LOGGER = LoggerFactory.getLogger(NewHistoryService.class);

  /**
   * Constructs the use case logic for UC2. Retrieves the needed values and instantiates a
   * pipeline factory.
   */
  public NewHistoryService() {
    super(LOGGER);
    final Properties kafkaProps =
        this.propsBuilder.buildReadProperties(
            StringDeserializer.class.getCanonicalName(),
            KafkaAvroDeserializer.class.getCanonicalName());

    final Properties kafkaWriteProps =
        this.propsBuilder.buildWriteProperties(
            StringSerializer.class.getCanonicalName(),
            StringSerializer.class.getCanonicalName());

    final String kafkaOutputTopic =
        config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();

    // Transform minutes to milliseconds
    final int downsampleInterval = Integer.parseInt(
        config.getProperty(ConfigurationKeys.DOWNSAMPLE_INTERVAL).toString());
    final int downsampleIntervalMs = downsampleInterval * 60_000;

    this.pipelineFactory = new Uc2PipelineFactory(
        kafkaProps,
        this.kafkaInputTopic,
        kafkaWriteProps,
        kafkaOutputTopic,
        downsampleIntervalMs);
  }

  @Override
  protected void registerSerializer() {
    this.jobConfig.registerSerializer(StatsAccumulator.class, StatsAccumulatorSerializer.class);
  }

  public static void main(final String[] args) {
    new NewHistoryService().run();
  }

}
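
For context, registerSerializer() wires a custom Hazelcast serializer for Guava's StatsAccumulator, which Hazelcast cannot serialize out of the box. The referenced StatsAccumulatorSerializer is not part of this commit; the following is only a minimal sketch of what such a StreamSerializer could look like, assuming the accumulator state is transferred via its immutable Stats snapshot. The class name and type id are placeholders, not the project's actual implementation.

// Hypothetical sketch only; the real StatsAccumulatorSerializer in
// rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics may differ.
import java.io.IOException;

import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.StreamSerializer;

public class StatsAccumulatorSerializerSketch implements StreamSerializer<StatsAccumulator> {

  @Override
  public int getTypeId() {
    return 69_000; // placeholder; must be positive and unique within the cluster
  }

  @Override
  public void write(final ObjectDataOutput out, final StatsAccumulator accumulator)
      throws IOException {
    // Persist the accumulator via its immutable Stats snapshot.
    out.writeByteArray(accumulator.snapshot().toByteArray());
  }

  @Override
  public StatsAccumulator read(final ObjectDataInput in) throws IOException {
    // Rebuild an accumulator from the snapshot bytes.
    final StatsAccumulator accumulator = new StatsAccumulator();
    accumulator.addAll(Stats.fromByteArray(in.readByteArray()));
    return accumulator;
  }
}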

package rocks.theodolite.benchmarks.uc3.hazelcastjet;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKeySerializer;

/**
 * A microservice that aggregates incoming messages in a sliding window.
 */
public class NewHistoryService extends HazelcastJetService {

  private static final Logger LOGGER = LoggerFactory.getLogger(NewHistoryService.class);

  /**
   * Constructs the use case logic for UC3. Retrieves the needed values and instantiates a
   * pipeline factory.
   */
  public NewHistoryService() {
    super(LOGGER);
    final Properties kafkaProps =
        this.propsBuilder.buildReadProperties(
            StringDeserializer.class.getCanonicalName(),
            KafkaAvroDeserializer.class.getCanonicalName());

    final Properties kafkaWriteProps =
        this.propsBuilder.buildWriteProperties(
            StringSerializer.class.getCanonicalName(),
            StringSerializer.class.getCanonicalName());

    final String kafkaOutputTopic =
        config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();

    final int windowSizeInSecondsNumber = Integer.parseInt(
        config.getProperty(ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString());
    final int hoppingSizeInSecondsNumber = Integer.parseInt(
        config.getProperty(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString());

    this.pipelineFactory = new Uc3PipelineFactory(
        kafkaProps,
        this.kafkaInputTopic,
        kafkaWriteProps,
        kafkaOutputTopic,
        windowSizeInSecondsNumber,
        hoppingSizeInSecondsNumber);
  }

  @Override
  protected void registerSerializer() {
    this.jobConfig.registerSerializer(HourOfDayKey.class, HourOfDayKeySerializer.class);
  }

  public static void main(final String[] args) {
    new NewHistoryService().run();
  }

}
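
UC3 likewise registers a serializer, here for its composite HourOfDayKey. Neither that key class nor its serializer is included in this commit; below is only a hedged sketch, assuming the key combines an hour-of-day integer with a sensor identifier string. The field names, class names, and type id are hypothetical and only illustrate the StreamSerializer pattern used above.

// Hypothetical sketch only; the real HourOfDayKey and HourOfDayKeySerializer in
// rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics may be structured differently.
import java.io.IOException;

import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.StreamSerializer;

final class HourOfDayKeySketch {
  final int hourOfDay;   // assumed field: hour of the day (0-23)
  final String sensorId; // assumed field: identifier of the sensor

  HourOfDayKeySketch(final int hourOfDay, final String sensorId) {
    this.hourOfDay = hourOfDay;
    this.sensorId = sensorId;
  }
}

class HourOfDayKeySerializerSketch implements StreamSerializer<HourOfDayKeySketch> {

  @Override
  public int getTypeId() {
    return 69_001; // placeholder; must be positive and unique within the cluster
  }

  @Override
  public void write(final ObjectDataOutput out, final HourOfDayKeySketch key) throws IOException {
    // Write the two key components in a fixed order.
    out.writeInt(key.hourOfDay);
    out.writeUTF(key.sensorId);
  }

  @Override
  public HourOfDayKeySketch read(final ObjectDataInput in) throws IOException {
    // Read the components back in the same order.
    return new HourOfDayKeySketch(in.readInt(), in.readUTF());
  }
}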

package rocks.theodolite.benchmarks.uc4.hazelcastjet;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.EventDeserializer;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ImmutableSensorRegistryUc4Serializer;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKey;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKeySerializer;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroup;
import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroupSerializer;
import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;

/**
 * A microservice that manages the history and, therefore, stores and aggregates incoming
 * measurements.
 */
public class NewHistoryService extends HazelcastJetService {

  private static final Logger LOGGER = LoggerFactory.getLogger(NewHistoryService.class);

  /**
   * Constructs the use case logic for UC4. Retrieves the needed values and instantiates a
   * pipeline factory.
   */
  public NewHistoryService() {
    super(LOGGER);
    final Properties kafkaProps =
        this.propsBuilder.buildReadProperties(
            StringDeserializer.class.getCanonicalName(),
            KafkaAvroDeserializer.class.getCanonicalName());

    final Properties kafkaConfigReadProps =
        this.propsBuilder.buildReadProperties(
            EventDeserializer.class.getCanonicalName(),
            StringDeserializer.class.getCanonicalName());

    final Properties kafkaAggregationReadProps =
        this.propsBuilder.buildReadProperties(
            StringDeserializer.class.getCanonicalName(),
            KafkaAvroDeserializer.class.getCanonicalName());

    final Properties kafkaWriteProps =
        this.propsBuilder.buildWriteProperties(
            StringSerializer.class.getCanonicalName(),
            KafkaAvroSerializer.class.getCanonicalName());

    final String kafkaOutputTopic =
        config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
    final String kafkaConfigurationTopic =
        config.getProperty(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC).toString();
    final String kafkaFeedbackTopic =
        config.getProperty(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC).toString();

    final int windowSize = Integer.parseInt(
        config.getProperty(ConfigurationKeys.WINDOW_SIZE_UC4).toString());

    this.pipelineFactory = new Uc4PipelineFactory(
        kafkaProps,
        kafkaConfigReadProps,
        kafkaAggregationReadProps,
        kafkaWriteProps,
        this.kafkaInputTopic, kafkaOutputTopic, kafkaConfigurationTopic, kafkaFeedbackTopic,
        windowSize);
  }

  @Override
  protected void registerSerializer() {
    this.jobConfig.registerSerializer(ValueGroup.class, ValueGroupSerializer.class)
        .registerSerializer(SensorGroupKey.class, SensorGroupKeySerializer.class)
        .registerSerializer(ImmutableSensorRegistry.class,
            ImmutableSensorRegistryUc4Serializer.class);
  }

  public static void main(final String[] args) {
    new NewHistoryService().run();
  }

}