diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/DbReader.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/DbReader.java index 5dad520e5583d15f3c60e85d179f3765c4e65b44..a5079f94a3201a1f794ab281fef8353d93491249 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/DbReader.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/DbReader.java @@ -22,26 +22,22 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import teetime.variant.explicitScheduling.framework.core.Description; +import teetime.variant.methodcallWithPorts.framework.core.ProducerStage; + import kieker.common.exception.MonitoringRecordException; import kieker.common.record.AbstractMonitoringRecord; import kieker.common.record.IMonitoringRecord; -import teetime.variant.explicitScheduling.framework.core.AbstractFilter; -import teetime.variant.explicitScheduling.framework.core.Context; -import teetime.variant.explicitScheduling.framework.core.Description; -import teetime.variant.explicitScheduling.framework.core.IOutputPort; - /** * A very simple database reader that probably only works for small data sets. - * + * * @author Jan Waller, Nils Christian Ehmke - * + * * @since 1.10 */ @Description("A reader which reads records from a database") -public class DbReader extends AbstractFilter<DbReader> { - - private final IOutputPort<DbReader, IMonitoringRecord> outputPort = super.createOutputPort(); +public class DbReader extends ProducerStage<Void, IMonitoringRecord> { @Description("The classname of the driver used for the connection.") private String driverClassname = "org.apache.derby.jdbc.EmbeddedDrive"; @@ -53,24 +49,23 @@ public class DbReader extends AbstractFilter<DbReader> { private volatile boolean running = true; @Override - public void onPipelineStarts() throws Exception { - super.onPipelineStarts(); + public void onStart() { try { Class.forName(this.driverClassname).newInstance(); } catch (final Exception ex) { // NOPMD NOCS (IllegalCatchCheck) - throw new Exception("DB driver registration failed. Perhaps the driver jar is missing?", ex); + throw new RuntimeException("DB driver registration failed. Perhaps the driver jar is missing?", ex); } } - @Override - public void onPipelineStops() { - super.logger.info("Shutdown of DBReader requested."); - this.running = false; - super.onPipelineStops(); - } + // @Override // TODO implement onStop + // public void onPipelineStops() { + // super.logger.info("Shutdown of DBReader requested."); + // this.running = false; + // super.onPipelineStops(); + // } @Override - protected boolean execute(final Context<DbReader> context) { + protected void execute5(final Void element) { Connection connection = null; try { connection = DriverManager.getConnection(this.connectionString); @@ -84,7 +79,7 @@ public class DbReader extends AbstractFilter<DbReader> { final String tablename = indexTable.getString(1); final String classname = indexTable.getString(2); try { // NOCS (nested try) - this.table2record(context, connection, tablename, AbstractMonitoringRecord.classForName(classname)); + this.table2record(connection, tablename, AbstractMonitoringRecord.classForName(classname)); } catch (final MonitoringRecordException ex) { // log error but continue with next table super.logger.error("Failed to load records of type " + classname + " from table " + tablename, ex); @@ -103,7 +98,6 @@ public class DbReader extends AbstractFilter<DbReader> { } } catch (final SQLException ex) { super.logger.error("SQLException with SQLState: '" + ex.getSQLState() + "' and VendorError: '" + ex.getErrorCode() + "'", ex); - return false; } finally { if (connection != null) { try { @@ -113,7 +107,6 @@ public class DbReader extends AbstractFilter<DbReader> { } } } - return true; } public final String getDriverClassname() { @@ -142,7 +135,7 @@ public class DbReader extends AbstractFilter<DbReader> { /** * This method uses the given table to read records and sends them to the output port. - * + * * @param connection * The connection to the database which will be used. * @param tablename @@ -154,7 +147,7 @@ public class DbReader extends AbstractFilter<DbReader> { * @throws MonitoringRecordException * If the data within the table could not be converted into a valid record. */ - private void table2record(final Context<DbReader> context, final Connection connection, final String tablename, final Class<? extends IMonitoringRecord> clazz) + private void table2record(final Connection connection, final String tablename, final Class<? extends IMonitoringRecord> clazz) throws SQLException, MonitoringRecordException { Statement selectRecord = null; try { @@ -170,7 +163,7 @@ public class DbReader extends AbstractFilter<DbReader> { } final IMonitoringRecord record = AbstractMonitoringRecord.createFromArray(clazz, recordValues); record.setLoggingTimestamp(records.getLong(2)); - context.put(this.outputPort, record); + this.send(record); } } finally { if (records != null) { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Directory2FilesFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Directory2FilesFilter.java index 0de805e15fde11cb1815c02f717ffcb377f1a876..3e728acbadacb011dd4756ce721f6c81b43b00c0 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Directory2FilesFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Directory2FilesFilter.java @@ -21,21 +21,14 @@ import java.io.FileFilter; import java.util.Arrays; import java.util.Comparator; -import teetime.variant.explicitScheduling.framework.core.AbstractFilter; -import teetime.variant.explicitScheduling.framework.core.Context; -import teetime.variant.explicitScheduling.framework.core.IInputPort; -import teetime.variant.explicitScheduling.framework.core.IOutputPort; +import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; /** * @author Christian Wulf * * @since 1.10 */ -public class Directory2FilesFilter extends AbstractFilter<Directory2FilesFilter> { - - public final IInputPort<Directory2FilesFilter, File> directoryInputPort = this.createInputPort(); - - public final IOutputPort<Directory2FilesFilter, File> fileOutputPort = this.createOutputPort(); +public class Directory2FilesFilter extends ConsumerStage<File, File> { private FileFilter filter; private Comparator<File> fileComparator; @@ -69,21 +62,13 @@ public class Directory2FilesFilter extends AbstractFilter<Directory2FilesFilter> super(); } - /** - * @since 1.10 - */ @Override - protected boolean execute(final Context<Directory2FilesFilter> context) { - final File inputDir = context.tryTake(this.directoryInputPort); - if (inputDir == null) { - return false; - } - + protected void execute5(final File inputDir) { final File[] inputFiles = inputDir.listFiles(this.filter); if (inputFiles == null) { this.logger.error("Directory '" + inputDir + "' does not exist or an I/O error occured."); - return true; + return; } if (this.fileComparator != null) { @@ -91,10 +76,8 @@ public class Directory2FilesFilter extends AbstractFilter<Directory2FilesFilter> } for (final File file : inputFiles) { - context.put(this.fileOutputPort, file); + this.send(file); } - - return true; } public FileFilter getFilter() { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Printer.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Printer.java index bb38e50e12fb6cedce26b7ccd72ce48bad530403..af115a5325bd72fd07e82fd0b78bfb5bc18f1222 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Printer.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Printer.java @@ -20,10 +20,8 @@ import java.io.FileOutputStream; import java.io.PrintStream; import java.io.UnsupportedEncodingException; -import teetime.variant.explicitScheduling.framework.core.AbstractFilter; -import teetime.variant.explicitScheduling.framework.core.Context; import teetime.variant.explicitScheduling.framework.core.Description; -import teetime.variant.explicitScheduling.framework.core.IInputPort; +import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; /** * @author Matthias Rohr, Jan Waller, Nils Christian Ehmke @@ -31,7 +29,7 @@ import teetime.variant.explicitScheduling.framework.core.IInputPort; * @since 1.10 */ @Description("A filter to print objects to a configured stream") -public class Printer<T> extends AbstractFilter<Printer<T>> { +public class Printer<T> extends ConsumerStage<T, Void> { public static final String STREAM_STDOUT = "STDOUT"; public static final String STREAM_STDERR = "STDERR"; @@ -40,14 +38,29 @@ public class Printer<T> extends AbstractFilter<Printer<T>> { public static final String ENCODING_UTF8 = "UTF-8"; - public final IInputPort<Printer<T>, T> input = this.createInputPort(); - private PrintStream printStream; private String streamName = STREAM_STDOUT; private String encoding = ENCODING_UTF8; private boolean active = true; private boolean append = true; + @Override + protected void execute5(final T object) { + if (this.active) { + final StringBuilder sb = new StringBuilder(128); + + sb.append(super.getId()); + sb.append('(').append(object.getClass().getSimpleName()).append(") ").append(object.toString()); + + final String msg = sb.toString(); + if (this.printStream != null) { + this.printStream.println(msg); + } else { + super.logger.info(msg); + } + } + } + public String getStreamName() { return this.streamName; } @@ -73,16 +86,15 @@ public class Printer<T> extends AbstractFilter<Printer<T>> { } @Override - public void onPipelineStarts() throws Exception { - super.onPipelineStarts(); + public void onStart() { this.initializeStream(); } - @Override - public void onPipelineStops() { - this.closeStream(); - super.onPipelineStops(); - } + // @Override // TODO implement onStop + // public void onPipelineStops() { + // this.closeStream(); + // super.onPipelineStops(); + // } private void initializeStream() { if (STREAM_STDOUT.equals(this.streamName)) { @@ -117,28 +129,4 @@ public class Printer<T> extends AbstractFilter<Printer<T>> { } } - @Override - protected boolean execute(final Context<Printer<T>> context) { - final T object = context.tryTake(this.input); - if (null == object) { - return false; - } - - if (this.active) { - final StringBuilder sb = new StringBuilder(128); - - sb.append(super.getId()); - sb.append('(').append(object.getClass().getSimpleName()).append(") ").append(object.toString()); - - final String msg = sb.toString(); - if (this.printStream != null) { - this.printStream.println(msg); - } else { - super.logger.info(msg); - } - } - - return true; - } - } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java index 33f1583db8acfc272b4ececea426e839f737bd38..10b3c8b1f576bd10fe564bb9efd39908b6f0e792 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java @@ -22,52 +22,36 @@ import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import teetime.variant.methodcallWithPorts.framework.core.ProducerStage; + import kieker.common.exception.MonitoringRecordException; -import kieker.common.logging.Log; -import kieker.common.logging.LogFactory; import kieker.common.record.AbstractMonitoringRecord; import kieker.common.record.IMonitoringRecord; -import kieker.common.record.misc.RegistryRecord; import kieker.common.util.registry.ILookup; import kieker.common.util.registry.Lookup; -import teetime.variant.explicitScheduling.framework.core.AbstractFilter; -import teetime.variant.explicitScheduling.framework.core.Context; -import teetime.variant.explicitScheduling.framework.core.IOutputPort; - /** * This is a reader which reads the records from a TCP port. - * + * * @author Jan Waller, Nils Christian Ehmke - * + * * @since 1.10 */ -public class TCPReader extends AbstractFilter<TCPReader> { +public class TCPReader extends ProducerStage<Void, IMonitoringRecord> { private static final int MESSAGE_BUFFER_SIZE = 65535; - private final IOutputPort<TCPReader, IMonitoringRecord> outputPort = super.createOutputPort(); - // BETTER use a non thread-safe implementation to increase performance. A thread-safe version is not necessary. private final ILookup<String> stringRegistry = new Lookup<String>(); private int port1 = 10133; private int port2 = 10134; - @Override - public void onPipelineStarts() throws Exception { - super.onPipelineStarts(); - - // FIXME use the implementation from the thread or from execute(), but not both - final TCPStringReader tcpStringReader = new TCPStringReader(this.port2, this.stringRegistry); - tcpStringReader.start(); - } - - @Override - public void onPipelineStops() { - super.logger.info("Shutdown of TCPReader requested."); - // TODO actually implement terminate! - super.onPipelineStops(); - } + // @Override // implement onStop + // public void onPipelineStops() { + // super.logger.info("Shutdown of TCPReader requested."); + // // TODO actually implement terminate! + // super.onPipelineStops(); + // } public final int getPort1() { return this.port1; @@ -86,7 +70,7 @@ public class TCPReader extends AbstractFilter<TCPReader> { } @Override - protected boolean execute(final Context<TCPReader> context) { + protected void execute5(final Void element) { ServerSocketChannel serversocket = null; try { serversocket = ServerSocketChannel.open(); @@ -109,7 +93,7 @@ public class TCPReader extends AbstractFilter<TCPReader> { try { // NOCS (Nested try-catch) record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry); record.setLoggingTimestamp(loggingTimestamp); - context.put(this.outputPort, record); + this.send(record); } catch (final MonitoringRecordException ex) { super.logger.error("Failed to create record.", ex); } @@ -126,7 +110,6 @@ public class TCPReader extends AbstractFilter<TCPReader> { // END also loop this one? } catch (final IOException ex) { super.logger.error("Error while reading", ex); - return false; } finally { if (null != serversocket) { try { @@ -138,70 +121,5 @@ public class TCPReader extends AbstractFilter<TCPReader> { } } } - return true; - } - -} - -/** - * - * @author Jan Waller - * - * @since 1.8 - */ -class TCPStringReader extends Thread { - - private static final int MESSAGE_BUFFER_SIZE = 65535; - - private static final Log LOG = LogFactory.getLog(TCPStringReader.class); - - private final int port; - private final ILookup<String> stringRegistry; - - public TCPStringReader(final int port, final ILookup<String> stringRegistry) { - this.port = port; - this.stringRegistry = stringRegistry; - } - - @Override - public void run() { - ServerSocketChannel serversocket = null; - try { - serversocket = ServerSocketChannel.open(); - serversocket.socket().bind(new InetSocketAddress(this.port)); - if (LOG.isDebugEnabled()) { - LOG.debug("Listening on port " + this.port); - } - // BEGIN also loop this one? - final SocketChannel socketChannel = serversocket.accept(); - final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE); - while (socketChannel.read(buffer) != -1) { - buffer.flip(); - try { - while (buffer.hasRemaining()) { - buffer.mark(); - RegistryRecord.registerRecordInRegistry(buffer, this.stringRegistry); - } - buffer.clear(); - } catch (final BufferUnderflowException ex) { - buffer.reset(); - buffer.compact(); - } - } - socketChannel.close(); - // END also loop this one? - } catch (final IOException ex) { - LOG.error("Error while reading", ex); - } finally { - if (null != serversocket) { - try { - serversocket.close(); - } catch (final IOException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Failed to close TCP connection!", e); - } - } - } - } } }