diff --git a/sonic/control.go b/sonic/control.go
index 0d4155f..1820f51 100644
--- a/sonic/control.go
+++ b/sonic/control.go
@@ -5,7 +5,7 @@ import (
 	"fmt"
 )
 
-var InvalidActionName = errors.New("invalid action name")
+var ErrActionName = errors.New("invalid action name")
 
 // Controllable is used for administration purposes.
 type Controllable interface {
@@ -42,7 +42,7 @@ func NewControl(host string, port int, password string) (Controllable, error) {
 func (c ControlChannel) Trigger(action Action) (err error) {
 	if !IsActionValid(action) {
-		return InvalidActionName
+		return ErrActionName
 	}
 
 	err = c.write(fmt.Sprintf("TRIGGER %s", action))
 	if err != nil {
diff --git a/sonic/driver.go b/sonic/driver.go
index a0ae5cb..7de10b7 100644
--- a/sonic/driver.go
+++ b/sonic/driver.go
@@ -11,8 +11,8 @@ import (
 )
 
 var (
-	ClosedError     = errors.New("sonic connection is closed")
-	InvalidChanName = errors.New("invalid channel name")
+	ErrClosed   = errors.New("sonic connection is closed")
+	ErrChanName = errors.New("invalid channel name")
 )
 
 // Base contains common commands for all channels.
@@ -42,15 +42,15 @@ type Driver struct {
 
 // Connect opens a connection via TCP with the sonic server.
 func (c *Driver) Connect() error {
 	if !IsChannelValid(c.channel) {
-		return InvalidChanName
+		return ErrChanName
 	}
 
 	c.clean()
 	conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", c.Host, c.Port))
 	if err != nil {
 		return err
-	}
-
+	}
+	c.closed = false
 	c.conn = conn
 	c.reader = bufio.NewReader(c.conn)
@@ -96,7 +96,7 @@ func (c Driver) Ping() error {
 
 func (c *Driver) read() (string, error) {
 	if c.closed {
-		return "", ClosedError
+		return "", ErrClosed
 	}
 	buffer := bytes.Buffer{}
 	for {
@@ -122,7 +122,7 @@ func (c *Driver) read() (string, error) {
 
 func (c Driver) write(str string) error {
 	if c.closed {
-		return ClosedError
+		return ErrClosed
 	}
 	_, err := c.conn.Write([]byte(str + "\r\n"))
 	return err
diff --git a/sonic/ingester.go b/sonic/ingester.go
index 2492d98..ee0f58e 100644
--- a/sonic/ingester.go
+++ b/sonic/ingester.go
@@ -4,18 +4,40 @@ import (
 	"fmt"
 	"strconv"
 	"strings"
+	"sync"
 )
 
+type IngestBulkRecord struct {
+	Object, Text string
+}
+
+type IngestBulkError struct {
+	Object string
+	Error  error
+}
+
 // Ingestable is used for altering the search index (push, pop and flush).
 type Ingestable interface {
 	// Push search data in the index.
 	// Command syntax PUSH <collection> <bucket> <object> "<text>".
 	Push(collection, bucket, object, text string) (err error)
 
+	// BulkPush will execute N (parallelRoutines) goroutines at the same time to
+	// dispatch the records as evenly as possible.
+	// If parallelRoutines <= 0, parallelRoutines will be set to 1.
+	// If parallelRoutines > len(records), parallelRoutines will be set to len(records).
+	BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) []IngestBulkError
+
 	// Pop search data from the index.
 	// Command syntax POP <collection> <bucket> <object> "<text>".
 	Pop(collection, bucket, object, text string) (err error)
 
+	// BulkPop will execute N (parallelRoutines) goroutines at the same time to
+	// dispatch the records as evenly as possible.
+	// If parallelRoutines <= 0, parallelRoutines will be set to 1.
+	// If parallelRoutines > len(records), parallelRoutines will be set to len(records).
+	BulkPop(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) []IngestBulkError
+
 	// Count indexed search data.
 	// bucket and object are optional; an empty string ignores them.
 	// Command syntax COUNT <collection> [<bucket> [<object>]?]?.
@@ -85,6 +107,36 @@ func (i IngesterChannel) Push(collection, bucket, object, text string) (err error) {
 	return nil
 }
 
+func (i IngesterChannel) BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) []IngestBulkError {
+	if parallelRoutines <= 0 {
+		parallelRoutines = 1
+	}
+
+	errs := make([]IngestBulkError, 0)
+	mutex := sync.Mutex{}
+
+	// chunk the records into N (parallelRoutines) parts
+	divided := i.divideIngestBulkRecords(records, parallelRoutines)
+
+	// dispatch each chunk of records to its own goroutine
+	group := sync.WaitGroup{}
+	group.Add(len(divided))
+	for _, r := range divided {
+		go func(recs []IngestBulkRecord) {
+			for _, rec := range recs {
+				if err := i.Push(collection, bucket, rec.Object, rec.Text); err != nil {
+					mutex.Lock()
+					errs = append(errs, IngestBulkError{rec.Object, err})
+					mutex.Unlock()
+				}
+			}
+			group.Done()
+		}(r)
+	}
+	group.Wait() // wait for every goroutine to finish before returning the collected errors
+	return errs
+}
+
 func (i IngesterChannel) Pop(collection, bucket, object, text string) (err error) {
 	err = i.write(fmt.Sprintf("%s %s %s %s \"%s\"", pop, collection, bucket, object, text))
 	if err != nil {
@@ -99,6 +151,36 @@ func (i IngesterChannel) Pop(collection, bucket, object, text string) (err error) {
 	return nil
 }
 
+func (i IngesterChannel) BulkPop(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) []IngestBulkError {
+	if parallelRoutines <= 0 {
+		parallelRoutines = 1
+	}
+
+	errs := make([]IngestBulkError, 0)
+	mutex := sync.Mutex{}
+
+	// chunk the records into N (parallelRoutines) parts
+	divided := i.divideIngestBulkRecords(records, parallelRoutines)
+
+	// dispatch each chunk of records to its own goroutine
+	group := sync.WaitGroup{}
+	group.Add(len(divided))
+	for _, r := range divided {
+		go func(recs []IngestBulkRecord) {
+			for _, rec := range recs {
+				if err := i.Pop(collection, bucket, rec.Object, rec.Text); err != nil {
+					mutex.Lock()
+					errs = append(errs, IngestBulkError{rec.Object, err})
+					mutex.Unlock()
+				}
+			}
+			group.Done()
+		}(r)
+	}
+	group.Wait() // wait for every goroutine to finish before returning the collected errors
+	return errs
+}
+
 func (i IngesterChannel) Count(collection, bucket, object string) (cnt int, err error) {
 	err = i.write(fmt.Sprintf("%s %s %s", count, collection, buildCountQuery(bucket, object)))
 	if err != nil {
@@ -165,3 +247,17 @@ func (i IngesterChannel) FlushObject(collection, bucket, object string) (err error) {
 	}
 	return nil
 }
+
+// divideIngestBulkRecords splits records into parallelRoutines chunks of roughly equal size.
+func (i IngesterChannel) divideIngestBulkRecords(records []IngestBulkRecord, parallelRoutines int) [][]IngestBulkRecord {
+	var divided [][]IngestBulkRecord
+	chunkSize := (len(records) + parallelRoutines - 1) / parallelRoutines
+	for i := 0; i < len(records); i += chunkSize {
+		end := i + chunkSize
+		if end > len(records) {
+			end = len(records)
+		}
+		divided = append(divided, records[i:end])
+	}
+	return divided
+}
diff --git a/sonic/ingester_test.go b/sonic/ingester_test.go
new file mode 100644
index 0000000..2876b87
--- /dev/null
+++ b/sonic/ingester_test.go
@@ -0,0 +1,66 @@
+package sonic
+
+import (
+	"math/rand"
+	"runtime"
+	"testing"
+	"time"
+)
+
+var records = make([]IngestBulkRecord, 0)
+var ingester, err = NewIngester("localhost", 1491, "SecretPassword")
+
+func BenchmarkIngesterChannel_BulkPushMaxCPUs(b *testing.B) {
+	if err != nil {
+		return
+	}
+
+	cpus := runtime.NumCPU()
+
+	for n := 0; n < b.N; n++ {
+		_ = ingester.FlushBucket("test", "testMaxCpus")
+		ingester.BulkPush("test", "testMaxCpus", cpus, records)
+	}
+}
+
+func BenchmarkIngesterChannel_Push(b *testing.B) {
+	if err != nil {
+		return
+	}
+
+	for n := 0; n < b.N; n++ {
+		_ = ingester.FlushBucket("test", "testMaxCpus")
+		for _, v := range records {
ingester.Push("test", "testMaxCpus", v.Object, v.Text) + } + } +} + +func BenchmarkIngesterChannel_BulkPush10(b *testing.B) { + if err != nil { + return + } + + for n := 0; n < b.N; n++ { + _ = ingester.FlushBucket("test", "testMaxCpus") + ingester.BulkPush("test", "testMaxCpus", 10, records) + } +} + +const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_" + +var seededRand = rand.New(rand.NewSource(time.Now().UnixNano())) + +func randStr(length int, charset string) string { + b := make([]byte, length) + for i := range b { + b[i] = charset[seededRand.Intn(len(charset))] + } + return string(b) +} + +func init() { + for n := 0; n < 1000; n++ { + records = append(records, IngestBulkRecord{randStr(10, charset), randStr(10, charset)}) + } +}