Bora Kaplan

Data Engineer

Apache Spark Crash Course

Posted at — Jun 1, 2019

This is a fast-paced introduction to Apache Spark. I tried to touch the most important subjects without going too much into detail. You will learn enough to get the overall picture, and you can go deep diving from there. First we will take a look at some theory about what's going on behind the scenes, then we will get our hands dirty with an example.

I gave a talk about these topics; if you would like to watch it (in Turkish), check it out on YouTube.

There is an example application at the end of this article, which you can find on GitHub.

Let's get started.

What is Apache Spark?

Spark is a distributed data processing engine. It is used for handling very large amounts of data with high performance, and it is probably the most popular data processing engine at the moment. Even though Spark is mainly used for big data applications, you don't need big data to use it. You can use Spark whenever you need speed and safety for your data-related applications or analytics jobs. You can even use it for machine learning applications.

Spark supports both batch and streaming modes. So you can use it to ingest data in near real-time with streaming mode, or to analyze large datasets at regular intervals with batch mode.

Spark is a really fast processing engine. It achieves high speed by using memory efficiently and avoiding hard disk IO as much as possible, unlike its predecessors in the Hadoop ecosystem.

You can write Spark applications in Scala, Java, Python, R or SQL. Spark's native language is Scala, so be aware that when a new feature is introduced, it may take some time to arrive in other languages like Python.

Example Use Cases

Architecture

Spark runs on a distributed cluster. The cluster is made of a Driver, which is the leader of the cluster; Executors, which run the tasks and produce results; and a Cluster Manager, which handles resource management. The driver controls the job and distributes tasks to the executors, while the cluster manager provides resources to the executors and the driver. Spark can run in standalone mode, on YARN or on Mesos. It can even run on Kubernetes nowadays.

Spark Architecture

When you write Spark code, you describe a flow of operations, so to say. For example, you start by reading a text file line by line, then you filter the lines to search for a word, then you count how many times that word occurred.

Spark represents this flow as a Directed Acyclic Graph (DAG). When you write a Spark application, you actually write a DAG for it to execute. Spark looks at the DAG to see if it can optimize any steps, then divides the steps in the DAG into tasks to send to the executors. It also uses the DAG for error recovery: when a step fails, Spark re-runs the steps before the failed one to regenerate the lost data.

Spark DAG

In this example we used operations like map, filter and count. There are two types of operations in Spark: transformations and actions. Transformations are lazy operations that describe what will be done; they are not executed immediately. Transformations are immutable operations that create a new collection. Actions are what trigger transformations and generate a result. For example, count is an action that counts the elements in an RDD and returns a Long. On the other hand, map and filter are transformations: they take an RDD and return a new RDD with the changes they describe.
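To make the laziness concrete, here is a minimal sketch of that flow, assuming the sc SparkContext that spark-shell provides and a made-up input file called words.txt:

// Transformations: lazy, they only describe the work
val lines = sc.textFile("words.txt")
val sparkLines = lines.filter(line => line.contains("spark"))

// Action: triggers execution of the DAG above and returns a result
val occurrences: Long = sparkLines.count()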

Spark APIs

Spark can be written using multiple APIs. Its main API is called RDD (Resilient Distributed Dataset). It is the most powerful API and brings low-level control with it; all other APIs are translated into it under the hood. Nowadays it is not recommended to use the RDD API for new applications, as it can be hard to optimize. Recent versions introduced higher-level Structured APIs, called DataFrame, Dataset and SQL, which are easier to optimize thanks to the Catalyst optimization engine. Unless you have a specific reason, try to avoid the RDD API.

Spark APIs

Using the RDD API feels like using a regular Scala collection. It's like having a List[T] that you can map or filter over, except it is actually distributed over the cluster. It's the most type-safe option. The SQL API is just the good old SQL that you know: you can create tables and run standard SQL on them. Even though you can use Spark like a database, you should never do that, because it is not suited for OLTP applications. DataFrame is like writing SQL, but from within the programming language. So instead of writing SELECT first_name AS name FROM Person WHERE age > 18 you write personDataframe.select($"first_name".as("name")).where($"age" > 18). I hope you get the point. Dataset is something in between DataFrame and RDD: it has better type safety than DataFrame but comes with a small performance cost. I personally think Dataset is in a sweet spot between RDD and DataFrame, and its performance cost can be negligible.
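For completeness, here is a sketch of how the SQL API ties into this, assuming a hypothetical personDataframe with first_name and age columns (and import spark.implicits._ for the $ syntax):

// Register the DataFrame as a view so it can be queried with plain SQL
personDataframe.createOrReplaceTempView("Person")
val adultsSql = spark.sql("SELECT first_name AS name FROM Person WHERE age > 18")

// The equivalent DataFrame version from above
val adultsDf = personDataframe.select($"first_name".as("name")).where($"age" > 18)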

Spark uses micro-batches for its streaming mode. It's not a true real-time stream, but it is near enough. If your use case is latency-bound, you should probably check out Apache Flink; if it's throughput-bound, Spark should do the job just fine. The structured APIs are also available in streaming mode via Structured Streaming: you use the same DataFrames and Datasets in streaming applications.
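To give a rough idea, here is a minimal Structured Streaming sketch; the socket source on localhost:9999 is an arbitrary choice just for illustration:

// Read an unbounded stream of lines; the socket source yields a single "value" column
val streamingLines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()

// The familiar DataFrame operations, executed on every micro-batch
val query = streamingLines
  .where($"value".contains("spark"))
  .writeStream
  .format("console")
  .start()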

Example application with Last.fm and MusicBrainz Data

I created this application to give you a taste of what it's like to write Spark. That's why I am going to use three different APIs (RDD, Dataset, DataFrame) in the example. You can check out the full example on GitHub.

I would like to know which bands I listened to the most in 2018, and then get a recommendation for each of them. I will use my Last.fm data to find out. If you don't know what Last.fm is, it is a website that you can connect to your music players so it can keep track of each song you listen to. For every song you play, it creates a scrobble with info about the song and the date.

I will filter the Last.fm data down to the scrobbles from 2018, then count how many times I listened to each band. Then I will join that with the MusicBrainz data, which contains a lot of information about the bands, like listener counts and similar artists. I will use that similar artists info to give myself a recommendation for each of my top ten bands.

I assume you have downloaded and installed Apache Spark. You can find it on the official website along with instructions on how to install it. I used Spark 2.4.0 with Scala 2.11 in this example.

Also download the datasets that we are going to use in this example: Last.fm scrobbles and MusicBrainz Artist Info.

When you install Spark you get an executable called spark-shell. It gives you a REPL with Spark ready to go in standalone mode. Now go to the directory where you downloaded the datasets and open up spark-shell to follow along with the post. If you want to paste code snippets instead of typing them, make sure you enter paste mode by typing :paste in the shell, so multi-line snippets work. Let's get started by importing dependencies.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

Each Spark application needs a starting point. For the structured APIs that starting point is called SparkSession (and SparkContext for the low-level APIs). You start the application by creating a SparkSession instance and giving it some metadata. spark-shell already provides you with a SparkSession instance, so you don't need to create one; the snippet below just shows how it's done. You don't need to copy and paste this into the shell.

val spark: SparkSession = SparkSession.builder()
  .master("local[*]")
  .appName("lastfm-analyzer")
  .getOrCreate()

Let's create a model to represent Last.fm scrobbles, then use the SparkSession instance called spark to read the CSV file.

import spark.implicits._

case class Scrobble(artist: String,
                    artist_mbid: String,
                    album: String,
                    album_mbid: String,
                    track: String,
                    track_mbid: String,
                    uts: Long,
                    utc_time: String)

val scrobbles: Dataset[Scrobble] = 
  spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("scrobbles.csv")
    .na.fill("N/A")
    .as[Scrobble]

Spark has built-in support for the most common file types, like CSV, JSON, Avro and Parquet. We use .csv() to read our scrobble data and infer its schema. We also fill empty fields with an arbitrary value in the .na.fill("N/A") part, so we don't get errors while processing the data. When you use spark.read to read a file you get a DataFrame; it is the default in the structured APIs. In a DataFrame you mostly work with strings and lose type safety. To get some of that type safety back while staying in the high-level API zone, we can use the Dataset API. You can convert a DataFrame to a Dataset by calling .as[MyModel] on it. Make sure you have imported the implicit type conversions with import spark.implicits._, as we did at the beginning.
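Writing works symmetrically with those formats. For example, a sketch of saving the parsed scrobbles as Parquet (the output path is made up):

// Persist the Dataset in a columnar format; the path is just an example
scrobbles.write
  .mode("overwrite")
  .parquet("scrobbles.parquet")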

Spark gives you some helper methods to get information about your data. For example, with .printSchema() you can check whether the inferred schema is correct.

scrobbles.printSchema()

Which should print out something like this

root
 |-- uts: integer (nullable = true)
 |-- utc_time: string (nullable = false)
 |-- artist: string (nullable = false)
 |-- artist_mbid: string (nullable = false)
 |-- album: string (nullable = false)
 |-- album_mbid: string (nullable = false)
 |-- track: string (nullable = false)
 |-- track_mbid: string (nullable = false)

You can also preview the data with the show() method.

scrobbles.show()

It's a good way to see if everything works as expected.

+----------+------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|       uts|          utc_time|           artist|         artist_mbid|               album|          album_mbid|               track|          track_mbid|
+----------+------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|1546973872|08 Jan 2019, 18:57|        Meshuggah|cf8b3b8c-118e-413...|Nothing ( Re- Rel...|                 N/A|            Obsidian|b0cdecc5-d5a9-4ec...|
|1546973445|08 Jan 2019, 18:50|        Meshuggah|cf8b3b8c-118e-413...|Nothing ( Re- Rel...|                 N/A|            Nebulous|7d533b43-5a6c-458...|
|1546973190|08 Jan 2019, 18:46|        Meshuggah|cf8b3b8c-118e-413...|Nothing ( Re- Rel...|                 N/A|               Spasm|32406b5d-eb5b-41b...|
|1546972873|08 Jan 2019, 18:41|        Meshuggah|cf8b3b8c-118e-413...|Nothing ( Re- Rel...|                 N/A|Straws Pulled At ...|bc0f428a-7f9d-4d6...|
|1546972554|08 Jan 2019, 18:35|        Meshuggah|cf8b3b8c-118e-413...|Nothing ( Re- Rel...|                 N/A|     Organic Shadows|52e0e73b-0f02-49b...|
|1546971557|08 Jan 2019, 18:19|        Meshuggah|cf8b3b8c-118e-413...|Nothing ( Re- Rel...|                 N/A|      Glints Collide|d047c07a-b927-442...|
|1546971111|08 Jan 2019, 18:11|        Meshuggah|cf8b3b8c-118e-413...|Nothing ( Re- Rel...|                 N/A|  Closed Eye Visuals|02254580-0673-47e...|
|1546970833|08 Jan 2019, 18:07|        Meshuggah|cf8b3b8c-118e-413...|Nothing ( Re- Rel...|                 N/A|Perpetual Black S...|a34c4628-11f7-4c0...|
|1546970507|08 Jan 2019, 18:01|        Meshuggah|cf8b3b8c-118e-413...|Nothing ( Re- Rel...|                 N/A|       Rational Gaze|b1f6d637-7dfc-4cb...|
|1546970169|08 Jan 2019, 17:56|        Meshuggah|cf8b3b8c-118e-413...|Nothing ( Re- Rel...|                 N/A|             Stengah|124d056a-dec5-4ac...|
|1546967006|08 Jan 2019, 17:03|Dark Tranquillity|9d30e408-1559-448...|         The Gallery|cd0071fb-eb46-4bc...|...Of Melancholy ...|f7d44c40-381e-476...|
|1546966859|08 Jan 2019, 17:00|Dark Tranquillity|9d30e408-1559-448...|         The Gallery|cd0071fb-eb46-4bc...|Mine is the Grandeur|                 N/A|
|1546966516|08 Jan 2019, 16:55|Dark Tranquillity|9d30e408-1559-448...|         The Gallery|cd0071fb-eb46-4bc...|The Emptiness Fro...|6e5906da-ed5b-40d...|
|1546966233|08 Jan 2019, 16:50|Dark Tranquillity|9d30e408-1559-448...|         The Gallery|cd0071fb-eb46-4bc...|               Lethe|4303910e-a343-4f1...|
|1546966023|08 Jan 2019, 16:47|Dark Tranquillity|9d30e408-1559-448...|         The Gallery|cd0071fb-eb46-4bc...|Midway Through In...|5e789917-1341-4e9...|
|1546965768|08 Jan 2019, 16:42|Dark Tranquillity|9d30e408-1559-448...|         The Gallery|cd0071fb-eb46-4bc...|The One Brooding ...|cbfc3048-ee12-47e...|
|1546965520|08 Jan 2019, 16:38|Dark Tranquillity|9d30e408-1559-448...|         The Gallery|cd0071fb-eb46-4bc...|         The Gallery|7b04f83b-4ee8-43a...|
|1546965219|08 Jan 2019, 16:33|Dark Tranquillity|9d30e408-1559-448...|         The Gallery|cd0071fb-eb46-4bc...|   The Dividing Line|9ac0e29d-bdde-42b...|
|1546964948|08 Jan 2019, 16:29|Dark Tranquillity|9d30e408-1559-448...|         The Gallery|cd0071fb-eb46-4bc...|          Edenspring|202514b8-b547-430...|
|1546964792|08 Jan 2019, 16:26|Dark Tranquillity|9d30e408-1559-448...|         The Gallery|cd0071fb-eb46-4bc...|Silence and the F...|                 N/A|
+----------+------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 20 rows

It looks like we successfully managed to read our scrobble data and convert it to our Scrobble model. Now we must do the same thing for the artist data we got from the MusicBrainz API. As I said before, the artist data has a lot of information in it. It is a JSON file with a lot of nested objects, so we have to use a few more case classes to model it for our use case.

import spark.implicits._

case class SimilarArtistName(name: String)

case class Tag(name: String)

case class Stats(listeners: String, playcount: String)

case class Artist(name: String,
                  mbid: String,
                  stats: Stats,
                  similarArtists: Array[SimilarArtistName],
                  tags: Array[Tag])


val artists: Dataset[Artist] =
  spark.read
    .json("artistinfo.json")
    .selectExpr("artist.name",
                "artist.mbid",
                "artist.stats",
                "artist.similar.artist as similarArtists",
                "artist.tags.tag as tags")
    .na.drop()
    .as[Artist]

We read JSON files just like we read CSV files. Then we select the parts of the JSON we are interested in. After that we drop entries with missing data and convert the result to a Dataset of our model Artist.

Let's preview the data.

artists.show()
+--------------------+--------------------+--------------------+--------------------+--------------------+
|                name|                mbid|               stats|      similarArtists|                tags|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|             Shining|ef1849a2-bd73-4e1...|   [156777, 7782587]|[[[[https://lastf...|[[black metal, ht...|
|              Windir|1cc7ebc1-8ea6-43f...|    [78503, 5027072]|[[[[https://lastf...|[[black metal, ht...|
|           Trap Them|b24b6ae8-333e-435...|    [63523, 2763323]|[[[[https://lastf...|[[hardcore, https...|
| Mors Principium Est|8d08484c-c532-457...|   [108169, 5493645]|[[[[https://lastf...|[[Melodic Death M...|
|           The Doors|9efff43b-3b29-408...|[3168883, 132620861]|[[[[https://lastf...|[[classic rock, h...|
|      Kriegsmaschine|a210cc03-3c4f-41c...|     [14422, 660922]|[[[[https://lastf...|[[black metal, ht...|
|              Hiromi|91df368e-a22a-429...|    [81495, 1366498]|[[[[https://lastf...|[[jazz, https://w...|
|       Colin Stetson|22b2f6dc-fcc2-490...|    [95142, 1962858]|[[[[https://lastf...|[[experimental, h...|
|               Azusa|2f7afaf7-ddc9-44a...|     [16342, 512497]|[[[[https://lastf...|[[j-pop, https://...|
|          Embodiment|e99254d7-5168-4fe...|       [1149, 10790]|[[[[https://lastf...|[[death metal, ht...|
|        Sofa Surfers|fc01cdec-f2e3-4d4...|   [429597, 4070014]|[[[[https://lastf...|[[downtempo, http...|
|       Kenny Garrett|d72e6ce3-641f-46a...|     [96884, 572260]|[[[[https://lastf...|[[jazz, https://w...|
|The Dillinger Esc...|1bc41dff-5397-4c5...|  [432731, 23497624]|[[[[https://lastf...|[[mathcore, https...|
|     Mammoth Grinder|b33356ff-7644-4c5...|     [23208, 559358]|[[[[https://lastf...|[[Sludge, https:/...|
|          True Widow|34650c85-f787-491...|    [82855, 2309497]|[[[[https://lastf...|[[shoegaze, https...|
|              Lo-Pan|17dffc12-01e0-444...|     [11423, 207499]|[[[[https://lastf...|[[Stoner Rock, ht...|
|        Funeral Mist|ee69cf08-ae31-47a...|    [35924, 1564627]|[[[[https://lastf...|[[black metal, ht...|
|       Blut aus Nord|c21db200-1f7c-4c2...|    [86551, 4373980]|[[[[https://lastf...|[[black metal, ht...|
|       Dream Theater|28503ab7-8bf2-466...| [1007339, 93316594]|[[[[https://lastf...|[[Progressive met...|
| Conquering Dystopia|e836e946-62a6-4a3...|     [13093, 322640]|[[[[https://lastf...|[[Progressive met...|
+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 20 rows

It seems like we still need to do a bit more cleaning on the similar artists field: it should contain only artist names, but as you can see there are some URLs in there too. We will clean that up later. Other than that it looks good. We got some statistics we can use, like the listener count and the genre tags of each band.

Now that we have loaded our datasets we can start analyzing them. Let's start by counting how many times each band has been listened to and ordering them from highest to lowest. We will do the same thing using the RDD, DataFrame and Dataset APIs so you can see their differences even better. Let's get started with RDD.

val topArtistsRdd: RDD[(String, Int)] =
  scrobbles.rdd
    .filter(scrobble => scrobble.utc_time.contains("2018"))
    .map(scrobble => (scrobble.artist, 1))
    .reduceByKey((count1, count2) => count1 + count2)
    .sortBy({ case (artist, count) => count }, ascending = false)

If you have written any Scala code before, this should look familiar. An RDD looks just like a regular Scala collection such as List: you can do your ordinary map, filter, reduce, sort and so on, just as you would on a list. The only different thing in this part is the reduceByKey function, which is special to pair RDDs.

We had a Dataset of Scrobbles called scrobbles, if you remember. To convert it to an RDD we just call .rdd and that's it. After that we use filter to keep only the 2018 scrobbles, then map each scrobble to an (artist, 1) pair. Spark provides some aggregation functions that work on paired RDDs, literally called pair RDDs: when the elements of an RDD are two-element tuples, you end up with a pair RDD and can use functions like reduceByKey on it. reduceByKey is a great general-purpose function for generating a result from your data. It first groups entries by the first element of the pair (in this case the artist name). This is done locally in each executor first, to avoid excessive shuffling; once each executor has its grouped partial results, shuffling happens between them. Then the reduce part of the function, count1 + count2, combines the counts to give us the result we want. After that we just sort by count from biggest to smallest.
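If reduceByKey is new to you, here is a toy example on a tiny hand-made pair RDD (the data is made up):

// Two-element tuples make this a pair RDD, so reduceByKey is available
val pairs = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
pairs.reduceByKey(_ + _).collect() // e.g. Array((a,2), (b,1))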

When you have a collection in an RDD, DataFrame or Dataset, your data is distributed across the executor nodes. You can use functions like collect or take to move the data from the executors to the driver; your collection then becomes an Array on the driver node instead of a distributed RDD. You should only use these functions to fetch the results after the job is complete, otherwise you may end up with out-of-memory exceptions or a job that is too slow to complete.
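For example, on our counted RDD:

// take(n) ships only n elements to the driver
val firstFive: Array[(String, Int)] = topArtistsRdd.take(5)
// collect() ships everything and can overwhelm the driver on large data
// val all: Array[(String, Int)] = topArtistsRdd.collect()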

Now let's see the results of our counting.

topArtistsRdd.take(20).foreach(println)
(Dimmu Borgir,430)
(Devin Townsend,418)
(Cannibal Corpse,392)
(Caligula's Horse,377)
(Meshuggah,369)
(Nevermore,366)
(Demons & Wizards,279)
(Enslaved,275)
(Six Feet Under,274)
(Amorphis,260)
(Between the Buried and Me,255)
(The Haunted,238)
(Djevel,231)
(Septicflesh,229)
(Mastodon,210)
(Sunless,203)
(ALTARAGE,201)
(TesseracT,199)
(Beyond Creation,196)
(Metallica,196)

We got some nice results! Now let's use the DataFrame API to do the same calculation.

val topArtistsDataFrame: DataFrame =
  scrobbles
    .where($"utc_time".contains(lit("2018")))
    .groupBy($"artist")
    .count
    .orderBy($"count".desc)

As you can see, DataFrame is quite different from RDD: it is more like SQL than Scala. You use SQL keywords like where and orderBy and access the fields, or columns, with the $"column_name" syntax. DataFrame has the highest performance of them all, and it is so close to SQL that the book Spark: The Definitive Guide recommends using it for most jobs. Let's check out the results to see if they are any different.

topArtistsDataFrame.show()
+--------------------+-----+
|              artist|count|
+--------------------+-----+
|        Dimmu Borgir|  430|
|      Devin Townsend|  418|
|     Cannibal Corpse|  392|
|    Caligula's Horse|  377|
|           Meshuggah|  369|
|           Nevermore|  366|
|    Demons & Wizards|  279|
|            Enslaved|  275|
|      Six Feet Under|  274|
|            Amorphis|  260|
|Between the Burie...|  255|
|         The Haunted|  238|
|              Djevel|  231|
|         Septicflesh|  229|
|            Mastodon|  210|
|             Sunless|  203|
|            ALTARAGE|  201|
|           TesseracT|  199|
|     Beyond Creation|  196|
|           Metallica|  196|
+--------------------+-----+
only showing top 20 rows

This time we get a nice table with our results in it. Now it's time for the Dataset API.

import spark.implicits._

case class CountedArtist(artist: String, count: Long)

val topArtistsDataset: Dataset[CountedArtist] =
  scrobbles
    .filter(scrobble => scrobble.utc_time.contains("2018"))
    .groupByKey(_.artist)
    .count
    .select($"value".as("artist"), $"count(1)".as("count"))
    .as[CountedArtist]
    .orderBy($"count".desc)

As you can see, Dataset sits in between RDD and DataFrame: you can map and filter like an RDD, but also use select and aggregation functions like count as with a DataFrame. Using Dataset can be a bit tricky here. The count aggregation produces generated column names (value and count(1)), which you have to rename with aliases if you would like to convert the result back to a Dataset of your model. Let's check out the results again.

topArtistsDataset.show()
+--------------------+-----+
|              artist|count|
+--------------------+-----+
|        Dimmu Borgir|  430|
|      Devin Townsend|  418|
|     Cannibal Corpse|  392|
|    Caligula's Horse|  377|
|           Meshuggah|  369|
|           Nevermore|  366|
|    Demons & Wizards|  279|
|            Enslaved|  275|
|      Six Feet Under|  274|
|            Amorphis|  260|
|Between the Burie...|  255|
|         The Haunted|  238|
|              Djevel|  231|
|         Septicflesh|  229|
|            Mastodon|  210|
|             Sunless|  203|
|            ALTARAGE|  201|
|           TesseracT|  199|
|     Beyond Creation|  196|
|           Metallica|  196|
+--------------------+-----+
only showing top 20 rows

Exactly the same results as before. Now you should be somewhat familiar with the differences between the APIs. From now on I will use Dataset only.

Now that we have our top artists counted and ordered, we can use the MusicBrainz artist info to get recommendations for them. But first, let's clean the artist info data a bit more.

import spark.implicits._

case class MinifiedArtist(name: String, listeners: Long, tags: Array[String], similarArtists: Array[String])

val minifiedArtists: Dataset[MinifiedArtist] =
  artists
    .map { artist =>  
      (artist.name,
      artist.stats.listeners.toLong,
      artist.tags.map(_.name),
      artist.similarArtists.map(_.name))
    }
    .selectExpr("_1 as name", "_2 as listeners", "_3 as tags", "_4 as similarArtists")
    .as[MinifiedArtist]

Let's see if it's finally usable for our scenario.

minifiedArtists.show()
+--------------------+---------+--------------------+------------------------+
|                name|listeners|                tags|          similarArtists|
+--------------------+---------+--------------------+------------------------+
|             Shining|   156777|[black metal, dep...|    [Lifelover, Psych...|
|              Windir|    78503|[black metal, vik...|    [Vreid, Mistur, F...|
|           Trap Them|    63523|[hardcore, grindc...|    [All Pigs Must Di...|
| Mors Principium Est|   108169|[Melodic Death Me...|    [Omnium Gatherum,...|
|           The Doors|  3168883|[classic rock, Ps...|    [Jim Morrison, Je...|
|      Kriegsmaschine|    14422|[black metal, Ort...|    [Mgła, Cultes des...|
|              Hiromi|    81495|[jazz, piano, jaz...|    [Hiromi's Sonicbl...|
|       Colin Stetson|    95142|[experimental, ja...|    [Colin Stetson an...|
|               Azusa|    16342|[j-pop, japanese,...|[Airi, Lia, 上原れな,...|
|          Embodiment|     1149|[death metal, Mel...|    [Mastifal, Hollow...|
|        Sofa Surfers|   429597|[downtempo, trip-...|    [Boozoo Bajou, To...|
|       Kenny Garrett|    96884|[jazz, saxophone,...|    [Michael Brecker,...|
|The Dillinger Esc...|   432731|[mathcore, hardco...|    [Converge, Car Bo...|
|     Mammoth Grinder|    23208|[Sludge, hardcore...|    [Gatecreeper, Ace...|
|          True Widow|    82855|[shoegaze, slowco...|    [King Woman, Marr...|
|              Lo-Pan|    11423|[Stoner Rock, sto...|    [Valley of the Su...|
|        Funeral Mist|    35924|[black metal, met...|    [Craft, Marduk, A...|
|       Blut aus Nord|    86551|[black metal, atm...|    [Deathspell Omega...|
|       Dream Theater|  1007339|[Progressive meta...|    [Liquid Tension E...|
| Conquering Dystopia|    13093|[Progressive meta...|    [Jeff Loomis, Mer...|
+--------------------+---------+--------------------+------------------------+

Perfect! Now we can join the two datasets together and use those similar artists to give us some recommendations.

import spark.implicits._

case class EnrichedArtist(name: String,
                          count: Long,
                          listeners: Long,
                          tags: Seq[String],
                          similarArtists: Seq[String])

val enrichedArtists: Dataset[EnrichedArtist] =
  topArtistsDataset
    .join(minifiedArtists, $"artist" === $"name")
    .orderBy($"count".desc)
    .as[EnrichedArtist]

As you can see, joining is very easy too: just give it the dataset to join with and a condition to match on. Although this one is an inner join, Spark supports the other types of joins you may need. Now let's see the results of the join.
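As a sketch, if you wanted to keep counted artists even when there is no MusicBrainz match, you could pass the join type as a third argument:

// "left_outer" keeps unmatched rows from the left side (with nulls on the right)
val withOptionalInfo = topArtistsDataset.join(minifiedArtists, $"artist" === $"name", "left_outer")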

enrichedArtists.show()
+--------------------+-----+--------------------+---------+--------------------+--------------------+
|              artist|count|                name|listeners|                tags|      similarArtists|
+--------------------+-----+--------------------+---------+--------------------+--------------------+
|        Dimmu Borgir|  430|        Dimmu Borgir|   495590|[black metal, Sym...|[Cradle of Filth,...|
|     Cannibal Corpse|  392|     Cannibal Corpse|   419120|[death metal, Bru...|[Deicide, Suffoca...|
|    Caligula's Horse|  377|    Caligula's Horse|    29684|[Progressive meta...|[Leprous, Haken, ...|
|           Meshuggah|  369|           Meshuggah|   433788|[Progressive meta...|[Vildhjarta, Fred...|
|           Nevermore|  366|           Nevermore|   250734|[Progressive meta...|[Warrel Dane, San...|
|    Demons & Wizards|  279|    Demons & Wizards|   231463|[Power metal, hea...|[Blind Guardian, ...|
|            Enslaved|  275|            Enslaved|   192080|[black metal, vik...|[Borknagar, Ihsah...|
|      Six Feet Under|  274|      Six Feet Under|   195021|[death metal, Bru...|[Torture Killer, ...|
|            Amorphis|  260|            Amorphis|   430958|[Progressive meta...|[Barren Earth, Da...|
|Between the Burie...|  255|Between the Burie...|   311684|[Progressive meta...|[The Contortionis...|
|         The Haunted|  238|         The Haunted|   243680|[thrash metal, Me...|[Hatesphere, The ...|
|              Djevel|  231|              Djevel|     9235|[black metal, nor...|[Ljå, Taake, Anga...|
|         Septicflesh|  229|         Septicflesh|   118655|[death metal, sym...|[Fleshgod Apocaly...|
|            Mastodon|  210|            Mastodon|   815145|[Progressive meta...|[Baroness, Gojira...|
|             Sunless|  203|             Sunless|     6492|[chillout, Techni...|[Ingurgitating Ob...|
|           TesseracT|  199|           TesseracT|   149681|[Progressive meta...|[Skyharbor, Perip...|
|     Beyond Creation|  196|     Beyond Creation|    32578|[Technical Death ...|[Obscura, Archspi...|
|           Metallica|  196|           Metallica|  2873278|[thrash metal, me...|[Megadeth, Slayer...|
|              Gojira|  191|              Gojira|   406922|[death metal, Pro...|[Mastodon, Opeth,...|
|             Sylosis|  188|             Sylosis|    86184|[Melodic Death Me...|[Unearth, Anterio...|
+--------------------+-----+--------------------+---------+--------------------+--------------------+
only showing top 20 rows

We are almost there! Now let's just pick the top ten artists and one recommendation for each.

enrichedArtists
  .map(artist => (artist.name, artist.similarArtists.head))
  .take(10)
  .foreach { case (artist, recommendation) =>
    println(s"Artist: $artist, Recommendation: $recommendation")
  }

Which should give us our final results.

Artist: Dimmu Borgir, Recommendation: Cradle of Filth
Artist: Cannibal Corpse, Recommendation: Deicide
Artist: Caligula's Horse, Recommendation: Leprous
Artist: Meshuggah, Recommendation: Vildhjarta
Artist: Nevermore, Recommendation: Warrel Dane
Artist: Demons & Wizards, Recommendation: Blind Guardian
Artist: Enslaved, Recommendation: Borknagar
Artist: Six Feet Under, Recommendation: Torture Killer
Artist: Amorphis, Recommendation: Barren Earth
Artist: Between the Buried and Me, Recommendation: The Contortionist

There we have it! We went through quite a lot. I hope you now have somewhat of an idea of what it's like to work with Apache Spark. It is a wonderful tool, packed with features and a joy to work with. If you would like to learn more about it, check out the links below.

Learn More About Spark