希望长大对我而言,是可以做更多想做的事,而不是被迫做更多不想做的事...... 首页 logstash-input-jdbc 丁D 学无止境 2019-09-12 14:34 49699已阅读 logstash-input-jdbc 摘要本文讲解如何将mysql的数据导入es。 ##logstash-input-jdbc ```js 我们在实际工作中经常会遇到这样情况:我们数据存放在mysql中,但是数据越来越多, 我们在搜索的使用使用“like”这样效率太低,所以我们需要使用es来做全文搜索。 这样我们就需要将数据同步到es中,方法有很多这里我们使用logstash-input-jdbc这个插件。 ``` ```js 题外话:有空学习下canal,通过binlog实现。 https://mp.weixin.qq.com/s/YKqKW0n5JTPgTd9kv9RDhQ github:https://github.com/alibaba/canal https://www.jianshu.com/p/87944efe1005 ``` [官网参考](https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html#plugins-inputs-jdbc-jdbc_default_timezone) https://segmentfault.com/a/1190000011784259 **安装logstash-input-jdbc** ```js 首先应该安装logstash,见elk安装篇 进入cd logstash/bin ./logstash-plugin install logstash-input-jdbc ``` ```js 安装完成之后,我们在config目录中创建配置文件:blog_log.conf input { file { path => "/root/blog/blog_error.log" type => "blog-error-log" start_position => "beginning" codec => multiline { pattern => "^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}" negate => true auto_flush_interval => 3 what => previous } } file { path => "/root/blog/blog_info.log" type => "blog-info-log" start_position => "beginning" codec => multiline { pattern => "^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}" negate => true auto_flush_interval => 3 what => previous } } jdbc { jdbc_driver_library => "/usr/local/logstash-5.2.0/lib/mysql-connector-java-5.1.29.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/blog" jdbc_user => "root" jdbc_password => "root#$%123" # statement_filepath => "filename.sql" statement => "SELECT * from t_article where test_time> :sql_last_value" jdbc_paging_enabled => "true" jdbc_page_size => "50000" type => "t_article" use_column_value => true tracking_column => "test_time" record_last_run => true last_run_metadata_path => "/usr/local/logstash-5.2.0/config/last_run_metadata.txt" schedule => "* * * * *" } } filter { if [type] != "t_article" { mutate { split => ["host", "8"] add_field => { "add_field_test" => "test3" } } grok { match => ["message", "%{TIMESTAMP_ISO8601:logdate}"] } date { match => ["logdate", "yyyy-MM-dd HH:mm:ss"] target => "@timestamp" } } if [type] == "t_article" { json { source => "message" remove_field => ["message"] } } } output { if [type] == "t_article" { elasticsearch { hosts => ["39.108.231.144:9200"] index => "table_blog" document_id => "%{id}" } } if [type] != "t_article" { elasticsearch { hosts => ["39.108.231.144:9200"] index => "blog-%{+YYYY.MM.dd}" } } } ``` >**注意:** 这里的test_time是数据库的时间戳,数据更新,时间戳就变化。 这样可以在mysql数据变化变化后也,同步到es中去,如果使用的是id不会同步变化的数据 >设置mysql的id到es的id,不让es自动生成,不然每次都是新增,当你修改了数据,在es中也会新增,不会覆盖 **document_id => "%{id}"** >schedule => "* * * * *" 每分钟 >不支持数据删除,可以通过逻辑删除,+定时物理删除(自己实现) **遇到的问题** ```js [root@iZwz9278r1bks3b80puk6fZ config]# cat last_run_metadata.txt --- 2019-09-12 14:30:54.000000000 +08:00 控制台输出 [2019-09-12T15:04:00,250][INFO ][logstash.inputs.jdbc ] (0.020000s) SELECT count(*) AS `count` FROM (SELECT * from t_article where test_time> 0) AS `t1` LIMIT 1 [2019-09-12T15:04:00,270][INFO ][logstash.inputs.jdbc ] (0.003000s) SELECT * FROM (SELECT * from t_article where test_time> 0) AS `t1` LIMIT 50000 OFFSET 0 [2019-09-12T15:05:00,092][INFO ][logstash.inputs.jdbc ] (0.008000s) SELECT version() AS `v` LIMIT 1 [2019-09-12T15:05:00,100][INFO ][logstash.inputs.jdbc ] (0.002000s) SELECT count(*) AS `count` FROM (SELECT * from t_article where test_time> '2019-09-12 14:30:54') AS `t1` LIMIT 1 [2019-09-12T15:05:00,108][INFO ][logstash.inputs.jdbc ] (0.004000s) SELECT * FROM (SELECT * from t_article where test_time> '2019-09-12 14:30:54') AS `t1` LIMIT 50000 OFFSET 0 [2019-09-12T15:06:00,191][INFO ][logstash.inputs.jdbc ] (0.007000s) SELECT count(*) AS `count` FROM (SELECT * from t_article where test_time> '2019-09-12 14:56:00') AS `t1` LIMIT 1 [2019-09-12T15:07:00,267][INFO ][logstash.inputs.jdbc ] (0.008000s) SELECT count(*) AS `count` FROM (SELECT * from t_article where test_time> '2019-09-12 14:56:00') AS `t1` LIMIT 1 从上面日志分析 问题1: SELECT * FROM (SELECT * from t_article where test_time> 0) AS `t1` LIMIT 50000 OFFSET 0 test_time是时间戳从0默认是numeric 开始明显就不合适,我们设置类型 tracking_column_type string, one of ["numeric", "timestamp"] // tracking_column_type => "timestamp" use_column_value => true tracking_column => "test_time" 这样就会从 [2019-09-12T15:31:02,525][INFO ][logstash.inputs.jdbc ] (0.006000s) SELECT * FROM (SELECT * from t_article where test_time> '1970-01-01 00:00:00') AS `t1` LIMIT 10 OFFSET 60 问题2: 假设数据顺序是 test_time 2019-09-12 13:56:00 2019-09-12 14:56:00 2019-09-12 14:30:54 last_run_metadata.txt 文件记录的是最后一条(不是最大的) --- 2019-09-12 14:30:54.000000000 +08:00 [2019-09-12T15:05:00,100][INFO ][logstash.inputs.jdbc ] (0.002000s) SELECT count(*) AS `count` FROM (SELECT * from t_article where test_time> '2019-09-12 14:30:54') AS `t1` LIMIT 1 出现情况:重复覆盖导入:假设数据库顺序如下 1 2 3 4 6 2 这样last_run_metadata.txt 记入的是2,每次到定时的时间点(1分钟)都会进行同步下面的数据 SELECT * from t_article where test_time> '2' 这样会查出 3 4 6 2 可以设置排序 order by test_time asc(让最后一个数据是最大的) 问题3 kibanna上面显示的时间戳少了8个小时 时区问题,但是last_run_metadata.txt加了8个小时跟数据库一样,,, --- 2019-09-12 14:30:54.000000000 +08:00 { "took": 79, "timed_out": false, "_shards": { "total": 5, "successful": 5, "failed": 0 }, "hits": { "total": 1, "max_score": 1, "hits": [ { "_index": "table_blog", "_type": "t_article", "_id": "125", "_score": 1, "_source": { "summary": "本文讲解的Elasticsearch的中文分词器IKAnalyzer。", "created_time": "2019-09-12 10:44:10", "thumb_url": "/upload/BCB71BBC-FC70-4b9e-A55E-0559BCA99006.png", "creator": "丁D", "allow_comment": 0, "recommend": 0, "click_num": 1, "title": "Elasticsearch(七)", "type": "t_article", "updator_id": 1, "upator": "丁D", "states": 1, "update_time": "2019-09-12 10:44:10", "test_time": "2019-09-12T06:30:54.000Z", "@timestamp": "2019-09-12T07:31:03.060Z", "category_id": 2, "publish_time": "2019-09-12 10:19", "creator_id": 1, "@version": "1", "support_num": 2, "id": 125 } } ] } } 设置时区地点 jdbc_default_timezone =>"Asia/Shanghai" 也没有什么用。。。待处理 ``` **最终配置文件** ```js input { file { path => "/root/blog/blog_error.log" type => "blog-error-log" start_position => "beginning" codec => multiline { pattern => "^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}" negate => true auto_flush_interval => 3 what => previous } } file { path => "/root/blog/blog_info.log" type => "blog-info-log" start_position => "beginning" codec => multiline { pattern => "^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}" negate => true auto_flush_interval => 3 what => previous } } jdbc { jdbc_driver_library => "/usr/local/logstash-5.2.0/lib/mysql-connector-java-5.1.29.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/blog" jdbc_user => "root" jdbc_password => "root#$%123" # statement_filepath => "filename.sql" statement => "SELECT * from t_article where test_time> :sql_last_value order by test_time asc" jdbc_paging_enabled => "true" jdbc_page_size => "10" jdbc_default_timezone =>"Asia/Shanghai" type => "t_article" tracking_column_type => "timestamp" use_column_value => true tracking_column => "test_time" record_last_run => true last_run_metadata_path => "/usr/local/logstash-5.2.0/config/last_run_metadata.txt" schedule => "* * * * *" } } filter { if [type] != "t_article" { mutate { split => ["host", "8"] add_field => { "add_field_test" => "test3" } } grok { match => ["message", "%{TIMESTAMP_ISO8601:logdate}"] } date { match => ["logdate", "yyyy-MM-dd HH:mm:ss"] target => "@timestamp" } } if [type] == "t_article" { json { source => "message" remove_field => ["message"] } } } output { if [type] == "t_article" { elasticsearch { hosts => ["39.108.231.144:9200"] index => "table_blog" document_id => "%{id}" } } if [type] != "t_article" { elasticsearch { hosts => ["39.108.231.144:9200"] index => "blog-%{+YYYY.MM.dd}" } } } ``` 很赞哦! (1) 上一篇:Elasticsearch之IKAnalyzer 下一篇:Elasticsearch父子关系 目录 点击排行 Elasticsearch6.3.2之x-pack redis哨兵 2019-07-09 22:05 Redis+Twemproxy+HAProxy+Keepalived 2019-07-12 17:20 GC优化策略和相关实践案例 2019-10-10 10:54 JVM垃圾回收器 2019-10-10 10:23 标签云 Java Spring MVC Mybatis Ansible Elasticsearch Redis Hive Docker Kubernetes RocketMQ Jenkins Nginx 友情链接 郑晓博客 佛布朗斯基 凉风有信 MarkHoo's Blog 冰洛博客 南实博客 Rui | 丁D Java研发工程师 生活可以用「没办法」三个字概括。但别人的没办法是「腿长,没办法」、「长得好看,没办法」、「有才华,没办法」。而你的没办法,是真的没办法。 请作者喝咖啡