elastic是go语言中与ElasticSearch交互使用最多的一个库。

首先要根据ElasticSearch版本选择对应的库:

Elasticsearch version Elastic version Package URL Remarks
7.x 7.0 github.com/olivere/elastic/v7 (source doc) Use Go modules.
6.x 6.0 github.com/olivere/elastic (source doc) Use a dependency manager
5.x 5.0 gopkg.in/olivere/elastic.v5 (source doc) Actively maintained.
2.x 3.0 gopkg.in/olivere/elastic.v3 (source doc) Deprecated. Please update.
1.x 2.0 gopkg.in/olivere/elastic.v2 (source doc) Deprecated. Please update.
0.9-1.3 1.0 gopkg.in/olivere/elastic.v1 (source doc) Deprecated. Please update.


下面以7.0为例:

下载安装

  1. go get gopkg.in/olivere/elastic.v7

初始化

  1. esUrl = "http://127.0.0.1:9200"
  2. //初始化
  3. func init() {
  4. var err error
  5. // sniff: false, 表示关闭集群,默认是开启的
  6. client, err = elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(host))
  7. if err != nil {
  8. panic(err)
  9. }
  10. _,_,err = client.Ping(host).Do(context.Background())
  11. if err != nil {
  12. panic(err)
  13. }
  14. //fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number)
  15. _,err = client.ElasticsearchVersion(host)
  16. if err != nil {
  17. panic(err)
  18. }
  19. //fmt.Printf("Elasticsearch version %s\n", esversion)
  20. }

配置client时还有以下参数:

  • elastic.SetURL(url) 用来设置ES服务地址,如果是本地,就是127.0.0.1:9200。支持多个地址,用逗号分隔即可。
  • elastic.SetBasicAuth(“user”, “secret”) 这个是基于http base auth 验证机制的账号密码。
  • elastic.SetGzip(true) 启动gzip压缩
  • elastic.SetHealthcheckInterval(10*time.Second) 用来设置监控检查时间间隔
  • elastic.SetMaxRetries(5) 设置请求失败最大重试次数,v7版本以后已被弃用
  • elastic.SetSniff(false) 允许指定弹性是否应该定期检查集群(默认为true)
  • elastic.SetErrorLog(log.New(os.Stderr, “ELASTIC “, log.LstdFlags)) 设置错误日志输出
  • elastic.SetInfoLog(log.New(os.Stdout, “”, log.LstdFlags)) 设置info日志输出

    创建索引

上一步,我们创建了client,接下来我们就要创建对应的索引以及mapping。根据开始介绍的功能,我们来设计我们的mapping结构:

  1. mappingTpl = `{
  2. "mappings":{
  3. "properties":{
  4. "id": { "type": "long" },
  5. "name": { "type": "keyword" },
  6. "sex": { "type": "text" },
  7. "married": { "type": "keyword" },
  8. "age": { "type": "long" },
  9. "interests": { "type": "keyword" },
  10. }
  11. }
  12. }`

索引设计为:index =user。
设计好了index及mapping后,我们开始编写代码进行创建:

  1. func initIndex() {
  2. ctx := context.Background()
  3. exists, err := client.IndexExists(es.index).Do(ctx)
  4. if err != nil {
  5. fmt.Printf("userEs init exist failed err is %s\n", err)
  6. return
  7. }
  8. if !exists {
  9. _, err := client.CreateIndex(es.index).Body(es.mapping).Do(ctx)
  10. if err != nil {
  11. fmt.Printf("index init failed err is %s\n", err)
  12. return
  13. }
  14. }
  15. }

这里我们首先判断es中是否已经存在要创建的索引,不存在,调用CreateIndex进行创建。

添加文档

两种方式,API分别为BodyJson和BodyString(观察来说,BodyString就是用反引号包裹的es原生语法)

  1. func insertDoc(){
  2. // 添加文档方法1
  3. user1 := User{Name:"bob",Sex:"male",Married:false,Age:23}
  4. put1, err := client.Index().Index("user").BodyJson(user1).Id("1").Do(ctx)
  5. if err != nil{
  6. panic(err)
  7. }
  8. fmt.Printf("Indexed user %s to index %s, type %s\n", put1.Id, put1.Index, put1.Type) //Indexed user 1 to index user, type _doc
  9. // 添加文档方法2
  10. user2 := `{"name":"mike","sex":"male","married":true,"age":"22"}`
  11. put2, err := client.Index().Index("user").BodyString(user2).Do(ctx)// 不指定id
  12. if err != nil{
  13. panic(err)
  14. }
  15. fmt.Printf("Indexed user %s to index %s, type %s\n", put1.Id, put1.Index, put1.Type) //Indexed user 1 to index user, type _doc
  16. }

查询文档

  1. func queryDoc(){
  2. // 使用文档id查询
  3. get1, err := client.Get().Index("user").Id("1").Do(ctx)
  4. if err != nil{
  5. panic(err)
  6. }
  7. if get1.Found{
  8. fmt.Printf("Got document %s in version %d from index %s, type %s\n", get1.Id, get1.Version, get1.Index, get1.Type)
  9. // 数据永久化,Flush to make sure the documents got written.将文档涮入磁盘
  10. _, err = client.Flush().Index("user").Do(ctx)
  11. if err != nil {
  12. panic(err)
  13. }
  14. // 按"term"搜索Search with a term query
  15. termQuery := elastic.NewTermQuery("name", "mike")
  16. searchResult, err := client.Search().
  17. Index("user"). // 搜索的索引"user"
  18. Query(termQuery). // specify the query
  19. Sort("age", true). //按字段"age"排序,升序排列
  20. From(0).Size(10). // 分页,单页显示10条
  21. Pretty(true). // pretty print request and response JSON以json的形式返回信息
  22. Do(ctx) // 执行
  23. if err != nil {
  24. // Handle error
  25. panic(err)
  26. }
  27. fmt.Printf("Query took %d milliseconds\n", searchResult.TookInMillis)// Query took 3 milliseconds
  28. var user User
  29. Each是一个简便函数,此函数忽略了错误输出
  30. for _, item1 := range searchResult.Each(reflect.TypeOf(user)) {
  31. if u, ok := item1.(User); ok {
  32. fmt.Printf("Person by %s,age:%d,married:%t,Sex:%s\n", u.Name, u.Age, u.Married,u.Sex) //Person by bob,age:23,married:false,Sex:male
  33. }
  34. }
  35. // 搜索文档方法2
  36. // 使用hits,获得更详细的输出结果
  37. if searchResult.Hits.TotalHits.Value >0{
  38. fmt.Printf("找到的数据总数是 %d \n", searchResult.Hits.TotalHits.Value)
  39. for _,hits := range searchResult.Hits.Hits{
  40. u :=User{}
  41. err := json.Unmarshal([]byte(hits.Source), &u)
  42. if err != nil{
  43. fmt.Println("反序列化失败",err)
  44. }
  45. fmt.Printf("User by %s,age:%d,married:%t,Sex:%s\n", u.Name, u.Age, u.Married,u.Sex)
  46. }
  47. }else {
  48. fmt.Println("没有搜到用户")
  49. }
  50. }

更新文档

  1. func updateDoc(){
  2. // 根据id更新文档 update
  3. update, err := client.Update().Index("user").Id("1").
  4. Script(elastic.NewScriptInline("ctx._source.age += params.num").Lang("painless").Param("num", 1)).
  5. //Upsert(map[string]interface{}{"created": "2020-06-17"}). // 插入未初始化的字段value
  6. Do(ctx)
  7. if err != nil {
  8. panic(err)
  9. }
  10. fmt.Printf("New version of user %q is now %d\n", update.Id, update.Version)
  11. // 根据查出来的结果更新方法2
  12. termQuery := elastic.NewTermQuery("name", "bob")
  13. update,err := client.UpdateByQuery("user").Query(termQuery).
  14. Script(elastic.NewScriptInline("ctx._source.age += params.num").Lang("painless").Param("num", 1)).
  15. Do(ctx)
  16. if err != nil{
  17. panic(err)
  18. }
  19. fmt.Printf("New version of user %q is now %d\n", update.Id, update.Version)
  20. }

删除文档

  1. func deleteDoc(){
  2. termQuery := elastic.NewTermQuery("name", "mike")
  3. _, err = client.DeleteByQuery().Index("user"). // search in index "user"
  4. Query(termQuery). // specify the query
  5. Do(ctx)
  6. if err != nil {
  7. // Handle error
  8. panic(err)
  9. }
  10. // 按文档id删除
  11. _,err = client.Delete().Index("user").Id("2").Do(ctx)
  12. if err != nil{
  13. panic(err)
  14. }
  15. // 删除索引
  16. _,err = client.DeleteIndex("user").Do(ctx)
  17. if err != nil{
  18. panic(err)
  19. }
  20. }

搜索文档

  1. func searchDoc(){
  2. var res *elastic.SearchResult
  3. var err error
  4. //取所有
  5. res, err = client.Search("user").Type("employee").Do(context.Background())
  6. printEmployee(res, err)
  7. //字段相等
  8. q := elastic.NewQueryStringQuery("name:bob")
  9. res, err = client.Search("user").Type("employee").Query(q).Do(context.Background())
  10. if err != nil {
  11. println(err.Error())
  12. }
  13. printEmployee(res, err)
  14. //条件查询
  15. //年龄大于30岁的
  16. boolQ := elastic.NewBoolQuery()
  17. boolQ.Must(elastic.NewMatchQuery("name", "smith"))
  18. boolQ.Filter(elastic.NewRangeQuery("age").Gt(30))
  19. res, err = client.Search("user").Type("employee").Query(q).Do(context.Background())
  20. printDoc(res, err)
  21. //短语搜索 搜索interests字段中有 rock climbing
  22. matchPhraseQuery := elastic.NewMatchPhraseQuery("interests", "rock climbing")
  23. res, err = client.Search("user").Type("employee").Query(matchPhraseQuery).Do(context.Background())
  24. printDoc(res, err)
  25. //分析 interests
  26. aggs := elastic.NewTermsAggregation().Field("interests")
  27. res, err = client.Search("user").Type("employee").Aggregation("all_interests", aggs).Do(context.Background())
  28. printDoc(res, err)
  29. }
  30. //打印查询到的文档
  31. func printDoc(res *elastic.SearchResult, err error) {
  32. if err != nil {
  33. print(err.Error())
  34. return
  35. }
  36. var typ Employee
  37. for _, item := range res.Each(reflect.TypeOf(typ)) { //从搜索结果中取数据的方法
  38. t := item.(User)
  39. fmt.Printf("%#v\n", t)
  40. }
  41. }
  42. ////简单分页
  43. func list(size,page int) {
  44. if size < 0 || page < 1 {
  45. fmt.Printf("param error")
  46. return
  47. }
  48. res,err := client.Search("user").
  49. Type("employee").
  50. Size(size).
  51. From((page-1)*size).
  52. Do(context.Background())
  53. printEmployee(res, err)
  54. }