Skip to content
Snippets Groups Projects
Commit 394d5bb3 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'http-bridge' into 'master'

HTTP bridge

Closes #336

See merge request !235
parents 2d87de45 22796246
No related branches found
No related tags found
1 merge request!235HTTP bridge
Pipeline #6515 passed
Showing
with 407 additions and 0 deletions
cleanup.add_default_serial_version_id=true
cleanup.add_generated_serial_version_id=false
cleanup.add_missing_annotations=true
cleanup.add_missing_deprecated_annotations=true
cleanup.add_missing_methods=false
cleanup.add_missing_nls_tags=false
cleanup.add_missing_override_annotations=true
cleanup.add_missing_override_annotations_interface_methods=true
cleanup.add_serial_version_id=false
cleanup.always_use_blocks=true
cleanup.always_use_parentheses_in_expressions=false
cleanup.always_use_this_for_non_static_field_access=true
cleanup.always_use_this_for_non_static_method_access=true
cleanup.convert_functional_interfaces=false
cleanup.convert_to_enhanced_for_loop=true
cleanup.correct_indentation=true
cleanup.format_source_code=true
cleanup.format_source_code_changes_only=false
cleanup.insert_inferred_type_arguments=false
cleanup.make_local_variable_final=true
cleanup.make_parameters_final=true
cleanup.make_private_fields_final=true
cleanup.make_type_abstract_if_missing_method=false
cleanup.make_variable_declarations_final=true
cleanup.never_use_blocks=false
cleanup.never_use_parentheses_in_expressions=true
cleanup.organize_imports=true
cleanup.qualify_static_field_accesses_with_declaring_class=false
cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
cleanup.qualify_static_member_accesses_with_declaring_class=true
cleanup.qualify_static_method_accesses_with_declaring_class=false
cleanup.remove_private_constructors=true
cleanup.remove_redundant_modifiers=false
cleanup.remove_redundant_semicolons=true
cleanup.remove_redundant_type_arguments=true
cleanup.remove_trailing_whitespaces=true
cleanup.remove_trailing_whitespaces_all=true
cleanup.remove_trailing_whitespaces_ignore_empty=false
cleanup.remove_unnecessary_casts=true
cleanup.remove_unnecessary_nls_tags=true
cleanup.remove_unused_imports=true
cleanup.remove_unused_local_variables=false
cleanup.remove_unused_private_fields=true
cleanup.remove_unused_private_members=false
cleanup.remove_unused_private_methods=true
cleanup.remove_unused_private_types=true
cleanup.sort_members=false
cleanup.sort_members_all=false
cleanup.use_anonymous_class_creation=false
cleanup.use_blocks=true
cleanup.use_blocks_only_for_return_and_throw=false
cleanup.use_lambda=true
cleanup.use_parentheses_in_expressions=true
cleanup.use_this_for_non_static_field_access=true
cleanup.use_this_for_non_static_field_access_only_if_necessary=false
cleanup.use_this_for_non_static_method_access=true
cleanup.use_this_for_non_static_method_access_only_if_necessary=false
cleanup_profile=_CAU-SE-Style
cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99
sp_cleanup.add_default_serial_version_id=true
sp_cleanup.add_generated_serial_version_id=false
sp_cleanup.add_missing_annotations=true
sp_cleanup.add_missing_deprecated_annotations=true
sp_cleanup.add_missing_methods=false
sp_cleanup.add_missing_nls_tags=false
sp_cleanup.add_missing_override_annotations=true
sp_cleanup.add_missing_override_annotations_interface_methods=true
sp_cleanup.add_serial_version_id=false
sp_cleanup.always_use_blocks=true
sp_cleanup.always_use_parentheses_in_expressions=false
sp_cleanup.always_use_this_for_non_static_field_access=true
sp_cleanup.always_use_this_for_non_static_method_access=true
sp_cleanup.convert_functional_interfaces=false
sp_cleanup.convert_to_enhanced_for_loop=true
sp_cleanup.correct_indentation=true
sp_cleanup.format_source_code=true
sp_cleanup.format_source_code_changes_only=false
sp_cleanup.insert_inferred_type_arguments=false
sp_cleanup.make_local_variable_final=true
sp_cleanup.make_parameters_final=true
sp_cleanup.make_private_fields_final=true
sp_cleanup.make_type_abstract_if_missing_method=false
sp_cleanup.make_variable_declarations_final=true
sp_cleanup.never_use_blocks=false
sp_cleanup.never_use_parentheses_in_expressions=true
sp_cleanup.on_save_use_additional_actions=true
sp_cleanup.organize_imports=true
sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
sp_cleanup.remove_private_constructors=true
sp_cleanup.remove_redundant_modifiers=false
sp_cleanup.remove_redundant_semicolons=true
sp_cleanup.remove_redundant_type_arguments=true
sp_cleanup.remove_trailing_whitespaces=true
sp_cleanup.remove_trailing_whitespaces_all=true
sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
sp_cleanup.remove_unnecessary_casts=true
sp_cleanup.remove_unnecessary_nls_tags=true
sp_cleanup.remove_unused_imports=true
sp_cleanup.remove_unused_local_variables=false
sp_cleanup.remove_unused_private_fields=true
sp_cleanup.remove_unused_private_members=false
sp_cleanup.remove_unused_private_methods=true
sp_cleanup.remove_unused_private_types=true
sp_cleanup.sort_members=false
sp_cleanup.sort_members_all=false
sp_cleanup.use_anonymous_class_creation=false
sp_cleanup.use_blocks=true
sp_cleanup.use_blocks_only_for_return_and_throw=false
sp_cleanup.use_lambda=true
sp_cleanup.use_parentheses_in_expressions=true
sp_cleanup.use_this_for_non_static_field_access=true
sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false
sp_cleanup.use_this_for_non_static_method_access=true
sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false
configFilePath=../config/checkstyle.xml
customModulesJarPaths=
eclipse.preferences.version=1
enabled=true
customRulesJars=
eclipse.preferences.version=1
enabled=true
ruleSetFilePath=../config/pmd.xml
plugins {
// common java conventions
id 'theodolite.java-conventions'
// make executable
id 'application'
}
tasks.distZip.enabled = false
repositories {
mavenCentral()
maven {
url "https://oss.sonatype.org/content/repositories/snapshots/"
}
maven {
url 'https://packages.confluent.io/maven/'
}
}
dependencies {
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation project(':load-generator-commons')
implementation 'io.javalin:javalin:4.3.0'
implementation 'com.google.code.gson:gson:2.8.2'
runtimeOnly 'org.slf4j:slf4j-simple:1.7.25'
testImplementation 'junit:junit:4.12'
}
package theodolite.commons.httpbridge;
/**
* A class for converting objects to strings.
*
* @param <T> Type to be deserialized from.
*/
@FunctionalInterface
public interface Deserializer<T> {
T deserialize(String json);
}
package theodolite.commons.httpbridge;
import theodolite.commons.workloadgeneration.RecordSender;
/**
* Class describing an endpoint of the {@link HttpBridge}, which converts JSON objects to Java
* objects and sends them using a {@link RecordSender}.
*
* @param <T> Type of objects this endpoint receives and converts.
*/
public class Endpoint<T> {
private final String path;
private final Deserializer<? extends T> recordDeserializer;
private final RecordSender<? super T> recordSender;
/**
* Create a new {@link Endpoint} at the given path.
*/
public Endpoint(
final String path,
final Deserializer<? extends T> recordDeserializer,
final RecordSender<? super T> recordSender) {
this.path = path;
this.recordDeserializer = recordDeserializer;
this.recordSender = recordSender;
}
/**
* Create a new {@link Endpoint} at the given path with a {@link GsonDeserializer}.
*/
public Endpoint(
final String path,
final Class<T> recordType,
final RecordSender<? super T> recordSender) {
this.path = path;
this.recordDeserializer = new GsonDeserializer<>(recordType);
this.recordSender = recordSender;
}
public String getPath() {
return this.path;
}
public void convert(final String json) {
final T record = this.recordDeserializer.deserialize(json);
this.recordSender.send(record);
}
}
package theodolite.commons.httpbridge;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import theodolite.commons.workloadgeneration.ConfigurationKeys;
import theodolite.commons.workloadgeneration.TitanKafkaSenderFactory;
import titan.ccp.model.records.ActivePowerRecord;
class EnvVarHttpBridgeFactory {
private static final String PORT_KEY = "PORT";
private static final int PORT_DEFAULT = 8080;
private static final String HOST_KEY = "HOST";
private static final String HOST_DEFAULT = "0.0.0.0"; // NOPMD
private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD
private static final String KAFKA_TOPIC_DEFAULT = "input";
private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
public HttpBridge create() {
final Endpoint<?> converter = new Endpoint<>(
"/",
ActivePowerRecord.class,
TitanKafkaSenderFactory.forKafkaConfig(
this.getKafkaBootstrapServer(),
this.getKafkaTopic(),
this.getSchemaRegistryUrl()));
return new HttpBridge(this.getHost(), this.getPort(), List.of(converter));
}
private String getHost() {
return Objects.requireNonNullElse(System.getenv(HOST_KEY), HOST_DEFAULT);
}
private int getPort() {
return Optional.ofNullable(System.getenv(PORT_KEY)).map(Integer::parseInt).orElse(PORT_DEFAULT);
}
private String getKafkaBootstrapServer() {
return Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
KAFKA_BOOTSTRAP_SERVERS_DEFAULT);
}
private String getKafkaTopic() {
return Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC),
KAFKA_TOPIC_DEFAULT);
}
private String getSchemaRegistryUrl() {
return Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
SCHEMA_REGISTRY_URL_DEFAULT);
}
}
package theodolite.commons.httpbridge;
import com.google.gson.Gson;
/**
* A {@link Deserializer} based on GSON.
*
* @param <T> Type to be serialized from.
*/
public class GsonDeserializer<T> implements Deserializer<T> {
private final Gson gson;
private final Class<T> targetClass;
public GsonDeserializer(final Class<T> targetClass) {
this(new Gson(), targetClass);
}
public GsonDeserializer(final Gson gson, final Class<T> targetClass) {
this.gson = gson;
this.targetClass = targetClass;
}
@Override
public T deserialize(final String json) {
return this.gson.fromJson(json, this.targetClass);
}
}
package theodolite.commons.httpbridge;
import java.util.List;
import theodolite.commons.workloadgeneration.RecordSender;
/**
* Class that creates a webserver with potentially multiple {@link Endpoint}s, which receives JSON
* objects at these endpoints, converts them to Java objects and send them using
* {@link RecordSender}s.
*/
public class HttpBridge {
private final JavalinWebServer webServer;
public HttpBridge(final String host, final int port, final List<Endpoint<?>> converters) {
this.webServer = new JavalinWebServer(converters, host, port);
}
public void start() {
this.webServer.start();
}
public void stop() {
this.webServer.stop();
}
public void runAsStandalone() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> this.stop()));
this.start();
}
public static HttpBridge fromEnvironment() {
return new EnvVarHttpBridgeFactory().create();
}
public static void main(final String[] args) {
HttpBridge.fromEnvironment().runAsStandalone();
}
}
package theodolite.commons.httpbridge;
import io.javalin.Javalin;
import java.util.Collection;
/**
* Implementation of a webserver based on the Javalin framework.
*/
public class JavalinWebServer {
private static final int HTTP_SUCCESS = 200;
private final Javalin app = Javalin.create();
private final String host;
private final int port;
/**
* Create a new instance, running on the specified host and port with the configured endpoints.
*/
public JavalinWebServer(
final Collection<Endpoint<?>> converters,
final String host,
final int port) {
this.host = host;
this.port = port;
this.configureRoutes(converters);
}
private void configureRoutes(final Collection<Endpoint<?>> endpoints) {
for (final Endpoint<?> endpoint : endpoints) {
this.app.post(endpoint.getPath(), ctx -> {
endpoint.convert(ctx.body());
ctx.status(HTTP_SUCCESS);
});
}
}
public void start() {
this.app.start(this.host, this.port);
}
public void stop() {
this.app.close();
}
}
......@@ -34,3 +34,4 @@ include 'uc4-flink'
include 'uc4-beam-flink'
include 'uc4-beam-samza'
include 'http-bridge'
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment