connection.go 13 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723
  1. package binlog
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/tls"
  6. "database/sql"
  7. "database/sql/driver"
  8. "encoding/binary"
  9. "encoding/json"
  10. "fmt"
  11. "io/ioutil"
  12. "math"
  13. "net"
  14. "reflect"
  15. "strings"
  16. "time"
  17. )
  // Miscellaneous values shared across the package.
  const (
  	// NullByte is the zero byte used as a string terminator and as padding.
  	NullByte byte = 0
  	// MaxPacketSize is defined as MaxUint16.
  	MaxPacketSize = MaxUint16
  )

  // EOF is a buffer that contains a single NullByte.
  var EOF = bytes.NewBuffer([]byte{NullByte})

  // MySQL packet data types. getInt, getString, putInt and putString switch on
  // these values to choose an encoding.
  const (
  	TypeNullTerminatedString int = iota
  	TypeFixedString
  	TypeFixedInt
  	TypeLenEncInt
  	TypeRestOfPacketString
  	TypeLenEncString
  )

  // Integer maximums (untyped constants).
  const (
  	MaxUint08 = 1<<8 - 1
  	MaxUint16 = 1<<16 - 1
  	MaxUint24 = 1<<24 - 1
  	MaxUint64 = 1<<64 - 1
  )

  // Packet statuses: the values readPacket compares PacketHeader.Status against.
  const (
  	StatusOK   = 0x00
  	StatusEOF  = 0xFE
  	StatusErr  = 0xFF
  	StatusAuth = 0x01
  )
  // Config holds the connection settings that newBinlogConfig decodes from a
  // JSON file. Timeout carries no JSON tag, so it is decoded under its Go field
  // name.
  type Config struct {
  	// Server address and credentials.
  	Host     string `json:"host"`
  	Port     int    `json:"port"`
  	User     string `json:"user"`
  	Pass     string `json:"password"`
  	Database string `json:"database"`

  	// TLS settings; Driver.Open passes them to NewClientTLSConfig when SSL is set.
  	SSL        bool   `json:"ssl"`
  	SSLCA      string `json:"ssl-ca"`
  	SSLCer     string `json:"ssl-cer"`
  	SSLKey     string `json:"ssl-key"`
  	VerifyCert bool   `json:"verify-cert"`

  	// Replication settings.
  	ServerId   uint64 `json:"server-id"`
  	BinlogFile string `json:"binlog-file"`

  	// Timeout is used as the dial timeout in Driver.Open.
  	Timeout time.Duration
  }
  54. func newBinlogConfig(dsn string) (*Config, error) {
  55. var err error
  56. b, err := ioutil.ReadFile(dsn)
  57. if err != nil {
  58. return nil, err
  59. }
  60. config := Config{}
  61. err = json.Unmarshal(b, &config)
  62. return &config, err
  63. }
  64. type Conn struct {
  65. Config *Config
  66. curConn net.Conn
  67. tcpConn *net.TCPConn
  68. secTCPConn *tls.Conn
  69. Handshake *Handshake
  70. HandshakeResponse *HandshakeResponse
  71. buffer *bufio.ReadWriter
  72. scanner *bufio.Scanner
  73. err error
  74. sequenceId uint64
  75. writeBuf *bytes.Buffer
  76. StatusFlags *StatusFlags
  77. Listener *net.Listener
  78. packetHeader *PacketHeader
  79. scanPos uint64
  80. }
  81. func newBinlogConn(config *Config) Conn {
  82. return Conn{
  83. Config: config,
  84. sequenceId: 1,
  85. }
  86. }
  87. func (c Conn) Prepare(query string) (driver.Stmt, error) {
  88. return nil, nil
  89. }
  90. func (c Conn) Close() error {
  91. return nil
  92. }
  93. func (c Conn) Begin() (driver.Tx, error) {
  94. return nil, nil
  95. }
  96. type Driver struct{}
  97. func (d Driver) Open(dsn string) (driver.Conn, error) {
  98. config, err := newBinlogConfig(dsn)
  99. if nil != err {
  100. return nil, err
  101. }
  102. c := newBinlogConn(config)
  103. var t interface{}
  104. dialer := net.Dialer{Timeout: c.Config.Timeout}
  105. addr := fmt.Sprintf("%s:%d", c.Config.Host, c.Config.Port)
  106. t, err = dialer.Dial("tcp", addr)
  107. if err != nil {
  108. netErr, ok := err.(net.Error)
  109. if ok && !netErr.Temporary() {
  110. fmt.Printf("Error: %s", netErr.Error())
  111. return nil, err
  112. }
  113. } else {
  114. c.tcpConn = t.(*net.TCPConn)
  115. c.setConnection(t.(net.Conn))
  116. }
  117. err = c.decodeHandshakePacket()
  118. if err != nil {
  119. return nil, err
  120. }
  121. c.HandshakeResponse = c.NewHandshakeResponse()
  122. // If we are on SSL send SSL_Request packet now
  123. if c.Config.SSL {
  124. err = c.writeSSLRequestPacket()
  125. if err != nil {
  126. return nil, err
  127. }
  128. tlsConf := NewClientTLSConfig(
  129. c.Config.SSLKey,
  130. c.Config.SSLCer,
  131. []byte(c.Config.SSLCA),
  132. c.Config.VerifyCert,
  133. c.Config.Host,
  134. )
  135. c.secTCPConn = tls.Client(c.tcpConn, tlsConf)
  136. c.setConnection(c.secTCPConn)
  137. }
  138. err = c.writeHandshakeResponse()
  139. if err != nil {
  140. return nil, err
  141. }
  142. // Listen for auth response.
  143. _, err = c.readPacket()
  144. if err != nil {
  145. return nil, err
  146. }
  147. // Auth was successful.
  148. c.sequenceId = 0
  149. // Register as a slave
  150. err = c.registerAsSlave()
  151. if err != nil {
  152. return nil, err
  153. }
  154. c.sequenceId = 0
  155. _, err = c.readPacket()
  156. if err != nil {
  157. return nil, err
  158. }
  159. err = c.startBinlogStream()
  160. if err != nil {
  161. return nil, err
  162. }
  163. err = c.listenForBinlog()
  164. if err != nil {
  165. return nil, err
  166. }
  167. return c, err
  168. }
  169. func (c *Conn) readPacket() (interface{}, error) {
  170. ph, err := c.getPacketHeader()
  171. if err != nil {
  172. return nil, err
  173. }
  174. var res interface{}
  175. switch ph.Status {
  176. case StatusAuth:
  177. res, err := c.decodeAuthMoreDataResponsePacket(ph)
  178. if err != nil {
  179. return nil, err
  180. }
  181. switch res.Data {
  182. case SHA2_FAST_AUTH_SUCCESS:
  183. case SHA2_REQUEST_PUBLIC_KEY:
  184. case SHA2_PERFORM_FULL_AUTHENTICATION:
  185. c.putBytes(append([]byte(c.Config.Pass), NullByte))
  186. if c.Flush() != nil {
  187. return nil, c.Flush()
  188. }
  189. }
  190. case StatusEOF:
  191. fallthrough
  192. case StatusOK:
  193. res, err = c.decodeOKPacket(ph)
  194. if err != nil {
  195. return nil, err
  196. }
  197. case StatusErr:
  198. res, err = c.decodeErrorPacket(ph)
  199. if err != nil {
  200. return nil, err
  201. }
  202. err = fmt.Errorf(
  203. "Error %d: %s",
  204. res.(*ErrorPacket).ErrorCode,
  205. res.(*ErrorPacket).ErrorMessage,
  206. )
  207. return res, err
  208. default:
  209. fmt.Printf("Unknown PacketHeader: %+v\n", ph)
  210. }
  211. err = c.scanner.Err()
  212. if err != nil {
  213. return nil, err
  214. }
  215. return res, nil
  216. }
  // PacketHeader holds the values getPacketHeader reads at the start of a packet.
  type PacketHeader struct {
  	// Length is read from the first three bytes.
  	Length uint64
  	// SequenceID is read from the following byte.
  	SequenceID uint64
  	// Status is read from the byte after that; readPacket switches on it.
  	Status uint64
  }
  222. func (c *Conn) getPacketHeader() (*PacketHeader, error) {
  223. ph := PacketHeader{}
  224. ph.Length = c.getInt(TypeFixedInt, 3)
  225. ph.SequenceID = c.getInt(TypeFixedInt, 1)
  226. ph.Status = c.getInt(TypeFixedInt, 1)
  227. fmt.Printf("ph.Status = %+v\n", ph.Status)
  228. err := c.scanner.Err()
  229. if err != nil {
  230. return &ph, err
  231. }
  232. c.packetHeader = &ph
  233. c.scanPos = 0
  234. return &ph, nil
  235. }
  236. func init() {
  237. sql.Register("mysql-binlog", &Driver{})
  238. }
  239. func (c *Conn) readBytes(l uint64) *bytes.Buffer {
  240. b := make([]byte, 0)
  241. for i := uint64(0); i < l; i++ {
  242. didScan := c.scanner.Scan()
  243. if !didScan {
  244. err := c.scanner.Err()
  245. if err != nil {
  246. panic(err)
  247. }
  248. } else {
  249. b = append(b, c.scanner.Bytes()...)
  250. }
  251. }
  252. c.scanPos += uint64(len(b))
  253. return bytes.NewBuffer(b)
  254. }
  255. func (c *Conn) getBytesUntilNull() *bytes.Buffer {
  256. l := uint64(1)
  257. s := c.readBytes(l)
  258. b := s.Bytes()
  259. for {
  260. if uint64(s.Len()) != l || s.Bytes()[0] == NullByte {
  261. break
  262. }
  263. s = c.readBytes(uint64(l))
  264. b = append(b, s.Bytes()...)
  265. }
  266. return bytes.NewBuffer(b)
  267. }
  268. func (c *Conn) discardBytes(l uint64) {
  269. c.readBytes(l)
  270. }
  271. func (c *Conn) getInt(t int, l uint64) uint64 {
  272. var v uint64
  273. switch t {
  274. case TypeFixedInt:
  275. v = c.decFixedInt(l)
  276. case TypeLenEncInt:
  277. v = c.decLenEncInt()
  278. default:
  279. v = 0
  280. }
  281. return v
  282. }
  283. func (c *Conn) getString(t int, l uint64) string {
  284. var v string
  285. switch t {
  286. case TypeFixedString:
  287. v = c.decFixedString(l)
  288. case TypeLenEncString:
  289. v = string(c.decLenEncInt())
  290. case TypeNullTerminatedString:
  291. v = c.decNullTerminatedString()
  292. case TypeRestOfPacketString:
  293. v = c.decRestOfPacketString()
  294. default:
  295. v = ""
  296. }
  297. return v
  298. }
  299. func (c *Conn) decRestOfPacketString() string {
  300. b := c.getRemainingBytes()
  301. return b.String()
  302. }
  303. func (c *Conn) getRemainingBytes() *bytes.Buffer {
  304. l := (c.packetHeader.Length - 1) - c.scanPos
  305. b := c.readBytes(l)
  306. return b
  307. }
  308. func (c *Conn) decNullTerminatedString() string {
  309. b := c.getBytesUntilNull()
  310. return strings.TrimRight(b.String(), string(NullByte))
  311. }
  312. func (c *Conn) decFixedString(l uint64) string {
  313. b := c.readBytes(l)
  314. return b.String()
  315. }
  316. func (c *Conn) decLenEncInt() uint64 {
  317. var l uint16
  318. b := c.readBytes(1)
  319. br := bytes.NewReader(b.Bytes())
  320. _ = binary.Read(br, binary.LittleEndian, &l)
  321. if l > 0 {
  322. return c.decFixedInt(uint64(l))
  323. } else {
  324. return 0
  325. }
  326. }
  327. func (c *Conn) decFixedInt(l uint64) uint64 {
  328. var i uint64
  329. b := c.readBytes(l)
  330. if l <= 2 {
  331. var x uint16
  332. pb := c.padBytes(2, b.Bytes())
  333. br := bytes.NewReader(pb)
  334. _ = binary.Read(br, binary.LittleEndian, &x)
  335. i = uint64(x)
  336. } else if l <= 4 {
  337. var x uint32
  338. pb := c.padBytes(4, b.Bytes())
  339. br := bytes.NewReader(pb)
  340. _ = binary.Read(br, binary.LittleEndian, &x)
  341. i = uint64(x)
  342. } else if l <= 8 {
  343. var x uint64
  344. pb := c.padBytes(8, b.Bytes())
  345. br := bytes.NewReader(pb)
  346. _ = binary.Read(br, binary.LittleEndian, &x)
  347. i = x
  348. }
  349. return i
  350. }
  351. func (c *Conn) padBytes(l int, b []byte) []byte {
  352. bl := len(b)
  353. pl := l - bl
  354. for i := 0; i < pl; i++ {
  355. b = append(b, NullByte)
  356. }
  357. return b
  358. }
  359. func (c *Conn) encFixedLenInt(v uint64, l uint64) []byte {
  360. b := make([]byte, 8)
  361. binary.LittleEndian.PutUint64(b, v)
  362. return b[:l]
  363. }
  364. func (c *Conn) encLenEncInt(v uint64) []byte {
  365. prefix := make([]byte, 1)
  366. var b []byte
  367. switch {
  368. case v < MaxUint08:
  369. b = make([]byte, 2)
  370. binary.LittleEndian.PutUint16(b, uint16(v))
  371. b = b[:1]
  372. case v >= MaxUint08 && v < MaxUint16:
  373. prefix[0] = 0xFC
  374. b = make([]byte, 3)
  375. binary.LittleEndian.PutUint16(b, uint16(v))
  376. b = b[:2]
  377. case v >= MaxUint16 && v < MaxUint24:
  378. prefix[0] = 0xFD
  379. b = make([]byte, 4)
  380. binary.LittleEndian.PutUint32(b, uint32(v))
  381. b = b[:3]
  382. case v >= MaxUint24 && v < MaxUint64:
  383. prefix[0] = 0xFE
  384. b = make([]byte, 9)
  385. binary.LittleEndian.PutUint64(b, uint64(v))
  386. }
  387. if len(b) > 1 {
  388. b = append(prefix, b...)
  389. }
  390. return b
  391. }
  392. func (c *Conn) bitmaskToStruct(b []byte, s interface{}) interface{} {
  393. l := len(b)
  394. t := reflect.TypeOf(s)
  395. v := reflect.New(t.Elem()).Elem()
  396. for i := uint(0); i < uint(v.NumField()); i++ {
  397. f := v.Field(int(i))
  398. var v bool
  399. switch {
  400. case l > 4:
  401. x := binary.LittleEndian.Uint64(b)
  402. flag := uint64(1 << i)
  403. v = x&flag > 0
  404. case l > 2:
  405. x := binary.LittleEndian.Uint32(b)
  406. flag := uint32(1 << i)
  407. v = x&flag > 0
  408. case l > 1:
  409. x := binary.LittleEndian.Uint16(b)
  410. flag := uint16(1 << i)
  411. v = x&flag > 0
  412. default:
  413. x := uint(b[0])
  414. flag := uint(1 << i)
  415. v = x&flag > 0
  416. }
  417. f.SetBool(v)
  418. }
  419. return v.Interface()
  420. }
  421. func (c *Conn) structToBitmask(s interface{}) []byte {
  422. t := reflect.TypeOf(s).Elem()
  423. sV := reflect.ValueOf(s).Elem()
  424. fC := uint(t.NumField())
  425. m := uint64(0)
  426. for i := uint(0); i < fC; i++ {
  427. f := sV.Field(int(i))
  428. v := f.Bool()
  429. if v {
  430. m |= 1 << i
  431. }
  432. }
  433. l := uint64(math.Ceil(float64(fC) / 8.0))
  434. b := make([]byte, 8)
  435. binary.LittleEndian.PutUint64(b, m)
  436. switch {
  437. case l > 4: // 64 bits
  438. b = b[:8]
  439. case l > 2: // 32 bits
  440. b = b[:4]
  441. case l > 1: // 16 bits
  442. b = b[:2]
  443. default: // 8 bits
  444. b = b[:1]
  445. }
  446. return b
  447. }
  448. func (c *Conn) putString(t int, v string) uint64 {
  449. b := make([]byte, 0)
  450. switch t {
  451. case TypeFixedString:
  452. b = c.encFixedString(v)
  453. case TypeLenEncString:
  454. b = c.encLenEncString(v)
  455. case TypeNullTerminatedString:
  456. b = c.encNullTerminatedString(v)
  457. case TypeRestOfPacketString:
  458. b = c.encRestOfPacketString(v)
  459. }
  460. l, err := c.writeBuf.Write(b)
  461. if err != nil {
  462. c.err = err
  463. }
  464. return uint64(l)
  465. }
  466. func (c *Conn) encLenEncString(v string) []byte {
  467. l := uint64(len(v))
  468. b := c.encLenEncInt(l)
  469. return append(b, c.encFixedString(v)...)
  470. }
  471. func (c *Conn) encNullTerminatedString(v string) []byte {
  472. return append([]byte(v), NullByte)
  473. }
  474. func (c *Conn) encFixedString(v string) []byte {
  475. return []byte(v)
  476. }
  477. func (c *Conn) encRestOfPacketString(v string) []byte {
  478. s := c.encFixedString(v)
  479. return s
  480. }
  481. func (c *Conn) putInt(t int, v uint64, l uint64) uint64 {
  482. c.setupWriteBuffer()
  483. b := make([]byte, 0)
  484. switch t {
  485. case TypeFixedInt:
  486. b = c.encFixedLenInt(v, l)
  487. case TypeLenEncInt:
  488. b = c.encLenEncInt(v)
  489. }
  490. n, err := c.writeBuf.Write(b)
  491. if err != nil {
  492. c.err = err
  493. }
  494. return uint64(n)
  495. }
  496. func (c *Conn) putNullBytes(n uint64) uint64 {
  497. c.setupWriteBuffer()
  498. b := make([]byte, n)
  499. l, err := c.writeBuf.Write(b)
  500. if err != nil {
  501. c.err = err
  502. }
  503. return uint64(l)
  504. }
  505. func (c *Conn) putBytes(v []byte) uint64 {
  506. c.setupWriteBuffer()
  507. l, err := c.writeBuf.Write(v)
  508. if err != nil {
  509. c.err = err
  510. }
  511. return uint64(l)
  512. }
  513. func (c *Conn) Flush() error {
  514. if c.err != nil {
  515. return c.err
  516. }
  517. c.writeBuf = c.addHeader()
  518. //fmt.Printf("c.writeBuf = %x\n", c.writeBuf.Bytes())
  519. _, _ = c.buffer.Write(c.writeBuf.Bytes())
  520. if c.buffer.Flush() != nil {
  521. return c.buffer.Flush()
  522. }
  523. c.writeBuf = nil
  524. return nil
  525. }
  526. func (c *Conn) addHeader() *bytes.Buffer {
  527. pl := uint64(c.writeBuf.Len())
  528. sId := uint64(c.sequenceId)
  529. c.sequenceId++
  530. plB := c.encFixedLenInt(pl, 3)
  531. sIdB := c.encFixedLenInt(sId, 1)
  532. return bytes.NewBuffer(append(append(plB, sIdB...), c.writeBuf.Bytes()...))
  533. }
  534. func (c *Conn) setupWriteBuffer() {
  535. if c.writeBuf == nil {
  536. c.writeBuf = bytes.NewBuffer(nil)
  537. }
  538. }
  // StatusFlags is the type of Conn.StatusFlags. It currently has no fields.
  type StatusFlags struct{}
  541. type OKPacket struct {
  542. *PacketHeader
  543. Header uint64
  544. AffectedRows uint64
  545. LastInsertID uint64
  546. StatusFlags uint64
  547. Warnings uint64
  548. Info string
  549. SessionStateInfo string
  550. }
  551. func (c *Conn) decodeOKPacket(ph *PacketHeader) (*OKPacket, error) {
  552. op := OKPacket{}
  553. op.PacketHeader = ph
  554. op.Header = ph.Status
  555. op.AffectedRows = c.getInt(TypeLenEncInt, 0)
  556. op.LastInsertID = c.getInt(TypeLenEncInt, 0)
  557. if c.HandshakeResponse.ClientFlag.Protocol41 {
  558. op.StatusFlags = c.getInt(TypeFixedInt, 2)
  559. op.Warnings = c.getInt(TypeFixedInt, 1)
  560. } else if c.HandshakeResponse.ClientFlag.Transactions {
  561. op.StatusFlags = c.getInt(TypeFixedInt, 2)
  562. }
  563. if c.HandshakeResponse.ClientFlag.SessionTrack {
  564. op.Info = c.getString(TypeRestOfPacketString, 0)
  565. } else {
  566. op.Info = c.getString(TypeRestOfPacketString, 0)
  567. }
  568. return &op, nil
  569. }
  570. type ErrorPacket struct {
  571. *PacketHeader
  572. ErrorCode uint64
  573. ErrorMessage string
  574. SQLStateMarker string
  575. SQLState string
  576. }
  577. func (c *Conn) decodeErrorPacket(ph *PacketHeader) (*ErrorPacket, error) {
  578. ep := ErrorPacket{}
  579. ep.PacketHeader = ph
  580. ep.ErrorCode = c.getInt(TypeFixedInt, 2)
  581. ep.SQLStateMarker = c.getString(TypeFixedString, 1)
  582. ep.SQLState = c.getString(TypeFixedString, 5)
  583. ep.ErrorMessage = c.getString(TypeRestOfPacketString, 0)
  584. err := c.scanner.Err()
  585. if err != nil {
  586. return nil, err
  587. }
  588. return &ep, nil
  589. }
  590. func (c *Conn) setConnection(nc net.Conn) {
  591. c.curConn = nc
  592. c.buffer = bufio.NewReadWriter(
  593. bufio.NewReader(c.curConn),
  594. bufio.NewWriter(c.curConn),
  595. )
  596. c.scanner = bufio.NewScanner(c.buffer.Reader)
  597. c.scanner.Split(bufio.ScanBytes)
  598. }