Skip to content
Snippets Groups Projects
Commit e944be2e authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'master' into pubsub-load-generator

parents bd9af6e8 47ec6dbc
Branches
Tags
1 merge request!225Add option to generate load via Google PubSub
Showing
with 113 additions and 37 deletions
......@@ -19,7 +19,7 @@ services:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1"
KAFKA_CREATE_TOPICS: "input:3:1,output:3:1"
schema-registry:
image: confluentinc/cp-schema-registry:5.3.1
depends_on:
......@@ -33,7 +33,7 @@ services:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
benchmark:
image: ghcr.io/cau-se/theodolite-uc3-kstreams-app:latest
image: ghcr.io/cau-se/theodolite-uc3-kstreams-app:${THEODOLITE_TAG:-latest}
depends_on:
- schema-registry
- kafka
......@@ -41,7 +41,7 @@ services:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_URL: http://schema-registry:8081
load-generator:
image: ghcr.io/cau-se/theodolite-uc3-workload-generator:latest
image: ghcr.io/cau-se/theodolite-uc3-workload-generator:${THEODOLITE_TAG:-latest}
depends_on:
- schema-registry
- kafka
......
......@@ -33,7 +33,7 @@ services:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
load-generator:
image: ghcr.io/cau-se/theodolite-uc4-workload-generator:latest
image: ghcr.io/cau-se/theodolite-uc4-workload-generator:${THEODOLITE_TAG:-latest}
depends_on:
- schema-registry
- kafka
......@@ -45,7 +45,7 @@ services:
NUM_SENSORS: 4
NUM_NESTED_GROUPS: 4
benchmark-jobmanager:
image: ghcr.io/cau-se/theodolite-uc4-beam-flink:latest
image: ghcr.io/cau-se/theodolite-uc4-beam-flink:${THEODOLITE_TAG:-latest}
#ports:
# - "8080:8081"
command: >
......@@ -66,7 +66,7 @@ services:
- schema-registry
- kafka
benchmark-taskmanager:
image: ghcr.io/cau-se/theodolite-uc4-beam-flink:latest
image: ghcr.io/cau-se/theodolite-uc4-beam-flink:${THEODOLITE_TAG:-latest}
scale: 1
command: taskmanager
environment:
......
......@@ -35,7 +35,7 @@ services:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
benchmark:
image: ghcr.io/cau-se/theodolite-uc4-beam-samza:latest
image: ghcr.io/cau-se/theodolite-uc4-beam-samza:${THEODOLITE_TAG:-latest}
scale: 1
depends_on:
- schema-registry
......@@ -47,7 +47,7 @@ services:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_URL: http://schema-registry:8081
load-generator:
image: ghcr.io/cau-se/theodolite-uc4-workload-generator:latest
image: ghcr.io/cau-se/theodolite-uc4-workload-generator:${THEODOLITE_TAG:-latest}
depends_on:
- schema-registry
- kafka
......
......@@ -33,7 +33,7 @@ services:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
load-generator:
image: ghcr.io/cau-se/theodolite-uc4-workload-generator:latest
image: ghcr.io/cau-se/theodolite-uc4-workload-generator:${THEODOLITE_TAG:-latest}
depends_on:
- schema-registry
- kafka
......@@ -45,7 +45,7 @@ services:
NUM_SENSORS: 4
NUM_NESTED_GROUPS: 4
benchmark-jobmanager:
image: ghcr.io/cau-se/theodolite-uc4-flink:latest
image: ghcr.io/cau-se/theodolite-uc4-flink:${THEODOLITE_TAG:-latest}
#ports:
# - "8080:8081"
command: standalone-job --job-classname theodolite.uc4.application.AggregationServiceFlinkJob
......@@ -60,7 +60,7 @@ services:
- schema-registry
- kafka
benchmark-taskmanager:
image: ghcr.io/cau-se/theodolite-uc4-flink:latest
image: ghcr.io/cau-se/theodolite-uc4-flink:${THEODOLITE_TAG:-latest}
command: taskmanager
environment:
- |
......
......@@ -33,7 +33,7 @@ services:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
benchmark:
image: ghcr.io/cau-se/theodolite-uc4-kstreams-app:latest
image: ghcr.io/cau-se/theodolite-uc4-kstreams-app:${THEODOLITE_TAG:-latest}
depends_on:
- schema-registry
- kafka
......@@ -41,7 +41,7 @@ services:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_URL: http://schema-registry:8081
load-generator:
image: ghcr.io/cau-se/theodolite-uc4-workload-generator:latest
image: ghcr.io/cau-se/theodolite-uc4-workload-generator:${THEODOLITE_TAG:-latest}
depends_on:
- schema-registry
- kafka
......
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=15
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=15
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=15
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
......@@ -37,7 +37,7 @@ public final class ConfigurationKeys {
public static final String KAFKA_BUFFER_MEMORY = "KAFKA_BUFFER_MEMORY";
public static final String HTTP_URI = "HTTP_URI";
public static final String HTTP_URL = "HTTP_URL";
private ConfigurationKeys() {}
......
......@@ -95,7 +95,7 @@ public final class LoadGenerator {
new KeySpace(SENSOR_PREFIX_DEFAULT, NUMBER_OF_KEYS_DEFAULT),
Duration.ofMillis(PERIOD_MS_DEFAULT)))
.setGeneratorConfig(new LoadGeneratorConfig(
TitanRecordGeneratorFactory.forConstantValue(VALUE_DEFAULT),
TitanRecordGenerator.forConstantValue(VALUE_DEFAULT),
TitanKafkaSenderFactory.forKafkaConfig(
KAFKA_BOOTSTRAP_SERVERS_DEFAULT,
KAFKA_TOPIC_DEFAULT,
......@@ -164,12 +164,16 @@ public final class LoadGenerator {
kafkaBootstrapServers,
kafkaInputTopic,
schemaRegistryUrl);
LOGGER.info(
"Use Kafka as target with bootstrap server '{}', schema registry url '{}' and topic '{}'.", // NOCS
kafkaBootstrapServers, schemaRegistryUrl, kafkaInputTopic);
} else if (target == LoadGeneratorTarget.HTTP) {
final URI uri = URI.create(
final URI url = URI.create(
Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.HTTP_URI),
System.getenv(ConfigurationKeys.HTTP_URL),
HTTP_URI_DEFAULT));
recordSender = new HttpRecordSender<>(uri);
recordSender = new HttpRecordSender<>(url);
LOGGER.info("Use HTTP server as target with url '{}'.", url);
} else {
// Should never happen
throw new IllegalStateException("Target " + target + " is not handled yet.");
......@@ -194,7 +198,7 @@ public final class LoadGenerator {
new KeySpace(SENSOR_PREFIX_DEFAULT, numSensors),
Duration.ofMillis(periodMs)))
.setGeneratorConfig(new LoadGeneratorConfig(
TitanRecordGeneratorFactory.forConstantValue(value),
TitanRecordGenerator.forConstantValue(value),
recordSender))
.withThreads(threads);
}
......
package theodolite.commons.workloadgeneration;
import java.time.Clock;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A factory for creating {@link RecordGenerator}s that creates Titan {@link ActivePowerRecord}s.
*/
public final class TitanRecordGeneratorFactory {
public final class TitanRecordGenerator implements RecordGenerator<ActivePowerRecord> {
private final Clock clock;
private TitanRecordGeneratorFactory() {}
private final double constantValue;
private TitanRecordGenerator(final double constantValue) {
this.constantValue = constantValue;
this.clock = Clock.systemUTC();
}
/* default */ TitanRecordGenerator(final double constantValue, final Clock clock) {
this.constantValue = constantValue;
this.clock = clock;
}
/**
* Create a {@link RecordGenerator} that generates Titan {@link ActivePowerRecord}s with a
* constant value.
*/
public static RecordGenerator<ActivePowerRecord> forConstantValue(final double value) {
return sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value);
return new TitanRecordGenerator(value);
}
@Override
public ActivePowerRecord generate(final String key) {
return new ActivePowerRecord(key, this.clock.millis(), this.constantValue);
}
}
......@@ -10,7 +10,6 @@ import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import com.google.gson.Gson;
import java.net.URI;
import org.junit.Before;
import org.junit.Rule;
......@@ -21,8 +20,6 @@ public class HttpRecordSenderTest {
private HttpRecordSender<ActivePowerRecord> httpRecordSender;
private Gson gson;
@Rule
public WireMockRule wireMockRule = new WireMockRule(options().dynamicPort());
......@@ -30,7 +27,6 @@ public class HttpRecordSenderTest {
public void setup() {
this.httpRecordSender =
new HttpRecordSender<>(URI.create("http://localhost:" + this.wireMockRule.port()));
this.gson = new Gson();
}
@Test
......@@ -45,8 +41,9 @@ public class HttpRecordSenderTest {
final ActivePowerRecord record = new ActivePowerRecord("my-id", 12345L, 12.34);
this.httpRecordSender.send(record);
final String expectedJson = "{\"identifier\":\"my-id\",\"timestamp\":12345,\"valueInW\":12.34}";
verify(exactly(1), postRequestedFor(urlEqualTo("/"))
.withRequestBody(equalTo(this.gson.toJson(record)))); // toJson
.withRequestBody(equalTo(expectedJson))); // toJson
}
}
package theodolite.commons.workloadgeneration;
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import org.junit.Assert;
import org.junit.Test;
import titan.ccp.model.records.ActivePowerRecord;
public class TitanRecordGeneratorTest {
@Test
public void testGenerate() {
final ZoneId zoneId = ZoneOffset.UTC;
final LocalDateTime dateTime = LocalDateTime.of(2022, 1, 17, 14, 2, 42);
final Instant instant = dateTime.atZone(zoneId).toInstant();
final TitanRecordGenerator generator =
new TitanRecordGenerator(42.0, Clock.fixed(instant, zoneId));
final ActivePowerRecord activePowerRecord = generator.generate("my-identifier");
Assert.assertEquals("my-identifier", activePowerRecord.getIdentifier());
Assert.assertEquals(instant.toEpochMilli(), activePowerRecord.getTimestamp());
Assert.assertEquals(42.0, activePowerRecord.getValueInW(), 0.001);
}
@Test
public void testTimestampForArbitraryClockTimeZone() {
final LocalDateTime dateTime = LocalDateTime.of(2022, 1, 17, 14, 2, 42);
final Instant instant = dateTime.atZone(ZoneId.of("Europe/Paris")).toInstant();
// Setting of ZoneId should have no impact on result as we request epoch millis
final Clock clock = Clock.fixed(instant, ZoneId.of("America/Sao_Paulo"));
final TitanRecordGenerator generator = new TitanRecordGenerator(42.0, clock);
final ActivePowerRecord activePowerRecord = generator.generate("my-identifier");
Assert.assertEquals(instant.toEpochMilli(), activePowerRecord.getTimestamp());
}
}
......@@ -8,7 +8,6 @@ import org.slf4j.LoggerFactory;
/**
* Logs all Key Value pairs.
*/
@SuppressWarnings({"unused"})
public class LogKeyValue extends DoFn<KV<String, String>, KV<String, String>> {
private static final long serialVersionUID = 4328743;
private static final Logger LOGGER = LoggerFactory.getLogger(LogKeyValue.class);
......@@ -19,9 +18,7 @@ public class LogKeyValue extends DoFn<KV<String, String>, KV<String, String>> {
@ProcessElement
public void processElement(@Element final KV<String, String> kv,
final OutputReceiver<KV<String, String>> out) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Key: {}, Value: {}", kv.getKey(), kv.getValue());
}
out.output(kv);
}
}
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=15
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=15
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=15
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=15
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=15
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=15
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment