Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • she/theodolite
1 result
Show changes
Commits on Source (5)
......@@ -26,6 +26,7 @@ Theodolite's internal development including issue boards, merge requests and ext
* [Lorenz Boguhn](https://github.com/lorenzboguhn)
* [Simon Ehrenstein](https://github.com/sehrenstein)
* [Willi Hasselbring](https://www.se.informatik.uni-kiel.de/en/team/prof.-dr.-wilhelm-willi-hasselbring)
* [Christopher Konkel](https://github.com/JustAnotherChristoph)
* [Luca Mertens](https://www.linkedin.com/in/luca-mertens-35a932201)
* [Tobias Pfandzelter](https://pfandzelter.com/)
* [Julia Rossow](https://www.linkedin.com/in/julia-rossow/)
......
......@@ -14,7 +14,7 @@ repositories {
url "https://oss.sonatype.org/content/repositories/snapshots/"
}
maven {
url 'https://packages.confluent.io/maven/'
url 'https://packages.confluent.io/maven/'
}
}
......@@ -28,4 +28,10 @@ dependencies {
// Use JUnit test framework
testImplementation 'junit:junit:4.12'
//testImplementation 'com.github.stefanbirkner:system-rules:1.17.0'
testImplementation 'org.apache.kafka:kafka-streams-test-utils:3.1.0'
testImplementation('io.confluent:kafka-streams-avro-serde:5.5.12') {
// exclude, because introduces older version with higher number 5.5.1-ccs
exclude group: 'org.apache.kafka', module: 'kafka-clients'
}
}
......@@ -10,7 +10,7 @@ import org.apache.kafka.common.serialization.Serde;
/**
* Factory methods to create {@link Serde}s for Avro records using the Confluent Schema Registry.
*/
public final class SchemaRegistryAvroSerdeFactory {
public class SchemaRegistryAvroSerdeFactory {
private static final String SCHEMA_REGISTRY_URL_KEY =
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
......
......@@ -3,6 +3,7 @@ package rocks.theodolite.benchmarks.commons.hazelcastjet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import java.util.Objects;
import org.apache.commons.configuration2.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -57,10 +58,11 @@ public abstract class HazelcastJetService {
* instance.
*/
public void run() {
Objects.requireNonNull(this.jobName, "'jobName' must be set.");
try {
final Pipeline pipeline = this.pipelineFactory.buildPipeline();
this.registerSerializer();
this.jobConfig.setName(this.config.getString("name"));
this.jobConfig.setName(this.jobName);
this.jetInstance.newJobIfAbsent(pipeline, this.jobConfig).join();
} catch (final Exception e) { // NOPMD
LOGGER.error("ABORT MISSION!:", e);
......
package rocks.theodolite.benchmarks.uc4.kstreams;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import java.util.Collections;
import java.util.Map;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Serde;
import rocks.theodolite.benchmarks.commons.kafka.avro.SchemaRegistryAvroSerdeFactory;
public class MockedSchemaRegistrySerdes extends SchemaRegistryAvroSerdeFactory {
private static final String URL_KEY = AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
private static final String DUMMY_URL = "http://dummy";
private final MockSchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();
private final Map<String, String> serdeConfig = Collections.singletonMap(URL_KEY, DUMMY_URL);
public MockedSchemaRegistrySerdes() {
super(DUMMY_URL);
}
@Override
public <T extends SpecificRecord> Serde<T> forKeys() {
return this.build(true);
}
@Override
public <T extends SpecificRecord> Serde<T> forValues() {
return this.build(false);
}
private <T extends SpecificRecord> Serde<T> build(final boolean isKey) {
final Serde<T> avroSerde = new SpecificAvroSerde<>(this.schemaRegistryClient);
avroSerde.configure(this.serdeConfig, isKey);
return avroSerde;
}
}
package rocks.theodolite.benchmarks.uc4.kstreams;
import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import rocks.theodolite.benchmarks.commons.configuration.events.Event;
import rocks.theodolite.benchmarks.commons.configuration.events.EventSerde;
import rocks.theodolite.benchmarks.commons.kafka.avro.SchemaRegistryAvroSerdeFactory;
import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
import rocks.theodolite.benchmarks.commons.model.records.AggregatedActivePowerRecord;
import rocks.theodolite.benchmarks.commons.model.sensorregistry.MutableAggregatedSensor;
import rocks.theodolite.benchmarks.commons.model.sensorregistry.MutableSensorRegistry;
public class TopologyTest {
private static final String INPUT_TOPIC = "input";
private static final String OUTPUT_TOPIC = "output";
private static final String FEEDBACK_TOPIC = "feedback";
private static final String CONFIGURATION_TOPIC = "configuration";
private TopologyTestDriver testDriver;
private TestInputTopic<Event, String> configurationTopic;
private TestInputTopic<String, ActivePowerRecord> inputTopic;
private TestOutputTopic<String, AggregatedActivePowerRecord> outputTopic;
// private TestOutputTopic<String, AggregatedActivePowerRecord> feedbackTopic;
private SchemaRegistryAvroSerdeFactory serdes;
@Before
public void setup() {
this.serdes = new MockedSchemaRegistrySerdes();
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-aggregation");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
final Topology topology = new TopologyBuilder(
INPUT_TOPIC,
OUTPUT_TOPIC,
FEEDBACK_TOPIC,
CONFIGURATION_TOPIC,
Duration.ofSeconds(2),
Duration.ofSeconds(2),
this.serdes).build(props);
// setup test driver
this.testDriver = new TopologyTestDriver(topology, props);
this.inputTopic = this.testDriver.createInputTopic(
INPUT_TOPIC,
Serdes.String().serializer(),
this.serdes.<ActivePowerRecord>forValues().serializer());
this.configurationTopic = this.testDriver.createInputTopic(
CONFIGURATION_TOPIC,
EventSerde.serde().serializer(),
Serdes.String().serializer());
this.testDriver.createInputTopic(
OUTPUT_TOPIC,
Serdes.String().serializer(),
this.serdes.forValues().serializer());
this.testDriver.createInputTopic(
FEEDBACK_TOPIC,
Serdes.String().serializer(),
this.serdes.<AggregatedActivePowerRecord>forValues().serializer());
this.outputTopic = this.testDriver.createOutputTopic(
OUTPUT_TOPIC,
Serdes.String().deserializer(),
this.serdes.<AggregatedActivePowerRecord>forValues().deserializer());
// this.feedbackTopic = this.testDriver.createOutputTopic(
// FEEDBACK_TOPIC,
// this.serdes.string().deserializer(),
// this.serdes.aggregatedActivePowerRecordValues().deserializer());
}
@After
public void tearDown() {
this.testDriver.close();
}
@Test
public void testOneLevelRegistry() {
// Publish sensor registry
final MutableSensorRegistry registry = new MutableSensorRegistry("root");
final MutableAggregatedSensor root = registry.getTopLevelSensor();
root.addChildMachineSensor("child1");
root.addChildMachineSensor("child2");
this.configurationTopic.pipeInput(
Event.SENSOR_REGISTRY_CHANGED,
registry.toJson(),
Instant.ofEpochSecond(0));
// Publish input records
this.pipeInput("child1", Instant.ofEpochSecond(0), 100.0);
this.pipeInput("child2", Instant.ofEpochSecond(1), 100.0);
// Advance time to obtain outputs
this.pipeInput("child2", Instant.ofEpochSecond(2), 123.0);
// Check aggregation results
Assert.assertEquals(1, this.outputTopic.getQueueSize());
final AggregatedActivePowerRecord result = this.outputTopic.readValue();
Assert.assertEquals("root", result.getIdentifier());
Assert.assertEquals(2, result.getCount());
Assert.assertEquals(200.0, result.getSumInW(), 0.1);
}
@Test
public void shouldOnlyConsiderLatestValue() {
// Publish sensor registry
final MutableSensorRegistry registry = new MutableSensorRegistry("root");
final MutableAggregatedSensor root = registry.getTopLevelSensor();
root.addChildMachineSensor("child1");
root.addChildMachineSensor("child2");
this.configurationTopic.pipeInput(
Event.SENSOR_REGISTRY_CHANGED,
registry.toJson(),
Instant.ofEpochSecond(0));
// Publish input records
this.pipeInput("child1", Instant.ofEpochMilli(500), 50.0);
this.pipeInput("child2", Instant.ofEpochMilli(1000), 100.0);
this.pipeInput("child1", Instant.ofEpochMilli(1500), 400.0);
// Advance time to obtain outputs
this.pipeInput("child2", Instant.ofEpochSecond(3), 123.0);
// Check aggregation results
Assert.assertEquals(1, this.outputTopic.getQueueSize());
final AggregatedActivePowerRecord result = this.outputTopic.readValue();
Assert.assertEquals("root", result.getIdentifier());
Assert.assertEquals(2, result.getCount());
Assert.assertEquals(500.0, result.getSumInW(), 0.1);
}
@Test
public void shouldOnlyConsiderLatestValueWhenOutOfOrder() {
// Publish sensor registry
final MutableSensorRegistry registry = new MutableSensorRegistry("root");
final MutableAggregatedSensor root = registry.getTopLevelSensor();
root.addChildMachineSensor("child1");
root.addChildMachineSensor("child2");
this.configurationTopic.pipeInput(
Event.SENSOR_REGISTRY_CHANGED,
registry.toJson(),
Instant.ofEpochSecond(0));
// Publish input records
this.pipeInput("child2", Instant.ofEpochMilli(1000), 100.0);
this.pipeInput("child1", Instant.ofEpochMilli(1500), 400.0);
this.pipeInput("child1", Instant.ofEpochMilli(500), 50.0);
// Advance time to obtain outputs
this.pipeInput("child2", Instant.ofEpochSecond(3), 123.0);
// Check aggregation results
Assert.assertEquals(1, this.outputTopic.getQueueSize());
final AggregatedActivePowerRecord result = this.outputTopic.readValue();
Assert.assertEquals("root", result.getIdentifier());
Assert.assertEquals(2, result.getCount());
Assert.assertEquals(500.0, result.getSumInW(), 0.1);
}
@Test
public void shouldHandleUpdateDuringGracePeriod() {
// Publish sensor registry
final MutableSensorRegistry registry = new MutableSensorRegistry("root");
final MutableAggregatedSensor root = registry.getTopLevelSensor();
root.addChildMachineSensor("child1");
root.addChildMachineSensor("child2");
this.configurationTopic.pipeInput(
Event.SENSOR_REGISTRY_CHANGED,
registry.toJson(),
Instant.ofEpochSecond(0));
// Publish input records
this.pipeInput("child1", Instant.ofEpochMilli(500), 50.0);
this.pipeInput("child2", Instant.ofEpochMilli(1000), 100.0);
// Advance time to obtain outputs
this.pipeInput("child2", Instant.ofEpochSecond(3), 123.0);
// This record arrives out-of-order but withing the grace period
this.pipeInput("child1", Instant.ofEpochMilli(1500), 400.0);
// Advance time to obtain outputs
this.pipeInput("child2", Instant.ofEpochSecond(5), 123.0);
// Check aggregation results
Assert.assertEquals(3, this.outputTopic.getQueueSize());
final AggregatedActivePowerRecord firstResult = this.outputTopic.readValue();
Assert.assertEquals("root", firstResult.getIdentifier());
Assert.assertEquals(2, firstResult.getCount());
Assert.assertEquals(150.0, firstResult.getSumInW(), 0.1);
final AggregatedActivePowerRecord secondResult = this.outputTopic.readValue();
Assert.assertEquals("root", secondResult.getIdentifier());
Assert.assertEquals(2, secondResult.getCount());
Assert.assertEquals(500.0, secondResult.getSumInW(), 0.1);
final AggregatedActivePowerRecord thirdResult = this.outputTopic.readValue();
Assert.assertEquals("root", thirdResult.getIdentifier());
Assert.assertEquals(1, thirdResult.getCount());
Assert.assertEquals(123.0, thirdResult.getSumInW(), 0.1);
}
@Test
public void shouldNotHandleUpdateAfterGracePeriod() {
// Publish sensor registry
final MutableSensorRegistry registry = new MutableSensorRegistry("root");
final MutableAggregatedSensor root = registry.getTopLevelSensor();
root.addChildMachineSensor("child1");
root.addChildMachineSensor("child2");
this.configurationTopic.pipeInput(
Event.SENSOR_REGISTRY_CHANGED,
registry.toJson(),
Instant.ofEpochSecond(0));
// Publish input records
this.pipeInput("child1", Instant.ofEpochMilli(500), 50.0);
this.pipeInput("child2", Instant.ofEpochMilli(1000), 100.0);
// Advance time to obtain outputs
this.pipeInput("child2", Instant.ofEpochSecond(3), 123.0);
// Advance time to obtain outputs
this.pipeInput("child2", Instant.ofEpochSecond(5), 123.0);
// This record arrives out-of-order and after the grace period has expired
this.pipeInput("child1", Instant.ofEpochMilli(1500), 400.0);
// Advance time to obtain outputs
this.pipeInput("child2", Instant.ofEpochSecond(7), 123.0);
// Check aggregation results
Assert.assertEquals(3, this.outputTopic.getQueueSize());
final AggregatedActivePowerRecord firstResult = this.outputTopic.readValue();
Assert.assertEquals("root", firstResult.getIdentifier());
Assert.assertEquals(2, firstResult.getCount());
Assert.assertEquals(150.0, firstResult.getSumInW(), 0.1);
final AggregatedActivePowerRecord secondResult = this.outputTopic.readValue();
Assert.assertEquals("root", secondResult.getIdentifier());
Assert.assertEquals(1, secondResult.getCount());
Assert.assertEquals(123.0, secondResult.getSumInW(), 0.1);
final AggregatedActivePowerRecord thirdResult = this.outputTopic.readValue();
Assert.assertEquals("root", thirdResult.getIdentifier());
Assert.assertEquals(1, thirdResult.getCount());
Assert.assertEquals(123.0, thirdResult.getSumInW(), 0.1);
}
@Test
public void testTwoLevelRegistry() {
// Publish sensor registry
final MutableSensorRegistry registry = new MutableSensorRegistry("root");
final MutableAggregatedSensor root = registry.getTopLevelSensor();
final MutableAggregatedSensor sensor1 = root.addChildAggregatedSensor("sensor-1");
sensor1.addChildAggregatedSensor("sensor-1-1");
sensor1.addChildAggregatedSensor("sensor-1-2");
final MutableAggregatedSensor sensor2 = root.addChildAggregatedSensor("sensor-2");
sensor2.addChildAggregatedSensor("sensor-2-1");
sensor2.addChildAggregatedSensor("sensor-2-2");
this.configurationTopic.pipeInput(
Event.SENSOR_REGISTRY_CHANGED,
registry.toJson(),
Instant.ofEpochSecond(0));
// Publish input records
this.pipeInput("sensor-1-1", Instant.ofEpochSecond(0), 100.0);
this.pipeInput("sensor-2-1", Instant.ofEpochSecond(0), 100.0);
this.pipeInput("sensor-1-2", Instant.ofEpochSecond(1), 100.0);
this.pipeInput("sensor-2-2", Instant.ofEpochSecond(1), 100.0);
// Advance time to obtain outputs
this.pipeInput("sensor-2-2", Instant.ofEpochSecond(2), 123.0);
// Check aggregation results
Assert.assertEquals(3, this.outputTopic.getQueueSize());
final AggregatedActivePowerRecord rootResult = this.outputTopic.readValue();
Assert.assertEquals("root", rootResult.getIdentifier());
Assert.assertEquals(2, rootResult.getCount());
Assert.assertEquals(400.0, rootResult.getSumInW(), 0.1);
final AggregatedActivePowerRecord sensor1Result = this.outputTopic.readValue();
Assert.assertEquals("sensor-1", sensor1Result.getIdentifier());
Assert.assertEquals(2, sensor1Result.getCount());
Assert.assertEquals(200.0, sensor1Result.getSumInW(), 0.1);
final AggregatedActivePowerRecord sensor2Result = this.outputTopic.readValue();
Assert.assertEquals("sensor-2", sensor2Result.getIdentifier());
Assert.assertEquals(2, sensor2Result.getCount());
Assert.assertEquals(200.0, sensor2Result.getSumInW(), 0.1);
}
@Test
public void testUnbalancedRegistry() {
// Publish sensor registry
final MutableSensorRegistry registry = new MutableSensorRegistry("root");
final MutableAggregatedSensor root = registry.getTopLevelSensor();
final MutableAggregatedSensor sensor1 = root.addChildAggregatedSensor("sensor-1");
sensor1.addChildMachineSensor("sensor-1-1");
sensor1.addChildMachineSensor("sensor-1-2");
final MutableAggregatedSensor sensor2 = root.addChildAggregatedSensor("sensor-2");
final MutableAggregatedSensor sensor21 = sensor2.addChildAggregatedSensor("sensor-2-1");
sensor21.addChildMachineSensor("sensor-2-1-1");
sensor21.addChildMachineSensor("sensor-2-1-2");
final MutableAggregatedSensor sensor22 = sensor2.addChildAggregatedSensor("sensor-2-2");
sensor22.addChildMachineSensor("sensor-2-2-1");
sensor22.addChildMachineSensor("sensor-2-2-2");
this.configurationTopic.pipeInput(
Event.SENSOR_REGISTRY_CHANGED,
registry.toJson(),
Instant.ofEpochSecond(0));
// Publish input records
this.pipeInput("sensor-1-1", Instant.ofEpochSecond(0), 100.0);
this.pipeInput("sensor-2-1-1", Instant.ofEpochSecond(0), 10.0);
this.pipeInput("sensor-2-2-1", Instant.ofEpochSecond(0), 20.0);
this.pipeInput("sensor-1-2", Instant.ofEpochSecond(0), 200.0);
this.pipeInput("sensor-2-1-2", Instant.ofEpochSecond(1), 1.0);
this.pipeInput("sensor-2-2-2", Instant.ofEpochSecond(1), 2.0);
// Advance time to obtain outputs
this.pipeInput("sensor-1-1", Instant.ofEpochSecond(2), 999.0);
// Check aggregation results
Assert.assertEquals(5, this.outputTopic.getQueueSize());
final AggregatedActivePowerRecord rootResult = this.outputTopic.readValue();
Assert.assertEquals("root", rootResult.getIdentifier());
Assert.assertEquals(2, rootResult.getCount());
Assert.assertEquals(333.0, rootResult.getSumInW(), 0.1);
final AggregatedActivePowerRecord sensor1Result = this.outputTopic.readValue();
Assert.assertEquals("sensor-1", sensor1Result.getIdentifier());
Assert.assertEquals(2, sensor1Result.getCount());
Assert.assertEquals(300.0, sensor1Result.getSumInW(), 0.1);
final AggregatedActivePowerRecord sensor2Result = this.outputTopic.readValue();
Assert.assertEquals("sensor-2", sensor2Result.getIdentifier());
Assert.assertEquals(2, sensor2Result.getCount());
Assert.assertEquals(33.0, sensor2Result.getSumInW(), 0.1);
final AggregatedActivePowerRecord sensor21Result = this.outputTopic.readValue();
Assert.assertEquals("sensor-2-1", sensor21Result.getIdentifier());
Assert.assertEquals(2, sensor21Result.getCount());
Assert.assertEquals(11.0, sensor21Result.getSumInW(), 0.1);
final AggregatedActivePowerRecord sensor22Result = this.outputTopic.readValue();
Assert.assertEquals("sensor-2-2", sensor22Result.getIdentifier());
Assert.assertEquals(2, sensor22Result.getCount());
Assert.assertEquals(22.0, sensor22Result.getSumInW(), 0.1);
}
private void pipeInput(final String identifier, final Instant timestamp, final double value) {
this.inputTopic.pipeInput(
identifier,
new ActivePowerRecord(identifier, timestamp.toEpochMilli(), value),
timestamp);
}
}