connection.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433
  1. package binlog
  2. import (
  3. "bufio"
  4. "bytes"
  5. "database/sql"
  6. "database/sql/driver"
  7. "encoding/binary"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "io/ioutil"
  12. "math"
  13. "net"
  14. "reflect"
  15. "strings"
  16. "time"
  17. )
  18. // MySQL Packet Data Types
  19. const TypeNullTerminatedString = int(0)
  20. const TypeFixedString = int(1)
  21. const TypeFixedInt = int(2)
  22. const TypeLenEncInt = int(3)
  23. const TypeRestOfPacketString = int(4)
  24. // Integer Maximums
  25. const MaxUint8 = 1<<8 - 1
  26. const MaxUint16 = 1<<16 - 1
  27. const MaxUint24 = 1<<24 - 1
  28. const MaxUint64 = 1<<64 - 1
  29. // Misc. Constants
  30. const NullByte byte = 0
  31. const MaxPacketSize = MaxUint16
  32. type Config struct {
  33. Host string `json:"host"`
  34. Port int `json:"port"`
  35. User string `json:"user"`
  36. Pass string `json:"password"`
  37. Database string `json:"database"`
  38. SSL bool `json:"ssl"`
  39. VerifyCert bool `json:"verify_cert"`
  40. Timeout time.Duration
  41. }
  42. func splitByBytesFunc(data []byte, atEOF bool) (advance int, token []byte, err error) {
  43. if atEOF {
  44. return 0, nil, errors.New("scanner found EOF")
  45. }
  46. return 1, data[:1], nil
  47. }
  48. func newBinlogConfig(dsn string) (*Config, error) {
  49. var err error
  50. b, err := ioutil.ReadFile(dsn)
  51. if err != nil {
  52. return nil, err
  53. }
  54. config := Config{}
  55. err = json.Unmarshal(b, &config)
  56. return &config, err
  57. }
  58. type Conn struct {
  59. Config *Config
  60. tcpConn *net.TCPConn
  61. Handshake *Handshake
  62. buffer *bufio.ReadWriter
  63. scanner *bufio.Scanner
  64. err error
  65. sequenceId uint64
  66. writeBuf *bytes.Buffer
  67. }
  68. func newBinlogConn(config *Config) Conn {
  69. return Conn{
  70. Config: config,
  71. sequenceId: 1,
  72. }
  73. }
  74. func (c Conn) Prepare(query string) (driver.Stmt, error) {
  75. return nil, nil
  76. }
  77. func (c Conn) Close() error {
  78. return nil
  79. }
  80. func (c Conn) Begin() (driver.Tx, error) {
  81. return nil, nil
  82. }
  83. type Driver struct{}
  84. func (d Driver) Open(dsn string) (driver.Conn, error) {
  85. config, err := newBinlogConfig(dsn)
  86. if nil != err {
  87. return nil, err
  88. }
  89. c := newBinlogConn(config)
  90. dialer := net.Dialer{Timeout: c.Config.Timeout}
  91. addr := fmt.Sprintf("%s:%d", c.Config.Host, c.Config.Port)
  92. t, err := dialer.Dial("tcp", addr)
  93. c.tcpConn = t.(*net.TCPConn)
  94. if err != nil {
  95. netErr, ok := err.(net.Error)
  96. if ok && netErr.Temporary() {
  97. fmt.Printf("Error: %s", netErr.Error())
  98. return nil, err
  99. }
  100. }
  101. err = c.decodeHandshakePacket()
  102. if err != nil {
  103. return nil, err
  104. }
  105. err = c.writeHandshakeResponse()
  106. if err != nil {
  107. return nil, err
  108. }
  109. fmt.Printf("%+v\n", c.Handshake)
  110. return c, err
  111. }
  112. func init() {
  113. sql.Register("mysql-binlog", &Driver{})
  114. }
  115. func (c *Conn) readBytes(l uint64) *bytes.Buffer {
  116. if c.buffer == nil {
  117. c.buffer = bufio.NewReadWriter(
  118. bufio.NewReader(c.tcpConn),
  119. bufio.NewWriter(c.tcpConn),
  120. )
  121. c.scanner = bufio.NewScanner(c.buffer.Reader)
  122. c.scanner.Split(splitByBytesFunc)
  123. }
  124. b := make([]byte, 0)
  125. for i := uint64(0); i < l; i++ {
  126. c.scanner.Scan()
  127. b = append(b, c.scanner.Bytes()...)
  128. }
  129. return bytes.NewBuffer(b)
  130. }
  131. func (c *Conn) getBytesUntilNull() *bytes.Buffer {
  132. l := uint64(1)
  133. s := c.readBytes(l)
  134. b := s.Bytes()
  135. for true {
  136. if uint64(s.Len()) != l || s.Bytes()[0] == NullByte {
  137. break
  138. }
  139. s = c.readBytes(uint64(l))
  140. b = append(b, s.Bytes()...)
  141. }
  142. return bytes.NewBuffer(b)
  143. }
  144. func (c *Conn) discardBytes(l int) {
  145. for i := 0; i < l; i++ {
  146. c.scanner.Scan()
  147. }
  148. }
  149. func (c *Conn) getInt(t int, l uint64) uint64 {
  150. var v uint64
  151. switch t {
  152. case TypeFixedInt:
  153. v = c.decFixedInt(l)
  154. default:
  155. v = 0
  156. }
  157. return v
  158. }
  159. func (c *Conn) getString(t int, l uint64) string {
  160. var v string
  161. switch t {
  162. case TypeFixedString:
  163. v = c.decFixedString(l)
  164. case TypeNullTerminatedString:
  165. v = c.decNullTerminatedString()
  166. default:
  167. v = ""
  168. }
  169. return v
  170. }
  171. func (c *Conn) decNullTerminatedString() string {
  172. b := c.getBytesUntilNull()
  173. return strings.TrimRight(b.String(), string(NullByte))
  174. }
  175. func (c *Conn) decFixedString(l uint64) string {
  176. b := c.readBytes(l)
  177. return b.String()
  178. }
  179. func (c *Conn) decFixedInt(l uint64) uint64 {
  180. var i uint64
  181. b := c.readBytes(l)
  182. i, _ = binary.ReadUvarint(b)
  183. return i
  184. }
  185. func (c *Conn) encFixedLenInt(v uint64, l uint64) []byte {
  186. b := make([]byte, 8)
  187. binary.LittleEndian.PutUint64(b, v)
  188. return b[:l]
  189. }
  190. func (c *Conn) encLenEncInt(v uint64) []byte {
  191. prefix := make([]byte, 1)
  192. var b []byte
  193. switch {
  194. case v < MaxUint8:
  195. b = make([]byte, 2)
  196. binary.LittleEndian.PutUint16(b, uint16(v))
  197. b = b[:1]
  198. case v >= MaxUint8 && v < MaxUint16:
  199. prefix[0] = 0xFC
  200. b = make([]byte, 3)
  201. binary.LittleEndian.PutUint16(b, uint16(v))
  202. b = b[:2]
  203. case v >= MaxUint16 && v < MaxUint24:
  204. prefix[0] = 0xFD
  205. b = make([]byte, 4)
  206. binary.LittleEndian.PutUint32(b, uint32(v))
  207. b = b[:3]
  208. case v >= MaxUint24 && v < MaxUint64:
  209. prefix[0] = 0xFE
  210. b = make([]byte, 9)
  211. binary.LittleEndian.PutUint64(b, uint64(v))
  212. }
  213. if len(b) > 1 {
  214. b = append(prefix, b...)
  215. }
  216. return b
  217. }
  218. func (c *Conn) bitmaskToStruct(b []byte, s interface{}) interface{} {
  219. l := len(b)
  220. t := reflect.TypeOf(s)
  221. v := reflect.New(t.Elem()).Elem()
  222. for i := uint(0); i < uint(v.NumField()); i++ {
  223. f := v.Field(int(i))
  224. var v bool
  225. switch {
  226. case l > 4:
  227. x := binary.LittleEndian.Uint64(b)
  228. flag := uint64(1 << i)
  229. v = x&flag > 0
  230. case l > 2:
  231. x := binary.LittleEndian.Uint32(b)
  232. flag := uint32(1 << i)
  233. v = x&flag > 0
  234. case l > 1:
  235. x := binary.LittleEndian.Uint16(b)
  236. flag := uint16(1 << i)
  237. v = x&flag > 0
  238. default:
  239. x := uint(b[0])
  240. flag := uint(1 << i)
  241. v = x&flag > 0
  242. }
  243. f.SetBool(v)
  244. }
  245. return v.Interface()
  246. }
  247. func (c *Conn) structToBitmask(s interface{}) []byte {
  248. t := reflect.TypeOf(s).Elem()
  249. sV := reflect.ValueOf(s).Elem()
  250. fC := uint(t.NumField())
  251. m := uint64(0)
  252. for i := uint(0); i < fC; i++ {
  253. f := sV.Field(int(i))
  254. v := f.Bool()
  255. if v {
  256. m |= 1 << i
  257. }
  258. }
  259. l := uint64(math.Ceil(float64(fC) / 8.0))
  260. b := make([]byte, 8)
  261. binary.LittleEndian.PutUint64(b, m)
  262. switch {
  263. case l > 4: // 64 bits
  264. b = b[:8]
  265. case l > 2: // 32 bits
  266. b = b[:4]
  267. case l > 1: // 16 bits
  268. b = b[:2]
  269. default: // 8 bits
  270. b = b[:1]
  271. }
  272. return b
  273. }
  274. func (c *Conn) putString(t int, v string) uint64 {
  275. b := make([]byte, 0)
  276. switch t {
  277. case TypeFixedString:
  278. b = c.encFixedString(v)
  279. case TypeNullTerminatedString:
  280. b = c.encNullTerminatedString(v)
  281. case TypeRestOfPacketString:
  282. b = c.encRestOfPacketString(v)
  283. }
  284. l, err := c.writeBuf.Write(b)
  285. if err != nil {
  286. c.err = err
  287. }
  288. return uint64(l)
  289. }
  290. func (c *Conn) encNullTerminatedString(v string) []byte {
  291. return append([]byte(v), NullByte)
  292. }
  293. func (c *Conn) encFixedString(v string) []byte {
  294. return []byte(v)
  295. }
  296. func (c *Conn) encRestOfPacketString(v string) []byte {
  297. s := c.encFixedString(v)
  298. return s
  299. }
  300. func (c *Conn) putInt(t int, v uint64, l uint64) uint64 {
  301. c.setupWriteBuffer()
  302. b := make([]byte, 0)
  303. switch t {
  304. case TypeFixedInt:
  305. b = c.encFixedLenInt(v, l)
  306. case TypeLenEncInt:
  307. b = c.encLenEncInt(v)
  308. }
  309. n, err := c.writeBuf.Write(b)
  310. if err != nil {
  311. c.err = err
  312. }
  313. return uint64(n)
  314. }
  315. func (c *Conn) putNullBytes(n uint64) uint64 {
  316. c.setupWriteBuffer()
  317. b := make([]byte, n)
  318. l, err := c.writeBuf.Write(b)
  319. if err != nil {
  320. c.err = err
  321. }
  322. return uint64(l)
  323. }
  324. func (c *Conn) putBytes(v []byte) uint64 {
  325. c.setupWriteBuffer()
  326. l, err := c.writeBuf.Write(v)
  327. if err != nil {
  328. c.err = err
  329. }
  330. return uint64(l)
  331. }
  332. func (c *Conn) Flush() error {
  333. if c.err != nil {
  334. return c.err
  335. }
  336. c.writeBuf = c.addHeader()
  337. _, _ = c.buffer.Write(c.writeBuf.Bytes())
  338. if c.buffer.Flush() != nil {
  339. return c.buffer.Flush()
  340. }
  341. return nil
  342. }
  343. func (c *Conn) addHeader() *bytes.Buffer {
  344. pl := uint64(c.writeBuf.Len())
  345. sId := uint64(c.sequenceId)
  346. c.sequenceId++
  347. plB := c.encFixedLenInt(pl, 3)
  348. sIdB := c.encFixedLenInt(sId, 1)
  349. return bytes.NewBuffer(append(append(plB, sIdB...), c.writeBuf.Bytes()...))
  350. }
  351. func (c *Conn) setupWriteBuffer() {
  352. if c.writeBuf == nil {
  353. c.writeBuf = bytes.NewBuffer(nil)
  354. }
  355. }