Skip to content
Snippets Groups Projects
Select Git revision
  • main default protected
  • v0.10
  • rework-examples
  • otel-demo-dynatrace-example
  • support-empty-query-response
  • java-operator-sdk
  • rework-state-handling
  • quarkus-36
  • bump-kotlinlogging-to-5.0.2
  • use-internal-registry protected
  • v0.9 protected
  • kafka-nodeport-config-windows
  • v0.8 protected
  • test-k3d protected
  • simpleuc4 protected
  • reduce-code-duplication
  • test-coverage
  • code-cleanup
  • cleanup-commit-interval protected
  • delete-action-for-other-namespace
  • v0.10.0 protected
  • v0.9.0 protected
  • v0.8.6 protected
  • v0.8.5 protected
  • v0.8.4 protected
  • v0.8.3 protected
  • v0.8.2 protected
  • v0.8.1 protected
  • v0.8.0 protected
  • v0.7.0 protected
  • v0.5.2 protected
  • v0.6.4 protected
  • v0.6.3 protected
  • v0.6.2 protected
  • v0.6.1 protected
  • v0.6.0 protected
  • v0.5.1 protected
  • v0.5.0 protected
  • v0.4.0 protected
  • v0.3.0 protected
40 results

EventCoder.java

Blame
  • lorenz's avatar
    Lorenz Boguhn authored and Lorenz Boguhn committed
    e83372f4
    History
    EventCoder.java 2.05 KiB
    package serialization;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.io.Serializable;
    import java.nio.ByteBuffer;
    import java.util.Collections;
    import java.util.List;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.CoderException;
    import org.apache.kafka.common.serialization.Serde;
    import titan.ccp.configuration.events.Event;
    import titan.ccp.configuration.events.EventSerde;
    
    /**
     * Wrapper class that encapsulates an {@link Event} {@link Serde} in an
     * {@link org.apache.beam.sdk.coders.Coder}.
     *
     * <p>Each value is written as a {@value #VALUE_SIZE}-byte length prefix (big-endian, the
     * {@link ByteBuffer} default byte order) followed by exactly that many bytes produced by the
     * serde's serializer.
     */
    public class EventCoder extends Coder<Event> implements Serializable {
      private static final long serialVersionUID = 8403045343970659100L;
      private static final int VALUE_SIZE = 4;
      private static final boolean DETERMINISTIC = true;

      // Transient, so it is not carried along by Java serialization and is null on a deserialized
      // instance; serde() re-creates it on first use.
      private transient Serde<Event> innerSerde = EventSerde.serde();

      /**
       * Writes {@code value} to {@code outStream} as a length prefix followed by the serialized bytes.
       *
       * @param value the event to encode
       * @param outStream the stream to write to
       * @throws CoderException declared by the {@link Coder} contract
       * @throws IOException if writing to {@code outStream} fails
       */
      @Override
      public void encode(final Event value, final OutputStream outStream)
          throws CoderException, IOException {
        final byte[] bytes = this.serde().serializer().serialize("ser", value);
        final byte[] sizeinBytes = ByteBuffer.allocate(VALUE_SIZE).putInt(bytes.length).array();
        outStream.write(sizeinBytes);
        outStream.write(bytes);
      }

      /**
       * Reads one length-prefixed value from {@code inStream} and deserializes it.
       *
       * @param inStream the stream to read from
       * @return the event produced by the serde's deserializer
       * @throws CoderException if the stream ends before a complete value was read or the length
       *         prefix is negative
       * @throws IOException if reading from {@code inStream} fails
       */
      @Override
      public Event decode(final InputStream inStream) throws CoderException, IOException {
        final byte[] sizeinBytes = new byte[VALUE_SIZE];
        readFully(inStream, sizeinBytes);
        final int size = ByteBuffer.wrap(sizeinBytes).getInt();
        if (size < 0) {
          // A negative prefix can only come from corrupt input; fail with a checked coder error
          // instead of an unchecked NegativeArraySizeException.
          throw new CoderException("Invalid negative length prefix: " + size);
        }
        final byte[] bytes = new byte[size];
        readFully(inStream, bytes);
        return this.serde().deserializer().deserialize("deser", bytes);
      }

      @Override
      public List<? extends Coder<?>> getCoderArguments() {
        return Collections.emptyList();
      }

      @Override
      public void verifyDeterministic() throws NonDeterministicException {
        if (!DETERMINISTIC) {
          throw new NonDeterministicException(this, "This class should be deterministic!");
        }
      }

      /** Returns the inner serde, re-creating it if the transient field is currently null. */
      private Serde<Event> serde() {
        if (this.innerSerde == null) {
          this.innerSerde = EventSerde.serde();
        }
        return this.innerSerde;
      }

      /**
       * Fills {@code buffer} completely from {@code inStream}.
       *
       * <p>A single {@link InputStream#read(byte[])} call may return fewer bytes than requested, so
       * this loops until the buffer is full.
       *
       * @throws CoderException if the stream ends before {@code buffer} is full
       * @throws IOException if reading from {@code inStream} fails
       */
      private static void readFully(final InputStream inStream, final byte[] buffer)
          throws IOException {
        int offset = 0;
        while (offset < buffer.length) {
          final int count = inStream.read(buffer, offset, buffer.length - offset);
          if (count < 0) {
            throw new CoderException("Unexpected end of stream: expected " + buffer.length
                + " bytes but only " + offset + " were available");
          }
          offset += count;
        }
      }
    }