NILE: A Case Study of a Wide-Area Data-Intensive Parallel Application

Chanathip Namprempre
CSE 160/260

Introduction

NILE is a distributed computing environment built to support applications in a High Energy Physics (HEP) experiment called CLEO. The CLEO experiment, which is based at Cornell University, attempts to explain why there is so little antimatter in the universe. In this experiment, the Cornell Electron Storage Ring (shown in Figure 1) is used to generate a beam of electrons and a beam of positrons. These beams are then collided in a detector called the CLEO II detector (shown in Figure 2). The CLEO II detector is equipped with sensors and a magnetic field to detect, among other things, the secondary subatomic particles generated by the collisions. One goal of the experiment is to study the decay rates of these particles in order to test the hypothesis that antimatter decays at a much faster rate than matter, which would explain why so little antimatter is left in the universe.

Each collision that occurs in the detector is called an event. Most events are ignored, while some are recorded. Each recorded event takes about 8 Kbytes of storage. Data recorded at this stage is called raw data. It is later processed to compute physical properties of the particles, such as masses and charges. This process is called reconstruction. It generates what is called pass2 data, which is then lossily compressed in the last step to generate roar data. Each event takes about 20 Kbytes of storage in pass2 form and about 2 Kbytes in roar form.
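The per-event sizes quoted above translate directly into storage estimates. The following is a minimal sketch of that arithmetic; the names EVENT_SIZE_KB and storage_kbytes are hypothetical and are not part of NILE.

```python
# Approximate per-event sizes quoted in the text, in Kbytes.
EVENT_SIZE_KB = {"raw": 8, "pass2": 20, "roar": 2}

def storage_kbytes(num_events):
    """Return the storage needed, per data format, for num_events events."""
    return {fmt: size * num_events for fmt, size in EVENT_SIZE_KB.items()}

# Storage for one million recorded events in each format.
sizes = storage_kbytes(1_000_000)
```

The sketch makes the relative sizes visible: pass2 data is the largest of the three forms, while roar data is a quarter the size of the raw data it was derived from.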

Events are recorded at a rate of about 1 event per second (even though many more events are generated per second). The most recent record shows that the experiment generates about 5 terabytes of event data per year. This massive amount of data is what the physicists have to work with. Common operations that they perform on CLEO data include constructing histograms and computing statistics related to particles of interest. Monte Carlo simulations of the detector are also conducted to verify its accuracy and efficiency. Roughly every two years, improvements in detector calibration and reconstruction algorithms allow the physicists to reprocess the raw data to generate better pass2 data.

These applications have two important aspects in common. First, they can often be divided into jobs each of which operates on a part of the data. These jobs generally perform independent analysis of the data with little or no communication among the work partitions. Second, they do not require much synchronization during the computation. Usually, the only synchronization step is the aggregation of the results from the work partitions at the end of the computation. These two characteristics make CLEO applications naturally parallelizable over a Wide-Area Network (WAN).
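The two characteristics above can be sketched in a few lines. The example below is an illustration only, assuming that events are plain numbers and that threads stand in for remote sites; the function names are hypothetical. Each partition is histogrammed independently, and the only synchronization is the merge at the end.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

def partition(events, num_parts):
    """Split the event list into num_parts contiguous, nearly equal chunks."""
    size, extra = divmod(len(events), num_parts)
    chunks, start = [], 0
    for i in range(num_parts):
        end = start + size + (1 if i < extra else 0)
        chunks.append(events[start:end])
        start = end
    return chunks

def partial_histogram(chunk, bin_width):
    """Independent job: histogram one chunk with no communication."""
    return Counter(int(value // bin_width) for value in chunk)

def parallel_histogram(events, num_parts, bin_width):
    """Run the independent jobs, then aggregate their results once."""
    chunks = partition(events, num_parts)
    with ThreadPoolExecutor(max_workers=num_parts) as pool:
        partials = list(pool.map(lambda c: partial_histogram(c, bin_width), chunks))
    total = Counter()
    for part in partials:
        total.update(part)  # the only synchronization step: merging results
    return total
```

Because the partial histograms never interact, the result does not depend on how many partitions are used or on the order in which they finish.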

The characteristics of CLEO applications and the sheer volume of the data on which they operate present many interesting issues to be addressed and opportunities to be exploited. NILE is a system built to address these issues and to help the applications exploit the opportunities.
 

CLEO Requirements and the Design of NILE

The large amount of data generated by the experiment implies that NILE must support, in a scalable manner, the demands of applications run on the data by 24 institutions across the United States and Canada. As the number of resources and applications increases, the applications must still be able to access the data with latency as low and bandwidth as high as physically possible. This means that data layout and management become crucial. The NILE data server addresses this requirement. In addition, since it is anticipated that as many as 150 users may be executing thousands of processing jobs at a time, job scheduling becomes exceedingly important. The NILE schedulers address this issue.

One important aspect of NILE that should be mentioned here is the design decision to use the Common Object Request Broker Architecture (CORBA) as the middleware. Middleware is software whose task is to mediate the interactions between applications across heterogeneous computing platforms. Since this is the environment in which CLEO applications operate, middleware can simplify the application programmers' tasks considerably. CORBA is a set of specifications written by an industry consortium known as the Object Management Group (OMG) to facilitate interoperation among distributed objects. It also specifies many layered services commonly needed in distributed computing, such as transactions and security. CORBA was chosen for NILE for many reasons, including its platform independence, its stability, its various layered services, and its support from major players in the industry.

There are many other aspects of NILE that address various other issues. One such issue is fault tolerance. Since applications dealing with CLEO data generally perform their computation across a WAN in which occasional disconnections are common, it is important that applications be able to operate in a fault-tolerant manner. This property benefits many applications, especially those that take a very long time to run (e.g. a Monte Carlo simulation can take 3 months to run on 24 200-MIPS workstations). By using techniques such as checkpoint/restart, an application can save the intermediate results of its computation before a failure occurs and resume its operation after the failure is repaired, thereby reducing the amount of lost work. Fault tolerance is beyond the scope of this document. Here we will focus on the architecture of NILE. We will also discuss the two aforementioned components of NILE, the data server and the schedulers, in more detail.
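Although fault tolerance is not covered further, the checkpoint/restart idea is simple enough to sketch. The example below is an illustration under stated assumptions, not NILE's mechanism: the computation is a running sum over a list of numbers, the checkpoint is a JSON file, and the fail_at parameter exists only to simulate a failure.

```python
import json
import os

def run_with_checkpoints(events, checkpoint_path, every=100, fail_at=None):
    """Sum event values, saving progress so a rerun can resume after a failure."""
    state = {"next_index": 0, "total": 0}
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            state = json.load(f)  # resume from the last saved state
    for i in range(state["next_index"], len(events)):
        if fail_at is not None and i == fail_at:
            raise RuntimeError("simulated failure")
        state["total"] += events[i]
        state["next_index"] = i + 1
        if state["next_index"] % every == 0:
            # Write to a temporary file and then rename it, so that a crash
            # during the write cannot leave a corrupt checkpoint behind.
            tmp_path = checkpoint_path + ".tmp"
            with open(tmp_path, "w") as f:
                json.dump(state, f)
            os.replace(tmp_path, checkpoint_path)
    return state["total"]
```

If a run fails partway through, calling the function again with the same checkpoint path repeats only the work done since the last checkpoint, so the amount of lost work is bounded by the checkpoint interval.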
 

NILE System Architecture and Objects

NILE is an object-oriented system. Entities in the system are represented as objects. A user submits a job, which is represented as a job profile object, to the local site manager object. The site manager then sends the job profile to the global scheduler, which is responsible for breaking up the job into subjobs and forwarding them in an optimal manner to various site managers so that the subjobs can be executed. (An optimal schedule in this case is one that yields the smallest total execution time for a job.) A schematic illustrating the global system architecture is shown in Figure 3. The details of how the job partitioning and scheduling are done will be discussed later.

As suggested by Figure 3, a site manager manages the resources in its site. Subjobs in a site are managed by a job manager. A job manager is an object that is created on demand when the user submits a job to the site manager. The job manager's task is to oversee the execution of the subjobs corresponding to a particular job. For example, Job Manager 1 shown in Figure 4 dispatches 2 subjobs to run on Machine 1 and Machine 2. The job database keeps track of all jobs in the site.

The job manager manages the operation of its 2 subjobs using the planner, which runs a scheduling algorithm to get optimal performance for that site. The planner, in turn, makes its scheduling decisions with the help of the resource database (the database of resources for the site), the data location manager (the entity that informs the planner of the physical location of the resource that best matches the specification in the job profile), and the host resource monitor (not shown in Figure 4). Once all subjobs are completed, they pass the results of their computation to the collector (not shown in Figure 4), which is also managed by the job manager.

At the machine level, each machine runs a subjob processor factory object. This object is responsible for creating subjob processor objects. (This programming structure is quite common in CORBA programs. The factory is always running, waiting for requests for object creation.) Once created, a subjob processor deals with the execution of its subjob. Figure 5 illustrates this concept.
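The factory structure can be sketched in plain Python. This is an illustration only: it does not use CORBA, the class and method names are hypothetical, and the round-robin placement merely stands in for the planner's scheduling decision.

```python
class SubjobProcessor:
    """Handles the execution of exactly one subjob."""
    def __init__(self, subjob_id, work):
        self.subjob_id = subjob_id
        self.work = work  # a callable standing in for the analysis code

    def run(self):
        return self.subjob_id, self.work()

class SubjobProcessorFactory:
    """Long-lived object on a machine; creates processors on request."""
    def __init__(self, machine_name):
        self.machine_name = machine_name
        self.created = 0

    def create_processor(self, subjob_id, work):
        self.created += 1
        return SubjobProcessor(subjob_id, work)

class JobManager:
    """Dispatches the subjobs of one job and collects their results."""
    def __init__(self, factories):
        self.factories = factories

    def run_job(self, subjobs):
        results = {}
        for index, (subjob_id, work) in enumerate(subjobs):
            # Round-robin placement is a placeholder for the planner.
            factory = self.factories[index % len(self.factories)]
            processor = factory.create_processor(subjob_id, work)
            key, value = processor.run()
            results[key] = value
        return results
```

The point of the pattern is that the factory outlives the objects it creates: a machine needs only one well-known, always-running object, and processors come and go with the subjobs they serve.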

One can see that the structure of NILE is highly hierarchical. The global scheduler schedules a job across many sites. A site scheduler (i.e. the planner) schedules its subjob across many machines in the site. A subjob processor then oversees the execution of the sub-subjob it was created to manage. This aspect of the design is the key to achieving scalability.
 

NILE Data Management

As described in the previous section, the planner object uses the data location manager object to help make its scheduling decisions. Therefore, the quality of the planner's schedule depends partly on the quality of the data location manager. To give accurate information about the physical location of the data, and so maximize the aggregate performance of the job, the data location manager must be able to decide which data server can best provide data to which subjob. For example, if a data server is running locally and is available, a good data location manager should tell the planner to schedule the subjob that will use the data to run locally. This eliminates the data transfer that would be necessary if the subjob were to run at a different site from the data server.
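The locality preference described above can be sketched as follows. This is a simplified illustration with hypothetical names and a made-up server record layout; a real data location manager would also weigh load and bandwidth.

```python
def choose_data_server(data_run, servers, local_site):
    """Pick a server for data_run, preferring an available local one.

    servers is a list of dicts with keys "name", "site", "available",
    and "runs". Returns the chosen server's name, or None if no
    available server holds the run.
    """
    candidates = [s for s in servers if s["available"] and data_run in s["runs"]]
    if not candidates:
        return None
    # False sorts before True, so servers at the local site come first;
    # a remote server would require a data transfer.
    candidates.sort(key=lambda s: s["site"] != local_site)
    return candidates[0]["name"]
```

A local server that is unavailable is skipped, in which case the choice falls back to a remote server and the data must be transferred.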

To provide such a service in a scalable manner, NILE employs a distributed data server architecture. The entire data set is partitioned into ranges, which are distributed across the data servers in the system. Since it is still necessary to maintain a single database view of the data set, a metadatabase is provided. The metadatabase maintains information about the locations of data in the system. A commercial database package called Illustra is used for this purpose.
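The role of the metadatabase can be sketched with an in-memory lookup table. This is only a stand-in for the database that NILE actually uses; the class name, the use of run numbers as keys, and the server names are assumptions made for illustration.

```python
import bisect

class Metadatabase:
    """Maps contiguous ranges of run numbers to the data server holding them."""
    def __init__(self, ranges):
        # ranges: list of (first_run, last_run, server_name), non-overlapping.
        self.ranges = sorted(ranges)
        self.starts = [first for first, _, _ in self.ranges]

    def locate(self, run_number):
        """Return the server holding run_number, or None if no range covers it."""
        # Find the last range whose first run is <= run_number.
        i = bisect.bisect_right(self.starts, run_number) - 1
        if i >= 0:
            first, last, server = self.ranges[i]
            if first <= run_number <= last:
                return server
        return None
```

Clients ask one logical database where a run lives, even though the data itself is spread across many servers; the binary search keeps each lookup logarithmic in the number of ranges.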

One level below the metadatabase in the data management hierarchy lie the data servers. A data server partitions its data and stores it on local disks. Its front ends interact with the jobs, while its back ends fetch the data from disk. Techniques such as prefetching and parallel I/O are used whenever possible to obtain optimal performance.
 

NILE Scheduling

One of the most challenging parts of NILE job scheduling is in the global scheduling algorithm. Essentially, the global scheduler is faced with the task of breaking up a job into subjobs and pairing these subjobs with data servers that provide the data for each of them. The goal is to minimize turnaround time for the job.

Various simplifying assumptions are made to make this problem tractable. One assumption is that all subjobs consume data at the same rate. Another is that the data runs to be processed have approximately the same size. With these assumptions, the question becomes "what is the best way to break up a job into subjobs such that the size of the largest data chunk processed by any subjob is minimized?"

To solve this problem, we represent the data servers and data runs as vertices in a graph G. We add an edge from u to v if and only if the data server u provides the data run v. Finally, we add an edge (s, u) from a new source node s to each data server u and an edge (r, t) from each data run r to a new sink node t. The graph is illustrated below.

Without the nodes s and t, this graph is a bipartite graph, and part of the problem we are trying to solve is related to the maximum bipartite matching problem. A bipartite graph is a graph G = (V, E) whose nodes can be partitioned into two disjoint sets S and T such that for any edge (u, v) in E, u is in S and v is in T. A matching is a set of edges no two of which share a node, and a maximum matching is a matching that uses the largest possible number of edges. In our case, we want to match the data servers (the set S) to the data runs (the set T) in such a way that one data server can serve more than one data run. Therefore, our problem is not exactly a maximum bipartite matching problem but is very closely related to it.
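To make the definition concrete, the following sketch computes a maximum bipartite matching with the standard augmenting-path method. It is an illustration only and is not the algorithm used by NILE; the function name and the input layout (a dictionary from data servers to the data runs they hold) are assumptions.

```python
def maximum_matching(provides):
    """provides: dict mapping each data server to the data runs it holds.

    Returns a dict mapping each matched data run to its data server.
    """
    match = {}  # data run -> data server

    def try_assign(server, visited):
        for run in provides[server]:
            if run in visited:
                continue
            visited.add(run)
            # Take a free run, or evict its current server if that server
            # can be re-matched to some other run.
            if run not in match or try_assign(match[run], visited):
                match[run] = server
                return True
        return False

    for server in provides:
        try_assign(server, set())
    return match
```

In a plain matching, each data server is paired with at most one data run, so a single server holding three runs can cover only one of them. This is why the scheduling problem needs the relaxation in which a server may serve several runs.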

The maximum bipartite matching problem is solvable using a max-flow algorithm. Two nodes, s and t, are added to the graph. The node s is connected to the data servers, while the node t is connected to the data runs. A max-flow algorithm can then be used to determine the maximum flow from s to t, where a flow in this case is a function that assigns an integer to each edge in the graph. One important constraint is that the flow along an edge must not exceed the capacity of that edge. A maximum flow is a flow that maximizes the total flow out of the source node s (or, equivalently, the total flow into the sink node t, as a result of the property of flow conservation).

To apply the knowledge outlined above to our problem, we add the nodes s and t to our graph of data servers and data runs. We can then run any one of the max-flow algorithms (such as preflow-push) repeatedly to find the smallest value of the largest flow out of any one data server node into data run nodes in a max-flow of the graph G. The total running time of this scheduler is O(mn log2(m - 1 - m/(c1 + ... + cn))) in the worst case, where m is the number of data runs, n is the number of data servers, and ci is the number of concurrent connections that the ith data server can handle. For more information on max-flow algorithms and on the formulation of NILE scheduling as this problem, see [3] and [4], respectively.
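The repeated max-flow idea can be sketched as a binary search over the per-server load. This is a simplified illustration and not the scheduler described in [4]: it uses the Edmonds-Karp augmenting-path algorithm in place of preflow-push, ignores the connection limits ci, and gives every data server the same load bound k. All function and node names are hypothetical.

```python
from collections import deque

def max_flow(capacity, source, sink):
    """Edmonds-Karp: repeatedly augment along shortest paths found by BFS.

    capacity is a dict of dicts: capacity[u][v] is the capacity of edge (u, v).
    """
    # Residual graph, with zero-capacity reverse edges added.
    residual = {u: dict(edges) for u, edges in capacity.items()}
    for u, edges in capacity.items():
        for v in edges:
            residual.setdefault(v, {}).setdefault(u, 0)
    total = 0
    while True:
        parent = {source: None}
        queue = deque([source])
        while queue and sink not in parent:
            u = queue.popleft()
            for v, cap in residual[u].items():
                if cap > 0 and v not in parent:
                    parent[v] = u
                    queue.append(v)
        if sink not in parent:
            return total  # no augmenting path is left
        # Walk back from the sink, then push the bottleneck amount of flow.
        path, v = [], sink
        while parent[v] is not None:
            path.append((parent[v], v))
            v = parent[v]
        bottleneck = min(residual[u][v] for u, v in path)
        for u, v in path:
            residual[u][v] -= bottleneck
            residual[v][u] += bottleneck
        total += bottleneck

def min_bottleneck_schedule(provides, runs):
    """Smallest per-server load k such that every run can be assigned.

    provides maps each data server to the runs it holds. Returns k,
    or None if some run is held by no server.
    """
    def feasible(k):
        capacity = {"s": {}}
        for server, held in provides.items():
            capacity["s"][("server", server)] = k  # at most k runs per server
            capacity[("server", server)] = {("run", r): 1 for r in held if r in runs}
        for r in runs:
            capacity[("run", r)] = {"t": 1}  # each run is served exactly once
        return max_flow(capacity, "s", "t") == len(runs)

    if not runs:
        return 0
    if not feasible(len(runs)):
        return None
    low, high = 1, len(runs)
    while low < high:  # binary search on the load bound k
        mid = (low + high) // 2
        if feasible(mid):
            high = mid
        else:
            low = mid + 1
    return low
```

The binary search is valid because feasibility is monotone: if every run can be assigned with at most k runs per server, it can also be assigned with any larger bound. Each probe costs one max-flow computation, which is where the logarithmic factor in the number of max-flow runs comes from.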
 

Conclusion

NILE is a distributed environment that can support HEP applications in a scalable and fault-tolerant manner. Since HEP applications are usually embarrassingly parallel and require massive amounts of data processing and computation, it is logical to pull together as many resources as possible to carry out the applications' tasks. NILE provides the system support necessary to extract as much parallelism as possible from these applications using the resources available over the Wide-Area Network.

References

[1] K. Marzullo, M. Ogg, A. Ricciardi. NILE: Wide-Area Computing for High Energy Physics. Seventh ACM SIGOPS European Workshop, Connemara, Ireland, 2-4 September 1996.
[2] F. Berman, R. Wolski. Scheduling from the Perspective of the Application. Proceedings of Symposium on High Performance Distributed Computing, 1996.
[3] T. Cormen, C. Leiserson, R. Rivest. Introduction to Algorithms. MIT Press. 1990.
[4] A. Amoroso, K. Marzullo, A. Ricciardi. Wide-Area NILE: A Case Study of a Wide-Area Data-Parallel Application. 1998.
[5] K. Jeong, T. Johnson, H. Yoon, M. Lohner, P. Avery. A High Performance Data Server Optimized for HEP Data. Submitted. 1997.

Study Questions