Commit 0cfb8e4b authored by Simon Ehrenstein

Merge

parents 5421ed7f 67371596
1 merge request: !6 Add Distributed Workload Generator
Showing 22 additions and 373 deletions
-package spesb.uc3.workloadgenerator;
+package theodolite.uc3.workloadgenerator;
import common.dimensions.Duration;
import common.dimensions.KeySpace;
@@ -14,6 +14,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import theodolite.kafkasender.KafkaRecordSender;
import titan.ccp.models.records.ActivePowerRecord;
public class LoadGenerator {
......
@@ -10,4 +10,4 @@ dependencies {
compile('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT')
}
mainClassName = "spesb.uc4.application.HistoryService"
mainClassName = "theodolite.uc4.application.HistoryService"
package spesb.uc4.streamprocessing;

import java.time.DayOfWeek;

/**
 * Composed key of a {@link DayOfWeek} and a sensor id.
 */
public class DayOfWeekKey {

  private final DayOfWeek dayOfWeek;
  private final String sensorId;

  public DayOfWeekKey(final DayOfWeek dayOfWeek, final String sensorId) {
    this.dayOfWeek = dayOfWeek;
    this.sensorId = sensorId;
  }

  public DayOfWeek getDayOfWeek() {
    return this.dayOfWeek;
  }

  public String getSensorId() {
    return this.sensorId;
  }

  @Override
  public String toString() {
    return this.sensorId + ";" + this.dayOfWeek.toString();
  }

}
package spesb.uc4.streamprocessing;

import java.time.DayOfWeek;
import java.time.LocalDateTime;

/**
 * {@link StatsKeyFactory} for {@link DayOfWeekKey}.
 */
public class DayOfWeekKeyFactory implements StatsKeyFactory<DayOfWeekKey> {

  @Override
  public DayOfWeekKey createKey(final String sensorId, final LocalDateTime dateTime) {
    final DayOfWeek dayOfWeek = dateTime.getDayOfWeek();
    return new DayOfWeekKey(dayOfWeek, sensorId);
  }

  @Override
  public String getSensorId(final DayOfWeekKey key) {
    return key.getSensorId();
  }

}
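For illustration, a minimal usage sketch of this factory (hypothetical, not part of the commit; the sensor id and timestamp are made up):

final StatsKeyFactory<DayOfWeekKey> keyFactory = new DayOfWeekKeyFactory();
final LocalDateTime dateTime = LocalDateTime.of(2020, 3, 2, 14, 30); // 2020-03-02 is a Monday
final DayOfWeekKey key = keyFactory.createKey("sensor-1", dateTime); // hypothetical sensor id
// key.toString() yields "sensor-1;MONDAY"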
package spesb.uc4.streamprocessing;

import java.time.DayOfWeek;
import org.apache.kafka.common.serialization.Serde;
import titan.ccp.common.kafka.simpleserdes.BufferSerde;
import titan.ccp.common.kafka.simpleserdes.ReadBuffer;
import titan.ccp.common.kafka.simpleserdes.SimpleSerdes;
import titan.ccp.common.kafka.simpleserdes.WriteBuffer;

/**
 * {@link BufferSerde} for a {@link DayOfWeekKey}. Use the {@link #create()} method to create a new
 * Kafka {@link Serde}.
 */
public class DayOfWeekKeySerde implements BufferSerde<DayOfWeekKey> {

  @Override
  public void serialize(final WriteBuffer buffer, final DayOfWeekKey data) {
    buffer.putInt(data.getDayOfWeek().getValue());
    buffer.putString(data.getSensorId());
  }

  @Override
  public DayOfWeekKey deserialize(final ReadBuffer buffer) {
    final DayOfWeek dayOfWeek = DayOfWeek.of(buffer.getInt());
    final String sensorId = buffer.getString();
    return new DayOfWeekKey(dayOfWeek, sensorId);
  }

  public static Serde<DayOfWeekKey> create() {
    return SimpleSerdes.create(new DayOfWeekKeySerde());
  }

}
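A hypothetical sketch of how the created Serde might be wired into a Kafka Streams topology (not part of the commit; the stream and valueSerde are assumed to exist):

final Serde<DayOfWeekKey> keySerde = DayOfWeekKeySerde.create();
// Pass it wherever Kafka Streams expects a key serde, e.g. when grouping:
// stream.groupByKey(Grouped.with(keySerde, valueSerde));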
package spesb.uc4.streamprocessing;

import com.google.common.math.Stats;
import org.apache.kafka.streams.kstream.Windowed;
import titan.ccp.model.records.DayOfWeekActivePowerRecord;

/**
 * {@link StatsRecordFactory} to create a {@link DayOfWeekActivePowerRecord}.
 */
public class DayOfWeekRecordFactory
    implements StatsRecordFactory<DayOfWeekKey, DayOfWeekActivePowerRecord> {

  @Override
  public DayOfWeekActivePowerRecord create(final Windowed<DayOfWeekKey> windowed,
      final Stats stats) {
    return new DayOfWeekActivePowerRecord(
        windowed.key().getSensorId(),
        windowed.key().getDayOfWeek().getValue(),
        windowed.window().start(),
        windowed.window().end(),
        stats.count(),
        stats.mean(),
        stats.populationVariance(),
        stats.min(),
        stats.max());
  }

}
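A hypothetical sketch of what this factory produces; Guava's Stats.of and Kafka Streams' internal TimeWindow class are used here only to construct the inputs for illustration:

final Stats stats = Stats.of(10.0, 20.0, 30.0);
final Windowed<DayOfWeekKey> windowed = new Windowed<>(
    new DayOfWeekKey(DayOfWeek.MONDAY, "sensor-1"), // hypothetical sensor id
    new TimeWindow(0, 1000)); // org.apache.kafka.streams.kstream.internals.TimeWindow
final DayOfWeekActivePowerRecord record = new DayOfWeekRecordFactory().create(windowed, stats);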
package spesb.uc4.streamprocessing;

import java.time.DayOfWeek;

/**
 * Composed key of a {@link DayOfWeek}, an hour of the day and a sensor id.
 */
public class HourOfWeekKey {

  private final DayOfWeek dayOfWeek;
  private final int hourOfDay;
  private final String sensorId;

  /**
   * Create a new {@link HourOfWeekKey} using its components.
   */
  public HourOfWeekKey(final DayOfWeek dayOfWeek, final int hourOfDay, final String sensorId) {
    this.dayOfWeek = dayOfWeek;
    this.hourOfDay = hourOfDay;
    this.sensorId = sensorId;
  }

  public DayOfWeek getDayOfWeek() {
    return this.dayOfWeek;
  }

  public int getHourOfDay() {
    return this.hourOfDay;
  }

  public String getSensorId() {
    return this.sensorId;
  }

  @Override
  public String toString() {
    return this.sensorId + ";" + this.dayOfWeek.toString() + ";" + this.hourOfDay;
  }

}
package spesb.uc4.streamprocessing;

import java.time.DayOfWeek;
import java.time.LocalDateTime;

/**
 * {@link StatsKeyFactory} for {@link HourOfWeekKey}.
 */
public class HourOfWeekKeyFactory implements StatsKeyFactory<HourOfWeekKey> {

  @Override
  public HourOfWeekKey createKey(final String sensorId, final LocalDateTime dateTime) {
    final DayOfWeek dayOfWeek = dateTime.getDayOfWeek();
    final int hourOfDay = dateTime.getHour();
    return new HourOfWeekKey(dayOfWeek, hourOfDay, sensorId);
  }

  @Override
  public String getSensorId(final HourOfWeekKey key) {
    return key.getSensorId();
  }

}
package spesb.uc4.streamprocessing;

import java.time.DayOfWeek;
import org.apache.kafka.common.serialization.Serde;
import titan.ccp.common.kafka.simpleserdes.BufferSerde;
import titan.ccp.common.kafka.simpleserdes.ReadBuffer;
import titan.ccp.common.kafka.simpleserdes.SimpleSerdes;
import titan.ccp.common.kafka.simpleserdes.WriteBuffer;

/**
 * {@link BufferSerde} for an {@link HourOfWeekKey}. Use the {@link #create()} method to create a
 * new Kafka {@link Serde}.
 */
public class HourOfWeekKeySerde implements BufferSerde<HourOfWeekKey> {

  @Override
  public void serialize(final WriteBuffer buffer, final HourOfWeekKey data) {
    buffer.putInt(data.getDayOfWeek().getValue());
    buffer.putInt(data.getHourOfDay());
    buffer.putString(data.getSensorId());
  }

  @Override
  public HourOfWeekKey deserialize(final ReadBuffer buffer) {
    final DayOfWeek dayOfWeek = DayOfWeek.of(buffer.getInt());
    final int hourOfDay = buffer.getInt();
    final String sensorId = buffer.getString();
    return new HourOfWeekKey(dayOfWeek, hourOfDay, sensorId);
  }

  public static Serde<HourOfWeekKey> create() {
    return SimpleSerdes.create(new HourOfWeekKeySerde());
  }

}
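A hypothetical round-trip sketch through the standard Kafka Serde interface (the topic argument is required by the API and presumably ignored by this serde):

final Serde<HourOfWeekKey> serde = HourOfWeekKeySerde.create();
final HourOfWeekKey key = new HourOfWeekKey(DayOfWeek.FRIDAY, 17, "sensor-1"); // hypothetical values
final byte[] bytes = serde.serializer().serialize("some-topic", key);
final HourOfWeekKey copy = serde.deserializer().deserialize("some-topic", bytes);
// copy.toString() yields "sensor-1;FRIDAY;17"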
package spesb.uc4.streamprocessing;

import com.google.common.math.Stats;
import org.apache.kafka.streams.kstream.Windowed;
import titan.ccp.model.records.HourOfWeekActivePowerRecord;

/**
 * {@link StatsRecordFactory} to create an {@link HourOfWeekActivePowerRecord}.
 */
public class HourOfWeekRecordFactory
    implements StatsRecordFactory<HourOfWeekKey, HourOfWeekActivePowerRecord> {

  @Override
  public HourOfWeekActivePowerRecord create(final Windowed<HourOfWeekKey> windowed,
      final Stats stats) {
    return new HourOfWeekActivePowerRecord(
        windowed.key().getSensorId(),
        windowed.key().getDayOfWeek().getValue(),
        windowed.key().getHourOfDay(),
        windowed.window().start(),
        windowed.window().end(),
        stats.count(),
        stats.mean(),
        stats.populationVariance(),
        stats.min(),
        stats.max());
  }

}
package spesb.uc4.streamprocessing;

import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import titan.ccp.common.kafka.streams.PropertiesBuilder;

/**
 * Builder for the Kafka Streams configuration.
 */
public class KafkaStreamsBuilder {

  private static final String APPLICATION_NAME = "titan-ccp-history";
  private static final String APPLICATION_VERSION = "0.0.1";

  // private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsBuilder.class);

  private String bootstrapServers; // NOPMD
  private String inputTopic; // NOPMD
  private String outputTopic; // NOPMD
  private Duration aggregationDuration; // NOPMD
  private Duration aggregationAdvance; // NOPMD
  private int numThreads = -1; // NOPMD
  private int commitIntervalMs = -1; // NOPMD
  private int cacheMaxBytesBuff = -1; // NOPMD

  public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) {
    this.bootstrapServers = bootstrapServers;
    return this;
  }

  public KafkaStreamsBuilder inputTopic(final String inputTopic) {
    this.inputTopic = inputTopic;
    return this;
  }

  public KafkaStreamsBuilder outputTopic(final String outputTopic) {
    this.outputTopic = outputTopic;
    return this;
  }

  public KafkaStreamsBuilder aggregationDuration(final Duration aggregationDuration) {
    this.aggregationDuration = aggregationDuration;
    return this;
  }

  public KafkaStreamsBuilder aggregationAdvance(final Duration aggregationAdvance) {
    this.aggregationAdvance = aggregationAdvance;
    return this;
  }

  /**
   * Sets the Kafka Streams property for the number of threads (num.stream.threads). Pass -1 to
   * use the default.
   */
  public KafkaStreamsBuilder numThreads(final int numThreads) {
    if (numThreads < -1 || numThreads == 0) {
      throw new IllegalArgumentException("Number of threads must be greater than 0, or -1 to use the default.");
    }
    this.numThreads = numThreads;
    return this;
  }

  /**
   * Sets the Kafka Streams property for the frequency with which to save the position (offsets in
   * source topics) of tasks (commit.interval.ms). Must be zero to ensure that every record is
   * processed, for example, when records are ingested in bulk. Pass -1 to use the default.
   */
  public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) {
    if (commitIntervalMs < -1) {
      throw new IllegalArgumentException("Commit interval must be greater than or equal to -1.");
    }
    this.commitIntervalMs = commitIntervalMs;
    return this;
  }

  /**
   * Sets the Kafka Streams property for the maximum amount of memory, in bytes, to be used for
   * record caches across all threads (cache.max.bytes.buffering). Must be zero to ensure that
   * every record is processed, for example, when records are ingested in bulk. Pass -1 to use
   * the default.
   */
  public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) {
    if (cacheMaxBytesBuffering < -1) {
      throw new IllegalArgumentException("Cache max bytes buffering must be greater than or equal to -1.");
    }
    this.cacheMaxBytesBuff = cacheMaxBytesBuffering;
    return this;
  }

  /**
   * Builds the {@link KafkaStreams} instance.
   */
  public KafkaStreams build() {
    Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
    Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
    Objects.requireNonNull(this.aggregationDuration, "Aggregation duration has not been set.");
    Objects.requireNonNull(this.aggregationAdvance, "Aggregation advance period has not been set.");
    // TODO log parameters
    final TopologyBuilder topologyBuilder = new TopologyBuilder(
        this.inputTopic,
        this.outputTopic,
        this.aggregationDuration,
        this.aggregationAdvance);
    final Properties properties = PropertiesBuilder.bootstrapServers(this.bootstrapServers)
        .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter
        .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0)
        .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0)
        .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0)
        // .set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG")
        .build();
    return new KafkaStreams(topologyBuilder.build(), properties);
  }

}
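A minimal usage sketch of this builder (hypothetical, not part of the commit; all parameter values are made up):

final KafkaStreams kafkaStreams = new KafkaStreamsBuilder()
    .bootstrapServers("localhost:9092")
    .inputTopic("input")
    .outputTopic("output")
    .aggregationDuration(Duration.ofDays(7))
    .aggregationAdvance(Duration.ofDays(1))
    .numThreads(4) // or -1 to keep the Kafka Streams default
    .build();
kafkaStreams.start();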
-package spesb.uc4.application;
+package theodolite.uc4.application;
/**
* Keys to access configuration parameters.
......
-package spesb.uc4.application;
+package theodolite.uc4.application;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams;
-import spesb.uc4.streamprocessing.KafkaStreamsBuilder;
+import theodolite.uc4.streamprocessing.Uc4KafkaStreamsBuilder;
import titan.ccp.common.configuration.Configurations;
/**
@@ -30,18 +30,24 @@ public class HistoryService {
*
*/
  private void createKafkaStreamsApplication() {
-    final KafkaStreams kafkaStreams = new KafkaStreamsBuilder()
-        .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
+    // Use case specific stream configuration
+    final Uc4KafkaStreamsBuilder uc4KafkaStreamsBuilder = new Uc4KafkaStreamsBuilder();
+    uc4KafkaStreamsBuilder
        .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
        .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
        .aggregationDuration(
            Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS)))
        .aggregationAdvance(
-            Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)))
+            Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)));
+
+    // Configuration of the stream application
+    final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder
+        .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
        .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
        .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
        .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
        .build();

    this.stopEvent.thenRun(kafkaStreams::close);
    kafkaStreams.start();
  }
......
-package spesb.uc4.streamprocessing;
+package theodolite.uc4.streamprocessing;
/**
* Composed key of an hour of the day and a sensor id.
......
-package spesb.uc4.streamprocessing;
+package theodolite.uc4.streamprocessing;
import java.time.LocalDateTime;
......
-package spesb.uc4.streamprocessing;
+package theodolite.uc4.streamprocessing;
import org.apache.kafka.common.serialization.Serde;
import titan.ccp.common.kafka.simpleserdes.BufferSerde;
......
-package spesb.uc4.streamprocessing;
+package theodolite.uc4.streamprocessing;
import com.google.common.math.Stats;
import org.apache.kafka.streams.kstream.Windowed;
......
-package spesb.uc4.streamprocessing;
+package theodolite.uc4.streamprocessing;
import java.util.Collection;
import java.util.List;
......
-package spesb.uc4.streamprocessing;
+package theodolite.uc4.streamprocessing;
import java.time.LocalDateTime;
......
-package spesb.uc4.streamprocessing;
+package theodolite.uc4.streamprocessing;
import com.google.common.math.Stats;
import org.apache.avro.specific.SpecificRecord;
......