connection.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. package binlog
  import (
  	"bytes"
  	"database/sql"
  	"database/sql/driver"
  	"encoding/binary"
  	"encoding/json"
  	"fmt"
  	"io"
  	"net"
  	"time"
  )
  // MySQL packet data types. getInt and getString switch on these values to
  // decide how a field is read.
  const (
  	TypeNullTerminatedString int = iota
  	TypeFixedString
  	TypeFixedInt
  	TypeLenEncodedInt
  )

  // Largest values representable in unsigned integers of 8, 16, 24 and 64 bits.
  const (
  	MaxUint8  = 1<<8 - 1
  	MaxUint16 = 1<<16 - 1
  	MaxUint24 = 1<<24 - 1
  	MaxUint64 = 1<<64 - 1
  )

  // Miscellaneous constants.
  const (
  	// NullByte is the terminator byte that readToNull stops on.
  	NullByte byte = 0
  	// MaxPacketSize is defined as the 16-bit maximum.
  	MaxPacketSize = MaxUint16
  )
  // Config holds the settings for a binlog connection. The struct tags give
  // the JSON keys it is decoded from.
  type Config struct {
  	// Host and Port form the TCP address that Driver.Open dials.
  	Host string `json:"host"`
  	Port int    `json:"port"`

  	User     string `json:"user"`
  	Pass     string `json:"password"`
  	Database string `json:"database"`

  	SSL        bool `json:"ssl"`
  	VerifyCert bool `json:"verify_cert"`

  	// Timeout is the dial timeout used by Driver.Open. It has no JSON tag,
  	// so encoding/json uses the field name as its key.
  	Timeout time.Duration
  }
  35. func newBinlogConfig(dsn string) (*Config, error) {
  36. var err error
  37. config := Config{}
  38. err = json.Unmarshal([]byte(dsn), &config)
  39. return &config, err
  40. }
  41. type Conn struct {
  42. Config *Config
  43. tcpConn *net.TCPConn
  44. Handshake *Handshake
  45. }
  46. func newBinlogConn(config *Config) Conn {
  47. return Conn{
  48. Config: config,
  49. }
  50. }
  51. func (c Conn) Prepare(query string) (driver.Stmt, error) {
  52. return nil, nil
  53. }
  54. func (c Conn) Close() error {
  55. return nil
  56. }
  57. func (c Conn) Begin() (driver.Tx, error) {
  58. return nil, nil
  59. }
  60. type Driver struct{}
  61. func (d Driver) Open(dsn string) (driver.Conn, error) {
  62. config, err := newBinlogConfig(dsn)
  63. if nil != err {
  64. return nil, err
  65. }
  66. blConn := newBinlogConn(config)
  67. dialer := net.Dialer{Timeout: blConn.Config.Timeout}
  68. addr := fmt.Sprintf("%s:%d", blConn.Config.Host, blConn.Config.Port)
  69. c, err := dialer.Dial("tcp", addr)
  70. blConn.tcpConn = c.(*net.TCPConn)
  71. if err != nil {
  72. netErr, ok := err.(net.Error)
  73. if ok && netErr.Temporary() {
  74. fmt.Printf("Error: %s", netErr.Error())
  75. return nil, err
  76. }
  77. }
  78. err = blConn.decodeHandshakePacket()
  79. b := blConn.encodeHandshakeResponse()
  80. fmt.Printf("%08b\n%d\n%s", b, b, b)
  81. _, err = blConn.tcpConn.Write(b)
  82. return blConn, err
  83. }
  84. func init() {
  85. sql.Register("mysql-binlog", &Driver{})
  86. }
  87. func (c *Conn) getBytes(l uint64) ([]byte, error) {
  88. b := make([]byte, l)
  89. _, err := c.tcpConn.Read(b)
  90. return b, err
  91. }
  92. func (c *Conn) consumeBytes(l uint64) error {
  93. b := make([]byte, l)
  94. _, err := c.tcpConn.Read(b)
  95. return err
  96. }
  97. func (c *Conn) getInt(t int, l uint64) (uint64, error) {
  98. var v uint64
  99. var err error = nil
  100. switch t {
  101. case TypeFixedInt:
  102. v, err = c.popFixedInt(l)
  103. default:
  104. v = 0
  105. }
  106. if err != nil {
  107. return 0, err
  108. }
  109. return v, nil
  110. }
  111. func (c *Conn) getString(t int, l uint64) (string, error) {
  112. var v string
  113. var err error = nil
  114. switch t {
  115. case TypeFixedString:
  116. v, err = c.popFixedString(l)
  117. case TypeNullTerminatedString:
  118. v, err = c.popNullTerminatedString()
  119. default:
  120. v = ""
  121. }
  122. if err != nil {
  123. return "", err
  124. }
  125. return v, nil
  126. }
  127. func (c *Conn) readBytes(l uint64) (*bytes.Buffer, error) {
  128. b := make([]byte, l)
  129. _, err := c.tcpConn.Read(b)
  130. if err != nil {
  131. return nil, err
  132. }
  133. return bytes.NewBuffer(b), nil
  134. }
  135. func (c *Conn) readToNull() (*bytes.Buffer, error) {
  136. var s []byte
  137. for {
  138. bA := make([]byte, 1)
  139. _, err := c.tcpConn.Read(bA)
  140. if err != nil {
  141. return nil, err
  142. }
  143. b := bA[0]
  144. if b == NullByte {
  145. break
  146. } else {
  147. s = append(s, b)
  148. }
  149. }
  150. return bytes.NewBuffer(s), nil
  151. }
  152. func (c *Conn) popNullTerminatedString() (string, error) {
  153. b, err := c.readToNull()
  154. if err != nil {
  155. return "", err
  156. }
  157. return string(b.Bytes()), nil
  158. }
  159. func (c *Conn) popFixedString(l uint64) (string, error) {
  160. b, err := c.readBytes(l)
  161. if err != nil {
  162. return "", err
  163. }
  164. return string(b.Bytes()), nil
  165. }
  166. func (c *Conn) popFixedInt(l uint64) (uint64, error) {
  167. b, err := c.readBytes(l)
  168. if err != nil {
  169. return 0, err
  170. }
  171. var i uint64
  172. i, err = binary.ReadUvarint(b)
  173. return i, err
  174. }
  175. func (c *Conn) encFixedLenInt(l uint64, v uint64) []byte {
  176. b := make([]byte, 4)
  177. binary.LittleEndian.PutUint64(b, v)
  178. return b[:(l - 1)]
  179. }
  180. func (c *Conn) encLenEncInt(v uint64) []byte {
  181. prefix := make([]byte, 1)
  182. var b []byte
  183. switch {
  184. case v < MaxUint8:
  185. b = make([]byte, 2)
  186. binary.LittleEndian.PutUint16(b, uint16(v))
  187. b = b[:1]
  188. case v >= MaxUint8 && v < MaxUint16:
  189. prefix[0] = 0xFC
  190. b = make([]byte, 3)
  191. binary.LittleEndian.PutUint16(b, uint16(v))
  192. b = b[:2]
  193. case v >= MaxUint16 && v < MaxUint24:
  194. prefix[0] = 0xFD
  195. b = make([]byte, 4)
  196. binary.LittleEndian.PutUint32(b, uint32(v))
  197. b = b[:3]
  198. case v >= MaxUint24 && v < MaxUint64:
  199. prefix[0] = 0xFE
  200. b = make([]byte, 9)
  201. binary.LittleEndian.PutUint64(b, uint64(v))
  202. }
  203. b = append(prefix, b...)
  204. return b
  205. }