Skip to content
Snippets Groups Projects
Commit df4ebc10 authored by Sören Henning's avatar Sören Henning
Browse files

Support pipeline cancelation from BeamService

parent 1e8a1d46
No related branches found
No related tags found
No related merge requests found
Pipeline #7241 passed
Showing
with 43 additions and 11 deletions
package rocks.theodolite.benchmarks.commons.beam;
import java.io.IOException;
import java.util.function.Function;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
......@@ -23,6 +25,7 @@ public class BeamService {
private final AbstractPipelineFactory pipelineFactory;
private final PipelineOptions pipelineOptions;
private PipelineResult pipelineResult;
/**
* Create a new {@link BeamService}.
......@@ -43,14 +46,43 @@ public class BeamService {
}
/**
* Start this microservice, by running the underlying Beam pipeline.
* Start this microservice by running the underlying Beam pipeline.
*/
public void run() {
LOGGER.info("Constructing Beam pipeline with pipeline options: {}",
this.pipelineOptions.toString());
final Pipeline pipeline = this.pipelineFactory.create(this.pipelineOptions);
LOGGER.info("Starting BeamService {}.", this.applicationName);
pipeline.run().waitUntilFinish();
this.pipelineResult = pipeline.run();
}
/**
* Start this microservice by running the underlying Beam pipeline and block until this process is
* terminated.
*/
public void runStandalone() {
this.run();
Runtime.getRuntime().addShutdownHook(new Thread(() -> this.stop()));
this.pipelineResult.waitUntilFinish();
}
/**
* Stop this microservice by canceling the underlying Beam pipeline.
*/
public void stop() {
LOGGER.info("Initiate shutdown of Beam service {}.", this.applicationName);
if (this.pipelineResult == null) {
throw new IllegalStateException("Cannot stop service since it has never been started.");
}
LOGGER.info("Stoping Beam pipeline.");
try {
this.pipelineResult.cancel();
this.pipelineResult = null; // NOPMD use null to indicate absence
} catch (final IOException e) {
throw new IllegalStateException(
"Stoping the service failed due to failed stop of Beam pipeline.", e);
}
LOGGER.info("Shutdown of Beam service {} complete.", this.applicationName);
}
}
......@@ -17,7 +17,7 @@ public final class Uc1BeamFlink {
private Uc1BeamFlink() {}
public static void main(final String[] args) {
new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run();
new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone();
}
}
......
......@@ -21,6 +21,6 @@ public final class Uc1BeamSamza {
* Main method.
*/
public static void main(final String[] args) {
new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run();
new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone();
}
}
......@@ -6,7 +6,7 @@ import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;
/**
* {@link DoFn} which wraps a {@link DatabaseAdapter} to be used with Beam.
*
*
* @param <T> type the {@link DatabaseWriter} is associated with.
*/
public class WriterAdapter<T> extends DoFn<T, Void> {
......
......@@ -15,7 +15,7 @@ public final class Uc2BeamFlink {
private Uc2BeamFlink() {}
public static void main(final String[] args) {
new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run();
new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone();
}
}
......@@ -19,7 +19,7 @@ public final class Uc2BeamSamza {
private Uc2BeamSamza() {}
public static void main(final String[] args) {
new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run();
new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone();
}
}
......
......@@ -21,7 +21,7 @@ public final class Uc3BeamFlink {
* Start running this microservice.
*/
public static void main(final String[] args) {
new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run();
new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone();
}
}
......@@ -21,7 +21,7 @@ public final class Uc3BeamSamza {
* Start running this microservice.
*/
public static void main(final String[] args) {
new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run();
new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone();
}
}
......
......@@ -15,7 +15,7 @@ public final class Uc4BeamFlink {
* Start running this microservice.
*/
public static void main(final String[] args) {
new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run();
new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone();
}
}
......@@ -22,7 +22,7 @@ public final class Uc4BeamSamza {
* Start running this microservice.
*/
public static void main(final String[] args) {
new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run();
new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment