diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/AggregatedActivePowerRecordEventTimePolicy.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/AggregatedActivePowerRecordEventTimePolicy.java index a4bf080cfaf502f881cab42bb8b578d7b0834dae..245092323fd7edf4929fcb906a8713aa5aa3679c 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/AggregatedActivePowerRecordEventTimePolicy.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/AggregatedActivePowerRecordEventTimePolicy.java @@ -16,6 +16,7 @@ public class AggregatedActivePowerRecordEventTimePolicy protected Instant currentWatermark; public AggregatedActivePowerRecordEventTimePolicy(final Optional<Instant> previousWatermark) { + super(); this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE); } diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/EventTimePolicy.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/EventTimePolicy.java index da389521f7f9157b736dc4f559cbfa64cab0aaa9..5828c1c80f2b3c791b6cd47a9d8a6c00059ade10 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/EventTimePolicy.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/EventTimePolicy.java @@ -14,6 +14,7 @@ public class EventTimePolicy extends TimestampPolicy<String, ActivePowerRecord> protected Instant currentWatermark; public EventTimePolicy(final Optional<Instant> previousWatermark) { + super(); this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE); } diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/RecordAggregation.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/RecordAggregation.java index 3cb79a11a78b331e4fdc02b51759b8dac559d453..f3b9f95b5fc8897230afc1924d07c547301c1eb5 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/RecordAggregation.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/RecordAggregation.java @@ -16,7 +16,6 @@ public class RecordAggregation extends CombineFn<ActivePowerRecord, RecordAggregation.Accum, AggregatedActivePowerRecord> { - private static final long serialVersionUID = 4362213539553233529L; /** @@ -25,9 +24,9 @@ public class RecordAggregation @DefaultCoder(AvroCoder.class) public static class Accum implements Serializable { private static final long serialVersionUID = 3701311203919534376L; - protected long count = 0; - protected Double sum = 0.0; - protected long timestamp = 0; + private long count = 0; + private Double sum = 0.0; + private long timestamp = 0; } @Override @@ -62,6 +61,4 @@ public class RecordAggregation accumulator.sum, average); } - - } diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/UpdateChildParentPairs.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/UpdateChildParentPairs.java index cabd7acf573752c05c8ecd18a639efa21bc4accb..55c445efe9483c0295d5aff0b743b7826f7df06d 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/UpdateChildParentPairs.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/UpdateChildParentPairs.java @@ -18,7 +18,6 @@ public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<Str private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value(); - /** * Match the changes accordingly. * @param kv the sensor parents set that contains the changes. diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordCoder.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordCoder.java index a8e6f0917f321062ef85542db488ad496d087bc9..aba4c5f9adbea7c3cebe9cee7a71cdf731509a82 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordCoder.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordCoder.java @@ -17,9 +17,11 @@ import titan.ccp.model.records.AggregatedActivePowerRecord; @SuppressWarnings("serial") public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowerRecord> implements Serializable { + private static final boolean DETERMINISTIC = true; private transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class); + @Override public void encode(final AggregatedActivePowerRecord value, final OutputStream outStream) throws CoderException, IOException { @@ -47,7 +49,8 @@ public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowe @Override public void verifyDeterministic() throws NonDeterministicException { - + if (!DETERMINISTIC) { + throw new NonDeterministicException(this, "This class should be deterministic!"); + } } - } diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java index da0798cc4554694e404f5fb6344da6911c51045d..efdc3adf5a3e70d65c76130bc5fc4ff5feac6183 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java @@ -5,6 +5,8 @@ import java.io.IOException; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.kafka.common.serialization.Deserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import titan.ccp.model.records.AggregatedActivePowerRecord; /** @@ -12,8 +14,9 @@ import titan.ccp.model.records.AggregatedActivePowerRecord; */ public class AggregatedActivePowerRecordDeserializer implements Deserializer<AggregatedActivePowerRecord> { - - private transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class); + private static final Logger LOGGER = + LoggerFactory.getLogger(AggregatedActivePowerRecordDeserializer.class); + private final transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class); @Override public AggregatedActivePowerRecord deserialize(final String topic, final byte[] data) { @@ -21,7 +24,7 @@ public class AggregatedActivePowerRecordDeserializer try { value = (AggregatedActivePowerRecord) avroEnCoder.decode(new ByteArrayInputStream(data)); } catch (IOException e) { - e.printStackTrace(); + LOGGER.error("Could not deserialize AggregatedActivePowerRecord",e); } return value; } diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java index f4619ec9b7920b573126de4f32e3b141d8c53bf3..2429b6564d690b9cd388faed5829f827aa25c26f 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java @@ -5,6 +5,8 @@ import java.io.IOException; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.kafka.common.serialization.Serializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import titan.ccp.model.records.AggregatedActivePowerRecord; /** @@ -12,6 +14,8 @@ import titan.ccp.model.records.AggregatedActivePowerRecord; */ public class AggregatedActivePowerRecordSerializer implements Serializer<AggregatedActivePowerRecord> { + private static final Logger LOGGER = + LoggerFactory.getLogger(AggregatedActivePowerRecordSerializer.class); private final transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class); @@ -29,13 +33,13 @@ public class AggregatedActivePowerRecordSerializer try { this.avroEnCoder.encode(data, out); } catch (IOException e) { - e.printStackTrace(); + LOGGER.error("Could not serialize AggregatedActivePowerRecord", e); } byte[] result = out.toByteArray(); try { out.close(); } catch (IOException e) { - e.printStackTrace(); + LOGGER.error("Could not close output stream after serialization of AggregatedActivePowerRecord", e); } return result; } diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventCoder.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventCoder.java index 128ead251d437d2df9ba3858e90b86878e610ac4..41a025485819c384bc502fe6fb622878a61165ec 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventCoder.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventCoder.java @@ -19,6 +19,9 @@ import titan.ccp.configuration.events.EventSerde; public class EventCoder extends Coder<Event> implements Serializable { private static final long serialVersionUID = 8403045343970659100L; private static final int VALUE_SIZE = 4; + private static final boolean DETERMINISTIC = true; + + private transient Serde<Event> innerSerde = EventSerde.serde(); @@ -54,8 +57,8 @@ public class EventCoder extends Coder<Event> implements Serializable { @Override public void verifyDeterministic() throws NonDeterministicException { - + if (!DETERMINISTIC) { + throw new NonDeterministicException(this, "This class should be deterministic!"); + } } - - }