Commit f5ad7ab0 authored by Sören Henning

Add UC2 (fka UC3)


Co-authored-by: Nico Biernat <stu209212@mail.uni-kiel.de>
parent c7ea98d1
1 merge request: !90 Migrate Flink benchmark implementation
Showing 425 additions and 1 deletion
@@ -10,7 +10,7 @@ include 'uc1-application-flink' // TODO Rename to uc1-flink
include 'uc2-workload-generator' // TODO Rename to uc2-load-generator
include 'uc2-application' // TODO Rename to uc2-kstreams
-//include 'uc2-application-flink' // TODO Rename to uc2-flink
+include 'uc2-application-flink' // TODO Rename to uc2-flink
include 'uc3-workload-generator' // TODO Rename to uc3-load-generator
include 'uc3-application' // TODO Rename to uc3-kstreams
cleanup.add_default_serial_version_id=true
cleanup.add_generated_serial_version_id=false
cleanup.add_missing_annotations=true
cleanup.add_missing_deprecated_annotations=true
cleanup.add_missing_methods=false
cleanup.add_missing_nls_tags=false
cleanup.add_missing_override_annotations=true
cleanup.add_missing_override_annotations_interface_methods=true
cleanup.add_serial_version_id=false
cleanup.always_use_blocks=true
cleanup.always_use_parentheses_in_expressions=false
cleanup.always_use_this_for_non_static_field_access=true
cleanup.always_use_this_for_non_static_method_access=true
cleanup.convert_functional_interfaces=false
cleanup.convert_to_enhanced_for_loop=true
cleanup.correct_indentation=true
cleanup.format_source_code=true
cleanup.format_source_code_changes_only=false
cleanup.insert_inferred_type_arguments=false
cleanup.make_local_variable_final=true
cleanup.make_parameters_final=true
cleanup.make_private_fields_final=true
cleanup.make_type_abstract_if_missing_method=false
cleanup.make_variable_declarations_final=true
cleanup.never_use_blocks=false
cleanup.never_use_parentheses_in_expressions=true
cleanup.organize_imports=true
cleanup.qualify_static_field_accesses_with_declaring_class=false
cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
cleanup.qualify_static_member_accesses_with_declaring_class=true
cleanup.qualify_static_method_accesses_with_declaring_class=false
cleanup.remove_private_constructors=true
cleanup.remove_redundant_modifiers=false
cleanup.remove_redundant_semicolons=false
cleanup.remove_redundant_type_arguments=true
cleanup.remove_trailing_whitespaces=true
cleanup.remove_trailing_whitespaces_all=true
cleanup.remove_trailing_whitespaces_ignore_empty=false
cleanup.remove_unnecessary_casts=true
cleanup.remove_unnecessary_nls_tags=true
cleanup.remove_unused_imports=true
cleanup.remove_unused_local_variables=false
cleanup.remove_unused_private_fields=true
cleanup.remove_unused_private_members=false
cleanup.remove_unused_private_methods=true
cleanup.remove_unused_private_types=true
cleanup.sort_members=false
cleanup.sort_members_all=false
cleanup.use_anonymous_class_creation=false
cleanup.use_blocks=true
cleanup.use_blocks_only_for_return_and_throw=false
cleanup.use_lambda=true
cleanup.use_parentheses_in_expressions=true
cleanup.use_this_for_non_static_field_access=true
cleanup.use_this_for_non_static_field_access_only_if_necessary=false
cleanup.use_this_for_non_static_method_access=true
cleanup.use_this_for_non_static_method_access_only_if_necessary=false
cleanup_profile=_CAU-SE-Style
cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=15
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99
sp_cleanup.add_default_serial_version_id=true
sp_cleanup.add_generated_serial_version_id=false
sp_cleanup.add_missing_annotations=true
sp_cleanup.add_missing_deprecated_annotations=true
sp_cleanup.add_missing_methods=false
sp_cleanup.add_missing_nls_tags=false
sp_cleanup.add_missing_override_annotations=true
sp_cleanup.add_missing_override_annotations_interface_methods=true
sp_cleanup.add_serial_version_id=false
sp_cleanup.always_use_blocks=true
sp_cleanup.always_use_parentheses_in_expressions=false
sp_cleanup.always_use_this_for_non_static_field_access=true
sp_cleanup.always_use_this_for_non_static_method_access=true
sp_cleanup.convert_functional_interfaces=false
sp_cleanup.convert_to_enhanced_for_loop=true
sp_cleanup.correct_indentation=true
sp_cleanup.format_source_code=true
sp_cleanup.format_source_code_changes_only=false
sp_cleanup.insert_inferred_type_arguments=false
sp_cleanup.make_local_variable_final=true
sp_cleanup.make_parameters_final=true
sp_cleanup.make_private_fields_final=true
sp_cleanup.make_type_abstract_if_missing_method=false
sp_cleanup.make_variable_declarations_final=true
sp_cleanup.never_use_blocks=false
sp_cleanup.never_use_parentheses_in_expressions=true
sp_cleanup.on_save_use_additional_actions=true
sp_cleanup.organize_imports=true
sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
sp_cleanup.remove_private_constructors=true
sp_cleanup.remove_redundant_modifiers=false
sp_cleanup.remove_redundant_semicolons=true
sp_cleanup.remove_redundant_type_arguments=true
sp_cleanup.remove_trailing_whitespaces=true
sp_cleanup.remove_trailing_whitespaces_all=true
sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
sp_cleanup.remove_unnecessary_casts=true
sp_cleanup.remove_unnecessary_nls_tags=true
sp_cleanup.remove_unused_imports=true
sp_cleanup.remove_unused_local_variables=false
sp_cleanup.remove_unused_private_fields=true
sp_cleanup.remove_unused_private_members=false
sp_cleanup.remove_unused_private_methods=true
sp_cleanup.remove_unused_private_types=true
sp_cleanup.sort_members=false
sp_cleanup.sort_members_all=false
sp_cleanup.use_anonymous_class_creation=false
sp_cleanup.use_blocks=true
sp_cleanup.use_blocks_only_for_return_and_throw=false
sp_cleanup.use_lambda=true
sp_cleanup.use_parentheses_in_expressions=true
sp_cleanup.use_this_for_non_static_field_access=true
sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false
sp_cleanup.use_this_for_non_static_method_access=true
sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false
configFilePath=../config/checkstyle.xml
customModulesJarPaths=
eclipse.preferences.version=1
enabled=true
customRulesJars=
eclipse.preferences.version=1
enabled=true
ruleSetFilePath=../config/pmd.xml
FROM nicobiernat/flink:1.11-scala_2.12-java_11
ADD build/libs/uc2-application-all.jar /opt/flink/usrlib/artifacts/uc2-application-all.jar
\ No newline at end of file
allprojects {
repositories {
maven {
url 'https://packages.confluent.io/maven/'
}
}
}
dependencies {
compile('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT')
}
mainClassName = "theodolite.uc2.application.HistoryServiceFlinkJob"
package theodolite.uc2.application;
/**
* Keys to access configuration parameters.
*/
public final class ConfigurationKeys {
public static final String APPLICATION_NAME = "application.name";
public static final String APPLICATION_VERSION = "application.version";
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
public static final String FLINK_STATE_BACKEND = "flink.state.backend";
public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path";
public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = "flink.state.backend.memory.size";
public static final String CHECKPOINTING = "checkpointing";
private ConfigurationKeys() {
}
}
package theodolite.uc2.application;
import com.google.common.math.Stats;
import org.apache.commons.configuration2.Configuration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.common.serialization.Serdes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.flink.serialization.FlinkKafkaKeyValueSerde;
import theodolite.commons.flink.serialization.FlinkMonitoringRecordSerde;
import theodolite.commons.flink.serialization.StatsSerializer;
import titan.ccp.common.configuration.Configurations;
import titan.ccp.models.records.ActivePowerRecord;
import titan.ccp.models.records.ActivePowerRecordFactory;
import java.io.IOException;
import java.util.Properties;
/**
* The History Microservice Flink Job.
*/
public class HistoryServiceFlinkJob {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
private final Configuration config = Configurations.create();
private void run() {
final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
final String applicationId = applicationName + "-" + applicationVersion;
final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final int windowDuration = this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES);
final String stateBackend = this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND, "").toLowerCase();
final String stateBackendPath = this.config.getString(ConfigurationKeys.FLINK_STATE_BACKEND_PATH, "/opt/flink/statebackend");
final int memoryStateBackendSize = this.config.getInt(ConfigurationKeys.FLINK_STATE_BACKEND_MEMORY_SIZE, MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
// Kafka connection settings shared by the consumer and the (transactional) producer
final Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
kafkaProps.setProperty("group.id", applicationId);
final FlinkMonitoringRecordSerde<ActivePowerRecord, ActivePowerRecordFactory> sourceSerde =
new FlinkMonitoringRecordSerde<>(
inputTopic,
ActivePowerRecord.class,
ActivePowerRecordFactory.class);
final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource = new FlinkKafkaConsumer<>(
inputTopic, sourceSerde, kafkaProps);
kafkaSource.setStartFromGroupOffsets();
if (checkpointing) {
  kafkaSource.setCommitOffsetsOnCheckpoints(true);
}
kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
final FlinkKafkaKeyValueSerde<String, String> sinkSerde =
new FlinkKafkaKeyValueSerde<>(outputTopic,
Serdes::String,
Serdes::String,
TypeInformation.of(new TypeHint<Tuple2<String, String>>(){})
);
kafkaProps.setProperty("transaction.timeout.ms", ""+5*60*1000);
final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink = new FlinkKafkaProducer<>(
outputTopic, sinkSerde, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
kafkaSink.setWriteTimestampToKafka(true);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
if (checkpointing) {
  env.enableCheckpointing(commitIntervalMs);
}
// State Backend
if (stateBackend.equals("filesystem")) {
env.setStateBackend(new FsStateBackend(stateBackendPath));
} else if (stateBackend.equals("rocksdb")) {
try {
  env.setStateBackend(new RocksDBStateBackend(stateBackendPath, true));
} catch (final IOException e) {
  LOGGER.error("Cannot create RocksDB state backend.", e);
}
} else {
env.setStateBackend(new MemoryStateBackend(memoryStateBackendSize));
}
env.getConfig().registerTypeWithKryoSerializer(ActivePowerRecord.class,
new FlinkMonitoringRecordSerde<>(
inputTopic,
ActivePowerRecord.class,
ActivePowerRecordFactory.class));
env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
env.getConfig().getRegisteredTypesWithKryoSerializers().forEach((c, s) ->
LOGGER.info("Class " + c.getName() + " registered with serializer "
+ s.getSerializer().getClass().getName()));
// Dataflow: Kafka source -> keyBy(identifier) -> tumbling event-time windows of
// windowDuration minutes -> Stats aggregation -> (key, value) string mapping -> Kafka sink
final DataStream<ActivePowerRecord> stream = env.addSource(kafkaSource)
    .name("[Kafka Consumer] Topic: " + inputTopic);
stream
.rebalance()
.keyBy((KeySelector<ActivePowerRecord, String>) ActivePowerRecord::getIdentifier)
.window(TumblingEventTimeWindows.of(Time.minutes(windowDuration)))
.aggregate(new StatsAggregateFunction(), new StatsProcessWindowFunction())
.map(new MapFunction<Tuple2<String, Stats>, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(final Tuple2<String, Stats> t) {
final String key = t.f0;
final String value = t.f1.toString();
LOGGER.info(key + ": " + value);
return new Tuple2<>(key, value);
}
}).name("map")
.addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Execution Plan: " + env.getExecutionPlan());
}
try {
  env.execute(applicationId);
} catch (final Exception e) { // execute() declares the checked java.lang.Exception
  LOGGER.error("An error occurred while running this Flink job.", e);
}
}
public static void main(final String[] args) {
new HistoryServiceFlinkJob().run();
}
}
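Aside: to see what the dataflow above computes, here is a minimal local smoke-test sketch. It swaps the Kafka source and sink for fromElements() and print(); the ActivePowerRecord constructor signature (identifier, timestamp, valueInW) and a local Flink runtime are assumptions for illustration, not part of this commit.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env
    .fromElements( // assumed: ActivePowerRecord(String identifier, long timestamp, double valueInW)
        new ActivePowerRecord("sensor1", 0L, 10.0),
        new ActivePowerRecord("sensor1", 30_000L, 20.0))
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<ActivePowerRecord>forMonotonousTimestamps()
            .withTimestampAssigner((record, timestamp) -> record.getTimestamp()))
    .keyBy(ActivePowerRecord::getIdentifier)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new StatsAggregateFunction(), new StatsProcessWindowFunction())
    .print(); // expected: one (sensor1, Stats) tuple with count = 2 and mean = 15.0
env.execute("uc2-smoke-test");

Both records fall into the same 1-minute window; the window fires once a watermark passes the window end, which the bounded fromElements() source guarantees by emitting a final watermark when its input is exhausted.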
package theodolite.uc2.application;
import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
import org.apache.flink.api.common.functions.AggregateFunction;
import theodolite.uc2.application.util.StatsFactory;
import titan.ccp.models.records.ActivePowerRecord;
/**
* Statistical aggregation of {@link ActivePowerRecord}s using {@link Stats}.
*/
@SuppressWarnings("UnstableApiUsage")
public class StatsAggregateFunction implements AggregateFunction<ActivePowerRecord, Stats, Stats> {
private static final long serialVersionUID = -8873572990921515499L;
@Override
public Stats createAccumulator() {
return Stats.of();
}
@Override
public Stats add(final ActivePowerRecord value, final Stats accumulator) {
return StatsFactory.accumulate(accumulator, value.getValueInW());
}
@Override
public Stats getResult(final Stats accumulator) {
return accumulator;
}
@Override
public Stats merge(final Stats a, final Stats b) {
final StatsAccumulator statsAccumulator = new StatsAccumulator();
statsAccumulator.addAll(a);
statsAccumulator.addAll(b);
return statsAccumulator.snapshot();
}
}
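The contract here: add() folds one record into an (immutable) Stats accumulator, while merge() combines two partial window aggregates. A quick sanity-check sketch, using only classes from this commit plus Guava:

final StatsAggregateFunction aggregateFunction = new StatsAggregateFunction();
final Stats left = StatsFactory.accumulate(aggregateFunction.createAccumulator(), 10.0);
final Stats right = StatsFactory.accumulate(aggregateFunction.createAccumulator(), 20.0);
final Stats merged = aggregateFunction.merge(left, right);
assert merged.count() == 2;
assert merged.mean() == 15.0;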
package theodolite.uc2.application;
import com.google.common.math.Stats;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
 * A {@link ProcessWindowFunction} that forwards the single, pre-aggregated {@link Stats} object
 * of each window together with the window's key.
 */
public class StatsProcessWindowFunction
    extends ProcessWindowFunction<Stats, Tuple2<String, Stats>, String, TimeWindow> {
  private static final long serialVersionUID = 4363099880614593379L;
  @Override
  public void process(final String key, final Context context, final Iterable<Stats> elements,
      final Collector<Tuple2<String, Stats>> out) {
final Stats stats = elements.iterator().next();
out.collect(new Tuple2<>(key, stats));
}
}
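Because the upstream AggregateFunction already reduces each window to a single Stats value, process() receives an iterable with exactly one element and merely attaches the key. A minimal check sketch (the Context parameter is unused by this implementation, so passing null is a test-only shortcut):

final StatsProcessWindowFunction windowFunction = new StatsProcessWindowFunction();
windowFunction.process("sensor1", null, java.util.List.of(Stats.of(10.0, 20.0)),
    new org.apache.flink.util.Collector<Tuple2<String, Stats>>() {
      @Override
      public void collect(final Tuple2<String, Stats> record) {
        System.out.println(record.f0 + " -> " + record.f1); // sensor1 -> Stats{count=2, ...}
      }
      @Override
      public void close() {
        // nothing to release in this sketch
      }
    });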
package theodolite.uc2.application.util;
import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
/**
* Factory methods for working with {@link Stats}.
*/
public final class StatsFactory {
private StatsFactory() {}
/**
* Add a value to a {@link Stats} object.
*/
public static Stats accumulate(final Stats stats, final double value) {
final StatsAccumulator statsAccumulator = new StatsAccumulator();
statsAccumulator.addAll(stats);
statsAccumulator.add(value);
return statsAccumulator.snapshot();
}
}
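Guava's Stats is immutable, which is why accumulate() copies the existing statistics into a fresh StatsAccumulator, adds the new value, and snapshots the result. Usage sketch:

Stats stats = Stats.of(1.0, 2.0);
stats = StatsFactory.accumulate(stats, 3.0); // count = 3, mean = 2.0

This copy-and-snapshot step is O(1) per value, since Stats stores only summary statistics (count, mean, variance terms, min, max), not the individual values.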
application.name=theodolite-uc2-application
application.version=0.0.1
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
num.threads=1
commit.interval.ms=100
cache.max.bytes.buffering=-1
kafka.window.duration.minutes=1
\ No newline at end of file
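The keys above correspond one-to-one to the constants in ConfigurationKeys; note that commit.interval.ms, a Kafka Streams-style setting, is reused by the Flink job as its checkpointing interval. As a hypothetical illustration of how such a file can be read (plain Apache Commons Configuration 2 instead of the titan.ccp Configurations helper; the file path is an assumption):

// properties() declares the checked ConfigurationException if the file cannot be read
final org.apache.commons.configuration2.Configuration config =
    new org.apache.commons.configuration2.builder.fluent.Configurations()
        .properties(new java.io.File("src/main/resources/META-INF/application.properties"));
final int windowDuration = config.getInt("kafka.window.duration.minutes"); // 1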