Commit b206d512 authored by Sören Henning

Migrate UC3 (fka UC4) for Flink

parent fbf0fc2c
1 merge request: !90 Migrate Flink benchmark implementation
Pipeline #2189 failed
Showing 669 additions and 1 deletion

settings.gradle
@@ -14,7 +14,7 @@
include 'uc2-application-flink' // TODO Rename to uc2-flink
include 'uc3-workload-generator' // TODO Rename to uc3-load-generator
include 'uc3-application' // TODO Rename to uc3-kstreams
-//include 'uc3-application-flink' // TODO Rename to uc3-flink
+include 'uc3-application-flink' // TODO Rename to uc3-flink
include 'uc4-workload-generator' // TODO Rename to uc4-load-generator
include 'uc4-application' // TODO Rename to uc4-kstreams

.settings/org.eclipse.jdt.ui.prefs
cleanup.add_default_serial_version_id=true
cleanup.add_generated_serial_version_id=false
cleanup.add_missing_annotations=true
cleanup.add_missing_deprecated_annotations=true
cleanup.add_missing_methods=false
cleanup.add_missing_nls_tags=false
cleanup.add_missing_override_annotations=true
cleanup.add_missing_override_annotations_interface_methods=true
cleanup.add_serial_version_id=false
cleanup.always_use_blocks=true
cleanup.always_use_parentheses_in_expressions=false
cleanup.always_use_this_for_non_static_field_access=true
cleanup.always_use_this_for_non_static_method_access=true
cleanup.convert_functional_interfaces=false
cleanup.convert_to_enhanced_for_loop=true
cleanup.correct_indentation=true
cleanup.format_source_code=true
cleanup.format_source_code_changes_only=false
cleanup.insert_inferred_type_arguments=false
cleanup.make_local_variable_final=true
cleanup.make_parameters_final=true
cleanup.make_private_fields_final=true
cleanup.make_type_abstract_if_missing_method=false
cleanup.make_variable_declarations_final=true
cleanup.never_use_blocks=false
cleanup.never_use_parentheses_in_expressions=true
cleanup.organize_imports=true
cleanup.qualify_static_field_accesses_with_declaring_class=false
cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
cleanup.qualify_static_member_accesses_with_declaring_class=true
cleanup.qualify_static_method_accesses_with_declaring_class=false
cleanup.remove_private_constructors=true
cleanup.remove_redundant_modifiers=false
cleanup.remove_redundant_semicolons=false
cleanup.remove_redundant_type_arguments=true
cleanup.remove_trailing_whitespaces=true
cleanup.remove_trailing_whitespaces_all=true
cleanup.remove_trailing_whitespaces_ignore_empty=false
cleanup.remove_unnecessary_casts=true
cleanup.remove_unnecessary_nls_tags=true
cleanup.remove_unused_imports=true
cleanup.remove_unused_local_variables=false
cleanup.remove_unused_private_fields=true
cleanup.remove_unused_private_members=false
cleanup.remove_unused_private_methods=true
cleanup.remove_unused_private_types=true
cleanup.sort_members=false
cleanup.sort_members_all=false
cleanup.use_anonymous_class_creation=false
cleanup.use_blocks=true
cleanup.use_blocks_only_for_return_and_throw=false
cleanup.use_lambda=true
cleanup.use_parentheses_in_expressions=true
cleanup.use_this_for_non_static_field_access=true
cleanup.use_this_for_non_static_field_access_only_if_necessary=false
cleanup.use_this_for_non_static_method_access=true
cleanup.use_this_for_non_static_method_access_only_if_necessary=false
cleanup_profile=_CAU-SE-Style
cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=15
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99
sp_cleanup.add_default_serial_version_id=true
sp_cleanup.add_generated_serial_version_id=false
sp_cleanup.add_missing_annotations=true
sp_cleanup.add_missing_deprecated_annotations=true
sp_cleanup.add_missing_methods=false
sp_cleanup.add_missing_nls_tags=false
sp_cleanup.add_missing_override_annotations=true
sp_cleanup.add_missing_override_annotations_interface_methods=true
sp_cleanup.add_serial_version_id=false
sp_cleanup.always_use_blocks=true
sp_cleanup.always_use_parentheses_in_expressions=false
sp_cleanup.always_use_this_for_non_static_field_access=true
sp_cleanup.always_use_this_for_non_static_method_access=true
sp_cleanup.convert_functional_interfaces=false
sp_cleanup.convert_to_enhanced_for_loop=true
sp_cleanup.correct_indentation=true
sp_cleanup.format_source_code=true
sp_cleanup.format_source_code_changes_only=false
sp_cleanup.insert_inferred_type_arguments=false
sp_cleanup.make_local_variable_final=true
sp_cleanup.make_parameters_final=true
sp_cleanup.make_private_fields_final=true
sp_cleanup.make_type_abstract_if_missing_method=false
sp_cleanup.make_variable_declarations_final=true
sp_cleanup.never_use_blocks=false
sp_cleanup.never_use_parentheses_in_expressions=true
sp_cleanup.on_save_use_additional_actions=true
sp_cleanup.organize_imports=true
sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
sp_cleanup.remove_private_constructors=true
sp_cleanup.remove_redundant_modifiers=false
sp_cleanup.remove_redundant_semicolons=true
sp_cleanup.remove_redundant_type_arguments=true
sp_cleanup.remove_trailing_whitespaces=true
sp_cleanup.remove_trailing_whitespaces_all=true
sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
sp_cleanup.remove_unnecessary_casts=true
sp_cleanup.remove_unnecessary_nls_tags=true
sp_cleanup.remove_unused_imports=true
sp_cleanup.remove_unused_local_variables=false
sp_cleanup.remove_unused_private_fields=true
sp_cleanup.remove_unused_private_members=false
sp_cleanup.remove_unused_private_methods=true
sp_cleanup.remove_unused_private_types=true
sp_cleanup.sort_members=false
sp_cleanup.sort_members_all=false
sp_cleanup.use_anonymous_class_creation=false
sp_cleanup.use_blocks=true
sp_cleanup.use_blocks_only_for_return_and_throw=false
sp_cleanup.use_lambda=true
sp_cleanup.use_parentheses_in_expressions=true
sp_cleanup.use_this_for_non_static_field_access=true
sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false
sp_cleanup.use_this_for_non_static_method_access=true
sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false

.settings/qa.eclipse.plugin.checkstyle.prefs
configFilePath=../config/checkstyle.xml
customModulesJarPaths=
eclipse.preferences.version=1
enabled=true

.settings/qa.eclipse.plugin.pmd.prefs
customRulesJars=
eclipse.preferences.version=1
enabled=true
ruleSetFilePath=../config/pmd.xml

Dockerfile
FROM nicobiernat/flink:1.11-scala_2.12-java_11
ADD build/libs/uc3-application-all.jar /opt/flink/usrlib/artifacts/uc3-application-all.jar
\ No newline at end of file

build.gradle
allprojects {
repositories {
maven {
url 'https://packages.confluent.io/maven/'
}
}
}
dependencies {
compile('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT')
}
mainClassName = "theodolite.uc3.application.HistoryServiceFlinkJob"

src/main/java/theodolite/uc3/application/ConfigurationKeys.java
package theodolite.uc3.application;
/**
* Keys to access configuration parameters.
*/
public final class ConfigurationKeys {
public static final String APPLICATION_NAME = "application.name";
public static final String APPLICATION_VERSION = "application.version";
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
public static final String TIME_ZONE = "time.zone";
public static final String FLINK_STATE_BACKEND = "flink.state.backend";
public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path";
public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = "flink.state.backend.memory.size";
public static final String CHECKPOINTING = "checkpointing";
private ConfigurationKeys() {}
}
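
For illustration only (not part of this commit): a minimal sketch of how these keys resolve against the application.properties file at the end of this diff, assuming Configurations.create() merges the bundled defaults with environment overrides, as HistoryServiceFlinkJob below suggests. ConfigDemo is a hypothetical class.

package theodolite.uc3.application;

import org.apache.commons.configuration2.Configuration;
import titan.ccp.common.configuration.Configurations;

// Hypothetical demo class, not part of the commit.
public final class ConfigDemo {
  public static void main(final String[] args) {
    final Configuration config = Configurations.create();
    // With the defaults shown below: "localhost:9092" and 30.
    System.out.println(config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS));
    System.out.println(config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
  }
}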

src/main/java/theodolite/uc3/application/HistoryServiceFlinkJob.java
package theodolite.uc3.application;
import com.google.common.math.Stats;
import org.apache.commons.configuration2.Configuration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.common.serialization.Serdes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
import theodolite.commons.flink.serialization.FlinkMonitoringRecordSerde;
import theodolite.commons.flink.serialization.StatsSerializer;
import theodolite.uc3.application.util.HourOfDayKey;
import theodolite.uc3.application.util.HourOfDayKeyFactory;
import theodolite.uc3.application.util.HourOfDayKeySerde;
import theodolite.uc3.application.util.StatsKeyFactory;
import titan.ccp.common.configuration.Configurations;
import titan.ccp.models.records.ActivePowerRecord;
import titan.ccp.models.records.ActivePowerRecordFactory;
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Properties;
/**
* The History Microservice Flink Job.
*/
public class HistoryServiceFlinkJob {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
private final Configuration config = Configurations.create();
private void run() {
// Configurations
final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
final String applicationId = applicationName + "-" + applicationVersion;
final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final String timeZoneString = this.config.getString(ConfigurationKeys.TIME_ZONE);
final ZoneId timeZone = ZoneId.of(timeZoneString);
final Time aggregationDuration =
    Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
final Time aggregationAdvance =
    Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
final String stateBackend =
    this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase();
final String stateBackendPath =
    this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");
final int memoryStateBackendSize = this.config.getInt(
    ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE, MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
final Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
kafkaProps.setProperty("group.id", applicationId);
// Sources and Sinks with Serializer and Deserializer
final FlinkMonitoringRecordSerde<ActivePowerRecord, ActivePowerRecordFactory> sourceSerde =
new FlinkMonitoringRecordSerde<>(
inputTopic,
ActivePowerRecord.class,
ActivePowerRecordFactory.class);
final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource = new FlinkKafkaConsumer<>(
inputTopic, sourceSerde, kafkaProps);
kafkaSource.setStartFromGroupOffsets();
if (checkpointing) {
  kafkaSource.setCommitOffsetsOnCheckpoints(true);
}
kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
final FlinkKafkaKeyValueSerde<String, String> sinkSerde =
new FlinkKafkaKeyValueSerde<>(outputTopic,
Serdes::String,
Serdes::String,
TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})
);
final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink = new FlinkKafkaProducer<>(
outputTopic, sinkSerde, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
kafkaSink.setWriteTimestampToKafka(true);
// Execution environment configuration
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
if (checkpointing) {
  env.enableCheckpointing(commitIntervalMs);
}
// State Backend
if (stateBackend.equals("filesystem")) {
env.setStateBackend(new FsStateBackend(stateBackendPath));
} else if (stateBackend.equals("rocksdb")) {
try {
env.setStateBackend(new RocksDBStateBackend(stateBackendPath, true));
} catch (final IOException e) {
  LOGGER.error("Cannot create RocksDB state backend.", e);
}
} else {
env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize));
}
// Kryo serializer registration
env.getConfig().registerTypeWithKryoSerializer(HourOfDayKey.class, new HourOfDayKeySerde());
env.getConfig().registerTypeWithKryoSerializer(ActivePowerRecord.class,
new FlinkMonitoringRecordSerde<>(
inputTopic,
ActivePowerRecord.class,
ActivePowerRecordFactory.class));
env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
env.getConfig().getRegisteredTypesWithKryoSerializers().forEach((c, s) ->
LOGGER.info("Class " + c.getName() + " registered with serializer "
+ s.getSerializer().getClass().getName()));
// Streaming topology
final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
final DataStream<ActivePowerRecord> stream = env.addSource(kafkaSource)
.name("[Kafka Consumer] Topic: " + inputTopic);
stream
.rebalance()
.keyBy((KeySelector<ActivePowerRecord, HourOfDayKey>) record -> {
final Instant instant = Instant.ofEpochMilli(record.getTimestamp());
final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, timeZone);
return keyFactory.createKey(record.getIdentifier(), dateTime);
})
.window(SlidingEventTimeWindows.of(aggregationDuration, aggregationAdvance))
.aggregate(new StatsAggregateFunction(), new HourOfDayProcessWindowFunction())
.map(new MapFunction<Tuple2<HourOfDayKey, Stats>, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(final Tuple2<HourOfDayKey, Stats> tuple) {
final String newKey = keyFactory.getSensorId(tuple.f0);
final String newValue = tuple.f1.toString();
final int hourOfDay = tuple.f0.getHourOfDay();
LOGGER.info(newKey + "|" + hourOfDay + ": " + newValue);
return new Tuple2<>(newKey, newValue);
}
}).name("map")
.addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
// Execution plan
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Execution Plan: " + env.getExecutionPlan());
}
// Execute Job
try {
env.execute(applicationId);
} catch (final Exception e) { // env.execute() declares a plain Exception
  LOGGER.error("An error occurred while running this job.", e);
}
}
public static void main(final String[] args) {
new HistoryServiceFlinkJob().run();
}
}
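
A note on the window parameters (illustration only, not part of this commit): with aggregation.duration.days=30 and aggregation.advance.days=1, SlidingEventTimeWindows assigns every record to 30 overlapping, epoch-aligned windows. A self-contained sketch mirroring Flink's assignment logic, assuming non-negative timestamps and the default window offset of 0; SlidingWindowDemo is a hypothetical class.

package theodolite.uc3.application;

import java.time.Duration;
import java.time.Instant;

// Hypothetical demo class, not part of the commit.
public final class SlidingWindowDemo {
  public static void main(final String[] args) {
    final long size = Duration.ofDays(30).toMillis();
    final long slide = Duration.ofDays(1).toMillis();
    final long timestamp = Instant.parse("2021-02-15T10:00:00Z").toEpochMilli();
    // Start of the latest window containing the timestamp (offset 0, timestamp >= 0).
    final long lastStart = timestamp - timestamp % slide;
    int count = 0;
    for (long start = lastStart; start > timestamp - size; start -= slide) {
      count++; // window [start, start + size) contains the record
    }
    System.out.println(count); // 30: each record contributes to 30 daily-advancing aggregates
  }
}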

src/main/java/theodolite/uc3/application/HourOfDayProcessWindowFunction.java
package theodolite.uc3.application;
import com.google.common.math.Stats;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import theodolite.uc3.application.util.HourOfDayKey;
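/**
 * {@link ProcessWindowFunction} that attaches the {@link HourOfDayKey} to the single
 * {@link Stats} object pre-aggregated per window by {@link StatsAggregateFunction}.
 */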
public class HourOfDayProcessWindowFunction
    extends ProcessWindowFunction<Stats, Tuple2<HourOfDayKey, Stats>, HourOfDayKey, TimeWindow> {
@Override
public void process(final HourOfDayKey hourOfDayKey,
final Context context,
final Iterable<Stats> elements,
final Collector<Tuple2<HourOfDayKey, Stats>> out) {
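    // With aggregate(AggregateFunction, ProcessWindowFunction), Flink passes exactly one
    // element here: the final pre-aggregated Stats for this window.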
final Stats stats = elements.iterator().next();
out.collect(new Tuple2<>(hourOfDayKey, stats));
}
}
\ No newline at end of file

src/main/java/theodolite/uc3/application/StatsAggregateFunction.java
package theodolite.uc3.application;
import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
import org.apache.flink.api.common.functions.AggregateFunction;
import theodolite.uc3.application.util.StatsFactory;
import titan.ccp.models.records.ActivePowerRecord;
/**
* Statistical aggregation of {@link ActivePowerRecord}s using {@link Stats}.
*/
@SuppressWarnings("UnstableApiUsage")
public class StatsAggregateFunction implements AggregateFunction<ActivePowerRecord, Stats, Stats> {
private static final long serialVersionUID = -8873572990921515499L;
@Override
public Stats createAccumulator() {
return Stats.of();
}
@Override
public Stats add(final ActivePowerRecord value, final Stats accumulator) {
return StatsFactory.accumulate(accumulator, value.getValueInW());
}
@Override
public Stats getResult(final Stats accumulator) {
return accumulator;
}
@Override
public Stats merge(final Stats a, final Stats b) {
final StatsAccumulator statsAccumulator = new StatsAccumulator();
statsAccumulator.addAll(a);
statsAccumulator.addAll(b);
return statsAccumulator.snapshot();
}
}
\ No newline at end of file
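
For illustration only (not part of this commit): the accumulator semantics of the function above, assuming ActivePowerRecord exposes an (identifier, timestamp, valueInW) constructor matching its accessors.

package theodolite.uc3.application;

import com.google.common.math.Stats;
import titan.ccp.models.records.ActivePowerRecord;

// Hypothetical demo class, not part of the commit.
public final class StatsAggregateFunctionDemo {
  public static void main(final String[] args) {
    final StatsAggregateFunction agg = new StatsAggregateFunction();
    Stats acc = agg.createAccumulator();
    acc = agg.add(new ActivePowerRecord("sensor0", 0L, 10.0), acc);
    acc = agg.add(new ActivePowerRecord("sensor0", 1L, 30.0), acc);
    final Stats other = agg.add(new ActivePowerRecord("sensor0", 2L, 50.0), agg.createAccumulator());
    System.out.println(agg.merge(acc, other).mean()); // 30.0 over the three readings
  }
}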

src/main/java/theodolite/uc3/application/util/HourOfDayKey.java
package theodolite.uc3.application.util;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Objects;
/**
* Composed key of an hour of the day and a sensor id.
*/
public class HourOfDayKey {
private final int hourOfDay;
private final String sensorId;
public HourOfDayKey(final int hourOfDay, final String sensorId) {
this.hourOfDay = hourOfDay;
this.sensorId = sensorId;
}
public int getHourOfDay() {
return this.hourOfDay;
}
public String getSensorId() {
return this.sensorId;
}
@Override
public String toString() {
return this.sensorId + ";" + this.hourOfDay;
}
@Override
public int hashCode() {
    return Objects.hash(this.hourOfDay, this.sensorId);
  }
  @Override
  public boolean equals(final Object obj) {
    if (obj == this) {
      return true;
    }
    if (!(obj instanceof HourOfDayKey)) {
      return false;
    }
    final HourOfDayKey k = (HourOfDayKey) obj;
    return this.hourOfDay == k.hourOfDay && this.sensorId.equals(k.sensorId);
}
public byte[] toByteArray() {
final int numBytes = (2 * Integer.SIZE + this.sensorId.length() * Character.SIZE) / Byte.SIZE;
final ByteBuffer buffer = ByteBuffer.allocate(numBytes).order(ByteOrder.LITTLE_ENDIAN);
buffer.putInt(this.hourOfDay);
buffer.putInt(this.sensorId.length());
for (final char c : this.sensorId.toCharArray()) {
buffer.putChar(c);
}
return buffer.array();
}
public static HourOfDayKey fromByteArray(final byte[] bytes) {
final ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
final int hourOfDay = buffer.getInt();
final int strLen = buffer.getInt();
final char[] sensorId = new char[strLen];
for (int i = 0; i < strLen; i++) {
sensorId[i] = buffer.getChar();
}
return new HourOfDayKey(hourOfDay, new String(sensorId));
}
}
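
A quick round-trip check of the byte layout above (illustration only; HourOfDayKeyDemo is a hypothetical class, not part of the commit):

package theodolite.uc3.application.util;

// Hypothetical demo class, not part of the commit.
public final class HourOfDayKeyDemo {
  public static void main(final String[] args) {
    final HourOfDayKey key = new HourOfDayKey(14, "sensor0");
    final HourOfDayKey copy = HourOfDayKey.fromByteArray(key.toByteArray());
    System.out.println(key.equals(copy)); // true: 4B hour + 4B length + 2B per char
  }
}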

src/main/java/theodolite/uc3/application/util/HourOfDayKeyFactory.java
package theodolite.uc3.application.util;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* {@link StatsKeyFactory} for {@link HourOfDayKey}.
*/
public class HourOfDayKeyFactory implements StatsKeyFactory<HourOfDayKey>, Serializable {
@Override
public HourOfDayKey createKey(final String sensorId, final LocalDateTime dateTime) {
final int hourOfDay = dateTime.getHour();
return new HourOfDayKey(hourOfDay, sensorId);
}
@Override
public String getSensorId(final HourOfDayKey key) {
return key.getSensorId();
}
}

src/main/java/theodolite/uc3/application/util/HourOfDayKeySerde.java
package theodolite.uc3.application.util;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.kafka.common.serialization.Serde;
import titan.ccp.common.kafka.simpleserdes.BufferSerde;
import titan.ccp.common.kafka.simpleserdes.ReadBuffer;
import titan.ccp.common.kafka.simpleserdes.SimpleSerdes;
import titan.ccp.common.kafka.simpleserdes.WriteBuffer;
import java.io.Serializable;
/**
* {@link BufferSerde} for a {@link HourOfDayKey}. Use the {@link #create()} method to create a new
* Kafka {@link Serde}.
*/
public class HourOfDayKeySerde extends Serializer<HourOfDayKey>
    implements BufferSerde<HourOfDayKey>, Serializable {
@Override
public void serialize(final WriteBuffer buffer, final HourOfDayKey data) {
buffer.putInt(data.getHourOfDay());
buffer.putString(data.getSensorId());
}
@Override
public HourOfDayKey deserialize(final ReadBuffer buffer) {
final int hourOfDay = buffer.getInt();
final String sensorId = buffer.getString();
return new HourOfDayKey(hourOfDay, sensorId);
}
public static Serde<HourOfDayKey> create() {
return SimpleSerdes.create(new HourOfDayKeySerde());
}
@Override
public void write(final Kryo kryo, final Output output, final HourOfDayKey object) {
  final byte[] data = object.toByteArray();
  output.writeInt(data.length);
  output.writeBytes(data);
}
@Override
public HourOfDayKey read(final Kryo kryo, final Input input, final Class<HourOfDayKey> type) {
final int numBytes = input.readInt();
return HourOfDayKey.fromByteArray(input.readBytes(numBytes));
}
}
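
Usage sketch (not part of this commit), assuming SimpleSerdes.create() adapts the BufferSerde into a standard Kafka Serde as the create() factory above indicates; HourOfDayKeySerdeDemo is a hypothetical class.

package theodolite.uc3.application.util;

import org.apache.kafka.common.serialization.Serde;

// Hypothetical demo class, not part of the commit.
public final class HourOfDayKeySerdeDemo {
  public static void main(final String[] args) {
    final Serde<HourOfDayKey> serde = HourOfDayKeySerde.create();
    final byte[] bytes = serde.serializer().serialize("topic", new HourOfDayKey(9, "sensor0"));
    final HourOfDayKey restored = serde.deserializer().deserialize("topic", bytes);
    System.out.println(restored); // sensor0;9
  }
}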

src/main/java/theodolite/uc3/application/util/HourOfDayRecordFactory.java
package theodolite.uc3.application.util;
import com.google.common.math.Stats;
import org.apache.kafka.streams.kstream.Windowed;
import titan.ccp.model.records.HourOfDayActivePowerRecord;
/**
* {@link StatsRecordFactory} to create an {@link HourOfDayActivePowerRecord}.
*/
public class HourOfDayRecordFactory
implements StatsRecordFactory<HourOfDayKey, HourOfDayActivePowerRecord> {
@Override
public HourOfDayActivePowerRecord create(final Windowed<HourOfDayKey> windowed,
final Stats stats) {
return new HourOfDayActivePowerRecord(
windowed.key().getSensorId(),
windowed.key().getHourOfDay(),
windowed.window().start(),
windowed.window().end(),
stats.count(),
stats.mean(),
stats.populationVariance(),
stats.min(),
stats.max());
}
}

src/main/java/theodolite/uc3/application/util/StatsFactory.java
package theodolite.uc3.application.util;
import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
/**
* Factory methods for working with {@link Stats}.
*/
public final class StatsFactory {
private StatsFactory() {}
/**
* Add a value to a {@link Stats} object.
*/
public static Stats accumulate(final Stats stats, final double value) {
final StatsAccumulator statsAccumulator = new StatsAccumulator();
statsAccumulator.addAll(stats);
statsAccumulator.add(value);
return statsAccumulator.snapshot();
}
}
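
Usage sketch (not part of this commit): Guava's Stats is immutable, so accumulate() copies the existing statistics into a StatsAccumulator, adds the new value, and snapshots a fresh instance. StatsFactoryDemo is a hypothetical class.

package theodolite.uc3.application.util;

import com.google.common.math.Stats;

// Hypothetical demo class, not part of the commit.
public final class StatsFactoryDemo {
  public static void main(final String[] args) {
    final Stats stats = StatsFactory.accumulate(Stats.of(1.0, 2.0), 3.0);
    System.out.println(stats.mean()); // 2.0
  }
}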

src/main/java/theodolite/uc3/application/util/StatsKeyFactory.java
package theodolite.uc3.application.util;
import java.time.LocalDateTime;
/**
* Factory interface for creating a stats key from a sensor id and a {@link LocalDateTime} object
* and vice versa.
*
* @param <T> Type of the key
*/
public interface StatsKeyFactory<T> {
T createKey(String sensorId, LocalDateTime dateTime);
String getSensorId(T key);
}

src/main/java/theodolite/uc3/application/util/StatsRecordFactory.java
package theodolite.uc3.application.util;
import com.google.common.math.Stats;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
/**
* Factory interface for creating a stats Avro record from a {@link Windowed} and a {@link Stats}.
* The {@link Windowed} contains information about the start and end of the {@link Window} as
* well as the sensor id and the aggregated time unit. The {@link Stats} object contains the actual
* aggregation results.
*
* @param <K> Key type of the {@link Windowed}
* @param <R> Avro record type
*/
@FunctionalInterface
public interface StatsRecordFactory<K, R extends SpecificRecord> {
R create(Windowed<K> windowed, Stats stats);
}

src/main/resources/META-INF/application.properties
application.name=theodolite-uc3-application
application.version=0.0.1
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
aggregation.duration.days=30
aggregation.advance.days=1
num.threads=1
commit.interval.ms=100
cache.max.bytes.buffering=-1
time.zone=Europe/Paris
\ No newline at end of file