fix errors
fix readme
This commit is contained in:
parent
241f6b889b
commit
3061d33062
4 changed files with 42 additions and 44 deletions
|
|
@ -14,7 +14,7 @@ func main() {
|
||||||
|
|
||||||
// I will ignore all errors for demonstration purposes
|
// I will ignore all errors for demonstration purposes
|
||||||
|
|
||||||
_, _ = ingester.BulkPush("movies", "general", 3, []sonic.IngestBulkRecord{
|
_ = ingester.BulkPush("movies", "general", 3, []sonic.IngestBulkRecord{
|
||||||
{"id:6ab56b4kk3", "Star wars"},
|
{"id:6ab56b4kk3", "Star wars"},
|
||||||
{"id:5hg67f8dg5", "Spider man"},
|
{"id:5hg67f8dg5", "Spider man"},
|
||||||
{"id:1m2n3b4vf6", "Batman"},
|
{"id:1m2n3b4vf6", "Batman"},
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
|
||||||
ingester, err := sonic.NewIngester("localhost", 1491, "SecretPassword")
|
ingester, err := sonic.NewIngester("localhost", 1491, "SecretPassword")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
|
@ -28,7 +29,7 @@ func main() {
|
||||||
|
|
||||||
// I will ignore all errors for demonstration purposes
|
// I will ignore all errors for demonstration purposes
|
||||||
|
|
||||||
_, _ = ingester.BulkPush("movies", "general", 2, []sonic.IngestBulkRecord{
|
_ = ingester.BulkPush("movies", "general", 3, []sonic.IngestBulkRecord{
|
||||||
{"id:6ab56b4kk3", "Star wars"},
|
{"id:6ab56b4kk3", "Star wars"},
|
||||||
{"id:5hg67f8dg5", "Spider man"},
|
{"id:5hg67f8dg5", "Spider man"},
|
||||||
{"id:1m2n3b4vf6", "Batman"},
|
{"id:1m2n3b4vf6", "Batman"},
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,7 @@ type Ingestable interface {
|
||||||
// dispatch the records at best.
|
// dispatch the records at best.
|
||||||
// If parallelRoutines <= 0; parallelRoutines will be equal to 1.
|
// If parallelRoutines <= 0; parallelRoutines will be equal to 1.
|
||||||
// If parallelRoutines > len(records); parallelRoutines will be equal to len(records).
|
// If parallelRoutines > len(records); parallelRoutines will be equal to len(records).
|
||||||
BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) ([]IngestBulkError, error)
|
BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) []IngestBulkError
|
||||||
|
|
||||||
// Pop search data from the index.
|
// Pop search data from the index.
|
||||||
// Command syntax POP <collection> <bucket> <object> "<text>".
|
// Command syntax POP <collection> <bucket> <object> "<text>".
|
||||||
|
|
@ -36,7 +36,7 @@ type Ingestable interface {
|
||||||
// dispatch the records at best.
|
// dispatch the records at best.
|
||||||
// If parallelRoutines <= 0; parallelRoutines will be equal to 1.
|
// If parallelRoutines <= 0; parallelRoutines will be equal to 1.
|
||||||
// If parallelRoutines > len(records); parallelRoutines will be equal to len(records).
|
// If parallelRoutines > len(records); parallelRoutines will be equal to len(records).
|
||||||
BulkPop(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) ([]IngestBulkError, error)
|
BulkPop(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) []IngestBulkError
|
||||||
|
|
||||||
// Count indexed search data.
|
// Count indexed search data.
|
||||||
// bucket and object are optionals, empty string ignore it.
|
// bucket and object are optionals, empty string ignore it.
|
||||||
|
|
@ -61,7 +61,6 @@ type Ingestable interface {
|
||||||
// Ping refer to the Base interface
|
// Ping refer to the Base interface
|
||||||
Ping() (err error)
|
Ping() (err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type ingesterCommands string
|
type ingesterCommands string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|
@ -109,44 +108,40 @@ func (i ingesterChannel) Push(collection, bucket, object, text string) (err erro
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i ingesterChannel) BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) (errs []IngestBulkError, err error) {
|
func (i ingesterChannel) BulkPush(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) (errs []IngestBulkError) {
|
||||||
if parallelRoutines <= 0 {
|
if parallelRoutines <= 0 {
|
||||||
parallelRoutines = 1
|
parallelRoutines = 1
|
||||||
}
|
}
|
||||||
|
|
||||||
err = nil
|
|
||||||
errs = make([]IngestBulkError, 0)
|
errs = make([]IngestBulkError, 0)
|
||||||
errMutex := sync.Mutex{}
|
errMutex := sync.Mutex{}
|
||||||
|
|
||||||
// chunk array into N (parallelRoutines) parts
|
// chunk array into N (parallelRoutines) parts
|
||||||
divided := i.divideIngestBulkRecords(records, parallelRoutines)
|
divided := divideIngestBulkRecords(records, parallelRoutines)
|
||||||
|
|
||||||
// dispatch each records array into N goroutines
|
// dispatch each records array into N goroutines
|
||||||
group := sync.WaitGroup{}
|
group := sync.WaitGroup{}
|
||||||
group.Add(len(divided))
|
group.Add(len(divided))
|
||||||
for _, r := range divided {
|
for _, r := range divided {
|
||||||
go func(recs []IngestBulkRecord) {
|
go func(recs []IngestBulkRecord) {
|
||||||
var conn *connection
|
conn, _ := newConnection(i.driver)
|
||||||
|
|
||||||
errMutex.Lock()
|
|
||||||
conn, err = newConnection(i.driver)
|
|
||||||
errMutex.Unlock()
|
|
||||||
|
|
||||||
for _, rec := range recs {
|
for _, rec := range recs {
|
||||||
err := conn.write(fmt.Sprintf("%s %s %s %s \"%s\"", push, collection, bucket, rec.Object, rec.Text))
|
if conn == nil {
|
||||||
|
addBulkError(&errs, rec, ErrClosed, errMutex)
|
||||||
|
}
|
||||||
|
err := conn.write(fmt.Sprintf(
|
||||||
|
"%s %s %s %s \"%s\"",
|
||||||
|
push, collection, bucket, rec.Object, rec.Text),
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errMutex.Lock()
|
addBulkError(&errs, rec, err, errMutex)
|
||||||
errs = append(errs, IngestBulkError{rec.Object, err})
|
|
||||||
errMutex.Unlock()
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// sonic should sent OK
|
// sonic should sent OK
|
||||||
_, err = conn.read()
|
_, err = conn.read()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errMutex.Lock()
|
addBulkError(&errs, rec, err, errMutex)
|
||||||
errs = append(errs, IngestBulkError{rec.Object, err})
|
|
||||||
errMutex.Unlock()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
@ -154,7 +149,7 @@ func (i ingesterChannel) BulkPush(collection, bucket string, parallelRoutines in
|
||||||
}(r)
|
}(r)
|
||||||
}
|
}
|
||||||
group.Wait()
|
group.Wait()
|
||||||
return errs, err
|
return errs
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i ingesterChannel) Pop(collection, bucket, object, text string) (err error) {
|
func (i ingesterChannel) Pop(collection, bucket, object, text string) (err error) {
|
||||||
|
|
@ -171,44 +166,40 @@ func (i ingesterChannel) Pop(collection, bucket, object, text string) (err error
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i ingesterChannel) BulkPop(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) (errs []IngestBulkError, err error) {
|
func (i ingesterChannel) BulkPop(collection, bucket string, parallelRoutines int, records []IngestBulkRecord) (errs []IngestBulkError) {
|
||||||
if parallelRoutines <= 0 {
|
if parallelRoutines <= 0 {
|
||||||
parallelRoutines = 1
|
parallelRoutines = 1
|
||||||
}
|
}
|
||||||
|
|
||||||
err = nil
|
|
||||||
errs = make([]IngestBulkError, 0)
|
errs = make([]IngestBulkError, 0)
|
||||||
errMutex := sync.Mutex{}
|
errMutex := sync.Mutex{}
|
||||||
|
|
||||||
// chunk array into N (parallelRoutines) parts
|
// chunk array into N (parallelRoutines) parts
|
||||||
divided := i.divideIngestBulkRecords(records, parallelRoutines)
|
divided := divideIngestBulkRecords(records, parallelRoutines)
|
||||||
|
|
||||||
// dispatch each records array into N goroutines
|
// dispatch each records array into N goroutines
|
||||||
group := sync.WaitGroup{}
|
group := sync.WaitGroup{}
|
||||||
group.Add(len(divided))
|
group.Add(len(divided))
|
||||||
for _, r := range divided {
|
for _, r := range divided {
|
||||||
go func(recs []IngestBulkRecord) {
|
go func(recs []IngestBulkRecord) {
|
||||||
var conn *connection
|
conn, _ := newConnection(i.driver)
|
||||||
|
|
||||||
errMutex.Lock()
|
|
||||||
conn, err = newConnection(i.driver)
|
|
||||||
errMutex.Unlock()
|
|
||||||
|
|
||||||
for _, rec := range recs {
|
for _, rec := range recs {
|
||||||
err := conn.write(fmt.Sprintf("%s %s %s %s \"%s\"", push, collection, bucket, rec.Object, rec.Text))
|
if conn == nil {
|
||||||
|
addBulkError(&errs, rec, ErrClosed, errMutex)
|
||||||
|
}
|
||||||
|
err := conn.write(fmt.Sprintf(
|
||||||
|
"%s %s %s %s \"%s\"",
|
||||||
|
pop, collection, bucket, rec.Object, rec.Text),
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errMutex.Lock()
|
addBulkError(&errs, rec, err, errMutex)
|
||||||
errs = append(errs, IngestBulkError{rec.Object, err})
|
|
||||||
errMutex.Unlock()
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// sonic should sent OK
|
// sonic should sent OK
|
||||||
_, err = conn.read()
|
_, err = conn.read()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errMutex.Lock()
|
addBulkError(&errs, rec, err, errMutex)
|
||||||
errs = append(errs, IngestBulkError{rec.Object, err})
|
|
||||||
errMutex.Unlock()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
@ -216,7 +207,7 @@ func (i ingesterChannel) BulkPop(collection, bucket string, parallelRoutines int
|
||||||
}(r)
|
}(r)
|
||||||
}
|
}
|
||||||
group.Wait()
|
group.Wait()
|
||||||
return errs, err
|
return errs
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i ingesterChannel) Count(collection, bucket, object string) (cnt int, err error) {
|
func (i ingesterChannel) Count(collection, bucket, object string) (cnt int, err error) {
|
||||||
|
|
@ -286,7 +277,7 @@ func (i ingesterChannel) FlushObject(collection, bucket, object string) (err err
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i ingesterChannel) divideIngestBulkRecords(records []IngestBulkRecord, parallelRoutines int) [][]IngestBulkRecord {
|
func divideIngestBulkRecords(records []IngestBulkRecord, parallelRoutines int) [][]IngestBulkRecord {
|
||||||
var divided [][]IngestBulkRecord
|
var divided [][]IngestBulkRecord
|
||||||
chunkSize := (len(records) + parallelRoutines - 1) / parallelRoutines
|
chunkSize := (len(records) + parallelRoutines - 1) / parallelRoutines
|
||||||
for i := 0; i < len(records); i += chunkSize {
|
for i := 0; i < len(records); i += chunkSize {
|
||||||
|
|
@ -298,3 +289,9 @@ func (i ingesterChannel) divideIngestBulkRecords(records []IngestBulkRecord, par
|
||||||
}
|
}
|
||||||
return divided
|
return divided
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func addBulkError(e *[]IngestBulkError, record IngestBulkRecord, err error, mutex sync.Mutex) {
|
||||||
|
mutex.Lock()
|
||||||
|
defer mutex.Unlock()
|
||||||
|
*e = append(*e, IngestBulkError{record.Object, err})
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -23,8 +23,8 @@ func BenchmarkIngesterChannel_BulkPushMaxCPUs(b *testing.B) {
|
||||||
b.Log(e)
|
b.Log(e)
|
||||||
b.Fail()
|
b.Fail()
|
||||||
}
|
}
|
||||||
be, e := ingester.BulkPush("test", "testMaxCpus", cpus, records)
|
be := ingester.BulkPush("test", "testMaxCpus", cpus, records)
|
||||||
if len(be) > 0 || e != nil {
|
if len(be) > 0 {
|
||||||
b.Log(be, e)
|
b.Log(be, e)
|
||||||
b.Fail()
|
b.Fail()
|
||||||
}
|
}
|
||||||
|
|
@ -42,8 +42,8 @@ func BenchmarkIngesterChannel_BulkPush10(b *testing.B) {
|
||||||
b.Log(e)
|
b.Log(e)
|
||||||
b.Fail()
|
b.Fail()
|
||||||
}
|
}
|
||||||
be, e := ingester.BulkPush("test", "test10", 10, records)
|
be := ingester.BulkPush("test", "test10", 10, records)
|
||||||
if len(be) > 0 || err != nil {
|
if len(be) > 0 {
|
||||||
b.Log(be, err)
|
b.Log(be, err)
|
||||||
b.Fail()
|
b.Fail()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue