Spring-Data-Elasticsearch Development Notes
Colorful_Ghost Lv4

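The Spring-Data-Elasticsearch API differs considerably from one version to the next. The latest release has deprecated many methods, and most of what search engines turn up covers older versions whose approaches are no longer recommended.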

  1. Add the dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
  2. Add the annotation to the application's startup class so that the repository package (the DAO layer) is scanned
@EnableElasticsearchRepositories("com.*.*.*.repository")
  3. Add the ES settings and DEBUG logging to application.yml
spring:
  data:
    elasticsearch:
      repositories:
        enabled: true
  elasticsearch:
    rest:
      # REST URI(s) of the ES node(s) to connect to; separate multiple entries with commas
      uris: http://localhost:9200

logging:
  level:
    org.springframework.security: debug
    org.springframework.web: error
    org:
      springframework:
        data:
          elasticsearch:
            core: DEBUG
            # Spring Data Elasticsearch wire logger (request and response bodies)
            client.WIRE: DEBUG
      apache:
        http: DEBUG
      elasticsearch:
        client: DEBUG
  4. Create the entity class under the entity folder
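import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.DateFormat;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import javax.persistence.Entity;
import java.util.Date;

@Data
@Entity
@Document(indexName = "report_section_flow_5min_f", replicas = 0, refreshInterval = "1m")
@ApiModel(value = "5-minute section data")
public class SectionFlow5Min {
    @Id
    private Long id;

    @ApiModelProperty("Section identifier name")
    @Field(name = "identifyname")
    private String identifyName;

    // Dates are mapped as ES date fields and written as ISO-style strings in GMT+8.
    @ApiModelProperty("Start time")
    @Field(name = "starttime", format = DateFormat.date_optional_time, type = FieldType.Date)
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ", timezone = "GMT+8")
    private Date startTime;

    @ApiModelProperty("End time")
    @Field(name = "endtime", format = DateFormat.date_optional_time, type = FieldType.Date)
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ", timezone = "GMT+8")
    private Date endTime;
}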

Pay attention to how the date fields are serialized here; otherwise queries will fail with an error.
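To make the serialization note concrete, here is a minimal stand-alone sketch that formats a `java.util.Date` with the same pattern and time zone as the `@JsonFormat` annotation on the entity. The class name `DateFormatSketch` and its `format` method are made up for illustration, and the sketch assumes the pattern is applied with `SimpleDateFormat` semantics, which is how Jackson handles a custom pattern on `java.util.Date`.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical helper, not part of the original project.
final class DateFormatSketch {

    // Same pattern and zone as the @JsonFormat annotation on the entity fields.
    static final String PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
    static final String ZONE = "GMT+8";

    /** Renders a Date as the string the annotated fields are expected to produce. */
    static String format(Date date) {
        // SimpleDateFormat is not thread-safe, so a new instance is created per call.
        SimpleDateFormat sdf = new SimpleDateFormat(PATTERN, Locale.ROOT);
        sdf.setTimeZone(TimeZone.getTimeZone(ZONE));
        return sdf.format(date);
    }

    public static void main(String[] args) {
        // The epoch instant rendered in GMT+8: 1970-01-01T08:00:00.000+0800
        System.out.println(format(new Date(0L)));
    }
}
```

Comparing such a string with what is actually stored in the index is a quick way to spot a mismatch between the Java-side pattern and the date format of the ES mapping.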

  5. Create an interface under repository that extends ElasticsearchRepository
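import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

import java.util.Collection;
import java.util.Date;
import java.util.List;

public interface SectionFlow5MinDao extends ElasticsearchRepository<SectionFlow5Min, Long> {

    // Paged lookup: time window plus any of several section names.
    Page<SectionFlow5Min> findByStartTimeGreaterThanEqualAndEndTimeLessThanEqualAndSectionNameIn(
            Date startTime, Date endTime, Collection<String> sectionNames, Pageable pageable);

    // Unpaged lookup: time window plus a single section name.
    List<SectionFlow5Min> findByStartTimeGreaterThanEqualAndEndTimeLessThanEqualAndSectionName(
            Date startTime, Date endTime, String sectionName);
}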

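Here, findByStartTimeGreaterThanEqualAndEndTimeLessThanEqualAndSectionNameIn looks up the documents in ES whose start time is greater than or equal to startTime, whose end time is less than or equal to endTime, and whose SectionName is any one of the given values.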
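The three conditions encoded in that method name can be spelled out as plain Java. The following is only an in-memory sketch of the logic, not how Spring Data Elasticsearch executes the query: it ignores paging and text analysis, and the `Row` class and the `filter` method are hypothetical stand-ins introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;

// Hypothetical in-memory illustration of the derived query's conditions.
final class DerivedQuerySketch {

    /** Stand-in for an indexed document; holds only the three queried fields. */
    static final class Row {
        final Date startTime;
        final Date endTime;
        final String sectionName;

        Row(Date startTime, Date endTime, String sectionName) {
            this.startTime = startTime;
            this.endTime = endTime;
            this.sectionName = sectionName;
        }
    }

    /** Keeps rows with startTime >= from AND endTime <= to AND sectionName IN sectionNames. */
    static List<Row> filter(List<Row> rows, Date from, Date to, Collection<String> sectionNames) {
        List<Row> matches = new ArrayList<>();
        for (Row row : rows) {
            boolean startOk = !row.startTime.before(from);   // StartTimeGreaterThanEqual
            boolean endOk = !row.endTime.after(to);          // EndTimeLessThanEqual
            boolean nameOk = sectionNames.contains(row.sectionName); // SectionNameIn
            if (startOk && endOk && nameOk) {
                matches.add(row);
            }
        }
        return matches;
    }
}
```

Each keyword in the method name (GreaterThanEqual, LessThanEqual, In) maps to one of the three boolean checks, and the And parts join them.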

  6. Aggregation queries

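When a query gets complex, or when you need aggregations, ElasticsearchRepository may no longer be enough. In that case, write the query with NativeSearchQueryBuilder or use the @Query annotation. Writing JSON inside @Query feels far too unfriendly to me, so I find NativeSearchQueryBuilder the better option. The code follows.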

import org.springframework.data.domain.Pageable;

// Implementation in the ServiceImpl layer
private Map<Page<StationFlow5Min>, Map<String, Map<String, ParsedSum>>> find5MinDataByES(String startTime, String endTime,
                                                                                         List<String> categoryNames,
                                                                                         Pageable pageable) {
    NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
    // starttime >= startTime, endtime <= endTime, stationname matches any of the given names
    QueryBuilder qb1 = QueryBuilders.rangeQuery("starttime").from(startTime);
    QueryBuilder qb2 = QueryBuilders.rangeQuery("endtime").to(endTime);
    QueryBuilder qb3 = QueryBuilders.queryStringQuery(ESUtils.subStrListToStr(categoryNames)).field("stationname");

    BoolQueryBuilder must = QueryBuilders.boolQuery().must(qb1).must(qb2).must(qb3);

    // Page numbers are zero-based; the aggregation is only requested for the first page
    if (pageable.getPageNumber() == 0) {
        SumAggregationBuilder sum1 = AggregationBuilders.sum("entryFlowSum").field("entryflow");
        SumAggregationBuilder sum2 = AggregationBuilders.sum("exitFlowSum").field("exitflow");
        // Group by station name, then sum both flows inside each bucket
        TermsAggregationBuilder stationNameTerm = AggregationBuilders.terms("stationName").field("stationname.keyword");
        stationNameTerm.subAggregation(sum1);
        stationNameTerm.subAggregation(sum2);
        queryBuilder.addAggregation(stationNameTerm);
    }
    queryBuilder.withQuery(must)
            .withPageable(pageable);

    SearchHits<StationFlow5Min> search = elasticsearchOperations.search(queryBuilder.build(), StationFlow5Min.class);
    // Stays null when no aggregation was requested (any page after the first)
    Map<String, Map<String, ParsedSum>> aggBucketsInAgg = null;
    if (search.getAggregations() != null) {
        aggBucketsInAgg = ESUtils.getAggBucketsInAgg(search.getAggregations());
    }
    Map<Page<StationFlow5Min>, Map<String, Map<String, ParsedSum>>> result = new HashMap<>();
    result.put(ESUtils.getPages(search, pageable), aggBucketsInAgg);
    return result;
}

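The pageable.getPageNumber() == 0 check comes from the business scenario and can be ignored. AggregationBuilders provides ready-made factory methods for aggregations. Here sum(entryflow) and sum(exitflow) are returned under the names entryFlowSum and exitFlowSum, and the documents matched by qb1, qb2 and qb3 come back in the same response. One more thing to watch: the field used for the terms aggregation must be the .keyword sub-field:

AggregationBuilders.terms("stationName").field("stationname.keyword");

Without .keyword, ES rejects the query, because a terms aggregation cannot run on an analyzed text field unless fielddata is enabled on it. The keyword sub-field is also the more efficient choice, since it is aggregated from doc values. The ParsedSum values in the returned Map are the type ES uses to wrap the result of a sum aggregation. (Having read the ES source, I keep feeling that Spring Data Elasticsearch wraps too much, which makes it rather awkward to use.)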
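What the terms aggregation with its two sum sub-aggregations computes can be pictured as a group-by over the matching documents. The sketch below reproduces that shape in memory; the `Flow` class and the `aggregate` method are hypothetical, and `long` is used for the sums only to keep the example simple (the ES sum aggregation reports a double).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory picture of terms("stationName") with two sum sub-aggregations.
final class TermsSumSketch {

    /** Stand-in for a matching document; only the aggregated fields. */
    static final class Flow {
        final String stationName;
        final long entryFlow;
        final long exitFlow;

        Flow(String stationName, long entryFlow, long exitFlow) {
            this.stationName = stationName;
            this.entryFlow = entryFlow;
            this.exitFlow = exitFlow;
        }
    }

    /** Returns bucket key -> (sub-aggregation name -> sum), one bucket per station name. */
    static Map<String, Map<String, Long>> aggregate(List<Flow> flows) {
        Map<String, Map<String, Long>> buckets = new LinkedHashMap<>();
        for (Flow flow : flows) {
            // Create the bucket on first sight of a station name.
            Map<String, Long> sums = buckets.computeIfAbsent(flow.stationName, key -> new LinkedHashMap<>());
            sums.merge("entryFlowSum", flow.entryFlow, Long::sum);
            sums.merge("exitFlowSum", flow.exitFlow, Long::sum);
        }
        return buckets;
    }
}
```

The nested map mirrors the `Map<String, Map<String, ParsedSum>>` returned by the service method: the outer key is the station name, the inner key is the sub-aggregation name.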

  7. The helper methods in ESUtils

The methods in ESUtils are thin wrappers that make it more convenient to pull data out of the search results.

import cn.hutool.json.JSONUtil;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHitSupport;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.SearchPage;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ESUtils {

    /**
     * Wraps the search hits in the SearchPage type provided by Spring Data Elasticsearch.
     *
     * @param searchHits hits returned by the search
     * @param pageable   paging information of the request
     * @param <T>        entity type
     * @return the hits as a SearchPage
     */
    public static <T> SearchPage<T> getHitsPages(SearchHits<T> searchHits, Pageable pageable) {
        return SearchHitSupport.searchPageFor(searchHits, pageable);
    }

    /** Unwraps the entities contained in a SearchPage. */
    public static <T> List<T> getHitsContents(SearchPage<T> searchHits) {
        return searchHits.stream().map(SearchHit::getContent).collect(Collectors.toList());
    }

    /**
     * Converts the returned hits into the Page type used with JPA.
     *
     * @param searchHits hits returned by the search
     * @param pageable   paging information of the request
     * @param <T>        entity type
     * @return a Page of entities
     */
    public static <T> Page<T> getPages(SearchHits<T> searchHits, Pageable pageable) {
        return getHitsPages(searchHits, pageable).map(SearchHit::getContent);
    }

    /** Indexes the top-level aggregations by name; the caller picks the concrete type T. */
    @SuppressWarnings("unchecked")
    public static <T> Map<String, T> getAggregation(Aggregations aggregations) {
        return aggregations.asList().stream().collect(Collectors.toMap(Aggregation::getName, i -> (T) i));
    }

    /**
     * Converts an ordinary "date time" string into the ISO form with a T separator
     * (ES essentially stores JSON strings).
     *
     * @param time time string with a space between date and time
     * @return the same string with the space replaced by T
     */
    public static String timeFormatBlankReT(String time) {
        return time.replace(" ", "T");
    }

    /**
     * Prepares several values for QueryBuilders.queryStringQuery: the list is serialized
     * to JSON, then the surrounding brackets and the commas are stripped, which leaves
     * the values as quoted phrases.
     *
     * @param ss values to search for
     * @return the values as one query string
     */
    public static String subStrListToStr(List<String> ss) {
        String s = JSONUtil.toJsonStr(ss);
        s = s.substring(1, s.length() - 1);
        s = s.replace(",", "");
        return s;
    }

    /**
     * Collects the buckets of the returned terms aggregations into a Map:
     * bucket key -> (sub-aggregation name -> sub-aggregation).
     *
     * @param aggregations aggregations of the search response
     * @param <T>          type of the sub-aggregations, for example ParsedSum
     * @return nested map of the bucket results
     */
    @SuppressWarnings("unchecked")
    public static <T> Map<String, Map<String, T>> getAggBucketsInAgg(Aggregations aggregations) {
        Map<String, Map<String, T>> results = new HashMap<>();
        for (Aggregation aggregation : aggregations) {
            List<? extends Terms.Bucket> buckets = ((Terms) aggregation).getBuckets();
            for (Terms.Bucket bucket : buckets) {
                // One inner map per bucket, so buckets do not overwrite each other's values
                Map<String, T> bucketMap = new HashMap<>();
                for (Aggregation bucketAggregation : bucket.getAggregations()) {
                    bucketMap.put(bucketAggregation.getName(), (T) bucketAggregation);
                }
                results.put(bucket.getKeyAsString(), bucketMap);
            }
        }
        return results;
    }

}
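The string handed to queryStringQuery by subStrListToStr is a run of quoted phrases. As a possible alternative that needs no JSON library, the sketch below builds a similar string with the standard library only. It is not the method used above: the class `QueryStringSketch` and its methods are hypothetical, the phrases are separated by a space, and embedded backslashes and double quotes are escaped. It relies on the default OR operator of query_string, so a document matches if it contains any one of the phrases.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical alternative to subStrListToStr, using only the standard library.
final class QueryStringSketch {

    /** Wraps one value in double quotes, escaping backslashes and quotes inside it. */
    static String quote(String value) {
        String escaped = value.replace("\\", "\\\\").replace("\"", "\\\"");
        return "\"" + escaped + "\"";
    }

    /** Joins the values as space-separated quoted phrases; an empty list gives an empty string. */
    static String toPhraseQuery(List<String> values) {
        return values.stream()
                .map(QueryStringSketch::quote)
                .collect(Collectors.joining(" "));
    }
}
```

For the values `Station A` and `Station B` this yields `"Station A" "Station B"`. Unlike stripping every comma from the JSON form, a comma inside a value is left untouched.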
  8. Pitfalls

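Finally, how to check whether a query is wrong: set a breakpoint on your NativeSearchQueryBuilder variable, since all it does in the end is assemble the final JSON structure. If you use ElasticsearchRepository, find the place in the core package of the source where the query is assembled and set a breakpoint there, then copy the JSON out and debug it in a tool.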

  9. References consulted while working on this:

https://www.jianshu.com/p/7d1f72ca49ba

https://github.com/spring-projects/spring-data-elasticsearch/blob/main/src/main/asciidoc/reference/elasticsearch-repository-queries.adoc

https://segmentfault.com/a/1190000016296983

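  • Title: Spring-Data-Elasticsearch Development Notes
  • Author: Colorful_Ghost
  • Created: 2021-08-22 19:47:06
  • Link: https://blog.iacg.moe/2021/08/22/Spring-Data-Elasticsearch开发记录/
  • Copyright: Unless stated otherwise, all posts on this blog are licensed under BY-NC-SA. Please credit the source when reposting!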