The following code creates a parser for key-value pairs. It is useful as an example of how to write a parser plugin. The topic "Extensible Common Data Framework: Parsing Key Value Pairs" describes the functionality of this plugin.
/*
// Copyright (C) 2016-<%YEAR%> SQLstream, Inc.
*/
package com.sqlstream.aspen.namespace.keyvalue;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
import java.util.Set;
import java.util.logging.Level;
import com.sqlstream.aspen.namespace.common.BuffersInputStream;
import com.sqlstream.aspen.namespace.common.CommonRowParser;
import com.sqlstream.aspen.namespace.common.EcdaPluginFactory;
import com.sqlstream.aspen.namespace.common.OptionsUtils;
import com.sqlstream.aspen.namespace.common.TypeParser;
/*
* KeyValueParser is a parser for key-value data. It parses one record at a time
* from a stream of bytes, reading up to the next line delimiter. The line is
* split into fields by the column delimiter, unless protected by quotes. (The
* quote character and both delimiters are attributes. By default the quote is ",
* the column delimiter is a comma and the line delimiter is a newline.) Then the
* field values are parsed from byte strings to scalar values and inserted into
* the next row of the target table or stream.
*
* For the sake of speed the code avoids making Java objects. In particular, the
* current line is not a Java String or a Java byte array, but simply a range of
* contiguous bytes in a larger byte array. The same holds for each field to be
* parsed.
*
*/
public class KeyValueParser extends CommonRowParser {
  private final static String KEY_VALUE_SEPARATOR_CHARACTER = "KEY_VALUE_SEPARATOR_CHARACTER";
  protected final static String [] REQUIRED_PROPERTIES = {
  };
  protected final static String [] OPTIONAL_PROPERTIES = {
    ROW_SEPARATOR_CHAR_KEY, SEPARATOR_CHAR_KEY, PARSER_SKIP_HEADER_KEY,
    COLUMN_QUOTE_CHARACTER, QUOTED_COLUMNS, KEY_VALUE_SEPARATOR_CHARACTER
  };
  private BytePointer[] columns;
  private ArrayList<BytePointer> unmatchedKeys;
  private byte[] rowDelimiter;
  private byte[] colDelimiter;
  private byte[] quotes;
  private byte[] keyValueSeparator;
  private int quotesLength = 0;
  private int keyValueSeparatorLength;
  private boolean rowDelimiterSpecified;
  protected TypeParser[] rowParsers;
  protected BuffersInputStream.Line currentLine; // without line delimiter
  private boolean rowGenerated;
  private int rowCount = 0;
  private static BytePointer key;
  class Impl implements Runnable {
    public void run() {
      long rowStartPosition = 0;
      try {
        tracer.log(Level.FINE, "Started parsing the stream");
        while (inputStream.getLine(currentLine, rowDelimiter)) {
          parseRow(rowStartPosition);
          rowStartPosition = inputStream.getPosition();
        }
        tracer.log(Level.FINE, "Finished parsing the stream");
      } catch (Throwable e) {
        tracer.log(Level.SEVERE, "Exiting parser", e);
      } finally {
        context.close();
      }
    }
  }
  public KeyValueParser() {
    super();
  }
  /*
  * Initializes this formatter and throws an exception if something is
  * underspecified
  */
  public void init(Properties props) throws Exception {
    super.init(props);
    OptionsUtils.filterInitProperties(
      props, REQUIRED_PROPERTIES, OPTIONAL_PROPERTIES, initProps);
    key = new BytePointer();
    // Record separator. By default is a \n character
    String rowSeparator = props.getProperty(ROW_SEPARATOR_CHAR_KEY);
    rowDelimiterSpecified = (rowSeparator != null);
    if (rowSeparator == null) {
      rowSeparator = "\n";
    }
    rowDelimiter = rowSeparator.getBytes(charset);
    // Field (key=value) separator. By default is a comma (,)
    String colSeparator = props.getProperty(SEPARATOR_CHAR_KEY);
    if (colSeparator == null) {
      colSeparator = ",";
    }
    colDelimiter = colSeparator.getBytes(charset);
    initDefaultTypeParsers(props, initProps);
    rowParsers = initRowParsers();
    // The quote character. By defautl it is a double quote character"
    String quoteCharacter = props.getProperty(COLUMN_QUOTE_CHARACTER);
    String quotedColumns = props.getProperty(QUOTED_COLUMNS);
    if (null == quoteCharacter && quotedColumns != null) {
      quoteCharacter = "\"";
    }
    if (quoteCharacter != null) {
      quotes = quoteCharacter.getBytes(charset);
    } else {
      quotes = "\"".getBytes(charset);
    }
    quotesLength = quotes.length;
    // The character that separates a key from a value. By default is a =
    // It can be configured using the KEY_VALUE_SEPARATOR_CHARACTER property
    String keyValueSeparatorCharacter = props.getProperty(KEY_VALUE_SEPARATOR_CHARACTER);
    if (null == keyValueSeparatorCharacter) {
      keyValueSeparatorCharacter = "=";
    }
    keyValueSeparator = keyValueSeparatorCharacter.getBytes(charset);
    keyValueSeparatorLength = keyValueSeparator.length;
  }
  /*
  * Parses a single record
  *
  * @param rowStartPosition
  * @throws Exception
  */
  public void parseRow(long rowStartPosition) throws Exception {
    rowGenerated = false;
    final int rowLength = currentLine.length();
    key.setBytes(currentLine.buffer);
    int a = currentLine.buffer.length;
    assert (rowLength >= 0);
    try {
      traceLine(Level.FINEST, currentLine);
      // String s = new String (currentLine.buffer, charset);
      // Brief explanation of the index variables
      // assuming a a row with a "key=value" field, where the index of k=0
      // and the index of the last 'e' is 8 then
      // startKey = 0
      // endKEy = 2
      // startValue = 4
      // endValue = 8
      // colStart = 0
      // colEnd = 8
      int rowEnd = rowLength;
      int colStart = 0, colEnd = 0; // offsets in currentLine
      boolean sawComma = false;
      boolean finish = false;
      // parse the fields until it reaches the end of the line
      while (!finish) {
       Â
        // find next field in the line (exclude delimiters)
        colStart = colEnd;
        if (colStart < rowEnd) {
          if (sawComma) {
            colStart += colDelimiter.length;
          }
          colEnd = currentLine.findBytes(colStart, colDelimiter);
          if (colEnd < 0) {
            // it's the last key=value in the row
            colEnd = rowEnd;
            sawComma = false;
            finish = true;
          } else {
            sawComma = true;
          }
        }
        int startKey = colStart;
        int endKey = currentLine.findBytes(startKey, colEnd, keyValueSeparator);
        int startValue = endKey + keyValueSeparatorLength;
        int endValue = colEnd;
        // key = new BytePointer(startKey, endKey, currentLine.buffer);
        key.setLimits(startKey, endKey, currentLine.start);
        //
        // parse the text from colStart to colEnd
        try {
                  //traceField(Level.FINEST, null, startKey, endKey, currentLine, colStart, colEnd);
          TypeParser parser = null;
          int iCol = 0;
          for (iCol = 0; iCol < numColumns; iCol++) {
            if (columns[iCol].equals(key)) {
              parser = rowParsers[iCol];
              iCol++;
              break;
            }
          }
          // this condition check if the current key is present in the
          // table,
          // if not is is traced (only once for each new key) and the
          // loop
          // skips this iteration
          if (parser == null) {
            // unmatched key
            boolean contains = false;
            for(BytePointer unmatchedKey : this.unmatchedKeys){
              if(unmatchedKey.equals(key)){
                contains = true;
              }
            }
           Â
            if (!contains) {
              BytePointer unmatchedKey = (BytePointer)key.clone();
              this.unmatchedKeys.add(unmatchedKey);
              tracer.log(Level.WARNING, "Unmatched key: " + unmatchedKey.toString());
            }
            continue;
          }
          // Checks if this value starts with a quote character
          if (atQuote(startValue)) {
            // strip off quotes; but what about string "" vs
            // string null??
            int start = startValue + quotesLength;
            int end = currentLine.findBytes(start, endValue, quotes);
            if (end < 0) {
              end = colEnd; // expect a parse err with a msg
            }
            traceField(Level.FINEST, "trimmed ", startKey, endKey, currentLine, start, end);
            parser.parse(currentLine.buffer, currentLine.start + start, currentLine.start + end, inputStmt,
                iCol);
          } else {
            parser.parse(currentLine.buffer, currentLine.start + startValue, currentLine.start + endValue,
                inputStmt, iCol);
          }
        } catch (Exception e) {
          setParserPosition(rowStartPosition - messageStartPosition + colEnd);
          String msg = e.getMessage() + " while parsing field " + key.toString() + " at line " + getParserLineNumber()
              + " of " + inputStream.locationDescription();
          setParserError(msg);
          tracer.log(Level.WARNING, msg, e);
        }
      }
      rowGenerated = true;
    } finally {
      if (rowGenerated) {
        submitRow(tracer);
      }
      incParserLineNumber();
    }
  }
  /*
  * Logs the given line to the tracer
  *
  * @param level
  * @param line
  */
  private void traceLine(Level level, BuffersInputStream.Line line) {
    if (tracer.isLoggable(level)) {
      StringBuilder sb = new StringBuilder();
      sb.append("row ").append(rowCount).append(" location ").append(inputStream.locationDescription())
          .append(" line ").append(getParserLineNumber()).append(": ").append(line.asString());
      tracer.log(level, sb.toString());
    }
  }
  /*
  * Logs a single field (key=value) to the tracer
  *
  * @param level
  * @param prefix
  * @param startKey
  * @param endKey
  * @param line
  * @param startValue
  * @param endValue
  * @throws SQLException
  */
  private void traceField(
    Level level, String prefix, int startKey, int endKey, BuffersInputStream.Line line,
    int startValue, int endValue)
    throws SQLException
  {
    if (tracer.isLoggable(level)) {
      StringBuilder sb = new StringBuilder();
      if (prefix != null)
        sb.append(prefix);
      sb.append(" ").append(line.asString(startKey, (endKey - startKey))).append(" parse: ")
          .append(line.asString(startValue, (endValue - startValue)));
      tracer.log(level, sb.toString());
    }
  }
  /*
  * Initializes the required row parsers
  *
  * @return
  * @throws Exception
  */
  private TypeParser[] initRowParsers() throws Exception {
    Set<String> alreadyMapped = context.getMappedSpecialColumns();
    this.columns = new BytePointer[numColumns];
    this.unmatchedKeys = new ArrayList<>();
    ArrayList<TypeParser> parsers = new ArrayList<>();
    for (int i = 1; i <= numColumns; i++) {
      byte[] fieldName = metaData.getFieldName(i).getBytes();
      this.columns[i - 1] = new BytePointer(0, fieldName.length, fieldName);
      if (alreadyMapped.contains(fieldName)) {
        parsers.add(null);
        continue;
      }
      TypeParser customParser = customParsers.get(fieldName);
      if (customParser != null) {
        if (logLevel.isTraceFine()) {
          tracer.fine("custom parser " + customParser);
        }
        parsers.add(customParser);
      } else {
        parsers.add(getTypeParser(i));
      }
    }
    if (logLevel.isTraceFinest()) {
      tracer.finest("row parsers size " + parsers.size());
      tracer.finest("using type parsers " + parsers);
    }
    return parsers.toArray(new TypeParser[0]);
  }
  /*
  * Checks if an input field start with a quote
  *
  * @param valueStart
  *       the index of the beginning of the value
  * @return true if this value starts with a quote character
  * @throws IOException
  */
  private boolean atQuote(int valueStart) throws IOException {
    if (quotes == null)
      return false;
    for (int i = 0; i < quotesLength; ++i) {
      byte b = currentLine.buffer[currentLine.start + valueStart + i];
      if (b != quotes[i]) {
        return false;
      }
    }
    return true;
  }
  @Override
  public void closeAllocation() {
    tracer.fine("final rowCount=" + rowCount);
    super.closeAllocation();
  }
  @Override
  public void start() {
    Impl impl = new Impl();
    inputStream = context.getInputStream();
    if (quotes != null) {
      inputStream.setQuoteCharacters(quotes);
    }
    currentLine = inputStream.newLine();
    Thread thread = new Thread(impl);
    thread.start();
  }
  public static class PluginFactory extends EcdaPluginFactory {
    /*
    * Installs the parser
    */
    public void installPlugins() {
      installParser("KV", KeyValueParser.class);
      tracer.log(Level.FINE, "Installed KeyValue parser");
    }
  }
  /*
  *
  * A BytePointer is a type that contains a pointer to a byte[] and a start
  * and end indexes. The BytePointer is used to compares values of the stream
  * (requires less computation power than Strings)
  *
  */
  class BytePointer {
    private int start;
    private int end;
    private byte[] bytes;
    public BytePointer() {
      super();
      this.start = 0;
      this.end = 0;
      this.bytes = null;
    }
    public BytePointer(int start, int end, byte[] bytes) {
      super();
      this.start = start;
      this.end = end;
      this.bytes = bytes;
    }
    public void setLimits(int start, int end, long rowStartPosition) {
     Â
      this.start = start + (int)rowStartPosition;
      this.end = end + (int)rowStartPosition;
     Â
    }
    /*
    * hashCode implementation similar to the String hashCode
    */
    @Override
    public int hashCode() {
      int result = 0;
      int n = end - start;
      int i = 1;
      for (int index = start; index < end; index++) {
        result += (bytes[index] * Math.pow(31, n - i));
        i++;
      }
      return result;
    }
    /*
    * This equals compares two bytePointers. In order to be equal, they
    * muys have the same length (end-start) and the same bytes in this
    * interval
    */
    @Override
    public boolean equals(Object obj) {
      try{
      BytePointer other = (BytePointer) obj;
     Â
      if ((this.end - this.start) != (other.getEnd() - other.getStart())) {
        return false;
      }
      byte[] otherByteArray = other.getBytes();
      int otherStart = other.getStart();
      int len = end - start;
      for (int i = 0; i < len; i++) {
        if (this.bytes[i + start] != otherByteArray[i + otherStart]) {
          return false;
        }
      }
      return true;
      }catch(Exception e){
        e.printStackTrace();
      }
      return false;
    }
    @Override
    protected Object clone() throws CloneNotSupportedException {
     Â
      return new BytePointer(0, end-start, Arrays.copyOfRange(bytes, start, end));
     Â
    }
   Â
   Â
    // getters and setters
    @Override
    public String toString() {
      byte[] byteStr = Arrays.copyOfRange(bytes, start, end);
      return new String(byteStr, charset);
    }
    public int getStart() {
      return start;
    }
    public void setStart(int start) {
      this.start = start;
    }
    public int getEnd() {
      return end;
    }
    public void setEnd(int end) {
      this.end = end;
    }
    public byte[] getBytes() {
      return bytes;
    }
    public void setBytes(byte[] bytes) {
      this.bytes = bytes;
    }
  }
}
// End KeyValueParser.java