diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKey.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKey.java deleted file mode 100644 index a3ae3461d055694669e4d874930d5ade9dd83658..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKey.java +++ /dev/null @@ -1,31 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; - -/** - * Composed key of a {@link DayOfWeek} and a sensor id. - */ -public class DayOfWeekKey { - - private final DayOfWeek dayOfWeek; - private final String sensorId; - - public DayOfWeekKey(final DayOfWeek dayOfWeek, final String sensorId) { - this.dayOfWeek = dayOfWeek; - this.sensorId = sensorId; - } - - public DayOfWeek getDayOfWeek() { - return this.dayOfWeek; - } - - public String getSensorId() { - return this.sensorId; - } - - @Override - public String toString() { - return this.sensorId + ";" + this.dayOfWeek.toString(); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeyFactory.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeyFactory.java deleted file mode 100644 index 222785ca8a2d8db72c81929a216fc53b43d06ec0..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeyFactory.java +++ /dev/null @@ -1,22 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; -import java.time.LocalDateTime; - -/** - * {@link StatsKeyFactory} for {@link DayOfWeekKey}. - */ -public class DayOfWeekKeyFactory implements StatsKeyFactory<DayOfWeekKey> { - - @Override - public DayOfWeekKey createKey(final String sensorId, final LocalDateTime dateTime) { - final DayOfWeek dayOfWeek = dateTime.getDayOfWeek(); - return new DayOfWeekKey(dayOfWeek, sensorId); - } - - @Override - public String getSensorId(final DayOfWeekKey key) { - return key.getSensorId(); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeySerde.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeySerde.java deleted file mode 100644 index 9c246f912ffc67ff6fb8d211a99d478cb58c2898..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeySerde.java +++ /dev/null @@ -1,33 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; -import org.apache.kafka.common.serialization.Serde; -import titan.ccp.common.kafka.simpleserdes.BufferSerde; -import titan.ccp.common.kafka.simpleserdes.ReadBuffer; -import titan.ccp.common.kafka.simpleserdes.SimpleSerdes; -import titan.ccp.common.kafka.simpleserdes.WriteBuffer; - -/** - * {@link BufferSerde} for a {@link DayOfWeekKey}. Use the {@link #create()} method to create a new - * Kafka {@link Serde}. - */ -public class DayOfWeekKeySerde implements BufferSerde<DayOfWeekKey> { - - @Override - public void serialize(final WriteBuffer buffer, final DayOfWeekKey data) { - buffer.putInt(data.getDayOfWeek().getValue()); - buffer.putString(data.getSensorId()); - } - - @Override - public DayOfWeekKey deserialize(final ReadBuffer buffer) { - final DayOfWeek dayOfWeek = DayOfWeek.of(buffer.getInt()); - final String sensorId = buffer.getString(); - return new DayOfWeekKey(dayOfWeek, sensorId); - } - - public static Serde<DayOfWeekKey> create() { - return SimpleSerdes.create(new DayOfWeekKeySerde()); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekRecordFactory.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekRecordFactory.java deleted file mode 100644 index bdfecdbc4857b4d7a630b4afa07de39618435544..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekRecordFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -package spesb.uc4.streamprocessing; - -import com.google.common.math.Stats; -import org.apache.kafka.streams.kstream.Windowed; -import titan.ccp.model.records.DayOfWeekActivePowerRecord; - -/** - * {@link StatsRecordFactory} to create an {@link DayOfWeekActivePowerRecord}. - */ -public class DayOfWeekRecordFactory - implements StatsRecordFactory<DayOfWeekKey, DayOfWeekActivePowerRecord> { - - @Override - public DayOfWeekActivePowerRecord create(final Windowed<DayOfWeekKey> windowed, - final Stats stats) { - return new DayOfWeekActivePowerRecord( - windowed.key().getSensorId(), - windowed.key().getDayOfWeek().getValue(), - windowed.window().start(), - windowed.window().end(), - stats.count(), - stats.mean(), - stats.populationVariance(), - stats.min(), - stats.max()); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKey.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKey.java deleted file mode 100644 index 81d33f3042796ecb3c890e73a82e879ab2d0ac6e..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKey.java +++ /dev/null @@ -1,40 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; - -/** - * Composed key of a {@link DayOfWeek}, an hour of the day and a sensor id. - */ -public class HourOfWeekKey { - - private final DayOfWeek dayOfWeek; - private final int hourOfDay; - private final String sensorId; - - /** - * Create a new {@link HourOfDayKey} using its components. - */ - public HourOfWeekKey(final DayOfWeek dayOfWeek, final int hourOfDay, final String sensorId) { - this.dayOfWeek = dayOfWeek; - this.hourOfDay = hourOfDay; - this.sensorId = sensorId; - } - - public DayOfWeek getDayOfWeek() { - return this.dayOfWeek; - } - - public int getHourOfDay() { - return this.hourOfDay; - } - - public String getSensorId() { - return this.sensorId; - } - - @Override - public String toString() { - return this.sensorId + ";" + this.dayOfWeek.toString() + ";" + this.hourOfDay; - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeyFactory.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeyFactory.java deleted file mode 100644 index 980549309ce94b2e4a4c6da0835b8adfe47bb61e..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeyFactory.java +++ /dev/null @@ -1,23 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; -import java.time.LocalDateTime; - -/** - * {@link StatsKeyFactory} for {@link HourOfWeekKey}. - */ -public class HourOfWeekKeyFactory implements StatsKeyFactory<HourOfWeekKey> { - - @Override - public HourOfWeekKey createKey(final String sensorId, final LocalDateTime dateTime) { - final DayOfWeek dayOfWeek = dateTime.getDayOfWeek(); - final int hourOfDay = dateTime.getHour(); - return new HourOfWeekKey(dayOfWeek, hourOfDay, sensorId); - } - - @Override - public String getSensorId(final HourOfWeekKey key) { - return key.getSensorId(); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeySerde.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeySerde.java deleted file mode 100644 index 63a6a445bf46f521a220816896529a081a15bca0..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeySerde.java +++ /dev/null @@ -1,35 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; -import org.apache.kafka.common.serialization.Serde; -import titan.ccp.common.kafka.simpleserdes.BufferSerde; -import titan.ccp.common.kafka.simpleserdes.ReadBuffer; -import titan.ccp.common.kafka.simpleserdes.SimpleSerdes; -import titan.ccp.common.kafka.simpleserdes.WriteBuffer; - -/** - * {@link BufferSerde} for a {@link HourOfWeekKey}. Use the {@link #create()} method to create a new - * Kafka {@link Serde}. - */ -public class HourOfWeekKeySerde implements BufferSerde<HourOfWeekKey> { - - @Override - public void serialize(final WriteBuffer buffer, final HourOfWeekKey data) { - buffer.putInt(data.getDayOfWeek().getValue()); - buffer.putInt(data.getHourOfDay()); - buffer.putString(data.getSensorId()); - } - - @Override - public HourOfWeekKey deserialize(final ReadBuffer buffer) { - final DayOfWeek dayOfWeek = DayOfWeek.of(buffer.getInt()); - final int hourOfDay = buffer.getInt(); - final String sensorId = buffer.getString(); - return new HourOfWeekKey(dayOfWeek, hourOfDay, sensorId); - } - - public static Serde<HourOfWeekKey> create() { - return SimpleSerdes.create(new HourOfWeekKeySerde()); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekRecordFactory.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekRecordFactory.java deleted file mode 100644 index 358e3d1a5acf8bfd9f4fca7c95b84bd5b13bea53..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekRecordFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -package spesb.uc4.streamprocessing; - -import com.google.common.math.Stats; -import org.apache.kafka.streams.kstream.Windowed; -import titan.ccp.model.records.HourOfWeekActivePowerRecord; - -/** - * {@link StatsRecordFactory} to create an {@link HourOfWeekActivePowerRecord}. - */ -public class HourOfWeekRecordFactory - implements StatsRecordFactory<HourOfWeekKey, HourOfWeekActivePowerRecord> { - - @Override - public HourOfWeekActivePowerRecord create(final Windowed<HourOfWeekKey> windowed, - final Stats stats) { - return new HourOfWeekActivePowerRecord( - windowed.key().getSensorId(), - windowed.key().getDayOfWeek().getValue(), - windowed.key().getHourOfDay(), - windowed.window().start(), - windowed.window().end(), - stats.count(), - stats.mean(), - stats.populationVariance(), - stats.min(), - stats.max()); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/TopologyBuilder.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/TopologyBuilder.java index 66bb460031f09f1cae77d5e93e3f130aa66f6e90..c2489a9ae4c10301a914be67db14d5556e459912 100644 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/TopologyBuilder.java +++ b/uc4-application/src/main/java/spesb/uc4/streamprocessing/TopologyBuilder.java @@ -52,11 +52,8 @@ public class TopologyBuilder { * Build the {@link Topology} for the History microservice. */ public Topology build() { - final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create(); - // final StatsRecordFactory<HourOfDayKey, HourOfDayActivePowerRecord> statsRecordFactory = new - // HourOfDayRecordFactory(); this.builder .stream(this.inputTopic,