diff --git a/uc4-application/build.gradle b/uc4-application/build.gradle index e61af74b1b90d3ea3c9f70444aa2413b27a56631..009c083e1fcd3dffcbb358098e2e0e0900f98e07 100644 --- a/uc4-application/build.gradle +++ b/uc4-application/build.gradle @@ -12,6 +12,18 @@ buildscript { sourceCompatibility = "1.11" targetCompatibility = "1.11" +allprojects { + repositories { + jcenter() + maven { + url "https://oss.sonatype.org/content/repositories/snapshots/" + } + maven { + url 'https://packages.confluent.io/maven/' + } + } +} + dependencies { compile project(':') diff --git a/uc4-application/src/main/java/uc4/streamprocessing/Serdes.java b/uc4-application/src/main/java/uc4/streamprocessing/Serdes.java index 637c63c4d8d787c0269e9bb6e1a44a22acf63f29..dd707973463330c05d7cc9f099ba52ee26ac4f41 100644 --- a/uc4-application/src/main/java/uc4/streamprocessing/Serdes.java +++ b/uc4-application/src/main/java/uc4/streamprocessing/Serdes.java @@ -1,34 +1,41 @@ package uc4.streamprocessing; import com.google.common.math.Stats; +import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.common.serialization.Serde; import titan.ccp.common.kafka.GenericSerde; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; +import titan.ccp.model.records.ActivePowerRecord; +import titan.ccp.model.records.AggregatedActivePowerRecord; +import titan.ccp.model.records.WindowedActivePowerRecord; final class Serdes { - // private final SchemaRegistryAvroSerdeFactory avroSerdeFactory; + private final SchemaRegistryAvroSerdeFactory avroSerdeFactory; public Serdes(final String schemaRegistryUrl) { - // this.avroSerdeFactory = new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl); + this.avroSerdeFactory = new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl); } public Serde<String> string() { return org.apache.kafka.common.serialization.Serdes.String(); } - /* - * public Serde<WindowedActivePowerRecord> windowedActivePowerValues() { return - * this.avroSerdeFactory.forKeys(); } - * - * public Serde<ActivePowerRecord> activePowerRecordValues() { return - * this.avroSerdeFactory.forValues(); } - * - * public Serde<AggregatedActivePowerRecord> aggregatedActivePowerRecordValues() { return - * this.avroSerdeFactory.forValues(); } - * - * public <T extends SpecificRecord> Serde<T> avroValues() { return - * this.avroSerdeFactory.forValues(); } - */ + public Serde<WindowedActivePowerRecord> windowedActivePowerValues() { + return this.avroSerdeFactory.forKeys(); + } + + public Serde<ActivePowerRecord> activePowerRecordValues() { + return this.avroSerdeFactory.forValues(); + } + + public Serde<AggregatedActivePowerRecord> aggregatedActivePowerRecordValues() { + return this.avroSerdeFactory.forValues(); + } + + public <T extends SpecificRecord> Serde<T> avroValues() { + return this.avroSerdeFactory.forValues(); + } public Serde<Stats> stats() { return GenericSerde.from(Stats::toByteArray, Stats::fromByteArray);