Josh Brickner 7 gadi atpakaļ
vecāks
revīzija
1cd76bd575
4 mainītis faili ar 72 papildinājumiem un 29 dzēšanām
  1. 32 0
      binlog/binlog.go
  2. 37 25
      binlog/connection.go
  3. 0 1
      conf/replicate.cnf
  4. 3 3
      docker-compose.yaml

+ 32 - 0
binlog/binlog.go

@@ -0,0 +1,32 @@
+package binlog
+
+import "fmt"
+
+func (c *Conn) startBinLogStream() error {
+	bldc := &BinLogDumpCommand{
+		Status:   COMMAND_BIN_LOG_DUMP,
+		Position: 120,
+		Flags:    BINLOG_DUMP_NON_BLOCK,
+		ServerId: c.Config.ServerId,
+		Filename: c.Config.BinLogFile,
+	}
+
+	return c.writeBinLogDumpCommand(bldc)
+}
+
+func (c *Conn) listenForBinlog() error {
+	_, err := c.listen()
+	if err != nil {
+		return err
+	}
+
+	//fmt.Printf("res = %+v\n", res)
+	fmt.Println("hello")
+	err = c.listenForBinlog()
+	fmt.Println("test")
+	if err != nil {
+		return err
+	}
+
+	return nil
+}

+ 37 - 25
binlog/connection.go

@@ -20,6 +20,9 @@ import (
 
 // Misc. Constants
 const NullByte byte = 0
+
+var EOF = bytes.NewBuffer([]byte{NullByte})
+
 const MaxPacketSize = MaxUint16
 
 // MySQL Packet Data Types
@@ -85,6 +88,7 @@ type Conn struct {
 	sequenceId        uint64
 	writeBuf          *bytes.Buffer
 	StausFlags        *StatusFlags
+	Listener          *net.Listener
 }
 
 func newBinlogConn(config *Config) Conn {
@@ -163,26 +167,28 @@ func (d Driver) Open(dsn string) (driver.Conn, error) {
 		return nil, err
 	}
 
-	r, err := c.listen()
-	switch r.(type) {
-	case *OKPacket: // Login successful.
-		c.sequenceId = 0
-		bldc := &BinLogDumpCommand{
-			Status:   COMMAND_BIN_LOG_DUMP,
-			Position: 120,
-			Flags:    BINLOG_DUMP_NON_BLOCK,
-			ServerId: c.Config.ServerId,
-			Filename: c.Config.BinLogFile,
-		}
-		c.writeBinLogDumpCommand(bldc)
-		_, err := c.listen()
-		if err != nil {
-			return nil, err
-		}
-	case *ErrorPacket: // Bad login.
-	case *AuthMoreDataPacket: // Something's fucked
-		fmt.Println("AuthMoreData")
-	}
+	//	c.startListener()
+
+	// switch r.(type) {
+	// case *OKPacket: // Login successful.
+	// 	// Reset sequence to 0 now that connection phase is over
+	// 	// and command phase has started.
+	// 	c.sequenceId = 0
+
+	// 	// Tell server to stream binlog.
+	// 	err = c.startBinLogStream()
+	// 	if err != nil {
+	// 		return nil, err
+	// 	}
+
+	// 	err = c.listenForBinlog()
+	// 	if err != nil {
+	// 		return nil, err
+	// 	}
+	// case *ErrorPacket: // Bad login.
+	// case *AuthMoreDataPacket: // Something's fucked
+	// 	fmt.Println("AuthMoreData")
+	// }
 
 	return c, err
 }
@@ -273,6 +279,13 @@ func (c *Conn) readBytes(l uint64) *bytes.Buffer {
 	for i := uint64(0); i < l; i++ {
 		didScan := c.scanner.Scan()
 		if !didScan {
+			err := c.scanner.Err()
+			if err == nil { // scanner reached EOF
+				return EOF
+			} else {
+				panic(err) // @TODO Handle this gracefully.
+			}
+
 			return nil
 		}
 
@@ -293,7 +306,7 @@ func (c *Conn) getBytesUntilEOF() *bytes.Buffer {
 		}
 
 		s := c.readBytes(uint64(l))
-		if s == nil {
+		if s == EOF || s == nil {
 			return bytes.NewBuffer(b)
 		}
 
@@ -320,10 +333,8 @@ func (c *Conn) getBytesUntilNull() *bytes.Buffer {
 	return bytes.NewBuffer(b)
 }
 
-func (c *Conn) discardBytes(l int) {
-	for i := 0; i < l; i++ {
-		c.scanner.Scan()
-	}
+func (c *Conn) discardBytes(l uint64) {
+	c.readBytes(l)
 }
 
 func (c *Conn) getInt(t int, l uint64) uint64 {
@@ -390,6 +401,7 @@ func (c *Conn) decLenEncInt() uint64 {
 func (c *Conn) decFixedInt(l uint64) uint64 {
 	var i uint64
 	b := c.readBytes(l)
+
 	if l <= 2 {
 		var x uint16
 		pb := c.padBytes(2, b.Bytes())

+ 0 - 1
conf/my.cnf → conf/replicate.cnf

@@ -1,5 +1,4 @@
 [mysqld]
 server-id = 1
 log_bin = /tmp/mysql-bin.log
-binlog_do_db = information_schema
 binlog_checksum = NONE

+ 3 - 3
docker-compose.yaml

@@ -9,7 +9,7 @@ services:
       - "3318:3306"
     restart: always
     volumes:
-        -  "./conf/my.cnf:/etc/mysql/my.cnf"
+        -  "./conf/replicate.cnf:/etc/mysql/replicate.cnf"
   mysql57:
     image: mysql:5.7
     environment:
@@ -19,7 +19,7 @@ services:
       - "3317:3306"
     restart: always
     volumes:
-        -  "./conf/my.cnf:/etc/mysql/my.cnf"
+        -  "./conf/replicate.cnf:/etc/mysql/replicate.cnf"
   mysql56:
     image: mysql:5.6
     environment:
@@ -29,5 +29,5 @@ services:
       - "3316:3306"
     restart: always
     volumes:
-        -  "./conf/my.cnf:/etc/mysql/my.cnf"
+        -  "./conf/replicate.cnf:/etc/mysql/replicate.cnf"