Elasticsearch 搜索引擎实战
Elasticsearch 是一个分布式、RESTful 风格的搜索和数据分析引擎,常用于全文检索、日志分析等场景。
一、数据库表设计
以商品搜索为例,设计商品表和 ES 索引映射。
1.1 MySQL 商品表
sql
-- Product table: the relational source for the product-search example.
-- Each column carries its own COMMENT; `status` uses 0 = off shelf, 1 = on shelf.
CREATE TABLE `product` (
`id` BIGINT NOT NULL AUTO_INCREMENT COMMENT '商品ID',
`name` VARCHAR(200) NOT NULL COMMENT '商品名称',
`description` VARCHAR(2000) DEFAULT NULL COMMENT '商品描述',
`price` DECIMAL(10,2) NOT NULL COMMENT '商品价格',
`category_id` BIGINT NOT NULL COMMENT '分类ID',
`category_name` VARCHAR(50) NOT NULL COMMENT '分类名称',
`brand` VARCHAR(50) DEFAULT NULL COMMENT '品牌',
`stock` INT NOT NULL DEFAULT 0 COMMENT '库存',
`status` TINYINT NOT NULL DEFAULT 1 COMMENT '状态:0-下架,1-上架',
-- create_time is set once on insert; update_time is refreshed on every row update.
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
-- Secondary indexes on the category and status columns.
KEY `idx_category_id` (`category_id`),
KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品表';1.2 ES 索引映射
json
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
"analysis": {
"analyzer": {
"ik_smart_analyzer": {
"type": "custom",
"tokenizer": "ik_smart"
},
"ik_max_word_analyzer": {
"type": "custom",
"tokenizer": "ik_max_word"
}
}
}
},
"mappings": {
"properties": {
"id": { "type": "long" },
"name": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart",
"fields": {
"keyword": { "type": "keyword" }
}
},
"description": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"price": { "type": "double" },
"categoryId": { "type": "long" },
"categoryName": {
"type": "text",
"fields": { "keyword": { "type": "keyword" } }
},
"brand": {
"type": "text",
"fields": { "keyword": { "type": "keyword" } }
},
"stock": { "type": "integer" },
"status": { "type": "integer" },
"createTime": { "type": "date" },
"updateTime": { "type": "date" }
}
}
}
1.3 IK 分词器说明
| 分词器 | 用途 |
|---|---|
| ik_max_word | 细粒度分词,索引时使用,可匹配更多词条 |
| ik_smart | 粗粒度分词,搜索时使用,提高搜索精度 |
二、Spring Data Elasticsearch
2.1 Maven 依赖
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
2.2 配置文件
yaml
spring:
elasticsearch:
uris: http://localhost:9200
connection-timeout: 5s
socket-timeout: 30s
2.3 实体类
java
package com.example.es.entity;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.DateFormat;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.data.elasticsearch.annotations.InnerField;
import org.springframework.data.elasticsearch.annotations.MultiField;
import java.math.BigDecimal;
import java.time.LocalDateTime;
/**
 * Elasticsearch document for a product, stored in the {@code product} index.
 *
 * <p>{@code name}, {@code categoryName} and {@code brand} are mapped as multi-fields:
 * an analyzed {@code text} main field plus a {@code keyword} sub-field. The sub-field
 * is what exact-match term queries, sorting and aggregations should target
 * (for example {@code brand.keyword}).
 */
@Data
@Document(indexName = "product")
public class Product {

    /** Document identifier; used as the Elasticsearch {@code _id}. */
    @Id
    private Long id;

    /** Product name: indexed with {@code ik_max_word}, searched with {@code ik_smart}. */
    @MultiField(
            mainField = @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart"),
            otherFields = @InnerField(suffix = "keyword", type = FieldType.Keyword))
    private String name;

    /** Product description: indexed with {@code ik_max_word}, searched with {@code ik_smart}. */
    @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
    private String description;

    // NOTE(review): the price is stored as a double in the index, so it is not exact
    // for monetary values; consider scaled_float if exact cents matter.
    @Field(type = FieldType.Double)
    private BigDecimal price;

    @Field(type = FieldType.Long)
    private Long categoryId;

    /**
     * Category name. Sort and aggregate on {@code categoryName.keyword} rather than
     * enabling fielddata on the analyzed text field.
     */
    @MultiField(
            mainField = @Field(type = FieldType.Text),
            otherFields = @InnerField(suffix = "keyword", type = FieldType.Keyword))
    private String categoryName;

    /** Brand. The {@code brand.keyword} sub-field supports exact-match filtering. */
    @MultiField(
            mainField = @Field(type = FieldType.Text),
            otherFields = @InnerField(suffix = "keyword", type = FieldType.Keyword))
    private String brand;

    @Field(type = FieldType.Integer)
    private Integer stock;

    @Field(type = FieldType.Integer)
    private Integer status;

    /** Creation time, written with the {@code date_hour_minute_second_millis} format. */
    @Field(type = FieldType.Date, format = DateFormat.date_hour_minute_second_millis)
    private LocalDateTime createTime;

    /** Last update time, written with the {@code date_hour_minute_second_millis} format. */
    @Field(type = FieldType.Date, format = DateFormat.date_hour_minute_second_millis)
    private LocalDateTime updateTime;
}
2.4 Repository 接口
java
package com.example.es.repository;
import com.example.es.entity.Product;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;
import java.util.List;
/**
 * Repository for {@link Product} documents with {@code Long} identifiers.
 *
 * <p>Besides the operations inherited from {@link ElasticsearchRepository}, the
 * finder methods below have their queries derived from the method names.
 */
@Repository
public interface ProductRepository extends ElasticsearchRepository<Product, Long> {

    /** Finds products by status. */
    List<Product> findByStatus(Integer status);

    /** Finds products by brand. */
    List<Product> findByBrand(String brand);

    /** Finds products by category id. */
    List<Product> findByCategoryId(Long categoryId);

    /** Finds products whose name contains the given text. */
    List<Product> findByNameContaining(String name);
}
三、常见查询操作
3.1 全文检索
根据关键词在商品名称和描述中搜索。
java
package com.example.es.service;
import com.example.es.entity.Product;
import com.example.es.repository.ProductRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.data.elasticsearch.client.elc.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.client.elc.NativeQuery;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.stereotype.Service;
import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.MultiMatchQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders;
import co.elastic.clients.elasticsearch._types.query_dsl.TermQuery;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Search operations over the {@code product} index.
 *
 * <p>Dependencies are injected through the constructor generated by Lombok's
 * {@code @RequiredArgsConstructor}.
 */
@Service
@RequiredArgsConstructor
public class ProductSearchService {

    private final ProductRepository productRepository;
    private final ElasticsearchTemplate elasticsearchTemplate;

    /**
     * Full-text search over the {@code name} and {@code description} fields.
     *
     * @param keyword text to search for; may be {@code null} or blank
     * @return the matching products, or an empty list when {@code keyword} is
     *         {@code null} or blank
     */
    public List<Product> searchByKeyword(String keyword) {
        // A multi_match query needs a query string, so skip the round trip
        // when there is nothing to search for.
        if (keyword == null || keyword.isBlank()) {
            return new ArrayList<>();
        }

        Query query = NativeQuery.builder()
                .withQuery(q -> q.bool(b -> b
                        .should(s -> s.multiMatch(mm -> mm
                                .query(keyword)
                                .fields("name^2", "description") // ^2 boosts matches on name
                                .fuzziness("AUTO")               // tolerate small typos
                        ))
                ))
                .build();

        SearchHits<Product> searchHits = elasticsearchTemplate.search(query, Product.class);
        return searchHits.stream()
                .map(SearchHit::getContent)
                .collect(Collectors.toList());
    }
}
3.2 分类筛选
按分类 ID 筛选商品。
java
/**
 * Returns the products of one category by delegating to the repository's
 * {@code findByCategoryId} finder.
 *
 * @param categoryId category identifier to look up
 * @return whatever the repository returns for that category
 */
public List<Product> findByCategory(Long categoryId) {
    List<Product> productsInCategory = productRepository.findByCategoryId(categoryId);
    return productsInCategory;
}
/**
 * Finds products that belong to the given category and have the given status.
 *
 * <p>Both conditions are exact matches, so they are placed in the bool query's
 * filter context: filter clauses do not contribute to scoring.
 *
 * @param categoryId category identifier; must not be {@code null} (it is unboxed)
 * @param status     product status; must not be {@code null} (it is unboxed)
 * @return the matching products
 */
public List<Product> findByCategoryAndStatus(Long categoryId, Integer status) {
    Query query = NativeQuery.builder()
            .withQuery(q -> q.bool(b -> b
                    .filter(f -> f.term(t -> t.field("categoryId").value(categoryId)))
                    .filter(f -> f.term(t -> t.field("status").value(status)))
            ))
            .build();

    SearchHits<Product> searchHits = elasticsearchTemplate.search(query, Product.class);
    return searchHits.stream()
            .map(SearchHit::getContent)
            .collect(Collectors.toList());
}
3.3 价格范围查询
筛选指定价格区间的商品。
java
/**
 * Finds products whose price lies in {@code [minPrice, maxPrice]}, sorted by
 * price in ascending order.
 *
 * <p>Either bound may be {@code null}, in which case that side of the range is
 * left open. When both are {@code null} no price condition is added. The range
 * runs in filter context because it does not need to influence scoring.
 *
 * @param minPrice inclusive lower bound, or {@code null} for no lower bound
 * @param maxPrice inclusive upper bound, or {@code null} for no upper bound
 * @return the matching products, cheapest first
 */
public List<Product> findByPriceRange(Double minPrice, Double maxPrice) {
    Query query = NativeQuery.builder()
            .withQuery(q -> q.bool(b -> {
                if (minPrice != null || maxPrice != null) {
                    b.filter(f -> f.range(r -> {
                        var priceRange = r.field("price");
                        if (minPrice != null) {
                            priceRange.gte(co.elastic.clients.json.JsonData.of(minPrice));
                        }
                        if (maxPrice != null) {
                            priceRange.lte(co.elastic.clients.json.JsonData.of(maxPrice));
                        }
                        return priceRange;
                    }));
                }
                return b;
            }))
            .withSort(s -> s.field(f -> f
                    .field("price")
                    .order(co.elastic.clients.elasticsearch._types.SortOrder.Asc)))
            .build();

    SearchHits<Product> searchHits = elasticsearchTemplate.search(query, Product.class);
    return searchHits.stream()
            .map(SearchHit::getContent)
            .collect(Collectors.toList());
}
3.4 组合查询
同时进行关键词搜索、分类筛选、价格范围过滤。
java
/**
 * Combined search: optional keyword, category, brand and price range, plus a
 * status condition and paging.
 *
 * <p>Only the keyword clause is scored. The structured conditions (category,
 * brand, price, status) are exact filters and are placed in filter context.
 *
 * <p>Defaults: status {@code 1}, page {@code 0}, size {@code 10}. A negative
 * page is treated as {@code 0} and a non-positive size as {@code 10}.
 *
 * @param searchDTO search criteria; every property may be {@code null}
 * @return the requested page of matching products
 */
public List<Product> search(ProductSearchDTO searchDTO) {
    BoolQuery.Builder boolBuilder = new BoolQuery.Builder();

    // Keyword (optional): a whitespace-only keyword is treated as absent.
    String keyword = searchDTO.getKeyword();
    if (keyword != null && !keyword.isBlank()) {
        boolBuilder.must(m -> m.multiMatch(mm -> mm
                .query(keyword)
                .fields("name^3", "description") // name matches weigh three times as much
                .fuzziness("AUTO")
        ));
    }

    // Category (optional).
    Long categoryId = searchDTO.getCategoryId();
    if (categoryId != null) {
        boolBuilder.filter(f -> f.term(t -> t.field("categoryId").value(categoryId)));
    }

    // Brand (optional): exact match against the keyword sub-field.
    String brand = searchDTO.getBrand();
    if (brand != null && !brand.isEmpty()) {
        boolBuilder.filter(f -> f.term(t -> t.field("brand.keyword").value(brand)));
    }

    // Price range (optional): either side may be open.
    var minPrice = searchDTO.getMinPrice();
    var maxPrice = searchDTO.getMaxPrice();
    if (minPrice != null || maxPrice != null) {
        boolBuilder.filter(f -> f.range(r -> {
            var priceRange = r.field("price");
            if (minPrice != null) {
                priceRange.gte(co.elastic.clients.json.JsonData.of(minPrice));
            }
            if (maxPrice != null) {
                priceRange.lte(co.elastic.clients.json.JsonData.of(maxPrice));
            }
            return priceRange;
        }));
    }

    // Status: defaults to 1 (on shelf) when not specified.
    int status = searchDTO.getStatus() != null ? searchDTO.getStatus() : 1;
    boolBuilder.filter(f -> f.term(t -> t.field("status").value(status)));

    // Paging: normalise values that PageRequest.of would reject.
    int page = searchDTO.getPage() != null ? Math.max(searchDTO.getPage(), 0) : 0;
    int size = (searchDTO.getSize() != null && searchDTO.getSize() > 0) ? searchDTO.getSize() : 10;

    Query query = NativeQuery.builder()
            .withQuery(q -> q.bool(boolBuilder.build()))
            .withPageable(org.springframework.data.domain.PageRequest.of(page, size))
            .build();

    SearchHits<Product> searchHits = elasticsearchTemplate.search(query, Product.class);
    return searchHits.stream()
            .map(SearchHit::getContent)
            .collect(Collectors.toList());
}
3.5 搜索请求 DTO
java
package com.example.es.dto;
import lombok.Data;
import java.math.BigDecimal;
/**
 * Search criteria passed to the combined product search.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class ProductSearchDTO {
private String keyword; // Search keyword
private Long categoryId; // Category id
private String brand; // Brand
private BigDecimal minPrice; // Lowest price
private BigDecimal maxPrice; // Highest price
private Integer status; // Product status
private Integer page; // Page number
private Integer size; // Page size
}四、数据同步
使用 Canal 将 MySQL 数据实时同步到 ES。
4.1 Canal 工作原理
MySQL → Binlog → Canal Server → Canal Client → Elasticsearch
4.2 Canal Server 配置
properties
# canal-deployer/conf/example/instance.properties
# Settings for the Canal instance named "example" (Java .properties syntax).

# GTID mode is off; replication starts from the binlog file/position below.
canal.instance.gtidon=false

# Source MySQL server and the binlog coordinates to start from.
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=0
canal.instance.master.timestamp=

# NOTE(review): Canal's stock instance.properties names this key
# canal.instance.mysql.slaveId, and the value must differ from the MySQL
# server_id - confirm both against the Canal version in use.
canal.instance.slaveId=1

# Account Canal uses to read the binlog.
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.enableDruid=false

# Table filter: only table `product` in database `product` is captured.
# Declare this key exactly once - a repeated key silently overrides the earlier one.
canal.instance.filter.regex=product\\.product

# Table-structure (tsdb) metadata store.
canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_manager
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal
4.3 Canal Client 实现
Canal Client 负责监听 MySQL Binlog,将数据变更同步到 ES。
java
package com.example.es.sync;
import com.alibaba.otter.canal.clientCanalAdapter;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.example.es.entity.Product;
import com.example.es.repository.ProductRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Applies row changes captured by Canal to the Elasticsearch product index.
 *
 * <p>Inserts and updates are both written as a full-document save built from the
 * row's "after" columns; deletes remove the document by id.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class CanalSyncService {

    /**
     * Parses date-time column values. Accepts the space-separated
     * {@code yyyy-MM-dd HH:mm:ss} layout as well as the ISO {@code T}-separated
     * layout, each with an optional fractional-second part.
     */
    private static final java.time.format.DateTimeFormatter COLUMN_DATETIME =
            new java.time.format.DateTimeFormatterBuilder()
                    .appendPattern("yyyy-MM-dd[ ]['T']HH:mm:ss")
                    .optionalStart()
                    .appendFraction(java.time.temporal.ChronoField.NANO_OF_SECOND, 1, 9, true)
                    .optionalEnd()
                    .toFormatter();

    private final ProductRepository productRepository;

    /**
     * Handles an INSERT event by saving the new row as a document.
     *
     * @param rowData row change whose "after" columns describe the inserted row
     */
    public void insert(CanalEntry.RowData rowData) {
        Product product = save(rowData);
        log.info("[Canal] 新增商品: {}", product.getId());
    }

    /**
     * Handles an UPDATE event by saving the row's new state as a document.
     *
     * @param rowData row change whose "after" columns describe the updated row
     */
    public void update(CanalEntry.RowData rowData) {
        Product product = save(rowData);
        log.info("[Canal] 更新商品: {}", product.getId());
    }

    /**
     * Handles a DELETE event by removing the document with the given id.
     *
     * @param id primary key of the deleted row
     */
    public void delete(Long id) {
        productRepository.deleteById(id);
        log.info("[Canal] 删除商品: {}", id);
    }

    /** Builds a product from the row's "after" columns and saves it. */
    private Product save(CanalEntry.RowData rowData) {
        Product product = parseProduct(rowData.getAfterColumnsList());
        productRepository.save(product);
        return product;
    }

    /**
     * Converts Canal column data into a {@link Product}.
     *
     * <p>Columns flagged as SQL NULL are skipped so the corresponding property stays
     * {@code null} instead of being parsed from an empty string. Columns with
     * unrecognised names are ignored.
     *
     * @param columns column values of one row
     * @return the populated product
     */
    private Product parseProduct(List<CanalEntry.Column> columns) {
        Product product = new Product();
        for (CanalEntry.Column column : columns) {
            if (column.getIsNull()) {
                continue;
            }
            String value = column.getValue();
            switch (column.getName()) {
                case "id" -> product.setId(Long.parseLong(value));
                case "name" -> product.setName(value);
                case "description" -> product.setDescription(value);
                case "price" -> product.setPrice(new java.math.BigDecimal(value));
                case "category_id" -> product.setCategoryId(Long.parseLong(value));
                case "category_name" -> product.setCategoryName(value);
                case "brand" -> product.setBrand(value);
                case "stock" -> product.setStock(Integer.parseInt(value));
                case "status" -> product.setStatus(Integer.parseInt(value));
                case "create_time" -> product.setCreateTime(parseDateTime(value));
                case "update_time" -> product.setUpdateTime(parseDateTime(value));
                default -> {
                    // Column is not part of the document; nothing to map.
                }
            }
        }
        return product;
    }

    /** Parses a date-time column value with {@link #COLUMN_DATETIME}. */
    private static java.time.LocalDateTime parseDateTime(String value) {
        return java.time.LocalDateTime.parse(value, COLUMN_DATETIME);
    }
}
4.4 Spring Boot 启动类配置 Canal
Canal Client 连接到 Canal Server,持续监听数据变更。
java
package com.example.es;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.example.es.sync.CanalSyncService;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
/**
 * Connects to a Canal server and forwards row changes to {@link CanalSyncService}.
 *
 * <p>{@link #connect()} runs after construction and {@link #disconnect()} before
 * destruction. Nothing in this class invokes {@link #listen()}; it blocks, so the
 * caller is expected to run it on a dedicated thread.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class CanalClient {

    /** Maximum number of entries requested per fetch. */
    private static final int BATCH_SIZE = 100;

    /** Pause between polls when no data arrived or a batch failed. */
    private static final long IDLE_SLEEP_MILLIS = 1000L;

    private final CanalSyncService canalSyncService;
    private CanalConnector connector;

    /** Cleared by {@link #disconnect()} so the listen loop can end. */
    private volatile boolean running = true;

    /**
     * Opens the connection to the Canal server and subscribes to the
     * {@code product.product} table.
     */
    @PostConstruct
    public void connect() {
        connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), // Canal server address
                "example",                                  // destination name
                "canal",                                    // user name
                "canal"                                     // password
        );
        connector.connect();
        connector.subscribe("product\\.product");
        connector.rollback();
        log.info("[Canal] 连接到 Canal Server 成功");
    }

    /**
     * Polls the Canal server and applies every received batch until
     * {@link #disconnect()} is called or the calling thread is interrupted.
     *
     * <p>A batch is acknowledged only after all of its entries were applied. If
     * applying fails, the batch is rolled back so the server delivers it again,
     * and the loop pauses before the next fetch.
     */
    public void listen() {
        while (running && !Thread.currentThread().isInterrupted()) {
            Message message = connector.getWithoutAck(BATCH_SIZE);
            long batchId = message.getId();

            // Nothing to process: wait a little before polling again.
            if (batchId <= 0 || message.getEntries().isEmpty()) {
                if (!pause()) {
                    return;
                }
                continue;
            }

            if (processBatch(message)) {
                connector.ack(batchId);
            } else {
                connector.rollback(batchId);
                if (!pause()) {
                    return;
                }
            }
        }
    }

    /**
     * Stops the listen loop and closes the connection, if one was opened.
     */
    @PreDestroy
    public void disconnect() {
        running = false;
        if (connector != null) {
            connector.disconnect();
            log.info("[Canal] 断开连接");
        }
    }

    /** Applies all row-data entries of a batch; returns {@code false} if applying failed. */
    private boolean processBatch(Message message) {
        try {
            for (CanalEntry.Entry entry : message.getEntries()) {
                if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                    applyEntry(entry);
                }
            }
            return true;
        } catch (RuntimeException e) {
            log.error("[Canal] 处理批次失败,将回滚重试: {}", message.getId(), e);
            return false;
        }
    }

    /** Decodes one entry and dispatches each of its rows by event type. */
    private void applyEntry(CanalEntry.Entry entry) {
        CanalEntry.RowChange rowChange;
        try {
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            // An entry that cannot be decoded is logged and skipped.
            log.error("[Canal] 解析数据失败", e);
            return;
        }

        CanalEntry.EventType eventType = rowChange.getEventType();
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            switch (eventType) {
                case INSERT -> canalSyncService.insert(rowData);
                case UPDATE -> canalSyncService.update(rowData);
                case DELETE -> canalSyncService.delete(extractDeletedId(rowData));
                default -> {
                    // Other event types are not synchronised.
                }
            }
        }
    }

    /**
     * Reads the primary key of a deleted row from its "before" columns: the
     * column named {@code id} if present, otherwise the first column.
     */
    private static Long extractDeletedId(CanalEntry.RowData rowData) {
        for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
            if ("id".equals(column.getName())) {
                return Long.parseLong(column.getValue());
            }
        }
        return Long.parseLong(rowData.getBeforeColumns(0).getValue());
    }

    /**
     * Sleeps for {@link #IDLE_SLEEP_MILLIS}.
     *
     * @return {@code false} if the thread was interrupted; the interrupt flag is
     *         restored so the caller can stop
     */
    private boolean pause() {
        try {
            Thread.sleep(IDLE_SLEEP_MILLIS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
4.5 Maven 依赖
xml
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.7</version>
</dependency>
五、注意事项
5.1 ES 索引设计
- 根据业务选择合适的分词器
- 数值类型用 `long`、`integer`、`double`,避免用 `text`
- 精确匹配字段添加 `.keyword` 子字段
- 根据数据量设置 `number_of_shards`
5.2 查询优化
- 关键词搜索加权,提高 name 字段权重
- 使用 `fuzziness` 实现模糊匹配
- 分页查询限制最大页数,避免深度分页
- 使用 `filter` 缓存筛选条件
5.3 Canal 配置
- MySQL 需开启 Binlog
- 创建 Canal 专用数据库账号并授权
- Canal 监听正则使用 `库名.表名` 格式(正则中的 `.` 需要转义,例如 `product\\.product`)
- 生产环境建议使用 Canal Cluster