From 2f37edf6fc3599f35275be24d4a461687b19b669 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Vonheiden?= <bjoern.vonheiden@hotmail.de> Date: Thu, 14 May 2020 15:33:02 +0200 Subject: [PATCH] Remove unused time attribute aggregation classes from uc4 kafkaStreams app - Remove DayOfWeek classes - Remove HourOfWeek classes --- .../uc4/streamprocessing/DayOfWeekKey.java | 31 -------------- .../streamprocessing/DayOfWeekKeyFactory.java | 22 ---------- .../streamprocessing/DayOfWeekKeySerde.java | 33 --------------- .../DayOfWeekRecordFactory.java | 28 ------------- .../uc4/streamprocessing/HourOfWeekKey.java | 40 ------------------- .../HourOfWeekKeyFactory.java | 23 ----------- .../streamprocessing/HourOfWeekKeySerde.java | 35 ---------------- .../HourOfWeekRecordFactory.java | 29 -------------- .../uc4/streamprocessing/TopologyBuilder.java | 3 -- 9 files changed, 244 deletions(-) delete mode 100644 uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKey.java delete mode 100644 uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeyFactory.java delete mode 100644 uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeySerde.java delete mode 100644 uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekRecordFactory.java delete mode 100644 uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKey.java delete mode 100644 uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeyFactory.java delete mode 100644 uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeySerde.java delete mode 100644 uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekRecordFactory.java diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKey.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKey.java deleted file mode 100644 index a3ae3461d..000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKey.java +++ /dev/null @@ -1,31 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; - -/** - * Composed key of a {@link DayOfWeek} and a sensor id. - */ -public class DayOfWeekKey { - - private final DayOfWeek dayOfWeek; - private final String sensorId; - - public DayOfWeekKey(final DayOfWeek dayOfWeek, final String sensorId) { - this.dayOfWeek = dayOfWeek; - this.sensorId = sensorId; - } - - public DayOfWeek getDayOfWeek() { - return this.dayOfWeek; - } - - public String getSensorId() { - return this.sensorId; - } - - @Override - public String toString() { - return this.sensorId + ";" + this.dayOfWeek.toString(); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeyFactory.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeyFactory.java deleted file mode 100644 index 222785ca8..000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeyFactory.java +++ /dev/null @@ -1,22 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; -import java.time.LocalDateTime; - -/** - * {@link StatsKeyFactory} for {@link DayOfWeekKey}. - */ -public class DayOfWeekKeyFactory implements StatsKeyFactory<DayOfWeekKey> { - - @Override - public DayOfWeekKey createKey(final String sensorId, final LocalDateTime dateTime) { - final DayOfWeek dayOfWeek = dateTime.getDayOfWeek(); - return new DayOfWeekKey(dayOfWeek, sensorId); - } - - @Override - public String getSensorId(final DayOfWeekKey key) { - return key.getSensorId(); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeySerde.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeySerde.java deleted file mode 100644 index 9c246f912..000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeySerde.java +++ /dev/null @@ -1,33 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; -import org.apache.kafka.common.serialization.Serde; -import titan.ccp.common.kafka.simpleserdes.BufferSerde; -import titan.ccp.common.kafka.simpleserdes.ReadBuffer; -import titan.ccp.common.kafka.simpleserdes.SimpleSerdes; -import titan.ccp.common.kafka.simpleserdes.WriteBuffer; - -/** - * {@link BufferSerde} for a {@link DayOfWeekKey}. Use the {@link #create()} method to create a new - * Kafka {@link Serde}. - */ -public class DayOfWeekKeySerde implements BufferSerde<DayOfWeekKey> { - - @Override - public void serialize(final WriteBuffer buffer, final DayOfWeekKey data) { - buffer.putInt(data.getDayOfWeek().getValue()); - buffer.putString(data.getSensorId()); - } - - @Override - public DayOfWeekKey deserialize(final ReadBuffer buffer) { - final DayOfWeek dayOfWeek = DayOfWeek.of(buffer.getInt()); - final String sensorId = buffer.getString(); - return new DayOfWeekKey(dayOfWeek, sensorId); - } - - public static Serde<DayOfWeekKey> create() { - return SimpleSerdes.create(new DayOfWeekKeySerde()); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekRecordFactory.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekRecordFactory.java deleted file mode 100644 index bdfecdbc4..000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekRecordFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -package spesb.uc4.streamprocessing; - -import com.google.common.math.Stats; -import org.apache.kafka.streams.kstream.Windowed; -import titan.ccp.model.records.DayOfWeekActivePowerRecord; - -/** - * {@link StatsRecordFactory} to create an {@link DayOfWeekActivePowerRecord}. - */ -public class DayOfWeekRecordFactory - implements StatsRecordFactory<DayOfWeekKey, DayOfWeekActivePowerRecord> { - - @Override - public DayOfWeekActivePowerRecord create(final Windowed<DayOfWeekKey> windowed, - final Stats stats) { - return new DayOfWeekActivePowerRecord( - windowed.key().getSensorId(), - windowed.key().getDayOfWeek().getValue(), - windowed.window().start(), - windowed.window().end(), - stats.count(), - stats.mean(), - stats.populationVariance(), - stats.min(), - stats.max()); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKey.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKey.java deleted file mode 100644 index 81d33f304..000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKey.java +++ /dev/null @@ -1,40 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; - -/** - * Composed key of a {@link DayOfWeek}, an hour of the day and a sensor id. - */ -public class HourOfWeekKey { - - private final DayOfWeek dayOfWeek; - private final int hourOfDay; - private final String sensorId; - - /** - * Create a new {@link HourOfDayKey} using its components. - */ - public HourOfWeekKey(final DayOfWeek dayOfWeek, final int hourOfDay, final String sensorId) { - this.dayOfWeek = dayOfWeek; - this.hourOfDay = hourOfDay; - this.sensorId = sensorId; - } - - public DayOfWeek getDayOfWeek() { - return this.dayOfWeek; - } - - public int getHourOfDay() { - return this.hourOfDay; - } - - public String getSensorId() { - return this.sensorId; - } - - @Override - public String toString() { - return this.sensorId + ";" + this.dayOfWeek.toString() + ";" + this.hourOfDay; - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeyFactory.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeyFactory.java deleted file mode 100644 index 980549309..000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeyFactory.java +++ /dev/null @@ -1,23 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; -import java.time.LocalDateTime; - -/** - * {@link StatsKeyFactory} for {@link HourOfWeekKey}. - */ -public class HourOfWeekKeyFactory implements StatsKeyFactory<HourOfWeekKey> { - - @Override - public HourOfWeekKey createKey(final String sensorId, final LocalDateTime dateTime) { - final DayOfWeek dayOfWeek = dateTime.getDayOfWeek(); - final int hourOfDay = dateTime.getHour(); - return new HourOfWeekKey(dayOfWeek, hourOfDay, sensorId); - } - - @Override - public String getSensorId(final HourOfWeekKey key) { - return key.getSensorId(); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeySerde.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeySerde.java deleted file mode 100644 index 63a6a445b..000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeySerde.java +++ /dev/null @@ -1,35 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; -import org.apache.kafka.common.serialization.Serde; -import titan.ccp.common.kafka.simpleserdes.BufferSerde; -import titan.ccp.common.kafka.simpleserdes.ReadBuffer; -import titan.ccp.common.kafka.simpleserdes.SimpleSerdes; -import titan.ccp.common.kafka.simpleserdes.WriteBuffer; - -/** - * {@link BufferSerde} for a {@link HourOfWeekKey}. Use the {@link #create()} method to create a new - * Kafka {@link Serde}. - */ -public class HourOfWeekKeySerde implements BufferSerde<HourOfWeekKey> { - - @Override - public void serialize(final WriteBuffer buffer, final HourOfWeekKey data) { - buffer.putInt(data.getDayOfWeek().getValue()); - buffer.putInt(data.getHourOfDay()); - buffer.putString(data.getSensorId()); - } - - @Override - public HourOfWeekKey deserialize(final ReadBuffer buffer) { - final DayOfWeek dayOfWeek = DayOfWeek.of(buffer.getInt()); - final int hourOfDay = buffer.getInt(); - final String sensorId = buffer.getString(); - return new HourOfWeekKey(dayOfWeek, hourOfDay, sensorId); - } - - public static Serde<HourOfWeekKey> create() { - return SimpleSerdes.create(new HourOfWeekKeySerde()); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekRecordFactory.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekRecordFactory.java deleted file mode 100644 index 358e3d1a5..000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekRecordFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -package spesb.uc4.streamprocessing; - -import com.google.common.math.Stats; -import org.apache.kafka.streams.kstream.Windowed; -import titan.ccp.model.records.HourOfWeekActivePowerRecord; - -/** - * {@link StatsRecordFactory} to create an {@link HourOfWeekActivePowerRecord}. - */ -public class HourOfWeekRecordFactory - implements StatsRecordFactory<HourOfWeekKey, HourOfWeekActivePowerRecord> { - - @Override - public HourOfWeekActivePowerRecord create(final Windowed<HourOfWeekKey> windowed, - final Stats stats) { - return new HourOfWeekActivePowerRecord( - windowed.key().getSensorId(), - windowed.key().getDayOfWeek().getValue(), - windowed.key().getHourOfDay(), - windowed.window().start(), - windowed.window().end(), - stats.count(), - stats.mean(), - stats.populationVariance(), - stats.min(), - stats.max()); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/TopologyBuilder.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/TopologyBuilder.java index 66bb46003..c2489a9ae 100644 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/TopologyBuilder.java +++ b/uc4-application/src/main/java/spesb/uc4/streamprocessing/TopologyBuilder.java @@ -52,11 +52,8 @@ public class TopologyBuilder { * Build the {@link Topology} for the History microservice. */ public Topology build() { - final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create(); - // final StatsRecordFactory<HourOfDayKey, HourOfDayActivePowerRecord> statsRecordFactory = new - // HourOfDayRecordFactory(); this.builder .stream(this.inputTopic, -- GitLab