Skip to content
Snippets Groups Projects

Draft: Uc3 TeeTime implementation

Open Lorenz Boguhn requested to merge stu203404/theodolite:uc3-teetime into main
27 files
+ 1086
0
Compare changes
  • Side-by-side
  • Inline
Files
27
 
package theodolite.commons.teetime.communication;
 
 
import java.time.Duration;
 
import java.util.ArrayList;
 
import java.util.Properties;
 
import org.apache.commons.configuration2.Configuration;
 
import org.apache.kafka.clients.consumer.ConsumerRecords;
 
import org.apache.kafka.clients.consumer.KafkaConsumer;
 
import org.apache.kafka.common.serialization.StringDeserializer;
 
import org.slf4j.Logger;
 
import org.slf4j.LoggerFactory;
 
import teetime.framework.AbstractProducerStage;
 
import teetime.framework.OutputPort;
 
import theodolite.commons.teetime.config.ConfigurationKeys;
 
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
 
import titan.ccp.model.records.ActivePowerRecord;
 
 
/**
 
* TeeTime Producer change to Reader.
 
*
 
*/
 
public class KafkaRecordsReader extends
 
AbstractProducerStage<ConsumerRecords<String, ActivePowerRecord>> {
 
 
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordsReader.class);
 
private final KafkaConsumer<String, ActivePowerRecord> consumer;
 
private final int commitIntervalMs;
 
 
private final OutputPort<Object> startPort = this.createOutputPort();
 
 
/**
 
* TeeTime Producer stage that reads from Kafka.
 
*/
 
public KafkaRecordsReader(final Configuration kafkaconfig) {
 
super();
 
final Configuration config = kafkaconfig;
 
this.commitIntervalMs =
 
Integer.parseInt(config.getString(ConfigurationKeys.COMMIT_INTERVAL_MS));
 
 
final Properties props = new Properties();
 
props.put("application.name", config.getString(ConfigurationKeys.APPLICATION_NAME));
 
props.put("application.version", config.getString(ConfigurationKeys.APPLICATION_VERSION));
 
props.put("auto.commit.interval.ms",
 
config.getString(ConfigurationKeys.COMMIT_INTERVAL_MS));
 
props.put("bootstrap.servers",
 
config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS));
 
props.put("group.id", "test");
 
 
final SchemaRegistryAvroSerdeFactory serdes = new SchemaRegistryAvroSerdeFactory(
 
kafkaconfig.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
 
 
final ArrayList<String> inputTopics = new ArrayList<>();
 
inputTopics.add(config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC));
 
 
this.consumer = new KafkaConsumer<>(props, new StringDeserializer(),
 
serdes.<ActivePowerRecord>forValues().deserializer());
 
 
 
this.consumer.subscribe(inputTopics);
 
}
 
 
 
@Override
 
protected void execute() {
 
try {
 
this.runConsumer();
 
} catch (final InterruptedException e) {
 
LOGGER.error(e.toString());
 
}
 
}
 
 
private void runConsumer() throws InterruptedException {
 
 
final Duration pollTime = Duration.ofMillis(commitIntervalMs);
 
 
while (true) {
 
final ConsumerRecords<String, ActivePowerRecord> consumerRecords =
 
this.consumer.poll(pollTime);
 
 
if (!consumerRecords.isEmpty()) {
 
this.getOutputPort().send(consumerRecords);
 
this.consumer.commitAsync();
 
}
 
 
this.consumer.commitAsync();
 
}
 
 
}
 
 
@Override
 
protected void onTerminating() {
 
this.consumer.close();
 
super.onTerminating();
 
}
 
 
public OutputPort<Object> getStartPort() {
 
return startPort;
 
}
 
 
}
Loading