
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: she/theodolite
Commits on Source (28)
Showing 134 additions and 47 deletions
......@@ -3,7 +3,7 @@ description: >-
Theodolite is a framework for benchmarking the horizontal and vertical
scalability of cloud-native applications.
- remote_theme: pmarsceill/just-the-docs
+ remote_theme: just-the-docs/just-the-docs
aux_links:
"Theodolite on GitHub":
- "//github.com/cau-se/theodolite"
......
......@@ -56,6 +56,7 @@ The prebuilt container images can be configured with the following environment v
| `KAFKA_BUFFER_MEMORY` | Value for the Kafka producer configuration: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory). Only used if Kafka is set as `TARGET`. | see Kafka producer config: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) |
| `HTTP_URL` | The URL the load generator should post messages to. Only used if HTTP is set as `TARGET`. | |
| `HTTP_ASYNC` | Whether the load generator should send HTTP messages asynchronously. Only used if HTTP is set as `TARGET`. | `false` |
+ | `HTTP_TIMEOUT_MS` | Timeout in milliseconds for sending HTTP messages. Only used if HTTP is set as `TARGET`. | `1000` |
| `PUBSUB_INPUT_TOPIC` | The Google Cloud Pub/Sub topic to write messages to. Only used if Pub/Sub is set as `TARGET`. | input |
| `PUBSUB_PROJECT` | The Google Cloud project this Pub/Sub topic is associated with. Only used if Pub/Sub is set as `TARGET`. | |
| `PUBSUB_EMULATOR_HOST` | A Pub/Sub emulator host. Only used if Pub/Sub is set as `TARGET`. | |
......
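For illustration, the HTTP-related settings above can be resolved the same way the load generator does it: read the environment variable and fall back to the documented default. A minimal sketch (class and method names are hypothetical; only the variable names and defaults come from the table):

import java.time.Duration;
import java.util.Objects;

// Hypothetical helper mirroring the documented defaults for the HTTP target.
public final class HttpTargetSettings {

  private HttpTargetSettings() {}

  // HTTP_TIMEOUT_MS defaults to 1000 ms per the table above.
  public static Duration resolveTimeout() {
    final String raw = Objects.requireNonNullElse(System.getenv("HTTP_TIMEOUT_MS"), "1000");
    return Duration.ofMillis(Long.parseLong(raw));
  }

  // HTTP_ASYNC defaults to false per the table above.
  public static boolean resolveAsync() {
    return Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("HTTP_ASYNC"), "false"));
  }
}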
cp-helm-charts:
cp-zookeeper:
servers: 1
cp-kafka:
brokers: 1
configurationOverrides:
offsets.topic.replication.factor: "1"
operator:
sloChecker:
droppedRecordsKStreams:
enabled: false
resultsVolume:
enabled: false
strimzi:
kafka:
replicas: 1
config:
"offsets.topic.replication.factor": "1"
zookeeper:
replicas: 1
\ No newline at end of file
......@@ -48,10 +48,13 @@ public abstract class AbstractPipelineFactory {
final Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-        this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
+        this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT));
+    consumerConfig.put(
+        ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
+        this.config.getString(ConfigurationKeys.MAX_POLL_RECORDS));
consumerConfig.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-        this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
+        this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET));
consumerConfig.put(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
......
......@@ -46,7 +46,7 @@ public class BeamService {
* Start this microservice, by running the underlying Beam pipeline.
*/
public void run() {
LOGGER.info("Construct Beam pipeline with pipeline options: {}",
LOGGER.info("Constructing Beam pipeline with pipeline options: {}",
this.pipelineOptions.toString());
final Pipeline pipeline = this.pipelineFactory.create(this.pipelineOptions);
LOGGER.info("Starting BeamService {}.", this.applicationName);
......
......@@ -33,16 +33,17 @@ public final class ConfigurationKeys {
// BEAM
-  public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit.config";
+  public static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";
-  public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset.config";
+  public static final String MAX_POLL_RECORDS = "max.poll.records";
+  public static final String AUTO_OFFSET_RESET = "auto.offset.reset";
public static final String SPECIFIC_AVRO_READER = "specific.avro.reader";
public static final String TRIGGER_INTERVAL = "trigger.interval";
-  private ConfigurationKeys() {
-  }
+  private ConfigurationKeys() {}
}
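The renamed constants now hold Kafka's canonical property names, so they line up with the `ConsumerConfig` constants used by the pipeline factories. A quick sanity check, assuming the Kafka clients dependency the project already uses:

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class KeyAlignmentCheck {
  public static void main(final String[] args) {
    // Kafka's own constants resolve to exactly these strings.
    System.out.println(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); // enable.auto.commit
    System.out.println(ConsumerConfig.MAX_POLL_RECORDS_CONFIG); // max.poll.records
    System.out.println(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); // auto.offset.reset
  }
}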
......@@ -43,6 +43,8 @@ public final class ConfigurationKeys {
public static final String HTTP_ASYNC = "HTTP_ASYNC";
+  public static final String HTTP_TIMEOUT_MS = "HTTP_TIMEOUT_MS";
public static final String PUBSUB_INPUT_TOPIC = "PUBSUB_INPUT_TOPIC";
public static final String PUBSUB_PROJECT = "PUBSUB_PROJECT";
......
......@@ -122,7 +122,10 @@ class EnvVarLoadGeneratorFactory {
final boolean async = Boolean.parseBoolean(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.HTTP_ASYNC),
Boolean.toString(LoadGenerator.HTTP_ASYNC_DEFAULT)));
recordSender = new HttpRecordSender<>(url, async);
final long timeoutMs = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.HTTP_TIMEOUT_MS),
Long.toString(LoadGenerator.HTTP_TIMEOUT_MS_DEFAULT)));
recordSender = new HttpRecordSender<>(url, async, Duration.ofMillis(timeoutMs));
LOGGER.info("Use HTTP server as target with URL '{}' and asynchronously: '{}'.", url, async);
} else if (target == LoadGeneratorTarget.PUBSUB) {
final String project = System.getenv(ConfigurationKeys.PUBSUB_PROJECT);
......
......@@ -24,7 +24,7 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
private static final int HTTP_OK = 200;
-  private static final Duration CONNECTION_TIMEOUT = Duration.ofSeconds(1);
+  private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(1);
private static final Logger LOGGER = LoggerFactory.getLogger(HttpRecordSender.class);
......@@ -36,6 +36,8 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
private final boolean async;
+  private final Duration connectionTimeout;
private final List<Integer> validStatusCodes;
/**
......@@ -44,7 +46,7 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
* @param uri the {@link URI} records should be sent to
*/
public HttpRecordSender(final URI uri) {
-    this(uri, false, List.of(HTTP_OK));
+    this(uri, false, DEFAULT_CONNECTION_TIMEOUT);
}
/**
......@@ -52,9 +54,10 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
*
* @param uri the {@link URI} records should be sent to
   * @param async whether HTTP requests should be sent asynchronously
+  * @param connectionTimeout timeout for the HTTP connection
*/
-  public HttpRecordSender(final URI uri, final boolean async) {
-    this(uri, async, List.of(HTTP_OK));
+  public HttpRecordSender(final URI uri, final boolean async, final Duration connectionTimeout) {
+    this(uri, async, connectionTimeout, List.of(HTTP_OK));
}
/**
......@@ -62,12 +65,17 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
*
* @param uri the {@link URI} records should be sent to
   * @param async whether HTTP requests should be sent asynchronously
+  * @param connectionTimeout timeout for the HTTP connection
* @param validStatusCodes a list of HTTP status codes which are considered as successful
*/
-  public HttpRecordSender(final URI uri, final boolean async,
+  public HttpRecordSender(
+      final URI uri,
+      final boolean async,
+      final Duration connectionTimeout,
final List<Integer> validStatusCodes) {
this.uri = uri;
this.async = async;
+    this.connectionTimeout = connectionTimeout;
this.validStatusCodes = validStatusCodes;
}
......@@ -76,7 +84,7 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
final String json = this.gson.toJson(message);
final HttpRequest request = HttpRequest.newBuilder()
.uri(this.uri)
-        .timeout(CONNECTION_TIMEOUT)
+        .timeout(this.connectionTimeout)
.POST(HttpRequest.BodyPublishers.ofString(json))
.build();
final BodyHandler<Void> bodyHandler = BodyHandlers.discarding();
......@@ -98,8 +106,10 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
if (this.isSync()) {
try {
result.get();
-      } catch (InterruptedException | ExecutionException e) {
+      } catch (final InterruptedException e) {
        LOGGER.error("Couldn't get result for request to {}.", this.uri, e);
+      } catch (final ExecutionException e) { // NOPMD
+        // Do nothing, Exception is already handled
}
}
}
......
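With the new constructor, callers can set the request timeout explicitly instead of relying on the 1-second default. A minimal usage sketch, assuming the sender and record types are on the classpath (URL and record values are examples):

import java.net.URI;
import java.time.Duration;
import titan.ccp.model.records.ActivePowerRecord;

public class HttpSenderExample {
  public static void main(final String[] args) {
    // Synchronous sender with a 500 ms timeout instead of the default.
    final HttpRecordSender<ActivePowerRecord> sender =
        new HttpRecordSender<>(URI.create("http://localhost:8080"), false, Duration.ofMillis(500));
    sender.send(new ActivePowerRecord("sensor-1", 12345L, 12.34));
  }
}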
......@@ -18,6 +18,7 @@ public final class LoadGenerator {
// Target: HTTP
public static final String HTTP_URI_DEFAULT = "http://localhost:8080";
public static final boolean HTTP_ASYNC_DEFAULT = false;
+  public static final long HTTP_TIMEOUT_MS_DEFAULT = 1_000;
// Target: Kafka
public static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
public static final String KAFKA_TOPIC_DEFAULT = "input"; // NOCS
......
......@@ -46,4 +46,22 @@ public class HttpRecordSenderTest {
.withRequestBody(equalTo(expectedJson))); // toJson
}
+  @Test
+  public void testTimeout() {
+    this.wireMockRule.stubFor(
+        post(urlPathEqualTo("/"))
+            .willReturn(
+                aResponse()
+                    .withFixedDelay(2_000)
+                    .withStatus(200)
+                    .withBody("received")));
+    final ActivePowerRecord record = new ActivePowerRecord("my-id", 12345L, 12.34);
+    this.httpRecordSender.send(record);
+    final String expectedJson = "{\"identifier\":\"my-id\",\"timestamp\":12345,\"valueInW\":12.34}";
+    verify(exactly(1), postRequestedFor(urlEqualTo("/"))
+        .withRequestBody(equalTo(expectedJson))); // toJson
+  }
}
......@@ -9,6 +9,7 @@ import org.apache.beam.sdk.transforms.Values;
import org.apache.commons.configuration2.Configuration;
import rocks.theodolite.benchmarks.commons.beam.AbstractPipelineFactory;
import rocks.theodolite.benchmarks.commons.beam.kafka.KafkaActivePowerTimestampReader;
+import rocks.theodolite.benchmarks.uc1.beam.firestore.FirestoreOptionsExpander;
import titan.ccp.model.records.ActivePowerRecord;
/**
......@@ -17,6 +18,8 @@ import titan.ccp.model.records.ActivePowerRecord;
public class PipelineFactory extends AbstractPipelineFactory {
public static final String SINK_TYPE_KEY = "sink.type";
+  private final SinkType sinkType = SinkType.from(this.config.getString(SINK_TYPE_KEY));
public PipelineFactory(final Configuration configuration) {
super(configuration);
......@@ -31,17 +34,18 @@ public class PipelineFactory extends AbstractPipelineFactory {
// final PubsubOptions pubSubOptions = options.as(PubsubOptions.class);
// pubSubOptions.setPubsubRootUrl("http://" + pubSubEmulatorHost);
// }
+    if (this.sinkType == SinkType.FIRESTORE) {
+      FirestoreOptionsExpander.expandOptions(options);
+    }
}
@Override
protected void constructPipeline(final Pipeline pipeline) {
-    final SinkType sinkType = SinkType.from(this.config.getString(SINK_TYPE_KEY));
final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader();
pipeline.apply(kafkaReader)
.apply(Values.create())
-        .apply(sinkType.create(this.config));
+        .apply(this.sinkType.create(this.config));
}
@Override
......
package rocks.theodolite.benchmarks.uc1.beam.firestore;
import java.io.IOException;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
/**
* Provides a method to expand {@link PipelineOptions} for Firestore.
*/
public final class FirestoreOptionsExpander {
private FirestoreOptionsExpander() {}
/**
 * Expands the given {@link PipelineOptions} with additional options required for Firestore,
 * derived from a default configuration.
*
* @param options {@link PipelineOptions} to be expanded.
*/
public static void expandOptions(final PipelineOptions options) {
final GcpOptions firestoreOptions = options.as(GcpOptions.class);
final FirestoreConfig firestoreConfig = getFirestoreConfig();
firestoreOptions.setProject(firestoreConfig.getProjectId());
}
private static FirestoreConfig getFirestoreConfig() {
try {
return FirestoreConfig.createFromDefaults();
} catch (final IOException e) {
throw new IllegalStateException("Cannot create Firestore configuration.", e);
}
}
}
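As the uc1 PipelineFactory above shows, the expander is applied to the pipeline options before the pipeline is constructed, and only when the Firestore sink is selected. A condensed, standalone sketch of that call sequence using Beam's PipelineOptionsFactory:

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class FirestoreExpansionExample {
  public static void main(final String[] args) {
    final PipelineOptions options = PipelineOptionsFactory.create();
    // Fills in the GCP project from the default Firestore configuration;
    // fails fast with an IllegalStateException if no configuration can be created.
    FirestoreOptionsExpander.expandOptions(options);
  }
}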
......@@ -14,6 +14,7 @@ num.threads=1
commit.interval.ms=1000
cache.max.bytes.buffering=-1
-specific.avro.reader=True
-enable.auto.commit.config=True
-auto.offset.reset.config=earliest
+specific.avro.reader=true
+enable.auto.commit=true
+max.poll.records=500
+auto.offset.reset=earliest
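These property files are read back through the renamed ConfigurationKeys when the consumer config is assembled. A small sketch of how the values parse, assuming Apache Commons Configuration 2 (which the Beam commons code already uses) and a placeholder file name:

import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.builder.fluent.Configurations;

public class PropertiesReadExample {
  public static void main(final String[] args) throws Exception {
    // "application.properties" stands in for the benchmark's properties file.
    final Configuration config = new Configurations().properties("application.properties");
    System.out.println(config.getBoolean("enable.auto.commit")); // true
    System.out.println(config.getInt("max.poll.records")); // 500
    System.out.println(config.getString("auto.offset.reset")); // earliest
  }
}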
......@@ -12,6 +12,7 @@ num.threads=1
commit.interval.ms=1000
cache.max.bytes.buffering=-1
-specific.avro.reader=True
-enable.auto.commit.config=True
-auto.offset.reset.config=earliest
\ No newline at end of file
+specific.avro.reader=true
+enable.auto.commit=true
+max.poll.records=500
+auto.offset.reset=earliest
\ No newline at end of file
......@@ -17,6 +17,7 @@ num.threads=1
commit.interval.ms=1000
cache.max.bytes.buffering=-1
-specific.avro.reader=True
-enable.auto.commit.config=True
-auto.offset.reset.config=earliest
\ No newline at end of file
+specific.avro.reader=true
+enable.auto.commit=true
+max.poll.records=500
+auto.offset.reset=earliest
\ No newline at end of file
......@@ -251,10 +251,10 @@ public class PipelineFactory extends AbstractPipelineFactory {
final Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-        this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
+        this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT));
consumerConfig.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-        this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
+        this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET));
consumerConfig.put(
ConsumerConfig.GROUP_ID_CONFIG, this.config
.getString(ConfigurationKeys.APPLICATION_NAME) + "-configuration");
......@@ -265,10 +265,10 @@ public class PipelineFactory extends AbstractPipelineFactory {
final Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-        this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
+        this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT));
consumerConfig.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-        this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
+        this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET));
consumerConfig.put(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
......
......@@ -20,6 +20,7 @@ num.threads=1
commit.interval.ms=1000
cache.max.bytes.buffering=-1
-specific.avro.reader=True
-enable.auto.commit.config=True
-auto.offset.reset.config=earliest
\ No newline at end of file
+specific.avro.reader=true
+enable.auto.commit=true
+max.poll.records=500
+auto.offset.reset=earliest
\ No newline at end of file
......@@ -59,6 +59,12 @@ compileTestKotlin {
kotlinOptions.jvmTarget = JavaVersion.VERSION_11
}
+test {
+  // Required because of https://github.com/quarkusio/quarkus/issues/18973
+  minHeapSize = "256m"
+  maxHeapSize = "1024m"
+}
detekt {
failFast = true // fail build on any finding
buildUponDefaultConfig = true
......
#Gradle properties
quarkusPluginId=io.quarkus
-quarkusPluginVersion=2.6.3.Final
+quarkusPluginVersion=2.7.4.Final
quarkusPlatformGroupId=io.quarkus.platform
quarkusPlatformArtifactId=quarkus-bom
-quarkusPlatformVersion=2.6.3.Final
+quarkusPlatformVersion=2.7.4.Final
#org.gradle.logging.level=INFO
\ No newline at end of file