Thursday, February 18, 2016

Happy OpenTSDB Users With JVM Garbage Collection Tuning

Until last December, I considered Java garbage collection as something to be relegated to the JVM. I believed that the only control we had over it was how much garbage we generated - and would strive to keep object creation to a minimum. I thought that garbage collection optimization and tuning did not have any significant or noticeable impact.

However I was wrong - and had the opportunity to correct that misconception. The holiday season in December gave me some time to attend to a nagging latency problem with an application called OpenTSDB (or simply TSDB). It's a simple, yet very effective, scalable and highly performant timeseries datastore used primarily for monitoring and metrics data. Our use case was nontrivial, with 550+ million metrics per day being injected into the datastore and 20-30 concurrent users monitoring and displaying graphs that would fetch up to a few million metrics per graph and refresh periodically. With so much activity going on in the system, it was not realistic to get sub-second graph refreshes - and I deemed it acceptable to have some latency to refresh a graph that fetches, say, 2 million data points at each refresh. In TSDB, once a graph is loaded in the browser, it does not disappear when the next refresh is scheduled; it is replaced only when the next graph is ready. Thus the user does not see a blank screen, but a graph that is replaced immediately.

Anyway, I decided to give this issue some attention. This required poring over the TSDB logs and turning on garbage collection logging. As a result, I found that users were experiencing two kinds of latencies from TSDB. The first kind were long GC pauses that occurred quite frequently within the TSDB daemon. The second were delays from HBase, the underlying NoSQL datastore. This article focuses on the former.
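For reference, GC logging was enabled with a handful of standard HotSpot flags along these lines (the log file path is illustrative):

-verbose:gc
-Xloggc:/var/log/tsdb/tsdb-gc.log
-XX:+PrintGCDetails
-XX:+PrintGCDateStamps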

With the frequent long GC pauses, it was obvious that the TSDB daemon JVM was working very hard at memory reclamation. Memory churn (object creation and collection) was more a measure of the activity in the system and not something that I could easily and directly influence. Since reclamation is triggered when the fraction of free heap memory falls below a threshold, it seemed that the JVM was crossing that used-memory threshold very frequently. There were two easy options to help dampen it - increase heap memory or lower the free heap memory threshold (i.e. increase the occupancy threshold). I increased the TSDB memory limit from 6GB to 12GB in 2GB increments and saw a noticeable reduction in the frequency and duration of garbage collection.

By this time I had given GC a good look over and realized there were still a number of tuning knobs. But first, given the choice of the Parallel, CMS and G1 garbage collectors, I wanted to find one that would go a long way - one that would give a decent improvement without any explicit tuning. And that work led to the revelations in this presentation.

For the TSDB use case, I discovered that G1 performed best out of the box. However, I did see occasional multi-second GC pauses. While 90% of the GC pauses were reasonable (within 200ms), the remaining 10% were long. Still, G1 had the lowest GC frequency and the least amount of total GC time. The only further tuning I had to do was to add the following JVM flags, and things became smooth.

-XX:InitiatingHeapOccupancyPercent=70 
-XX:MaxGCPauseMillis=100 
-XX:+ParallelRefProcEnabled

InitiatingHeapOccupancyPercent sets the heap occupancy level (in %) at which the G1 garbage collector will start a concurrent garbage collection cycle. The default value of 45 is a little too early and did not result in much memory being reclaimed per cycle.

As the name indicates, MaxGCPauseMillis is a pause-time goal that the garbage collector tries to adhere to. Essentially, the G1 collector uses heuristics from previous cycles to estimate how much space it can collect while meeting the GC pause goal.

And ParallelRefProcEnabled enables the use of multiple, parallel threads for reference processing (weak, soft and phantom references) during collection.
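Putting it all together, the TSDB JVM options ended up along these lines (heap sizes and log path illustrative):

-Xms12g -Xmx12g
-XX:+UseG1GC
-XX:InitiatingHeapOccupancyPercent=70
-XX:MaxGCPauseMillis=100
-XX:+ParallelRefProcEnabled
-Xloggc:/var/log/tsdb/tsdb-gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps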

These changes, and a few changes at the HBase level (related to compaction and hstore tuning), got us back to happy TSDB users again.

Tuesday, December 8, 2015

Scala: You have the Option to Try Either

Doesn't make sense, right? Well, that's how the Scala types Option, Try and Either will seem at first. They are an interesting way to solve the problem of returning values from methods. Let's look at each of them by way of examples.

Option

Let's say you are writing a financial API system, and you need to write a method that accepts a login and a security token and returns a valid transaction handle to perform transactions, or an empty handle if either the login or the security token is invalid. Then the method might look something like this.
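A minimal sketch (the TransactionHandle type and the validation check below are just placeholders):

case class TransactionHandle(id: String)   // hypothetical handle type

def getTransactionHandle(login: String, token: String): Option[TransactionHandle] =
  if (login.nonEmpty && token.nonEmpty)    // stand-in for the real validation logic
    Some(TransactionHandle(login + ":" + token))
  else
    None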


The return type Option[TransactionHandle] above indicates that the method can return an instance of TransactionHandle or None. So here's how you might use the getTransactionHandle method:
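One possible way (a sketch):

val handle = getTransactionHandle("alice", "secret-token")

handle match {
  case Some(h) => println("Got handle " + h.id)
  case None    => println("Invalid login or security token")
}

// or, without pattern matching:
if (handle.isDefined) println("Got handle " + handle.get.id)
else println("Invalid login or security token")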



Option is thus a wrapper type that encapsulates an instance of scala.Some or None (that's a slight over-simplification - Some is a subtype of Option, and None is a singleton object that extends Option[Nothing]). scala.Some in turn encapsulates an existing value of some type. Thus when a method returns an instance of Option[T], the returned value is either an instance of Some[T] or None. Option is Scala's way of avoiding NPE (NullPointerException) surprises by explicitly informing you that a method can return an empty value and allowing you to gracefully handle the situation. More importantly, if the user fails to handle None, it will not cause an NPE. If you examine the Scala doc for Option, you will see that Option can be treated as a collection that has zero or one element. And that collection-like treatment is valid whether the Option is a Some or None.


Try, Success, Failure

Now let's look again at the getTransactionHandle method. It returns either a valid handle or None. We can rewrite it by using Try, Success and Failure as follows:
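A sketch along the same lines (reusing the hypothetical TransactionHandle from above):

import scala.util.{Try, Success, Failure}

def getTransactionHandle(login: String, token: String): Try[TransactionHandle] =
  if (login.nonEmpty && token.nonEmpty)
    Success(TransactionHandle(login + ":" + token))
  else
    Failure(new IllegalArgumentException("invalid login or security token"))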


The return type Try[TransactionHandle] can be processed as follows:
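For example:

getTransactionHandle("alice", "secret-token") match {
  case Success(h)  => println("Got handle " + h.id)
  case Failure(ex) => println("Could not get a handle: " + ex.getMessage)
}

// or test explicitly:
if (getTransactionHandle("alice", "").isFailure) println("Could not get a handle")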


Either Left or Right

Earlier we saw Option as a way to return None or Some. Either gives you the option (no pun intended) to return a value of one type or another. For example, the method below accepts a String and returns an instance of File if the String represents a valid file path, or an array of String instances if the input String represents a valid directory, with the array being the contents of that directory. If the String represents a non-existent path, then the method returns None. Note that this is a very contrived example - an array would be sufficient to represent all three scenarios - a non-existent file, a file, and the contents of a directory. However, in that simplistic case, an empty array due to a non-existent file v/s an empty array due to an empty directory would be indistinguishable.
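A sketch (the method name pathContents and the example path are made up):

import java.io.File

def pathContents(path: String): Option[Either[File, Array[String]]] = {
  val f = new File(path)
  if (f.isFile) Some(Left(f))                   // a plain file
  else if (f.isDirectory) Some(Right(f.list())) // a directory - return its contents
  else None                                     // non-existent path
}

pathContents("/etc/hosts") match {
  case Some(Left(file))     => println("File of size " + file.length)
  case Some(Right(entries)) => println("Directory with " + entries.length + " entries")
  case None                 => println("No such path")
}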


Why All This Fuss About Option, Either and Try?

First of all, the use of Option helps reduce (eliminate?) the dreaded NPEs (NullPointerExceptions). Instead of a safety net of try-catch, you have a smooth logic flow of an if or getOrElse expression. And unlike a try-catch, where the exception is handled only around the scope of the method call, with Option one can examine the method result at various points in the code using the respective boolean methods. Below are the ways you can check for the alternate values from Option, Either and Try.

Option:  isDefined  or  isEmpty
Either:  isLeft     or  isRight
Try:     isSuccess  or  isFailure

Finally, just as you don't need the "distracting exception handling", you can also seamlessly iterate through Option and Try using "foreach". If the Option or Try value returned by a method holds a Some or Success instance, the iteration processes that one item; otherwise, it processes none. For example:
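import scala.util.Try

val maybeName: Option[String] = Some("Scala")
maybeName.isDefined                              // true
maybeName.getOrElse("unknown")                   // "Scala"
maybeName.foreach(n => println("Hello, " + n))   // prints once; a None would print nothing

val parsed = Try("42".toInt)                     // Success(42)
parsed.isSuccess                                 // true
parsed.foreach(println)                          // prints 42; a Failure would print nothing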


Wednesday, October 14, 2015

Scala case Class Introspection

I like the case class and use it quite liberally when I want to define an entity that is a collection of value types and want automatic toString and getter methods for those data elements. While the same can be done using tuples, case classes are better because you can access the data elements or members by their names, leading to more readable code. So how does that happen in case classes? What happens when you define a case class? Let's answer those questions by defining a simple case class and looking at the compiled bytecode.
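Take the one-line case class below (the name Employee is arbitrary); the comments summarize the kind of members that show up in the compiled output (e.g. via javap).

case class Employee(name: String, id: Int)

// Compiling just this one line and inspecting the output shows that the compiler generates,
// among other things:
//   - a class Employee mixing in scala.Product and scala.Serializable
//   - val fields for name and id, along with "getter" methods name() and id()
//   - copy, productPrefix, productArity, productElement, productIterator and canEqual
//     (plus equals, hashCode and toString)
//   - a companion object Employee with apply and unapply methods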


As you can see above, when we define a case class, a couple of things happen. First, the class is defined with the traits scala.Product and scala.Serializable. Second, the class arguments are automatically defined as vals in the class, along with the "getter" methods. Third, some additional methods are added to the class - viz. copy, productPrefix, productArity, productElement, productIterator and canEqual. And fourth, a companion object is also defined.

So what is this Product trait? You can see in the Scala API that the trait is inherited by all Tuple and case classes. That's where the additional methods come from. Some of the methods are implementations of abstract methods in the Product trait, while others are overrides.
Here's an example of how you can change a case class's toString by overriding the productPrefix method from Product.
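A small sketch (the Money class and the "Cash" prefix are arbitrary):

case class Money(amount: Double, currency: String) {
  override def productPrefix = "Cash"   // used by the generated toString
}

Money(5.0, "USD").toString   // "Cash(5.0,USD)" instead of "Money(5.0,USD)"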



The other big benefit of the case class is that it extends scala.Serializable. If your case class is made up of value types (e.g. Int or String), you don't need to do any more work to read/write instances of case classes from/to files, streams, etc.

Thus case classes give you two freebies - toString and serializability.

Beware - case Classes Cannot Be Inherited

However, a gotcha of case classes is that you cannot inherit from them. So there is a likelihood of code repetition if common code is duplicated in each related case class. One way to avoid that repetition is to use a trait as a base for the common methods and parameters. Here's an example:
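A sketch (the Vehicle trait and the case classes are arbitrary):

trait Vehicle {
  val make: String       // abstract vals - each case class supplies them as constructor parameters
  val model: String
  def description: String = make + " " + model   // common method shared by all the case classes
}

case class Car(make: String, model: String, doors: Int) extends Vehicle
case class Truck(make: String, model: String, payloadTons: Double) extends Vehicle

Car("Honda", "Civic", 4).description   // "Honda Civic"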



All class constructor parameters and any other common class variables and methods that need to be part of all the case classes can be defined in the trait. The variables should be defined as abstract, as shown above.


Thursday, October 1, 2015

Tips for Successful Kafka Deployments

Introduction

Apache Kafka is an open-source messaging system that provides pub-sub message passing between producers and consumers of messages via conduits called topics. It has a simple, efficient, peer-broker architecture with an easy-to-use API. This simplicity, along with the good and responsive support from the Kafka developers and the Kafka community, are some of the key factors in its huge success. It has been successfully deployed at many organizations to transport tens of millions to tens of billions of messages on a daily basis. I too have been using it successfully at my workplace (Conversant) for some time now, as a buffering system between Apache Flume and Hadoop. Since then, we have been finding more and more use cases.

This article summarizes tips and lessons learnt from our 9+ months of experience and covers the following topics (no pun intended!):

  • Kafka fundamentals and architecture
  • Hardware recommendations
  • Kafka design considerations
  • Broker configuration parameters
  • Operational aspects

Understand Kafka Architecture and Features

While Kafka is a messaging system, it is a little different from traditional JMS or AMQP based messaging systems. Like a traditional message queuing system, one can create virtual pipelines for messages, called topics. Producers publish messages into a topic pipeline, which are then fetched by consumers. Topics are served by one or more peer-to-peer Kafka broker daemons, or simply brokers. The collection of brokers is referred to as a Kafka cluster. One arbitrary broker takes on the additional role of controller and performs additional administrative tasks.

Message producers and consumers are unaware of each other's existence. Hence for producers, the message guarantee covers only receipt by the cluster; there is no delivery guarantee for receipt by a consumer or a group of consumers. The consumer, on the other hand, has no awareness of which producer a message came from and needs to keep track of the messages it has consumed so that it can pick up the next message. However, Kafka offers an API that can take care of this for the consumer by periodically storing in Zookeeper the sequence number, known as the offset, of the last read message. The message sequence is a sequentially increasing number given to each message in a topic. There can be multiple producers pushing messages into a topic and zero, one or more consumers extracting the messages. While being delivered from producers to consumers, messages are persisted on disk for durability, in files called log segments. Messages are automatically purged after a preset time, with some nuances based on certain configuration parameters.

These may not seem like an attractive feature list, but what's special is how topic, producer and consumer throughput can be scaled linearly. Topics are scaled by configuring them to be made up of sub-pipelines, called partitions. Each topic partition has its own message sequence. Each partition takes on a portion of the messaging workload and is assigned to a broker. If there are more partitions than brokers, then a broker may serve multiple partitions of the same topic. Partitioning of topics allows scaling topic throughput by adding pipelines, adding servers as needed, and distributing/assigning topic partitions over more servers. Furthermore, Kafka employs very optimized data transfer from network buffers to the filesystem cache, bypassing copies into process memory, resulting in tremendous throughput capacity.

In practice, a Kafka cluster may support multiple topics, and each topic may have multiple partitions, with the partitions distributed among the brokers in the cluster - resulting in a broker supporting many partitions. For fault tolerance, a partition may be replicated. In that case, the partition has a primary copy served by one broker, and secondary brokers replicate it by continuously copying messages from the primary partition broker and keeping the secondary copies in sync with the primary copy. The replica brokers use Zookeeper to keep track of the state of the primary broker. In the event of a primary broker failure, one of the secondary partition brokers takes on the role of the primary broker. It is this partitioning, replication and brokers watching each other's backs that makes Kafka performant and fault-tolerant. Note that the broker hosting the primary partition (aka the leader partition broker) communicates with the producers and consumers as well as the replica partition brokers. Thus increasing the number of consumers and producers increases the traffic on brokers.

Kafka's architecture allows scaling not only topics (message pipelines) but also producers and consumers. Multiple producers can concurrently push messages into a topic (or partitions of a topic) independently of each other. On the other end of the pipeline, a consumer using the Kafka API can spawn as many threads as there are partitions. Thus scaling a topic also automatically scales a consumer. In addition, a consumer can be part of a group of multiple coordinating consumers, each of which can be multi-threaded, thereby further helping to drain the topic rapidly. Each consumer has its own set of independent offsets - viz. the sequence numbers of data consumed from each topic partition.

Kafka does not guarantee message ordering across a topic. In a single-partition topic, messages are strictly FIFO. In a multi-partition topic, however, FIFO order is guaranteed only within a partition, not across the overall topic. Kafka also comes with several useful utilities - e.g. a utility to replicate topic data from a source cluster to a target cluster.



Complement Kafka Architecture With Appropriate Hardware

The potential of the highly scalable architecture described above can be realized only if it is supported by appropriate hardware. Since Kafka does not do any significant data crunching, there is no need for high CPU processing power (let's defer the discussion of data compression for now). However, the zero-copy data transfer between network and I/O needs to be matched by high network and I/O bandwidth. On the network side, it's a no-brainer to use multiple 1Gb/sec NICs or a single or dual 10Gb/sec NIC(s). As for I/O bandwidth, I tend to differ from the general consensus in the Kafka community, which is to use a bunch of disks as a JBOD (Just a Bunch Of Disks) and create a filesystem over each drive to get the best I/O throughput from each individual disk. I see this as a sub-optimal use of the combined disk storage and I/O throughput: topic partitions are distributed across the different filesystems/directories, and since it is not always easy to have equally sized topic partitions across ALL topics, some filesystems tend to be highly utilized while others are under-utilized. So instead of JBOD, I have found it a better use of resources to create a RAID-based filesystem (hardware-based RAID highly recommended) over the set of available disks and match the gigabit network bandwidth with the equally high combined throughput of the RAID-ed disks.

RAID has additional benefits. First, because you have a RAID-based filesystem, a disk-drive failure does not cause an outage (although there is the potential of slight to somewhat non-trivial performance degradation while a newly replaced disk is being rebuilt). Second, because there is no partition outage, there is no failover of any Kafka partition leaders and no subsequent catch-up by the Kafka partitions on the failed drive. Furthermore, in a RAID configuration, a drive failure does not cause a panic situation, and the drive replacement can take place without much intervention and effort (assuming Kafka admins and sysadmins are different teams!). A final note on this topic is to use a highly performant filesystem like XFS or ext4 (XFS preferred). We have seen pretty good performance with XFS.

A decent configuration (as of 2015) is a server with a single or dual 10Gb/sec interfaces (or 4x1Gb/sec interfaces), 8-12 good disk drives (each with 20+ MB/sec throughput), 24-48 GB RAM and 6-8 CPU cores. I believe servers with an even more powerful configuration may be constrained by the system bus capacity, as a couple of multi-Gb/sec NIC interfaces, several 20-40 MB/sec disk drives and the normal multi-core CPU traffic can result in sustained high system bus traffic.

Note that a Kafka cluster also requires a Zookeeper ensemble (cluster) with very responsive Zookeeper processes. It is highly recommended to use much smaller but dedicated (or very lightly loaded) servers for Zookeeper. Additionally, it is preferred to have separate/dedicated drives for the Zookeeper data and logs/snapshots.



Topic Design - Hardware Scales Well, Avoid Too Many Partitions Per Topic

Another point on which I disagree with the Kafka community is the overuse of partitions for scalability. Having seen that a larger number of partitions facilitates high capacity and throughput, it seems to be common advice within the community to have topics with tens or even a hundred partitions. However, there are two reasons to go easy on that. First, today's hardware provides quite sufficient throughput capacity, and we don't need to over-eagerly scale topics with partitions. Scale just enough that the hardware has sufficient capacity to provide throughput under normal and high workloads, as well as when some brokers are down. In the latter situation, secondary replica partitions assume the role of leader partitions, resulting in additional load on the surviving brokers. Second, the higher the number of partitions, the higher the load on the Zookeeper quorum. Zookeeper keeps track of all brokers, partitions (leaders and replicas) and consumer offsets. Having hundreds of partitions across a cluster results in a non-trivial amount of latency-sensitive traffic that can be overwhelming for the quorum and the cluster.



Message And Application Design

Messages in Kafka consist of an optional, non-unique key and a value that is the actual message payload. Given such a simple "schema", what's the big deal about message schema design? When the user-provided optional key is absent (i.e. null), the message-id becomes the key. Kafka maintains an index on the data contained in each partition, allowing data to be retrieved by the key. The index can be used by consumers to filter out data, leading to optimal data access and resource utilization. On the other hand, if data filtering is not a requirement, skip using a message key - the default long-valued sequence-id serves as a pretty efficient key type for the index. Additionally, a custom key may require a well-designed custom hashing scheme to evenly distribute data across topic partitions.

Kafka also supports compressing data when reading/writing data to disk. Depending upon the data volume and the compression achieved, this can be quite beneficial. For example, imagine the storage savings and I/O bandwidth reduction for textual data, which can see around a 4:1 compression ratio. However, note that this compression is for data at rest, not for data "in-flight" to/from consumers, replica brokers and producers. For the ultimate benefit, it might be worthwhile to have the producers send the actual message as compressed data, so that none of the brokers incur compression overhead and the in-flight message size is reduced as well.

In Kafka, the smallest unit of data transfer is obviously a message. Each message incurs some small overhead on producers, consumers and brokers. However, when dealing with billions of messages per day or millions of messages per second, that overhead becomes substantial. It can be reduced by batching messages at the producers, i.e. making a collection of messages into one logical message into Kafka. Message batching along with compression at the producer can result in significant resource savings, as sketched below.
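To illustrate, here is a sketch (not production code) of a producer configured for compression and batching, using the 0.8.2-era producer API from Scala; the broker list, topic name and values are illustrative:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "broker1:9092,broker2:9092")  // illustrative broker list
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("compression.type", "gzip")   // compress at the producer, not at the brokers
props.put("batch.size", "65536")        // batch up to 64 KB of messages per partition
props.put("linger.ms", "50")            // wait up to 50 ms for a batch to fill

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("mytopic", "a log line or metric payload"))
producer.close()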

Now let's come to application design. The fact that there is no message delivery guarantee nor a once-only delivery guarantee can freak out traditional, transactional application designers. However, Kafka is suitable as-is for many big-data messaging pipelines - e.g. log delivery (which can occur in many forms and in many areas - e.g. web server or application server logs). Note that the lack of a message delivery guarantee only arises because messages may expire before they have been consumed. This can be avoided by ensuring a sufficiently long data retention and controlling consuming application outages. As for the lack of a once-only delivery guarantee, it arises only when a consumer crashes or is restarted. When consumers use the Kafka high-level consumer API, the API automatically stores the offset in Zookeeper at periodic intervals. However, if the consumer crashes after consuming a few messages but before the offset can be saved, there is the possibility of duplicate message delivery. This can be avoided by doing some application-level checks in the consumer to find the last delivered message for each partition and fetching the subsequent messages.



Broker Configuration

Kafka has configuration parameters for producers, brokers and consumers. Once sufficient hardware resources are made available to a Kafka server, configure the broker processes so that they have sufficient buffer memory and are able to handle the expected message size and volume. You do not want to carry any more data than needed, so ensure that data retention is long enough, but not too long. At the same time, ensure that there is sufficient data redundancy in terms of replicas for each partition. Also allocate sufficient heap memory to the Kafka broker JVM - say, 6-8 GB.

Here's a good starting point for broker configuration parameters: 

log.dirs: Use a single, large filesystem (preferably backed by RAID in production) instead of multiple directories. As discussed earlier, this is for better I/O and storage utilization.

message.max.bytes: Adjust this as necessary for the expected maximum message size. It is better to over-configure this to accommodate future changes. The default value is 1 MB - feel free to increase it to 2-4 MB.

socket.send.buffer.bytes and socket.receive.buffer.bytes: This is the amount of buffer memory the broker process will request when creating socket connections. Increase it from the default of 100 KB to 4 MB so that larger chunks of data are transferred between the network and storage.

log.roll.hours: This is the time interval after which Kafka will switch to a new log data file (segment). The default value is 7 days (7 x 24 hours). However, you want the partition data logs to roll more frequently so that older, stale data does not linger in the system just because newer data in the same log segment prevents it from being deleted.

auto.create.topics.enable: This parameter controls the auto-creation of topics when a consumer or producer tries to pull/push messages from/to a non-existent topic. If it is set to true, then attempts to produce, consume, or fetch metadata for a non-existent topic will automatically create it with the default replication factor and number of partitions. Set it to false. You don't want arbitrary topics being created due to human or configuration errors. Also, when there is a problem - either in the program or configuration, causing an incorrect topic to be accessed, or in operation, when a topic that should exist does not - you want it to be detected as early as possible, taking the approach of fail-fast.

default.replication.factor: The default replication factor is 1, meaning there are no backup replicas for a partition, and that is unsafe. Increase it to 3 for better resiliency and redundancy.

cleanup.policy: This parameter can be set to "delete" or "compact" and controls how old log segments (data that is older than its configured TTL - time-to-live duration) are handled. The default policy ("delete") will discard old segments when their retention time or size limit has been reached. The "compact" setting will enable log compaction on the topic. Avoid compaction if possible, as it incurs additional overhead on the broker and on system resources (I/O) without any significant benefit - the only major gain from compaction is potential space savings.

log.retention.{ms,minutes,hours}: This is the amount of time to keep a log segment before it is deleted. Change this to as small a duration as practically possible, while accounting for any system outages or maintenance on the consumer processes. E.g. keep this at 3 days if you want to wait at most 3 days before you start purging data.

controlled.shutdown.enable: The default value until 0.8.1 was false, but it is true from 0.8.2 onwards. Ensure that it is always set to true to allow for controlled shutdown.

Operations: Monitoring, Administration and Fire-Drills

For a successful deployment, it is essential that you are operationally ready. This includes having the ability to know at a glance the health and performance of the Kafka cluster, early notification of any problems, and the tools and processes (aka SOP or playbook) for troubleshooting effectively and in a timely manner. We use OpenTSDB in our division for collecting operational and business metrics from software infrastructure and applications, collecting and pushing 550+ million metric data points a day to TSDB. Initially we used the JMXReporter that is integrated with Kafka, but then switched to a homegrown Scala program. This program collects the state of each broker and topic partition (viz. inSync, isLeader) and the last message sequence number in each partition. The data is collected periodically (every 10 minutes) and pushed to OpenTSDB. We also use a script to periodically query TSDB for the same metrics; if the rate of messages (the rate of change of the partition sequence numbers) falls below a certain threshold, or any partition is not in-sync, or a broker is down, we get alerted. In addition, we also have port monitors for the Kafka brokers and Zookeepers. Besides using TSDB for metrics display, we also use Grafana dashboards that pull metrics from TSDB to give us an end-to-end picture of data flow (e.g. rate of data flow, Hadoop job activity, system load, network traffic, and I/O across the appropriate clusters).

For a lot of our software components/modules, it is very hard to battle-harden the software ahead of time (it is very expensive and difficult to reproduce our volume, traffic and variations in a controlled environment). As a result, we do parallel or canary runs on a small subset of servers or data in production itself.

In the case of Kafka, we first did exhaustive feature testing in QA, making sure that data flowed through and that we did not lose data under normal conditions. Then we did a parallel run with an existing Flume data flow in production. We started with a small, trivial topic and then gradually moved on to larger topics. Along the way we did a couple of fire-drills, like shutting down brokers, killing brokers with "kill -15" and "kill -9", and letting brokers stay down for a couple of hours before restarting them. We observed the partitions on the restarted broker for recovery time. In most cases it was less than 15-30 minutes.

Next, we also did a massive reassignment of partitions - which took about a day to complete. We did not realize it at the time, but this is a pretty expensive operation - so understand the implications if and when you embark on such a thing. You may need to do a significant reassignment of partitions when adding or removing a large number of servers. However, in most cases you might be able to avoid a wholesale movement of partitions by reassigning only a subset of them. Before doing that, reassign the replicas for a single partition and understand what happens during that process.

Thursday, September 17, 2015

I am a Java Geek - Why Should I Learn Scala?

Introduction

Java is undoubtedly the most common enterprise application programming language. However, it's not usually the language of choice when it comes to writing small scripts, single-use informal code, or code to glue together other scripts. In such cases, many resort to Python.

Thus there often seems to be a dichotomy within enterprises when it comes to programming languages - with Java being the language of choice for application development and Python being popular among data analysts, data scientists, sysadmins and devops. The latter groups seem to like the interactive shell (REPL), the rapid prototyping, and the built-in support for high-level constructs like file access and manipulation, high-level internet protocol support, JSON parsing and processing, etc.

Well, Scala helps resolve not only this dichotomy but also unites the object-oriented and functional programming paradigms:

Scala = Java + Python + functional programming = OOP + FP + Rapid prototyping

Scala is an object-oriented language similar to Java and Python that comes with a REPL-based interactive shell for quick prototyping and has a lot of functionality and features bundled with the core language. And better still, it's a language that marries OOP and FP as equal partners.

Isn't this argument reason enough for you to try out Scala? 
I figured you would need more convincing. So here are more reasons.


Scala's DNA and Origins

Martin Odersky, the developer of Scala, has distilled into Scala a lot of object-oriented and functional programming features and practical experience, along with lessons learnt while working on Java generics (introduced in Java 1.5) and the Java compiler. Odersky studied under Niklaus Wirth (the inventor of Pascal) and also experimented with several pure OOP and FP languages before embarking on Scala. All this has given a solid foundation to the language's features and capabilities.


Scala Is Not Learning All Over Again

The fusion of OOP and FP is seamless and transparent, and each paradigm can be exploited independently as well as jointly (in fact, it's hard not to use them together once you discover their combined power). You can thus approach Scala from either the OOP or the FP camp, get comfortable with the language using familiar paradigms, and then incorporate idioms from the other. Furthermore, Scala integrates seamlessly with Java itself - you can use Java libraries and APIs from Scala and vice-versa.


Increased Productivity

In the old days (the 1980s and 1990s), there used to be competitions on writing terse and obfuscated C code. People would try to write code that was very succinct but hard to read and understand. These competitions would be hard in Scala - because although the language is succinct, it is not hard to read or understand! As pointed out by Venkat Subramaniam, Scala does away with a lot of ceremony and allows writing code that focuses on "what needs to be done" rather than decorators and syntactic appendage. Let's look at the example below:
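For instance, a complete, runnable program can be as small as this (HelloWorld is an arbitrary name):

object HelloWorld {
  def main(args: Array[String]): Unit = {
    println("Hello, " + args(0))
  }
}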

In Scala, you can combine defining a class and its singleton instance by using object, as shown above. And just like in Java, when the object is executed by the JVM, it looks for a main method that accepts an array of String. Note the use of the generic Array class typed with String. In Scala, type specification for a generic is done using "[type]" and array indexing is done using "(index)". You can try out the above code in the Scala REPL shell as shown below:
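An abbreviated REPL session (startup banner omitted) might look like this:

$ scala

scala> object HelloWorld {
     |   def main(args: Array[String]): Unit = println("Hello, " + args(0))
     | }
defined object HelloWorld

scala> HelloWorld.main(Array("Scala"))
Hello, Scala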

And if you prefer, you can save it and run as a script as shown below:
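For example, with the body saved as a bare script (the file name hello.scala is arbitrary) - no object or main wrapper is needed in script mode:

// hello.scala
println("Hello from a Scala script")

$ scala hello.scala
Hello from a Scala script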

Or run the compiled code as shown below:
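Assuming the HelloWorld object above was saved in HelloWorld.scala, something along these lines:

$ scalac HelloWorld.scala     # produces HelloWorld.class and HelloWorld$.class
$ scala HelloWorld Scala
Hello, Scala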

Of course, most developers will use an IDE like Eclipse or IntelliJ IDEA, but the above examples show the ease of development and prototyping at the most fundamental level. There are many more productivity helpers, as discussed next.


Type Inference

Like Java, Scala is a statically typed language. However, it can infer the type of a variable or the return type of a function by analyzing the right-hand side of an assignment. E.g. the two assignments below are equivalent.
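For example (variable names arbitrary):

val message1: String = "Hello, Scala"   // type specified explicitly
val message2 = "Hello, Scala"           // type String inferred from the right-hand side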

And the concept can be applied when defining functions too (BTW, Scala has "standalone functions" as well):
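def myUpperCase(s: String) = s.toUpperCase   // the return type (String) is inferred

myUpperCase("scala")                         // "SCALA"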

The function myUpperCase above returns a String - although the return type of String is not explicitly specified.


A Powerful match...case

Scala's case expression is very powerful as can be seen from the example below. It forms the basis of several idioms and best-practices in Scala.
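Here is a small, illustrative sketch (the describe function is made up):

def describe(x: Any): String = x match {
  case 0                => "zero"
  case i: Int if i < 0  => "a negative integer"
  case s: String        => "the string '" + s + "'"
  case (a, b)           => "a pair of " + a + " and " + b
  case _                => "something else"
}

describe(0)              // "zero"
describe(("IBM", 150))   // "a pair of IBM and 150"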


Constructors and Built-in Setters and Getters

Scala allows skipping the explicit definition of class constructors by letting you define the constructor as part of the class signature. The parameters defined in the class signature become the constructor parameters, and the statements in the class body become the body of the constructor. Furthermore, you can skip mundane setters and getters, as shown in the examples below. Note that the classes below have no "code" - just the signature itself.
Class parameters can be defined as private or public. If parameters are prefixed with "val" or "var", then they are public.
Class parameters that are defined as "val" (immutable parameter variables) come with "getters", as shown below.
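For example (Example1 is an arbitrary name):

class Example1(val i: Int, val s: String)

val ex1 = new Example1(10, "ten")
ex1.i       // 10 - the generated "getter" for the val parameter
// ex1.i = 5 would not compile - a val parameter has no setter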

Classes with "var" (mutable parameter variables) come with both "setter" and "getter" functions as shown below:

Under the cover, "ex2.i = 5" is not an assignment, but invocation of a method (but its all transparent).


An Even Better Case With case

As seen above, setters and getters come for free when you define public class parameters. But wait, there's more. If you prefix the class definition with "case", you get a "toString" method for free, as can be seen here:
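For example:

case class Example3(i: Int, s: String)

Example3(10, "ten").toString   // "Example3(10,ten)"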


Anonymous Functions and Functional Programming

Scala, being also a functional programming language, allows functions and methods to be used in assignments, as function parameters, as return values, etc. In many situations, you just want to plug a few lines directly where a function is expected. Scala makes this easy in the form of anonymous functions. Here's an example:
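A minimal sketch:

(1 to 10).foreach(x => println(x))   // the anonymous function "x => println(x)" is passed directly to foreach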



In the example above, "foreach" is an iterator method that requires a function parameter taking one argument. The argument type depends upon the type of the iterated collection  which in this case is of type Int. "x" above is the name for each value each time while iterating from 1 to 10. This "x" is then passed to the explicit function provided as parameter to foreach's function parameter.


More Cool Stuff

There are many more labor-saving, elegant and efficient features that I love - but they need a little more background to really appreciate them. But let me enumerate some of them:
  • Traits (these are the critical foundation for collection classes)
  • Collection classes and concurrent collection classes
  • advanced match...case with pattern matching 
  • Nesting within classes, functions and objects 
  • Companion objects and object's apply methods as default "action"
  • Function currying
  • Ability to use special characters in function names and omission of "()" brackets (this facilitates writing DSLs  or domain specific languages)
  • Ability to define multiple classes in a single file

Finally.....

As you can see, there's so much to Scala - and it's best if you peel the layers off yourself at your own pace. The route I took was to use Twitter's crash course and the Neophyte's Guide to get me started, and then refer to the Scala Tour, Scala Cheatsheets and the API whenever I wanted to dig a little deeper. To keep you inspired and motivated, do watch some of Martin Odersky's videos. Then, to challenge yourself, try out the 99 problems. Here's a list of some of the online resources that I used and found useful:

Twitter's crash course on Scala 
The Neophyte's Guide to Scala
The Scala Tutorial, Tour and FAQ
Scala Style Guide
Scala Cheatsheets
Collection article on Scala website
A Concise Introduction to Scala
Scala API Documentation and Source Code Links
Interesting Scala Teasers
Scala Puzzlers
Scala with Style (Video)
Martin Odersky's Video - Working Hard to Keep It Simple
Martin Odersky's Video - The Simple Parts

A note for posterity - this blog article was written in Sept 2015 during the days of Scala 2.10 and 2.11. I am sure there will be many newer and better things in the future!

Saturday, March 30, 2013

A Little Teaser on NLP, Machine Learning, Data Mining, Data Science and Artificial Intelligence


While I touched on document classification in the previous articles, NLP on its own can be, and is, applied to solve a variety of problems. Here are some examples.

Email Processing & Categorization
    - Spam v/s non-spam
    - Prioritization
    - Email distillation, summaries and even semantic linking/threading
      (e.g. summary of important unread emails by applying named entity recognition and other techniques)

Social Media Analysis 
    - Facebook messages that evoke most likes
    - Twitter hashtag re-tweet propensity
    - Gender identification
    - Socioeconomic analysis
       (e.g. work done at a data dive for the World Bank)


Document Clustering and Classification 
(Supervised as well as Unsupervised learning)
    - Sorting/grouping documents 
    - Identifying similar/related documents
      (presentation by .....)
    - News feed analysis
    - Financial report analysis (e.g. 10K document analysis)
    - Data analysis, information distillation and presentation
      (e.g. see Narrative Science)

Language Analysis and Processing
    - Automated grading of essays and answers
    - Checking for plagiarism
    - Post-processing of speech-to-text output
      (e.g. in medical transcription and dictation system)

Having whetted your creative side, let me venture into machine learning (ML), which can be considered a superset of NLP. From my perspective, NLP, ML, data science, data mining (DM) and data analysis (DA) are all related topics. They all require applying data wrangling/processing, statistical data exploration and visualization. I will refer to these collective fields of study as Data Science (DS). And unlike standard software development, where you have well-defined input, output and processing requirements for each software module or component, DS is highly iterative and the "objectives" are refined over several iterations. It is a mixture of "science", "math", "art" and "black magic"!

In a nutshell, machine learning is a journey of discovery and learning, with a very fluid process consisting of the following skeleton steps -
  • Define/refine problem or quest
  • Determine what data are needed (its ok not to have all the data upfront)
  • Inspect and visualize available data (using appropriate sampling if necessary)
  • Enumerate and analyze applicable approaches, algorithms and techniques
  • Determine if any additional features can be obtained or refined from existing or new data
I am still exploring this "amazing and magical science" and, as I "peel the layers", am amazed by its capabilities and potential. It seems that data science with an automated feedback loop and some more bells and whistles becomes AI (Artificial Intelligence). The Google driverless car and a recent quadrocopter ball-juggling feat are examples of that. If the links seem to have put you to sleep, these videos will wake you up.

Wednesday, February 13, 2013

Machine Learning: Natural Language Processing Using NLTK - Part 2

So in the previous post, I explored the use of the nltk Python module and the Naive Bayes classifier to classify (or predict the authorship of) documents. The results of the classification were quite good - with 11 out of 12 documents being correctly classified. The incorrectly classified document was from George Bush Jr., and it was classified as being from Harry Truman. Naive Bayes used conditional probability to arrive at its result, using the probabilities of the different n-grams occurring in the significant set of n-grams of each of the Presidents.

Based on casual observations, I had noticed that the documents from Harry Truman were exceptionally long, and the feature set of n-grams was consequently quite large, with significant overlap. George Bush Jr's documents, on the other hand, were not so long and had a variety of n-grams across the documents. So I thought of taking a different approach - that of using the Jaccard similarity of the n-grams (see Mining of Massive Datasets).

And that turned out to be a better approach - it got us 12 out of 12 correct classifications of the documents. The program is similar to the previous one, with a very small change.
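For reference, the Jaccard similarity of two sets of n-grams A and B, as computed in the code below, is the size of their intersection divided by the size of their union:

    J(A, B) = |A ∩ B| / |A ∪ B|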

Python Code

#-----------------------------------------------
# Import pre-amble
#-----------------------------------------------
import sys
import os
import re
import pickle
from nltk.corpus import words
from nltk.corpus import stopwords
from nltk.tokenize import WordPunctTokenizer
from nltk.collocations import BigramCollocationFinder
from nltk.metrics import BigramAssocMeasures
from nltk.collocations import TrigramCollocationFinder
from nltk.metrics import TrigramAssocMeasures
#from nltk.classify import NaiveBayesClassifier
#-----------------------------------------------

#-----------------------------------------------
def print_script_usage():
    ''' Prints usage of this script.

    Script requires two parameters.
    First parameter (referred to as training_data_dir)
    is a top-level directory name that contains 
    sub-directories of documents for "training/learning".
    The name of each sub-directory is considered a
    "label" (classification) and all documents 
    under that sub-directory are classified 
    as having that label (or classification).

    The second parameter is a directory that
    contains test documents that need to be
    classified. 
    '''
    print "\n\nUsage: " + script_name + " <input_train_data_dir> <test_data_dir>\n\n"
#-----------------------------------------------

#-----------------------------------------------
def validate_dir(dir_name):
    ''' Checks if the input dir is a valid, existing directory.'''
    if not os.path.isdir(dir_name):
        print dir_name + " is not a directory."
        print_script_usage()
        sys.exit(1)
#-----------------------------------------------

#-----------------------------------------------
def get_featureset(file, single_words=False, bigrams=True, trigrams=True):
    '''Function to extract featureset from a file.

    This is the main function. It takes a file
    as input parameter and generates a featureset for
    the file. 
    The featureset consists of a set (dictionary) of
    key/value pairs where the key is a single word, 
    bigram or trigram from the text in the file and 
    the value is the boolean value "True".
    '''
    #-----------------------------------------------
    # Word tokenization of input file
    #-----------------------------------------------
    try:
        f = open(file)
        all_text = f.readlines()
        f.close()
    except:
        print "Error in opening file " + file + "."
        print sys.exc_info()
        sys.exit(1)
    all_text = " ".join(all_text).lower()
    wp_tokenizer = WordPunctTokenizer()
    tokens = wp_tokenizer.tokenize(all_text)
    english_stopwords = set(stopwords.words('english'))
    filtered_tokens = [token for token in tokens if token not in english_stopwords and len(token) > 4]
    total_word_tokens = len(filtered_tokens)
    n_gram_limit = int(total_word_tokens*0.1)
    #-----------------------------------------------
    # Bigrams from word tokens
    #-----------------------------------------------
    if bigrams == True:
        bigram_finder = BigramCollocationFinder.from_words(filtered_tokens, 5)
        bigrams_chi_sq = bigram_finder.nbest(BigramAssocMeasures.chi_sq, n_gram_limit)
        bigrams_raw_freq = bigram_finder.nbest(BigramAssocMeasures.raw_freq, n_gram_limit)
        bigrams_likelihood_ratio = bigram_finder.nbest(BigramAssocMeasures.likelihood_ratio, n_gram_limit)
        bigrams_poisson_stirling = bigram_finder.nbest(BigramAssocMeasures.poisson_stirling, n_gram_limit)
    else:
        bigrams_chi_sq = []
        bigrams_raw_freq = []
        bigrams_likelihood_ratio = []
        bigrams_poisson_stirling = []
    #-----------------------------------------------
    # Trigrams from word tokens
    #-----------------------------------------------
    if trigrams == True:
        trigram_finder = TrigramCollocationFinder.from_words(filtered_tokens)
        trigrams_chi_sq = trigram_finder.nbest(TrigramAssocMeasures.chi_sq, n_gram_limit)
        trigrams_raw_freq = trigram_finder.nbest(TrigramAssocMeasures.raw_freq, n_gram_limit)
        trigrams_likelihood_ratio = trigram_finder.nbest(TrigramAssocMeasures.likelihood_ratio, n_gram_limit)
        trigrams_poisson_stirling = trigram_finder.nbest(TrigramAssocMeasures.poisson_stirling, n_gram_limit)
    else:
        trigrams_chi_sq = []
        trigrams_raw_freq = []
        trigrams_likelihood_ratio = []
        trigrams_poisson_stirling = []
    #-----------------------------------------------
    # Consolidated list of words, bigrams and trigrams
    #-----------------------------------------------
    if single_words == False:
        filtered_tokens = []
    
    all_tokens = list(set(filtered_tokens + bigrams_chi_sq + bigrams_raw_freq + bigrams_likelihood_ratio + bigrams_poisson_stirling + trigrams_chi_sq + trigrams_raw_freq + trigrams_likelihood_ratio + trigrams_poisson_stirling))
    #-----------------------------------------------
    # Featureset for the input file
    # The featureset consists of a key-value pair
    # where key is the word, bigram or trigram and 
    # the value is True. 
    #-----------------------------------------------
    return dict([(token, True) for token in all_tokens if token != tuple()])
#-----------------------------------------------


#-----------------------------------------------
script_name = os.path.basename(sys.argv[0]) or 'this_script'

if len(sys.argv) != 3:
    print_script_usage()
    sys.exit(1)
training_data_dir, test_data_dir = sys.argv[1], sys.argv[2]
validate_dir(training_data_dir)
validate_dir(test_data_dir)
#-----------------------------------------------


#-----------------------------------------------
print "\n\n\n\n**********************************************************************"
print "\t\t\tT R A I N I N G"
print "**********************************************************************"
print "\n\nProcessing training data from:", training_data_dir

labels = [dir_name for dir_name in os.listdir(training_data_dir) if os.path.isdir(os.path.join(training_data_dir, dir_name))]

training_featureset = []

for label in labels:
    dir = os.path.join(training_data_dir, label)
    if not os.path.isdir(dir):
        print "\tUnexpected error in resolving directory - " + dir
        sys.exit(1)
    print "\nProcessing sub-directory: " + label 
    for file in os.listdir(dir):
        file_path = os.path.join(dir, file)
        if not os.path.isfile(file_path):
            print "\tfile " + file + "(" + file_path + ") does not seem to be a file"
            continue
        featureset = get_featureset(file_path)
        print "\tCompleted file: " + file
        #print "file = " + file + "  label = " + label + " featureset length = " + str(len(featureset))
        labeled_featureset = tuple([featureset, label])
        training_featureset.append(labeled_featureset)

#-----------------------
# This is where we deviate from the NLTK approach. Instead of using the Naive Bayes algorithm,
# we combine the n-grams for the same label across different documents to get a single n-gram set per label.
# This becomes the "training_data" set. We will apply the Jaccard similarity test to each test document to predict
# the label for the test document.
#-----------------------
training_data = {}
for featureset, label in training_featureset:
    label_set = set(feature for feature in featureset)
    #-----------------------
    # Add each training document's tokens to the training data dictionary.
    # Note that we need to "append" the new training document's tokens to the label's existing tokenset
    #-----------------------
    if label in training_data:
        training_data[label] = training_data[label].union(label_set)
    else:
        training_data[label] = label_set

#-----------------------------------------------

#-----------------------------------------------
print "\n\n\n\n**********************************************************************"
print "\t\t\tT E S T I N G"
print "**********************************************************************"
print "\nProcessing test data from:", test_data_dir
for file in os.listdir(test_data_dir):
    infile = os.path.join(test_data_dir, file)
    featureset = get_featureset(infile)
    label_set = set(feature for feature in featureset)
    print "\n\nProcessing test file", file
    max_value = 0.0
    max_value_label = ''
    file_score = {}
    #-----------------------
    # For each test file, we examine the overlap of the n-grams between the test file/document
    # and each of the labeled datasets (training data) using Jaccard similarity.
    # This gives us a file_score dictionary where the key is the label and the value is the
    # Jaccard similarity. We then select the key (or label) with the max value as the
    # label prediction for the test file.
    # Note that if we do not use "1.0" as the multiplier below, file_score[key] would be zero,
    # as all the other values are integers and the division would be computed as integer division.
    #-----------------------
    for key in training_data:
        file_score[key] = len(training_data[key].intersection(label_set))*1.0/(len(training_data[key].union(label_set)))
        if file_score[key] > max_value:
            max_value_label = key
            max_value = file_score[key]  
    print "\tLabel for", file, "is", max_value_label, "with score of", max_value

print "\n\n**********************************************************************"
#-----------------------------------------------