first version of go-sonic, missing lot of stuff
This commit is contained in:
commit
dd927a4ab4
7 changed files with 238 additions and 0 deletions
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
.idea
|
||||||
24
cmd/main/main.go
Normal file
24
cmd/main/main.go
Normal file
|
|
@ -0,0 +1,24 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/expectedsh/go-sonic/sonic"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
connection := &sonic.Connection{
|
||||||
|
Host: "localhost",
|
||||||
|
Port: 1491,
|
||||||
|
Password: "SecretPassword",
|
||||||
|
Channel: sonic.Ingest,
|
||||||
|
}
|
||||||
|
|
||||||
|
e := connection.Connect()
|
||||||
|
if e != nil {
|
||||||
|
panic(e)
|
||||||
|
}
|
||||||
|
|
||||||
|
channel := sonic.IngesterChannel{Connection: connection}
|
||||||
|
e = channel.Push("test", "default", "captain", "lol is truth")
|
||||||
|
fmt.Println(e)
|
||||||
|
}
|
||||||
1
go.mod
Normal file
1
go.mod
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
module github.com/expectedsh/go-sonic
|
||||||
13
sonic/channels.go
Normal file
13
sonic/channels.go
Normal file
|
|
@ -0,0 +1,13 @@
|
||||||
|
package sonic
|
||||||
|
|
||||||
|
type Channel string
|
||||||
|
|
||||||
|
const (
|
||||||
|
Search Channel = "search"
|
||||||
|
Ingest Channel = "ingest"
|
||||||
|
Control Channel = "control"
|
||||||
|
)
|
||||||
|
|
||||||
|
func IsChannelValid(ch Channel) bool {
|
||||||
|
return ch == Search || ch == Ingest || ch == Control
|
||||||
|
}
|
||||||
68
sonic/connector.go
Normal file
68
sonic/connector.go
Normal file
|
|
@ -0,0 +1,68 @@
|
||||||
|
package sonic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"bytes"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Connection struct {
|
||||||
|
Host string
|
||||||
|
Port int
|
||||||
|
Password string
|
||||||
|
Channel Channel
|
||||||
|
|
||||||
|
reader *bufio.Reader
|
||||||
|
conn net.Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Connection) Connect() error {
|
||||||
|
if !IsChannelValid(c.Channel) {
|
||||||
|
return errors.New("invalid channel name")
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", c.Host, c.Port))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
c.conn = conn
|
||||||
|
c.reader = bufio.NewReader(c.conn)
|
||||||
|
|
||||||
|
err := c.write(fmt.Sprintf("START %s %s", c.Channel, c.Password))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = c.read()
|
||||||
|
_, err = c.read()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c Connection) read() (string, error) {
|
||||||
|
buffer := bytes.Buffer{}
|
||||||
|
for {
|
||||||
|
line, isPrefix, err := c.reader.ReadLine()
|
||||||
|
buffer.Write(line)
|
||||||
|
if err != nil || !isPrefix {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
str := buffer.String()
|
||||||
|
if strings.HasPrefix(str, "ERR ") {
|
||||||
|
return "", errors.New(str[4:])
|
||||||
|
}
|
||||||
|
return str, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c Connection) write(str string) error {
|
||||||
|
_, err := c.conn.Write([]byte(str + "\r\n"))
|
||||||
|
return err
|
||||||
|
}
|
||||||
62
sonic/ingester.go
Normal file
62
sonic/ingester.go
Normal file
|
|
@ -0,0 +1,62 @@
|
||||||
|
package sonic
|
||||||
|
|
||||||
|
import "fmt"
|
||||||
|
|
||||||
|
type Ingestable interface {
|
||||||
|
Push(collection, bucket, object, text string) (err error)
|
||||||
|
Pop(collection, bucket, object, text string) (err error)
|
||||||
|
Count(collection, bucket, object string) (count int, err error)
|
||||||
|
|
||||||
|
FlushCollection(collection string) (err error)
|
||||||
|
FlushBucket(collection, bucket string) (err error)
|
||||||
|
FlushObject(collection, bucket, object string) (err error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type ingesterCommands string
|
||||||
|
|
||||||
|
const (
|
||||||
|
push ingesterCommands = "PUSH"
|
||||||
|
pop ingesterCommands = "POP"
|
||||||
|
count ingesterCommands = "COUNT"
|
||||||
|
flushb ingesterCommands = "FLUSHB"
|
||||||
|
flushc ingesterCommands = "FLUSHC"
|
||||||
|
flusho ingesterCommands = "FLUSHO"
|
||||||
|
)
|
||||||
|
|
||||||
|
type IngesterChannel struct {
|
||||||
|
*Connection
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i IngesterChannel) Push(collection, bucket, object, text string) (err error) {
|
||||||
|
err = i.write(fmt.Sprintf("%s %s %s %s \"%s\"", push, collection, bucket, object, text))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// sonic should sent OK
|
||||||
|
_, err = i.read()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i IngesterChannel) Pop(collection, bucket, object, text string) (err error) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i IngesterChannel) Count(collection, bucket, object string) (count int, err error) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i IngesterChannel) FlushCollection(collection string) (err error) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i IngesterChannel) FlushBucket(collection, bucket string) (err error) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i IngesterChannel) FlushObject(collection, bucket, object string) (err error) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
69
sonic/search.go
Normal file
69
sonic/search.go
Normal file
|
|
@ -0,0 +1,69 @@
|
||||||
|
package sonic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Searchable interface {
|
||||||
|
Query(collection, bucket, term string, limit, offset int) (results []string, err error)
|
||||||
|
Suggest(collection, bucket, word string, limit int) (results []string, err error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type searchCommands string
|
||||||
|
|
||||||
|
const (
|
||||||
|
query searchCommands = "QUERY"
|
||||||
|
suggest searchCommands = "SUGGEST"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SearchChannel struct {
|
||||||
|
*Connection
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s SearchChannel) Query(collection, bucket, term string, limit, offset int) (results []string, err error) {
|
||||||
|
err = s.write(fmt.Sprintf("%s %s %s \"%s\" LIMIT(%d) OFFSET(%d)", query, collection, bucket, term, limit, offset))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// pending, should be PENDING ID_EVENT
|
||||||
|
_, err = s.read()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// event query, should be EVENT QUERY ID_EVENT RESULT1 RESULT2 ...
|
||||||
|
read, err := s.read()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return getSearchResults(read, string(query)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s SearchChannel) Suggest(collection, bucket, word string, limit int) (results []string, err error) {
|
||||||
|
err = s.write(fmt.Sprintf("%s %s %s \"%s\" LIMIT(%d)", suggest, collection, bucket, word, limit))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// pending, should be PENDING ID_EVENT
|
||||||
|
_, err = s.read()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// event query, should be EVENT SUGGEST ID_EVENT RESULT1 RESULT2 ...
|
||||||
|
read, err := s.read()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return getSearchResults(read, string(suggest)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getSearchResults(line string, eventType string) []string {
|
||||||
|
if strings.HasPrefix(line, "EVENT "+eventType) {
|
||||||
|
return strings.Split(line, " ")[3:]
|
||||||
|
}
|
||||||
|
return []string{}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue