【Scala】基于akka的多线程应用程序日志收集服务


本博客文章如无特别说明,均为原创!转载请注明出处:Big data enthusiast(http://www.lubinsu.com/)

本文链接地址:【Scala】基于akka的多线程应用程序日志收集服务(http://www.lubinsu.com/index.php/archives/409)

Akka is a toolkit and runtime for building highly concurrent, distributed, and resilient message-driven applications on the JVM. Akka是JVM之上高并发的分布式,可伸缩的消息驱动应用框架。下面我们将通过Akka框架实现多线程的日志收集功能。我们把收集到的日志数据实时存放到HDFS中,以供后续分析挖掘使用。

  • 通过SSH方式,远程登录到客户端 使用SSH在远程客户端执行SHELL命令,如最常见的tail -f实时读取日志文件增量
package com.changtu.service

import java.io.{BufferedWriter, OutputStreamWriter}

import akka.actor.Actor
import com.changtu.core.Host
import com.changtu.util.Logging
import com.changtu.util.hdfs.HDFSUtils
import com.changtu.util.host.{AES, Configuration, SSH}
import org.apache.hadoop.fs.Path
import org.joda.time.DateTime

import scala.util.{Failure, Success}

/**
  * Created by lubinsu on 8/22/2016.
  * Log-collection actor: runs a shell command on a remote host over SSH and
  * persists each produced log line to HDFS. Concurrency is provided by akka —
  * one actor instance per monitored host.
  */

class CollectLogService extends Actor with Logging {

  /** Handles `Host(host, port, cmd)` messages; anything else is logged and dropped. */
  override def receive: Receive = {
    case Host(host, port, cmd) =>
      getLogs(host, port, cmd) match {
        case 0 => logger.info("success.")
        case _ => logger.error("error.")
      }
    case _ => logger.warn("unknown operation.")
  }

  /**
    * Collect logs from the given host by executing a shell command over SSH.
    * Every line produced by the remote command is handed to [[loadToHdfs]].
    *
    * @param host host to collect from
    * @param port ssh port (numeric string; `toInt` throws on a non-numeric value)
    * @param cmd  shell command to execute remotely (e.g. `tail -f ...`)
    * @return exit status reported by the SSH helper (0 on success)
    */
  private def getLogs(host: String, port: String, cmd: String): Int =
    SSH(host, "hadoop", port.toInt, cmd, "", decryptPassword(host), loadToHdfs)

  /**
    * Decrypt the SSH password for `host` from the "passwd" configuration.
    * On failure the error is logged and an empty password is returned
    * (preserving the original best-effort behaviour).
    */
  private def decryptPassword(host: String): String =
    AES.decrypt(Configuration("passwd").getProperty(host.concat("-hadoop")), "secretKey.changtu.com") match {
      case Success(encrypted) =>
        encrypted.asInstanceOf[String]
      case Failure(e) =>
        logger.error(e.getMessage)
        ""
    }

  /**
    * Append one collected log line to a per-host, per-day HDFS file.
    * Opens an append stream per line; streams are now closed in `finally`
    * so a failed write no longer leaks the HDFS output stream.
    *
    * @param msg  a single log line
    * @param host host the line was collected from (used in the file name)
    */
  private def loadToHdfs(msg: String, host: String): Unit = {
    val currentTime = DateTime.now.toString("yyyyMMdd")
    val path = "/user/hadoop/bigdata/logs/rest.".concat(host).concat("-").concat(currentTime).concat(".log")
    HDFSUtils.createDirectory(path, deleteF = false)
    val fsout = HDFSUtils.getHdfs.append(new Path(path))
    val br = new BufferedWriter(new OutputStreamWriter(fsout))
    try {
      br.write(msg)
      br.newLine()
    } finally {
      br.close()    // flushes and closes the wrapped stream as well
      fsout.close() // kept for parity with the original; harmless double-close
    }
  }

  /**
    * FIXME(review): misnamed — the original body was a byte-for-byte copy of
    * [[loadToHdfs]] and never touched Kafka. Kept (delegating) so any
    * reflective/callback caller still resolves; implement Kafka publishing
    * here when a producer is available.
    *
    * @param msg  a single log line
    * @param host host the line was collected from
    */
  private def loadToKafka(msg: String, host: String): Unit =
    loadToHdfs(msg, host)

}
  • 定义Host class

 

package com.changtu.util

/**
  * Created by lubinsu on 8/16/2016.
  * Package-level defaults for `com.changtu.util.host`.
  */
package object host {
  /**
    * Specifies the default `charset` used to encode and decode strings.
    */
  private[host] final val DefaultCharset = "UTF-8"
}

  • 命令派发
package com.changtu.service

import akka.actor.{Actor, Props}
import com.changtu.core.Host
import com.changtu.util.Logging
import com.changtu.util.host.{AES, Configuration, SSH}

import scala.util.{Failure, Success}

/**
  * Created by lubinsu on 8/23/2016.
  * Dispatch actor: for each `Host` message it lists the remote log files,
  * spawns (or attempts to spawn) a per-host [[CollectLogService]] child and
  * forwards the collection command to it.
  */
class CollectMonitor extends Actor with Logging {

  override def receive: Receive = {
    case Host(host, port, cmd) =>
      getLogFiles(host, port, cmd)
      // NOTE(review): a fixed child name per host means a second message for
      // the same host throws InvalidActorNameException — confirm each host is
      // dispatched at most once per monitor, or look up the existing child.
      val collector = context.actorOf(Props[CollectLogService], "collector-".concat(host))
      // Debug visibility of spawned children; was a bare println.
      context.children.foreach(child => logger.info(child.path.name))
      collector ! Host(host, port, cmd)
    case _ => logger.warn("unknown operation.")
  }

  /**
    * List the log files on the remote host via SSH.
    *
    * @param host host to inspect
    * @param port ssh port (numeric string)
    * @param cmd  unused — the original shadowed it with a lambda parameter and
    *             always ran the fixed `find` below; kept for signature
    *             compatibility with callers.
    * @return exit status of the remote `find` command
    */
  private def getLogFiles(host: String, port: String, cmd: String): Int = {
    // Decrypt the SSH password; on failure log and fall back to "" (best-effort).
    val password = AES.decrypt(Configuration("passwd").getProperty(host.concat("-hadoop")), "secretKey.changtu.com") match {
      case Success(encrypted) =>
        encrypted.asInstanceOf[String]
      case Failure(e) =>
        logger.error(e.getMessage)
        ""
    }

    SSH(host, "hadoop", port.toInt, "find /appl/logs -type f", "", password)
  }

}
  • 主Actor
package com.changtu.api

import akka.actor.{ActorSystem, Props}
import com.changtu.core.Host
import com.changtu.service.CollectMonitor

/**
  * Created by lubinsu on 8/23/2016.
  * Entry point: parses "host:port,host:port,..." plus a shell command from the
  * command line and dispatches one `Host` message per host to the monitor.
  */
object CollectLogs extends App {

  // Exactly two arguments are required. The original checked `< 2` but then
  // destructured with `val Array(hosts, cmd) = args`, which threw MatchError
  // when MORE than two arguments were supplied.
  if (args.length != 2) {
    System.err.println("Usage: CollectLogs <hosts> <command>")
    System.exit(1)
  }

  val Array(hosts, cmd) = args

  val system = ActorSystem("CollectSystem")

  val monitor = system.actorOf(Props[CollectMonitor], name = "CollectMonitor-".concat(hosts))

  hosts.split(",").foreach { spec =>
    // Split once; a spec without an explicit ":port" previously crashed with
    // ArrayIndexOutOfBoundsException — now it is reported and skipped.
    spec.split(":") match {
      case Array(host, port) => monitor ! Host(host, port, cmd)
      case _                 => System.err.println("skipping malformed host spec: ".concat(spec))
    }
  }

}
  • 打包执行
mvn clean install
nohup java -Djava.ext.dirs=/appl/scripts/e-business/platform/target/lib:/usr/share/jdk1.8.0_77/jre/lib/ext -classpath /appl/scripts/e-business/platform/target/platform-1.1.1.jar com.changtu.api.CollectLogs "172.19.3.151:22,172.19.3.152:22,172.19.3.153:22,172.19.3.154:22,172.19.3.156:22,172.19.0.5:22,172.19.0.95:11017" "tail -f /appl/scripts/e-business/rest/*.log" >> /appl/scripts/e-business/platform/$$.log 2>&1 &

发表评论

电子邮件地址不会被公开。 必填项已用*标注