Commit 41c39de7 authored by Simon Ehrenstein

Merge remote-tracking branch 'upstream/master' into feature/346-migrate-strimzi-kafka

parents ced76fd1 ef530a0b
1 merge request: !243 Migrate to Strimzi Kafka
Showing changes with 293 additions and 56 deletions
@@ -8,8 +8,8 @@ import org.slf4j.LoggerFactory;
 import titan.ccp.common.configuration.ServiceConfigurations;
 
 /**
- * Abstraction of a Beam microservice.
- * Encapsulates the corresponding {@link PipelineOptions} and the beam Runner.
+ * Abstraction of a Beam microservice. Encapsulates the corresponding {@link PipelineOptions} and
+ * the beam Runner.
  */
 public class AbstractBeamService {
@@ -20,9 +20,7 @@ public class AbstractBeamService {
 
   // Application Configurations
   private final Configuration config = ServiceConfigurations.createWithDefaults();
-  private final String applicationName =
-      config.getString(ConfigurationKeys.APPLICATION_NAME);
+  private final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
 
   /**
    * Creates AbstractBeamService with options.
@@ -33,13 +31,13 @@ public class AbstractBeamService {
     for (final String s : args) {
       LOGGER.info("{}", s);
     }
-    options = PipelineOptionsFactory.fromArgs(args).create();
-    options.setJobName(applicationName);
-    LOGGER.info("Starting BeamService with PipelineOptions {}:", this.options.toString());
+    this.options = PipelineOptionsFactory.fromArgs(args).create();
+    this.options.setJobName(this.applicationName);
+    LOGGER.info("Starting BeamService with PipelineOptions: {}", this.options.toString());
   }
 
   public Configuration getConfig() {
-    return config;
+    return this.config;
   }
 }
@@ -6,6 +6,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.kafka.common.serialization.Deserializer;
 
 /**
  * Simple {@link PTransform} that read from Kafka using {@link KafkaIO}.
@@ -13,8 +14,7 @@ import org.apache.beam.sdk.values.PCollection;
  * @param <K> Type of the Key.
  * @param <V> Type of the Value.
  */
-public class KafkaGenericReader<K, V> extends
-    PTransform<PBegin, PCollection<KV<K, V>>> {
+public class KafkaGenericReader<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
 
   private static final long serialVersionUID = 2603286150183186115L;
 
   private final PTransform<PBegin, PCollection<KV<K, V>>> reader;
@@ -22,14 +22,12 @@ public class KafkaGenericReader<K, V> extends
 
   /**
    * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
    */
-  public KafkaGenericReader(final String bootstrapServer, final String inputTopic,
-      final Map<String, Object> consumerConfig,
-      final Class<? extends
-          org.apache.kafka.common.serialization.Deserializer<K>>
-          keyDeserializer,
-      final Class<? extends
-          org.apache.kafka.common.serialization.Deserializer<V>>
-          valueDeserializer) {
+  public KafkaGenericReader(
+      final String bootstrapServer,
+      final String inputTopic,
+      final Map<String, Object> consumerConfig,
+      final Class<? extends Deserializer<K>> keyDeserializer,
+      final Class<? extends Deserializer<V>> valueDeserializer) {
     super();
 
     // Check if boostrap server and inputTopic are defined
@@ -37,7 +35,7 @@ public class KafkaGenericReader<K, V> extends
       throw new IllegalArgumentException("bootstrapServer or inputTopic missing");
     }
 
-    reader =
+    this.reader =
        KafkaIO.<K, V>read()
            .withBootstrapServers(bootstrapServer)
            .withTopic(inputTopic)
...
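For orientation, a minimal usage sketch of the simplified constructor (not part of the commit; the String key/value types, the empty consumer config, and the StringDeserializer choice are illustrative assumptions, while the bootstrap server and topic mirror the defaults from the properties files further below):

  final Map<String, Object> consumerConfig = new HashMap<>(); // assumed empty for illustration
  final KafkaGenericReader<String, String> reader = new KafkaGenericReader<>(
      "localhost:9092", // kafka.bootstrap.servers
      "input",          // kafka.input.topic
      consumerConfig,
      StringDeserializer.class, // org.apache.kafka.common.serialization.StringDeserializer
      StringDeserializer.class);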
@@ -2,7 +2,7 @@
 sleep 55s # to let the benchmark and produce some output
 docker-compose logs --tail 100 benchmark-taskmanager |
-    sed -n "s/^.*Key:\s\(\S*\), Value:\s\(\S*\).*$/\2/p" |
+    sed -n "s/^.*Record:\s\(\S*\)$/\1/p" |
     tee /dev/stderr |
     jq .identifier |
     sort |
...
@@ -2,7 +2,7 @@
 sleep 55s # to let the benchmark and produce some output
 docker-compose logs --tail 100 benchmark |
-    sed -n "s/^.*Key:\s\(\S*\), Value:\s\(\S*\).*$/\2/p" |
+    sed -n "s/^.*Record:\s\(\S*\)$/\1/p" |
     tee /dev/stderr |
     jq .identifier |
     sort |
...
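Both smoke-test scripts now assume that each matching log line ends with a single whitespace-free token after "Record:", typically the JSON form of an ActivePowerRecord; sed keeps only that token and jq then extracts its identifier field. A hypothetical matching line (all concrete values are made up for illustration):

  2022-01-01 12:00:00 INFO ... Record: {"identifier":"s_0","timestamp":1640995200000,"valueInW":45.0}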
 application.name=theodolite-uc1-application
 application.version=0.0.1
+sink.type=logger
 kafka.bootstrap.servers=localhost:9092
 kafka.input.topic=input
 kafka.output.topic=output
...
 application.name=theodolite-uc1-application
 application.version=0.0.1
+sink.type=logger
 kafka.bootstrap.servers=localhost:9092
 kafka.input.topic=input
 kafka.output.topic=output
...
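Given the new sink abstraction introduced below, switching a deployment from the default logger sink to the Firestore sink should only require a configuration change along these lines (hypothetical example; the collection name is a placeholder):

  # hypothetical example, not part of the commit
  sink.type=firestore
  sink.firestore.collection=active-power-records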
@@ -4,4 +4,5 @@ plugins {
 
 dependencies {
   implementation project(':uc1-commons')
+  implementation 'org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.35.0'
 }
package application;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A {@link PTransform} for a generic {@link DatabaseAdapter}.
*
* @param <T> Type parameter of {@link DatabaseAdapter}.
*/
public class GenericSink<T> extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> {
private static final long serialVersionUID = 1L;
private final DatabaseAdapter<T> databaseAdapter;
private final Class<T> type;
/**
* Create a {@link GenericSink} for the provided {@link DatabaseAdapter}. Requires also the
* corresponding {@link Class} object for Beam.
*/
public GenericSink(final DatabaseAdapter<T> databaseAdapter, final Class<T> type) {
super();
this.databaseAdapter = databaseAdapter;
this.type = type;
}
@Override
public PCollection<?> expand(final PCollection<ActivePowerRecord> activePowerRecords) {
return activePowerRecords
.apply(MapElements
.via(new ConverterAdapter<>(this.databaseAdapter.getRecordConverter(), this.type)))
.apply(ParDo.of(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter())));
}
}
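As the logger sink type further below shows, a GenericSink can be built directly from the JSON logging adapter of uc1-commons; a brief sketch:

  final PTransform<PCollection<ActivePowerRecord>, PCollection<?>> loggerSink =
      new GenericSink<>(LogWriterFactory.forJson(), String.class);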
package application;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.configuration2.Configuration;
import titan.ccp.model.records.ActivePowerRecord;
/**
 * Interface for a class that creates sinks, i.e., {@link PTransform}s that map and store
 * {@link ActivePowerRecord}s, optionally using a {@link Configuration}.
*/
public interface SinkFactory {
PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create(Configuration configuration);
}
package application;
import application.firestore.FirestoreSink;
import java.util.stream.Stream;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.configuration2.Configuration;
import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
import titan.ccp.model.records.ActivePowerRecord;
/**
 * Supported sink types, i.e., {@link PTransform}s for converting and storing
* {@link ActivePowerRecord}s.
*/
public enum SinkType implements SinkFactory {
LOGGER("logger") {
@Override
public PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create(
final Configuration config) {
return new GenericSink<>(LogWriterFactory.forJson(), String.class);
}
},
FIRESTORE("firestore") {
@Override
public PTransform<PCollection<ActivePowerRecord>, PCollection<?>> create(
final Configuration config) {
return FirestoreSink.fromConfig(config);
}
};
private final String value;
SinkType(final String value) {
this.value = value;
}
public String getValue() {
return this.value;
}
/**
* Create a new {@link SinkType} from its string representation.
*/
public static SinkType from(final String value) {
return Stream.of(SinkType.values())
.filter(t -> t.value.equals(value))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Sink '" + value + "' does not exist."));
}
}
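Resolving and instantiating a sink from the service configuration is then a two-step call, exactly as the reworked Uc1BeamPipeline below does it; a short sketch (the config object is assumed to contain a sink.type entry):

  final SinkType sinkType = SinkType.from(config.getString("sink.type"));
  final PTransform<PCollection<ActivePowerRecord>, PCollection<?>> sink = sinkType.create(config);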
 package application;
 
-import java.util.Map;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.commons.configuration2.Configuration;
-import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
-import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
 import theodolite.commons.beam.AbstractPipeline;
 import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
 import titan.ccp.model.records.ActivePowerRecord;
 
 /**
- * Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To
- * execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload
- * generator using the delayed_startup.sh script. Start a Flink cluster and pass its REST adress
- * using--flinkMaster as run parameter. To persist logs add
- * ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard
- * Input Output in Common in the Run Configuration Start via Eclipse Run.
+ * Implementation of benchmark UC1: Database Storage with Apache Beam.
  */
 public final class Uc1BeamPipeline extends AbstractPipeline {
 
-  private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
+  public static final String SINK_TYPE_KEY = "sink.type";
 
   protected Uc1BeamPipeline(final PipelineOptions options, final Configuration config) {
     super(options, config);
 
-    // Set Coders for Classes that will be distributed
-    final CoderRegistry cr = this.getCoderRegistry();
-    cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
+    final SinkType sinkType = SinkType.from(config.getString(SINK_TYPE_KEY));
 
-    // build KafkaConsumerConfig
-    final Map<String, Object> consumerConfig = this.buildConsumerConfig();
+    // Set Coders for classes that will be distributed
+    final CoderRegistry cr = super.getCoderRegistry();
+    cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
 
-    // Create Pipeline transformations
-    final KafkaActivePowerTimestampReader kafka =
-        new KafkaActivePowerTimestampReader(this.bootstrapServer, this.inputTopic, consumerConfig);
+    final KafkaActivePowerTimestampReader kafka = new KafkaActivePowerTimestampReader(
+        super.bootstrapServer,
+        super.inputTopic,
+        super.buildConsumerConfig());
 
-    // Apply pipeline transformations
-    // Read from Kafka
-    this.apply(kafka)
+    super.apply(kafka)
         .apply(Values.create())
-        .apply(MapElements.via(new ConverterAdapter<>(
-            this.databaseAdapter.getRecordConverter(),
-            String.class)))
-        .apply(ParDo.of(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter())));
+        .apply(sinkType.create(config));
   }
 }
package application.firestore;
import com.google.firestore.v1.Document;
import com.google.firestore.v1.Value;
import java.io.IOException;
import org.apache.beam.sdk.transforms.SimpleFunction;
import titan.ccp.model.records.ActivePowerRecord;
final class DocumentMapper extends SimpleFunction<ActivePowerRecord, Document> {
private static final long serialVersionUID = -5263671231838353749L; // NOPMD
private transient FirestoreConfig firestoreConfig;
private final String collection;
public DocumentMapper(final String collection) {
super();
this.collection = collection;
}
@Override
public Document apply(final ActivePowerRecord record) {
return Document
.newBuilder()
.setName(this.createDocumentName(record.getIdentifier() + record.getTimestamp()))
.putFields("identifier",
Value.newBuilder().setStringValue(record.getIdentifier()).build())
.putFields("timestamp", Value.newBuilder().setIntegerValue(record.getTimestamp()).build())
.putFields("valueInW", Value.newBuilder().setDoubleValue(record.getValueInW()).build())
.build();
}
private String createDocumentName(final String documentId) {
this.initFirestoreConfig();
return "projects/" + this.firestoreConfig.getProjectId()
+ "/databases/" + this.firestoreConfig.getDatabaseDdlRequest()
+ "/documents/" + this.collection
+ "/" + documentId;
}
private void initFirestoreConfig() {
if (this.firestoreConfig == null) {
try {
this.firestoreConfig = FirestoreConfig.createFromDefaults();
} catch (final IOException e) {
throw new IllegalStateException("Cannot create Firestore configuration.", e);
}
}
}
}
package application.firestore;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.firestore.FirestoreOptions;
import java.io.IOException;
final class FirestoreConfig {
private final FirestoreOptions firestoreOptions;
private FirestoreConfig(final FirestoreOptions firestoreOptions) {
this.firestoreOptions = firestoreOptions;
}
public String getProjectId() {
return this.firestoreOptions.getProjectId();
}
public String getDatabaseDdlRequest() {
return this.firestoreOptions.getProjectId();
}
public static FirestoreConfig createFromDefaults() throws IOException {
return new FirestoreConfig(FirestoreOptions.getDefaultInstance().toBuilder()
.setCredentials(GoogleCredentials.getApplicationDefault())
.build());
}
}
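Note that FirestoreOptions.getDefaultInstance() together with GoogleCredentials.getApplicationDefault() relies on Application Default Credentials; outside a GCP environment this typically means pointing the GOOGLE_APPLICATION_CREDENTIALS environment variable at a service-account key file, for example (the path is a placeholder):

  export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json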
package application.firestore;
import com.google.cloud.firestore.DocumentSnapshot;
import com.google.firestore.v1.Document;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.configuration2.Configuration;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A {@link PTransform} mapping {@link ActivePowerRecord}s to {@link Document}s, followed by storing
 * these {@link DocumentSnapshot}s in Firestore.
*/
public class FirestoreSink extends PTransform<PCollection<ActivePowerRecord>, PCollection<?>> {
public static final String SINK_FIRESTORE_COLLECTION_KEY = "sink.firestore.collection";
private static final long serialVersionUID = 1L;
private final String collectionName;
public FirestoreSink(final String collectionName) {
super();
this.collectionName = collectionName;
}
@Override
public PCollection<?> expand(final PCollection<ActivePowerRecord> activePowerRecords) {
return activePowerRecords
.apply(MapElements.via(new DocumentMapper(this.collectionName)))
.apply(MapElements.via(new UpdateOperationMapper()))
.apply(FirestoreIO.v1().write().batchWrite().build());
}
public static FirestoreSink fromConfig(final Configuration config) {
final String collectionName = config.getString(SINK_FIRESTORE_COLLECTION_KEY);
return new FirestoreSink(collectionName);
}
}
package application.firestore;
import com.google.firestore.v1.Document;
import com.google.firestore.v1.Write;
import org.apache.beam.sdk.transforms.SimpleFunction;
final class UpdateOperationMapper extends SimpleFunction<Document, Write> {
private static final long serialVersionUID = -5263671231838353748L; // NOPMD
@Override
public Write apply(final Document document) {
return Write.newBuilder()
.setUpdate(document)
.build();
}
}
@@ -13,8 +13,10 @@ repositories {
 }
 
 dependencies {
-  implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
-  implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
+  // Make this implementation once this is a local subproject.
+  // Currently, Flink needs its own version of these dependencies.
+  compileOnly('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
+  compileOnly('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
   implementation 'com.google.code.gson:gson:2.8.9'
...