Skip to content
Snippets Groups Projects
Commit 72cd1119 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Extend commons-hazelcastjet infrastructure

parent 2aa438d7
No related branches found
No related tags found
1 merge request: !275 "Refactor hazelcast jet benchmarks"
......@@ -5,37 +5,42 @@ import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import org.apache.commons.configuration2.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.common.configuration.ServiceConfigurations;
/**
 * Abstract base for Hazelcast Jet benchmark services. Reads common Kafka/Jet settings
 * from the service configuration and holds the Jet instance the job is submitted to.
 */
public abstract class HazelcastJetService {

  private static final Logger LOGGER = LoggerFactory.getLogger(HazelcastJetService.class);

  // Config key for the Kubernetes service DNS name used for Hazelcast member discovery.
  private static final String HZ_KUBERNETES_SERVICE_DNS_KEY = "service-dns";

  protected final Configuration config = ServiceConfigurations.createWithDefaults();
  protected final String kafkaBootstrapServer;
  protected final String schemaRegistryUrl;
  protected final String jobName;
  protected final String kafkaInputTopic;

  // Set by subclasses; builds the use-case-specific pipeline.
  protected PipelineFactory pipelineFactory;
  protected final JobConfig jobConfig = new JobConfig();
  // Initialized in the constructor from bootstrap server, schema registry and job name.
  protected final KafkaPropertiesBuilder propsBuilder;

  private final JetInstance jetInstance;
/**
* Instantiate a new abstract service.
* Retrieves needed fields using ServiceConfiguration and build a new jet instance.
*/
public HazelcastJetService(final Logger logger) {
this.kafkaBootstrapServer = config.getProperty(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS).toString();
this.jobName = config.getProperty(ConfigurationKeys.APPLICATION_NAME).toString();
this.kafkaBootstrapServer = config.getProperty(
ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS).toString();
this.schemaRegistryUrl = config.getProperty(ConfigurationKeys.SCHEMA_REGISTRY_URL).toString();
this.kafkaInputTopic = config.getProperty(ConfigurationKeys.KAFKA_INPUT_TOPIC).toString();
this.propsBuilder =
new KafkaPropertiesBuilder(kafkaBootstrapServer, schemaRegistryUrl, jobName);
this.kafkaInputTopic = config.getProperty(ConfigurationKeys.KAFKA_INPUT_TOPIC).toString();
final JetInstanceBuilder jetInstance = new JetInstanceBuilder()
.setConfigFromEnv(logger, kafkaBootstrapServer, HZ_KUBERNETES_SERVICE_DNS_KEY);
......@@ -44,21 +49,26 @@ public abstract class HazelcastJetService {
/**
 * Constructs and starts the pipeline.
 * First initiates a pipeline,
 * Second registers the corresponding serializers,
 * Third sets the job name,
 * Lastly, adds the job to the hazelcast instance.
 */
public void run() {
  try {
    // Build the use-case-specific pipeline via the factory supplied by the subclass.
    final Pipeline pipeline = this.pipelineFactory.buildPipeline();
    this.registerSerializer();
    this.jobConfig.setName(this.config.getString("name"));
    // newJobIfAbsent avoids submitting a second job with the same name; join() blocks
    // until the (streaming) job terminates.
    this.jetInstance.newJobIfAbsent(pipeline, this.jobConfig).join();
  } catch (final Exception e) { // NOPMD - deliberately broad: log any startup failure
    LOGGER.error("ABORT MISSION!:", e);
  }
}
/**
 * Needs to be implemented by subclasses to register the needed Serializer
 * on the {@code jobConfig} before the job is submitted.
 */
protected abstract void registerSerializer();
......
package rocks.theodolite.benchmarks.commons.hazelcastjet;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.StreamStage;
import java.util.Properties;
/**
* Abstract class to handle the common logic for all pipelines.
* Implement {@link #buildPipeline()} method to implement the custom logic of the use case.
* Caution implement this with the help of an extendPipeline() method in order to
* be testable without further infrastructure.
* A template for this is construct the sources in {@link #buildPipeline()} and give them
* as parameters to extendTopology(...).
* Further implement the pipeline logic in the extendPipeline() method and return the last stage.
* Use the returned stage afterwards in the {@link #buildPipeline()} to write the results.
*/
public abstract class PipelineFactory {

  // Shared pipeline instance that subclasses extend with sources, stages and sinks.
  protected final Pipeline pipe;

  protected Properties kafkaReadPropsForPipeline;
  protected Properties kafkaWritePropsForPipeline;

  protected String kafkaInputTopic;
  protected String kafkaOutputTopic;

  public PipelineFactory() {
    this.pipe = Pipeline.create();
  }

  /**
   * Constructs a pipeline factory with read properties and input topic.
   * Directly used for Uc1.
   */
  public PipelineFactory(final Properties kafkaReadPropsForPipeline,
      final String kafkaInputTopic) {
    this();
    this.kafkaReadPropsForPipeline = kafkaReadPropsForPipeline;
    this.kafkaInputTopic = kafkaInputTopic;
  }

  /**
   * Constructs a pipeline factory with read/write properties and input/output topic.
   */
  public PipelineFactory(final Properties kafkaReadPropsForPipeline,
      final String kafkaInputTopic,
      final Properties kafkaWritePropsForPipeline,
      final String kafkaOutputTopic) {
    this(kafkaReadPropsForPipeline, kafkaInputTopic);
    this.kafkaWritePropsForPipeline = kafkaWritePropsForPipeline;
    this.kafkaOutputTopic = kafkaOutputTopic;
  }

  /**
   * Implements the use case logic and returns the last stage, so that it can be
   * tested without further infrastructure (see class documentation).
   *
   * @return the last stage of the extended topology.
   */
  public abstract StreamStage extendTopology();

  /**
   * Implement to construct the use case logic.
   *
   * @return pipeline that holds the use case logic.
   */
  public abstract Pipeline buildPipeline();
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment