Skip to content
Snippets Groups Projects
Commit 50811ff5 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'master' into...

Merge branch 'master' into stu200776/theodolite-feature/356-specify-consumergroup-in-lagtrend-metric
parents 53d46101 0bfa08bd
Branches
Tags
1 merge request!252Allow to specify consumergroup in lagtrend slo
Pipeline #7303 passed
Showing
with 268 additions and 118 deletions
...@@ -131,7 +131,7 @@ lint-helm: ...@@ -131,7 +131,7 @@ lint-helm:
script: helm lint helm/ script: helm lint helm/
rules: rules:
- changes: - changes:
- helm/* - helm/**/*
- when: manual - when: manual
allow_failure: true allow_failure: true
......
...@@ -3,7 +3,7 @@ description: >- ...@@ -3,7 +3,7 @@ description: >-
Theodolite is a framework for benchmarking the horizontal and vertical Theodolite is a framework for benchmarking the horizontal and vertical
scalability of cloud-native applications. scalability of cloud-native applications.
remote_theme: pmarsceill/just-the-docs remote_theme: just-the-docs/just-the-docs
aux_links: aux_links:
"Theodolite on GitHub": "Theodolite on GitHub":
- "//github.com/cau-se/theodolite" - "//github.com/cau-se/theodolite"
......
...@@ -130,7 +130,7 @@ If [persisting results](installation#persisting-results) is enabled in Theodolit ...@@ -130,7 +130,7 @@ If [persisting results](installation#persisting-results) is enabled in Theodolit
For installations without persistence, but also as an alternative for installations with persistence, we provide a second option to access results: Theodolite comes with a *results access sidecar*. It allows to copy all benchmark results from the Theodolite pod to your current working directory on your host machine with the following command: For installations without persistence, but also as an alternative for installations with persistence, we provide a second option to access results: Theodolite comes with a *results access sidecar*. It allows to copy all benchmark results from the Theodolite pod to your current working directory on your host machine with the following command:
```sh ```sh
kubectl cp $(kubectl get pod -l app=theodolite -o jsonpath="{.items[0].metadata.name}"):/results . -c results-access kubectl cp $(kubectl get pod -l app=theodolite -o jsonpath="{.items[0].metadata.name}"):results . -c results-access
``` ```
## Analyzing Benchmark Results ## Analyzing Benchmark Results
......
...@@ -55,6 +55,8 @@ The prebuilt container images can be configured with the following environment v ...@@ -55,6 +55,8 @@ The prebuilt container images can be configured with the following environment v
| `KAFKA_LINGER_MS` | Value for the Kafka producer configuration: [`linger.ms`](https://kafka.apache.org/documentation/#producerconfigs_linger.ms). Only used if Kafka is set as `TARGET`. | see Kafka producer config: [`linger.ms`](https://kafka.apache.org/documentation/#producerconfigs_linger.ms) | | `KAFKA_LINGER_MS` | Value for the Kafka producer configuration: [`linger.ms`](https://kafka.apache.org/documentation/#producerconfigs_linger.ms). Only used if Kafka is set as `TARGET`. | see Kafka producer config: [`linger.ms`](https://kafka.apache.org/documentation/#producerconfigs_linger.ms) |
| `KAFKA_BUFFER_MEMORY` | Value for the Kafka producer configuration: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) Only used if Kafka is set as `TARGET`. | see Kafka producer config: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) | | `KAFKA_BUFFER_MEMORY` | Value for the Kafka producer configuration: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) Only used if Kafka is set as `TARGET`. | see Kafka producer config: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) |
| `HTTP_URL` | The URL the load generator should post messages to. Only used if HTTP is set as `TARGET`. | | | `HTTP_URL` | The URL the load generator should post messages to. Only used if HTTP is set as `TARGET`. | |
| `HTTP_ASYNC` | Whether the load generator should send HTTP messages asynchronously. Only used if HTTP is set as `TARGET`. | `false` |
| `HTTP_TIMEOUT_MS` | Timeout in milliseconds for sending HTTP messages. Only used if HTTP is set as `TARGET`. | 1000 |
| `PUBSUB_INPUT_TOPIC` | The Google Cloud Pub/Sub topic to write messages to. Only used if Pub/Sub is set as `TARGET`. | input | | `PUBSUB_INPUT_TOPIC` | The Google Cloud Pub/Sub topic to write messages to. Only used if Pub/Sub is set as `TARGET`. | input |
| `PUBSUB_PROJECT` | The Google Cloud this Pub/Sub topic is associated with. Only used if Pub/Sub is set as `TARGET`. | | | `PUBSUB_PROJECT` | The Google Cloud this Pub/Sub topic is associated with. Only used if Pub/Sub is set as `TARGET`. | |
| `PUBSUB_EMULATOR_HOST` | A Pub/Sub emulator host. Only used if Pub/Sub is set as `TARGET`. | | | `PUBSUB_EMULATOR_HOST` | A Pub/Sub emulator host. Only used if Pub/Sub is set as `TARGET`. | |
......
cp-helm-charts:
cp-zookeeper:
servers: 1
cp-kafka:
brokers: 1
configurationOverrides:
offsets.topic.replication.factor: "1"
operator: operator:
sloChecker: sloChecker:
droppedRecordsKStreams: droppedRecordsKStreams:
enabled: false enabled: false
resultsVolume: resultsVolume:
enabled: false enabled: false
strimzi:
kafka:
replicas: 1
config:
"offsets.topic.replication.factor": "1"
zookeeper:
replicas: 1
\ No newline at end of file
...@@ -30,6 +30,15 @@ spec: ...@@ -30,6 +30,15 @@ spec:
configMapKeyRef: configMapKeyRef:
name: {{ template "theodolite.fullname" . }}-kafka-metrics name: {{ template "theodolite.fullname" . }}-kafka-metrics
key: kafka-metrics-config.yml key: kafka-metrics-config.yml
{{- with .Values.strimzi.kafka.nodeSelectorTerms}}
template:
pod:
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
{{- toYaml . | nindent 16 }}
{{- end}}
zookeeper: zookeeper:
{{- with .Values.strimzi.zookeeper.replicas }} {{- with .Values.strimzi.zookeeper.replicas }}
...@@ -38,6 +47,15 @@ spec: ...@@ -38,6 +47,15 @@ spec:
{{- end }} {{- end }}
storage: storage:
type: ephemeral type: ephemeral
{{- with .Values.strimzi.zookeeper.nodeSelectorTerms}}
template:
pod:
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
{{- toYaml . | nindent 16 }}
{{- end}}
kafkaExporter: {} kafkaExporter: {}
......
...@@ -45,6 +45,19 @@ rules: ...@@ -45,6 +45,19 @@ rules:
- list - list
- create - create
- get - get
- apiGroups:
- kafka.strimzi.io
resources:
- kafkatopics
verbs:
- delete
- list
- get
- create
- update
{{- with .Values.rbac.additionalRules }}
{{ toYaml . | indent 2 }}
{{- end }}
{{- if .Values.operator.enabled }} {{- if .Values.operator.enabled }}
- apiGroups: - apiGroups:
- theodolite.com - theodolite.com
......
...@@ -173,6 +173,8 @@ strimzi: ...@@ -173,6 +173,8 @@ strimzi:
jvmOptions: jvmOptions:
"-Xmx": "512M" "-Xmx": "512M"
"-Xms": "512M" "-Xms": "512M"
nodeSelectorTerms: []
zookeeper: zookeeper:
replicas: 3 replicas: 3
zooEntrance: zooEntrance:
...@@ -180,8 +182,10 @@ strimzi: ...@@ -180,8 +182,10 @@ strimzi:
zookeeperClient: zookeeperClient:
enabled: true enabled: true
nodeSelector: {} nodeSelector: {}
nodeSelectorTerms: []
topicOperator: topicOperator:
enabled: false enabled: true
### ###
...@@ -341,6 +345,7 @@ serviceAccount: ...@@ -341,6 +345,7 @@ serviceAccount:
rbac: rbac:
create: true create: true
additionalRules: []
randomScheduler: randomScheduler:
enabled: true enabled: true
......
...@@ -24,7 +24,7 @@ elif os.getenv('LOG_LEVEL') == 'DEBUG': ...@@ -24,7 +24,7 @@ elif os.getenv('LOG_LEVEL') == 'DEBUG':
def calculate_slope_trend(results, warmup): def calculate_slope_trend(results, warmup):
d = [] d = []
for result in results: for result in results:
group = result['metric']['consumergroup'] group = result['metric'].get('consumergroup', "default")
for value in result['values']: for value in result['values']:
d.append({'group': group, 'timestamp': int( d.append({'group': group, 'timestamp': int(
value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0}) value[0]), 'value': int(value[1]) if value[1] != 'NaN' else 0})
......
...@@ -48,10 +48,13 @@ public abstract class AbstractPipelineFactory { ...@@ -48,10 +48,13 @@ public abstract class AbstractPipelineFactory {
final Map<String, Object> consumerConfig = new HashMap<>(); final Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put( consumerConfig.put(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT));
consumerConfig.put(
ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
this.config.getString(ConfigurationKeys.MAX_POLL_RECORDS));
consumerConfig.put( consumerConfig.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG)); this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET));
consumerConfig.put( consumerConfig.put(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)); this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
......
package rocks.theodolite.benchmarks.commons.beam; package rocks.theodolite.benchmarks.commons.beam;
import java.io.IOException;
import java.util.function.Function; import java.util.function.Function;
import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsFactory;
...@@ -23,6 +25,7 @@ public class BeamService { ...@@ -23,6 +25,7 @@ public class BeamService {
private final AbstractPipelineFactory pipelineFactory; private final AbstractPipelineFactory pipelineFactory;
private final PipelineOptions pipelineOptions; private final PipelineOptions pipelineOptions;
private PipelineResult pipelineResult;
/** /**
* Create a new {@link BeamService}. * Create a new {@link BeamService}.
...@@ -43,14 +46,43 @@ public class BeamService { ...@@ -43,14 +46,43 @@ public class BeamService {
} }
/** /**
* Start this microservice, by running the underlying Beam pipeline. * Start this microservice by running the underlying Beam pipeline.
*/ */
public void run() { public void run() {
LOGGER.info("Construct Beam pipeline with pipeline options: {}", LOGGER.info("Constructing Beam pipeline with pipeline options: {}",
this.pipelineOptions.toString()); this.pipelineOptions.toString());
final Pipeline pipeline = this.pipelineFactory.create(this.pipelineOptions); final Pipeline pipeline = this.pipelineFactory.create(this.pipelineOptions);
LOGGER.info("Starting BeamService {}.", this.applicationName); LOGGER.info("Starting BeamService {}.", this.applicationName);
pipeline.run().waitUntilFinish(); this.pipelineResult = pipeline.run();
}
/**
* Start this microservice by running the underlying Beam pipeline and block until this process is
* terminated.
*/
public void runStandalone() {
this.run();
Runtime.getRuntime().addShutdownHook(new Thread(() -> this.stop()));
this.pipelineResult.waitUntilFinish();
}
/**
* Stop this microservice by canceling the underlying Beam pipeline.
*/
public void stop() {
LOGGER.info("Initiate shutdown of Beam service {}.", this.applicationName);
if (this.pipelineResult == null) {
throw new IllegalStateException("Cannot stop service since it has never been started.");
}
LOGGER.info("Stopping Beam pipeline.");
try {
this.pipelineResult.cancel();
this.pipelineResult = null; // NOPMD use null to indicate absence
} catch (final IOException e) {
throw new IllegalStateException(
"Stopping the service failed due to failed stop of Beam pipeline.", e);
}
LOGGER.info("Shutdown of Beam service {} complete.", this.applicationName);
} }
} }
...@@ -33,16 +33,17 @@ public final class ConfigurationKeys { ...@@ -33,16 +33,17 @@ public final class ConfigurationKeys {
// BEAM // BEAM
public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit.config"; public static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";
public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset.config"; public static final String MAX_POLL_RECORDS = "max.poll.records";
public static final String AUTO_OFFSET_RESET = "auto.offset.reset";
public static final String SPECIFIC_AVRO_READER = "specific.avro.reader"; public static final String SPECIFIC_AVRO_READER = "specific.avro.reader";
public static final String TRIGGER_INTERVAL = "trigger.interval"; public static final String TRIGGER_INTERVAL = "trigger.interval";
private ConfigurationKeys() { private ConfigurationKeys() {}
}
} }
...@@ -41,6 +41,10 @@ public final class ConfigurationKeys { ...@@ -41,6 +41,10 @@ public final class ConfigurationKeys {
public static final String HTTP_URL = "HTTP_URL"; public static final String HTTP_URL = "HTTP_URL";
public static final String HTTP_ASYNC = "HTTP_ASYNC";
public static final String HTTP_TIMEOUT_MS = "HTTP_TIMEOUT_MS";
public static final String PUBSUB_INPUT_TOPIC = "PUBSUB_INPUT_TOPIC"; public static final String PUBSUB_INPUT_TOPIC = "PUBSUB_INPUT_TOPIC";
public static final String PUBSUB_PROJECT = "PUBSUB_PROJECT"; public static final String PUBSUB_PROJECT = "PUBSUB_PROJECT";
......
...@@ -42,9 +42,9 @@ class EnvVarLoadGeneratorFactory { ...@@ -42,9 +42,9 @@ class EnvVarLoadGeneratorFactory {
.setLoadDefinition(new WorkloadDefinition( .setLoadDefinition(new WorkloadDefinition(
new KeySpace(LoadGenerator.SENSOR_PREFIX_DEFAULT, numSensors), new KeySpace(LoadGenerator.SENSOR_PREFIX_DEFAULT, numSensors),
Duration.ofMillis(periodMs))) Duration.ofMillis(periodMs)))
.setGeneratorConfig(new LoadGeneratorConfig( .setGeneratorConfig(new LoadGeneratorConfig(GeneratorAction.from(
TitanRecordGenerator.forConstantValue(value), TitanRecordGenerator.forConstantValue(value),
this.buildRecordSender())) this.buildRecordSender())))
.withThreads(threads); .withThreads(threads);
} }
...@@ -119,8 +119,14 @@ class EnvVarLoadGeneratorFactory { ...@@ -119,8 +119,14 @@ class EnvVarLoadGeneratorFactory {
Objects.requireNonNullElse( Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.HTTP_URL), System.getenv(ConfigurationKeys.HTTP_URL),
LoadGenerator.HTTP_URI_DEFAULT)); LoadGenerator.HTTP_URI_DEFAULT));
recordSender = new HttpRecordSender<>(url); final boolean async = Boolean.parseBoolean(Objects.requireNonNullElse(
LOGGER.info("Use HTTP server as target with url '{}'.", url); System.getenv(ConfigurationKeys.HTTP_ASYNC),
Boolean.toString(LoadGenerator.HTTP_ASYNC_DEFAULT)));
final long timeoutMs = Integer.parseInt(Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.HTTP_TIMEOUT_MS),
Long.toString(LoadGenerator.HTTP_TIMEOUT_MS_DEFAULT)));
recordSender = new HttpRecordSender<>(url, async, Duration.ofMillis(timeoutMs));
LOGGER.info("Use HTTP server as target with URL '{}' and asynchronously: '{}'.", url, async);
} else if (target == LoadGeneratorTarget.PUBSUB) { } else if (target == LoadGeneratorTarget.PUBSUB) {
final String project = System.getenv(ConfigurationKeys.PUBSUB_PROJECT); final String project = System.getenv(ConfigurationKeys.PUBSUB_PROJECT);
final String inputTopic = Objects.requireNonNullElse( final String inputTopic = Objects.requireNonNullElse(
......
...@@ -5,14 +5,18 @@ package rocks.theodolite.benchmarks.loadgenerator; ...@@ -5,14 +5,18 @@ package rocks.theodolite.benchmarks.loadgenerator;
* it. * it.
*/ */
@FunctionalInterface @FunctionalInterface
interface GeneratorAction { public interface GeneratorAction {
void generate(final String key); void generate(final String key);
default void shutdown() {
// Nothing to do per default
}
public static <T> GeneratorAction from( public static <T> GeneratorAction from(
final RecordGenerator<? extends T> generator, final RecordGenerator<? extends T> generator,
final RecordSender<? super T> sender) { final RecordSender<? super T> sender) {
return key -> sender.send(generator.generate(key)); return new GeneratorActionImpl<>(generator, sender);
} }
} }
package rocks.theodolite.benchmarks.loadgenerator;
class GeneratorActionImpl<T> implements GeneratorAction {
private final RecordGenerator<? extends T> generator;
private final RecordSender<? super T> sender;
public GeneratorActionImpl(
final RecordGenerator<? extends T> generator,
final RecordSender<? super T> sender) {
this.generator = generator;
this.sender = sender;
}
@Override
public void shutdown() {
this.generator.close();
this.sender.close();
}
@Override
public void generate(final String key) {
this.sender.send(this.generator.generate(key));
}
}
...@@ -36,7 +36,7 @@ public class HazelcastRunner { ...@@ -36,7 +36,7 @@ public class HazelcastRunner {
} }
/** /**
* Start the workload generation and blocks until the workload generation is stopped again. * Start the load generation and blocks until the load generation is stopped again.
*/ */
public void runBlocking() { public void runBlocking() {
while (!this.stopAction.isDone()) { while (!this.stopAction.isDone()) {
...@@ -52,19 +52,24 @@ public class HazelcastRunner { ...@@ -52,19 +52,24 @@ public class HazelcastRunner {
} }
public void restart() { public void restart() {
this.stopRunnerState(); this.stopRunnerStateAsync();
} }
/**
* Stop generating load and clean up the entire state.
*/
public void stop() { public void stop() {
this.stopAction.complete(null); this.stopAction.complete(null);
this.stopRunnerState(); this.stopRunnerStateAsync().join();
this.hzInstance.shutdown();
} }
private void stopRunnerState() { private CompletableFuture<Void> stopRunnerStateAsync() {
synchronized (this) { synchronized (this) {
if (this.runnerState != null) { if (this.runnerState != null) {
this.runnerState.stopAsync(); return this.runnerState.stopAsync();
} }
return CompletableFuture.completedFuture(null);
} }
} }
......
...@@ -27,6 +27,7 @@ public class HazelcastRunnerStateInstance { ...@@ -27,6 +27,7 @@ public class HazelcastRunnerStateInstance {
private static final Duration TASK_ASSIGNMENT_WAIT_DURATION = Duration.ofMillis(500); private static final Duration TASK_ASSIGNMENT_WAIT_DURATION = Duration.ofMillis(500);
private final CompletableFuture<Void> stopAction = new CompletableFuture<>(); private final CompletableFuture<Void> stopAction = new CompletableFuture<>();
private final CompletableFuture<Void> stopFinished = new CompletableFuture<>();
private LoadGeneratorExecution loadGeneratorExecution; private LoadGeneratorExecution loadGeneratorExecution;
private final LoadGeneratorConfig loadGeneratorConfig; private final LoadGeneratorConfig loadGeneratorConfig;
...@@ -61,10 +62,12 @@ public class HazelcastRunnerStateInstance { ...@@ -61,10 +62,12 @@ public class HazelcastRunnerStateInstance {
} }
this.stopAction.join(); this.stopAction.join();
this.stopLoadGeneration(); this.stopLoadGeneration();
this.stopFinished.complete(null);
} }
public void stopAsync() { public CompletableFuture<Void> stopAsync() {
this.stopAction.complete(null); this.stopAction.complete(null);
return this.stopFinished;
} }
private void tryPerformBeforeAction() { private void tryPerformBeforeAction() {
......
...@@ -7,6 +7,7 @@ import java.net.http.HttpRequest; ...@@ -7,6 +7,7 @@ import java.net.http.HttpRequest;
import java.net.http.HttpResponse; import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler; import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodyHandlers; import java.net.http.HttpResponse.BodyHandlers;
import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
...@@ -23,6 +24,8 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< ...@@ -23,6 +24,8 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
private static final int HTTP_OK = 200; private static final int HTTP_OK = 200;
private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(1);
private static final Logger LOGGER = LoggerFactory.getLogger(HttpRecordSender.class); private static final Logger LOGGER = LoggerFactory.getLogger(HttpRecordSender.class);
private final Gson gson = new Gson(); private final Gson gson = new Gson();
...@@ -33,6 +36,8 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< ...@@ -33,6 +36,8 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
private final boolean async; private final boolean async;
private final Duration connectionTimeout;
private final List<Integer> validStatusCodes; private final List<Integer> validStatusCodes;
/** /**
...@@ -41,7 +46,18 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< ...@@ -41,7 +46,18 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
* @param uri the {@link URI} records should be sent to * @param uri the {@link URI} records should be sent to
*/ */
public HttpRecordSender(final URI uri) { public HttpRecordSender(final URI uri) {
this(uri, true, List.of(HTTP_OK)); this(uri, false, DEFAULT_CONNECTION_TIMEOUT);
}
/**
* Create a new {@link HttpRecordSender}.
*
* @param uri the {@link URI} records should be sent to
* @param async whether HTTP requests should be sent asynchronous
* @param connectionTimeout timeout for the HTTP connection
*/
public HttpRecordSender(final URI uri, final boolean async, final Duration connectionTimeout) {
this(uri, async, connectionTimeout, List.of(HTTP_OK));
} }
/** /**
...@@ -49,12 +65,17 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< ...@@ -49,12 +65,17 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
* *
* @param uri the {@link URI} records should be sent to * @param uri the {@link URI} records should be sent to
* @param async whether HTTP requests should be sent asynchronous * @param async whether HTTP requests should be sent asynchronous
* @param connectionTimeout timeout for the HTTP connection
* @param validStatusCodes a list of HTTP status codes which are considered as successful * @param validStatusCodes a list of HTTP status codes which are considered as successful
*/ */
public HttpRecordSender(final URI uri, final boolean async, public HttpRecordSender(
final URI uri,
final boolean async,
final Duration connectionTimeout,
final List<Integer> validStatusCodes) { final List<Integer> validStatusCodes) {
this.uri = uri; this.uri = uri;
this.async = async; this.async = async;
this.connectionTimeout = connectionTimeout;
this.validStatusCodes = validStatusCodes; this.validStatusCodes = validStatusCodes;
} }
...@@ -63,6 +84,7 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< ...@@ -63,6 +84,7 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
final String json = this.gson.toJson(message); final String json = this.gson.toJson(message);
final HttpRequest request = HttpRequest.newBuilder() final HttpRequest request = HttpRequest.newBuilder()
.uri(this.uri) .uri(this.uri)
.timeout(this.connectionTimeout)
.POST(HttpRequest.BodyPublishers.ofString(json)) .POST(HttpRequest.BodyPublishers.ofString(json))
.build(); .build();
final BodyHandler<Void> bodyHandler = BodyHandlers.discarding(); final BodyHandler<Void> bodyHandler = BodyHandlers.discarding();
...@@ -81,13 +103,19 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< ...@@ -81,13 +103,19 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender<
response.statusCode()); response.statusCode());
} }
}); });
if (this.async) { if (this.isSync()) {
try { try {
result.get(); result.get();
} catch (InterruptedException | ExecutionException e) { } catch (final InterruptedException e) {
LOGGER.error("Couldn't get result for request to {}.", this.uri, e); LOGGER.error("Couldn't get result for request to {}.", this.uri, e);
} catch (final ExecutionException e) { // NOPMD
// Do nothing, Exception is already handled
}
} }
} }
private boolean isSync() {
return !this.async;
} }
} }
...@@ -4,115 +4,72 @@ import java.util.Properties; ...@@ -4,115 +4,72 @@ import java.util.Properties;
import java.util.function.Function; import java.util.function.Function;
import org.apache.avro.specific.SpecificRecord; import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
/** /**
* Sends monitoring records to Kafka. * Sends records to Kafka.
* *
* @param <T> {@link SpecificRecord} to send * @param <T> Record type to send.
*/ */
public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender<T> { public interface KafkaRecordSender<T> extends RecordSender<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); @Override
public void close();
private final String topic;
private final Function<T, String> keyAccessor;
private final Function<T, Long> timestampAccessor;
private final Producer<String, T> producer;
/** /**
* Create a new {@link KafkaRecordSender}. * Creates a builder object for a {@link KafkaRecordSender} based on a Kafka {@link Serializer}.
*
* @param bootstrapServers The server to for accessing Kafka.
* @param topic The topic where to write.
* @param serializer The {@link Serializer} for mapping a value to keys.
*/ */
private KafkaRecordSender(final Builder<T> builder) { public static <T> Builder<T> builderWithSerializer(
this.topic = builder.topic; final String bootstrapServers,
this.keyAccessor = builder.keyAccessor; final String topic,
this.timestampAccessor = builder.timestampAccessor; final Serializer<T> serializer) {
return new Builder<>(bootstrapServers, topic, serializer);
final Properties properties = new Properties();
properties.putAll(builder.defaultProperties);
properties.put("bootstrap.servers", builder.bootstrapServers);
// properties.put("acks", this.acknowledges);
// properties.put("batch.size", this.batchSize);
// properties.put("linger.ms", this.lingerMs);
// properties.put("buffer.memory", this.bufferMemory);
final SchemaRegistryAvroSerdeFactory avroSerdeFactory =
new SchemaRegistryAvroSerdeFactory(builder.schemaRegistryUrl);
this.producer = new KafkaProducer<>(
properties,
new StringSerializer(),
avroSerdeFactory.<T>forKeys().serializer());
} }
/** /**
* Write the passed monitoring record to Kafka. * Creates a Builder object for a {@link KafkaRecordSender} based on a Confluent Schema Registry
* URL.
*
* @param bootstrapServers The Server to for accessing Kafka.
* @param topic The topic where to write.
* @param schemaRegistryUrl URL to the schema registry for avro.
*/ */
public void write(final T monitoringRecord) { public static <T extends SpecificRecord> Builder<T> builderWithSchemaRegistry(
final ProducerRecord<String, T> record =
new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
this.keyAccessor.apply(monitoringRecord), monitoringRecord);
LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
try {
this.producer.send(record);
} catch (final SerializationException e) {
LOGGER.warn(
"Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS
e);
}
}
public void terminate() {
this.producer.close();
}
@Override
public void send(final T message) {
this.write(message);
}
public static <T extends SpecificRecord> Builder<T> builder(
final String bootstrapServers, final String bootstrapServers,
final String topic, final String topic,
final String schemaRegistryUrl) { final String schemaRegistryUrl) {
return new Builder<>(bootstrapServers, topic, schemaRegistryUrl); final SchemaRegistryAvroSerdeFactory avroSerdeFactory =
new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl);
return new Builder<>(bootstrapServers, topic, avroSerdeFactory.<T>forValues().serializer());
} }
/** /**
* Builder class to build a new {@link KafkaRecordSender}. * Builder class to build a new {@link KafkaRecordSenderImpl}.
* *
* @param <T> Type of the records that should later be send. * @param <T> Type of the records that should later be send.
*/ */
public static class Builder<T extends SpecificRecord> { public static class Builder<T> {
private final String bootstrapServers; private final String bootstrapServers;
private final String topic; private final String topic;
private final String schemaRegistryUrl; private final Serializer<T> serializer;
private Function<T, String> keyAccessor = x -> ""; // NOPMD private Function<T, String> keyAccessor = x -> ""; // NOPMD
private Function<T, Long> timestampAccessor = x -> null; // NOPMD private Function<T, Long> timestampAccessor = x -> null; // NOPMD
private Properties defaultProperties = new Properties(); // NOPMD private Properties defaultProperties = new Properties(); // NOPMD
/**
* Creates a Builder object for a {@link KafkaRecordSender}.
*
* @param bootstrapServers The Server to for accessing Kafka.
* @param topic The topic where to write.
* @param schemaRegistryUrl URL to the schema registry for avro.
*/
private Builder(final String bootstrapServers, final String topic, private Builder(final String bootstrapServers, final String topic,
final String schemaRegistryUrl) { final Serializer<T> serializer) {
this.bootstrapServers = bootstrapServers; this.bootstrapServers = bootstrapServers;
this.topic = topic; this.topic = topic;
this.schemaRegistryUrl = schemaRegistryUrl; this.serializer = serializer;
} }
public Builder<T> keyAccessor(final Function<T, String> keyAccessor) { public Builder<T> keyAccessor(final Function<T, String> keyAccessor) {
...@@ -130,9 +87,51 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender ...@@ -130,9 +87,51 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender
return this; return this;
} }
/**
* Create a {@link KafkaRecordSender} from this builder.
*/
public KafkaRecordSender<T> build() { public KafkaRecordSender<T> build() {
return new KafkaRecordSender<>(this); final Properties properties = new Properties();
properties.putAll(this.defaultProperties);
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
// properties.put("acks", this.acknowledges);
// properties.put("batch.size", this.batchSize);
// properties.put("linger.ms", this.lingerMs);
// properties.put("buffer.memory", this.bufferMemory);
return new KafkaRecordSenderImpl<>(
new KafkaProducer<>(
properties,
new StringSerializer(),
this.serializer),
new DefaultRecordFactory<>(),
this.topic,
this.keyAccessor,
this.timestampAccessor);
}
private static class DefaultRecordFactory<T> implements KafkaRecordFactory<T, String, T> {
@Override
public ProducerRecord<String, T> create(final String topic, final String key, final T value,
final long timestamp) {
return new ProducerRecord<>(topic, null, timestamp, key, value);
}
} }
} }
/**
* Create Kafka {@link ProducerRecord}s from a topic, a key, a value and a timestamp.
*
* @param <T> type the records should be created from.
* @param <K> key type of the {@link ProducerRecord}s.
* @param <V> value type of the {@link ProducerRecord}s.
*/
public static interface KafkaRecordFactory<T, K, V> {
ProducerRecord<K, V> create(String topic, String key, T value, long timestamp);
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment