Skip to content
Snippets Groups Projects
Commit 92555bca authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Refactor new uc1-hazelcastjet structure

parent e473b475
No related branches found
No related tags found
1 merge request: !275 "Refactor hazelcast jet benchmarks"
package rocks.theodolite.benchmarks.uc1.hazelcastjet;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;
import java.util.Properties;
/**
* A microservice that records incoming measurements.
*/
public class NewHistoryService extends HazelcastJetService {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);
public NewHistoryService(final Logger logger) {
super(logger);
/**
* Constructs the use case logic for UC1.
* Retrieves the needed values and instantiates a pipeline factory.
*/
public NewHistoryService() {
super(LOGGER);
final Properties kafkaProps =
this.propsBuilder.buildKafkaInputReadPropsFromEnv(this.kafkaBootstrapServer,
schemaRegistryUrl,
jobName,
this.propsBuilder.buildReadProperties(
StringDeserializer.class.getCanonicalName(),
KafkaAvroDeserializer.class.getCanonicalName());
......@@ -25,23 +29,13 @@ public class NewHistoryService extends HazelcastJetService {
}
/**
 * Runs the Hazelcast Jet service and logs any failure instead of letting it
 * propagate, so the container gets a readable abort message.
 */
@Override
public void run() {
  try {
    super.run();
  } catch (final Exception e) { // NOPMD - deliberate broad catch at the service boundary
    // Pass the exception as the last argument WITHOUT a "{}" placeholder:
    // with a placeholder, SLF4J consumes the throwable as the format argument
    // and logs only e.toString(), dropping the stack trace.
    LOGGER.error("ABORT MISSION!:", e);
  }
}
/**
 * Intentionally a no-op: UC1 registers no custom Hazelcast serializer
 * (its pipeline stages only produce types Jet can already handle).
 */
@Override
protected void registerSerializer() {
// empty since we need no serializer in uc1
}
/**
 * Entry point: constructs the UC1 history service and starts it.
 *
 * <p>Only the no-argument constructor is used; the class supplies its own
 * {@code LOGGER} to the superclass, so no logger is passed in here.</p>
 *
 * @param args command line arguments (unused).
 */
public static void main(final String[] args) {
  new NewHistoryService().run();
}
......
package rocks.theodolite.benchmarks.uc1.hazelcastjet;
import static com.hazelcast.jet.pipeline.SinkBuilder.sinkBuilder;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage;
import java.util.Map;
import java.util.Properties;
import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;
import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
import titan.ccp.model.records.ActivePowerRecord;
import java.util.Map;
import java.util.Properties;
import static com.hazelcast.jet.pipeline.SinkBuilder.sinkBuilder;
public class Uc1PipelineFactory extends PipelineFactory {
private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
private final Properties kafkaPropsForPipeline;
private final String kafkaInputTopic;
/**
* Creates a new Uc1PipelineFactory.
* @param kafkaPropsForPipeline Properties object containing the necessary Kafka attributes.
* @param kafkaReadPropsForPipeline Properties object containing the necessary Kafka attributes.
* @param kafkaInputTopic The name of the input topic used for the pipeline.
*/
public Uc1PipelineFactory(final Properties kafkaPropsForPipeline, final String kafkaInputTopic) {
super();
this.kafkaPropsForPipeline = kafkaPropsForPipeline;
this.kafkaInputTopic = kafkaInputTopic;
public Uc1PipelineFactory(final Properties kafkaReadPropsForPipeline,
final String kafkaInputTopic) {
super(kafkaReadPropsForPipeline,kafkaInputTopic);
}
/**
......@@ -39,15 +36,12 @@ public class Uc1PipelineFactory extends PipelineFactory {
*/
public Pipeline buildPipeline() {
// Define a new pipeline
final Pipeline pipe = Pipeline.create();
// Define the Kafka Source
final StreamSource<Map.Entry<String, ActivePowerRecord>> kafkaSource =
KafkaSources.<String, ActivePowerRecord>kafka(kafkaPropsForPipeline, kafkaInputTopic);
KafkaSources.<String, ActivePowerRecord>kafka(kafkaReadPropsForPipeline, kafkaInputTopic);
// Extend UC1 topology to the pipeline
final StreamStage<String> uc1TopologyProduct = this.extendUc1Topology(pipe, kafkaSource);
final StreamStage<String> uc1TopologyProduct = this.extendUc1Topology(kafkaSource);
// Add Sink: Logger
// Do not refactor this to just use the call
......@@ -63,10 +57,6 @@ public class Uc1PipelineFactory extends PipelineFactory {
return pipe;
}
/**
 * Unused hook for UC1: the pipeline stage is built via
 * {@code extendUc1Topology} instead, so this override returns {@code null}.
 *
 * <p>NOTE(review): the raw {@code StreamStage} return type and the
 * {@code null} result are fragile — confirm no caller of the superclass hook
 * dereferences this value, and consider parameterizing the type.</p>
 */
@Override
public StreamStage extendTopology() {
return null;
}
/**
* Extends to a blank Hazelcast Jet Pipeline the UC1 topology defines by Theodolite.
......@@ -76,13 +66,12 @@ public class Uc1PipelineFactory extends PipelineFactory {
* using GSON.
* </p>
*
* @param pipe The blank hazelcast jet pipeline to extend the logic to.
* @param source A streaming source to fetch data from.
* @return A {@code StreamStage<String>} with the above definition of the String. It can be used
* to be further modified or directly be written into a sink.
*/
public StreamStage<String> extendUc1Topology(final Pipeline pipe,
final StreamSource<Map.Entry<String, ActivePowerRecord>> source) {
public StreamStage<String>
extendUc1Topology(final StreamSource<Map.Entry<String, ActivePowerRecord>> source) {
// Build the pipeline topology
return pipe.readFrom(source)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment