Apache Hama [1] is an open-source distributed computation framework written in Java and based on the Bulk Synchronous Parallel (BSP) programming model. It is currently a top-level project of the Apache Software Foundation (ASF). Hama runs on top of Hadoop [2] and works seamlessly in any Hadoop environment. It processes data stored in HDFS and can handle massive datasets for matrix and graph computations. It is best suited to iterative algorithms. Apart from the BSP computation model, Hama also provides graph [3] and deep learning [4] packages for implementing scalable graph and machine learning algorithms.

In the following sections, we will have a look at the architecture and programming API of Hama.

Architecture

Apache Hama has a layered architecture that uses HDFS as its file system. It consists of three main components: Hama Core, which provides functionality for matrix and graph computation; the Hama User Interface, which provides console and web-based interfaces for monitoring; and the Hama Programming API [Seo+10]. Internally, Hama Core uses a BSP engine for computation. The Hama API has already been used to build projects such as the graph package and Horn, a neuron-centric machine learning package.

Apache Hama architecture

The internal architecture of Apache Hama Core differs from other computation frameworks because of its BSP-based communication and synchronization mechanism. It follows a master-slave model in which the master has unidirectional control over all of its slaves. The Core architecture consists of three main components, each running as a separate Java Virtual Machine (JVM) in the cluster.

BSPMaster (Master)

BSPMaster is responsible for maintaining its own state and the status of its slaves, tracking supersteps and counters, assigning tasks to slaves, scheduling jobs, providing the cluster control interface (web and console) to users and distributing execution classes to slaves [5].

GroomServer (Slave)

The GroomServer is responsible for managing the life cycle of the tasks assigned to it by the BSPMaster. Depending on the task, the GroomServer launches one or more BSPPeer JVM processes. Each BSPPeer process acts as a worker that performs the actual computation. The GroomServer periodically sends a heartbeat to the master to report the status of its running tasks and other metrics.
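To make the heartbeat flow concrete, here is a minimal single-process sketch of the master's side. Each heartbeat overwrites the groom's previous report, and the master aggregates the latest reports. The class name, the report contents (only a running-task count) and the millisecond timestamps are illustrative assumptions. This is not Hama's actual heartbeat protocol.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (not Hama code) of the master's view of groom heartbeats.
// The "report" is a hypothetical simplification: just a running-task count.
class ToyGroomRegistry {
    // Latest reported running-task count and report time per groom.
    private final Map<String, Integer> runningTasks = new HashMap<>();
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();

    // Called on each periodic heartbeat; a newer report replaces the older one.
    void onHeartbeat(String groom, int tasksRunning, long nowMs) {
        runningTasks.put(groom, tasksRunning);
        lastHeartbeatMs.put(groom, nowMs);
    }

    // Cluster-wide total, as the master could show in its web or console interface.
    int totalRunningTasks() {
        int total = 0;
        for (int n : runningTasks.values()) total += n;
        return total;
    }

    // Time of the groom's latest heartbeat, or -1 if it never reported.
    long lastHeartbeat(String groom) {
        return lastHeartbeatMs.getOrDefault(groom, -1L);
    }
}
```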

ZooKeeper

ZooKeeper is used to manage barrier synchronization between BSPPeers.

 

Apache Hama Internal Architecture

Programming API

This section presents the main features of the API.

BSP Class

Hama provides an abstract class named BSP, which must be extended to create a BSP-based Hama job. It declares an abstract method, bsp, that the subclass has to implement. This method runs in parallel on every peer and should contain all of the computation, message communication and synchronization. When it returns, the task ends. Iterative algorithms can be implemented with loops inside this method. Besides bsp, the class also lets users override the setup and cleanup methods. setup is called before bsp and can be used for work that must happen before the actual job starts (e.g. selecting a master among all the BSPPeers). cleanup is called after bsp and is guaranteed to run even if the job fails; it can be used for logging or monitoring.
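The call order described above (setup, then bsp, then cleanup even on failure) can be sketched without a cluster. ToyBsp and ToyTaskRunner are hypothetical stand-ins, not Hama's API. In real Hama code the subclass extends org.apache.hama.bsp.BSP and overrides bsp(BSPPeer peer), and the framework drives these calls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Hama's BSP base class: same three hooks, no cluster.
// The log list replaces the BSPPeer argument so the call order can be observed.
abstract class ToyBsp {
    // Runs once before bsp(), e.g. to choose a master among the peers.
    void setup(List<String> log) {
        log.add("setup");
    }

    // All computation, messaging and synchronization would go here;
    // iterative algorithms loop inside this method.
    abstract void bsp(List<String> log) throws Exception;

    // Runs after bsp(), even when bsp() throws.
    void cleanup(List<String> log) {
        log.add("cleanup");
    }
}

class ToyTaskRunner {
    // Drives one task: setup -> bsp -> cleanup, with cleanup in a finally block
    // so it still runs if setup or bsp fails.
    static List<String> run(ToyBsp task) {
        List<String> log = new ArrayList<>();
        try {
            task.setup(log);
            task.bsp(log);
        } catch (Exception e) {
            log.add("failed: " + e.getMessage());
        } finally {
            task.cleanup(log);
        }
        return log;
    }
}
```

A task whose bsp throws still produces the log "setup", "failed: ...", "cleanup". This mirrors the guarantee that cleanup runs even when the job fails.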

BSP is a generic class with type parameters K1, V1, K2, V2 and M, which the user supplies when extending it. During job initialization, the user can submit an input file, which is automatically partitioned and distributed among the tasks. V1 is the type of the input values and K1 the type of their keys. Each BSP task can also write output if the user chooses; K2 and V2 are the key and value types of that output. M is the type of the messages exchanged between BSPPeers.
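To show what each type parameter stands for, the toy interface below mirrors the shape of BSP<K1, V1, K2, V2, M> in a single process. ToyTask and WordLengthTask are hypothetical names. In Hama, K1/V1 and K2/V2 must match the job's input and output formats, and M must be a Hadoop Writable; both requirements are left out here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-process mirror of BSP<K1, V1, K2, V2, M>:
// K1/V1 = input key/value, K2/V2 = output key/value, M = message type.
interface ToyTask<K1, V1, K2, V2, M> {
    void process(K1 key, V1 value, Map<K2, V2> output, List<M> outbox);
}

// Example binding: input records are (line offset, line text), output pairs are
// (word, word length), and the message for other peers is a word count.
class WordLengthTask implements ToyTask<Long, String, String, Integer, Integer> {
    @Override
    public void process(Long offset, String line, Map<String, Integer> output, List<Integer> outbox) {
        int words = 0;
        for (String w : line.trim().split("\\s+")) {
            if (w.isEmpty()) continue; // a blank line splits into one empty token
            output.put(w, w.length());
            words++;
        }
        outbox.add(words); // M = Integer: this record's word count
    }
}
```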

Apache Hama BSP Class

Message Sending and Barrier Synchronization

Within the BSP class, the send method of the BSPPeer interface sends messages of type M (the message type parameter of BSP) to other peers. Messages sent this way are not guaranteed to arrive in the order they were sent. They are sent in batches rather than individually to avoid extra overhead, which in practice makes Hama very efficient at sending large numbers of messages.
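Batching can be illustrated with a toy outbox. Here send only buffers a message for its destination, and flush hands over one batch per destination when the peer synchronizes. This is a simplified model under that assumption, not Hama's messaging implementation. The destination strings stand in for the peer names passed to BSPPeer.send(String peerName, M msg).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy outbox (not Hama's implementation): send() only buffers, flush() delivers
// one batch per destination, so transfers scale with destinations, not messages.
class ToyOutbox<M> {
    private final Map<String, List<M>> buffers = new HashMap<>();

    // Queue a message for peerName; nothing is transmitted yet.
    void send(String peerName, M msg) {
        buffers.computeIfAbsent(peerName, k -> new ArrayList<>()).add(msg);
    }

    // Hand over all buffered messages as one batch per destination and reset.
    // HashMap iteration order is unspecified, so callers must not rely on the
    // order in which batches (and hence different senders' messages) arrive.
    Map<String, List<M>> flush() {
        Map<String, List<M>> batches = new HashMap<>(buffers);
        buffers.clear();
        return batches;
    }
}
```

Four messages addressed to two peers result in only two batches. This is the overhead saving the paragraph describes.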

After sending its messages, a peer requests barrier synchronization with the sync method. The requesting peer waits until all other peers have finished their processing and message communication. Once all peers have reached the barrier, the next superstep starts. The job does not stop until no peer has any incoming or outgoing messages left.
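The superstep and barrier pattern can be simulated in plain Java. The toy below uses one thread per peer, with java.util.concurrent.CyclicBarrier standing in for sync(). It is a sketch under stated assumptions, not how Hama implements synchronization; in Hama, ZooKeeper coordinates the barrier across JVMs. It also runs a fixed number of supersteps rather than detecting that no messages remain. The key BSP property is preserved: messages sent in one superstep become readable only after the barrier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CyclicBarrier;

// Toy BSP ring (not Hama code): every peer sends its current value to its right
// neighbour, waits at the barrier (the sync() step), then reads what it received.
class ToyRing {
    static int[] run(int numPeers, int supersteps) {
        List<Queue<Integer>> sending = new ArrayList<>();  // filled during a superstep
        List<Queue<Integer>> readable = new ArrayList<>(); // visible after the barrier
        for (int i = 0; i < numPeers; i++) {
            sending.add(new ConcurrentLinkedQueue<>());
            readable.add(new ConcurrentLinkedQueue<>());
        }
        // The barrier action runs once per superstep, after all peers arrive and
        // before any is released: it delivers that superstep's messages.
        CyclicBarrier barrier = new CyclicBarrier(numPeers, () -> {
            for (int i = 0; i < numPeers; i++) {
                Integer m;
                while ((m = sending.get(i).poll()) != null) readable.get(i).add(m);
            }
        });
        int[] result = new int[numPeers];
        Thread[] peers = new Thread[numPeers];
        for (int p = 0; p < numPeers; p++) {
            final int id = p;
            peers[p] = new Thread(() -> {
                int value = id; // each peer starts with its own index
                try {
                    for (int step = 0; step < supersteps; step++) {
                        sending.get((id + 1) % numPeers).add(value); // send
                        barrier.await();                             // sync
                        Integer m;
                        while ((m = readable.get(id).poll()) != null) value = m; // receive
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                result[id] = value;
            });
            peers[p].start();
        }
        for (Thread t : peers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        return result;
    }
}
```

With 4 peers and 1 superstep, each peer ends up holding its left neighbour's index ({3, 0, 1, 2}). After 4 supersteps, every value has travelled once around the ring.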

Message Reception

The getCurrentMessage method of BSPPeer takes the next message from the queue of received messages. Calling it in a loop processes all the messages.
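A common idiom is to loop until getCurrentMessage() returns null, on the assumption that null signals an empty queue. The sketch below shows the same loop against a plain java.util.Queue, whose poll() also returns null when the queue is empty. ToyReceiver is a hypothetical name, and summing the messages is only an example of processing them.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy receive loop: the Queue stands in for the peer's received-message queue
// and poll() plays the role of getCurrentMessage() (null once it is empty).
class ToyReceiver {
    static long sumMessages(Queue<Long> received) {
        long sum = 0;
        Long msg;
        while ((msg = received.poll()) != null) {
            sum += msg; // process one message
        }
        return sum;
    }
}
```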

Counters

Counters are enumerations, defined by Hama or by the user, that measure different job metrics at runtime. They can only be incremented, and Hama automatically outputs all defined counters. They are useful for measuring, for example, the total time spent in synchronization, the time spent in computation, the total number of supersteps or the total job time.
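The toy below shows a user-defined counter enumeration and an increment-only registry, including timing a unit of work with System.nanoTime(). JobCounter and ToyCounters are hypothetical names, not Hama's Counters class. In a real job, the enum values would instead be passed to BSPPeer's counter methods (for example incrementCounter).

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical user-defined counters for a job.
enum JobCounter { SUPERSTEPS, MESSAGES_SENT, COMPUTE_TIME_NS }

// Toy increment-only counter registry (not Hama's Counters class).
class ToyCounters {
    private final Map<JobCounter, Long> values = new EnumMap<>(JobCounter.class);

    // Negative amounts are rejected to model "counters can only be incremented".
    void increment(JobCounter key, long amount) {
        if (amount < 0) throw new IllegalArgumentException("counters can only be incremented");
        values.merge(key, amount, Long::sum);
    }

    long get(JobCounter key) {
        return values.getOrDefault(key, 0L);
    }

    // Times a unit of work and adds the elapsed nanoseconds to the given counter.
    void timed(JobCounter key, Runnable work) {
        long start = System.nanoTime();
        work.run();
        increment(key, System.nanoTime() - start);
    }
}
```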

Reopening the Input

In some cases, the user may want to iterate over the whole input more than once within a single superstep, but accessing the input again results in an exception until reopenInput is called. This method closes the input and opens it again, moving the file pointer back to the beginning.
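A typical reason to read the input twice is a two-pass computation: the mean in the first pass, then the variance in the second. The toy reader below models readNext and reopenInput with an in-memory list. ToyInput is a hypothetical class. Unlike Hama, which throws on re-access as noted above, it simply returns null at the end of the input.

```java
import java.util.List;

// Toy stand-in for a peer's input split (not Hama's API).
class ToyInput {
    private final List<Double> records;
    private int position = 0;

    ToyInput(List<Double> records) { this.records = records; }

    // Next record, or null once the end of the input is reached.
    Double readNext() {
        return position < records.size() ? records.get(position++) : null;
    }

    // Models reopenInput(): move the read pointer back to the first record.
    void reopen() { position = 0; }

    // Two passes over the same input: mean first, then population variance.
    // Assumes a non-empty input.
    static double variance(ToyInput in) {
        double sum = 0;
        long n = 0;
        Double x;
        while ((x = in.readNext()) != null) { sum += x; n++; }
        double mean = sum / n;
        in.reopen(); // in this toy, the second pass would see no records without it
        double squares = 0;
        while ((x = in.readNext()) != null) squares += (x - mean) * (x - mean);
        return squares / n;
    }
}
```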

Others

The following figure shows the functions that can be useful when implementing a Hama job.

Apache Hama BSPPeer class

Data Partition

Data partitioning in Hama can be a little tricky: it depends on several factors, such as the number of input files, their sizes and the number of tasks requested. This section explains the different scenarios and how partitioning is handled in each of them.

The FileInputFormat.addInputPaths method of the Hama API adds one or more files as input, and the BSPJob.setNumBspTask method sets the total number of peers to launch. The following cases describe how these two settings interact.

No input

The user can choose to provide no input and instead access data in any way from within each task, for example through the standard Java APIs. In this case, the user can launch as many BSPPeer tasks as the cluster allows (using setNumBspTask).
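Without input files, each task typically derives its own share of the work from its position among the peers. The helper below computes a contiguous range for peer i out of n. WorkSlicer is a hypothetical name. In Hama, the index and peer count would come from the BSPPeer (for example getPeerIndex() and getNumPeers()).

```java
// Toy work split for jobs without input: peer `peerIndex` of `numPeers` gets the
// half-open range [start, end) of item ids. The first (totalItems % numPeers)
// peers receive one extra item so the ranges cover everything exactly once.
class WorkSlicer {
    static long[] slice(long totalItems, int peerIndex, int numPeers) {
        long base = totalItems / numPeers;
        long extra = totalItems % numPeers;
        long start = peerIndex * base + Math.min(peerIndex, extra);
        long size = base + (peerIndex < extra ? 1 : 0);
        return new long[] {start, start + size};
    }
}
```

With 10 items and 3 peers, the ranges are [0, 4), [4, 7) and [7, 10).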

1..n input files

Once the user provides input files, setting the total number of BSP tasks has no effect. The number of BSPPeers launched depends on the number of input files and their sizes. Hama partitions the input files based on the block size specified in the Hadoop configuration: a file larger than the block size is split into several partitions. Here are a few examples (assuming a block size of 64MB):

1- One file of 50MB: Hama opens only 1 task.
2- One file of 200MB: 200/64 = 3.125, rounded up, so Hama opens 4 tasks.
3- Two files of 10MB and 20MB: Hama opens 2 tasks, one per file.
4- Two files of 250MB and 200MB: each file needs 4 blocks (250/64 ≈ 3.9 and 200/64 ≈ 3.1, both rounded up), so Hama opens 4 + 4 = 8 tasks.
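The examples can be reproduced with a simple rule: each file is split into ceil(size / blockSize) pieces (at least one), and each piece gets one task. This is a simplified model that matches the four cases above. SplitEstimator is a hypothetical name, and Hama's real split logic may differ in edge cases.

```java
// Simplified model of the partitioning examples: each file contributes
// ceil(size / blockSize) splits (at least one, even for an empty file),
// and one task is launched per split. Sizes are in MB.
class SplitEstimator {
    static int estimateTasks(long blockSizeMb, long... fileSizesMb) {
        int tasks = 0;
        for (long size : fileSizesMb) {
            long splits = (size + blockSizeMb - 1) / blockSizeMb; // integer ceiling
            tasks += (int) Math.max(1, splits);
        }
        return tasks;
    }
}
```

Counting per file rather than over the total size is what makes case 3 come out as two tasks. Together, 10MB and 20MB would fit in a single block.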

Force setting the number of tasks

Sometimes the user might want to partition the data into more tasks than Hama allows. For example, a compute-intensive algorithm may have little data but still benefit from running on many tasks. Currently, this feature is not supported and is still being implemented [6], but the user can manually split the data into several files and provide them as input to force Hama to open more tasks.
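The manual workaround amounts to splitting the records into k groups and writing each group to its own input file, so that Hama opens one task per file. The helper below does the in-memory part, producing contiguous, nearly equal chunks. ManualPartitioner is a hypothetical name, and writing the chunks to HDFS is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: split records into k (> 0) contiguous, nearly equal chunks,
// each intended to be written to its own input file (one task per file when
// each file is smaller than the block size).
class ManualPartitioner {
    static <T> List<List<T>> chunk(List<T> records, int k) {
        List<List<T>> chunks = new ArrayList<>();
        int n = records.size();
        int start = 0;
        for (int i = 0; i < k; i++) {
            int size = n / k + (i < n % k ? 1 : 0); // spread the remainder over the first chunks
            chunks.add(new ArrayList<>(records.subList(start, start + size)));
            start += size;
        }
        return chunks;
    }
}
```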

[1] – https://hama.apache.org/
[2] – https://hadoop.apache.org/
[3] – https://hama.apache.org/hama_graph_tutorial.html
[4] – http://horn.incubator.apache.org/index.html
[5] – http://wiki.apache.org/hama/Architecture
[6] – https://issues.apache.org/jira/browse/HAMA-970