【Spark】使用DataFrame读取复杂JSON中的嵌套数组
众所周知,在早期Spark版本中就已经支持读取Json格式的数据文件,并能够直接转换为数据库表,以方便我们进行处理数据,在本篇文章中我们将介绍如何通过Spark API很简单地读取Json数据,并进一步讲解,读取复杂Json中的嵌套数组。
本次使用的各个组件版本设定如下:
Spark: 2.1.0
Scala 2.11.8
Hadoop 2.6.2
加载Json数据
我们可以简单地通过SQLContext读取Json文件
val dfSQLContext = sqlContext.read.json("/user/hadoop/flume/my_crawler_jd_report/*/*")
或者通过SparkSession读取Json文件
val dfSparkSession = spark.read.json("/user/hadoop/flume/my_crawler_jd_report/*/*")
以上两种方式读取的结果是一样的
scala> val dfSQLContext = sqlContext.read.json("/user/hadoop/flume/my_crawler_jd_report/*/*") dfSQLContext: org.apache.spark.sql.DataFrame = [addTime: string, channel: string ... 3 more fields] scala> val dfSparkSession = spark.read.json("/user/hadoop/flume/my_crawler_jd_report/*/*") dfSparkSession: org.apache.spark.sql.DataFrame = [addTime: string, channel: string ... 3 more fields]
现在我们来看下整个Json的数据结构
scala> dfSparkSession.printSchema root |-- addTime: string |-- channel: string |-- consume_time: string |-- message: struct | |-- addressInfo: array | | |-- element: struct | | | |-- address: string | | | |-- addressLabel: string | | | |-- consignee: string | | | |-- email: string | | | |-- phoneNumber : string | | | |-- region: string | |-- certifyInfo: struct | | ...结构比较复杂,此处省略 |-- sUserId: string (nullable = true)
因为结构比较复杂,直接挑重点进行讲解,在这里我们先看下addressInfo这个节点,节点中存储的是数组(array)形式的多条(struct)用户地址信息,地址信息中又包含了编号,手机号,地址等详细信息。
嵌套数组结构
打平/分解数组
如果Json对象中包含了数组结构,我们应该如何访问到数组中的元素呢?有一种方法就是直接将这条数据中的数组打平成多条数据,也就是一条记录拆分成多条记录,在这里我们可以直接通过explode()函数实现
scala> val dfDetails = dfSQLContext.select(dfSQLContext("sUserId"),explode(dfSQLContext("message.addressInfo"))).toDF("userid","addressInfo") scala> dfDetails.printSchema root |-- userid: string |-- addressInfo: struct | |-- address: string | |-- addressLabel: string | |-- consignee: string | |-- email: string | |-- phoneNumber : string | |-- region: string | |-- telephoneNumber: string
现在我们已经将addressInfo打平成一条条记录,不再是数组结构的数据了。
这样就可以很简单的通过select来读取struct结构中的数据了,我们通过addressInfo.address来读取详细地址信息:
scala> val dfDetailsAddress = dfDetails.select("addressInfo.address") scala> dfDetailsAddress.show(10) +--------------------+ | address| +--------------------+ |师大京东派| |障岗村广东青年职业学院| |永泰永康路致和街一巷之三号(永泰小学旁)| |燕岭路89号燕侨大厦407室翰墨教育| |广东财经大学华商学院-华商路1号| |蓬馨园西二栋203| |障岗村广东青年职业职业学院| |新兴街教育局对面梦韵床上用品专卖店| |大园街十一巷九号一楼| |玉亭镇百湖凤凰溪35号| +--------------------+ only showing top 10 rows
至此,每一条地址信息都拆分成了单独的一条记录。
参考文献
explode()
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$@explode(e:org.apache.spark.sql.Column):org.apache.spark.sql.Column