The Spring Data Elasticsearch API has changed considerably between versions, and the latest releases have deprecated many methods, so most of what a search engine turns up today targets older versions and is no longer the recommended usage.
- Add the dependency
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
```
- Annotate the application startup class so it scans the repository package (the DAO layer); a startup-class sketch follows the snippet below
```java
@EnableElasticsearchRepositories("com.*.*.*.repository")
```
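For context, a minimal sketch of where the annotation goes; the class and package names below are placeholders, not from the project in this post:

```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;

@SpringBootApplication
@EnableElasticsearchRepositories("com.example.demo.repository") // placeholder package
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
```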
- Add the ES settings and debug log output to application.yml (an equivalent Java-config sketch follows the yml)
```yaml
spring:
  data:
    elasticsearch:
      repositories:
        enabled: true
  elasticsearch:
    rest:
      uris: http://localhost:9200

logging:
  level:
    org.springframework.security:
      - debug
      - info
    org.springframework.web: error
    org:
      springframework:
        data:
          elasticsearch:
            core: DEBUG
      apache:
        http: DEBUG
      elasticsearch:
        client: DEBUG
        WIRE: DEBUG
```
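If you prefer Java configuration over the yml properties, the Spring Data Elasticsearch 4.x reference suggests roughly this shape. A sketch, assuming a single node at localhost:9200 as in the yml above; adjust the host and port for your cluster:

```java
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;

@Configuration
public class EsClientConfig extends AbstractElasticsearchConfiguration {

    @Override
    @Bean
    public RestHighLevelClient elasticsearchClient() {
        // assumption: a single node reachable at localhost:9200, same as the yml config
        ClientConfiguration clientConfiguration = ClientConfiguration.builder()
                .connectedTo("localhost:9200")
                .build();
        return RestClients.create(clientConfiguration).rest();
    }
}
```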
- Create the entity class under the entity package
```java
@Data
@Entity
@Document(indexName = "report_section_flow_5min_f", replicas = 0, refreshInterval = "1m")
@ApiModel(value = "5-minute section flow data")
public class SectionFlow5Min {

    @Id
    private Long id;

    @ApiModelProperty("Section identifier name")
    @Field(name = "identifyname")
    private String identifyName;

    @ApiModelProperty("Start time")
    @Field(name = "starttime", format = DateFormat.date_optional_time, type = FieldType.Date)
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ", timezone = "GMT+8")
    private Date startTime;

    @ApiModelProperty("End time")
    @Field(name = "endtime", format = DateFormat.date_optional_time, type = FieldType.Date)
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ", timezone = "GMT+8")
    private Date endTime;
}
```
Pay attention to the date serialization here (the @Field date format together with the @JsonFormat pattern); without it, queries against the date fields will fail.
- Create an interface under the repository package that extends ElasticsearchRepository
```java
public interface SectionFlow5MinDao extends ElasticsearchRepository<SectionFlow5Min, Long> {

    Page<SectionFlow5Min> findByStartTimeGreaterThanEqualAndEndTimeLessThanEqualAndSectionNameIn(
            Date startTime, Date endTime, Collection<String> sectionNames, Pageable pageable);

    List<SectionFlow5Min> findByStartTimeGreaterThanEqualAndEndTimeLessThanEqualAndSectionName(
            Date startTime, Date endTime, String sectionName);
}
```
Here, findByStartTimeGreaterThanEqualAndEndTimeLessThanEqualAndSectionNameIn searches ES for documents whose start time is greater than or equal to startTime, whose end time is less than or equal to endTime, and whose section name is any of the given sectionNames (a usage sketch follows).
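A hedged usage sketch of that method from a service; the time window, section names, and page size are made-up sample values:

```java
// Hypothetical call site; sectionFlow5MinDao is the repository shown above.
Date startTime = Date.from(Instant.parse("2024-01-01T00:00:00Z"));
Date endTime = Date.from(Instant.parse("2024-01-01T01:00:00Z"));
Pageable pageable = PageRequest.of(0, 20);

Page<SectionFlow5Min> page = sectionFlow5MinDao
        .findByStartTimeGreaterThanEqualAndEndTimeLessThanEqualAndSectionNameIn(
                startTime, endTime, Arrays.asList("sectionA", "sectionB"), pageable);
List<SectionFlow5Min> rows = page.getContent();
long total = page.getTotalElements();
```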
- Aggregation queries
For complex queries or aggregations, ElasticsearchRepository may no longer be enough. At that point you either build the query with NativeSearchQueryBuilder or use the @Query annotation. Writing raw JSON inside @Query feels far too painful to me, so NativeSearchQueryBuilder it is; the code is below (for comparison, a hedged @Query sketch follows the discussion after the code).
```java
// imports assume the ES 7.x high-level client packages pulled in by Spring Data Elasticsearch 4.x
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedSum;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;

// elasticsearchOperations is an injected ElasticsearchOperations bean
private Map<Page<StationFlow5Min>, Map<String, Map<String, ParsedSum>>> find5MinDataByES(
        String startTime, String endTime, List<String> categoryNames, Pageable pageable) {
    NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
    // range filters on the time fields plus a query_string match on the station name
    QueryBuilder qb1 = QueryBuilders.rangeQuery("starttime").from(startTime);
    QueryBuilder qb2 = QueryBuilders.rangeQuery("endtime").to(endTime);
    QueryBuilder qb3 = QueryBuilders.queryStringQuery(ESUtils.subStrListToStr(categoryNames)).field("stationname");

    BoolQueryBuilder must = QueryBuilders.boolQuery().must(qb1).must(qb2).must(qb3);

    // only compute the aggregations on the first page (business-specific, see the note below)
    if (pageable.getPageNumber() == 0) {
        SumAggregationBuilder sum1 = AggregationBuilders.sum("entryFlowSum").field("entryflow");
        SumAggregationBuilder sum2 = AggregationBuilders.sum("exitFlowSum").field("exitflow");
        TermsAggregationBuilder stationNameTerm = AggregationBuilders.terms("stationName").field("stationname.keyword");
        stationNameTerm.subAggregation(sum1);
        stationNameTerm.subAggregation(sum2);
        queryBuilder.addAggregation(stationNameTerm);
    }
    queryBuilder.withQuery(must)
            .withPageable(pageable);

    SearchHits<StationFlow5Min> search = elasticsearchOperations.search(queryBuilder.build(), StationFlow5Min.class);
    Map<String, Map<String, ParsedSum>> aggBucketsInAgg = null;
    if (search.getAggregations() != null) {
        aggBucketsInAgg = ESUtils.getAggBucketsInAgg(search.getAggregations());
    }
    Map<Page<StationFlow5Min>, Map<String, Map<String, ParsedSum>>> result = new HashMap<>();
    result.put(ESUtils.getPages(search, pageable), aggBucketsInAgg);
    return result;
}
```
The pageable.getPageNumber() == 0 check is specific to my business scenario and can be ignored. AggregationBuilders is the factory for aggregation builders: here the sums over entryflow and exitflow are named entryFlowSum and exitFlowSum, and the hits matched by qb1, qb2 and qb3 come back alongside the aggregations. One more thing to watch: when aggregating on a text field you must target its .keyword sub-field, as in AggregationBuilders.terms("stationName").field("stationname.keyword"); without .keyword, ES rejects the query with an error. The ParsedSum returned in the Map is ES's own wrapper around the summed value. (Having read the ES source, I still feel Spring Data Elasticsearch over-wraps things, which makes it awkward to use.)
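For comparison, the @Query route mentioned above embeds the raw JSON of the query clause in the annotation, with ?0, ?1 and so on as positional parameters (this is covered in the repository-queries reference linked at the end). A minimal sketch; the method name and the time-range-only query are made up for illustration and would sit inside the existing SectionFlow5MinDao:

```java
// Hypothetical method added to SectionFlow5MinDao purely for illustration.
@Query("{\"bool\": {\"must\": [" +
        "{\"range\": {\"starttime\": {\"gte\": \"?0\"}}}," +
        "{\"range\": {\"endtime\": {\"lte\": \"?1\"}}}]}}")
List<SectionFlow5Min> findInTimeRange(String startTime, String endTime);
```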
- Notes on the helper methods in ESUtils
The methods in ESUtils are small wrappers that make pulling data out of the search results a bit more convenient (a usage sketch follows the class below).
```java
import cn.hutool.json.JSONUtil;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHitSupport;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.SearchPage;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ESUtils {

    // Wrap SearchHits into a SearchPage for the given Pageable.
    public static <T> SearchPage<T> getHitsPages(SearchHits<T> searchHits, Pageable pageable) {
        return SearchHitSupport.searchPageFor(searchHits, pageable);
    }

    // Extract just the entity content from a SearchPage.
    public static <T> List<T> getHitsContents(SearchPage<T> searchHits) {
        return searchHits.stream().map(SearchHit::getContent).collect(Collectors.toList());
    }

    // Convert SearchHits into a plain Spring Data Page of entities.
    public static <T> Page<T> getPages(SearchHits<T> searchHits, Pageable pageable) {
        return getHitsPages(searchHits, pageable).map(SearchHit::getContent);
    }

    // Index the top-level aggregations by name.
    public static <T> Map<String, T> getAggregation(Aggregations aggregations) {
        return aggregations.asList().stream().collect(Collectors.toMap(Aggregation::getName, i -> (T) i));
    }

    // Replace the blank between date and time with a 'T', e.g. "2024-01-01 08:00:00" -> "2024-01-01T08:00:00".
    public static String timeFormatBlankReT(String time) {
        return time.replace(" ", "T");
    }

    // Turn a list of strings into a comma-free quoted string usable in a query_string query.
    public static String subStrListToStr(List<String> ss) {
        String s = JSONUtil.toJsonStr(ss);
        s = s.substring(1, s.length() - 1);
        s = s.replace(",", "");
        return s;
    }

    // Flatten a terms aggregation into: bucket key -> (sub-aggregation name -> sub-aggregation).
    public static <T> Map<String, Map<String, T>> getAggBucketsInAgg(Aggregations aggregations) {
        Map<String, Map<String, T>> results = new HashMap<>();
        for (Aggregation aggregation : aggregations) {
            List<? extends Terms.Bucket> buckets = ((Terms) aggregation).getBuckets();
            for (Terms.Bucket bucket : buckets) {
                // one map of sub-aggregations per bucket, keyed by the bucket key (e.g. the station name)
                Map<String, T> bucketMap = new HashMap<>();
                for (Aggregation bucketAggregation : bucket.getAggregations()) {
                    bucketMap.put(bucketAggregation.getName(), (T) bucketAggregation);
                }
                results.put(bucket.getKeyAsString(), bucketMap);
            }
        }
        return results;
    }
}
```
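To show how the result of find5MinDataByES plus these helpers gets consumed, a hedged sketch; the time strings, station names, and page size are placeholders:

```java
// Hypothetical caller; entryFlowSum / exitFlowSum are the aggregation names used above.
Map<Page<StationFlow5Min>, Map<String, Map<String, ParsedSum>>> result =
        find5MinDataByES("2024-01-01T00:00:00", "2024-01-01T01:00:00",
                Arrays.asList("stationA", "stationB"), PageRequest.of(0, 20));

result.forEach((page, aggByStation) -> {
    // the page of raw 5-minute rows
    List<StationFlow5Min> rows = page.getContent();
    // the per-station sums are only present on page 0 (see the getPageNumber() == 0 check)
    if (aggByStation != null) {
        aggByStation.forEach((station, sums) -> System.out.printf(
                "%s entryFlowSum=%.0f exitFlowSum=%.0f%n",
                station,
                sums.get("entryFlowSum").getValue(),
                sums.get("exitFlowSum").getValue()));
    }
});
```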
- Pitfalls
Finally, how to check whether your query statement is correct: set a breakpoint on the NativeSearchQueryBuilder variable you built, since what it ultimately assembles is the final JSON request. If you are using ElasticsearchRepository, find the place in the core package of the source where the query is assembled, set a breakpoint there, and copy the JSON out to debug it in a query tool (a small sketch follows).
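A related trick, using the variables from the aggregation example above: the built query's QueryBuilder renders itself as JSON through toString(), so you can dump the request body without a debugger:

```java
NativeSearchQuery query = queryBuilder.withQuery(must).withPageable(pageable).build();
// QueryBuilder#toString() pretty-prints the query clause as JSON,
// which can be copied straight into a query tool for testing.
System.out.println(query.getQuery().toString());
```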
- Documents consulted along the way:
https://www.jianshu.com/p/7d1f72ca49ba
https://github.com/spring-projects/spring-data-elasticsearch/blob/main/src/main/asciidoc/reference/elasticsearch-repository-queries.adoc
https://segmentfault.com/a/1190000016296983