Skip to content
Snippets Groups Projects
Commit efc5a96b authored by Sören Henning's avatar Sören Henning
Browse files

Fix code quality issues

parent f4c7e575
No related branches found
No related tags found
1 merge request!90Migrate Flink benchmark implementation
Pipeline #2251 failed
......@@ -6,6 +6,10 @@ import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
* A {@link ProcessWindowFunction} that forwards a computed {@link Stats} object along with its
* associated key.
*/
public class StatsProcessWindowFunction
extends ProcessWindowFunction<Stats, Tuple2<String, Stats>, String, TimeWindow> {
......
......@@ -7,6 +7,10 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import theodolite.uc3.application.util.HourOfDayKey;
/**
* A {@link ProcessWindowFunction} that forwards a computed {@link Stats} object along with its
* associated key.
*/
public class HourOfDayProcessWindowFunction
extends ProcessWindowFunction<Stats, Tuple2<HourOfDayKey, Stats>, HourOfDayKey, TimeWindow> {
......
......@@ -47,6 +47,10 @@ public class HourOfDayKey {
return this.hourOfDay == k.hourOfDay && this.sensorId.equals(k.sensorId);
}
/**
* Convert this {@link HourOfDayKey} into a byte array. This method is the inverse to
* {@code HourOfDayKey#fromByteArray()}.
*/
public byte[] toByteArray() {
final int numBytes = (2 * Integer.SIZE + this.sensorId.length() * Character.SIZE) / Byte.SIZE;
final ByteBuffer buffer = ByteBuffer.allocate(numBytes).order(ByteOrder.LITTLE_ENDIAN);
......@@ -58,6 +62,10 @@ public class HourOfDayKey {
return buffer.array();
}
/**
* Construct a new {@link HourOfDayKey} from a byte array. This method is the inverse to
* {@code HourOfDayKey#toByteArray()}.
*/
public static HourOfDayKey fromByteArray(final byte[] bytes) {
final ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
final int hourOfDay = buffer.getInt();
......
......@@ -8,6 +8,8 @@ import java.time.LocalDateTime;
*/
public class HourOfDayKeyFactory implements StatsKeyFactory<HourOfDayKey>, Serializable {
private static final long serialVersionUID = 4357668496473645043L; // NOPMD
@Override
public HourOfDayKey createKey(final String sensorId, final LocalDateTime dateTime) {
final int hourOfDay = dateTime.getHour();
......
......@@ -18,6 +18,8 @@ import titan.ccp.common.kafka.simpleserdes.WriteBuffer;
public class HourOfDayKeySerde extends Serializer<HourOfDayKey>
implements BufferSerde<HourOfDayKey>, Serializable {
private static final long serialVersionUID = 1262778284661945041L; // NOPMD
@Override
public void serialize(final WriteBuffer buffer, final HourOfDayKey data) {
buffer.putInt(data.getHourOfDay());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment