Skip to content
Snippets Groups Projects
Commit 29facf99 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'theodolite-kotlin' into store-results-in-volume

parents cd36cc82 4c185437
No related branches found
No related tags found
4 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!153Store benchmark execution results in operator volume,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Pipeline #3700 passed
Showing
with 97 additions and 41 deletions
...@@ -47,7 +47,7 @@ public final class HistoryServiceFlinkJob { ...@@ -47,7 +47,7 @@ public final class HistoryServiceFlinkJob {
// Parallelism // Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null); final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) { if (parallelism != null) {
LOGGER.error("Set parallelism: {}.", parallelism); LOGGER.info("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism); this.env.setParallelism(parallelism);
} }
...@@ -68,7 +68,7 @@ public final class HistoryServiceFlinkJob { ...@@ -68,7 +68,7 @@ public final class HistoryServiceFlinkJob {
final DataStream<ActivePowerRecord> stream = this.env.addSource(kafkaConsumer); final DataStream<ActivePowerRecord> stream = this.env.addSource(kafkaConsumer);
stream stream
.rebalance() // .rebalance()
.map(new GsonMapper()) .map(new GsonMapper())
.flatMap((record, c) -> LOGGER.info("Record: {}", record)) .flatMap((record, c) -> LOGGER.info("Record: {}", record))
.returns(Types.GENERIC(Object.class)); // Will never be used .returns(Types.GENERIC(Object.class)); // Will never be used
......
...@@ -59,7 +59,7 @@ public final class HistoryServiceFlinkJob { ...@@ -59,7 +59,7 @@ public final class HistoryServiceFlinkJob {
// Parallelism // Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null); final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) { if (parallelism != null) {
LOGGER.error("Set parallelism: {}.", parallelism); LOGGER.info("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism); this.env.setParallelism(parallelism);
} }
...@@ -83,7 +83,9 @@ public final class HistoryServiceFlinkJob { ...@@ -83,7 +83,9 @@ public final class HistoryServiceFlinkJob {
final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final int windowDuration = this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES); final int windowDurationMinutes =
this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES);
final Time windowDuration = Time.minutes(windowDurationMinutes);
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory( final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
...@@ -100,9 +102,9 @@ public final class HistoryServiceFlinkJob { ...@@ -100,9 +102,9 @@ public final class HistoryServiceFlinkJob {
this.env this.env
.addSource(kafkaSource).name("[Kafka Consumer] Topic: " + inputTopic) .addSource(kafkaSource).name("[Kafka Consumer] Topic: " + inputTopic)
.rebalance() // .rebalance()
.keyBy(ActivePowerRecord::getIdentifier) .keyBy(ActivePowerRecord::getIdentifier)
.window(TumblingEventTimeWindows.of(Time.minutes(windowDuration))) .window(TumblingEventTimeWindows.of(windowDuration))
.aggregate(new StatsAggregateFunction(), new StatsProcessWindowFunction()) .aggregate(new StatsAggregateFunction(), new StatsProcessWindowFunction())
.map(t -> { .map(t -> {
final String key = t.f0; final String key = t.f0;
......
...@@ -117,9 +117,8 @@ public final class HistoryServiceFlinkJob { ...@@ -117,9 +117,8 @@ public final class HistoryServiceFlinkJob {
// Streaming topology // Streaming topology
final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
this.env this.env
.addSource(kafkaSource) .addSource(kafkaSource).name("[Kafka Consumer] Topic: " + inputTopic)
.name("[Kafka Consumer] Topic: " + inputTopic) // .rebalance()
.rebalance()
.keyBy((KeySelector<ActivePowerRecord, HourOfDayKey>) record -> { .keyBy((KeySelector<ActivePowerRecord, HourOfDayKey>) record -> {
final Instant instant = Instant.ofEpochMilli(record.getTimestamp()); final Instant instant = Instant.ofEpochMilli(record.getTimestamp());
final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, timeZone); final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, timeZone);
......
...@@ -79,7 +79,7 @@ public final class AggregationServiceFlinkJob { ...@@ -79,7 +79,7 @@ public final class AggregationServiceFlinkJob {
// Parallelism // Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null); final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) { if (parallelism != null) {
LOGGER.error("Set parallelism: {}.", parallelism); LOGGER.info("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism); this.env.setParallelism(parallelism);
} }
...@@ -152,7 +152,7 @@ public final class AggregationServiceFlinkJob { ...@@ -152,7 +152,7 @@ public final class AggregationServiceFlinkJob {
// Build input stream // Build input stream
final DataStream<ActivePowerRecord> inputStream = this.env.addSource(kafkaInputSource) final DataStream<ActivePowerRecord> inputStream = this.env.addSource(kafkaInputSource)
.name("[Kafka Consumer] Topic: " + inputTopic)// NOCS .name("[Kafka Consumer] Topic: " + inputTopic)// NOCS
.rebalance() // .rebalance()
.map(r -> r) .map(r -> r)
.name("[Map] Rebalance Forward"); .name("[Map] Rebalance Forward");
...@@ -160,7 +160,7 @@ public final class AggregationServiceFlinkJob { ...@@ -160,7 +160,7 @@ public final class AggregationServiceFlinkJob {
final DataStream<ActivePowerRecord> aggregationsInputStream = final DataStream<ActivePowerRecord> aggregationsInputStream =
this.env.addSource(kafkaOutputSource) this.env.addSource(kafkaOutputSource)
.name("[Kafka Consumer] Topic: " + outputTopic) // NOCS .name("[Kafka Consumer] Topic: " + outputTopic) // NOCS
.rebalance() // .rebalance()
.map(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW())) .map(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW()))
.name("[Map] AggregatedActivePowerRecord -> ActivePowerRecord"); .name("[Map] AggregatedActivePowerRecord -> ActivePowerRecord");
......
# Kubernetes Deployment running the Theodolite operator.
# Runs a single replica of the theodolite-kotlin operator image with the
# `theodolite` service account (required so the operator can manage
# benchmark/execution resources in the cluster).
apiVersion: apps/v1
kind: Deployment
metadata:
  name: theodolite
spec:
  selector:
    matchLabels:
      app: theodolite
  replicas: 1
  template:
    metadata:
      labels:
        app: theodolite
    spec:
      # Operator holds no state worth draining; terminate immediately.
      terminationGracePeriodSeconds: 0
      serviceAccountName: theodolite
      containers:
        # Fixed typo: container was previously named "thedolite",
        # inconsistent with every other identifier in this manifest.
        - name: theodolite
          image: ghcr.io/cau-se/theodolite:theodolite-kotlin-latest
          env:
            - name: KUBECONFIG
              value: "~/.kube/config"
            - name: NAMESPACE
              value: "default"
\ No newline at end of file
...@@ -3,15 +3,14 @@ kind: benchmark ...@@ -3,15 +3,14 @@ kind: benchmark
metadata: metadata:
name: uc1-kstreams name: uc1-kstreams
spec: spec:
name: test
appResource: appResource:
- "uc1-kstreams-deployment.yaml" - "uc1-kstreams-deployment.yaml"
- "aggregation-service.yaml" - "aggregation-service.yaml"
- "jmx-configmap.yaml" - "jmx-configmap.yaml"
- "uc1-service-monitor.yaml"
loadGenResource: loadGenResource:
- "uc1-load-generator-deployment.yaml" - "uc1-load-generator-deployment.yaml"
- "uc1-load-generator-service.yaml" - "uc1-load-generator-service.yaml"
- "uc1-load-generator-service.yaml"
resourceTypes: resourceTypes:
- typeName: "Instances" - typeName: "Instances"
patchers: patchers:
...@@ -23,14 +22,14 @@ spec: ...@@ -23,14 +22,14 @@ spec:
- type: "EnvVarPatcher" - type: "EnvVarPatcher"
resource: "uc1-load-generator-deployment.yaml" resource: "uc1-load-generator-deployment.yaml"
properties: properties:
container: "workload-generator"
variableName: "NUM_SENSORS" variableName: "NUM_SENSORS"
container: "workload-generator"
- type: "NumSensorsLoadGeneratorReplicaPatcher" - type: "NumSensorsLoadGeneratorReplicaPatcher"
resource: "uc1-load-generator-deployment.yaml" resource: "uc1-load-generator-deployment.yaml"
properties: properties:
loadGenMaxRecords: "15000" loadGenMaxRecords: "15000"
kafkaConfig: kafkaConfig:
bootstrapServer: "localhost:31290" bootstrapServer: "theodolite-cp-kafka:9092"
topics: topics:
- name: "input" - name: "input"
numPartitions: 40 numPartitions: 40
......
# Theodolite `execution` custom resource: defines a scalability benchmark run
# (which benchmark, load/resource dimensions, SLOs, and search strategy).
# NOTE(review): this span appears to contain TWO stacked versions of the same
# file (a diff-rendering artifact): `spec` keys such as `benchmark`, `load`,
# `resources`, `slos`, `execution`, and `configOverrides` occur twice, which
# is invalid as a single YAML document — verify against the repository and
# keep only one version.
apiVersion: theodolite.com/v1
kind: execution
metadata:
  name: theodolite-example-execution
spec:
  benchmark: "uc1-kstreams"
  load:
    loadType: "NumSensors"
    loadValues: [25000, 50000, 75000, 100000, 125000, 150000]
  resources:
    resourceType: "Instances"
    resourceValues: [1, 2, 3, 4, 5]
  slos:
    - sloType: "lag trend"
      threshold: 2000
      prometheusUrl: "http://prometheus-operated:9090"
      externalSloUrl: "http://localhost:80/evaluate-slope"
      offset: 0
      warmup: 60 # in seconds
  execution:
    strategy: "LinearSearch"
    duration: 300 # in seconds
    repetitions: 1
    loadGenerationDelay: 30 # in seconds
    restrictions:
      - "LowerBound"
  configOverrides:
  # Commented-out example patchers showing available override types
  # (node selectors, resource limits, scheduler name).
  # - patcher:
  #     type: "NodeSelectorPatcher"
  #     resource: "uc1-load-generator-deployment.yaml"
  #     properties:
  #       variableName: "env"
  #     value: "prod"
  # - patcher:
  #     type: "NodeSelectorPatcher"
  #     resource: "uc1-kstreams-deployment.yaml"
  #     properties:
  #       variableName: "env"
  #     value: "prod"
  # - patcher:
  #     type: "ResourceLimitPatcher"
  #     resource: "uc1-kstreams-deployment.yaml"
  #     properties:
  #       container: "uc-application"
  #       limitedResource: "cpu"
  #     value: "1000m"
  # - patcher:
  #     type: "ResourceLimitPatcher"
  #     resource: "uc1-kstreams-deployment.yaml"
  #     properties:
  #       container: "uc-application"
  #       limitedResource: "memory"
  #     value: "2Gi"
  # - patcher:
  #     type: "SchedulerNamePatcher"
  #     resource: "uc1-kstreams-deployment.yaml"
  #     value: "random-scheduler"
  # NOTE(review): second copy of the spec starts here (duplicate keys below).
  name: example-execution
  benchmark: "uc1-kstreams"
  load:
    loadType: "NumSensors"
    loadValues: [25000, 50000, 75000, 100000, 125000, 150000]
  resources:
    resourceType: "Instances"
    resourceValues: [1, 2, 3, 4, 5]
  slos:
    - sloType: "lag trend"
      threshold: 2000
      prometheusUrl: "http://prometheus-operated:9090"
      externalSloUrl: "http://localhost:80/evaluate-slope"
      offset: 0
      warmup: 60 # in seconds
  execution:
    strategy: "LinearSearch"
    duration: 300 # in seconds
    repetitions: 1
    loadGenerationDelay: 30 # in seconds, optional field, default is 0 seconds
    restrictions:
      - "LowerBound"
  configOverrides: []
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment