From b9407d776a66ade62bee36b2140c6b7b2d38f5d3 Mon Sep 17 00:00:00 2001 From: lorenz <stu203404@mail.uni-kiel.de> Date: Thu, 28 Oct 2021 16:25:47 +0200 Subject: [PATCH] Fix more cs, pmd, spotbugs warnings and errors in uc4-beam-samza --- .../AggregatedActivePowerRecordEventTimePolicy.java | 1 + .../src/main/java/application/EventTimePolicy.java | 1 + .../src/main/java/application/RecordAggregation.java | 9 +++------ .../main/java/application/UpdateChildParentPairs.java | 1 - .../serialization/AggregatedActivePowerRecordCoder.java | 7 +++++-- .../AggregatedActivePowerRecordDeserializer.java | 9 ++++++--- .../AggregatedActivePowerRecordSerializer.java | 8 ++++++-- .../src/main/java/serialization/EventCoder.java | 9 ++++++--- 8 files changed, 28 insertions(+), 17 deletions(-) diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/AggregatedActivePowerRecordEventTimePolicy.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/AggregatedActivePowerRecordEventTimePolicy.java index a4bf080cf..245092323 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/AggregatedActivePowerRecordEventTimePolicy.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/AggregatedActivePowerRecordEventTimePolicy.java @@ -16,6 +16,7 @@ public class AggregatedActivePowerRecordEventTimePolicy protected Instant currentWatermark; public AggregatedActivePowerRecordEventTimePolicy(final Optional<Instant> previousWatermark) { + super(); this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE); } diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/EventTimePolicy.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/EventTimePolicy.java index da389521f..5828c1c80 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/EventTimePolicy.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/EventTimePolicy.java @@ -14,6 +14,7 @@ public class EventTimePolicy extends TimestampPolicy<String, ActivePowerRecord> protected Instant currentWatermark; public EventTimePolicy(final Optional<Instant> previousWatermark) { + super(); this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE); } diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/RecordAggregation.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/RecordAggregation.java index 3cb79a11a..f3b9f95b5 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/RecordAggregation.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/RecordAggregation.java @@ -16,7 +16,6 @@ public class RecordAggregation extends CombineFn<ActivePowerRecord, RecordAggregation.Accum, AggregatedActivePowerRecord> { - private static final long serialVersionUID = 4362213539553233529L; /** @@ -25,9 +24,9 @@ public class RecordAggregation @DefaultCoder(AvroCoder.class) public static class Accum implements Serializable { private static final long serialVersionUID = 3701311203919534376L; - protected long count = 0; - protected Double sum = 0.0; - protected long timestamp = 0; + private long count = 0; + private Double sum = 0.0; + private long timestamp = 0; } @Override @@ -62,6 +61,4 @@ public class RecordAggregation accumulator.sum, average); } - - } diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/UpdateChildParentPairs.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/UpdateChildParentPairs.java index cabd7acf5..55c445efe 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/UpdateChildParentPairs.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/UpdateChildParentPairs.java @@ -18,7 +18,6 @@ public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<Str private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value(); - /** * Match the changes accordingly. * @param kv the sensor parents set that contains the changes. diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordCoder.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordCoder.java index a8e6f0917..aba4c5f9a 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordCoder.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordCoder.java @@ -17,9 +17,11 @@ import titan.ccp.model.records.AggregatedActivePowerRecord; @SuppressWarnings("serial") public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowerRecord> implements Serializable { + private static final boolean DETERMINISTIC = true; private transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class); + @Override public void encode(final AggregatedActivePowerRecord value, final OutputStream outStream) throws CoderException, IOException { @@ -47,7 +49,8 @@ public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowe @Override public void verifyDeterministic() throws NonDeterministicException { - + if (!DETERMINISTIC) { + throw new NonDeterministicException(this, "This class should be deterministic!"); + } } - } diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java index da0798cc4..efdc3adf5 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java @@ -5,6 +5,8 @@ import java.io.IOException; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.kafka.common.serialization.Deserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import titan.ccp.model.records.AggregatedActivePowerRecord; /** @@ -12,8 +14,9 @@ import titan.ccp.model.records.AggregatedActivePowerRecord; */ public class AggregatedActivePowerRecordDeserializer implements Deserializer<AggregatedActivePowerRecord> { - - private transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class); + private static final Logger LOGGER = + LoggerFactory.getLogger(AggregatedActivePowerRecordDeserializer.class); + private final transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class); @Override public AggregatedActivePowerRecord deserialize(final String topic, final byte[] data) { @@ -21,7 +24,7 @@ public class AggregatedActivePowerRecordDeserializer try { value = (AggregatedActivePowerRecord) avroEnCoder.decode(new ByteArrayInputStream(data)); } catch (IOException e) { - e.printStackTrace(); + LOGGER.error("Could not deserialize AggregatedActivePowerRecord",e); } return value; } diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java index f4619ec9b..2429b6564 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java @@ -5,6 +5,8 @@ import java.io.IOException; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.kafka.common.serialization.Serializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import titan.ccp.model.records.AggregatedActivePowerRecord; /** @@ -12,6 +14,8 @@ import titan.ccp.model.records.AggregatedActivePowerRecord; */ public class AggregatedActivePowerRecordSerializer implements Serializer<AggregatedActivePowerRecord> { + private static final Logger LOGGER = + LoggerFactory.getLogger(AggregatedActivePowerRecordSerializer.class); private final transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class); @@ -29,13 +33,13 @@ public class AggregatedActivePowerRecordSerializer try { this.avroEnCoder.encode(data, out); } catch (IOException e) { - e.printStackTrace(); + LOGGER.error("Could not serialize AggregatedActivePowerRecord", e); } byte[] result = out.toByteArray(); try { out.close(); } catch (IOException e) { - e.printStackTrace(); + LOGGER.error("Could not close output stream after serialization of AggregatedActivePowerRecord", e); } return result; } diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventCoder.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventCoder.java index 128ead251..41a025485 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventCoder.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventCoder.java @@ -19,6 +19,9 @@ import titan.ccp.configuration.events.EventSerde; public class EventCoder extends Coder<Event> implements Serializable { private static final long serialVersionUID = 8403045343970659100L; private static final int VALUE_SIZE = 4; + private static final boolean DETERMINISTIC = true; + + private transient Serde<Event> innerSerde = EventSerde.serde(); @@ -54,8 +57,8 @@ public class EventCoder extends Coder<Event> implements Serializable { @Override public void verifyDeterministic() throws NonDeterministicException { - + if (!DETERMINISTIC) { + throw new NonDeterministicException(this, "This class should be deterministic!"); + } } - - } -- GitLab