Each collision that occurs in the detector is called an event. Most events are ignored; only some are recorded. Each recorded event takes about 8 Kbytes of storage. Data recorded at this stage is called raw data. It is later processed to compute physical properties of the particles, such as masses and charges. This process is called reconstruction. It generates what is called pass2 data, which is then lossily compressed in the last step to generate roar data. Each event takes about 20 Kbytes of storage in pass2 form and about 2 Kbytes in roar form.
Events are recorded at a rate of about 1 event per second (even though many more events are generated per second). The most recent records show that the experiment generates about 5 terabytes of event data per year. This massive amount of data is what the physicists have to work with. Common operations that they perform on CLEO data include constructing histograms and computing statistics related to particles of interest. They also run Monte Carlo simulations of the detector to verify its accuracy and efficiency. Roughly every two years, improvements in detector calibration and reconstruction algorithms allow the physicists to re-run reconstruction on the raw data to generate better pass2 data.
These applications have two important aspects in common. First, they can often be divided into jobs each of which operates on a part of the data. These jobs generally perform independent analysis of the data with little or no communication among the work partitions. Second, they do not require much synchronization during the computation. Usually, the only synchronization step is the aggregation of the results from the work partitions at the end of the computation. These two characteristics make CLEO applications naturally parallelizable over a Wide-Area Network (WAN).
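The partition-then-aggregate pattern described above can be illustrated with a minimal sketch. It is not NILE code: the function names, the use of a thread pool in place of machines on a WAN, and the choice of an energy histogram as the analysis are all assumptions made for illustration.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor


def partition(events, n_parts):
    """Split the event list into n_parts contiguous chunks of near-equal size."""
    size, extra = divmod(len(events), n_parts)
    chunks, start = [], 0
    for i in range(n_parts):
        end = start + size + (1 if i < extra else 0)
        chunks.append(events[start:end])
        start = end
    return chunks


def histogram(chunk, bin_width=1.0):
    """Independent analysis of one partition: count events per bin."""
    return Counter(int(value // bin_width) for value in chunk)


def run_job(events, n_parts=4):
    """Run one subjob per partition, then aggregate the partial results."""
    chunks = partition(events, n_parts)
    # Hypothetical stand-in for remote machines: one worker thread per subjob.
    with ThreadPoolExecutor(max_workers=n_parts) as pool:
        partials = list(pool.map(histogram, chunks))
    total = Counter()
    for partial in partials:  # the only synchronization step
        total += partial
    return total


events = [0.2, 0.7, 1.1, 1.9, 2.5, 2.6, 3.3, 0.4, 1.5]
result = run_job(events, n_parts=3)
```

Because the subjobs never communicate, the merged result is the same as the histogram computed over the whole data set in one pass.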
The characteristics of CLEO applications and the sheer volume of the
data on which they operate present many interesting issues to be addressed
and opportunities to be exploited. NILE is a system built to address
these issues and to help applications exploit these opportunities.
One important aspect of NILE that should be mentioned here is the design decision to use the Common Object Request Broker Architecture (CORBA) as the middleware. Middleware is software that mediates the interactions between applications across heterogeneous computing platforms. Since this is the environment in which CLEO applications operate, middleware can considerably ease the application programmers' tasks. CORBA is a set of specifications written by an industry consortium known as the Object Management Group (OMG) to facilitate interoperation among distributed objects. It also specifies many layered services commonly needed in distributed computing, such as transactions and security. CORBA was chosen for NILE for many reasons, including its platform independence, its stability, its various layered services, and its support from major players in the industry.
There are many other aspects of NILE that address various other issues.
One such issue is fault tolerance. Since applications dealing with
CLEO data generally perform their computation across a WAN in which occasional
disconnections are common, it is important that applications
be able to operate in a fault-tolerant manner. This property benefits
many applications, especially those that take a very long time to run
(e.g., a Monte Carlo simulation can take 3 months to run on 24 200-MIPS workstations).
By using techniques such as checkpoint/restart, an application can
save the intermediate results of its computation before a failure occurs and resume
operation after the failure is resolved, thereby reducing the amount
of lost work. Fault tolerance is beyond the scope of this document. In
this document, we focus on the architecture of NILE. We also
discuss the two aforementioned components of NILE, the data server and
the schedulers, in more detail.
As alluded to in Figure 3, a site manager manages the resources in its site. Subjobs in a site are managed by a job manager. A job manager is an object that is created on demand when the user submits a job to the site manager. The job manager's task is to oversee the execution of the subjobs corresponding to a particular job. For example, Job Manager 1 shown in Figure 4 dispatches 2 subjobs to run on Machine 1 and Machine 2. The job database keeps track of all jobs in the site.
The job manager manages the operation of its 2 subjobs using the planner, which runs a scheduling algorithm to obtain the best performance for that site. The planner, in turn, makes its scheduling decisions with the help of the resource database (the database of resources for the site), the data location manager (the entity that informs the planner of the physical location of the resource that best matches the specification in the job profile), and the host resource monitor (not shown in Figure 4). Once all subjobs are completed, they pass the results of their computation to the collector (not shown in Figure 4), which is also managed by the job manager.
At the machine level, each machine runs a subjob processor factory object. This object is responsible for creating subjob processor objects. (This programming structure is quite common in CORBA programs. The factory is always running, waiting for requests for object creation.) Once created, a subjob processor deals with the execution of its subjob. Figure 5 illustrates this concept.
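The factory structure described above can be sketched in plain Python. This is only an illustration of the pattern, not NILE's CORBA interfaces: the class and method names are hypothetical, ordinary method calls stand in for remote CORBA invocations, and round-robin placement stands in for the planner's scheduling algorithm.

```python
class SubjobProcessor:
    """Executes a single subjob (hypothetical stand-in for a CORBA object)."""

    def __init__(self, subjob_id, machine):
        self.subjob_id = subjob_id
        self.machine = machine

    def execute(self, task, data):
        # Run the analysis function on the data assigned to this subjob.
        return task(data)


class SubjobProcessorFactory:
    """Long-lived per-machine object that creates processors on request."""

    def __init__(self, machine):
        self.machine = machine
        self.created = 0  # number of processors created so far

    def create_processor(self, subjob_id):
        self.created += 1
        return SubjobProcessor(subjob_id, self.machine)


class JobManager:
    """Created per job; dispatches subjobs and gathers their results."""

    def __init__(self, factories):
        self.factories = factories

    def run(self, task, chunks):
        results = []
        for i, chunk in enumerate(chunks):
            # Assumption: round-robin placement instead of a real planner.
            factory = self.factories[i % len(self.factories)]
            processor = factory.create_processor(subjob_id=i)
            results.append(processor.execute(task, chunk))
        return results


factories = [SubjobProcessorFactory("machine1"), SubjobProcessorFactory("machine2")]
manager = JobManager(factories)
partial_sums = manager.run(sum, [[1, 2, 3], [4, 5]])
```

Here each factory outlives the processors it creates, which mirrors the point that the factory is always running while subjob processors exist only for the duration of their subjob.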
One can see that the structure of NILE is highly
hierarchical. The global scheduler schedules a job across many sites. A
site scheduler (i.e., the planner) schedules its subjob across many
machines in the site. A subjob processor then oversees the execution
of the sub-subjob it was created to manage. This aspect of the
design is the key to achieving scalability.
To provide such a service in a scalable manner, NILE employs a distributed data server architecture. The entire data set is partitioned into ranges, which are distributed across the system to data servers. Since it is still necessary to maintain a single database view of the data set, a metadatabase is provided. The metadatabase maintains information about the data locations in the system. A commercial database package called Illustra is used for this purpose.
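To make the role of the metadatabase concrete, here is a minimal in-memory sketch of a range-to-server lookup. It is an assumption-laden illustration, not the Illustra-based implementation: the class name, the use of event numbers as range keys, and the requirement that ranges do not overlap are all hypothetical.

```python
from bisect import bisect_right


class MetaDatabase:
    """Maps contiguous event-number ranges to the data server that stores them."""

    def __init__(self):
        self._starts = []  # sorted range start values
        self._ranges = []  # parallel list of (start, end, server) tuples

    def register(self, start, end, server):
        """Record that events start..end (inclusive) live on `server`.

        Assumes registered ranges never overlap.
        """
        i = bisect_right(self._starts, start)
        self._starts.insert(i, start)
        self._ranges.insert(i, (start, end, server))

    def locate(self, event_number):
        """Return the server holding event_number, or None if unregistered."""
        i = bisect_right(self._starts, event_number) - 1
        if i >= 0:
            start, end, server = self._ranges[i]
            if start <= event_number <= end:
                return server
        return None


meta = MetaDatabase()
meta.register(0, 999, "server_a")
meta.register(2000, 2999, "server_c")
meta.register(1000, 1999, "server_b")
```

A job can then ask `meta.locate(1500)` to find the server for a given event without knowing how the data set was partitioned.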
One level below the metadatabase in the data management hierarchy
lie the data servers. A data server partitions its data and stores it
on local disks. Its front-ends interact with the jobs, while its back-ends
fetch the data from disk. Techniques such as prefetching
and parallel I/O are used whenever possible to obtain the best performance.
Various simplifying assumptions are made to make this problem tractable. One assumption is that all subjobs consume data at the same rate. Another is that the data runs to be processed have approximately the same size. With these assumptions, the question becomes: "What is the best way to break up a job into subjobs such that the size of the largest data chunk processed by any subjob is minimized?"
To solve this problem, we represent the data servers and data runs as vertices in a graph G. We add an edge from u to v if and only if the data server u provides the data run v. Finally, we add an edge (s, u) from a new source node s to each data server u and an edge (r, t) from each data run r to a new sink node t. The graph is illustrated below.

Without the nodes s and t, this graph is bipartite, and part of the problem we are trying to solve is related to the maximum bipartite matching problem. A bipartite graph is a graph G = (V, E) whose nodes can be partitioned into two disjoint sets S and T such that for any edge (u, v) in E, u is in S and v is in T. A matching is a set of edges no two of which share a node, and a maximum matching is a matching with the largest possible number of edges. In our case, we want to match the data servers (the set S) to the data runs (the set T) in such a way that one data server can serve more than one data run. Therefore, our problem is not exactly a maximum bipartite matching problem but is very closely related to it.
The maximum bipartite matching problem is solvable using a max-flow algorithm. Two nodes, s and t, are added to the graph. The node s is connected to the data servers, while the node t is connected to the data runs. A max-flow algorithm can then be used to determine the maximum flow from s to t, where a flow in this case is a function that maps each edge in the graph to an integer. One important constraint is that the flow along an edge must not exceed the capacity of that edge. A maximum flow is a flow that maximizes the total flow out of the source node s (or, equivalently, the total flow into the sink node t, by the property of flow conservation).
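The reduction from matching to max-flow can be sketched as follows. This is a minimal illustration under stated assumptions: it uses the Edmonds-Karp algorithm (shortest augmenting paths) rather than preflow-push because it is shorter to write, the server and run names are made up, and `build_graph` and its `server_capacity` parameter are hypothetical helpers.

```python
from collections import defaultdict, deque


def build_graph(provides, server_capacity=1):
    """Build capacities for s -> server -> run -> t from a server-to-runs map."""
    capacity = defaultdict(dict)
    for server, runs in provides.items():
        capacity["s"][server] = server_capacity
        for run in runs:
            capacity[server][run] = 1
            capacity[run]["t"] = 1
    return capacity


def max_flow(capacity, source, sink):
    """Edmonds-Karp: repeatedly augment along shortest residual paths."""
    residual = defaultdict(lambda: defaultdict(int))
    for u, edges in capacity.items():
        for v, cap in edges.items():
            residual[u][v] += cap
            residual[v][u] += 0  # make sure the reverse edge exists
    total = 0
    while True:
        # Breadth-first search for an augmenting path in the residual graph.
        parent = {source: None}
        queue = deque([source])
        while queue and sink not in parent:
            u = queue.popleft()
            for v, cap in residual[u].items():
                if cap > 0 and v not in parent:
                    parent[v] = u
                    queue.append(v)
        if sink not in parent:
            return total, residual
        # Find the bottleneck capacity along the path, then push that much flow.
        bottleneck = float("inf")
        v = sink
        while parent[v] is not None:
            bottleneck = min(bottleneck, residual[parent[v]][v])
            v = parent[v]
        v = sink
        while parent[v] is not None:
            residual[parent[v]][v] -= bottleneck
            residual[v][parent[v]] += bottleneck
            v = parent[v]
        total += bottleneck


provides = {"ds1": ["run1", "run2"], "ds2": ["run2"], "ds3": ["run2", "run3"]}
flow, residual = max_flow(build_graph(provides), "s", "t")
# Flow on edge (server, run) shows up as residual capacity on (run, server).
matching = {run: server for server, runs in provides.items()
            for run in runs if residual[run][server] > 0}
```

With unit capacities on the edges out of s, the flow value equals the size of a maximum matching. Raising `server_capacity` lets one data server serve several data runs, which is the variation the text describes.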
To apply the knowledge outlined above to our problem, we add the nodes
s and t to our graph of data servers and data runs. We
can then run any one of the max-flow algorithms (such as preflow-push) repeatedly to find
the smallest possible value of the largest flow out of any single data server node
into the data run nodes in a max-flow of the graph G. The total running time of
this scheduler is O(mn log_2(m - 1 - m/(c_1 + ... + c_n)))
in the worst case, where m = number of data runs, n = number
of data servers, and c_i = number of concurrent connections
that the i-th data server can handle. For more information
on max-flow algorithms and the formulation of NILE scheduling as this problem,
see [3] and [4], respectively.
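One way to read "repeatedly" is as a binary search over a per-server load limit, with a feasibility check at each step. The sketch below follows that reading and should be taken as an assumption rather than the NILE scheduler itself: it applies the same limit to every server (ignoring the per-server c_i), and it checks feasibility with a simple augmenting-path search instead of preflow-push. All names are hypothetical.

```python
def can_assign(provides, runs, limit):
    """Try to serve every run with at most `limit` runs per server.

    Returns a server -> list-of-runs assignment, or None if infeasible.
    """
    servers_for = {run: [s for s, rs in provides.items() if run in rs]
                   for run in runs}
    assigned = {server: [] for server in provides}

    def place(run, visited):
        for server in servers_for[run]:
            if server in visited:
                continue
            visited.add(server)
            if len(assigned[server]) < limit:
                assigned[server].append(run)
                return True
            # Server is full: try to move one of its runs to another server.
            for other in list(assigned[server]):
                if place(other, visited):
                    assigned[server].remove(other)
                    assigned[server].append(run)
                    return True
        return False

    ok = all(place(run, set()) for run in runs)
    return assigned if ok else None


def min_max_load(provides):
    """Binary-search the smallest per-server limit that still serves all runs."""
    runs = sorted({run for rs in provides.values() for run in rs})
    low, high = 1, max(len(runs), 1)
    best = can_assign(provides, runs, high)  # feasible: every run has a server
    while low < high:
        mid = (low + high) // 2
        attempt = can_assign(provides, runs, mid)
        if attempt is not None:
            high, best = mid, attempt
        else:
            low = mid + 1
    return low, best


provides = {"ds1": ["r1", "r2", "r3", "r4"], "ds2": ["r3", "r4"], "ds3": ["r4"]}
limit, assignment = min_max_load(provides)
```

In this example r1 and r2 are available only on ds1, so no schedule can do better than a largest chunk of two runs, and the search settles on that limit.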