希望长大对我而言,是可以做更多想做的事,而不是被迫做更多不想做的事...... 首页 Elasticsearch父子关系 丁D 学无止境 2019-09-18 09:32 51716已阅读 关联查询 join 摘要本次简单使用一下Elasticsearch父子关系 **Elasticsearch父子关系** [5.x参考](https://www.cnblogs.com/double-yuan/p/9798103.html) [官网join介绍](https://www.elastic.co/guide/en/elasticsearch/reference/6.3/parent-join.html) [es6.x一对多方案参考](https://www.cnblogs.com/liugx/articles/8482356.html) >**注意**:Elasticsearch的关联关系,必须要在**同一个索引**里面,而且父子关系必须在**同一个分片**中。 es6.x版本规定一个索引中只能有一个type,这就导致了,父子关系就必须放在同一个type中。 ```js 例子:本博客,主要两张表: t_article(id,文章标题,摘要,分类) t_article_contents(正文内容,t_article的id)。 需求:要求分类id为1的并且标题或摘要或正文内容,带有“山”的关键字的文章。 SELECT * FROM t_article t LEFT JOIN t_article_contents t1 ON t.id = t1.article_id WHERE t.category_id = 1 AND ( t.title LIKE '%山%' OR t.summary LIKE '%山%' OR t1.contents LIKE '%山%' ) 方案一:在业务层实现, 1.先t_article查询category_id ==1的文章 2.查询t_article_contents中contents有“山”,并article_id==步骤1的值 3.查询t_article查询category_id ==1,并且summary或title有“山”或id为步骤2的值 方案二:关联查询 由于这里为了使用RestHighLevel(es5.6开始支持),使用了es6.3.2版本, 确发现es6.x以上一个index中只能有1个type。这就导致了t_article表和 t_article_contents表必须放在同一个index,同一个type。 方案三:嵌套json 将t_article_contents嵌套在t_article { "title": "你大爷", "summary": "你大爷 哇哈哈 66666", "publish_time": "2018-08-29 15:56", "states": 1, "creator": "丁D", "click_num": 222, "thumb_url": "/upload/shan.jpg", "category_id": 1, "t_article_contents": { "contents": "娃哈哈哈哈哈哈哈哈" } } ``` >注意:由于t_article和t_article_contents只能在同一个type中。由于id就必须唯一.所以mysql中的id不能直接同在es中。 **方案二** ```js 创建idnex,设置mapping,定义父子关系t_article为父,t_article_contents为子 127.0.0.1:9200/blog/ { "mappings": { "t_article": { "properties": { "publish_time":{ "type":"text", "fielddata":true }, "join_field": { "type": "join", "relations": { "t_article": "t_article_contents" } } } } } } 插入两条父数据 127.0.0.1:9200/blog/t_article/1 { "title":"山有木兮", "summary":"《山有木兮》是橙光游戏《人鱼传说之长生烛》的主题曲,执素兮谱曲,顾聆落填词演唱者伦桑,于2016年8月12日正式发行", "publish_time":"2018-08-29 15:56", "category_id":1, "join_field": { "name": "t_article" } } 127.0.0.1:9200/blog/t_article/2 { "title":"你大爷", "summary":"你大爷 哇哈哈 66666", "publish_time":"2018-08-29 15:56", "category_id":1, "join_field": { "name": "t_article" } } //中文分词器使用ik 127.0.0.1:9200/blog/ { "mappings": { "t_article": { "_all": { "analyzer": "ik_max_word", "search_analyzer": "ik_max_word", "term_vector": "no", "store": "false" }, "properties": { "publish_time":{ "type":"text", "fielddata":true }, "join_field": { "type": "join", "relations": { "t_article": "t_article_contents" } }, "title": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word", "boost": 8 } , "summary": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word", "boost": 8 } , "contents": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word", "boost": 8 } } } } } 创建数据时使用了routing=1参数,所以后续getByid,update,delete都需要指定routing参数,否则ES按照默认路由规则去找shard,自然找不到了。 为id==1的文章插入两条子数据 127.0.0.1:9200/blog/t_article/3?routing=1 { "contents":"娃哈哈哈哈哈哈哈哈", "join_field": { "name": "t_article_contents", "parent": 1 } } 127.0.0.1:9200/blog/t_article/4?routing=1 { "contents":" 沿着路灯一个人走回家 和老朋友打电话 你那里天气好吗", "join_field": { "name": "t_article_contents", "parent": 1 } } ``` ** 使用 RestHighLevel** ```js org.elasticsearch elasticsearch 6.3.2 org.elasticsearch.client elasticsearch-rest-high-level-client 6.3.2 /** * 构建查询信息 * @return */ public void buildSearchPage(ModelAndView mv,Long categoryId,String searchKey) throws IOException { RestClientBuilder restClient = RestClient.builder( new HttpHost("localhost", 9200, "http")); RestHighLevelClient client = new RestHighLevelClient(restClient); // 创建并设置SearchSourceBuilder对象 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); BoolQueryBuilder filterQueryBuilder = new BoolQueryBuilder(); TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("category_id",categoryId); filterQueryBuilder.must(termQueryBuilder); filterQueryBuilder.should(QueryBuilders.matchQuery("title",searchKey)); filterQueryBuilder.should(QueryBuilders.matchQuery("summary",searchKey)); HasChildQueryBuilder hasChildQueryBuilder = new HasChildQueryBuilder( "t_article_contents",QueryBuilders.matchQuery("contents",searchKey), ScoreMode.None); filterQueryBuilder.should(hasChildQueryBuilder); boolQueryBuilder.filter(filterQueryBuilder); searchSourceBuilder.query(boolQueryBuilder); searchSourceBuilder.from(0); // 每页多少条数据 searchSourceBuilder.size(1000); // 设置排序规则 searchSourceBuilder.sort("publish_time", SortOrder.DESC); // 设置超时时间为2s searchSourceBuilder.timeout(new TimeValue(2000)); // 创建并设置SearchRequest对象 SearchRequest searchRequest = new SearchRequest(); // 设置request要搜索的索引和类型 searchRequest.indices("blog").types("t_article"); searchRequest.source(searchSourceBuilder); SearchResponse searchResponse = client.search(searchRequest); System.out.printf(searchResponse.toString()); client.close(); } 转成DSL { "from": 0, "size": 1000, "timeout": "2000ms", "query": { "bool": { "filter": [ { "bool": { "must": [ { "term": { "category_id": { "value": 1, "boost": 1.0 } } } ], "should": [ { "match": { "title": { "query": "山", "operator": "OR", "prefix_length": 0, "max_expansions": 50, "fuzzy_transpositions": true, "lenient": false, "zero_terms_query": "NONE", "auto_generate_synonyms_phrase_query": true, "boost": 1.0 } } }, { "match": { "summary": { "query": "山", "operator": "OR", "prefix_length": 0, "max_expansions": 50, "fuzzy_transpositions": true, "lenient": false, "zero_terms_query": "NONE", "auto_generate_synonyms_phrase_query": true, "boost": 1.0 } } }, { "has_child": { "query": { "match": { "contents": { "query": "山", "operator": "OR", "prefix_length": 0, "max_expansions": 50, "fuzzy_transpositions": true, "lenient": false, "zero_terms_query": "NONE", "auto_generate_synonyms_phrase_query": true, "boost": 1.0 } } }, "type": "t_article_contents", "score_mode": "none", "min_children": 0, "max_children": 2147483647, "ignore_unmapped": false, "boost": 1.0 } } ], "adjust_pure_negative": true, "boost": 1.0 } } ], "adjust_pure_negative": true, "boost": 1.0 } }, "sort": [ { "publish_time": { "order": "desc" } } ] } 可以使用inner_hits在查询父数据同时带出子数据(或子带出父)。 inner_hits默认只能查询3条数据,需要查询更多要自己设置from和size { "has_child": { "query": { "match": { "contents": { "query": "山", "operator": "OR", "prefix_length": 0, "max_expansions": 50, "fuzzy_transpositions": true, "lenient": false, "zero_terms_query": "NONE", "auto_generate_synonyms_phrase_query": true, "boost": 1.0 } } }, "inner_hits":{}, "type": "t_article_contents", "score_mode": "none", "min_children": 0, "max_children": 2147483647, "ignore_unmapped": false, "boost": 1.0 } } ``` **logstash导入es关联关系** ```js input { jdbc { jdbc_driver_library => "/usr/local/logstash-6.3.2/lib/mysql-connector-java-5.1.29.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/blog" jdbc_user => "root" jdbc_password => "root#$%123" # statement_filepath => "filename.sql" statement => "SELECT * from t_article where test_time> :sql_last_value order by test_time asc" jdbc_paging_enabled => "true" jdbc_page_size => "10" jdbc_default_timezone =>"Asia/Shanghai" type => "t_article" tracking_column_type => "timestamp" use_column_value => true tracking_column => "test_time" record_last_run => true last_run_metadata_path => "/usr/local/logstash-6.3.2/config/last_run_metadata.txt" schedule => "* * * * *" } jdbc { jdbc_driver_library => "/usr/local/logstash-6.3.2/lib/mysql-connector-java-5.1.29.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/blog" jdbc_user => "root" jdbc_password => "root#$%123" # statement_filepath => "filename.sql" statement => "SELECT * from t_article_contents where test_time> :sql_last_value order by test_time asc" jdbc_paging_enabled => "true" jdbc_page_size => "10" jdbc_default_timezone =>"Asia/Shanghai" type => "t_article_contents" tracking_column_type => "timestamp" use_column_value => true tracking_column => "test_time" record_last_run => true last_run_metadata_path => "/usr/local/logstash-6.3.2/config/last_run_content_metadata.txt" schedule => "* * * * *" } } filter { if [type] == "t_article" { ruby{ code => ' event.set("test333333333333",111); field = { "todo_id" => event.get("test_time") } join_field = { "name": "t_article" } event.set("join_field",join_field) ' } json { source => "message" remove_field => ["message"] } json { source => "{\"test\" : \"test1\"}" target =>"aaaaaa" } mutate { add_field => { "join_field11" => "%{aaaaaa}" "join_field22" => "%{test_time}" } } } if [type] == "t_article_contents" { ruby { code =>' join_field = { "name" => "t_article_contents","parent" => "%{article_id}" } event.set("join_field",join_field) ' } json { source => "message" remove_field => ["message"] } } } output { if [type] == "t_article" { elasticsearch { hosts => ["39.108.231.144:9200"] document_type => "t_article" index => "blog" document_id => "%{id}" } } if [type] == "t_article_contents" { elasticsearch { hosts => ["39.108.231.144:9200"] document_type => "t_article" index => "blog" document_id => "%{id}" routing => "%{article_id}" } } } ``` **删除查询的数据** ```js post http://www.582466.top:9200/blog/_delete_by_query { "query": { "match_all": {} } } ``` 方案三:嵌套json ```js 不管是修改t_article还是修改t_article_contents都要全部取出来修改。 从mysql导入es的时候要在两张表的数据,整成嵌套json,可以使用插件logstash-filter-aggregate ``` [下载logstash-filter-aggregate](https://rubygems.org/gems/logstash-filter-aggregate/versions/2.6.4) [参考文章logstash-filter-aggregate](https://segmentfault.com/q/1010000016711393) [官网参考logstash-filter-aggregate](https://github.com/logstash-plugins/logstash-filter-aggregate/blob/master/docs/index.asciidoc) 很赞哦! (0) 上一篇:logstash-input-jdbc 下一篇:使用策略模式干掉大片的 if else 目录 点击排行 Elasticsearch6.3.2之x-pack redis哨兵 2019-07-09 22:05 Redis+Twemproxy+HAProxy+Keepalived 2019-07-12 17:20 GC优化策略和相关实践案例 2019-10-10 10:54 JVM垃圾回收器 2019-10-10 10:23 标签云 Java Spring MVC Mybatis Ansible Elasticsearch Redis Hive Docker Kubernetes RocketMQ Jenkins Nginx 友情链接 郑晓博客 佛布朗斯基 凉风有信 MarkHoo's Blog 冰洛博客 南实博客 Rui | 丁D Java研发工程师 生活可以用「没办法」三个字概括。但别人的没办法是「腿长,没办法」、「长得好看,没办法」、「有才华,没办法」。而你的没办法,是真的没办法。 请作者喝咖啡