Commit 13ea8dd9 authored by Lorenz Boguhn

Change uc4 hazelcastjet output to ActivePowerRecords + smoke test

parent 05ca2028
1 merge request: !208 Add benchmark implementations for Hazelcast Jet
Pipeline #6613 passed
Showing changes with 71 additions and 53 deletions
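The gist of the commit: every pipeline stage that previously carried Entry<String, Double> now carries Entry<String, ActivePowerRecord>, so the output keeps identifier and timestamp instead of a bare value. A minimal sketch of the change in output shape, assuming only titan.ccp's ActivePowerRecord with the (identifier, timestamp, valueInW) constructor used throughout the diff below:

```java
import java.util.Map;
import java.util.Map.Entry;
import titan.ccp.model.records.ActivePowerRecord;

public class OutputShapeSketch {
  public static void main(String[] args) {
    // Before this commit: output entries paired a group name with a bare Double.
    final Entry<String, Double> before = Map.entry("group1", 42.0);

    // After: the same information travels as a full ActivePowerRecord,
    // carrying identifier, timestamp, and value.
    final Entry<String, ActivePowerRecord> after = Map.entry("group1",
        new ActivePowerRecord("group1", System.currentTimeMillis(), 42.0));

    System.out.println(before.getValue() + " -> " + after.getValue().getValueInW());
  }
}
```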
@@ -2,7 +2,7 @@
 until docker-compose exec -T kcat kcat -L -b kafka:9092 -t output -J | jq -r '.topics[0].partitions | length' | grep "\b3\b"; do sleep 5s; done
-docker-compose exec -T kcat kcat -C -b kafka:9092 -t output -s key=s -s value=avro -r http://schema-registry:8081 -f '%k:%s\n' -c 500 |
+docker-compose exec -T kcat kcat -C -b kafka:9092 -t output -s key=s -s value=avro -r http://schema-registry:8081 -f '%k:%s\n' -c 600 |
   tee /dev/stderr |
   awk -F ':' '!/^%/ {print $1}' |
   sort |
...
@@ -56,7 +56,7 @@ public class HistoryService {
   private void createHazelcastJetApplication() throws Exception { // NOPMD
     new Uc4HazelcastJetFactory()
         .setReadPropertiesFromEnv(KAFKA_BSERVER_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT)
-        .setWritePropertiesFromEnv(KAFKA_BSERVER_DEFAULT)
+        .setWritePropertiesFromEnv(KAFKA_BSERVER_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT)
         .setKafkaInputTopicFromEnv(KAFKA_INPUT_TOPIC_DEFAULT)
         .setKafkaOutputTopicFromEnv(KAFKA_OUTPUT_TOPIC_DEFAULT)
         .setKafkaConfigurationTopicFromEnv(KAFKA_CONFIG_TOPIC_DEFAULT)
...
@@ -218,11 +218,11 @@ public class Uc4HazelcastJetFactory {
    * @return The Uc4HazelcastJetBuilder factory with set kafkaWritePropertiesForPipeline.
    */
   public Uc4HazelcastJetFactory setWritePropertiesFromEnv(// NOPMD
-      final String bootstrapServersDefault) {
+      final String bootstrapServersDefault, final String schemaRegistryUrlDefault) {
     // Use KafkaPropertiesBuilder to build a properties object used for kafka
     final Uc4KafkaPropertiesBuilder propsBuilder = new Uc4KafkaPropertiesBuilder();
     final Properties kafkaWriteProps =
-        propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault);
+        propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault, schemaRegistryUrlDefault);
     this.kafkaWritePropsForPipeline = kafkaWriteProps;
     return this;
   }
...
 package theodolite.uc4.application;

 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
 import java.util.Objects;
 import java.util.Properties;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.DoubleDeserializer;
-import org.apache.kafka.common.serialization.DoubleSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import theodolite.commons.hazelcastjet.ConfigurationKeys;
@@ -74,9 +73,10 @@ public class Uc4KafkaPropertiesBuilder {
     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
         StringDeserializer.class.getCanonicalName());
     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-        DoubleDeserializer.class.getCanonicalName());
+        KafkaAvroDeserializer.class.getCanonicalName());
     props.put(SPECIFIC_AVRO_READER_CONFIG, true);
     props.setProperty(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
     return props;
   }
@@ -117,18 +117,25 @@ public class Uc4KafkaPropertiesBuilder {
    * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4
    *         Pipeline.
    */
-  public Properties buildKafkaWritePropsFromEnv(final String kafkaBootstrapServerDefault) {
+  public Properties buildKafkaWritePropsFromEnv(final String kafkaBootstrapServerDefault,
+      final String schemaRegistryUrlDefault) {
     final String kafkaBootstrapServers = Objects.requireNonNullElse(
         System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
         kafkaBootstrapServerDefault);
+    final String schemaRegistryUrl = Objects.requireNonNullElse(
+        System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
+        schemaRegistryUrlDefault);

     final Properties props = new Properties();
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); // NOCS
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
         StringSerializer.class.getCanonicalName());
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-        DoubleSerializer.class.getCanonicalName());
+        KafkaAvroSerializer.class.getCanonicalName());
+    props.put("specific.avro.writer", true);
+    props.put(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
     return props;
   }
...
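For context, a sketch of how write properties built this way feed a Kafka producer. Broker address, registry URL, and topic are illustrative; the serializer classes and the schema.registry.url key are the standard Confluent ones. The commit's extra "specific.avro.writer" flag is omitted here, since it does not appear to be a setting the Confluent serializer reads.

```java
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import titan.ccp.model.records.ActivePowerRecord;

public class AvroWriteSketch {
  public static void main(String[] args) {
    final Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        StringSerializer.class.getCanonicalName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        KafkaAvroSerializer.class.getCanonicalName());
    // The Confluent serializer fetches/registers schemas via this URL.
    props.put("schema.registry.url", "http://localhost:8081");

    try (KafkaProducer<String, ActivePowerRecord> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("output", "sensor1",
          new ActivePowerRecord("sensor1", System.currentTimeMillis(), 42.0)));
    }
  }
}
```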
@@ -22,6 +22,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import theodolite.uc4.application.uc4specifics.ChildParentsTransformer;
 import theodolite.uc4.application.uc4specifics.SensorGroupKey;
 import theodolite.uc4.application.uc4specifics.ValueGroup;
@@ -33,8 +35,10 @@ import titan.ccp.model.sensorregistry.SensorRegistry;
  * Builder to build a HazelcastJet Pipeline for UC4 which can be used for stream processing using
  * Hazelcast Jet.
  */
+@SuppressWarnings("PMD.ExcessiveImports")
 public class Uc4PipelineBuilder {

+  private static final Logger LOGGER = LoggerFactory.getLogger(Uc4PipelineBuilder.class);
   private static final String SENSOR_PARENT_MAP_NAME = "SensorParentMap";

   /**
@@ -66,6 +70,14 @@ public class Uc4PipelineBuilder {
       final String kafkaFeedbackTopic,
       final int windowSize) {

+    if (LOGGER.isInfoEnabled()) {
+      LOGGER.info("kafkaConfigProps: " + kafkaConfigPropsForPipeline);
+      LOGGER.info("kafkaFeedbackProps: " + kafkaFeedbackPropsForPipeline);
+      LOGGER.info("kafkaWriteProps: " + kafkaWritePropsForPipeline);
+    }
+
     // The pipeline for this Use Case
     final Pipeline uc4Pipeline = Pipeline.create();
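A side note on this hunk: the isInfoEnabled() guard is only needed because the messages are built by string concatenation. SLF4J's parameterized form defers formatting until the level is enabled, making the guard unnecessary; a minimal sketch:

```java
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingSketch {
  private static final Logger LOGGER = LoggerFactory.getLogger(LoggingSketch.class);

  public static void main(String[] args) {
    final Properties kafkaWriteProps = new Properties();
    // The {} placeholder is only rendered if INFO is enabled,
    // so no explicit isInfoEnabled() check is required.
    LOGGER.info("kafkaWriteProps: {}", kafkaWriteProps);
  }
}
```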
@@ -75,19 +87,20 @@ public class Uc4PipelineBuilder {
     final StreamSource<Entry<String, ActivePowerRecord>> inputSource =
         KafkaSources.<String, ActivePowerRecord>kafka(
             kafkaInputReadPropsForPipeline, kafkaInputTopic);
-    final StreamSource<Entry<String, Double>> aggregationSource =
-        KafkaSources.<String, Double>kafka(kafkaFeedbackPropsForPipeline, kafkaFeedbackTopic);
+    final StreamSource<Entry<String, ActivePowerRecord>> aggregationSource =
+        KafkaSources.<String, ActivePowerRecord>
+            kafka(kafkaFeedbackPropsForPipeline, kafkaFeedbackTopic);

     // Extend UC4 topology to pipeline
-    final StreamStage<Entry<String, Double>> uc4Product =
+    final StreamStage<Entry<String, ActivePowerRecord>> uc4Product =
         this.extendUc4Topology(uc4Pipeline, inputSource, aggregationSource, configSource,
             windowSize);

     // Add Sink1: Write back to kafka output topic
-    uc4Product.writeTo(KafkaSinks.<String, Double>kafka(
+    uc4Product.writeTo(KafkaSinks.kafka(
         kafkaWritePropsForPipeline, kafkaOutputTopic));
     // Add Sink2: Write back to kafka feedback/aggregation topic
-    uc4Product.writeTo(KafkaSinks.<String, Double>kafka(
+    uc4Product.writeTo(KafkaSinks.kafka(
         kafkaWritePropsForPipeline, kafkaFeedbackTopic));
     // Add Sink3: Logger
     uc4Product.writeTo(Sinks.logger());
@@ -125,9 +138,10 @@ public class Uc4PipelineBuilder {
    *         according aggregated values. The data can be further modified or directly be linked to
    *         a Hazelcast Jet sink.
    */
-  public StreamStage<Entry<String, Double>> extendUc4Topology(final Pipeline pipe, // NOPMD
+  public StreamStage<Entry<String, ActivePowerRecord>> extendUc4Topology(// NOPMD
+      final Pipeline pipe,
       final StreamSource<Entry<String, ActivePowerRecord>> inputSource,
-      final StreamSource<Entry<String, Double>> aggregationSource,
+      final StreamSource<Entry<String, ActivePowerRecord>> aggregationSource,
       final StreamSource<Entry<Event, String>> configurationSource, final int windowSize) {

     //////////////////////////////////
@@ -145,26 +159,20 @@ public class Uc4PipelineBuilder {
     //////////////////////////////////
     // (1) Sensor Input Stream
-    final StreamStage<Entry<String, Double>> inputStream = pipe
+    final StreamStage<Entry<String, ActivePowerRecord>> inputStream = pipe
         .readFrom(inputSource)
-        .withNativeTimestamps(0)
-        .map(stream -> {
-          // Build data for next pipeline stage
-          final String sensorId = stream.getValue().getIdentifier();
-          final Double valueInW = stream.getValue().getValueInW();
-          // Return data for next pipeline stage
-          return Util.entry(sensorId, valueInW);
-        });
+        .withNativeTimestamps(0);

     //////////////////////////////////
     // (1) Aggregation Stream
-    final StreamStage<Entry<String, Double>> aggregations = pipe
+    final StreamStage<Entry<String, ActivePowerRecord>> aggregations = pipe
         .readFrom(aggregationSource)
         .withNativeTimestamps(0);

     //////////////////////////////////
     // (2) UC4 Merge Input with aggregation stream
-    final StreamStageWithKey<Entry<String, Double>, String> mergedInputAndAggregations = inputStream
+    final StreamStageWithKey<Entry<String, ActivePowerRecord>, String>
+        mergedInputAndAggregations = inputStream
         .merge(aggregations)
         .groupingKey(Entry::getKey);
@@ -194,21 +202,21 @@ public class Uc4PipelineBuilder {
     //////////////////////////////////
     // (4) UC4 Duplicate as flatmap joined Stream
     // [(sensorKey, Group) , value]
-    final StreamStage<Entry<SensorGroupKey, Double>> dupliAsFlatmappedStage = joinedStage
+    final StreamStage<Entry<SensorGroupKey, ActivePowerRecord>> dupliAsFlatmappedStage = joinedStage
         .flatMap(entry -> {

           // Supplied data
           final String keyGroupId = entry.getKey();
-          final Double valueInW = entry.getValue().getValueInW();
+          final ActivePowerRecord record = entry.getValue().getRecord();
           final Set<String> groups = entry.getValue().getGroups();

           // Transformed Data
           final String[] groupList = groups.toArray(String[]::new);
           final SensorGroupKey[] newKeyList = new SensorGroupKey[groupList.length];
-          final List<Entry<SensorGroupKey, Double>> newEntryList = new ArrayList<>();
+          final List<Entry<SensorGroupKey, ActivePowerRecord>> newEntryList = new ArrayList<>();
           for (int i = 0; i < groupList.length; i++) {
             newKeyList[i] = new SensorGroupKey(keyGroupId, groupList[i]);
-            newEntryList.add(Util.entry(newKeyList[i], valueInW));
+            newEntryList.add(Util.entry(newKeyList[i], record));
           }

           // Return traversable list of new entry elements
@@ -218,7 +226,8 @@ public class Uc4PipelineBuilder {
     //////////////////////////////////
     // (5) UC4 Last Value Map
     // Table with tumbling window differentiation [ (sensorKey,Group) , value ],Time
-    final StageWithWindow<Entry<SensorGroupKey, Double>> windowedLastValues = dupliAsFlatmappedStage
+    final StageWithWindow<Entry<SensorGroupKey, ActivePowerRecord>>
+        windowedLastValues = dupliAsFlatmappedStage
         .window(WindowDefinition.tumbling(windowSize));
////////////////////////////////// //////////////////////////////////
...@@ -226,15 +235,16 @@ public class Uc4PipelineBuilder { ...@@ -226,15 +235,16 @@ public class Uc4PipelineBuilder {
// Group using the group out of the sensorGroupKey keys // Group using the group out of the sensorGroupKey keys
return windowedLastValues return windowedLastValues
.groupingKey(entry -> entry.getKey().getGroup()) .groupingKey(entry -> entry.getKey().getGroup())
.aggregate(AggregateOperations.summingDouble(Entry::getValue)) .aggregate(AggregateOperations.summingDouble(entry -> entry.getValue().getValueInW()))
.map(agg -> { .map(agg -> {
// Construct data for return pair // Construct data for return pair
final String theGroup = agg.getKey(); final String theGroup = agg.getKey();
final Double summedValueInW = agg.getValue(); final Double summedValueInW = agg.getValue();
// Return aggregates group value pair // Return aggregates group value pair
return Util.entry(theGroup, summedValueInW); return Util.entry(
theGroup,
new ActivePowerRecord(theGroup, System.currentTimeMillis(), summedValueInW));
}); });
} }
......
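To make the final stage concrete: per tumbling window, entries are grouped by the group component of the key, their valueInW summed, and the sum re-wrapped as a new ActivePowerRecord keyed by the group. A Jet-free sketch of that computation, with plain Java streams standing in for Jet's windowed aggregate (record contents are illustrative):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import titan.ccp.model.records.ActivePowerRecord;

public class AggregationSketch {
  public static void main(String[] args) {
    // One window's worth of (group, record) pairs, as stage (5) would hold them.
    final List<Map.Entry<String, ActivePowerRecord>> window = List.of(
        Map.entry("group1", new ActivePowerRecord("sensor1", 1L, 10.0)),
        Map.entry("group1", new ActivePowerRecord("sensor2", 2L, 20.0)),
        Map.entry("group2", new ActivePowerRecord("sensor3", 3L, 5.0)));

    // Stage (6), minus Jet: sum valueInW per group, then wrap each sum
    // in a new ActivePowerRecord keyed by the group name.
    final Map<String, ActivePowerRecord> aggregated = window.stream()
        .collect(Collectors.groupingBy(Map.Entry::getKey,
            Collectors.summingDouble(e -> e.getValue().getValueInW())))
        .entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey,
            e -> new ActivePowerRecord(e.getKey(), System.currentTimeMillis(), e.getValue())));

    aggregated.forEach((g, r) -> System.out.println(g + " -> " + r.getValueInW()));
  }
}
```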
@@ -2,22 +2,27 @@ package theodolite.uc4.application.uc4specifics;

 import java.util.Objects;
 import java.util.Set;
+import titan.ccp.model.records.ActivePowerRecord;

 /**
  * Structure: (valueInW, Set(Groups)).
  */
 public class ValueGroup {

-  private final Double valueInW;
+  private final ActivePowerRecord record;
   private final Set<String> groups;

-  public ValueGroup(final Double valueInW, final Set<String> groups) {
-    this.valueInW = valueInW;
+  public ValueGroup(final ActivePowerRecord record, final Set<String> groups) {
+    this.record = record;
     this.groups = groups;
   }

+  public ActivePowerRecord getRecord() {
+    return this.record;
+  }
+
   public Double getValueInW() {
-    return this.valueInW;
+    return this.record.getValueInW();
   }

   public Set<String> getGroups() {
@@ -30,12 +35,12 @@ public class ValueGroup {
     for (final String group : this.groups) {
       groupString = groupString + group + "/";// NOPMD
     }
-    return this.valueInW.toString() + ";" + groupString + "]";
+    return this.record.getValueInW() + ";" + groupString + "]";
   }

   @Override
   public int hashCode() {
-    return Objects.hash(this.valueInW, this.groups);
+    return Objects.hash(this.record, this.groups);
   }

   @Override
@@ -45,7 +50,7 @@ public class ValueGroup {
     }
     if (obj instanceof ValueGroup) {
       final ValueGroup other = (ValueGroup) obj;
-      return Objects.equals(this.valueInW, other.valueInW)
+      return Objects.equals(this.record.getValueInW(), other.getValueInW())
           && this.groups.containsAll(other.groups);
     }
     return false;
...
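One thing to watch in the new equals()/hashCode() pair: equals() now compares only the records' valueInW (plus groups), while hashCode() hashes the entire ActivePowerRecord, so two instances can be equal yet hash differently, which breaks hash-based collections. A small sketch of the hazard, assuming the ValueGroup class as defined above is on the classpath:

```java
import java.util.Set;
import titan.ccp.model.records.ActivePowerRecord;

public class ValueGroupContractSketch {
  public static void main(String[] args) {
    // Same valueInW and groups, but different identifier/timestamp:
    final ValueGroup a = new ValueGroup(
        new ActivePowerRecord("sensor1", 1L, 42.0), Set.of("group1"));
    final ValueGroup b = new ValueGroup(
        new ActivePowerRecord("sensor2", 2L, 42.0), Set.of("group1"));

    // equals() compares only valueInW and groups -> true,
    // but hashCode() hashes the full record -> usually different.
    System.out.println(a.equals(b));                  // true
    System.out.println(a.hashCode() == b.hashCode()); // usually false
  }
}
```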
@@ -19,13 +19,13 @@ public class ValueGroupSerializer implements StreamSerializer<ValueGroup> {

   @Override
   public void write(final ObjectDataOutput out, final ValueGroup key) throws IOException {
-    out.writeDouble(key.getValueInW());
+    out.writeObject(key);
     out.writeString(String.join(",", key.getGroups()));
   }

   @Override
   public ValueGroup read(final ObjectDataInput in) throws IOException {
-    return new ValueGroup(in.readDouble(),
+    return new ValueGroup(in.readObject(ValueGroup.class),
         new HashSet<>(Arrays.asList(in.readString().split(","))));
   }
...
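A caveat on this hunk: write() passes the whole ValueGroup back to writeObject(), and read() expects a nested ValueGroup, which would route through this very serializer again and recurse. A sketch of a non-recursive variant that serializes the contained record instead, assuming it lives in the same package as ValueGroup and that ActivePowerRecord itself can be written via writeObject (e.g., registered with its own Hazelcast serializer); the type id is illustrative:

```java
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.StreamSerializer;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import titan.ccp.model.records.ActivePowerRecord;

public class ValueGroupSerializerSketch implements StreamSerializer<ValueGroup> {

  @Override
  public int getTypeId() {
    return 1; // illustrative; must be unique among registered serializers
  }

  @Override
  public void write(final ObjectDataOutput out, final ValueGroup key) throws IOException {
    // Serialize the contained record, not the ValueGroup itself,
    // so this serializer is not re-entered recursively.
    out.writeObject(key.getRecord());
    out.writeString(String.join(",", key.getGroups()));
  }

  @Override
  public ValueGroup read(final ObjectDataInput in) throws IOException {
    return new ValueGroup(in.readObject(ActivePowerRecord.class),
        new HashSet<>(Arrays.asList(in.readString().split(","))));
  }
}
```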
@@ -12,10 +12,7 @@ import com.hazelcast.jet.pipeline.test.AssertionCompletedException;
 import com.hazelcast.jet.pipeline.test.Assertions;
 import com.hazelcast.jet.pipeline.test.TestSources;
 import com.hazelcast.jet.test.SerialTest;
-import java.time.Instant;
-import java.time.LocalDateTime;
 import java.util.Map;
-import java.util.TimeZone;
 import java.util.Map.Entry;
 import java.util.concurrent.CompletionException;
 import org.junit.After;
@@ -34,7 +31,6 @@ import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;
 import titan.ccp.model.sensorregistry.MachineSensor;
 import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
 import titan.ccp.model.sensorregistry.MutableSensorRegistry;
-import titan.ccp.model.sensorregistry.SensorRegistry;

 @Category(SerialTest.class)
 public class Uc4PipelineTest extends JetTestSupport {
@@ -42,7 +38,7 @@ public class Uc4PipelineTest extends JetTestSupport {
   // TEst Machinery
   JetInstance testInstance = null;
   Pipeline testPipeline = null;
-  StreamStage<Entry<String, Double>> uc4Topology = null;
+  StreamStage<Entry<String, ActivePowerRecord>> uc4Topology = null;

   @Before
   public void buildUc4Pipeline() {
@@ -72,10 +68,10 @@ public class Uc4PipelineTest extends JetTestSupport {
         });

     // Create test source 2 : Mock aggregation Values
-    final StreamSource<Entry<String, Double>> testAggregationSource =
+    final StreamSource<Entry<String, ActivePowerRecord>> testAggregationSource =
         TestSources.itemStream(testItemsPerSecond, (timestamp, item) -> {
-          final Double testAggValue = testValueInW;
-          final Entry<String, Double> testEntry =
+          final ActivePowerRecord testAggValue = new ActivePowerRecord(testSensorName,System.currentTimeMillis(),testValueInW);
+          final Entry<String, ActivePowerRecord> testEntry =
               Map.entry(testLevel1GroupName, testAggValue);
           return testEntry;
         });
...