Process a Million Songs with Apache Pig

Datetime: 2016-08-23 02:40:04 · Topic: Apache Pig

The following is a guest post kindly offered by Adam Kawa, a 26-year-old Hadoop developer from Warsaw, Poland. This post was originally published in a slightly different form at his blog, Hakuna MapData!

Recently I found an interesting dataset called the Million Song Dataset (MSD), which contains detailed acoustic and contextual data about a million songs. For each song we can find information like title, hotness, tempo, duration, danceability, and loudness, as well as the artist's name, popularity, localization (a latitude/longitude pair), and many other things. There are no music files included here, but links to MP3 song previews can easily be constructed from the data.

The dataset consists of 339 tab-separated text files. Each file contains about 3,000 songs and each song is represented as one separate line of text. The dataset is publicly available and you can find it at  Infochimps or  Amazon S3 . Since the total size of this data sums up to around 218GB, processing it using one machine may take a very long time.
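Since each song is one tab-separated line, a record can be recovered by splitting on tabs. Here is a minimal Python sketch; the field positions ($6 = artist latitude, $8 = artist longitude, $12 = artist name, $43 = song hotness, $51 = title, $52 = preview) are the ones projected by the Pig script later in this post, and the helper name `parse_song` is mine:

```python
def parse_song(line):
    # Split one tab-separated MSD record and keep only the fields
    # that the Pig script below projects (0-based positions).
    fields = line.rstrip('\n').split('\t')
    return {
        'artistLat': fields[6],
        'artistLong': fields[8],
        'artistName': fields[12],
        'songHotness': fields[43],
        'songTitle': fields[51],
        'songPreview': fields[52],
    }
```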

A much more interesting and efficient approach is to use multiple machines and process the songs in parallel, taking advantage of open-source tools from the Apache Hadoop ecosystem (e.g. Apache Pig). If you have your own machines, you can simply use CDH (Cloudera’s Distribution including Apache Hadoop), which includes the complete Apache Hadoop stack. CDH can be installed manually (quickly and easily, by typing a couple of simple commands) or automatically using Cloudera Manager Free Edition (Cloudera’s recommended approach). Both CDH and Cloudera Manager are freely downloadable here. Alternatively, you may rent some machines from Amazon with Hadoop already installed and process the data using Amazon’s Elastic MapReduce (here is a nice description written by Paul Lamere of how to use it and pay as little as $1, and here is my presentation about Elastic MapReduce given at the second meeting of the Warsaw Hadoop User Group).

Problem Definition

I came up with the idea of processing this dataset to find “exotic” (but still popular) songs. By exotic songs, I simply mean songs recorded by artists who live in some faraway country, far from other artists. The general goal is to discover a couple of fancy folk songs that are associated with the culture of some country. A funny example could be the song “Koko Koko Euro Spoko” by Jarzębina, which was chosen by Poles to be the official song of the Polish national football team during UEFA EURO 2012.

Apache Pig

I have used Apache Pig to achieve this goal. Apache Pig is a convenient tool, created at Yahoo!, for analyzing large datasets easily and efficiently. It provides a high-level data-flow programming language, called PigLatin, that is easy to learn, understand, and maintain.

PigLatin is quite similar to scripting languages: it supports many useful operations like filtering, sorting, aggregation, joining, and splitting, and provides several complex data types (tuples, bags, and maps). A typical PigLatin script is 20x shorter than its equivalent in Java MapReduce and takes a programmer 16x less time to implement, while being only slightly slower than Java MapReduce (see the PigMix2 benchmark). Thanks to these advantages, Apache Pig is used by many well-recognized companies and organizations, e.g. Yahoo! (where currently about 90% of Hadoop jobs are written in PigLatin), Twitter, Nokia Ovi Maps, AOL, Mendeley, LinkedIn, and ICM UW.

PigLatin Script

To find such popular folk and local songs, I have implemented a simple PigLatin script (about 50 lines of PigLatin code). The script uses a slightly naive but quite effective idea: it searches for “isolated” songs. An isolated song is simply one where the average distance between its artist’s location and the locations of all other artists is as high as possible (to be more precise, an isolated song is a song recorded by an isolated artist). This approach gives us a much higher probability of discovering “ear-catching” songs from Congo, Mali, Poland, and Vietnam than from the US or the UK.
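The idea can be sketched in a few lines of Python (a toy illustration of the metric, not the Pig implementation; `most_isolated` and the distance callback are hypothetical names): for each song, average its artist’s distance to every other artist, then keep the songs with the highest average.

```python
def most_isolated(songs, dist, top_n=3):
    # songs: list of (title, lat, long) tuples.
    # dist: a function taking two lat/long pairs and returning a distance.
    # Scores each song by the average distance from its artist to all
    # other artists, and returns the titles with the highest scores.
    scored = []
    for i, (title, lat, lng) in enumerate(songs):
        others = [dist(lat, lng, la, lo)
                  for j, (_, la, lo) in enumerate(songs) if j != i]
        scored.append((sum(others) / len(others), title))
    scored.sort(reverse=True)
    return [title for _, title in scored[:top_n]]
```

Note that, like the Pig script’s CROSS, this compares every song with every other song, so it is quadratic in the number of songs — which is exactly why filtering unpopular songs early matters.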

As mentioned, the dataset contains artists’ localization information in the form of lat/long pairs, and luckily there is an open-source library, DataFu (created by LinkedIn), that provides, among other things, a PigLatin UDF to calculate the distance between two lat/long pairs using the Haversine formula.
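For reference, the Haversine great-circle distance computed by such a UDF looks roughly like this in Python (a sketch; the Earth-radius constant is an approximation, and the function name is mine, not DataFu’s API):

```python
import math

def haversine_miles(lat1, lon1, lat2, lon2):
    # Great-circle distance between two lat/long points, in miles.
    r = 3958.76  # approximate mean radius of the Earth in miles
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2)
    return 2 * r * math.asin(math.sqrt(a))
```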

I read the input data and filter out unpopular songs and songs without lat/long localization (relation Filtered). Then I produce all pairs of different songs (relation Different) and calculate the distance between their artists’ locations (relation Distanced). Next, for each song, I calculate the average distance between the song’s artist’s location and the locations of all other songs’ artists (relation AvgDistanced). Now I can limit my records to only the most interesting results. First, I keep only the hottest song for a given location (relation Popular). Then I retain only a small number of songs with the highest average distance (relation Limited) and finally store the output in a format that will be useful for further visualization with Google Maps (relation Displayed).

Here is the source code:

SET  default_parallel $parallel
SET  pig.tmpfilecompression true
SET  pig.tmpfilecompression.codec lzo
SET  pig.maxCombinedSplitSize 134217728

REGISTER '/usr/lib/pig-0.10.0/contrib/datafu-0.0.4/dist/datafu-0.0.4.jar';
DEFINE haversineMiles datafu.pig.geo.HaversineDistInMiles();

Songs = LOAD '$input';
--"Project early and often"
Projected = FOREACH Songs GENERATE
        (double) $6 AS artistLat, (double) $8 AS artistLong, $12 AS artistName,
        (double) $43 AS songHotness, $51 AS songTitle, (int) $52 AS songPreview;

--"Filter early and often"
Filtered = FILTER Projected BY (songHotness >= $hotness) AND
    (artistLat IS NOT NULL) AND (artistLong IS NOT NULL);
--"Copy useful fields from the Filtered relation"
Filtered2 = FOREACH Filtered GENERATE songPreview as song2Preview, artistLat AS
    artist2Lat, artistLong AS artist2Long;

--"Produce all pairs of different songs and calculate the distance between the localizations of their artists"
Crossed = CROSS Filtered, Filtered2;
Different = FILTER Crossed BY songPreview != song2Preview;
Distanced = FOREACH Different GENERATE artistLat..songPreview,
        haversineMiles(artistLat, artistLong, artist2Lat, artist2Long) as distance;

--"For each song, calculate the average distance between its artist and all other artists"
Grouped = GROUP Distanced BY artistLat..songPreview;
AvgDistanced = FOREACH Grouped {
        Distances = Distanced.distance;
        GENERATE FLATTEN(group), AVG(Distances) AS distanceAvg;
};

--"Find the most popular song for a given location"
Locationed = GROUP AvgDistanced BY (artistLat, artistLong);
Popular = FOREACH Locationed {
        OrderedSongs = ORDER AvgDistanced BY songHotness DESC;
        TopSong = LIMIT OrderedSongs 1;
        GENERATE FLATTEN(TopSong);
};

--"Find the most isolated songs, which were recorded by artists who live far away from other artists"
Ordered = ORDER Popular BY distanceAvg DESC;
Limited = LIMIT Ordered $topCount;

--"Generate results in a format that can be easily displayed by Google Maps (by copy & paste)"
Displayed = FOREACH Limited GENERATE
        CONCAT('[', (chararray) artistLat), artistLong, songPreview,
        CONCAT('"', CONCAT((chararray) songTitle, '"')),
        CONCAT('"', CONCAT((chararray) artistName, '"')),
        CONCAT((chararray)distanceAvg, '],');

STORE Displayed INTO '$output' USING PigStorage(',');

Running the PigLatin Script

My script takes five parameters: the input and output paths, the song’s “hotness” threshold (a float between 0.0 and 1.0), the number of output songs, and the default level of parallelism.

time /usr/lib/pig-0.10.0/bin/pig -p input=input/million-song \
    -p hotness=0.5 -p topCount=200 -p parallel=100 \
    -p output=output/million-song/all.5.200.100 exotic-songs.pig


Pig runs this script as a sequence of seven MapReduce jobs. The total running time of the script was 40m 47.758s. I optimized it by increasing the memory available to child tasks, combining small input files, and compressing the output of the map tasks and of the intermediate jobs using LZO compression. I also turned off speculative execution for both map and reduce tasks. (Note that no other jobs were running on the Hadoop cluster at that time.)

I ran this script on a Hadoop cluster that belongs to ICM UW. I used three “fat” slave nodes, plus a virtual machine on a separate machine in the role of HDFS NameNode and Hadoop JobTracker. Each worker node has four AMD Opteron 6174 processors (48 cores in total) and 192GB of RAM, and can store 7TB of data. For the purposes of this Pig script, I configured each worker node to run at most 45 map and 35 reduce tasks. (So in total, I can run 135 map and 105 reduce tasks in parallel.) I do realize that this is not a typical Hadoop configuration (it is definitely not commodity hardware), but I simply use what I have. Currently CDH3u3 is installed there, which includes Hadoop 0.20.2 and Pig 0.8.1 by default. However, I installed Pig 0.10.0 manually on my client machine and used it to submit jobs to the cluster.

The readability and performance of this script could be improved by implementing custom user-defined functions (UDFs), e.g. for multi-text concatenation, by tuning some MapReduce and Pig parameters (e.g. pig.maxCombinedSplitSize), or by setting a better-customized level of parallelism using the PARALLEL keyword.
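For instance, a multi-text concatenation UDF could be written in Python and used via Pig’s Jython UDF support, replacing the chains of nested CONCAT() calls in the Displayed relation. Below is a sketch under that assumption; the file name, namespace, and function name (`udfs.py`, `myudfs`, `concat_all`) are mine, not part of the original script:

```python
# Sketch of a multi-text concatenation UDF. Under Pig (0.8+ with Jython
# support) this file would be registered and used along the lines of:
#   REGISTER 'udfs.py' USING jython AS myudfs;
#   ... GENERATE myudfs.concat_all('"', songTitle, '"') ...
def concat_all(*fields):
    # Concatenate any number of fields, skipping NULLs, so nested
    # CONCAT(CONCAT(...)) chains are no longer needed.
    return ''.join(str(f) for f in fields if f is not None)
```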

The Results to See

The results can be visualized using some JavaScript with Google Maps. The map below shows 200 songs; for each song, I put a clickable marker at its artist’s lat/long location that displays basic information about the song and a link to the song’s preview.


The Results to Hear

Here are some songs that attracted my attention:



South America


If you find more interesting examples, just let me know or simply put them in the comments below. I do realize that my music taste can be questionable ;)

Special Thanks

Since all my calculations were performed on the Hadoop cluster that belongs to ICM UW , I would like to give big thanks to the ICM’s staff for the opportunity to use the cluster. 

Thanks, Adam!

