Josh Brickner há 7 anos atrás
pai
commit
c5bc9d4f71
3 ficheiros alterados com 26 adições e 7 exclusões
  1. 20 5
      binlog/binlog.go
  2. 3 2
      binlog/commands.go
  3. 3 0
      binlog/connection.go

+ 20 - 5
binlog/binlog.go

@@ -1,8 +1,6 @@
 package binlog
 
-import (
-	"fmt"
-)
+import "fmt"
 
 func (c *Conn) registerAsSlave() error {
 	brsc := &BinlogRegisterSlaveCommand{
@@ -34,11 +32,28 @@ func (c *Conn) startBinlogStream() error {
 func (c *Conn) listenForBinlog() error {
 	for {
 		p, err := c.readPacket()
-		if err != nil {
+		fmt.Printf("p = %+v\n", p)
+		fmt.Printf("err = %+v\n", err)
+		if err != nil || p == nil {
 			return err
 		} else {
 			kp := p.(*OKPacket)
-			fmt.Printf("kp = %+v\n", kp)
+			c.getEventHeader(kp)
 		}
 	}
 }
+
+func (c *Conn) getEventHeader(p *OKPacket) {
+	/*
+		4              timestamp
+		1              event type
+		4              server-id
+		4              event-size
+		   if binlog-version > 1:
+		4              log pos
+		2              flags
+	*/
+
+	_ = c.getInt(TypeFixedInt, 4)
+
+}

+ 3 - 2
binlog/commands.go

@@ -1,8 +1,9 @@
 package binlog
 
-const BINLOG_DUMP_NON_BLOCK = 0x00
-const COMMAND_BIN_LOG_DUMP = 0x12
+const BINLOG_DUMP_NON_BLOCK = 0x00 // Set to 0 because we do want the binlog to block.
+
 const COMMAND_REGISTER_SLAVE = 0x15
+const COMMAND_BIN_LOG_DUMP = 0x12
 
 type BinlogRegisterSlaveCommand struct {
 	Status   uint64

+ 3 - 0
binlog/connection.go

@@ -247,6 +247,8 @@ func (c *Conn) readPacket() (interface{}, error) {
 		)
 
 		return res, err
+	default:
+		fmt.Printf("Unknown PacketHeader: %+v\n", ph)
 	}
 
 	err = c.scanner.Err()
@@ -269,6 +271,7 @@ func (c *Conn) getPacketHeader() (*PacketHeader, error) {
 
 	ph.SequenceID = c.getInt(TypeFixedInt, 1)
 	ph.Status = c.getInt(TypeFixedInt, 1)
+	fmt.Printf("ph.Status = %+v\n", ph.Status)
 
 	err := c.scanner.Err()
 	if err != nil {