diff --git a/cmd/example/main.go b/cmd/example/main.go index c48e897..5c25254 100644 --- a/cmd/example/main.go +++ b/cmd/example/main.go @@ -14,7 +14,7 @@ func main() { // I will ignore all errors for demonstration purposes - _ = ingester.BulkPush("movies", "general", []sonic.IngestBulkRecord{ + _, _ = ingester.BulkPush("movies", "general", 2, []sonic.IngestBulkRecord{ {"id:6ab56b4kk3", "Star wars"}, {"id:5hg67f8dg5", "Spider man"}, {"id:1m2n3b4vf6", "Batman"}, diff --git a/sonic/actions.go b/sonic/actions.go index 906b38b..31e6a9a 100644 --- a/sonic/actions.go +++ b/sonic/actions.go @@ -1,11 +1,15 @@ package sonic +// Action refer to list of actions for TRIGGER command. type Action string const ( + // Consolidate action is not detailed in the sonic protocol. Consolidate Action = "consolidate" ) +// IsActionValid check if the action passed in parameter is valid. +// Mean that TRIGGER command can handle it. func IsActionValid(action Action) bool { return action == Consolidate } diff --git a/sonic/channels.go b/sonic/channels.go index 99867c3..d83a6e5 100644 --- a/sonic/channels.go +++ b/sonic/channels.go @@ -1,13 +1,20 @@ package sonic +// Channel refer to the list of channels available. type Channel string const ( - Search Channel = "search" - Ingest Channel = "ingest" + // Search is used for querying the search index. + Search Channel = "search" + + // Ingest is used for altering the search index (push, pop and flush). + Ingest Channel = "ingest" + + // Control is used for administration purposes. Control Channel = "control" ) +// IsChannelValid check if the parameter is a valid channel. func IsChannelValid(ch Channel) bool { return ch == Search || ch == Ingest || ch == Control } diff --git a/sonic/connection.go b/sonic/connection.go new file mode 100644 index 0000000..e31c53d --- /dev/null +++ b/sonic/connection.go @@ -0,0 +1,84 @@ +package sonic + +import ( + "bufio" + "bytes" + "errors" + "fmt" + "io" + "net" + "strings" +) + +type connection struct { + reader *bufio.Reader + conn net.Conn + closed bool +} + +func newConnection(d *driver) (*connection, error) { + c := &connection{} + c.close() + conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", d.Host, d.Port)) + if err != nil { + return nil, err + } + + c.closed = false + c.conn = conn + c.reader = bufio.NewReader(c.conn) + + err = c.write(fmt.Sprintf("START %s %s", d.channel, d.Password)) + if err != nil { + return nil, err + } + _, err = c.read() + _, err = c.read() + if err != nil { + return nil, err + } + return c, nil +} + +func (c *connection) read() (string, error) { + if c.closed { + return "", ErrClosed + } + buffer := bytes.Buffer{} + for { + line, isPrefix, err := c.reader.ReadLine() + buffer.Write(line) + if err != nil { + if err == io.EOF { + c.close() + } + return "", err + } + if !isPrefix { + break + } + } + + str := buffer.String() + if strings.HasPrefix(str, "ERR ") { + return "", errors.New(str[4:]) + } + return str, nil +} + +func (c connection) write(str string) error { + if c.closed { + return ErrClosed + } + _, err := c.conn.Write([]byte(str + "\r\n")) + return err +} + +func (c *connection) close() { + if c.conn != nil { + _ = c.conn.Close() + c.conn = nil + } + c.closed = true + c.reader = nil +} diff --git a/sonic/control.go b/sonic/control.go index 1820f51..58c5cb2 100644 --- a/sonic/control.go +++ b/sonic/control.go @@ -5,6 +5,7 @@ import ( "fmt" ) +// ErrActionName is throw when the action is invalid. var ErrActionName = errors.New("invalid action name") // Controllable is used for administration purposes. @@ -16,16 +17,19 @@ type Controllable interface { // Quit refer to the Base interface Quit() (err error) - // Quit refer to the Base interface + // Ping refer to the Base interface Ping() (err error) } -type ControlChannel struct { - *Driver +// controlChannel is used for administration purposes. +type controlChannel struct { + *driver } +// NewControl create a new driver instance with a controlChannel instance. +// Only way to get a Controllable implementation. func NewControl(host string, port int, password string) (Controllable, error) { - driver := &Driver{ + driver := &driver{ Host: host, Port: port, Password: password, @@ -35,12 +39,12 @@ func NewControl(host string, port int, password string) (Controllable, error) { if err != nil { return nil, err } - return ControlChannel{ - Driver: driver, + return controlChannel{ + driver: driver, }, nil } -func (c ControlChannel) Trigger(action Action) (err error) { +func (c controlChannel) Trigger(action Action) (err error) { if IsActionValid(action) { return ErrActionName } diff --git a/sonic/driver.go b/sonic/driver.go index 7de10b7..2326bf2 100644 --- a/sonic/driver.go +++ b/sonic/driver.go @@ -1,13 +1,7 @@ package sonic import ( - "bufio" - "bytes" "errors" - "fmt" - "io" - "net" - "strings" ) var ( @@ -28,47 +22,27 @@ type Base interface { Ping() error } -type Driver struct { +type driver struct { Host string Port int Password string channel Channel - reader *bufio.Reader - conn net.Conn - closed bool + *connection } // Connect open a connection via TCP with the sonic server. -func (c *Driver) Connect() error { +func (c *driver) Connect() error { if !IsChannelValid(c.channel) { return ErrChanName } - c.clean() - conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", c.Host, c.Port)) - if err != nil { - return err - } - - c.closed = false - c.conn = conn - c.reader = bufio.NewReader(c.conn) - - err = c.write(fmt.Sprintf("START %s %s", c.channel, c.Password)) - if err != nil { - return err - } - - _, err = c.read() - _, err = c.read() - if err != nil { - return err - } - return nil + var err error + c.connection, err = newConnection(c) + return err } -func (c *Driver) Quit() error { +func (c *driver) Quit() error { err := c.write("QUIT") if err != nil { return err @@ -76,11 +50,11 @@ func (c *Driver) Quit() error { // should get ENDED _, err = c.read() - c.clean() + c.close() return err } -func (c Driver) Ping() error { +func (c driver) Ping() error { err := c.write("PING") if err != nil { return err @@ -93,46 +67,3 @@ func (c Driver) Ping() error { } return nil } - -func (c *Driver) read() (string, error) { - if c.closed { - return "", ErrClosed - } - buffer := bytes.Buffer{} - for { - line, isPrefix, err := c.reader.ReadLine() - buffer.Write(line) - if err != nil { - if err == io.EOF { - c.clean() - } - return "", err - } - if !isPrefix { - break - } - } - - str := buffer.String() - if strings.HasPrefix(str, "ERR ") { - return "", errors.New(str[4:]) - } - return str, nil -} - -func (c Driver) write(str string) error { - if c.closed { - return ErrClosed - } - _, err := c.conn.Write([]byte(str + "\r\n")) - return err -} - -func (c *Driver) clean() { - if c.conn != nil { - _ = c.conn.Close() - c.conn = nil - } - c.closed = true - c.reader = nil -} diff --git a/sonic/ingester.go b/sonic/ingester.go index 25a1eae..9adc75b 100644 --- a/sonic/ingester.go +++ b/sonic/ingester.go @@ -4,6 +4,7 @@ import ( "fmt" "strconv" "strings" + "sync" ) type IngestBulkRecord struct { @@ -25,7 +26,7 @@ type Ingestable interface { // dispatch the records at best. // If parallelRoutines <= 0; parallelRoutines will be equal to 1. // If parallelRoutines > len(records); parallelRoutines will be equal to len(records). - BulkPush(collection, bucket string, records []IngestBulkRecord) []IngestBulkError + BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) ([]IngestBulkError, error) // Pop search data from the index. // Command syntax POP "". @@ -35,7 +36,7 @@ type Ingestable interface { // dispatch the records at best. // If parallelRoutines <= 0; parallelRoutines will be equal to 1. // If parallelRoutines > len(records); parallelRoutines will be equal to len(records). - BulkPop(collection, bucket string, records []IngestBulkRecord) []IngestBulkError + BulkPop(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) ([]IngestBulkError, error) // Count indexed search data. // bucket and object are optionals, empty string ignore it. @@ -57,7 +58,7 @@ type Ingestable interface { // Quit refer to the Base interface Quit() (err error) - // Quit refer to the Base interface + // Ping refer to the Base interface Ping() (err error) } @@ -72,12 +73,14 @@ const ( flusho ingesterCommands = "FLUSHO" ) -type IngesterChannel struct { - *Driver +type ingesterChannel struct { + *driver } +// NewIngester create a new driver instance with a ingesterChannel instance. +// Only way to get a Ingestable implementation. func NewIngester(host string, port int, password string) (Ingestable, error) { - driver := &Driver{ + driver := &driver{ Host: host, Port: port, Password: password, @@ -87,12 +90,12 @@ func NewIngester(host string, port int, password string) (Ingestable, error) { if err != nil { return nil, err } - return IngesterChannel{ - Driver: driver, + return ingesterChannel{ + driver: driver, }, nil } -func (i IngesterChannel) Push(collection, bucket, object, text string) (err error) { +func (i ingesterChannel) Push(collection, bucket, object, text string) (err error) { err = i.write(fmt.Sprintf("%s %s %s %s \"%s\"", push, collection, bucket, object, text)) if err != nil { return err @@ -106,19 +109,55 @@ func (i IngesterChannel) Push(collection, bucket, object, text string) (err erro return nil } -func (i IngesterChannel) BulkPush(collection, bucket string, records []IngestBulkRecord) []IngestBulkError { - errs := make([]IngestBulkError, 0) - - for _, v := range records { - if err := i.Push(collection, bucket, v.Object, v.Text); err != nil { - errs = append(errs, IngestBulkError{v.Object, err}) - } +func (i ingesterChannel) BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) (errs []IngestBulkError, err error) { + if parallelRoutines <= 0 { + parallelRoutines = 1 } - return errs + err = nil + errs = make([]IngestBulkError, 0) + errMutex := sync.Mutex{} + + // chunk array into N (parallelRoutines) parts + divided := i.divideIngestBulkRecords(records, parallelRoutines) + + // dispatch each records array into N goroutines + group := sync.WaitGroup{} + group.Add(len(divided)) + for _, r := range divided { + go func(recs []IngestBulkRecord) { + var conn *connection + + errMutex.Lock() + conn, err = newConnection(i.driver) + errMutex.Unlock() + + for _, rec := range recs { + err := conn.write(fmt.Sprintf("%s %s %s %s \"%s\"", push, collection, bucket, rec.Object, rec.Text)) + if err != nil { + errMutex.Lock() + errs = append(errs, IngestBulkError{rec.Object, err}) + errMutex.Unlock() + continue + } + + // sonic should sent OK + _, err = conn.read() + if err != nil { + errMutex.Lock() + errs = append(errs, IngestBulkError{rec.Object, err}) + errMutex.Unlock() + } + } + conn.close() + group.Done() + }(r) + } + group.Wait() + return errs, err } -func (i IngesterChannel) Pop(collection, bucket, object, text string) (err error) { +func (i ingesterChannel) Pop(collection, bucket, object, text string) (err error) { err = i.write(fmt.Sprintf("%s %s %s %s \"%s\"", pop, collection, bucket, object, text)) if err != nil { return err @@ -132,19 +171,55 @@ func (i IngesterChannel) Pop(collection, bucket, object, text string) (err error return nil } -func (i IngesterChannel) BulkPop(collection, bucket string, records []IngestBulkRecord) []IngestBulkError { - errs := make([]IngestBulkError, 0) - - for _, v := range records { - if err := i.Push(collection, bucket, v.Object, v.Text); err != nil { - errs = append(errs, IngestBulkError{v.Object, err}) - } +func (i ingesterChannel) BulkPop(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) (errs []IngestBulkError, err error) { + if parallelRoutines <= 0 { + parallelRoutines = 1 } - return errs + err = nil + errs = make([]IngestBulkError, 0) + errMutex := sync.Mutex{} + + // chunk array into N (parallelRoutines) parts + divided := i.divideIngestBulkRecords(records, parallelRoutines) + + // dispatch each records array into N goroutines + group := sync.WaitGroup{} + group.Add(len(divided)) + for _, r := range divided { + go func(recs []IngestBulkRecord) { + var conn *connection + + errMutex.Lock() + conn, err = newConnection(i.driver) + errMutex.Unlock() + + for _, rec := range recs { + err := conn.write(fmt.Sprintf("%s %s %s %s \"%s\"", push, collection, bucket, rec.Object, rec.Text)) + if err != nil { + errMutex.Lock() + errs = append(errs, IngestBulkError{rec.Object, err}) + errMutex.Unlock() + continue + } + + // sonic should sent OK + _, err = conn.read() + if err != nil { + errMutex.Lock() + errs = append(errs, IngestBulkError{rec.Object, err}) + errMutex.Unlock() + } + } + conn.close() + group.Done() + }(r) + } + group.Wait() + return errs, err } -func (i IngesterChannel) Count(collection, bucket, object string) (cnt int, err error) { +func (i ingesterChannel) Count(collection, bucket, object string) (cnt int, err error) { err = i.write(fmt.Sprintf("%s %s %s", count, collection, buildCountQuery(bucket, object))) if err != nil { return 0, err @@ -169,7 +244,7 @@ func buildCountQuery(bucket, object string) string { return builder.String() } -func (i IngesterChannel) FlushCollection(collection string) (err error) { +func (i ingesterChannel) FlushCollection(collection string) (err error) { err = i.write(fmt.Sprintf("%s %s", flushc, collection)) if err != nil { return err @@ -183,7 +258,7 @@ func (i IngesterChannel) FlushCollection(collection string) (err error) { return nil } -func (i IngesterChannel) FlushBucket(collection, bucket string) (err error) { +func (i ingesterChannel) FlushBucket(collection, bucket string) (err error) { err = i.write(fmt.Sprintf("%s %s %s", flushb, collection, bucket)) if err != nil { return err @@ -197,7 +272,7 @@ func (i IngesterChannel) FlushBucket(collection, bucket string) (err error) { return nil } -func (i IngesterChannel) FlushObject(collection, bucket, object string) (err error) { +func (i ingesterChannel) FlushObject(collection, bucket, object string) (err error) { err = i.write(fmt.Sprintf("%s %s %s %s", flusho, collection, bucket, object)) if err != nil { return err @@ -211,7 +286,7 @@ func (i IngesterChannel) FlushObject(collection, bucket, object string) (err err return nil } -func (i IngesterChannel) divideIngestBulkRecords(records []IngestBulkRecord, parallelRoutines int) [][]IngestBulkRecord { +func (i ingesterChannel) divideIngestBulkRecords(records []IngestBulkRecord, parallelRoutines int) [][]IngestBulkRecord { var divided [][]IngestBulkRecord chunkSize := (len(records) + parallelRoutines - 1) / parallelRoutines for i := 0; i < len(records); i += chunkSize { diff --git a/sonic/ingester_test.go b/sonic/ingester_test.go new file mode 100644 index 0000000..5082479 --- /dev/null +++ b/sonic/ingester_test.go @@ -0,0 +1,90 @@ +package sonic + +import ( + "math/rand" + "runtime" + "testing" + "time" +) + +var records = make([]IngestBulkRecord, 0) +var ingester, err = NewIngester("localhost", 1491, "SecretPassword") + +func BenchmarkIngesterChannel_BulkPushMaxCPUs(b *testing.B) { + if err != nil { + return + } + + cpus := runtime.NumCPU() + + for n := 0; n < b.N; n++ { + e := ingester.FlushBucket("test", "testMaxCpus") + if e != nil { + b.Log(e) + b.Fail() + } + be, e := ingester.BulkPush("test", "testMaxCpus", cpus, records) + if len(be) > 0 || e != nil { + b.Log(be, e) + b.Fail() + } + } +} + +func BenchmarkIngesterChannel_BulkPush10(b *testing.B) { + if err != nil { + return + } + + for n := 0; n < b.N; n++ { + e := ingester.FlushBucket("test", "test10") + if e != nil { + b.Log(e) + b.Fail() + } + be, e := ingester.BulkPush("test", "test10", 10, records) + if len(be) > 0 || err != nil { + b.Log(be, err) + b.Fail() + } + } +} + +func BenchmarkIngesterChannel_Push(b *testing.B) { + if err != nil { + return + } + + for n := 0; n < b.N; n++ { + e := ingester.FlushBucket("test", "testBulk") + if e != nil { + b.Log(e) + b.Fail() + } + for _, v := range records { + e := ingester.Push("test", "testBulk", v.Object, v.Text) + if e != nil { + b.Log(e) + b.Fail() + } + } + } +} + +const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_" + +var seededRand = rand.New(rand.NewSource(time.Now().UnixNano())) + +func randStr(length int, charset string) string { + b := make([]byte, length) + for i := range b { + b[i] = charset[seededRand.Intn(len(charset))] + } + return string(b) +} + +func init() { + for n := 0; n < 3000; n++ { + records = append(records, IngestBulkRecord{randStr(10, charset), randStr(10, charset)}) + } +} diff --git a/sonic/search.go b/sonic/search.go index 4ad84de..89ca02f 100644 --- a/sonic/search.go +++ b/sonic/search.go @@ -20,7 +20,7 @@ type Searchable interface { // Quit refer to the Base interface Quit() (err error) - // Quit refer to the Base interface + // Ping refer to the Base interface Ping() (err error) } @@ -31,12 +31,14 @@ const ( suggest searchCommands = "SUGGEST" ) -type SearchChannel struct { - *Driver +type searchChannel struct { + *driver } +// NewIngester create a new driver instance with a searchChannel instance. +// Only way to get a Searchable implementation. func NewSearch(host string, port int, password string) (Searchable, error) { - driver := &Driver{ + driver := &driver{ Host: host, Port: port, Password: password, @@ -46,12 +48,12 @@ func NewSearch(host string, port int, password string) (Searchable, error) { if err != nil { return nil, err } - return SearchChannel{ - Driver: driver, + return searchChannel{ + driver: driver, }, nil } -func (s SearchChannel) Query(collection, bucket, term string, limit, offset int) (results []string, err error) { +func (s searchChannel) Query(collection, bucket, term string, limit, offset int) (results []string, err error) { err = s.write(fmt.Sprintf("%s %s %s \"%s\" LIMIT(%d) OFFSET(%d)", query, collection, bucket, term, limit, offset)) if err != nil { return nil, err @@ -71,7 +73,7 @@ func (s SearchChannel) Query(collection, bucket, term string, limit, offset int) return getSearchResults(read, string(query)), nil } -func (s SearchChannel) Suggest(collection, bucket, word string, limit int) (results []string, err error) { +func (s searchChannel) Suggest(collection, bucket, word string, limit int) (results []string, err error) { err = s.write(fmt.Sprintf("%s %s %s \"%s\" LIMIT(%d)", suggest, collection, bucket, word, limit)) if err != nil { return nil, err