Skip to content
Snippets Groups Projects
Commit 346b3195 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Merge branch 'master' of git.se.informatik.uni-kiel.de:she/theodolite into hazelcastjet

parents faaeb8d3 a23ddb09
No related branches found
No related tags found
1 merge request!208Add benchmark implementations for Hazelcast Jet
Pipeline #7831 passed
Showing
with 46 additions and 24 deletions
...@@ -39,6 +39,11 @@ spec: ...@@ -39,6 +39,11 @@ spec:
nodeSelectorTerms: nodeSelectorTerms:
{{- toYaml . | nindent 16 }} {{- toYaml . | nindent 16 }}
{{- end}} {{- end}}
{{- with .Values.strimzi.kafka.resources}}
resources:
{{- toYaml . | nindent 6 }}
{{- end}}
zookeeper: zookeeper:
{{- with .Values.strimzi.zookeeper.replicas }} {{- with .Values.strimzi.zookeeper.replicas }}
......
...@@ -174,6 +174,7 @@ strimzi: ...@@ -174,6 +174,7 @@ strimzi:
"-Xmx": "512M" "-Xmx": "512M"
"-Xms": "512M" "-Xms": "512M"
nodeSelectorTerms: [] nodeSelectorTerms: []
resources: {}
zookeeper: zookeeper:
replicas: 3 replicas: 3
......
...@@ -84,8 +84,9 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< ...@@ -84,8 +84,9 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
final String json = this.gson.toJson(message); final String json = this.gson.toJson(message);
final HttpRequest request = HttpRequest.newBuilder() final HttpRequest request = HttpRequest.newBuilder()
.uri(this.uri) .uri(this.uri)
.timeout(this.connectionTimeout)
.POST(HttpRequest.BodyPublishers.ofString(json)) .POST(HttpRequest.BodyPublishers.ofString(json))
.header("Content-Type", "application/json")
.timeout(this.connectionTimeout)
.build(); .build();
final BodyHandler<Void> bodyHandler = BodyHandlers.discarding(); final BodyHandler<Void> bodyHandler = BodyHandlers.discarding();
// final BodyHandler<String> bodyHandler = BodyHandlers.ofString(); // final BodyHandler<String> bodyHandler = BodyHandlers.ofString();
......
...@@ -18,7 +18,7 @@ import titan.ccp.model.records.ActivePowerRecord; ...@@ -18,7 +18,7 @@ import titan.ccp.model.records.ActivePowerRecord;
public class PipelineFactory extends AbstractPipelineFactory { public class PipelineFactory extends AbstractPipelineFactory {
public static final String SINK_TYPE_KEY = "sink.type"; public static final String SINK_TYPE_KEY = "sink.type";
private final SinkType sinkType = SinkType.from(this.config.getString(SINK_TYPE_KEY)); private final SinkType sinkType = SinkType.from(this.config.getString(SINK_TYPE_KEY));
public PipelineFactory(final Configuration configuration) { public PipelineFactory(final Configuration configuration) {
...@@ -52,7 +52,8 @@ public class PipelineFactory extends AbstractPipelineFactory { ...@@ -52,7 +52,8 @@ public class PipelineFactory extends AbstractPipelineFactory {
protected void registerCoders(final CoderRegistry registry) { protected void registerCoders(final CoderRegistry registry) {
registry.registerCoderForClass( registry.registerCoderForClass(
ActivePowerRecord.class, ActivePowerRecord.class,
AvroCoder.of(ActivePowerRecord.SCHEMA$)); // AvroCoder.of(ActivePowerRecord.SCHEMA$));
AvroCoder.of(ActivePowerRecord.class, false));
} }
public static Function<Configuration, AbstractPipelineFactory> factory() { public static Function<Configuration, AbstractPipelineFactory> factory() {
......
...@@ -70,8 +70,10 @@ public class PipelineFactory extends AbstractPipelineFactory { ...@@ -70,8 +70,10 @@ public class PipelineFactory extends AbstractPipelineFactory {
@Override @Override
protected void registerCoders(final CoderRegistry registry) { protected void registerCoders(final CoderRegistry registry) {
registry.registerCoderForClass(ActivePowerRecord.class, registry.registerCoderForClass(
AvroCoder.of(ActivePowerRecord.SCHEMA$)); ActivePowerRecord.class,
// AvroCoder.of(ActivePowerRecord.SCHEMA$));
AvroCoder.of(ActivePowerRecord.class, false));
registry.registerCoderForClass(StatsAggregation.class, registry.registerCoderForClass(StatsAggregation.class,
SerializableCoder.of(StatsAggregation.class)); SerializableCoder.of(StatsAggregation.class));
registry.registerCoderForClass(StatsAccumulator.class, registry.registerCoderForClass(StatsAccumulator.class,
......
...@@ -91,7 +91,8 @@ public class PipelineFactory extends AbstractPipelineFactory { ...@@ -91,7 +91,8 @@ public class PipelineFactory extends AbstractPipelineFactory {
protected void registerCoders(final CoderRegistry registry) { protected void registerCoders(final CoderRegistry registry) {
registry.registerCoderForClass( registry.registerCoderForClass(
ActivePowerRecord.class, ActivePowerRecord.class,
AvroCoder.of(ActivePowerRecord.SCHEMA$)); // AvroCoder.of(ActivePowerRecord.SCHEMA$));
AvroCoder.of(ActivePowerRecord.class, false));
registry.registerCoderForClass( registry.registerCoderForClass(
HourOfDayKey.class, HourOfDayKey.class,
new HourOfDayKeyCoder()); new HourOfDayKeyCoder());
......
...@@ -223,7 +223,8 @@ public class PipelineFactory extends AbstractPipelineFactory { ...@@ -223,7 +223,8 @@ public class PipelineFactory extends AbstractPipelineFactory {
protected void registerCoders(final CoderRegistry registry) { protected void registerCoders(final CoderRegistry registry) {
registry.registerCoderForClass( registry.registerCoderForClass(
ActivePowerRecord.class, ActivePowerRecord.class,
AvroCoder.of(ActivePowerRecord.class)); // AvroCoder.of(ActivePowerRecord.SCHEMA$));
AvroCoder.of(ActivePowerRecord.class, false));
registry.registerCoderForClass( registry.registerCoderForClass(
AggregatedActivePowerRecord.class, AggregatedActivePowerRecord.class,
new AggregatedActivePowerRecordCoder()); new AggregatedActivePowerRecordCoder());
......
...@@ -23,7 +23,7 @@ class Action { ...@@ -23,7 +23,7 @@ class Action {
timeout = exec.timeoutSeconds, timeout = exec.timeoutSeconds,
command = exec.command command = exec.command
) )
if(exitCode != 0){ if (exitCode != 0){
throw ActionCommandFailedException("Error while executing action, finished with exit code $exitCode") throw ActionCommandFailedException("Error while executing action, finished with exit code $exitCode")
} }
} }
...@@ -38,7 +38,7 @@ class ActionSelector { ...@@ -38,7 +38,7 @@ class ActionSelector {
@JsonDeserialize @JsonDeserialize
@RegisterForReflection @RegisterForReflection
class PodSelector { class PodSelector {
lateinit var matchLabels: MutableMap<String, String> lateinit var matchLabels: Map<String, String>
} }
@JsonDeserialize @JsonDeserialize
@RegisterForReflection @RegisterForReflection
......
...@@ -33,7 +33,7 @@ class ActionCommand(val client: NamespacedKubernetesClient) { ...@@ -33,7 +33,7 @@ class ActionCommand(val client: NamespacedKubernetesClient) {
* @return the exit code of this executed command * @return the exit code of this executed command
*/ */
fun exec( fun exec(
matchLabels: MutableMap<String, String>, matchLabels: Map<String, String>,
command: Array<String>, command: Array<String>,
timeout: Long = Configuration.TIMEOUT_SECONDS, timeout: Long = Configuration.TIMEOUT_SECONDS,
container: String = "" container: String = ""
...@@ -58,7 +58,7 @@ class ActionCommand(val client: NamespacedKubernetesClient) { ...@@ -58,7 +58,7 @@ class ActionCommand(val client: NamespacedKubernetesClient) {
val latchTerminationStatus = execLatch.await(timeout, TimeUnit.SECONDS) val latchTerminationStatus = execLatch.await(timeout, TimeUnit.SECONDS)
if (!latchTerminationStatus) { if (!latchTerminationStatus) {
throw ActionCommandFailedException("Latch could not terminate within specified time") throw ActionCommandFailedException("Timeout while running action command")
} }
execWatch.close() execWatch.close()
} catch (e: Exception) { } catch (e: Exception) {
...@@ -112,7 +112,7 @@ class ActionCommand(val client: NamespacedKubernetesClient) { ...@@ -112,7 +112,7 @@ class ActionCommand(val client: NamespacedKubernetesClient) {
* it can take a while until the status is ready and the pod can be selected. * it can take a while until the status is ready and the pod can be selected.
* @return the name of the pod or throws [ActionCommandFailedException] * @return the name of the pod or throws [ActionCommandFailedException]
*/ */
fun getPodName(matchLabels: MutableMap<String, String>, tries: Int): String { fun getPodName(matchLabels: Map<String, String>, tries: Int): String {
for (i in 1..tries) { for (i in 1..tries) {
try { try {
...@@ -125,7 +125,7 @@ class ActionCommand(val client: NamespacedKubernetesClient) { ...@@ -125,7 +125,7 @@ class ActionCommand(val client: NamespacedKubernetesClient) {
throw ActionCommandFailedException("Couldn't find any pod that matches the specified labels.") throw ActionCommandFailedException("Couldn't find any pod that matches the specified labels.")
} }
private fun getPodName(matchLabels: MutableMap<String, String>): String { private fun getPodName(matchLabels: Map<String, String>): String {
return try { return try {
val podNames = this.client val podNames = this.client
.pods() .pods()
......
...@@ -38,7 +38,7 @@ class AnalysisExecutor( ...@@ -38,7 +38,7 @@ class AnalysisExecutor(
try { try {
val ioHandler = IOHandler() val ioHandler = IOHandler()
val resultsFolder: String = ioHandler.getResultFolderURL() val resultsFolder = ioHandler.getResultFolderURL()
val fileURL = "${resultsFolder}exp${executionId}_${load.get()}_${res.get()}_${slo.sloType.toSlug()}" val fileURL = "${resultsFolder}exp${executionId}_${load.get()}_${res.get()}_${slo.sloType.toSlug()}"
val prometheusData = executionIntervals val prometheusData = executionIntervals
......
...@@ -45,15 +45,16 @@ class MetricFetcher(private val prometheusURL: String, private val offset: Durat ...@@ -45,15 +45,16 @@ class MetricFetcher(private val prometheusURL: String, private val offset: Durat
) )
while (counter < RETRIES) { while (counter < RETRIES) {
logger.info { "Request collected metrics from Prometheus for interval [$offsetStart,$offsetEnd]." }
val response = get("$prometheusURL/api/v1/query_range", params = parameter, timeout = TIMEOUT) val response = get("$prometheusURL/api/v1/query_range", params = parameter, timeout = TIMEOUT)
if (response.statusCode != 200) { if (response.statusCode != 200) {
val message = response.jsonObject.toString() val message = response.jsonObject.toString()
logger.warn { "Could not connect to Prometheus: $message. Retrying now." } logger.warn { "Could not connect to Prometheus: $message. Retry $counter/$RETRIES." }
counter++ counter++
} else { } else {
val values = parseValues(response) val values = parseValues(response)
if (values.data?.result.isNullOrEmpty()) { if (values.data?.result.isNullOrEmpty()) {
throw NoSuchFieldException("Empty query result: $values between $start and $end for query $query.") throw NoSuchFieldException("Empty query result: $values between for query '$query' in interval [$offsetStart,$offsetEnd] .")
} }
return parseValues(response) return parseValues(response)
} }
......
...@@ -190,7 +190,7 @@ class BenchmarkStateChecker( ...@@ -190,7 +190,7 @@ class BenchmarkStateChecker(
} }
} }
private fun <K, V> MutableMap<K, V>.containsMatchLabels(matchLabels: MutableMap<V, V>): Boolean { private fun <K, V> Map<K, V>.containsMatchLabels(matchLabels: Map<V, V>): Boolean {
for (kv in matchLabels) { for (kv in matchLabels) {
if (kv.value != this[kv.key as K]) { if (kv.value != this[kv.key as K]) {
return false return false
......
package theodolite.execution.operator package theodolite.execution.operator
import io.fabric8.kubernetes.client.KubernetesClientException
import io.fabric8.kubernetes.client.NamespacedKubernetesClient import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.kubernetes.client.dsl.MixedOperation import io.fabric8.kubernetes.client.dsl.MixedOperation
import io.fabric8.kubernetes.client.dsl.Resource import io.fabric8.kubernetes.client.dsl.Resource
import mu.KotlinLogging
import theodolite.execution.Shutdown import theodolite.execution.Shutdown
import theodolite.k8s.K8sContextFactory import theodolite.k8s.K8sContextFactory
import theodolite.k8s.ResourceByLabelHandler import theodolite.k8s.ResourceByLabelHandler
import theodolite.model.crd.* import theodolite.model.crd.*
private val logger = KotlinLogging.logger {}
class ClusterSetup( class ClusterSetup(
private val executionCRDClient: MixedOperation<ExecutionCRD, BenchmarkExecutionList, Resource<ExecutionCRD>>, private val executionCRDClient: MixedOperation<ExecutionCRD, BenchmarkExecutionList, Resource<ExecutionCRD>>,
private val benchmarkCRDClient: MixedOperation<BenchmarkCRD, KubernetesBenchmarkList, Resource<BenchmarkCRD>>, private val benchmarkCRDClient: MixedOperation<BenchmarkCRD, KubernetesBenchmarkList, Resource<BenchmarkCRD>>,
...@@ -75,10 +79,15 @@ class ClusterSetup( ...@@ -75,10 +79,15 @@ class ClusterSetup(
labelName = "app.kubernetes.io/created-by", labelName = "app.kubernetes.io/created-by",
labelValue = "theodolite" labelValue = "theodolite"
) )
resourceRemover.removeCR( try {
labelName = "app.kubernetes.io/created-by", resourceRemover.removeCR(
labelValue = "theodolite", labelName = "app.kubernetes.io/created-by",
context = serviceMonitorContext labelValue = "theodolite",
) context = serviceMonitorContext
)
} catch (e: KubernetesClientException) {
logger.warn { "Service monitors could not be cleaned up. It may be that service monitors are not registered by the Kubernetes API."}
logger.debug { "Error is: ${e.message}" }
}
} }
} }
\ No newline at end of file
...@@ -102,7 +102,7 @@ class ActionCommandTest { ...@@ -102,7 +102,7 @@ class ActionCommandTest {
val action = Action() val action = Action()
action.selector = ActionSelector() action.selector = ActionSelector()
action.selector.pod = PodSelector() action.selector.pod = PodSelector()
action.selector.pod.matchLabels = mutableMapOf("app" to "pod") action.selector.pod.matchLabels = mapOf("app" to "pod")
action.exec = Command() action.exec = Command()
action.exec.command = arrayOf("ls") action.exec.command = arrayOf("ls")
action.exec.timeoutSeconds = 10L action.exec.timeoutSeconds = 10L
...@@ -118,7 +118,7 @@ class ActionCommandTest { ...@@ -118,7 +118,7 @@ class ActionCommandTest {
val action = Action() val action = Action()
action.selector = ActionSelector() action.selector = ActionSelector()
action.selector.pod = PodSelector() action.selector.pod = PodSelector()
action.selector.pod.matchLabels = mutableMapOf("app" to "pod") action.selector.pod.matchLabels = mapOf("app" to "pod")
action.exec = Command() action.exec = Command()
action.exec.command = arrayOf("error-command") action.exec.command = arrayOf("error-command")
action.exec.timeoutSeconds = 10L action.exec.timeoutSeconds = 10L
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment