Apache Cassandra Thrift API Wrapper

Ich habe mir einen Zugriffs-Wrapper für die Thrift-API in Groovy geschrieben. Alle Zugriffe erfolgen über eine zentrale Basisklasse, welche die CRUD-Standardfunktionalitäten mitbringt.

package de.ronnyfriedland.cassandra.wrapper

import de.ronnyfriedland.cassandra.entity.BaseEntity

/**
 * Converts between {@link BaseEntity} instances and plain property maps.
 *
 * {@link #fromEntity} flattens an entity into a map of its bean properties
 * (including the "class" property), and {@link #toEntity} rebuilds an entity
 * from such a map by instantiating the class named in the "class" entry.
 *
 * @author Ronny Friedland
 */
class EntityWrapper {

    /** Bean properties that are never copied into the property map. */
    def private static EXLUCDE_PROPERTIES = ["metaClass"]
    /** Map keys that must not be passed to the entity's map constructor. */
    def private static READONLY_PROPERTIES = ["class"]

    /**
     * Copies the bean properties of an entity into a map.
     *
     * @param entity the entity to flatten, may be null
     * @return a map of property name to property value without the excluded
     *         properties; empty when the entity is null
     */
    static Map fromEntity(BaseEntity entity) {
        Map copy = [:]
        entity?.properties?.each { entry ->
            if (!EXLUCDE_PROPERTIES.contains(entry.key)) {
                copy[entry.key] = entry.value
            }
        }
        return copy
    }

    /**
     * Creates an entity from a property map.
     *
     * The "class" entry selects the type to instantiate. It may be a
     * {@link Class} or a String; the String may carry the "class " prefix that
     * {@code Class.toString()} produces. The given map is not modified.
     *
     * @param properties property name to value, including the "class" entry
     * @return the new entity, populated through Groovy's map constructor
     * @throws RuntimeException if the "class" entry is missing or is neither a
     *         Class nor a String
     */
    static BaseEntity toEntity(Map properties) {
        def clazzProperty = properties?.get("class")
        String className
        if (clazzProperty instanceof Class) {
            className = clazzProperty.name
        } else if (clazzProperty instanceof String) {
            // Strip only the leading "class " prefix; removing every occurrence
            // would corrupt names that merely contain the word "class".
            className = clazzProperty.trim().replaceFirst(/^class\s+/, "")
        } else {
            throw new RuntimeException("Unexpected type for property 'class' !")
        }

        // Work on a filtered copy so the caller's map stays untouched.
        Map constructorArgs = properties.findAll { !READONLY_PROPERTIES.contains(it.key) }

        return Class.forName(className).newInstance(constructorArgs)
    }
}

Der Zugriff auf diese Wrapper-Klasse muss aber nicht direkt erfolgen. Ich habe zusätzlich einen CassandraClient geschrieben, welcher auch den Auf- und Abbau der Connections verwaltet.

package de.ronnyfriedland.cassandra.api

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer

import org.apache.cassandra.thrift.Cassandra
import org.apache.cassandra.thrift.Column
import org.apache.cassandra.thrift.ColumnOrSuperColumn
import org.apache.cassandra.thrift.ColumnParent
import org.apache.cassandra.thrift.ColumnPath
import org.apache.cassandra.thrift.ConsistencyLevel
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.SlicePredicate
import org.apache.cassandra.thrift.SliceRange
import org.apache.cassandra.thrift.TBinaryProtocol
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TFramedTransport
import org.apache.thrift.transport.TSocket
import org.apache.thrift.transport.TTransport

import de.ronnyfriedland.cassandra.entity.BaseEntity
import de.ronnyfriedland.cassandra.wrapper.EntityWrapper

/**
 * Simple Cassandra client that offers access methods for
 *
 * SELECT
 * INSERT
 * DELETE
 *
 * It opens the Thrift connection in its constructor and releases it in
 * {@link #close()}.
 *
 * @author Ronny Friedland
 */
class CassandraClient {

    /** Charset name used for every String to byte conversion (row keys, column names, column values). */
    protected final String ENCODING = "UTF-8";

    protected final TTransport transport
    protected final Cassandra.Client client

    /**
     * Creates a new CassandraClient instance, opens the connection and selects the keyspace.
     * @param host Cassandra host
     * @param port Cassandra port
     * @param keySpace keyspace to use
     */
    public CassandraClient(String host, Integer port, String keySpace) {
        transport = new TFramedTransport(new TSocket(host, port))
        client = new Cassandra.Client(new TBinaryProtocol(transport))
        try {
            transport.open()
            client.set_keyspace(keySpace)
        } catch (Exception e) {
            // Do not leak the socket when connecting or selecting the keyspace fails;
            // the caller never receives an instance it could close.
            transport.close()
            throw e
        }
    }

    /**
     * Closes the connection to the Cassandra database.
     */
    public void close() {
        try {
            transport.flush()
        } finally {
            // Release the connection even if the final flush fails.
            transport.close()
        }
    }

    /**
     * Inserts a new record.
     * @param entity the entity to store
     */
    public void insert(BaseEntity entity) throws UnsupportedEncodingException, InvalidRequestException, UnavailableException,
    TimedOutException, TException {
        doInsert(EntityWrapper.fromEntity(entity))
    }

    /**
     * Writes every entry of the given property map as one column of the row
     * identified by the "uuid" entry, in the column family named by the
     * "columnFamily" entry. Names and values are stored as their string
     * representation; all columns share one timestamp.
     * @param properties property name to value, as produced by EntityWrapper.fromEntity
     * @throws IllegalArgumentException if the properties contain no uuid
     * @throws RuntimeException if the connection is not open
     */
    protected void doInsert(def properties) throws UnsupportedEncodingException, InvalidRequestException, UnavailableException,
    TimedOutException, TException {
        ensureOpen()
        long timeStamp = System.currentTimeMillis()
        ByteBuffer uuid = toRowKey(properties?.uuid)
        ColumnParent parent = new ColumnParent(properties?.columnFamily)

        properties?.each {
            Column col = new Column(ByteBuffer.wrap("${it.key}".getBytes(ENCODING)),
                    ByteBuffer.wrap("${it.value}".getBytes(ENCODING)), timeStamp)
            client.insert(uuid, parent, col, ConsistencyLevel.ONE)
        }
    }

    /**
     * Selects a record by its unique key.
     * @param uuid unique id
     * @param cf column family to use
     * @return BaseEntity
     */
    public BaseEntity select(String uuid, String cf) throws UnsupportedEncodingException, InvalidRequestException, UnavailableException,
    TimedOutException, TException {
        return EntityWrapper.toEntity(doSelect(uuid, cf))
    }

    /**
     * Reads the columns of one row and returns them as a property map.
     * @param uuid unique id (row key)
     * @param cf column family to use
     * @return column name to column value (both decoded as strings), plus a
     *         "creationTime" entry holding a column timestamp; empty when no
     *         columns are returned
     * @throws IllegalArgumentException if uuid is null
     * @throws RuntimeException if the connection is not open
     */
    protected Map doSelect(String uuid, String cf) throws UnsupportedEncodingException, InvalidRequestException, UnavailableException,
    TimedOutException, TException {
        ensureOpen()
        ColumnParent parent = new ColumnParent(cf)
        // NOTE(review): empty start/finish is presumably the Thrift convention for an
        // unbounded slice; the number of columns is still capped by SliceRange's count.
        SliceRange sliceRange = new SliceRange()
        sliceRange.setStart(new byte[0])
        sliceRange.setFinish(new byte[0])
        SlicePredicate slicePredicate = new SlicePredicate()
        slicePredicate.setSlice_range(sliceRange)

        List resultColumns = client.get_slice(toRowKey(uuid), parent, slicePredicate, ConsistencyLevel.ONE)

        def resultProperties = [:]
        resultColumns?.each {
            resultProperties[new String(it?.column?.name, ENCODING)] = new String(it?.column?.value, ENCODING)
            // Assigned after the name/value pair on purpose: this replaces a stored
            // "creationTime" string column with the numeric column timestamp.
            // The timestamp of the last column processed wins.
            resultProperties["creationTime"] = it?.column?.timestamp
        }
        return resultProperties
    }

    /**
     * Deletes a record.
     * @param entity the entity to delete
     */
    public void remove(BaseEntity entity) throws UnsupportedEncodingException, InvalidRequestException, UnavailableException,
    TimedOutException, TException {
        remove(entity?.uuid, entity?.creationTime, entity?.columnFamily)
    }

    /**
     * Deletes a record.
     * @param uuid unique id of the record
     * @param creationTime creation time, passed to Cassandra as the timestamp of the removal
     * @param cf column family to use
     * @throws IllegalArgumentException if uuid is null
     * @throws RuntimeException if the connection is not open
     */
    public void remove(String uuid, Long creationTime, String cf) throws UnsupportedEncodingException, InvalidRequestException, UnavailableException,
    TimedOutException, TException {
        ensureOpen()
        ColumnPath path = new ColumnPath()
        path.column_family = cf
        client.remove(toRowKey(uuid), path, creationTime, ConsistencyLevel.ALL)
    }

    /**
     * Fails fast when the transport is not open.
     * @throws RuntimeException if the connection is not open
     */
    protected void ensureOpen() {
        if (!transport.open) {
            throw new RuntimeException("Connection already closed !")
        }
    }

    /**
     * Encodes a row key for the Thrift API.
     * @param uuid the row key, must not be null
     * @return the key bytes in {@link #ENCODING}
     * @throws IllegalArgumentException if uuid is null
     */
    protected ByteBuffer toRowKey(String uuid) throws UnsupportedEncodingException {
        if (uuid == null) {
            throw new IllegalArgumentException("uuid must not be null")
        }
        return ByteBuffer.wrap(uuid.getBytes(ENCODING))
    }
}
Die Daten können als Java-Klasse - welche von einer Basisklasse BaseEntity erben muss - übergeben werden und werden durch den Wrapper als Key-Value-Paare in der Datenbank abgelegt. Beim Auslesen werden die Key-Value-Paare wieder auf die Java-Klasse gemappt und zurückgegeben.
Es können auch eigene Java-Klassen als Entitäten genutzt werden, solange sie von der Basisklasse ableiten.
package de.ronnyfriedland.cassandra.entity;

import java.util.UUID;

/**
 * Base entity for Cassandra. Carries the row id, the name of the column
 * family the entity belongs to, and a creation timestamp.
 *
 * @author Ronny Friedland
 */
public class BaseEntity {

    private String uuid;
    private String columnFamily;
    private long creationTime;

    /**
     * Creates an empty BaseEntity; all fields keep their default values.
     */
    protected BaseEntity() {
    }

    /**
     * Creates a BaseEntity with a freshly generated random UUID as id.
     *
     * @param aColumnFamily name of the column family
     */
    public BaseEntity(final String aColumnFamily) {
        this(UUID.randomUUID().toString(), aColumnFamily);
    }

    /**
     * Creates a BaseEntity with the given id.
     *
     * @param aUuid id of the entity
     * @param aColumnFamily name of the column family
     */
    public BaseEntity(final String aUuid, final String aColumnFamily) {
        this.uuid = aUuid;
        this.columnFamily = aColumnFamily;
    }

    /** @return the id of the entity */
    public String getUuid() {
        return this.uuid;
    }

    /** @param uuid the new id of the entity */
    public void setUuid(final String uuid) {
        this.uuid = uuid;
    }

    /** @param columnFamily the new column family name */
    public void setColumnFamily(final String columnFamily) {
        this.columnFamily = columnFamily;
    }

    /** @return the column family name */
    public String getColumnFamily() {
        return this.columnFamily;
    }

    /** @param creationTime the new creation timestamp */
    public void setCreationTime(final long creationTime) {
        this.creationTime = creationTime;
    }

    /** @return the creation timestamp */
    public long getCreationTime() {
        return this.creationTime;
    }

    /**
     * Returns the default {@link Object#toString()} text followed by the
     * uuid, column family and creation time in square brackets.
     *
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        final String details = String.format("[uuid: %s][columnFamily: %s][creationTime: %d]",
                getUuid(), getColumnFamily(), getCreationTime());
        return super.toString() + details;
    }
}

Die Datenbank muss wie folgt konfiguriert werden:

  • Keyspace: PLAYGROUND (create keyspace PLAYGROUND;)

  • ColumnFamily: User (create column family User with comparator = UTF8Type;)

  • update column family User with
     column_metadata =
     [
      {column_name: uuid, validation_class: UTF8Type},
      {column_name: creationTime, validation_class: UTF8Type},
      {column_name: columnFamily, validation_class: UTF8Type},
      {column_name: class, validation_class: UTF8Type}
     ];