Przeglądaj źródła

Fixed blocking issue.

Josh Brickner 7 lat temu
rodzic
commit
16611fdf11
3 zmienionych plików z 7 dodań i 21 usunięć
  1. 1 7
      binlog/binlog.go
  2. 1 1
      binlog/commands.go
  3. 5 13
      binlog/connection.go

+ 1 - 7
binlog/binlog.go

@@ -35,16 +35,10 @@ func (c *Conn) listenForBinlog() error {
 	for {
 		p, err := c.readPacket()
 		if err != nil {
-			if err.Error() == "EOF" {
-				continue
-			} else {
-				return err
-			}
+			return err
 		} else {
 			kp := p.(*OKPacket)
 			fmt.Printf("kp = %+v\n", kp)
 		}
 	}
-
-	return nil
 }

+ 1 - 1
binlog/commands.go

@@ -1,6 +1,6 @@
 package binlog
 
-const BINLOG_DUMP_NON_BLOCK = 0x01
+const BINLOG_DUMP_NON_BLOCK = 0x00
 const COMMAND_BIN_LOG_DUMP = 0x12
 const COMMAND_REGISTER_SLAVE = 0x15
 

+ 5 - 13
binlog/connection.go

@@ -8,7 +8,6 @@ import (
 	"database/sql/driver"
 	"encoding/binary"
 	"encoding/json"
-	"errors"
 	"fmt"
 	"io/ioutil"
 	"math"
@@ -196,7 +195,7 @@ func (d Driver) Open(dsn string) (driver.Conn, error) {
 		return nil, err
 	}
 
-	_, err = c.readPacket()
+	err = c.listenForBinlog()
 	if err != nil {
 		return nil, err
 	}
@@ -268,11 +267,6 @@ func (c *Conn) getPacketHeader() (*PacketHeader, error) {
 	ph := PacketHeader{}
 	ph.Length = c.getInt(TypeFixedInt, 3)
 
-	if ph.Length == 0 {
-		err := errors.New("EOF")
-		return nil, err
-	}
-
 	ph.SequenceID = c.getInt(TypeFixedInt, 1)
 	ph.Status = c.getInt(TypeFixedInt, 1)
 
@@ -297,14 +291,12 @@ func (c *Conn) readBytes(l uint64) *bytes.Buffer {
 		didScan := c.scanner.Scan()
 		if !didScan {
 			err := c.scanner.Err()
-			if err == nil { // scanner reached EOF
-				return EOF
-			} else {
-				panic(err) // @TODO Handle this gracefully.
+			if err != nil {
+				panic(err)
 			}
+		} else {
+			b = append(b, c.scanner.Bytes()...)
 		}
-
-		b = append(b, c.scanner.Bytes()...)
 	}
 
 	c.scanPos += uint64(len(b))