Skip to content
Snippets Groups Projects
Commit c85236b0 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'master' into theodolite-kotlin

parents 1384573c f6c3d87d
Branches
Tags
3 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Showing
with 168 additions and 12 deletions
rootProject.name = 'theodolite-benchmarks' rootProject.name = 'theodolite-benchmarks'
include 'workload-generator-commons' include 'load-generator-commons'
include 'application-kafkastreams-commons' include 'kstreams-commons'
include 'flink-commons'
include 'uc1-workload-generator' include 'uc1-load-generator'
include 'uc1-application' include 'uc1-kstreams'
include 'uc1-flink'
include 'uc2-workload-generator' include 'uc2-load-generator'
include 'uc2-application' include 'uc2-kstreams'
include 'uc2-flink'
include 'uc3-workload-generator' include 'uc3-load-generator'
include 'uc3-application' include 'uc3-kstreams'
include 'uc3-flink'
include 'uc4-workload-generator' include 'uc4-load-generator'
include 'uc4-application' include 'uc4-kstreams'
include 'uc4-flink'
FROM flink:1.12-scala_2.12-java11
ADD build/libs/uc1-flink-all.jar /opt/flink/usrlib/artifacts/uc1-flink-all.jar
plugins {
id 'theodolite.flink'
}
mainClassName = "theodolite.uc1.application.HistoryServiceFlinkJob"
package theodolite.uc1.application;
/**
* Keys to access configuration parameters.
*/
public final class ConfigurationKeys {
public static final String APPLICATION_NAME = "application.name";
public static final String APPLICATION_VERSION = "application.version";
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
public static final String CHECKPOINTING = "checkpointing";
private ConfigurationKeys() {}
}
package theodolite.uc1.application;
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
import titan.ccp.model.records.ActivePowerRecord;
/**
* {@link MapFunction} which maps {@link ActivePowerRecord}s to their representation as JSON
* strings.
*/
public class GsonMapper implements MapFunction<ActivePowerRecord, String> {
private static final long serialVersionUID = -5263671231838353747L; // NOPMD
private static final Gson GSON = new Gson();
@Override
public String map(final ActivePowerRecord value) throws Exception {
return GSON.toJson(value);
}
}
package theodolite.uc1.application;
import org.apache.commons.configuration2.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.flink.KafkaConnectorFactory;
import titan.ccp.common.configuration.ServiceConfigurations;
import titan.ccp.model.records.ActivePowerRecord;
/**
* The History microservice implemented as a Flink job.
*/
public final class HistoryServiceFlinkJob {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
private final Configuration config = ServiceConfigurations.createWithDefaults();
private final StreamExecutionEnvironment env;
private final String applicationId;
/**
* Create a new instance of the {@link HistoryServiceFlinkJob}.
*/
public HistoryServiceFlinkJob() {
final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
this.applicationId = applicationName + "-" + applicationVersion;
this.env = StreamExecutionEnvironment.getExecutionEnvironment();
this.configureEnv();
this.buildPipeline();
}
private void configureEnv() {
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
if (checkpointing) {
this.env.enableCheckpointing(commitIntervalMs);
}
}
private void buildPipeline() {
final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
this.applicationId, kafkaBroker, checkpointing, schemaRegistryUrl);
final FlinkKafkaConsumer<ActivePowerRecord> kafkaConsumer =
kafkaConnector.createConsumer(inputTopic, ActivePowerRecord.class);
final DataStream<ActivePowerRecord> stream = this.env.addSource(kafkaConsumer);
stream
.rebalance()
.map(new GsonMapper())
.flatMap((record, c) -> LOGGER.info("Record: {}", record));
}
/**
* Start running this microservice.
*/
public void run() {
try {
this.env.execute(this.applicationId);
} catch (final Exception e) { // NOPMD Execution thrown by Flink
LOGGER.error("An error occured while running this job.", e);
}
}
public static void main(final String[] args) {
new HistoryServiceFlinkJob().run();
}
}
application.name=theodolite-uc1-application
application.version=0.0.1
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
schema.registry.url=http://localhost:8081
num.threads=1
commit.interval.ms=1000
cache.max.bytes.buffering=-1
FROM openjdk:11-slim FROM openjdk:11-slim
ADD build/distributions/uc1-application.tar / ADD build/distributions/uc1-kstreams.tar /
CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \
/uc1-application/bin/uc1-application /uc1-kstreams/bin/uc1-kstreams
\ No newline at end of file \ No newline at end of file
plugins {
id 'theodolite.kstreams'
}
mainClassName = "theodolite.uc1.application.HistoryService" mainClassName = "theodolite.uc1.application.HistoryService"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment