Mall tracking data, i.e. the browsing data generated by customers' purchase behavior in an online mall, is an important asset for analyzing buying behavior. In the big-data field, collecting customer behavior data and analyzing it for behavioral patterns can guide the design of the mall's marketing campaigns. For example, the multi-dimensional analysis reports shown by Taobao and JD.com can reveal customers' purchasing preferences, product-browsing preferences, regional preferences, and so on. Another important application is product recommendation: the products a mall recommends are the result of jointly analyzing customers' browsing and purchase behavior, and at Amazon product recommendation brings in roughly 40% of revenue. For these reasons, persisting customer behavior data is critically important.
Taking kafka_2.12-2.4.x as the example version, first extract it to the /appuser/kafka_2.12-2.4.x directory. The Kafka deployment described below uses the following cluster nodes:
192.168.0.1
192.168.0.2
192.168.0.3
1. Tune system parameters
Append the following to /etc/sysctl.conf and run sysctl -p for the changes to take effect:

vm.swappiness=5
net.core.wmem_default=256960
net.core.rmem_default=256960
vm.max_map_count=262144
Raise the file-descriptor and process limits in /etc/security/limits.conf:

* soft nofile 204800
* hard nofile 204800
* soft nproc 204800
* hard nproc 204800
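These kernel and ulimit changes only take effect for new login sessions; a quick sanity check (the appuser account name is taken from the install path above):

# confirm the kernel parameters
sysctl vm.swappiness vm.max_map_count net.core.wmem_default net.core.rmem_default
# confirm the per-user limits from a fresh appuser login
ulimit -n    # open files, expect 204800
ulimit -u    # max user processes, expect 204800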
2. Edit the server.properties file in the config directory
# 1, 2 and 3 on the three machines respectively
broker.id=1
# 2181 assumed as the ZooKeeper client port (the original omitted the ports)
zookeeper.connect=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
# replace host with the actual IP address of the current node
listeners=PLAINTEXT://host:9092
# default number of partitions is 1; adjust to actual needs
num.partitions=2
log.dirs=/appuser/data
num.network.threads=3
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
log.flush.interval.messages=15000
log.flush.interval.ms=3000
# note: replica.lag.max.messages was removed in Kafka 0.9; on 2.4, replica.lag.time.max.ms is its replacement
replica.lag.max.messages=7000
3. Configure JVM startup parameters
Edit kafka-server-start.sh (vi bin/kafka-server-start.sh) and set:

export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G -server -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=70 -XX:G1RSetUpdatingPauseTimePercent=5 -XX:MaxGCPauseMillis=500 -XX:ParallelGCThreads=16 -XX:ConcGCThreads=16"
Other optional parameters:
-XX:+PrintGCDetails
-XX:+PrintGCDateStamps
-XX:+PrintHeapAtGC
-XX:+PrintTenuringDistribution
-XX:+PrintGCApplicationStoppedTime
-XX:+PrintPromotionFailure
#-XX:PrintFLSStatistics=1
#-Xloggc:/applog/gc.log
#-XX:HeapDumpPath=/heapdump
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=10
-XX:GCLogFileSize=10M
4. Change the default log output path
Edit kafka-run-class.sh and add at the very top: LOG_DIR=/appuser/logs. Log levels can be adjusted in log4j.properties. Then start the broker:
./bin/kafka-server-start.sh -daemon config/server.properties

To decrease a topic's replication factor later, describe the desired replica assignment in a decrease-replica-factor.json file with the following format:

{
  "version": 1,
  "partitions": [
    {"topic": "s3-minio", "partition": 0, "replicas": [1,2]},
    {"topic": "s3-minio", "partition": 1, "replicas": [1,3]}
  ]
}
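To apply this file, Kafka's bundled partition-reassignment tool can be used; a minimal sketch, assuming ZooKeeper listens on the default port 2181:

# submit the new (smaller) replica assignment
bin/kafka-reassign-partitions.sh --zookeeper 192.168.0.1:2181 \
  --reassignment-json-file decrease-replica-factor.json --execute
# re-run with --verify until every partition reports success
bin/kafka-reassign-partitions.sh --zookeeper 192.168.0.1:2181 \
  --reassignment-json-file decrease-replica-factor.json --verify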
Download the confluent-connect-s3 plugin (confluentinc-kafka-connect-s3-5.5.x.zip) and extract it once the download completes.
Create the plugin directory and extract the archive into it:

mkdir -p /appuser/kafka_2.12-2.4.x/plugins
# extract the zip so the plugin lands in /appuser/kafka_2.12-2.4.x/plugins/kafka-connect-s3-5.5.x

1. Configure the connect-distributed-s3.properties file. Locate connect-distributed.properties and make a copy:

cp connect-distributed.properties connect-distributed-s3.properties
vi connect-distributed-s3.properties

bootstrap.servers=192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092
plugin.path=/appuser/kafka_2.12-2.4.x/plugins/kafka-connect-s3-5.5.x
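For reference, the stock connect-distributed.properties that Kafka ships also defines the Connect cluster's coordination settings, with the defaults below; every worker meant to join the same Connect cluster must share the same group.id and internal topics:

group.id=connect-cluster
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status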
Start Connect in distributed mode:

bin/connect-distributed.sh -daemon config/connect-distributed-s3.properties
You should then be able to reach http://host:8083/connectors, where host is the IP address of any of the nodes: 192.168.0.1, 192.168.0.2 or 192.168.0.3.
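Before creating any connector, the REST interface can confirm that the worker is up and that the S3 plugin was loaded from plugin.path (jq is only used for pretty-printing):

# an empty list [] is expected before any connector exists
curl -s http://192.168.0.1:8083/connectors
# io.confluent.connect.s3.S3SinkConnector should be listed once the plugin loaded
curl -s http://192.168.0.1:8083/connector-plugins | jq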
To move data between Kafka and MinIO, an S3 sink connector must be created; through this plugin, the real-time stream is converted to text data and stored in the file system. The example below uses JSON-formatted tracking data:
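The connector consumes topic s3-minio; if automatic topic creation is disabled, create it first. A minimal sketch, assuming ZooKeeper listens on the default port 2181 and using five partitions to line up with the connector's tasks.max:

bin/kafka-topics.sh --create --zookeeper 192.168.0.1:2181 \
  --topic s3-minio --partitions 5 --replication-factor 3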
curl -X POST -H "Content-Type: application/json" \
  --data '{
    "name": "s3-sink",
    "config": {
      "name": "s3-sink",
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "tasks.max": "5",
      "topics": "s3-minio",
      "topics.dir": "",
      "s3.bucket.name": "s3-bucket",
      "s3.part.size": "26214400",
      "flush.size": "4500",
      "store.url": "http://f5-host:9000",
      "aws.access.key.id": "minio",
      "aws.secret.access.key": "minio",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "key.converter.schemas.enable": "false",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "schema.compatibility": "NONE",
      "schemas.enable": "false",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
      "locale": "zh-CN",
      "timezone": "Asia/Shanghai",
      "timestamp.extractor": "Record",
      "path.format": "YYYY-MM-dd",
      "partition.duration.ms": "600000"
    }
  }' \
  http://192.168.0.1:8083/connectors
It is recommended to keep tasks.max equal to the number of partitions of the connector's topic; f5-host is the F5 (or nginx) address in front of the MinIO cluster. To change the configuration of an existing connector, PUT the config map to the connector's config endpoint:

curl -X PUT -H "Content-Type: application/json" \
  --data '{
    "name": "s3-sink",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "5",
    "topics": "s3-minio",
    "topics.dir": "",
    "s3.bucket.name": "s3-bucket",
    "s3.part.size": "26214400",
    "flush.size": "4500",
    "store.url": "http://f5-host:9000",
    "aws.access.key.id": "minio",
    "aws.secret.access.key": "minio",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "schema.compatibility": "NONE",
    "schemas.enable": "false",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "locale": "zh-CN",
    "timezone": "Asia/Shanghai",
    "timestamp.extractor": "Record",
    "path.format": "YYYY-MM-dd",
    "partition.duration.ms": "600000"
  }' \
  http://192.168.0.1:8083/connectors/s3-sink/config | jq
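Once submitted, the connector's health can be verified through the standard Connect status endpoint:

# both the connector and each of its tasks should report "state": "RUNNING"
curl -s http://192.168.0.1:8083/connectors/s3-sink/status | jq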
The steps above complete the deployment of Kafka and the kafka-connect-s3 plugin.
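For an end-to-end smoke test, a sample event can be produced into the topic and the bucket inspected afterwards; a sketch assuming the MinIO client mc is installed with an alias myminio registered for the cluster (both are assumptions beyond this article's setup). Note that with flush.size=4500 a file is only committed after 4500 records per topic partition, so lower flush.size on a test connector to see output quickly:

# produce one JSON tracking event (field names are illustrative only)
echo '{"user":"u1001","action":"view","sku":"12345"}' | \
  bin/kafka-console-producer.sh --broker-list 192.168.0.1:9092 --topic s3-minio

# list the objects the sink has committed so far
mc ls --recursive myminio/s3-bucket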
Reprinted from: http://occpi.baihongyu.com/