【Spark】Reading Hive Data from Spark

HiveQL syntax runs as-is in Spark. This post connects Spark to Hive, executes HiveQL and joins Hive tables with Spark-generated data, so existing Hive workloads can be migrated to Spark smoothly.

Below is an example of reading and operating on Hive data through Spark; at its core it is still just SQL:

package com.changtu.hive

import java.io.{File, FileInputStream}

import com.google.common.io.{ByteStreams, Files}
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by lubinsu on 2016/4/5
  */
object HiveFromSpark {

  case class Record(key: Int, value: String)

  // Copy the sample data file from $CONF_HOME to a temporary file
  val file = new File(System.getenv("CONF_HOME") + "/kv2.txt")
  val kv1Stream = new FileInputStream(file)
  val kv1File = File.createTempFile("kv1", "txt")
  kv1File.deleteOnExit()
  ByteStreams.copy(kv1Stream, Files.newOutputStreamSupplier(kv1File))


  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HiveFromSpark")
    val sc = new SparkContext(sparkConf)

    // A hive context adds support for finding tables in the MetaStore and writing queries
    // using HiveQL. Users who do not have an existing Hive deployment can still create a
    // HiveContext. When not configured by the hive-site.xml, the context automatically
    // creates metastore_db and warehouse in the current directory.
    val hiveContext = new HiveContext(sc)
    import hiveContext.implicits._
    import hiveContext.sql

    sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
    sql(s"LOAD DATA LOCAL INPATH '${kv1File.getAbsolutePath}' INTO TABLE src")

    // Queries are expressed in HiveQL
    println("Result of 'SELECT *': ")
    sql("SELECT * FROM src").collect().foreach(println)

    // Aggregation queries are also supported.
    val count = sql("SELECT COUNT(*) FROM src").collect().head.getLong(0)
    println(s"COUNT(*): $count")

    // The results of SQL queries are themselves RDDs and support all normal RDD functions.  The
    // items in the RDD are of type Row, which allows you to access each column by ordinal.
    val rddFromSql = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

    println("Result of RDD.map:")
    val rddAsStrings = rddFromSql.rdd.map {
      case Row(key: Int, value: String) => s"Key: $key, Value: $value"
    }
    rddAsStrings.collect().foreach(println)

    // You can also register RDDs as temporary tables within a HiveContext.
    val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
    rdd.toDF().registerTempTable("records")

    // Queries can then join RDD data with data stored in Hive.
    println("Result of SELECT *:")
    sql("SELECT * FROM records r JOIN src s ON r.key = s.key").collect().foreach(println)

    sc.stop()
  }
}
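
Beyond querying, the same HiveContext can also write DataFrames back into Hive, which is useful when a migrated Hive job produces a Hive table as its output. The snippet below is a minimal sketch rather than part of the original example: the target table name src_top10 is made up for illustration, and it only relies on the DataFrameWriter API (write.mode(...).saveAsTable(...)) available since Spark 1.4.

// A minimal sketch, assuming the hiveContext and src table from the example above.
// The target table name "src_top10" is hypothetical.
import org.apache.spark.sql.SaveMode

val top10 = hiveContext.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// Persist the result as a managed Hive table so that ordinary Hive clients can query it too.
top10.write.mode(SaveMode.Overwrite).saveAsTable("src_top10")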

Main class:

org.apache.spark.deploy.SparkSubmit

Invocation parameters:

--class
com.changtu.hive.HiveFromSpark
--jars
E:\IdeaProjects\jars\hive\spark-hive_2.10-1.5.2.jar
--master
spark://tts.node4:7077
E:\IdeaProjects\spark\out\artifacts\changtu\changtu.jar
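
For reference, when submitting from a shell instead of running SparkSubmit from the IDE, the parameters above correspond roughly to the following spark-submit invocation (assuming the spark-submit script from the same Spark 1.5.2 distribution is on the PATH; the jar paths are the local Windows paths shown above):

spark-submit --class com.changtu.hive.HiveFromSpark --jars E:\IdeaProjects\jars\hive\spark-hive_2.10-1.5.2.jar --master spark://tts.node4:7077 E:\IdeaProjects\spark\out\artifacts\changtu\changtu.jar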
