diff --git a/src/main/java/kiekpad/vizprovider/controller/MeasurementsController.java b/src/main/java/kiekpad/vizprovider/controller/MeasurementsController.java index b490eaec32ff797135665fc4580361483608554e..5fb7f525890fe0311a946bd7b5134927056dcc6b 100644 --- a/src/main/java/kiekpad/vizprovider/controller/MeasurementsController.java +++ b/src/main/java/kiekpad/vizprovider/controller/MeasurementsController.java @@ -1,5 +1,9 @@ package kiekpad.vizprovider.controller; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.RequestMapping; @@ -43,12 +47,7 @@ public class MeasurementsController { try { final ResultSet results = this.cassandraService.getSession().execute(statement); - for (Row row : results) { - ObjectNode node = jsonNodeFactory.objectNode(); - node.set("time", jsonNodeFactory.numberNode(row.getTimestamp("time").toInstant().toEpochMilli())); - node.set("measurement", jsonNodeFactory.numberNode(row.getDouble("measurement"))); - node.set("prediction", jsonNodeFactory.numberNode(row.getDouble("prediction"))); - node.set("anomalyscore", jsonNodeFactory.numberNode(row.getDouble("anomalyscore"))); + for (ObjectNode node : new AggregatedResultSet(results, new TakeFirstAggregator())) { measurements.add(node); } @@ -60,4 +59,79 @@ public class MeasurementsController { return measurements; } + private static interface RowsAggregator { + public ObjectNode aggregate(List<Row> rows); + } + + private static class TakeFirstAggregator implements RowsAggregator { + + private final JsonNodeFactory jsonNodeFactory = JsonNodeFactory.instance; + + @Override + public ObjectNode aggregate(final List<Row> rows) { + long timestamp = rows.get(0).getTimestamp("time").toInstant().toEpochMilli(); + double measurement = rows.get(0).getDouble("measurement"); + double prediction = rows.get(0).getDouble("prediction"); + double anomalyscore = rows.get(0).getDouble("anomalyscore"); + + ObjectNode node = this.jsonNodeFactory.objectNode(); + node.set("time", this.jsonNodeFactory.numberNode(timestamp)); + node.set("measurement", this.jsonNodeFactory.numberNode(measurement)); + node.set("prediction", this.jsonNodeFactory.numberNode(prediction)); + node.set("anomalyscore", this.jsonNodeFactory.numberNode(anomalyscore)); + + return node; + } + + } + + private static class AggregatedResultSet implements Iterable<ObjectNode> { + + private final ResultSet resultSet; + private final RowsAggregator aggregator; + + public AggregatedResultSet(final ResultSet resultSet, final RowsAggregator aggregator) { + this.resultSet = resultSet; + this.aggregator = aggregator; + } + + @Override + public Iterator<ObjectNode> iterator() { + return new Iterator<ObjectNode>() { + + private final Iterator<Row> rowsIterator = resultSet.iterator(); + private Row previous = null; + + @Override + public boolean hasNext() { + return this.rowsIterator.hasNext(); + } + + @Override + public ObjectNode next() { + + List<Row> rows = new ArrayList<>(); + if (this.previous == null) { + rows.add(this.rowsIterator.next()); + } else { + rows.add(this.previous); + } + + while (this.rowsIterator.hasNext()) { + Row row = this.rowsIterator.next(); + if (row.getTimestamp("time") == rows.get(0).getTimestamp("time")) { + rows.add(row); + } else { + this.previous = row; + break; + } + } + + return AggregatedResultSet.this.aggregator.aggregate(rows); + } + }; + } + + } + }