Saturday, September 8, 2012

Pignalytics - Pigs Eat Anything: Reading and Writing Text Data in Pig


In this second post on Pig, I would like to cover how Pig can read and write data in a variety of formats. However, before I begin, let me go over some basics. In Pig, the smallest unit of data is an atomic value, which can be of type int (signed 32-bit integer), long (signed 64-bit integer), float (32-bit floating point), double (64-bit floating point), chararray (UTF-8 string or character array) or bytearray (byte blob).

Pig can also understand and handle complex data types - tuples, bags and maps. A tuple is an ordered collection of fields. A field can be an atomic data type or one of the complex data types. Each field in a tuple is identified by its position or by an identifier name that may be associated with it as part of a schema definition. A bag is an unordered collection of tuples, wherein each tuple may consist of one or more atomic or complex fields. And finally, Pig supports a map data type which consists of a set of key-value pairs. The key has to be of type chararray while the value can be of any data type. Note that there can be arbitrary levels of nesting within a complex data type. Pig works over data sets referred to as relations, and the unit of processing within a relation is a tuple.

Here's the schema definition for an employee relation that contains 5 fields:

employee = tuple(employee_id:int, 
                 email:chararray, 
                 name:tuple(first_name:chararray, middle_name:chararray, last_name:chararray), 
                 project_list:bag{project(project_name:chararray)},
                 skills:map[skill_details:chararray])

Let us use the above schema definition to read data in plain-text and XML formats. We will deal with HBase and Hive input formats in the next blog article. Note that the schema definition is a combination of a JSON-like representation of a record using Pig data types and Pig notations for those data types. To be successful with Pig, it is essential that you understand the complex (or compound?) Pig data types - tuples, bags and maps - essentially their representation. It took me a few weeks at work of stumbling over and troubleshooting unexpected/weird behavior to get comfortable.

Unless specified otherwise, all my examples below use Pig executing in local mode against Unix filesystem files.

Reading Plain-text File

Below is a sample text file containing data conforming to the above format, followed by a sample Pig script to read the file and dump some of the fields.

1234|emp_1234@company.com|(first_name_1234,middle_initial_1234,last_name_1234)|{(project_1234_1),(project_1234_2),(project_1234_3)}|[programming#SQL,rdbms#Oracle]
4567|emp_4567@company.com|(first_name_4567,middle_initial_4567,last_name_4567)|{(project_4567_1),(project_4567_2),(project_4567_3)}|[programming#Java,OS#Linux]


The short Pig program below reads the above text file and prints (dumps) a few fields in a few different ways. 

a = LOAD 'a.txt' USING PigStorage('|') 
    AS (employee_id:int, email:chararray, name:tuple(first_name:chararray, middle_name:chararray, last_name:chararray), project_list:bag{project: tuple(project_name:chararray)}, skills:map[chararray]) ;
DESCRIBE a ;
DUMP a ;

b = FOREACH a GENERATE employee_id, email, name.first_name, project_list, skills#'programming' ;
DESCRIBE b ;
DUMP b ;

c = FOREACH a GENERATE employee_id, email, name.first_name, project_list.$0, skills#'programming' ;
DESCRIBE c ;
DUMP c ;

d = FOREACH a GENERATE employee_id, email, name.first_name, FLATTEN(project_list) AS project, skills#'programming' ;
DESCRIBE d ;
DUMP d ;


PigStorage is one way to read a text file. It takes a single argument which is designated as the field separator. Note that the field separator is matched as-is; even "escaped" occurrences of the separator within field values will be treated as field separators. PigStorage can read data from a single file, from files in a directory (either all files or those matching a wild-card parameter), and the files can be plain-text or compressed (zip, gz or bz2) - didn't I say Pigs can eat anything?!

Note that if you are using Pig release 0.9.x or below, there is a defect (PIG-2086, present in Pig 0.9.2) that requires you to have the complete "AS" clause on the same line.

Below are some variants of the above LOAD statement (clause).

This next line reads a single plain-text file that is comma-separated:
a = LOAD 'a.txt' USING PigStorage(',')

The next line reads files from a directory. The files may be plain-text, compressed, etc.; PigStorage will examine the file extension and, where applicable, use the appropriate compression library. Also, there will be one mapper per file.

a = LOAD '/dump/data/directory/' USING PigStorage('|')

This next one reads all .txt files (*.txt) from a directory. Again there will be one mapper per text file.
a = LOAD '/dump/data/directory/*.txt' USING PigStorage('|') 


The next two statements read from and write to compressed files.
a = LOAD '/dump/data/myfile.txt.gz' USING PigStorage('|')
STORE b INTO '/dump/data/myfile.txt.bz2' USING PigStorage('|')


The last statement might catch you by surprise. First, note that the output is a directory and not a single file. This is intrinsic to the map-reduce framework. Your Pig statement is like a map program, and there could be one or more mappers for your program. To allow *all* of them to direct their output to a single location, the location has to be a directory. And if you want to use compression, you need a ".gz" or ".bz2" suffix on the output location to trigger the corresponding compression library.
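For example, a STORE along the lines below (a sketch - the exact part-file names depend on how many map or reduce tasks your job runs) produces a directory of compressed part files rather than a single compressed file:

STORE b INTO '/dump/data/myfile.txt.bz2' USING PigStorage('|') ;
-- the output location is a directory, typically containing something like:
--   /dump/data/myfile.txt.bz2/part-m-00000.bz2
--   /dump/data/myfile.txt.bz2/part-m-00001.bz2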

Now here's a big gotcha - if your file or some of the records are not in the correct format, the program will silently ignore the file and/or records without any grunts. So be careful and put in some checks - either in the data or in the program. E.g. you can filter out records where one or more of the mandatory fields are empty or malformed and save those to a separate location.
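Here is a minimal sketch of that idea (the field names and output paths are illustrative) - records with a missing employee_id or email are routed to a separate rejects location:

-- only the first two fields are declared here; the remaining fields in each record are dropped
raw = LOAD 'a.txt' USING PigStorage('|') AS (employee_id:int, email:chararray) ;
SPLIT raw INTO good IF (employee_id IS NOT NULL AND email IS NOT NULL AND email != ''),
               bad  IF (employee_id IS NULL OR email IS NULL OR email == '') ;
STORE good INTO '/dump/data/clean' USING PigStorage('|') ;
STORE bad INTO '/dump/data/rejects' USING PigStorage('|') ;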

However, a big bonus with PigStorage is that the location/path of the file(s) to be loaded can be an HDFS path, say, on a remote Hadoop cluster (remote = different from the one where the Pig MapReduce program is running) or even Amazon S3. Using such an approach can save "distcp"ing files.
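A quick sketch of what that looks like (the namenode host/port and the S3 bucket name here are purely illustrative, and the S3 case assumes the appropriate filesystem support and credentials are configured in Hadoop):

-- load from HDFS on a remote Hadoop cluster
a = LOAD 'hdfs://remote-namenode:8020/dump/data/a.txt' USING PigStorage('|') ;

-- load directly from Amazon S3
b = LOAD 's3n://my-bucket/dump/data/a.txt' USING PigStorage('|') ;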


Reading XML Data

Besides plain-text, Pig can also read an XML file. Below is an XML file with essentially the same data as the plain-text file.

<employee_list>
  <employee>
    <employee_id>1234</employee_id>
    <email>emp_1234@company.com</email>
    <name>(first_name_1234,middle_initial_1234,last_name_1234)</name>
    <projects>{(project_1234_1),(project_1234_2),(project_1234_3)}</projects>
    <skills>[programming:SQL,rdbms:Oracle]</skills>
  </employee>
  <employee>
    <employee_id>4567</employee_id>
    <email>emp_4567@company.com</email>
    <name>(first_name_4567,middle_initial_4567,last_name_4567)</name>
    <projects>{(project_4567_1),(project_4567_2),(project_4567_3)}</projects>
    <skills>[programming:Java,OS:'Linux Unix MacOS']</skills>
  </employee>
</employee_list>

Incidentally, the XML loader/reader is not part of core Pig but is contained in the user-contributed piggybank library that is bundled with Pig, so we need to register it explicitly as shown below. Here's a small Pig program to read the above file and dump each employee record. Note that you need to substitute the appropriate path for the piggybank jar below.

REGISTER /usr/lib/pig/contrib/piggybank/java/piggybank.jar ;

a = LOAD 'a.xml' USING org.apache.pig.piggybank.storage.XMLLoader('employee') ;
DESCRIBE a ;
DUMP a ;

XMLLoader takes a single optional argument (in single quotes) that specifies the element name that is the container for each record. Note that the schema specification stops there - Pig only recognizes a schema specified at the record level, e.g. "employee" in the above case. To translate XML elements and attributes into Pig fields, you have to do the parsing and extraction yourself using custom (user-defined) functions or regular expressions.

For example, the above program can be re-written as follows to parse the XML data -

REGISTER /usr/lib/pig/contrib/piggybank/java/piggybank.jar ;

a = LOAD 'a.xml' USING org.apache.pig.piggybank.storage.XMLLoader('employee') AS (xml_record:chararray) ;
DESCRIBE a ;
DUMP a ;

b = FOREACH a GENERATE REGEX_EXTRACT(xml_record, '(\\<employee_id\\>.+?\\<\\/employee_id\\>)', 1) AS employee_id_xml_element ;
DESCRIBE b ;
DUMP b ;
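If you want just the element value without the surrounding tags, the capture group can be moved inside the tags - a sketch along the same lines:

c = FOREACH a GENERATE REGEX_EXTRACT(xml_record, '<employee_id>(.+?)</employee_id>', 1) AS employee_id ;
DESCRIBE c ;
DUMP c ;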


And There's More

Besides plain-text and XML, Pig can also read CSV, JSON (Pig v0.10 and higher) and Avro. And of course you can also write custom loaders to read data from other formats/sources - e.g. http, ftp, etc. I came across something interesting from HStreaming (www.hstreaming.com) - http://www.hstreaming.com/docs/getting-started/pigscript/. I have not tried the product - but it looks interesting!
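For reference, here is a rough sketch of what loading those formats can look like - the loader class names below are from Pig 0.10 and the piggybank of that era, so verify them against your version before relying on them:

REGISTER /usr/lib/pig/contrib/piggybank/java/piggybank.jar ;

-- JSON using the built-in JsonLoader (Pig 0.10+); the schema string is illustrative
j = LOAD 'a.json' USING JsonLoader('employee_id:int, email:chararray') ;

-- CSV using the piggybank CSVExcelStorage loader
c = LOAD 'a.csv' USING org.apache.pig.piggybank.storage.CSVExcelStorage(',') ;

-- Avro using the piggybank AvroStorage loader (requires the Avro jars on the classpath)
v = LOAD 'a.avro' USING org.apache.pig.piggybank.storage.avro.AvroStorage() ;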

Saturday, August 25, 2012

Pignalytics - Big Data Analytics Using Pig : Introduction


Today, Hadoop is widely used as a big data storage and processing framework: it provides a scalable, highly available storage and online archival solution and also serves as a platform to parallelize the processing of that big data using the map-reduce paradigm. The "batch-oriented" nature of Hadoop is complemented by HBase, which is a high-volume, high-throughput key-value datastore. There are mainly three ways to harness the highly scalable processing capabilities of Hadoop - (1) Java-based map-reduce programming, (2) Hadoop streaming and (3) higher-level languages and tools like Hive, Pig and Sqoop. 

Although the whole Hadoop framework and ecosystem is written in Java, I find using Java a big deterrent - probably because I come from a traditional data warehousing, ETL tools and Unix scripting background. Hive provides some solace, but limits me to encapsulating all my processing into a single HiveQL statement and using a store-and-forward approach if I need multiple statements. Pig, however, has enamored me with its simple, yet powerful and extensible data-processing-flow approach. For the uninitiated (like I was about 3 months ago), Pig is a scripting language/tool that facilitates (1) scripting-based, (2) pipeline- or flow-oriented, (3) batch processing of data. The scripting language naturally fits into how one would think about a data processing or ETL flow - e.g. a data flow for processing web log files as shown below.


I should note that although Pig was conceived with, and is associated with, the Hadoop framework, Hadoop is not a requirement. Pig can work with equal effectiveness with data in Cassandra and in local filesystems. As shown in the diagram above, Pig facilitates creating a data processing pipeline by iterating through one or more datasets, referred to as relations, specifying processing logic on each row of the dataset and passing it to the next step in the pipeline. At each juncture of the pipeline, you have one result relation and one or more input relations. Relations are just logical concepts and are not necessarily persisted, even transiently. One uses Pig Latin - a simple, yet very powerful instruction set - to specify the processing. Pig compiles that Pig Latin program into a series of map-reduce jobs, allowing one to focus on the processing. So for example, if one were to write a word-count program (the Hadoop equivalent of Java's "hello world"), here's one way to write it -

lines = LOAD 'my_text_file.txt' USING TextLoader() ;
words = FOREACH lines GENERATE 
        TOKENIZE($0) AS bag_of_words ;
individual_words = FOREACH words GENERATE 
                   FLATTEN(bag_of_words) as word ;
DESCRIBE individual_words ;
word_groups = GROUP individual_words BY word ;
word_count = FOREACH word_groups GENERATE group, COUNT(individual_words.word) as word_occurrences ;
DESCRIBE word_count ;
ordered_word_count = ORDER word_count BY word_occurrences ASC ;
DUMP ordered_word_count ;

The same thing done using a Java map-reduce program would be at least 5 times larger, with the number of Java "import" statements alone exceeding the size of the above program.

It's very easy to see that the above Pig program reads in lines from a file and "tokenizes" each line into a "bag (collection) of words". Next, the words are grouped together and a count is made of the number of times each word occurs. The words are then "dumped" in ascending order of their count.

The beauty of Pig is that this program can 
- run on an input file in Hadoop DFS or on a local filesystem (see the command sketch after this list)
- scale from a few lines of input data, to thousands, millions and billions of lines of input data
- scale from a single node to a thousand-node Hadoop cluster 
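A quick sketch of how the same script might be launched in each mode (assuming the script above is saved as wordcount.pig) - the first command runs against the local filesystem with no Hadoop cluster needed, while the second runs as map-reduce jobs on the configured Hadoop cluster:

pig -x local wordcount.pig
pig -x mapreduce wordcount.pig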

At the same time, I would like to point out that the above Pig program generates a pipeline of multiple map-reduce jobs, and there is some room for optimization in that regard. However, the trade-off of slightly increased computational requirements for savings in development time and effort is often worth it. Isn't that why we use higher-level and scripting languages in the first place? If not, everyone would still be programming in assembly language!

There is sufficient introductory information on Pig out there that explains Pig very well. Pig Latin (the Pig programming language) is quite small, succinct and effective, and is quite easy to understand. What I intend to share in the next few blog entries are things that I have learnt in my Pig journey - some cool things, best-practices, design patterns, quirks/gotchas etc.

Tuesday, July 24, 2012

Comparing HBase and Cassandra


For quite some time I had been intrigued by Cassandra, and so I have been reading up on it for the past few weeks. Most of my recent work has been focused on analytics for data in HBase. Due to historic and other reasons, we scan through our entire HBase data set to generate a variety of rich metrics. HBase and Hadoop map-reduce seem to be very well suited for such purposes. Of course, many of you might suggest looking into real-time analytics - generating metrics as the data is being ingested into HBase. We are moving in that direction - but let's leave that discussion for some other day. This interest in Cassandra has given me a chance to look at big data from a broader perspective.

So as I was saying, I have been reading up on Cassandra and wanted to share some thoughts comparing HBase and Cassandra - covering similarities, differences, strengths and weaknesses. 

Big Data

First of all, both HBase and Cassandra are meant to solve today's big data problems. Both have a proven production track record of large clusters (tens to hundreds, and up to a few thousand nodes) managing tens to hundreds of TB of data per cluster. Furthermore, they both provide scalability using commodity server hardware. Note - "commodity server hardware" today (July 2012) spans a wide range: 4 to 24+ CPU cores, 24 to 512+ GB RAM and 2 to 10 TB of local hard disk storage.


Architecture: Cluster and Storage

At a high level, HBase is part of the Hadoop ecosystem - built on top of Hadoop's HDFS (or DFS) and requiring Zookeeper. HBase stores data in rows in a table, with the rows partitioned by rowkey ranges. Each rowkey range or partition (referred to as a region in HBase) is assigned to a specific node (i.e. regionserver) in the cluster. The range is automatically determined and adjusted as more data is added to a table. As regions grow and reach a threshold, they split, with one of the splits moving to another regionserver in the cluster. Regions can also be defined manually and can be split manually if needed. There are two kinds of nodes in an HBase cluster - the master and the regionservers. The master is responsible for metadata management while the regionservers are responsible for region management. Actually, there is also a third kind of node - the secondary master, which syncs up with the master's metadata at regular checkpoints.

Rows are logically sorted by rowkey, and columns within a row are sorted by column name. HBase can be configured to store multiple versions of rows and columns. Versions are usually timestamp-based but can be based on some other large integer if desired. Columns within a single row may be grouped into one or more column families. Region data (or table partition data) is persisted on a regionserver in storefiles that are grouped together by column family. Each group of storefiles for a column family on a regionserver is referred to as a store.

Here's a big caveat that many people often forget - storefiles, stores and regions that are served by a regionserver are just logical entities. The actual files are stored in Hadoop DFS or HDFS. It is common practice for nodes that are HBase regionservers to also be Hadoop data nodes. However, that is not a requirement, and technically it is possible that the storefiles for a regionserver may physically reside on a different node than the regionserver itself. An implication of this is that caching happens at the Hadoop layer as well as the HBase layer.

Cassandra, on the other hand, stores data in columns contained in column families, and column families are comparable to HBase or RDBMS tables. Column families are grouped together into "keyspaces", which can be compared to a schema in a traditional RDBMS. Data is stored in a keyspace by "rowkeys", similar to HBase. However, the "domain" of a rowkey spans the complete keyspace - a rowkey's data can span one or more column families. So you can store data for the same rowkey in different column families, allowing you to logically separate the data yet making it easy to "link/connect/associate/relate" the separate data. E.g. you can have one column family to store a person's information and another column family to store the friends of that person.

Rowkeys are assigned to specific nodes based on the MD5 hash of the rowkey. This is the default and the most used and recommended mechanism of distributing data across the nodes in a cluster, although there are two other "out of the box" mechanisms as well as a custom mechanism available. The assignment of a row to a node is fixed (however, it can be manually changed if desired). All nodes in a Cassandra cluster are alike (i.e. there is no master) - this is critical to its "no-SPOF" architecture (SPOF = single point of failure). Within a node, rows are stored sorted on the rowkey's MD5 hash and columns are sorted by column name (in actuality, the columns are sorted based on their comparator - the default compares the names, but it can also be the name in reverse order or some other custom comparator). Unlike HBase, Cassandra can only store a single version of a row. Row data is persisted by column family in SSTables (Sorted String Tables). I will leave the discussion of super column families for a later date to keep the focus on the comparison.

A key takeaway from this storage architecture is that both HBase and Cassandra store data in "rows" whose columns are sparsely populated - i.e. only columns that contain data are stored. Furthermore, unlike RDBMSes, all rows do not need to have the same "schema". This is one reason why big data is often also referred to as "unstructured" or "schema-free".


Architecture: Write, Read, Replication and Consistency

Both HBase and Cassandra are highly optimized for writes. In both systems, there is no explicit update operation - an insert operation overwrites a prior insert or incarnation of a row in such a way that the net result is that the new columns overlay the existing columns (if any). However, on retrieval, the columns in a row are still sorted by column name (or by the comparator). In HBase, a client retrieves and caches the region information for a table. When performing a read, insert (or update) or delete operation, the client communicates directly with the regionserver that hosts the region in which the rowkey is contained. In Cassandra, a client connects to any arbitrary node (referred to as the co-ordinator node) as specified in a connection string. The rowkey is used to determine the nodes that will contain the row (note that a row is "replicated" at the Cassandra level). While doing a read or an insert, the co-ordinator node acts as a conductor/orchestrator for the operation and sends/receives the data to/from the node(s) that should contain the row. 

In both HBase and Cassandra, writes result in an append to a write log that is persisted on disk (called the HLog in HBase and the commitlog in Cassandra) and then to an in-memory, sorted write cache (called the memstore in HBase and the memtable in Cassandra). At periodic intervals, and/or when the cache is full or reaches a threshold, the sorted in-memory data is flushed to a file (in the HDFS filesystem for HBase and to a normal OS file for Cassandra). The on-disk file is called a storefile in HBase and an sstable in Cassandra. At periodic intervals, and/or based on certain configuration thresholds, the multitude of on-disk files is consolidated via a process called compaction. HBase tries to keep the storefile size close to user-configurable parameters, while Cassandra uses two approaches to manage the size and number of files - size-tiered compaction and leveled compaction (see http://www.datastax.com/dev/blog/leveled-compaction-in-apache-cassandra). Size-tiered compaction is the original/older compaction technique, while leveled compaction was introduced in Cassandra 1.0. 

For availability reasons, both HBase and Cassandra (can) maintain multiple copies of data. In HBase, replication is specified as a configuration parameter for the table and the copy management is handled by DFS. In Cassandra, the copy count or replication factor is specified at the keyspace level. Consistency among the copies is maintained at all times by Hadoop, unless of course a Hadoop node fails, in which case you have fewer copies. Once the node is back up again or restored, Hadoop strives to make the data consistent across all copies again.

In Cassandra, the co-ordinator node uses the keyspace's replication factor to ensure that writes happen on all the nodes that are to contain the rowkey. The co-ordinator node writes to all the nodes that are designated to hold a copy of the row (based on the hash of the rowkey) and then waits for write confirmations from a subset of those nodes. The subset of nodes used to determine a successful write is based on the write consistency level, which can be the default for the keyspace or a per-operation client parameter.

The sync-up among all copies of data happens via two different processes. The first is the read process - based on the replication factor and the client-configurable read consistency level, a number of copies of the data are read and the latest copy is returned to the client. In the process, any copy that is not current is brought up to date. The second process by which the data is kept current is the anti-entropy process, which compares the data among the different nodes at periodic intervals. The data between the copies is compared using Merkle trees created on the MD5 hashes of the rows, which makes the comparison efficient and quick. There is one more mechanism in Cassandra, called "hinted handoff", that is used to maintain data consistency for nodes that may be offline/down. In both HBase and Cassandra, once a file is persisted on disk it does not change (it is immutable) - it can only be merged/compacted. Thus a file on disk can be deleted but not updated.


Architecture: Authentication and Authorization

HBase uses Kerberos for authentication and a recent feature called co-processors for authorization. Co-processors allow the use of pre-built, out-of-the-box as well as custom server-side logic for each read/write operation. Cassandra, on the other hand, has built-in authentication that allows plain-text and encrypted passwords as well as pluggable authentication mechanisms. Overall, Cassandra's authentication support seems to be more mature. As for authorization, I have very little hands-on knowledge of either platform in that regard and would refrain from making any comments.


Architecture: Indexing

Both HBase and Cassandra achieve well-performing scalability by providing an effective sharding and indexing solution. Sharding was discussed earlier. As for indexing, both platforms have the concept of a primary key (or rowkey) that is inherently indexed. Furthermore, both have bloom filters to allow efficient determination of the existence (well, actually the non-existence) of a value. Bloom filters are available on the primary key as well as on non-primary fields. In addition, Cassandra supports secondary indexes; under the hood these are column families that pivot on the column value of the indexed column(s). Starting with release 0.92, HBase has introduced support for secondary indexes using the server-side logic/processing injection discussed in the authentication section. 


Architecture: Triggers and Server-side Logic

Triggers and server-side logic allow one to embed logic/code that is triggered during the read and/or write paths. Used judiciously, this has a variety of uses - simulating database-like triggers, security, data masking, real-time analytics (e.g. incrementing counters on every read/insert/delete), etc. HBase has this support starting with 0.92 - however, it should be used carefully as the logic now runs on the "server" and is exercised at the "server level". E.g. if you introduce "read path logic", it is loaded at regionserver startup and is exercised for every row of every table that is read. The onus is on you to make your code efficient. 


Performance and Scalability

It is essential that you understand the read/write path and storage architecture discussion above to better appreciate the performance and scalability strengths of each. All the same, here's a recap for both. HBase shards/partitions the rows of a table by rowkey ranges, with each range being referred to as a region. The ranges are dynamic and are based on the amount of data. The regions in turn are balanced/distributed across the HBase regionserver nodes. The HBase write path consists of clients directly sending (writing) the data to the regionserver that hosts the rowkey range, and the regionserver writing to a commit log followed by a write to an in-memory, per-store write cache - viz. the memstore.

The read path consists of a client requesting the regionserver for the row(s) by rowkey, rowkey range or a scan filter. The search for the data begins in the block cache (I am not sure whether the block cache is column-family-specific or not), failing which a disk search is made in the column-family storefiles. Optional bloom filters can make the search efficient. Data is retrieved from disk in units of HBase blocks, with the block size being configurable at the HBase table level. A judicious HBase table block size helps balance the number of block I/O requests and memory cache utilization. Therefore, you will see that HBase performs best when writes happen with random rowkeys and reads happen in contiguous ranges of rowkeys. Random writes help take advantage of the memstores across different regionservers in the cluster, while sequential reads help take advantage of the block cache by increasing the block cache hit ratio. Also, as discussed earlier, there is a Hadoop DFS-level block cache and an HBase-level block cache for reads - each with its own configurable block size and cache size.

In Cassandra, the rows of a column family are randomly distributed across the nodes, and the same rationale of random writes helping spread the workload across the cluster applies. Unlike the double caching in the case of HBase reads, Cassandra uses two caches for reads: the key cache and the row cache, which cache keys and rows respectively. The sample/default configuration file that is bundled with Cassandra is quite well documented and explains how these caches can be tuned. Note that one of the advantages of the key and row caches is that Cassandra only caches the frequently accessed rows and keys, not complete blocks. So for a given memory configuration, Cassandra will be able to fit more frequently accessed rows than HBase.

Overall, HBase has non-trivial overhead and is more suitable when writes are random and reads are somewhat sequential (small batches of sequential rows). Cassandra is more suitable for random reads as well as random writes. 


Installation, Configuration and Fault Tolerance

HBase requires Hadoop and Zookeeper to be operational. Each of the three components has its own separate configuration files and parameters. In the past, one had to be careful about compatibility between these components - this is less of an issue now, since the projects keep these inter-dependencies in consideration as they evolve. In terms of fault tolerance, Zookeeper is already fault-tolerant, and Hadoop has just released some fault tolerance for the Namenode and JobTracker (I haven't had a chance to understand or exercise them yet). Fault tolerance in HBase covers both the regionservers and the master node. If a regionserver goes down (detected by the master via Zookeeper), the master reassigns the regions for that node to another regionserver. As for the master, one can configure a secondary master which keeps an eye on the primary via Zookeeper; if the secondary master detects that the primary has not updated its status within a certain interval, the secondary takes over. Cassandra, on the other hand, is completely self-contained and, compared to the HBase+Hadoop+Zookeeper ensemble, is easier to install, set up and configure, with well-documented configuration parameters. It may take a while, though, for people to get used to (and trust) the implementation of consistency in Cassandra. From a fault-tolerance perspective, Cassandra might have the upper hand based on the fact that all nodes are considered identical and there is no master/slave differentiation among the nodes. 


Operations: Monitoring, Backup and Replication

Hadoop and HBase come with tightly coupled monitoring capabilities in the form of Ganglia. And of course, there's Cloudera Manager from Cloudera, which is tightly coupled with their distribution; it allows both cluster management and monitoring of Hadoop, HBase and other components that are part of the Cloudera stack. Furthermore, both Hadoop and HBase have explicit classes in the API for monitoring, and one can create custom monitoring solutions if needed. As for Cassandra, I do not know of any monitoring solution other than OpsCenter from DataStax. However, the command-line tool nodetool is powerful enough to provide a comprehensive set of metrics/stats. 

HBase provides full and incremental table-level backups of the data while Cassandra provides only full (not sure!) backup of the storage files. 

For the purpose of this discussion, it is assumed that replication is for providing a copy of the data at a remote datacenter for the purpose of disaster preparedness and/or closeness of data to (remote) applications. HBase provides table-level as well as column family replication that needs to be explicitly configured and enabled. Cassandra on the other hand has transparent, built-in remote datacenter replication capabilities (which implies that all the data is replicated!). 


Querying and CLI

Both HBase and Cassandra have a client query tool - however, HBase's client has limited capabilities, with only simple mechanisms for data/row insertion and retrieval. There is no explicit update, since a "put" is the same as an update if a row with the corresponding rowkey already exists. However, the Hadoop+HBase ecosystem has Hive and Pig, which are pretty decent data retrieval tools. Cassandra, on the other hand, also has a command-line interface that allows data retrieval and insertion using CQL - the Cassandra Query Language. CQL is comparatively sophisticated and feature-rich. Although for most practical purposes, non-trivial applications are developed in higher-level languages like Java, PHP, Python, etc. as far as both HBase and Cassandra are concerned.


Conclusion

HBase, being tightly integrated with the Hadoop framework, has a very good supporting ecosystem of tools, technologies and proven practices. Cassandra, on the other hand, is comparatively a loner out there, although DataStax has integrated and bundled Cassandra with the Hadoop map-reduce framework to give the same (similar?) advantages. It is well accepted that map-reduce is a good big data processing paradigm, more applicable to batch processing of data, and is here to stay. HBase is a good one-stop-shop choice for organized data (organized data = my term for loosely structured, or structured + unstructured data). However, Cassandra seems to be an excellent choice for large-scale web and OLTP applications, especially if one is looking for data center redundancy and better resiliency. Note that although HBase's underlying Hadoop framework is batch-oriented, HBase is not, and one can use HBase for batch processing as well as a backend for OLTP/transactional processing systems. In the discussion above, I only discussed the "random partitioner" of Cassandra as that is the preferred choice, but Cassandra also has an ordered partitioner that stores data in the order of the primary key or rowkey. Another compelling reason to use Cassandra is its good support for secondary indexes. However, Cassandra lags (significantly) behind in terms of community, industry and marketing/media support - the like of which is attracted by HBase because of its close association with Hadoop.


References

You didn't think that I knew everything about HBase and Cassandra and am the authority on those subjects, did you?!


Besides the Apache websites for the respective projects (HBase, Cassandra), I referred to the following to enrich my understanding. However any misinterpretations are solely due to my shortcomings and not the referenced articles.


Cassandra

Aaron Morton's Blog
Note: Aaron is "the guy" who helps everyone on the Cassandra user group!
DataStax's Documentation Repository

HBase

Intro to HBase Internals and Schema Design
HBase I/O – HFile 
HBase Write Path 
Configuring HBase Memstore: What You Should Know
HBase Log Splitting 
Online HBase Backups with CopyTable 

Sunday, May 27, 2012

Takeaway from HBASECON 2012........

What better way to start a blog on BigData than to share one's thoughts about the just-concluded first HBase conference - HBaseCon 2012. The one-day conference on May 23rd was a great opportunity to see many of the big names as well as enthusiasts of the HBase world come together and share their experiences and advice. 

Unlike many other conferences that I have attended in the past, this one was unique because there wasn't a single vendor or vendor booth at the conference - it was "all stuff and no fluff". Of course, there was Cloudera (a vendor!) - but hey, they were the conference organizer, and true to the spirit of the conference, there wasn't a single "Cloudera salesperson" around trying to ask you about your project or trying to sell you their consulting services. 

The conference was followed by an HBase Hackathon that offered a wonderful insight into the minds of the HBase committers and was a great learning experience.

There were a number of takeaways from the HBaseCon which are listed below in no particular order. Note that these are purely my takeaways of the sessions that I attended. There were 4 tracks in parallel - so there were more than 4 times the number of sessions that I have listed below!

One of the biggest takeaways from HBaseCon was Facebook's sharing of their use of HBase, presented by Karthik Ranganathan. Facebook's showcase HBase implementation is their messaging application. We all know the amount of data that is pumped into Facebook by everyone, and we would think that they would have a humongous HBase cluster. However, (at least) I was wrong - because they have done a wonderful job of sharding their data, and the shards are spread across a number of HBase clusters - referred to as "cells". The cells have about 200 nodes each, with some judicious use of intra-datacenter and inter-datacenter replication. It was also a pleasant surprise that their production environments use HBase version 0.89-fb - while the current stable Apache HBase version is 0.94. Finally, they are using HBase like an OLTP data store - configuring the HBase block size as 16KB (the default is 64KB). This block size has allowed them to get the best balance in terms of compression, block index size control and efficient and effective use of the block cache & memstore. My guess is that the rationale behind this block size is that it allows efficient use of the memory structures and blocks by the low-level, hardware L2/L3 and TLB (translation lookaside buffer) caches and the associated memory management system/architecture. 

A second takeaway from Facebook was their use of HDFS hard-links for backups. Hard-links are Facebook's creation that is being fed back into the open-source trunk (see Hadoop Jira 3370). The immutable nature of HFiles makes them an ideal candidate for backup, as described in the session "HBase Backup" by Sunil Sitaula (Cloudera) and Madhuwanti Vaidya (Facebook). 


And a third takeaway from Facebook was Ryan Thiessen showing the battle scars of being DevOps for a large HBase installation like theirs. He was able to drive home how different a large, distributed system is from a traditional RDBMS installation. When designing a traditional RDBMS system, one has the luxury of packing in all kinds of redundancy - RAID disks, redundant RAID controllers, redundant NICs, HA clusters, etc. However, that design cannot be scaled to a large number of cells, each composed of about 200 nodes. And as per their experience, rack- and cluster-level network switch failures were the most disruptive failures. This was in spite of the Hadoop Namenode and HBase Master nodes being the obvious SPOFs (single points of failure).

Lars Hofhansl from Salesforce provided a very easy-to-follow walk-through of some of the HBase APIs, encouraging one (or at least me!) to peek into the code and try to understand some of the internals. His session, "Learning HBase Internals", explained how HBase guarantees row-level atomic operations, how blocks are used, how compression is applied, etc.

Ian Varley's (Salesforce) session on HBase schema design progressively walked us through how to design entities and entity relationships. He also emphasized the "cost of verbosity" in your schema design - i.e. the larger your rowkeys, column-family names and column names, the higher the metadata overhead for each data cell. He pointed out the very succinct data structures and schema design used in TSDB.

Finally, Mikhail Bautin (Facebook) had a ton of best-practices and checklists for us. Every slide of his had a few tips - and I don't remember a lot of them at this time. So be on the lookout for his presentation to be shared/published on HBaseCon, Cloudera or somewhere else!  

I hope that this HBaseCon has set the bar, and that we will see more knowledge sharing and conferences that are "more stuff and less fluff", with zero to minimal hustling from vendors.

Ah, before I forget, let me share my Hackathon experience. It was organized/hosted by Cloudera at their offices. It was an immense pleasure to see all the big HBase names like Todd Lipcon, Michael Stack, Lars Hofhansl, Andrew Purtell, Lars George and many others come together in encouraging people like me to download and compile the HBase source code, browse through the Jira tickets, do code reviews, etc. In fact, David S Wang - the Cloudera HBase development team manager - himself sat with me for a while, walking me through the process of building Hadoop and HBase from source.

This article is a tribute to the HBase team and their warmth and eagerness to help the community.