diff options
author | Christopher Speller <crspeller@gmail.com> | 2018-04-16 05:37:14 -0700 |
---|---|---|
committer | Joram Wilander <jwawilander@gmail.com> | 2018-04-16 08:37:14 -0400 |
commit | 6e2cb00008cbf09e556b00f87603797fcaa47e09 (patch) | |
tree | 3c0eb55ff4226a3f024aad373140d1fb860a6404 /vendor/github.com/olivere/elastic/bulk_processor_test.go | |
parent | bf24f51c4e1cc6286885460672f7f449e8c6f5ef (diff) | |
download | chat-6e2cb00008cbf09e556b00f87603797fcaa47e09.tar.gz chat-6e2cb00008cbf09e556b00f87603797fcaa47e09.tar.bz2 chat-6e2cb00008cbf09e556b00f87603797fcaa47e09.zip |
Depenancy upgrades and movign to dep. (#8630)
Diffstat (limited to 'vendor/github.com/olivere/elastic/bulk_processor_test.go')
-rw-r--r-- | vendor/github.com/olivere/elastic/bulk_processor_test.go | 425 |
1 files changed, 0 insertions, 425 deletions
diff --git a/vendor/github.com/olivere/elastic/bulk_processor_test.go b/vendor/github.com/olivere/elastic/bulk_processor_test.go deleted file mode 100644 index bb97ca217..000000000 --- a/vendor/github.com/olivere/elastic/bulk_processor_test.go +++ /dev/null @@ -1,425 +0,0 @@ -// Copyright 2012-present Oliver Eilhard. All rights reserved. -// Use of this source code is governed by a MIT-license. -// See http://olivere.mit-license.org/license.txt for details. - -package elastic - -import ( - "context" - "fmt" - "math/rand" - "sync/atomic" - "testing" - "time" -) - -func TestBulkProcessorDefaults(t *testing.T) { - client := setupTestClientAndCreateIndex(t) - - p := client.BulkProcessor() - if p == nil { - t.Fatalf("expected BulkProcessorService; got: %v", p) - } - if got, want := p.name, ""; got != want { - t.Errorf("expected %q; got: %q", want, got) - } - if got, want := p.numWorkers, 1; got != want { - t.Errorf("expected %d; got: %d", want, got) - } - if got, want := p.bulkActions, 1000; got != want { - t.Errorf("expected %d; got: %d", want, got) - } - if got, want := p.bulkSize, 5*1024*1024; got != want { - t.Errorf("expected %d; got: %d", want, got) - } - if got, want := p.flushInterval, time.Duration(0); got != want { - t.Errorf("expected %v; got: %v", want, got) - } - if got, want := p.wantStats, false; got != want { - t.Errorf("expected %v; got: %v", want, got) - } - if p.backoff == nil { - t.Fatalf("expected non-nill backoff; got: %v", p.backoff) - } -} - -func TestBulkProcessorCommitOnBulkActions(t *testing.T) { - //client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0))) - client := setupTestClientAndCreateIndex(t) - - testBulkProcessor(t, - 10000, - client.BulkProcessor(). - Name("Actions-1"). - Workers(1). - BulkActions(100). - BulkSize(-1), - ) - - testBulkProcessor(t, - 10000, - client.BulkProcessor(). - Name("Actions-2"). - Workers(2). - BulkActions(100). - BulkSize(-1), - ) -} - -func TestBulkProcessorCommitOnBulkSize(t *testing.T) { - //client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0))) - client := setupTestClientAndCreateIndex(t) - - testBulkProcessor(t, - 10000, - client.BulkProcessor(). - Name("Size-1"). - Workers(1). - BulkActions(-1). - BulkSize(64*1024), - ) - - testBulkProcessor(t, - 10000, - client.BulkProcessor(). - Name("Size-2"). - Workers(2). - BulkActions(-1). - BulkSize(64*1024), - ) -} - -func TestBulkProcessorBasedOnFlushInterval(t *testing.T) { - //client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0))) - client := setupTestClientAndCreateIndex(t) - - var beforeRequests int64 - var befores int64 - var afters int64 - var failures int64 - var afterRequests int64 - - beforeFn := func(executionId int64, requests []BulkableRequest) { - atomic.AddInt64(&beforeRequests, int64(len(requests))) - atomic.AddInt64(&befores, 1) - } - afterFn := func(executionId int64, requests []BulkableRequest, response *BulkResponse, err error) { - atomic.AddInt64(&afters, 1) - if err != nil { - atomic.AddInt64(&failures, 1) - } - atomic.AddInt64(&afterRequests, int64(len(requests))) - } - - svc := client.BulkProcessor(). - Name("FlushInterval-1"). - Workers(2). - BulkActions(-1). - BulkSize(-1). - FlushInterval(1 * time.Second). - Before(beforeFn). - After(afterFn) - - p, err := svc.Do(context.Background()) - if err != nil { - t.Fatal(err) - } - - const numDocs = 1000 // low-enough number that flush should be invoked - - for i := 1; i <= numDocs; i++ { - tweet := tweet{User: "olivere", Message: fmt.Sprintf("%d. %s", i, randomString(rand.Intn(64)))} - request := NewBulkIndexRequest().Index(testIndexName).Type("doc").Id(fmt.Sprintf("%d", i)).Doc(tweet) - p.Add(request) - } - - // Should flush at least once - time.Sleep(2 * time.Second) - - err = p.Close() - if err != nil { - t.Fatal(err) - } - - if p.stats.Flushed == 0 { - t.Errorf("expected at least 1 flush; got: %d", p.stats.Flushed) - } - if got, want := beforeRequests, int64(numDocs); got != want { - t.Errorf("expected %d requests to before callback; got: %d", want, got) - } - if got, want := afterRequests, int64(numDocs); got != want { - t.Errorf("expected %d requests to after callback; got: %d", want, got) - } - if befores == 0 { - t.Error("expected at least 1 call to before callback") - } - if afters == 0 { - t.Error("expected at least 1 call to after callback") - } - if failures != 0 { - t.Errorf("expected 0 calls to failure callback; got: %d", failures) - } - - // Check number of documents that were bulk indexed - _, err = p.c.Flush(testIndexName).Do(context.TODO()) - if err != nil { - t.Fatal(err) - } - count, err := p.c.Count(testIndexName).Do(context.TODO()) - if err != nil { - t.Fatal(err) - } - if count != int64(numDocs) { - t.Fatalf("expected %d documents; got: %d", numDocs, count) - } -} - -func TestBulkProcessorClose(t *testing.T) { - //client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0))) - client := setupTestClientAndCreateIndex(t) - - var beforeRequests int64 - var befores int64 - var afters int64 - var failures int64 - var afterRequests int64 - - beforeFn := func(executionId int64, requests []BulkableRequest) { - atomic.AddInt64(&beforeRequests, int64(len(requests))) - atomic.AddInt64(&befores, 1) - } - afterFn := func(executionId int64, requests []BulkableRequest, response *BulkResponse, err error) { - atomic.AddInt64(&afters, 1) - if err != nil { - atomic.AddInt64(&failures, 1) - } - atomic.AddInt64(&afterRequests, int64(len(requests))) - } - - p, err := client.BulkProcessor(). - Name("FlushInterval-1"). - Workers(2). - BulkActions(-1). - BulkSize(-1). - FlushInterval(30 * time.Second). // 30 seconds to flush - Before(beforeFn).After(afterFn). - Do(context.Background()) - if err != nil { - t.Fatal(err) - } - - const numDocs = 1000 // low-enough number that flush should be invoked - - for i := 1; i <= numDocs; i++ { - tweet := tweet{User: "olivere", Message: fmt.Sprintf("%d. %s", i, randomString(rand.Intn(64)))} - request := NewBulkIndexRequest().Index(testIndexName).Type("doc").Id(fmt.Sprintf("%d", i)).Doc(tweet) - p.Add(request) - } - - // Should not flush because 30s > 1s - time.Sleep(1 * time.Second) - - // Close should flush - err = p.Close() - if err != nil { - t.Fatal(err) - } - - if p.stats.Flushed != 0 { - t.Errorf("expected no flush; got: %d", p.stats.Flushed) - } - if got, want := beforeRequests, int64(numDocs); got != want { - t.Errorf("expected %d requests to before callback; got: %d", want, got) - } - if got, want := afterRequests, int64(numDocs); got != want { - t.Errorf("expected %d requests to after callback; got: %d", want, got) - } - if befores == 0 { - t.Error("expected at least 1 call to before callback") - } - if afters == 0 { - t.Error("expected at least 1 call to after callback") - } - if failures != 0 { - t.Errorf("expected 0 calls to failure callback; got: %d", failures) - } - - // Check number of documents that were bulk indexed - _, err = p.c.Flush(testIndexName).Do(context.TODO()) - if err != nil { - t.Fatal(err) - } - count, err := p.c.Count(testIndexName).Do(context.TODO()) - if err != nil { - t.Fatal(err) - } - if count != int64(numDocs) { - t.Fatalf("expected %d documents; got: %d", numDocs, count) - } -} - -func TestBulkProcessorFlush(t *testing.T) { - //client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0))) - client := setupTestClientAndCreateIndex(t) - - p, err := client.BulkProcessor(). - Name("ManualFlush"). - Workers(10). - BulkActions(-1). - BulkSize(-1). - FlushInterval(30 * time.Second). // 30 seconds to flush - Stats(true). - Do(context.Background()) - if err != nil { - t.Fatal(err) - } - - const numDocs = 100 - - for i := 1; i <= numDocs; i++ { - tweet := tweet{User: "olivere", Message: fmt.Sprintf("%d. %s", i, randomString(rand.Intn(64)))} - request := NewBulkIndexRequest().Index(testIndexName).Type("doc").Id(fmt.Sprintf("%d", i)).Doc(tweet) - p.Add(request) - } - - // Should not flush because 30s > 1s - time.Sleep(1 * time.Second) - - // No flush yet - stats := p.Stats() - if stats.Flushed != 0 { - t.Errorf("expected no flush; got: %d", p.stats.Flushed) - } - - // Manual flush - err = p.Flush() - if err != nil { - t.Fatal(err) - } - - time.Sleep(1 * time.Second) - - // Now flushed - stats = p.Stats() - if got, want := p.stats.Flushed, int64(1); got != want { - t.Errorf("expected %d flush; got: %d", want, got) - } - - // Close should not start another flush - err = p.Close() - if err != nil { - t.Fatal(err) - } - - // Still 1 flush - stats = p.Stats() - if got, want := p.stats.Flushed, int64(1); got != want { - t.Errorf("expected %d flush; got: %d", want, got) - } - - // Check number of documents that were bulk indexed - _, err = p.c.Flush(testIndexName).Do(context.TODO()) - if err != nil { - t.Fatal(err) - } - count, err := p.c.Count(testIndexName).Do(context.TODO()) - if err != nil { - t.Fatal(err) - } - if count != int64(numDocs) { - t.Fatalf("expected %d documents; got: %d", numDocs, count) - } -} - -// -- Helper -- - -func testBulkProcessor(t *testing.T, numDocs int, svc *BulkProcessorService) { - var beforeRequests int64 - var befores int64 - var afters int64 - var failures int64 - var afterRequests int64 - - beforeFn := func(executionId int64, requests []BulkableRequest) { - atomic.AddInt64(&beforeRequests, int64(len(requests))) - atomic.AddInt64(&befores, 1) - } - afterFn := func(executionId int64, requests []BulkableRequest, response *BulkResponse, err error) { - atomic.AddInt64(&afters, 1) - if err != nil { - atomic.AddInt64(&failures, 1) - } - atomic.AddInt64(&afterRequests, int64(len(requests))) - } - - p, err := svc.Before(beforeFn).After(afterFn).Stats(true).Do(context.Background()) - if err != nil { - t.Fatal(err) - } - - for i := 1; i <= numDocs; i++ { - tweet := tweet{User: "olivere", Message: fmt.Sprintf("%07d. %s", i, randomString(1+rand.Intn(63)))} - request := NewBulkIndexRequest().Index(testIndexName).Type("doc").Id(fmt.Sprintf("%d", i)).Doc(tweet) - p.Add(request) - } - - err = p.Close() - if err != nil { - t.Fatal(err) - } - - stats := p.Stats() - - if stats.Flushed != 0 { - t.Errorf("expected no flush; got: %d", stats.Flushed) - } - if stats.Committed <= 0 { - t.Errorf("expected committed > %d; got: %d", 0, stats.Committed) - } - if got, want := stats.Indexed, int64(numDocs); got != want { - t.Errorf("expected indexed = %d; got: %d", want, got) - } - if got, want := stats.Created, int64(0); got != want { - t.Errorf("expected created = %d; got: %d", want, got) - } - if got, want := stats.Updated, int64(0); got != want { - t.Errorf("expected updated = %d; got: %d", want, got) - } - if got, want := stats.Deleted, int64(0); got != want { - t.Errorf("expected deleted = %d; got: %d", want, got) - } - if got, want := stats.Succeeded, int64(numDocs); got != want { - t.Errorf("expected succeeded = %d; got: %d", want, got) - } - if got, want := stats.Failed, int64(0); got != want { - t.Errorf("expected failed = %d; got: %d", want, got) - } - if got, want := beforeRequests, int64(numDocs); got != want { - t.Errorf("expected %d requests to before callback; got: %d", want, got) - } - if got, want := afterRequests, int64(numDocs); got != want { - t.Errorf("expected %d requests to after callback; got: %d", want, got) - } - if befores == 0 { - t.Error("expected at least 1 call to before callback") - } - if afters == 0 { - t.Error("expected at least 1 call to after callback") - } - if failures != 0 { - t.Errorf("expected 0 calls to failure callback; got: %d", failures) - } - - // Check number of documents that were bulk indexed - _, err = p.c.Flush(testIndexName).Do(context.TODO()) - if err != nil { - t.Fatal(err) - } - count, err := p.c.Count(testIndexName).Do(context.TODO()) - if err != nil { - t.Fatal(err) - } - if count != int64(numDocs) { - t.Fatalf("expected %d documents; got: %d", numDocs, count) - } -} |