diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordCoder.java b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordCoder.java index aba4c5f9adbea7c3cebe9cee7a71cdf731509a82..d2b484f5ab30be63f311d6dbcf495baebbd5e2b4 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordCoder.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordCoder.java @@ -17,10 +17,11 @@ import titan.ccp.model.records.AggregatedActivePowerRecord; @SuppressWarnings("serial") public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowerRecord> implements Serializable { - private static final boolean DETERMINISTIC = true; - private transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class); + private static final boolean DETERMINISTIC = true; + private transient AvroCoder<AggregatedActivePowerRecord> avroEnCoder = + AvroCoder.of(AggregatedActivePowerRecord.class); @Override public void encode(final AggregatedActivePowerRecord value, final OutputStream outStream) @@ -36,9 +37,9 @@ public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowe public AggregatedActivePowerRecord decode(final InputStream inStream) throws CoderException, IOException { if (this.avroEnCoder == null) { - avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class); + this.avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class); } - return (AggregatedActivePowerRecord) this.avroEnCoder.decode(inStream); + return this.avroEnCoder.decode(inStream); } diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java index efdc3adf5a3e70d65c76130bc5fc4ff5feac6183..6e2f2765ff65d3bca2a127be36db0854f15afebc 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java @@ -4,7 +4,6 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.kafka.common.serialization.Deserializer; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import titan.ccp.model.records.AggregatedActivePowerRecord; @@ -14,17 +13,20 @@ import titan.ccp.model.records.AggregatedActivePowerRecord; */ public class AggregatedActivePowerRecordDeserializer implements Deserializer<AggregatedActivePowerRecord> { + private static final Logger LOGGER = LoggerFactory.getLogger(AggregatedActivePowerRecordDeserializer.class); - private final transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class); + + private final transient AvroCoder<AggregatedActivePowerRecord> avroEnCoder = + AvroCoder.of(AggregatedActivePowerRecord.class); @Override public AggregatedActivePowerRecord deserialize(final String topic, final byte[] data) { AggregatedActivePowerRecord value = null; try { - value = (AggregatedActivePowerRecord) avroEnCoder.decode(new ByteArrayInputStream(data)); - } catch (IOException e) { - LOGGER.error("Could not deserialize AggregatedActivePowerRecord",e); + value = this.avroEnCoder.decode(new ByteArrayInputStream(data)); + } catch (final IOException e) { + LOGGER.error("Could not deserialize AggregatedActivePowerRecord", e); } return value; } diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java index 319fca931208fd0fb346fdd144c3af74509e7e37..77b79d5465f1d561870bf5b04f8fa20f87076adb 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java @@ -13,24 +13,25 @@ import titan.ccp.model.records.AggregatedActivePowerRecord; */ public class AggregatedActivePowerRecordSerializer implements Serializer<AggregatedActivePowerRecord> { + private static final Logger LOGGER = LoggerFactory.getLogger(AggregatedActivePowerRecordSerializer.class); - private final transient AvroCoder<AggregatedActivePowerRecord> - avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class); + private final transient AvroCoder<AggregatedActivePowerRecord> avroEnCoder = + AvroCoder.of(AggregatedActivePowerRecord.class); @Override public byte[] serialize(final String topic, final AggregatedActivePowerRecord data) { final ByteArrayOutputStream out = new ByteArrayOutputStream(); try { this.avroEnCoder.encode(data, out); - } catch (IOException e) { + } catch (final IOException e) { LOGGER.error("Could not serialize AggregatedActivePowerRecord", e); } final byte[] result = out.toByteArray(); try { out.close(); - } catch (IOException e) { + } catch (final IOException e) { LOGGER.error( "Could not close output stream after serialization of AggregatedActivePowerRecord", e); } diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/EventCoder.java b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/EventCoder.java index 41a025485819c384bc502fe6fb622878a61165ec..710beb71dc8776e6309028327b05307aa590a7f6 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/EventCoder.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/EventCoder.java @@ -17,12 +17,11 @@ import titan.ccp.configuration.events.EventSerde; * Wrapper Class that encapsulates a Event Serde in a org.apache.beam.sdk.coders.Coder. */ public class EventCoder extends Coder<Event> implements Serializable { + private static final long serialVersionUID = 8403045343970659100L; private static final int VALUE_SIZE = 4; private static final boolean DETERMINISTIC = true; - - private transient Serde<Event> innerSerde = EventSerde.serde(); @Override diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/SensorParentKeyCoder.java b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/SensorParentKeyCoder.java index 3fd8758534cff8eee80c2923061eb730b90fe161..3e85c3242fb854bef514787c92bb58ad76526cb4 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/serialization/SensorParentKeyCoder.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/SensorParentKeyCoder.java @@ -16,6 +16,7 @@ import org.apache.kafka.common.serialization.Serde; * Wrapper Class that encapsulates a SensorParentKey Serde in a org.apache.beam.sdk.coders.Coder. */ public class SensorParentKeyCoder extends Coder<SensorParentKey> implements Serializable { + private static final long serialVersionUID = -3480141901035692398L; private static final boolean DETERMINISTIC = true; private static final int VALUE_SIZE = 4;