diff --git a/sonic/connection.go b/sonic/connection.go index e3db3d9..877cb4d 100644 --- a/sonic/connection.go +++ b/sonic/connection.go @@ -13,10 +13,10 @@ import ( ) type connection struct { - reader *bufio.Reader - conn net.Conn + reader *bufio.Reader + conn net.Conn cmdMaxBytes int - closed bool + closed bool } func newConnection(d *driver) (*connection, error) { diff --git a/sonic/ingester.go b/sonic/ingester.go index 7b01061..0d6f4ea 100644 --- a/sonic/ingester.go +++ b/sonic/ingester.go @@ -4,7 +4,6 @@ import ( "fmt" "strconv" "strings" - "sync" "unicode/utf8" ) @@ -100,11 +99,11 @@ func NewIngester(host string, port int, password string) (Ingestable, error) { func (i ingesterChannel) Push(collection, bucket, object, text string) (err error) { // patterns := []struct { - Pattern string - Replacement string + Pattern string + Replacement string }{{"\\", "\\\\"}, {"\n", "\\n"}, - {"\"", "\\\""}} + {"\"", "\\\""}} for _, v := range patterns { text = strings.Replace(text, v.Pattern, v.Replacement, -1) } @@ -136,7 +135,7 @@ func (i ingesterChannel) Push(collection, bucket, object, text string) (err erro // whereas slicing a string simply returns a new string header backed by the same array as the original // (taking constant time). func splitText(longString string, maxLen int) []string { - splits := []string{} + var splits []string var l, r int for l, r = 0, maxLen; r < len(longString); l, r = r, r+maxLen { @@ -154,39 +153,41 @@ func (i ingesterChannel) BulkPush(collection, bucket string, parallelRoutines in parallelRoutines = 1 } - errs = make([]IngestBulkError, 0) - errMutex := &sync.Mutex{} - // chunk array into N (parallelRoutines) parts divided := divideIngestBulkRecords(records, parallelRoutines) - // dispatch each records array into N goroutines - group := sync.WaitGroup{} - group.Add(len(divided)) + bulkErrorChan := make(chan []IngestBulkError) + defer close(bulkErrorChan) + for _, r := range divided { - go func(recs []IngestBulkRecord) { - conn, _ := newConnection(i.driver) + r := r + go func(driver *driver, collection, bucket string, recs []IngestBulkRecord, bulkErrorChan chan<- []IngestBulkError) { + errs := make([]IngestBulkError, 0) + newIngester, _ := NewIngester(driver.Host, driver.Port, driver.Password) for _, rec := range recs { - if conn == nil { - addBulkError(&errs, rec, ErrClosed, errMutex) - } - err := i.Push(collection, bucket, rec.Object, rec.Text) - if err != nil { - addBulkError(&errs, rec, err, errMutex) + if newIngester == nil { + addBulkError(&errs, rec, ErrClosed) continue } - // sonic should sent OK - _, err = conn.read() + err := newIngester.Push(collection, bucket, rec.Object, rec.Text) if err != nil { - addBulkError(&errs, rec, err, errMutex) + addBulkError(&errs, rec, err) } } - conn.close() - group.Done() - }(r) + + if newIngester != nil { + _ = newIngester.Quit() + } + bulkErrorChan <- errs + }(i.driver, collection, bucket, r, bulkErrorChan) } - group.Wait() + + errs = make([]IngestBulkError, 0) + for range divided { + errs = append(errs, <-bulkErrorChan...) + } + return errs } @@ -209,42 +210,44 @@ func (i ingesterChannel) BulkPop(collection, bucket string, parallelRoutines int parallelRoutines = 1 } - errs = make([]IngestBulkError, 0) - errMutex := &sync.Mutex{} - // chunk array into N (parallelRoutines) parts divided := divideIngestBulkRecords(records, parallelRoutines) - // dispatch each records array into N goroutines - group := sync.WaitGroup{} - group.Add(len(divided)) + bulkErrorChan := make(chan []IngestBulkError) + defer close(bulkErrorChan) + for _, r := range divided { - go func(recs []IngestBulkRecord) { - conn, _ := newConnection(i.driver) + r := r + go func(driver *driver, collection, bucket string, recs []IngestBulkRecord, bulkErrorChan chan<- []IngestBulkError) { + errs := make([]IngestBulkError, 0) + newIngester, _ := NewIngester(driver.Host, driver.Port, driver.Password) for _, rec := range recs { - if conn == nil { - addBulkError(&errs, rec, ErrClosed, errMutex) - } - err := conn.write(fmt.Sprintf( - "%s %s %s %s \"%s\"", - pop, collection, bucket, rec.Object, rec.Text), - ) - if err != nil { - addBulkError(&errs, rec, err, errMutex) + if newIngester == nil { + addBulkError(&errs, rec, ErrClosed) continue } - // sonic should sent OK - _, err = conn.read() + + err := newIngester.Pop(collection, bucket, rec.Object, rec.Text) + if err != nil { - addBulkError(&errs, rec, err, errMutex) + addBulkError(&errs, rec, err) } } - conn.close() - group.Done() - }(r) + + if newIngester != nil { + _ = newIngester.Quit() + } + + bulkErrorChan <- errs + }(i.driver, collection, bucket, r, bulkErrorChan) } - group.Wait() + + errs = make([]IngestBulkError, 0) + for range divided { + errs = append(errs, <-bulkErrorChan...) + } + return errs } @@ -328,8 +331,6 @@ func divideIngestBulkRecords(records []IngestBulkRecord, parallelRoutines int) [ return divided } -func addBulkError(e *[]IngestBulkError, record IngestBulkRecord, err error, mutex *sync.Mutex) { - mutex.Lock() - defer mutex.Unlock() +func addBulkError(e *[]IngestBulkError, record IngestBulkRecord, err error) { *e = append(*e, IngestBulkError{record.Object, err}) } diff --git a/sonic/ingester_test.go b/sonic/ingester_test.go index 271ec6c..f7f7e51 100644 --- a/sonic/ingester_test.go +++ b/sonic/ingester_test.go @@ -10,6 +10,27 @@ import ( var records = make([]IngestBulkRecord, 0) var ingester, err = NewIngester("localhost", 1491, "SecretPassword") +func BenchmarkIngesterChannel_BulkPush2XMaxCPUs(b *testing.B) { + if err != nil { + return + } + + cpus := 2 * runtime.NumCPU() + + for n := 0; n < b.N; n++ { + e := ingester.FlushBucket("test", "test2XMaxCpus") + if e != nil { + b.Log(e) + b.Fail() + } + be := ingester.BulkPush("test", "test2XMaxCpus", cpus, records) + if len(be) > 0 { + b.Log(be, e) + b.Fail() + } + } +} + func BenchmarkIngesterChannel_BulkPushMaxCPUs(b *testing.B) { if err != nil { return @@ -50,6 +71,25 @@ func BenchmarkIngesterChannel_BulkPush10(b *testing.B) { } } +func BenchmarkIngesterChannel_BulkPop10(b *testing.B) { + if err != nil { + return + } + + for n := 0; n < b.N; n++ { + e := ingester.FlushBucket("test", "popTest10") + if e != nil { + b.Log(e) + b.Fail() + } + be := ingester.BulkPop("test", "popTest10", 10, records) + if len(be) > 0 { + b.Log(be, err) + b.Fail() + } + } +} + func BenchmarkIngesterChannel_Push(b *testing.B) { if err != nil { return