MongoDB: Auf Änderungen in Collection reagieren mit ChangeStreams

Mit Version 3.6 bietet MongoDB die Möglichkeit, auf Änderungen auf Collections zu reagieren. Dabei erzeugt man eine Art Subscription (inkl. Filterung). Damit kann man über sog. ChangeStreams auf Änderungen in der MongoDB reagieren.

Voraussetzung

  • JDK 1.8
  • MongoDB 3.6

Mit dem folgenden Filter z.B. erzeugt man eine Pipeline, welche greift, wenn ein neues Dokument mit dem Attribut test=123 eingefügt wird.

List<Bson> pipeline = Collections.singletonList(Aggregates.match(Filters.and(
   Document.parse("{'fullDocument.test': '123'}"),
   Filters.in("operationType", asList("insert")))));

Für das folgende Beispiel würden mit dem Filter 2 Events erzeugt.

collection.insertOne(new Document("test", "abc"));
collection.insertOne(new Document("test", "123")); // watch
collection.insertOne(new Document("test", "456"));
collection.insertOne(new Document("test", "123")); // watch

Sofern man auf die Collection einen Watch erzeugt, kann man über die Änderungen iterieren.

MongoClient mongoClient = new MongoClient( new MongoClientURI("mongodb://localhost:27017"));
MongoDatabase db = mongoClient.getDatabase("test");
MongoCollection<Document> collection = db.getCollection("testcollection");

MongoCursor<ChangeStreamDocument<Document>> cursor = collection.watch(pipeline).iterator();

// access iterator to get change events