diff --git a/cmd/example/main.go b/cmd/example/main.go
index 3271181..5cf19f7 100644
--- a/cmd/example/main.go
+++ b/cmd/example/main.go
@@ -14,7 +14,7 @@ func main() {
 
 	// I will ignore all errors for demonstration purposes
 
-	_, _ = ingester.BulkPush("movies", "general", 3, []sonic.IngestBulkRecord{
+	_ = ingester.BulkPush("movies", "general", 3, []sonic.IngestBulkRecord{
 		{"id:6ab56b4kk3", "Star wars"},
 		{"id:5hg67f8dg5", "Spider man"},
 		{"id:1m2n3b4vf6", "Batman"},
diff --git a/readme.md b/readme.md
index 56ae955..4964c88 100644
--- a/readme.md
+++ b/readme.md
@@ -21,6 +21,7 @@ import (
 )
 
 func main() {
+	ingester, err := sonic.NewIngester("localhost", 1491, "SecretPassword")
 	if err != nil {
 		panic(err)
 	}
@@ -28,7 +29,7 @@ func main() {
 
 	// I will ignore all errors for demonstration purposes
 
-	_, _ = ingester.BulkPush("movies", "general", 2, []sonic.IngestBulkRecord{
+	_ = ingester.BulkPush("movies", "general", 3, []sonic.IngestBulkRecord{
 		{"id:6ab56b4kk3", "Star wars"},
 		{"id:5hg67f8dg5", "Spider man"},
 		{"id:1m2n3b4vf6", "Batman"},
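Both snippets above keep discarding the return value. For callers that do want to react to failures, here is a minimal sketch (not part of the patch) of consuming the new `[]IngestBulkError` slice; the module path `github.com/expectedsh/go-sonic` is assumed from the repository name, the connection settings and record IDs are the readme's demo values, and errors are printed with `%+v` so no particular field names of `IngestBulkError` are assumed:

```go
package main

import (
	"fmt"

	"github.com/expectedsh/go-sonic/sonic"
)

func main() {
	ingester, err := sonic.NewIngester("localhost", 1491, "SecretPassword")
	if err != nil {
		panic(err)
	}

	// BulkPush now reports only per-record failures; an empty slice means
	// every record was pushed.
	bulkErrs := ingester.BulkPush("movies", "general", 3, []sonic.IngestBulkRecord{
		{"id:6ab56b4kk3", "Star wars"},
		{"id:5hg67f8dg5", "Spider man"},
		{"id:1m2n3b4vf6", "Batman"},
	})
	for _, bulkErr := range bulkErrs {
		fmt.Printf("bulk push failed: %+v\n", bulkErr)
	}
}
```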
diff --git a/sonic/ingester.go b/sonic/ingester.go
index 9adc75b..85db485 100644
--- a/sonic/ingester.go
+++ b/sonic/ingester.go
@@ -26,7 +26,7 @@ type Ingestable interface {
 	// dispatch the records at best.
 	// If parallelRoutines <= 0; parallelRoutines will be equal to 1.
 	// If parallelRoutines > len(records); parallelRoutines will be equal to len(records).
-	BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) ([]IngestBulkError, error)
+	BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) []IngestBulkError
 
 	// Pop search data from the index.
 	// Command syntax POP <collection> <bucket> <object> "<text>".
@@ -36,7 +36,7 @@ type Ingestable interface {
 	// dispatch the records at best.
 	// If parallelRoutines <= 0; parallelRoutines will be equal to 1.
 	// If parallelRoutines > len(records); parallelRoutines will be equal to len(records).
-	BulkPop(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) ([]IngestBulkError, error)
+	BulkPop(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) []IngestBulkError
 
 	// Count indexed search data.
 	// bucket and object are optionals, empty string ignore it.
@@ -61,7 +61,6 @@ type Ingestable interface {
 	// Ping refer to the Base interface
 	Ping() (err error)
 }
-
 type ingesterCommands string
 
 const (
@@ -109,44 +108,40 @@ func (i ingesterChannel) Push(collection, bucket, object, text string) (err erro
 	return nil
 }
 
-func (i ingesterChannel) BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) (errs []IngestBulkError, err error) {
+func (i ingesterChannel) BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) (errs []IngestBulkError) {
 	if parallelRoutines <= 0 {
 		parallelRoutines = 1
 	}
 
-	err = nil
 	errs = make([]IngestBulkError, 0)
 	errMutex := sync.Mutex{}
 
 	// chunk array into N (parallelRoutines) parts
-	divided := i.divideIngestBulkRecords(records, parallelRoutines)
+	divided := divideIngestBulkRecords(records, parallelRoutines)
 
 	// dispatch each records array into N goroutines
 	group := sync.WaitGroup{}
 	group.Add(len(divided))
 	for _, r := range divided {
 		go func(recs []IngestBulkRecord) {
-			var conn *connection
-
-			errMutex.Lock()
-			conn, err = newConnection(i.driver)
-			errMutex.Unlock()
+			conn, _ := newConnection(i.driver)
 
 			for _, rec := range recs {
-				err := conn.write(fmt.Sprintf("%s %s %s %s \"%s\"", push, collection, bucket, rec.Object, rec.Text))
+				if conn == nil {
+					addBulkError(&errs, rec, ErrClosed, &errMutex)
+				}
+				err := conn.write(fmt.Sprintf(
+					"%s %s %s %s \"%s\"",
+					push, collection, bucket, rec.Object, rec.Text),
+				)
 				if err != nil {
-					errMutex.Lock()
-					errs = append(errs, IngestBulkError{rec.Object, err})
-					errMutex.Unlock()
+					addBulkError(&errs, rec, err, &errMutex)
 					continue
 				}
-
 				// sonic should sent OK
 				_, err = conn.read()
 				if err != nil {
-					errMutex.Lock()
-					errs = append(errs, IngestBulkError{rec.Object, err})
-					errMutex.Unlock()
+					addBulkError(&errs, rec, err, &errMutex)
 				}
 			}
 			conn.close()
@@ -154,7 +149,7 @@ func (i ingesterChannel) BulkPush(collection, bucket string, parallelRoutines in
 		}(r)
 	}
 	group.Wait()
-	return errs, err
+	return errs
 }
 
 func (i ingesterChannel) Pop(collection, bucket, object, text string) (err error) {
@@ -171,44 +166,40 @@ func (i ingesterChannel) Pop(collection, bucket, object, text string) (err error
 	return nil
 }
 
-func (i ingesterChannel) BulkPop(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) (errs []IngestBulkError, err error) {
+func (i ingesterChannel) BulkPop(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) (errs []IngestBulkError) {
 	if parallelRoutines <= 0 {
 		parallelRoutines = 1
 	}
 
-	err = nil
 	errs = make([]IngestBulkError, 0)
 	errMutex := sync.Mutex{}
 
 	// chunk array into N (parallelRoutines) parts
-	divided := i.divideIngestBulkRecords(records, parallelRoutines)
+	divided := divideIngestBulkRecords(records, parallelRoutines)
 
 	// dispatch each records array into N goroutines
 	group := sync.WaitGroup{}
 	group.Add(len(divided))
 	for _, r := range divided {
 		go func(recs []IngestBulkRecord) {
-			var conn *connection
-
-			errMutex.Lock()
-			conn, err = newConnection(i.driver)
-			errMutex.Unlock()
+			conn, _ := newConnection(i.driver)
 
 			for _, rec := range recs {
-				err := conn.write(fmt.Sprintf("%s %s %s %s \"%s\"", push, collection, bucket, rec.Object, rec.Text))
+				if conn == nil {
+					addBulkError(&errs, rec, ErrClosed, &errMutex)
+				}
+				err := conn.write(fmt.Sprintf(
+					"%s %s %s %s \"%s\"",
+					pop, collection, bucket, rec.Object, rec.Text),
+				)
				if err != nil {
-					errMutex.Lock()
-					errs = append(errs, IngestBulkError{rec.Object, err})
-					errMutex.Unlock()
+					addBulkError(&errs, rec, err, &errMutex)
 					continue
 				}
-
 				// sonic should sent OK
 				_, err = conn.read()
 				if err != nil {
-					errMutex.Lock()
-					errs = append(errs, IngestBulkError{rec.Object, err})
-					errMutex.Unlock()
+					addBulkError(&errs, rec, err, &errMutex)
 				}
 			}
 			conn.close()
@@ -216,7 +207,7 @@ func (i ingesterChannel) BulkPop(collection, bucket string, parallelRoutines int
 		}(r)
 	}
 	group.Wait()
-	return errs, err
+	return errs
 }
 
 func (i ingesterChannel) Count(collection, bucket, object string) (cnt int, err error) {
@@ -286,7 +277,7 @@ func (i ingesterChannel) FlushObject(collection, bucket, object string) (err err
 	return nil
 }
 
-func (i ingesterChannel) divideIngestBulkRecords(records []IngestBulkRecord, parallelRoutines int) [][]IngestBulkRecord {
+func divideIngestBulkRecords(records []IngestBulkRecord, parallelRoutines int) [][]IngestBulkRecord {
 	var divided [][]IngestBulkRecord
 	chunkSize := (len(records) + parallelRoutines - 1) / parallelRoutines
 	for i := 0; i < len(records); i += chunkSize {
@@ -298,3 +289,9 @@ func (i ingesterChannel) divideIngestBulkRecords(records []IngestBulkRecord, par
 	}
 	return divided
 }
+
+func addBulkError(e *[]IngestBulkError, record IngestBulkRecord, err error, mutex *sync.Mutex) {
+	mutex.Lock()
+	defer mutex.Unlock()
+	*e = append(*e, IngestBulkError{record.Object, err})
+}
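To make the chunking rule in the BulkPush/BulkPop doc comments concrete, here is a standalone sketch of the ceiling-division split performed by divideIngestBulkRecords. Only the signature and the chunk-size line are visible in the hunk above, so the clamping of the last chunk is assumed, and plain ints stand in for IngestBulkRecord values:

```go
package main

import "fmt"

// chunk mirrors the ceiling-division split used by divideIngestBulkRecords:
// chunkSize = ceil(len(items) / parallelRoutines).
func chunk(items []int, parallelRoutines int) [][]int {
	// BulkPush and BulkPop apply the same normalisation before splitting.
	if parallelRoutines <= 0 {
		parallelRoutines = 1
	}
	var divided [][]int
	chunkSize := (len(items) + parallelRoutines - 1) / parallelRoutines
	for i := 0; i < len(items); i += chunkSize {
		end := i + chunkSize
		if end > len(items) {
			end = len(items)
		}
		divided = append(divided, items[i:end])
	}
	return divided
}

func main() {
	records := []int{1, 2, 3, 4, 5, 6, 7}
	fmt.Println(chunk(records, 3))  // [[1 2 3] [4 5 6] [7]]
	fmt.Println(chunk(records, 10)) // [[1] [2] [3] [4] [5] [6] [7]]
}
```

With seven records, three routines yield chunks of 3, 3 and 1; asking for ten routines yields seven single-record chunks, which is why the doc comment says parallelRoutines is effectively capped at len(records).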
diff --git a/sonic/ingester_test.go b/sonic/ingester_test.go
index 5082479..271ec6c 100644
--- a/sonic/ingester_test.go
+++ b/sonic/ingester_test.go
@@ -23,8 +23,8 @@ func BenchmarkIngesterChannel_BulkPushMaxCPUs(b *testing.B) {
 		b.Log(e)
 		b.Fail()
 	}
-	be, e := ingester.BulkPush("test", "testMaxCpus", cpus, records)
-	if len(be) > 0 || e != nil {
+	be := ingester.BulkPush("test", "testMaxCpus", cpus, records)
+	if len(be) > 0 {
 		b.Log(be, e)
 		b.Fail()
 	}
@@ -42,8 +42,8 @@ func BenchmarkIngesterChannel_BulkPush10(b *testing.B) {
 		b.Log(e)
 		b.Fail()
 	}
-	be, e := ingester.BulkPush("test", "test10", 10, records)
-	if len(be) > 0 || err != nil {
+	be := ingester.BulkPush("test", "test10", 10, records)
+	if len(be) > 0 {
 		b.Log(be, err)
 		b.Fail()
 	}
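Finally, a generic distillation (not part of the patch) of the dispatch pattern both bulk functions now share: one goroutine per chunk, a mutex-guarded slice for collected errors, and a WaitGroup so the call blocks until every chunk is processed. String IDs and the process callback stand in for IngestBulkRecord and the Sonic round-trip:

```go
package main

import (
	"fmt"
	"sync"
)

// dispatch runs one goroutine per chunk and collects every per-item error
// behind a mutex, returning them once all goroutines have finished.
func dispatch(chunks [][]string, process func(string) error) []error {
	var (
		mu   sync.Mutex
		errs []error
		wg   sync.WaitGroup
	)
	wg.Add(len(chunks))
	for _, c := range chunks {
		go func(items []string) {
			defer wg.Done()
			for _, item := range items {
				if err := process(item); err != nil {
					mu.Lock()
					errs = append(errs, err)
					mu.Unlock()
				}
			}
		}(c)
	}
	wg.Wait()
	return errs
}

func main() {
	chunks := [][]string{{"id:1", "id:2"}, {"id:3"}}
	errs := dispatch(chunks, func(id string) error {
		if id == "id:3" {
			return fmt.Errorf("%s: simulated failure", id)
		}
		return nil
	})
	fmt.Println(errs) // [id:3: simulated failure]
}
```

Returning the collected slice only after wg.Wait() is what lets BulkPush and BulkPop drop their separate error return value.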