Wednesday, October 14, 2015

Scala case Class Introspection

I like the case class and use it quite liberally when I want to define an entity that is a collection of value types and want automatic toString and getter methods for those data elements. While the same can be done using Tuples, case classes are better because you can access the data elements or members by their names, leading to readable code. So how does that happen in case classes? What happens when you define a case class? Let's answer those questions by defining a simple case class and looking at the compiled bytecode.
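A minimal sketch of such a definition follows. The class name `Person`, its fields and the `PersonDemo` object are assumptions chosen for illustration. After compiling with `scalac`, running `javap -p Person` against the generated class file lists the generated members.

```scala
// Hypothetical example class; the name and fields are illustrative only.
case class Person(name: String, age: Int)

object PersonDemo {
  val p = Person("Ann", 30)

  // Constructor arguments become vals with accessor methods.
  val name: String = p.name

  // Members contributed through scala.Product.
  val arity: Int = p.productArity        // number of constructor arguments
  val first: Any = p.productElement(0)   // positional access to a member
  val prefix: String = p.productPrefix   // class name used by toString

  // copy creates a new instance with selected fields changed.
  val older: Person = p.copy(age = 31)
}
```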


As you can see above, when we define a case class, several things happen. First, the class is defined with the traits scala.Product and scala.Serializable. Second, the class arguments are automatically defined as vals in the class, along with the "getter" methods. Third, some additional methods are added to the class - viz. copy, productPrefix, productArity, productElement, productIterator and canEqual. And fourth, a companion object is also defined.

So what is this Product trait? You can see in the Scala API that the trait is inherited by all Tuples and case classes. And that's where the additional methods come from. Some of the methods are implementations of abstract methods in the Product trait while the others are overrides.
Here's an example of how you can impact the case class' toString by overriding the productPrefix method from Product.
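A minimal sketch, assuming a hypothetical `Employee` class: the generated toString builds its output from productPrefix and the product elements, so overriding productPrefix changes the name that gets printed.

```scala
// Hypothetical class; overriding productPrefix changes the name printed by toString.
case class Employee(name: String, id: Int) {
  override def productPrefix: String = "Emp"
}
```

With this definition, `Employee("Ann", 7).toString` yields `Emp(Ann,7)` instead of `Employee(Ann,7)`.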



The other big benefit of the case class is that it extends scala.Serializable. If your case class is made up of value types (e.g. Int or String), you don't need to do any more work to read/write instances of case classes from/to files, streams, etc.
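As a sketch of what that looks like in practice, the following round-trips an instance through plain Java serialization. The names `Point`, `SerializationDemo`, `toBytes` and `fromBytes` are assumptions made for this example, and it assumes the case class is compiled as a top-level class.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical class made up of value types only.
case class Point(x: Int, label: String)

object SerializationDemo {
  // Serialize any Serializable object to bytes using plain Java serialization.
  def toBytes(obj: java.io.Serializable): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Read an object back from bytes; the caller compares or casts the result.
  def fromBytes(bytes: Array[Byte]): AnyRef = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject() finally in.close()
  }
}
```

No extra code was needed in `Point` itself; the same two helpers would work with a file or socket stream in place of the byte array.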

Thus case classes give you two freebies - toString and serializability.

Beware - case Classes Cannot Be Inherited

However, a gotcha of case classes is that a case class cannot inherit from another case class (the compiler prohibits case-to-case inheritance). So there is a likelihood of code repetition if the common code is repeated in each related case class. One way to avoid code repetition is to use traits as a base for the common methods and parameters. Here's an example:
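A minimal sketch of the idea, using hypothetical names (`Shape`, `Circle`, `Rect`): the trait declares the common members as abstract and carries the shared behaviour, and each case class supplies the abstract members through its constructor parameters or its body.

```scala
// Common members live in a trait; all names here are hypothetical.
trait Shape {
  def name: String   // abstract: each case class supplies it as a constructor val
  def area: Double   // abstract: computed differently per shape
  def describe: String = s"$name with area $area"   // shared behaviour
}

case class Circle(name: String, radius: Double) extends Shape {
  def area: Double = math.Pi * radius * radius
}

case class Rect(name: String, width: Double, height: Double) extends Shape {
  def area: Double = width * height
}
```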



All class constructor parameters and any other common class variables and methods that need to be part of all case classes need to be (or can be) defined in the trait. The variables should be defined as abstract as shown above.


Thursday, October 1, 2015

Tips for Successful Kafka Deployments

Introduction

Apache Kafka is an open-source messaging system that provides pub-sub message passing between producers and consumers of messages via conduits called topics. It has a simple, efficient, peer-broker architecture with an easy-to-use API. This simplicity, along with the good and responsive support from Kafka developers and the Kafka community, is one of the key factors in its huge success. It has been successfully deployed at many organizations to transport tens of millions to tens of billions of messages on a daily basis. I too have been using it successfully at my workplace (Conversant) for some time now as a buffering system between Apache Flume and Hadoop. Since then, we have been finding more and more use cases.

This article summarizes tips and lessons learnt from our 9+ months of experience and covers the following topics (no pun intended!):

  • Kafka fundamentals and architecture
  • Hardware recommendations
  • Kafka design considerations
  • Broker configuration parameters
  • Operational aspects

Understand Kafka Architecture and Features

While Kafka is a messaging system, it is a little different from traditional JMS or AMQP based messaging systems. Like a traditional message queuing system, one can create virtual pipelines for messages, called topics. Producers publish messages into a topic pipeline, from which consumers then fetch them. Topics are supported by one or more peer-to-peer Kafka broker daemons, or simply brokers. The collection of brokers is referred to as a Kafka cluster. One arbitrary broker takes on the additional role of controller, which performs additional administrative tasks.

Both message producers and consumers are unaware of the existence of each other. Hence for producers, the message guarantee covers only receipt by the cluster. There is no message delivery guarantee for receipt by a consumer or a group of consumers. The consumer, on the other hand, has no awareness of which producer a message came from, and needs to keep track of the messages it has consumed in order to pick up the next message. However, Kafka offers an API that can take care of this for the consumer by periodically storing in Zookeeper the sequence number, known as the offset, of the last read message. The offset is a sequentially increasing number given to each message in a topic. There can be multiple producers pushing messages into a topic and zero, one or more consumers extracting the messages. On their way from producers to consumers, messages are persisted on disk for durability, in files called log segments. Messages are automatically purged after a preset time, with some nuances that depend on certain configuration parameters.

This may not seem like an attractive feature list, but what's special is how the topic, producer and consumer throughput can be scaled linearly. Topics are scaled by configuring them to be made up of sub-pipelines, called partitions. Each topic partition has its own sequence numbers. Each partition takes on a portion of the messaging workload, and is assigned to a broker. If there are more partitions than brokers, then a broker may serve multiple partitions of the same topic. Partitioning of topics allows scaling topic throughput by adding pipelines and correspondingly adding servers as needed and distributing/assigning topic partitions over more servers. Furthermore, Kafka employs very optimized data transfer techniques from network buffers to filesystem cache, bypassing copying to process memory, resulting in tremendous throughput capacity.
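To make the partition-to-broker relationship concrete, here is a small sketch that spreads partitions over brokers in round-robin order. This is an illustration only, not Kafka's actual assignment algorithm; the object and function names are assumptions.

```scala
object PartitionAssignmentDemo {
  // Assign partition ids 0 until numPartitions to brokers in round-robin order.
  // Returns a map from broker id to the partitions it serves.
  def assign(numPartitions: Int, brokers: Seq[Int]): Map[Int, Seq[Int]] =
    (0 until numPartitions)
      .groupBy(p => brokers(p % brokers.size))
      .map { case (broker, parts) => broker -> parts.toSeq }
}
```

With 6 partitions and brokers 1, 2 and 3, each broker ends up serving two partitions of the topic; adding a fourth broker and reassigning would reduce the share per broker.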

In practice, a Kafka cluster may be supporting multiple topics and each topic may have multiple partitions with the partitions being distributed among the brokers in a cluster, resulting in a broker supporting many partitions. For fault-tolerance purposes, a partition may be replicated. In that case, a partition has a primary copy served by a broker and there are secondary brokers that replicate the partition by continuously copying messages from the primary partition broker and keeping the secondary copies in-sync with the primary copy. The replica brokers use Zookeeper to keep track of the state of the primary broker. In the event of a primary broker failure, one of the secondary partition brokers takes on the role of the primary broker. It is this partitioning, replication and brokers watching each other's backs that makes Kafka performant and fault-tolerant. Note that the broker hosting the primary partition aka leader partition broker communicates with the producers, consumers as well as replica partition brokers. Thus increasing the number of consumers and producers increases the traffic with brokers.

The Kafka architecture allows scaling not only topics (message pipelines) but also producers and consumers. Multiple producers can concurrently push messages into a topic (or partitions of a topic) independently of each other. On the other end of the pipeline, a consumer using the Kafka API can spawn as many threads as the number of partitions. Thus scaling a topic also automatically scales a consumer. In addition, a consumer can be part of a group of multiple coordinating consumers, each of which can be multi-threaded, thereby further helping to drain the topic rapidly. Each consumer has its own set of independent offsets - viz. the sequence number of data consumed from a topic partition.

Kafka does not guarantee message ordering across a topic as a whole. In a single-partition topic, the messages are strictly FIFO. However, in a multi-partition topic, the FIFO order is guaranteed only within a partition, but not across the overall topic.

Kafka comes with several useful utilities - e.g. a utility to replicate topic data from a source cluster to a target cluster.
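The per-partition ordering guarantee can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not Kafka code: the hash-modulo partition choice and the names `OrderingDemo`, `partitionFor` and `route` are hypothetical.

```scala
object OrderingDemo {
  // Pick a partition from the key's hash (kept non-negative).
  def partitionFor(key: String, numPartitions: Int): Int =
    ((key.hashCode % numPartitions) + numPartitions) % numPartitions

  // Append (key, value) messages to per-partition logs.
  // The position of a value inside its log plays the role of the offset.
  def route(messages: Seq[(String, String)], numPartitions: Int): Map[Int, Vector[String]] =
    messages.foldLeft(Map.empty[Int, Vector[String]]) { case (logs, (key, value)) =>
      val p = partitionFor(key, numPartitions)
      logs.updated(p, logs.getOrElse(p, Vector.empty[String]) :+ value)
    }
}
```

Messages that share a key land in the same partition log and keep their relative order there, while nothing relates the order of entries in different partition logs.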



Complement Kafka Architecture With Appropriate Hardware

The potential of the highly scalable architecture described above can be realized only if it is supported by appropriate hardware. Since Kafka does not do any significant data crunching, there is no need for high CPU processing power (let's defer the discussion of data compression for now). However, the zero-copy data transfer between network and disk needs to be matched by high network and I/O bandwidth. Here it's a no-brainer to use multiple 1Gb/sec NICs or a single or dual 10Gb/sec NIC.

As for I/O bandwidth, I differ from the general consensus in the Kafka community, which is to use the disks as a JBOD (Just a Bunch Of Disks) and create a filesystem over each drive to get the best I/O throughput from each individual disk. I see this as sub-optimal use of the combined disk storage and I/O throughput, because topic partitions are distributed across different filesystems/directories. Since it is not always easy to have equally sized topic partitions across ALL topics, chances are that some filesystems end up highly utilized and others under-utilized. So instead of JBOD, I have found it a better use of resources to create a RAID-based filesystem (hardware-based RAID highly recommended) over the set of available disks and to match the gigabit network bandwidth with an equally high combined throughput of the RAID-ed disks.

RAID has additional benefits. First, a disk-drive failure does not cause an outage (although performance may degrade slightly to noticeably while a newly replaced disk is being rebuilt). Second, because there is no partition outage, there is no failover of Kafka partition leaders and no subsequent catch-up by the Kafka partitions on the failed drive. Furthermore, in a RAID configuration, a drive failure does not cause a panic situation, and the drive replacement can take place without much intervention and effort (assuming Kafka admins and sysadmins are different teams!). A final note on this topic: use a highly performant filesystem like XFS or ext4 (XFS preferred). We have seen pretty good performance with XFS.

A decent configuration (as of 2015) is a server with single or dual 10Gb/sec interfaces (or 4x1Gb/sec interfaces), 8-12 good disk drives (each with 20+ MB/sec), 24-48 GB RAM and 6-8 CPU cores. I believe servers with an even more powerful configuration may be constrained by the system bus capacity, as a couple of multi-gigabit NICs, several 20-40 MB/sec disk drives and the normal multi-core CPU traffic can result in sustained high system bus traffic.

Note that a Kafka cluster also requires a Zookeeper ensemble (cluster), with very responsive Zookeeper processes. It is therefore highly recommended to run Zookeeper on servers that are either dedicated (they can be much smaller) or very lightly loaded. Additionally, it is preferable to have separate/dedicated drives for Zookeeper data and logs/snapshots.



Topic Design - Hardware Scales Well, Avoid Too Many Partitions Per Topic

Another point where I disagree with the Kafka community is the overuse of partitions for scalability. Because a larger number of partitions facilitates high capacity and throughput, it seems to be common advice within the community to have topics with tens or even a hundred partitions. However, there are two reasons to go easy on that. First, today's hardware provides quite sufficient throughput capacity, so we don't need to over-eagerly scale topics with partitions. Scale just enough that the hardware has sufficient capacity to provide throughput under normal and high workload as well as when some brokers are down. In the latter situation, secondary replica partitions assume the role of leader partitions, which can result in additional load on those brokers. Second, the higher the number of partitions, the higher the load on the Zookeeper quorum. Zookeeper keeps track of all brokers, partitions (leaders and replicas) and consumer offsets. Having hundreds of partitions across a cluster results in a non-trivial amount of latency-sensitive traffic that can be overwhelming for the quorum and the cluster.



Message And Application Design

Messages in Kafka consist of an optional, non-unique key and a value that is the actual message payload. Given such a simple "schema", what's the big deal about message schema design? The key is used by the producer to pick the partition for a message: by default, messages with the same key are hashed to the same partition, while a message with an absent (i.e. null) key is assigned a partition by the producer itself. Within each partition, Kafka identifies a message by its offset and maintains an index from offsets to positions in the log segments; consumers fetch by offset, not by key, so any filtering on the key happens in the consumer. So if neither per-key ordering nor key-based grouping is a requirement, skip using a message key. The long-valued offset is a pretty efficient lookup key for the index. Additionally, a custom key may require well-designed custom hashing (a custom partitioner) to evenly distribute data across topic partitions.

Kafka also supports compressing data when reading/writing data to disk. Depending upon the data volume and compression achieved, this can be quite beneficial. For example, imagine the storage savings and the I/O bandwidth reduction for textual data that can have around 4:1 compression ratio. However, note that the compression is for data at rest, not "in-flight" viz. to/from consumers, replica-brokers and producers. For the ultimate benefit, it might be worthwhile to have the producers send the actual message as compressed data and not incur overhead of compression at any of the brokers and also have reduced in-flight message size.

In Kafka, the smallest unit of data transfer is obviously a message. Each message incurs some negligible overhead on producers, consumers and brokers. However, when dealing with billions of messages per day or millions of messages per second, the overhead becomes substantial. This overhead can be reduced by batching messages at the producers, i.e. packing a collection of messages into one logical message that is sent to Kafka. Message batching along with compression at the producer can result in significant resource savings.
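A sketch of application-level batching plus compression on the producer side follows. It uses only the JDK's gzip classes and does not call the Kafka producer API; the names are hypothetical, and it assumes messages are text that contains no newline characters, since newline is used as the separator.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets
import java.util.zip.GZIPOutputStream

object BatchingDemo {
  // Join a group of messages with newlines and gzip the result into one payload.
  def compressBatch(messages: Seq[String]): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val gzip = new GZIPOutputStream(buffer)
    try gzip.write(messages.mkString("\n").getBytes(StandardCharsets.UTF_8))
    finally gzip.close()
    buffer.toByteArray
  }

  // Split messages into batches of at most batchSize and compress each batch.
  def toPayloads(messages: Seq[String], batchSize: Int): Vector[Array[Byte]] =
    messages.grouped(batchSize).map(compressBatch).toVector
}
```

Each resulting byte array would be sent as the value of a single Kafka message, and the consumer would gunzip it and split on newlines to recover the individual messages.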

Now let's come to application design. The fact that there is neither a message delivery guarantee nor a once-only delivery guarantee can freak out traditional, transactional application designers. However, Kafka is suitable as-is for many big-data messaging needs and pipelines - e.g. log delivery (which can occur in many forms and in many areas - e.g. web server or application server logs). Note that the lack of a message delivery guarantee only arises because messages may expire before they have been consumed. This can be avoided by ensuring a sufficiently long data retention and by keeping outages of the consuming application under control. As for the lack of a once-only delivery guarantee, it arises only when a consumer crashes or is restarted. When consumers use the Kafka high-level consumer API, the API automatically stores the offset in Zookeeper at periodic intervals. However, if the consumer crashes after consuming a few messages but before the offset can be saved, there is the possibility of duplicate message delivery. This can be avoided by doing some application-level checks in the consumer to find the last delivered message for each partition and fetching only the subsequent messages.
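The application-level check can be sketched as follows, assuming the consumer can look up the last offset it has fully processed for each partition (for example from its own output store). The types and names are hypothetical and the sketch does not use the Kafka consumer API.

```scala
object DedupDemo {
  // A consumed message identified by its partition and offset.
  case class Record(partition: Int, offset: Long, value: String)

  // Keep only records whose offset is beyond the last one already processed
  // for that partition. Returns the new records together with the updated
  // per-partition marks of the last processed offset.
  def dropDuplicates(
      lastProcessed: Map[Int, Long],
      fetched: Seq[Record]): (Seq[Record], Map[Int, Long]) =
    fetched.foldLeft((Vector.empty[Record], lastProcessed)) { case ((kept, marks), r) =>
      if (marks.get(r.partition).exists(_ >= r.offset)) (kept, marks)
      else (kept :+ r, marks.updated(r.partition, r.offset))
    }
}
```

After a restart, re-delivered messages at or below the stored mark are skipped, so the downstream system sees each message once as long as the mark is persisted together with the processed output.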



Broker Configuration

Kafka has configuration parameters for producers, brokers and consumers. Once sufficient hardware resources are made available to a Kafka server, configure the broker processes so that they have sufficient buffer memory and are able to handle the expected message size and volume. You do not want to carry any more data than needed, so ensure that data retention is long enough, but not too long. At the same time, ensure that there is sufficient data redundancy in terms of replicas for each partition. Also allocate sufficient heap memory to the Kafka broker JVM - say, 6-8 GB.

Here's a good starting point for broker configuration parameters: 

log.dirs: Use a single, large filesystem (preferably backed by RAID in production) instead of multiple directories. As discussed earlier, this is for better I/O and storage utilization.

message.max.bytes: Adjust this as necessary for the expected max message size. It is better to over-configure this for any change in the future. The default value is 1 MB - feel free to increase it to 2-4 MB.

socket.send.buffer.bytes and socket.receive.buffer.bytes: This is the amount of memory the broker process will ask/request when creating socket connections. Increase it from 100 KB to 4 MB so that larger chunks of data are transferred between network and storage.

log.roll.hours: This is the time interval at which Kafka will switch to a new log data file (segment). The default value is 7 days (7 x 24 hours). However, you want the partition logs to roll more frequently: a segment can be deleted only as a whole, so older, stale data stays in the system as long as the same segment also holds newer data that is still within its retention period.

auto.create.topics.enable: This parameter controls the auto-creation of topics when a consumer or producer tries to pull/push messages from a non-existent topic. If this is set to true then attempts to produce, consume, or fetch metadata for a non-existent topic will automatically create it with the default replication factor and number of partitions. Set it to false. You don't want arbitrary topics being created often due to human or configuration errors. Also when there is a problem - either in the program or configuration which causes an incorrect topic to be accessed or in operation when a topic that should exist does not exist, you want it to be detected as early as possible - taking the approach of fail-fast.

default.replication.factor: The default replication factor is 1, which means that there are no backup replicas for a partition, and that is unsafe. Increase it to 3 for better resiliency and redundancy.

cleanup.policy: This parameter can be set to "delete" or "compact" and controls how old log segments (data that is older than its configured TTL - time-to-live duration) are handled. The default policy ("delete") will discard old segments when their retention time or size limit has been reached. The "compact" setting will enable log compaction on the topic. Avoid compaction if possible, as it incurs additional overhead on the broker as well as on system resources (I/O) without any significant benefit, since the only major gain from compaction is potential space savings.

log.retention.{ms,minutes,hours}: This is the amount of time to keep a log segment before it is deleted. Change this to as small a duration as practically possible, while still allowing for system outages and maintenance on consumer processes. E.g. keep this at 3 days if you want to wait at most 3 days before you start purging data.

controlled.shutdown.enable: The default value was false up to 0.8.1 and is true from 0.8.2 onwards. Ensure that it is always set to true to allow for controlled shutdown.
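Put together, the recommendations above might look like the following server.properties fragment. Treat it as a sketch: the directory path and the log.roll.hours value are assumptions that are not taken from the text, the sizes are 2 MB and 4 MB expressed in bytes, and the cleanup policy appears under its broker-level name log.cleanup.policy.

```properties
# Illustrative starting values; the path and numbers are assumptions to adapt.

# Single large (RAID-backed) filesystem; hypothetical mount point.
log.dirs=/data/kafka-logs

# 2 MB maximum message size.
message.max.bytes=2097152

# 4 MB socket buffers.
socket.send.buffer.bytes=4194304
socket.receive.buffer.bytes=4194304

# Roll segments daily (assumed value) and keep data for 3 days.
log.roll.hours=24
log.retention.hours=72
log.cleanup.policy=delete

# Fail fast on unknown topics; keep three replicas by default.
auto.create.topics.enable=false
default.replication.factor=3

controlled.shutdown.enable=true
```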

Operations: Monitoring, Administration and Fire-Drills

For a successful deployment, it is essential that you are operationally ready. This includes being able to see the health and performance of the Kafka cluster at a glance, getting early notification of any problems, and having the tools and processes (aka SOP or playbook) for troubleshooting effectively and in a timely manner. We use OpenTSDB in our division for collecting operational and business metrics from software infrastructure and applications, and we push 550+ million metric data points a day to TSDB. Initially we used the JMXReporter that is integrated with Kafka, but then resorted to a homegrown Scala program. This program collects the state of each broker and topic partition (viz. inSync, isLeader) and the last message sequence number in each partition. The data is collected periodically (every 10 minutes) and pushed to OpenTSDB. We also use a script that periodically queries TSDB for the same metrics and alerts us if the rate of messages (the rate of change of the partition sequence number) has fallen below a certain threshold, any partition is not in-sync, or a broker is down. In addition, we have port monitors for the Kafka brokers and Zookeepers. Besides using TSDB for metrics display, we also use Grafana dashboards that pull metrics from TSDB to give us an end-to-end picture of data flow (e.g. rate of data flow, Hadoop job activity, system load, network traffic, I/O across the appropriate clusters).
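The alerting rule described above can be sketched as a pure function over two consecutive samples of a partition. This is not our production program; the `Sample` fields and function names are hypothetical, and fetching the samples from TSDB is left out.

```scala
object LagAlertDemo {
  // One sample of a partition's state; field names are hypothetical.
  case class Sample(timestampSec: Long, lastOffset: Long, inSync: Boolean)

  // Messages per second between two samples of the same partition.
  def rate(prev: Sample, curr: Sample): Double = {
    val elapsed = curr.timestampSec - prev.timestampSec
    if (elapsed <= 0) 0.0
    else (curr.lastOffset - prev.lastOffset).toDouble / elapsed
  }

  // Alert when the partition is out of sync or its message rate drops below the threshold.
  def shouldAlert(prev: Sample, curr: Sample, minRate: Double): Boolean =
    !curr.inSync || rate(prev, curr) < minRate
}
```

With samples taken 10 minutes (600 seconds) apart, an offset that advanced by 6000 corresponds to 10 messages per second, which would pass a threshold of 5 and trip a threshold of 20.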

For a lot of our software components/modules, it is very hard to battle-harden the software (it is very expensive and difficult to reproduce our volume, traffic and variations in a controlled environment). As a result, we do parallel or canary runs on a small subset of servers or data in production itself.

In the case of Kafka, we first did exhaustive feature testing in QA, making sure that data flowed through and that we did not lose data under normal conditions. Then we did a parallel run with an existing Flume data flow in production. We started with a small, trivial topic and then gradually moved on to larger topics. Along the way we did a couple of fire-drills like shutting down brokers, killing brokers with "kill -15" and "kill -9", letting the brokers be down for a couple of hours and then restarting them. We observed the partitions on the restarted broker for recovery time. In most cases it was less than 15-30 minutes.

Next, we also did a massive reassignment of the partitions - which took about a day to complete. We did not realize it at the time, but this is a pretty expensive operation - so understand the implications if and when you embark on such a thing. You may need to do a significant reassignment of partitions when adding/removing a large number of servers. However, in most cases you might be able to avoid a wholesale movement of partitions by reassigning only a subset of the partitions. To prepare for that, reassign the replicas of a single partition and understand what happens during that process.
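For the single-partition experiment, the reassignment can be described in a small JSON document. The helper below is a sketch that renders such a document; the JSON layout shown (a version field plus a list of topic, partition and replicas entries) is, to my understanding, the format accepted by the kafka-reassign-partitions.sh tool through its --reassignment-json-file option, so verify it against your Kafka version. The object and function names are hypothetical.

```scala
object ReassignmentDemo {
  // Render the JSON document for moving one partition to a new list of replica broker ids.
  def singlePartitionPlan(topic: String, partition: Int, replicas: Seq[Int]): String =
    s"""{"version":1,"partitions":[{"topic":"$topic","partition":$partition,"replicas":[${replicas.mkString(",")}]}]}"""
}
```

For example, `ReassignmentDemo.singlePartitionPlan("weblogs", 0, Seq(1, 2, 3))` describes moving partition 0 of a hypothetical topic named weblogs onto brokers 1, 2 and 3.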