Canal的使用

  1. 进入mysql,输入show variables like 'bin_log'查看是否开启binlog

  2. 如果没有,则在/etc/my.cnf文件中添加如下内容

     [mysqld]
     server-id= 1
     log-bin= mysql-bin
     binlog_format= row
  3. 重启mysql,sudo service mysql restart

  4. 输入show variables like 'bin_log'查看是否开启binlog

  5. 创建一个文件夹存放canal,mkdir /opt/module/canal

  6. 创建一个Maven项目,并且添加Scala依赖,pom文件如下

      <dependencies>
          <!--canal 客户端, 从 canal 服务器读取数据-->
          <dependency>
              <groupId>com.alibaba.otter</groupId>
              <artifactId>canal.client</artifactId>
              <version>1.1.2</version>
          </dependency>
          <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
          <!-- kafka 客户端 -->
          <dependency>
              <groupId>org.apache.kafka</groupId>
              <artifactId>kafka-clients</artifactId>
              <version>0.11.0.0</version>
          </dependency>
      </dependencies>
  7. 创建两个单例类CanalHandlerCanalClient,代码如下

      import java.util
      import com.alibaba.otter.canal.protocol.CanalEntry
      import com.alibaba.otter.canal.protocol.CanalEntry.{EventType, RowData}
    
      object CanalHandler {
        /**
         * 处理从 canal 取来的数据
         *
         * @param tableName   表名
         * @param eventType   事件类型
         * @param rowDataList 数据类别
         */
        def handle(tableName: String, eventType: EventType, rowDataList: util.List[RowData]) = {
          import scala.collection.JavaConversions._
          if ("order_info" == tableName && eventType == EventType.INSERT && rowDataList.size() > 0) {
            // 1. rowData 表示一行数据, 通过他得到每一列. 首先遍历每一行数据
            for (rowData <- rowDataList) {
              // 2. 得到每行中, 所有列组成的列表
              val columnList: util.List[CanalEntry.Column] = rowData.getAfterColumnsList
              for (column <- columnList) {
                // 3. 得到列名和列值
                println(column.getName + ":" + column.getValue)
              }
            }
          }
        }
      }
      import java.net.InetSocketAddress
      import com.alibaba.otter.canal.client.{CanalConnector, CanalConnectors}
      import com.alibaba.otter.canal.protocol.CanalEntry.{EntryType, RowChange}
      import com.alibaba.otter.canal.protocol.{CanalEntry, Message}
      import com.google.protobuf.ByteString
    
      object CanalClient {
        def main(args: Array[String]): Unit = {
          // 1. 创建能连接到 Canal 的连接器对象
          val connector: CanalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop101", 11111), "example", "", "")
          // 2. 连接到 Canal
          connector.connect()
          // 3. 监控指定的表的数据的变化
          connector.subscribe("gmall.order_info")
          while (true) {
            // 4. 获取消息  (一个消息对应 多条sql 语句的执行)
            val msg: Message = connector.get(100) // 一次最多获取 100 条 sql
            // 5. 个消息对应多行数据发生了变化, 一个 entry 表示一条 sql 语句的执行
            val entries: java.util.List[CanalEntry.Entry] = msg.getEntries
            import scala.collection.JavaConversions._
            if (entries.size() > 0) {
              // 6. 遍历每行数据
              for (entry <- entries) {
                // 7. EntryType.ROWDATA 只对这样的 EntryType 做处理
                if (entry.getEntryType == EntryType.ROWDATA) {
                  // 8. 获取到这行数据, 但是这种数据不是字符串, 所以要解析
                  val value: ByteString = entry.getStoreValue
                  val rowChange: RowChange = RowChange.parseFrom(value)
                  // 9.定义专门处理的工具类: 参数 1 表名, 参数 2 事件类型(插入, 删除等), 参数 3: 具体的数据
                  CanalHandler.handle(entry.getHeader.getTableName, rowChange.getEventType, rowChange.getRowDatasList)
                }
              }
            } else {
              println("没有抓取到数据...., 2s 之后重新抓取")
              Thread.sleep(2000)
            }
          }
        }
      }
  8. gmall.order_info中插入数据,可以看到控制台打印出日志

      id:1
      consignee:UHBKVw
      consignee_tel:13492323624
      total_amount:530.0
      order_status:1
      user_id:26
      payment_way:1
      delivery_address:jEsdlXeEICMeiFCVWXUI
      order_comment:JhSSLZQfcVvquYLHeyff
      out_trade_no:2866414735
      trade_body:
      create_time:2021-05-09 19:30:09
      operate_time:
      expire_time:
      tracking_no:
      parent_order_id:
      img_url:
      id:2
      consignee:QJWJqK
      consignee_tel:13585089108
      total_amount:249.0
      order_status:2
      user_id:73
      payment_way:1
      delivery_address:QBkurDQvEIFZgKgyaIov
      order_comment:nanqMPOyaJsMFQpdtJor
      out_trade_no:8132263655
      trade_body:
      create_time:2021-05-09 05:47:49
      operate_time:2021-05-09 06:16:19
      expire_time:
      tracking_no:
      parent_order_id:
      img_url:
      没有抓取到数据...., 2s 之后重新抓取
      没有抓取到数据...., 2s 之后重新抓取
      没有抓取到数据...., 2s 之后重新抓取
  9. 配置好Kafka之后即可将数据生产到Kafka中

全部评论

相关推荐

评论
1
收藏
分享
牛客网
牛客企业服务