Commit d4e18ab6 authored by Lorenz Boguhn

Refactor uc1 hzj to use new implementation

parent c433c0db
This commit is part of merge request !275 (Refactor hazelcast jet benchmarks).
HistoryService.java (side-by-side diff reconstructed as a unified diff):

 package rocks.theodolite.benchmarks.uc1.hazelcastjet;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import java.util.Properties;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;

 /**
- * A microservice that manages the history and, therefore, stores and aggregates incoming
- * measurements.
+ * A microservice that records incoming measurements.
  */
-public class HistoryService {
+public class HistoryService extends HazelcastJetService {

   private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);

-  // Hazelcast settings (default)
-  private static final String HZ_KUBERNETES_SERVICE_DNS_KEY = "service-dns";
-  private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701";
-
-  // Kafka settings (default)
-  private static final String KAFKA_BOOTSTRAP_DEFAULT = "localhost:9092";
-  private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
-  private static final String KAFKA_TOPIC_DEFAULT = "input";
-
-  // Job name (default)
-  private static final String JOB_NAME = "uc1-hazelcastjet";
-
   /**
-   * Entrypoint for UC1 using Gradle Run.
+   * Constructs the use case logic for UC1. Retrieves the needed values and instantiates a
+   * pipeline factory.
    */
-  public static void main(final String[] args) {
-    final HistoryService uc1HistoryService = new HistoryService();
-    try {
-      uc1HistoryService.run();
-    } catch (final Exception e) { // NOPMD
-      LOGGER.error("ABORT MISSION!: {}", e);
-    }
+  public HistoryService() {
+    super(LOGGER);
+    final Properties kafkaProps =
+        this.propsBuilder.buildReadProperties(
+            StringDeserializer.class.getCanonicalName(),
+            KafkaAvroDeserializer.class.getCanonicalName());
+    this.pipelineFactory = new Uc1PipelineFactory(kafkaProps, this.kafkaInputTopic);
   }

-  /**
-   * Start a UC1 service.
-   *
-   * @throws Exception This Exception occurs if the Uc1HazelcastJetFactory is used in the wrong
-   *         way. Detailed data is provided once an Exception occurs.
-   */
-  public void run() throws Exception { // NOPMD
-    this.createHazelcastJetApplication();
+  @Override
+  protected void registerSerializer() {
+    // empty since we need no serializer in uc1
   }

-  /**
-   * Creates a Hazelcast Jet Application for UC1 using the Uc1HazelcastJetFactory.
-   *
-   * @throws Exception This Exception occurs if the Uc1HazelcastJetFactory is used in the wrong
-   *         way. Detailed data is provided once an Exception occurs.
-   */
-  private void createHazelcastJetApplication() throws Exception { // NOPMD
-    new Uc1HazelcastJetFactory()
-        .setPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT, JOB_NAME)
-        .setKafkaInputTopicFromEnv(KAFKA_TOPIC_DEFAULT)
-        .buildUc1Pipeline()
-        .buildUc1JetInstanceFromEnv(LOGGER, BOOTSTRAP_SERVER_DEFAULT, HZ_KUBERNETES_SERVICE_DNS_KEY)
-        .runUc1Job(JOB_NAME);
+  public static void main(final String[] args) {
+    new HistoryService().run();
   }
 }
NewHistoryService.java:

package rocks.theodolite.benchmarks.uc1.hazelcastjet;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;

/**
 * A microservice that records incoming measurements.
 */
public class NewHistoryService extends HazelcastJetService {

  private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);

  /**
   * Constructs the use case logic for UC1. Retrieves the needed values and instantiates a
   * pipeline factory.
   */
  public NewHistoryService() {
    super(LOGGER);
    final Properties kafkaProps =
        this.propsBuilder.buildReadProperties(
            StringDeserializer.class.getCanonicalName(),
            KafkaAvroDeserializer.class.getCanonicalName());
    this.pipelineFactory = new Uc1PipelineFactory(kafkaProps, this.kafkaInputTopic);
  }

  @Override
  protected void registerSerializer() {
    // empty since we need no serializer in uc1
  }

  public static void main(final String[] args) {
    new NewHistoryService().run();
  }
}
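Both services above delegate their boilerplate to the shared HazelcastJetService base class, which this diff does not include. As orientation, here is a minimal sketch of its assumed shape, inferred only from the calls visible above (super(LOGGER), this.propsBuilder.buildReadProperties(...), this.pipelineFactory, registerSerializer(), run()); the nested PipelineFactory interface, the default topic, and the run() body are assumptions, not the actual implementation.

package rocks.theodolite.benchmarks.commons.hazelcastjet;

import com.hazelcast.jet.Jet;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import java.util.Objects;
import org.slf4j.Logger;

// Hypothetical sketch only: every member not referenced by the subclasses above
// is an assumption about the real class.
public abstract class HazelcastJetService {

  /** Hypothetical minimal contract for the pipeline factories set by subclasses. */
  public interface PipelineFactory {
    Pipeline buildPipeline();
  }

  protected final Logger logger;
  protected final KafkaPropertiesBuilder propsBuilder; // used via buildReadProperties(...)
  protected final String kafkaInputTopic; // presumably resolved from the environment
  protected PipelineFactory pipelineFactory; // assigned by the concrete constructor

  protected HazelcastJetService(final Logger logger) {
    this.logger = logger;
    this.propsBuilder = new KafkaPropertiesBuilder(); // assumed construction
    this.kafkaInputTopic = Objects.requireNonNullElse(
        System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC), "input"); // assumed default
  }

  /** Hook for use cases needing custom Hazelcast serializers; UC1 leaves it empty. */
  protected abstract void registerSerializer();

  /** Assumed flow: register serializers, build the pipeline, submit it as a Jet job. */
  public void run() {
    this.registerSerializer();
    final JobConfig jobConfig = new JobConfig().setName("uc1-hazelcastjet");
    Jet.newJetInstance() // the real class presumably configures this via JetInstanceBuilder
        .newJobIfAbsent(this.pipelineFactory.buildPipeline(), jobConfig)
        .join();
  }
}

The refactoring thus replaces the per-use-case factory chain with a template method: subclasses only supply read properties, a pipeline factory, and optional serializers.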
Uc1HazelcastJetFactory.java:

package rocks.theodolite.benchmarks.uc1.hazelcastjet;

import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
import rocks.theodolite.benchmarks.commons.hazelcastjet.JetInstanceBuilder;
import rocks.theodolite.benchmarks.commons.hazelcastjet.KafkaPropertiesBuilder;

/**
 * A Hazelcast Jet factory which builds a Hazelcast Jet instance and pipeline for the UC1
 * benchmark and lets you start the Hazelcast Jet job. The JetInstance can be built directly,
 * as the Hazelcast config is managed internally. To build the pipeline, you first have to
 * build the properties and set the input topic, which can be done using this factory's
 * methods. Outside data only refers to custom values, or to default values in case data
 * cannot be fetched from the environment.
 */
public class Uc1HazelcastJetFactory {

  // Information per History Service
  private Properties kafkaPropertiesForPipeline;
  private String kafkaInputTopic;
  private JetInstance uc1JetInstance;
  private Pipeline uc1JetPipeline;

  /////////////////////////////////////
  // Layer 1 - Hazelcast Jet Run Job //
  /////////////////////////////////////

  /**
   * Needs a JetInstance and a Pipeline defined in this factory. Adds the pipeline to the
   * existing JetInstance as a job.
   *
   * @param jobName The name of the job.
   */
  public void runUc1Job(final String jobName) {
    // Check if a Jet instance for UC1 is set.
    if (this.uc1JetInstance == null) {
      throw new IllegalStateException("Jet Instance is not set! "
          + "Cannot start a Hazelcast Jet job for UC1.");
    }
    // Check if a pipeline for UC1 is set.
    if (this.uc1JetPipeline == null) {
      throw new IllegalStateException(
          "Hazelcast Pipeline is not set! Cannot start a Hazelcast Jet job for UC1.");
    }
    // Set the job name and submit the job to the JetInstance defined in this factory.
    final JobConfig jobConfig = new JobConfig();
    jobConfig.setName(jobName);
    this.uc1JetInstance.newJobIfAbsent(this.uc1JetPipeline, jobConfig).join();
  }

  /////////////
  // Layer 2 //
  /////////////

  /**
   * Builds a Hazelcast JetInstance used to run a job on.
   *
   * @param logger The logger specified for this JetInstance.
   * @param bootstrapServerDefault Default bootstrap server in case no value can be derived
   *        from the environment.
   * @param hzKubernetesServiceDnsKey The Kubernetes service DNS key.
   * @return A Uc1HazelcastJetFactory containing a set JetInstance.
   */
  public Uc1HazelcastJetFactory buildUc1JetInstanceFromEnv(final Logger logger,
      final String bootstrapServerDefault,
      final String hzKubernetesServiceDnsKey) {
    this.uc1JetInstance = new JetInstanceBuilder()
        .setConfigFromEnv(logger, bootstrapServerDefault, hzKubernetesServiceDnsKey)
        .build();
    return this;
  }

  /**
   * Builds a Hazelcast Jet pipeline for a JetInstance to run as a job. Needs the input topic
   * and Kafka properties defined in this factory beforehand.
   *
   * @return A Uc1HazelcastJetFactory containing a set pipeline.
   */
  public Uc1HazelcastJetFactory buildUc1Pipeline() {
    // Check if properties for the Kafka input are set.
    if (this.kafkaPropertiesForPipeline == null) {
      throw new IllegalStateException(
          "Kafka properties for pipeline not set! Cannot build pipeline.");
    }
    // Check if the Kafka input topic is set.
    if (this.kafkaInputTopic == null) {
      throw new IllegalStateException("Kafka input topic for pipeline not set! "
          + "Cannot build pipeline.");
    }
    // Build the pipeline using the pipeline builder.
    final Uc1PipelineBuilder pipeBuilder = new Uc1PipelineBuilder();
    this.uc1JetPipeline =
        pipeBuilder.build(this.kafkaPropertiesForPipeline, this.kafkaInputTopic);
    // Return the Uc1HazelcastJetFactory.
    return this;
  }

  /////////////
  // Layer 3 //
  /////////////

  /**
   * Sets the Kafka properties for the pipeline used in this builder.
   *
   * @param kafkaProperties A Properties object containing the values needed for the
   *        Hazelcast Jet Kafka connection.
   * @return The Uc1HazelcastJetFactory with kafkaPropertiesForPipeline set.
   */
  public Uc1HazelcastJetFactory setCustomProperties(final Properties kafkaProperties) { // NOPMD
    this.kafkaPropertiesForPipeline = kafkaProperties;
    return this;
  }

  /**
   * Sets the Kafka properties for the pipeline used in this builder from environment variables.
   *
   * @param bootstrapServersDefault Default bootstrap server in case no bootstrap server can
   *        be fetched from the environment.
   * @param schemaRegistryUrlDefault Default schema registry URL in case no schema registry
   *        URL can be fetched from the environment.
   * @return The Uc1HazelcastJetFactory with kafkaPropertiesForPipeline set.
   */
  public Uc1HazelcastJetFactory setPropertiesFromEnv(final String bootstrapServersDefault, // NOPMD
      final String schemaRegistryUrlDefault,
      final String jobName) {
    // Use KafkaPropertiesBuilder to build a Properties object for Kafka.
    final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder();
    final Properties kafkaProps =
        propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault,
            schemaRegistryUrlDefault,
            jobName,
            StringDeserializer.class.getCanonicalName(),
            KafkaAvroDeserializer.class.getCanonicalName());
    this.kafkaPropertiesForPipeline = kafkaProps;
    return this;
  }

  /**
   * Sets the Kafka input topic for the pipeline used in this builder.
   *
   * @param inputTopic The Kafka topic used as the pipeline input.
   * @return A Uc1HazelcastJetFactory with kafkaInputTopic set.
   */
  public Uc1HazelcastJetFactory setCustomKafkaInputTopic(final String inputTopic) { // NOPMD
    this.kafkaInputTopic = inputTopic;
    return this;
  }

  /**
   * Sets the Kafka input topic for the pipeline used in this builder from environment variables.
   *
   * @param defaultInputTopic The default Kafka input topic used if no topic is specified by
   *        the environment.
   * @return A Uc1HazelcastJetFactory with kafkaInputTopic set.
   */
  public Uc1HazelcastJetFactory setKafkaInputTopicFromEnv(final String defaultInputTopic) { // NOPMD
    this.kafkaInputTopic = Objects.requireNonNullElse(
        System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC),
        defaultInputTopic);
    return this;
  }
}
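For contrast with the constructor-based setup above, this is how the old HistoryService drove this factory (see createHazelcastJetApplication in the diff at the top; the literals below are the default constants that class defined). The layering matters: buildUc1Pipeline() throws an IllegalStateException unless the properties and the input topic were set first.

// Old call chain, as used by HistoryService before this commit:
new Uc1HazelcastJetFactory()
    .setPropertiesFromEnv("localhost:9092", "http://localhost:8081", "uc1-hazelcastjet")
    .setKafkaInputTopicFromEnv("input")
    .buildUc1Pipeline()
    .buildUc1JetInstanceFromEnv(LOGGER, "localhost:5701", "service-dns")
    .runUc1Job("uc1-hazelcastjet");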
Uc1PipelineBuilder.java:

package rocks.theodolite.benchmarks.uc1.hazelcastjet;

import static com.hazelcast.jet.pipeline.SinkBuilder.sinkBuilder;

import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage;
import java.util.Map.Entry;
import java.util.Properties;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;
import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
import titan.ccp.model.records.ActivePowerRecord;

/**
 * Builder for a Hazelcast Jet pipeline for UC1, used for stream processing with Hazelcast Jet.
 */
public class Uc1PipelineBuilder {

  private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();

  /**
   * Builds a pipeline which can be used for stream processing using Hazelcast Jet.
   *
   * @param kafkaPropsForPipeline Properties object containing the necessary Kafka attributes.
   * @param kafkaInputTopic The name of the input topic used for the pipeline.
   * @return A Hazelcast Jet pipeline which processes data for UC1.
   */
  public Pipeline build(final Properties kafkaPropsForPipeline, final String kafkaInputTopic) {
    // Define a new pipeline.
    final Pipeline pipe = Pipeline.create();

    // Define the Kafka source.
    final StreamSource<Entry<String, ActivePowerRecord>> kafkaSource =
        KafkaSources.<String, ActivePowerRecord>kafka(kafkaPropsForPipeline, kafkaInputTopic);

    // Extend the pipeline with the UC1 topology.
    final StreamStage<String> uc1TopologyProduct = this.extendUc1Topology(pipe, kafkaSource);

    // Add sink: logger.
    // Do not refactor this to inline the call: static calls inside functions are
    // problematic in Hazelcast Jet, so the writer is captured as a local variable first.
    final DatabaseWriter<String> writer = this.databaseAdapter.getDatabaseWriter();
    final Sink<String> sink = sinkBuilder(
        "Sink into database", x -> writer)
        .<String>receiveFn(DatabaseWriter::write)
        .build();
    uc1TopologyProduct.writeTo(sink);

    return pipe;
  }

  /**
   * Extends a blank Hazelcast Jet pipeline with the UC1 topology defined by Theodolite.
   *
   * <p>
   * UC1 takes {@code Entry<String, ActivePowerRecord>} objects and turns them into JSON strings
   * using GSON.
   * </p>
   *
   * @param pipe The blank Hazelcast Jet pipeline to extend with the logic.
   * @param source A streaming source to fetch data from.
   * @return A {@code StreamStage<String>} with the above definition of the String. It can be
   *         further modified or directly written into a sink.
   */
  public StreamStage<String> extendUc1Topology(final Pipeline pipe,
      final StreamSource<Entry<String, ActivePowerRecord>> source) {
    // Build the pipeline topology.
    return pipe.readFrom(source)
        .withNativeTimestamps(0)
        .setLocalParallelism(1)
        .setName("Convert content")
        .map(Entry::getValue)
        .map(this.databaseAdapter.getRecordConverter()::convert);
  }
}
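A minimal usage sketch for this builder, assuming the Kafka properties are assembled with the KafkaPropertiesBuilder from the factory above and the default topic "input"; creating the Jet instance via Jet.newJetInstance() here is a simplification of the JetInstanceBuilder-based setup:

// Build read properties (defaults as in the old HistoryService).
final Properties kafkaProps = new KafkaPropertiesBuilder()
    .buildKafkaInputReadPropsFromEnv("localhost:9092", "http://localhost:8081",
        "uc1-hazelcastjet",
        StringDeserializer.class.getCanonicalName(),
        KafkaAvroDeserializer.class.getCanonicalName());

// Build the UC1 pipeline and run it on an ad-hoc local Jet instance.
final Pipeline pipeline = new Uc1PipelineBuilder().build(kafkaProps, "input");
Jet.newJetInstance().newJob(pipeline).join();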
Uc1PipelineTest.java (side-by-side hunks reconstructed as a unified diff):

@@ -14,6 +14,7 @@ import com.hazelcast.jet.pipeline.test.TestSources;
 import com.hazelcast.jet.test.SerialTest;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Properties;
 import java.util.concurrent.CompletionException;
 import com.hazelcast.logging.ILogger;
 import org.junit.After;
@@ -22,10 +23,11 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
 import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;
 import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
-import rocks.theodolite.benchmarks.uc1.hazelcastjet.Uc1PipelineBuilder;
+import rocks.theodolite.benchmarks.uc1.hazelcastjet.Uc1PipelineFactory;
 import titan.ccp.model.records.ActivePowerRecord;
 import static com.hazelcast.jet.pipeline.SinkBuilder.sinkBuilder;
@@ -41,8 +43,8 @@ public class Uc1PipelineTest extends JetTestSupport {
   private Pipeline testPipeline = null;
   private StreamStage<String> uc1Topology = null;

-  // Standart Logger
-  private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(Uc1PipelineTest.class);
+  // Standard Logger
+  private static final Logger LOGGER = LoggerFactory.getLogger(Uc1PipelineTest.class);
   // HazelcastJet Logger
   private static final ILogger logger = getLogger(Uc1PipelineTest.class);
@@ -83,10 +85,10 @@ public class Uc1PipelineTest extends JetTestSupport {
     });

     // Create pipeline to test
-    final Uc1PipelineBuilder pipelineBuilder = new Uc1PipelineBuilder();
-    this.testPipeline = Pipeline.create();
-    this.uc1Topology =
-        pipelineBuilder.extendUc1Topology(this.testPipeline, testSource);
+    final Properties properties = new Properties();
+    final Uc1PipelineFactory factory = new Uc1PipelineFactory(properties, "");
+    uc1Topology = factory.extendUc1Topology(testSource);
+    this.testPipeline = factory.getPipe();

     // Create DatabaseWriter sink
     final DatabaseWriter<String> adapter = this.databaseAdapter.getDatabaseWriter();
...
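Finally, the new Uc1PipelineFactory that the test and the refactored HistoryService rely on is not part of this diff. Purely as a hypothetical reconstruction from its call sites (a (Properties, String) constructor, extendUc1Topology(source), getPipe()), modeled on the removed Uc1PipelineBuilder, it presumably looks roughly like this:

// Hypothetical reconstruction of Uc1PipelineFactory, inferred from its call sites in
// this commit; the real class is not part of this diff and may differ.
package rocks.theodolite.benchmarks.uc1.hazelcastjet;

import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage;
import java.util.Map.Entry;
import java.util.Properties;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
import titan.ccp.model.records.ActivePowerRecord;

public class Uc1PipelineFactory {

  private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();

  // In the real class these presumably feed a KafkaSources-based build step,
  // mirroring the removed Uc1PipelineBuilder#build(...).
  private final Properties kafkaReadProps;
  private final String kafkaInputTopic;

  // The factory owns the pipeline, so tests can extend it from a test source
  // and then fetch it via getPipe().
  private final Pipeline pipe = Pipeline.create();

  public Uc1PipelineFactory(final Properties kafkaReadProps, final String kafkaInputTopic) {
    this.kafkaReadProps = kafkaReadProps;
    this.kafkaInputTopic = kafkaInputTopic;
  }

  /** Same topology as the removed Uc1PipelineBuilder, minus the pipeline argument. */
  public StreamStage<String> extendUc1Topology(
      final StreamSource<Entry<String, ActivePowerRecord>> source) {
    return this.pipe.readFrom(source)
        .withNativeTimestamps(0)
        .setLocalParallelism(1)
        .setName("Convert content")
        .map(Entry::getValue)
        .map(this.databaseAdapter.getRecordConverter()::convert);
  }

  public Pipeline getPipe() {
    return this.pipe;
  }
}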