Begin by creating a clojure directory inside labs/12, and then a src subdirectory inside clojure. Then copy the file arraySum.clj from the course directory /home/cs/214/labs/12/clojure/ into your clojure/src directory.
Use a text editor to open this file, customize its opening documentation, and take a moment to study it to get a sense of what each of its functions does.
As before, we will test our work using the files in the directory:
/home/cs/214/labs/12/numbers/
Build and run arraySum.clj for each file, and verify that it produces the correct sum for each file, for example:
clojure -m arraySum /home/cs/214/labs/12/numbers/5numbers.txt
(After your first run, use the up-arrow key for your subsequent runs, as this will save you a ton of typing and time.)
If the program works correctly, add a Clojure column to the same spreadsheet as you used in the previous parts of this lab. Then run the program three times for each file, and record the middle of the three times on the row for that file in the Clojure column. As before, these will serve as a baseline for each of our files.
Let's go through the file arraySum.clj, to see what new features it incorporates. We'll start with the -main() function, since that is where execution begins.
Command-line Arguments. Unlike our previous programs, this -main() function has a parameter -- inFile -- where the user can specify the name of the file to be used. This mechanism can be used to pass arguments from the command-line to our program, in this case, the name of the input file.
File I/O. The first local variable inside -main() is anArray, which is defined by the function call (readFile inFile). If you scroll up to where that function is defined, it contains several new features:
Summing Array Values Sequentially. To define local variable sum, our -main() function calls sumArray(), which uses a recursive loop() function to perform the summing. After initializing loop parameters sum and i to zero, the loop employs this recursive logic:
Basis: i >= the number of values in anArray:
    -> return sum
I-step (i < the number of values in anArray):
    -> return the result of recursively looping with
       anArray[i] + sum, ++i.
When this loop() function terminates, the last value it computes will be the final value of sum, which will contain the accumulated sum of the values in anArray.
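The recursive logic above maps directly onto loop() and recur. Since arraySum.clj itself is not reproduced here, the sketch below is a hypothetical stand-in named sum-array-seq, and it assumes anArray is a Java array of longs. For comparison, it also shows clojure.core's areduce macro, which packages the same index-and-accumulator loop.

```clojure
;; Hypothetical stand-in for arraySum.clj's sumArray().
;; Assumption: anArray is a Java array of longs (e.g. from long-array).
(defn sum-array-seq [^longs anArray]
  (loop [sum 0     ; accumulated sum so far
         i   0]    ; index of the next value to add
    (if (>= i (alength anArray))
      sum                                          ; Basis: return sum
      (recur (+ sum (aget anArray i)) (inc i)))))  ; I-step: add anArray[i], ++i

;; The same computation with clojure.core's areduce macro:
;; idx is bound to each index in turn, ret to the running total.
(defn sum-array-areduce [^longs anArray]
  (areduce anArray idx ret 0 (+ ret (aget anArray idx))))

(println (sum-array-seq (long-array [3 1 4 1 5])))      ; 14
(println (sum-array-areduce (long-array [3 1 4 1 5])))  ; 14
```

The explicit loop mirrors the Basis/I-step description line for line. areduce expresses the same computation more compactly.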
It is worth mentioning that we need to use loop() to define fill() and sumArray(), rather than defining those functions as ordinary recursive functions. For example, we could have defined fill() this way:
(defn fill [array index in]
  (if (< index (count array))
    (do
      (aset array index (.nextLong in))
      (fill array (inc index) in)
    )
    array
  )
)
This function works fine for smaller input files, but when using our large input files, a StackOverflow error occurs.
To see why, recall that each call of a recursive function pushes an activation record for that function onto the run-time stack. If we were to define either fill() or sumArray() as a recursive function, our large input files would generate so many recursive calls that the activation records being pushed would overflow the run-time stack, triggering a StackOverflow error.
We avoid this problem by having these functions use loop() together with recur. A recur must appear in tail position (it must be the last thing the loop body does), and the Clojure compiler turns it into a jump back to the top of the loop rather than a new function call. Since no new activation record is pushed for each iteration, the run-time stack does not grow, and we avoid the StackOverflow error. (The JVM does not eliminate tail calls on its own, which is why Clojure has us write recur explicitly.)
Now you know where the name StackOverflow comes from!
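To see the difference for yourself, the sketch below runs the recursive fill() from above beside a loop()/recur version of the same logic. Instead of reading a file, it reads from a java.util.Scanner over a string. fill-loop and numbers-text are hypothetical names used only for this illustration. The overflow shown assumes the JVM's default thread stack size.

```clojure
(import 'java.util.Scanner)

;; The recursive fill() from the text: every call pushes another stack frame.
(defn fill [array index ^Scanner in]
  (if (< index (count array))
    (do
      (aset ^longs array index (.nextLong in))
      (fill array (inc index) in))
    array))

;; Hypothetical loop()/recur version: recur jumps back to the top of the
;; loop, so the stack depth stays the same however large the array is.
(defn fill-loop [array ^Scanner in]
  (loop [index 0]
    (if (< index (count array))
      (do
        (aset ^longs array index (.nextLong in))
        (recur (inc index)))
      array)))

;; Hypothetical helper: a string holding n copies of the number 7.
(defn numbers-text ^String [n]
  (apply str (repeat n "7 ")))

;; Small input: the recursive version works fine.
(println (vec (fill (long-array 3) 0 (Scanner. "10 20 30"))))   ; [10 20 30]

;; Large input: the loop version finishes; the recursive one overflows.
(println (count (fill-loop (long-array 1000000)
                           (Scanner. (numbers-text 1000000)))))  ; 1000000
(println (try
           (fill (long-array 1000000) 0 (Scanner. (numbers-text 1000000)))
           (catch StackOverflowError _ :stack-overflow)))        ; :stack-overflow
```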
Timing a Clojure Computation. To simplify the timing of our computation, we again leverage Clojure's interoperability with Java by invoking the System.nanoTime() method. (Note that the Clojure notation for this is (System/nanoTime).) We call this method twice: once before we use sumArray() to define local variable sum, and again afterward. We then use the difference of those two times to define local variable totalTime. Since System.nanoTime() reports time in nanoseconds, totalTime is a number of nanoseconds.
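The timing pattern in -main() can be wrapped in a small helper. The sketch below uses time-call, a hypothetical name that is not part of arraySum.clj. It calls System/nanoTime before and after the work, just as -main() does. System.nanoTime() counts nanoseconds from an arbitrary origin, so only the difference between two readings is meaningful.

```clojure
;; Hypothetical helper: call the zero-argument function f and return
;; [result elapsed-nanoseconds], timing it the way -main() does.
(defn time-call [f]
  (let [startTime (System/nanoTime)   ; record start time
        result    (f)                 ; do the work being timed
        stopTime  (System/nanoTime)]  ; record stop time
    [result (- stopTime startTime)]))

(let [[sum totalTime] (time-call #(reduce + (range 1000)))]
  (println "sum =" sum "took" totalTime "ns"))
```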
When you have baseline sequential times recorded for each of our input files, continue to the rest of the exercise.
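There are a variety of mechanisms for multithreading in Clojure. clojure.core itself provides futures and agents, and Java threads are available through interop. In this exercise we will use a library called core.async (async for short), where async stands for asynchronous. Clojure threads operate asynchronously from one another, so this is a descriptive name.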
This library is not included with a default Clojure installation, so the first step is to get the latest version of this library.
Setting Up to Use the async Library. From the course directory /home/cs/214/labs/12/clojure, copy the file deps.edn into your labs/12/clojure/ directory. This file contains the following lines:
{:deps
{org.clojure/core.async {:mvn/version "0.4.490"} }
}
This file tells the clojure command-line tool that this project depends upon the async library (Maven version 0.4.490), which was the latest version when this exercise was written.
To see if there is a newer version, go to the async page on GitHub, scroll down a bit, and check the version-number there. If a newer version-number is listed there, update the number in deps.edn with that version-number, save the changes, and close the deps.edn file.
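This deps.edn does not list any :paths, so the clojure tool falls back on its default source path, src. That is why arraySum.clj lives in clojure/src. A deps.edn that states this explicitly might look like the following. The :paths entry is an illustrative addition, and the version number should be whichever one you settled on above.

```clojure
{:paths ["src"]
 :deps  {org.clojure/core.async {:mvn/version "0.4.490"}}}
```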
Copying and Updating Your Source File. Next, make a copy of arraySum.clj and name the copy threadedArraySum.clj. Then open threadedArraySum.clj and update its opening documentation.
Updating Your Namespace. In your source file, update the ns() function so that it appears as follows:
(ns threadedArraySum
(:require [clojure.core.async ; need this clause to use async
:refer [chan go put! take! <! >! <!! >!!]
]
)
)
The (:require ...) clause tells the Clojure compiler that this namespace requires the async library. The :refer clause within it is followed by a vector of eight names that we might want to reference from async. Any async function that our program uses needs to be listed in this vector (or written with its fully-qualified name, such as clojure.core.async/thread), or the Clojure compiler will generate an error. (We won't actually use all of these functions, but we will talk about each of them.)
Updating the -main() Function. Next, scroll down to your -main() function and update it so that it looks like this:
(defn -main [inFile numThreadsStr]
(let
[
anArray (readFile inFile) ; read inFile into anArray
numValues (count anArray) ; determine numValues
numThreads (read-string numThreadsStr) ; numThreadsStr -> integer
startTime (System/nanoTime) ; record start time
sum (sumArray anArray numThreads) ; sum the values
stopTime (System/nanoTime) ; record stop time
totalTime (- stopTime startTime) ; compute total time
]
; output results
(printf "\nThe *parallel* sum of the %d numbers is %d;\n"
numValues sum)
(printf " summing them with %d thread(s) took %d time units.\n\n"
numThreads totalTime)
)
)
Note that most of the -main() function remains the same; the main difference (ha, ha, ha) is that where arraySum.clj took a single command-line argument (the name of the input file), threadedArraySum.clj takes two command-line arguments: the name of the input file and the number of threads to use.
All command-line arguments are passed as string values. To make this clear, we name the new parameter numThreadsStr. Our let() function then uses read-string() to convert numThreadsStr into the corresponding integer, and uses that integer to define local variable numThreads. We then pass this value to sumArray() as an argument, so that it knows how many threads to divide the work among, and to printf() so that it displays that number.
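read-string() works well for this lab, but it reads any Clojure form, so a mistyped thread count is not reported as an error. The lines below contrast it with Java's Long/parseLong, reached through the same interop used for (System/nanoTime). This is an illustration, not a change the exercise requires.

```clojure
;; Command-line arguments always arrive as strings, such as "4".
(println (read-string "4"))                   ; 4
(println (instance? Long (read-string "4")))  ; true

;; read-string reads *any* Clojure form, so a typo is not an error:
;; "four" is read as the symbol four.
(println (symbol? (read-string "four")))      ; true

;; Long/parseLong accepts only integers and throws otherwise.
(println (Long/parseLong "4"))                ; 4
(println (try (Long/parseLong "four")
              (catch NumberFormatException _ :not-a-number)))  ; :not-a-number
```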
A Clojure -main() function can also be written to accept varying numbers of command-line arguments, either with a [& args] parameter list or by giving it several arities. Using arities, we could define -main() like this:
(defn -main
  ([]                      (doSumUsing "/home/cs/214/labs/12/numbers/5numbers.txt" "1"))
  ([inFile]                (doSumUsing inFile "1"))
  ([inFile numThreadsStr]  (doSumUsing inFile numThreadsStr))
)
where doSumUsing() would be a new function containing the logic currently in our -main() function. We're not going to take the time to implement this (this exercise is plenty long as it is), but we could.
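A multi-arity -main() can be tried without any input files. In the sketch below, doSumUsing is a hypothetical stand-in that just returns its arguments, so you can see which arity was chosen. count-args, also hypothetical, shows the alternative of collecting all the arguments with &.

```clojure
;; Hypothetical stand-in for doSumUsing(): it just returns its arguments,
;; so we can see which arity of -main was chosen.
(defn doSumUsing [inFile numThreadsStr]
  [inFile numThreadsStr])

;; One -main with three arities, chosen by the number of arguments given.
(defn -main
  ([]                     (doSumUsing "/home/cs/214/labs/12/numbers/5numbers.txt" "1"))
  ([inFile]               (doSumUsing inFile "1"))
  ([inFile numThreadsStr] (doSumUsing inFile numThreadsStr)))

;; Alternative: collect however many arguments there are into a sequence.
(defn count-args [& args]
  (count args))

(println (-main "someFile.txt"))     ; [someFile.txt 1]
(println (count-args "a.txt" "4"))   ; 2
```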
Parallelizing sumArray(). Next, find the definition of function sumArray(). There, we want to rewrite the function so that the work of summing the array is divided among the desired number of threads. Here is one way to revise sumArray():
(defn sumArray [anArray numThreads]
(let
[ resChannel (chan) ] ; define shared channel for results
(loop [id 1] ; loop to fork threads,
(when (< id numThreads) ; each writing its partial-sum
(go ; to the channel
(sumSlice anArray id numThreads resChannel)
)
(recur (inc id))
)
)
; main thread does slice 0
(sumSlice anArray 0 numThreads resChannel)
; loop to read each partial-sum
(loop [ sum 0 ; from resChannel and add it to sum
id 0] ;
(if (>= id numThreads) ; Basis: id >= numThreads:
sum ; return sum
; I-Step:
; return sum + next resChannel val
; and recurse (id+1)
(recur (+ sum (<!! resChannel)) (inc id))
)
)
)
)
Take a moment to make sumArray() consistent with this definition.
This definition uses a Clojure version of the same fork-join pattern we saw previously: the first loop forks the new threads, and the second loop combines their partial results into an overall result. Implementing this pattern in Clojure requires us to use several new features, so let's examine them one at a time.
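Channels. The async library's chan() function creates a channel: a queue-like object through which threads can pass values to one another. Called with no arguments, as it is here, chan() creates an unbuffered channel. The binding
[ resChannel (chan) ]
uses this function to define a local variable named resChannel. This channel will be shared among our threads; each thread will put its partial sum into the channel, and at the end, our main thread will take these partial sums out of the channel and add them up.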
Clojure provides a variety of ways for launching threads, two of which are the async library's thread() and go() functions: thread() runs its body on a thread of its own, while go() runs its body on one of a fixed-size pool of threads shared by all go-blocks.
When an application only needs a limited number of threads, the go() function should be faster than thread(). The reason is that creating a thread takes time; since go() is using threads from the thread pool, it can eliminate this time spent in thread-creation, making it (at least in theory) the faster option.
Since we aren't using very many threads, our loop() in sumArray() uses the go() function, though we could just as easily have used thread() (after adding it to the :refer vector):
(loop [id 1] ; loop to fork threads,
(when (< id numThreads) ; each writing its partial-sum
(go ; to the channel
(sumSlice anArray id numThreads resChannel)
)
(recur (inc id))
)
)
The call:
(go (sumSlice anArray id numThreads resChannel) )
thus requests a thread from the thread pool and has it perform a function named sumSlice(), passing it anArray, our thread's id, the number of threads, and our results channel. As we shall see below, this sumSlice() function will use id and the number of threads to compute the partial sum of the correct slice of anArray, and put that partial sum into the channel.
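After forking the other threads, the main thread computes the partial sum of slice 0 itself:
(sumSlice anArray 0 numThreads resChannel)
It then uses a second loop() to take each partial sum out of resChannel and add it to sum: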
(loop [ sum 0
id 0]
(if (>= id numThreads)
sum
(recur (+ sum (<!! resChannel)) (inc id))
)
)
This loop() function has two parameters: sum and id (both initially zero). If we have N threads, the function loops N times, with id varying from 0 through N-1. On each iteration, it checks our base case (id >= numThreads), and if we have reached it, it returns the value that has accumulated in sum. Otherwise, it recurses on (sum plus the next value from resChannel) and the equivalent of ++id.
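Clojure provides three functions by which a thread can take a value from a channel: <!(), which must be used inside a go-block and parks that go-block until a value is available; <!!(), which blocks the calling thread until a value is available and is meant for use outside of go-blocks; and take!(), which takes a value asynchronously and passes it to a callback function that you supply.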
Since we want the next value from the results channel and we are outside of a go-block, our best option is the <!!() function, so we use it. The expression:
(+ sum (<!! resChannel))
removes the next value from resChannel (waiting, if necessary, until one is available), and adds that value and sum together. The resulting value is passed as the first argument to recur(), and so will be the value of sum in the loop() function's next recursive iteration.
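The join loop above can be tried without core.async. In the sketch below, which is an analogy and not the lab's code, a java.util.concurrent.LinkedBlockingQueue stands in for resChannel, and its blocking .take method plays the role of <!!(). join-partial-sums is a hypothetical name. Each worker is a plain Java thread that puts one partial sum into the queue.

```clojure
(import 'java.util.concurrent.LinkedBlockingQueue)

;; Analogy only: the queue stands in for resChannel, and its blocking
;; .take stands in for <!!. Each worker is a plain Java thread that puts
;; one partial sum into the queue, in whatever order the threads finish.
(defn join-partial-sums [partials]
  (let [q          (LinkedBlockingQueue.)
        numThreads (count partials)]
    (doseq [p partials]
      (let [^Runnable task (fn [] (.put q p))]
        (.start (Thread. task))))
    ;; Same shape as the second loop in sumArray():
    (loop [sum 0
           id  0]
      (if (>= id numThreads)
        sum                                            ; Basis: return sum
        (recur (+ sum (long (.take q))) (inc id))))))  ; I-step: wait for next value

(println (join-partial-sums [1 2 3 4 5]))   ; 15
```

Because the loop takes exactly numThreads values, it does not matter in which order the workers finish.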
Defining sumSlice(). Our sumArray() function includes two calls to sumSlice(), so let's define it next. We might specify what this function needs to do as follows:
;;; sum a 'slice' of an array based on thread ID and numThreads
;;;
;;; Receive: anArray, the array of ints to be summed;
;;;          id, the ID of the current thread;
;;;          numThreads, how many workers we have;
;;;          channel, the channel to write its slice-sum to.
;;; Postcondition: the sum of the values in this thread's slice
;;;                of anArray has been put into channel.
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
Take a moment to paste this documentation into threadedArraySum.clj above the documentation for sumArray().
One way to define this function is as follows:
(defn sumSlice [anArray id numThreads channel]
(let
[ arraySize (count anArray) ; determine array size
sliceSize (quot arraySize numThreads) ; calculate slice size
start (* id sliceSize) ; calc. starting index
stop (if (< id (- numThreads 1)) ; calc. stopping index
(+ start sliceSize) ; - all but last thread
arraySize ; - last thread
)
; calc. sum of my slice
partialSum (calcPartialSum anArray start stop)
]
; write partial sum to channel
(put! channel partialSum)
)
)
Take a moment to copy-paste this definition into threadedArraySum.clj.
Then let's go through it step by step to see how it solves its problem.
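The let-bindings in sumSlice() are where each thread works out its slice. The sketch below pulls that arithmetic into a hypothetical helper, slice-bounds, that returns [start stop] instead of summing. This lets you check which indices each thread covers. With 10 values and 3 threads, sliceSize is 3, and the last thread also takes the leftover value.

```clojure
;; Hypothetical helper that repeats sumSlice()'s index arithmetic and
;; returns [start stop] for thread id, without doing any summing.
(defn slice-bounds [arraySize id numThreads]
  (let [sliceSize (quot arraySize numThreads)   ; values per thread (rounded down)
        start     (* id sliceSize)              ; first index of this slice
        stop      (if (< id (- numThreads 1))   ; one past the last index:
                    (+ start sliceSize)         ;  - all but last thread
                    arraySize)]                 ;  - last thread takes the leftovers
    [start stop]))

(println (map #(slice-bounds 10 % 3) (range 3)))  ; ([0 3] [3 6] [6 10])
```

The slices cover every index exactly once. If there are more threads than values, sliceSize is 0, so every thread except the last gets an empty slice.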
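Just as Clojure provided three functions for a thread to take a value from a channel, Clojure provides three functions for a thread to put a value into a channel: >!(), which must be used inside a go-block and parks that go-block until the channel accepts the value; >!!(), which blocks the calling thread until the channel accepts the value; and put!(), which puts the value asynchronously, without making the caller wait.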
When we first wrote sumSlice(), we wrote it using the >!!() function. To our chagrin, the >!!() function worked if we only used newly-spawned threads to call sumSlice() (i.e., if our main thread did not invoke sumSlice()); if our main thread processed a slice, then it would hang upon calling (>!! channel partialSum).
After reading up on channels, we learned that >!!() is a blocking function: on an unbuffered channel, such as the one (chan) creates, using >!!() to put a value into the channel suspends the invoking thread until another thread takes that value. Because of this, our main thread cannot use >!!() to put an item into the channel if it is the thread that is supposed to take that item from the channel, since it will wait forever.
The put!() function does not suffer from this limitation, because it never makes its caller wait for a taker, so we used it.
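The hang described above can be reproduced with plain Java queues. This is an analogy, not core.async itself. A java.util.concurrent.SynchronousQueue has no buffer, so a put must wait for another thread to take the value, much like >!!() on an unbuffered channel. A queue with room to store the value lets one thread put and then take. hand-off-alone? and buffered-alone are hypothetical names. In core.async terms, (chan n) creates a channel with a buffer of n slots, on which >!!() blocks only when the buffer is full. A buffered channel would therefore likely be another way around the hang, though we have not tested that variant here.

```clojure
(import '(java.util.concurrent SynchronousQueue LinkedBlockingQueue TimeUnit))

;; Analogy only. A SynchronousQueue has no buffer: a put must wait until
;; another thread takes the value, like >!! on an unbuffered channel.
;; With no taker, a timed offer gives up and returns false; a plain .put
;; here would wait forever, as our main thread did.
(defn hand-off-alone? []
  (let [q (SynchronousQueue.)]
    (.offer q 42 100 TimeUnit/MILLISECONDS)))

;; A queue that can hold the value lets the same thread put and then take
;; without waiting.
(defn buffered-alone []
  (let [q (LinkedBlockingQueue.)]
    (.put q 42)
    (.take q)))

(println (hand-off-alone?))  ; false (after about 100 ms)
(println (buffered-alone))   ; 42
```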
Defining calcPartialSum(). Given the arguments we are passing to calcPartialSum(), we might specify its behavior as follows:
;;; calcPartialSum():
;;; - sum a 'slice' of an array given start and stop indices.
;;;
;;; Receive: anArray, an array containing values to be summed;
;;;          start, a long containing the starting index;
;;;          stop, a long containing the stopping index.
;;; Return: the sum of anArray[start] .. anArray[stop-1] (inclusive).
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn calcPartialSum [anArray start stop]
)
Take a moment to copy-paste this documentation and function stub into threadedArraySum.clj.
To finish defining this function, we can use a loop() function to implement the following recursive logic:
Given: partialSum = 0,
       i = start.
Basis: i >= stop:
    return partialSum.
I-Step (i < stop):
    return result of recursively looping with
    partialSum + anArray[i], ++i.
We might implement this logic in Clojure as follows:
(loop
[ ; initially:
partialSum 0 ; partialSum = 0
i start ; i = start
]
(if (>= i stop) ; Basis: i >= stop:
partialSum ; return partialSum
; I-step:
; return loop(partialSum + a[i], ++i)
(recur (+ partialSum (get anArray i)) (inc i))
)
)
Copy-paste this into the calcPartialSum() stub.
That's it!
Save all of your changes. Then from the command-line, build and run your program using our simple test file (5numbers.txt). Don't forget that you need to pass both a file-name and the number of threads!
Try it using 1, 2, and 4 threads. Continue when threadedArraySum.clj builds without errors and produces the correct results.
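For comparison, the same fork-join pattern can be written with clojure.core futures instead of core.async channels. The sketch below is not the lab's solution. calc-partial-sum, slice-sum, and threaded-sum are hypothetical names. The sketch assumes anArray is a Java array of longs, and it uses aget with a ^longs type hint rather than get. Each future plays the role of a forked thread, and deref plays the role of <!!(), waiting for that thread's partial sum.

```clojure
;; Sketch of the fork-join pattern with clojure.core futures instead of
;; core.async. Assumption: anArray is a Java array of longs.

;; Sum anArray[start] .. anArray[stop-1], like calcPartialSum().
(defn calc-partial-sum [^longs anArray start stop]
  (loop [partialSum 0
         i          (long start)]
    (if (>= i stop)
      partialSum
      (recur (+ partialSum (aget anArray i)) (inc i)))))

;; Same slice arithmetic as sumSlice(), but returns the sum instead of
;; putting it into a channel.
(defn slice-sum [^longs anArray id numThreads]
  (let [arraySize (alength anArray)
        sliceSize (quot arraySize numThreads)
        start     (* id sliceSize)
        stop      (if (< id (dec numThreads)) (+ start sliceSize) arraySize)]
    (calc-partial-sum anArray start stop)))

(defn threaded-sum [^longs anArray numThreads]
  (let [workers (doall                            ; fork: start slices 1..N-1 now
                  (for [id (range 1 numThreads)]
                    (future (slice-sum anArray id numThreads))))
        mine    (slice-sum anArray 0 numThreads)] ; main thread does slice 0
    ;; join: deref waits for each future's partial sum, like <!!
    (reduce + mine (map deref workers))))
```

For example, (threaded-sum (long-array (range 1 1001)) 4) should return 500500. Note the doall. for is lazy, so without it the futures might not be created until reduce starts walking the sequence, after the main thread has already summed slice 0, which would cut into the parallelism. If you run this as a standalone script, finish with (shutdown-agents). Otherwise the JVM may wait about a minute for the futures' idle threads before exiting.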
Before we begin performance-testing, recall that the Clojure compiler produces Java byte-code, which is then run on the local Java Virtual Machine (JVM). As we saw in this lab's Java Introduction, the JVM's HotSpot technology affects our timing results, and since Clojure uses the JVM, it is likely to suffer from the same issues as Java.
Clojure's performance can also be affected by when work actually gets done. Some Clojure features use lazy evaluation: for example, sequence functions such as map and for return lazy sequences, whose elements are not computed until they are needed. Asynchronous features have a similar effect on timing. In our sumArray() function, when we write:
(go (sumSlice anArray id numThreads resChannel) )
the go() call returns immediately and hands the call to sumSlice() off to go's thread pool, so that call may or may not happen right away. In fact, there is no guarantee that it will run before our main thread tries to take that thread's partial sum from resChannel.
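Laziness is easiest to see with Clojure's sequence functions. In the sketch below, map returns a lazy sequence, and the function passed to it does not run until something realizes the sequence. calls and doubled are illustrative names.

```clojure
;; Counter of how many times our mapping function has actually run.
(def calls (atom 0))

;; map returns a lazy sequence: building it runs nothing yet.
(def doubled (map (fn [x] (swap! calls inc) (* 2 x)) [1 2 3]))
(println @calls)          ; 0

;; Realizing the sequence (here with doall) finally runs the function.
(println (doall doubled)) ; (2 4 6)
(println @calls)          ; 3
```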
Clojure's concurrency features are designed primarily for concurrent programming, in which raw performance is usually not the main concern, and they work quite well for such problems. However, Clojure is not a great language for parallel programming, in which performance is critical. For such problems, we are better off using a compile-to-binary language that does not employ lazy evaluation. Languages like Ada, C, C++, Fortran, and others fall into this category.
Run threadedArraySum.clj on the input files from the course directory, verifying it is correctly computing the sums. For example, to test the largest file with 4 threads you can use:
clojure -m threadedArraySum /home/cs/214/labs/12/numbers/1000000numbers.txt 4
To see what kind of speedup we are getting, add 5 more columns to your spreadsheet, and label them 1, 2, 3, 4, and 5 (the number of threads). For each of the input files, run the program three times using 1 thread, and record the middle time in the appropriate row of your spreadsheet under column 1. How does the time for threadedArraySum.clj using 1 thread compare to the time for arraySum.clj?
Repeat this procedure using 2, 3, 4 and 5 threads and record the middle times in the appropriate spreadsheet cells. Then create a spreadsheet chart of your middle execution times for 1000000numbers.txt using 1, 2, 3, 4, and 5 threads. The y-axis should be the time and the x-axis should be the number of threads.
In your spreadsheet, answer the following questions:
Clojure.1. How does the time using threadedArraySum and 1 thread compare to the time using arraySum?
Clojure.2. How do the times using T > 1 threads compare to the times using a single thread? Explain what is happening.
Clojure.3. Under what circumstances does using more threads to solve the problem make the time increase?
Create a clojure.script file in which you use cat to display threadedArraySum.clj, and then show that it builds and runs successfully. Run threadedArraySum, using the file 1000000numbers.txt and 1 thread; then run threadedArraySum, using the same input file and 4 threads. Then quit script.
Also, save your spreadsheet, but do not close it (unless this is your last exercise) since you will be continuing to update it in the other exercises.
If this is your last exercise, follow the Turn In instructions at the bottom of the lab 12 Introduction.
That concludes the Clojure part of this lab.