
Hadoop Tutorial – Getting Started with HDP

Spark – Risk Factor

Introduction

In this tutorial we will introduce Apache Spark. In the earlier sections of this lab you learned how to load data into HDFS and then manipulate it using Hive. We are using the truck sensor data to better understand the risk associated with every driver. This section will teach you how to compute that risk using Apache Spark.

Prerequisites

This tutorial is part of a series of hands-on tutorials to get you started on HDP using the Hortonworks Sandbox. Please ensure you complete the prerequisites before proceeding with this tutorial.

Overview

Concepts

MapReduce has been useful, but the amount of time it takes for jobs to run can at times be prohibitive. Furthermore, MapReduce jobs only work well for a specific set of use cases. There is a need for a computing framework that works for a wider set of use cases.

Apache Spark was designed to be a fast, general-purpose, easy-to-use computing platform. It extends the MapReduce model and takes it to a whole other level. The speed comes from the in-memory computations. Applications running in memory allow for much faster processing and response.

Apache Spark Basics

Apache Spark is a fast, in-memory data processing engine with elegant and expressive development APIs in Scala, Java, Python and R that allow data workers to efficiently execute machine learning algorithms that require fast iterative access to datasets. Spark on Apache Hadoop YARN enables deep integration with Hadoop and other YARN enabled workloads in the enterprise.

You can run batch applications such as MapReduce-style jobs or iterative algorithms that build upon each other. You can also run interactive queries and process streaming data with your application. Spark also provides a number of libraries that let you easily expand beyond basic Spark capabilities, such as machine learning, SQL, streaming, and graph processing. Spark runs on Hadoop clusters managed by YARN or Apache Mesos, or even in standalone mode with its own scheduler. The Sandbox includes Spark 2.3.1.

Lab4_1

Let’s get started!

Configure Spark services using Ambari

1. Log on to the Ambari Dashboard as maria_dev. At the bottom left corner of the services column, check that Spark2 and Zeppelin Notebook are running.

Note: If these services are not running, start them.

ambari-dash-running-spark

2. Open the Zeppelin interface using the URL: http://sandbox-hdp.hortonworks.com:9995/

You should see a Zeppelin Welcome Page:

welcome-to-zeppelin

Optionally, if you want to find out how to access the Spark shell to run code on Spark, refer to Appendix A.

3. Create a Zeppelin Notebook

Click on the Notebook tab at the top left and select Create new note. Name your notebook:

Compute Riskfactor with Spark

create-new-notebook

new-spark-note

Create a Hive Context

For improved Hive integration, ORC file support has been added to Spark. This allows Spark to read data stored in ORC files. Spark can leverage ORC's more efficient columnar storage and predicate pushdown capability for even faster in-memory processing. HiveContext is an instance of the Spark SQL execution engine that integrates with data stored in Hive; it reads the Hive configuration from hive-site.xml on the classpath. The more basic SQLContext provides a subset of Spark SQL support that does not depend on Hive.
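If you are curious what reading ORC data looks like in practice, here is a minimal sketch using the standard DataFrameReader API. The ORC path below is hypothetical (this tutorial itself works with CSV files), so adjust it to an ORC dataset on your cluster if you want to try it:

%spark2
// Hypothetical example: read an ORC dataset and filter it.
// The path is an assumption; point it at a real ORC directory to run this.
val orcDF = spark.read.orc("hdfs:///tmp/data/geolocation_orc")

// The filter is a transformation; Spark can push this predicate down into the ORC reader.
orcDF.filter("event != 'normal'").show(5)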

Import sql libraries:

If you already have a riskfactor table on your Sandbox, you must remove it so that you can populate it again using Spark. Copy and paste the following code into your Zeppelin notebook, then click the play button. Alternatively, press Shift+Enter to run the code.

Instantiate SparkSession

%spark2
val hiveContext = new org.apache.spark.sql.SparkSession.Builder().getOrCreate()

instantiate_hivecontext_hello_hdp_lab4

Create an RDD from Hive Context

What is an RDD?

Spark's primary core abstraction is called a Resilient Distributed Dataset, or RDD. It is a distributed collection of elements that is parallelized across the cluster. In other words, an RDD is an immutable collection of objects that is partitioned and distributed across multiple physical nodes of a YARN cluster and that can be operated on in parallel.

There are three methods for creating an RDD:

  1. Parallelize an existing collection. This means that the data already resides within Spark and can now be operated on in parallel.
  2. Create an RDD by referencing a dataset. This dataset can come from any storage source supported by Hadoop, such as HDFS, Cassandra, HBase, etc.
  3. Create an RDD by transforming an existing RDD to create a new RDD.

We will be using the latter two methods in this tutorial.
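As a quick illustration of these methods, here is a small sketch using the core RDD API (sc is the SparkContext exposed by the %spark2 interpreter and by spark-shell; the in-memory collection is made up for illustration):

%spark2
// Method 1: parallelize an existing in-memory collection.
val numbersRDD = sc.parallelize(1 to 10)

// Method 2: create an RDD by referencing a dataset in HDFS
// (this path matches the geolocation CSV used later in this tutorial).
val geoLinesRDD = sc.textFile("hdfs:///tmp/data/geolocation.csv")

// Method 3: transform an existing RDD into a new RDD.
val evenNumbersRDD = numbersRDD.filter(_ % 2 == 0)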

RDD Transformations and Actions
Typically, RDDs are instantiated by loading data from a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat on a YARN cluster.

Once an RDD is instantiated, you can apply a series of operations. All operations fall into one of two types: transformations or actions.

  • Transformation operations, as the name suggests, create new datasets from an existing RDD and build out the processing DAG that can then be applied to the partitioned dataset across the YARN cluster. Transformations do not return a value; in fact, nothing is evaluated when a transformation is defined. Spark just records these Directed Acyclic Graphs (DAGs), which are only evaluated at runtime. We call this lazy evaluation.
  • An Action operation, on the other hand, executes a DAG and returns a value. A short sketch illustrating the difference follows this list.
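Here is that sketch: the filter below is a transformation and returns immediately without touching any data, while count is an action that actually executes the DAG (the CSV path matches the file loaded earlier in this lab):

%spark2
// Transformation: nothing is computed yet; Spark only records the lineage (DAG).
val abnormalLines = sc.textFile("hdfs:///tmp/data/geolocation.csv")
  .filter(line => !line.contains("normal"))

// Action: the DAG is executed here and a value is returned to the driver.
println(abnormalLines.count())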

Read CSV Files into Apache Spark

In this tutorial we will use the CSV files we stored in HDFS in previous sections. Additionally, we will register temporary views on our SparkSession so that we can programmatically query DataFrames using SQL.

Import CSV data into a data frame without a user defined schema

%spark2
/**
 * Let us first see what temporary views are already existent on our Sandbox
 */
hiveContext.sql("SHOW TABLES").show()

If you have not created any temporary views in this Spark session, there should not be any tables listed:

empty-set

First we must read the data from HDFS; in this case we are reading a CSV file without defining a schema first:

%spark2
val geoLocationDataFrame = spark.read.format("csv").option("header", "true").load("hdfs:///tmp/data/geolocation.csv")

/**
 * Now that we have the data loaded into a DataFrame, we can register a temporary view.
 */
geoLocationDataFrame.createOrReplaceTempView("geolocation")

Let’s verify that the data in our CSV file was properly loaded into our data frame:

%spark2
hiveContext.sql("SELECT * FROM geolocation LIMIT 15").show()

select-geo-from-csv

We can now inspect the schema that Spark assigned to the view. Note that without an explicit schema (or schema inference), the CSV columns are all loaded as strings:

%spark2
hiveContext.sql("DESCRIBE geolocation").show()

describe-geo
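Since every column came back as a string, one lightweight option (a sketch using the CSV reader's built-in inferSchema option) is to let Spark guess the column types by scanning the data; note that this costs an extra pass over the file:

%spark2
// Let Spark infer the column types from the data instead of defining a schema by hand.
val geoInferredDF = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("hdfs:///tmp/data/geolocation.csv")

geoInferredDF.printSchema()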

We can also define the schema ourselves with specific types; we will explore this option in the next section.

Import CSV data into a data frame with a user defined schema

%spark2
/**
 * The SQL Types library allows us to define the data types of our schema
 */
import org.apache.spark.sql.types._

/**
 * Recall from the previous tutorial section that the drivermileage table only has two columns:
 * driverid (a String) and totmiles (a Double).
 */
val drivermileageSchema = new StructType().add("driverid",StringType,true).add("totmiles",DoubleType,true)

Now we can apply drivermileageSchema when reading our CSV file from HDFS:

%spark2
val drivermileageDataFrame = spark.read.format("csv").option("header", "true").schema(drivermileageSchema).load("hdfs:///tmp/data/drivermileage.csv")

Finally, let’s create a temporary view

%spark2
drivermileageDataFrame.createOrReplaceTempView("drivermileage")

We can use SparkSession and SQL to query drivermileage

%spark2
hiveContext.sql("SELECT * FROM drivermileage LIMIT 15").show()

select-csv-from-driverm

Query Tables To Build Spark RDD

We will run a simple SELECT query to fetch data from the geolocation and drivermileage views into Spark variables. Getting data into Spark this way also carries the table schema over to the resulting DataFrames.

%spark2
val geolocation_temp0 = hiveContext.sql("SELECT * FROM geolocation")
val drivermileage_temp0 = hiveContext.sql("SELECT * FROM drivermileage")
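To confirm that the table schema really did carry over, you can inspect it directly (printSchema is part of the standard DataFrame API):

%spark2
// Inspect the schemas that were copied over from the registered views.
geolocation_temp0.printSchema()
drivermileage_temp0.printSchema()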

Now let's register temporary views from our DataFrames and use SQL syntax to query against them.

%spark2
geolocation_temp0.createOrReplaceTempView("geolocation_temp0")
drivermileage_temp0.createOrReplaceTempView("drivermileage_temp0")

hiveContext.sql("SHOW TABLES").show()

another-way-to-make-rdd

Querying Against Registered Temporary Tables

Next, we will perform a filter and an aggregation. First, we filter for drivers that have non-normal events associated with them, and then count the number of non-normal events for each driver.

%spark2
val geolocation_temp1 = hiveContext.sql("SELECT driverid, COUNT(driverid) occurance from geolocation_temp0 WHERE event!='normal' GROUP BY driverid")
/**
 * Show RDD
 */
geolocation_temp1.show(10)

filter-abnormal-events

  • As stated earlier, the SQL query itself is a transformation and is evaluated lazily; it is the show action above that actually triggers the computation.

  • The resulting table will have a count of total non-normal events associated with each driver. Register this filtered table as a temporary table so that subsequent SQL queries can be applied to it.

%spark2
geolocation_temp1.createOrReplaceTempView("geolocation_temp1")
hiveContext.sql("SHOW TABLES").show()

filtered-table

  • You can view the result by executing an action operation on the temporary view.
%spark2
hiveContext.sql("SELECT * FROM geolocation_temp1 LIMIT 15").show()

view-operation

Perform a Join Operation

In this section we will perform a join operation. The geolocation_temp1 table has the drivers and the count of their respective non-normal events. The drivermileage_temp0 table has the total miles travelled by each driver.

  • We will join the two tables on a common column, which in our case is driverid.
%spark2
val joined = hiveContext.sql("select a.driverid,a.occurance,b.totmiles from geolocation_temp1 a,drivermileage_temp0 b where a.driverid=b.driverid")

join_op_column_hello_hdp_lab4

  • The resulting data set will give us total miles and total non-normal events for a particular driver. Register this filtered table as a temporary table so that subsequent SQL queries can be applied to it.
%spark2
joined.createOrReplaceTempView("joined")
hiveContext.sql("SHOW TABLES").show()

created-joined

  • You can view the result by executing an action operation on our temporary view.
%spark2
/**
 * We can view the result from our query with a select statement
 */
hiveContext.sql("SELECT * FROM joined LIMIT 10").show()

show-results-joined

Compute Driver Risk Factor

In this section we will associate a risk factor with every driver. The risk factor relates each driver's abnormal occurrences to the total number of miles driven. Simply put, a high number of abnormal occurrences over a short number of miles driven is an indicator of high risk. Let's translate this intuition into a SQL query (in the query below the ratio is expressed as totmiles/occurance, i.e. miles driven per abnormal event):

%spark2
val risk_factor_spark = hiveContext.sql("SELECT driverid, occurance, totmiles, totmiles/occurance riskfactor FROM joined")

calculate_riskfactor_hello_hdp_lab4

  • The resulting data set gives us the total miles, the total non-normal events, and the risk factor for each driver. Register this table as a temporary view so that subsequent SQL queries can be applied to it.
%spark2
risk_factor_spark.createOrReplaceTempView("risk_factor_spark")
hiveContext.sql("SHOW TABLES").show()
  • View the results
%spark2
risk_factor_spark.show(10)

results-from-risk

Save Table as CSV

After finding the risk factor for each driver we might want to store our results as a CSV on HDFS:

%spark2
risk_factor_spark.coalesce(1).write.csv("hdfs:///tmp/data/riskfactor")

A directory named riskfactor will be created under /tmp/data/ on HDFS; inside it we can find our CSV file with an auto-generated name given to it by Spark.

csv-saved
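If you would rather keep the results in Hive as an ORC-backed table (instead of, or in addition to, the CSV file), a minimal sketch using the standard DataFrameWriter API looks like the following. The table name riskfactor_spark is just an example:

%spark2
// Optional: persist the results as an ORC table in the Hive metastore.
risk_factor_spark.write
  .format("orc")
  .mode("overwrite")
  .saveAsTable("riskfactor_spark")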

Full Spark Code Review

Instantiate SparkSession

%spark2
val hiveContext = new org.apache.spark.sql.SparkSession.Builder().getOrCreate()

Show the tables in the default Hive database:

hiveContext.sql("SHOW TABLES").show()

Select all rows and columns from the tables, store the results in variables, and register them as temporary views:

val geolocation_temp0 = hiveContext.sql("SELECT * FROM geolocation")

val drivermileage_temp0 = hiveContext.sql("SELECT * FROM drivermileage")

geolocation_temp0.createOrReplaceTempView("geolocation_temp0")
drivermileage_temp0.createOrReplaceTempView("drivermileage_temp0")

val geolocation_temp1 = hiveContext.sql("SELECT driverid, count(driverid) occurance FROM geolocation_temp0 WHERE event!='normal' GROUP BY driverid")

geolocation_temp1.createOrReplaceTempView("geolocation_temp1")

Load the first 15 rows from geolocation_temp1, which holds the filtered geolocation data:

hiveContext.sql("SELECT * FROM geolocation_temp1 LIMIT 15").show()

Create joined by joining the two tables on driverid, and register joined as a temporary view:

val joined = hiveContext.sql("SELECT a.driverid,a.occurance,b.totmiles FROM geolocation_temp1 a,drivermileage_temp0 b WHERE a.driverid=b.driverid")

joined.createOrReplaceTempView("joined")

Load the first 10 rows from joined:

hiveContext.sql("SELECT * FROM joined LIMIT 10").show()

Compute risk_factor_spark and register it as a temporary view:

val risk_factor_spark = hiveContext.sql("SELECT driverid, occurance, totmiles, totmiles/occurance riskfactor from joined")

risk_factor_spark.createOrReplaceTempView("risk_factor_spark")

Print the first 15 lines from the risk_factor_spark table

hiveContext.sql("SELECT * FROM risk_factor_spark LIMIT 15").show()

Summary

Congratulations! Let's summarize the Spark coding skills and knowledge we acquired to compute the risk factor associated with every driver. Apache Spark is efficient for computation because of its in-memory data processing engine. We learned how to integrate Hive with Spark by creating a Hive context, and we used our existing data from Hive to create DataFrames and temporary views. We learned to perform transformations and actions to create new datasets from existing ones, including filtered, joined and aggregated data. After we computed the risk factor, we saved our results back to HDFS as a CSV file.

Further Reading

To learn more about Spark, check out these resources:

Appendix A: Run Spark in the Spark Interactive Shell

1. Using the built-in SSH Web Client (aka shell-in-a-box), log on using maria_dev/maria_dev.

2. Let’s enter the Spark interactive shell by typing the command:

  • spark-shell

This will load the default Spark Scala API. Issue the command :help for help and :quit to exit.

shell-hello-hdp

  • Execute the commands:
val hiveContext = new org.apache.spark.sql.SparkSession.Builder().getOrCreate()
hiveContext.sql("SHOW TABLES").show()
val geoLocationDataFrame = spark.read.format("csv").option("header", "true").load("hdfs:///tmp/data/geolocation.csv")
geoLocationDataFrame.createOrReplaceTempView("geolocation")
hiveContext.sql("SELECT * FROM geolocation LIMIT 15").show()
hiveContext.sql("DESCRIBE geolocation").show()
import org.apache.spark.sql.types._
val drivermileageSchema = new StructType().add("driverid",StringType,true).add("totmiles",DoubleType,true)
val drivermileageDataFrame = spark.read.format("csv").option("header", "true").schema(drivermileageSchema).load("hdfs:///tmp/data/drivermileage.csv")
drivermileageDataFrame.createOrReplaceTempView("drivermileage")
hiveContext.sql("SELECT * FROM drivermileage LIMIT 15").show()
val geolocation_temp0 = hiveContext.sql("SELECT * FROM geolocation")
val drivermileage_temp0 = hiveContext.sql("SELECT * FROM drivermileage")
geolocation_temp0.createOrReplaceTempView("geolocation_temp0")
drivermileage_temp0.createOrReplaceTempView("drivermileage_temp0")
hiveContext.sql("SHOW TABLES").show()
val geolocation_temp1 = hiveContext.sql("SELECT driverid, count(driverid) occurance FROM geolocation_temp0 WHERE event!='normal' GROUP BY driverid")
geolocation_temp1.show(10)
geolocation_temp1.createOrReplaceTempView("geolocation_temp1")
hiveContext.sql("SHOW TABLES").show()
hiveContext.sql("SELECT * FROM geolocation_temp1 LIMIT 15").show()
val joined = hiveContext.sql("select a.driverid,a.occurance,b.totmiles from geolocation_temp1 a,drivermileage_temp0 b where a.driverid=b.driverid")
joined.createOrReplaceTempView("joined")
hiveContext.sql("SELECT * FROM joined LIMIT 10").show()
val risk_factor_spark = hiveContext.sql("select driverid, occurance, totmiles, (totmiles/occurance) riskfactor from joined")
risk_factor_spark.createOrReplaceTempView("risk_factor_spark")
hiveContext.sql("SHOW TABLES").show()
val joined = hiveContext.sql("SELECT a.driverid,a.occurance,b.totmiles FROM geolocation_temp2 a,drivermileage_temp1 b WHERE a.driverid=b.driverid")
hiveContext.sql("SELECT * FROM risk_factor_spark LIMIT 15").show()
risk_factor_spark.coalesce(1).write.csvrisk_factor_spark.coalesce(1).write.csv("hdfs:///tmp/data/riskfactor")
