diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/HistoryService.java index 83848261318b2e90d19f28d9ab53fdc2cf678279..a662aa1eddf2e667da5ec7714d471fb073f7a268 100644 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/HistoryService.java +++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/HistoryService.java @@ -1,64 +1,42 @@ package rocks.theodolite.benchmarks.uc1.hazelcastjet; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import java.util.Properties; +import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService; /** - * A microservice that manages the history and, therefore, stores and aggregates incoming - * measurements. + * A microservice that records incoming measurements. */ -public class HistoryService { +public class HistoryService extends HazelcastJetService { private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class); - // Hazelcast settings (default) - private static final String HZ_KUBERNETES_SERVICE_DNS_KEY = "service-dns"; - private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701"; - - // Kafka settings (default) - private static final String KAFKA_BOOTSTRAP_DEFAULT = "localhost:9092"; - private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081"; - private static final String KAFKA_TOPIC_DEFAULT = "input"; - - // Job name (default) - private static final String JOB_NAME = "uc1-hazelcastjet"; - - /** - * Entrypoint for UC1 using Gradle Run. + * Constructs the use case logic for UC1. + * Retrieves the needed values and instantiates a pipeline factory. */ - public static void main(final String[] args) { - final HistoryService uc1HistoryService = new HistoryService(); - try { - uc1HistoryService.run(); - } catch (final Exception e) { // NOPMD - LOGGER.error("ABORT MISSION!: {}", e); - } + public HistoryService() { + super(LOGGER); + final Properties kafkaProps = + this.propsBuilder.buildReadProperties( + StringDeserializer.class.getCanonicalName(), + KafkaAvroDeserializer.class.getCanonicalName()); + + this.pipelineFactory = new Uc1PipelineFactory(kafkaProps, this.kafkaInputTopic); + } - /** - * Start a UC1 service. - * - * @throws Exception This Exception occurs if the Uc1HazelcastJetFactory is used in the wrong way. - * Detailed data is provided once an Exception occurs. - */ - public void run() throws Exception { // NOPMD - this.createHazelcastJetApplication(); + @Override + protected void registerSerializer() { + // empty since we need no serializer in uc1 } - /** - * Creates a Hazelcast Jet Application for UC1 using the Uc1HazelcastJetFactory. - * - * @throws Exception This Exception occurs if the Uc1HazelcastJetFactory is used in the wrong way. - * Detailed data is provided once an Exception occurs. - */ - private void createHazelcastJetApplication() throws Exception { // NOPMD - new Uc1HazelcastJetFactory() - .setPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT,JOB_NAME) - .setKafkaInputTopicFromEnv(KAFKA_TOPIC_DEFAULT) - .buildUc1Pipeline() - .buildUc1JetInstanceFromEnv(LOGGER, BOOTSTRAP_SERVER_DEFAULT, HZ_KUBERNETES_SERVICE_DNS_KEY) - .runUc1Job(JOB_NAME); + public static void main(final String[] args) { + new HistoryService().run(); } + } diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/NewHistoryService.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/NewHistoryService.java deleted file mode 100644 index 3f3502ff17864a16b2ca91fd25b4b437cc6d00a4..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/NewHistoryService.java +++ /dev/null @@ -1,42 +0,0 @@ -package rocks.theodolite.benchmarks.uc1.hazelcastjet; - -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import java.util.Properties; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService; - -/** - * A microservice that records incoming measurements. - */ -public class NewHistoryService extends HazelcastJetService { - - private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class); - - /** - * Constructs the use case logic for UC1. - * Retrieves the needed values and instantiates a pipeline factory. - */ - public NewHistoryService() { - super(LOGGER); - final Properties kafkaProps = - this.propsBuilder.buildReadProperties( - StringDeserializer.class.getCanonicalName(), - KafkaAvroDeserializer.class.getCanonicalName()); - - this.pipelineFactory = new Uc1PipelineFactory(kafkaProps, this.kafkaInputTopic); - - } - - @Override - protected void registerSerializer() { - // empty since we need no serializer in uc1 - } - - public static void main(final String[] args) { - new NewHistoryService().run(); - } - - -} diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1HazelcastJetFactory.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1HazelcastJetFactory.java deleted file mode 100644 index 4a5c5dead14e606847dc5e2ac3c95414d9f611b3..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1HazelcastJetFactory.java +++ /dev/null @@ -1,178 +0,0 @@ -package rocks.theodolite.benchmarks.uc1.hazelcastjet; - -import com.hazelcast.jet.JetInstance; -import com.hazelcast.jet.config.JobConfig; -import com.hazelcast.jet.pipeline.Pipeline; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.slf4j.Logger; -import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; -import rocks.theodolite.benchmarks.commons.hazelcastjet.JetInstanceBuilder; -import rocks.theodolite.benchmarks.commons.hazelcastjet.KafkaPropertiesBuilder; - -/** - * A Hazelcast Jet factory which can build a Hazelcast Jet Instance and Pipeline for the UC1 - * benchmark and lets you start the Hazelcast Jet job. The JetInstance can be built directly as the - * Hazelcast Config is managed internally. In order to build the Pipeline, you first have to build - * the Properties and set the input topic which can be done using internal functions of this - * factory. Outside data only refers to custom values or default values in case data of the - * environment cannot the fetched. - */ -public class Uc1HazelcastJetFactory { - - // Information per History Service - private Properties kafkaPropertiesForPipeline; - private String kafkaInputTopic; - private JetInstance uc1JetInstance; - private Pipeline uc1JetPipeline; - - ///////////////////////////////////// - // Layer 1 - Hazelcast Jet Run Job // - ///////////////////////////////////// - - /** - * Needs a JetInstance and Pipeline defined in this factors. Adds the pipeline to the existing - * JetInstance as a job. - * - * @param jobName The name of the job. - */ - public void runUc1Job(final String jobName) { - - // Check if a Jet Instance for UC1 is set. - if (this.uc1JetInstance == null) { - throw new IllegalStateException("Jet Instance is not set! " - + "Cannot start a hazelcast jet job for UC1."); - } - - // Check if a Pipeline for UC1 is set. - if (this.uc1JetPipeline == null) { - throw new IllegalStateException( - "Hazelcast Pipeline is not set! Cannot start a hazelcast jet job for UC1."); - } - - // Adds the job name and joins a job to the JetInstance defined in this factory - final JobConfig jobConfig = new JobConfig(); - jobConfig.setName(jobName); - this.uc1JetInstance.newJobIfAbsent(this.uc1JetPipeline, jobConfig).join(); - } - - ///////////// - // Layer 2 // - ///////////// - - /** - * Build a Hazelcast JetInstance used to run a job on. - * - * @param logger The logger specified for this JetInstance. - * @param bootstrapServerDefault Default bootstrap server in case no value can be derived from the - * environment. - * @param hzKubernetesServiceDnsKey The kubernetes service dns key. - * @return A Uc1HazelcastJetFactory containing a set JetInstance. - */ - public Uc1HazelcastJetFactory buildUc1JetInstanceFromEnv(final Logger logger, - final String bootstrapServerDefault, - final String hzKubernetesServiceDnsKey) { - this.uc1JetInstance = new JetInstanceBuilder() - .setConfigFromEnv(logger, bootstrapServerDefault, hzKubernetesServiceDnsKey) - .build(); - return this; - } - - /** - * Builds a Hazelcast Jet pipeline used for a JetInstance to run it as a job on. Needs the input - * topic and kafka properties defined in this factory beforehand. - * - * @return A Uc1HazelcastJetFactory containg a set pipeline. - */ - public Uc1HazelcastJetFactory buildUc1Pipeline() { - - // Check if Properties for the Kafka Input are set. - if (this.kafkaPropertiesForPipeline == null) { - throw new IllegalStateException( - "Kafka Properties for pipeline not set! Cannot build pipeline."); - } - - // Check if the Kafka input topic is set. - if (this.kafkaInputTopic == null) { - throw new IllegalStateException("Kafka input topic for pipeline not set! " - + "Cannot build pipeline."); - } - - // Build Pipeline Using the pipelineBuilder - final Uc1PipelineBuilder pipeBuilder = new Uc1PipelineBuilder(); - this.uc1JetPipeline = - pipeBuilder.build(this.kafkaPropertiesForPipeline, this.kafkaInputTopic); - // Return Uc1HazelcastJetBuilder factory - return this; - } - - ///////////// - // Layer 3 // - ///////////// - - /** - * Sets kafka properties for pipeline used in this builder. - * - * @param kafkaProperties A propeties object containing necessary values used for the hazelcst jet - * kafka connection. - * @return The Uc1HazelcastJetBuilder factory with set kafkaPropertiesForPipeline. - */ - public Uc1HazelcastJetFactory setCustomProperties(final Properties kafkaProperties) { // NOPMD - this.kafkaPropertiesForPipeline = kafkaProperties; - return this; - } - - /** - * Sets kafka properties for pipeline used in this builder using environment variables. - * - * @param bootstrapServersDefault Default Bootstrap server in the case that no bootstrap server - * can be fetched from the environment. - * @param schemaRegistryUrlDefault Default schema registry url in the case that no schema registry - * url can be fetched from the environment. - * @return The Uc1HazelcastJetBuilder factory with set kafkaPropertiesForPipeline. - */ - public Uc1HazelcastJetFactory setPropertiesFromEnv(final String bootstrapServersDefault, // NOPMD - final String schemaRegistryUrlDefault, - final String jobName) { - // Use KafkaPropertiesBuilder to build a properties object used for kafka - final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder(); - final Properties kafkaProps = - propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault, - schemaRegistryUrlDefault, - jobName, - StringDeserializer.class.getCanonicalName(), - KafkaAvroDeserializer.class.getCanonicalName()); - this.kafkaPropertiesForPipeline = kafkaProps; - return this; - } - - /** - * Sets the kafka input topic for the pipeline used in this builder. - * - * @param inputTopic The kafka topic used as the pipeline input. - * @return A Uc1HazelcastJetBuilder factory with a set kafkaInputTopic. - */ - public Uc1HazelcastJetFactory setCustomKafkaInputTopic(final String inputTopic) { // NOPMD - this.kafkaInputTopic = inputTopic; - return this; - } - - /** - * Sets the kafka input topic for the pipeline used in this builder using environment variables. - * - * @param defaultInputTopic The default kafka input topic used if no topic is specified by the - * environment. - * @return A Uc1HazelcastJetBuilder factory with a set kafkaInputTopic. - */ - public Uc1HazelcastJetFactory setKafkaInputTopicFromEnv(final String defaultInputTopic) { // NOPMD - this.kafkaInputTopic = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC), - defaultInputTopic); - return this; - } - - - -} diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineBuilder.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineBuilder.java deleted file mode 100644 index c02ea1e7ea7fb3f27bdbf818248678011a93f6a2..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc1/hazelcastjet/Uc1PipelineBuilder.java +++ /dev/null @@ -1,82 +0,0 @@ -package rocks.theodolite.benchmarks.uc1.hazelcastjet; - -import static com.hazelcast.jet.pipeline.SinkBuilder.sinkBuilder; - -import com.hazelcast.jet.kafka.KafkaSources; -import com.hazelcast.jet.pipeline.Pipeline; -import com.hazelcast.jet.pipeline.Sink; -import com.hazelcast.jet.pipeline.StreamSource; -import com.hazelcast.jet.pipeline.StreamStage; -import java.util.Map.Entry; -import java.util.Properties; -import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; -import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter; -import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory; -import titan.ccp.model.records.ActivePowerRecord; - -/** - * Builder to build a HazelcastJet Pipeline for UC1 which can be used for stream processing using - * Hazelcast Jet. - */ -public class Uc1PipelineBuilder { - - private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson(); - - /** - * Builds a pipeline which can be used for stream processing using Hazelcast Jet. - * - * @param kafkaPropsForPipeline Properties object containing the necessary Kafka attributes. - * @param kafkaInputTopic The name of the input topic used for the pipeline. - * @return A Hazelcast Jet pipeline which processes data for Uc1. - */ - public Pipeline build(final Properties kafkaPropsForPipeline, final String kafkaInputTopic) { - - // Define a new pipeline - final Pipeline pipe = Pipeline.create(); - - // Define the Kafka Source - final StreamSource<Entry<String, ActivePowerRecord>> kafkaSource = - KafkaSources.<String, ActivePowerRecord>kafka(kafkaPropsForPipeline, kafkaInputTopic); - - // Extend UC1 topology to the pipeline - final StreamStage<String> uc1TopologyProduct = this.extendUc1Topology(pipe, kafkaSource); - - // Add Sink: Logger - // Do not refactor this to just use the call - // (There is a problem with static calls in functions in hazelcastjet) - final DatabaseWriter<String> writer = this.databaseAdapter.getDatabaseWriter(); - final Sink<String> sink = sinkBuilder( - "Sink into database", x -> writer) - .<String>receiveFn(DatabaseWriter::write) - .build(); - - uc1TopologyProduct.writeTo(sink); - - return pipe; - } - - /** - * Extends to a blank Hazelcast Jet Pipeline the UC1 topology defines by Theodolite. - * - * <p> - * UC1 takes {@code Entry<String,ActivePowerRecord>} objects and turns them into JSON strings - * using GSON. - * </p> - * - * @param pipe The blank hazelcast jet pipeline to extend the logic to. - * @param source A streaming source to fetch data from. - * @return A {@code StreamStage<String>} with the above definition of the String. It can be used - * to be further modified or directly be written into a sink. - */ - public StreamStage<String> extendUc1Topology(final Pipeline pipe, - final StreamSource<Entry<String, ActivePowerRecord>> source) { - - // Build the pipeline topology - return pipe.readFrom(source) - .withNativeTimestamps(0) - .setLocalParallelism(1) - .setName("Convert content") - .map(Entry::getValue) - .map(this.databaseAdapter.getRecordConverter()::convert); - } -} diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc1/hazelcast/Uc1PipelineTest.java b/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc1/hazelcast/Uc1PipelineTest.java index 525327ddbcdcddb6cf1bfe4e2d6be62d3384fc0c..4e25dd2bef6a7743db96a7b1c8a8705692ac8edc 100644 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc1/hazelcast/Uc1PipelineTest.java +++ b/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc1/hazelcast/Uc1PipelineTest.java @@ -14,6 +14,7 @@ import com.hazelcast.jet.pipeline.test.TestSources; import com.hazelcast.jet.test.SerialTest; import java.util.Map; import java.util.Map.Entry; +import java.util.Properties; import java.util.concurrent.CompletionException; import com.hazelcast.logging.ILogger; import org.junit.After; @@ -22,10 +23,11 @@ import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter; import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory; -import rocks.theodolite.benchmarks.uc1.hazelcastjet.Uc1PipelineBuilder; +import rocks.theodolite.benchmarks.uc1.hazelcastjet.Uc1PipelineFactory; import titan.ccp.model.records.ActivePowerRecord; import static com.hazelcast.jet.pipeline.SinkBuilder.sinkBuilder; @@ -41,10 +43,10 @@ public class Uc1PipelineTest extends JetTestSupport { private Pipeline testPipeline = null; private StreamStage<String> uc1Topology = null; - // Standart Logger - private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(Uc1PipelineTest.class); + // Standard Logger + private static final Logger LOGGER = LoggerFactory.getLogger(Uc1PipelineTest.class); // HazelcastJet Logger - private static final ILogger logger = getLogger(Uc1PipelineTest.class); + private static final ILogger logger = getLogger(Uc1PipelineTest.class); private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson(); @@ -83,10 +85,10 @@ public class Uc1PipelineTest extends JetTestSupport { }); // Create pipeline to test - final Uc1PipelineBuilder pipelineBuilder = new Uc1PipelineBuilder(); - this.testPipeline = Pipeline.create(); - this.uc1Topology = - pipelineBuilder.extendUc1Topology(this.testPipeline, testSource); + final Properties properties = new Properties(); + final Uc1PipelineFactory factory = new Uc1PipelineFactory(properties,""); + uc1Topology = factory.extendUc1Topology(testSource); + this.testPipeline = factory.getPipe(); // Create DatabaseWriter sink final DatabaseWriter<String> adapter = this.databaseAdapter.getDatabaseWriter();