Просмотр исходного кода

Fixed connection to command phase transition.

Josh Brickner 7 лет назад
Родитель
Сommit
ac239e128f
3 измененных файлов с 86 добавлено и 4 удалено
  1. 4 4
      binlog/binlog.go
  2. 79 0
      binlog/connection.go
  3. 3 0
      main.go

+ 4 - 4
binlog/binlog.go

@@ -1,5 +1,7 @@
 package binlog
 
+import "fmt"
+
 func (c *Conn) startBinLogStream() error {
 	bldc := &BinLogDumpCommand{
 		Status:   COMMAND_BIN_LOG_DUMP,
@@ -12,14 +14,13 @@ func (c *Conn) startBinLogStream() error {
 	return c.writeBinLogDumpCommand(bldc)
 }
 
-/*
 func (c *Conn) listenForBinlog() error {
-	_, err := c.listen()
+	res, err := c.listen()
 	if err != nil {
 		return err
 	}
 
-	//fmt.Printf("res = %+v\n", res)
+	fmt.Printf("res = %+v\n", res)
 	fmt.Println("hello")
 	err = c.listenForBinlog()
 	fmt.Println("test")
@@ -29,4 +30,3 @@ func (c *Conn) listenForBinlog() error {
 
 	return nil
 }
-*/

+ 79 - 0
binlog/connection.go

@@ -8,6 +8,7 @@ import (
 	"database/sql/driver"
 	"encoding/binary"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io/ioutil"
 	"math"
@@ -166,9 +167,87 @@ func (d Driver) Open(dsn string) (driver.Conn, error) {
 		return nil, err
 	}
 
+	// Listen for auth response.
+	_, err = c.listen()
+	if err != nil {
+		// Auth failed.
+		return nil, err
+	}
+
+	// Auth completed successfully, move to command phase.
+	c.sequenceId = 0
+
+	// Start binlog stream
+	err = c.startBinLogStream()
+	if err != nil {
+		return nil, err
+	}
+
+	err = c.listenForBinlog()
+	if err != nil {
+		return nil, err
+	}
+
 	return c, err
 }
 
+func (c *Conn) listen() (interface{}, error) {
+	ph, err := c.getPacketHeader()
+	if err != nil {
+		return nil, err
+	}
+	c.sequenceId++
+
+	var res interface{}
+
+	switch ph.Status {
+	case StatusAuth:
+		res, err := c.decodeAuthMoreDataResponsePacket(ph)
+		if err != nil {
+			return nil, err
+		}
+
+		switch res.Data {
+		case SHA2_FAST_AUTH_SUCCESS:
+		case SHA2_REQUEST_PUBLIC_KEY:
+		case SHA2_PERFORM_FULL_AUTHENTICATION:
+			c.putBytes(append([]byte(c.Config.Pass), NullByte))
+			if c.Flush() != nil {
+				return nil, c.Flush()
+			}
+		}
+
+	case StatusEOF:
+		fallthrough
+	case StatusOK:
+		res, err = c.decodeOKPacket(ph)
+		if err != nil {
+			return nil, err
+		}
+	case StatusErr:
+		res, err = c.decodeErrorPacket(ph)
+		if err != nil {
+			return nil, err
+		}
+
+		err = errors.New(
+			fmt.Sprintf(
+				"Error %d: %s",
+				res.(*ErrorPacket).ErrorCode,
+				res.(*ErrorPacket).ErrorMessage,
+			))
+
+		return res, err
+	}
+
+	err = c.scanner.Err()
+	if err != nil {
+		return nil, err
+	}
+
+	return res, nil
+}
+
 type PacketHeader struct {
 	Length     uint64
 	SequenceID uint64

+ 3 - 0
main.go

@@ -3,6 +3,7 @@ package main
 import (
 	"database/sql"
 	"fmt"
+	"os"
 
 	_ "github.com/macinjosh/mysql-binlog-filter/binlog"
 )
@@ -11,10 +12,12 @@ func main() {
 	conn, err := sql.Open("mysql-binlog", "config.json")
 	if err != nil {
 		fmt.Printf("Open Error: %+v\n", err)
+		os.Exit(1)
 	}
 
 	err = conn.Ping()
 	if err != nil {
 		fmt.Printf("%+v\n", err)
+		os.Exit(1)
 	}
 }