From df4ebc105792fa489df62e13d316e964e329cc8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Thu, 24 Mar 2022 11:54:07 +0100 Subject: [PATCH] Support pipeline cancelation from BeamService --- .../benchmarks/commons/beam/BeamService.java | 36 +++++++++++++++++-- .../uc1/beam/flink/Uc1BeamFlink.java | 2 +- .../uc1/beam/samza/Uc1BeamSamza.java | 2 +- .../benchmarks/uc1/beam/WriterAdapter.java | 2 +- .../uc2/beam/flink/Uc2BeamFlink.java | 2 +- .../uc2/beam/samza/Uc2BeamSamza.java | 2 +- .../uc3/beam/flink/Uc3BeamFlink.java | 2 +- .../uc3/beam/samza/Uc3BeamSamza.java | 2 +- .../uc4/beam/flink/Uc4BeamFlink.java | 2 +- .../uc4/beam/samza/Uc4BeamSamza.java | 2 +- 10 files changed, 43 insertions(+), 11 deletions(-) diff --git a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/BeamService.java b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/BeamService.java index a4a8f69d7..607f591ff 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/BeamService.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/BeamService.java @@ -1,7 +1,9 @@ package rocks.theodolite.benchmarks.commons.beam; +import java.io.IOException; import java.util.function.Function; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -23,6 +25,7 @@ public class BeamService { private final AbstractPipelineFactory pipelineFactory; private final PipelineOptions pipelineOptions; + private PipelineResult pipelineResult; /** * Create a new {@link BeamService}. @@ -43,14 +46,43 @@ public class BeamService { } /** - * Start this microservice, by running the underlying Beam pipeline. + * Start this microservice by running the underlying Beam pipeline. */ public void run() { LOGGER.info("Constructing Beam pipeline with pipeline options: {}", this.pipelineOptions.toString()); final Pipeline pipeline = this.pipelineFactory.create(this.pipelineOptions); LOGGER.info("Starting BeamService {}.", this.applicationName); - pipeline.run().waitUntilFinish(); + this.pipelineResult = pipeline.run(); + } + + /** + * Start this microservice by running the underlying Beam pipeline and block until this process is + * terminated. + */ + public void runStandalone() { + this.run(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> this.stop())); + this.pipelineResult.waitUntilFinish(); + } + + /** + * Stop this microservice by canceling the underlying Beam pipeline. + */ + public void stop() { + LOGGER.info("Initiate shutdown of Beam service {}.", this.applicationName); + if (this.pipelineResult == null) { + throw new IllegalStateException("Cannot stop service since it has never been started."); + } + LOGGER.info("Stoping Beam pipeline."); + try { + this.pipelineResult.cancel(); + this.pipelineResult = null; // NOPMD use null to indicate absence + } catch (final IOException e) { + throw new IllegalStateException( + "Stoping the service failed due to failed stop of Beam pipeline.", e); + } + LOGGER.info("Shutdown of Beam service {} complete.", this.applicationName); } } diff --git a/theodolite-benchmarks/uc1-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc1/beam/flink/Uc1BeamFlink.java b/theodolite-benchmarks/uc1-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc1/beam/flink/Uc1BeamFlink.java index e1317219f..7f3950043 100644 --- a/theodolite-benchmarks/uc1-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc1/beam/flink/Uc1BeamFlink.java +++ b/theodolite-benchmarks/uc1-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc1/beam/flink/Uc1BeamFlink.java @@ -17,7 +17,7 @@ public final class Uc1BeamFlink { private Uc1BeamFlink() {} public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc1-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc1/beam/samza/Uc1BeamSamza.java b/theodolite-benchmarks/uc1-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc1/beam/samza/Uc1BeamSamza.java index d3455db71..9c3f650b7 100644 --- a/theodolite-benchmarks/uc1-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc1/beam/samza/Uc1BeamSamza.java +++ b/theodolite-benchmarks/uc1-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc1/beam/samza/Uc1BeamSamza.java @@ -21,6 +21,6 @@ public final class Uc1BeamSamza { * Main method. */ public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/WriterAdapter.java b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/WriterAdapter.java index 4519515cf..c1dc2f730 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/WriterAdapter.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/rocks/theodolite/benchmarks/uc1/beam/WriterAdapter.java @@ -6,7 +6,7 @@ import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter; /** * {@link DoFn} which wraps a {@link DatabaseAdapter} to be used with Beam. - * + * * @param <T> type the {@link DatabaseWriter} is associated with. */ public class WriterAdapter<T> extends DoFn<T, Void> { diff --git a/theodolite-benchmarks/uc2-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc2/beam/flink/Uc2BeamFlink.java b/theodolite-benchmarks/uc2-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc2/beam/flink/Uc2BeamFlink.java index ab6a9992a..2772d76fa 100644 --- a/theodolite-benchmarks/uc2-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc2/beam/flink/Uc2BeamFlink.java +++ b/theodolite-benchmarks/uc2-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc2/beam/flink/Uc2BeamFlink.java @@ -15,7 +15,7 @@ public final class Uc2BeamFlink { private Uc2BeamFlink() {} public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc2-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc2/beam/samza/Uc2BeamSamza.java b/theodolite-benchmarks/uc2-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc2/beam/samza/Uc2BeamSamza.java index 80981818d..1b3f4ac8a 100644 --- a/theodolite-benchmarks/uc2-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc2/beam/samza/Uc2BeamSamza.java +++ b/theodolite-benchmarks/uc2-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc2/beam/samza/Uc2BeamSamza.java @@ -19,7 +19,7 @@ public final class Uc2BeamSamza { private Uc2BeamSamza() {} public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc3/beam/flink/Uc3BeamFlink.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc3/beam/flink/Uc3BeamFlink.java index 8782559fe..f4f456392 100644 --- a/theodolite-benchmarks/uc3-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc3/beam/flink/Uc3BeamFlink.java +++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc3/beam/flink/Uc3BeamFlink.java @@ -21,7 +21,7 @@ public final class Uc3BeamFlink { * Start running this microservice. */ public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc3-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc3/beam/samza/Uc3BeamSamza.java b/theodolite-benchmarks/uc3-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc3/beam/samza/Uc3BeamSamza.java index 84e705f6f..247cd99be 100644 --- a/theodolite-benchmarks/uc3-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc3/beam/samza/Uc3BeamSamza.java +++ b/theodolite-benchmarks/uc3-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc3/beam/samza/Uc3BeamSamza.java @@ -21,7 +21,7 @@ public final class Uc3BeamSamza { * Start running this microservice. */ public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc4-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc4/beam/flink/Uc4BeamFlink.java b/theodolite-benchmarks/uc4-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc4/beam/flink/Uc4BeamFlink.java index 5d398d610..f5f9af3fc 100644 --- a/theodolite-benchmarks/uc4-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc4/beam/flink/Uc4BeamFlink.java +++ b/theodolite-benchmarks/uc4-beam-flink/src/main/java/rocks/theodolite/benchmarks/uc4/beam/flink/Uc4BeamFlink.java @@ -15,7 +15,7 @@ public final class Uc4BeamFlink { * Start running this microservice. */ public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).runStandalone(); } } diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc4/beam/samza/Uc4BeamSamza.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc4/beam/samza/Uc4BeamSamza.java index 044b3dc4b..585e3ff95 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc4/beam/samza/Uc4BeamSamza.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/rocks/theodolite/benchmarks/uc4/beam/samza/Uc4BeamSamza.java @@ -22,7 +22,7 @@ public final class Uc4BeamSamza { * Start running this microservice. */ public static void main(final String[] args) { - new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run(); + new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).runStandalone(); } } -- GitLab