diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java index d4b61845d1c6206cd06ded8c77279024622b0558..e94a11425eebc8180504a8a4f4ff582116623574 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/ConfigurationKeys.java @@ -41,6 +41,10 @@ public final class ConfigurationKeys { public static final String PUBSUB_INPUT_TOPIC = "PUBSUB_INPUT_TOPIC"; + public static final String PUBSUB_PROJECT = "PUBSUB_PROJECT"; + + public static final String PUBSUB_EMULATOR_HOST = "PUBSUB_EMULATOR_HOST"; + private ConfigurationKeys() {} } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/EnvVarLoadGeneratorFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/EnvVarLoadGeneratorFactory.java index 3ced27377a0705de9a1d3f98b5cc2bc9fe92b573..2901b68d8f3e6fa90cccfe15e7992aca67653f94 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/EnvVarLoadGeneratorFactory.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/EnvVarLoadGeneratorFactory.java @@ -112,11 +112,22 @@ class EnvVarLoadGeneratorFactory { recordSender = new HttpRecordSender<>(url); LOGGER.info("Use HTTP server as target with url '{}'.", url); } else if (target == LoadGeneratorTarget.PUBSUB) { - final String pubSubInputTopic = Objects.requireNonNullElse( + final String project = System.getenv(ConfigurationKeys.PUBSUB_PROJECT); + final String inputTopic = Objects.requireNonNullElse( System.getenv(ConfigurationKeys.PUBSUB_INPUT_TOPIC), LoadGenerator.PUBSUB_TOPIC_DEFAULT); - recordSender = TitanPubSubSenderFactory.forPubSubConfig(pubSubInputTopic); - LOGGER.info("Use Pub/Sub as target with topic '{}'.", pubSubInputTopic); + final String emulatorHost = System.getenv(ConfigurationKeys.PUBSUB_EMULATOR_HOST); + if (emulatorHost != null) { // NOPMD + LOGGER.info("Use Pub/Sub as target with emulator host {} and topic '{}'.", + emulatorHost, + inputTopic); + recordSender = TitanPubSubSenderFactory.forEmulatedPubSubConfig(emulatorHost, inputTopic); + } else if (project != null) { // NOPMD + LOGGER.info("Use Pub/Sub as target with project {} and topic '{}'.", project, inputTopic); + recordSender = TitanPubSubSenderFactory.forPubSubConfig(project, inputTopic); + } else { + throw new IllegalStateException("Neither an emulator host nor a project was provided."); + } } else { // Should never happen throw new IllegalStateException("Target " + target + " is not handled yet."); diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/PubSubRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/PubSubRecordSender.java index f18fc5b7c62b85f5467e12fc82646ed60a01045a..ccbeb729236307b26538ee12b1a0e2373a7f0378 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/PubSubRecordSender.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/PubSubRecordSender.java @@ -1,21 +1,30 @@ package theodolite.commons.workloadgeneration; +import com.google.api.core.ApiFuture; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.pubsub.v1.Publisher; import com.google.protobuf.ByteString; import com.google.protobuf.util.Timestamps; import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.TopicName; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import org.apache.avro.specific.SpecificRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Sends monitoring records to Kafka. + * Sends monitoring records to Pub/Sub. * - * @param <T> {@link SpecificRecord} to send + * @param <T> Record type to send */ public class PubSubRecordSender<T> implements RecordSender<T> { @@ -23,8 +32,6 @@ public class PubSubRecordSender<T> implements RecordSender<T> { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); - private final String topic; - private final Function<T, ByteBuffer> recordSerializer; private final Function<T, Long> timestampAccessor; @@ -33,19 +40,15 @@ public class PubSubRecordSender<T> implements RecordSender<T> { private final Publisher publisher; - /** - * Create a new {@link KafkaRecordSender}. - */ private PubSubRecordSender(final Builder<T> builder) { - this.topic = builder.topic; this.orderingKeyAccessor = builder.orderingKeyAccessor; this.timestampAccessor = builder.timestampAccessor; this.recordSerializer = builder.recordSerializer; try { - this.publisher = Publisher.newBuilder(this.topic).build(); + this.publisher = builder.buildPublisher(); } catch (final IOException e) { - throw new IllegalStateException(e); + throw new IllegalStateException("Can not create Pub/Sub publisher.", e); } } @@ -73,14 +76,44 @@ public class PubSubRecordSender<T> implements RecordSender<T> { if (this.timestampAccessor != null) { messageBuilder.setPublishTime(Timestamps.fromMillis(this.timestampAccessor.apply(record))); } - this.publisher.publish(messageBuilder.build()); - LOGGER.debug("Send message to PubSub topic {}: {}", this.topic, messageBuilder); + final PubsubMessage message = messageBuilder.build(); + LOGGER.debug("Send message to PubSub topic {}: {}", this.publisher.getTopicName(), message); + final ApiFuture<String> publishResult = this.publisher.publish(message); + if (LOGGER.isDebugEnabled()) { + try { + LOGGER.debug("Publishing result is {}.", publishResult.get()); + } catch (InterruptedException | ExecutionException e) { + LOGGER.warn("Can not get publishing result.", e); + } + } } - public static <T> Builder<T> builder( + /** + * Creates a {@link Builder} object for a {@link PubSubRecordSender}. + * + * @param project The project where to write. + * @param topic The topic where to write. + * @param recordSerializer A function serializing objects to {@link ByteBuffer}. + */ + public static <T> Builder<T> builderForProject( + final String project, final String topic, final Function<T, ByteBuffer> recordSerializer) { - return new Builder<>(topic, recordSerializer); + return new Builder<>(project, topic, recordSerializer); + } + + /** + * Creates a {@link Builder} object for a {@link PubSubRecordSender}. + * + * @param emulatorHost Host of the emulator. + * @param topic The topic where to write. + * @param recordSerializer A function serializing objects to {@link ByteBuffer}. + */ + public static <T> Builder<T> builderForEmulator( + final String emulatorHost, + final String topic, + final Function<T, ByteBuffer> recordSerializer) { + return new WithEmulatorBuilder<>(emulatorHost, topic, recordSerializer); } /** @@ -90,7 +123,7 @@ public class PubSubRecordSender<T> implements RecordSender<T> { */ public static class Builder<T> { - private final String topic; + protected final TopicName topicName; private final Function<T, ByteBuffer> recordSerializer; // NOPMD private Function<T, Long> timestampAccessor = null; // NOPMD private Function<T, String> orderingKeyAccessor = null; // NOPMD @@ -101,8 +134,11 @@ public class PubSubRecordSender<T> implements RecordSender<T> { * @param topic The topic where to write. * @param recordSerializer A function serializing objects to {@link ByteBuffer}. */ - private Builder(final String topic, final Function<T, ByteBuffer> recordSerializer) { - this.topic = topic; + private Builder( + final String project, + final String topic, + final Function<T, ByteBuffer> recordSerializer) { + this.topicName = TopicName.of(project, topic); this.recordSerializer = recordSerializer; } @@ -119,6 +155,51 @@ public class PubSubRecordSender<T> implements RecordSender<T> { public PubSubRecordSender<T> build() { return new PubSubRecordSender<>(this); } + + protected Publisher buildPublisher() throws IOException { + return Publisher.newBuilder(this.topicName).build(); + } + + } + + private static class WithEmulatorBuilder<T> extends Builder<T> { + + private static final String DUMMY_PROJECT = "dummy-project-id"; + + private final String emulatorHost; + + /** + * Creates a Builder object for a {@link PubSubRecordSender}. + * + * @param emulatorHost host of the emulator. + * @param topic The topic where to write. + * @param recordSerializer A function serializing objects to {@link ByteBuffer}. + */ + private WithEmulatorBuilder( + final String emulatorHost, + final String topic, + final Function<T, ByteBuffer> recordSerializer) { + super(DUMMY_PROJECT, topic, recordSerializer); + this.emulatorHost = emulatorHost; + } + + @Override + protected Publisher buildPublisher() throws IOException { + final ManagedChannel channel = ManagedChannelBuilder + .forTarget(this.emulatorHost) + .usePlaintext() + .build(); + + final TransportChannelProvider channelProvider = FixedTransportChannelProvider + .create(GrpcTransportChannel.create(channel)); + final CredentialsProvider credentialsProvider = NoCredentialsProvider.create(); + + return Publisher.newBuilder(super.topicName) + .setChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .build(); + } + } } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanPubSubSenderFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanPubSubSenderFactory.java index 45638d47779d656b604b5b4b1c01f90c2c7e3513..5a18376ab4c2fcbf896f847c0ed34af69c5eb507 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanPubSubSenderFactory.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/theodolite/commons/workloadgeneration/TitanPubSubSenderFactory.java @@ -1,6 +1,7 @@ package theodolite.commons.workloadgeneration; import java.io.IOException; +import java.nio.ByteBuffer; import titan.ccp.model.records.ActivePowerRecord; /** @@ -11,20 +12,39 @@ public final class TitanPubSubSenderFactory { private TitanPubSubSenderFactory() {} /** - * Create a new {@link PubSubRecordSender} for {@link ActivePowerRecord}s for the given PubSub + * Create a new {@link PubSubRecordSender} for {@link ActivePowerRecord}s for the given Pub/Sub * configuration. */ - public static PubSubRecordSender<ActivePowerRecord> forPubSubConfig(final String topic) { + public static PubSubRecordSender<ActivePowerRecord> forPubSubConfig( + final String project, + final String topic) { return PubSubRecordSender - .<ActivePowerRecord>builder(topic, r -> { - try { - return r.toByteBuffer(); - } catch (final IOException e) { - throw new IllegalStateException(e); - } - }) + .builderForProject(project, topic, TitanPubSubSenderFactory::serialize) // .orderingKeyAccessor(r -> r.getIdentifier()) .timestampAccessor(r -> r.getTimestamp()) .build(); } + + /** + * Create a new {@link PubSubRecordSender} for {@link ActivePowerRecord}s for the given PubSub + * emulator configuration. + */ + public static PubSubRecordSender<ActivePowerRecord> forEmulatedPubSubConfig( + final String emulatorHost, + final String topic) { + return PubSubRecordSender + .builderForEmulator(emulatorHost, topic, TitanPubSubSenderFactory::serialize) + // .orderingKeyAccessor(r -> r.getIdentifier()) + .timestampAccessor(r -> r.getTimestamp()) + .build(); + } + + private static ByteBuffer serialize(final ActivePowerRecord record) { + try { + return record.toByteBuffer(); + } catch (final IOException e) { + throw new IllegalStateException(e); + } + } + }