为什么想做这个东西

一直好奇像亚马逊这类网站的搜索是如何做到推荐的,最近刚好看到一篇文章:Redis 与搜索热词推荐,然而只写了思路。所以,就是想自己实现一个。
先上个效果图,再聊:
热词推荐的简单实现 - 图1
P.S. 按四年前,要写这样的前端效果,对于我这个后台开发,还是挺困难的。而现在,简单的学了下Vue.js,再加上同事的小小指点,就搞定了。😂

热词推荐的本质

假如你预先就知道了用户输入:s、sz、shen、深这些字时,就是想搜“深圳”,那是不是说,我们只要提前将这些字放到一个Map结构中,将用户的输入想像出一个key,value就是“深圳”。
说到底,热词推荐的本质就是一个大大的Map。难点就在于如何更新这个Map,以至于让用户觉得“智能”,或觉得我们在给他们做“推荐”。
这个Map,常常被人称为“索引”。其实使用“索引” 这个名词也更准确一些。Map中的Key是不能重复的。但是我们数据结构是要求可重复的,为什么呢?因为,在系统中,s、sh、shen、深等等这些都是key,而它们对应的value,可能相同,又可能不同。举个例子:
hotword:0>zrevrange s 0 10 1) 鼠蛟 2) 鼠场乡 3) 鳝鱼 4) 鳝溪校区 5) 鳝溪农场 6) 鳝溪 7) 骚子营社区 8) 骚子营 9) 驷马镇 10) 驷马桥街道 11) 驷马桥
hotword:0>zrevrange sh 0 10 1) 鼠蛟 2) 鼠场乡 3) 鳝鱼 4) 鳝溪校区 5) 鳝溪农场 6) 鳝溪 7) 首院胡同 8) 首阳镇 9) 首阳山镇 10) 首阳山 11) 首钢试验厂
仔细看到其中的不同了吗?同时,这里还有一个问题,那就是当用户输入s时,出现了10个value,我们如何给这些value如何排序呢?
为了与排序模型解耦,我们为每个value都给出一个分数score。score越大,越排前面。最终索引结构就变成了这样子:
image.png
P.S. 这些score之所以都为0,是因为数据问题。
总的来说,关于热词推荐,我们需要解决以下问题:

  • 如何存储索引的数据?
  • 如何构建索引?也就是一开始时,我们怎么知道用户输入“s” 就是要搜“深圳”呢?
  • 如何根据用户的反馈行为来更新索引?当用户输入 “s” 出现了“1 沙河”和“2 深圳”,用户选择了“深圳”,那么当其他用户输入“s”时,我们是不是应该将“深圳”这个词放到前面呢?

    基于Solr实现的弊端

    美团在几年前也写了一篇文章来介绍自己的热词推荐:搜索引擎关键字智能提示的一种实现。然而这种实现,个人觉得有个设计非常不好。因为Solr在整个系统中,即做了“存储索引”的角色,又做了“构建索引”的角色。违反了职责单一原则。因为当我们想改变构建索引的算法时,同时会影响到“存储索引”的逻辑。
    以下是他们的实现逻辑截图:
    image.png

    另一种基于Redis的实现

    我目前只写了一个简单实现,而且还没有实现“根据用户反馈来更新索引”的功能。这个功能可实现得很简单,也可以实现得很复杂。本文不讨论。
    同时,生产环境会更复杂一些。比如要实现高可用。我个人能力有限,还没有能实现。但是思路是有的:所有出现单点的地方都要做成分布式的,比如Redis就做成Redis Cluster。
    以下是架构图:
    image.png
    图中,InitWorker负责将我准备好的全国地名大全的数据,构建成索引,然后写到Redis中。用户则可以通过基于Openresty写的APP去查询Redis中的数据。
    使用本系统的方法:
    P.S. 本系统使用Ansible做自动化部署,所以,请提前安装好Ansible。
  1. git clone https://github.com/zacker330/hot-word-recommend.git
  2. 准备两个Ubuntu 16的机器,如果你懂Vagrant的话,直接使用我的Vagrantfile就好了
  3. 进入到项目中,执行ansible-playbook ./ansible/playbook.yml -i ./ansible/inventory -u vagrant -k 来自动化部署所有组件。如果使用Vagrant来搭建的环境,密码是 vagrant,以下同,将不在重述。
  4. 打包我们的InitWorker项目:mvn assembly:assembly
  5. 部署InitWorker: ansible-playbook ./ansible/deploy-worker.yml -i ./ansible/inventory -u vagrant -k
  6. 打开链接测试:http://192.168.10.11/index.lsp 。IP换成你自己部署的机器的IP。

具体代码,自己看了。为方便阅读,我觉得有必要注释一下项目结构:

  1. ├── README.md
  2. ├── Vagrantfile
  3. ├── ansible
  4. ├── deploy-front-app.yml // 单独部署 前端app
  5. ├── deploy-local.yml //本地开发使用
  6. ├── deploy-worker.yml // 执行worker,写索引到redis中
  7. ├── inventory
  8. ├── playbook.yml // 安装所有必要的组件
  9. ├── roles
  10. ├── common
  11. ├── front-app // 安装前端APP
  12. ├── jdk8 // 安装Jdk8
  13. ├── openresty // 安装Openresty
  14. └── redis // 安装redis的脚本
  15. └── vars
  16. └── base-env.yml // 配置变量存放文件
  17. ├── autocomplete-worker
  18. ├── pom.xml
  19. ├── src
  20. ├── main
  21. ├── java
  22. └── codes
  23. └── showme
  24. └── autocomplete
  25. ├── InitWorker.java
  26. └── common
  27. └── resources
  28. └── env.properties
  29. └── test
  30. └── target
  31. ├── doc // 文档需要用到的一些文件
  32. └── files
  33. └── places.txt.zip //全国地名数据

小结

热词推荐的“智能”所在处就在于索引的构建算法。简单一点的做法就是每当用户点击某搜索结果时,我们就给这个索引条目加权1。感兴趣的同学可以实现来玩玩。
以上内容均为个人看法,如果有不对的地方,还请斧正,谢谢了。
项目地址为:https://github.com/zacker330/hot-word-recommend

ES 实现实时从Mysql数据库中读取热词,停用词
IK分词器虽然自带词库
image.png

但是在实际开发应用中对于词库的灵活度的要求是远远不够的,IK分词器虽然配置文件中能添加扩展词库,但是需要重启ES
这章就当写一篇扩展了

其实IK本身是支持热更新词库的,但是需要我感觉不是很好
词库热更新方案:
1:IK 原生的热更新方案,部署一个WEB服务器,提供一个Http接口,通过Modified和tag两个Http响应头,来完成词库的热更新
2:通过修改IK源码支持Mysql定时更新数据
注意:推荐使用第二种方案,也是比较常用的方式,虽然第一种是官方提供的,但是官方也不建议使用

方案一:IK原生方案
1:外挂词库,就是在IK配置文件中添加扩展词库文件多个之间使用分号分割
优点:编辑指定词库文件,部署比较方便
缺点:每次编辑更新后都需要重启ES
2:远程词库,就是在IK配置文件中配置一个Http请求,可以是.dic文件,也可以是接口,同样多个之间使用分号分割
优点:指定静态文件,或者接口设置词库实现热更新词库,不用重启ES,是IK原生自带的
缺点:需要通过Modified和tag两个Http响应头,来提供词库的热更新,有时候会不生效
具体使用就不说了,在这里具体说第二种方案
方案二:通过定时读取Mysql完成词库的热更新
首先要下载IK分词器的源码
网址:https://github.com/medcl/elasticsearch-analysis-ik
下载的时候一定要选对版本,保持和ES的版本一致,否则会启动的时候报错,版本不一致
接着把源码导入IDEA中,并在POM.xml中添加Mysql的依赖,根据自己的Mysql版本需要添加
我的Mysql是5.6.1所以添加5的驱动包

  1. <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
  2. <dependency>
  3. <groupId>mysql</groupId>
  4. <artifactId>mysql-connector-java</artifactId>
  5. <version>5.1.49</version>
  6. </dependency>

然后再config目录下创建一个新的.properties配置文件
image.png

在里面配置Mysql的一些配置,以及我们需要的配置

  1. jdbc.url=jdbc:mysql://192.168.43.154:3306/es?characterEncoding=UTF-8&serverTimezone=GMT&nullCatalogMeansCurrent=true
  2. jdbc.user=root
  3. jdbc.password=root
  4. # 更新词库
  5. jdbc.reload.sql=select word from hot_words
  6. # 更新停用词词库
  7. jdbc.reload.stopword.sql=select stopword as word from hot_stopwords
  8. # 重新拉取时间间隔
  9. jdbc.reload.interval=5000

创建一个新的线程,用于调用Dictionary得reLoadMainDict()方法重新加载词库
image.png

  1. package org.wltea.analyzer.dic;
  2. import org.wltea.analyzer.help.ESPluginLoggerFactory;
  3. public class HotDicReloadThread implements Runnable{
  4. private static final org.apache.logging.log4j.Logger logger = ESPluginLoggerFactory.getLogger(Dictionary.class.getName());
  5. @Override
  6. public void run() {
  7. while (true){
  8. logger.info("-------重新加载mysql词典--------");
  9. Dictionary.getSingleton().reLoadMainDict();
  10. }
  11. }
  12. }

修改org.wltea.analyzer.dic文件夹下的Dictionary
在Dictionary类中加载mysql驱动类

  1. private static Properties prop = new Properties();
  2. static {
  3. try {
  4. Class.forName("com.mysql.jdbc.Driver");
  5. } catch (ClassNotFoundException e) {
  6. logger.error("error", e);
  7. }
  8. }

接着,创建重Mysql中加载词典的方法

  1. /**
  2. * 从mysql中加载热更新词典
  3. */
  4. private void loadMySqlExtDict(){
  5. Connection connection = null;
  6. Statement statement = null;
  7. ResultSet resultSet = null;
  8. try {
  9. Path file = PathUtils.get(getDictRoot(),"jdbc-reload.properties");
  10. prop.load(new FileInputStream(file.toFile()));
  11. logger.info("-------jdbc-reload.properties-------");
  12. for (Object key : prop.keySet()) {
  13. logger.info("key:{}", prop.getProperty(String.valueOf(key)));
  14. }
  15. logger.info("------- 查询词典, sql:{}-------", prop.getProperty("jdbc.reload.sql"));
  16. // 建立mysql连接
  17. connection = DriverManager.getConnection(
  18. prop.getProperty("jdbc.url"),
  19. prop.getProperty("jdbc.user"),
  20. prop.getProperty("jdbc.password")
  21. );
  22. // 执行查询
  23. statement = connection.createStatement();
  24. resultSet = statement.executeQuery(prop.getProperty("jdbc.reload.sql"));
  25. // 循环输出查询啊结果,添加到Main.dict中去
  26. while (resultSet.next()) {
  27. String theWord = resultSet.getString("word");
  28. logger.info("------热更新词典:{}------", theWord);
  29. // 加到mainDict里面
  30. _MainDict.fillSegment(theWord.trim().toCharArray());
  31. }
  32. } catch (Exception e) {
  33. logger.error("error:{}", e);
  34. } finally {
  35. try {
  36. if (resultSet != null) {
  37. resultSet.close();
  38. }
  39. if (statement != null) {
  40. statement.close();
  41. }
  42. if (connection != null) {
  43. connection.close();
  44. }
  45. } catch (SQLException e){
  46. logger.error("error", e);
  47. }
  48. }
  49. }

接着,创建加载停用词词典方法

  1. /**
  2. * 从mysql中加载停用词
  3. */
  4. private void loadMySqlStopwordDict(){
  5. Connection conn = null;
  6. Statement stmt = null;
  7. ResultSet rs = null;
  8. try {
  9. Path file = PathUtils.get(getDictRoot(), "jdbc-reload.properties");
  10. prop.load(new FileInputStream(file.toFile()));
  11. logger.info("-------jdbc-reload.properties-------");
  12. for(Object key : prop.keySet()) {
  13. logger.info("-------key:{}", prop.getProperty(String.valueOf(key)));
  14. }
  15. logger.info("-------查询停用词, sql:{}",prop.getProperty("jdbc.reload.stopword.sql"));
  16. conn = DriverManager.getConnection(
  17. prop.getProperty("jdbc.url"),
  18. prop.getProperty("jdbc.user"),
  19. prop.getProperty("jdbc.password"));
  20. stmt = conn.createStatement();
  21. rs = stmt.executeQuery(prop.getProperty("jdbc.reload.stopword.sql"));
  22. while(rs.next()) {
  23. String theWord = rs.getString("word");
  24. logger.info("------- 加载停用词 : {}", theWord);
  25. _StopWords.fillSegment(theWord.trim().toCharArray());
  26. }
  27. Thread.sleep(Integer.valueOf(String.valueOf(prop.get("jdbc.reload.interval"))));
  28. } catch (Exception e) {
  29. logger.error("error", e);
  30. } finally {
  31. try {
  32. if(rs != null) {
  33. rs.close();
  34. }
  35. if(stmt != null) {
  36. stmt.close();
  37. }
  38. if(conn != null) {
  39. conn.close();
  40. }
  41. } catch (SQLException e){
  42. logger.error("error:{}", e);
  43. }
  44. }
  45. }

接下来,分别在loadMainDict()方法和loadStopWordDict()方法结尾处调用

  1. /**
  2. * 加载主词典及扩展词典
  3. */
  4. private void loadMainDict() {
  5. // 建立一个主词典实例
  6. _MainDict = new DictSegment((char) 0);
  7. // 读取主词典文件
  8. Path file = PathUtils.get(getDictRoot(), Dictionary.PATH_DIC_MAIN);
  9. loadDictFile(_MainDict, file, false, "Main Dict");
  10. // 加载扩展词典
  11. this.loadExtDict();
  12. // 加载远程自定义词库
  13. this.loadRemoteExtDict();
  14. // 加载Mysql外挂词库
  15. this.loadMySqlExtDict();
  16. }
  1. /**
  2. * 加载用户扩展的停止词词典
  3. */
  4. private void loadStopWordDict() {
  5. // 建立主词典实例
  6. _StopWords = new DictSegment((char) 0);
  7. // 读取主词典文件
  8. Path file = PathUtils.get(getDictRoot(), Dictionary.PATH_DIC_STOP);
  9. loadDictFile(_StopWords, file, false, "Main Stopwords");
  10. // 加载扩展停止词典
  11. List<String> extStopWordDictFiles = getExtStopWordDictionarys();
  12. if (extStopWordDictFiles != null) {
  13. for (String extStopWordDictName : extStopWordDictFiles) {
  14. logger.info("[Dict Loading] " + extStopWordDictName);
  15. // 读取扩展词典文件
  16. file = PathUtils.get(extStopWordDictName);
  17. loadDictFile(_StopWords, file, false, "Extra Stopwords");
  18. }
  19. }
  20. // 加载远程停用词典
  21. List<String> remoteExtStopWordDictFiles = getRemoteExtStopWordDictionarys();
  22. for (String location : remoteExtStopWordDictFiles) {
  23. logger.info("[Dict Loading] " + location);
  24. List<String> lists = getRemoteWords(location);
  25. // 如果找不到扩展的字典,则忽略
  26. if (lists == null) {
  27. logger.error("[Dict Loading] " + location + " load failed");
  28. continue;
  29. }
  30. for (String theWord : lists) {
  31. if (theWord != null && !"".equals(theWord.trim())) {
  32. // 加载远程词典数据到主内存中
  33. logger.info(theWord);
  34. _StopWords.fillSegment(theWord.trim().toLowerCase().toCharArray());
  35. }
  36. }
  37. }
  38. // 加载Mysql停用词词库
  39. this.loadMySqlStopwordDict();
  40. }

最后在initial()方法中启动更新线程

  1. /**
  2. * 词典初始化 由于IK Analyzer的词典采用Dictionary类的静态方法进行词典初始化
  3. * 只有当Dictionary类被实际调用时,才会开始载入词典, 这将延长首次分词操作的时间 该方法提供了一个在应用加载阶段就初始化字典的手段
  4. *
  5. * @return Dictionary
  6. */
  7. public static synchronized void initial(Configuration cfg) {
  8. if (singleton == null) {
  9. synchronized (Dictionary.class) {
  10. if (singleton == null) {
  11. singleton = new Dictionary(cfg);
  12. singleton.loadMainDict();
  13. singleton.loadSurnameDict();
  14. singleton.loadQuantifierDict();
  15. singleton.loadSuffixDict();
  16. singleton.loadPrepDict();
  17. singleton.loadStopWordDict();
  18. // 执行更新mysql词库的线程
  19. new Thread(new HotDicReloadThread()).start();
  20. if(cfg.isEnableRemoteDict()){
  21. // 建立监控线程
  22. for (String location : singleton.getRemoteExtDictionarys()) {
  23. // 10 秒是初始延迟可以修改的 60是间隔时间 单位秒
  24. pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
  25. }
  26. for (String location : singleton.getRemoteExtStopWordDictionarys()) {
  27. pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
  28. }
  29. }
  30. }
  31. }
  32. }
  33. }

然后,修改src/main/assemblies/plugin.xml文件中,加入Mysql

  1. <dependencySet>
  2. <outputDirectory>/</outputDirectory>
  3. <useProjectArtifact>true</useProjectArtifact>
  4. <useTransitiveFiltering>true</useTransitiveFiltering>
  5. <includes>
  6. <include>mysql:mysql-connector-java</include>
  7. </includes>
  8. </dependencySet>

源码到此修改完成,在自己的数据库中创建两张新的表
建表SQL

  1. CREATE TABLE hot_words (
  2. id bigint(20) NOT NULL AUTO_INCREMENT,
  3. word varchar(50) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '词语',
  4. PRIMARY KEY (id)
  5. ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
  6. CREATE TABLE hot_stopwords (
  7. id bigint(20) NOT NULL AUTO_INCREMENT,
  8. stopword varchar(50) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '停用词',
  9. PRIMARY KEY (id)
  10. ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;

接下来对源码进行打包:
打包之前检查自己的POM.xml中的elasticsearch.version的版本,记得和自己的ES的版本对应,否则到时候会报错
image.png
检查完毕后,点击IDEA右侧的package进行项目打包,如果版本不对,修改版本并点击IDEA右侧的刷新同步,进行版本的更换,然后打包
image.png

打包完成后在左侧项目中会出现target目录,会看到一个zip,我的是因为解压了,所以有文件夹
image.png

点击右键在文件夹中展示,然后使用解压工具解压
image.png
解压完成后,双击进入image.png
先把原来ES下的plugins下的IK文件夹中的东西删除,可以先备份,然后把自己打包解压后里面的东西全部拷贝到ES下的plugins下的IK文件夹中
image.png
接下来进入bin目录下启动就可以了
当然按照惯例,我的启动时不会那么简单的,很高兴,我的报错了,所有的坑都踩了一遍,之前的版本不对就踩了两次
第一次是源码下载的版本不对
第二次的ES依赖版本不对
好了说报错:报错只贴主要内容
第三次报错:

  1. Caused by: java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "setContextClassLoader")

这个是JRE的类的创建设值权限不对
在jre/lib/security文件夹中有一个java.policy文件,在其grant{}中加入授权即可

  1. permission java.lang.RuntimePermission "createClassLoader";
  2. permission java.lang.RuntimePermission "getClassLoader";
  3. permission java.lang.RuntimePermission "accessDeclaredMembers";
  4. permission java.lang.RuntimePermission "setContextClassLoader";

第四次报错:

  1. Caused by: java.security.AccessControlException: access denied ("java.net.SocketPermission" "192.168.43.154:3306" "connect,resolve")

这个是通信链接等权限不对
也是,在jre/lib/security文件夹中有一个java.policy文件,在其grant{}中加入授权即可

  1. permission java.net.SocketPermission "192.168.43.154:3306","accept";
  2. permission java.net.SocketPermission "192.168.43.154:3306","listen";
  3. permission java.net.SocketPermission "192.168.43.154:3306","resolve";
  4. permission java.net.SocketPermission "192.168.43.154:3306","connect";

到此之后启动无异常
最后就是测试了,启动我的head插件和kibana,这两个没有或者不会的可以看我之前写的,也可以百度
执行分词
image.png

但是我想要 天青色
在Mysql中添加记录

  1. insert into hot_words(word) value("天青色");

重新执行
image.png

也比如我想要这就是一个词 天青色等烟雨
在Mysql中添加记录

  1. insert into hot_words(word) value("天青色等烟雨");

再次执行
image.png
到此实现了ES定时从mysql中读取热词,停用词这个一般用的比较少,有兴趣自己测测,在使用的时候,通过业务系统往数据库热词表和停用词表添加记录就可以了
https://www.cnblogs.com/qwer78/p/15565539.html

dynamic-synonym插件