数据集成-作业二

完整项目链接:homework2 · main · 垃圾桶 / Data-Intergration · GitLab

实时数据部分

一 环境配置

​ 操作系统使用ubuntu20.04

1. jdk 1.8

​ 官网下载openjdk1.8,解压至本地,添加环境变量,使用java -version检查。

image-20241215214549210
2. kafka 2.11-2.1.0

​ 官网下载Kafka包后,解压至本地,然后修改kafka/config/下的zookeeper.properties文件,将zookeeper端口设置为2291,并修改zookeeper日志存放地址,再修改server.properties文件,设置连接zookeeper地址为localhost:2291,并修改kafka日志地址。

​ 在zookeeper.properties中:

image-20241215214600302

​ 在server.properties中:

image-20241215214616126 image-20241215214627136

​ 官网下载flink包后(注意对应的scala版本为2.11),解压至本地,添加环境变量,使用flink -v检查。

image-20241215214807360

​ 运行start-cluster.sh命令启动flink,可以访问localhost:8081来查看flink web ui:

image-20241215214816919
4. clickhouse 23.2.4.12

​ 对于clickhouse版本没有特别的要求,执行如下命令安装:

1
2
3
4
5
sudo yum install -y yum-utils
sudo yum-config-manager --add-repo https://packages.clickhouse.com/rpm/clickhouse.repo //添加官方存储库
sudo yum install -y clickhouse-server clickhouse-client //安装clickhouse-server和clickhouse-client

sudo /etc/init.d/clickhouse-server start //启动clickhouse-server

​ 此时输入clickhouse-client应该就可以访问clickhouse了,默认使用default账户,正常创建密码即可。使用sql语句操作clickhouse,创建dm数据库,然后按照ch.sql这个文件创建各个表。

1
2
3
4
CREATE DATABASE dm
CREATE TABLE dm.dm_hlw_shop_info(....)...;
...
...

​ 其中以info结尾的为静态数据表,以mx结尾的为实时数据表,可以用以下命令查看各个表中的数据情况:

1
SELECT database,name,total_rows FROM system.tables WHERE database = 'dm'

​ 然后这里配置的clickhouse是本地虚拟机内的,要想让远程主机访问并写入,还要做一个ip映射,并修改clickhouse访问权限。先使用ifconfig命令检查一下原来的ip,可以发现只有本地内网(10.0.2.15)。

image-20241215214839318

​ 在virtualbox中设置网卡为桥接网卡,然后再查一次ifconfig,发现有了公网ip(172.31.56.66):

image-20241215214850602 image-20241215214903499

​ 后面远程主机想连接这里的clickhouse使用这个地址 172.31.56.66:8123 即可。

​ 然后再修改clickhouse的访问权限,打开/etc/clickhouse-server/users.xml,将的注释去掉。

image-20241215214919419

​ 使用postman测试一下:

image-20241215214927928

二 数据处理

1. 接收Kafka流式数据

​ 在助教文档给的代码进行简单修改,将接收的数据写入csv文件中保存,并对接受速率进行简单的统计,代码省略。

​ 实时接收助教推送时的速率大约为1000条/s,当助教退完数据后再接收速率大约为10000-20000条/s,共接受19.6G数据,简单统计后数据分布如下:

image-20241215215009081

​ 小组组号为28,共收到25条特殊数据,如下(因为没有更改编码为utf-8,所以中文显示乱码):

image-20241215215020320
2. 搭建Kafka生产者

​ 先在kafka2.11_2.1.0/config/中配置zookeeper为localhost:2181端口,kafka为localhost:9092端口,并设置好日志文件地址。

​ 从csv文件中取数据,并进行生产,具体步骤为:

1)启动zookeeper和kafka,以下命令是前台启动(方便查看日志),也可以后台启动。

1
2
bin/zookeeper-server-start.sh config/zookeeper.properties 
bin/kafka-server-start.sh config/server.properties

2)创建kafka主题“kafka2flink”,表示数据将生产至该主题中。

1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka2flink

3)运行kafka生产者代码,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* 读取数据(即KafkaConsumer消费的数据),并用kafka进行生产
*/
public class KafkaProducer {

public static void main(String[] args) {
Properties props = new Properties();
//kafka 集群,broker-list
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "0");
//重试次数
props.put("retries", 1);
//批次大小
props.put("batch.size", 16384);
//等待时间
props.put("linger.ms", 1);
//RecordAccumulator 缓冲区大小
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serializati" +
"on.StringSerializer");

Logger logger = Logger.getLogger("ProducerLogger");

try (Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props)) {
File file = new File("/home/jinqiqing/DI_data/data.csv");
//设置主题名,注意主题名要与前面创建的主题一致
String topic = "kafka2flink";
BufferedReader bufferedReader = new BufferedReader(new FileReader(file));
//批量生成数据
long totalCount = 0;
String line = bufferedReader.readLine();
while (line != null) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, line);
producer.send(record);
totalCount++;
line = bufferedReader.readLine();
if ((totalCount % 10000) == 0)
logger.info("hava produced data count = " + totalCount);
}
logger.info("All data produced. Total count: " + totalCount);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

​ 此时Kafka生产者开始生产数据,可以用以下命令验证是否生产成功(如果成功应该能消费到数据):

1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka2flink --from-beginning
3. Flink消费kafka数据并写入clickhouse

​ 当启动Kafka生产者后,可以使用flink消费数据了,先启动flink:

1
start-cluster.sh

​ 然后用mvn clean package将flink代码打包(需要带依赖打包,具体见下面pom.xml),使用flink命令行或上传至flink web ui(localhost:8081),设置好并行度和参数,然后点击submit执行(执行时需要确保zookeeper和kafka在运行)。

image-20241215215040518

​ flink web ui上显示如下表明正常运行:

image-20241215215051545

​ 如果flink web ui的日志中报错提示:No suitable driver found for jdbc:clickhouse,这时由于找不到相关的依赖导致的,可以将/usr/local/java/jre/lib/rt.jar复制一份至flink/lib/中。

​ 当消费完全部数据后,flink不会自动停止而是继续等待新的数据被生产,需要Cancel Job手动停止。

​ 该部分在代码中具体处理步骤为:

1)定义FlinkKafkaConsumer,消费前面Kafka生产的数据,并添加至数据源,该部分代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置Flink应用程序的检查点(checkpoint)间隔,每60000毫秒将发起一次检查点
env.enableCheckpointing(60000);
// 设置Flink应用程序的时间特征,在此代码中设置以事件时间(事件实际发生的时间)作为时间标记(而不是Flink默认的处理时间或摄取时间)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//设置主题名,表示从这个主题接收数据,即source,应该与KafkaProducer中的主题和前面命令行创建的主题对应
String topic = "kafka2flink";

Properties props = new Properties();
//应该与KafkaProducer中kafka集群地址对应
props.setProperty("bootstrap.servers", "localhost:9092");
//相同的组号维护相同的offset,也就是只要组号不同,就不会重复读数据
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "203");
// 定义Flink Kafka Consumer
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), props);
// 设置第一次从头消费,以后从上一次的位置继续消费
consumer.setStartFromGroupOffsets();

// 添加source数据流
DataStreamSource<String> source = env.addSource(consumer);

2)使用过滤函数对数据进行空值过滤,代码如下:

1
2
// 对数据流,过滤空值
SingleOutputStreamOperator<String> dataStream = source.map(new FilterMapFunction());

​ 其中,FilterMapFunction为自定义的过滤函数,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* 过滤符合特定条件的JSON格式字符串。
* 在重载的map函数中,将接收到的json字符串中的键值对进行判断,如果满足以下条件之一,则将其过滤掉:
* uid的值为空;
* 除uid外所有值都为空;
* 所有值都为空。
* 如果该json字符串未被过滤,则直接返回原字符串,否则返回null。
*/
public class FilterMapFunction implements MapFunction<String, String> {
@Override
public String map(String json) throws Exception {
// 判断json字符串是否为空
if (json == null || json.equals("")) {
return null;
}
// 解析json字符串为JSONObject对象
JSONObject jsonObject = JSONObject.parseObject(json);
// 判断是否存在eventBody键
if (!jsonObject.containsKey("eventBody")) {
return null;
}
// 获取eventBody对象
JSONObject eventBody = jsonObject.getJSONObject("eventBody");
// 判断eventBody对象是否为空
if (eventBody == null || eventBody.isEmpty()) {
return null;
}
// 判断是否存在eventType键
if (!jsonObject.containsKey("eventType")) {
return null;
}
// 判断eventType键的值是否为空
String eventType = jsonObject.getString("eventType");
if (eventType == null || eventType.equals("")) {
return null;
}
// 判断uid键是否存在
if (!eventBody.containsKey("uid")) {
return null;
}
// 获取uid键的值
String uid = eventBody.getString("uid");
// 判断uid键的值是否为空
if (uid == null || uid.equals("")) {
return null;
}
// 判断eventBody中是否存在键的值不为空
for (String key : eventBody.keySet()) {
if (eventBody.get(key) != null && !eventBody.getString(key).equals("")) {
return json;
}
}
return null;

}
}

3)接下来将过滤后的数据依据不同的‘eventType’写入clickhouse对应的表中,

1
2
MyClickhouseSink myClickhouseSink = new MyClickhouseSink(username,password,url,new ClickhouseSqlHelper(dataTypeMap,columnNameListMap));
dataStream.addSink(myClickhouseSink);

其中,MyClickhouseSink代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class MyClickhouseSink extends RichSinkFunction<String> {
private transient Connection connection;
private final String username;
private final String password;
private final String url;
private final static int batchSize = 5000; //每积累5000条推送至clickhouse一次
private Map<String,PreparedStatement> prepareStatementMap;
PreparedStatement lastPrepareStatement;
int count;

private final ClickhouseSqlHelper clickhouseSqlHelper;
public MyClickhouseSink(String username, String password, String url, ClickhouseSqlHelper clickhouseSqlHelper) {
this.username = username;this.password = password;this.url = url;this.clickhouseSqlHelper = clickhouseSqlHelper;
}

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
connection = DriverManager.getConnection(url, username, password);
prepareStatementMap = new HashMap<>();
for (String simpleTableName: SqlMap.getSqlMap().keySet()){
prepareStatementMap.put(simpleTableName, connection.prepareStatement(SqlMap.getSqlMap().get(simpleTableName)));
}
count = 0;
lastPrepareStatement = null;
}


@Override
public void invoke(String value, Context context) throws Exception{
if (value == null) return;
String simpleTableName = JSONObject.parseObject(value).getString("eventType");
PreparedStatement preparedStatement = prepareStatementMap.get(simpleTableName);
if (lastPrepareStatement != preparedStatement){
System.err.println("change to sink table:" + simpleTableName);
}
//当发现不是同一个preparedStatement时,说明开始写入一张新表,此时必须将上次的preparedStatement中残留的数据写入
if (lastPrepareStatement != null && lastPrepareStatement != preparedStatement){
lastPrepareStatement.executeBatch();
connection.commit();
lastPrepareStatement.clearBatch();
}
lastPrepareStatement = preparedStatement;

clickhouseSqlHelper.setStatement(preparedStatement, value);
preparedStatement.addBatch();
count++;
//计数每超过batchsize,就将批次中的数据写入
if (count > batchSize){
preparedStatement.executeBatch();
connection.commit();
preparedStatement.clearBatch();
count = 0;
}
}
@Override
public void close() throws Exception {
super.close();
if (connection != null) {
connection.close();
}
}
}

​ 由于一共有14种不同的数据类型和14张对应的要写入的表,必须准备14个不同的PreparedStatement,每个PreparedStatement提前设置对应的sql插入语句。

​ 为了加快写入速率,使用按批处理的办法,每批次5000条,使用一个count计数当前批次中的数量,每当批次中数量超过5000,或者切换到一个新的PreparedStatement时,就将批次中的所有数据写入clickhouse,这样可以避免频繁连接数据库,能极大提高速率(在不设置batch时,平均速率只有60条/s)。

​ 在方法中,为了设置PreparedStatement,借助了一个自实现的ClickhouseSqlHelper,该类会读取接收到的数据,根据‘eventType’判断要写入哪张表中,然后根据具体的数据类型设置所有的字段(即statement.setXXX(i+1, value) )核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void setStatement(PreparedStatement statement, String json) throws SQLException {
JSONObject jsonObject = JSON.parseObject(json);
String eventType = jsonObject.getString("eventType");
JSONObject eventBody = jsonObject.getJSONObject("eventBody");

Map<String,String> conDataTypeMap = this.dataTypeMap.getConDataTypeMap(eventType);
List<String> columnNameList = this.columnNameListMap.getColumnNameListMap().get(eventType);
for (int i=0;i<columnNameList.size();i++){
String columnName = columnNameList.get(i);
String columnType = conDataTypeMap.get(columnName);
String columnValue = eventBody.getString(columnName);
switch (columnType){
case "String":
if (columnValue == null || columnValue.equals(""))
statement.setNull(i+1,Types.VARCHAR);
else
statement.setString(i+1, columnValue);
break;
case "Int32":
if (columnValue == null || columnValue.equals(""))
statement.setNull(i+1,Types.INTEGER);
else
statement.setInt(i+1, Integer.parseInt(columnValue));
break;
default:if (columnType.startsWith("Decimal")) {
if (columnValue == null || columnValue.equals(""))
statement.setNull(i+1, Types.DECIMAL);
else
statement.setBigDecimal(i+1,new BigDecimal(columnValue));
}
}
}
}

​ 其他一些使用到的自实现的工具类有:

​ SqlMap,其作用为根据传入的表名返回对应的sql插入语句;

​ DataTypeMap,其作用为根据传入的表名和字段名,返回该字段的数据类型;

​ ColumnNameListMap,其作用为根据表明,按顺序返回一个包含该表所有字段名的列表(与sql插入语句的顺序对应);

​ 具体代码省略。

​ 流式数据处理部分使用的全部依赖和插件如下,为了使用已搭建的flink,关于flink核心库的依赖需要添加<scope>pro</scope>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>sd</groupId>
<artifactId>Streaming-Data-Processing</artifactId>
<version>1.2-SNAPSHOT</version>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>sd.Main</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

<dependencies>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
<scope>provided</scope>
</dependency>

<!-- SLF4J API -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>

<!-- Logback实现库 -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.5</version>
</dependency>

<!-- scala 2.11.12 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
</dependency>

<!-- kafka 2.1.0-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>2.1.0</version>
</dependency>

<!-- flink 1.13.5-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.5</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.13.5</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.13.5</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.13.5</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.13.5</version>
</dependency>

<!-- json -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.59</version>
</dependency>

<!-- clickhouse -->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.0</version>
</dependency>

<!-- <dependency>-->
<!-- <groupId>ru.yandex.clickhouse</groupId>-->
<!-- <artifactId>clickhouse-client</artifactId>-->
<!-- <version>0.3.0</version>-->
<!-- </dependency>-->


</dependencies>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

</project>

三 处理结果展示

  1. 在kafka全部生产完后,Flink过程速率为10000-80000条/s不等,如果边生产边消费,实际速率受kafka生产者速率限制

    image-20241215215138436

  2. Clickhouse中数据展示:

    image-20241215215205324

静态数据部分

一 环境配置

​ 操作系统使用windows10,实时数据与静态数据使用的是两台设备,最后写入同一张表中。

1. hadoop 2.7.4
  1. 下载hadoop2.7.4并配置环境变量

  2. 拷贝hadoop2.7.4\bin\hadoop.dll到目录C:\Windows\System32

  3. 配置文件

  4. 在etc\hadoop子目录下,对如下文件做出如下修改并创建相应的tmp、dataNode、nameNode的文件夹

    • core-site.xml

      image-20241215215219268

    • hdfs-site.xml

      image-20241215215232433

    • yarn-site.xml

      image-20241215215241536

    • 复制mapred-site.xml.template,并重命名为mapred-site.xml,修改里面的内容 image-20241215215250422

  5. 格式化namenode:切换至$HADOOP_HOME/bin目录下,使用CMD输入命令:hdfs namenode -format

  6. 启动Hadoop服务:切换至$HADOOP_HOME/sbin目录下,执行start-all.cmd脚本,执行相应命令验证hadoop环境搭建成功

2. Scala 2.11.12
  1. 下载并安装scala 2.11.12,配置环境变量
  2. 在IDEA中,Project Structure->Libraries->Scala SDK处添加对应的路径
3. Spark 2.3.3
  1. 下载并安装spark-2.3.3-bin-hadoop2.7,配置环境变量

二 数据处理

1. 读取数据并进行简单的数据清洗

​ 具体代码如下:

  1. var df = reader.option("dbtable", tblNameDst).load() 
    val columnNames = df.columns.toList.map(name => name.substring(tblNameDst.length + 1)).toArray 
    df = df.toDF(columnNames: _*) 
    // 1.去掉重复数据行 
    var uniDf = df.dropDuplicates() 
    uniDf = uniDf.na.drop() 
    // 2.去掉除uid全部为null的行 
    val colNames = uniDf.columns.toList.toArray 
    val colNamesWithoutUid = colNames.filter(colName => { 
        !colName.equals("uid") 
    }) 
    uniDf = uniDf.na.drop("all", colNamesWithoutUid)  
    // 3.填充null值 
    // """" -> "" 
    uniDf = uniDf.na.replace(colNames, Map("\"\"" -> "")) 
    val columnType: Map[String, String] = uniDf.dtypes.toMap 
    var nonStringTypeCol: List[String] = List() 
    var stringTypeCols: List[String] = List() 
    for(columnName <- uniDf.columns){ 
        // 获得数据类型 
        val tmp = columnType(key = columnName) 
        if(!tmp.equals("StringType")){ 
            nonStringTypeCol = nonStringTypeCol :+ columnName 
        } else { 
            stringTypeCols = stringTypeCols :+ columnName 
        } 
    } 
    var resData = uniDf.na.fill(0, nonStringTypeCol) 
    resData = resData.na.fill("", stringTypeCols) 
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113

    ​ 结果如下:

    ![image-20241215215407311](https://jinqiqing-bucket.oss-cn-nanjing.aliyuncs.com/img/image-20241215215407311.png)

    ##### 2. ETL小作业

    10. ```scala
    if(tblNameDst == "pri_cust_contact_info"){
    // 定义电话号码的正则表达式
    val phoneRegex: Regex = "^1[3-9]\\d{9}$".r

    // 创建一个用户定义函数(UDF)来验证电话号码,并根据其有效性返回原始电话号码或空字符串
    val validatePhoneUDF = udf((phone: String, con_type: String) => {
    if (con_type == "HOMEADD") {
    phone // 如果con_type为HOMEADD,直接返回原始值
    } else if (Set("TEL", "OTH", "MOB").contains(con_type) && phone != null && honeRegex.findFirstIn(phone).isDefined) {

    phone
    } else{
    ""
    }
    })
    // 使用withColumn方法将validatePhoneUDF应用于数据集的contact列,更新电话号码
    df = df.withColumn("contact", validatePhoneUDF(col("contact"), col("con_type")))

    //去除除了主键以外全为空的列
    // 定义主键
    val primaryKey = "uid"
    // 获取除主键(uid)以外的所有列名
    val columns = df.columns.filter(_ != primaryKey)
    // 创建一个用于检查所有列是否为空的条件表达式
    val allColumnsNullCondition = columns.map(col => isnull(df(col))).reduce(_ && _)
    // 使用`filter`方法,仅保留至少有一个非主键列不为空的行
    df.filter(not(allColumnsNullCondition))

    val rdd = df.rdd
    // 将RDD转换为PairRDD,键为uid
    val pairRDD = rdd.map(row => {
    val uid = row.getAs[String]("uid")
    val con_type = row.getAs[String]("con_type")
    val contact = row.getAs[String]("contact")

    val contactPhone = if (Set("TEL", "OTH", "MOB").contains(con_type) && !Set("无", "-", "", null).contains(contact)) contact else ""
    val contactAddress = if (!Set("TEL", "OTH", "MOB", "无", "-", "", null).contains(con_type) && !Set("无", "-", "", null).contains(contact)) contact else ""

    (uid, (row, contactPhone, contactAddress))
    })

    // 定义一个函数用于合并联系方式
    // 修改的mergeContacts函数
    def mergeContacts(contact1: (Row, String, String), contact2: (Row, String, String)): (Row, String, String) = {
    val phoneSeparator = ","
    val addressSeparator = ","
    val invalidContacts = Set("无", "-", "", null)

    val (phone1, address1) = (contact1._2, contact1._3)
    val (phone2, address2) = (contact2._2, contact2._3)

    val newPhone = (phone1.split(phoneSeparator) ++ phone2.split(phoneSeparator)).filterNot(invalidContacts.contains).distinct.mkString(phoneSeparator)
    val newAddress = (address1.split(addressSeparator) ++ address2.split(addressSeparator)).filterNot(invalidContacts.contains).distinct.mkString(addressSeparator)

    // 获取 update_date 和 create_date 的列索引
    val updateDateIndex = 5
    val createDateIndex = 4

    // 使用 Option 类型处理空值
    val updateDate1 = Option(contact1._1.get(updateDateIndex)).map(_.toString).getOrElse("")
    val updateDate2 = Option(contact2._1.get(updateDateIndex)).map(_.toString).getOrElse("")
    val createDate1 = Option(contact1._1.get(createDateIndex)).map(_.toString).getOrElse("")
    val createDate2 = Option(contact2._1.get(createDateIndex)).map(_.toString).getOrElse("")

    // 使用 create_date 作为 update_date 的备选值
    val effectiveUpdateDate1 = if (updateDate1.nonEmpty) updateDate1 else createDate1
    val effectiveUpdateDate2 = if (updateDate2.nonEmpty) updateDate2 else createDate2

    // 根据 update_date 和 create_date 选择较新的一行
    val newerRow = if (effectiveUpdateDate1 > effectiveUpdateDate2) contact1._1 else contact2._1

    // 直接合并两行中的所有列
    val newRowData = newerRow.toSeq
    val newRow = Row.fromSeq(newRowData)

    (newRow, newPhone, newAddress)
    }

    // 使用reduceByKey合并相同uid的数据
    val reducedPairRDD = pairRDD.reduceByKey(mergeContacts)

    // 定义结果的schema
    val schema = df.schema
    .add(StructField("contact_phone", StringType, nullable = true))
    .add(StructField("contact_address", StringType, nullable = true))

    // 将结果转换回DataFrame
    val rowRDD = reducedPairRDD.map { case (uid, (row, contactPhone, contactAddress)) =>
    val newRowData = row.toSeq :+ contactPhone :+ contactAddress
    Row.fromSeq(newRowData)
    }
    val reducedDF = session.createDataFrame(rowRDD, schema)

    // 删除原始的contact列
    val newDF = reducedDF.drop("contact")

    // 显示和打印合并后的DataFrame
    newDF.show(20)
    println(newDF.count())

    // 定义一个正则表达式,用于匹配18位数字的uid
    val uidRegex: Regex = "^\\d{18}$".r

    // 过滤掉不是18位数字的行
    val finalDF = newDF.filter(row=> uidRegex.findFirstIn(row.getAs[String("uid")).isDefined) println(finalDF.count())

小作业结果如下:

image-20241215215420517
3. 写入ClickHouse

​ 写入clickhouse代码如下:

1
2
3
4
5
6
val clickHouseProperties = new Properties() 
clickHouseProperties.put("user", "default")
clickHouseProperties.put("password", "292513Jqq")
clickHouseProperties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
resData.write.mode(SaveMode.Append)
.jdbc("jdbc:clickhouse://172.31.56.66:8123/dm", "dm."+tblNameDst, clickHouseProperties)
4. 处理结果展示

一共写入9张九张静态表,即下图中包含数据的9张表

image-20241215215430723

可视化部分

一 可视化工具使用

​ 使用百度的Sugar BI作为可视化工具,具体步骤为:

  1. 申请账号后,在Sugar BI中创建一个空间

  2. 导入数据源,具体如下:

    image-20241215215448272

    为了使Sugar BI能访问到本地8123端口的clickhouse,需要使用内网隧道,从Sugar BI下载内网隧道客户端后,解压至本地,然后修改db.txt文件内容为127.0.0.1:8123,然后使用bash start.sh命令启动内网隧道,回到Sugar BI测试连接。

  3. Sugar BI成功连接本地clickhouse后,可以在数据表预览中查看所有表,应该能看到里面的数据,但仅显示部分:

    image-20241215215508033

二 创建可视化模型

  1. 在Sugar BI中新建一个大屏,选择一个合适的界面,进入编辑,然后选择想要的组件(如柱状图、折线图、饼图等),为组件添加sql模型,这样组件就会展示clickhouse中的数据。

  2. 例如展示活期交易日top20用户,先创建一个sql模型,如下:

    image-20241215215525611
  3. 然后创建一张横向柱状图,绑定刚刚的sql模型,设置绑定字段、x轴与y轴、刷新时间等等,如下:

    image-20241215215538461

三 可视化结果展示

image-20241215215549062

左上的图展示每日客户交易量TOP20,显示了当天实时活期交易额最高的20个客户及其交易量;

下面的图展示了从数据记录之初到实时的每日交易总额变化,从中可以清晰看出交易量的变化趋势、峰值谷值的分布;

右上的饼图使用了静态数据,显示了各种存款类型在银行存款中所占比例以及各种定期存款的大致比例。

可视化视频已作为附件一并打包上传,展示数据的实时变化。

团队分工

姓名 学号 分工
xxx xxxxxxxxx 可视化部分
xxx xxxxxxxxx 实时数据kafka和flink部分
xxx xxxxxxxxx 静态数据ETL处理
xxx xxxxxxxxx 静态数据部分环境搭建
xxx xxxxxxxxx clickhouse搭建