Jelajahi Sumber

Implemented rest of packet string.

Josh Brickner 7 tahun lalu
induk
melakukan
54f45edcc0
5 mengubah file dengan 121 tambahan dan 84 penghapusan
  1. 2 2
      binlog/authentication.go
  2. 34 15
      binlog/binlog.go
  3. 32 3
      binlog/commands.go
  4. 52 63
      binlog/connection.go
  5. 1 1
      config.json

+ 2 - 2
binlog/authentication.go

@@ -11,11 +11,11 @@ const SHA2_FAST_AUTH_SUCCESS = 0x03
 const SHA2_PERFORM_FULL_AUTHENTICATION = 0x04
 
 type AuthMoreDataPacket struct {
-	PacketHeader
+	*PacketHeader
 	Data uint64
 }
 
-func (c *Conn) decodeAuthMoreDataResponsePacket(ph PacketHeader) (*AuthMoreDataPacket, error) {
+func (c *Conn) decodeAuthMoreDataResponsePacket(ph *PacketHeader) (*AuthMoreDataPacket, error) {
 	md := AuthMoreDataPacket{}
 	md.PacketHeader = ph
 	md.Data = c.getInt(TypeFixedInt, 1)

+ 34 - 15
binlog/binlog.go

@@ -1,31 +1,50 @@
 package binlog
 
-import "fmt"
+import (
+	"fmt"
+)
 
-func (c *Conn) startBinLogStream() error {
-	bldc := &BinLogDumpCommand{
+func (c *Conn) registerAsSlave() error {
+	brsc := &BinlogRegisterSlaveCommand{
+		Status:   COMMAND_REGISTER_SLAVE,
+		ServerId: c.Config.ServerId,
+		Hostname: "",
+		User:     "",
+		Password: "",
+		Port:     0,
+		ReplRank: 1,
+		MasterId: 0,
+	}
+
+	return c.writeBinlogRegisterSlaveCommand(brsc)
+}
+
+func (c *Conn) startBinlogStream() error {
+	bldc := &BinlogDumpCommand{
 		Status:   COMMAND_BIN_LOG_DUMP,
 		Position: 120,
 		Flags:    BINLOG_DUMP_NON_BLOCK,
 		ServerId: c.Config.ServerId,
-		Filename: c.Config.BinLogFile,
+		Filename: c.Config.BinlogFile,
 	}
 
-	return c.writeBinLogDumpCommand(bldc)
+	return c.writeBinlogDumpCommand(bldc)
 }
 
 func (c *Conn) listenForBinlog() error {
-	res, err := c.listen()
-	if err != nil {
-		return err
+	for {
+		p, err := c.readPacket()
+		if err != nil {
+			if err.Error() == "EOF" {
+				continue
+			} else {
+				return err
+			}
+		} else {
+			kp := p.(*OKPacket)
+			fmt.Printf("kp = %+v\n", kp)
+		}
 	}
 
-	fmt.Printf("res = %+v\n", res)
-
-	// err = c.listenForBinlog()
-	// if err != nil {
-	// 	return err
-	// }
-
 	return nil
 }

+ 32 - 3
binlog/commands.go

@@ -1,9 +1,38 @@
 package binlog
 
-const COMMAND_BIN_LOG_DUMP = 0x12
 const BINLOG_DUMP_NON_BLOCK = 0x01
+const COMMAND_BIN_LOG_DUMP = 0x12
+const COMMAND_REGISTER_SLAVE = 0x15
+
+type BinlogRegisterSlaveCommand struct {
+	Status   uint64
+	ServerId uint64
+	Hostname string // Length Encoded
+	User     string // Length Encoded
+	Password string // Length Encoded
+	Port     uint64
+	ReplRank uint64
+	MasterId uint64
+}
+
+func (c *Conn) writeBinlogRegisterSlaveCommand(brsc *BinlogRegisterSlaveCommand) error {
+	c.putInt(TypeFixedInt, brsc.Status, 1)
+	c.putInt(TypeFixedInt, brsc.ServerId, 4)
+	c.putString(TypeLenEncString, brsc.Hostname)
+	c.putString(TypeLenEncString, brsc.User)
+	c.putString(TypeLenEncString, brsc.Password)
+	c.putInt(TypeLenEncInt, brsc.Port, 2)
+	c.putInt(TypeLenEncInt, brsc.ReplRank, 4)
+	c.putInt(TypeLenEncInt, brsc.MasterId, 4)
+
+	if c.Flush() != nil {
+		return c.Flush()
+	}
+
+	return nil
+}
 
-type BinLogDumpCommand struct {
+type BinlogDumpCommand struct {
 	Status   uint64
 	Position uint64
 	Flags    uint64
@@ -11,7 +40,7 @@ type BinLogDumpCommand struct {
 	Filename string
 }
 
-func (c *Conn) writeBinLogDumpCommand(bldc *BinLogDumpCommand) error {
+func (c *Conn) writeBinlogDumpCommand(bldc *BinlogDumpCommand) error {
 	c.putInt(TypeFixedInt, bldc.Status, 1)
 	c.putInt(TypeFixedInt, bldc.Position, 4)
 	c.putInt(TypeFixedInt, bldc.Flags, 2)

+ 52 - 63
binlog/connection.go

@@ -57,7 +57,7 @@ type Config struct {
 	SSLKey     string `json:"ssl-key"`
 	VerifyCert bool   `json:"verify-cert"`
 	ServerId   uint64 `json:"server-id"`
-	BinLogFile string `json:"binlog-file"`
+	BinlogFile string `json:"binlog-file"`
 	Timeout    time.Duration
 }
 
@@ -89,6 +89,8 @@ type Conn struct {
 	writeBuf          *bytes.Buffer
 	StausFlags        *StatusFlags
 	Listener          *net.Listener
+	packetHeader      *PacketHeader
+	scanPos           uint64
 }
 
 func newBinlogConn(config *Config) Conn {
@@ -168,40 +170,34 @@ func (d Driver) Open(dsn string) (driver.Conn, error) {
 	}
 
 	// Listen for auth response.
-	r, err := c.listen()
-	switch r.(type) {
-	case *OKPacket: // Login successful.
-		// Reset sequence to 0 now that connection phase is over
-		// and command phase has started.
-		c.sequenceId = 0
-
-		// Tell server to stream binlog.
-		err = c.startBinLogStream()
-		if err != nil {
-			return nil, err
-		}
+	_, err = c.readPacket()
+	if err != nil {
+		return nil, err
+	}
 
-		err = c.listenForBinlog()
-		if err != nil {
-			return nil, err
-		}
-	case *ErrorPacket: // Bad login.
-		ep := r.(*ErrorPacket)
-		em := fmt.Sprintf("Error %d: %s", ep.ErrorCode, ep.ErrorMessage)
-		err = errors.New(em)
+	// Auth was successful.
+	c.sequenceId = 0
+
+	// Register as a slave
+	err = c.registerAsSlave()
+	if err != nil {
+		return nil, err
+	}
+
+	_, err = c.readPacket()
+	if err != nil {
 		return nil, err
-	case *AuthMoreDataPacket:
-		panic(errors.New("Unexpected AuthMoreDataPacket"))
 	}
 
 	return c, err
 }
 
-func (c *Conn) listen() (interface{}, error) {
+func (c *Conn) readPacket() (interface{}, error) {
 	ph, err := c.getPacketHeader()
 	if err != nil {
 		return nil, err
 	}
+
 	c.sequenceId++
 
 	var res interface{}
@@ -222,7 +218,6 @@ func (c *Conn) listen() (interface{}, error) {
 				return nil, c.Flush()
 			}
 		}
-
 	case StatusEOF:
 		fallthrough
 	case StatusOK:
@@ -236,12 +231,11 @@ func (c *Conn) listen() (interface{}, error) {
 			return nil, err
 		}
 
-		err = errors.New(
-			fmt.Sprintf(
-				"Error %d: %s",
-				res.(*ErrorPacket).ErrorCode,
-				res.(*ErrorPacket).ErrorMessage,
-			))
+		err = fmt.Errorf(
+			"Error %d: %s",
+			res.(*ErrorPacket).ErrorCode,
+			res.(*ErrorPacket).ErrorMessage,
+		)
 
 		return res, err
 	}
@@ -260,18 +254,27 @@ type PacketHeader struct {
 	Status     uint64
 }
 
-func (c *Conn) getPacketHeader() (PacketHeader, error) {
+func (c *Conn) getPacketHeader() (*PacketHeader, error) {
 	ph := PacketHeader{}
 	ph.Length = c.getInt(TypeFixedInt, 3)
+
+	if ph.Length == 0 {
+		err := errors.New("EOF")
+		return nil, err
+	}
+
 	ph.SequenceID = c.getInt(TypeFixedInt, 1)
 	ph.Status = c.getInt(TypeFixedInt, 1)
 
 	err := c.scanner.Err()
 	if err != nil {
-		return ph, err
+		return &ph, err
 	}
 
-	return ph, nil
+	c.packetHeader = &ph
+	c.scanPos = 0
+
+	return &ph, nil
 }
 
 func init() {
@@ -289,33 +292,12 @@ func (c *Conn) readBytes(l uint64) *bytes.Buffer {
 			} else {
 				panic(err) // @TODO Handle this gracefully.
 			}
-
-			return nil
 		}
 
 		b = append(b, c.scanner.Bytes()...)
 	}
 
-	return bytes.NewBuffer(b)
-}
-
-func (c *Conn) getBytesUntilEOF() *bytes.Buffer {
-	l := uint64(1)
-	s := c.readBytes(l)
-	b := s.Bytes()
-
-	for true {
-		if uint64(s.Len()) != l || s.Bytes()[0] == NullByte {
-			break
-		}
-
-		s := c.readBytes(uint64(l))
-		if s == EOF || s == nil {
-			return bytes.NewBuffer(b)
-		}
-
-		b = append(b, s.Bytes()...)
-	}
+	c.scanPos += uint64(len(b))
 
 	return bytes.NewBuffer(b)
 }
@@ -325,7 +307,7 @@ func (c *Conn) getBytesUntilNull() *bytes.Buffer {
 	s := c.readBytes(l)
 	b := s.Bytes()
 
-	for true {
+	for {
 		if uint64(s.Len()) != l || s.Bytes()[0] == NullByte {
 			break
 		}
@@ -376,8 +358,15 @@ func (c *Conn) getString(t int, l uint64) string {
 }
 
 func (c *Conn) decRestOfPacketString() string {
-	b := c.getBytesUntilEOF()
-	return string(b.Bytes())
+	b := c.getRemainingBytes()
+	return b.String()
+}
+
+func (c *Conn) getRemainingBytes() *bytes.Buffer {
+	l := (c.packetHeader.Length - 1) - c.scanPos
+	b := c.readBytes(l)
+
+	return b
 }
 
 func (c *Conn) decNullTerminatedString() string {
@@ -651,7 +640,7 @@ type StatusFlags struct {
 }
 
 type OKPacket struct {
-	PacketHeader
+	*PacketHeader
 	Header           uint64
 	AffectedRows     uint64
 	LastInsertID     uint64
@@ -661,7 +650,7 @@ type OKPacket struct {
 	SessionStateInfo string
 }
 
-func (c *Conn) decodeOKPacket(ph PacketHeader) (*OKPacket, error) {
+func (c *Conn) decodeOKPacket(ph *PacketHeader) (*OKPacket, error) {
 	op := OKPacket{}
 	op.PacketHeader = ph
 	op.Header = ph.Status
@@ -684,14 +673,14 @@ func (c *Conn) decodeOKPacket(ph PacketHeader) (*OKPacket, error) {
 }
 
 type ErrorPacket struct {
-	PacketHeader
+	*PacketHeader
 	ErrorCode      uint64
 	ErrorMessage   string
 	SQLStateMarker string
 	SQLState       string
 }
 
-func (c *Conn) decodeErrorPacket(ph PacketHeader) (*ErrorPacket, error) {
+func (c *Conn) decodeErrorPacket(ph *PacketHeader) (*ErrorPacket, error) {
 	ep := ErrorPacket{}
 	ep.PacketHeader = ph
 	ep.ErrorCode = c.getInt(TypeFixedInt, 2)

+ 1 - 1
config.json

@@ -9,6 +9,6 @@
   "ssl-cer": "",
   "ssl-ca": "/Users/josh/Sites/Certificates.pem",
   "verify-cert": false,
-  "server-id": 1,
+  "server-id": 2,
   "binlog-file": "mysql-bin.000004"
 }