Skip to content
Snippets Groups Projects
Commit 1068dfed authored by Sören Henning's avatar Sören Henning
Browse files

Clean up code format

parent befcf2e1
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
Pipeline #5944 passed
......@@ -17,10 +17,11 @@ import titan.ccp.model.records.AggregatedActivePowerRecord;
@SuppressWarnings("serial")
public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowerRecord>
implements Serializable {
private static final boolean DETERMINISTIC = true;
private transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class);
private static final boolean DETERMINISTIC = true;
private transient AvroCoder<AggregatedActivePowerRecord> avroEnCoder =
AvroCoder.of(AggregatedActivePowerRecord.class);
@Override
public void encode(final AggregatedActivePowerRecord value, final OutputStream outStream)
......@@ -36,9 +37,9 @@ public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowe
public AggregatedActivePowerRecord decode(final InputStream inStream)
throws CoderException, IOException {
if (this.avroEnCoder == null) {
avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class);
this.avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class);
}
return (AggregatedActivePowerRecord) this.avroEnCoder.decode(inStream);
return this.avroEnCoder.decode(inStream);
}
......
......@@ -4,7 +4,6 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.model.records.AggregatedActivePowerRecord;
......@@ -14,17 +13,20 @@ import titan.ccp.model.records.AggregatedActivePowerRecord;
*/
public class AggregatedActivePowerRecordDeserializer
implements Deserializer<AggregatedActivePowerRecord> {
private static final Logger LOGGER =
LoggerFactory.getLogger(AggregatedActivePowerRecordDeserializer.class);
private final transient AvroCoder avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class);
private final transient AvroCoder<AggregatedActivePowerRecord> avroEnCoder =
AvroCoder.of(AggregatedActivePowerRecord.class);
@Override
public AggregatedActivePowerRecord deserialize(final String topic, final byte[] data) {
AggregatedActivePowerRecord value = null;
try {
value = (AggregatedActivePowerRecord) avroEnCoder.decode(new ByteArrayInputStream(data));
} catch (IOException e) {
LOGGER.error("Could not deserialize AggregatedActivePowerRecord",e);
value = this.avroEnCoder.decode(new ByteArrayInputStream(data));
} catch (final IOException e) {
LOGGER.error("Could not deserialize AggregatedActivePowerRecord", e);
}
return value;
}
......
......@@ -13,24 +13,25 @@ import titan.ccp.model.records.AggregatedActivePowerRecord;
*/
public class AggregatedActivePowerRecordSerializer
implements Serializer<AggregatedActivePowerRecord> {
private static final Logger LOGGER =
LoggerFactory.getLogger(AggregatedActivePowerRecordSerializer.class);
private final transient AvroCoder<AggregatedActivePowerRecord>
avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class);
private final transient AvroCoder<AggregatedActivePowerRecord> avroEnCoder =
AvroCoder.of(AggregatedActivePowerRecord.class);
@Override
public byte[] serialize(final String topic, final AggregatedActivePowerRecord data) {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
this.avroEnCoder.encode(data, out);
} catch (IOException e) {
} catch (final IOException e) {
LOGGER.error("Could not serialize AggregatedActivePowerRecord", e);
}
final byte[] result = out.toByteArray();
try {
out.close();
} catch (IOException e) {
} catch (final IOException e) {
LOGGER.error(
"Could not close output stream after serialization of AggregatedActivePowerRecord", e);
}
......
......@@ -17,12 +17,11 @@ import titan.ccp.configuration.events.EventSerde;
* Wrapper Class that encapsulates a Event Serde in a org.apache.beam.sdk.coders.Coder.
*/
public class EventCoder extends Coder<Event> implements Serializable {
private static final long serialVersionUID = 8403045343970659100L;
private static final int VALUE_SIZE = 4;
private static final boolean DETERMINISTIC = true;
private transient Serde<Event> innerSerde = EventSerde.serde();
@Override
......
......@@ -16,6 +16,7 @@ import org.apache.kafka.common.serialization.Serde;
* Wrapper Class that encapsulates a SensorParentKey Serde in a org.apache.beam.sdk.coders.Coder.
*/
public class SensorParentKeyCoder extends Coder<SensorParentKey> implements Serializable {
private static final long serialVersionUID = -3480141901035692398L;
private static final boolean DETERMINISTIC = true;
private static final int VALUE_SIZE = 4;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment