Commit 9d08d51a authored by Lorenz Boguhn

Merged master

Parents: 0a46081a, 7ed94fdf
Part of 4 merge requests: !159 Re-implementation of Theodolite with Kotlin/Quarkus, !157 Update Graal Image in CI pipeline, !96 Handle shutdown, !83 WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Showing 281 additions and 4 deletions
@@ -17,11 +17,11 @@ import titan.ccp.model.records.ActivePowerRecord;
 public class TopologyBuilder {
   private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
+  private static final Gson GSON = new Gson();
   private final String inputTopic;
   private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory;
-  private final Gson gson = new Gson();
   private final StreamsBuilder builder = new StreamsBuilder();
@@ -42,8 +42,8 @@ public class TopologyBuilder {
         .stream(this.inputTopic, Consumed.with(
             Serdes.String(),
             this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
-        .mapValues(v -> this.gson.toJson(v))
-        .foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v));
+        .mapValues(v -> GSON.toJson(v))
+        .foreach((k, record) -> LOGGER.info("Record: {}", record));
     return this.builder.build(properties);
   }
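The two hunks above share one Gson instance as a static constant (Gson is stateless and thread-safe, so a single shared instance suffices) and switch the log call to parameterized SLF4J style. For orientation, a minimal self-contained sketch of the resulting UC1-style topology; the topic name, application id, and the use of plain String serdes in place of the project's SchemaRegistryAvroSerdeFactory are illustrative assumptions, not part of the commit:

package sketch;

import com.google.gson.Gson;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class Uc1TopologySketch {

  private static final Logger LOGGER = LoggerFactory.getLogger(Uc1TopologySketch.class);
  private static final Gson GSON = new Gson(); // shared: Gson is thread-safe

  public static void main(final String[] args) {
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uc1-sketch"); // hypothetical id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    final StreamsBuilder builder = new StreamsBuilder();
    builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
        .mapValues(v -> GSON.toJson(v))
        .foreach((k, record) -> LOGGER.info("Record: {}", record));

    new KafkaStreams(builder.build(), props).start();
  }
}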
@@ -4,5 +4,5 @@ application.version=0.0.1
 kafka.bootstrap.servers=localhost:9092
 kafka.input.topic=input
-schema.registry.url=http://localhost:8091
+schema.registry.url=http://localhost:8081
 FROM openjdk:11-slim
-ADD build/distributions/uc2-workload-generator.tar /
+ADD build/distributions/uc1-load-generator.tar /
 CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \
-    /uc2-workload-generator/bin/uc2-workload-generator
\ No newline at end of file
+    /uc1-load-generator/bin/uc1-load-generator
\ No newline at end of file
plugins {
  id 'theodolite.load-generator'
}

mainClassName = "theodolite.uc1.workloadgenerator.LoadGenerator"
@@ -66,6 +66,7 @@ org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
 org.eclipse.jdt.ui.staticondemandthreshold=99
+org.eclipse.jdt.ui.text.custom_code_templates=
 sp_cleanup.add_default_serial_version_id=true
 sp_cleanup.add_generated_serial_version_id=false
 sp_cleanup.add_missing_annotations=true
FROM flink:1.12-scala_2.12-java11
ADD build/libs/uc2-flink-all.jar /opt/flink/usrlib/artifacts/uc2-flink-all.jar
\ No newline at end of file
plugins {
  id 'theodolite.flink'
}

allprojects {
  repositories {
    maven {
      url 'https://packages.confluent.io/maven/'
    }
  }
}

dependencies {
  compile('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT')
}

mainClassName = "theodolite.uc2.application.HistoryServiceFlinkJob"
package theodolite.uc2.application;

/**
 * Keys to access configuration parameters.
 */
public final class ConfigurationKeys {

  public static final String APPLICATION_NAME = "application.name";
  public static final String APPLICATION_VERSION = "application.version";
  public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
  public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
  public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
  public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
  public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
  public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
  public static final String FLINK_STATE_BACKEND = "flink.state.backend";
  public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path";
  public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = // NOPMD
      "flink.state.backend.memory.size";
  public static final String CHECKPOINTING = "checkpointing";

  private ConfigurationKeys() {}
}
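For reference, a hypothetical application.properties wiring these keys together. Every value below is illustrative and not taken from the commit; in particular, which flink.state.backend values are accepted depends on theodolite's StateBackends helper:

# hypothetical example; all values illustrative
application.name=uc2-application
application.version=0.0.1
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
schema.registry.url=http://localhost:8081
commit.interval.ms=1000
kafka.window.duration.minutes=1
checkpointing=true
# resolved by StateBackends.fromConfiguration (backend name assumed)
flink.state.backend=rocksdb
flink.state.backend.path=/opt/flink/statebackend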
package theodolite.uc2.application;

import com.google.common.math.Stats;
import org.apache.commons.configuration2.Configuration;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.common.serialization.Serdes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.flink.KafkaConnectorFactory;
import theodolite.commons.flink.StateBackends;
import theodolite.commons.flink.serialization.StatsSerializer;
import titan.ccp.common.configuration.ServiceConfigurations;
import titan.ccp.model.records.ActivePowerRecord;

/**
 * The History microservice implemented as a Flink job.
 */
public final class HistoryServiceFlinkJob {

  private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);

  private final Configuration config = ServiceConfigurations.createWithDefaults();
  private final StreamExecutionEnvironment env;
  private final String applicationId;

  /**
   * Create a new instance of the {@link HistoryServiceFlinkJob}.
   */
  public HistoryServiceFlinkJob() {
    final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
    final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
    this.applicationId = applicationName + "-" + applicationVersion;

    this.env = StreamExecutionEnvironment.getExecutionEnvironment();

    this.configureEnv();

    this.buildPipeline();
  }

  private void configureEnv() {
    this.env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
    final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
    if (checkpointing) {
      this.env.enableCheckpointing(commitIntervalMs);
    }

    // State Backend
    final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
    this.env.setStateBackend(stateBackend);

    this.configureSerializers();
  }

  private void configureSerializers() {
    this.env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
    this.env.getConfig().getRegisteredTypesWithKryoSerializers()
        .forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer "
            + s.getSerializer().getClass().getName()));
  }

  private void buildPipeline() {
    final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
    final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
    final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
    final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
    final int windowDuration = this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES);
    final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);

    final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
        this.applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);

    final FlinkKafkaConsumer<ActivePowerRecord> kafkaSource =
        kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class);

    final FlinkKafkaProducer<Tuple2<String, String>> kafkaSink =
        kafkaConnector.createProducer(outputTopic,
            Serdes::String,
            Serdes::String,
            Types.TUPLE(Types.STRING, Types.STRING));

    this.env
        .addSource(kafkaSource).name("[Kafka Consumer] Topic: " + inputTopic)
        .rebalance()
        .keyBy(ActivePowerRecord::getIdentifier)
        .window(TumblingEventTimeWindows.of(Time.minutes(windowDuration)))
        .aggregate(new StatsAggregateFunction(), new StatsProcessWindowFunction())
        .map(t -> {
          final String key = t.f0;
          final String value = t.f1.toString();
          LOGGER.info("{}: {}", key, value);
          return new Tuple2<>(key, value);
        }).name("map").returns(Types.TUPLE(Types.STRING, Types.STRING))
        .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
  }

  /**
   * Start running this microservice.
   */
  public void run() {
    LOGGER.info("Execution plan: {}", this.env.getExecutionPlan());

    try {
      this.env.execute(this.applicationId);
    } catch (final Exception e) { // NOPMD Execution thrown by Flink
      LOGGER.error("An error occurred while running this job.", e);
    }
  }

  public static void main(final String[] args) {
    new HistoryServiceFlinkJob().run();
  }
}
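To see the windowed aggregation in isolation, a hypothetical local smoke test can feed a few in-memory records through the same keyBy/window/aggregate chain. This sketch is not part of the commit; it assumes Flink 1.12 APIs and that the Avro-generated ActivePowerRecord has an (identifier, timestamp, valueInW) constructor with a getTimestamp() accessor:

package theodolite.uc2.application;

import com.google.common.math.Stats;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import theodolite.commons.flink.serialization.StatsSerializer;
import titan.ccp.model.records.ActivePowerRecord;

public final class LocalPipelineSketch {

  private LocalPipelineSketch() {}

  public static void main(final String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Stats is serialized via Kryo, mirroring configureSerializers() above.
    env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());

    env.fromElements(
        new ActivePowerRecord("sensor1", 1_000L, 10.0),
        new ActivePowerRecord("sensor1", 2_000L, 30.0),
        new ActivePowerRecord("sensor2", 1_500L, 5.0))
        .assignTimestampsAndWatermarks(WatermarkStrategy
            .<ActivePowerRecord>forMonotonousTimestamps()
            .withTimestampAssigner((record, ts) -> record.getTimestamp()))
        .keyBy(ActivePowerRecord::getIdentifier)
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .aggregate(new StatsAggregateFunction(), new StatsProcessWindowFunction())
        .print(); // e.g. (sensor1, Stats{count=2, mean=20.0, ...})

    env.execute("local-pipeline-sketch");
  }
}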
package theodolite.uc2.application;

import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
import org.apache.flink.api.common.functions.AggregateFunction;
import theodolite.uc2.application.util.StatsFactory;
import titan.ccp.model.records.ActivePowerRecord;

/**
 * Statistical aggregation of {@link ActivePowerRecord}s using {@link Stats}.
 */
public class StatsAggregateFunction implements AggregateFunction<ActivePowerRecord, Stats, Stats> {

  private static final long serialVersionUID = -8873572990921515499L; // NOPMD

  @Override
  public Stats createAccumulator() {
    return Stats.of();
  }

  @Override
  public Stats add(final ActivePowerRecord value, final Stats accumulator) {
    return StatsFactory.accumulate(accumulator, value.getValueInW());
  }

  @Override
  public Stats getResult(final Stats accumulator) {
    return accumulator;
  }

  @Override
  public Stats merge(final Stats a, final Stats b) {
    final StatsAccumulator statsAccumulator = new StatsAccumulator();
    statsAccumulator.addAll(a);
    statsAccumulator.addAll(b);
    return statsAccumulator.snapshot();
  }
}
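As a quick illustration of the AggregateFunction contract above, a hypothetical snippet (not part of the commit; again assuming the ActivePowerRecord(identifier, timestamp, valueInW) constructor):

package theodolite.uc2.application;

import com.google.common.math.Stats;
import titan.ccp.model.records.ActivePowerRecord;

public final class StatsAggregateFunctionDemo {

  private StatsAggregateFunctionDemo() {}

  public static void main(final String[] args) {
    final StatsAggregateFunction agg = new StatsAggregateFunction();

    Stats acc = agg.createAccumulator(); // empty accumulator: count == 0
    acc = agg.add(new ActivePowerRecord("s1", 0L, 10.0), acc);
    acc = agg.add(new ActivePowerRecord("s1", 1L, 30.0), acc);

    // merge() combines partial aggregates, e.g. from parallel subtasks;
    // merging with an empty accumulator is a no-op.
    final Stats merged = agg.merge(acc, agg.createAccumulator());

    System.out.println(agg.getResult(merged).count()); // 2
    System.out.println(agg.getResult(merged).mean()); // 20.0
  }
}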
package theodolite.uc2.application;

import com.google.common.math.Stats;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * A {@link ProcessWindowFunction} that forwards a computed {@link Stats} object along with its
 * associated key.
 */
public class StatsProcessWindowFunction
    extends ProcessWindowFunction<Stats, Tuple2<String, Stats>, String, TimeWindow> {

  private static final long serialVersionUID = 4363099880614593379L; // NOPMD

  @Override
  public void process(final String key, final Context context, final Iterable<Stats> elements,
      final Collector<Tuple2<String, Stats>> out) {
    // Combined with an AggregateFunction, this function receives exactly one
    // pre-aggregated element per window, so taking the first element is safe.
    final Stats stats = elements.iterator().next();
    out.collect(new Tuple2<>(key, stats));
  }
}
package theodolite.uc2.application.util;

import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;

/**
 * Factory methods for working with {@link Stats}.
 */
public final class StatsFactory {

  private StatsFactory() {}

  /**
   * Add a value to a {@link Stats} object.
   */
  public static Stats accumulate(final Stats stats, final double value) {
    final StatsAccumulator statsAccumulator = new StatsAccumulator();
    statsAccumulator.addAll(stats);
    statsAccumulator.add(value);
    return statsAccumulator.snapshot();
  }
}
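Since Guava's Stats is immutable, accumulate returns a fresh snapshot rather than mutating its argument. A hypothetical usage (not part of the commit):

package theodolite.uc2.application.util;

import com.google.common.math.Stats;

public final class StatsFactoryDemo {

  private StatsFactoryDemo() {}

  public static void main(final String[] args) {
    Stats stats = Stats.of(); // empty snapshot
    stats = StatsFactory.accumulate(stats, 10.0); // each call returns a new Stats
    stats = StatsFactory.accumulate(stats, 30.0);
    System.out.println(stats.count()); // 2
    System.out.println(stats.mean()); // 20.0
  }
}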