Skip to content

Elasticsearch 搜索引擎实战

Elasticsearch 是一个分布式、RESTful 风格的搜索和数据分析引擎,常用于全文检索、日志分析等场景。

一、数据库表设计

以商品搜索为例,设计商品表和 ES 索引映射。

1.1 MySQL 商品表

sql
CREATE TABLE `product` (
  `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT '商品ID',
  `name` VARCHAR(200) NOT NULL COMMENT '商品名称',
  `description` VARCHAR(2000) DEFAULT NULL COMMENT '商品描述',
  `price` DECIMAL(10,2) NOT NULL COMMENT '商品价格',
  `category_id` BIGINT NOT NULL COMMENT '分类ID',
  `category_name` VARCHAR(50) NOT NULL COMMENT '分类名称',
  `brand` VARCHAR(50) DEFAULT NULL COMMENT '品牌',
  `stock` INT NOT NULL DEFAULT 0 COMMENT '库存',
  `status` TINYINT NOT NULL DEFAULT 1 COMMENT '状态:0-下架,1-上架',
  `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  KEY `idx_category_id` (`category_id`),
  KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品表';

1.2 ES 索引映射

json
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "analysis": {
      "analyzer": {
        "ik_smart_analyzer": {
          "type": "custom",
          "tokenizer": "ik_smart"
        },
        "ik_max_word_analyzer": {
          "type": "custom",
          "tokenizer": "ik_max_word"
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "id": { "type": "long" },
      "name": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart",
        "fields": {
          "keyword": { "type": "keyword" }
        }
      },
      "description": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "price": { "type": "double" },
      "categoryId": { "type": "long" },
      "categoryName": {
        "type": "text",
        "fields": { "keyword": { "type": "keyword" } }
      },
      "brand": {
        "type": "text",
        "fields": { "keyword": { "type": "keyword" } }
      },
      "stock": { "type": "integer" },
      "status": { "type": "integer" },
      "createTime": { "type": "date" },
      "updateTime": { "type": "date" }
    }
  }
}

1.3 IK 分词器说明

分词器用途
ik_max_word细粒度分词,搜索时匹配更多词条
ik_smart粗粒度分词,提高搜索精度

二、Spring Data Elasticsearch

2.1 Maven 依赖

xml
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

2.2 配置文件

yaml
spring:
  elasticsearch:
    uris: http://localhost:9200
    connection-timeout: 5s
    socket-timeout: 30s

2.3 实体类

java
package com.example.es.entity;

import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.DateFormat;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import java.math.BigDecimal;
import java.time.LocalDateTime;

@Data  // Lombok 自动生成 getter/setter/toString 等方法
@Document(indexName = "product")  // 声明 ES 索引名称为 product
public class Product {

  @Id  // 主键,对应 ES 文档的 _id
  private Long id;

  // 文本字段,指定分词器:索引时用 ik_max_word,搜索时用 ik_smart
  @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
  private String name;

  @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
  private String description;

  @Field(type = FieldType.Double)  // 数值类型存储
  private BigDecimal price;

  @Field(type = FieldType.Long)
  private Long categoryId;

  @Field(type = FieldType.Text, fielddata = true)  // fielddata 用于排序聚合
  private String categoryName;

  @Field(type = FieldType.Text)
  private String brand;

  @Field(type = FieldType.Integer)
  private Integer stock;

  @Field(type = FieldType.Integer)
  private Integer status;

  // 日期类型,指定日期格式
  @Field(type = FieldType.Date, format = DateFormat.date_hour_minute_second_millis)
  private LocalDateTime createTime;

  @Field(type = FieldType.Date, format = DateFormat.date_hour_minute_second_millis)
  private LocalDateTime updateTime;
}

2.4 Repository 接口

java
package com.example.es.repository;

import com.example.es.entity.Product;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;
import java.util.List;

@Repository
public interface ProductRepository extends ElasticsearchRepository<Product, Long> {

  // 根据名称模糊查询,方法命名规则自动生成查询
  List<Product> findByNameContaining(String name);

  // 根据分类ID查询
  List<Product> findByCategoryId(Long categoryId);

  // 根据状态查询
  List<Product> findByStatus(Integer status);

  // 根据品牌查询
  List<Product> findByBrand(String brand);
}

三、常见查询操作

3.1 全文检索

根据关键词在商品名称和描述中搜索。

java
package com.example.es.service;

import com.example.es.entity.Product;
import com.example.es.repository.ProductRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.data.elasticsearch.client.elc.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.client.elc.NativeQuery;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.stereotype.Service;
import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.MultiMatchQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders;
import co.elastic.clients.elasticsearch._types.query_dsl.TermQuery;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

@Service
@RequiredArgsConstructor  // Lombok 自动注入 final 依赖
public class ProductSearchService {

  private final ProductRepository productRepository;
  private final ElasticsearchTemplate elasticsearchTemplate;

  /**
   * 全文检索
   * 在 name 和 description 字段中搜索关键词
   */
  public List<Product> searchByKeyword(String keyword) {
    // 构建查询:多字段匹配,name 权重 2 倍
    Query query = NativeQuery.builder()
      .withQuery(q -> q.bool(b -> b
        .should(s -> s.multiMatch(mm -> mm
          .query(keyword)
          .fields("name^2", "description")  // name 权重更高,^2 表示权重为 2
          .fuzziness("AUTO")                // 模糊匹配,容忍错别字
        ))
      ))
      .build();

    // 执行搜索,返回结果
    SearchHits<Product> searchHits = elasticsearchTemplate.search(query, Product.class);
    // 转换为 Product 列表
    return searchHits.stream()
      .map(SearchHit::getContent)
      .collect(Collectors.toList());
  }
}

3.2 分类筛选

按分类 ID 筛选商品。

java
/**
 * 按分类查询
 */
public List<Product> findByCategory(Long categoryId) {
  return productRepository.findByCategoryId(categoryId);
}

/**
 * 按分类和状态筛选
 * 使用 BoolQuery 构建复合查询
 */
public List<Product> findByCategoryAndStatus(Long categoryId, Integer status) {
  Query query = NativeQuery.builder()
    .withQuery(q -> q.bool(b -> b
      .must(m -> m.term(t -> t.field("categoryId").value(categoryId)))  // 精确匹配分类ID
      .must(m -> m.term(t -> t.field("status").value(status)))          // 精确匹配状态
    ))
    .build();

  SearchHits<Product> searchHits = elasticsearchTemplate.search(query, Product.class);
  return searchHits.stream()
    .map(SearchHit::getContent)
    .collect(Collectors.toList());
}

3.3 价格范围查询

筛选指定价格区间的商品。

java
/**
 * 价格范围查询
 * 使用 range 查询指定价格区间
 */
public List<Product> findByPriceRange(Double minPrice, Double maxPrice) {
  Query query = NativeQuery.builder()
    .withQuery(q -> q.bool(b -> b
      .must(m -> m.range(r -> r.field("price")
        .gte(co.elastic.clients.json.JsonData.of(minPrice))  // 大于等于
        .lte(co.elastic.clients.json.JsonData.of(maxPrice))  // 小于等于
      ))
    ))
    .withSort(s -> s.field(f -> f.field("price").order(co.elastic.clients.elasticsearch._types.SortOrder.Asc)))  // 按价格升序排序
    .build();

  SearchHits<Product> searchHits = elasticsearchTemplate.search(query, Product.class);
  return searchHits.stream()
    .map(SearchHit::getContent)
    .collect(Collectors.toList());
}

3.4 组合查询

同时进行关键词搜索、分类筛选、价格范围过滤。

java
/**
 * 组合查询:关键词 + 分类 + 价格范围
 * 支持多个可选条件,灵活组合
 */
public List<Product> search(ProductSearchDTO searchDTO) {
  // 构建布尔查询,组合多个条件
  BoolQuery.Builder boolBuilder = new BoolQuery.Builder();

  // 关键词搜索(可选)
  if (searchDTO.getKeyword() != null && !searchDTO.getKeyword().isEmpty()) {
    boolBuilder.must(m -> m.multiMatch(mm -> mm
      .query(searchDTO.getKeyword())
      .fields("name^3", "description")  // name 权重 3 倍,提高相关性
      .fuzziness("AUTO")
    ));
  }

  // 分类筛选(可选)
  if (searchDTO.getCategoryId() != null) {
    boolBuilder.must(m -> m.term(t -> t.field("categoryId").value(searchDTO.getCategoryId())));
  }

  // 品牌筛选(可选),使用 keyword 子字段进行精确匹配
  if (searchDTO.getBrand() != null && !searchDTO.getBrand().isEmpty()) {
    boolBuilder.must(m -> m.term(t -> t.field("brand.keyword").value(searchDTO.getBrand())));
  }

  // 价格范围(可选)
  if (searchDTO.getMinPrice() != null || searchDTO.getMaxPrice() != null) {
    boolBuilder.must(m -> m.range(r -> {
      var rangeQuery = r.field("price");
      if (searchDTO.getMinPrice() != null) {
        rangeQuery.gte(co.elastic.clients.json.JsonData.of(searchDTO.getMinPrice()));
      }
      if (searchDTO.getMaxPrice() != null) {
        rangeQuery.lte(co.elastic.clients.json.JsonData.of(searchDTO.getMaxPrice()));
      }
      return rangeQuery;
    }));
  }

  // 状态筛选(默认上架 1)
  Integer status = searchDTO.getStatus() != null ? searchDTO.getStatus() : 1;
  boolBuilder.must(m -> m.term(t -> t.field("status").value(status)));

  // 构建最终查询,添加分页参数
  Query query = NativeQuery.builder()
    .withQuery(q -> q.bool(boolBuilder.build()))
    .withPageable(org.springframework.data.domain.PageRequest.of(
      searchDTO.getPage() != null ? searchDTO.getPage() : 0,
      searchDTO.getSize() != null ? searchDTO.getSize() : 10
    ))
    .build();

  SearchHits<Product> searchHits = elasticsearchTemplate.search(query, Product.class);
  return searchHits.stream()
    .map(SearchHit::getContent)
    .collect(Collectors.toList());
}

3.5 搜索请求 DTO

java
package com.example.es.dto;

import lombok.Data;
import java.math.BigDecimal;

@Data
public class ProductSearchDTO {
  private String keyword;        // 搜索关键词
  private Long categoryId;       // 分类ID
  private String brand;          // 品牌
  private BigDecimal minPrice;   // 最低价格
  private BigDecimal maxPrice;   // 最高价格
  private Integer status;        // 商品状态
  private Integer page;          // 页码
  private Integer size;          // 每页数量
}

四、数据同步

使用 Canal 将 MySQL 数据实时同步到 ES。

4.1 Canal 工作原理

MySQL → Binlog → Canal Server → Canal Client → Elasticsearch

4.2 Canal Server 配置

yaml
# canal-deployer/conf/example/instance.properties
canal.instance.gtidon=false
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=0
canal.instance.master.timestamp=
canal.instance.slaveId=1
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.enableDruid=false
canal.instance.filter.regex=.*\\..*
canal.instance.filter.regex=product\\.product
canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_manager
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal

4.3 Canal Client 实现

Canal Client 负责监听 MySQL Binlog,将数据变更同步到 ES。

java
package com.example.es.sync;

import com.alibaba.otter.canal.clientCanalAdapter;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.example.es.entity.Product;
import com.example.es.repository.ProductRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;

@Slf4j
@Component
@RequiredArgsConstructor
public class CanalSyncService {

  private final ProductRepository productRepository;

  /**
   * 处理新增数据
   * Canal 解析 INSERT 事件后调用
   */
  public void insert(CanalEntry.RowData rowData) {
    // 获取新增后的列数据
    List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
    // 解析为 Product 实体
    Product product = parseProduct(columns);
    // 保存到 ES
    productRepository.save(product);
    log.info("[Canal] 新增商品: {}", product.getId());
  }

  /**
   * 处理更新数据
   * Canal 解析 UPDATE 事件后调用
   */
  public void update(CanalEntry.RowData rowData) {
    List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
    Product product = parseProduct(columns);
    productRepository.save(product);
    log.info("[Canal] 更新商品: {}", product.getId());
  }

  /**
   * 处理删除数据
   * Canal 解析 DELETE 事件后调用
   */
  public void delete(Long id) {
    productRepository.deleteById(id);
    log.info("[Canal] 删除商品: {}", id);
  }

  /**
   * 解析 Canal 变更数据为 Product 实体
   * 将列式数据转换为对象
   */
  private Product parseProduct(List<CanalEntry.Column> columns) {
    Product product = new Product();
    for (CanalEntry.Column column : columns) {
      // 根据列名映射到实体属性
      switch (column.getName()) {
        case "id":
          product.setId(Long.parseLong(column.getValue()));
          break;
        case "name":
          product.setName(column.getValue());
          break;
        case "description":
          product.setDescription(column.getValue());
          break;
        case "price":
          product.setPrice(new java.math.BigDecimal(column.getValue()));
          break;
        case "category_id":
          product.setCategoryId(Long.parseLong(column.getValue()));
          break;
        case "category_name":
          product.setCategoryName(column.getValue());
          break;
        case "brand":
          product.setBrand(column.getValue());
          break;
        case "stock":
          product.setStock(Integer.parseInt(column.getValue()));
          break;
        case "status":
          product.setStatus(Integer.parseInt(column.getValue()));
          break;
        case "create_time":
          product.setCreateTime(java.time.LocalDateTime.parse(column.getValue()));
          break;
        case "update_time":
          product.setUpdateTime(java.time.LocalDateTime.parse(column.getValue()));
          break;
      }
    }
    return product;
  }
}

4.4 Spring Boot 启动类配置 Canal

Canal Client 连接到 Canal Server,持续监听数据变更。

java
package com.example.es;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.example.es.sync.CanalSyncService;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;

@Slf4j
@Component
@RequiredArgsConstructor
public class CanalClient {

  private final CanalSyncService canalSyncService;
  private CanalConnector connector;

  /**
   * 项目启动时连接 Canal Server
   */
  @PostConstruct
  public void connect() {
    // 创建单节点连接,连接到 Canal Server
    connector = CanalConnectors.newSingleConnector(
      new InetSocketAddress("127.0.0.1", 11111),  // Canal Server 地址
      "example",                                   // destination 名称
      "canal",                                     // 用户名
      "canal"                                      // 密码
    );
    connector.connect();
    connector.subscribe("product\\.product");  // 订阅 product 库的 product 表
    connector.rollback();
    log.info("[Canal] 连接到 Canal Server 成功");
  }

  /**
   * 持续监听数据变更
   * 循环获取消息并处理
   */
  public void listen() {
    while (true) {
      // 获取消息,批量大小 100,超时时间 100ms
      Message message = connector.getWithoutAck(100);
      long batchId = message.getId();
      int size = message.getEntries().size();

      // 无消息时休眠后继续
      if (batchId == 0 || size == 0) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        continue;
      }

      // 遍历处理每条变更记录
      for (CanalEntry.Entry entry : message.getEntries()) {
        if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
          // 解析 Binlog 数据
          CanalEntry.RowChange rowChange;
          try {
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
          } catch (Exception e) {
            log.error("[Canal] 解析数据失败", e);
            continue;
          }

          // 获取事件类型:INSERT/UPDATE/DELETE
          CanalEntry.EventType eventType = rowChange.getEventType();
          // 处理每行数据
          for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            switch (eventType) {
              case INSERT:
                canalSyncService.insert(rowData);
                break;
              case UPDATE:
                canalSyncService.update(rowData);
                break;
              case DELETE:
                // 删除时从变更前数据获取 ID
                Long id = Long.parseLong(rowData.getBeforeColumns(0).getValue());
                canalSyncService.delete(id);
                break;
            }
          }
        }
      }

      // 确认消息处理成功
      connector.ack(batchId);
    }
  }

  /**
   * 项目关闭时断开连接
   */
  @PreDestroy
  public void disconnect() {
    if (connector != null) {
      connector.disconnect();
      log.info("[Canal] 断开连接");
    }
  }
}

4.5 Maven 依赖

xml
<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>1.1.7</version>
</dependency>

五、注意事项

5.1 ES 索引设计

  • 根据业务选择合适的分词器
  • 数值类型用 longintegerdouble,避免用 text
  • 精确匹配字段添加 .keyword 子字段
  • 根据数据量设置 number_of_shards

5.2 查询优化

  • 关键词搜索加权,提高 name 字段权重
  • 使用 fuzziness 实现模糊匹配
  • 分页查询限制最大页数,避免深度分页
  • 使用 filter 缓存筛选条件

5.3 Canal 配置

  • MySQL 需开启 Binlog
  • 创建 Canal 专用数据库账号并授权
  • Canal 监听正则使用 库名.表名 格式
  • 生产环境建议使用 Canal Cluster