Merge pull request #5 from StevenACoffman/master

fix:  buffer overflow and avoid copying lock value
This commit is contained in:
Alexis Viscogliosi 2019-09-18 17:59:22 +02:00 committed by GitHub
commit c735f46ecb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 69 additions and 14 deletions

2
go.mod
View file

@ -1 +1,3 @@
module github.com/expectedsh/go-sonic
go 1.12

View file

@ -7,12 +7,15 @@ import (
"fmt"
"io"
"net"
"strconv"
"strings"
"unicode"
)
type connection struct {
reader *bufio.Reader
conn net.Conn
cmdMaxBytes int
closed bool
}
@ -63,6 +66,20 @@ func (c *connection) read() (string, error) {
if strings.HasPrefix(str, "ERR ") {
return "", errors.New(str[4:])
}
if strings.HasPrefix(str, "STARTED ") {
ss := strings.FieldsFunc(str, func(r rune) bool {
if unicode.IsSpace(r) || r == '(' || r == ')' {
return true
}
return false
})
bufferSize, err := strconv.Atoi(ss[len(ss)-1])
if err != nil {
return "", errors.New(fmt.Sprintf("Unable to parse STARTED response: %s", str))
}
c.cmdMaxBytes = bufferSize
}
return str, nil
}

View file

@ -5,6 +5,7 @@ import (
"strconv"
"strings"
"sync"
"unicode/utf8"
)
// IngestBulkRecord is the struct to be used as a list in bulk operation.
@ -97,26 +98,64 @@ func NewIngester(host string, port int, password string) (Ingestable, error) {
}
func (i ingesterChannel) Push(collection, bucket, object, text string) (err error) {
err = i.write(fmt.Sprintf("%s %s %s %s \"%s\"", push, collection, bucket, object, text))
if err != nil {
return err
//
patterns := []struct {
Pattern string
Replacement string
}{{"\\", "\\\\"},
{"\n", "\\n"},
{"\"", "\\\""}}
for _, v := range patterns {
text = strings.Replace(text, v.Pattern, v.Replacement, -1)
}
// sonic should sent OK
_, err = i.read()
if err != nil {
return err
chunks := splitText(text, i.cmdMaxBytes/2)
// split chunks with partial success will yield single error
for _, chunk := range chunks {
err = i.write(fmt.Sprintf("%s %s %s %s \"%s\"", push, collection, bucket, object, chunk))
if err != nil {
return err
}
// sonic should sent OK
_, err = i.read()
if err != nil {
return err
}
}
return nil
}
// Ensure splitting on a valid leading byte
// Slicing the string directly is more efficient than converting to []byte and back because
// since a string is immutable and a []byte isn't,
// the data must be copied to new memory upon conversion,
// taking O(n) time (both ways!),
// whereas slicing a string simply returns a new string header backed by the same array as the original
// (taking constant time).
func splitText(longString string, maxLen int) []string {
splits := []string{}
var l, r int
for l, r = 0, maxLen; r < len(longString); l, r = r, r+maxLen {
for !utf8.RuneStart(longString[r]) {
r--
}
splits = append(splits, longString[l:r])
}
splits = append(splits, longString[l:])
return splits
}
func (i ingesterChannel) BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) (errs []IngestBulkError) {
if parallelRoutines <= 0 {
parallelRoutines = 1
}
errs = make([]IngestBulkError, 0)
errMutex := sync.Mutex{}
errMutex := &sync.Mutex{}
// chunk array into N (parallelRoutines) parts
divided := divideIngestBulkRecords(records, parallelRoutines)
@ -132,10 +171,7 @@ func (i ingesterChannel) BulkPush(collection, bucket string, parallelRoutines in
if conn == nil {
addBulkError(&errs, rec, ErrClosed, errMutex)
}
err := conn.write(fmt.Sprintf(
"%s %s %s %s \"%s\"",
push, collection, bucket, rec.Object, rec.Text),
)
err := i.Push(collection, bucket, rec.Object, rec.Text)
if err != nil {
addBulkError(&errs, rec, err, errMutex)
continue
@ -174,7 +210,7 @@ func (i ingesterChannel) BulkPop(collection, bucket string, parallelRoutines int
}
errs = make([]IngestBulkError, 0)
errMutex := sync.Mutex{}
errMutex := &sync.Mutex{}
// chunk array into N (parallelRoutines) parts
divided := divideIngestBulkRecords(records, parallelRoutines)
@ -292,7 +328,7 @@ func divideIngestBulkRecords(records []IngestBulkRecord, parallelRoutines int) [
return divided
}
func addBulkError(e *[]IngestBulkError, record IngestBulkRecord, err error, mutex sync.Mutex) {
func addBulkError(e *[]IngestBulkError, record IngestBulkRecord, err error, mutex *sync.Mutex) {
mutex.Lock()
defer mutex.Unlock()
*e = append(*e, IngestBulkError{record.Object, err})