Skip to content
Snippets Groups Projects
Commit e9cf920e authored by Christian Wulf's avatar Christian Wulf
Browse files

migrated io stages

parent 919c546a
No related branches found
No related tags found
No related merge requests found
...@@ -22,26 +22,22 @@ import java.sql.ResultSet; ...@@ -22,26 +22,22 @@ import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import teetime.variant.explicitScheduling.framework.core.Description;
import teetime.variant.methodcallWithPorts.framework.core.ProducerStage;
import kieker.common.exception.MonitoringRecordException; import kieker.common.exception.MonitoringRecordException;
import kieker.common.record.AbstractMonitoringRecord; import kieker.common.record.AbstractMonitoringRecord;
import kieker.common.record.IMonitoringRecord; import kieker.common.record.IMonitoringRecord;
import teetime.variant.explicitScheduling.framework.core.AbstractFilter;
import teetime.variant.explicitScheduling.framework.core.Context;
import teetime.variant.explicitScheduling.framework.core.Description;
import teetime.variant.explicitScheduling.framework.core.IOutputPort;
/** /**
* A very simple database reader that probably only works for small data sets. * A very simple database reader that probably only works for small data sets.
* *
* @author Jan Waller, Nils Christian Ehmke * @author Jan Waller, Nils Christian Ehmke
* *
* @since 1.10 * @since 1.10
*/ */
@Description("A reader which reads records from a database") @Description("A reader which reads records from a database")
public class DbReader extends AbstractFilter<DbReader> { public class DbReader extends ProducerStage<Void, IMonitoringRecord> {
private final IOutputPort<DbReader, IMonitoringRecord> outputPort = super.createOutputPort();
@Description("The classname of the driver used for the connection.") @Description("The classname of the driver used for the connection.")
private String driverClassname = "org.apache.derby.jdbc.EmbeddedDrive"; private String driverClassname = "org.apache.derby.jdbc.EmbeddedDrive";
...@@ -53,24 +49,23 @@ public class DbReader extends AbstractFilter<DbReader> { ...@@ -53,24 +49,23 @@ public class DbReader extends AbstractFilter<DbReader> {
private volatile boolean running = true; private volatile boolean running = true;
@Override @Override
public void onPipelineStarts() throws Exception { public void onStart() {
super.onPipelineStarts();
try { try {
Class.forName(this.driverClassname).newInstance(); Class.forName(this.driverClassname).newInstance();
} catch (final Exception ex) { // NOPMD NOCS (IllegalCatchCheck) } catch (final Exception ex) { // NOPMD NOCS (IllegalCatchCheck)
throw new Exception("DB driver registration failed. Perhaps the driver jar is missing?", ex); throw new RuntimeException("DB driver registration failed. Perhaps the driver jar is missing?", ex);
} }
} }
@Override // @Override // TODO implement onStop
public void onPipelineStops() { // public void onPipelineStops() {
super.logger.info("Shutdown of DBReader requested."); // super.logger.info("Shutdown of DBReader requested.");
this.running = false; // this.running = false;
super.onPipelineStops(); // super.onPipelineStops();
} // }
@Override @Override
protected boolean execute(final Context<DbReader> context) { protected void execute5(final Void element) {
Connection connection = null; Connection connection = null;
try { try {
connection = DriverManager.getConnection(this.connectionString); connection = DriverManager.getConnection(this.connectionString);
...@@ -84,7 +79,7 @@ public class DbReader extends AbstractFilter<DbReader> { ...@@ -84,7 +79,7 @@ public class DbReader extends AbstractFilter<DbReader> {
final String tablename = indexTable.getString(1); final String tablename = indexTable.getString(1);
final String classname = indexTable.getString(2); final String classname = indexTable.getString(2);
try { // NOCS (nested try) try { // NOCS (nested try)
this.table2record(context, connection, tablename, AbstractMonitoringRecord.classForName(classname)); this.table2record(connection, tablename, AbstractMonitoringRecord.classForName(classname));
} catch (final MonitoringRecordException ex) { } catch (final MonitoringRecordException ex) {
// log error but continue with next table // log error but continue with next table
super.logger.error("Failed to load records of type " + classname + " from table " + tablename, ex); super.logger.error("Failed to load records of type " + classname + " from table " + tablename, ex);
...@@ -103,7 +98,6 @@ public class DbReader extends AbstractFilter<DbReader> { ...@@ -103,7 +98,6 @@ public class DbReader extends AbstractFilter<DbReader> {
} }
} catch (final SQLException ex) { } catch (final SQLException ex) {
super.logger.error("SQLException with SQLState: '" + ex.getSQLState() + "' and VendorError: '" + ex.getErrorCode() + "'", ex); super.logger.error("SQLException with SQLState: '" + ex.getSQLState() + "' and VendorError: '" + ex.getErrorCode() + "'", ex);
return false;
} finally { } finally {
if (connection != null) { if (connection != null) {
try { try {
...@@ -113,7 +107,6 @@ public class DbReader extends AbstractFilter<DbReader> { ...@@ -113,7 +107,6 @@ public class DbReader extends AbstractFilter<DbReader> {
} }
} }
} }
return true;
} }
public final String getDriverClassname() { public final String getDriverClassname() {
...@@ -142,7 +135,7 @@ public class DbReader extends AbstractFilter<DbReader> { ...@@ -142,7 +135,7 @@ public class DbReader extends AbstractFilter<DbReader> {
/** /**
* This method uses the given table to read records and sends them to the output port. * This method uses the given table to read records and sends them to the output port.
* *
* @param connection * @param connection
* The connection to the database which will be used. * The connection to the database which will be used.
* @param tablename * @param tablename
...@@ -154,7 +147,7 @@ public class DbReader extends AbstractFilter<DbReader> { ...@@ -154,7 +147,7 @@ public class DbReader extends AbstractFilter<DbReader> {
* @throws MonitoringRecordException * @throws MonitoringRecordException
* If the data within the table could not be converted into a valid record. * If the data within the table could not be converted into a valid record.
*/ */
private void table2record(final Context<DbReader> context, final Connection connection, final String tablename, final Class<? extends IMonitoringRecord> clazz) private void table2record(final Connection connection, final String tablename, final Class<? extends IMonitoringRecord> clazz)
throws SQLException, MonitoringRecordException { throws SQLException, MonitoringRecordException {
Statement selectRecord = null; Statement selectRecord = null;
try { try {
...@@ -170,7 +163,7 @@ public class DbReader extends AbstractFilter<DbReader> { ...@@ -170,7 +163,7 @@ public class DbReader extends AbstractFilter<DbReader> {
} }
final IMonitoringRecord record = AbstractMonitoringRecord.createFromArray(clazz, recordValues); final IMonitoringRecord record = AbstractMonitoringRecord.createFromArray(clazz, recordValues);
record.setLoggingTimestamp(records.getLong(2)); record.setLoggingTimestamp(records.getLong(2));
context.put(this.outputPort, record); this.send(record);
} }
} finally { } finally {
if (records != null) { if (records != null) {
......
...@@ -21,21 +21,14 @@ import java.io.FileFilter; ...@@ -21,21 +21,14 @@ import java.io.FileFilter;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
import teetime.variant.explicitScheduling.framework.core.AbstractFilter; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.explicitScheduling.framework.core.Context;
import teetime.variant.explicitScheduling.framework.core.IInputPort;
import teetime.variant.explicitScheduling.framework.core.IOutputPort;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
* @since 1.10 * @since 1.10
*/ */
public class Directory2FilesFilter extends AbstractFilter<Directory2FilesFilter> { public class Directory2FilesFilter extends ConsumerStage<File, File> {
public final IInputPort<Directory2FilesFilter, File> directoryInputPort = this.createInputPort();
public final IOutputPort<Directory2FilesFilter, File> fileOutputPort = this.createOutputPort();
private FileFilter filter; private FileFilter filter;
private Comparator<File> fileComparator; private Comparator<File> fileComparator;
...@@ -69,21 +62,13 @@ public class Directory2FilesFilter extends AbstractFilter<Directory2FilesFilter> ...@@ -69,21 +62,13 @@ public class Directory2FilesFilter extends AbstractFilter<Directory2FilesFilter>
super(); super();
} }
/**
* @since 1.10
*/
@Override @Override
protected boolean execute(final Context<Directory2FilesFilter> context) { protected void execute5(final File inputDir) {
final File inputDir = context.tryTake(this.directoryInputPort);
if (inputDir == null) {
return false;
}
final File[] inputFiles = inputDir.listFiles(this.filter); final File[] inputFiles = inputDir.listFiles(this.filter);
if (inputFiles == null) { if (inputFiles == null) {
this.logger.error("Directory '" + inputDir + "' does not exist or an I/O error occured."); this.logger.error("Directory '" + inputDir + "' does not exist or an I/O error occured.");
return true; return;
} }
if (this.fileComparator != null) { if (this.fileComparator != null) {
...@@ -91,10 +76,8 @@ public class Directory2FilesFilter extends AbstractFilter<Directory2FilesFilter> ...@@ -91,10 +76,8 @@ public class Directory2FilesFilter extends AbstractFilter<Directory2FilesFilter>
} }
for (final File file : inputFiles) { for (final File file : inputFiles) {
context.put(this.fileOutputPort, file); this.send(file);
} }
return true;
} }
public FileFilter getFilter() { public FileFilter getFilter() {
......
...@@ -20,10 +20,8 @@ import java.io.FileOutputStream; ...@@ -20,10 +20,8 @@ import java.io.FileOutputStream;
import java.io.PrintStream; import java.io.PrintStream;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import teetime.variant.explicitScheduling.framework.core.AbstractFilter;
import teetime.variant.explicitScheduling.framework.core.Context;
import teetime.variant.explicitScheduling.framework.core.Description; import teetime.variant.explicitScheduling.framework.core.Description;
import teetime.variant.explicitScheduling.framework.core.IInputPort; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
/** /**
* @author Matthias Rohr, Jan Waller, Nils Christian Ehmke * @author Matthias Rohr, Jan Waller, Nils Christian Ehmke
...@@ -31,7 +29,7 @@ import teetime.variant.explicitScheduling.framework.core.IInputPort; ...@@ -31,7 +29,7 @@ import teetime.variant.explicitScheduling.framework.core.IInputPort;
* @since 1.10 * @since 1.10
*/ */
@Description("A filter to print objects to a configured stream") @Description("A filter to print objects to a configured stream")
public class Printer<T> extends AbstractFilter<Printer<T>> { public class Printer<T> extends ConsumerStage<T, Void> {
public static final String STREAM_STDOUT = "STDOUT"; public static final String STREAM_STDOUT = "STDOUT";
public static final String STREAM_STDERR = "STDERR"; public static final String STREAM_STDERR = "STDERR";
...@@ -40,14 +38,29 @@ public class Printer<T> extends AbstractFilter<Printer<T>> { ...@@ -40,14 +38,29 @@ public class Printer<T> extends AbstractFilter<Printer<T>> {
public static final String ENCODING_UTF8 = "UTF-8"; public static final String ENCODING_UTF8 = "UTF-8";
public final IInputPort<Printer<T>, T> input = this.createInputPort();
private PrintStream printStream; private PrintStream printStream;
private String streamName = STREAM_STDOUT; private String streamName = STREAM_STDOUT;
private String encoding = ENCODING_UTF8; private String encoding = ENCODING_UTF8;
private boolean active = true; private boolean active = true;
private boolean append = true; private boolean append = true;
@Override
protected void execute5(final T object) {
if (this.active) {
final StringBuilder sb = new StringBuilder(128);
sb.append(super.getId());
sb.append('(').append(object.getClass().getSimpleName()).append(") ").append(object.toString());
final String msg = sb.toString();
if (this.printStream != null) {
this.printStream.println(msg);
} else {
super.logger.info(msg);
}
}
}
public String getStreamName() { public String getStreamName() {
return this.streamName; return this.streamName;
} }
...@@ -73,16 +86,15 @@ public class Printer<T> extends AbstractFilter<Printer<T>> { ...@@ -73,16 +86,15 @@ public class Printer<T> extends AbstractFilter<Printer<T>> {
} }
@Override @Override
public void onPipelineStarts() throws Exception { public void onStart() {
super.onPipelineStarts();
this.initializeStream(); this.initializeStream();
} }
@Override // @Override // TODO implement onStop
public void onPipelineStops() { // public void onPipelineStops() {
this.closeStream(); // this.closeStream();
super.onPipelineStops(); // super.onPipelineStops();
} // }
private void initializeStream() { private void initializeStream() {
if (STREAM_STDOUT.equals(this.streamName)) { if (STREAM_STDOUT.equals(this.streamName)) {
...@@ -117,28 +129,4 @@ public class Printer<T> extends AbstractFilter<Printer<T>> { ...@@ -117,28 +129,4 @@ public class Printer<T> extends AbstractFilter<Printer<T>> {
} }
} }
@Override
protected boolean execute(final Context<Printer<T>> context) {
final T object = context.tryTake(this.input);
if (null == object) {
return false;
}
if (this.active) {
final StringBuilder sb = new StringBuilder(128);
sb.append(super.getId());
sb.append('(').append(object.getClass().getSimpleName()).append(") ").append(object.toString());
final String msg = sb.toString();
if (this.printStream != null) {
this.printStream.println(msg);
} else {
super.logger.info(msg);
}
}
return true;
}
} }
...@@ -22,52 +22,36 @@ import java.nio.ByteBuffer; ...@@ -22,52 +22,36 @@ import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import teetime.variant.methodcallWithPorts.framework.core.ProducerStage;
import kieker.common.exception.MonitoringRecordException; import kieker.common.exception.MonitoringRecordException;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import kieker.common.record.AbstractMonitoringRecord; import kieker.common.record.AbstractMonitoringRecord;
import kieker.common.record.IMonitoringRecord; import kieker.common.record.IMonitoringRecord;
import kieker.common.record.misc.RegistryRecord;
import kieker.common.util.registry.ILookup; import kieker.common.util.registry.ILookup;
import kieker.common.util.registry.Lookup; import kieker.common.util.registry.Lookup;
import teetime.variant.explicitScheduling.framework.core.AbstractFilter;
import teetime.variant.explicitScheduling.framework.core.Context;
import teetime.variant.explicitScheduling.framework.core.IOutputPort;
/** /**
* This is a reader which reads the records from a TCP port. * This is a reader which reads the records from a TCP port.
* *
* @author Jan Waller, Nils Christian Ehmke * @author Jan Waller, Nils Christian Ehmke
* *
* @since 1.10 * @since 1.10
*/ */
public class TCPReader extends AbstractFilter<TCPReader> { public class TCPReader extends ProducerStage<Void, IMonitoringRecord> {
private static final int MESSAGE_BUFFER_SIZE = 65535; private static final int MESSAGE_BUFFER_SIZE = 65535;
private final IOutputPort<TCPReader, IMonitoringRecord> outputPort = super.createOutputPort();
// BETTER use a non thread-safe implementation to increase performance. A thread-safe version is not necessary. // BETTER use a non thread-safe implementation to increase performance. A thread-safe version is not necessary.
private final ILookup<String> stringRegistry = new Lookup<String>(); private final ILookup<String> stringRegistry = new Lookup<String>();
private int port1 = 10133; private int port1 = 10133;
private int port2 = 10134; private int port2 = 10134;
@Override // @Override // implement onStop
public void onPipelineStarts() throws Exception { // public void onPipelineStops() {
super.onPipelineStarts(); // super.logger.info("Shutdown of TCPReader requested.");
// // TODO actually implement terminate!
// FIXME use the implementation from the thread or from execute(), but not both // super.onPipelineStops();
final TCPStringReader tcpStringReader = new TCPStringReader(this.port2, this.stringRegistry); // }
tcpStringReader.start();
}
@Override
public void onPipelineStops() {
super.logger.info("Shutdown of TCPReader requested.");
// TODO actually implement terminate!
super.onPipelineStops();
}
public final int getPort1() { public final int getPort1() {
return this.port1; return this.port1;
...@@ -86,7 +70,7 @@ public class TCPReader extends AbstractFilter<TCPReader> { ...@@ -86,7 +70,7 @@ public class TCPReader extends AbstractFilter<TCPReader> {
} }
@Override @Override
protected boolean execute(final Context<TCPReader> context) { protected void execute5(final Void element) {
ServerSocketChannel serversocket = null; ServerSocketChannel serversocket = null;
try { try {
serversocket = ServerSocketChannel.open(); serversocket = ServerSocketChannel.open();
...@@ -109,7 +93,7 @@ public class TCPReader extends AbstractFilter<TCPReader> { ...@@ -109,7 +93,7 @@ public class TCPReader extends AbstractFilter<TCPReader> {
try { // NOCS (Nested try-catch) try { // NOCS (Nested try-catch)
record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry); record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry);
record.setLoggingTimestamp(loggingTimestamp); record.setLoggingTimestamp(loggingTimestamp);
context.put(this.outputPort, record); this.send(record);
} catch (final MonitoringRecordException ex) { } catch (final MonitoringRecordException ex) {
super.logger.error("Failed to create record.", ex); super.logger.error("Failed to create record.", ex);
} }
...@@ -126,7 +110,6 @@ public class TCPReader extends AbstractFilter<TCPReader> { ...@@ -126,7 +110,6 @@ public class TCPReader extends AbstractFilter<TCPReader> {
// END also loop this one? // END also loop this one?
} catch (final IOException ex) { } catch (final IOException ex) {
super.logger.error("Error while reading", ex); super.logger.error("Error while reading", ex);
return false;
} finally { } finally {
if (null != serversocket) { if (null != serversocket) {
try { try {
...@@ -138,70 +121,5 @@ public class TCPReader extends AbstractFilter<TCPReader> { ...@@ -138,70 +121,5 @@ public class TCPReader extends AbstractFilter<TCPReader> {
} }
} }
} }
return true;
}
}
/**
*
* @author Jan Waller
*
* @since 1.8
*/
class TCPStringReader extends Thread {
private static final int MESSAGE_BUFFER_SIZE = 65535;
private static final Log LOG = LogFactory.getLog(TCPStringReader.class);
private final int port;
private final ILookup<String> stringRegistry;
public TCPStringReader(final int port, final ILookup<String> stringRegistry) {
this.port = port;
this.stringRegistry = stringRegistry;
}
@Override
public void run() {
ServerSocketChannel serversocket = null;
try {
serversocket = ServerSocketChannel.open();
serversocket.socket().bind(new InetSocketAddress(this.port));
if (LOG.isDebugEnabled()) {
LOG.debug("Listening on port " + this.port);
}
// BEGIN also loop this one?
final SocketChannel socketChannel = serversocket.accept();
final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE);
while (socketChannel.read(buffer) != -1) {
buffer.flip();
try {
while (buffer.hasRemaining()) {
buffer.mark();
RegistryRecord.registerRecordInRegistry(buffer, this.stringRegistry);
}
buffer.clear();
} catch (final BufferUnderflowException ex) {
buffer.reset();
buffer.compact();
}
}
socketChannel.close();
// END also loop this one?
} catch (final IOException ex) {
LOG.error("Error while reading", ex);
} finally {
if (null != serversocket) {
try {
serversocket.close();
} catch (final IOException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to close TCP connection!", e);
}
}
}
}
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment