【Kafka】Kafka 1.0.1案例详解之Kafka Connect


本博客文章如无特别说明,均为原创!转载请注明出处:Big data enthusiast(http://www.lubinsu.com/)

本文链接地址:【Kafka】Kafka 1.0.1案例详解之Kafka Connect(http://www.lubinsu.com/index.php/archives/513)

Kafka Connect是一个用于Kafka与外部系统之间高可靠的、可扩展的流数据传输工具。它使得我们能够简单快速的定义数据集合在Kafka与外部系统之间输入输出。Kafka Connect可以从数据库或者应用程序服务器中手机数据指标到Kafka的topic中,以便数据进行低延迟的数据处理。一个实现了导出功能的Connect可以将数据从Kafka中导出到外部存储系统、查询系统或者批处理系统进行离线分析。
Kafka Connect包括如下特性:
  • 提供了一个通用的Connectors开发框架
  • 支持分布式模式或者单机模式
  • 支持REST接口
  • 自动offset管理
  • 分布式并且可扩展
  • 支持流处理和批处理
 
Kafka Connect功能示意图
 
 
运行Kafka Connect
支持两种运行模式:standalone模式(单线程)和分布式
 
 
standalone模式
standalone模式中,所有的工作都在单个线程中完成。一般情况下这种模式适合于只有单节点工作的情况,但是这并不能做到Kafka Connect的高容错性,如果进程down掉则没有替代的进程来完成后续的工作。那么如何启动一个standalone进程呢:
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties …]
connect-standalone.properties中配置了worker的相关信息,包括连接参数、系列化格式、和提交offset的时间间隔等,第二个配置文件则指定了connect的相关信息,是source还是sink,输入的源或者输出的目标。
 
 
分布式模式
分布式模式和standalone模式有点区别,我们先来看看启动命令:
> bin/connect-distributed.sh config/connect-distributed.properties
这样我们相当于启动了一个Connect服务,Kafka Connect的本意就是打算以服务的方式运行,所以它提供了REST API来管理connectors,分布式模式启动之后,我们就可以通过REST服务进行job管理了。
 
现在我们来尝试创建一个connector:
POST方式发送到:http://localhost:8083/connectors
返回:

{
    "name": "local-file-source",
        "config": {
    "connector.class": "FileStreamSource",
            "file": "test.txt",
            "tasks.max": "1",
            "name": "local-file-source-name",
            "topic": "connect-test"
},
    "tasks": [
    {
        "connector": "local-file-source",
            "task": 0
    }
],
    "type": "source"
}
GET方式获取connectors列表:
[
    "local-file-source"
]
现在我们向文件中增加一行数据:
echo "nowcccsadfasdfasdfadvvc" >> test.txt
 
在connect-test消费端可以发现接收到了一条新纪录:
 
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"nowcccsadfasdfasdfadvvc"}

 
Kafka Connect的内容就讲解到这里,实际上Connector是可以直接定制的,我们可以通过继承SourceConnector、SinkConnector、SourceTask和SinkTask来实现我们所需要的功能。

发表评论

电子邮件地址不会被公开。 必填项已用*标注