Skip to content
Snippets Groups Projects
Commit 80741887 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Repair uc3-beam-flink HourOfDaykeyCoder

parent a0bb0d9b
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
...@@ -15,11 +15,11 @@ import org.apache.kafka.common.serialization.Serde; ...@@ -15,11 +15,11 @@ import org.apache.kafka.common.serialization.Serde;
* Wrapper Class that encapsulates a HourOfDayKeySerde in a org.apache.beam.sdk.coders.Coder. * Wrapper Class that encapsulates a HourOfDayKeySerde in a org.apache.beam.sdk.coders.Coder.
*/ */
public class HourOfDaykeyCoder extends Coder<HourOfDayKey> implements Serializable { public class HourOfDaykeyCoder extends Coder<HourOfDayKey> implements Serializable {
private static final int VALUE_SIZE = 4;
private static final boolean DETEMINISTIC = false;
public static final long serialVersionUID = 4444444; public static final long serialVersionUID = 4444444;
private static final boolean DETERMINISTIC = true;
private static final int VALUE_SIZE = 4;
private Serde<HourOfDayKey> innerSerde = HourOfDayKeySerde.create(); private transient Serde<HourOfDayKey> innerSerde = HourOfDayKeySerde.create();
@Override @Override
public void encode(final HourOfDayKey value, final OutputStream outStream) public void encode(final HourOfDayKey value, final OutputStream outStream)
...@@ -39,10 +39,10 @@ public class HourOfDaykeyCoder extends Coder<HourOfDayKey> implements Serializab ...@@ -39,10 +39,10 @@ public class HourOfDaykeyCoder extends Coder<HourOfDayKey> implements Serializab
this.innerSerde = HourOfDayKeySerde.create(); this.innerSerde = HourOfDayKeySerde.create();
} }
final byte[] sizeinBytes = new byte[VALUE_SIZE]; final byte[] sizeinBytes = new byte[VALUE_SIZE];
//inStream.read(sizeinBytes); inStream.read(sizeinBytes);
final int size = ByteBuffer.wrap(sizeinBytes).getInt(); final int size = ByteBuffer.wrap(sizeinBytes).getInt();
final byte[] bytes = new byte[size]; final byte[] bytes = new byte[size];
//inStream.read(bytes); inStream.read(bytes);
return this.innerSerde.deserializer().deserialize("deser", bytes); return this.innerSerde.deserializer().deserialize("deser", bytes);
} }
...@@ -53,7 +53,7 @@ public class HourOfDaykeyCoder extends Coder<HourOfDayKey> implements Serializab ...@@ -53,7 +53,7 @@ public class HourOfDaykeyCoder extends Coder<HourOfDayKey> implements Serializab
@Override @Override
public void verifyDeterministic() throws NonDeterministicException { public void verifyDeterministic() throws NonDeterministicException {
if (!DETEMINISTIC) { if (!DETERMINISTIC) {
throw new NonDeterministicException(this, "This class is not deterministic!"); throw new NonDeterministicException(this, "This class is not deterministic!");
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment