Skip to content
Snippets Groups Projects
Commit c7ea98d1 authored by Sören Henning's avatar Sören Henning
Browse files

Migrate UC1 for Flink

parent 0ebeaf46
No related branches found
No related tags found
1 merge request!90Migrate Flink benchmark implementation
Showing
with 732 additions and 17 deletions
dependencies {
// These dependencies are used internally, and not exposed to consumers on their own compile classpath.
// implementation 'org.slf4j:slf4j-simple:1.7.25'
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'org.apache.kafka:kafka-streams:2.6.0'
// Use JUnit test framework
testImplementation 'junit:junit:4.12'
}
\ No newline at end of file
......@@ -7,14 +7,17 @@ buildscript {
}
dependencies {
classpath "gradle.plugin.com.github.spotbugs.snom:spotbugs-gradle-plugin:4.6.0"
classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
}
}
// Variables used to distinct different subprojects
def useCaseProjects = subprojects.findAll {it -> it.name.matches('uc(.)*')}
def useCaseApplications = subprojects.findAll {it -> it.name.matches('uc[0-9]+-application')}
def useCaseApplicationsFlink = subprojects.findAll {it -> it.name.matches('uc[0-9]+-application-flink')}
def useCaseGenerators = subprojects.findAll {it -> it.name.matches('uc[0-9]+-workload-generator*')}
def commonProjects = subprojects.findAll {it -> it.name.matches('(.)*commons(.)*')}
def flinkCommon = subprojects.findAll {it -> it.name.matches('(.)*commons(.)*')}
// Plugins
allprojects {
......@@ -32,6 +35,11 @@ configure(useCaseProjects){
apply plugin: 'application'
}
configure(useCaseApplicationsFlink){
apply plugin: 'com.github.johnrengelman.shadow'
applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
}
// Java version for all subprojects
subprojects {
java {
......@@ -55,10 +63,13 @@ allprojects {
maven {
url 'https://packages.confluent.io/maven/'
}
maven {
url 'https://repository.apache.org/content/repositories/snapshots/'
} // TODO required?
}
}
// Dependencies for all use case applications
// Dependencies for all Kafka Streams benchmarks (use case applications)
configure(useCaseApplications) {
dependencies {
// These dependencies are used internally, and not exposed to consumers on their own compile classpath.
......@@ -75,7 +86,60 @@ configure(useCaseApplications) {
}
}
// Dependencies for all use case generators
// Dependencies for all Flink benchmarks (use case applications)
configure(useCaseApplicationsFlink) {
ext {
flinkVersion = '1.11.0'
scalaBinaryVersion = '2.12'
}
dependencies {
// These dependencies is exported to consumers, that is to say found on their compile classpath.
compile('org.industrial-devops:titan-ccp-common:0.0.4-SNAPSHOT') {
changing = true
}
api 'net.kieker-monitoring:kieker:1.14'//-SNAPSHOT'
api 'net.sourceforge.teetime:teetime:3.0'
// TODO Upgrade to 0.1.0
// These dependencies are used internally, and not exposed to consumers on their own compile classpath.
implementation 'org.apache.kafka:kafka-clients:2.2.0'
implementation 'com.google.guava:guava:24.1-jre'
implementation 'org.jctools:jctools-core:2.1.1'
implementation 'org.slf4j:slf4j-simple:1.6.1'
compile project(':flink-commons')
compile group: 'org.apache.kafka', name: 'kafka-clients', version: "2.2.0"
compile group: 'org.apache.flink', name: 'flink-java', version: "${flinkVersion}"
compile group: 'org.apache.flink', name: "flink-streaming-java_${scalaBinaryVersion}", version:"${flinkVersion}"
compile group: 'org.apache.flink', name: "flink-table-api-java-bridge_${scalaBinaryVersion}", version: "${flinkVersion}"
compile group: 'org.apache.flink', name: "flink-table-planner-blink_${scalaBinaryVersion}", version: "${flinkVersion}"
compile group: 'org.apache.flink', name: "flink-connector-kafka_${scalaBinaryVersion}", version: "${flinkVersion}"
compile group: 'org.industrial-devops', name: 'titan-ccp-common', version: '0.0.3-SNAPSHOT'
compile group: 'org.apache.flink', name: "flink-runtime-web_${scalaBinaryVersion}", version: "${flinkVersion}" // TODO: remove after development
compile group: 'org.apache.flink', name: "flink-statebackend-rocksdb_${scalaBinaryVersion}", version: "${flinkVersion}"
compile group: 'org.apache.flink', name: 'flink-metrics-prometheus_2.12', version: '1.11.1'
// Use JUnit test framework
testImplementation 'junit:junit:4.12'
}
run.classpath = sourceSets.main.runtimeClasspath
jar {
manifest {
attributes 'Built-By': System.getProperty('user.name'),
'Build-Jdk': System.getProperty('java.version')
}
}
shadowJar {
configurations = [project.configurations.compile]
zip64 true
}
}
// Dependencies for all load generators
configure(useCaseGenerators) {
dependencies {
// These dependencies are used internally, and not exposed to consumers on their own compile classpath.
......@@ -92,7 +156,7 @@ configure(useCaseGenerators) {
}
// Dependencies for all commons
configure(commonProjects) {
/*configure(commonProjects) {
dependencies {
// These dependencies are used internally, and not exposed to consumers on their own compile classpath.
implementation 'org.slf4j:slf4j-simple:1.7.25'
......@@ -103,7 +167,7 @@ configure(commonProjects) {
// Use JUnit test framework
testImplementation 'junit:junit:4.12'
}
}
}*/
// Per default XML reports for SpotBugs are generated
// Include this to generate HTML reports
......
cleanup.add_default_serial_version_id=true
cleanup.add_generated_serial_version_id=false
cleanup.add_missing_annotations=true
cleanup.add_missing_deprecated_annotations=true
cleanup.add_missing_methods=false
cleanup.add_missing_nls_tags=false
cleanup.add_missing_override_annotations=true
cleanup.add_missing_override_annotations_interface_methods=true
cleanup.add_serial_version_id=false
cleanup.always_use_blocks=true
cleanup.always_use_parentheses_in_expressions=false
cleanup.always_use_this_for_non_static_field_access=true
cleanup.always_use_this_for_non_static_method_access=true
cleanup.convert_functional_interfaces=false
cleanup.convert_to_enhanced_for_loop=true
cleanup.correct_indentation=true
cleanup.format_source_code=true
cleanup.format_source_code_changes_only=false
cleanup.insert_inferred_type_arguments=false
cleanup.make_local_variable_final=true
cleanup.make_parameters_final=true
cleanup.make_private_fields_final=true
cleanup.make_type_abstract_if_missing_method=false
cleanup.make_variable_declarations_final=true
cleanup.never_use_blocks=false
cleanup.never_use_parentheses_in_expressions=true
cleanup.organize_imports=true
cleanup.qualify_static_field_accesses_with_declaring_class=false
cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
cleanup.qualify_static_member_accesses_with_declaring_class=true
cleanup.qualify_static_method_accesses_with_declaring_class=false
cleanup.remove_private_constructors=true
cleanup.remove_redundant_modifiers=false
cleanup.remove_redundant_semicolons=false
cleanup.remove_redundant_type_arguments=true
cleanup.remove_trailing_whitespaces=true
cleanup.remove_trailing_whitespaces_all=true
cleanup.remove_trailing_whitespaces_ignore_empty=false
cleanup.remove_unnecessary_casts=true
cleanup.remove_unnecessary_nls_tags=true
cleanup.remove_unused_imports=true
cleanup.remove_unused_local_variables=false
cleanup.remove_unused_private_fields=true
cleanup.remove_unused_private_members=false
cleanup.remove_unused_private_methods=true
cleanup.remove_unused_private_types=true
cleanup.sort_members=false
cleanup.sort_members_all=false
cleanup.use_anonymous_class_creation=false
cleanup.use_blocks=true
cleanup.use_blocks_only_for_return_and_throw=false
cleanup.use_lambda=true
cleanup.use_parentheses_in_expressions=true
cleanup.use_this_for_non_static_field_access=true
cleanup.use_this_for_non_static_field_access_only_if_necessary=false
cleanup.use_this_for_non_static_method_access=true
cleanup.use_this_for_non_static_method_access_only_if_necessary=false
cleanup_profile=_CAU-SE-Style
cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=15
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99
sp_cleanup.add_default_serial_version_id=true
sp_cleanup.add_generated_serial_version_id=false
sp_cleanup.add_missing_annotations=true
sp_cleanup.add_missing_deprecated_annotations=true
sp_cleanup.add_missing_methods=false
sp_cleanup.add_missing_nls_tags=false
sp_cleanup.add_missing_override_annotations=true
sp_cleanup.add_missing_override_annotations_interface_methods=true
sp_cleanup.add_serial_version_id=false
sp_cleanup.always_use_blocks=true
sp_cleanup.always_use_parentheses_in_expressions=false
sp_cleanup.always_use_this_for_non_static_field_access=true
sp_cleanup.always_use_this_for_non_static_method_access=true
sp_cleanup.convert_functional_interfaces=false
sp_cleanup.convert_to_enhanced_for_loop=true
sp_cleanup.correct_indentation=true
sp_cleanup.format_source_code=true
sp_cleanup.format_source_code_changes_only=false
sp_cleanup.insert_inferred_type_arguments=false
sp_cleanup.make_local_variable_final=true
sp_cleanup.make_parameters_final=true
sp_cleanup.make_private_fields_final=true
sp_cleanup.make_type_abstract_if_missing_method=false
sp_cleanup.make_variable_declarations_final=true
sp_cleanup.never_use_blocks=false
sp_cleanup.never_use_parentheses_in_expressions=true
sp_cleanup.on_save_use_additional_actions=true
sp_cleanup.organize_imports=true
sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
sp_cleanup.remove_private_constructors=true
sp_cleanup.remove_redundant_modifiers=false
sp_cleanup.remove_redundant_semicolons=false
sp_cleanup.remove_redundant_type_arguments=true
sp_cleanup.remove_trailing_whitespaces=true
sp_cleanup.remove_trailing_whitespaces_all=true
sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
sp_cleanup.remove_unnecessary_casts=true
sp_cleanup.remove_unnecessary_nls_tags=true
sp_cleanup.remove_unused_imports=true
sp_cleanup.remove_unused_local_variables=false
sp_cleanup.remove_unused_private_fields=true
sp_cleanup.remove_unused_private_members=false
sp_cleanup.remove_unused_private_methods=true
sp_cleanup.remove_unused_private_types=true
sp_cleanup.sort_members=false
sp_cleanup.sort_members_all=false
sp_cleanup.use_anonymous_class_creation=false
sp_cleanup.use_blocks=true
sp_cleanup.use_blocks_only_for_return_and_throw=false
sp_cleanup.use_lambda=true
sp_cleanup.use_parentheses_in_expressions=true
sp_cleanup.use_this_for_non_static_field_access=true
sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false
sp_cleanup.use_this_for_non_static_method_access=true
sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false
configFilePath=../config/checkstyle.xml
customModulesJarPaths=
eclipse.preferences.version=1
enabled=true
customRulesJars=
eclipse.preferences.version=1
enabled=true
ruleSetFilePath=../config/pmd.xml
ext {
flinkVersion = '1.11.0'
scalaBinaryVersion = '2.12'
}
dependencies {
api 'org.apache.kafka:kafka-clients:2.2.0'
// implementation 'org.slf4j:slf4j-simple:1.6.1'
implementation('org.industrial-devops:titan-ccp-common:0.0.4-SNAPSHOT') {
changing = true
// exclude group: 'net.kieker-monitoring', module: 'kieker'
}
implementation 'com.google.guava:guava:30.1-jre'
compile group: 'org.apache.flink', name: "flink-connector-kafka_${scalaBinaryVersion}", version: "${flinkVersion}"
compile group: 'org.apache.flink', name: 'flink-java', version: "${flinkVersion}"
// Use JUnit test framework
testImplementation 'junit:junit:4.12'
}
\ No newline at end of file
package theodolite.commons.flink.serialization;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import javax.annotation.Nullable;
public class FlinkKafkaKeyValueSerde<K, V>
implements KafkaDeserializationSchema<Tuple2<K, V>>,
KafkaSerializationSchema<Tuple2<K, V>> {
private static final long serialVersionUID = 2469569396501933443L;
private transient Serde<K> keySerde;
private transient Serde<V> valueSerde;
private SerializableSupplier<Serde<K>> keySerdeSupplier;
private SerializableSupplier<Serde<V>> valueSerdeSupplier;
private String topic;
private TypeInformation<Tuple2<K,V>> typeInfo;
public FlinkKafkaKeyValueSerde(final String topic,
final SerializableSupplier<Serde<K>> keySerdeSupplier,
final SerializableSupplier<Serde<V>> valueSerdeSupplier,
final TypeInformation<Tuple2<K, V>> typeInfo) {
this.topic = topic;
this.typeInfo = typeInfo;
this.keySerdeSupplier = keySerdeSupplier;
this.valueSerdeSupplier = valueSerdeSupplier;
}
@Override
public boolean isEndOfStream(final Tuple2<K, V> nextElement) {
return false;
}
@Override
public Tuple2<K, V> deserialize(final ConsumerRecord<byte[], byte[]> record) {
ensureInitialized();
final K key = this.keySerde.deserializer().deserialize(this.topic, record.key());
final V value = this.valueSerde.deserializer().deserialize(this.topic, record.value());
return new Tuple2<>(key, value);
}
@Override
public TypeInformation<Tuple2<K, V>> getProducedType() {
return this.typeInfo;
}
@Override
public ProducerRecord<byte[], byte[]> serialize(Tuple2<K, V> element, @Nullable Long timestamp) {
ensureInitialized();
final byte[] key = this.keySerde.serializer().serialize(this.topic, element.f0);
final byte[] value = this.valueSerde.serializer().serialize(this.topic, element.f1);
return new ProducerRecord<>(this.topic, key, value);
}
private void ensureInitialized() {
if (this.keySerde == null || this.valueSerde == null) {
this.keySerde = this.keySerdeSupplier.get();
this.valueSerde = this.valueSerdeSupplier.get();;
}
}
}
package theodolite.commons.flink.serialization;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.factory.IRecordFactory;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
import titan.ccp.models.records.ActivePowerRecord;
import titan.ccp.models.records.AggregatedActivePowerRecord;
import javax.annotation.Nullable;
import java.lang.reflect.InvocationTargetException;
/**
* This class wraps the serializer and deserializer implementations for {@link IMonitoringRecord}
* from {@link IMonitoringRecordSerde}
* into the Flink {@link DeserializationSchema} and {@link SerializationSchema}
* and Kryo {@link Serializer} interfaces.
* It is used for serialization to and from Kafka as well as internal serialization
* between Flink instances.
* This class is also itself serializable by Flink.
* @param <R> The specific record type that extends {@link IMonitoringRecord}
* @param <F> The specific record factory type that extends {@link IRecordFactory<R>}
*/
public class FlinkMonitoringRecordSerde<R extends IMonitoringRecord, F extends IRecordFactory<R>>
extends Serializer<R>
implements KafkaDeserializationSchema<R>,
KafkaSerializationSchema<R> {
private static final long serialVersionUID = -5687951056995646212L;
private final String topic;
private transient Serde<String> keySerde;
private transient Serde<R> serde;
private final Class<R> recordClass;
private final Class<F> recordFactoryClass;
/**
* Creates a new FlinkMonitoringRecordSerde.
* @param topic
* The Kafka topic to/from which to serialize/deserialize.
* @param recordClass
* The class of the serialized/deserialized record.
* @param recordFactoryClass
* The class of the factory for the serialized/deserialized record.
*/
public FlinkMonitoringRecordSerde(final String topic,
final Class<R> recordClass,
final Class<F> recordFactoryClass) {
this.topic = topic;
this.recordClass = recordClass;
this.recordFactoryClass = recordFactoryClass;
}
@Override
public boolean isEndOfStream(final R nextElement) {
return false;
}
@Override
public TypeInformation<R> getProducedType() {
return TypeExtractor.getForClass(recordClass);
}
@Override
public R deserialize(ConsumerRecord<byte[], byte[]> record) {
ensureInitialized();
return this.serde.deserializer().deserialize(this.topic, record.value());
}
@Override
public ProducerRecord<byte[], byte[]> serialize(R element, @Nullable Long timestamp) {
ensureInitialized();
String identifier = null;
if (element instanceof ActivePowerRecord) {
identifier = ((ActivePowerRecord) element).getIdentifier();
}
if (element instanceof AggregatedActivePowerRecord) {
identifier = ((AggregatedActivePowerRecord) element).getIdentifier();
}
final byte[] key = this.keySerde.serializer().serialize(this.topic, identifier);
final byte[] value = this.serde.serializer().serialize(this.topic, element);
return new ProducerRecord<>(this.topic, key, value);
}
private void ensureInitialized() {
if (this.keySerde == null || this.serde == null) {
try {
this.keySerde = Serdes.String();
this.serde = IMonitoringRecordSerde.serde(
recordFactoryClass.getDeclaredConstructor().newInstance());
} catch (NoSuchMethodException | InstantiationException | IllegalAccessException
| InvocationTargetException e) {
e.printStackTrace();
}
}
}
@Override
public void write(final Kryo kryo, final Output output, final R record) {
ensureInitialized();
final byte[] data = this.serde.serializer().serialize(this.topic, record);
output.writeInt(data.length);
output.writeBytes(data);
}
@Override
public R read(final Kryo kryo, final Input input, final Class<R> type) {
ensureInitialized();
final int numBytes = input.readInt();
return this.serde.deserializer().deserialize(this.topic, input.readBytes(numBytes));
}
}
package theodolite.commons.flink.serialization;
import java.io.Serializable;
import java.util.function.Supplier;
public interface SerializableSupplier<T> extends Supplier<T>, Serializable {
// here be dragons
}
package theodolite.commons.flink.serialization;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.math.Stats;
import java.io.Serializable;
/**
* Custom Kryo Serializer for efficient transmission between Flink instances.
*/
public class StatsSerializer extends Serializer<Stats> implements Serializable {
private static final long serialVersionUID = -1276866176534267373L;
@Override
public void write(final Kryo kryo, final Output output, final Stats object) {
final byte[] data = object.toByteArray();
output.writeInt(data.length);
output.writeBytes(data);
}
@Override
public Stats read(final Kryo kryo, final Input input, final Class<Stats> type) {
final int numBytes = input.readInt();
return Stats.fromByteArray(input.readBytes(numBytes));
}
}
rootProject.name = 'theodolite-benchmarks'
include 'workload-generator-commons'
include 'application-kafkastreams-commons'
include 'workload-generator-commons' // TODO Rename to load-generator-commons
include 'application-kafkastreams-commons' // TODO Rename to kstreams-commons
include 'flink-commons'
include 'uc1-workload-generator'
include 'uc1-application'
include 'uc1-workload-generator' // TODO Rename to uc1-load-generator
include 'uc1-application' // TODO Rename to uc1-kstreams
include 'uc1-application-flink' // TODO Rename to uc1-flink
include 'uc2-workload-generator'
include 'uc2-application'
include 'uc2-workload-generator' // TODO Rename to uc2-load-generator
include 'uc2-application' // TODO Rename to uc1-kstreams
//include 'uc2-application-flink' // TODO Rename to uc2-flink
include 'uc3-workload-generator'
include 'uc3-application'
include 'uc3-workload-generator' // TODO Rename to uc3-load-generator
include 'uc3-application' // TODO Rename to uc1-kstreams
//include 'uc3-application-flink' // TODO Rename to uc3-flink
include 'uc4-workload-generator'
include 'uc4-application'
include 'uc4-workload-generator' // TODO Rename to uc4-load-generator
include 'uc4-application' // TODO Rename to uc4-kstreams
//include 'uc4-application-flink' // TODO Rename to uc4-flink
cleanup.add_default_serial_version_id=true
cleanup.add_generated_serial_version_id=false
cleanup.add_missing_annotations=true
cleanup.add_missing_deprecated_annotations=true
cleanup.add_missing_methods=false
cleanup.add_missing_nls_tags=false
cleanup.add_missing_override_annotations=true
cleanup.add_missing_override_annotations_interface_methods=true
cleanup.add_serial_version_id=false
cleanup.always_use_blocks=true
cleanup.always_use_parentheses_in_expressions=false
cleanup.always_use_this_for_non_static_field_access=true
cleanup.always_use_this_for_non_static_method_access=true
cleanup.convert_functional_interfaces=false
cleanup.convert_to_enhanced_for_loop=true
cleanup.correct_indentation=true
cleanup.format_source_code=true
cleanup.format_source_code_changes_only=false
cleanup.insert_inferred_type_arguments=false
cleanup.make_local_variable_final=true
cleanup.make_parameters_final=true
cleanup.make_private_fields_final=true
cleanup.make_type_abstract_if_missing_method=false
cleanup.make_variable_declarations_final=true
cleanup.never_use_blocks=false
cleanup.never_use_parentheses_in_expressions=true
cleanup.organize_imports=true
cleanup.qualify_static_field_accesses_with_declaring_class=false
cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
cleanup.qualify_static_member_accesses_with_declaring_class=true
cleanup.qualify_static_method_accesses_with_declaring_class=false
cleanup.remove_private_constructors=true
cleanup.remove_redundant_modifiers=false
cleanup.remove_redundant_semicolons=true
cleanup.remove_redundant_type_arguments=true
cleanup.remove_trailing_whitespaces=true
cleanup.remove_trailing_whitespaces_all=true
cleanup.remove_trailing_whitespaces_ignore_empty=false
cleanup.remove_unnecessary_casts=true
cleanup.remove_unnecessary_nls_tags=true
cleanup.remove_unused_imports=true
cleanup.remove_unused_local_variables=false
cleanup.remove_unused_private_fields=true
cleanup.remove_unused_private_members=false
cleanup.remove_unused_private_methods=true
cleanup.remove_unused_private_types=true
cleanup.sort_members=false
cleanup.sort_members_all=false
cleanup.use_anonymous_class_creation=false
cleanup.use_blocks=true
cleanup.use_blocks_only_for_return_and_throw=false
cleanup.use_lambda=true
cleanup.use_parentheses_in_expressions=true
cleanup.use_this_for_non_static_field_access=true
cleanup.use_this_for_non_static_field_access_only_if_necessary=false
cleanup.use_this_for_non_static_method_access=true
cleanup.use_this_for_non_static_method_access_only_if_necessary=false
cleanup_profile=_CAU-SE-Style
cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=15
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99
sp_cleanup.add_default_serial_version_id=true
sp_cleanup.add_generated_serial_version_id=false
sp_cleanup.add_missing_annotations=true
sp_cleanup.add_missing_deprecated_annotations=true
sp_cleanup.add_missing_methods=false
sp_cleanup.add_missing_nls_tags=false
sp_cleanup.add_missing_override_annotations=true
sp_cleanup.add_missing_override_annotations_interface_methods=true
sp_cleanup.add_serial_version_id=false
sp_cleanup.always_use_blocks=true
sp_cleanup.always_use_parentheses_in_expressions=false
sp_cleanup.always_use_this_for_non_static_field_access=true
sp_cleanup.always_use_this_for_non_static_method_access=true
sp_cleanup.convert_functional_interfaces=false
sp_cleanup.convert_to_enhanced_for_loop=true
sp_cleanup.correct_indentation=true
sp_cleanup.format_source_code=true
sp_cleanup.format_source_code_changes_only=false
sp_cleanup.insert_inferred_type_arguments=false
sp_cleanup.make_local_variable_final=true
sp_cleanup.make_parameters_final=true
sp_cleanup.make_private_fields_final=true
sp_cleanup.make_type_abstract_if_missing_method=false
sp_cleanup.make_variable_declarations_final=true
sp_cleanup.never_use_blocks=false
sp_cleanup.never_use_parentheses_in_expressions=true
sp_cleanup.on_save_use_additional_actions=true
sp_cleanup.organize_imports=true
sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
sp_cleanup.remove_private_constructors=true
sp_cleanup.remove_redundant_modifiers=false
sp_cleanup.remove_redundant_semicolons=true
sp_cleanup.remove_redundant_type_arguments=true
sp_cleanup.remove_trailing_whitespaces=true
sp_cleanup.remove_trailing_whitespaces_all=true
sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
sp_cleanup.remove_unnecessary_casts=true
sp_cleanup.remove_unnecessary_nls_tags=true
sp_cleanup.remove_unused_imports=true
sp_cleanup.remove_unused_local_variables=false
sp_cleanup.remove_unused_private_fields=true
sp_cleanup.remove_unused_private_members=false
sp_cleanup.remove_unused_private_methods=true
sp_cleanup.remove_unused_private_types=true
sp_cleanup.sort_members=false
sp_cleanup.sort_members_all=false
sp_cleanup.use_anonymous_class_creation=false
sp_cleanup.use_blocks=true
sp_cleanup.use_blocks_only_for_return_and_throw=false
sp_cleanup.use_lambda=true
sp_cleanup.use_parentheses_in_expressions=true
sp_cleanup.use_this_for_non_static_field_access=true
sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false
sp_cleanup.use_this_for_non_static_method_access=true
sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false
configFilePath=../config/checkstyle.xml
customModulesJarPaths=
eclipse.preferences.version=1
enabled=true
customRulesJars=
eclipse.preferences.version=1
enabled=true
ruleSetFilePath=../config/pmd.xml
FROM nicobiernat/flink:1.11-scala_2.12-java_11
ADD build/libs/uc1-application-all.jar /opt/flink/usrlib/artifacts/uc1-application-all.jar
mainClassName = "theodolite.uc1.application.HistoryServiceFlinkJob"
package theodolite.uc1.application;
/**
* Keys to access configuration parameters.
*/
public final class ConfigurationKeys {
public static final String APPLICATION_NAME = "application.name";
public static final String APPLICATION_VERSION = "application.version";
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
public static final String CHECKPOINTING = "checkpointing";
private ConfigurationKeys() {}
}
package theodolite.uc1.application;
import org.apache.commons.configuration2.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import theodolite.commons.flink.serialization.FlinkMonitoringRecordSerde;
import titan.ccp.common.configuration.Configurations;
import titan.ccp.models.records.ActivePowerRecord;
import titan.ccp.models.records.ActivePowerRecordFactory;
import java.util.Properties;
/**
* The History Microservice Flink Job.
*/
public class HistoryServiceFlinkJob {
private final Configuration config = Configurations.create();
private void run() {
final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
final String applicationId = applicationName + "-" + applicationVersion;
final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
final Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", kafkaBroker);
kafkaProps.setProperty("group.id", applicationId);
final FlinkMonitoringRecordSerde<ActivePowerRecord, ActivePowerRecordFactory> serde =
new FlinkMonitoringRecordSerde<>(inputTopic,
ActivePowerRecord.class,
ActivePowerRecordFactory.class);
final FlinkKafkaConsumer<ActivePowerRecord> kafkaConsumer =
new FlinkKafkaConsumer<>(inputTopic, serde, kafkaProps);
kafkaConsumer.setStartFromGroupOffsets();
if (checkpointing)
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
if (checkpointing)
env.enableCheckpointing(commitIntervalMs);
final DataStream<ActivePowerRecord> stream = env.addSource(kafkaConsumer);
stream
.rebalance()
.map(v -> "ActivePowerRecord { "
+ "identifier: " + v.getIdentifier() + ", "
+ "timestamp: " + v.getTimestamp() + ", "
+ "valueInW: " + v.getValueInW() + " }")
.print();
try {
env.execute(applicationId);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(final String[] args) {
new HistoryServiceFlinkJob().run();
}
}
application.name=theodolite-uc1-application
application.version=0.0.1
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
num.threads=1
commit.interval.ms=1000
cache.max.bytes.buffering=-1
......@@ -2,4 +2,11 @@ dependencies {
implementation 'com.google.guava:guava:30.1-jre'
implementation 'com.hazelcast:hazelcast:4.1.1'
implementation 'com.hazelcast:hazelcast-kubernetes:2.2.1'
implementation 'org.slf4j:slf4j-simple:1.7.25'
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'org.apache.kafka:kafka-streams:2.6.0' // TODO required?
// Use JUnit test framework
testImplementation 'junit:junit:4.12'
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment