A Little Riak Book for LFE

Querying

So far we've only dealt with key-value lookups. The truth is, key-value is a pretty powerful mechanism that spans a spectrum of use cases. However, sometimes we need to look up data by value rather than by key. Sometimes we need to perform some calculations, or aggregations, or search.

Secondary Indexing (2i)

A secondary index (2i) is a data structure that lowers the cost of finding non-key values. Like many other databases, Riak has the ability to index data. However, since Riak has no real knowledge of the data it stores (values are just binaries), it indexes metadata instead: index names follow a pattern that marks each index as holding either integer or binary values.

If your installation is configured to use 2i (shown in the next chapter), simply writing a value to Riak with an index header will index it, provided the header name is prefixed by X-Riak-Index- and suffixed by _int for an integer, or _bin for a string.

curl -i -XPUT $RIAK/types/shopping/buckets/people/keys/casey \
  -H "Content-Type:application/json" \
  -H "X-Riak-Index-age_int:31" \
  -H "X-Riak-Index-fridge_bin:97207" \
  -d '{"work":"rodeo clown"}'

Querying can be done in two forms: exact match and range. Let's add a couple more people and see what we get: mark is 32, andy is 35, and they both share fridge 97207.
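Adding them looks just like adding casey. The bodies are left empty here, since only the index headers matter for the queries that follow.

curl -i -XPUT $RIAK/types/shopping/buckets/people/keys/mark \
  -H "Content-Type:application/json" \
  -H "X-Riak-Index-age_int:32" \
  -H "X-Riak-Index-fridge_bin:97207" \
  -d '{}'

curl -i -XPUT $RIAK/types/shopping/buckets/people/keys/andy \
  -H "Content-Type:application/json" \
  -H "X-Riak-Index-age_int:35" \
  -H "X-Riak-Index-fridge_bin:97207" \
  -d '{}'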

Which people own fridge 97207? It's a quick lookup to retrieve the keys that have matching index values.

curl "$RIAK/types/shopping/buckets/people/index/fridge_bin/97207"
{"keys":["mark","casey","andy"]}

With those keys it's a simple lookup to get the bodies.
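For example, mark's body is a plain GET away:

curl "$RIAK/types/shopping/buckets/people/keys/mark"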

The other query option is an inclusive ranged match. This finds all people aged 32 or younger, by searching between 0 and 32.

curl "$RIAK/types/shopping/buckets/people/index/age_int/0/32"
{"keys":["mark","casey"]}

That's about it. It's a basic form of 2i, with a decent array of utility.

MapReduce

MapReduce is a method of aggregating large amounts of data by separating the processing into two phases, map and reduce, that themselves are executed in parts. Map will be executed per object to convert/extract some value, then those mapped values will be reduced into some aggregate result. What do we gain from this structure? It's predicated on the idea that it's cheaper to move the algorithms to where the data lives, than to transfer massive amounts of data to a single server to run a calculation.

This method, popularized by Google, can be seen in a wide array of NoSQL databases. In Riak, you execute a MapReduce job on a single node, which then propagates to the other nodes. The results are mapped and reduced, then further reduced down to the calling node and returned.

[Figure: MapReduce Returning Name Char Count]

Let's assume we have a bucket for log values that stores messages prefixed by either INFO or ERROR. We want to count the number of INFO logs that contain the word "cart".

LOGS=$RIAK/types/default/buckets/logs/keys
curl -XPOST $LOGS -d "INFO: New user added"
curl -XPOST $LOGS -d "INFO: Kale added to shopping cart"
curl -XPOST $LOGS -d "INFO: Milk added to shopping cart"
curl -XPOST $LOGS -d "ERROR: shopping cart cancelled"

MapReduce jobs can be either Erlang or JavaScript code. This time we'll go the easy route and write JavaScript. You execute MapReduce by posting JSON to the /mapred path.

curl -XPOST "$RIAK/mapred" \
  -H "Content-Type: application/json" \
  -d @- \
<<EOF
{
  "inputs":"logs",
  "query":[{
    "map":{
      "language":"javascript",
      "source":"function(riakObject, keydata, arg) {
        var m = riakObject.values[0].data.match(/^INFO.*cart/);
        return [(m ? m.length : 0 )];
      }"
    }
  },{
    "reduce":{
      "language":"javascript",
      "source":"function(values, arg){
        return [values.reduce(
          function(total, v){ return total + v; }, 0)
        ];
      }"
    }
  }]
}
EOF

The result should be [2], as expected. Both map and reduce phases must always return an array. The map phase receives a single riak object, while the reduce phase receives an array of values: either the results of multiple map function outputs, or the outputs of multiple reduce functions. I probably cheated a bit by using JavaScript's reduce function to sum the values, but, well, welcome to the world of thinking in terms of MapReduce!

MR + 2i

Another option when using MapReduce is to combine it with secondary indexes. You can pipe the results of a 2i query into a MapReduce job: simply specify the index you wish to use, and either a key for an exact match, or start and end values for a ranged query.

    ...
    "inputs":{
       "bucket":"people",
       "index": "age_int",
       "start": 18,
       "end":   32
    },
    ...
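Putting that together, here's a minimal sketch that returns the keys of everyone aged 18 to 32. Two assumptions to flag: the bare "bucket":"people" form above addresses the default bucket type (the 2i examples earlier used the shopping type, whose inputs take a different form), and riakObject.key holds the object's key in Riak's JavaScript map API.

curl -XPOST "$RIAK/mapred" \
  -H "Content-Type: application/json" \
  -d @- \
<<EOF
{
  "inputs":{
    "bucket":"people",
    "index": "age_int",
    "start": 18,
    "end":   32
  },
  "query":[{
    "map":{
      "language":"javascript",
      "source":"function(riakObject) {
        return [riakObject.key];
      }"
    }
  }]
}
EOF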

MapReduce in Riak is a powerful way of pulling data out of an otherwise straight key/value store. But we have one more method of finding data in Riak.

Aside: Whatever Happened to Riak Search 1.x?

If you've used Riak before, or have some older documentation, you may wonder what the difference is between Riak Search 1.0 and 2.0. In an attempt to make Riak Search user friendly, it was originally developed with a "Solr like" interface. Sadly, due to the complexity of building distributed search engines, it was woefully incomplete. Basho decided that, rather than attempting to maintain parity with Solr, a popular and featureful search engine in its own right, it made more sense to integrate the two.

Search 2.0

Search 2.0 is a complete, from-scratch reimagining of distributed search in Riak. It's an extension to Riak that lets you perform searches to find values in a Riak cluster. Unlike the original Riak Search, Search 2.0 leverages distributed Solr to perform the inverted indexing and the retrieval of matching values.

Before using Search 2.0, you'll have to have it installed and a bucket set up with an index (these details can be found in the next chapter).

The simplest example is a full-text search. Here we add ryan to the people bucket (with a default index).

curl -XPUT "$RIAK/type/default/buckets/people/keys/ryan" \
  -H "Content-Type:text/plain" \
  -d "Ryan Zezeski"

To execute a search, request /solr/<index>/select along with any distributed Solr parameters. Here we query for documents that contain a word starting with zez, request the results in json format (wt=json), and return only the Riak key (fl=_yz_rk).

curl "$RIAK/solr/people/select?wt=json&omitHeader=true&fl=_yz_rk&q=zez*"
{
  "response": {
    "numFound": 1,
    "start": 0,
    "maxScore": 1.0,
    "docs": [
      {
        "_yz_rk": "ryan"
      }
    ]
  }
}

With the matching _yz_rk keys, you can retrieve the bodies with a simple Riak lookup.
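That lookup is the same GET as ever:

curl "$RIAK/types/default/buckets/people/keys/ryan"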

Search 2.0 supports Solr 4.0, which includes filter queries, ranges, page scores, start values and rows (the last two are useful for pagination). You can also receive snippets of matching highlighted text (hl,hl.fl), which is useful for building a search engine (and something we use for search.basho.com). You can perform facet searches, stats, geolocation, bounding shapes, or any other search possible with distributed Solr.
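As a small sketch of pagination, the standard Solr start and rows parameters are just tacked onto the select request (reusing the earlier query):

curl "$RIAK/solr/people/select?wt=json&start=0&rows=5&q=zez*"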

Tagging

Another useful feature of Search 2.0 is the tagging of values. Tags give additional context to a Riak value. The current implementation requires that all tag header names begin with X-Riak-Meta, and that they be listed under a special header named X-Riak-Meta-yz-tags.

curl -XPUT "$RIAK/types/default/buckets/people/keys/dave" \
  -H "Content-Type:text/plain" \
  -H "X-Riak-Meta-yz-tags: X-Riak-Meta-nickname_s" \
  -H "X-Riak-Meta-nickname_s:dizzy" \
  -d "Dave Smith"

To search by the nickname_s tag, just prefix the query with the tag name followed by a colon.

curl "$RIAK/solr/people/select?wt=json&omitHeader=true&q=nickname_s:dizzy"
{
  "response": {
    "numFound": 1,
    "start": 0,
    "maxScore": 1.4054651,
    "docs": [
      {
        "nickname_s": "dizzy",
        "id": "dave_25",
        "_yz_ed": "20121102T215100 dave m7psMIomLMu/+dtWx51Kluvvrb8=",
        "_yz_fpn": "23",
        "_yz_node": "[email protected]",
        "_yz_pn": "25",
        "_yz_rk": "dave",
        "_version_": 1417562617478643712
      }
    ]
  }
}

Notice that the docs returned also contain "nickname_s":"dizzy" as a value. All tagged values will be returned on matching results.

Datatypes

One of the more powerful combinations in Riak 2.0 is datatypes paired with Search. If you set both a datatype and a search index in a bucket type's properties, values you set are indexed as you'd expect. Map fields are indexed as their given types, sets as multi-valued strings, counters as integers, and flags as booleans. Nested maps are also indexed, with their field names separated by dots, and are queryable in that manner.
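For instance, if a person's map held a nested map (say, a hypothetical spouse_map containing a name_register field), the dotted field could be queried like any other, assuming this bucket's search index is named people as in the earlier examples:

curl "$RIAK/solr/people/select?wt=json&q=spouse_map.name_register:*"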

For example, remember Joe, from the datatype section? Let's assume that this people bucket is indexed. And let's also add another pet.

curl -XPUT "$RIAK/types/map/buckets/people/keys/joe" \
  -H "Content-Type:application/json"
  -d '{"update": {"pets_set": {"add":"dog"}}}'

Then let's search for pets_set:dog, filtering the returned fields down to just the type, bucket, and key.
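Assuming the bucket's index is named people like before, the request mirrors the earlier searches:

curl "$RIAK/solr/people/select?wt=json&omitHeader=true&fl=_yz_rt,_yz_rb,_yz_rk&q=pets_set:dog"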

{
  "response": {
    "numFound": 1,
    "start": 0,
    "maxScore": 1.0,
    "docs": [
      {
        "_yz_rt": "map"
        "_yz_rb": "people"
        "_yz_rk": "joe"
      }
    ]
  }
}

Bravo. You've now found the object you wanted. Thanks to Solr's customizable schema, you can even store the field you want to return, if it's really that important to save a second lookup.

This provides the best of both worlds. You can update and query values without fear of conflicts, and can query Riak based on field values. It doesn't require much imagination to see that this combination effectively turns Riak into a scalable, stable, highly available, document datastore. Throw strong consistency into the mix (which we'll do in the next chapter) and you can store and query pretty much anything in Riak, in any way.

If you're wondering to yourself, "What exactly does Mongo provide, again?", well, I didn't ask it. You did. But that is a great question...

Well, moving on.