So I thought I would share some of my abstract adapter code for interacting with the Hector client, which in turn interacts with Cassandra.
The idea is that it adapts any model/entity POJO class into a key and a set of columns that fit the column family corresponding to that model. This happens through the marshal and unmarshal methods, which each implementor of this trait defines for its own model. The names are a bit curious, but I didn't want to call them map and unmap or pack and unpack, because those names already mean other things in Scala. Marshal transforms the model into a Scala tuple of the key and a list of (column name, value) tuples, which the persist methods in this adapter know how to work with. Unmarshal transforms query results (in the form of a ColumnFamilyResult) back into the model, and is used by the query methods.
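To make the shape of that tuple concrete, here is a minimal sketch that leaves Hector out entirely. Bookmark is a hypothetical model (not the Item class used later), and a plain Map stands in for ColumnFamilyResult, so this only illustrates the data flow, not the real adapter.

```scala
import java.util.UUID

object MarshalSketch {
  // Hypothetical model used only for this illustration.
  case class Bookmark(id: UUID, label: String, url: String)

  // marshal: model -> (row key, list of (column name, column value))
  def marshal(b: Bookmark): (UUID, List[(String, Any)]) =
    (b.id, List("label" -> b.label, "url" -> b.url))

  // unmarshal: the real trait receives a ColumnFamilyResult;
  // a Map[String, String] stands in for it here (an assumption).
  // Returns None when any expected column is missing.
  def unmarshal(key: UUID, columns: Map[String, String]): Option[Bookmark] =
    for {
      label <- columns.get("label")
      url   <- columns.get("url")
    } yield Bookmark(key, label, url)
}
```

Returning an Option from unmarshal mirrors the trait's signature: a row with missing columns simply yields None instead of a half-filled model.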
As you can see, the adapter also functions as a high-level DAO, in that the user of the DAO shouldn't need to know how to work with Hector/Cassandra.
Fine, I know the interaction between this trait and its subclass gets roundabout, but that's just how it is for now. I don't want to spend too much time refining it yet.
Furthermore, there is an automatic Id field finder method (getKeyFieldOpt). It uses reflection against the model to find which field is the Id (row key). In my case Id is a case class (as in var id = Id(9999)), and I think I use some implicit defs so that retrieving the id automatically converts it to its underlying value wherever relevant. But you get the gist.
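A rough sketch of that lookup, under some assumptions: Id, Item and the implicit conversion below are simplified stand-ins I made up for illustration, and the sketch uses ClassTag where the trait uses Manifest and erasure (the older spelling of the same idea).

```scala
import java.lang.reflect.Field
import scala.language.implicitConversions
import scala.reflect.ClassTag

object KeyFieldSketch {
  // Simplified stand-in for the Id metadata class.
  case class Id[K](value: K)

  // Assumed implicit conversion: lets an Id[K] be used where a K is expected.
  implicit def idToValue[K](id: Id[K]): K = id.value

  // Hypothetical model with an Id-typed field.
  class Item {
    var id: Id[Long] = Id(9999L)
    var label: String = ""
  }

  // Find the first declared field whose (erased) type is Id.
  def keyFieldOpt[E](implicit ct: ClassTag[E]): Option[Field] =
    ct.runtimeClass.getDeclaredFields.find(_.getType == classOf[Id[_]])

  // Read the key from a record; the cast result is an Id[K],
  // which the implicit conversion above turns into a K.
  def keyOf[K](record: AnyRef, field: Field): K = {
    field.setAccessible(true) // Scala vars compile to private fields
    field.get(record).asInstanceOf[Id[K]]
  }
}
```

Because generics are erased at runtime, the lookup can only match on the raw Id class, not on Id[Long] versus Id[UUID]; a mismatch between the declared key type and the actual one only shows up when the value is used.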
This is a work in progress. At the moment every generic query fetches the entire row of a column family and unmarshals its data into a model, but I'll be working on generic slices and ranges soon.
So here's the base trait:
import java.lang.reflect.Field
import scala.collection.JavaConversions._
import id.pronimo.watchlet.exception.NoExistingRowException
import id.pronimo.watchlet.exception.WrongModelException
import javax.validation.ConstraintViolation
import javax.validation.ConstraintViolationException
import javax.validation.Validation
import javax.validation.ValidationException
import me.prettyprint.cassandra.serializers.AsciiSerializer
import me.prettyprint.cassandra.serializers.StringSerializer
import me.prettyprint.cassandra.serializers.UUIDSerializer
import me.prettyprint.cassandra.serializers.CompositeSerializer
import me.prettyprint.cassandra.serializers.DateSerializer
import me.prettyprint.cassandra.service.template.ColumnFamilyResult
import me.prettyprint.cassandra.service.template.ColumnFamilyTemplate
import me.prettyprint.cassandra.service.template.ColumnFamilyUpdater
import me.prettyprint.hector.api.exceptions.HectorException
import me.prettyprint.hector.api.factory.HFactory
import me.prettyprint.hector.api.mutation.Mutator
import me.prettyprint.hector.api.Keyspace
import id.pronimo.watchlet.model.metadata.Id
import me.prettyprint.cassandra.serializers.SerializerTypeInferer
import me.prettyprint.hector.api.Serializer

trait CommonStandardCFAdapter[R <: Serializable, K, N] {
  @transient val (us, ss, as, ds, cs) = (UUIDSerializer.get, StringSerializer.get, AsciiSerializer.get, DateSerializer.get, CompositeSerializer.get)
  @transient val EMPTY_BYTE_ARRAY = Array.empty[Byte]
  @transient val validator = Validation.buildDefaultValidatorFactory.getValidator

  def fs[F](x:F) = { SerializerTypeInferer.getSerializer[F](x) }
  val hcolumn = (n:N, v:Any) => { HFactory.createColumn(n, v, getKeyspace.createClock, getNameSerializer, fs(v)) }

  /* CONCRETE METHODS [BEGIN] */

  /**
   * Add a new record, or override existing data.
   *
   * @tparam R the record type defined per implementation
   */
  @throws(classOf[HectorException])
  @throws(classOf[ValidationException])
  def write(record: R, deleteFirst: Boolean) {
    validate(record)
    if (deleteFirst) remove(record)
    val thriftTemplate = this.getThriftTemplate
    val (k, c) = marshal(record)
    val updater = thriftTemplate.createUpdater(k)
    c.iterator.foreach { case (n, v) => updater.setColumn(hcolumn(n, v)) }
    thriftTemplate.update(updater)
  }

  /**
   * Update a record by specifying intended column to add/update.
   *
   * Will throw an exception when record with specified key does not exist.
   *
   * @note Use with caution, this method allows adding a column not defined in the record type.
   * @param key of the record
   * @param colName the column name
   * @param value the column value
   * @tparam K the key type defined per implementation
   * @tparam V the value type
   */
  @throws(classOf[HectorException])
  @throws(classOf[ValidationException])
  def update[V](key: K, colName: N, value: V) {
    val thriftTemplate = this.getThriftTemplate
    if (false == thriftTemplate.isColumnsExist(key)) {
      throw new NoExistingRowException(key, "update")
    }
    val updater = thriftTemplate.createUpdater(key)
    val column = HFactory.createColumn(colName, value, getKeyspace.createClock)
    updater.setColumn(column)
    thriftTemplate.update(updater)
  }

  /**
   * Update a record by specifying multiple intended columns to add/update.
   *
   * Will throw an exception when record with specified key does not exist.
   *
   * @note Use with caution, this method allows adding columns not defined in the record type/column family metadata.
   * @param key of the record
   * @param map of column name/column value
   * @tparam K the key type defined per implementation
   */
  @throws(classOf[HectorException])
  @throws(classOf[ValidationException])
  def update(key: K, map: Map[N, Any]) {
    val thriftTemplate = this.getThriftTemplate
    if (false == thriftTemplate.isColumnsExist(key)) {
      throw new NoExistingRowException(key, "update")
    }
    val updater = { thriftTemplate.createUpdater(key) }
    map.iterator.foreach {
      case (n, v) =>
        val column = HFactory.createColumn(n, v, getKeyspace.createClock)
        updater.setColumn(column)
    }
    thriftTemplate.update(updater)
  }

  /**
   * Add multiple records, or override when such with the same key exists.
   *
   * All records are validated before anything is written, so a validation failure cancels the whole operation.
   *
   * @param records the records to write
   * @param deleteFirst whether to remove each existing row before writing it
   * @tparam R the record type defined per implementation
   */
  @throws(classOf[HectorException])
  @throws(classOf[ValidationException])
  def writeBatch(records: List[R], deleteFirst: Boolean) {
    val cf = this.getThriftTemplate.getColumnFamily
    val thriftTemplate = this.getThriftTemplate
    records.iterator.foreach { it => validate(it) }
    val mutator = thriftTemplate.createMutator
    records.iterator.foreach { it =>
      if (deleteFirst) remove(it)
      marshal(it) match {
        case (k, c) =>
          c.iterator.foreach { tuple =>
            mutator.addInsertion(k, cf, HFactory.createColumn(tuple._1, tuple._2, getKeyspace.createClock))
          }
      }
    }
    thriftTemplate.executeBatch(mutator)
  }

  /**
   * Read record by key, returning Option
   *
   * @param key of the record
   * @tparam K the key type defined per implementation
   * @tparam R the record type defined per implementation
   */
  @throws(classOf[HectorException])
  def read(key: K): Option[R] = {
    val thriftTemplate = this.getThriftTemplate
    val result = thriftTemplate.queryColumns(key)
    unmarshal(result)
  }

  /**
   * Remove entire row by key
   *
   * @param key of the record
   * @tparam K the key type defined per implementation
   */
  @throws(classOf[HectorException])
  def remove(key: K) {
    this.getThriftTemplate().deleteRow(key)
  }

  /**
   * Remove entire row using the record's id
   *
   * @tparam R the record type defined per implementation
   */
  @throws(classOf[HectorException])
  def remove(record: R) {
    remove(getKey(record))
  }

  /**
   * Get key field option (thru reflection)
   */
  protected def getKeyFieldOpt[E <: R: Manifest] = implicitly[Manifest[E]].erasure.getDeclaredFields.find(_.getType.equals(classOf[Id[K]]))

  /**
   * Get N serializer (thru reflection)
   */
  protected def getNSerializer[E <: N: Manifest]():Serializer[N] = SerializerTypeInferer.getSerializer[N](implicitly[Manifest[E]].erasure)

  /**
   * Get key for record (thru reflection)
   */
  def getKey[E <: R: Manifest](record: R): K = {
    // The cast yields an Id[K]; this relies on an implicit conversion from Id[K] to K being in scope.
    try { getKeyField.get(record).asInstanceOf[Id[K]] } catch {
      case e => throw new WrongModelException(implicitly[Manifest[E]].erasure, "Key type specified in template does not match actual key type of record.", e)
    }
  }

  /**
   * Validate record using JSR-303
   *
   * @param record
   * @tparam R the record type defined per implementation
   */
  @throws(classOf[ValidationException])
  def validate(record: R) {
    val violations = validator.validate(record)
    if (false == violations.isEmpty()) {
      val exceptionMessage = violations.iterator
        .map(it => it.getPropertyPath.toString + " " + it.getMessage)
        .mkString(", ")
      throw new ConstraintViolationException(exceptionMessage, violations.asInstanceOf[java.util.Set[ConstraintViolation[_]]])
    }
  }

  /* CONCRETE METHODS [END] */

  /* ABSTRACT METHODS [BEGIN] */

  /**
   * Transform record into tuples with format (key, List[(columnName, columnValue)])
   */
  def marshal(record: R): (K, List[(N, Any)])

  /**
   * Transform ColumnFamilyResult into record
   */
  def unmarshal(columns: ColumnFamilyResult[K, N]): Option[R]

  /**
   * Get underlying Thrift template used by this spec template
   */
  def getThriftTemplate(): ColumnFamilyTemplate[K, N]

  /**
   * Get column name serializer used by this spec template
   * This became mandatory for creating composite columns using the Hector client.
   */
  def getNameSerializer():Serializer[N]

  def getKeyField: Field

  def getKeyspace: Keyspace

  /* ABSTRACT METHODS [END] */
}
And a sample implementation:
PS: Don't mind the CDI annotations. The transient annotations are there just so it plays well with CDI.
import java.util.UUID
import javax.annotation.PostConstruct
import javax.enterprise.context.ApplicationScoped
import javax.inject.Inject
import javax.inject.Named
import id.pronimo.watchlet.exception.WrongModelException
import me.prettyprint.cassandra.service.template.ColumnFamilyResult
import me.prettyprint.cassandra.service.template.ThriftColumnFamilyTemplate
import me.prettyprint.hector.api.Keyspace
// Item and CF are project classes; their imports are omitted here.

@Named( "ItemRepository" )
@ApplicationScoped
class ItemCFAdapter extends CommonStandardCFAdapter[Item, UUID, String] with Serializable {

  // Resolve the Id field once, via reflection, and make it accessible.
  @transient val keyField = {
    getKeyFieldOpt[Item] match {
      case Some( field ) =>
        field.setAccessible( true )
        field
      case None => throw new WrongModelException( classOf[Item], "Cannot find field with Id metadata." )
    }
  }

  @transient val nameSerializer = getNSerializer[String]

  @Inject
  @transient
  var keyspace: Keyspace = _

  var thriftTemplate: ThriftColumnFamilyTemplate[UUID, String] = _

  @PostConstruct
  def initialise() {
    thriftTemplate = new ThriftColumnFamilyTemplate( keyspace, CF.CF_NAME, us, as )
  }

  override def marshal( record: Item ): ( UUID, List[( String, Any )] ) = {
    ( getKey( record ), List(
      ( "label", record.label ),
      ( "url", record.url ) ) )
  }

  override def unmarshal( columns: ColumnFamilyResult[UUID, String] ): Option[Item] = {
    val l = columns.getString( "label" )
    val u = columns.getString( "url" )
    // Only build an Item when both columns are present.
    ( Option( l ), Option( u ) ) match {
      case ( Some( _ ), Some( _ ) ) => Option( new Item {
        id = columns.getKey
        label = l
        url = u
      } )
      case _ => None
    }
  }

  override def getThriftTemplate() = thriftTemplate
  override def getKeyField() = keyField
  override def getNameSerializer() = nameSerializer
  override def getKeyspace() = keyspace
}