fix global err names

add BulkPush and BulkPop
This commit is contained in:
alexisvisco 2019-03-27 11:09:09 +01:00
parent 33d5ed706f
commit 80b7d7367f
4 changed files with 168 additions and 9 deletions

View file

@ -5,7 +5,7 @@ import (
"fmt" "fmt"
) )
var InvalidActionName = errors.New("invalid action name") var ErrActionName = errors.New("invalid action name")
// Controllable is used for administration purposes. // Controllable is used for administration purposes.
type Controllable interface { type Controllable interface {
@ -42,7 +42,7 @@ func NewControl(host string, port int, password string) (Controllable, error) {
func (c ControlChannel) Trigger(action Action) (err error) { func (c ControlChannel) Trigger(action Action) (err error) {
if IsActionValid(action) { if IsActionValid(action) {
return InvalidActionName return ErrActionName
} }
err = c.write(fmt.Sprintf("TRIGGER %s", action)) err = c.write(fmt.Sprintf("TRIGGER %s", action))
if err != nil { if err != nil {

View file

@ -11,8 +11,8 @@ import (
) )
var ( var (
ClosedError = errors.New("sonic connection is closed") ErrClosed = errors.New("sonic connection is closed")
InvalidChanName = errors.New("invalid channel name") ErrChanName = errors.New("invalid channel name")
) )
// Base contains commons commands to all channels. // Base contains commons commands to all channels.
@ -42,7 +42,7 @@ type Driver struct {
// Connect open a connection via TCP with the sonic server. // Connect open a connection via TCP with the sonic server.
func (c *Driver) Connect() error { func (c *Driver) Connect() error {
if !IsChannelValid(c.channel) { if !IsChannelValid(c.channel) {
return InvalidChanName return ErrChanName
} }
c.clean() c.clean()
@ -96,7 +96,7 @@ func (c Driver) Ping() error {
func (c *Driver) read() (string, error) { func (c *Driver) read() (string, error) {
if c.closed { if c.closed {
return "", ClosedError return "", ErrClosed
} }
buffer := bytes.Buffer{} buffer := bytes.Buffer{}
for { for {
@ -122,7 +122,7 @@ func (c *Driver) read() (string, error) {
func (c Driver) write(str string) error { func (c Driver) write(str string) error {
if c.closed { if c.closed {
return ClosedError return ErrClosed
} }
_, err := c.conn.Write([]byte(str + "\r\n")) _, err := c.conn.Write([]byte(str + "\r\n"))
return err return err

View file

@ -4,18 +4,40 @@ import (
"fmt" "fmt"
"strconv" "strconv"
"strings" "strings"
"sync"
) )
type IngestBulkRecord struct {
Object, Text string
}
type IngestBulkError struct {
Object string
Error error
}
// Ingestable is used for altering the search index (push, pop and flush). // Ingestable is used for altering the search index (push, pop and flush).
type Ingestable interface { type Ingestable interface {
// Push search data in the index. // Push search data in the index.
// Command syntax PUSH <collection> <bucket> <object> "<text>" // Command syntax PUSH <collection> <bucket> <object> "<text>"
Push(collection, bucket, object, text string) (err error) Push(collection, bucket, object, text string) (err error)
// BulkPush will execute N (parallelRoutines) goroutines at the same time to
// dispatch the records at best.
// If parallelRoutines <= 0; parallelRoutines will be equal to 1.
// If parallelRoutines > len(records); parallelRoutines will be equal to len(records).
BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) []IngestBulkError
// Pop search data from the index. // Pop search data from the index.
// Command syntax POP <collection> <bucket> <object> "<text>". // Command syntax POP <collection> <bucket> <object> "<text>".
Pop(collection, bucket, object, text string) (err error) Pop(collection, bucket, object, text string) (err error)
// BulkPop will execute N (parallelRoutines) goroutines at the same time to
// dispatch the records at best.
// If parallelRoutines <= 0; parallelRoutines will be equal to 1.
// If parallelRoutines > len(records); parallelRoutines will be equal to len(records).
BulkPop(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) []IngestBulkError
// Count indexed search data. // Count indexed search data.
// bucket and object are optionals, empty string ignore it. // bucket and object are optionals, empty string ignore it.
// Command syntax COUNT <collection> [<bucket> [<object>]?]?. // Command syntax COUNT <collection> [<bucket> [<object>]?]?.
@ -85,6 +107,35 @@ func (i IngesterChannel) Push(collection, bucket, object, text string) (err erro
return nil return nil
} }
func (i IngesterChannel) BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) []IngestBulkError {
if parallelRoutines <= 0 {
parallelRoutines = 1
}
errs := make([]IngestBulkError, 0)
mutex := sync.Mutex{}
// chunk array into N (parallelRoutines) parts
divided := i.divideIngestBulkRecords(records, parallelRoutines)
// dispatch each records array into N goroutines
group := sync.WaitGroup{}
group.Add(len(divided))
for _, r := range divided {
go func(recs []IngestBulkRecord) {
for _, rec := range recs {
if err := i.Push(collection, bucket, rec.Object, rec.Text); err != nil {
mutex.Lock()
errs = append(errs, IngestBulkError{rec.Object, err})
mutex.Unlock()
}
}
group.Done()
}(r)
}
return errs
}
func (i IngesterChannel) Pop(collection, bucket, object, text string) (err error) { func (i IngesterChannel) Pop(collection, bucket, object, text string) (err error) {
err = i.write(fmt.Sprintf("%s %s %s %s \"%s\"", pop, collection, bucket, object, text)) err = i.write(fmt.Sprintf("%s %s %s %s \"%s\"", pop, collection, bucket, object, text))
if err != nil { if err != nil {
@ -99,6 +150,35 @@ func (i IngesterChannel) Pop(collection, bucket, object, text string) (err error
return nil return nil
} }
func (i IngesterChannel) BulkPop(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) []IngestBulkError {
if parallelRoutines <= 0 {
parallelRoutines = 1
}
errs := make([]IngestBulkError, 0)
mutex := sync.Mutex{}
// chunk array into N (parallelRoutines) parts
divided := i.divideIngestBulkRecords(records, parallelRoutines)
// dispatch each records array into N goroutines
group := sync.WaitGroup{}
group.Add(len(divided))
for _, r := range divided {
go func(recs []IngestBulkRecord) {
for _, rec := range recs {
if err := i.Pop(collection, bucket, rec.Object, rec.Text); err != nil {
mutex.Lock()
errs = append(errs, IngestBulkError{rec.Object, err})
mutex.Unlock()
}
}
group.Done()
}(r)
}
return errs
}
func (i IngesterChannel) Count(collection, bucket, object string) (cnt int, err error) { func (i IngesterChannel) Count(collection, bucket, object string) (cnt int, err error) {
err = i.write(fmt.Sprintf("%s %s %s", count, collection, buildCountQuery(bucket, object))) err = i.write(fmt.Sprintf("%s %s %s", count, collection, buildCountQuery(bucket, object)))
if err != nil { if err != nil {
@ -165,3 +245,16 @@ func (i IngesterChannel) FlushObject(collection, bucket, object string) (err err
} }
return nil return nil
} }
func (i IngesterChannel) divideIngestBulkRecords(records []IngestBulkRecord, parallelRoutines int) [][]IngestBulkRecord {
var divided [][]IngestBulkRecord
chunkSize := (len(records) + parallelRoutines - 1) / parallelRoutines
for i := 0; i < len(records); i += chunkSize {
end := i + chunkSize
if end > len(records) {
end = len(records)
}
divided = append(divided, records[i:end])
}
return divided
}

66
sonic/ingester_test.go Normal file
View file

@ -0,0 +1,66 @@
package sonic
import (
"math/rand"
"runtime"
"testing"
"time"
)
var records = make([]IngestBulkRecord, 0)
var ingester, err = NewIngester("localhost", 1491, "SecretPassword")
func BenchmarkIngesterChannel_BulkPushMaxCPUs(b *testing.B) {
if err != nil {
return
}
cpus := runtime.NumCPU()
for n := 0; n < b.N; n++ {
_ = ingester.FlushBucket("test", "testMaxCpus")
ingester.BulkPush("test", "testMaxCpus", cpus, records)
}
}
func BenchmarkIngesterChannel_Push(b *testing.B) {
if err != nil {
return
}
for n := 0; n < b.N; n++ {
_ = ingester.FlushBucket("test", "testMaxCpus")
for _, v := range records {
_ = ingester.Push("test", "testMaxCpus", v.Object, v.Text)
}
}
}
func BenchmarkIngesterChannel_BulkPush10(b *testing.B) {
if err != nil {
return
}
for n := 0; n < b.N; n++ {
_ = ingester.FlushBucket("test", "testMaxCpus")
ingester.BulkPush("test", "testMaxCpus", 10, records)
}
}
const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_"
var seededRand = rand.New(rand.NewSource(time.Now().UnixNano()))
func randStr(length int, charset string) string {
b := make([]byte, length)
for i := range b {
b[i] = charset[seededRand.Intn(len(charset))]
}
return string(b)
}
func init() {
for n := 0; n < 1000; n++ {
records = append(records, IngestBulkRecord{randStr(10, charset), randStr(10, charset)})
}
}