Solving the MySQL and Elasticsearch Data Asymmetry Problem
The Logstash jdbc-input-plugin can only append, that is, write incrementally into Elasticsearch. However, the database on the JDBC source side frequently has rows deleted or updated, so the database and the search engine drift out of sync.
Of course, if you have a development team, you can write code that applies the same operation to the search engine whenever a delete or update happens. If that is not an option, try the approach below.
Here is a table named article. Its mtime column is defined with ON UPDATE CURRENT_TIMESTAMP, so mtime is refreshed on every update:
mysql> desc article;
+-------------+---------------+------+-----+-------------------+-----------------------------+
| Field       | Type          | Null | Key | Default           | Extra                       |
+-------------+---------------+------+-----+-------------------+-----------------------------+
| id          | int(11)       | NO   |     | 0                 |                             |
| title       | mediumtext    | NO   |     | NULL              |                             |
| description | mediumtext    | YES  |     | NULL              |                             |
| author      | varchar(100)  | YES  |     | NULL              |                             |
| source      | varchar(100)  | YES  |     | NULL              |                             |
| content     | longtext      | YES  |     | NULL              |                             |
| status      | enum('Y','N') | NO   |     | N                 |                             |
| ctime       | timestamp     | NO   |     | CURRENT_TIMESTAMP |                             |
| mtime       | timestamp     | YES  |     | NULL              | on update CURRENT_TIMESTAMP |
+-------------+---------------+------+-----+-------------------+-----------------------------+
9 rows in set (0.00 sec)
Add an mtime-based query rule to the Logstash jdbc input:
jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"    # cron expression; here it runs once every minute
    statement => "select * from article where mtime > :sql_last_value"
    use_column_value => true
    tracking_column => "mtime"
    tracking_column_type => "timestamp"
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article-mtime.last"
}
Create a trash table. It is used to handle rows that are deleted from the database or disabled with status = 'N'.
CREATE TABLE `elasticsearch_trash` (
    `id` int(11) NOT NULL,
    `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Create triggers on the article table:
CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW
BEGIN
    -- When the article status changes to 'N', the corresponding document must be removed
    -- from the search engine, so record its ID in the trash table.
    IF NEW.status = 'N' THEN
        insert into elasticsearch_trash(id) values(OLD.id);
    END IF;
    -- When the status changes back to 'Y', a stale record for this article ID may still be
    -- sitting in elasticsearch_trash and would cause an accidental delete, so remove it.
    IF NEW.status = 'Y' THEN
        delete from elasticsearch_trash where id = OLD.id;
    END IF;
END

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW
BEGIN
    -- When an article is deleted, put its ID into the search-engine trash table at the same time.
    insert into elasticsearch_trash(id) values(OLD.id);
END
Next we need a simple shell script that runs once per minute, fetches the IDs from the elasticsearch_trash table, and then calls the Elasticsearch RESTful API with curl to delete the recycled documents.
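Below is a minimal sketch of such a script, not a finished implementation. It assumes the mysql and curl command-line clients are available, reuses the cms user, password and database from the Logstash configuration above, and assumes Elasticsearch listens on http://localhost:9200 with documents stored in the information index under the article type (the same names used in the Java example further down). Adjust these to your environment.

#!/bin/sh
# Sketch: delete recycled documents from Elasticsearch, then clear the processed rows.
# Assumes MySQL user "cms", password "password", database "cms" (as in the Logstash config)
# and Elasticsearch at http://localhost:9200 with index "information" and type "article".

MYSQL="mysql -ucms -ppassword cms --batch --skip-column-names"
ES="http://localhost:9200/information/article"

for id in $($MYSQL -e "SELECT id FROM elasticsearch_trash"); do
    # The delete API returns 200 when the document existed and 404 when it was already gone;
    # in either case the trash record has been handled and can be removed.
    status=$(curl -s -o /dev/null -w "%{http_code}" -XDELETE "$ES/$id")
    if [ "$status" = "200" ] || [ "$status" = "404" ]; then
        $MYSQL -e "DELETE FROM elasticsearch_trash WHERE id = $id"
    fi
done

You can then schedule it from cron to run once per minute, for example * * * * * /usr/local/bin/elasticsearch_trash.sh (the path here is only a placeholder).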
You can also build this into an application; here is a Spring Boot scheduled-task example.
Entity
package cn.netkiller.api.domain.elasticsearch;

import java.util.Date;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table
public class ElasticsearchTrash {
    @Id
    private int id;

    @Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
    private Date ctime;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public Date getCtime() {
        return ctime;
    }

    public void setCtime(Date ctime) {
        this.ctime = ctime;
    }
}
Repository
package cn.netkiller.api.repository.elasticsearch;

import org.springframework.data.repository.CrudRepository;

import cn.netkiller.api.domain.elasticsearch.ElasticsearchTrash;

public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer> {
}
Scheduled task
package cn.netkiller.api.schedule;

import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import cn.netkiller.api.domain.elasticsearch.ElasticsearchTrash;
import cn.netkiller.api.repository.elasticsearch.ElasticsearchTrashRepository;

@Component
public class ScheduledTasks {
    private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);

    @Autowired
    private TransportClient client;

    @Autowired
    private ElasticsearchTrashRepository elasticsearchTrashRepository;

    public ScheduledTasks() {
    }

    @Scheduled(fixedRate = 1000 * 60) // run the cleanup task once every 60 seconds
    public void cleanTrash() {
        for (ElasticsearchTrash elasticsearchTrash : elasticsearchTrashRepository.findAll()) {
            // Delete the document with this ID from the "information" index, "article" type.
            DeleteResponse response = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get();
            RestStatus status = response.status();
            logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString());
            // OK means the document was deleted, NOT_FOUND means it was already gone;
            // in both cases the trash record can be removed.
            if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) {
                elasticsearchTrashRepository.delete(elasticsearchTrash);
            }
        }
    }
}
The Spring Boot application entry point:
package cn.netkiller.api;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
That is the whole approach to solving the MySQL and Elasticsearch data asymmetry problem. If you have questions, please leave a comment or discuss them in the community. Thanks for reading, and I hope it helps.