Skip to content

Commit

Permalink
wip: fixing filter
Browse files Browse the repository at this point in the history
  • Loading branch information
D4nte committed Jul 20, 2022
1 parent cf2fdd2 commit 9b071e6
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 25 deletions.
1 change: 1 addition & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
"debug": "^4.3.4",
"dns-query": "^0.11.1",
"hi-base32": "^0.5.1",
"it-all": "^1.0.6",
"it-length-prefixed": "^7.0.1",
"it-pipe": "^2.0.3",
"js-sha3": "^0.8.0",
Expand Down
19 changes: 12 additions & 7 deletions src/lib/waku_filter/index.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ describe("Waku Filter", () => {
beforeEach(async function () {
this.timeout(10000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({ filter: true });
await nwaku.start({ filter: true, lightpush: true });
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForRemotePeer([Protocols.Filter, Protocols.Relay]);
log("waitForRemotePeer");
await waku.waitForRemotePeer([Protocols.Filter, Protocols.LightPush]);
log("waitForRemotePeer done");
});

it("creates a subscription", async function () {
Expand All @@ -43,12 +45,15 @@ describe("Waku Filter", () => {
expect(msg.contentTopic).to.eq(TestContentTopic);
expect(msg.payloadAsUtf8).to.eq(messageText);
};
log("subscribing");
await waku.filter.subscribe(callback, [TestContentTopic]);
const message = await WakuMessage.fromUtf8String(
messageText,
TestContentTopic
);
await waku.relay.send(message);
log("Push message");
await waku.lightPush.push(message);
log("Wait for message");
while (messageCount === 0) {
await delay(250);
}
Expand All @@ -64,10 +69,10 @@ describe("Waku Filter", () => {
expect(msg.contentTopic).to.eq(TestContentTopic);
};
await waku.filter.subscribe(callback, [TestContentTopic]);
await waku.relay.send(
await waku.lightPush.push(
await WakuMessage.fromUtf8String("Filtering works!", TestContentTopic)
);
await waku.relay.send(
await waku.lightPush.push(
await WakuMessage.fromUtf8String(
"Filtering still works!",
TestContentTopic
Expand All @@ -87,15 +92,15 @@ describe("Waku Filter", () => {
const unsubscribe = await waku.filter.subscribe(callback, [
TestContentTopic,
]);
await waku.relay.send(
await waku.lightPush.push(
await WakuMessage.fromUtf8String(
"This should be received",
TestContentTopic
)
);
await delay(100);
await unsubscribe();
await waku.relay.send(
await waku.lightPush.push(
await WakuMessage.fromUtf8String(
"This should not be received",
TestContentTopic
Expand Down
29 changes: 25 additions & 4 deletions src/lib/waku_filter/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer } from "@libp2p/interface-peer-store";
import debug from "debug";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { Libp2p } from "libp2p";
Expand Down Expand Up @@ -49,7 +50,12 @@ export class WakuFilter {
constructor(public libp2p: Libp2p) {
this.subscriptions = new Map();
this.decryptionKeys = new Map();
this.libp2p.handle(FilterCodec, this.onRequest.bind(this));
this.libp2p.handle(FilterCodec, this.onRequest.bind(this)).then(
() => {
log("filter protocol registered");
},
(e) => log("Failed to register filter protocol", e)
);
}

/**
Expand Down Expand Up @@ -84,7 +90,15 @@ export class WakuFilter {
const stream = await this.newStream(peer);

try {
await pipe([request.encode()], lp.encode(), stream);
const res = await pipe(
[request.encode()],
lp.encode(),
stream,
lp.decode(),
async (source) => await all(source)
);

log("response", res);
} catch (e) {
log(
"Error subscribing to peer ",
Expand All @@ -109,14 +123,21 @@ export class WakuFilter {
private onRequest({ stream }: any): void {
log("Receiving message push");
try {
pipe(stream.source, lp.decode(), async (source) => {
pipe(stream, lp.decode(), async (source) => {
for await (const bytes of source) {
const res = FilterRPC.decode(bytes.slice());
if (res.requestId && res.push?.messages?.length) {
await this.pushMessages(res.requestId, res.push.messages);
}
}
});
}).then(
() => {
log("Receiving pipe closed.");
},
(e) => {
log("Error with receiving pipe", e);
}
);
} catch (e) {
log("Error decoding message", e);
}
Expand Down
10 changes: 3 additions & 7 deletions src/lib/waku_light_push/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer } from "@libp2p/interface-peer-store";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { Libp2p } from "libp2p";
Expand Down Expand Up @@ -71,17 +72,12 @@ export class WakuLightPush {
? opts.pubSubTopic
: this.pubSubTopic;
const query = PushRPC.createRequest(message, pubSubTopic);
const res: Uint8Array[] = [];
await pipe(
const res = await pipe(
[query.encode()],
lp.encode(),
stream,
lp.decode(),
async (source) => {
for await (const chunk of source) {
res.push(chunk.slice());
}
}
async (source) => await all(source)
);
try {
const bytes = concat(res);
Expand Down
10 changes: 3 additions & 7 deletions src/lib/waku_store/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { PeerId } from "@libp2p/interface-peer-id";
import { Peer } from "@libp2p/interface-peer-store";
import debug from "debug";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { Libp2p } from "libp2p";
Expand Down Expand Up @@ -205,17 +206,12 @@ export class WakuStore {
const historyRpcQuery = HistoryRPC.createQuery(queryOpts);
dbg("Querying store peer", connections[0].remoteAddr.toString());

const res: Uint8Array[] = [];
await pipe(
const res = await pipe(
[historyRpcQuery.encode()],
lp.encode(),
stream,
lp.decode(),
async (source) => {
for await (const chunk of source) {
res.push(chunk.slice());
}
}
async (source) => await all(source)
);
const bytes = concat(res);
const reply = historyRpcQuery.decode(bytes);
Expand Down

0 comments on commit 9b071e6

Please sign in to comment.