Skip to content
Snippets Groups Projects
Commit e83372f4 authored by Lorenz Boguhn's avatar Lorenz Boguhn Committed by Lorenz Boguhn
Browse files

Fix more cs, pmd, spotbugs warnings and errors in uc4-beam-samza

parent f2e3645f
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
Showing
with 28 additions and 17 deletions
...@@ -16,6 +16,7 @@ public class AggregatedActivePowerRecordEventTimePolicy ...@@ -16,6 +16,7 @@ public class AggregatedActivePowerRecordEventTimePolicy
protected Instant currentWatermark; protected Instant currentWatermark;
public AggregatedActivePowerRecordEventTimePolicy(final Optional<Instant> previousWatermark) { public AggregatedActivePowerRecordEventTimePolicy(final Optional<Instant> previousWatermark) {
super();
this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE); this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
} }
......
...@@ -14,6 +14,7 @@ public class EventTimePolicy extends TimestampPolicy<String, ActivePowerRecord> ...@@ -14,6 +14,7 @@ public class EventTimePolicy extends TimestampPolicy<String, ActivePowerRecord>
protected Instant currentWatermark; protected Instant currentWatermark;
public EventTimePolicy(final Optional<Instant> previousWatermark) { public EventTimePolicy(final Optional<Instant> previousWatermark) {
super();
this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE); this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
} }
......
...@@ -16,7 +16,6 @@ public class RecordAggregation ...@@ -16,7 +16,6 @@ public class RecordAggregation
extends CombineFn<ActivePowerRecord, RecordAggregation.Accum, AggregatedActivePowerRecord> { extends CombineFn<ActivePowerRecord, RecordAggregation.Accum, AggregatedActivePowerRecord> {
private static final long serialVersionUID = 4362213539553233529L; private static final long serialVersionUID = 4362213539553233529L;
/** /**
...@@ -25,9 +24,9 @@ public class RecordAggregation ...@@ -25,9 +24,9 @@ public class RecordAggregation
@DefaultCoder(AvroCoder.class) @DefaultCoder(AvroCoder.class)
public static class Accum implements Serializable { public static class Accum implements Serializable {
private static final long serialVersionUID = 3701311203919534376L; private static final long serialVersionUID = 3701311203919534376L;
protected long count = 0; private long count = 0;
protected Double sum = 0.0; private Double sum = 0.0;
protected long timestamp = 0; private long timestamp = 0;
} }
@Override @Override
...@@ -62,6 +61,4 @@ public class RecordAggregation ...@@ -62,6 +61,4 @@ public class RecordAggregation
accumulator.sum, average); accumulator.sum, average);
} }
} }
...@@ -18,7 +18,6 @@ public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<Str ...@@ -18,7 +18,6 @@ public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<Str
private final StateSpec<ValueState<Set<String>>> parents = private final StateSpec<ValueState<Set<String>>> parents =
StateSpecs.value(); StateSpecs.value();
/** /**
* Match the changes accordingly. * Match the changes accordingly.
* @param kv the sensor parents set that contains the changes. * @param kv the sensor parents set that contains the changes.
......
...@@ -17,9 +17,11 @@ import titan.ccp.model.records.AggregatedActivePowerRecord; ...@@ -17,9 +17,11 @@ import titan.ccp.model.records.AggregatedActivePowerRecord;
@SuppressWarnings("serial") @SuppressWarnings("serial")
public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowerRecord> public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowerRecord>
implements Serializable { implements Serializable {
private static final boolean DETERMINISTIC = true;
private transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class); private transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class);
@Override @Override
public void encode(final AggregatedActivePowerRecord value, final OutputStream outStream) public void encode(final AggregatedActivePowerRecord value, final OutputStream outStream)
throws CoderException, IOException { throws CoderException, IOException {
...@@ -47,7 +49,8 @@ public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowe ...@@ -47,7 +49,8 @@ public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowe
@Override @Override
public void verifyDeterministic() throws NonDeterministicException { public void verifyDeterministic() throws NonDeterministicException {
if (!DETERMINISTIC) {
throw new NonDeterministicException(this, "This class should be deterministic!");
}
} }
} }
...@@ -5,6 +5,8 @@ import java.io.IOException; ...@@ -5,6 +5,8 @@ import java.io.IOException;
import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.model.records.AggregatedActivePowerRecord; import titan.ccp.model.records.AggregatedActivePowerRecord;
/** /**
...@@ -12,8 +14,9 @@ import titan.ccp.model.records.AggregatedActivePowerRecord; ...@@ -12,8 +14,9 @@ import titan.ccp.model.records.AggregatedActivePowerRecord;
*/ */
public class AggregatedActivePowerRecordDeserializer public class AggregatedActivePowerRecordDeserializer
implements Deserializer<AggregatedActivePowerRecord> { implements Deserializer<AggregatedActivePowerRecord> {
private static final Logger LOGGER =
private transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class); LoggerFactory.getLogger(AggregatedActivePowerRecordDeserializer.class);
private final transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class);
@Override @Override
public AggregatedActivePowerRecord deserialize(final String topic, final byte[] data) { public AggregatedActivePowerRecord deserialize(final String topic, final byte[] data) {
...@@ -21,7 +24,7 @@ public class AggregatedActivePowerRecordDeserializer ...@@ -21,7 +24,7 @@ public class AggregatedActivePowerRecordDeserializer
try { try {
value = (AggregatedActivePowerRecord) avroEnCoder.decode(new ByteArrayInputStream(data)); value = (AggregatedActivePowerRecord) avroEnCoder.decode(new ByteArrayInputStream(data));
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); LOGGER.error("Could not deserialize AggregatedActivePowerRecord",e);
} }
return value; return value;
} }
......
...@@ -5,6 +5,8 @@ import java.io.IOException; ...@@ -5,6 +5,8 @@ import java.io.IOException;
import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.model.records.AggregatedActivePowerRecord; import titan.ccp.model.records.AggregatedActivePowerRecord;
/** /**
...@@ -12,6 +14,8 @@ import titan.ccp.model.records.AggregatedActivePowerRecord; ...@@ -12,6 +14,8 @@ import titan.ccp.model.records.AggregatedActivePowerRecord;
*/ */
public class AggregatedActivePowerRecordSerializer public class AggregatedActivePowerRecordSerializer
implements Serializer<AggregatedActivePowerRecord> { implements Serializer<AggregatedActivePowerRecord> {
private static final Logger LOGGER =
LoggerFactory.getLogger(AggregatedActivePowerRecordSerializer.class);
private final transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class); private final transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class);
...@@ -29,13 +33,13 @@ public class AggregatedActivePowerRecordSerializer ...@@ -29,13 +33,13 @@ public class AggregatedActivePowerRecordSerializer
try { try {
this.avroEnCoder.encode(data, out); this.avroEnCoder.encode(data, out);
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); LOGGER.error("Could not serialize AggregatedActivePowerRecord", e);
} }
byte[] result = out.toByteArray(); byte[] result = out.toByteArray();
try { try {
out.close(); out.close();
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); LOGGER.error("Could not close output stream after serialization of AggregatedActivePowerRecord", e);
} }
return result; return result;
} }
......
...@@ -19,6 +19,9 @@ import titan.ccp.configuration.events.EventSerde; ...@@ -19,6 +19,9 @@ import titan.ccp.configuration.events.EventSerde;
public class EventCoder extends Coder<Event> implements Serializable { public class EventCoder extends Coder<Event> implements Serializable {
private static final long serialVersionUID = 8403045343970659100L; private static final long serialVersionUID = 8403045343970659100L;
private static final int VALUE_SIZE = 4; private static final int VALUE_SIZE = 4;
private static final boolean DETERMINISTIC = true;
private transient Serde<Event> innerSerde = EventSerde.serde(); private transient Serde<Event> innerSerde = EventSerde.serde();
...@@ -54,8 +57,8 @@ public class EventCoder extends Coder<Event> implements Serializable { ...@@ -54,8 +57,8 @@ public class EventCoder extends Coder<Event> implements Serializable {
@Override @Override
public void verifyDeterministic() throws NonDeterministicException { public void verifyDeterministic() throws NonDeterministicException {
if (!DETERMINISTIC) {
throw new NonDeterministicException(this, "This class should be deterministic!");
}
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment