connection.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720
  1. package binlog
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/tls"
  6. "database/sql"
  7. "database/sql/driver"
  8. "encoding/binary"
  9. "encoding/json"
  10. "fmt"
  11. "io/ioutil"
  12. "math"
  13. "net"
  14. "reflect"
  15. "strings"
  16. "time"
  17. )
  18. // Misc. Constants
  19. const NullByte byte = 0
  20. const MaxPacketSize = MaxUint16
  21. // MySQL Packet Data Types
  22. const TypeNullTerminatedString = int(0)
  23. const TypeFixedString = int(1)
  24. const TypeFixedInt = int(2)
  25. const TypeLenEncInt = int(3)
  26. const TypeRestOfPacketString = int(4)
  27. const TypeLenEncString = int(5)
  28. // Integer Maximums
  29. const MaxUint08 = 1<<8 - 1
  30. const MaxUint16 = 1<<16 - 1
  31. const MaxUint24 = 1<<24 - 1
  32. const MaxUint64 = 1<<64 - 1
  33. // Packet Statuses
  34. const StatusOK = 0x00
  35. const StatusEOF = 0xFE
  36. const StatusErr = 0xFF
  37. const StatusAuth = 0x01
  38. type Config struct {
  39. Host string `json:"host"`
  40. Port int `json:"port"`
  41. User string `json:"user"`
  42. Pass string `json:"password"`
  43. Database string `json:"database"`
  44. SSL bool `json:"ssl"`
  45. SSLCA string `json:"ssl-ca"`
  46. SSLCer string `json:"ssl-cer"`
  47. SSLKey string `json:"ssl-key"`
  48. VerifyCert bool `json:"verify-cert"`
  49. ServerId uint64 `json:"server-id"`
  50. BinlogFile string `json:"binlog-file"`
  51. Timeout time.Duration
  52. }
  53. func newBinlogConfig(dsn string) (*Config, error) {
  54. var err error
  55. b, err := ioutil.ReadFile(dsn)
  56. if err != nil {
  57. return nil, err
  58. }
  59. config := Config{}
  60. err = json.Unmarshal(b, &config)
  61. return &config, err
  62. }
  63. type Conn struct {
  64. Config *Config
  65. curConn net.Conn
  66. tcpConn *net.TCPConn
  67. secTCPConn *tls.Conn
  68. Handshake *Handshake
  69. HandshakeResponse *HandshakeResponse
  70. buffer *bufio.ReadWriter
  71. scanner *bufio.Scanner
  72. err error
  73. sequenceId uint64
  74. writeBuf *bytes.Buffer
  75. StatusFlags *StatusFlags
  76. Listener *net.Listener
  77. packetHeader *PacketHeader
  78. scanPos uint64
  79. }
  80. func newBinlogConn(config *Config) Conn {
  81. return Conn{
  82. Config: config,
  83. sequenceId: 1,
  84. }
  85. }
  86. func (c Conn) Prepare(query string) (driver.Stmt, error) {
  87. return nil, nil
  88. }
  89. func (c Conn) Close() error {
  90. return nil
  91. }
  92. func (c Conn) Begin() (driver.Tx, error) {
  93. return nil, nil
  94. }
  95. type Driver struct{}
  96. func (d Driver) Open(dsn string) (driver.Conn, error) {
  97. config, err := newBinlogConfig(dsn)
  98. if nil != err {
  99. return nil, err
  100. }
  101. c := newBinlogConn(config)
  102. var t interface{}
  103. dialer := net.Dialer{Timeout: c.Config.Timeout}
  104. addr := fmt.Sprintf("%s:%d", c.Config.Host, c.Config.Port)
  105. t, err = dialer.Dial("tcp", addr)
  106. if err != nil {
  107. netErr, ok := err.(net.Error)
  108. if ok && !netErr.Temporary() {
  109. fmt.Printf("Error: %s", netErr.Error())
  110. return nil, err
  111. }
  112. } else {
  113. c.tcpConn = t.(*net.TCPConn)
  114. c.setConnection(t.(net.Conn))
  115. }
  116. err = c.decodeHandshakePacket()
  117. if err != nil {
  118. return nil, err
  119. }
  120. c.HandshakeResponse = c.NewHandshakeResponse()
  121. // If we are on SSL send SSL_Request packet now
  122. if c.Config.SSL {
  123. err = c.writeSSLRequestPacket()
  124. if err != nil {
  125. return nil, err
  126. }
  127. tlsConf := NewClientTLSConfig(
  128. c.Config.SSLKey,
  129. c.Config.SSLCer,
  130. []byte(c.Config.SSLCA),
  131. c.Config.VerifyCert,
  132. c.Config.Host,
  133. )
  134. c.secTCPConn = tls.Client(c.tcpConn, tlsConf)
  135. c.setConnection(c.secTCPConn)
  136. }
  137. err = c.writeHandshakeResponse()
  138. if err != nil {
  139. return nil, err
  140. }
  141. // Listen for auth response.
  142. _, err = c.readPacket()
  143. if err != nil {
  144. return nil, err
  145. }
  146. // Auth was successful.
  147. c.sequenceId = 0
  148. // Register as a slave
  149. err = c.registerAsSlave()
  150. if err != nil {
  151. return nil, err
  152. }
  153. c.sequenceId = 0
  154. _, err = c.readPacket()
  155. if err != nil {
  156. return nil, err
  157. }
  158. err = c.startBinlogStream()
  159. if err != nil {
  160. return nil, err
  161. }
  162. err = c.listenForBinlog()
  163. if err != nil {
  164. return nil, err
  165. }
  166. return c, err
  167. }
  168. func (c *Conn) readPacket() (interface{}, error) {
  169. ph, err := c.getPacketHeader()
  170. if err != nil {
  171. return nil, err
  172. }
  173. var res interface{}
  174. switch ph.Status {
  175. case StatusAuth:
  176. res, err := c.decodeAuthMoreDataResponsePacket(ph)
  177. if err != nil {
  178. return nil, err
  179. }
  180. switch res.Data {
  181. case Sha2FastAuthSuccess:
  182. case Sha2RequestPublicKey:
  183. case Sha2PerformFullAuthentication:
  184. c.putBytes(append([]byte(c.Config.Pass), NullByte))
  185. if c.Flush() != nil {
  186. return nil, c.Flush()
  187. }
  188. }
  189. case StatusEOF:
  190. fallthrough
  191. case StatusOK:
  192. res, err = c.decodeOKPacket(ph)
  193. if err != nil {
  194. return nil, err
  195. }
  196. case StatusErr:
  197. res, err = c.decodeErrorPacket(ph)
  198. if err != nil {
  199. return nil, err
  200. }
  201. err = fmt.Errorf(
  202. "error %d: %s",
  203. res.(*ErrorPacket).ErrorCode,
  204. res.(*ErrorPacket).ErrorMessage,
  205. )
  206. return res, err
  207. default:
  208. fmt.Printf("Unknown PacketHeader: %+v\n", ph)
  209. }
  210. err = c.scanner.Err()
  211. if err != nil {
  212. return nil, err
  213. }
  214. return res, nil
  215. }
  216. type PacketHeader struct {
  217. Length uint64
  218. SequenceID uint64
  219. Status uint64
  220. }
  221. func (c *Conn) getPacketHeader() (*PacketHeader, error) {
  222. ph := PacketHeader{}
  223. ph.Length = c.getInt(TypeFixedInt, 3)
  224. ph.SequenceID = c.getInt(TypeFixedInt, 1)
  225. ph.Status = c.getInt(TypeFixedInt, 1)
  226. fmt.Printf("ph.Status = %+v\n", ph.Status)
  227. err := c.scanner.Err()
  228. if err != nil {
  229. return &ph, err
  230. }
  231. c.packetHeader = &ph
  232. c.scanPos = 0
  233. return &ph, nil
  234. }
  235. func init() {
  236. sql.Register("mysql-binlog", &Driver{})
  237. }
  238. func (c *Conn) readBytes(l uint64) *bytes.Buffer {
  239. b := make([]byte, 0)
  240. for i := uint64(0); i < l; i++ {
  241. didScan := c.scanner.Scan()
  242. if !didScan {
  243. err := c.scanner.Err()
  244. if err != nil {
  245. panic(err)
  246. }
  247. } else {
  248. b = append(b, c.scanner.Bytes()...)
  249. }
  250. }
  251. c.scanPos += uint64(len(b))
  252. return bytes.NewBuffer(b)
  253. }
  254. func (c *Conn) getBytesUntilNull() *bytes.Buffer {
  255. l := uint64(1)
  256. s := c.readBytes(l)
  257. b := s.Bytes()
  258. for {
  259. if uint64(s.Len()) != l || s.Bytes()[0] == NullByte {
  260. break
  261. }
  262. s = c.readBytes(l)
  263. b = append(b, s.Bytes()...)
  264. }
  265. return bytes.NewBuffer(b)
  266. }
  267. func (c *Conn) discardBytes(l uint64) {
  268. c.readBytes(l)
  269. }
  270. func (c *Conn) getInt(t int, l uint64) uint64 {
  271. var v uint64
  272. switch t {
  273. case TypeFixedInt:
  274. v = c.decFixedInt(l)
  275. case TypeLenEncInt:
  276. v = c.decLenEncInt()
  277. default:
  278. v = 0
  279. }
  280. return v
  281. }
  282. func (c *Conn) getString(t int, l uint64) string {
  283. var v string
  284. switch t {
  285. case TypeFixedString:
  286. v = c.decFixedString(l)
  287. case TypeLenEncString:
  288. v = string(c.decLenEncInt())
  289. case TypeNullTerminatedString:
  290. v = c.decNullTerminatedString()
  291. case TypeRestOfPacketString:
  292. v = c.decRestOfPacketString()
  293. default:
  294. v = ""
  295. }
  296. return v
  297. }
  298. func (c *Conn) decRestOfPacketString() string {
  299. b := c.getRemainingBytes()
  300. return b.String()
  301. }
  302. func (c *Conn) getRemainingBytes() *bytes.Buffer {
  303. l := (c.packetHeader.Length - 1) - c.scanPos
  304. b := c.readBytes(l)
  305. return b
  306. }
  307. func (c *Conn) decNullTerminatedString() string {
  308. b := c.getBytesUntilNull()
  309. return strings.TrimRight(b.String(), string(NullByte))
  310. }
  311. func (c *Conn) decFixedString(l uint64) string {
  312. b := c.readBytes(l)
  313. return b.String()
  314. }
  315. func (c *Conn) decLenEncInt() uint64 {
  316. var l uint16
  317. b := c.readBytes(1)
  318. br := bytes.NewReader(b.Bytes())
  319. _ = binary.Read(br, binary.LittleEndian, &l)
  320. if l > 0 {
  321. return c.decFixedInt(uint64(l))
  322. } else {
  323. return 0
  324. }
  325. }
  326. func (c *Conn) decFixedInt(l uint64) uint64 {
  327. var i uint64
  328. b := c.readBytes(l)
  329. if l <= 2 {
  330. var x uint16
  331. pb := c.padBytes(2, b.Bytes())
  332. br := bytes.NewReader(pb)
  333. _ = binary.Read(br, binary.LittleEndian, &x)
  334. i = uint64(x)
  335. } else if l <= 4 {
  336. var x uint32
  337. pb := c.padBytes(4, b.Bytes())
  338. br := bytes.NewReader(pb)
  339. _ = binary.Read(br, binary.LittleEndian, &x)
  340. i = uint64(x)
  341. } else if l <= 8 {
  342. var x uint64
  343. pb := c.padBytes(8, b.Bytes())
  344. br := bytes.NewReader(pb)
  345. _ = binary.Read(br, binary.LittleEndian, &x)
  346. i = x
  347. }
  348. return i
  349. }
  350. func (c *Conn) padBytes(l int, b []byte) []byte {
  351. bl := len(b)
  352. pl := l - bl
  353. for i := 0; i < pl; i++ {
  354. b = append(b, NullByte)
  355. }
  356. return b
  357. }
  358. func (c *Conn) encFixedLenInt(v uint64, l uint64) []byte {
  359. b := make([]byte, 8)
  360. binary.LittleEndian.PutUint64(b, v)
  361. return b[:l]
  362. }
  363. func (c *Conn) encLenEncInt(v uint64) []byte {
  364. prefix := make([]byte, 1)
  365. var b []byte
  366. switch {
  367. case v < MaxUint08:
  368. b = make([]byte, 2)
  369. binary.LittleEndian.PutUint16(b, uint16(v))
  370. b = b[:1]
  371. case v >= MaxUint08 && v < MaxUint16:
  372. prefix[0] = 0xFC
  373. b = make([]byte, 3)
  374. binary.LittleEndian.PutUint16(b, uint16(v))
  375. b = b[:2]
  376. case v >= MaxUint16 && v < MaxUint24:
  377. prefix[0] = 0xFD
  378. b = make([]byte, 4)
  379. binary.LittleEndian.PutUint32(b, uint32(v))
  380. b = b[:3]
  381. case v >= MaxUint24 && v < MaxUint64:
  382. prefix[0] = 0xFE
  383. b = make([]byte, 9)
  384. binary.LittleEndian.PutUint64(b, v)
  385. }
  386. if len(b) > 1 {
  387. b = append(prefix, b...)
  388. }
  389. return b
  390. }
  391. func (c *Conn) bitmaskToStruct(b []byte, s interface{}) interface{} {
  392. l := len(b)
  393. t := reflect.TypeOf(s)
  394. v := reflect.New(t.Elem()).Elem()
  395. for i := uint(0); i < uint(v.NumField()); i++ {
  396. f := v.Field(int(i))
  397. var v bool
  398. switch {
  399. case l > 4:
  400. x := binary.LittleEndian.Uint64(b)
  401. flag := uint64(1 << i)
  402. v = x&flag > 0
  403. case l > 2:
  404. x := binary.LittleEndian.Uint32(b)
  405. flag := uint32(1 << i)
  406. v = x&flag > 0
  407. case l > 1:
  408. x := binary.LittleEndian.Uint16(b)
  409. flag := uint16(1 << i)
  410. v = x&flag > 0
  411. default:
  412. x := uint(b[0])
  413. flag := uint(1 << i)
  414. v = x&flag > 0
  415. }
  416. f.SetBool(v)
  417. }
  418. return v.Interface()
  419. }
  420. func (c *Conn) structToBitmask(s interface{}) []byte {
  421. t := reflect.TypeOf(s).Elem()
  422. sV := reflect.ValueOf(s).Elem()
  423. fC := uint(t.NumField())
  424. m := uint64(0)
  425. for i := uint(0); i < fC; i++ {
  426. f := sV.Field(int(i))
  427. v := f.Bool()
  428. if v {
  429. m |= 1 << i
  430. }
  431. }
  432. l := uint64(math.Ceil(float64(fC) / 8.0))
  433. b := make([]byte, 8)
  434. binary.LittleEndian.PutUint64(b, m)
  435. switch {
  436. case l > 4: // 64 bits
  437. b = b[:8]
  438. case l > 2: // 32 bits
  439. b = b[:4]
  440. case l > 1: // 16 bits
  441. b = b[:2]
  442. default: // 8 bits
  443. b = b[:1]
  444. }
  445. return b
  446. }
  447. func (c *Conn) putString(t int, v string) uint64 {
  448. b := make([]byte, 0)
  449. switch t {
  450. case TypeFixedString:
  451. b = c.encFixedString(v)
  452. case TypeLenEncString:
  453. b = c.encLenEncString(v)
  454. case TypeNullTerminatedString:
  455. b = c.encNullTerminatedString(v)
  456. case TypeRestOfPacketString:
  457. b = c.encRestOfPacketString(v)
  458. }
  459. l, err := c.writeBuf.Write(b)
  460. if err != nil {
  461. c.err = err
  462. }
  463. return uint64(l)
  464. }
  465. func (c *Conn) encLenEncString(v string) []byte {
  466. l := uint64(len(v))
  467. b := c.encLenEncInt(l)
  468. return append(b, c.encFixedString(v)...)
  469. }
  470. func (c *Conn) encNullTerminatedString(v string) []byte {
  471. return append([]byte(v), NullByte)
  472. }
  473. func (c *Conn) encFixedString(v string) []byte {
  474. return []byte(v)
  475. }
  476. func (c *Conn) encRestOfPacketString(v string) []byte {
  477. s := c.encFixedString(v)
  478. return s
  479. }
  480. func (c *Conn) putInt(t int, v uint64, l uint64) uint64 {
  481. c.setupWriteBuffer()
  482. b := make([]byte, 0)
  483. switch t {
  484. case TypeFixedInt:
  485. b = c.encFixedLenInt(v, l)
  486. case TypeLenEncInt:
  487. b = c.encLenEncInt(v)
  488. }
  489. n, err := c.writeBuf.Write(b)
  490. if err != nil {
  491. c.err = err
  492. }
  493. return uint64(n)
  494. }
  495. func (c *Conn) putNullBytes(n uint64) uint64 {
  496. c.setupWriteBuffer()
  497. b := make([]byte, n)
  498. l, err := c.writeBuf.Write(b)
  499. if err != nil {
  500. c.err = err
  501. }
  502. return uint64(l)
  503. }
  504. func (c *Conn) putBytes(v []byte) uint64 {
  505. c.setupWriteBuffer()
  506. l, err := c.writeBuf.Write(v)
  507. if err != nil {
  508. c.err = err
  509. }
  510. return uint64(l)
  511. }
  512. func (c *Conn) Flush() error {
  513. if c.err != nil {
  514. return c.err
  515. }
  516. c.writeBuf = c.addHeader()
  517. //fmt.Printf("c.writeBuf = %x\n", c.writeBuf.Bytes())
  518. _, _ = c.buffer.Write(c.writeBuf.Bytes())
  519. if c.buffer.Flush() != nil {
  520. return c.buffer.Flush()
  521. }
  522. c.writeBuf = nil
  523. return nil
  524. }
  525. func (c *Conn) addHeader() *bytes.Buffer {
  526. pl := uint64(c.writeBuf.Len())
  527. sId := c.sequenceId
  528. c.sequenceId++
  529. var plB = c.encFixedLenInt(pl, 3)
  530. var sIdB = c.encFixedLenInt(sId, 1)
  531. return bytes.NewBuffer(append(append(plB, sIdB...), c.writeBuf.Bytes()...))
  532. }
  533. func (c *Conn) setupWriteBuffer() {
  534. if c.writeBuf == nil {
  535. c.writeBuf = bytes.NewBuffer(nil)
  536. }
  537. }
  538. type StatusFlags struct {
  539. }
  540. type OKPacket struct {
  541. *PacketHeader
  542. Header uint64
  543. AffectedRows uint64
  544. LastInsertID uint64
  545. StatusFlags uint64
  546. Warnings uint64
  547. Info string
  548. SessionStateInfo string
  549. }
  550. func (c *Conn) decodeOKPacket(ph *PacketHeader) (*OKPacket, error) {
  551. op := OKPacket{}
  552. op.PacketHeader = ph
  553. op.Header = ph.Status
  554. op.AffectedRows = c.getInt(TypeLenEncInt, 0)
  555. op.LastInsertID = c.getInt(TypeLenEncInt, 0)
  556. if c.HandshakeResponse.ClientFlag.Protocol41 {
  557. op.StatusFlags = c.getInt(TypeFixedInt, 2)
  558. op.Warnings = c.getInt(TypeFixedInt, 1)
  559. } else if c.HandshakeResponse.ClientFlag.Transactions {
  560. op.StatusFlags = c.getInt(TypeFixedInt, 2)
  561. }
  562. if c.HandshakeResponse.ClientFlag.SessionTrack {
  563. op.Info = c.getString(TypeRestOfPacketString, 0)
  564. } else {
  565. op.Info = c.getString(TypeRestOfPacketString, 0)
  566. }
  567. return &op, nil
  568. }
  569. type ErrorPacket struct {
  570. *PacketHeader
  571. ErrorCode uint64
  572. ErrorMessage string
  573. SQLStateMarker string
  574. SQLState string
  575. }
  576. func (c *Conn) decodeErrorPacket(ph *PacketHeader) (*ErrorPacket, error) {
  577. ep := ErrorPacket{}
  578. ep.PacketHeader = ph
  579. ep.ErrorCode = c.getInt(TypeFixedInt, 2)
  580. ep.SQLStateMarker = c.getString(TypeFixedString, 1)
  581. ep.SQLState = c.getString(TypeFixedString, 5)
  582. ep.ErrorMessage = c.getString(TypeRestOfPacketString, 0)
  583. err := c.scanner.Err()
  584. if err != nil {
  585. return nil, err
  586. }
  587. return &ep, nil
  588. }
  589. func (c *Conn) setConnection(nc net.Conn) {
  590. c.curConn = nc
  591. c.buffer = bufio.NewReadWriter(
  592. bufio.NewReader(c.curConn),
  593. bufio.NewWriter(c.curConn),
  594. )
  595. c.scanner = bufio.NewScanner(c.buffer.Reader)
  596. c.scanner.Split(bufio.ScanBytes)
  597. }