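Document operations are the most basic and most frequently used features of Elasticsearch. Documents are stored as JSON, and each document is identified by an _id that is unique within its index.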
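# Method 1: create with PUT and an explicit ID (if ID 1 already exists, that document is replaced)
PUT /products/_doc/1
{
  "name": "iPhone 15 Pro",
  "brand": "Apple",
  "price": 7999,
  "category": "phone",
  "stock": 100,
  "tags": ["5G", "A17 chip", "titanium"],
  "release_date": "2023-09-22"
}
# Method 2: create with POST and let Elasticsearch generate the ID
POST /products/_doc
{
  "name": "MacBook Pro 16",
  "brand": "Apple",
  "price": 19999,
  "category": "laptop"
}
# Response to the PUT request in method 1 (for method 2, _id is a generated string)
# total 2 / successful 1: the index has one replica, which stays unassigned on a single-node cluster
{
  "_index": "products",
  "_id": "1",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}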
# Get a document by ID
GET /products/_doc/1
# Response (_source abbreviated):
{
  "_index": "products",
  "_id": "1",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "name": "iPhone 15 Pro",
    "brand": "Apple",
    "price": 7999
  }
}
# Return only the _source, without metadata
GET /products/_source/1
# Return only selected fields of _source
GET /products/_doc/1?_source=name,price
# Get several documents in one request
GET /products/_mget
{
  "ids": ["1", "2", "3"]
}
# Method 1: full replacement (PUT); fields missing from the new body (stock, tags, release_date) are removed
PUT /products/_doc/1
{
  "name": "iPhone 15 Pro Max",
  "brand": "Apple",
  "price": 9999,
  "category": "phone"
}
# Method 2: partial update (POST _update); only the listed fields change
POST /products/_update/1
{
  "doc": {
    "price": 7499,
    "stock": 80
  }
}
# Method 3: update with a (Painless) script
POST /products/_update/1
{
  "script": {
    "source": "ctx._source.stock -= params.count",
    "params": {
      "count": 10
    }
  }
}
# Method 4: upsert (create the document if it does not exist)
POST /products/_update/10
{
  "doc": {
    "name": "iPad Pro",
    "price": 6799
  },
  "doc_as_upsert": true
}
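The four update methods above differ mainly in what happens to fields the request does not mention. The following Python sketch models this on plain dicts. It is an in-memory illustration under our own assumptions, not the Elasticsearch implementation, and the names full_replace, merge_doc and upsert_doc are hypothetical. It assumes that a partial update merges nested objects recursively while scalars and arrays are replaced, and that an update which changes nothing is reported as noop (the default detect_noop behavior).

```python
import copy


def full_replace(old_source: dict, new_source: dict) -> dict:
    """PUT /products/_doc/1: the stored _source becomes exactly the new body."""
    return copy.deepcopy(new_source)  # old_source is ignored on purpose


def merge_doc(source: dict, doc: dict) -> dict:
    """POST /products/_update/1 with "doc": nested objects are merged
    recursively; scalars and arrays in `doc` overwrite stored values."""
    merged = copy.deepcopy(source)
    for key, value in doc.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_doc(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


def upsert_doc(store: dict, doc_id: str, doc: dict) -> str:
    """Models doc_as_upsert=true: create from `doc` if the ID is missing,
    otherwise merge; report "noop" when nothing changes."""
    if doc_id not in store:
        store[doc_id] = copy.deepcopy(doc)
        return "created"
    updated = merge_doc(store[doc_id], doc)
    if updated == store[doc_id]:
        return "noop"
    store[doc_id] = updated
    return "updated"


store = {"1": {"name": "iPhone 15 Pro", "price": 7999, "stock": 100, "tags": ["5G"]}}
print(full_replace(store["1"], {"name": "iPhone 15 Pro Max", "price": 9999}))
# {'name': 'iPhone 15 Pro Max', 'price': 9999}  (stock and tags are gone)
print(merge_doc(store["1"], {"price": 7499, "stock": 80}))
# {'name': 'iPhone 15 Pro', 'price': 7499, 'stock': 80, 'tags': ['5G']}
print(upsert_doc(store, "10", {"name": "iPad Pro", "price": 6799}))  # created
print(upsert_doc(store, "10", {"price": 6799}))                       # noop
```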
# Delete by ID
DELETE /products/_doc/1
# Response (a delete also increments _version):
{
  "_index": "products",
  "_id": "1",
  "_version": 2,
  "result": "deleted",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}
# Delete every document that matches a query
POST /products/_delete_by_query
{
  "query": {
    "range": {
      "price": {
        "lt": 1000
      }
    }
  }
}
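The Bulk API executes many index, create, update and delete operations in a single request, which greatly reduces per-request overhead and can substantially raise indexing throughput.
# Bulk request format: newline-delimited JSON (NDJSON), one JSON object per line
# Each action line is followed by a source line, except delete, which has none
# For update, the source line is {"doc": ...} or {"script": ...}; the body must end with a newline
POST /_bulk
{"<action>": {<metadata>}}
{<document>}
{"<action>": {<metadata>}}
{<document>}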
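Because the bulk body is NDJSON, every action and every source object must be serialized onto exactly one line, so pretty-printed JSON cannot be used. A minimal Python sketch of building such a body with the standard json module; the function name build_bulk_body and the (action, metadata, source) tuple layout are our own assumptions, not part of any Elasticsearch client.

```python
import json
from typing import Iterable, Optional, Tuple

# (action name, metadata, source); source is None for "delete"
BulkAction = Tuple[str, dict, Optional[dict]]


def build_bulk_body(actions: Iterable[BulkAction]) -> str:
    lines = []
    for action, metadata, source in actions:
        if action not in ("index", "create", "update", "delete"):
            raise ValueError(f"unknown bulk action: {action}")
        # json.dumps without indent never emits a raw newline, so each object stays on one line
        lines.append(json.dumps({action: metadata}, ensure_ascii=False))
        if action == "delete":
            if source is not None:
                raise ValueError("delete has no source line")
            continue
        if source is None:
            raise ValueError(f"{action} needs a source line")
        lines.append(json.dumps(source, ensure_ascii=False))
    if not lines:
        raise ValueError("a bulk request needs at least one action")
    return "\n".join(lines) + "\n"  # the body must end with a newline


body = build_bulk_body([
    ("index", {"_index": "products", "_id": "1"}, {"name": "iPhone 15", "price": 5999}),
    ("update", {"_index": "products", "_id": "1"}, {"doc": {"price": 5799}}),
    ("delete", {"_index": "products", "_id": "3"}, None),
])
print(body, end="")
```

The resulting string can be sent as the body of POST /_bulk with the Content-Type application/x-ndjson.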
# Real-world example: mixed operations
# index creates or replaces a document; create fails with 409 if the ID already exists
POST /_bulk
{"index":{"_index":"products","_id":"1"}}
{"name":"iPhone 15","price":5999,"brand":"Apple"}
{"create":{"_index":"products","_id":"2"}}
{"name":"Samsung S24","price":5499,"brand":"Samsung"}
{"update":{"_index":"products","_id":"1"}}
{"doc":{"price":5799}}
{"delete":{"_index":"products","_id":"3"}}
# Response (abbreviated: the items for the update and delete actions are omitted;
# items are returned in the same order as the actions in the request):
{
  "took": 30,
  "errors": false,
  "items": [
    {
      "index": {
        "_index": "products",
        "_id": "1",
        "_version": 1,
        "result": "created",
        "status": 201
      }
    },
    {
      "create": {
        "_index": "products",
        "_id": "2",
        "_version": 1,
        "result": "created",
        "status": 201
      }
    }
  ]
}
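The top-level errors flag only says that at least one action failed; the individual failures have to be located in items. A minimal Python sketch, assuming the response has already been parsed into a dict; the helper name failed_items and the sample response are hypothetical. It treats an item as failed only when it carries an error object, so a delete of a missing document (result not_found, status 404, no error object) is not reported.

```python
from typing import List, Tuple


def failed_items(response: dict) -> List[Tuple[int, str, str, str]]:
    """Return (position, action, _id, error type) for every failed bulk item."""
    failures = []
    if not response.get("errors"):
        return failures  # fast path: no item carries an error
    for position, item in enumerate(response.get("items", [])):
        # each item is a single-key dict such as {"create": {...}}
        action, result = next(iter(item.items()))
        if "error" in result:
            failures.append((position, action, result.get("_id"), result["error"].get("type")))
    return failures


response = {
    "errors": True,
    "items": [
        {"index": {"_index": "products", "_id": "1", "status": 201, "result": "created"}},
        {"create": {"_index": "products", "_id": "2", "status": 409,
                    "error": {"type": "version_conflict_engine_exception"}}},
        {"delete": {"_index": "products", "_id": "3", "status": 404, "result": "not_found"}},
    ],
}
print(failed_items(response))  # [(1, 'create', '2', 'version_conflict_engine_exception')]
```

Because positions match the request order, the failed actions can be picked out of the original list and resent or logged.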
# Bulk-load product data (the index is named in the URL, so the metadata lines only need _id)
POST /products/_bulk
{"index":{"_id":"1"}}
{"name":"iPhone 15 Pro","brand":"Apple","price":7999,"category":"phone"}
{"index":{"_id":"2"}}
{"name":"MacBook Pro","brand":"Apple","price":19999,"category":"laptop"}
{"index":{"_id":"3"}}
{"name":"iPad Air","brand":"Apple","price":4599,"category":"tablet"}
{"index":{"_id":"4"}}
{"name":"AirPods Pro","brand":"Apple","price":1899,"category":"earphones"}
{"index":{"_id":"5"}}
{"name":"Apple Watch","brand":"Apple","price":3199,"category":"watch"}
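Elasticsearch uses optimistic concurrency control. Every document carries _version, _seq_no and _primary_term; a conditional write succeeds only if the _seq_no and _primary_term supplied by the client still match the document's current values.
# Check the current version information
GET /products/_doc/1
# The response includes the version information (other fields omitted):
{
  "_version": 3,
  "_seq_no": 5,
  "_primary_term": 1
}
# Conditional write (if_seq_no + if_primary_term): applied only if nobody has written since
PUT /products/_doc/1?if_seq_no=5&if_primary_term=1
{
  "name": "iPhone 15 Pro",
  "price": 7499
}
# If another write happened in between, the request fails with 409 Conflict (response abbreviated):
{
  "error": {
    "type": "version_conflict_engine_exception",
    "reason": "[1]: version conflict"
  },
  "status": 409
}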
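The read, conditional write and retry cycle can be modeled in a few lines of Python. This is a toy, single-shard, in-memory model under our own assumptions (ToyShard, VersionConflict and update_with_retry are hypothetical names, not client APIs): every write takes the next sequence number, a conditional write is rejected when the caller's _seq_no and _primary_term are stale, and the retry loop re-reads the document before trying again.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Optional


class VersionConflict(Exception):
    """Stands in for the 409 version_conflict_engine_exception."""


@dataclass
class StoredDoc:
    source: dict
    seq_no: int
    primary_term: int
    version: int


@dataclass
class ToyShard:
    primary_term: int = 1
    next_seq_no: int = 0
    docs: Dict[str, StoredDoc] = field(default_factory=dict)

    def get(self, doc_id: str) -> StoredDoc:
        return self.docs[doc_id]

    def index(self, doc_id: str, source: dict,
              if_seq_no: Optional[int] = None,
              if_primary_term: Optional[int] = None) -> StoredDoc:
        current = self.docs.get(doc_id)
        if if_seq_no is not None or if_primary_term is not None:
            # conditional write: reject if the caller saw an older state
            if (current is None or current.seq_no != if_seq_no
                    or current.primary_term != if_primary_term):
                raise VersionConflict(f"[{doc_id}]: version conflict")
        version = 1 if current is None else current.version + 1
        stored = StoredDoc(dict(source), self.next_seq_no, self.primary_term, version)
        self.next_seq_no += 1  # every write on the shard gets a new sequence number
        self.docs[doc_id] = stored
        return stored


def update_with_retry(shard: ToyShard, doc_id: str,
                      change: Callable[[dict], dict], max_retries: int = 3) -> StoredDoc:
    for _ in range(max_retries + 1):
        current = shard.get(doc_id)                 # 1. read source plus seq_no/primary_term
        new_source = change(dict(current.source))  # 2. apply the change locally
        try:                                        # 3. write only if nobody wrote in between
            return shard.index(doc_id, new_source,
                               if_seq_no=current.seq_no,
                               if_primary_term=current.primary_term)
        except VersionConflict:
            continue                                # lost the race: re-read and retry
    raise VersionConflict(f"[{doc_id}]: still conflicting after {max_retries} retries")


shard = ToyShard()
shard.index("1", {"name": "iPhone 15 Pro", "price": 7999})
stale = shard.get("1")                                      # seq_no 0
shard.index("1", {"name": "iPhone 15 Pro", "price": 7799})  # another writer: seq_no 1
try:
    shard.index("1", {"name": "iPhone 15 Pro", "price": 7499},
                if_seq_no=stale.seq_no, if_primary_term=stale.primary_term)
except VersionConflict as e:
    print(e)  # [1]: version conflict
print(update_with_retry(shard, "1", lambda s: {**s, "price": 7499}))
```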
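Elasticsearch uses a routing algorithm to decide which shard stores a document.
# Default routing formula (simplified)
shard_num = hash(_routing) % num_primary_shards
# _routing defaults to the document's _id; the hash function is Murmur3
# Current versions also involve index.number_of_routing_shards, which makes index splitting possible:
#   routing_factor = num_routing_shards / num_primary_shards
#   shard_num = (hash(_routing) % num_routing_shards) / routing_factor
# Either way, the primary shard count cannot be changed in place (only via the split/shrink APIs, which create a new index)
# Custom routing: all documents with the same routing value are stored on the same shard
PUT /products/_doc/1?routing=user123
{
  "name": "iPhone 15",
  "user_id": "user123"
}
# Reads must pass the same routing value; otherwise the request may go to the wrong shard and return "found": false
GET /products/_doc/1?routing=user123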
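The two routing formulas can be compared with a small Python function. It is only a sketch: Elasticsearch hashes the routing value with Murmur3, whereas this example substitutes zlib.crc32 from the standard library, so the shard numbers it prints will not match a real cluster. The function name shard_for and the value 640 for num_routing_shards are illustrative assumptions. The example also hints at why a routing factor is used: when the primary shard count doubles while num_routing_shards stays fixed, a document on shard s moves to shard 2s or 2s+1, the kind of property index splitting relies on.

```python
import zlib


def shard_for(routing: str, num_primary_shards: int, num_routing_shards: int) -> int:
    # ASSUMPTION: zlib.crc32 stands in for Elasticsearch's Murmur3 hash
    if num_routing_shards % num_primary_shards != 0:
        raise ValueError("num_routing_shards must be a multiple of num_primary_shards")
    routing_factor = num_routing_shards // num_primary_shards
    h = zlib.crc32(routing.encode("utf-8"))
    return (h % num_routing_shards) // routing_factor


keys = ["1", "2", "user123", "user456"]
# With num_routing_shards == num_primary_shards the factor is 1,
# so the formula reduces to the simplified hash % num_primary_shards
print([shard_for(k, 3, 3) for k in keys])
# The same routing value always gives the same shard; going from 5 to 10
# primaries with a fixed num_routing_shards maps shard s to 2s or 2s+1
for k in keys:
    print(k, shard_for(k, 5, 640), shard_for(k, 10, 640))
```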
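Elasticsearch is near real-time: a newly indexed document becomes searchable only after a refresh, which by default runs every second (index.refresh_interval). If refresh_interval is not set explicitly, shards that have received no search request for 30 seconds skip these scheduled refreshes until they are searched again. A GET by ID is real-time by default and does not need to wait for a refresh.
# refresh=true: refresh the affected primary and replica shards immediately so the document is searchable at once (costly)
PUT /products/_doc/1?refresh=true
{
  "name": "iPhone 15"
}
# refresh=wait_for: do not force a refresh; respond once a refresh has made the document visible
PUT /products/_doc/2?refresh=wait_for
{
  "name": "MacBook Pro"
}
# refresh=false: the default; respond without waiting for a refresh
PUT /products/_doc/3?refresh=false
{
  "name": "iPad Air"
}
# Refresh the index manually
POST /products/_refresh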
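The difference between real-time GET and near-real-time search can be illustrated with a toy Python model. It is a deliberate simplification under our own assumptions: ToyNRTIndex is a hypothetical name, segments, the translog and the scheduled refresh are not modeled, and refresh=wait_for is left out because it only waits for the next scheduled refresh. Writes go to a buffer, search sees only refreshed documents, and GET by ID sees both.

```python
from typing import Dict, List, Optional


class ToyNRTIndex:
    """Hypothetical model of near-real-time visibility (not the real engine)."""

    def __init__(self) -> None:
        self._buffer: Dict[str, dict] = {}      # indexed, not yet refreshed
        self._searchable: Dict[str, dict] = {}  # visible to search

    def index(self, doc_id: str, source: dict, refresh: str = "false") -> None:
        self._buffer[doc_id] = dict(source)
        if refresh == "true":
            self.refresh()  # like ?refresh=true: searchable immediately

    def get(self, doc_id: str) -> Optional[dict]:
        # GET by ID is real-time: unrefreshed writes are visible too
        if doc_id in self._buffer:
            return self._buffer[doc_id]
        return self._searchable.get(doc_id)

    def search(self, field: str, value) -> List[str]:
        # search only sees documents that a refresh has published
        return sorted(i for i, s in self._searchable.items() if s.get(field) == value)

    def refresh(self) -> None:
        # like POST /products/_refresh or the periodic refresh
        self._searchable.update(self._buffer)
        self._buffer.clear()


idx = ToyNRTIndex()
idx.index("3", {"name": "iPad Air"})                  # refresh=false (default)
print(idx.get("3"))                                   # {'name': 'iPad Air'}
print(idx.search("name", "iPad Air"))                 # []
idx.refresh()
print(idx.search("name", "iPad Air"))                 # ['3']
idx.index("1", {"name": "iPhone 15"}, refresh="true")
print(idx.search("name", "iPhone 15"))                # ['1']
```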
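# Deduct stock atomically with a script: the update is written with a version check, so concurrent updates cannot overwrite each other
# retry_on_conflict re-runs the script on the latest version instead of returning 409
# ctx.op = 'none' skips the write (result "noop") when stock is insufficient
POST /products/_update/1?retry_on_conflict=3
{
  "script": {
    "source": """
      if (ctx._source.stock >= params.quantity) {
        ctx._source.stock -= params.quantity;
        ctx._source.sales = (ctx._source.sales ?: 0) + params.quantity;
      } else {
        ctx.op = 'none';
      }
    """,
    "params": {
      "quantity": 5
    }
  }
}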
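The branching in the script can be checked outside Elasticsearch with an equivalent Python function. This is a hypothetical mirror of the Painless logic on a plain dict (deduct_stock is our own name); the returned strings are the result values the _update API reports: updated when the document was written, and noop when ctx.op is set to 'none'. The ?: (Elvis) operator becomes an explicit None check.

```python
def deduct_stock(source: dict, quantity: int) -> str:
    """Mirror of the Painless script; mutates `source` like ctx._source."""
    if source["stock"] >= quantity:
        source["stock"] -= quantity
        sales = source.get("sales")  # may be missing: ctx._source.sales ?: 0
        source["sales"] = (sales if sales is not None else 0) + quantity
        return "updated"
    return "noop"  # ctx.op = 'none': the document is left unchanged


product = {"name": "iPhone 15 Pro", "stock": 8}
print(deduct_stock(product, 5), product)  # updated {'name': 'iPhone 15 Pro', 'stock': 3, 'sales': 5}
print(deduct_stock(product, 5), product)  # noop {'name': 'iPhone 15 Pro', 'stock': 3, 'sales': 5}
```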
# Bulk-write user activity logs; {"index":{}} without an _id lets Elasticsearch generate the IDs
POST /user_logs/_bulk
{"index":{}}
{"user_id":"u001","action":"view","product_id":"p001","timestamp":"2024-01-15T10:30:00"}
{"index":{}}
{"user_id":"u001","action":"click","product_id":"p001","timestamp":"2024-01-15T10:31:00"}
{"index":{}}
{"user_id":"u002","action":"purchase","product_id":"p002","timestamp":"2024-01-15T10:32:00"}
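1. Basic CRUD operations
2. Bulk operation exercises
3. Script updates
4. Version control
5. Advanced operations
6. Real-world scenarios
7. Performance testing
8. Error handling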
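Q: What is the difference between creating a document with PUT and with POST?
A: PUT requires an explicit document ID and replaces any existing document with that ID (PUT /products/_create/<id> fails with 409 instead). POST /products/_doc lets Elasticsearch generate the ID, and POST is also used for _update. PUT is idempotent: repeating it leaves the same content. POST with an auto-generated ID is not: every retry creates a new document.
Q: Why is a bulk request faster than individual requests?
A: A bulk request saves network round trips and amortizes per-request overhead, such as request parsing and, with the default translog durability, the translog fsync performed at the end of every request. Refreshes are time-based, so bulk does not reduce them. A single batch should not be too large: 5-15MB is a common starting point, to be tuned by benchmarking.
Q: When should refresh=true be used?
A: Only in tests, or when a change must be searchable immediately. Avoid it in production: each such request forces a refresh that creates a small new segment, which seriously hurts indexing throughput. When read-after-write visibility is needed, refresh=wait_for is usually the better choice.
Q: How should concurrent update conflicts be handled?
A: Use optimistic concurrency control with the if_seq_no and if_primary_term parameters. If a write fails with 409, fetch the document again to get the latest _seq_no and _primary_term, reapply the change, and retry. For the _update API, retry_on_conflict performs this retry automatically.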
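Applying the batch-size advice from the bulk answer usually means cutting a large stream of actions into several bulk bodies. A minimal Python sketch under our own assumptions: the function name chunk_bulk and the 10MB default (picked from inside the 5-15MB range) are illustrative, the size is measured on the UTF-8 encoded body, and an action is never separated from its source line.

```python
import json
from typing import Iterable, Iterator, Optional, Tuple


def chunk_bulk(pairs: Iterable[Tuple[dict, Optional[dict]]],
               max_bytes: int = 10 * 1024 * 1024) -> Iterator[str]:
    """Yield NDJSON bulk bodies of at most max_bytes each (a single entry
    larger than max_bytes is still sent, alone, in its own batch)."""
    batch, size = [], 0
    for action, source in pairs:
        entry = json.dumps(action, ensure_ascii=False) + "\n"
        if source is not None:  # delete actions have no source line
            entry += json.dumps(source, ensure_ascii=False) + "\n"
        entry_size = len(entry.encode("utf-8"))
        if batch and size + entry_size > max_bytes:
            yield "".join(batch)
            batch, size = [], 0
        batch.append(entry)
        size += entry_size
    if batch:
        yield "".join(batch)


logs = [({"index": {}}, {"user_id": f"u{i:03d}", "action": "view", "product_id": "p001"})
        for i in range(1000)]
batches = list(chunk_bulk(logs, max_bytes=16 * 1024))
print(len(batches), [len(b.encode("utf-8")) for b in batches[:3]])
```

Each yielded body could then be sent as its own POST /user_logs/_bulk request.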