Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

feat: Unix domain sockets #208

Merged
merged 10 commits into from
Sep 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// p2p multi-address code
export const CODE_P2P = 421
export const CODE_CIRCUIT = 290
export const CODE_UNIX = 400

// Time to wait for a connection to close gracefully before destroying it manually
export const CLOSE_TIMEOUT = 2000
Expand Down
15 changes: 10 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import { toMultiaddrConnection } from './socket-to-conn.js'
import { createListener } from './listener.js'
import { multiaddrToNetConfig } from './utils.js'
import { AbortError } from '@libp2p/interfaces/errors'
import { CODE_CIRCUIT, CODE_P2P } from './constants.js'
import { CODE_CIRCUIT, CODE_P2P, CODE_UNIX } from './constants.js'
import { CreateListenerOptions, DialOptions, symbol, Transport } from '@libp2p/interface-transport'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Socket } from 'net'
import type { Socket, IpcSocketConnectOpts, TcpSocketConnectOpts } from 'net'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Connection } from '@libp2p/interface-connection'

Expand Down Expand Up @@ -75,19 +75,20 @@ export class TCP implements Transport {

return await new Promise<Socket>((resolve, reject) => {
const start = Date.now()
const cOpts = multiaddrToNetConfig(ma)
const cOpts = multiaddrToNetConfig(ma) as (IpcSocketConnectOpts & TcpSocketConnectOpts)
const cOptsStr = cOpts.path ?? `${cOpts.host ?? ''}:${cOpts.port}`

log('dialing %j', cOpts)
const rawSocket = net.connect(cOpts)

const onError = (err: Error) => {
err.message = `connection error ${cOpts.host}:${cOpts.port}: ${err.message}`
err.message = `connection error ${cOptsStr}: ${err.message}`

done(err)
}

const onTimeout = () => {
log('connection timeout %s:%s', cOpts.host, cOpts.port)
log('connection timeout %s', cOptsStr)

const err = errCode(new Error(`connection timeout after ${Date.now() - start}ms`), 'ERR_CONNECT_TIMEOUT')
// Note: this will result in onError() being called
Expand Down Expand Up @@ -155,6 +156,10 @@ export class TCP implements Transport {
return false
}

if (ma.protoCodes().includes(CODE_UNIX)) {
return true
}

return mafmt.TCP.matches(ma.decapsulateCode(CODE_P2P))
})
}
Expand Down
24 changes: 12 additions & 12 deletions src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,19 @@ export function createListener (context: Context) {
}

if (typeof address === 'string') {
throw new Error('Incorrect server address type')
}

try {
// Because TCP will only return the IPv6 version
// we need to capture from the passed multiaddr
if (listeningAddr.toString().startsWith('/ip4')) {
addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port))
} else if (address.family === 'IPv6') {
addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port))
addrs = [listeningAddr]
} else {
try {
// Because TCP will only return the IPv6 version
// we need to capture from the passed multiaddr
if (listeningAddr.toString().startsWith('/ip4')) {
addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port))
} else if (address.family === 'IPv6') {
addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port))
}
} catch (err) {
log.error('could not turn %s:%s into multiaddr', address.address, address.port, err)
}
} catch (err) {
log.error('could not turn %s:%s into multiaddr', address.address, address.port, err)
}

return addrs.map(ma => peerId != null ? ma.encapsulate(`/p2p/${peerId}`) : ma)
Expand Down
22 changes: 12 additions & 10 deletions src/socket-to-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { logger } from '@libp2p/logger'
import toIterable from 'stream-to-it'
import { ipPortToMultiaddr as toMultiaddr } from '@libp2p/utils/ip-port-to-multiaddr'
import { CLOSE_TIMEOUT, SOCKET_TIMEOUT } from './constants.js'
import { multiaddrToNetConfig } from './utils.js'
import errCode from 'err-code'
import type { Socket } from 'net'
import type { Multiaddr } from '@multiformats/multiaddr'
Expand Down Expand Up @@ -52,13 +53,14 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
remoteAddr = toMultiaddr(socket.remoteAddress, socket.remotePort)
}

const { host, port } = remoteAddr.toOptions()
const lOpts = multiaddrToNetConfig(remoteAddr)
const lOptsStr = lOpts.path ?? `${lOpts.host ?? ''}:${lOpts.port ?? ''}`
const { sink, source } = toIterable.duplex(socket)

// by default there is no timeout
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#socketsettimeouttimeout-callback
socket.setTimeout(inactivityTimeout, () => {
log('%s:%s socket read timeout', host, port)
log('%s socket read timeout', lOptsStr)

// only destroy with an error if the remote has not sent the FIN message
let err: Error | undefined
Expand All @@ -72,7 +74,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
})

socket.once('close', () => {
log('%s:%s socket closed', host, port)
log('%s socket read timeout', lOptsStr)

// In instances where `close` was not explicitly called,
// such as an iterable stream ending, ensure we have set the close
Expand Down Expand Up @@ -119,36 +121,36 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti

async close () {
if (socket.destroyed) {
log('%s:%s socket was already destroyed when trying to close', host, port)
log('%s socket was already destroyed when trying to close', lOptsStr)
return
}

log('%s:%s closing socket', host, port)
log('%s closing socket', lOptsStr)
await new Promise<void>((resolve, reject) => {
const start = Date.now()

// Attempt to end the socket. If it takes longer to close than the
// timeout, destroy it manually.
const timeout = setTimeout(() => {
if (socket.destroyed) {
log('%s:%s is already destroyed', host, port)
log('%s is already destroyed', lOptsStr)
resolve()
} else {
log('%s:%s socket close timeout after %dms, destroying it manually', host, port, Date.now() - start)
log('%s socket close timeout after %dms, destroying it manually', lOptsStr, Date.now() - start)

// will trigger 'error' and 'close' events that resolves promise
socket.destroy(errCode(new Error('Socket close timeout'), 'ERR_SOCKET_CLOSE_TIMEOUT'))
}
}, closeTimeout).unref()

socket.once('close', () => {
log('%s:%s socket closed', host, port)
log('%s socket closed', lOptsStr)
// socket completely closed
clearTimeout(timeout)
resolve()
})
socket.once('error', (err: Error) => {
log('%s:%s socket error', host, port, err)
log('%s socket error', lOptsStr, err)

// error closing socket
if (maConn.timeline.close == null) {
Expand All @@ -171,7 +173,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
if (socket.writableLength > 0) {
// there are outgoing bytes waiting to be sent
socket.once('drain', () => {
log('%s:%s socket drained', host, port)
log('%s socket drained', lOptsStr)

// all bytes have been sent we can destroy the socket (maybe) before the timeout
socket.destroy()
Expand Down
12 changes: 9 additions & 3 deletions src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
import { Multiaddr } from '@multiformats/multiaddr'
import type { ListenOptions, IpcSocketConnectOpts, TcpSocketConnectOpts } from 'net'
import os from 'os'
import path from 'path'

const ProtoFamily = { ip4: 'IPv4', ip6: 'IPv6' }

export function multiaddrToNetConfig (addr: Multiaddr) {
export function multiaddrToNetConfig (addr: Multiaddr): ListenOptions | (IpcSocketConnectOpts & TcpSocketConnectOpts) {
const listenPath = addr.getPath()

// unix socket listening
if (listenPath != null) {
// TCP should not return unix socket else need to refactor listener which accepts connection options object
throw new Error('Unix Sockets are not supported by the TCP transport')
if (os.platform() === 'win32') {
// Use named pipes on Windows systems.
return { path: path.join('\\\\.\\pipe\\', listenPath) }
} else {
return { path: listenPath }
}
}

// tcp listening
Expand Down
12 changes: 5 additions & 7 deletions test/listen-dial.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ describe('listen', () => {
}
})

// TCP doesn't support unix paths
it.skip('listen on path', async () => {
const mh = new Multiaddr(`/unix${path.resolve(os.tmpdir(), `/tmp/p2pd-${Date.now()}.sock`)}`)
it('listen on path', async () => {
const mh = new Multiaddr(`/unix/${path.resolve(os.tmpdir(), `/tmp/p2pd-${Date.now()}.sock`)}`)

listener = tcp.createListener({
upgrader
Expand Down Expand Up @@ -207,9 +206,8 @@ describe('dial', () => {
await listener.close()
})

// TCP doesn't support unix paths
it.skip('dial on path', async () => {
const ma = new Multiaddr(`/unix${path.resolve(os.tmpdir(), `/tmp/p2pd-${Date.now()}.sock`)}`)
it('dial on path', async () => {
const ma = new Multiaddr(`/unix/${path.resolve(os.tmpdir(), `/tmp/p2pd-${Date.now()}.sock`)}`)

const listener = tcp.createListener({
upgrader
Expand All @@ -226,7 +224,7 @@ describe('dial', () => {
async (source) => await all(source)
)

expect(values).to.deep.equal(['hey'])
expect(values[0].subarray()).to.equalBytes(uint8ArrayFromString('hey'))
await conn.close()
await listener.close()
})
Expand Down