数据集成-作业二
完整项目链接:homework2 · main · 垃圾桶 / Data-Intergration · GitLab
实时数据部分
一 环境配置
操作系统使用ubuntu20.04
1. jdk 1.8
官网下载openjdk1.8,解压至本地,添加环境变量,使用java -version
检查。

2. kafka 2.11-2.1.0
官网下载Kafka包后,解压至本地,然后修改kafka/config/下的zookeeper.properties文件,将zookeeper端口设置为2291,并修改zookeeper日志存放地址,再修改server.properties文件,设置连接zookeeper地址为localhost:2291,并修改kafka日志地址。
在zookeeper.properties中:

在server.properties中:


3. flink 1.13.5
官网下载flink包后(注意对应的scala版本为2.11),解压至本地,添加环境变量,使用flink -v
检查。
运行start-cluster.sh
命令启动flink,可以访问localhost:8081来查看flink web ui:

4. clickhouse 23.2.4.12
对于clickhouse版本没有特别的要求,执行如下命令安装:
1 | sudo yum install -y yum-utils |
此时输入clickhouse-client
应该就可以访问clickhouse了,默认使用default账户,正常创建密码即可。使用sql语句操作clickhouse,创建dm数据库,然后按照ch.sql这个文件创建各个表。
1 | CREATE DATABASE dm |
其中以info结尾的为静态数据表,以mx结尾的为实时数据表,可以用以下命令查看各个表中的数据情况:
1 | SELECT database,name,total_rows FROM system.tables WHERE database = 'dm' |
然后这里配置的clickhouse是本地虚拟机内的,要想让远程主机访问并写入,还要做一个ip映射,并修改clickhouse访问权限。先使用ifconfig
命令检查一下原来的ip,可以发现只有本地内网(10.0.2.15)。

在virtualbox中设置网卡为桥接网卡,然后再查一次ifconfig
,发现有了公网ip(172.31.56.66):


后面远程主机想连接这里的clickhouse使用这个地址 172.31.56.66:8123 即可。
然后再修改clickhouse的访问权限,打开/etc/clickhouse-server/users.xml,将

使用postman测试一下:

二 数据处理
1. 接收Kafka流式数据
在助教文档给的代码进行简单修改,将接收的数据写入csv文件中保存,并对接受速率进行简单的统计,代码省略。
实时接收助教推送时的速率大约为1000条/s,当助教退完数据后再接收速率大约为10000-20000条/s,共接受19.6G数据,简单统计后数据分布如下:

小组组号为28,共收到25条特殊数据,如下(因为没有更改编码为utf-8,所以中文显示乱码):

2. 搭建Kafka生产者
先在kafka2.11_2.1.0/config/中配置zookeeper为localhost:2181端口,kafka为localhost:9092端口,并设置好日志文件地址。
从csv文件中取数据,并进行生产,具体步骤为:
1)启动zookeeper和kafka,以下命令是前台启动(方便查看日志),也可以后台启动。
1 | bin/zookeeper-server-start.sh config/zookeeper.properties |
2)创建kafka主题“kafka2flink”,表示数据将生产至该主题中。
1 | bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka2flink |
3)运行kafka生产者代码,代码如下:
1 | /** |
此时Kafka生产者开始生产数据,可以用以下命令验证是否生产成功(如果成功应该能消费到数据):
1 | bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka2flink --from-beginning |
3. Flink消费kafka数据并写入clickhouse
当启动Kafka生产者后,可以使用flink消费数据了,先启动flink:
1 | start-cluster.sh |
然后用mvn clean package
将flink代码打包(需要带依赖打包,具体见下面pom.xml),使用flink命令行或上传至flink web ui(localhost:8081),设置好并行度和参数,然后点击submit执行(执行时需要确保zookeeper和kafka在运行)。

flink web ui上显示如下表明正常运行:

如果flink web ui的日志中报错提示:No suitable driver found for jdbc:clickhouse,这时由于找不到相关的依赖导致的,可以将/usr/local/java/jre/lib/rt.jar复制一份至flink/lib/中。
当消费完全部数据后,flink不会自动停止而是继续等待新的数据被生产,需要Cancel Job手动停止。
该部分在代码中具体处理步骤为:
1)定义FlinkKafkaConsumer,消费前面Kafka生产的数据,并添加至数据源,该部分代码为:
1 | StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
2)使用过滤函数对数据进行空值过滤,代码如下:
1 | // 对数据流,过滤空值 |
其中,FilterMapFunction为自定义的过滤函数,如下:
1 | /** |
3)接下来将过滤后的数据依据不同的‘eventType’写入clickhouse对应的表中,
1 | MyClickhouseSink myClickhouseSink = new MyClickhouseSink(username,password,url,new ClickhouseSqlHelper(dataTypeMap,columnNameListMap)); |
其中,MyClickhouseSink代码如下:
1 | public class MyClickhouseSink extends RichSinkFunction<String> { |
由于一共有14种不同的数据类型和14张对应的要写入的表,必须准备14个不同的PreparedStatement,每个PreparedStatement提前设置对应的sql插入语句。
为了加快写入速率,使用按批处理的办法,每批次5000条,使用一个count计数当前批次中的数量,每当批次中数量超过5000,或者切换到一个新的PreparedStatement时,就将批次中的所有数据写入clickhouse,这样可以避免频繁连接数据库,能极大提高速率(在不设置batch时,平均速率只有60条/s)。
在方法中,为了设置PreparedStatement,借助了一个自实现的ClickhouseSqlHelper,该类会读取接收到的数据,根据‘eventType’判断要写入哪张表中,然后根据具体的数据类型设置所有的字段(即statement.setXXX(i+1, value) )核心代码如下:
1 | public void setStatement(PreparedStatement statement, String json) throws SQLException { |
其他一些使用到的自实现的工具类有:
SqlMap,其作用为根据传入的表名返回对应的sql插入语句;
DataTypeMap,其作用为根据传入的表名和字段名,返回该字段的数据类型;
ColumnNameListMap,其作用为根据表明,按顺序返回一个包含该表所有字段名的列表(与sql插入语句的顺序对应);
具体代码省略。
流式数据处理部分使用的全部依赖和插件如下,为了使用已搭建的flink,关于flink核心库的依赖需要添加<scope>pro</scope>
:
1 |
|
三 处理结果展示
在kafka全部生产完后,Flink过程速率为10000-80000条/s不等,如果边生产边消费,实际速率受kafka生产者速率限制
Clickhouse中数据展示:
静态数据部分
一 环境配置
操作系统使用windows10,实时数据与静态数据使用的是两台设备,最后写入同一张表中。
1. hadoop 2.7.4
下载hadoop2.7.4并配置环境变量
拷贝hadoop2.7.4\bin\hadoop.dll到目录C:\Windows\System32
配置文件
在etc\hadoop子目录下,对如下文件做出如下修改并创建相应的tmp、dataNode、nameNode的文件夹
core-site.xml
hdfs-site.xml
yarn-site.xml
复制mapred-site.xml.template,并重命名为mapred-site.xml,修改里面的内容
格式化namenode:切换至$HADOOP_HOME/bin目录下,使用CMD输入命令:hdfs namenode -format
启动Hadoop服务:切换至$HADOOP_HOME/sbin目录下,执行start-all.cmd脚本,执行相应命令验证hadoop环境搭建成功
2. Scala 2.11.12
- 下载并安装scala 2.11.12,配置环境变量
- 在IDEA中,Project Structure->Libraries->Scala SDK处添加对应的路径
3. Spark 2.3.3
- 下载并安装spark-2.3.3-bin-hadoop2.7,配置环境变量
二 数据处理
1. 读取数据并进行简单的数据清洗
具体代码如下:
var df = reader.option("dbtable", tblNameDst).load() val columnNames = df.columns.toList.map(name => name.substring(tblNameDst.length + 1)).toArray df = df.toDF(columnNames: _*) // 1.去掉重复数据行 var uniDf = df.dropDuplicates() uniDf = uniDf.na.drop() // 2.去掉除uid全部为null的行 val colNames = uniDf.columns.toList.toArray val colNamesWithoutUid = colNames.filter(colName => { !colName.equals("uid") }) uniDf = uniDf.na.drop("all", colNamesWithoutUid) // 3.填充null值 // """" -> "" uniDf = uniDf.na.replace(colNames, Map("\"\"" -> "")) val columnType: Map[String, String] = uniDf.dtypes.toMap var nonStringTypeCol: List[String] = List() var stringTypeCols: List[String] = List() for(columnName <- uniDf.columns){ // 获得数据类型 val tmp = columnType(key = columnName) if(!tmp.equals("StringType")){ nonStringTypeCol = nonStringTypeCol :+ columnName } else { stringTypeCols = stringTypeCols :+ columnName } } var resData = uniDf.na.fill(0, nonStringTypeCol) resData = resData.na.fill("", stringTypeCols)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
结果如下:

##### 2. ETL小作业
10. ```scala
if(tblNameDst == "pri_cust_contact_info"){
// 定义电话号码的正则表达式
val phoneRegex: Regex = "^1[3-9]\\d{9}$".r
// 创建一个用户定义函数(UDF)来验证电话号码,并根据其有效性返回原始电话号码或空字符串
val validatePhoneUDF = udf((phone: String, con_type: String) => {
if (con_type == "HOMEADD") {
phone // 如果con_type为HOMEADD,直接返回原始值
} else if (Set("TEL", "OTH", "MOB").contains(con_type) && phone != null && honeRegex.findFirstIn(phone).isDefined) {
phone
} else{
""
}
})
// 使用withColumn方法将validatePhoneUDF应用于数据集的contact列,更新电话号码
df = df.withColumn("contact", validatePhoneUDF(col("contact"), col("con_type")))
//去除除了主键以外全为空的列
// 定义主键
val primaryKey = "uid"
// 获取除主键(uid)以外的所有列名
val columns = df.columns.filter(_ != primaryKey)
// 创建一个用于检查所有列是否为空的条件表达式
val allColumnsNullCondition = columns.map(col => isnull(df(col))).reduce(_ && _)
// 使用`filter`方法,仅保留至少有一个非主键列不为空的行
df.filter(not(allColumnsNullCondition))
val rdd = df.rdd
// 将RDD转换为PairRDD,键为uid
val pairRDD = rdd.map(row => {
val uid = row.getAs[String]("uid")
val con_type = row.getAs[String]("con_type")
val contact = row.getAs[String]("contact")
val contactPhone = if (Set("TEL", "OTH", "MOB").contains(con_type) && !Set("无", "-", "", null).contains(contact)) contact else ""
val contactAddress = if (!Set("TEL", "OTH", "MOB", "无", "-", "", null).contains(con_type) && !Set("无", "-", "", null).contains(contact)) contact else ""
(uid, (row, contactPhone, contactAddress))
})
// 定义一个函数用于合并联系方式
// 修改的mergeContacts函数
def mergeContacts(contact1: (Row, String, String), contact2: (Row, String, String)): (Row, String, String) = {
val phoneSeparator = ","
val addressSeparator = ","
val invalidContacts = Set("无", "-", "", null)
val (phone1, address1) = (contact1._2, contact1._3)
val (phone2, address2) = (contact2._2, contact2._3)
val newPhone = (phone1.split(phoneSeparator) ++ phone2.split(phoneSeparator)).filterNot(invalidContacts.contains).distinct.mkString(phoneSeparator)
val newAddress = (address1.split(addressSeparator) ++ address2.split(addressSeparator)).filterNot(invalidContacts.contains).distinct.mkString(addressSeparator)
// 获取 update_date 和 create_date 的列索引
val updateDateIndex = 5
val createDateIndex = 4
// 使用 Option 类型处理空值
val updateDate1 = Option(contact1._1.get(updateDateIndex)).map(_.toString).getOrElse("")
val updateDate2 = Option(contact2._1.get(updateDateIndex)).map(_.toString).getOrElse("")
val createDate1 = Option(contact1._1.get(createDateIndex)).map(_.toString).getOrElse("")
val createDate2 = Option(contact2._1.get(createDateIndex)).map(_.toString).getOrElse("")
// 使用 create_date 作为 update_date 的备选值
val effectiveUpdateDate1 = if (updateDate1.nonEmpty) updateDate1 else createDate1
val effectiveUpdateDate2 = if (updateDate2.nonEmpty) updateDate2 else createDate2
// 根据 update_date 和 create_date 选择较新的一行
val newerRow = if (effectiveUpdateDate1 > effectiveUpdateDate2) contact1._1 else contact2._1
// 直接合并两行中的所有列
val newRowData = newerRow.toSeq
val newRow = Row.fromSeq(newRowData)
(newRow, newPhone, newAddress)
}
// 使用reduceByKey合并相同uid的数据
val reducedPairRDD = pairRDD.reduceByKey(mergeContacts)
// 定义结果的schema
val schema = df.schema
.add(StructField("contact_phone", StringType, nullable = true))
.add(StructField("contact_address", StringType, nullable = true))
// 将结果转换回DataFrame
val rowRDD = reducedPairRDD.map { case (uid, (row, contactPhone, contactAddress)) =>
val newRowData = row.toSeq :+ contactPhone :+ contactAddress
Row.fromSeq(newRowData)
}
val reducedDF = session.createDataFrame(rowRDD, schema)
// 删除原始的contact列
val newDF = reducedDF.drop("contact")
// 显示和打印合并后的DataFrame
newDF.show(20)
println(newDF.count())
// 定义一个正则表达式,用于匹配18位数字的uid
val uidRegex: Regex = "^\\d{18}$".r
// 过滤掉不是18位数字的行
val finalDF = newDF.filter(row=> uidRegex.findFirstIn(row.getAs[String("uid")).isDefined) println(finalDF.count())
小作业结果如下:

3. 写入ClickHouse
写入clickhouse代码如下:
1 | val clickHouseProperties = new Properties() |
4. 处理结果展示
一共写入9张九张静态表,即下图中包含数据的9张表

可视化部分
一 可视化工具使用
使用百度的Sugar BI作为可视化工具,具体步骤为:
申请账号后,在Sugar BI中创建一个空间
导入数据源,具体如下:
为了使Sugar BI能访问到本地8123端口的clickhouse,需要使用内网隧道,从Sugar BI下载内网隧道客户端后,解压至本地,然后修改db.txt文件内容为127.0.0.1:8123,然后使用
bash start.sh
命令启动内网隧道,回到Sugar BI测试连接。Sugar BI成功连接本地clickhouse后,可以在数据表预览中查看所有表,应该能看到里面的数据,但仅显示部分:
二 创建可视化模型
在Sugar BI中新建一个大屏,选择一个合适的界面,进入编辑,然后选择想要的组件(如柱状图、折线图、饼图等),为组件添加sql模型,这样组件就会展示clickhouse中的数据。
例如展示活期交易日top20用户,先创建一个sql模型,如下:
然后创建一张横向柱状图,绑定刚刚的sql模型,设置绑定字段、x轴与y轴、刷新时间等等,如下:
三 可视化结果展示

左上的图展示每日客户交易量TOP20,显示了当天实时活期交易额最高的20个客户及其交易量;
下面的图展示了从数据记录之初到实时的每日交易总额变化,从中可以清晰看出交易量的变化趋势、峰值谷值的分布;
右上的饼图使用了静态数据,显示了各种存款类型在银行存款中所占比例以及各种定期存款的大致比例。
可视化视频已作为附件一并打包上传,展示数据的实时变化。
团队分工
姓名 | 学号 | 分工 |
---|---|---|
xxx | xxxxxxxxx | 可视化部分 |
xxx | xxxxxxxxx | 实时数据kafka和flink部分 |
xxx | xxxxxxxxx | 静态数据ETL处理 |
xxx | xxxxxxxxx | 静态数据部分环境搭建 |
xxx | xxxxxxxxx | clickhouse搭建 |