Less is More – Arden's Smalltalk Blog

making hard things easy, the impossible, possible


MapReduce, Hadoop, and Cincom Smalltalk

MapReduce is a popular and effective technique for applying concurrency to problems that often involve large amounts of data, in order to improve performance.

Hadoop is a popular implementation of the MapReduce model or technique.

MapReduce is named after the functional programming functions map and reduce. The map function applies a function to each element in a list, and reduce aggregates or combines the results. MapReduce can distribute the Map work to many machines, and then Reduce summarizes the work into a final answer.

MapReduce and Smalltalk

So how would this work in Smalltalk? To start, let’s determine what the Smalltalk equivalents to map and reduce are.

The collect: method can be used as a Smalltalk equivalent of map, since it applies a block to every element in a collection and collects the results.
The fold: method (or inject:into:) can be used as an equivalent of reduce, since it combines the results into a single object (simple examples: finding the maximum, minimum, or sum).

Pragmatically, though, you might also think of map as mapping the work out (to be performed concurrently) to multiple cores or machines, and reduce as combining or summarizing the results from the map work. If you are following the pattern, it doesn't matter whether you use collect: or fold: specifically.
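To make the correspondence concrete, here is a minimal workspace sketch using standard collection protocol (the number list is just an illustration):

```smalltalk
"map: apply a block to each element and collect the results"
squares := #(1 2 3 4) collect: [:n | n * n].          "#(1 4 9 16)"

"reduce: combine the results into a single object, starting from a seed"
sum := squares inject: 0 into: [:acc :n | acc + n].   "30"

"or, without a seed, using fold: (takes the first element as the start)"
max := squares fold: [:a :b | a max: b].              "16"
```

The word-count example below follows exactly this shape, except that the "elements" are files and the combining step merges Bags of counts.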

The purpose of Cincom's MatriX framework is to simplify concurrency: it allows you to easily make many linear solutions concurrent.

The example below shows how to create a solution to a problem, and then use MatriX to create a MapReduce-style solution using the same code with minimal alterations.

A Simple Example

Let’s say that we had a long list of documents (files) and we wanted to get a count of how many times each word occurs in the set of documents. In Smalltalk, we would want to collect the word counts for each file and then combine or fold the results into an aggregated summary. So how might we do this in Smalltalk?

Let’s start with some basics.

  1. A method to return a list of filenames to use for counting word occurrences
  2. A method that parses the file into tokens (words)
  3. A method that, given a file string, returns a count of the words found in the file
  4. A method that summarizes (reduces) the word counts into one set
  5. A method that provides a local solution using the above methods

We can test and debug by first running it locally, and then move on to distributing the work.

Below are the methods for the above basics, respectively:

Note: Be sure to change the dir in the myFiles method to a location on your machine.

myFiles
        "self myFiles"
        "Returns an Array of filename strings"
        | dir fileStrings |
        dir := 'C:\Arden\Documents\Cincom\'.
        fileStrings := dir asFilename filesMatching: '*.txt'.
        ^fileStrings asArray

 

parseFile: fileString
        "Answer the file's contents split into a collection of words"
        | fileStream contents words |
        fileStream := fileString asFilename readStream.
        contents := [fileStream upToEnd] ensure: [fileStream close].
        words := contents tokensBasedOn: Character space.
        "Split any tokens that still contain carriage returns"
        words copy do: [:word | (word includes: Character cr) ifTrue: [
                words remove: word.
                words addAll: (word tokensBasedOn: Character cr)]].
        ^words


wordCountFor: fileString
        "Answer a Bag counting each word's occurrences in the file"
        | words |
        words := self parseFile: fileString.
        "Keep only the alphabetic characters of each word, dropping empties"
        words := words collect: [:word | word select: [:char | char isAlphabetic]].
        words := words reject: #isEmpty.
        ^words asBag

 

reduce: wordCounts
        "Combine the wordCounts and create a Dictionary summary"
        | aggregatedWords finalCounts |
        aggregatedWords := wordCounts fold: [:counts :newCounts |
                newCounts valuesAndCountsDo: [:word :n |
                        counts add: word withOccurrences: n].
                counts].
        finalCounts := Dictionary new.
        aggregatedWords valuesAndCountsDo: [:word :count |
                finalCounts at: word put: count].
        ^finalCounts

 

runExampleLocal
        "self runExampleLocal"
        | files wordCounts summary results |
        files := self myFiles.
        wordCounts := files collect: [:fileStr | self wordCountFor: fileStr].
        summary := self reduce: wordCounts.
        results := summary associations sort: #value descending.
        (results first: 100) do: [:ea |
                Transcript cr; show: ea key; tab; show: ea value printString]

So now that we have this running, we want to distribute the workload so that the files can be processed, and the words counted, concurrently. The word counts will come back to a central place (our main image) where they will be summarized.

Making this concurrent is a lot of work, right?

Not in Smalltalk with Cincom’s MatriX concurrency framework.

  • Load MatriX
  • Add one line of code to create the virtual machines that do the work concurrently
  • Tweak the line of code that gets the word counts to distribute the work

That’s it! Here is the complete example of our solution running distributed:

runExample
        "self runExample"
        | files vms wordCounts summary results |
        files := self myFiles.
        vms := MatriX.VirtualMachines new: 3.
        wordCounts := [vms do: [:fileString |
                        MapReduceExample wordCountFor: fileString] with: files]
                ensure: [vms release].
        summary := self reduce: wordCounts.
        results := summary associations sort: #value descending.
        (results first: 100) do: [:ea |
                Transcript cr; show: ea key; tab; show: ea value printString]

Note: I ran into an issue with marshaling Bags in MatriX, and I have a patch available. (Thank you Michael for finding and fixing!)
