344 lines
9 KiB
Go
344 lines
9 KiB
Go
package sonic
|
|
|
|
import (
|
|
"fmt"
|
|
"strconv"
|
|
"strings"
|
|
"unicode/utf8"
|
|
)
|
|
|
|
// IngestBulkRecord is the struct to be used as a list in bulk operation.
|
|
type IngestBulkRecord struct {
|
|
Object, Text string
|
|
}
|
|
|
|
// IngestBulkError represent an error for a given object in a bulk operation.
|
|
type IngestBulkError struct {
|
|
Object string
|
|
Error error
|
|
}
|
|
|
|
// Ingestable is used for altering the search index (push, pop and flush).
|
|
type Ingestable interface {
|
|
// Push search data in the index.
|
|
// Command syntax PUSH <collection> <bucket> <object> "<text>" [LANG(<locale>)]?
|
|
Push(collection, bucket, object, text string, lang Lang) (err error)
|
|
|
|
// BulkPush will execute N (parallelRoutines) goroutines at the same time to
|
|
// dispatch the records at best.
|
|
// If parallelRoutines <= 0; parallelRoutines will be equal to 1.
|
|
// If parallelRoutines > len(records); parallelRoutines will be equal to len(records).
|
|
BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord, lang Lang) []IngestBulkError
|
|
|
|
// Pop search data from the index.
|
|
// Command syntax POP <collection> <bucket> <object> "<text>".
|
|
Pop(collection, bucket, object, text string) (err error)
|
|
|
|
// BulkPop will execute N (parallelRoutines) goroutines at the same time to
|
|
// dispatch the records at best.
|
|
// If parallelRoutines <= 0; parallelRoutines will be equal to 1.
|
|
// If parallelRoutines > len(records); parallelRoutines will be equal to len(records).
|
|
BulkPop(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) []IngestBulkError
|
|
|
|
// Count indexed search data.
|
|
// bucket and object are optionals, empty string ignore it.
|
|
// Command syntax COUNT <collection> [<bucket> [<object>]?]?.
|
|
Count(collection, bucket, object string) (count int, err error)
|
|
|
|
// FlushCollection Flush all indexed data from a collection.
|
|
// Command syntax FLUSHC <collection>.
|
|
FlushCollection(collection string) (err error)
|
|
|
|
// Flush all indexed data from a bucket in a collection.
|
|
// Command syntax FLUSHB <collection> <bucket>.
|
|
FlushBucket(collection, bucket string) (err error)
|
|
|
|
// Flush all indexed data from an object in a bucket in collection.
|
|
// Command syntax FLUSHO <collection> <bucket> <object>.
|
|
FlushObject(collection, bucket, object string) (err error)
|
|
|
|
// Quit refer to the Base interface
|
|
Quit() (err error)
|
|
|
|
// Ping refer to the Base interface
|
|
Ping() (err error)
|
|
}
|
|
type ingesterCommands string
|
|
|
|
const (
|
|
push ingesterCommands = "PUSH"
|
|
pop ingesterCommands = "POP"
|
|
count ingesterCommands = "COUNT"
|
|
flushb ingesterCommands = "FLUSHB"
|
|
flushc ingesterCommands = "FLUSHC"
|
|
flusho ingesterCommands = "FLUSHO"
|
|
)
|
|
|
|
type ingesterChannel struct {
|
|
*driver
|
|
}
|
|
|
|
// NewIngester create a new driver instance with a ingesterChannel instance.
|
|
// Only way to get a Ingestable implementation.
|
|
func NewIngester(host string, port int, password string) (Ingestable, error) {
|
|
driver := &driver{
|
|
Host: host,
|
|
Port: port,
|
|
Password: password,
|
|
channel: Ingest,
|
|
}
|
|
err := driver.Connect()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return ingesterChannel{
|
|
driver: driver,
|
|
}, nil
|
|
}
|
|
|
|
func (i ingesterChannel) Push(collection, bucket, object, text string, lang Lang) (err error) {
|
|
//
|
|
patterns := []struct {
|
|
Pattern string
|
|
Replacement string
|
|
}{{"\\", "\\\\"},
|
|
{"\n", "\\n"},
|
|
{"\"", "\\\""}}
|
|
for _, v := range patterns {
|
|
text = strings.Replace(text, v.Pattern, v.Replacement, -1)
|
|
}
|
|
|
|
chunks := splitText(text, i.cmdMaxBytes/2)
|
|
// split chunks with partial success will yield single error
|
|
for _, chunk := range chunks {
|
|
ff := fmt.Sprintf("%s %s %s %s \"%s\""+langFormat(lang), push, collection, bucket, object, chunk, lang)
|
|
err = i.write(ff)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// sonic should sent OK
|
|
_, err = i.read()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func langFormat(lang Lang) string {
|
|
if lang != "" {
|
|
return " LANG(%s)"
|
|
}
|
|
return "%s"
|
|
}
|
|
|
|
// Ensure splitting on a valid leading byte
|
|
// Slicing the string directly is more efficient than converting to []byte and back because
|
|
// since a string is immutable and a []byte isn't,
|
|
// the data must be copied to new memory upon conversion,
|
|
// taking O(n) time (both ways!),
|
|
// whereas slicing a string simply returns a new string header backed by the same array as the original
|
|
// (taking constant time).
|
|
func splitText(longString string, maxLen int) []string {
|
|
var splits []string
|
|
|
|
var l, r int
|
|
for l, r = 0, maxLen; r < len(longString); l, r = r, r+maxLen {
|
|
for !utf8.RuneStart(longString[r]) {
|
|
r--
|
|
}
|
|
splits = append(splits, longString[l:r])
|
|
}
|
|
splits = append(splits, longString[l:])
|
|
return splits
|
|
}
|
|
|
|
func (i ingesterChannel) BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord, lang Lang) (errs []IngestBulkError) {
|
|
if parallelRoutines <= 0 {
|
|
parallelRoutines = 1
|
|
}
|
|
|
|
// chunk array into N (parallelRoutines) parts
|
|
divided := divideIngestBulkRecords(records, parallelRoutines)
|
|
|
|
bulkErrorChan := make(chan []IngestBulkError)
|
|
defer close(bulkErrorChan)
|
|
|
|
for _, r := range divided {
|
|
r := r
|
|
go func(driver *driver, collection, bucket string, recs []IngestBulkRecord, bulkErrorChan chan<- []IngestBulkError) {
|
|
errs := make([]IngestBulkError, 0)
|
|
newIngester, _ := NewIngester(driver.Host, driver.Port, driver.Password)
|
|
|
|
for _, rec := range recs {
|
|
if newIngester == nil {
|
|
addBulkError(&errs, rec, ErrClosed)
|
|
continue
|
|
}
|
|
err := newIngester.Push(collection, bucket, rec.Object, rec.Text, lang)
|
|
if err != nil {
|
|
addBulkError(&errs, rec, err)
|
|
}
|
|
}
|
|
|
|
if newIngester != nil {
|
|
_ = newIngester.Quit()
|
|
}
|
|
bulkErrorChan <- errs
|
|
}(i.driver, collection, bucket, r, bulkErrorChan)
|
|
}
|
|
|
|
errs = make([]IngestBulkError, 0)
|
|
for range divided {
|
|
errs = append(errs, <-bulkErrorChan...)
|
|
}
|
|
|
|
return errs
|
|
}
|
|
|
|
func (i ingesterChannel) Pop(collection, bucket, object, text string) (err error) {
|
|
err = i.write(fmt.Sprintf("%s %s %s %s \"%s\"", pop, collection, bucket, object, text))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// sonic should sent OK
|
|
_, err = i.read()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (i ingesterChannel) BulkPop(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) (errs []IngestBulkError) {
|
|
if parallelRoutines <= 0 {
|
|
parallelRoutines = 1
|
|
}
|
|
|
|
// chunk array into N (parallelRoutines) parts
|
|
divided := divideIngestBulkRecords(records, parallelRoutines)
|
|
|
|
bulkErrorChan := make(chan []IngestBulkError)
|
|
defer close(bulkErrorChan)
|
|
|
|
for _, r := range divided {
|
|
r := r
|
|
go func(driver *driver, collection, bucket string, recs []IngestBulkRecord, bulkErrorChan chan<- []IngestBulkError) {
|
|
errs := make([]IngestBulkError, 0)
|
|
newIngester, _ := NewIngester(driver.Host, driver.Port, driver.Password)
|
|
|
|
for _, rec := range recs {
|
|
if newIngester == nil {
|
|
addBulkError(&errs, rec, ErrClosed)
|
|
continue
|
|
}
|
|
|
|
err := newIngester.Pop(collection, bucket, rec.Object, rec.Text)
|
|
|
|
if err != nil {
|
|
addBulkError(&errs, rec, err)
|
|
}
|
|
}
|
|
|
|
if newIngester != nil {
|
|
_ = newIngester.Quit()
|
|
}
|
|
|
|
bulkErrorChan <- errs
|
|
}(i.driver, collection, bucket, r, bulkErrorChan)
|
|
}
|
|
|
|
errs = make([]IngestBulkError, 0)
|
|
for range divided {
|
|
errs = append(errs, <-bulkErrorChan...)
|
|
}
|
|
|
|
return errs
|
|
}
|
|
|
|
func (i ingesterChannel) Count(collection, bucket, object string) (cnt int, err error) {
|
|
err = i.write(fmt.Sprintf("%s %s %s", count, collection, buildCountQuery(bucket, object)))
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// RESULT NUMBER
|
|
r, err := i.read()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return strconv.Atoi(r[7:])
|
|
}
|
|
|
|
func buildCountQuery(bucket, object string) string {
|
|
builder := strings.Builder{}
|
|
if bucket != "" {
|
|
builder.WriteString(bucket)
|
|
if object != "" {
|
|
builder.WriteString(" " + object)
|
|
}
|
|
}
|
|
return builder.String()
|
|
}
|
|
|
|
func (i ingesterChannel) FlushCollection(collection string) (err error) {
|
|
err = i.write(fmt.Sprintf("%s %s", flushc, collection))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// sonic should sent OK
|
|
_, err = i.read()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (i ingesterChannel) FlushBucket(collection, bucket string) (err error) {
|
|
err = i.write(fmt.Sprintf("%s %s %s", flushb, collection, bucket))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// sonic should sent OK
|
|
_, err = i.read()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (i ingesterChannel) FlushObject(collection, bucket, object string) (err error) {
|
|
err = i.write(fmt.Sprintf("%s %s %s %s", flusho, collection, bucket, object))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// sonic should sent OK
|
|
_, err = i.read()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func divideIngestBulkRecords(records []IngestBulkRecord, parallelRoutines int) [][]IngestBulkRecord {
|
|
var divided [][]IngestBulkRecord
|
|
chunkSize := (len(records) + parallelRoutines - 1) / parallelRoutines
|
|
for i := 0; i < len(records); i += chunkSize {
|
|
end := i + chunkSize
|
|
if end > len(records) {
|
|
end = len(records)
|
|
}
|
|
divided = append(divided, records[i:end])
|
|
}
|
|
return divided
|
|
}
|
|
|
|
func addBulkError(e *[]IngestBulkError, record IngestBulkRecord, err error) {
|
|
*e = append(*e, IngestBulkError{record.Object, err})
|
|
}
|