9 - fixing bulk push/pop
This commit is contained in:
parent
43d274259e
commit
8073daa173
3 changed files with 97 additions and 56 deletions
|
|
@ -13,10 +13,10 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type connection struct {
|
type connection struct {
|
||||||
reader *bufio.Reader
|
reader *bufio.Reader
|
||||||
conn net.Conn
|
conn net.Conn
|
||||||
cmdMaxBytes int
|
cmdMaxBytes int
|
||||||
closed bool
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newConnection(d *driver) (*connection, error) {
|
func newConnection(d *driver) (*connection, error) {
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"unicode/utf8"
|
"unicode/utf8"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -100,11 +99,11 @@ func NewIngester(host string, port int, password string) (Ingestable, error) {
|
||||||
func (i ingesterChannel) Push(collection, bucket, object, text string) (err error) {
|
func (i ingesterChannel) Push(collection, bucket, object, text string) (err error) {
|
||||||
//
|
//
|
||||||
patterns := []struct {
|
patterns := []struct {
|
||||||
Pattern string
|
Pattern string
|
||||||
Replacement string
|
Replacement string
|
||||||
}{{"\\", "\\\\"},
|
}{{"\\", "\\\\"},
|
||||||
{"\n", "\\n"},
|
{"\n", "\\n"},
|
||||||
{"\"", "\\\""}}
|
{"\"", "\\\""}}
|
||||||
for _, v := range patterns {
|
for _, v := range patterns {
|
||||||
text = strings.Replace(text, v.Pattern, v.Replacement, -1)
|
text = strings.Replace(text, v.Pattern, v.Replacement, -1)
|
||||||
}
|
}
|
||||||
|
|
@ -136,7 +135,7 @@ func (i ingesterChannel) Push(collection, bucket, object, text string) (err erro
|
||||||
// whereas slicing a string simply returns a new string header backed by the same array as the original
|
// whereas slicing a string simply returns a new string header backed by the same array as the original
|
||||||
// (taking constant time).
|
// (taking constant time).
|
||||||
func splitText(longString string, maxLen int) []string {
|
func splitText(longString string, maxLen int) []string {
|
||||||
splits := []string{}
|
var splits []string
|
||||||
|
|
||||||
var l, r int
|
var l, r int
|
||||||
for l, r = 0, maxLen; r < len(longString); l, r = r, r+maxLen {
|
for l, r = 0, maxLen; r < len(longString); l, r = r, r+maxLen {
|
||||||
|
|
@ -154,39 +153,41 @@ func (i ingesterChannel) BulkPush(collection, bucket string, parallelRoutines in
|
||||||
parallelRoutines = 1
|
parallelRoutines = 1
|
||||||
}
|
}
|
||||||
|
|
||||||
errs = make([]IngestBulkError, 0)
|
|
||||||
errMutex := &sync.Mutex{}
|
|
||||||
|
|
||||||
// chunk array into N (parallelRoutines) parts
|
// chunk array into N (parallelRoutines) parts
|
||||||
divided := divideIngestBulkRecords(records, parallelRoutines)
|
divided := divideIngestBulkRecords(records, parallelRoutines)
|
||||||
|
|
||||||
// dispatch each records array into N goroutines
|
bulkErrorChan := make(chan []IngestBulkError)
|
||||||
group := sync.WaitGroup{}
|
defer close(bulkErrorChan)
|
||||||
group.Add(len(divided))
|
|
||||||
for _, r := range divided {
|
for _, r := range divided {
|
||||||
go func(recs []IngestBulkRecord) {
|
r := r
|
||||||
conn, _ := newConnection(i.driver)
|
go func(driver *driver, collection, bucket string, recs []IngestBulkRecord, bulkErrorChan chan<- []IngestBulkError) {
|
||||||
|
errs := make([]IngestBulkError, 0)
|
||||||
|
newIngester, _ := NewIngester(driver.Host, driver.Port, driver.Password)
|
||||||
|
|
||||||
for _, rec := range recs {
|
for _, rec := range recs {
|
||||||
if conn == nil {
|
if newIngester == nil {
|
||||||
addBulkError(&errs, rec, ErrClosed, errMutex)
|
addBulkError(&errs, rec, ErrClosed)
|
||||||
}
|
|
||||||
err := i.Push(collection, bucket, rec.Object, rec.Text)
|
|
||||||
if err != nil {
|
|
||||||
addBulkError(&errs, rec, err, errMutex)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// sonic should sent OK
|
err := newIngester.Push(collection, bucket, rec.Object, rec.Text)
|
||||||
_, err = conn.read()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
addBulkError(&errs, rec, err, errMutex)
|
addBulkError(&errs, rec, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
conn.close()
|
|
||||||
group.Done()
|
if newIngester != nil {
|
||||||
}(r)
|
_ = newIngester.Quit()
|
||||||
|
}
|
||||||
|
bulkErrorChan <- errs
|
||||||
|
}(i.driver, collection, bucket, r, bulkErrorChan)
|
||||||
}
|
}
|
||||||
group.Wait()
|
|
||||||
|
errs = make([]IngestBulkError, 0)
|
||||||
|
for range divided {
|
||||||
|
errs = append(errs, <-bulkErrorChan...)
|
||||||
|
}
|
||||||
|
|
||||||
return errs
|
return errs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -209,42 +210,44 @@ func (i ingesterChannel) BulkPop(collection, bucket string, parallelRoutines int
|
||||||
parallelRoutines = 1
|
parallelRoutines = 1
|
||||||
}
|
}
|
||||||
|
|
||||||
errs = make([]IngestBulkError, 0)
|
|
||||||
errMutex := &sync.Mutex{}
|
|
||||||
|
|
||||||
// chunk array into N (parallelRoutines) parts
|
// chunk array into N (parallelRoutines) parts
|
||||||
divided := divideIngestBulkRecords(records, parallelRoutines)
|
divided := divideIngestBulkRecords(records, parallelRoutines)
|
||||||
|
|
||||||
// dispatch each records array into N goroutines
|
bulkErrorChan := make(chan []IngestBulkError)
|
||||||
group := sync.WaitGroup{}
|
defer close(bulkErrorChan)
|
||||||
group.Add(len(divided))
|
|
||||||
for _, r := range divided {
|
for _, r := range divided {
|
||||||
go func(recs []IngestBulkRecord) {
|
r := r
|
||||||
conn, _ := newConnection(i.driver)
|
go func(driver *driver, collection, bucket string, recs []IngestBulkRecord, bulkErrorChan chan<- []IngestBulkError) {
|
||||||
|
errs := make([]IngestBulkError, 0)
|
||||||
|
newIngester, _ := NewIngester(driver.Host, driver.Port, driver.Password)
|
||||||
|
|
||||||
for _, rec := range recs {
|
for _, rec := range recs {
|
||||||
if conn == nil {
|
if newIngester == nil {
|
||||||
addBulkError(&errs, rec, ErrClosed, errMutex)
|
addBulkError(&errs, rec, ErrClosed)
|
||||||
}
|
|
||||||
err := conn.write(fmt.Sprintf(
|
|
||||||
"%s %s %s %s \"%s\"",
|
|
||||||
pop, collection, bucket, rec.Object, rec.Text),
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
addBulkError(&errs, rec, err, errMutex)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// sonic should sent OK
|
|
||||||
_, err = conn.read()
|
err := newIngester.Pop(collection, bucket, rec.Object, rec.Text)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
addBulkError(&errs, rec, err, errMutex)
|
addBulkError(&errs, rec, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
conn.close()
|
|
||||||
group.Done()
|
if newIngester != nil {
|
||||||
}(r)
|
_ = newIngester.Quit()
|
||||||
|
}
|
||||||
|
|
||||||
|
bulkErrorChan <- errs
|
||||||
|
}(i.driver, collection, bucket, r, bulkErrorChan)
|
||||||
}
|
}
|
||||||
group.Wait()
|
|
||||||
|
errs = make([]IngestBulkError, 0)
|
||||||
|
for range divided {
|
||||||
|
errs = append(errs, <-bulkErrorChan...)
|
||||||
|
}
|
||||||
|
|
||||||
return errs
|
return errs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -328,8 +331,6 @@ func divideIngestBulkRecords(records []IngestBulkRecord, parallelRoutines int) [
|
||||||
return divided
|
return divided
|
||||||
}
|
}
|
||||||
|
|
||||||
func addBulkError(e *[]IngestBulkError, record IngestBulkRecord, err error, mutex *sync.Mutex) {
|
func addBulkError(e *[]IngestBulkError, record IngestBulkRecord, err error) {
|
||||||
mutex.Lock()
|
|
||||||
defer mutex.Unlock()
|
|
||||||
*e = append(*e, IngestBulkError{record.Object, err})
|
*e = append(*e, IngestBulkError{record.Object, err})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,27 @@ import (
|
||||||
var records = make([]IngestBulkRecord, 0)
|
var records = make([]IngestBulkRecord, 0)
|
||||||
var ingester, err = NewIngester("localhost", 1491, "SecretPassword")
|
var ingester, err = NewIngester("localhost", 1491, "SecretPassword")
|
||||||
|
|
||||||
|
func BenchmarkIngesterChannel_BulkPush2XMaxCPUs(b *testing.B) {
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
cpus := 2 * runtime.NumCPU()
|
||||||
|
|
||||||
|
for n := 0; n < b.N; n++ {
|
||||||
|
e := ingester.FlushBucket("test", "test2XMaxCpus")
|
||||||
|
if e != nil {
|
||||||
|
b.Log(e)
|
||||||
|
b.Fail()
|
||||||
|
}
|
||||||
|
be := ingester.BulkPush("test", "test2XMaxCpus", cpus, records)
|
||||||
|
if len(be) > 0 {
|
||||||
|
b.Log(be, e)
|
||||||
|
b.Fail()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkIngesterChannel_BulkPushMaxCPUs(b *testing.B) {
|
func BenchmarkIngesterChannel_BulkPushMaxCPUs(b *testing.B) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
|
@ -50,6 +71,25 @@ func BenchmarkIngesterChannel_BulkPush10(b *testing.B) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func BenchmarkIngesterChannel_BulkPop10(b *testing.B) {
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for n := 0; n < b.N; n++ {
|
||||||
|
e := ingester.FlushBucket("test", "popTest10")
|
||||||
|
if e != nil {
|
||||||
|
b.Log(e)
|
||||||
|
b.Fail()
|
||||||
|
}
|
||||||
|
be := ingester.BulkPop("test", "popTest10", 10, records)
|
||||||
|
if len(be) > 0 {
|
||||||
|
b.Log(be, err)
|
||||||
|
b.Fail()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkIngesterChannel_Push(b *testing.B) {
|
func BenchmarkIngesterChannel_Push(b *testing.B) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue