alexisvisco 2019-03-27 11:32:37 +01:00
parent 80b7d7367f
commit ed961675e5
3 changed files with 20 additions and 119 deletions

View file

@@ -14,10 +14,12 @@ func main() {
	// I will ignore all errors for demonstration purposes
-	_ = ingester.Push("movies", "general", "id:6ab56b4kk3", "Star wars")
-	_ = ingester.Push("movies", "general", "id:5hg67f8dg5", "Spider man")
-	_ = ingester.Push("movies", "general", "id:1m2n3b4vf6", "Batman")
-	_ = ingester.Push("movies", "general", "id:68d96h5h9d0", "This is another movie")
+	_ = ingester.BulkPush("movies", "general", []sonic.IngestBulkRecord{
+		{"id:6ab56b4kk3", "Star wars"},
+		{"id:5hg67f8dg5", "Spider man"},
+		{"id:1m2n3b4vf6", "Batman"},
+		{"id:68d96h5h9d0", "This is another movie"},
+	})
	search, err := sonic.NewSearch("localhost", 1491, "SecretPassword")
	if err != nil {
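The example file now builds the records up front and hands them to BulkPush in a single call. As a minimal, self-contained sketch (not part of the commit), this is how a caller could consume the per-record errors that BulkPush returns; the import path is assumed, and since the fields of IngestBulkError are not shown in this diff, the value is printed as a whole:

package main

import (
	"fmt"

	"github.com/expectedsh/go-sonic/sonic" // import path assumed
)

func main() {
	ingester, err := sonic.NewIngester("localhost", 1491, "SecretPassword")
	if err != nil {
		panic(err)
	}

	// BulkPush no longer takes a parallelRoutines argument; it returns one
	// IngestBulkError per record that could not be pushed.
	failures := ingester.BulkPush("movies", "general", []sonic.IngestBulkRecord{
		{"id:6ab56b4kk3", "Star wars"},
		{"id:5hg67f8dg5", "Spider man"},
	})
	for _, f := range failures {
		// The exact field names of IngestBulkError are not visible in this
		// diff, so print the whole value.
		fmt.Printf("could not push record: %+v\n", f)
	}
}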

View file

@@ -4,7 +4,6 @@ import (
	"fmt"
	"strconv"
	"strings"
-	"sync"
)

type IngestBulkRecord struct {
@@ -26,7 +25,7 @@ type Ingestable interface {
	// dispatch the records at best.
	// If parallelRoutines <= 0; parallelRoutines will be equal to 1.
	// If parallelRoutines > len(records); parallelRoutines will be equal to len(records).
-	BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) []IngestBulkError
+	BulkPush(collection, bucket string, records []IngestBulkRecord) []IngestBulkError

	// Pop search data from the index.
	// Command syntax POP <collection> <bucket> <object> "<text>".
@@ -36,7 +35,7 @@ type Ingestable interface {
	// dispatch the records at best.
	// If parallelRoutines <= 0; parallelRoutines will be equal to 1.
	// If parallelRoutines > len(records); parallelRoutines will be equal to len(records).
-	BulkPop(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) []IngestBulkError
+	BulkPop(collection, bucket string, records []IngestBulkRecord) []IngestBulkError

	// Count indexed search data.
	// bucket and object are optionals, empty string ignore it.
@@ -107,32 +106,15 @@ func (i IngesterChannel) Push(collection, bucket, object, text string) (err error) {
	return nil
}

-func (i IngesterChannel) BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) []IngestBulkError {
-	if parallelRoutines <= 0 {
-		parallelRoutines = 1
-	}
+func (i IngesterChannel) BulkPush(collection, bucket string, records []IngestBulkRecord) []IngestBulkError {
	errs := make([]IngestBulkError, 0)
-	mutex := sync.Mutex{}
-	// chunk array into N (parallelRoutines) parts
-	divided := i.divideIngestBulkRecords(records, parallelRoutines)
-	// dispatch each records array into N goroutines
-	group := sync.WaitGroup{}
-	group.Add(len(divided))
-	for _, r := range divided {
-		go func(recs []IngestBulkRecord) {
-			for _, rec := range recs {
-				if err := i.Push(collection, bucket, rec.Object, rec.Text); err != nil {
-					mutex.Lock()
-					errs = append(errs, IngestBulkError{rec.Object, err})
-					mutex.Unlock()
-				}
-			}
-			group.Done()
-		}(r)
+	for _, v := range records {
+		if err := i.Push(collection, bucket, v.Object, v.Text); err != nil {
+			errs = append(errs, IngestBulkError{v.Object, err})
+		}
	}
	return errs
}
@@ -150,32 +132,15 @@ func (i IngesterChannel) Pop(collection, bucket, object, text string) (err error) {
	return nil
}

-func (i IngesterChannel) BulkPop(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) []IngestBulkError {
-	if parallelRoutines <= 0 {
-		parallelRoutines = 1
-	}
+func (i IngesterChannel) BulkPop(collection, bucket string, records []IngestBulkRecord) []IngestBulkError {
	errs := make([]IngestBulkError, 0)
-	mutex := sync.Mutex{}
-	// chunk array into N (parallelRoutines) parts
-	divided := i.divideIngestBulkRecords(records, parallelRoutines)
-	// dispatch each records array into N goroutines
-	group := sync.WaitGroup{}
-	group.Add(len(divided))
-	for _, r := range divided {
-		go func(recs []IngestBulkRecord) {
-			for _, rec := range recs {
-				if err := i.Pop(collection, bucket, rec.Object, rec.Text); err != nil {
-					mutex.Lock()
-					errs = append(errs, IngestBulkError{rec.Object, err})
-					mutex.Unlock()
-				}
-			}
-			group.Done()
-		}(r)
+	for _, v := range records {
+		if err := i.Pop(collection, bucket, v.Object, v.Text); err != nil {
+			errs = append(errs, IngestBulkError{v.Object, err})
+		}
	}
	return errs
}
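With the goroutine fan-out removed from IngesterChannel, any remaining parallelism is the caller's job. A rough sketch of one way to do that, assuming one ingester connection per worker because nothing in this change states that a single channel is safe for concurrent use; the helper name, worker count, import path, and credentials are illustrative only and not part of go-sonic:

package example

import (
	"sync"

	"github.com/expectedsh/go-sonic/sonic" // import path assumed
)

// parallelBulkPush is a hypothetical caller-side helper: it shards the
// records and runs the new, sequential BulkPush from several goroutines.
func parallelBulkPush(collection, bucket string, records []sonic.IngestBulkRecord, workers int) ([]sonic.IngestBulkError, error) {
	if workers <= 0 {
		workers = 1
	}
	chunk := (len(records) + workers - 1) / workers

	// Split the records and open one connection per chunk before starting
	// any work, so a failed connection can be reported as a plain error.
	var chunks [][]sonic.IngestBulkRecord
	var conns []sonic.Ingestable
	for start := 0; start < len(records); start += chunk {
		end := start + chunk
		if end > len(records) {
			end = len(records)
		}
		ing, err := sonic.NewIngester("localhost", 1491, "SecretPassword")
		if err != nil {
			return nil, err
		}
		chunks = append(chunks, records[start:end])
		conns = append(conns, ing)
	}

	var (
		mu   sync.Mutex
		wg   sync.WaitGroup
		errs []sonic.IngestBulkError
	)
	for i, part := range chunks {
		wg.Add(1)
		go func(ing sonic.Ingestable, part []sonic.IngestBulkRecord) {
			defer wg.Done()
			res := ing.BulkPush(collection, bucket, part)
			mu.Lock()
			errs = append(errs, res...)
			mu.Unlock()
		}(conns[i], part)
	}
	wg.Wait()
	return errs, nil
}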

View file

@@ -1,66 +0,0 @@
package sonic

import (
	"math/rand"
	"runtime"
	"testing"
	"time"
)

var records = make([]IngestBulkRecord, 0)
var ingester, err = NewIngester("localhost", 1491, "SecretPassword")

func BenchmarkIngesterChannel_BulkPushMaxCPUs(b *testing.B) {
	if err != nil {
		return
	}
	cpus := runtime.NumCPU()
	for n := 0; n < b.N; n++ {
		_ = ingester.FlushBucket("test", "testMaxCpus")
		ingester.BulkPush("test", "testMaxCpus", cpus, records)
	}
}

func BenchmarkIngesterChannel_Push(b *testing.B) {
	if err != nil {
		return
	}
	for n := 0; n < b.N; n++ {
		_ = ingester.FlushBucket("test", "testMaxCpus")
		for _, v := range records {
			_ = ingester.Push("test", "testMaxCpus", v.Object, v.Text)
		}
	}
}

func BenchmarkIngesterChannel_BulkPush10(b *testing.B) {
	if err != nil {
		return
	}
	for n := 0; n < b.N; n++ {
		_ = ingester.FlushBucket("test", "testMaxCpus")
		ingester.BulkPush("test", "testMaxCpus", 10, records)
	}
}

const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_"

var seededRand = rand.New(rand.NewSource(time.Now().UnixNano()))

func randStr(length int, charset string) string {
	b := make([]byte, length)
	for i := range b {
		b[i] = charset[seededRand.Intn(len(charset))]
	}
	return string(b)
}

func init() {
	for n := 0; n < 1000; n++ {
		records = append(records, IngestBulkRecord{randStr(10, charset), randStr(10, charset)})
	}
}
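These deleted benchmarks still pass a parallelRoutines argument, so they no longer match the new BulkPush signature, which is presumably why the file is removed rather than updated. A hedged sketch of what an equivalent benchmark could look like against the new three-argument call, reusing only identifiers that appear in this diff (NewIngester, FlushBucket, BulkPush, IngestBulkRecord); the benchmark name and bucket name are illustrative:

package sonic

import (
	"math/rand"
	"testing"
	"time"
)

// BenchmarkIngesterChannel_BulkPush is a hypothetical replacement for the
// deleted benchmarks, written against the new BulkPush signature.
func BenchmarkIngesterChannel_BulkPush(b *testing.B) {
	ingester, err := NewIngester("localhost", 1491, "SecretPassword")
	if err != nil {
		b.Skip("sonic server not reachable:", err)
	}

	const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_"
	seeded := rand.New(rand.NewSource(time.Now().UnixNano()))
	randStr := func(length int) string {
		buf := make([]byte, length)
		for i := range buf {
			buf[i] = charset[seeded.Intn(len(charset))]
		}
		return string(buf)
	}

	records := make([]IngestBulkRecord, 0, 1000)
	for n := 0; n < 1000; n++ {
		records = append(records, IngestBulkRecord{randStr(10), randStr(10)})
	}

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		_ = ingester.FlushBucket("test", "testBulk")
		ingester.BulkPush("test", "testBulk", records)
	}
}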