- Frank Kane's Taming Big Data with Apache Spark and Python
- Frank Kane
- 815字
- 2025-02-20 14:21:24
Looking at the ratings-counter script in Canopy
So now that we have a better idea of what's actually going on here, let's take another look at the ratings-counter script as a whole. Go back to your SparkCourse directory and open it back in Canopy:

Here it is altogether, shown as follows. It looks a bit neater in here, so once again let's review what's going on here:
from pyspark import SparkConf, SparkContext import collections conf = SparkConf().setMaster("local").setAppName("RatingsHistogram") sc = SparkContext(conf = conf) lines = sc.textFile("file:///SparkCourse/ml-100k/u.data") ratings = lines.map(lambda x: x.split()[2]) result = ratings.countByValue() sortedResults = collections.OrderedDict(sorted(result.items())) for key, value in sortedResults.items(): print("%s %i" % (key, value))
We are importing the SparkConf and SparkContext objects from pyspark. You're going to have that at the beginning of pretty much every Spark Python script that we write:
from pyspark import SparkConf, SparkContext
We'll also use the collections object to sort our results so we import that next:
import collections
Next we create the SparkContext object using a SparkConf object that we create with the name RatingsHistogram, and we'll run it locally just on one process for now:
conf = SparkConf().setMaster("local").setAppName("RatingsHistogram") sc = SparkContext(conf = conf)
Now the magic of Spark starts happening in these next lines, we create an RDD called lines by calling sc.textFile and we give it an absolute path to our data file for the MovieLens dataset. That data file contains 100,000 lines, and at this point, every one of those lines will be a value in this lines RDD:
lines = sc.textFile("file:///SparkCourse/ml-100k/u.data")
Now we'll apply a map function to that lines RDD, and pass in a lambda function that splits out that line, breaking it up with whitespace and taking field number 2 out of each line. And again, each line of the u.data file represents a user ID, a movie ID, a rating, and a timestamp, so field number 2, given that we start counting from 0, ends up being the rating value from each line:
ratings = lines.map(lambda x: x.split()[2])
What this line does is, it takes our lines RDD that contains raw lines of text from our raw input data, and creates a new RDD called ratings, which contains only the ratings values from that input data.
Now that we have our RDD transformed into the form we want, where every line and every value of the RDD is a rating value, in the next line we can call countByValue to get a count of how many times each unique rating value occurs:
result = ratings.countByValue()
This gets the output into a plain old Python object called result. We can then turn around and use the collections package to sort those key:value pairs of keys representing the ratings, and values representing the count of each rating:
sortedResults = collections.OrderedDict(sorted(result.items()))
We sort those, and then in the next lines, we iterate through each result and print them out on the screen:
for key, value in sortedResults.items(): print("%s %i" % (key, value))
That's just plain old Python code there, given the result generated by our ratings RDD when we called countByValue on it.
Once more, let's run this just for good luck. Now that you know what's going on, it should make a little bit more sense. Let's open Canopy Command Prompt and type in spark-submit. This is a tool that's part of the Spark framework that allows you to submit Python scripts to the Spark framework; it does all the magic that needs to happen under the hood to make that actually run in the Scala environment for Spark. We also need to pass in the name of the script, ratings-counter.py. Then we can run our script using the following command:
spark-submit ratings-counter.py
You'll see some warnings every time we run on our local machine here, you can safely ignore them, by the way. These are our results:

So once again we have 34,000 4 star ratings, 21,000 5 stars, and so on and so forth. You can see now how that all happened:it was through the magic of RDDs. We started with one RDD that represented every line of our input data:
lines = sc.textFile("file:///SparkCourse/ml-100k/u.data")
We then created a new RDD that just extracted the ratings from each line:
ratings = lines.map(lambda x: x.split()[2])
Then, on that RDD, we used countByValue to produce these final counts for each rating type:
result = ratings.countByValue()
Then we just used some plain old standard Python code to sort and display those results when we were done, and that's all there is to it:
sortedResults = collections.OrderedDict(sorted(result.items())) for key, value in sortedResults.items(): print("%s %i" % (key, value))
Alright, now you've not only run your first Spark program, but hopefully you also understand how it works. So let's start building upon that and work our way up to some more complicated examples with Spark. Next, we'll take a look at how Spark handles key value data.