企业官方网站制作,做市场推广应该掌握什么技巧,丹江口网站开发,南宁企业如何建网站摘要: jdbc-input-plugin 只能实现数据库的追加#xff0c;对于 elasticsearch 增量写入#xff0c;但经常jdbc源一端的数据库可能会做数据库删除或者更新操作。这样一来数据库与搜索引擎的数据库就出现了不对称的情况。当然你如果有开发团队可以写程序在删除或者更新的时候同… 摘要: jdbc-input-plugin 只能实现数据库的追加对于 elasticsearch 增量写入但经常jdbc源一端的数据库可能会做数据库删除或者更新操作。这样一来数据库与搜索引擎的数据库就出现了不对称的情况。当然你如果有开发团队可以写程序在删除或者更新的时候同步对搜索引擎操作。如果你没有这个能力可以尝试下面的方法。 解决MySQL与Elasticsearch 数据不对称问题
jdbc-input-plugin 只能实现数据库的追加对于 elasticsearch 增量写入但经常jdbc源一端的数据库可能会做数据库删除或者更新操作。这样一来数据库与搜索引擎的数据库就出现了不对称的情况。
当然你如果有开发团队可以写程序在删除或者更新的时候同步对搜索引擎操作。如果你没有这个能力可以尝试下面的方法。
这里有一个数据表 article , mtime 字段定义了 ON UPDATE CURRENT_TIMESTAMP 所以每次更新mtime的时间都会变化
mysql desc article;
-----------------------------------------------------------------------------
| Field | Type | Null | Key | Default | Extra |
-----------------------------------------------------------------------------
| id | int(11) | NO | | 0 | |
| title | mediumtext | NO | | NULL | |
| description | mediumtext | YES | | NULL | |
| author | varchar(100) | YES | | NULL | |
| source | varchar(100) | YES | | NULL | |
| content | longtext | YES | | NULL | |
| status | enum(Y,N)| NO | | N | |
| ctime | timestamp | NO | | CURRENT_TIMESTAMP | |
| mtime | timestamp | YES | | ON UPDATE CURRENT_TIMESTAMP | |
-----------------------------------------------------------------------------
7 rows in set (0.00 sec)| Field | Type | Null | Key | Default | Extra |
-----------------------------------------------------------------------------
| id | int(11) | NO | | 0 | |
| title | mediumtext | NO | | NULL | |
| description | mediumtext | YES | | NULL | |
| author | varchar(100) | YES | | NULL | |
| source | varchar(100) | YES | | NULL | |
| content | longtext | YES | | NULL | |
| status | enum(Y,N)| NO | | N | |
| ctime | timestamp | NO | | CURRENT_TIMESTAMP | |
| mtime | timestamp | YES | | ON UPDATE CURRENT_TIMESTAMP | |
-----------------------------------------------------------------------------
7 rows in set (0.00 sec)
logstash 增加 mtime 的查询规则
jdbc {jdbc_driver_library /usr/share/java/mysql-connector-java.jarjdbc_driver_class com.mysql.jdbc.Driverjdbc_connection_string jdbc:mysql://localhost:3306/cmsjdbc_user cmsjdbc_password passwordschedule * * * * * #定时cron的表达式,这里是每分钟执行一次statement select * from article where mtime :sql_last_valueuse_column_value truetracking_column mtimetracking_column_type timestamp record_last_run truelast_run_metadata_path /var/tmp/article-mtime.last}/usr/share/java/mysql-connector-java.jarjdbc_driver_class com.mysql.jdbc.Driverjdbc_connection_string jdbc:mysql://localhost:3306/cmsjdbc_user cmsjdbc_password passwordschedule * * * * * #定时cron的表达式,这里是每分钟执行一次statement select * from article where mtime :sql_last_valueuse_column_value truetracking_column mtimetracking_column_type timestamp record_last_run truelast_run_metadata_path /var/tmp/article-mtime.last}
创建回收站表这个事用于解决数据库删除或者禁用 status N 这种情况的。
CREATE TABLE elasticsearch_trash (id int(11) NOT NULL,ctime timestamp NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (id)
) ENGINEInnoDB DEFAULT CHARSETutf8 TABLE elasticsearch_trash (id int(11) NOT NULL,ctime timestamp NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (id)
) ENGINEInnoDB DEFAULT CHARSETutf8
为 article 表创建触发器
CREATE DEFINERdba% TRIGGER article_BEFORE_UPDATE BEFORE UPDATE ON article FOR EACH ROW
BEGIN-- 此处的逻辑是解决文章状态变为 N 的时候需要将搜索引擎中对应的数据删除。IF NEW.status N THENinsert into elasticsearch_trash(id) values(OLD.id);END IF;-- 此处逻辑是修改状态到 Y 的时候方式elasticsearch_trash仍然存在该文章ID导致误删除。所以需要删除回收站中得回收记录。IF NEW.status Y THENdelete from elasticsearch_trash where id OLD.id;END IF;
ENDCREATE DEFINERdba% TRIGGER article_BEFORE_DELETE BEFORE DELETE ON article FOR EACH ROW
BEGIN-- 此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。insert into elasticsearch_trash(id) values(OLD.id);
END DEFINERdba% TRIGGER article_BEFORE_UPDATE BEFORE UPDATE ON article FOR EACH ROW
BEGIN-- 此处的逻辑是解决文章状态变为 N 的时候需要将搜索引擎中对应的数据删除。IF NEW.status N THENinsert into elasticsearch_trash(id) values(OLD.id);END IF;-- 此处逻辑是修改状态到 Y 的时候方式elasticsearch_trash仍然存在该文章ID导致误删除。所以需要删除回收站中得回收记录。IF NEW.status Y THENdelete from elasticsearch_trash where id OLD.id;END IF;
ENDCREATE DEFINERdba% TRIGGER article_BEFORE_DELETE BEFORE DELETE ON article FOR EACH ROW
BEGIN-- 此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。insert into elasticsearch_trash(id) values(OLD.id);
END
接下来我们需要写一个简单地 Shell 每分钟运行一次从 elasticsearch_trash 数据表中取出数据然后使用 curl 命令调用 elasticsearch restful 接口删除被收回的数据。
你还可以开发相关的程序这里提供一个 Spring boot 定时任务例子。
实体 package cn.netkiller.api.domain.elasticsearch;import java.util.Date;import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;Entity
Table
public class ElasticsearchTrash {Idprivate int id;Column(columnDefinition TIMESTAMP DEFAULT CURRENT_TIMESTAMP)private Date ctime;public int getId() {return id;}public void setId(int id) {this.id id;}public Date getCtime() {return ctime;}public void setCtime(Date ctime) {this.ctime ctime;}}
仓库 package cn.netkiller.api.repository.elasticsearch;import org.springframework.data.repository.CrudRepository;import com.example.api.domain.elasticsearch.ElasticsearchTrash;public interface ElasticsearchTrashRepository extends CrudRepositoryElasticsearchTrash, Integer{}定时任务
package cn.netkiller.api.schedule;import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import com.example.api.domain.elasticsearch.ElasticsearchTrash;
import com.example.api.repository.elasticsearch.ElasticsearchTrashRepository;Component
public class ScheduledTasks {private static final Logger logger LoggerFactory.getLogger(ScheduledTasks.class);Autowiredprivate TransportClient client;Autowiredprivate ElasticsearchTrashRepository alasticsearchTrashRepository;public ScheduledTasks() {}Scheduled(fixedRate 1000 * 60) // 60秒运行一次调度任务public void cleanTrash() {for (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) {DeleteResponse response client.prepareDelete(information, article, elasticsearchTrash.getId() ).get();RestStatus status response.status();logger.info(delete {} {}, elasticsearchTrash.getId(), status.toString());if (status RestStatus.OK || status RestStatus.NOT_FOUND) {alasticsearchTrashRepository.delete(elasticsearchTrash);}}}
}Spring boot 启动主程序。
package cn.netkiller.api;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;SpringBootApplication
EnableScheduling
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}
本文转载自
https://my.oschina.net/neochen/blog/1518679#comment-list 本人最近开了一个公众号会讲一些常用的技术以及面试题欢迎关注 扫码关注每天获取最前沿的互联网知识~