Skip to content
Snippets Groups Projects
Commit 955ec34a authored by Sören Henning's avatar Sören Henning
Browse files

Add support for emulated pub/sub

parent bbeb165d
No related branches found
No related tags found
1 merge request!225Add option to generate load via Google PubSub
Pipeline #6671 passed
...@@ -41,6 +41,10 @@ public final class ConfigurationKeys { ...@@ -41,6 +41,10 @@ public final class ConfigurationKeys {
public static final String PUBSUB_INPUT_TOPIC = "PUBSUB_INPUT_TOPIC"; public static final String PUBSUB_INPUT_TOPIC = "PUBSUB_INPUT_TOPIC";
public static final String PUBSUB_PROJECT = "PUBSUB_PROJECT";
public static final String PUBSUB_EMULATOR_HOST = "PUBSUB_EMULATOR_HOST";
private ConfigurationKeys() {} private ConfigurationKeys() {}
} }
...@@ -112,11 +112,22 @@ class EnvVarLoadGeneratorFactory { ...@@ -112,11 +112,22 @@ class EnvVarLoadGeneratorFactory {
recordSender = new HttpRecordSender<>(url); recordSender = new HttpRecordSender<>(url);
LOGGER.info("Use HTTP server as target with url '{}'.", url); LOGGER.info("Use HTTP server as target with url '{}'.", url);
} else if (target == LoadGeneratorTarget.PUBSUB) { } else if (target == LoadGeneratorTarget.PUBSUB) {
final String pubSubInputTopic = Objects.requireNonNullElse( final String project = System.getenv(ConfigurationKeys.PUBSUB_PROJECT);
final String inputTopic = Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.PUBSUB_INPUT_TOPIC), System.getenv(ConfigurationKeys.PUBSUB_INPUT_TOPIC),
LoadGenerator.PUBSUB_TOPIC_DEFAULT); LoadGenerator.PUBSUB_TOPIC_DEFAULT);
recordSender = TitanPubSubSenderFactory.forPubSubConfig(pubSubInputTopic); final String emulatorHost = System.getenv(ConfigurationKeys.PUBSUB_EMULATOR_HOST);
LOGGER.info("Use Pub/Sub as target with topic '{}'.", pubSubInputTopic); if (emulatorHost != null) { // NOPMD
LOGGER.info("Use Pub/Sub as target with emulator host {} and topic '{}'.",
emulatorHost,
inputTopic);
recordSender = TitanPubSubSenderFactory.forEmulatedPubSubConfig(emulatorHost, inputTopic);
} else if (project != null) { // NOPMD
LOGGER.info("Use Pub/Sub as target with project {} and topic '{}'.", project, inputTopic);
recordSender = TitanPubSubSenderFactory.forPubSubConfig(project, inputTopic);
} else {
throw new IllegalStateException("Neither an emulator host nor a project was provided.");
}
} else { } else {
// Should never happen // Should never happen
throw new IllegalStateException("Target " + target + " is not handled yet."); throw new IllegalStateException("Target " + target + " is not handled yet.");
......
package theodolite.commons.workloadgeneration; package theodolite.commons.workloadgeneration;
import com.google.api.core.ApiFuture;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.Publisher; import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps; import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
import org.apache.avro.specific.SpecificRecord;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* Sends monitoring records to Kafka. * Sends monitoring records to Pub/Sub.
* *
* @param <T> {@link SpecificRecord} to send * @param <T> Record type to send
*/ */
public class PubSubRecordSender<T> implements RecordSender<T> { public class PubSubRecordSender<T> implements RecordSender<T> {
...@@ -23,8 +32,6 @@ public class PubSubRecordSender<T> implements RecordSender<T> { ...@@ -23,8 +32,6 @@ public class PubSubRecordSender<T> implements RecordSender<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
private final String topic;
private final Function<T, ByteBuffer> recordSerializer; private final Function<T, ByteBuffer> recordSerializer;
private final Function<T, Long> timestampAccessor; private final Function<T, Long> timestampAccessor;
...@@ -33,19 +40,15 @@ public class PubSubRecordSender<T> implements RecordSender<T> { ...@@ -33,19 +40,15 @@ public class PubSubRecordSender<T> implements RecordSender<T> {
private final Publisher publisher; private final Publisher publisher;
/**
* Create a new {@link KafkaRecordSender}.
*/
private PubSubRecordSender(final Builder<T> builder) { private PubSubRecordSender(final Builder<T> builder) {
this.topic = builder.topic;
this.orderingKeyAccessor = builder.orderingKeyAccessor; this.orderingKeyAccessor = builder.orderingKeyAccessor;
this.timestampAccessor = builder.timestampAccessor; this.timestampAccessor = builder.timestampAccessor;
this.recordSerializer = builder.recordSerializer; this.recordSerializer = builder.recordSerializer;
try { try {
this.publisher = Publisher.newBuilder(this.topic).build(); this.publisher = builder.buildPublisher();
} catch (final IOException e) { } catch (final IOException e) {
throw new IllegalStateException(e); throw new IllegalStateException("Can not create Pub/Sub publisher.", e);
} }
} }
...@@ -73,14 +76,44 @@ public class PubSubRecordSender<T> implements RecordSender<T> { ...@@ -73,14 +76,44 @@ public class PubSubRecordSender<T> implements RecordSender<T> {
if (this.timestampAccessor != null) { if (this.timestampAccessor != null) {
messageBuilder.setPublishTime(Timestamps.fromMillis(this.timestampAccessor.apply(record))); messageBuilder.setPublishTime(Timestamps.fromMillis(this.timestampAccessor.apply(record)));
} }
this.publisher.publish(messageBuilder.build()); final PubsubMessage message = messageBuilder.build();
LOGGER.debug("Send message to PubSub topic {}: {}", this.topic, messageBuilder); LOGGER.debug("Send message to PubSub topic {}: {}", this.publisher.getTopicName(), message);
final ApiFuture<String> publishResult = this.publisher.publish(message);
if (LOGGER.isDebugEnabled()) {
try {
LOGGER.debug("Publishing result is {}.", publishResult.get());
} catch (InterruptedException | ExecutionException e) {
LOGGER.warn("Can not get publishing result.", e);
}
}
} }
public static <T> Builder<T> builder( /**
* Creates a {@link Builder} object for a {@link PubSubRecordSender}.
*
* @param project The project where to write.
* @param topic The topic where to write.
* @param recordSerializer A function serializing objects to {@link ByteBuffer}.
*/
public static <T> Builder<T> builderForProject(
final String project,
final String topic, final String topic,
final Function<T, ByteBuffer> recordSerializer) { final Function<T, ByteBuffer> recordSerializer) {
return new Builder<>(topic, recordSerializer); return new Builder<>(project, topic, recordSerializer);
}
/**
* Creates a {@link Builder} object for a {@link PubSubRecordSender}.
*
* @param emulatorHost Host of the emulator.
* @param topic The topic where to write.
* @param recordSerializer A function serializing objects to {@link ByteBuffer}.
*/
public static <T> Builder<T> builderForEmulator(
final String emulatorHost,
final String topic,
final Function<T, ByteBuffer> recordSerializer) {
return new WithEmulatorBuilder<>(emulatorHost, topic, recordSerializer);
} }
/** /**
...@@ -90,7 +123,7 @@ public class PubSubRecordSender<T> implements RecordSender<T> { ...@@ -90,7 +123,7 @@ public class PubSubRecordSender<T> implements RecordSender<T> {
*/ */
public static class Builder<T> { public static class Builder<T> {
private final String topic; protected final TopicName topicName;
private final Function<T, ByteBuffer> recordSerializer; // NOPMD private final Function<T, ByteBuffer> recordSerializer; // NOPMD
private Function<T, Long> timestampAccessor = null; // NOPMD private Function<T, Long> timestampAccessor = null; // NOPMD
private Function<T, String> orderingKeyAccessor = null; // NOPMD private Function<T, String> orderingKeyAccessor = null; // NOPMD
...@@ -101,8 +134,11 @@ public class PubSubRecordSender<T> implements RecordSender<T> { ...@@ -101,8 +134,11 @@ public class PubSubRecordSender<T> implements RecordSender<T> {
* @param topic The topic where to write. * @param topic The topic where to write.
* @param recordSerializer A function serializing objects to {@link ByteBuffer}. * @param recordSerializer A function serializing objects to {@link ByteBuffer}.
*/ */
private Builder(final String topic, final Function<T, ByteBuffer> recordSerializer) { private Builder(
this.topic = topic; final String project,
final String topic,
final Function<T, ByteBuffer> recordSerializer) {
this.topicName = TopicName.of(project, topic);
this.recordSerializer = recordSerializer; this.recordSerializer = recordSerializer;
} }
...@@ -119,6 +155,51 @@ public class PubSubRecordSender<T> implements RecordSender<T> { ...@@ -119,6 +155,51 @@ public class PubSubRecordSender<T> implements RecordSender<T> {
public PubSubRecordSender<T> build() { public PubSubRecordSender<T> build() {
return new PubSubRecordSender<>(this); return new PubSubRecordSender<>(this);
} }
protected Publisher buildPublisher() throws IOException {
return Publisher.newBuilder(this.topicName).build();
}
}
private static class WithEmulatorBuilder<T> extends Builder<T> {
private static final String DUMMY_PROJECT = "dummy-project-id";
private final String emulatorHost;
/**
* Creates a Builder object for a {@link PubSubRecordSender}.
*
* @param emulatorHost host of the emulator.
* @param topic The topic where to write.
* @param recordSerializer A function serializing objects to {@link ByteBuffer}.
*/
private WithEmulatorBuilder(
final String emulatorHost,
final String topic,
final Function<T, ByteBuffer> recordSerializer) {
super(DUMMY_PROJECT, topic, recordSerializer);
this.emulatorHost = emulatorHost;
}
@Override
protected Publisher buildPublisher() throws IOException {
final ManagedChannel channel = ManagedChannelBuilder
.forTarget(this.emulatorHost)
.usePlaintext()
.build();
final TransportChannelProvider channelProvider = FixedTransportChannelProvider
.create(GrpcTransportChannel.create(channel));
final CredentialsProvider credentialsProvider = NoCredentialsProvider.create();
return Publisher.newBuilder(super.topicName)
.setChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build();
}
} }
} }
package theodolite.commons.workloadgeneration; package theodolite.commons.workloadgeneration;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/** /**
...@@ -11,20 +12,39 @@ public final class TitanPubSubSenderFactory { ...@@ -11,20 +12,39 @@ public final class TitanPubSubSenderFactory {
private TitanPubSubSenderFactory() {} private TitanPubSubSenderFactory() {}
/** /**
* Create a new {@link PubSubRecordSender} for {@link ActivePowerRecord}s for the given PubSub * Create a new {@link PubSubRecordSender} for {@link ActivePowerRecord}s for the given Pub/Sub
* configuration. * configuration.
*/ */
public static PubSubRecordSender<ActivePowerRecord> forPubSubConfig(final String topic) { public static PubSubRecordSender<ActivePowerRecord> forPubSubConfig(
final String project,
final String topic) {
return PubSubRecordSender return PubSubRecordSender
.<ActivePowerRecord>builder(topic, r -> { .builderForProject(project, topic, TitanPubSubSenderFactory::serialize)
try {
return r.toByteBuffer();
} catch (final IOException e) {
throw new IllegalStateException(e);
}
})
// .orderingKeyAccessor(r -> r.getIdentifier()) // .orderingKeyAccessor(r -> r.getIdentifier())
.timestampAccessor(r -> r.getTimestamp()) .timestampAccessor(r -> r.getTimestamp())
.build(); .build();
} }
/**
* Create a new {@link PubSubRecordSender} for {@link ActivePowerRecord}s for the given PubSub
* emulator configuration.
*/
public static PubSubRecordSender<ActivePowerRecord> forEmulatedPubSubConfig(
final String emulatorHost,
final String topic) {
return PubSubRecordSender
.builderForEmulator(emulatorHost, topic, TitanPubSubSenderFactory::serialize)
// .orderingKeyAccessor(r -> r.getIdentifier())
.timestampAccessor(r -> r.getTimestamp())
.build();
}
private static ByteBuffer serialize(final ActivePowerRecord record) {
try {
return record.toByteBuffer();
} catch (final IOException e) {
throw new IllegalStateException(e);
}
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment