博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【数据平台】之Kafka+Minio数据埋点大数据利器
阅读量:4119 次
发布时间:2019-05-25

本文共 6675 字,大约阅读时间需要 22 分钟。

商城埋点数据,即客户在商城的购买行为的各种浏览数据,是对客户购买行为分析的重要资产。在大数据领域,可以通过收集客户行为数据,分析客户行为规律,进而指导商城各种营销活动制定。例如淘宝和京东展示的各种维度分析报表,可展现客户购买偏好,浏览商品偏好,地域性偏好等;另一个重要应用便是商城“商品推荐”,商城商品推荐便时对客户浏览和购买行为综合分析的结果,在亚马逊“商品推荐”能为企业商城来带40%左右的营收。基于此,对客户行为数据保存就显得至关重要。

流程图

kafka+minio+connector s3

1、通过kafka实时接收商城端行为(埋点)数据;
2、通过kafka插件connect-s3将数据转换为文本格式数据;
3、数据存储到文件存储Minio集群;
以上是通过3个步骤,便可将商城的客户行为数据,最终转换落地存储到文件系统MINIO集群。

1、Kafka部署

以kafka_2.12-2.4.x版本为例,先解压到/appuser/kafka_2.12-2.4.x目录,并说明kafka部署,部署集群节点为:

192.168.0.1192.168.0.2192.168.0.3

1、系统参数修改

  • vi /etc/sysctl.conf增加如下配置项
vm.swappiness=5 net.core.wmem_default=256960 net.core.rmem_default=256960 vm.max_map_count=262144 sysctl -p 重启使其生效;
  • vi /proc/sys/vm/dirty_background_ratio
    设置值为5
  • vi /etc/security/limits.conf
*soft nofile 204800 *hard nofile 204800 *soft nproc 204800 *hard nproc 204800

2、找到config目录下的server.properties文件

broker.id=1                     #三台机器分别为1、2、3zookeeper.connect=192.168.0.1,192.168.0.2,192.168.0.3listeners=PLAINTEXT://host:9092  #host为当前服务的实际IP地址num.partitions=2                 #默认分区数为1,可根据实际调整log.dirs=/appuser/datanum.network.threads=3socket.send.buffer.bytes=1048576socket.receive.buffer.bytes=1048576log.flush.interval.messages=15000log.flush.interval.ms=3000replica_lag_max_messages=7000

3、配置启动jvm参数

修改vi kafka-server-start.sh

export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G -server -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=70 -XX:G1RSetUpdatingPauseTimePercent=5 -XX:MAXGCPauseMillis=500 -XX:ParallelGCThreads=16 -XX:ConcGCThreads=16"

其他可选参数:

-XX: +PrintGCDetails -XX: +PrintGCDatestamps XX: +PrintHeapAtGC -XX: +PrintTenuringDistribution -XX: +PrintGCApplicationStoppedTime -XX: +PrintPromotionFailure #-XX: PrintFLSStatistics=1 #-Xloggc: /applog/gc.log #-XX:HeapDumpPath:/heapdump -XX: +UseGCLogFileRotation -XX: NumberOfGCLogFiles=10 -XX: GCLogFileSize=10M

4、修改默认日志文件输出路径

配置kafka-run-class.sh,在文件最前面增加内容:LOG_DIR=/appuser/logs
修改日志级别:log4j.properties

1.1、启动

执行命令

./bin/kafka-server-start.sh -daemon config/server.properties

1.2、常用操作

  • 进入任意节点,创建topic
    ./bin/kafka-topics.sh --bootstrap-server 192.168.0.1:9092 --create --topic s3-minio --partitions 5 --replication-factor 2
    topic名称:s3-minio
    分区数:5
    副本数:2
  • 查看topic
    ./bin/kafka-topics.sh --bootstrap-server 192.168.0.1:9092 --list #查看集群可用的topics
    ./bin/kafka-topics.sh --bootstrap-server 192.168.0.1:9092 --describe #查看topics详情
    ./bin/kafka-topics.sh --bootstrap-server 192.168.0.1:9092 --describe --topic s3-minio #查看具体topic详情
  • 修改topic
    ./bin/kafka-topics.sh --bootstrap-server 192.168.0.1:9092 --alter --partitions n --topic s3-minio #修改s3-minio的分区数
    ./bin/kafka-reassian-partitions.sh --zookeeper zk集群地址 --reassignment-json-file decrease-replica-factor.json --execute #修改topic分区副本数
decrease-replica-factor.json的json格式如下	{"version":1,	"partitions":[		{"topic":"s3-minio","partition":0,"replicas":[1,2]},		{"topic":"s3-minio","partition":1,"replicas":[1,3]}		]	}
  • 删除topic
    ./bin/kafka-topics.sh --bootstrap-server 192.168.0.1:9092 --delete --topic s3-minio
  • Producer发送消息
    ./bin/kafka-console-producer.sh --broker-list 192.168.0.1:9092 --topic s3-minio
  • Consumer消费消息
    ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.1:9092 --topic s3-minio
    #从开始处进行消费消息
    ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.1:9092 --topic s3-minio --from-beginning

2、connect-s3插件部署

下载confluent-connect-s3,或,下载confluentinc-kafka-connect-s3-5.5.x.zip完成后进行解压

mkdir -r /appuser/kafka_2.12-2.4.x/plugins 创建文件夹
将文件解压到/appuser/kafka_2.12-2.4.x/plugins/kafka-connect-s3-5.5.x
1、配置connect-distributed-s3.properties文件
找到connect-distributed.properties文件,执行 cp connect-distributed.peroperties connect-distributed-s3.properties命令,拷贝一份文件。

vi connect-distributed-s3.propertiesbootstrap.servers=192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092plugin.path=/appuser/kafka_2.12-2.4.x/plugins/kafka-connect-s3-5.5.x

2.1 启动connect-s3

bin/kafka-distributes.sh -daemon config/connect-distributed-s3.properties

2.2 部署验证connect-s3

应能正常访问:http://host:8083/connectors

其中host可以是任意节点IP地址:192.168.0.1,192.168.0.2,192.168.0.3

2.3 创建s3的sink插件

为了完成kafka和minio之间的数据转换,需要创建s3的sink插件,通过插件将实时流数据转换为文本数据存储到文件系统,和,以下以JSON格式埋点数据为例:

  • 创建s3-sink
    最大tasks.max和 connector分区数保持一致,192.168.0.1为kafka服务器IP地址,可以是任意节点的IP地址。
    f5-host:为minio集群的F5地址或者ng的地址;
    flush.size:标识多少数据量生成一个文件;
    aws.access.key.id:为minio集群设置用户名;
    aws.secret.access.key:为minio集群设置密码;
    s3.bucket.name:为minio集群设置的bucket名称;
curl -X POST -H "Content-Type:application/json" \--data'{"name":s3-sink","config":{ "name":"s3-sink", "connector.class":"io.confluent.connects3.S3SinkConnector", "tasks.max"5", "topics":"s3-minio", "topics.dir":"", "s3.bucket.name":"s3-bucket", "s3.part.size:"26214400", "flush.size":"4500", "store.url":"http://f5-host:9000", "aws.access.key.id":"minio", "aws.secret.access.key":"minio", "key.converter":"org.apache.kafka.connect.storage.StringConverter", "key.converter.schemas.enable":"false", "value.converter":"org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable":"false", "storage.class":"io.confluent.connect.s3.storage.S3Storage", "schema.compatibility":"NONE", "schemas.enable":"false", "format.class":"io.confluent.connect.s3.format.json.JsonFormat", "partitioner.class": "io.confluent.connect.storage.partitioner.Time.BasedPartitioner", "locale":"zh-CN", "timezone":"Asia/Shanghai", "timestamp.extractor":"Record", "path.format":"YYYY-MM-dd", "partition.duration.ms":"600000"}}'\http://192.168.0.1:8083/connectors
  • 修改s3-sink
建议最tasks max和 connector分区数保持一致,f5-host为minio集群的F5地址或者ng的地址。 curl -X PUT -H "Content-Type:application/json" \--data'{ "name":"s3-sink", "connector.class":"io.confluent.connects3.S3SinkConnector", "tasks.max"5", "topics":"s3-minio", "topics.dir":"", "s3.bucket.name":"s3-bucket", "s3.part.size:"26214400", "flush.size":"4500", "store.url":"http://f5-host:9000", "aws.access.key.id":"minio", "aws.secret.access.key":"minio", "key.converter":"org.apache.kafka.connect.storage.StringConverter", "key.converter.schemas.enable":"false", "value.converter":"org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable":"false", "storage.class":"io.confluent.connect.s3.storage.S3Storage", "schema.compatibility":"NONE", "schemas.enable":"false", "format.class":"io.confluent.connect.s3.format.json.JsonFormat", "partitioner.class": "io.confluent.connect.storage.partitioner.Time.BasedPartitioner", "locale":"zh-CN", "timezone":"Asia/Shanghai", "timestamp.extractor":"Record", "path.format":"YYYY-MM-dd", "partition.duration.ms":"600000"}'\http://192.168.0.1:8083/connectors/s3-sink/config|jq
  • 删除s3-sink
    curl -X DELETE http://host:8083/connectors/s3-sink
  • 检查s3-sink
    查看工作状态:http://host:8083/connectors/s3-sink/status
    查看配置:http://host:8083/connectors/s3-sink/config

通过以上部署步骤,即可完成kafka,kafka-connect-s3插件部署;

3、MINIO部署

请参考我的另一篇文章:

转载地址:http://occpi.baihongyu.com/

你可能感兴趣的文章
db db2_monitorTool IBM Rational Performace Tester
查看>>
postgresql监控工具pgstatspack的安装及使用
查看>>
【JAVA数据结构】双向链表
查看>>
【JAVA数据结构】先进先出队列
查看>>
Objective-C 基础入门(一)
查看>>
Flutter Boost的router管理
查看>>
iOS开发支付集成之微信支付
查看>>
C++模板
查看>>
【C#】如何实现一个迭代器
查看>>
【C#】利用Conditional属性完成编译忽略
查看>>
DirectX11 光照演示示例Demo
查看>>
VUe+webpack构建单页router应用(一)
查看>>
Node.js-模块和包
查看>>
(python版)《剑指Offer》JZ01:二维数组中的查找
查看>>
Spring MVC中使用Thymeleaf模板引擎
查看>>
PHP 7 的五大新特性
查看>>
深入了解php底层机制
查看>>
PHP中的stdClass 【转】
查看>>
XHProf-php轻量级的性能分析工具
查看>>
OpenCV gpu模块样例注释:video_reader.cpp
查看>>