From 7dd0faf24bf23084d68da637a0fd5cdf98f17964 Mon Sep 17 00:00:00 2001 From: alexisvisco Date: Mon, 25 Mar 2019 19:42:23 +0100 Subject: [PATCH] driver is functional --- cmd/main/main.go | 5 +++++ sonic/connector.go | 54 ++++++++++++++++++++++++++++++++++++++++++++-- sonic/ingester.go | 36 ++++++++++++++++++++++++++++--- 3 files changed, 90 insertions(+), 5 deletions(-) diff --git a/cmd/main/main.go b/cmd/main/main.go index 8da8015..fbd9d30 100644 --- a/cmd/main/main.go +++ b/cmd/main/main.go @@ -3,6 +3,7 @@ package main import ( "fmt" "github.com/expectedsh/go-sonic/sonic" + "time" ) func main() { @@ -20,6 +21,10 @@ func main() { channel := sonic.IngesterChannel{Connection: connection} c, e := channel.Count("test", "default", "captain") + fmt.Println("waiting") + time.Sleep(time.Second * 10) + e = channel.Ping() + //c, e = channel.Count("test", "default", "captain") fmt.Println(c, e) } diff --git a/sonic/connector.go b/sonic/connector.go index 914c27e..b6fdedc 100644 --- a/sonic/connector.go +++ b/sonic/connector.go @@ -5,10 +5,13 @@ import ( "bytes" "errors" "fmt" + "io" "net" "strings" ) +var ClosedError = errors.New("sonic connection is closed") + type Connection struct { Host string Port int @@ -17,6 +20,7 @@ type Connection struct { reader *bufio.Reader conn net.Conn + closed bool } func (c *Connection) Connect() error { @@ -45,12 +49,21 @@ func (c *Connection) Connect() error { } } -func (c Connection) read() (string, error) { +func (c *Connection) read() (string, error) { + if c.closed { + return "", ClosedError + } buffer := bytes.Buffer{} for { line, isPrefix, err := c.reader.ReadLine() buffer.Write(line) - if err != nil || !isPrefix { + if err != nil { + if err == io.EOF { + c.clean() + } + return "", err + } + if !isPrefix { break } } @@ -63,6 +76,43 @@ func (c Connection) read() (string, error) { } func (c Connection) write(str string) error { + if c.closed { + return ClosedError + } _, err := c.conn.Write([]byte(str + "\r\n")) return err } + +func (c *Connection) Quit() error { + err := c.write("QUIT") + if err != nil { + return err + } + // should get ENDED + _, err = c.read() + c.clean() + return err +} + +func (c *Connection) Ping() error { + err := c.write("PING") + if err != nil { + fmt.Println("err write") + return err + } + + // should get PONG + _, err = c.read() + if err != nil { + fmt.Println("err read") + return err + } + return nil +} + +func (c *Connection) clean() { + c.closed = true + _ = c.conn.Close() + c.conn = nil + c.reader = nil +} diff --git a/sonic/ingester.go b/sonic/ingester.go index be9d5cc..5808b15 100644 --- a/sonic/ingester.go +++ b/sonic/ingester.go @@ -85,13 +85,43 @@ func buildCountQuery(bucket, object string) string { } func (i IngesterChannel) FlushCollection(collection string) (err error) { - panic("implement me") + err = i.write(fmt.Sprintf("%s %s", flushc, collection)) + if err != nil { + return err + } + + // sonic should sent OK + _, err = i.read() + if err != nil { + return err + } + return nil } func (i IngesterChannel) FlushBucket(collection, bucket string) (err error) { - panic("implement me") + err = i.write(fmt.Sprintf("%s %s %s", flushb, collection, bucket)) + if err != nil { + return err + } + + // sonic should sent OK + _, err = i.read() + if err != nil { + return err + } + return nil } func (i IngesterChannel) FlushObject(collection, bucket, object string) (err error) { - panic("implement me") + err = i.write(fmt.Sprintf("%s %s %s %s", flusho, collection, bucket, object)) + if err != nil { + return err + } + + // sonic should sent OK + _, err = i.read() + if err != nil { + return err + } + return nil }