基础19 ElasticSearch 上机动手实战演练bulk批量增删改

54人阅读 评论(0) 收藏 举报
分类:

基础19 ElasticSearch 上机动手实战演练bulk批量增删改

概述

1、bulk语法

POST /_bulk
{ "delete": { "_index": "test_index", "_type": "test_type", "_id": "3" }} 
{ "create": { "_index": "test_index", "_type": "test_type", "_id": "12" }}
{ "test_field":    "test12" }
{ "index":  { "_index": "test_index", "_type": "test_type", "_id": "2" }}
{ "test_field":    "replaced test2" }
{ "update": { "_index": "test_index", "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }

每一个操作要两个json串,语法如下:

{"action": {"metadata"}}
{"data"}

举例,比如你现在要创建一个文档,放bulk里面,看起来会是这样子的:

{"index": {"_index": "test_index", "_type", "test_type", "_id": "1"}}
{"test_field1": "test1", "test_field2": "test2"}

有哪些类型的操作可以执行

  • 1)delete:删除一个文档,只要1个json串就可以了
  • 2)create:PUT /index/type/id/_create,强制创建
  • 3)index:普通的put操作,可以是创建文档,也可以是全量替换文档
  • 4)update:执行的partial update操作

bulk api对json的语法,有严格的要求,每个json串不能换行,只能放一行,同时一个json串和一个json串之间,必须有一个换行

{
  "error": {
    "root_cause": [
      {
        "type": "json_e_o_f_exception",
        "reason": "Unexpected end-of-input: expected close marker for Object (start marker at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@5a5932cd; line: 1, column: 1])\n at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@5a5932cd; line: 1, column: 3]"
      }
    ],
    "type": "json_e_o_f_exception",
    "reason": "Unexpected end-of-input: expected close marker for Object (start marker at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@5a5932cd; line: 1, column: 1])\n at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@5a5932cd; line: 1, column: 3]"
  },
  "status": 500
}

{
  "took": 41,
  "errors": true,
  "items": [
    {
      "delete": {
        "found": true,
        "_index": "test_index",
        "_type": "test_type",
        "_id": "10",
        "_version": 3,
        "result": "deleted",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "status": 200
      }
    },
    {
      "create": {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "3",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "created": true,
        "status": 201
      }
    },
    {
      "create": {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "2",
        "status": 409,
        "error": {
          "type": "version_conflict_engine_exception",
          "reason": "[test_type][2]: version conflict, document already exists (current version [1])",
          "index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
          "shard": "2",
          "index": "test_index"
        }
      }
    },
    {
      "index": {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "4",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "created": true,
        "status": 201
      }
    },
    {
      "index": {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "2",
        "_version": 2,
        "result": "updated",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "created": false,
        "status": 200
      }
    },
    {
      "update": {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "1",
        "_version": 3,
        "result": "updated",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "status": 200
      }
    }
  ]
}

bulk操作中,任意一个操作失败,是不会影响其他的操作的,但是在返回结果里,会告诉你异常日志

POST /test_index/_bulk
{ "delete": { "_type": "test_type", "_id": "3" }} 
{ "create": { "_type": "test_type", "_id": "12" }}
{ "test_field":    "test12" }
{ "index":  { "_type": "test_type" }}
{ "test_field":    "auto-generate id test" }
{ "index":  { "_type": "test_type", "_id": "2" }}
{ "test_field":    "replaced test2" }
{ "update": { "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }
POST /test_index/test_type/_bulk
{ "delete": { "_id": "3" }} 
{ "create": { "_id": "12" }}
{ "test_field":    "test12" }
{ "index":  { }}
{ "test_field":    "auto-generate id test" }
{ "index":  { "_id": "2" }}
{ "test_field":    "replaced test2" }
{ "update": { "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }

2、bulk size最佳大小

bulk request会加载到内存里,如果太大的话,性能反而会下降,因此需要反复尝试一个最佳的bulk size。一般从10005000条数据开始,尝试逐渐增加。另外,如果看大小的话,最好是在515MB之间。

3、bulk api的奇特json格式与底层性能优化关系

bulk api奇特的json格式

{"action": {"meta"}}\n
{"data"}\n
{"action": {"meta"}}\n
{"data"}\n

为什么不使用:

[{
  "action": {
 
  },
  "data": {

  }
}]
  • 1、bulk中的每个操作都可能要转发到不同的node的shard去执行

  • 2、如果采用比较良好的json数组格式

允许任意的换行,整个可读性非常棒,读起来很爽,es拿到那种标准格式的json串以后,要按照下述流程去进行处理

(1)将json数组解析为JSONArray对象,这个时候,整个数据,就会在内存中出现一份一模一样的拷贝,一份数据是json文本,一份数据是JSONArray对象
(2)解析json数组里的每个json,对每个请求中的document进行路由
(3)为路由到同一个shard上的多个请求,创建一个请求数组
(4)将这个请求数组序列化
(5)将序列化后的请求数组发送到对应的节点上去
  • 3、耗费更多内存,更多的jvm gc开销

我们之前提到过bulk size最佳大小的那个问题,一般建议说在几千条那样,然后大小在10MB左右,所以说,可怕的事情来了。假设说现在100个bulk请求发送到了一个节点上去,然后每个请求是10MB,100个请求,就是1000MB = 1GB,然后每个请求的json都copy一份为jsonarray对象,此时内存中的占用就会翻倍,就会占用2GB的内存,甚至还不止。因为弄成jsonarray之后,还可能会多搞一些其他的数据结构,2GB+的内存占用。

占用更多的内存可能就会积压其他请求的内存使用量,比如说最重要的搜索请求,分析请求,等等,此时就可能会导致其他请求的性能急速下降 另外的话,占用内存更多,就会导致java虚拟机的垃圾回收次数更多,跟频繁,每次要回收的垃圾对象更多,耗费的时间更多,导致es的java虚拟机停止工作线程的时间更多

  • 4、现在的奇特格式
{"action": {"meta"}}\n
{"data"}\n
{"action": {"meta"}}\n
{"data"}\n
(1)不用将其转换为json对象,不会出现内存中的相同数据的拷贝,直接按照换行符切割json
(2)对每两个一组的json,读取meta,进行document路由
(3)直接将对应的json发送到node上去
  • 5、最大的优势在于,不需要将json数组解析为一个JSONArray对象,形成一份大数据的拷贝,浪费内存空间,尽可能地保证性能
查看评论

ElasticSearch实战 (二)CRUD以及bulk批量操作 api

生活中把事情做好,做好事情态度和思想认识很重要,生活就变得舒适,也会达到自己的目标。         在对学习es api如何使用之前,我们可以先想一下es使用的数据传输协议和格式是怎样的,为什么会...
  • lilongsheng1125
  • lilongsheng1125
  • 2016-12-18 17:06:57
  • 3169

elasticSearch批量操作bulk

bulk API可以帮助我们同时完成执行多个请求,比如:create,index, update以及delete。当你在处理类似于log等海量数据的时候,你就可以一下处理成百上千的请求,这个操作将会...
  • pfm84
  • pfm84
  • 2015-08-31 11:55:09
  • 4746

elasticsearch之批量提交Bulk

elasticsearch之批量提交Bulk
  • gaojingsong
  • gaojingsong
  • 2017-05-11 21:34:45
  • 1427

ElasticSearch中批量导入数据: _bulk命令

在使用Elasticsearch的时候,bulk批量导入
  • bsh_csn
  • bsh_csn
  • 2016-12-28 09:33:14
  • 1298

Elasticsearch bulk批量增删改

1、bulk语法 POST /_bulk { "delete": { "_index": "test_index", "_type": "test_type", "_id": "3" }}  ...
  • qq_15175765
  • qq_15175765
  • 2017-12-25 16:28:40
  • 60

elasticsearch bulk 批量加载索引的实例

import java.io.BufferedReader; import java.io.File; import java.io.FileNotFoundException; import ...
  • lw305080
  • lw305080
  • 2017-07-06 20:38:37
  • 287

分布式搜索引擎elasticsearch PHP API index bulk 批量插入操作

  • zhuzhenyu307
  • zhuzhenyu307
  • 2014-07-17 16:48:43
  • 2497

elasticsearch批量index,update,delete——Bulk Helpers

1.  批量建索引 for i in range(0,1000): newDic = {"key":"value"} action = {"_index": IndexName, "_type"...
  • DearDreaming
  • DearDreaming
  • 2016-08-11 11:15:35
  • 3769

Elasticsearch java api(五) Bulk批量索引

这篇博客介绍一下Elasticsearch对多个文档进行索引的简便方法。Bulk api的支持可以实现一次请求执行批量的添加、删除、更新等操作.Bulk操作使用的是UDP协议,UDP无法确保与Elas...
  • napoay
  • napoay
  • 2016-07-14 14:49:27
  • 27512

elasticsearch中批量的upsert

大家知道,在elasticsearch中,使用bulk操作可以批量的处理数据,然而文档中的bulk似乎并不能处理upsert操作,事实上,只需要加上如下参数就可以了 "doc_as_upsert...
  • yyd19921214
  • yyd19921214
  • 2017-05-22 14:42:13
  • 832
    我的微信
      我的微信号号,添加后更多福利
    个人资料
    持之以恒
    等级:
    访问量: 6万+
    积分: 4623
    排名: 7991
    文章存档
    最新评论