From 0bd6372c01620579d93440298ca889e8d0f1e5c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <post@soeren-henning.de> Date: Thu, 26 Mar 2020 15:06:36 +0100 Subject: [PATCH] Add repositories for missing sources --- uc4-application/build.gradle | 12 ++++++ .../java/uc4/streamprocessing/Serdes.java | 37 +++++++++++-------- 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/uc4-application/build.gradle b/uc4-application/build.gradle index e61af74b1..009c083e1 100644 --- a/uc4-application/build.gradle +++ b/uc4-application/build.gradle @@ -12,6 +12,18 @@ buildscript { sourceCompatibility = "1.11" targetCompatibility = "1.11" +allprojects { + repositories { + jcenter() + maven { + url "https://oss.sonatype.org/content/repositories/snapshots/" + } + maven { + url 'https://packages.confluent.io/maven/' + } + } +} + dependencies { compile project(':') diff --git a/uc4-application/src/main/java/uc4/streamprocessing/Serdes.java b/uc4-application/src/main/java/uc4/streamprocessing/Serdes.java index 637c63c4d..dd7079734 100644 --- a/uc4-application/src/main/java/uc4/streamprocessing/Serdes.java +++ b/uc4-application/src/main/java/uc4/streamprocessing/Serdes.java @@ -1,34 +1,41 @@ package uc4.streamprocessing; import com.google.common.math.Stats; +import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.common.serialization.Serde; import titan.ccp.common.kafka.GenericSerde; +import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; +import titan.ccp.model.records.ActivePowerRecord; +import titan.ccp.model.records.AggregatedActivePowerRecord; +import titan.ccp.model.records.WindowedActivePowerRecord; final class Serdes { - // private final SchemaRegistryAvroSerdeFactory avroSerdeFactory; + private final SchemaRegistryAvroSerdeFactory avroSerdeFactory; public Serdes(final String schemaRegistryUrl) { - // this.avroSerdeFactory = new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl); + this.avroSerdeFactory = new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl); } public Serde<String> string() { return org.apache.kafka.common.serialization.Serdes.String(); } - /* - * public Serde<WindowedActivePowerRecord> windowedActivePowerValues() { return - * this.avroSerdeFactory.forKeys(); } - * - * public Serde<ActivePowerRecord> activePowerRecordValues() { return - * this.avroSerdeFactory.forValues(); } - * - * public Serde<AggregatedActivePowerRecord> aggregatedActivePowerRecordValues() { return - * this.avroSerdeFactory.forValues(); } - * - * public <T extends SpecificRecord> Serde<T> avroValues() { return - * this.avroSerdeFactory.forValues(); } - */ + public Serde<WindowedActivePowerRecord> windowedActivePowerValues() { + return this.avroSerdeFactory.forKeys(); + } + + public Serde<ActivePowerRecord> activePowerRecordValues() { + return this.avroSerdeFactory.forValues(); + } + + public Serde<AggregatedActivePowerRecord> aggregatedActivePowerRecordValues() { + return this.avroSerdeFactory.forValues(); + } + + public <T extends SpecificRecord> Serde<T> avroValues() { + return this.avroSerdeFactory.forValues(); + } public Serde<Stats> stats() { return GenericSerde.from(Stats::toByteArray, Stats::fromByteArray); -- GitLab