diff --git a/garden-service/src/analytics/analytics.ts b/garden-service/src/analytics/analytics.ts index 7946cf72685..a992886c495 100644 --- a/garden-service/src/analytics/analytics.ts +++ b/garden-service/src/analytics/analytics.ts @@ -20,6 +20,7 @@ import { Events, EventName } from "../events" import { AnalyticsType } from "./analytics-types" import dedent from "dedent" import { getGitHubUrl } from "../docs/common" +import { InternalError } from "../exceptions" const API_KEY = process.env.ANALYTICS_DEV ? SEGMENT_DEV_API_KEY : SEGMENT_PROD_API_KEY @@ -59,7 +60,7 @@ export interface AnalyticsEventProperties { ciName: string | null system: SystemInfo isCI: boolean - sessionId: string + sessionId: string | null projectMetadata: ProjectMetadata } @@ -134,14 +135,18 @@ export class AnalyticsHandler { private ciName = ci.name private systemConfig: SystemInfo private isCI = ci.isCI - private sessionId = uuidv4() + private sessionId: string protected garden: Garden private projectMetadata: ProjectMetadata private constructor(garden: Garden, log: LogEntry) { + if (!garden.sessionId) { + throw new InternalError(`Garden instance with null sessionId passed to AnalyticsHandler constructor.`, {}) + } this.segment = new segmentClient(API_KEY, { flushAt: 20, flushInterval: 300 }) this.log = log this.garden = garden + this.sessionId = garden.sessionId this.globalConfigStore = new GlobalConfigStore() this.analyticsConfig = { userId: "", diff --git a/garden-service/src/cli/cli.ts b/garden-service/src/cli/cli.ts index f0a5cb8f972..e2dc68746fd 100644 --- a/garden-service/src/cli/cli.ts +++ b/garden-service/src/cli/cli.ts @@ -55,6 +55,7 @@ import { generateBasicDebugInfoReport } from "../commands/get/get-debug-info" import { AnalyticsHandler } from "../analytics/analytics" import { defaultDotIgnoreFiles } from "../util/fs" import { renderError } from "../logger/renderers" +import { BufferedEventStream } from "../platform/buffered-event-stream" const OUTPUT_RENDERERS = { json: (data: DeepPrimitiveMap) => { @@ -302,7 +303,9 @@ export class GardenCli { logger.info("") const footerLog = logger.placeholder() + // Init event & log streaming. const sessionId = uuidv4() + const bufferedEventStream = new BufferedEventStream(log, sessionId) const contextOpts: GardenOpts = { commandInfo: { @@ -335,6 +338,11 @@ export class GardenCli { } else { garden = await Garden.factory(root, contextOpts) } + + if (garden.clientAuthToken && garden.platformUrl) { + bufferedEventStream.connect(garden.events, garden.clientAuthToken, garden.platformUrl) + } + // Register log file writers. We need to do this after the Garden class is initialised because // the file writers depend on the project root. await this.initFileWriters(logger, garden.projectRoot, garden.gardenDirPath) diff --git a/garden-service/src/db/base-entity.ts b/garden-service/src/db/base-entity.ts index a7b691d315a..49342af03e4 100644 --- a/garden-service/src/db/base-entity.ts +++ b/garden-service/src/db/base-entity.ts @@ -45,7 +45,6 @@ export class GardenEntity extends BaseEntity { /** * Helper method to avoid circular import issues. */ - static getConnection() { return getConnection() } diff --git a/garden-service/src/events.ts b/garden-service/src/events.ts index c055ca3ac7a..8b638164c7f 100644 --- a/garden-service/src/events.ts +++ b/garden-service/src/events.ts @@ -7,9 +7,9 @@ */ import { EventEmitter2 } from "eventemitter2" -import { LogEntry } from "./logger/log-entry" import { ModuleVersion } from "./vcs/vcs" import { TaskResult } from "./task-graph" +import { LogEntryEvent } from "./platform/buffered-event-stream" /** * This simple class serves as the central event bus for a Garden instance. Its function @@ -18,7 +18,7 @@ import { TaskResult } from "./task-graph" * See below for the event interfaces. */ export class EventBus extends EventEmitter2 { - constructor(private log: LogEntry) { + constructor() { super({ wildcard: false, newListener: false, @@ -27,7 +27,6 @@ export class EventBus extends EventEmitter2 { } emit(name: T, payload: Events[T]) { - this.log.silly(`Emit event '${name}'`) return super.emit(name, payload) } @@ -47,9 +46,20 @@ export class EventBus extends EventEmitter2 { } /** - * The supported events and their interfaces. + * Supported logger events and their interfaces. */ -export type Events = { +export interface LoggerEvents { + _test: any + logEntryCreated: LogEntryEvent + logEntryUpdated: LogEntryEvent +} + +export type LoggerEventName = keyof LoggerEvents + +/** + * Supported Garden events and their interfaces. + */ +export interface Events extends LoggerEvents { // Internal test/control events _exit: {} _restart: {} @@ -108,8 +118,29 @@ export type Events = { taskGraphComplete: { completedAt: Date } - watchingForChanges: {} } export type EventName = keyof Events + +// Note: Does not include logger events. +export const eventNames: EventName[] = [ + "_exit", + "_restart", + "_test", + "configAdded", + "configRemoved", + "internalError", + "projectConfigChanged", + "moduleConfigChanged", + "moduleSourcesChanged", + "moduleRemoved", + "taskPending", + "taskProcessing", + "taskComplete", + "taskError", + "taskCancelled", + "taskGraphProcessing", + "taskGraphComplete", + "watchingForChanges", +] diff --git a/garden-service/src/garden.ts b/garden-service/src/garden.ts index d7637999791..e88dfa4ce0e 100644 --- a/garden-service/src/garden.ts +++ b/garden-service/src/garden.ts @@ -215,7 +215,7 @@ export class Garden { this.resolvedProviders = {} this.taskGraph = new TaskGraph(this, this.log) - this.events = new EventBus(this.log) + this.events = new EventBus() // Register plugins for (const plugin of [...builtinPlugins, ...params.plugins]) { @@ -329,6 +329,7 @@ export class Garden { * Clean up before shutting down. */ async close() { + this.events.removeAllListeners() this.watcher && (await this.watcher.stop()) } diff --git a/garden-service/src/logger/log-entry.ts b/garden-service/src/logger/log-entry.ts index de4093ba897..58bb54ce41a 100644 --- a/garden-service/src/logger/log-entry.ts +++ b/garden-service/src/logger/log-entry.ts @@ -14,6 +14,7 @@ import { Omit } from "../util/util" import { getChildEntries, findParentEntry } from "./util" import { GardenError } from "../exceptions" import { Logger } from "./logger" +import { formatForEventStream } from "../platform/buffered-event-stream" export type EmojiName = keyof typeof nodeEmoji.emoji export type LogSymbol = keyof typeof logSymbols | "empty" @@ -33,6 +34,8 @@ export interface TaskMetadata { durationMs?: number } +export const EVENT_LOG_LEVEL = LogLevel.debug + interface MessageBase { msg?: string emoji?: EmojiName @@ -90,6 +93,7 @@ export class LogEntry extends LogNode { public readonly childEntriesInheritLevel?: boolean public readonly id?: string public isPlaceholder?: boolean + public revision: number constructor(params: LogEntryConstructor) { super(params.level, params.parent, params.id) @@ -102,6 +106,7 @@ export class LogEntry extends LogNode { this.metadata = params.metadata this.id = params.id this.isPlaceholder = params.isPlaceholder + this.revision = 0 if (!params.isPlaceholder) { this.update({ @@ -114,6 +119,10 @@ export class LogEntry extends LogNode { dataFormat: params.dataFormat, maxSectionWidth: params.maxSectionWidth, }) + + if (this.level <= EVENT_LOG_LEVEL) { + this.root.events.emit("logEntryCreated", formatForEventStream(this)) + } } } @@ -124,6 +133,7 @@ export class LogEntry extends LogNode { * 3. next metadata is merged with the previous metadata */ protected update(updateParams: UpdateLogEntryParams): void { + this.revision = this.revision + 1 const messageState = this.getMessageState() // Explicitly set all the fields so the shape stays consistent @@ -192,6 +202,9 @@ export class LogEntry extends LogNode { protected onGraphChange(node: LogEntry) { this.root.onGraphChange(node) + if (node.level <= EVENT_LOG_LEVEL) { + this.root.events.emit("logEntryUpdated", formatForEventStream(node)) + } } getMetadata() { diff --git a/garden-service/src/logger/logger.ts b/garden-service/src/logger/logger.ts index d05bca3c6a8..7788ec30331 100644 --- a/garden-service/src/logger/logger.ts +++ b/garden-service/src/logger/logger.ts @@ -17,6 +17,7 @@ import { FancyTerminalWriter } from "./writers/fancy-terminal-writer" import { JsonTerminalWriter } from "./writers/json-terminal-writer" import { parseLogLevel } from "../cli/helpers" import { FullscreenTerminalWriter } from "./writers/fullscreen-terminal-writer" +import { EventBus } from "../events" export type LoggerType = "quiet" | "basic" | "fancy" | "fullscreen" | "json" export const LOGGER_TYPES = new Set(["quiet", "basic", "fancy", "fullscreen", "json"]) @@ -44,6 +45,7 @@ export interface LoggerConfig { export class Logger extends LogNode { public writers: Writer[] + public events: EventBus public useEmoji: boolean private static instance: Logger @@ -102,6 +104,7 @@ export class Logger extends LogNode { super(config.level) this.writers = config.writers || [] this.useEmoji = config.useEmoji === false ? false : true + this.events = new EventBus() } protected createNode(params: CreateNodeParams): LogEntry { diff --git a/garden-service/src/logger/writers/json-terminal-writer.ts b/garden-service/src/logger/writers/json-terminal-writer.ts index e2e3736e6ea..063149751aa 100644 --- a/garden-service/src/logger/writers/json-terminal-writer.ts +++ b/garden-service/src/logger/writers/json-terminal-writer.ts @@ -15,7 +15,6 @@ export interface JsonLogEntry { msg: string data?: any section?: string - durationMs?: number metadata?: LogEntryMetadata } diff --git a/garden-service/src/platform/buffered-event-stream.ts b/garden-service/src/platform/buffered-event-stream.ts new file mode 100644 index 00000000000..93bd2674d6e --- /dev/null +++ b/garden-service/src/platform/buffered-event-stream.ts @@ -0,0 +1,192 @@ +/* + * Copyright (C) 2018-2020 Garden Technologies, Inc. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +import { registerCleanupFunction } from "../util/util" +import { Events, EventName, EventBus, eventNames } from "../events" +import { LogEntryMetadata, LogEntry } from "../logger/log-entry" +import { chainMessages } from "../logger/renderers" +import { got } from "../util/http" + +export type StreamEvent = { + name: EventName + payload: Events[EventName] + timestamp: Date +} + +export interface LogEntryEvent { + key: string + parentKey: string | null + revision: number + msg: string | string[] + timestamp: Date + data?: any + section?: string + metadata?: LogEntryMetadata +} + +export function formatForEventStream(entry: LogEntry): LogEntryEvent { + const { section, data } = entry.getMessageState() + const { key, revision } = entry + const parentKey = entry.parent ? entry.parent.key : null + const metadata = entry.getMetadata() + const msg = chainMessages(entry.getMessageStates() || []) + const timestamp = new Date() + return { key, parentKey, revision, msg, data, metadata, section, timestamp } +} + +export const FLUSH_INTERVAL_MSEC = 1000 +export const MAX_BATCH_SIZE = 100 + +/** + * Buffers events and log entries and periodically POSTs them to the platform. + * + * Subscribes to logger events once, in the constructor. + * + * Subscribes to Garden events via the connect method, since we need to subscribe to the event bus of + * new Garden instances (and unsubscribe from events of the previously connected Garden instance, if + * any) e.g. when config changes during a watch-mode command. + */ +export class BufferedEventStream { + private log: LogEntry + private eventBus: EventBus + public sessionId: string + private platformUrl: string + private clientAuthToken: string + + /** + * We maintain this map to facilitate unsubscribing from a previously connected event bus + * when a new event bus is connected. + */ + private gardenEventListeners: { [eventName: string]: (payload: any) => void } + + private intervalId: NodeJS.Timer | null + private bufferedEvents: StreamEvent[] + private bufferedLogEntries: LogEntryEvent[] + + // For testing. + public flushEventsTestHandler: null | ((events: StreamEvent[]) => void) + public flushLogEntriesTestHandler: null | ((logEntries: LogEntryEvent[]) => void) + + constructor(log: LogEntry, sessionId: string) { + this.sessionId = sessionId + this.log = log + this.log.root.events.onAny((_name: string, payload: LogEntryEvent) => { + this.streamLogEntry(payload) + }) + this.flushEventsTestHandler = null + this.flushLogEntriesTestHandler = null + this.bufferedEvents = [] + this.bufferedLogEntries = [] + } + + connect(eventBus: EventBus, clientAuthToken: string, platformUrl: string) { + this.clientAuthToken = clientAuthToken + this.platformUrl = platformUrl + + if (!this.intervalId) { + this.startInterval() + } + + if (this.eventBus) { + // We unsubscribe from the old event bus' events. + this.unsubscribeFromGardenEvents(this.eventBus) + } + + this.eventBus = eventBus + this.subscribeToGardenEvents(this.eventBus) + } + + subscribeToGardenEvents(eventBus: EventBus) { + // We maintain this map to facilitate unsubscribing from events when the Garden instance is closed. + const gardenEventListeners = {} + for (const gardenEventName of eventNames) { + const listener = (payload: LogEntryEvent) => this.streamEvent(gardenEventName, payload) + gardenEventListeners[gardenEventName] = listener + eventBus.on(gardenEventName, listener) + } + this.gardenEventListeners = gardenEventListeners + } + + unsubscribeFromGardenEvents(eventBus: EventBus) { + for (const [gardenEventName, listener] of Object.entries(this.gardenEventListeners)) { + eventBus.removeListener(gardenEventName, listener) + } + } + + startInterval() { + this.intervalId = setInterval(() => { + this.flushBuffered({ flushAll: false }) + }, FLUSH_INTERVAL_MSEC) + + registerCleanupFunction("flushAllBufferedEventsAndLogEntries", () => { + this.close() + }) + } + + close() { + if (this.intervalId) { + clearInterval(this.intervalId) + this.intervalId = null + } + this.flushBuffered({ flushAll: true }) + } + + streamEvent(name: T, payload: Events[T]) { + this.bufferedEvents.push({ + name, + payload, + timestamp: new Date(), + }) + } + + streamLogEntry(logEntry: LogEntryEvent) { + this.bufferedLogEntries.push(logEntry) + } + + flushEvents(events: StreamEvent[]) { + const data = { + events, + clientAuthToken: this.clientAuthToken, + sessionId: this.sessionId, + } + got.post(`${this.platformUrl}/events`, { json: data }).catch((err) => { + this.log.error(err) + }) + } + + flushLogEntries(logEntries: LogEntryEvent[]) { + const data = { + logEntries, + clientAuthToken: this.clientAuthToken, + sessionId: this.sessionId, + } + got.post(`${this.platformUrl}/log-entries`, { json: data }).catch((err) => { + this.log.error(err) + }) + } + + flushBuffered({ flushAll = false }) { + if (!this.clientAuthToken || !this.platformUrl) { + return + } + const eventsToFlush = this.bufferedEvents.splice(0, flushAll ? this.bufferedEvents.length : MAX_BATCH_SIZE) + + if (eventsToFlush.length > 0) { + this.flushEventsTestHandler ? this.flushEventsTestHandler(eventsToFlush) : this.flushEvents(eventsToFlush) + } + + const logEntryFlushCount = flushAll ? this.bufferedLogEntries.length : MAX_BATCH_SIZE - eventsToFlush.length + const logEntriesToFlush = this.bufferedLogEntries.splice(0, logEntryFlushCount) + + if (logEntriesToFlush.length > 0) { + this.flushLogEntriesTestHandler + ? this.flushLogEntriesTestHandler(logEntriesToFlush) + : this.flushLogEntries(logEntriesToFlush) + } + } +} diff --git a/garden-service/src/process.ts b/garden-service/src/process.ts index ae5e8267492..a8920ced33a 100644 --- a/garden-service/src/process.ts +++ b/garden-service/src/process.ts @@ -211,6 +211,7 @@ async function validateConfigChange( try { const nextGarden = await Garden.factory(garden.projectRoot, garden.opts) await nextGarden.getConfigGraph(log) + await nextGarden.close() } catch (error) { if (error instanceof ConfigurationError) { const msg = dedent` diff --git a/garden-service/test/helpers.ts b/garden-service/test/helpers.ts index a93bc8b37e1..8f01d659fd6 100644 --- a/garden-service/test/helpers.ts +++ b/garden-service/test/helpers.ts @@ -313,8 +313,8 @@ interface EventLogEntry { class TestEventBus extends EventBus { public eventLog: EventLogEntry[] - constructor(log: LogEntry) { - super(log) + constructor() { + super() this.eventLog = [] } @@ -335,7 +335,7 @@ export class TestGarden extends Garden { constructor(params: GardenParams) { super(params) - this.events = new TestEventBus(this.log) + this.events = new TestEventBus() } setModuleConfigs(moduleConfigs: ModuleConfig[]) { diff --git a/garden-service/test/unit/src/events.ts b/garden-service/test/unit/src/events.ts index 9dcce566452..44c631fccad 100644 --- a/garden-service/test/unit/src/events.ts +++ b/garden-service/test/unit/src/events.ts @@ -8,13 +8,12 @@ import { EventBus } from "../../../src/events" import { expect } from "chai" -import { getLogger } from "../../../src/logger/logger" describe("EventBus", () => { let events: EventBus beforeEach(() => { - events = new EventBus(getLogger().placeholder()) + events = new EventBus() }) it("should send+receive events", (done) => { diff --git a/garden-service/test/unit/src/platform/auth.ts b/garden-service/test/unit/src/platform/auth.ts index 7d7ebad3384..e4038f9e079 100644 --- a/garden-service/test/unit/src/platform/auth.ts +++ b/garden-service/test/unit/src/platform/auth.ts @@ -12,7 +12,7 @@ import { ClientAuthToken } from "../../../../src/db/entities/client-auth-token" import { makeTestGardenA } from "../../../helpers" import { saveAuthToken, readAuthToken, clearAuthToken } from "../../../../src/platform/auth" -async function cleanupAuthTokens() { +export async function cleanupAuthTokens() { await ClientAuthToken.createQueryBuilder() .delete() .execute() diff --git a/garden-service/test/unit/src/platform/buffered-event-stream.ts b/garden-service/test/unit/src/platform/buffered-event-stream.ts new file mode 100644 index 00000000000..fbc8b525039 --- /dev/null +++ b/garden-service/test/unit/src/platform/buffered-event-stream.ts @@ -0,0 +1,75 @@ +/* + * Copyright (C) 2018-2020 Garden Technologies, Inc. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +import { expect } from "chai" +import { StreamEvent, LogEntryEvent, BufferedEventStream } from "../../../../src/platform/buffered-event-stream" +import { getLogger } from "../../../../src/logger/logger" +import { EventBus } from "../../../../src/events" + +describe("BufferedEventStream", () => { + it("should flush events and log entries emitted by a connected event emitter", async () => { + const flushedEvents: StreamEvent[] = [] + const flushedLogEntries: LogEntryEvent[] = [] + + const log = getLogger().placeholder() + + const bufferedEventStream = new BufferedEventStream(log, "dummy-session-id") + + bufferedEventStream.flushEventsTestHandler = (events: StreamEvent[]) => { + flushedEvents.push(...events) + } + bufferedEventStream.flushLogEntriesTestHandler = (logEntries: LogEntryEvent[]) => { + flushedLogEntries.push(...logEntries) + } + + const eventBus = new EventBus() + bufferedEventStream.connect(eventBus, "dummy-client-token", "dummy-platform_url") + + eventBus.emit("_test", {}) + log.root.events.emit("_test", {}) + + bufferedEventStream.flushBuffered({ flushAll: true }) + + expect(flushedEvents.length).to.eql(1) + expect(flushedLogEntries.length).to.eql(1) + }) + + it("should only flush events or log entries emitted by the last connected event emitter", async () => { + const flushedEvents: StreamEvent[] = [] + const flushedLogEntries: LogEntryEvent[] = [] + + const log = getLogger().placeholder() + + const bufferedEventStream = new BufferedEventStream(log, "dummy-session-id") + + bufferedEventStream.flushEventsTestHandler = (events: StreamEvent[]) => { + flushedEvents.push(...events) + } + bufferedEventStream.flushLogEntriesTestHandler = (logEntries: LogEntryEvent[]) => { + flushedLogEntries.push(...logEntries) + } + + const oldEventBus = new EventBus() + bufferedEventStream.connect(oldEventBus, "dummy-client-token", "dummy-platform_url") + const newEventBus = new EventBus() + bufferedEventStream.connect(newEventBus, "dummy-client-token", "dummy-platform_url") + + log.root.events.emit("_test", {}) + oldEventBus.emit("_test", {}) + + bufferedEventStream.flushBuffered({ flushAll: true }) + + expect(flushedEvents.length).to.eql(0) + expect(flushedLogEntries.length).to.eql(1) + + newEventBus.emit("_test", {}) + bufferedEventStream.flushBuffered({ flushAll: true }) + + expect(flushedEvents.length).to.eql(1) + }) +})