
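Lesson 3: Document Operations

Document CRUD Operations

Document operations are the most basic and most frequently used feature of Elasticsearch. Documents are stored as JSON, and each document is identified by an _id that is unique within its index.

1. Create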

# Method 1: PUT with an explicit ID
PUT /products/_doc/1
{
  "name": "iPhone 15 Pro",
  "brand": "Apple",
  "price": 7999,
  "category": "phone",
  "stock": 100,
  "tags": ["5G", "A17 chip", "titanium"],
  "release_date": "2023-09-22"
}

# Method 2: POST with an auto-generated ID
POST /products/_doc
{
  "name": "MacBook Pro 16",
  "brand": "Apple",
  "price": 19999,
  "category": "laptop"
}

# Response (to the PUT request in method 1):
{
  "_index": "products",
  "_id": "1",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}
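
The two creation styles differ only in HTTP method and path. A minimal Python sketch of that difference, using only the standard library: it builds the requests without sending them. ES_URL and build_index_request are illustrative names, and a local unsecured cluster is assumed.

```python
import json
import urllib.parse
import urllib.request

ES_URL = "http://localhost:9200"  # assumption: a local, unsecured cluster


def build_index_request(index, document, doc_id=None):
    """Build (but do not send) the HTTP request that indexes one document.

    With doc_id the request is PUT /<index>/_doc/<id>; without it the
    request is POST /<index>/_doc and Elasticsearch generates the ID.
    """
    if doc_id is None:
        method, path = "POST", f"/{index}/_doc"
    else:
        quoted_id = urllib.parse.quote(str(doc_id), safe="")
        method, path = "PUT", f"/{index}/_doc/{quoted_id}"
    return urllib.request.Request(
        ES_URL + path,
        data=json.dumps(document).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method=method,
    )


put_req = build_index_request("products", {"name": "iPhone 15 Pro", "price": 7999}, doc_id="1")
post_req = build_index_request("products", {"name": "MacBook Pro 16", "price": 19999})
print(put_req.get_method(), put_req.full_url)
print(post_req.get_method(), post_req.full_url)
```

Given a reachable cluster, either request could be sent with urllib.request.urlopen(put_req).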

2. Read

# Get a document by ID
GET /products/_doc/1

# Response (the _source is abbreviated here):
{
  "_index": "products",
  "_id": "1",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "name": "iPhone 15 Pro",
    "brand": "Apple",
    "price": 7999
  }
}

# Get only the _source
GET /products/_source/1

# Get selected fields
GET /products/_doc/1?_source=name,price

# Get multiple documents in one request
GET /products/_mget
{
  "ids": ["1", "2", "3"]
}
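
An _mget response carries one entry per requested ID under "docs", and entries for missing documents have "found": false. A small Python sketch of reading such a response follows; the sample response is hand-written for illustration, and sources_by_id is a hypothetical helper name.

```python
def sources_by_id(mget_response):
    """Map each requested _id to its _source, or to None if it was not found."""
    return {
        doc["_id"]: doc.get("_source") if doc.get("found") else None
        for doc in mget_response["docs"]
    }


# Hand-written sample in the shape of an _mget response (not real cluster output).
sample_response = {
    "docs": [
        {"_index": "products", "_id": "1", "found": True,
         "_source": {"name": "iPhone 15 Pro", "price": 7999}},
        {"_index": "products", "_id": "2", "found": True,
         "_source": {"name": "MacBook Pro 16", "price": 19999}},
        {"_index": "products", "_id": "3", "found": False},
    ]
}

for doc_id, source in sources_by_id(sample_response).items():
    print(doc_id, source if source is not None else "(missing)")
```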

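3. Update

# Method 1: full replacement (PUT). The new body replaces the whole document,
# so fields it omits (stock, tags, release_date) are removed.
PUT /products/_doc/1
{
  "name": "iPhone 15 Pro Max",
  "brand": "Apple",
  "price": 9999,
  "category": "phone"
}

# Method 2: partial update (POST _update)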
POST /products/_update/1
{
  "doc": {
    "price": 7499,
    "stock": 80
  }
}

# Method 3: scripted update
POST /products/_update/1
{
  "script": {
    "source": "ctx._source.stock -= params.count",
    "params": {
      "count": 10
    }
  }
}

# Method 4: upsert (create the document if it does not exist)
POST /products/_update/10
{
  "doc": {
    "name": "iPad Pro",
    "price": 6799
  },
  "doc_as_upsert": true
}
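
The semantics of a "doc" update can be modelled locally. The sketch below is a toy in-memory simulation, not a client: nested objects are merged key by key, any other value (including a list) replaces the old one, an update that changes nothing is reported as "noop", and doc_as_upsert creates a missing document. The names merge_partial and apply_update are illustrative.

```python
import copy


def merge_partial(source, partial):
    """Return a new dict with `partial` merged into `source`."""
    merged = copy.deepcopy(source)
    for key, value in partial.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_partial(merged[key], value)  # merge nested objects
        else:
            merged[key] = copy.deepcopy(value)  # scalars and lists are replaced
    return merged


def apply_update(store, doc_id, partial, doc_as_upsert=False):
    """Toy model of POST /<index>/_update/<id> with a "doc" body."""
    if doc_id not in store:
        if not doc_as_upsert:
            raise KeyError(f"document {doc_id!r} is missing")
        store[doc_id] = copy.deepcopy(partial)
        return "created"
    merged = merge_partial(store[doc_id], partial)
    if merged == store[doc_id]:
        return "noop"  # nothing changed, so nothing is rewritten
    store[doc_id] = merged
    return "updated"


store = {"1": {"name": "iPhone 15 Pro", "price": 7999, "stock": 100}}
print(apply_update(store, "1", {"price": 7499, "stock": 80}))
print(apply_update(store, "1", {"price": 7499}))
print(apply_update(store, "10", {"name": "iPad Pro", "price": 6799}, doc_as_upsert=True))
print(store)
```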

4. Delete

# Delete by ID
DELETE /products/_doc/1

# Response:
{
  "_index": "products",
  "_id": "1",
  "_version": 2,
  "result": "deleted",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}

# Delete all documents that match a query
POST /products/_delete_by_query
{
  "query": {
    "range": {
      "price": {
        "lt": 1000
      }
    }
  }
}

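Bulk Operations (Bulk API)

The Bulk API performs multiple index, create, update, or delete operations in a single request, which greatly reduces per-request overhead.

Bulk API Syntax

# Bulk request format: newline-delimited JSON. Each action line is followed by
# a source line, except delete, which takes none. The body must end with a newline.
POST /_bulk
{ "<action>": { <metadata> } }
{ <document> }
{ "<action>": { <metadata> } }
{ <document> }

# Example: mixed operations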
POST /_bulk
{"index":{"_index":"products","_id":"1"}}
{"name":"iPhone 15","price":5999,"brand":"Apple"}
{"create":{"_index":"products","_id":"2"}}
{"name":"Samsung S24","price":5499,"brand":"Samsung"}
{"update":{"_index":"products","_id":"1"}}
{"doc":{"price":5799}}
{"delete":{"_index":"products","_id":"3"}}

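# Response (abbreviated: items holds one entry per action, in request order; only the first two are shown):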
{
  "took": 30,
  "errors": false,
  "items": [
    {
      "index": {
        "_index": "products",
        "_id": "1",
        "_version": 1,
        "result": "created",
        "status": 201
      }
    },
    {
      "create": {
        "_index": "products",
        "_id": "2",
        "_version": 1,
        "result": "created",
        "status": 201
      }
    }
  ]
}
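
A bulk request can succeed as a whole while individual actions fail, so the top-level "errors" flag and the per-item "status" both need checking. A minimal Python sketch of collecting the failures follows; the sample response is hand-written for illustration, and failed_items is a hypothetical helper name.

```python
def failed_items(bulk_response):
    """Return (position, action, status, error_type) for every failed bulk item."""
    failures = []
    if not bulk_response.get("errors"):
        return failures  # fast path: no item failed
    for position, item in enumerate(bulk_response["items"]):
        # Each item has exactly one key, the action name (index, create, ...).
        action, result = next(iter(item.items()))
        if "error" in result:
            failures.append((position, action, result["status"], result["error"]["type"]))
    return failures


# Hand-written sample: the create fails because _id 2 already exists.
sample_response = {
    "took": 30,
    "errors": True,
    "items": [
        {"index": {"_index": "products", "_id": "1", "result": "created", "status": 201}},
        {"create": {"_index": "products", "_id": "2", "status": 409,
                    "error": {"type": "version_conflict_engine_exception",
                              "reason": "[2]: version conflict, document already exists"}}},
    ],
}

for failure in failed_items(sample_response):
    print(failure)
```

Because items come back in request order, the position can be used to look up the action that needs to be retried or logged.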

Bulk Indexing Example

# Bulk-add product data
POST /products/_bulk
{"index":{"_id":"1"}}
{"name":"iPhone 15 Pro","brand":"Apple","price":7999,"category":"phone"}
{"index":{"_id":"2"}}
{"name":"MacBook Pro","brand":"Apple","price":19999,"category":"laptop"}
{"index":{"_id":"3"}}
{"name":"iPad Air","brand":"Apple","price":4599,"category":"tablet"}
{"index":{"_id":"4"}}
{"name":"AirPods Pro","brand":"Apple","price":1899,"category":"earphones"}
{"index":{"_id":"5"}}
{"name":"Apple Watch","brand":"Apple","price":3199,"category":"watch"}
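
In application code a body like the one above is usually generated rather than typed by hand. A minimal Python sketch, assuming the documents arrive as (id, source) pairs; bulk_index_body is an illustrative name. Each JSON object is serialized onto a single line and the body ends with a newline, as the bulk format requires.

```python
import json


def bulk_index_body(pairs):
    """Build the NDJSON body for POST /<index>/_bulk from (doc_id, source) pairs."""
    lines = []
    for doc_id, source in pairs:
        lines.append(json.dumps({"index": {"_id": doc_id}}))   # action line
        lines.append(json.dumps(source, ensure_ascii=False))   # source line
    # Every line, including the last one, is terminated by a newline.
    return "".join(line + "\n" for line in lines)


products = [
    ("1", {"name": "iPhone 15 Pro", "brand": "Apple", "price": 7999}),
    ("2", {"name": "MacBook Pro", "brand": "Apple", "price": 19999}),
]
body = bulk_index_body(products)
print(body, end="")
```

The resulting string would be sent with the Content-Type header application/x-ndjson.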

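Version Control

Elasticsearch uses optimistic concurrency control. Every write assigns the document a _seq_no and a _primary_term, and a conditional write succeeds only when both still match the stored values. _version also increases with every change, but in current releases it can serve as a write condition only with external versioning.

Concurrency Control with Sequence Numbers

# Check the current version information
GET /products/_doc/1

# The response includes the version information (other fields omitted)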
{
  "_version": 3,
  "_seq_no": 5,
  "_primary_term": 1
}

# Conditional update (if_seq_no + if_primary_term)
PUT /products/_doc/1?if_seq_no=5&if_primary_term=1
{
  "name": "iPhone 15 Pro",
  "price": 7499
}

# If _seq_no or _primary_term no longer matches, a 409 conflict is returned
{
  "error": {
    "type": "version_conflict_engine_exception",
    "reason": "[1]: version conflict"
  },
  "status": 409
}
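
The usual reaction to a 409 is to re-read the document and try again. The Python sketch below shows that loop against a toy in-memory index. ToyIndex, VersionConflict, and update_with_retry are hypothetical stand-ins written for this example, not part of any Elasticsearch client.

```python
class VersionConflict(Exception):
    """Stands in for the HTTP 409 version_conflict_engine_exception."""


class ToyIndex:
    """In-memory stand-in for a single-shard index (illustration only)."""

    def __init__(self):
        self._docs = {}          # _id -> (source, seq_no)
        self._next_seq_no = 0
        self.primary_term = 1

    def get(self, doc_id):
        source, seq_no = self._docs[doc_id]
        return {"_source": dict(source), "_seq_no": seq_no,
                "_primary_term": self.primary_term}

    def index(self, doc_id, source, if_seq_no=None, if_primary_term=None):
        if if_seq_no is not None:
            current = self._docs.get(doc_id)
            if (current is None or current[1] != if_seq_no
                    or if_primary_term != self.primary_term):
                raise VersionConflict(doc_id)
        self._docs[doc_id] = (dict(source), self._next_seq_no)
        self._next_seq_no += 1


def update_with_retry(index, doc_id, change, max_attempts=3):
    """Read, modify, and conditionally write; re-read and retry on conflict."""
    for attempt in range(1, max_attempts + 1):
        current = index.get(doc_id)
        try:
            index.index(doc_id, change(current["_source"]),
                        if_seq_no=current["_seq_no"],
                        if_primary_term=current["_primary_term"])
            return attempt
        except VersionConflict:
            continue  # another writer got in first: fetch the new state and retry
    raise VersionConflict(f"gave up on {doc_id!r} after {max_attempts} attempts")


idx = ToyIndex()
idx.index("1", {"name": "iPhone 15 Pro", "price": 7999})
stale = idx.get("1")
idx.index("1", {"name": "iPhone 15 Pro", "price": 7899})  # a concurrent writer
try:
    idx.index("1", {"name": "iPhone 15 Pro", "price": 7499},
              if_seq_no=stale["_seq_no"], if_primary_term=stale["_primary_term"])
except VersionConflict:
    print("409: stale _seq_no, write rejected")
print(update_with_retry(idx, "1", lambda src: dict(src, price=7499)), idx.get("1"))
```

Passing the change as a function matters: on each retry it is reapplied to the freshly read document, so the concurrent writer's changes are not overwritten with stale data.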

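Routing

Elasticsearch uses a routing formula to decide which primary shard stores a document.

Default Routing

# Default routing formula (simplified)
shard_num = hash(_routing) % num_primary_shards

# _routing defaults to the document's _id

# Custom routing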
PUT /products/_doc/1?routing=user123
{
  "name": "iPhone 15",
  "user_id": "user123"
}

# Reads by ID must pass the same routing value
GET /products/_doc/1?routing=user123
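
The simplified formula is easy to try out locally. In the Python sketch below, zlib.crc32 is only a stand-in hash chosen because it is in the standard library (Elasticsearch itself uses a Murmur3 hash), so the shard numbers will not match a real cluster. The point it illustrates is that documents sharing a routing value always land on the same shard.

```python
import zlib
from collections import defaultdict


def shard_for(routing, num_primary_shards):
    """Simplified routing: hash the routing value, take it modulo the shard count."""
    # Assumption: crc32 stands in for the hash function Elasticsearch really uses.
    return zlib.crc32(routing.encode("utf-8")) % num_primary_shards


# Route each document by user_id instead of by its own _id.
orders = [("o1", "user123"), ("o2", "user456"), ("o3", "user123")]
placement = defaultdict(list)
for doc_id, user_id in orders:
    placement[shard_for(user_id, 5)].append(doc_id)

print(dict(placement))
```

The modulo also shows why a read needs the same routing value: without it the lookup would hash the _id instead and might look on a different shard.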

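Document Refresh Policy

Elasticsearch is near real-time: a newly indexed document becomes searchable only after the next refresh, which by default runs about once per second.

The refresh Parameter

# refresh=true: refresh immediately so the document is searchable at once (hurts performance)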
PUT /products/_doc/1?refresh=true
{
  "name": "iPhone 15"
}

# refresh=wait_for: return only after the next refresh has made the change searchable
PUT /products/_doc/2?refresh=wait_for
{
  "name": "MacBook Pro"
}

# refresh=false: the default, do not wait for a refresh
PUT /products/_doc/3?refresh=false
{
  "name": "iPad Air"
}

# Refresh the index manually
POST /products/_refresh
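
The effect of a refresh on search visibility can be pictured with a toy model. The Python sketch below is a deliberate simplification with hypothetical names (ToyShard): writes go into a buffer that search cannot see, and a refresh moves them into the searchable set. It does not model refresh=wait_for or the periodic background refresh.

```python
class ToyShard:
    """Toy model of near-real-time search on one shard (illustration only)."""

    def __init__(self):
        self._buffer = []       # indexed, but not yet visible to search
        self._searchable = []   # visible to search

    def index(self, doc, refresh=False):
        self._buffer.append(doc)
        if refresh:             # models ?refresh=true on the write
            self.refresh()

    def refresh(self):
        """Models POST /<index>/_refresh: make buffered documents searchable."""
        self._searchable.extend(self._buffer)
        self._buffer.clear()

    def search(self, name):
        return [doc for doc in self._searchable if doc["name"] == name]


shard = ToyShard()
shard.index({"name": "iPad Air"})               # refresh=false, the default
print(len(shard.search("iPad Air")))            # not searchable yet
shard.refresh()
print(len(shard.search("iPad Air")))            # searchable after the refresh
shard.index({"name": "iPhone 15"}, refresh=True)
print(len(shard.search("iPhone 15")))           # searchable immediately
```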

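Practical Scenarios

Scenario 1: E-commerce stock deduction

# Check and deduct stock in a single scripted update. A concurrent update of the
# same document causes a 409 conflict unless retry_on_conflict is set.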
POST /products/_update/1
{
  "script": {
    "source": """
      if (ctx._source.stock >= params.quantity) {
        ctx._source.stock -= params.quantity;
        ctx._source.sales = (ctx._source.sales ?: 0) + params.quantity;
      } else {
        ctx.op = 'none';
      }
    """,
    "params": {
      "quantity": 5
    }
  }
}
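
To make the script's branches easier to follow and to test, here is the same logic mirrored in plain Python. This is a local model only: deduct_stock is a hypothetical name, the dict stands in for ctx._source, and the return value stands in for ctx.op.

```python
def deduct_stock(source, quantity):
    """Mirror of the Painless script above; returns the resulting operation."""
    if source["stock"] >= quantity:
        source["stock"] -= quantity
        previous_sales = source.get("sales")
        # Same as the ?: operator in the script: fall back to 0 when sales is null.
        source["sales"] = (0 if previous_sales is None else previous_sales) + quantity
        return "index"   # the modified document is written back
    return "none"        # not enough stock: leave the document untouched


doc = {"name": "iPhone 15 Pro", "stock": 8}
print(deduct_stock(doc, 5), doc)   # first order succeeds
print(deduct_stock(doc, 5), doc)   # second order exceeds the remaining stock
```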

Scenario 2: Bulk-writing user behavior logs

# Bulk-write user behavior logs
POST /user_logs/_bulk
{"index":{}}
{"user_id":"u001","action":"view","product_id":"p001","timestamp":"2024-01-15T10:30:00"}
{"index":{}}
{"user_id":"u001","action":"click","product_id":"p001","timestamp":"2024-01-15T10:31:00"}
{"index":{}}
{"user_id":"u002","action":"purchase","product_id":"p002","timestamp":"2024-01-15T10:32:00"}
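
For a continuous log stream, the events have to be split into several bulk requests of bounded size. A minimal Python sketch of such a splitter follows. The name bulk_chunks and the 5 MB default are assumptions for illustration; the right size depends on the cluster and should be measured.

```python
import json


def bulk_chunks(docs, max_bytes=5 * 1024 * 1024):
    """Yield NDJSON bulk bodies (bytes) that use auto-generated IDs.

    Each body stays within max_bytes, unless a single document is larger
    than the limit, in which case that document is sent on its own.
    """
    action = json.dumps({"index": {}}).encode("utf-8") + b"\n"
    chunk, size = [], 0
    for doc in docs:
        entry = action + json.dumps(doc, ensure_ascii=False).encode("utf-8") + b"\n"
        if chunk and size + len(entry) > max_bytes:
            yield b"".join(chunk)
            chunk, size = [], 0
        chunk.append(entry)
        size += len(entry)
    if chunk:
        yield b"".join(chunk)


logs = [
    {"user_id": "u001", "action": "view", "product_id": "p001"},
    {"user_id": "u001", "action": "click", "product_id": "p001"},
    {"user_id": "u002", "action": "purchase", "product_id": "p002"},
]
for body in bulk_chunks(logs, max_bytes=200):
    print(len(body), "bytes")
```

Because docs can be any iterable, including a generator, the whole log never has to be held in memory at once.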

Performance optimization tips:

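Exercises

1. Basic CRUD operations

2. Bulk operations practice

3. Scripted updates

4. Version control

5. Advanced operations

6. Practical scenarios

7. Performance testing

8. Error handling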

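FAQ

Q: What is the difference between creating a document with PUT and with POST?

A: PUT /<index>/_doc/<id> requires a document ID and overwrites the document if that ID already exists. POST /<index>/_doc generates the ID automatically, and POST is also the method used for _update. Repeating a PUT with the same ID and body leaves the same document content (although _version still increases), whereas every POST without an ID creates a new document.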

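Q: Why is a bulk request faster than single-document requests?

A: Bulk cuts the number of network round trips and spreads fixed per-request costs, such as request handling and, with the default translog durability, the translog fsync, across many operations. A single batch should not be too large, though. 5-15 MB per request is a common starting point, best confirmed by benchmarking on your own cluster.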

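Q: When should refresh=true be used?

A: Only in tests, or where a write must be searchable immediately. Avoid it on production write paths, because forcing a refresh on every write severely hurts indexing performance. If a request only needs to wait until its own change is searchable, refresh=wait_for is the cheaper option.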

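Q: How should concurrent update conflicts be handled?

A: Use optimistic locking with the if_seq_no and if_primary_term parameters. If the write fails with a 409, re-read the document to get its latest _seq_no and _primary_term, reapply the change, and retry. For the _update API, the retry_on_conflict parameter performs this retry on the server side.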