# 1 数据库迁移需求

业务 mysql 数据库中的数据,会同步到我们的 hadoop 的 hive 数据仓库中。

  • 为了避免直接连接、操作业务数据
  • 同步一份数据在集群中方便进行数据分析操作
1
2
3
4
5
6
hive> show databases;
OK
default
profile
toutiao
Time taken: 0.017 seconds, Fetched: 3 row(s)

创建 hive 业务数据库 onedream

1
create database if not exists onedream comment "user,news information of onedream mysql" location '/user/hive/warehouse/onedream.db/';

# 2 sqoop 导入

用户:基本信息,关注,收藏,搜索,订阅(设置选择喜好频道)
文章:分类,文章

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#!/bin/bash

array=(user_profile user_basic news_user_channel news_channel user_follow user_blacklist user_search news_collection news_article_basic news_article_content news_read news_article_statistic user_material)


for table_name in ${array[@]};
do
sqoop import \
--connect jdbc:mysql://192.168.19.137/onedream \
--username root \
--password password \
--table $table_name \
--m 5 \
--hive-home /root/bigdata/hive \
--hive-import \
--create-hive-table \
--hive-drop-import-delims \
--warehouse-dir /user/hive/warehouse/onedream.db \
--hive-table onedream.$table_name
done

Mysql 导入对应 hive 类型:

1
2
3
4
5
6
7
8
9
MySQL(bigint) --> Hive(bigint) 
MySQL(tinyint) --> Hive(tinyint)
MySQL(int) --> Hive(int)
MySQL(double) --> Hive(double)
MySQL(bit) --> Hive(boolean)
MySQL(varchar) --> Hive(string)
MySQL(decimal) --> Hive(double)
MySQL(date/timestamp) --> Hive(string)

# 3 TFIDF 模型的训练

步骤:
1、读取 N 篇文章数据
2、文章数据进行分词处理
3、TFIDF 模型训练保存,spark 使用 count 与 idf 进行计算
4、利用模型计算 N 篇文章数据的 TFIDF 值

# 分词

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# 分词
def segmentation(partition):
import os
import re

import jieba
import jieba.analyse
import jieba.posseg as pseg
import codecs

abspath = "/root/words"

# 结巴加载用户词典
userDict_path = os.path.join(abspath, "ITKeywords.txt")
jieba.load_userdict(userDict_path)

# 停用词文本
stopwords_path = os.path.join(abspath, "stopwords.txt")

def get_stopwords_list():
"""返回stopwords列表"""
stopwords_list = [i.strip()
for i in codecs.open(stopwords_path).readlines()]
return stopwords_list

# 所有的停用词列表
stopwords_list = get_stopwords_list()

# 分词
def cut_sentence(sentence):
"""对切割之后的词语进行过滤,去除停用词,保留名词,英文和自定义词库中的词,长度大于2的词"""
# print(sentence,"*"*100)
# eg:[pair('今天', 't'), pair('有', 'd'), pair('雾', 'n'), pair('霾', 'g')]
seg_list = pseg.lcut(sentence)
seg_list = [i for i in seg_list if i.flag not in stopwords_list]
filtered_words_list = []
for seg in seg_list:
# print(seg)
if len(seg.word) <= 1:
continue
elif seg.flag == "eng":
if len(seg.word) <= 2:
continue
else:
filtered_words_list.append(seg.word)
elif seg.flag.startswith("n"):
filtered_words_list.append(seg.word)
elif seg.flag in ["x", "eng"]: # 是自定一个词语或者是英文单词
filtered_words_list.append(seg.word)
return filtered_words_list

for row in partition:
sentence = re.sub("<.*?>", "", row.sentence) # 替换掉标签数据
words = cut_sentence(sentence)
yield row.article_id, row.channel_id, words

训练模型,得到每个文章词的频率 Counts 结果

# 词语与词频统计

1
2
3
4
5
6
7
# 词语与词频统计
from pyspark.ml.feature import CountVectorizer
# 总词汇的大小,文本中必须出现的次数
cv = CountVectorizer(inputCol="words", outputCol="countFeatures", vocabSize=200*10000, minDF=1.0)
# 训练词频统计模型
cv_model = cv.fit(words_df)
cv_model.write().overwrite().save("hdfs://hadoop-master:9000/headlines/models/CV.model")