【Scala】基于Spary的REST服务


本博客文章如无特别说明,均为原创!转载请注明出处:Big data enthusiast(http://www.lubinsu.com/)

本文链接地址:【Scala】基于Spary的REST服务(http://www.lubinsu.com/index.php/archives/244)

Spray简介

spray is an open-source toolkit for building REST/HTTP-based integration layers on top of Scala and Akka. Being asynchronous, actor-based, fast, lightweight, modular and testable it’s a great way to connect your Scala applications to the world.
Spary是一个轻量级的,基于Scala和Akka之上的REST/HTTP框架。

本文相关的测试代码可以我的GitHub中获取到:

https://github.com/lubinsu/rest.git

我们来看下基于Akka模型的Spary框架是如何实现交互的,首先对外通过Http.bind暴露端口,为了保证服务能力,这里使用

DistributedPubSubExtension来实现分布式集群能力(参考文档:http://doc.akka.io/docs/akka/2.3.6/contrib/distributed-pub-sub.html)。

 

DistributedPubSubExtension

关于DistributedPubSubExtension的简介,请看下面的描述:

Akka provides power clustering capabilities but reading through the online docs it may not be obvious how to set it up for your particular needs. For my use case, I wanted to create multiple actors representing a particular service, that would run on more that one machine. If one machine failed, actors on other machines could still handle the requests, as in a typical high availability scenario. In addition, I did not want the calling actor to have any knowledge of which machines could handle the request, only that one actor in the cluster would handle it. Akka’s provides exactly these capabilities using the DistributedPubSubExtension provided in Akka’s contrib module. This article will demonstrate how to create a clustered service using the DistributedPubSubExtension.

In our example we have two types of actors; a BackendService actor which represents some arbitrary service running in the cluster and a WebService actor which handles http requests and in turn calls a BackendService to perform some work. The BackendService is stateless and there are many running in the cluster. The WebService just need to call any one BackendService to have the work performed.

rest

DistributedPubSubMediator

WebServiceActor与BackendServiceActor创建一个DistributedPubSubMediator来维护已经注册了的ActorRefs并将它们分布在集群中,这些子actor可以随时加入或者离开集群,mediator会自动查找当前已经注册并且活动着的actor。

val mediator = DistributedPubSubExtension(context.system).mediator

在BackendServiceActor中需要创建mediator并注册我们的actor,mediator会依次更新集群中所有的mediator并告诉它们,有一个actor加入了。

mediator ! Put(self)

最后就是标准的actor实现了,监听并回复接收到的消息:

def receive = {
case BusScenic(userId, labelCode) =>
    log.info("Backend Service is querying user's labels:" + userId)
    sender() ! getLabels(BusScenic(userId, labelCode))
}

Web Service

继承HttpService来提供http rest服务:

class WebServiceActor extends Actor with HttpService with ActorLogging

同样需要实现route功能:

def receive = runRoute(route)

val route =
  path("userlabel") {
    get {
      parameters('userId, 'labelCode) { (userId, labelCode) =>
        log.info("hello".concat(labelCode))
        /** 1. [[akka.contrib.pattern.DistributedPubSubMediator.Send]] -
         * The message will be delivered to one recipient with a matching path, if any such
         * exists in the registry. If several entries match the path the message will be sent
         * via the supplied `routingLogic` (default random) to one destination. The sender of the
         * message can specify that local affinity is preferred, i.e. the message is sent to an actor
         * in the same local actor system as the used mediator actor, if any such exists, otherwise
         * route to any other matching entry. A typical usage of this mode is private chat to one
         * other user in an instant messaging application. It can also be used for distributing
         * tasks to registered workers, like a cluster aware router where the routees dynamically
         * can register themselves.
         * */
        onComplete(mediator ? Send("/user/backend-service", BusScenic(userId, labelCode), localAffinity = false)) {
          case Success(value) =>
            complete(value.asInstanceOf[UserLabels])
          case Failure(e) =>
            complete(e.getMessage)
          case _ =>
            complete("unknown error.")
        }
      }
    }
  }

application.conf配置

在配置文件中application.conf,需要配置extentions:

extensions = [
  "akka.contrib.pattern.DistributedPubSubExtension"
]

完整的配置如下所示:

akka {
  loglevel = INFO
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = on
    netty.tcp {
      hostname = "hadoop.slave2"
      port = 0
    }
  }
  extensions = [
    "akka.contrib.pattern.DistributedPubSubExtension"
  ]

  cluster {
    seed-nodes = [
      "akka.tcp://cluster-example@hadoop.slave2:2551"
    ]

    auto-down-unreachable-after = 5s
  }
}

spray.can.server {
  request-timeout = 10s
}

akka.contrib.cluster.pub-sub {
  # Actor name of the mediator actor, /user/distributedPubSubMediator
  name = distributedPubSubMediator

  # Start the mediator on members tagged with this role.
  # All members are used if undefined or empty.
  role = ""

  # How often the DistributedPubSubMediator should send out gossip information
  gossip-interval = 1s

  # Removed entries are pruned after this duration
  removed-time-to-live = 120s
  
  # The routing logic to use for 'Send'
  # Possible values: random, round-robin, broadcast
  routing-logic = round-robin
}

完整的可执行代码和部署方式详见:https://github.com/lubinsu/rest.git

 

One thought on “【Scala】基于Spary的REST服务

发表评论

电子邮件地址不会被公开。 必填项已用*标注