Skip to content
Snippets Groups Projects
Commit 22796246 authored by Sören Henning's avatar Sören Henning
Browse files

Finish essential parts of HTTP bridge

parent d64075f6
No related branches found
No related tags found
1 merge request!235HTTP bridge
Pipeline #6514 passed
......@@ -24,7 +24,8 @@ dependencies {
implementation project(':load-generator-commons')
implementation 'io.javalin:javalin:4.3.0'
implementation 'org.slf4j:slf4j-simple:1.7.25'
implementation 'com.google.code.gson:gson:2.8.2'
runtimeOnly 'org.slf4j:slf4j-simple:1.7.25'
testImplementation 'junit:junit:4.12'
}
package theodolite.commons.httpbridge;
/**
* A class for converting objects to strings.
*
* @param <T> Type to be deserialized from.
*/
@FunctionalInterface
public interface Deserializer<T> {
......
......@@ -2,6 +2,12 @@ package theodolite.commons.httpbridge;
import theodolite.commons.workloadgeneration.RecordSender;
/**
* Class describing an endpoint of the {@link HttpBridge}, which converts JSON objects to Java
* objects and sends them using a {@link RecordSender}.
*
* @param <T> Type of objects this endpoint receives and converts.
*/
public class Endpoint<T> {
private final String path;
......@@ -10,6 +16,9 @@ public class Endpoint<T> {
private final RecordSender<? super T> recordSender;
/**
* Create a new {@link Endpoint} at the given path.
*/
public Endpoint(
final String path,
final Deserializer<? extends T> recordDeserializer,
......@@ -19,6 +28,9 @@ public class Endpoint<T> {
this.recordSender = recordSender;
}
/**
* Create a new {@link Endpoint} at the given path with a {@link GsonDeserializer}.
*/
public Endpoint(
final String path,
final Class<T> recordType,
......
package theodolite.commons.httpbridge;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import theodolite.commons.workloadgeneration.ConfigurationKeys;
import theodolite.commons.workloadgeneration.TitanKafkaSenderFactory;
import titan.ccp.model.records.ActivePowerRecord;
class EnvVarHttpBridgeFactory {
private static final String PORT_KEY = "PORT";
private static final int PORT_DEFAULT = 8080;
private static final String HOST_KEY = "HOST";
private static final String HOST_DEFAULT = "0.0.0.0"; // NOPMD
private static final String KAFKA_BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; // NOPMD
private static final String KAFKA_TOPIC_DEFAULT = "input";
private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
public HttpBridge create() {
final Endpoint<?> converter = new Endpoint<>(
"/",
ActivePowerRecord.class,
TitanKafkaSenderFactory.forKafkaConfig(
this.getKafkaBootstrapServer(),
this.getKafkaTopic(),
this.getSchemaRegistryUrl()));
return new HttpBridge(this.getHost(), this.getPort(), List.of(converter));
}
private String getHost() {
return Objects.requireNonNullElse(System.getenv(HOST_KEY), HOST_DEFAULT);
}
private int getPort() {
return Optional.ofNullable(System.getenv(PORT_KEY)).map(Integer::parseInt).orElse(PORT_DEFAULT);
}
private String getKafkaBootstrapServer() {
return Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
KAFKA_BOOTSTRAP_SERVERS_DEFAULT);
}
private String getKafkaTopic() {
return Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC),
KAFKA_TOPIC_DEFAULT);
}
private String getSchemaRegistryUrl() {
return Objects.requireNonNullElse(
System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
SCHEMA_REGISTRY_URL_DEFAULT);
}
}
......@@ -2,6 +2,11 @@ package theodolite.commons.httpbridge;
import com.google.gson.Gson;
/**
* A {@link Deserializer} based on GSON.
*
* @param <T> Type to be serialized from.
*/
public class GsonDeserializer<T> implements Deserializer<T> {
private final Gson gson;
......
package theodolite.commons.httpbridge;
import java.util.List;
import theodolite.commons.workloadgeneration.TitanKafkaSenderFactory;
import titan.ccp.model.records.ActivePowerRecord;
import theodolite.commons.workloadgeneration.RecordSender;
/**
* Class that creates a webserver with potentially multiple {@link Endpoint}s, which receives JSON
* objects at these endpoints, converts them to Java objects and send them using
* {@link RecordSender}s.
*/
public class HttpBridge {
private static final int PORT = 8080;
private static final String HOST = "0.0.0.0"; // NOPMD
private final JavalinWebServer webServer;
public void run() {
final Endpoint<?> converter = new Endpoint<>(
"/",
ActivePowerRecord.class,
TitanKafkaSenderFactory.forKafkaConfig(null, null, null));
public HttpBridge(final String host, final int port, final List<Endpoint<?>> converters) {
this.webServer = new JavalinWebServer(converters, host, port);
}
public void start() {
this.webServer.start();
}
public void stop() {
this.webServer.stop();
}
public void runAsStandalone() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> this.stop()));
this.start();
}
public static HttpBridge fromEnvironment() {
return new EnvVarHttpBridgeFactory().create();
}
final JavalinWebServer webServer = new JavalinWebServer(List.of(converter), HOST, PORT);
webServer.start();
public static void main(final String[] args) {
HttpBridge.fromEnvironment().runAsStandalone();
}
}
......@@ -3,6 +3,9 @@ package theodolite.commons.httpbridge;
import io.javalin.Javalin;
import java.util.Collection;
/**
* Implementation of a webserver based on the Javalin framework.
*/
public class JavalinWebServer {
private static final int HTTP_SUCCESS = 200;
......@@ -12,6 +15,9 @@ public class JavalinWebServer {
private final String host;
private final int port;
/**
* Create a new instance, running on the specified host and port with the configured endpoints.
*/
public JavalinWebServer(
final Collection<Endpoint<?>> converters,
final String host,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment