Skip to content
Snippets Groups Projects
Commit 2f37edf6 authored by Björn Vonheiden's avatar Björn Vonheiden
Browse files

Remove unused time attribute aggregation classes from uc4 kafkaStreams app

- Remove DayOfWeek classes
- Remove HourOfWeek classes
parent 29f14d89
No related branches found
No related tags found
1 merge request!4Feature/cleanup kafka streams
Showing
with 0 additions and 244 deletions
package spesb.uc4.streamprocessing;
import java.time.DayOfWeek;
/**
* Composed key of a {@link DayOfWeek} and a sensor id.
*/
public class DayOfWeekKey {
private final DayOfWeek dayOfWeek;
private final String sensorId;
public DayOfWeekKey(final DayOfWeek dayOfWeek, final String sensorId) {
this.dayOfWeek = dayOfWeek;
this.sensorId = sensorId;
}
public DayOfWeek getDayOfWeek() {
return this.dayOfWeek;
}
public String getSensorId() {
return this.sensorId;
}
@Override
public String toString() {
return this.sensorId + ";" + this.dayOfWeek.toString();
}
}
package spesb.uc4.streamprocessing;
import java.time.DayOfWeek;
import java.time.LocalDateTime;
/**
* {@link StatsKeyFactory} for {@link DayOfWeekKey}.
*/
public class DayOfWeekKeyFactory implements StatsKeyFactory<DayOfWeekKey> {
@Override
public DayOfWeekKey createKey(final String sensorId, final LocalDateTime dateTime) {
final DayOfWeek dayOfWeek = dateTime.getDayOfWeek();
return new DayOfWeekKey(dayOfWeek, sensorId);
}
@Override
public String getSensorId(final DayOfWeekKey key) {
return key.getSensorId();
}
}
package spesb.uc4.streamprocessing;
import java.time.DayOfWeek;
import org.apache.kafka.common.serialization.Serde;
import titan.ccp.common.kafka.simpleserdes.BufferSerde;
import titan.ccp.common.kafka.simpleserdes.ReadBuffer;
import titan.ccp.common.kafka.simpleserdes.SimpleSerdes;
import titan.ccp.common.kafka.simpleserdes.WriteBuffer;
/**
* {@link BufferSerde} for a {@link DayOfWeekKey}. Use the {@link #create()} method to create a new
* Kafka {@link Serde}.
*/
public class DayOfWeekKeySerde implements BufferSerde<DayOfWeekKey> {
@Override
public void serialize(final WriteBuffer buffer, final DayOfWeekKey data) {
buffer.putInt(data.getDayOfWeek().getValue());
buffer.putString(data.getSensorId());
}
@Override
public DayOfWeekKey deserialize(final ReadBuffer buffer) {
final DayOfWeek dayOfWeek = DayOfWeek.of(buffer.getInt());
final String sensorId = buffer.getString();
return new DayOfWeekKey(dayOfWeek, sensorId);
}
public static Serde<DayOfWeekKey> create() {
return SimpleSerdes.create(new DayOfWeekKeySerde());
}
}
package spesb.uc4.streamprocessing;
import com.google.common.math.Stats;
import org.apache.kafka.streams.kstream.Windowed;
import titan.ccp.model.records.DayOfWeekActivePowerRecord;
/**
* {@link StatsRecordFactory} to create an {@link DayOfWeekActivePowerRecord}.
*/
public class DayOfWeekRecordFactory
implements StatsRecordFactory<DayOfWeekKey, DayOfWeekActivePowerRecord> {
@Override
public DayOfWeekActivePowerRecord create(final Windowed<DayOfWeekKey> windowed,
final Stats stats) {
return new DayOfWeekActivePowerRecord(
windowed.key().getSensorId(),
windowed.key().getDayOfWeek().getValue(),
windowed.window().start(),
windowed.window().end(),
stats.count(),
stats.mean(),
stats.populationVariance(),
stats.min(),
stats.max());
}
}
package spesb.uc4.streamprocessing;
import java.time.DayOfWeek;
/**
* Composed key of a {@link DayOfWeek}, an hour of the day and a sensor id.
*/
public class HourOfWeekKey {
private final DayOfWeek dayOfWeek;
private final int hourOfDay;
private final String sensorId;
/**
* Create a new {@link HourOfDayKey} using its components.
*/
public HourOfWeekKey(final DayOfWeek dayOfWeek, final int hourOfDay, final String sensorId) {
this.dayOfWeek = dayOfWeek;
this.hourOfDay = hourOfDay;
this.sensorId = sensorId;
}
public DayOfWeek getDayOfWeek() {
return this.dayOfWeek;
}
public int getHourOfDay() {
return this.hourOfDay;
}
public String getSensorId() {
return this.sensorId;
}
@Override
public String toString() {
return this.sensorId + ";" + this.dayOfWeek.toString() + ";" + this.hourOfDay;
}
}
package spesb.uc4.streamprocessing;
import java.time.DayOfWeek;
import java.time.LocalDateTime;
/**
* {@link StatsKeyFactory} for {@link HourOfWeekKey}.
*/
public class HourOfWeekKeyFactory implements StatsKeyFactory<HourOfWeekKey> {
@Override
public HourOfWeekKey createKey(final String sensorId, final LocalDateTime dateTime) {
final DayOfWeek dayOfWeek = dateTime.getDayOfWeek();
final int hourOfDay = dateTime.getHour();
return new HourOfWeekKey(dayOfWeek, hourOfDay, sensorId);
}
@Override
public String getSensorId(final HourOfWeekKey key) {
return key.getSensorId();
}
}
package spesb.uc4.streamprocessing;
import java.time.DayOfWeek;
import org.apache.kafka.common.serialization.Serde;
import titan.ccp.common.kafka.simpleserdes.BufferSerde;
import titan.ccp.common.kafka.simpleserdes.ReadBuffer;
import titan.ccp.common.kafka.simpleserdes.SimpleSerdes;
import titan.ccp.common.kafka.simpleserdes.WriteBuffer;
/**
* {@link BufferSerde} for a {@link HourOfWeekKey}. Use the {@link #create()} method to create a new
* Kafka {@link Serde}.
*/
public class HourOfWeekKeySerde implements BufferSerde<HourOfWeekKey> {
@Override
public void serialize(final WriteBuffer buffer, final HourOfWeekKey data) {
buffer.putInt(data.getDayOfWeek().getValue());
buffer.putInt(data.getHourOfDay());
buffer.putString(data.getSensorId());
}
@Override
public HourOfWeekKey deserialize(final ReadBuffer buffer) {
final DayOfWeek dayOfWeek = DayOfWeek.of(buffer.getInt());
final int hourOfDay = buffer.getInt();
final String sensorId = buffer.getString();
return new HourOfWeekKey(dayOfWeek, hourOfDay, sensorId);
}
public static Serde<HourOfWeekKey> create() {
return SimpleSerdes.create(new HourOfWeekKeySerde());
}
}
package spesb.uc4.streamprocessing;
import com.google.common.math.Stats;
import org.apache.kafka.streams.kstream.Windowed;
import titan.ccp.model.records.HourOfWeekActivePowerRecord;
/**
* {@link StatsRecordFactory} to create an {@link HourOfWeekActivePowerRecord}.
*/
public class HourOfWeekRecordFactory
implements StatsRecordFactory<HourOfWeekKey, HourOfWeekActivePowerRecord> {
@Override
public HourOfWeekActivePowerRecord create(final Windowed<HourOfWeekKey> windowed,
final Stats stats) {
return new HourOfWeekActivePowerRecord(
windowed.key().getSensorId(),
windowed.key().getDayOfWeek().getValue(),
windowed.key().getHourOfDay(),
windowed.window().start(),
windowed.window().end(),
stats.count(),
stats.mean(),
stats.populationVariance(),
stats.min(),
stats.max());
}
}
......@@ -52,11 +52,8 @@ public class TopologyBuilder {
* Build the {@link Topology} for the History microservice.
*/
public Topology build() {
final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create();
// final StatsRecordFactory<HourOfDayKey, HourOfDayActivePowerRecord> statsRecordFactory = new
// HourOfDayRecordFactory();
this.builder
.stream(this.inputTopic,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment