Skip to content
Snippets Groups Projects

WIP: Hazelcast Jet Benchmark/Docker-Compose/Kubernetes implementation

6 files
+ 112
94
Compare changes
  • Side-by-side
  • Inline

Files

@@ -5,6 +5,7 @@ import com.hazelcast.config.JoinConfig;
@@ -5,6 +5,7 @@ import com.hazelcast.config.JoinConfig;
import com.hazelcast.internal.json.JsonObject;
import com.hazelcast.internal.json.JsonObject;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.JetInstance;
 
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sinks;
@@ -104,13 +105,14 @@ public class HistoryService {
@@ -104,13 +105,14 @@ public class HistoryService {
}
}
/** Set Pipeline Kafka Properties. */
/** Set Pipeline Kafka Properties. */
public HistoryService setKafkaPropertiesForPipeline(final Properties kafkaPropertiesForPipeline) {
public HistoryService setKafkaPropertiesForPipeline(// NOPMD
 
final Properties kafkaPropertiesForPipeline) {
this.kafkaPropertiesForPipeline = kafkaPropertiesForPipeline;
this.kafkaPropertiesForPipeline = kafkaPropertiesForPipeline;
return this;
return this;
}
}
/** Set Kafka Input topic used to build the pipeline. */
/** Set Kafka Input topic used to build the pipeline. */
public HistoryService setKafkaInputTopic(final String kafkaInputTopic) {
public HistoryService setKafkaInputTopic(final String kafkaInputTopic) { // NOPMD
this.kafkaInputTopic = kafkaInputTopic;
this.kafkaInputTopic = kafkaInputTopic;
return this;
return this;
}
}
@@ -148,7 +150,8 @@ public class HistoryService {
@@ -148,7 +150,8 @@ public class HistoryService {
final Pipeline p = Pipeline.create();
final Pipeline p = Pipeline.create();
p.readFrom(KafkaSources.<String, ActivePowerRecord>kafka(this.kafkaPropertiesForPipeline,
p.readFrom(KafkaSources.<String, ActivePowerRecord>kafka(this.kafkaPropertiesForPipeline,
this.kafkaInputTopic))
this.kafkaInputTopic))
.withoutTimestamps()
.withNativeTimestamps(0)
 
.setLocalParallelism(1)
.setName("Log content")
.setName("Log content")
.map(record -> {
.map(record -> {
final ActivePowerRecord powerRecord = record.getValue();
final ActivePowerRecord powerRecord = record.getValue();
@@ -167,12 +170,9 @@ public class HistoryService {
@@ -167,12 +170,9 @@ public class HistoryService {
.setPort(this.clusterConfig.getPort())
.setPort(this.clusterConfig.getPort())
.setPortAutoIncrement(this.clusterConfig.isPortAutoIncrement())
.setPortAutoIncrement(this.clusterConfig.isPortAutoIncrement())
.getJoin();
.getJoin();
System.out.println("Port: " + this.clusterConfig.getPort());
joinConfig.getMulticastConfig().setEnabled(false);
joinConfig.getMulticastConfig().setEnabled(false);
if (this.clusterConfig.hasBootstrapServer()) {
if (this.clusterConfig.hasBootstrapServer()) {
joinConfig.getTcpIpConfig().addMember(this.clusterConfig.getBootstrapServer());
joinConfig.getTcpIpConfig().addMember(this.clusterConfig.getBootstrapServer());
System.out.println("Added Member to port: " + this.clusterConfig.getBootstrapServer());
} else if (this.clusterConfig.hasKubernetesDnsName()) {
} else if (this.clusterConfig.hasKubernetesDnsName()) {
joinConfig.getKubernetesConfig()
joinConfig.getKubernetesConfig()
.setEnabled(true)
.setEnabled(true)
@@ -183,7 +183,9 @@ public class HistoryService {
@@ -183,7 +183,9 @@ public class HistoryService {
// Add config and add pipeline as the job
// Add config and add pipeline as the job
final JetInstance jet = Jet.newJetInstance();
final JetInstance jet = Jet.newJetInstance();
jet.getConfig().setHazelcastConfig(config);
jet.getConfig().setHazelcastConfig(config);
jet.newJob(p).join();
final JobConfig jobConfig = new JobConfig();
 
jobConfig.setName("uc1-hazelcastjet");
 
jet.newJobIfAbsent(p, jobConfig).join();
}
}
Loading