Avoid copying the mutex lock value and split Push into chunks

This commit is contained in:
Steve Coffman 2019-08-11 18:13:37 -04:00
parent 13c8ecfd5a
commit ccf2b7b321
3 changed files with 60 additions and 15 deletions

2
go.mod
View file

@ -1 +1,3 @@
module github.com/expectedsh/go-sonic module github.com/expectedsh/go-sonic
go 1.12

View file

@ -7,12 +7,15 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"strconv"
"strings" "strings"
"unicode"
) )
type connection struct { type connection struct {
reader *bufio.Reader reader *bufio.Reader
conn net.Conn conn net.Conn
cmdMaxBytes int
closed bool closed bool
} }
@ -63,6 +66,20 @@ func (c *connection) read() (string, error) {
if strings.HasPrefix(str, "ERR ") { if strings.HasPrefix(str, "ERR ") {
return "", errors.New(str[4:]) return "", errors.New(str[4:])
} }
if strings.HasPrefix(str, "STARTED ") {
ss := strings.FieldsFunc(str, func(r rune) bool {
if unicode.IsSpace(r) || r == '(' || r == ')' {
return true
}
return false
})
bufferSize, err := strconv.Atoi(ss[len(ss)-1])
if err != nil {
return "", errors.New(fmt.Sprintf("Unable to parse STARTED response: %s", str))
}
c.cmdMaxBytes = bufferSize
}
return str, nil return str, nil
} }

View file

@ -5,6 +5,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"unicode/utf8"
) )
// IngestBulkRecord is the struct to be used as a list in bulk operation. // IngestBulkRecord is the struct to be used as a list in bulk operation.
@ -97,26 +98,54 @@ func NewIngester(host string, port int, password string) (Ingestable, error) {
} }
func (i ingesterChannel) Push(collection, bucket, object, text string) (err error) { func (i ingesterChannel) Push(collection, bucket, object, text string) (err error) {
err = i.write(fmt.Sprintf("%s %s %s %s \"%s\"", push, collection, bucket, object, text))
if err != nil { chunks := splitText(text, i.cmdMaxBytes/2)
return err // split chunks with partial success will yield single error
for _, chunk := range chunks {
err = i.write(fmt.Sprintf("%s %s %s %s \"%s\"", push, collection, bucket, object, chunk))
if err != nil {
return err
}
// sonic should sent OK
_, err = i.read()
if err != nil {
return err
}
} }
// sonic should sent OK
_, err = i.read()
if err != nil {
return err
}
return nil return nil
} }
// splitText splits longString into chunks of at most maxLen bytes, always
// cutting on a valid UTF-8 rune boundary (never in the middle of a rune).
//
// Slicing the string directly is more efficient than converting to []byte and
// back: a string is immutable while a []byte isn't, so the data would have to
// be copied to new memory on conversion (O(n) both ways), whereas slicing a
// string just returns a new string header backed by the same array (O(1)).
//
// Fixes over the naive implementation:
//   - maxLen <= 0 no longer loops forever; the whole string is returned as a
//     single chunk (caller may pass cmdMaxBytes/2 before it is initialized).
//   - a rune wider than maxLen is emitted whole, guaranteeing forward
//     progress instead of spinning on the same boundary.
//   - backing up to a rune start is bounded at the chunk start, so invalid
//     UTF-8 cannot drive the index negative and panic.
func splitText(longString string, maxLen int) []string {
	if maxLen <= 0 {
		return []string{longString}
	}
	splits := []string{}
	l := 0
	for l < len(longString) {
		r := l + maxLen
		if r >= len(longString) {
			// Remainder fits in one chunk.
			splits = append(splits, longString[l:])
			l = len(longString)
			break
		}
		// Back up until r sits on a rune start, but never past the chunk start.
		for r > l && !utf8.RuneStart(longString[r]) {
			r--
		}
		if r == l {
			// Single rune wider than maxLen: emit it whole so we always advance.
			_, size := utf8.DecodeRuneInString(longString[l:])
			r = l + size
		}
		splits = append(splits, longString[l:r])
		l = r
	}
	if len(splits) == 0 {
		// Preserve original behavior for the empty string: one empty chunk.
		splits = append(splits, longString)
	}
	return splits
}
func (i ingesterChannel) BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) (errs []IngestBulkError) { func (i ingesterChannel) BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) (errs []IngestBulkError) {
if parallelRoutines <= 0 { if parallelRoutines <= 0 {
parallelRoutines = 1 parallelRoutines = 1
} }
errs = make([]IngestBulkError, 0) errs = make([]IngestBulkError, 0)
errMutex := sync.Mutex{} errMutex := &sync.Mutex{}
// chunk array into N (parallelRoutines) parts // chunk array into N (parallelRoutines) parts
divided := divideIngestBulkRecords(records, parallelRoutines) divided := divideIngestBulkRecords(records, parallelRoutines)
@ -132,10 +161,7 @@ func (i ingesterChannel) BulkPush(collection, bucket string, parallelRoutines in
if conn == nil { if conn == nil {
addBulkError(&errs, rec, ErrClosed, errMutex) addBulkError(&errs, rec, ErrClosed, errMutex)
} }
err := conn.write(fmt.Sprintf( err := i.Push(collection, bucket, rec.Object, rec.Text)
"%s %s %s %s \"%s\"",
push, collection, bucket, rec.Object, rec.Text),
)
if err != nil { if err != nil {
addBulkError(&errs, rec, err, errMutex) addBulkError(&errs, rec, err, errMutex)
continue continue
@ -174,7 +200,7 @@ func (i ingesterChannel) BulkPop(collection, bucket string, parallelRoutines int
} }
errs = make([]IngestBulkError, 0) errs = make([]IngestBulkError, 0)
errMutex := sync.Mutex{} errMutex := &sync.Mutex{}
// chunk array into N (parallelRoutines) parts // chunk array into N (parallelRoutines) parts
divided := divideIngestBulkRecords(records, parallelRoutines) divided := divideIngestBulkRecords(records, parallelRoutines)
@ -292,7 +318,7 @@ func divideIngestBulkRecords(records []IngestBulkRecord, parallelRoutines int) [
return divided return divided
} }
func addBulkError(e *[]IngestBulkError, record IngestBulkRecord, err error, mutex sync.Mutex) { func addBulkError(e *[]IngestBulkError, record IngestBulkRecord, err error, mutex *sync.Mutex) {
mutex.Lock() mutex.Lock()
defer mutex.Unlock() defer mutex.Unlock()
*e = append(*e, IngestBulkError{record.Object, err}) *e = append(*e, IngestBulkError{record.Object, err})