From 258f64c7cf1794b445c42a229254a8b7a7a97efc Mon Sep 17 00:00:00 2001 From: Thorarinn Sigurdsson Date: Mon, 10 Feb 2020 15:12:15 +0100 Subject: [PATCH] feat(core): add event and log streaming * Added an event bus to Logger, which emits events when log entries are created or updated. * Added the `BufferedEventStream` class. This is used for batching and streaming events from the Logger and the active Garden instance to the platform when the user is logged in. Later, we can use this class for streaming to the dashboard as well. --- docs/reference/commands.md | 20 ++ garden-service/src/analytics/analytics.ts | 9 +- garden-service/src/cli/cli.ts | 8 + garden-service/src/db/base-entity.ts | 1 - garden-service/src/events.ts | 43 +++- garden-service/src/garden.ts | 3 +- garden-service/src/logger/log-entry.ts | 13 ++ garden-service/src/logger/logger.ts | 3 + .../logger/writers/json-terminal-writer.ts | 1 - .../src/platform/buffered-event-stream.ts | 192 ++++++++++++++++++ garden-service/src/process.ts | 1 + garden-service/test/helpers.ts | 6 +- garden-service/test/unit/src/events.ts | 3 +- garden-service/test/unit/src/platform/auth.ts | 2 +- .../src/platform/buffered-event-stream.ts | 75 +++++++ 15 files changed, 363 insertions(+), 17 deletions(-) create mode 100644 garden-service/src/platform/buffered-event-stream.ts create mode 100644 garden-service/test/unit/src/platform/buffered-event-stream.ts diff --git a/docs/reference/commands.md b/docs/reference/commands.md index b064fda2bc7..f6be426804a 100644 --- a/docs/reference/commands.md +++ b/docs/reference/commands.md @@ -942,3 +942,23 @@ Examples: | -------- | -------- | ----------- | | `enable` | No | Enable analytics. Defaults to "true" +### garden login + +Log in to Garden Cloud. + +Logs you in to Garden Cloud. Subsequent commands will have access to platform features. + +##### Usage + + garden login + +### garden logout + +Log out of Garden Cloud. + +Logs you out of Garden Cloud. + +##### Usage + + garden logout + diff --git a/garden-service/src/analytics/analytics.ts b/garden-service/src/analytics/analytics.ts index 7946cf72685..a992886c495 100644 --- a/garden-service/src/analytics/analytics.ts +++ b/garden-service/src/analytics/analytics.ts @@ -20,6 +20,7 @@ import { Events, EventName } from "../events" import { AnalyticsType } from "./analytics-types" import dedent from "dedent" import { getGitHubUrl } from "../docs/common" +import { InternalError } from "../exceptions" const API_KEY = process.env.ANALYTICS_DEV ? SEGMENT_DEV_API_KEY : SEGMENT_PROD_API_KEY @@ -59,7 +60,7 @@ export interface AnalyticsEventProperties { ciName: string | null system: SystemInfo isCI: boolean - sessionId: string + sessionId: string | null projectMetadata: ProjectMetadata } @@ -134,14 +135,18 @@ export class AnalyticsHandler { private ciName = ci.name private systemConfig: SystemInfo private isCI = ci.isCI - private sessionId = uuidv4() + private sessionId: string protected garden: Garden private projectMetadata: ProjectMetadata private constructor(garden: Garden, log: LogEntry) { + if (!garden.sessionId) { + throw new InternalError(`Garden instance with null sessionId passed to AnalyticsHandler constructor.`, {}) + } this.segment = new segmentClient(API_KEY, { flushAt: 20, flushInterval: 300 }) this.log = log this.garden = garden + this.sessionId = garden.sessionId this.globalConfigStore = new GlobalConfigStore() this.analyticsConfig = { userId: "", diff --git a/garden-service/src/cli/cli.ts b/garden-service/src/cli/cli.ts index f0a5cb8f972..e2dc68746fd 100644 --- a/garden-service/src/cli/cli.ts +++ b/garden-service/src/cli/cli.ts @@ -55,6 +55,7 @@ import { generateBasicDebugInfoReport } from "../commands/get/get-debug-info" import { AnalyticsHandler } from "../analytics/analytics" import { defaultDotIgnoreFiles } from "../util/fs" import { renderError } from "../logger/renderers" +import { BufferedEventStream } from "../platform/buffered-event-stream" const OUTPUT_RENDERERS = { json: (data: DeepPrimitiveMap) => { @@ -302,7 +303,9 @@ export class GardenCli { logger.info("") const footerLog = logger.placeholder() + // Init event & log streaming. const sessionId = uuidv4() + const bufferedEventStream = new BufferedEventStream(log, sessionId) const contextOpts: GardenOpts = { commandInfo: { @@ -335,6 +338,11 @@ export class GardenCli { } else { garden = await Garden.factory(root, contextOpts) } + + if (garden.clientAuthToken && garden.platformUrl) { + bufferedEventStream.connect(garden.events, garden.clientAuthToken, garden.platformUrl) + } + // Register log file writers. We need to do this after the Garden class is initialised because // the file writers depend on the project root. await this.initFileWriters(logger, garden.projectRoot, garden.gardenDirPath) diff --git a/garden-service/src/db/base-entity.ts b/garden-service/src/db/base-entity.ts index a7b691d315a..49342af03e4 100644 --- a/garden-service/src/db/base-entity.ts +++ b/garden-service/src/db/base-entity.ts @@ -45,7 +45,6 @@ export class GardenEntity extends BaseEntity { /** * Helper method to avoid circular import issues. */ - static getConnection() { return getConnection() } diff --git a/garden-service/src/events.ts b/garden-service/src/events.ts index c055ca3ac7a..8b638164c7f 100644 --- a/garden-service/src/events.ts +++ b/garden-service/src/events.ts @@ -7,9 +7,9 @@ */ import { EventEmitter2 } from "eventemitter2" -import { LogEntry } from "./logger/log-entry" import { ModuleVersion } from "./vcs/vcs" import { TaskResult } from "./task-graph" +import { LogEntryEvent } from "./platform/buffered-event-stream" /** * This simple class serves as the central event bus for a Garden instance. Its function @@ -18,7 +18,7 @@ import { TaskResult } from "./task-graph" * See below for the event interfaces. */ export class EventBus extends EventEmitter2 { - constructor(private log: LogEntry) { + constructor() { super({ wildcard: false, newListener: false, @@ -27,7 +27,6 @@ export class EventBus extends EventEmitter2 { } emit(name: T, payload: Events[T]) { - this.log.silly(`Emit event '${name}'`) return super.emit(name, payload) } @@ -47,9 +46,20 @@ export class EventBus extends EventEmitter2 { } /** - * The supported events and their interfaces. + * Supported logger events and their interfaces. */ -export type Events = { +export interface LoggerEvents { + _test: any + logEntryCreated: LogEntryEvent + logEntryUpdated: LogEntryEvent +} + +export type LoggerEventName = keyof LoggerEvents + +/** + * Supported Garden events and their interfaces. + */ +export interface Events extends LoggerEvents { // Internal test/control events _exit: {} _restart: {} @@ -108,8 +118,29 @@ export type Events = { taskGraphComplete: { completedAt: Date } - watchingForChanges: {} } export type EventName = keyof Events + +// Note: Does not include logger events. +export const eventNames: EventName[] = [ + "_exit", + "_restart", + "_test", + "configAdded", + "configRemoved", + "internalError", + "projectConfigChanged", + "moduleConfigChanged", + "moduleSourcesChanged", + "moduleRemoved", + "taskPending", + "taskProcessing", + "taskComplete", + "taskError", + "taskCancelled", + "taskGraphProcessing", + "taskGraphComplete", + "watchingForChanges", +] diff --git a/garden-service/src/garden.ts b/garden-service/src/garden.ts index d7637999791..e88dfa4ce0e 100644 --- a/garden-service/src/garden.ts +++ b/garden-service/src/garden.ts @@ -215,7 +215,7 @@ export class Garden { this.resolvedProviders = {} this.taskGraph = new TaskGraph(this, this.log) - this.events = new EventBus(this.log) + this.events = new EventBus() // Register plugins for (const plugin of [...builtinPlugins, ...params.plugins]) { @@ -329,6 +329,7 @@ export class Garden { * Clean up before shutting down. */ async close() { + this.events.removeAllListeners() this.watcher && (await this.watcher.stop()) } diff --git a/garden-service/src/logger/log-entry.ts b/garden-service/src/logger/log-entry.ts index de4093ba897..58bb54ce41a 100644 --- a/garden-service/src/logger/log-entry.ts +++ b/garden-service/src/logger/log-entry.ts @@ -14,6 +14,7 @@ import { Omit } from "../util/util" import { getChildEntries, findParentEntry } from "./util" import { GardenError } from "../exceptions" import { Logger } from "./logger" +import { formatForEventStream } from "../platform/buffered-event-stream" export type EmojiName = keyof typeof nodeEmoji.emoji export type LogSymbol = keyof typeof logSymbols | "empty" @@ -33,6 +34,8 @@ export interface TaskMetadata { durationMs?: number } +export const EVENT_LOG_LEVEL = LogLevel.debug + interface MessageBase { msg?: string emoji?: EmojiName @@ -90,6 +93,7 @@ export class LogEntry extends LogNode { public readonly childEntriesInheritLevel?: boolean public readonly id?: string public isPlaceholder?: boolean + public revision: number constructor(params: LogEntryConstructor) { super(params.level, params.parent, params.id) @@ -102,6 +106,7 @@ export class LogEntry extends LogNode { this.metadata = params.metadata this.id = params.id this.isPlaceholder = params.isPlaceholder + this.revision = 0 if (!params.isPlaceholder) { this.update({ @@ -114,6 +119,10 @@ export class LogEntry extends LogNode { dataFormat: params.dataFormat, maxSectionWidth: params.maxSectionWidth, }) + + if (this.level <= EVENT_LOG_LEVEL) { + this.root.events.emit("logEntryCreated", formatForEventStream(this)) + } } } @@ -124,6 +133,7 @@ export class LogEntry extends LogNode { * 3. next metadata is merged with the previous metadata */ protected update(updateParams: UpdateLogEntryParams): void { + this.revision = this.revision + 1 const messageState = this.getMessageState() // Explicitly set all the fields so the shape stays consistent @@ -192,6 +202,9 @@ export class LogEntry extends LogNode { protected onGraphChange(node: LogEntry) { this.root.onGraphChange(node) + if (node.level <= EVENT_LOG_LEVEL) { + this.root.events.emit("logEntryUpdated", formatForEventStream(node)) + } } getMetadata() { diff --git a/garden-service/src/logger/logger.ts b/garden-service/src/logger/logger.ts index d05bca3c6a8..7788ec30331 100644 --- a/garden-service/src/logger/logger.ts +++ b/garden-service/src/logger/logger.ts @@ -17,6 +17,7 @@ import { FancyTerminalWriter } from "./writers/fancy-terminal-writer" import { JsonTerminalWriter } from "./writers/json-terminal-writer" import { parseLogLevel } from "../cli/helpers" import { FullscreenTerminalWriter } from "./writers/fullscreen-terminal-writer" +import { EventBus } from "../events" export type LoggerType = "quiet" | "basic" | "fancy" | "fullscreen" | "json" export const LOGGER_TYPES = new Set(["quiet", "basic", "fancy", "fullscreen", "json"]) @@ -44,6 +45,7 @@ export interface LoggerConfig { export class Logger extends LogNode { public writers: Writer[] + public events: EventBus public useEmoji: boolean private static instance: Logger @@ -102,6 +104,7 @@ export class Logger extends LogNode { super(config.level) this.writers = config.writers || [] this.useEmoji = config.useEmoji === false ? false : true + this.events = new EventBus() } protected createNode(params: CreateNodeParams): LogEntry { diff --git a/garden-service/src/logger/writers/json-terminal-writer.ts b/garden-service/src/logger/writers/json-terminal-writer.ts index e2e3736e6ea..063149751aa 100644 --- a/garden-service/src/logger/writers/json-terminal-writer.ts +++ b/garden-service/src/logger/writers/json-terminal-writer.ts @@ -15,7 +15,6 @@ export interface JsonLogEntry { msg: string data?: any section?: string - durationMs?: number metadata?: LogEntryMetadata } diff --git a/garden-service/src/platform/buffered-event-stream.ts b/garden-service/src/platform/buffered-event-stream.ts new file mode 100644 index 00000000000..93bd2674d6e --- /dev/null +++ b/garden-service/src/platform/buffered-event-stream.ts @@ -0,0 +1,192 @@ +/* + * Copyright (C) 2018-2020 Garden Technologies, Inc. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +import { registerCleanupFunction } from "../util/util" +import { Events, EventName, EventBus, eventNames } from "../events" +import { LogEntryMetadata, LogEntry } from "../logger/log-entry" +import { chainMessages } from "../logger/renderers" +import { got } from "../util/http" + +export type StreamEvent = { + name: EventName + payload: Events[EventName] + timestamp: Date +} + +export interface LogEntryEvent { + key: string + parentKey: string | null + revision: number + msg: string | string[] + timestamp: Date + data?: any + section?: string + metadata?: LogEntryMetadata +} + +export function formatForEventStream(entry: LogEntry): LogEntryEvent { + const { section, data } = entry.getMessageState() + const { key, revision } = entry + const parentKey = entry.parent ? entry.parent.key : null + const metadata = entry.getMetadata() + const msg = chainMessages(entry.getMessageStates() || []) + const timestamp = new Date() + return { key, parentKey, revision, msg, data, metadata, section, timestamp } +} + +export const FLUSH_INTERVAL_MSEC = 1000 +export const MAX_BATCH_SIZE = 100 + +/** + * Buffers events and log entries and periodically POSTs them to the platform. + * + * Subscribes to logger events once, in the constructor. + * + * Subscribes to Garden events via the connect method, since we need to subscribe to the event bus of + * new Garden instances (and unsubscribe from events of the previously connected Garden instance, if + * any) e.g. when config changes during a watch-mode command. + */ +export class BufferedEventStream { + private log: LogEntry + private eventBus: EventBus + public sessionId: string + private platformUrl: string + private clientAuthToken: string + + /** + * We maintain this map to facilitate unsubscribing from a previously connected event bus + * when a new event bus is connected. + */ + private gardenEventListeners: { [eventName: string]: (payload: any) => void } + + private intervalId: NodeJS.Timer | null + private bufferedEvents: StreamEvent[] + private bufferedLogEntries: LogEntryEvent[] + + // For testing. + public flushEventsTestHandler: null | ((events: StreamEvent[]) => void) + public flushLogEntriesTestHandler: null | ((logEntries: LogEntryEvent[]) => void) + + constructor(log: LogEntry, sessionId: string) { + this.sessionId = sessionId + this.log = log + this.log.root.events.onAny((_name: string, payload: LogEntryEvent) => { + this.streamLogEntry(payload) + }) + this.flushEventsTestHandler = null + this.flushLogEntriesTestHandler = null + this.bufferedEvents = [] + this.bufferedLogEntries = [] + } + + connect(eventBus: EventBus, clientAuthToken: string, platformUrl: string) { + this.clientAuthToken = clientAuthToken + this.platformUrl = platformUrl + + if (!this.intervalId) { + this.startInterval() + } + + if (this.eventBus) { + // We unsubscribe from the old event bus' events. + this.unsubscribeFromGardenEvents(this.eventBus) + } + + this.eventBus = eventBus + this.subscribeToGardenEvents(this.eventBus) + } + + subscribeToGardenEvents(eventBus: EventBus) { + // We maintain this map to facilitate unsubscribing from events when the Garden instance is closed. + const gardenEventListeners = {} + for (const gardenEventName of eventNames) { + const listener = (payload: LogEntryEvent) => this.streamEvent(gardenEventName, payload) + gardenEventListeners[gardenEventName] = listener + eventBus.on(gardenEventName, listener) + } + this.gardenEventListeners = gardenEventListeners + } + + unsubscribeFromGardenEvents(eventBus: EventBus) { + for (const [gardenEventName, listener] of Object.entries(this.gardenEventListeners)) { + eventBus.removeListener(gardenEventName, listener) + } + } + + startInterval() { + this.intervalId = setInterval(() => { + this.flushBuffered({ flushAll: false }) + }, FLUSH_INTERVAL_MSEC) + + registerCleanupFunction("flushAllBufferedEventsAndLogEntries", () => { + this.close() + }) + } + + close() { + if (this.intervalId) { + clearInterval(this.intervalId) + this.intervalId = null + } + this.flushBuffered({ flushAll: true }) + } + + streamEvent(name: T, payload: Events[T]) { + this.bufferedEvents.push({ + name, + payload, + timestamp: new Date(), + }) + } + + streamLogEntry(logEntry: LogEntryEvent) { + this.bufferedLogEntries.push(logEntry) + } + + flushEvents(events: StreamEvent[]) { + const data = { + events, + clientAuthToken: this.clientAuthToken, + sessionId: this.sessionId, + } + got.post(`${this.platformUrl}/events`, { json: data }).catch((err) => { + this.log.error(err) + }) + } + + flushLogEntries(logEntries: LogEntryEvent[]) { + const data = { + logEntries, + clientAuthToken: this.clientAuthToken, + sessionId: this.sessionId, + } + got.post(`${this.platformUrl}/log-entries`, { json: data }).catch((err) => { + this.log.error(err) + }) + } + + flushBuffered({ flushAll = false }) { + if (!this.clientAuthToken || !this.platformUrl) { + return + } + const eventsToFlush = this.bufferedEvents.splice(0, flushAll ? this.bufferedEvents.length : MAX_BATCH_SIZE) + + if (eventsToFlush.length > 0) { + this.flushEventsTestHandler ? this.flushEventsTestHandler(eventsToFlush) : this.flushEvents(eventsToFlush) + } + + const logEntryFlushCount = flushAll ? this.bufferedLogEntries.length : MAX_BATCH_SIZE - eventsToFlush.length + const logEntriesToFlush = this.bufferedLogEntries.splice(0, logEntryFlushCount) + + if (logEntriesToFlush.length > 0) { + this.flushLogEntriesTestHandler + ? this.flushLogEntriesTestHandler(logEntriesToFlush) + : this.flushLogEntries(logEntriesToFlush) + } + } +} diff --git a/garden-service/src/process.ts b/garden-service/src/process.ts index ae5e8267492..a8920ced33a 100644 --- a/garden-service/src/process.ts +++ b/garden-service/src/process.ts @@ -211,6 +211,7 @@ async function validateConfigChange( try { const nextGarden = await Garden.factory(garden.projectRoot, garden.opts) await nextGarden.getConfigGraph(log) + await nextGarden.close() } catch (error) { if (error instanceof ConfigurationError) { const msg = dedent` diff --git a/garden-service/test/helpers.ts b/garden-service/test/helpers.ts index a93bc8b37e1..8f01d659fd6 100644 --- a/garden-service/test/helpers.ts +++ b/garden-service/test/helpers.ts @@ -313,8 +313,8 @@ interface EventLogEntry { class TestEventBus extends EventBus { public eventLog: EventLogEntry[] - constructor(log: LogEntry) { - super(log) + constructor() { + super() this.eventLog = [] } @@ -335,7 +335,7 @@ export class TestGarden extends Garden { constructor(params: GardenParams) { super(params) - this.events = new TestEventBus(this.log) + this.events = new TestEventBus() } setModuleConfigs(moduleConfigs: ModuleConfig[]) { diff --git a/garden-service/test/unit/src/events.ts b/garden-service/test/unit/src/events.ts index 9dcce566452..44c631fccad 100644 --- a/garden-service/test/unit/src/events.ts +++ b/garden-service/test/unit/src/events.ts @@ -8,13 +8,12 @@ import { EventBus } from "../../../src/events" import { expect } from "chai" -import { getLogger } from "../../../src/logger/logger" describe("EventBus", () => { let events: EventBus beforeEach(() => { - events = new EventBus(getLogger().placeholder()) + events = new EventBus() }) it("should send+receive events", (done) => { diff --git a/garden-service/test/unit/src/platform/auth.ts b/garden-service/test/unit/src/platform/auth.ts index 7d7ebad3384..e4038f9e079 100644 --- a/garden-service/test/unit/src/platform/auth.ts +++ b/garden-service/test/unit/src/platform/auth.ts @@ -12,7 +12,7 @@ import { ClientAuthToken } from "../../../../src/db/entities/client-auth-token" import { makeTestGardenA } from "../../../helpers" import { saveAuthToken, readAuthToken, clearAuthToken } from "../../../../src/platform/auth" -async function cleanupAuthTokens() { +export async function cleanupAuthTokens() { await ClientAuthToken.createQueryBuilder() .delete() .execute() diff --git a/garden-service/test/unit/src/platform/buffered-event-stream.ts b/garden-service/test/unit/src/platform/buffered-event-stream.ts new file mode 100644 index 00000000000..fbc8b525039 --- /dev/null +++ b/garden-service/test/unit/src/platform/buffered-event-stream.ts @@ -0,0 +1,75 @@ +/* + * Copyright (C) 2018-2020 Garden Technologies, Inc. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +import { expect } from "chai" +import { StreamEvent, LogEntryEvent, BufferedEventStream } from "../../../../src/platform/buffered-event-stream" +import { getLogger } from "../../../../src/logger/logger" +import { EventBus } from "../../../../src/events" + +describe("BufferedEventStream", () => { + it("should flush events and log entries emitted by a connected event emitter", async () => { + const flushedEvents: StreamEvent[] = [] + const flushedLogEntries: LogEntryEvent[] = [] + + const log = getLogger().placeholder() + + const bufferedEventStream = new BufferedEventStream(log, "dummy-session-id") + + bufferedEventStream.flushEventsTestHandler = (events: StreamEvent[]) => { + flushedEvents.push(...events) + } + bufferedEventStream.flushLogEntriesTestHandler = (logEntries: LogEntryEvent[]) => { + flushedLogEntries.push(...logEntries) + } + + const eventBus = new EventBus() + bufferedEventStream.connect(eventBus, "dummy-client-token", "dummy-platform_url") + + eventBus.emit("_test", {}) + log.root.events.emit("_test", {}) + + bufferedEventStream.flushBuffered({ flushAll: true }) + + expect(flushedEvents.length).to.eql(1) + expect(flushedLogEntries.length).to.eql(1) + }) + + it("should only flush events or log entries emitted by the last connected event emitter", async () => { + const flushedEvents: StreamEvent[] = [] + const flushedLogEntries: LogEntryEvent[] = [] + + const log = getLogger().placeholder() + + const bufferedEventStream = new BufferedEventStream(log, "dummy-session-id") + + bufferedEventStream.flushEventsTestHandler = (events: StreamEvent[]) => { + flushedEvents.push(...events) + } + bufferedEventStream.flushLogEntriesTestHandler = (logEntries: LogEntryEvent[]) => { + flushedLogEntries.push(...logEntries) + } + + const oldEventBus = new EventBus() + bufferedEventStream.connect(oldEventBus, "dummy-client-token", "dummy-platform_url") + const newEventBus = new EventBus() + bufferedEventStream.connect(newEventBus, "dummy-client-token", "dummy-platform_url") + + log.root.events.emit("_test", {}) + oldEventBus.emit("_test", {}) + + bufferedEventStream.flushBuffered({ flushAll: true }) + + expect(flushedEvents.length).to.eql(0) + expect(flushedLogEntries.length).to.eql(1) + + newEventBus.emit("_test", {}) + bufferedEventStream.flushBuffered({ flushAll: true }) + + expect(flushedEvents.length).to.eql(1) + }) +})