下載吧 - 綠色安全的游戲和軟件下載中心

          軟件下載吧

          當前位置:軟件下載吧 > 數據庫 > DB2 > 詳解OpLog訂閱MongoDB的數據變更

          詳解OpLog訂閱MongoDB的數據變更

          時間:2024-02-04 13:45作者:下載吧人氣:29

          前言

          我們開源了一個訂閱分發mysql的binlog的項目,一直用的非常好,忽然有天開發說能不能支持MongoDB的數據訂閱呢,MongoDB的使用度也挺廣泛的。安排。經過簡單的了解后發現MongoDB也有類似binlog的機制,最終花了兩天時間把功能完成,并統一抽象集成到binlog開源項目中,使用和binlog同一套訂閱分發模型管理MongoDB數據源。整個過程非常順利,比整mysql的binlog要簡單的多了。

          oplog簡介

          先來聊聊MongoDB的主備機制,和mysql的binlog類似,在MongoDB中,有一個系統庫“”Local”,庫里有一個集合“oplog.rs”,這個集合類似于binlog文件,里面記錄了MongoDB的所有操作。從節點通過讀取oplog.rs里的數據做到數據同步。

          解析oplog

          和訂閱mysql的binlog一樣(模擬一個從節點mysql)。我們的訂閱服務要像從節點那樣讀取解析oplog.rs里的數據。解析前先看下oplog.rs的Document的數據結構

          詳解OpLog訂閱MongoDB的數據變更

          上圖是一個插入的數據的日志,可見oplog的doc中共有如下字段,含義分別如下:

          ts:操作的時間戳(非常重要)

          t:term最初在主數據庫上生成操作的。(含義不明)

          h:本次操作的唯一hashID

          v: 版本號

          op:操作類型,有六種類型,我們只需要關注其中的i(插入)、u(更新)、d(刪除)即可

          ns:庫名和集合名稱,中間使用“.”連接

          o:本次操作的document內容

          o2:只有op操作類型時u更新時,才會有這個字段,代表更新的條件語句

          $set:o2獲取后的文檔里的屬性,代表更新的字段

          如上字段,完成一次oplog的解析,只需要ts、op、ns、o、o2、$set即可,其中ts非常重要,可以類比為binlog中的Position。同步mysql的數據時,通過記錄消費binlog的位置,也就是Position,可以有效避免訂閱服務停機后,消費記錄丟失的問題。同步MongoDB時,通過記錄ts的值,來記錄消費的位置,可以到達和訂閱binlog一樣的效果。和mysql訂閱不同的是,MongoDB的同步需要同步服務自己查詢,而且oplog在MongoDB4.0之前的版本有大小限制,超過設置的容量后,老的數據就會被丟失,在4.0之后的版本已經解除了這個限制。

          代碼

          上面已經分析了oplog的結構以及訂閱步驟,下面我們直接構建查詢即可,需要注意,每次獲取到的ts值,需要存儲記錄下來,已便重新訂閱時,從上次斷開的記錄重新開始。下面直接看代碼,重點邏輯都以注釋詳盡

          private BsonTimestamp queryTs;
          @Test
          public void OpLogTest() {
          MongoClient mongoClient = new MongoClient(new MongoClientURI(“mongodb://admin:admin@127.0.0.1:3717”));
          MongoCollectioncollection = mongoClient.getDatabase(“local”)
          .getCollection(“oplog.rs”);

          //如果是首次訂閱,需要使用自然排序查詢,獲取第最后一次操作的操作時間戳。如果是續訂閱直接讀取記錄的值賦值給queryTs即可
          FindIterabletsCursor = collection.find().sort(new BasicDBObject(“$natural”, -1))
          .limit(1);
          Document tsDoc = tsCursor.first();
          queryTs = (BsonTimestamp) tsDoc.get(“ts”);
          while (true) try {
          //構建查詢語句,查詢大于當前查詢時間戳queryTs的記錄
          BasicDBObject query = new BasicDBObject(“ts”, new BasicDBObject(“$gt”, queryTs));
          MongoCursordocCursor = collection.find(query)
          .cursorType(CursorType.TailableAwait) //沒有數據時阻塞休眠
          .noCursorTimeout(true) //防止服務器在不活動時間(10分鐘)后使空閑的游標超時。
          .oplogReplay(true) //結合query條件,獲取增量數據,這個參數比較難懂,見:https://docs.mongodb.com/manual/reference/command/find/index.html
          .maxAwaitTime(1, TimeUnit.SECONDS) //設置此操作在服務器上的最大等待執行時間
          .iterator();
          while (docCursor.hasNext()) {
          Document document = docCursor.next();
          //更新查詢時間戳
          queryTs = (BsonTimestamp) document.get(“ts”);
          //TODO 在這里接收到數據后通過訂閱數據路由分發

          String op = document.getString(“op”);
          String database = document.getString(“ns”);
          Document context = (Document) document.get(“o”);
          Document where = null;
          if (op.equals(“u”)) {
          where = (Document) document.get(“o2”);
          if (context != null) {
          context = (Document) context.get(“$set”);
          }
          }
          System.err.println(“操作時間戳:” + queryTs.getTime());
          System.err.println(“操作類 型:” + op);
          System.err.println(“數據庫.集合:” + database);
          System.err.println(“更新條件:” + JSON.toJSONString(where));
          System.err.println(“文檔內容:” + JSON.toJSONString(context));
          }
          } catch (Exception e) { e.printStackTrace(); }
          }

          標簽MongoDB,技術文檔,數據庫,MongoDB

          相關下載

          查看所有評論+

          網友評論

          網友
          您的評論需要經過審核才能顯示

          熱門閱覽

          最新排行

          公眾號

          主站蜘蛛池模板: 国产精品香蕉一区二区三区| 亚洲一区二区三区高清在线观看| 国产一区二区三区乱码网站| 在线电影一区二区| 久久精品国产亚洲一区二区三区 | 中文国产成人精品久久一区| 亚洲高清毛片一区二区| 亚洲综合无码一区二区| 中文字幕精品一区| 亚洲国产一区二区视频网站| 亚洲Av无码国产一区二区| 日韩在线不卡免费视频一区| 亚洲无线码一区二区三区| 久久精品无码一区二区app | 国偷自产Av一区二区三区吞精| 性色av闺蜜一区二区三区| 亚洲午夜一区二区三区| 日韩一区二区在线播放| 麻豆亚洲av熟女国产一区二| 国产精品免费一区二区三区四区| 精品国产不卡一区二区三区| 国产三级一区二区三区| 精品国产一区二区三区av片| 成人毛片一区二区| 国产伦理一区二区三区| 精品女同一区二区| 一区精品麻豆入口| 一区二区三区无码视频免费福利| 国产一区二区三区在线视頻| 韩国福利一区二区三区高清视频| 精品久久久久中文字幕一区| 成人乱码一区二区三区av| 久久毛片一区二区| 国产成人精品一区在线 | 国模无码一区二区三区| eeuss鲁片一区二区三区| 日本韩国一区二区三区| 精品在线一区二区| 伊人久久大香线蕉av一区| 亚洲国产一区在线| 国产SUV精品一区二区88|