Skip to content
Snippets Groups Projects
Commit 2f2a0eff authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'master' of git.se.informatik.uni-kiel.de:she/theodolite

parents f42cf8cf 1907a3c2
No related branches found
No related tags found
No related merge requests found
Pipeline #6590 passed
Showing
with 108 additions and 52 deletions
package serialization; package serialization;
import java.io.ByteArrayOutputStream; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
import java.io.IOException;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.model.records.AggregatedActivePowerRecord; import titan.ccp.model.records.AggregatedActivePowerRecord;
/** /**
* Wrapper Class that encapsulates a IMonitoringRecordSerde.serializer in a Serializer * {@link Serializer} for an {@link AggregatedActivePowerRecord}.
*/ */
public class AggregatedActivePowerRecordSerializer public class AggregatedActivePowerRecordSerializer
implements Serializer<AggregatedActivePowerRecord> { extends SpecificAvroSerializer<AggregatedActivePowerRecord> {
private static final Logger LOGGER =
LoggerFactory.getLogger(AggregatedActivePowerRecordSerializer.class);
private final transient AvroCoder<AggregatedActivePowerRecord> avroEnCoder =
AvroCoder.of(AggregatedActivePowerRecord.class);
@Override
public byte[] serialize(final String topic, final AggregatedActivePowerRecord data) {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
this.avroEnCoder.encode(data, out);
} catch (final IOException e) {
LOGGER.error("Could not serialize AggregatedActivePowerRecord", e);
}
final byte[] result = out.toByteArray();
try {
out.close();
} catch (final IOException e) {
LOGGER.error(
"Could not close output stream after serialization of AggregatedActivePowerRecord", e);
}
return result;
}
@Override
public void close() {
Serializer.super.close();
}
} }
...@@ -61,7 +61,7 @@ cleanup_settings_version=2 ...@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1 eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style formatter_profile=_CAU-SE-Style
formatter_settings_version=15 formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99 org.eclipse.jdt.ui.ondemandthreshold=99
......
FROM flink:1.12-scala_2.12-java11 FROM flink:1.13-java11
ADD build/libs/uc4-flink-all.jar /opt/flink/usrlib/artifacts/uc4-flink-all.jar ADD build/libs/uc4-flink-all.jar /opt/flink/usrlib/artifacts/uc4-flink-all.jar
\ No newline at end of file
...@@ -9,12 +9,12 @@ import org.apache.flink.api.common.typeinfo.Types; ...@@ -9,12 +9,12 @@ import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -68,8 +68,6 @@ public final class AggregationServiceFlinkJob { ...@@ -68,8 +68,6 @@ public final class AggregationServiceFlinkJob {
} }
private void configureEnv() { private void configureEnv() {
this.env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS); final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
if (checkpointing) { if (checkpointing) {
...@@ -103,9 +101,11 @@ public final class AggregationServiceFlinkJob { ...@@ -103,9 +101,11 @@ public final class AggregationServiceFlinkJob {
this.env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(), // NOCS this.env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(), // NOCS
new ImmutableSetSerializer()); new ImmutableSetSerializer());
this.env.getConfig().getRegisteredTypesWithKryoSerializers() this.env
.forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer " .getConfig()
+ s.getSerializer().getClass().getName())); .getRegisteredTypesWithKryoSerializers()
.forEach((c, s) -> LOGGER.info("Class '{}' registered with serializer '{}'.", c.getName(),
s.getSerializer().getClass().getName()));
} }
private void buildPipeline() { private void buildPipeline() {
...@@ -134,12 +134,13 @@ public final class AggregationServiceFlinkJob { ...@@ -134,12 +134,13 @@ public final class AggregationServiceFlinkJob {
final FlinkKafkaConsumer<AggregatedActivePowerRecord> kafkaOutputSource = final FlinkKafkaConsumer<AggregatedActivePowerRecord> kafkaOutputSource =
kafkaConnector.createConsumer(outputTopic, AggregatedActivePowerRecord.class); kafkaConnector.createConsumer(outputTopic, AggregatedActivePowerRecord.class);
final FlinkKafkaConsumer<Tuple2<Event, String>> kafkaConfigSource = final FlinkKafkaConsumerBase<Tuple2<Event, String>> kafkaConfigSource =
kafkaConnector.createConsumer( kafkaConnector.createConsumer(
configurationTopic, configurationTopic,
EventSerde::serde, EventSerde::serde,
Serdes::String, Serdes::String,
TupleType.of(TypeInformation.of(Event.class), Types.STRING)); TupleType.of(TypeInformation.of(Event.class), Types.STRING))
.setStartFromEarliest();
// Sink to output topic with SensorId, AggregatedActivePowerRecord // Sink to output topic with SensorId, AggregatedActivePowerRecord
final FlinkKafkaProducer<Tuple2<String, AggregatedActivePowerRecord>> kafkaAggregationSink = final FlinkKafkaProducer<Tuple2<String, AggregatedActivePowerRecord>> kafkaAggregationSink =
......
...@@ -61,7 +61,7 @@ cleanup_settings_version=2 ...@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1 eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style formatter_profile=_CAU-SE-Style
formatter_settings_version=15 formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99 org.eclipse.jdt.ui.ondemandthreshold=99
......
...@@ -146,7 +146,7 @@ public class TopologyBuilder { ...@@ -146,7 +146,7 @@ public class TopologyBuilder {
.groupByKey(Grouped.with( .groupByKey(Grouped.with(
SensorParentKeySerde.serde(), SensorParentKeySerde.serde(),
this.srAvroSerdeFactory.forValues())) this.srAvroSerdeFactory.forValues()))
.windowedBy(TimeWindows.of(this.emitPeriod).grace(this.gracePeriod)) .windowedBy(TimeWindows.ofSizeAndGrace(this.emitPeriod, this.gracePeriod))
.reduce( .reduce(
// TODO Configurable window aggregation function // TODO Configurable window aggregation function
(oldVal, newVal) -> newVal.getTimestamp() >= oldVal.getTimestamp() ? newVal : oldVal, (oldVal, newVal) -> newVal.getTimestamp() >= oldVal.getTimestamp() ? newVal : oldVal,
......
...@@ -61,7 +61,7 @@ cleanup_settings_version=2 ...@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1 eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style formatter_profile=_CAU-SE-Style
formatter_settings_version=15 formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99 org.eclipse.jdt.ui.ondemandthreshold=99
......
...@@ -32,11 +32,11 @@ class ConfigMapResourceSet : ResourceSet, KubernetesResource { ...@@ -32,11 +32,11 @@ class ConfigMapResourceSet : ResourceSet, KubernetesResource {
} }
if (::files.isInitialized) { if (::files.isInitialized) {
resources = resources.filter { files.contains(it.key) } val filteredResources = resources.filter { files.contains(it.key) }
if (filteredResources.size != files.size) {
if (resources.size != files.size) {
throw DeploymentFailedException("Could not find all specified Kubernetes manifests files") throw DeploymentFailedException("Could not find all specified Kubernetes manifests files")
} }
resources = filteredResources
} }
return try { return try {
......
...@@ -13,6 +13,7 @@ abstract class AbstractK8sLoader: K8sResourceLoader { ...@@ -13,6 +13,7 @@ abstract class AbstractK8sLoader: K8sResourceLoader {
"Deployment" -> loadDeployment(resourceString) "Deployment" -> loadDeployment(resourceString)
"Service" -> loadService(resourceString) "Service" -> loadService(resourceString)
"ServiceMonitor" -> loadServiceMonitor(resourceString) "ServiceMonitor" -> loadServiceMonitor(resourceString)
"PodMonitor" -> loadPodMonitor(resourceString)
"ConfigMap" -> loadConfigmap(resourceString) "ConfigMap" -> loadConfigmap(resourceString)
"StatefulSet" -> loadStatefulSet(resourceString) "StatefulSet" -> loadStatefulSet(resourceString)
"Execution" -> loadExecution(resourceString) "Execution" -> loadExecution(resourceString)
...@@ -51,6 +52,16 @@ abstract class AbstractK8sLoader: K8sResourceLoader { ...@@ -51,6 +52,16 @@ abstract class AbstractK8sLoader: K8sResourceLoader {
return loadCustomResourceWrapper(resource, context) return loadCustomResourceWrapper(resource, context)
} }
override fun loadPodMonitor(resource: String): KubernetesResource {
val context = K8sContextFactory().create(
api = "v1",
scope = "Namespaced",
group = "monitoring.coreos.com",
plural = "podmonitors"
)
return loadCustomResourceWrapper(resource, context)
}
override fun loadExecution(resource: String): KubernetesResource { override fun loadExecution(resource: String): KubernetesResource {
val context = K8sContextFactory().create( val context = K8sContextFactory().create(
api = "v1", api = "v1",
......
...@@ -11,5 +11,6 @@ interface K8sResourceLoader { ...@@ -11,5 +11,6 @@ interface K8sResourceLoader {
fun loadBenchmark(resource: String): KubernetesResource fun loadBenchmark(resource: String): KubernetesResource
fun loadConfigmap(resource: String): KubernetesResource fun loadConfigmap(resource: String): KubernetesResource
fun loadServiceMonitor(resource: String): KubernetesResource fun loadServiceMonitor(resource: String): KubernetesResource
fun loadPodMonitor(resource: String): KubernetesResource
fun loadCustomResourceWrapper(resource: String, context: CustomResourceDefinitionContext): KubernetesResource fun loadCustomResourceWrapper(resource: String, context: CustomResourceDefinitionContext): KubernetesResource
} }
\ No newline at end of file
package theodolite.patcher
import io.fabric8.kubernetes.api.model.KubernetesResource
import io.fabric8.kubernetes.api.model.apps.Deployment
import io.fabric8.kubernetes.api.model.apps.StatefulSet
/**
* This patcher is able to set the `spec.selector.matchLabels` for a `Deployment` or `StatefulSet` Kubernetes resource.
*
* @property k8sResource The Kubernetes manifests to patch
* @property variableName The matchLabel which should be set
*/
class MatchLabelPatcher(private val k8sResource: KubernetesResource, val variableName: String) :
AbstractPatcher(k8sResource) {
override fun <String> patch(labelValue: String) {
if (labelValue is kotlin.String) {
when (k8sResource) {
is Deployment -> {
if (k8sResource.spec.selector.matchLabels == null) {
k8sResource.spec.selector.matchLabels = mutableMapOf()
}
k8sResource.spec.selector.matchLabels[this.variableName] = labelValue
}
is StatefulSet -> {
if (k8sResource.spec.selector.matchLabels == null) {
k8sResource.spec.selector.matchLabels = mutableMapOf()
}
k8sResource.spec.selector.matchLabels[this.variableName] = labelValue
}
}
}
}
}
\ No newline at end of file
...@@ -79,6 +79,14 @@ class PatcherFactory { ...@@ -79,6 +79,14 @@ class PatcherFactory {
k8sResource = resource, k8sResource = resource,
variableName = patcherDefinition.properties["variableName"]!! variableName = patcherDefinition.properties["variableName"]!!
) )
"MatchLabelPatcher" -> MatchLabelPatcher(
k8sResource = resource,
variableName = patcherDefinition.properties["variableName"]!!
)
"TemplateLabelPatcher" -> TemplateLabelPatcher(
k8sResource = resource,
variableName = patcherDefinition.properties["variableName"]!!
)
"ImagePatcher" -> ImagePatcher( "ImagePatcher" -> ImagePatcher(
k8sResource = resource, k8sResource = resource,
container = patcherDefinition.properties["container"]!! container = patcherDefinition.properties["container"]!!
......
package theodolite.patcher
import io.fabric8.kubernetes.api.model.KubernetesResource
import io.fabric8.kubernetes.api.model.apps.Deployment
import io.fabric8.kubernetes.api.model.apps.StatefulSet
/**
* This patcher is able to set the field `spec.template.metadata.labels` for a `Deployment` or `StatefulSet` Kubernetes resource.
*
* @property k8sResource The Kubernetes manifests to patch
* @property variableName The label which should be set
*/
class TemplateLabelPatcher(private val k8sResource: KubernetesResource, val variableName: String) :
AbstractPatcher(k8sResource) {
override fun <String> patch(labelValue: String) {
if (labelValue is kotlin.String) {
when (k8sResource) {
is Deployment -> {
if (k8sResource.spec.template.metadata.labels == null) {
k8sResource.spec.template.metadata.labels = mutableMapOf()
}
k8sResource.spec.template.metadata.labels[this.variableName] = labelValue
}
is StatefulSet -> {
if (k8sResource.spec.template.metadata.labels == null) {
k8sResource.spec.template.metadata.labels = mutableMapOf()
}
k8sResource.spec.template.metadata.labels[this.variableName] = labelValue
}
}
}
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment