
在分布式系统横行的今天,“数据查询”早已不是简单的数据库SELECT操作——当数据量突破TB级、并发查询达到万级QPS,传统单体搜索引擎(如Lucene)的性能瓶颈会瞬间暴露,要么查得慢,要么查不到,甚至直接宕机。而分布式搜索引擎,正是为解决“海量数据高效检索”而生的核心组件,它像一个“分布式数据检索大脑”,将海量数据分片存储、并行计算,既能支撑PB级数据存储,又能实现毫秒级查询响应,是电商、日志分析、风控等场景的必备技术。
很多开发者对分布式搜索引擎的理解停留在“会用Elasticsearch(ES)调API”,但对其底层“为什么能分片”“副本如何保证高可用”“查询请求如何路由”“倒排索引怎么构建”等核心逻辑一知半解,导致遇到性能瓶颈、数据不一致、集群宕机等问题时无从下手。
本文将彻底打破“只知其然,不知其所以然”的困境,从底层逻辑出发,结合Java实战讲透分布式搜索引擎的核心原理、架构设计、实战落地及性能优化,兼顾深度与可读性,让新手能夯实基础,让资深开发者能解决实际问题。
我们先抛开复杂的专业术语,用“生活场景”理解分布式搜索引擎的核心价值:
假设你有一个“网上书店”,里面有1000万本书(对应系统中的1000万条图书数据,每条数据包含书名、作者、出版社、价格、简介等信息)。现在用户需要查询“Java编程”相关的图书,你该如何快速找到所有匹配的书?
如果直接用MySQL存储所有图书数据,查询SQL如下:
SELECT * FROM book WHERE title LIKE '%Java编程%' OR author LIKE '%Java编程%' OR intro LIKE '%Java编程%';
这个SQL的问题很明显:
Lucene是一个“全文检索工具包”,核心能力是“构建倒排索引”(后面会详细讲),能实现快速全文检索、相关性排序。但它有一个致命缺陷:只能单机运行。
也就是说,你只能把1000万条图书数据都存在一台服务器上,用Lucene构建索引、处理查询。这会带来两个问题:
分布式搜索引擎的核心思路是“分而治之+集群协作”:
简单来说:分布式搜索引擎 = 多个单体搜索引擎(如Lucene)+ 分布式集群管理 + 数据分片 + 副本机制,它解决了单体搜索引擎的“存储瓶颈、性能瓶颈、高可用风险”,同时保留了全文检索的核心能力(快速匹配、相关性排序)。
核心价值总结(干货):
分布式搜索引擎的所有功能(分片、副本、查询、索引),都基于以下5个核心底层逻辑,搞懂这些,再看任何分布式搜索引擎(ES、Solr、PolarDB-X Search)的用法,都会豁然开朗。
倒排索引是分布式搜索引擎实现“快速全文检索”的核心,也是和传统数据库“正排索引”的本质区别。
很多开发者听过倒排索引,但说不清楚它的原理,这里用“通俗语言+实例”,彻底讲透。
先看传统数据库的“正排索引”(比如MySQL的主键索引):
再看倒排索引:
倒排索引不是简单的“关键词→文档ID列表”,而是由3个核心组件组成,缺一不可,直接决定了检索效率和相关性排序的准确性:
倒排索引的构建是“离线构建+在线更新”的过程,核心流程分为5步,用流程图展示:

假设我们有3条原始文档(图书数据),基于这3条文档,完整演示倒排索引的构建过程:
原始文档: 文档1(ID=1):title:《Java编程思想》,author:Bruce Eckel,intro:Java编程思想是Java领域的经典书籍,适合有一定基础的Java开发者。 文档2(ID=2):title:《Java核心技术》,author:Cay S. Horstmann,intro:Java核心技术涵盖Java基础语法和高级特性,适合Java初学者。 文档3(ID=3):title:《Python编程入门》,author:张三,intro:Python编程入门简单易懂,适合零基础学习者,与Java编程有本质区别。
如果每次新增/修改/删除文档,都重新构建整个倒排索引,效率会极低(比如10亿条数据,全量重建可能需要几小时)。因此,分布式搜索引擎采用“增量更新”机制,核心是“分段索引(Segment)”:
这种机制的优势:无需全量重建索引,新增/修改/删除文档的效率极高,同时不影响查询性能(查询时只需遍历所有分段,合并结果)。
分布式搜索引擎要支撑PB级数据存储,核心是“分片(Shard)”——将海量数据拆分成多个小的“数据分片”,每个分片存储一部分数据,分布在不同的节点(服务器)上,实现“分而治之”。
很多开发者会把“分片”和“副本”搞混,这里先明确:分片负责“数据拆分”(解决存储瓶颈),副本负责“数据备份”(解决高可用瓶颈) ,两者分工明确,缺一不可。
用架构图展示分片的分布逻辑,清晰明了:

很多开发者在搭建分布式搜索引擎集群时,不知道“主分片数量设多少合适”,这里给出明确的实战建议(基于ES实战经验,有理有据):
分布式搜索引擎中,无论是“写入文档”还是“查询文档”,都需要先找到“文档对应的分片”,这个过程叫“分片路由”,核心是“确定文档属于哪个主分片”。
路由流程(流程图):

假设集群中主分片数量为3(分片ID:0、1、2),文档ID为“java_book_123”,路由过程如下:
默认情况下,路由字段是“文档ID”,但在某些场景下(比如“按作者分组查询”),可以自定义路由字段(比如“author”),这样同一作者的所有文档,都会存储在同一个分片上,提升查询效率。
实例(ES的自定义路由请求,Java代码后续实战会讲):
PUT /book/_doc/java_book_123?routing=BruceEckel
{
"title": "Java编程思想",
"author": "Bruce Eckel",
"price": 89.0
}
说明:用“author”作为路由字段,同一作者(BruceEckel)的所有文档,都会存储在同一个分片上,查询该作者的文档时,只需查询一个分片,提升效率。
分布式搜索引擎的“高可用”,核心靠副本机制——每个主分片都有一个或多个副本分片,当主分片所在的节点宕机时,副本分片会自动升级为主分片,继续提供服务,避免服务中断。
副本的分布规则,直接影响集群的高可用和性能,核心有3条规则(必须遵守):
当主分片所在的节点宕机时,集群会自动触发“故障转移”流程,确保服务不中断,流程如下(流程图+详细说明):

假设集群节点1(主分片0、副本2)、节点2(主分片1、副本0)、节点3(主分片2、副本1),主分片0所在的节点1宕机,故障转移流程如下:
副本数量不是越多越好,要结合集群节点数量和业务需求,给出明确的实战建议:
分布式搜索引擎的集群,不是“多个节点简单叠加”,而是有明确的节点分工,核心分为3类节点(主节点、数据节点、协调节点),每个节点各司其职,确保集群正常运行。
用架构图展示集群节点的分工:

主节点是集群的“管理者”,一旦主节点宕机,需要选举新的主节点,确保集群正常运行。主节点选举的核心是“分布式一致性协议”(比如ES用的是“Bully算法”,简化版的Raft协议)。

结合前面的分片、副本、节点分工,完整梳理集群的运行流程(以“客户端查询图书”为例):
假设集群架构:3个主节点(node-master-1、node-master-2、node-master-3)、5个数据节点(node-data-1至node-data-5)、3个协调节点(node-coord-1至node-coord-3),主分片数量3,副本数量1。
运行流程:
分布式搜索引擎的“数据一致性”,核心是“确保主分片和副本分片的数据同步一致”,以及“多个分片之间的数据操作一致性”。但由于分布式系统的特性(网络延迟、节点宕机),无法实现“强一致性”(比如MySQL的事务ACID),只能实现“最终一致性”。
最终一致性:客户端写入数据后,主分片立即写入成功,但副本分片可能需要一定时间(毫秒级)才能同步到数据,在同步完成前,查询副本分片可能会得到旧数据,但最终(同步完成后),主分片和副本分片的数据会保持一致。
比如:客户端写入文档A到主分片0,主分片0写入成功后,立即返回“写入成功”给客户端,但副本分片0还未同步到文档A,此时客户端查询副本分片0,会查不到文档A,但10毫秒后,副本分片0同步完成,再查询就能查到文档A,这就是“最终一致性”。
主分片和副本分片的数据同步,采用“异步同步+确认机制”,流程如下:

在实际业务中,经常会遇到“数据库和分布式搜索引擎的数据一致性”问题(比如“新增图书”,既要写入MySQL,也要写入ES,确保两者数据一致),此时需要实现“分布式事务”,核心方案是“本地消息表+最终一致性”(最常用、最可靠的方案)。
方案流程(流程图):

假设业务场景:新增图书时,先写入MySQL,再写入ES,确保两者数据一致,采用“本地消息表+RabbitMQ”实现分布式事务,步骤如下:
-- 图书表
CREATETABLE`book` (
`id`varchar(64) NOTNULLCOMMENT'图书ID',
`title`varchar(255) NOTNULLCOMMENT'书名',
`author`varchar(100) NOTNULLCOMMENT'作者',
`price`decimal(10,2) NOTNULLCOMMENT'价格',
`intro`textCOMMENT'简介',
`create_time` datetime NOTNULLDEFAULTCURRENT_TIMESTAMPCOMMENT'创建时间',
`update_time` datetime NOTNULLDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMPCOMMENT'修改时间',
PRIMARY KEY (`id`),
KEY`idx_author` (`author`) COMMENT'作者索引'
) ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='图书表';
-- 本地消息表(用于分布式事务)
CREATETABLE`local_message` (
`id`bigintNOTNULL AUTO_INCREMENT COMMENT'消息ID',
`business_type`varchar(50) NOTNULLCOMMENT'业务类型(如BOOK_ADD:新增图书)',
`business_id`varchar(64) NOTNULLCOMMENT'业务ID(图书ID)',
`message`textNOTNULLCOMMENT'消息内容(JSON格式)',
`status`tinyintNOTNULLDEFAULT0COMMENT'消息状态:0-未发送,1-已发送,2-已消费,3-消费失败',
`retry_count`intNOTNULLDEFAULT0COMMENT'重试次数',
`create_time` datetime NOTNULLDEFAULTCURRENT_TIMESTAMPCOMMENT'创建时间',
`update_time` datetime NOTNULLDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMPCOMMENT'修改时间',
PRIMARY KEY (`id`),
KEY`idx_business_id` (`business_id`),
KEY`idx_status_retry_count` (`status`,`retry_count`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='本地消息表';
具体Java代码,后续实战部分会完整给出(包含依赖、实体类、mapper、service、controller),确保可直接编译运行。
分布式搜索引擎的核心优势之一,是“相关性排序”——查询时,不仅能找到匹配的文档,还能根据“文档和查询的匹配度”排序,让最匹配的文档排在前面(比如用户查询“Java编程思想”,优先显示书名完全匹配的文档,再显示简介包含的文档)。
相关性排序的核心,是“TF-IDF算法”(Term Frequency-Inverse Document Frequency,词频-逆文档频率),通俗来说,就是“关键词在文档中出现的频率越高,同时在所有文档中出现的频率越低,该文档的相关性得分越高”。
除了TF-IDF算法,分布式搜索引擎还会结合以下因素,调整相关性得分,让排序更合理:
在ES中,可以通过“function_score”查询,自定义相关性排序规则(比如结合TF-IDF和文档新鲜度),实例如下(Java代码后续实战会讲):
GET /book/_search
{
"query": {
"function_score": {
"query": {
"match": {
"title": "Java编程"
}
},
"functions": [
{
"tfidf": {
"field": "title",
"boost": 2.0
}
},
{
"gauss": {
"create_time": {
"scale": "30d",
"decay": 0.5
}
}
}
],
"boost_mode": "multiply"
}
},
"sort": [
{
"_score": {
"order": "desc"
}
}
]
}
说明:
目前市面上主流的分布式搜索引擎有3种:Elasticsearch(ES)、Solr、PolarDB-X Search,很多开发者在选型时不知道该选哪种,这里从底层架构、性能、易用性、适用场景等维度,做详细对比,帮你快速选型(无废话,纯干货)。
对比维度 | Elasticsearch(ES) | Solr | PolarDB-X Search |
|---|---|---|---|
底层基础 | 基于Lucene,自研分布式架构 | 基于Lucene,分布式架构依赖第三方(如ZooKeeper) | 基于Lucene,阿里自研分布式架构(结合PolarDB-X) |
分布式能力 | 原生支持,无需第三方组件,集群管理、分片、副本、故障转移全自研 | 分布式依赖ZooKeeper,配置复杂,集群管理成本高 | 原生集成PolarDB-X分布式数据库,数据同步零成本 |
全文检索性能 | 近实时检索(毫秒级),高并发场景表现优异,动态扩容便捷 | 静态数据检索性能优异,实时数据写入性能弱于ES | 近实时检索,与分布式数据库联动性能拉满,适合电商、金融 |
易用性 | 开箱即用,API简洁,生态完善,文档丰富,社区活跃 | 配置繁琐,学习成本高,社区活跃度低 | 与阿里云生态深度集成,适合阿里技术栈用户,上手快 |
生态支持 | 支持日志分析、ELK栈、APM、向量检索(最新版支持AI向量) | 生态单一,主要用于传统全文检索场景,无向量检索能力 | 支持结构化+非结构化数据混合检索,兼容MySQL语法,学习成本极低 |
高可用 | 原生支持,故障转移自动完成,无需人工干预 | 依赖ZooKeeper,故障转移慢,易出现脑裂 | 原生高可用,结合PolarDB-X三副本机制,数据零丢失 |
适用场景 | 日志分析、全文检索、电商搜索、风控、AI向量检索、时序数据 | 传统企业站内搜索、静态文档检索、低并发场景 | 电商商品搜索、金融数据检索、分布式数据库+搜索引擎一体化场景 |
最新稳定版 | 8.14.0 | 9.5.0 | 2.0.0 |
com.jam.demo
├── config(ES配置、事务配置、Swagger配置)
├── controller(接口层)
├── entity(实体类)
├── mapper(MyBatisPlus mapper)
├── service(业务层)
│ └── impl(实现类)
├── util(工具类)
└── DemoApplication.java(启动类)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.5</version>
<relativePath/>
</parent>
<groupId>com.jam</groupId>
<artifactId>distributed-search-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>distributed-search-demo</name>
<description>Distributed Search Engine Demo</description>
<properties>
<java.version>17</java.version>
<elasticsearch.version>8.14.0</elasticsearch.version>
<lombok.version>1.18.30</lombok.version>
<fastjson2.version>2.0.32</fastjson2.version>
<guava.version>32.1.3-jre</guava.version>
<mybatis-plus.version>3.5.5</mybatis-plus.version>
<swagger.version>2.2.15</swagger.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-java-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>${swagger.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
spring:
datasource:
driver-class-name:com.mysql.cj.jdbc.Driver
url:jdbc:mysql://localhost:3306/distributed_search?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
username:root
password:root
elasticsearch:
uris:http://localhost:9200
username:elastic
password:elastic
mybatis-plus:
mapper-locations:classpath:mapper/*.xml
type-aliases-package:com.jam.demo.entity
configuration:
map-underscore-to-camel-case:true
log-impl:org.apache.ibatis.logging.stdout.StdOutImpl
springdoc:
api-docs:
enabled:true
path:/v3/api-docs
swagger-ui:
enabled:true
path:/swagger-ui.html
server:
port:8080
CREATE DATABASEIFNOTEXISTS distributed_search DEFAULTCHARACTERSET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE distributed_search;
CREATETABLE`book` (
`id`varchar(64) NOTNULLCOMMENT'图书ID',
`title`varchar(255) NOTNULLCOMMENT'书名',
`author`varchar(100) NOTNULLCOMMENT'作者',
`price`decimal(10,2) NOTNULLCOMMENT'价格',
`intro`textCOMMENT'简介',
`create_time` datetime NOTNULLDEFAULTCURRENT_TIMESTAMPCOMMENT'创建时间',
`update_time` datetime NOTNULLDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMPCOMMENT'修改时间',
PRIMARY KEY (`id`),
KEY`idx_author` (`author`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='图书表';
CREATETABLE`local_message` (
`id`bigintNOTNULL AUTO_INCREMENT COMMENT'消息ID',
`business_type`varchar(50) NOTNULLCOMMENT'业务类型',
`business_id`varchar(64) NOTNULLCOMMENT'业务ID',
`message`textNOTNULLCOMMENT'消息内容',
`status`tinyintNOTNULLDEFAULT0COMMENT'0未发送1已发送2已消费3失败',
`retry_count`intNOTNULLDEFAULT0COMMENT'重试次数',
`create_time` datetime NOTNULLDEFAULTCURRENT_TIMESTAMP,
`update_time` datetime NOTNULLDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
KEY`idx_business` (`business_type`,`business_id`),
KEY`idx_status` (`status`,`retry_count`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COMMENT='本地消息表';
package com.jam.demo.config;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* ES8.x配置类
* @author ken
*/
@Configuration
publicclass EsConfig {
@Value("${spring.elasticsearch.uris}")
private String esUris;
@Value("${spring.elasticsearch.username}")
private String username;
@Value("${spring.elasticsearch.password}")
private String password;
@Bean
public ElasticsearchClient elasticsearchClient() {
String[] uriArr = esUris.split(",");
HttpHost[] hosts = new HttpHost[uriArr.length];
for (int i = 0; i < uriArr.length; i++) {
String uri = uriArr[i].replace("http://", "").replace("https://", "");
String[] ipPort = uri.split(":");
hosts[i] = new HttpHost(ipPort[0], Integer.parseInt(ipPort[1]), "http");
}
BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
RestClient restClient = RestClient.builder(hosts).setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)).build();
ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
returnnew ElasticsearchClient(transport);
}
}
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDateTime;
/**
* 图书实体
* @author ken
*/
@Data
@TableName("book")
@Schema(name = "Book", description = "图书信息")
publicclass Book {
@TableId(type = IdType.ASSIGN_UUID)
@Schema(description = "图书ID")
private String id;
@Schema(description = "书名")
private String title;
@Schema(description = "作者")
private String author;
@Schema(description = "价格")
private BigDecimal price;
@Schema(description = "简介")
private String intro;
@Schema(description = "创建时间")
private LocalDateTime createTime;
@Schema(description = "修改时间")
private LocalDateTime updateTime;
}
package com.jam.demo.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.demo.entity.Book;
import org.apache.ibatis.annotations.Mapper;
/**
* BookMapper
* @author ken
*/
@Mapper
public interface BookMapper extends BaseMapper<Book> {
}
package com.jam.demo.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.jam.demo.entity.Book;
import org.springframework.transaction.TransactionStatus;
/**
* BookService
* @author ken
*/
public interface BookService extends IService<Book> {
/**
* 新增图书(MySQL+ES分布式事务)
* @param book 图书信息
* @return 结果
*/
boolean saveBook(Book book);
/**
* 全文检索图书
* @param keyword 关键词
* @return 结果
*/
Object searchBook(String keyword);
/**
* 编程式事务提交/回滚
* @param status 事务状态
* @param flag 成功标识
*/
void completeTransaction(TransactionStatus status, boolean flag);
}
package com.jam.demo.service.impl;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;
import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Lists;
import com.jam.demo.entity.Book;
import com.jam.demo.mapper.BookMapper;
import com.jam.demo.service.BookService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.util.List;
/**
* BookServiceImpl
* @author ken
*/
@Service
@Slf4j
@RequiredArgsConstructor
publicclass BookServiceImpl extends ServiceImpl<BookMapper, Book> implements BookService {
privatefinal ElasticsearchClient elasticsearchClient;
privatefinal DataSourceTransactionManager transactionManager;
privatestaticfinal String ES_INDEX = "book_index";
@Override
public boolean saveBook(Book book) {
if (ObjectUtils.isEmpty(book) || !StringUtils.hasText(book.getTitle()) || !StringUtils.hasText(book.getAuthor())) {
log.error("图书参数异常");
returnfalse;
}
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
TransactionStatus status = transactionManager.getTransaction(definition);
try {
boolean saveDb = this.save(book);
if (!saveDb) {
transactionManager.rollback(status);
returnfalse;
}
IndexRequest<Book> request = IndexRequest.of(i -> i.index(ES_INDEX).id(book.getId()).document(book));
elasticsearchClient.index(request);
transactionManager.commit(status);
log.info("图书新增成功,ID:{}", book.getId());
returntrue;
} catch (Exception e) {
log.error("图书新增失败", e);
transactionManager.rollback(status);
returnfalse;
}
}
@Override
public Object searchBook(String keyword) {
if (!StringUtils.hasText(keyword)) {
return Lists.newArrayList();
}
try {
SearchRequest request = SearchRequest.of(s -> s.index(ES_INDEX).query(q -> q.multiMatch(m -> m.fields("title", "intro", "author").query(keyword))));
SearchResponse<Book> response = elasticsearchClient.search(request, Book.class);
List<Hit<Book>> hits = response.hits().hits();
if (CollectionUtils.isEmpty(hits)) {
return Lists.newArrayList();
}
List<Book> result = Lists.newArrayList();
for (Hit<Book> hit : hits) {
result.add(hit.source());
}
log.info("检索关键词:{}, 结果数:{}", keyword, result.size());
return result;
} catch (IOException e) {
log.error("ES检索失败", e);
return Lists.newArrayList();
}
}
@Override
public void completeTransaction(TransactionStatus status, boolean flag) {
if (flag) {
if (!status.isCompleted()) {
transactionManager.commit(status);
}
} else {
if (!status.isCompleted()) {
transactionManager.rollback(status);
}
}
}
}
package com.jam.demo.controller;
import com.jam.demo.entity.Book;
import com.jam.demo.service.BookService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
/**
* BookController
* @author ken
*/
@RestController
@RequestMapping("/book")
@RequiredArgsConstructor
@Tag(name = "图书检索接口", description = "分布式搜索引擎图书管理")
publicclass BookController {
privatefinal BookService bookService;
@PostMapping("/save")
@Operation(summary = "新增图书", description = "新增图书并同步到ES")
public boolean saveBook(@RequestBody Book book) {
return bookService.saveBook(book);
}
@GetMapping("/search")
@Operation(summary = "全文检索图书", description = "基于ES全文检索")
public Object searchBook(@RequestParam String keyword) {
return bookService.searchBook(keyword);
}
}
package com.jam.demo;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* 启动类
* @author ken
*/
@SpringBootApplication
@MapperScan("com.jam.demo.mapper")
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
本文从底层逻辑出发,用通俗语言讲透分布式搜索引擎的倒排索引、分片、副本、集群架构、分布式事务、相关性排序核心原理,对比主流产品选型,覆盖生产环境优化与问题解决方案。分布式搜索引擎的核心是分而治之+最终一致性,掌握底层逻辑后,无论使用哪种搜索引擎,都能快速落地、高效调优、解决实际问题。