Skip to content
Snippets Groups Projects
Commit c4cece98 authored by Christian Wulf's avatar Christian Wulf
Browse files

migrated io stages

parent fce69aa9
Branches
Tags
No related merge requests found
......@@ -22,15 +22,13 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import teetime.variant.explicitScheduling.framework.core.Description;
import teetime.variant.methodcallWithPorts.framework.core.ProducerStage;
import kieker.common.exception.MonitoringRecordException;
import kieker.common.record.AbstractMonitoringRecord;
import kieker.common.record.IMonitoringRecord;
import teetime.variant.explicitScheduling.framework.core.AbstractFilter;
import teetime.variant.explicitScheduling.framework.core.Context;
import teetime.variant.explicitScheduling.framework.core.Description;
import teetime.variant.explicitScheduling.framework.core.IOutputPort;
/**
* A very simple database reader that probably only works for small data sets.
*
......@@ -39,9 +37,7 @@ import teetime.variant.explicitScheduling.framework.core.IOutputPort;
* @since 1.10
*/
@Description("A reader which reads records from a database")
public class DbReader extends AbstractFilter<DbReader> {
private final IOutputPort<DbReader, IMonitoringRecord> outputPort = super.createOutputPort();
public class DbReader extends ProducerStage<Void, IMonitoringRecord> {
@Description("The classname of the driver used for the connection.")
private String driverClassname = "org.apache.derby.jdbc.EmbeddedDrive";
......@@ -53,24 +49,23 @@ public class DbReader extends AbstractFilter<DbReader> {
private volatile boolean running = true;
@Override
public void onPipelineStarts() throws Exception {
super.onPipelineStarts();
public void onStart() {
try {
Class.forName(this.driverClassname).newInstance();
} catch (final Exception ex) { // NOPMD NOCS (IllegalCatchCheck)
throw new Exception("DB driver registration failed. Perhaps the driver jar is missing?", ex);
throw new RuntimeException("DB driver registration failed. Perhaps the driver jar is missing?", ex);
}
}
@Override
public void onPipelineStops() {
super.logger.info("Shutdown of DBReader requested.");
this.running = false;
super.onPipelineStops();
}
// @Override // TODO implement onStop
// public void onPipelineStops() {
// super.logger.info("Shutdown of DBReader requested.");
// this.running = false;
// super.onPipelineStops();
// }
@Override
protected boolean execute(final Context<DbReader> context) {
protected void execute5(final Void element) {
Connection connection = null;
try {
connection = DriverManager.getConnection(this.connectionString);
......@@ -84,7 +79,7 @@ public class DbReader extends AbstractFilter<DbReader> {
final String tablename = indexTable.getString(1);
final String classname = indexTable.getString(2);
try { // NOCS (nested try)
this.table2record(context, connection, tablename, AbstractMonitoringRecord.classForName(classname));
this.table2record(connection, tablename, AbstractMonitoringRecord.classForName(classname));
} catch (final MonitoringRecordException ex) {
// log error but continue with next table
super.logger.error("Failed to load records of type " + classname + " from table " + tablename, ex);
......@@ -103,7 +98,6 @@ public class DbReader extends AbstractFilter<DbReader> {
}
} catch (final SQLException ex) {
super.logger.error("SQLException with SQLState: '" + ex.getSQLState() + "' and VendorError: '" + ex.getErrorCode() + "'", ex);
return false;
} finally {
if (connection != null) {
try {
......@@ -113,7 +107,6 @@ public class DbReader extends AbstractFilter<DbReader> {
}
}
}
return true;
}
public final String getDriverClassname() {
......@@ -154,7 +147,7 @@ public class DbReader extends AbstractFilter<DbReader> {
* @throws MonitoringRecordException
* If the data within the table could not be converted into a valid record.
*/
private void table2record(final Context<DbReader> context, final Connection connection, final String tablename, final Class<? extends IMonitoringRecord> clazz)
private void table2record(final Connection connection, final String tablename, final Class<? extends IMonitoringRecord> clazz)
throws SQLException, MonitoringRecordException {
Statement selectRecord = null;
try {
......@@ -170,7 +163,7 @@ public class DbReader extends AbstractFilter<DbReader> {
}
final IMonitoringRecord record = AbstractMonitoringRecord.createFromArray(clazz, recordValues);
record.setLoggingTimestamp(records.getLong(2));
context.put(this.outputPort, record);
this.send(record);
}
} finally {
if (records != null) {
......
......
......@@ -21,21 +21,14 @@ import java.io.FileFilter;
import java.util.Arrays;
import java.util.Comparator;
import teetime.variant.explicitScheduling.framework.core.AbstractFilter;
import teetime.variant.explicitScheduling.framework.core.Context;
import teetime.variant.explicitScheduling.framework.core.IInputPort;
import teetime.variant.explicitScheduling.framework.core.IOutputPort;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class Directory2FilesFilter extends AbstractFilter<Directory2FilesFilter> {
public final IInputPort<Directory2FilesFilter, File> directoryInputPort = this.createInputPort();
public final IOutputPort<Directory2FilesFilter, File> fileOutputPort = this.createOutputPort();
public class Directory2FilesFilter extends ConsumerStage<File, File> {
private FileFilter filter;
private Comparator<File> fileComparator;
......@@ -69,21 +62,13 @@ public class Directory2FilesFilter extends AbstractFilter<Directory2FilesFilter>
super();
}
/**
* @since 1.10
*/
@Override
protected boolean execute(final Context<Directory2FilesFilter> context) {
final File inputDir = context.tryTake(this.directoryInputPort);
if (inputDir == null) {
return false;
}
protected void execute5(final File inputDir) {
final File[] inputFiles = inputDir.listFiles(this.filter);
if (inputFiles == null) {
this.logger.error("Directory '" + inputDir + "' does not exist or an I/O error occured.");
return true;
return;
}
if (this.fileComparator != null) {
......@@ -91,10 +76,8 @@ public class Directory2FilesFilter extends AbstractFilter<Directory2FilesFilter>
}
for (final File file : inputFiles) {
context.put(this.fileOutputPort, file);
this.send(file);
}
return true;
}
public FileFilter getFilter() {
......
......
......@@ -20,10 +20,8 @@ import java.io.FileOutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import teetime.variant.explicitScheduling.framework.core.AbstractFilter;
import teetime.variant.explicitScheduling.framework.core.Context;
import teetime.variant.explicitScheduling.framework.core.Description;
import teetime.variant.explicitScheduling.framework.core.IInputPort;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
/**
* @author Matthias Rohr, Jan Waller, Nils Christian Ehmke
......@@ -31,7 +29,7 @@ import teetime.variant.explicitScheduling.framework.core.IInputPort;
* @since 1.10
*/
@Description("A filter to print objects to a configured stream")
public class Printer<T> extends AbstractFilter<Printer<T>> {
public class Printer<T> extends ConsumerStage<T, Void> {
public static final String STREAM_STDOUT = "STDOUT";
public static final String STREAM_STDERR = "STDERR";
......@@ -40,14 +38,29 @@ public class Printer<T> extends AbstractFilter<Printer<T>> {
public static final String ENCODING_UTF8 = "UTF-8";
public final IInputPort<Printer<T>, T> input = this.createInputPort();
private PrintStream printStream;
private String streamName = STREAM_STDOUT;
private String encoding = ENCODING_UTF8;
private boolean active = true;
private boolean append = true;
@Override
protected void execute5(final T object) {
if (this.active) {
final StringBuilder sb = new StringBuilder(128);
sb.append(super.getId());
sb.append('(').append(object.getClass().getSimpleName()).append(") ").append(object.toString());
final String msg = sb.toString();
if (this.printStream != null) {
this.printStream.println(msg);
} else {
super.logger.info(msg);
}
}
}
public String getStreamName() {
return this.streamName;
}
......@@ -73,16 +86,15 @@ public class Printer<T> extends AbstractFilter<Printer<T>> {
}
@Override
public void onPipelineStarts() throws Exception {
super.onPipelineStarts();
public void onStart() {
this.initializeStream();
}
@Override
public void onPipelineStops() {
this.closeStream();
super.onPipelineStops();
}
// @Override // TODO implement onStop
// public void onPipelineStops() {
// this.closeStream();
// super.onPipelineStops();
// }
private void initializeStream() {
if (STREAM_STDOUT.equals(this.streamName)) {
......@@ -117,28 +129,4 @@ public class Printer<T> extends AbstractFilter<Printer<T>> {
}
}
@Override
protected boolean execute(final Context<Printer<T>> context) {
final T object = context.tryTake(this.input);
if (null == object) {
return false;
}
if (this.active) {
final StringBuilder sb = new StringBuilder(128);
sb.append(super.getId());
sb.append('(').append(object.getClass().getSimpleName()).append(") ").append(object.toString());
final String msg = sb.toString();
if (this.printStream != null) {
this.printStream.println(msg);
} else {
super.logger.info(msg);
}
}
return true;
}
}
......@@ -22,19 +22,14 @@ import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import teetime.variant.methodcallWithPorts.framework.core.ProducerStage;
import kieker.common.exception.MonitoringRecordException;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import kieker.common.record.AbstractMonitoringRecord;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.misc.RegistryRecord;
import kieker.common.util.registry.ILookup;
import kieker.common.util.registry.Lookup;
import teetime.variant.explicitScheduling.framework.core.AbstractFilter;
import teetime.variant.explicitScheduling.framework.core.Context;
import teetime.variant.explicitScheduling.framework.core.IOutputPort;
/**
* This is a reader which reads the records from a TCP port.
*
......@@ -42,32 +37,21 @@ import teetime.variant.explicitScheduling.framework.core.IOutputPort;
*
* @since 1.10
*/
public class TCPReader extends AbstractFilter<TCPReader> {
public class TCPReader extends ProducerStage<Void, IMonitoringRecord> {
private static final int MESSAGE_BUFFER_SIZE = 65535;
private final IOutputPort<TCPReader, IMonitoringRecord> outputPort = super.createOutputPort();
// BETTER use a non thread-safe implementation to increase performance. A thread-safe version is not necessary.
private final ILookup<String> stringRegistry = new Lookup<String>();
private int port1 = 10133;
private int port2 = 10134;
@Override
public void onPipelineStarts() throws Exception {
super.onPipelineStarts();
// FIXME use the implementation from the thread or from execute(), but not both
final TCPStringReader tcpStringReader = new TCPStringReader(this.port2, this.stringRegistry);
tcpStringReader.start();
}
@Override
public void onPipelineStops() {
super.logger.info("Shutdown of TCPReader requested.");
// TODO actually implement terminate!
super.onPipelineStops();
}
// @Override // implement onStop
// public void onPipelineStops() {
// super.logger.info("Shutdown of TCPReader requested.");
// // TODO actually implement terminate!
// super.onPipelineStops();
// }
public final int getPort1() {
return this.port1;
......@@ -86,7 +70,7 @@ public class TCPReader extends AbstractFilter<TCPReader> {
}
@Override
protected boolean execute(final Context<TCPReader> context) {
protected void execute5(final Void element) {
ServerSocketChannel serversocket = null;
try {
serversocket = ServerSocketChannel.open();
......@@ -109,7 +93,7 @@ public class TCPReader extends AbstractFilter<TCPReader> {
try { // NOCS (Nested try-catch)
record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry);
record.setLoggingTimestamp(loggingTimestamp);
context.put(this.outputPort, record);
this.send(record);
} catch (final MonitoringRecordException ex) {
super.logger.error("Failed to create record.", ex);
}
......@@ -126,7 +110,6 @@ public class TCPReader extends AbstractFilter<TCPReader> {
// END also loop this one?
} catch (final IOException ex) {
super.logger.error("Error while reading", ex);
return false;
} finally {
if (null != serversocket) {
try {
......@@ -138,70 +121,5 @@ public class TCPReader extends AbstractFilter<TCPReader> {
}
}
}
return true;
}
}
/**
*
* @author Jan Waller
*
* @since 1.8
*/
class TCPStringReader extends Thread {
private static final int MESSAGE_BUFFER_SIZE = 65535;
private static final Log LOG = LogFactory.getLog(TCPStringReader.class);
private final int port;
private final ILookup<String> stringRegistry;
public TCPStringReader(final int port, final ILookup<String> stringRegistry) {
this.port = port;
this.stringRegistry = stringRegistry;
}
@Override
public void run() {
ServerSocketChannel serversocket = null;
try {
serversocket = ServerSocketChannel.open();
serversocket.socket().bind(new InetSocketAddress(this.port));
if (LOG.isDebugEnabled()) {
LOG.debug("Listening on port " + this.port);
}
// BEGIN also loop this one?
final SocketChannel socketChannel = serversocket.accept();
final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE);
while (socketChannel.read(buffer) != -1) {
buffer.flip();
try {
while (buffer.hasRemaining()) {
buffer.mark();
RegistryRecord.registerRecordInRegistry(buffer, this.stringRegistry);
}
buffer.clear();
} catch (final BufferUnderflowException ex) {
buffer.reset();
buffer.compact();
}
}
socketChannel.close();
// END also loop this one?
} catch (final IOException ex) {
LOG.error("Error while reading", ex);
} finally {
if (null != serversocket) {
try {
serversocket.close();
} catch (final IOException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to close TCP connection!", e);
}
}
}
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment