浏览代码

Initial Import

Implemented driver basics and decoding handshake
Josh Brickner 7 年之前
当前提交
d0d043da59
共有 4 个文件被更改,包括 432 次插入0 次删除
  1. 3 0
      .gitignore
  2. 207 0
      binlog/connection.go
  3. 203 0
      binlog/handshake.go
  4. 19 0
      main.go

+ 3 - 0
.gitignore

@@ -0,0 +1,3 @@
+tags
+.idea
+.DS_Store

+ 207 - 0
binlog/connection.go

@@ -0,0 +1,207 @@
+package binlog
+
+import (
+	"bytes"
+	"database/sql"
+	"database/sql/driver"
+	"encoding/binary"
+	"encoding/json"
+	"fmt"
+	"net"
+	"time"
+)
+
+const NullByte byte = '\x00'
+
+// MySQL Packet Data Types
+const TypeNullTerminatedString = int(0)
+const TypeFixedString = int(1)
+const TypeFixedInt = int(2)
+
+type Config struct {
+	Host    string `json:"host"`
+	Port    int    `json:"port"`
+	User    string `json:"user"`
+	Pass    string `json:"password"`
+	Timeout time.Duration
+}
+
+func newBinlogConfig(dsn string) (*Config, error) {
+	var err error
+
+	config := Config{}
+	err = json.Unmarshal([]byte(dsn), &config)
+
+	return &config, err
+}
+
+type Conn struct {
+	Config  *Config
+	tcpConn *net.TCPConn
+	Handshake *HandshakePacket
+}
+
+func newBinlogConn(config *Config) Conn {
+	return Conn{
+		Config: config,
+	}
+}
+
+func (c Conn) Prepare(query string) (driver.Stmt, error) {
+	return nil, nil
+}
+
+func (c Conn) Close() error {
+	return nil
+}
+
+func (c Conn) Begin() (driver.Tx, error) {
+	return nil, nil
+}
+
+type Driver struct {}
+
+func (d Driver) Open(dsn string) (driver.Conn, error) {
+	config, err := newBinlogConfig(dsn)
+	if nil != err {
+		return nil, err
+	}
+
+	blConn := newBinlogConn(config)
+
+	dialer := net.Dialer{Timeout: blConn.Config.Timeout,}
+	addr := fmt.Sprintf("%s:%d", blConn.Config.Host, blConn.Config.Port)
+	c, err := dialer.Dial("tcp", addr)
+	blConn.tcpConn = c.(*net.TCPConn)
+
+	if err != nil {
+		netErr, ok := err.(net.Error)
+		if ok && netErr.Temporary() {
+			fmt.Printf("Error: %s", netErr.Error())
+			return nil, err
+		}
+	}
+
+	hsp, err := blConn.handshakePacket()
+	blConn.Handshake = hsp
+
+	fmt.Printf("%+v", blConn.Handshake)
+
+	return blConn, err
+}
+
+func init() {
+	sql.Register("mysql-binlog", &Driver{})
+}
+
+func (c *Conn) getBytes(l uint64) ([]byte, error) {
+	b := make([]byte, l)
+	_, err := c.tcpConn.Read(b)
+
+	return b, err
+}
+
+func (c *Conn) consumeBytes(l uint64) error {
+	b := make([]byte, l)
+	_, err := c.tcpConn.Read(b)
+
+	return err
+}
+
+func (c *Conn) getInt(t int, l uint64) (uint64, error) {
+	var v uint64
+	var err error = nil
+
+	switch t {
+	case TypeFixedInt:
+		v, err = c.popFixedInt(l)
+	default:
+		v = 0
+	}
+
+	if err != nil {
+		return 0, err
+	}
+
+	return v, nil
+}
+
+func (c *Conn) getString(t int, l uint64) (string, error) {
+	var v string
+	var err error = nil
+
+	switch t {
+	case TypeFixedString:
+		v, err = c.popFixedString(l)
+	case TypeNullTerminatedString:
+		v, err = c.popNullTerminatedString()
+	default:
+		v = ""
+	}
+
+	if err != nil {
+		return "", err
+	}
+
+	return v, nil
+}
+
+func (c *Conn) readBytes(l uint64) (*bytes.Buffer, error) {
+	b := make([]byte, l)
+	_, err := c.tcpConn.Read(b)
+	if err != nil {
+		return nil, err
+	}
+
+	return bytes.NewBuffer(b), nil
+}
+
+func (c *Conn) readToNull() (*bytes.Buffer, error) {
+	var s []byte
+	for {
+		bA := make([]byte, 1)
+		_, err := c.tcpConn.Read(bA)
+		if err != nil {
+			return nil, err
+		}
+
+		b := bA[0]
+		if b == NullByte {
+			break
+		} else {
+			s = append(s, b)
+		}
+	}
+
+	return bytes.NewBuffer(s), nil
+}
+
+func (c *Conn) popNullTerminatedString() (string, error) {
+	b, err := c.readToNull()
+	if err != nil {
+		return "", err
+	}
+
+	return string(b.Bytes()), nil
+}
+
+func (c *Conn) popFixedString(l uint64) (string, error) {
+	b, err := c.readBytes(l)
+	if err != nil {
+		return "", err
+	}
+
+	return string(b.Bytes()), nil
+}
+
+func (c *Conn) popFixedInt(l uint64) (uint64, error) {
+	b, err := c.readBytes(l)
+	if err != nil {
+		return 0, err
+	}
+
+	var i uint64
+	i, err = binary.ReadUvarint(b)
+
+	return i, err
+}

+ 203 - 0
binlog/handshake.go

@@ -0,0 +1,203 @@
+package binlog
+
+type HandshakePacket struct {
+	PacketLength         uint64
+	SequenceId           uint64
+	ProtocolVersion      uint64
+	ServerVersion        string
+	ThreadId             uint64
+	AuthPluginDataPart1  []byte
+	CapabilityFlags1     []byte
+	Charset              uint64
+	Status               []byte
+	CapabilityFlags2     []byte
+	AuthPluginDataLength uint64
+	AuthPluginDataPart2  []byte
+	AuthPluginName       string
+	CapabilityFlags      *CapabilityFlags
+	StatusFlags          *StatusFlags
+}
+
+type CapabilityFlags struct {
+	LongPassword               bool
+	FoundRows                  bool
+	LongFlag                   bool
+	ConnectWithDb              bool
+	NoSchema                   bool
+	Compress                   bool
+	Odbc                       bool
+	LocalFiles                 bool
+	IgnoreSpace                bool
+	Protocol41                 bool
+	Interactive                bool
+	Ssl                        bool
+	IgnoreSigpipe              bool
+	Transactions               bool
+	Reserved                   bool
+	Reserved2                  bool
+	MultiStatements            bool
+	MultiResults               bool
+	PsMultiResults             bool
+	ConnectAttrs               bool
+	PluginAuthLenEncClientData bool
+	CanHandleExpiredPasswords  bool
+	SessionTrack               bool
+	DeprecateEOF               bool
+	SslVerifyServerCert        bool
+	OptionalResultSetMetadata  bool
+	RememberOptions            bool
+}
+
+type StatusFlags struct {
+	StatusInTrans            bool
+	StatusAutocommit         bool
+	MoreResultsExists        bool
+	QueryNoGoodIndexUsed     bool
+	QueryNoIndexUsed         bool
+	StatusCursorExists       bool
+	StatusLastRowSent        bool
+	StatusDbDropped          bool
+	StatusNoBackslashEscapes bool
+	StatusMetadataChanged    bool
+	QueryWasSlow             bool
+	PsOutParams              bool
+	StatusInTransReadonly    bool
+	SessionStateChanged      bool
+}
+
+func (hs *HandshakePacket) decodeCapabilityFlags() {
+	hs.CapabilityFlags = &CapabilityFlags{
+		LongPassword:               (hs.CapabilityFlags1[0] & 1) > 0,
+		FoundRows:                  (hs.CapabilityFlags1[0] & 2) > 0,
+		LongFlag:                   (hs.CapabilityFlags1[0] & 4) > 0,
+		ConnectWithDb:              (hs.CapabilityFlags1[0] & 8) > 0,
+		NoSchema:                   (hs.CapabilityFlags1[0] & 16) > 0,
+		Compress:                   (hs.CapabilityFlags1[0] & 32) > 0,
+		Odbc:                       (hs.CapabilityFlags1[0] & 64) > 0,
+		LocalFiles:                 (hs.CapabilityFlags1[0] & 128) > 0,
+		IgnoreSpace:                (hs.CapabilityFlags1[1] & 1) > 0,
+		Protocol41:                 (hs.CapabilityFlags1[1] & 2) > 0,
+		Interactive:                (hs.CapabilityFlags1[1] & 4) > 0,
+		Ssl:                        (hs.CapabilityFlags1[1] & 8) > 0,
+		IgnoreSigpipe:              (hs.CapabilityFlags1[1] & 16) > 0,
+		Transactions:               (hs.CapabilityFlags1[1] & 32) > 0,
+		Reserved:                   (hs.CapabilityFlags1[1] & 64) > 0,
+		Reserved2:                  (hs.CapabilityFlags1[1] & 128) > 0,
+		MultiStatements:            (hs.CapabilityFlags2[0] & 1) > 0,
+		MultiResults:               (hs.CapabilityFlags2[0] & 2) > 0,
+		PsMultiResults:             (hs.CapabilityFlags2[0] & 4) > 0,
+		ConnectAttrs:               (hs.CapabilityFlags2[0] & 8) > 0,
+		PluginAuthLenEncClientData: (hs.CapabilityFlags2[0] & 16) > 0,
+		CanHandleExpiredPasswords:  (hs.CapabilityFlags2[0] & 32) > 0,
+		SessionTrack:               (hs.CapabilityFlags2[0] & 64) > 0,
+		DeprecateEOF:               (hs.CapabilityFlags2[1] & 128) > 0,
+		SslVerifyServerCert:        (hs.CapabilityFlags2[1] & 1) > 0,
+		OptionalResultSetMetadata:  (hs.CapabilityFlags2[1] & 2) > 0,
+		RememberOptions:            (hs.CapabilityFlags2[1] & 4) > 0,
+	}
+}
+
+func (hs *HandshakePacket) decodeStatusFlags() {
+	hs.StatusFlags = &StatusFlags{
+		StatusInTrans:            (hs.Status[0] & 1) > 0,
+		StatusAutocommit:         (hs.Status[0] & 2) > 0,
+		MoreResultsExists:        (hs.Status[0] & 4) > 0,
+		QueryNoGoodIndexUsed:     (hs.Status[0] & 8) > 0,
+		QueryNoIndexUsed:         (hs.Status[0] & 16) > 0,
+		StatusCursorExists:       (hs.Status[0] & 32) > 0,
+		StatusLastRowSent:        (hs.Status[0] & 64) > 0,
+		StatusDbDropped:          (hs.Status[0] & 128) > 0,
+		StatusNoBackslashEscapes: (hs.Status[1] & 1) > 0,
+		StatusMetadataChanged:    (hs.Status[1] & 2) > 0,
+		QueryWasSlow:             (hs.Status[1] & 4) > 0,
+		PsOutParams:              (hs.Status[1] & 8) > 0,
+		StatusInTransReadonly:    (hs.Status[1] & 16) > 0,
+		SessionStateChanged:      (hs.Status[1] & 32) > 0,
+	}
+}
+
+func (c *Conn) handshakePacket() (*HandshakePacket, error) {
+	packet := HandshakePacket{}
+	var err error
+
+	packet.PacketLength, err = c.getInt(TypeFixedInt, 3)
+	if err != nil {
+		return nil, err
+	}
+
+	packet.SequenceId, err = c.getInt(TypeFixedInt, 1)
+	if err != nil {
+		return nil, err
+	}
+
+	packet.ProtocolVersion, err = c.getInt(TypeFixedInt, 1)
+	if err != nil {
+		return nil, err
+	}
+
+	packet.ServerVersion, err = c.getString(TypeNullTerminatedString, 0)
+	if err != nil {
+		return nil, err
+	}
+
+	packet.ThreadId, err = c.getInt(TypeFixedInt, 4)
+	if err != nil {
+		return nil, err
+	}
+
+	packet.AuthPluginDataPart1, err = c.getBytes(8)
+	if err != nil {
+		return nil, err
+	}
+
+	err = c.consumeBytes(1)
+	if err != nil {
+		return nil, err
+	}
+
+	packet.CapabilityFlags1, err = c.getBytes(2)
+	if err != nil {
+		return nil, err
+	}
+
+	packet.Charset, err = c.getInt(TypeFixedInt, 1)
+	if err != nil {
+		return nil, err
+	}
+
+	packet.Status, err = c.getBytes(2)
+	if err != nil {
+		return nil, err
+	}
+
+	packet.decodeStatusFlags()
+
+	packet.CapabilityFlags2, err = c.getBytes(2)
+	if err != nil {
+		return nil, err
+	}
+
+	packet.decodeCapabilityFlags()
+
+	packet.AuthPluginDataLength, err = c.getInt(TypeFixedInt, 1)
+	if err != nil {
+		return nil, err
+	}
+
+	err = c.consumeBytes(10)
+	if err != nil {
+		return nil, err
+	}
+
+	packet.AuthPluginDataPart2, err = c.getBytes(packet.AuthPluginDataLength - 8)
+	if err != nil {
+		return nil, err
+	}
+
+	packet.AuthPluginName, err = c.getString(TypeNullTerminatedString, 0)
+	if err != nil {
+		return nil, err
+	}
+
+	return &packet, nil
+}

+ 19 - 0
main.go

@@ -0,0 +1,19 @@
+package main
+
+import (
+	"database/sql"
+	"fmt"
+	_ "github.com/macinjosh/mysql-binlog-filter/binlog"
+)
+
+func main() {
+	conn, err := sql.Open("mysql-binlog", "{\"host\": \"127.0.0.1\", \"port\": 3306, \"user\": \"root\", \"password\": \"root\"}")
+	if err != nil {
+		fmt.Printf("Open Error: %+v\n", err)
+	}
+
+	err = conn.Ping()
+	if err != nil {
+		fmt.Printf("Ping Error: %+v\n", err)
+	}
+}