Remove the parallelRoutines option from the bulk ingest API.

BulkPush and BulkPop now walk the records in a single loop on the calling
goroutine and collect one IngestBulkError per record that fails. The example
program switches from four Push calls to one BulkPush call, and the benchmark
file sonic/ingester_test.go (which used the removed parameter) is deleted.

Review notes:
  * BulkPop's new loop called i.Push; it now calls i.Pop.
  * The example builds IngestBulkRecord values with keyed fields
    (Object/Text) instead of positional ones.
  * The Ingestable doc comments kept as context in the -26 and -36 hunks
    still describe parallelRoutines. They start above the visible context,
    so they are not rewritten here and need a follow-up.
  * No caller of divideIngestBulkRecords remains in the hunks shown; check
    whether the helper can be removed.
  * The blob hashes on the "index" lines are those of the original patch and
    do not reflect the edits above.

diff --git a/cmd/example/main.go b/cmd/example/main.go
index bb6f271..c48e897 100644
--- a/cmd/example/main.go
+++ b/cmd/example/main.go
@@ -14,10 +14,12 @@ func main() {
 
 	// I will ignore all errors for demonstration purposes
 
-	_ = ingester.Push("movies", "general", "id:6ab56b4kk3", "Star wars")
-	_ = ingester.Push("movies", "general", "id:5hg67f8dg5", "Spider man")
-	_ = ingester.Push("movies", "general", "id:1m2n3b4vf6", "Batman")
-	_ = ingester.Push("movies", "general", "id:68d96h5h9d0", "This is another movie")
+	_ = ingester.BulkPush("movies", "general", []sonic.IngestBulkRecord{
+		{Object: "id:6ab56b4kk3", Text: "Star wars"},
+		{Object: "id:5hg67f8dg5", Text: "Spider man"},
+		{Object: "id:1m2n3b4vf6", Text: "Batman"},
+		{Object: "id:68d96h5h9d0", Text: "This is another movie"},
+	})
 
 	search, err := sonic.NewSearch("localhost", 1491, "SecretPassword")
 	if err != nil {
diff --git a/sonic/ingester.go b/sonic/ingester.go
index ee0f58e..25a1eae 100644
--- a/sonic/ingester.go
+++ b/sonic/ingester.go
@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"strconv"
 	"strings"
-	"sync"
 )
 
 type IngestBulkRecord struct {
@@ -26,7 +25,7 @@ type Ingestable interface {
 	// dispatch the records at best.
 	// If parallelRoutines <= 0; parallelRoutines will be equal to 1.
 	// If parallelRoutines > len(records); parallelRoutines will be equal to len(records).
-	BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) []IngestBulkError
+	BulkPush(collection, bucket string, records []IngestBulkRecord) []IngestBulkError
 
 	// Pop search data from the index.
 	// Command syntax POP "".
@@ -36,7 +35,7 @@ type Ingestable interface {
 	// dispatch the records at best.
 	// If parallelRoutines <= 0; parallelRoutines will be equal to 1.
 	// If parallelRoutines > len(records); parallelRoutines will be equal to len(records).
-	BulkPop(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) []IngestBulkError
+	BulkPop(collection, bucket string, records []IngestBulkRecord) []IngestBulkError
 
 	// Count indexed search data.
 	// bucket and object are optionals, empty string ignore it.
@@ -107,32 +106,15 @@ func (i IngesterChannel) Push(collection, bucket, object, text string) (err erro
 	return nil
 }
 
-func (i IngesterChannel) BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) []IngestBulkError {
-	if parallelRoutines <= 0 {
-		parallelRoutines = 1
-	}
-
+func (i IngesterChannel) BulkPush(collection, bucket string, records []IngestBulkRecord) []IngestBulkError {
 	errs := make([]IngestBulkError, 0)
-	mutex := sync.Mutex{}
 
-	// chunk array into N (parallelRoutines) parts
-	divided := i.divideIngestBulkRecords(records, parallelRoutines)
-
-	// dispatch each records array into N goroutines
-	group := sync.WaitGroup{}
-	group.Add(len(divided))
-	for _, r := range divided {
-		go func(recs []IngestBulkRecord) {
-			for _, rec := range recs {
-				if err := i.Push(collection, bucket, rec.Object, rec.Text); err != nil {
-					mutex.Lock()
-					errs = append(errs, IngestBulkError{rec.Object, err})
-					mutex.Unlock()
-				}
-			}
-			group.Done()
-		}(r)
+	for _, v := range records {
+		if err := i.Push(collection, bucket, v.Object, v.Text); err != nil {
+			errs = append(errs, IngestBulkError{v.Object, err})
+		}
 	}
+
 	return errs
 }
 
@@ -150,32 +132,15 @@ func (i IngesterChannel) Pop(collection, bucket, object, text string) (err error
 	return nil
 }
 
-func (i IngesterChannel) BulkPop(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) []IngestBulkError {
-	if parallelRoutines <= 0 {
-		parallelRoutines = 1
-	}
-
+func (i IngesterChannel) BulkPop(collection, bucket string, records []IngestBulkRecord) []IngestBulkError {
 	errs := make([]IngestBulkError, 0)
-	mutex := sync.Mutex{}
 
-	// chunk array into N (parallelRoutines) parts
-	divided := i.divideIngestBulkRecords(records, parallelRoutines)
-
-	// dispatch each records array into N goroutines
-	group := sync.WaitGroup{}
-	group.Add(len(divided))
-	for _, r := range divided {
-		go func(recs []IngestBulkRecord) {
-			for _, rec := range recs {
-				if err := i.Pop(collection, bucket, rec.Object, rec.Text); err != nil {
-					mutex.Lock()
-					errs = append(errs, IngestBulkError{rec.Object, err})
-					mutex.Unlock()
-				}
-			}
-			group.Done()
-		}(r)
+	for _, v := range records {
+		if err := i.Pop(collection, bucket, v.Object, v.Text); err != nil {
+			errs = append(errs, IngestBulkError{v.Object, err})
+		}
 	}
+
 	return errs
 }
 
diff --git a/sonic/ingester_test.go b/sonic/ingester_test.go
deleted file mode 100644
index 2876b87..0000000
--- a/sonic/ingester_test.go
+++ /dev/null
@@ -1,66 +0,0 @@
-package sonic
-
-import (
-	"math/rand"
-	"runtime"
-	"testing"
-	"time"
-)
-
-var records = make([]IngestBulkRecord, 0)
-var ingester, err = NewIngester("localhost", 1491, "SecretPassword")
-
-func BenchmarkIngesterChannel_BulkPushMaxCPUs(b *testing.B) {
-	if err != nil {
-		return
-	}
-
-	cpus := runtime.NumCPU()
-
-	for n := 0; n < b.N; n++ {
-		_ = ingester.FlushBucket("test", "testMaxCpus")
-		ingester.BulkPush("test", "testMaxCpus", cpus, records)
-	}
-}
-
-func BenchmarkIngesterChannel_Push(b *testing.B) {
-	if err != nil {
-		return
-	}
-
-	for n := 0; n < b.N; n++ {
-		_ = ingester.FlushBucket("test", "testMaxCpus")
-		for _, v := range records {
-			_ = ingester.Push("test", "testMaxCpus", v.Object, v.Text)
-		}
-	}
-}
-
-func BenchmarkIngesterChannel_BulkPush10(b *testing.B) {
-	if err != nil {
-		return
-	}
-
-	for n := 0; n < b.N; n++ {
-		_ = ingester.FlushBucket("test", "testMaxCpus")
-		ingester.BulkPush("test", "testMaxCpus", 10, records)
-	}
-}
-
-const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_"
-
-var seededRand = rand.New(rand.NewSource(time.Now().UnixNano()))
-
-func randStr(length int, charset string) string {
-	b := make([]byte, length)
-	for i := range b {
-		b[i] = charset[seededRand.Intn(len(charset))]
-	}
-	return string(b)
-}
-
-func init() {
-	for n := 0; n < 1000; n++ {
-		records = append(records, IngestBulkRecord{randStr(10, charset), randStr(10, charset)})
-	}
-}