From ccf2b7b321b75e5d1ffbef058c4ae34db83413bf Mon Sep 17 00:00:00 2001 From: Steve Coffman Date: Sun, 11 Aug 2019 18:13:37 -0400 Subject: [PATCH 1/2] Avoid copy lock value and split push --- go.mod | 2 ++ sonic/connection.go | 17 ++++++++++++++ sonic/ingester.go | 56 +++++++++++++++++++++++++++++++++------------ 3 files changed, 60 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index 96ee3a6..de7ee70 100644 --- a/go.mod +++ b/go.mod @@ -1 +1,3 @@ module github.com/expectedsh/go-sonic + +go 1.12 diff --git a/sonic/connection.go b/sonic/connection.go index e31c53d..e3db3d9 100644 --- a/sonic/connection.go +++ b/sonic/connection.go @@ -7,12 +7,15 @@ import ( "fmt" "io" "net" + "strconv" "strings" + "unicode" ) type connection struct { reader *bufio.Reader conn net.Conn + cmdMaxBytes int closed bool } @@ -63,6 +66,20 @@ func (c *connection) read() (string, error) { if strings.HasPrefix(str, "ERR ") { return "", errors.New(str[4:]) } + if strings.HasPrefix(str, "STARTED ") { + + ss := strings.FieldsFunc(str, func(r rune) bool { + if unicode.IsSpace(r) || r == '(' || r == ')' { + return true + } + return false + }) + bufferSize, err := strconv.Atoi(ss[len(ss)-1]) + if err != nil { + return "", errors.New(fmt.Sprintf("Unable to parse STARTED response: %s", str)) + } + c.cmdMaxBytes = bufferSize + } return str, nil } diff --git a/sonic/ingester.go b/sonic/ingester.go index 27b322b..acf24f9 100644 --- a/sonic/ingester.go +++ b/sonic/ingester.go @@ -5,6 +5,7 @@ import ( "strconv" "strings" "sync" + "unicode/utf8" ) // IngestBulkRecord is the struct to be used as a list in bulk operation. 
@@ -97,26 +98,54 @@ func NewIngester(host string, port int, password string) (Ingestable, error) { } func (i ingesterChannel) Push(collection, bucket, object, text string) (err error) { - err = i.write(fmt.Sprintf("%s %s %s %s \"%s\"", push, collection, bucket, object, text)) - if err != nil { - return err + + chunks := splitText(text, i.cmdMaxBytes/2) + // split chunks with partial success will yield single error + for _, chunk := range chunks { + err = i.write(fmt.Sprintf("%s %s %s %s \"%s\"", push, collection, bucket, object, chunk)) + + if err != nil { + return err + } + + // sonic should send OK + _, err = i.read() + if err != nil { + return err + } } - // sonic should sent OK - _, err = i.read() - if err != nil { - return err - } return nil } +// Ensure splitting on a valid leading byte +// Slicing the string directly is more efficient than converting to []byte and back: +// because a string is immutable and a []byte isn't, +// the data must be copied to new memory upon conversion, +// taking O(n) time (both ways!), +// whereas slicing a string simply returns a new string header backed by the same array as the original +// (taking constant time). 
+func splitText(longString string, maxLen int) []string { + splits := []string{} + + var l, r int + for l, r = 0, maxLen; r < len(longString); l, r = r, r+maxLen { + for !utf8.RuneStart(longString[r]) { + r-- + } + splits = append(splits, longString[l:r]) + } + splits = append(splits, longString[l:]) + return splits +} + func (i ingesterChannel) BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) (errs []IngestBulkError) { if parallelRoutines <= 0 { parallelRoutines = 1 } errs = make([]IngestBulkError, 0) - errMutex := sync.Mutex{} + errMutex := &sync.Mutex{} // chunk array into N (parallelRoutines) parts divided := divideIngestBulkRecords(records, parallelRoutines) @@ -132,10 +161,7 @@ func (i ingesterChannel) BulkPush(collection, bucket string, parallelRoutines in if conn == nil { addBulkError(&errs, rec, ErrClosed, errMutex) } - err := conn.write(fmt.Sprintf( - "%s %s %s %s \"%s\"", - push, collection, bucket, rec.Object, rec.Text), - ) + err := i.Push(collection, bucket, rec.Object, rec.Text) if err != nil { addBulkError(&errs, rec, err, errMutex) continue @@ -174,7 +200,7 @@ func (i ingesterChannel) BulkPop(collection, bucket string, parallelRoutines int } errs = make([]IngestBulkError, 0) - errMutex := sync.Mutex{} + errMutex := &sync.Mutex{} // chunk array into N (parallelRoutines) parts divided := divideIngestBulkRecords(records, parallelRoutines) @@ -292,7 +318,7 @@ func divideIngestBulkRecords(records []IngestBulkRecord, parallelRoutines int) [ return divided } -func addBulkError(e *[]IngestBulkError, record IngestBulkRecord, err error, mutex sync.Mutex) { +func addBulkError(e *[]IngestBulkError, record IngestBulkRecord, err error, mutex *sync.Mutex) { mutex.Lock() defer mutex.Unlock() *e = append(*e, IngestBulkError{record.Object, err}) From 083e2b588e0e62d60baf536d844be58e502e6277 Mon Sep 17 00:00:00 2001 From: Steve Coffman Date: Sun, 11 Aug 2019 19:11:33 -0400 Subject: [PATCH 2/2] 
https://github.com/valeriansaliou/node-sonic-channel/blob/f88985149c4176490092e53f8b4f6f188db08900/lib/channel/generic.js#L59 --- sonic/ingester.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/sonic/ingester.go b/sonic/ingester.go index acf24f9..7b01061 100644 --- a/sonic/ingester.go +++ b/sonic/ingester.go @@ -98,6 +98,16 @@ func NewIngester(host string, port int, password string) (Ingestable, error) { } func (i ingesterChannel) Push(collection, bucket, object, text string) (err error) { + // Escape backslashes, newlines and double quotes in text before it is embedded in the quoted argument of the push command. + patterns := []struct { + Pattern string + Replacement string + }{{"\\", "\\\\"}, + {"\n", "\\n"}, + {"\"", "\\\""}} + for _, v := range patterns { + text = strings.Replace(text, v.Pattern, v.Replacement, -1) + } chunks := splitText(text, i.cmdMaxBytes/2) // split chunks with partial success will yield single error