Skip to content
Snippets Groups Projects
Commit b9407d77 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Fix more cs, pmd, spotbugs warnings and errors in uc4-beam-samza

parent 72d2186b
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
Showing
with 28 additions and 17 deletions
......@@ -16,6 +16,7 @@ public class AggregatedActivePowerRecordEventTimePolicy
protected Instant currentWatermark;
public AggregatedActivePowerRecordEventTimePolicy(final Optional<Instant> previousWatermark) {
super();
this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
}
......
......@@ -14,6 +14,7 @@ public class EventTimePolicy extends TimestampPolicy<String, ActivePowerRecord>
protected Instant currentWatermark;
public EventTimePolicy(final Optional<Instant> previousWatermark) {
super();
this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
}
......
......@@ -16,7 +16,6 @@ public class RecordAggregation
extends CombineFn<ActivePowerRecord, RecordAggregation.Accum, AggregatedActivePowerRecord> {
private static final long serialVersionUID = 4362213539553233529L;
/**
......@@ -25,9 +24,9 @@ public class RecordAggregation
@DefaultCoder(AvroCoder.class)
public static class Accum implements Serializable {
private static final long serialVersionUID = 3701311203919534376L;
protected long count = 0;
protected Double sum = 0.0;
protected long timestamp = 0;
private long count = 0;
private Double sum = 0.0;
private long timestamp = 0;
}
@Override
......@@ -62,6 +61,4 @@ public class RecordAggregation
accumulator.sum, average);
}
}
......@@ -18,7 +18,6 @@ public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<Str
private final StateSpec<ValueState<Set<String>>> parents =
StateSpecs.value();
/**
* Match the changes accordingly.
* @param kv the sensor parents set that contains the changes.
......
......@@ -17,9 +17,11 @@ import titan.ccp.model.records.AggregatedActivePowerRecord;
@SuppressWarnings("serial")
public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowerRecord>
implements Serializable {
private static final boolean DETERMINISTIC = true;
private transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class);
@Override
public void encode(final AggregatedActivePowerRecord value, final OutputStream outStream)
throws CoderException, IOException {
......@@ -47,7 +49,8 @@ public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowe
@Override
public void verifyDeterministic() throws NonDeterministicException {
if (!DETERMINISTIC) {
throw new NonDeterministicException(this, "This class should be deterministic!");
}
}
}
......@@ -5,6 +5,8 @@ import java.io.IOException;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.model.records.AggregatedActivePowerRecord;
/**
......@@ -12,8 +14,9 @@ import titan.ccp.model.records.AggregatedActivePowerRecord;
*/
public class AggregatedActivePowerRecordDeserializer
implements Deserializer<AggregatedActivePowerRecord> {
private transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class);
private static final Logger LOGGER =
LoggerFactory.getLogger(AggregatedActivePowerRecordDeserializer.class);
private final transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class);
@Override
public AggregatedActivePowerRecord deserialize(final String topic, final byte[] data) {
......@@ -21,7 +24,7 @@ public class AggregatedActivePowerRecordDeserializer
try {
value = (AggregatedActivePowerRecord) avroEnCoder.decode(new ByteArrayInputStream(data));
} catch (IOException e) {
e.printStackTrace();
LOGGER.error("Could not deserialize AggregatedActivePowerRecord",e);
}
return value;
}
......
......@@ -5,6 +5,8 @@ import java.io.IOException;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.model.records.AggregatedActivePowerRecord;
/**
......@@ -12,6 +14,8 @@ import titan.ccp.model.records.AggregatedActivePowerRecord;
*/
public class AggregatedActivePowerRecordSerializer
implements Serializer<AggregatedActivePowerRecord> {
private static final Logger LOGGER =
LoggerFactory.getLogger(AggregatedActivePowerRecordSerializer.class);
private final transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class);
......@@ -29,13 +33,13 @@ public class AggregatedActivePowerRecordSerializer
try {
this.avroEnCoder.encode(data, out);
} catch (IOException e) {
e.printStackTrace();
LOGGER.error("Could not serialize AggregatedActivePowerRecord", e);
}
byte[] result = out.toByteArray();
try {
out.close();
} catch (IOException e) {
e.printStackTrace();
LOGGER.error("Could not close output stream after serialization of AggregatedActivePowerRecord", e);
}
return result;
}
......
......@@ -19,6 +19,9 @@ import titan.ccp.configuration.events.EventSerde;
public class EventCoder extends Coder<Event> implements Serializable {
private static final long serialVersionUID = 8403045343970659100L;
private static final int VALUE_SIZE = 4;
private static final boolean DETERMINISTIC = true;
private transient Serde<Event> innerSerde = EventSerde.serde();
......@@ -54,8 +57,8 @@ public class EventCoder extends Coder<Event> implements Serializable {
@Override
public void verifyDeterministic() throws NonDeterministicException {
if (!DETERMINISTIC) {
throw new NonDeterministicException(this, "This class should be deterministic!");
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment