【Spark】免费获取IP信息的服务接口


本博客文章如无特别说明,均为原创!转载请注明出处:Big data enthusiast(http://www.lubinsu.com/)

本文链接地址:【Spark】免费获取IP信息的服务接口(http://www.lubinsu.com/index.php/archives/58)

Scala版本

package com.changtu

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.DateTime
import play.api.libs.json.Json

import scala.io.Source

/**
  * Spark job that looks up ISP / location details for the client IP found in
  * order-log records and writes the enriched records back to HDFS.
  *
  * How to submit:
  * {{{
  * spark-submit --class com.changtu.IPHandler
  *   --jars /IdeaProjects/jars/play-json_2.10-2.3.9.jar,/IdeaProjects/jars/hadoop-common-2.2.0.jar,/IdeaProjects/jars/play-functional_2.10-2.3.10.jar
  *   --master spark://tts.node4:7077
  *   /IdeaProjects/jars/changtu.jar -1
  * }}}
  *
  * Follow-up command that exports the result into the database (the field
  * separator is the Ctrl-A control character, written as an octal escape):
  * {{{
  * sqoop export --connect jdbc:oracle:thin:@172.19.0.94:1521:xxx --username xxx --password xxx --table BI_ORDER_IPS --export-dir /user/root/order/orderIP --fields-terminated-by '\01' -m 1;
  * }}}
  */
object IPHandler {

  /** Column separator of both the input logs and the generated output: Ctrl-A (U+0001). */
  private val FieldTerminator = "\u0001"

  /** Text that immediately precedes the IP address inside the third log column. */
  private val IpMarker = "ip地址是"

  /**
    * Queries the k780 REST API for information about an IP address.
    *
    * Register a free account at k780 and replace the `secret`, `appkey` and
    * `sign` placeholders in the URL with the credentials you were issued.
    *
    * @param ipAddr IP address to look up
    * @return the raw JSON response body
    */
  def getIPJSON(ipAddr: String): String = {
    // Encode the value so that stray characters cannot corrupt the query string.
    val encodedIp = java.net.URLEncoder.encode(ipAddr, "UTF-8")
    val url = s"http://api.k780.com:88/?app=ip.get&ip=$encodedIp&secret=xxx&appkey=xxx&sign=xxx&format=json"
    val response = Source.fromURL(url)
    // Always release the underlying connection; this runs once per record.
    try response.mkString finally response.close()
  }

  /**
    * Resolves the detailed ISP / location description of an IP address through
    * the k780 REST API.
    *
    * @param ipAddr IP address to look up
    * @return the `result.detailed` field of the response, or the API's `msg`
    *         field when the response reports `success == "0"`
    */
  def getISP(ipAddr: String): String = {
    val json = Json.parse(getIPJSON(ipAddr))
    (json \ "success").as[String] match {
      case "0" => (json \ "msg").as[String]
      case _   => (json \ "result" \ "detailed").as[String]
    }
  }

  /**
    * Entry point of the job.
    *
    * @param args `args(0)` is an hour offset relative to now (e.g. `-1` for the
    *             previous hour) that selects which hourly log files are read
    */
  def main(args: Array[String]): Unit = {
    require(args.nonEmpty, "usage: IPHandler <hourOffset>  (e.g. -1 for the previous hour)")

    val hourDuration = args(0).toInt
    val hdfsPath = "hdfs://172.19.0.95:9999"
    val hdfsURI = new URI(hdfsPath)
    val conf = new SparkConf().setAppName("Spark IPHandler")
    val sc = new SparkContext(conf)

    try {
      val hdfsConf = new Configuration()

      // Take the timestamp once so the day and hour parts of the path cannot
      // disagree when the job starts right at an hour boundary.
      val targetHour = DateTime.now().plusHours(hourDuration)
      val day = targetHour.toString("yyyyMMdd")
      val hour = targetHour.toString("yyyyMMddHH")
      val srcFiles = s"$hdfsPath/user/root/order/$day/order-${hour}.[0-9]*.log"

      // Keep records that have at least 12 columns, whose first column contains
      // "3011", whose 12th column is non-empty and whose third column carries
      // the IP marker; the IP is everything after the marker.
      val orderInfo = sc.textFile(srcFiles)
        .map(_.split(FieldTerminator))
        .filter { cols =>
          cols.length > 11 &&
            cols(0).contains("3011") &&
            cols(11).nonEmpty &&
            cols(2).contains(IpMarker)
        }
        .map { cols =>
          val ip = cols(2).substring(cols(2).indexOf(IpMarker) + IpMarker.length)
          (cols(6), cols(11), ip)
        }

      // Output columns: column 6, column 11, IP, ISP description, processing time.
      val orderIP = orderInfo.map { case (col6, col11, ip) =>
        Seq(col6, col11, ip, getISP(ip), DateTime.now().toString("yyyy-MM-dd HH:mm:ss"))
          .mkString(FieldTerminator)
      }

      val output = new Path(s"$hdfsPath/user/root/order/orderIP")
      val hdfs = FileSystem.get(hdfsURI, hdfsConf)
      // Remove the previous output directory; saveAsTextFile fails if it exists.
      if (hdfs.exists(output)) hdfs.delete(output, true)
      orderIP.saveAsTextFile(output.toString)
    } finally {
      // Release cluster resources even when the job fails.
      sc.stop()
    }
  }
}

代码块地址:

http://git.oschina.net/lubinsu/c2iw9p4ym5oqk8ln17agz.code.git

One thought on “【Spark】免费获取IP信息的服务接口”

  • 2016-10-27 at 16:54
    Permalink

    Well that was a lovely account of the “inside” of a dancer’s morning. Thank you for sharing and I wish you all the very best with your future. My daughter Claudia Dean has just started at The Royal Ballet, and I don’t hear the details that you shared, so thank you so much.

    Reply

发表评论

电子邮件地址不会被公开。 必填项已用*标注