Skip to content
Snippets Groups Projects

Draft: Uc1 teetime implementation

Open Lorenz Boguhn requested to merge stu203404/theodolite:uc1-teetime into main
Files
20
package theodolite.commons.teetime.communication;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.framework.AbstractProducerStage;
import teetime.framework.OutputPort;
import theodolite.commons.teetime.config.ConfigurationKeys;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
import titan.ccp.model.records.ActivePowerRecord;
/**
* TeeTime Producer change to Reader.
*
*/
public class KafkaRecordsReader extends
AbstractProducerStage<ConsumerRecords<String, ActivePowerRecord>> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordsReader.class);
private final KafkaConsumer<String, ActivePowerRecord> consumer;
private final int commitIntervalMs;
private final OutputPort<Object> startPort = this.createOutputPort();
/**
* TeeTime Producer stage that reads from Kafka.
*/
public KafkaRecordsReader(final Configuration kafkaconfig) {
super();
final Configuration config = kafkaconfig;
this.commitIntervalMs =
Integer.parseInt(config.getString(ConfigurationKeys.COMMIT_INTERVAL_MS));
final Properties props = new Properties();
props.put("application.name", config.getString(ConfigurationKeys.APPLICATION_NAME));
props.put("application.version", config.getString(ConfigurationKeys.APPLICATION_VERSION));
props.put("auto.commit.interval.ms",
config.getString(ConfigurationKeys.COMMIT_INTERVAL_MS));
props.put("bootstrap.servers",
config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS));
props.put("group.id", "test");
final SchemaRegistryAvroSerdeFactory serdes = new SchemaRegistryAvroSerdeFactory(
kafkaconfig.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
final ArrayList<String> inputTopics = new ArrayList<>();
inputTopics.add(config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC));
this.consumer = new KafkaConsumer<>(props, new StringDeserializer(),
serdes.<ActivePowerRecord>forValues().deserializer());
this.consumer.subscribe(inputTopics);
}
@Override
protected void execute() {
try {
this.runConsumer();
} catch (final InterruptedException e) {
LOGGER.error(e.toString());
}
}
private void runConsumer() throws InterruptedException {
final Duration pollTime = Duration.ofMillis(commitIntervalMs);
while (true) {
final ConsumerRecords<String, ActivePowerRecord> consumerRecords =
this.consumer.poll(pollTime);
if (!consumerRecords.isEmpty()) {
this.getOutputPort().send(consumerRecords);
this.consumer.commitAsync();
}
this.consumer.commitAsync();
}
}
@Override
protected void onTerminating() {
this.consumer.close();
super.onTerminating();
}
public OutputPort<Object> getStartPort() {
return startPort;
}
}
Loading