Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: start tracking sessions at init for session replay #186

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
307e196
fix: start tracking sessions at init for session replay, support addi…
justin-fiedler Mar 19, 2024
523ab2b
chore: remove dead code
justin-fiedler Mar 19, 2024
7d288fa
chore: test sessionId is the time of instantiation after isBuilt
justin-fiedler Mar 19, 2024
c5eac10
chore: clean up session object creation, test clean up
justin-fiedler Mar 19, 2024
c05f19c
chore: lint fix
justin-fiedler Mar 19, 2024
024368b
chore: remove unused logger in session class
justin-fiedler Mar 19, 2024
63a121c
fix: fixed broken tests, added mockSystemTime() util for testing
justin-fiedler Mar 20, 2024
16a5adb
chore: lint
justin-fiedler Mar 20, 2024
819ce3f
chore: fix broken test
justin-fiedler Mar 20, 2024
3633066
fix: add tests for ObservePlugin.onSessionIdChanged, minor fixes
justin-fiedler Mar 20, 2024
3522ace
chore: lint
justin-fiedler Mar 20, 2024
d144c3b
chore: clean up tests
justin-fiedler Mar 20, 2024
5474f85
chore: code clean up
justin-fiedler Mar 20, 2024
b0954dd
fix: override amplitude by default in ObservePlugin
justin-fiedler Mar 20, 2024
e174ca6
chore: minor test clean up
justin-fiedler Mar 20, 2024
0adfce6
chore: try to fix RemnantDataMigrationTest.kt in CI
justin-fiedler Mar 20, 2024
79b5c03
chore: delete dead code
justin-fiedler Mar 20, 2024
f458ee2
chore: move session start logic into Timeline, clean up tests
justin-fiedler Mar 21, 2024
e963b2e
chore: minor code clean up
justin-fiedler Mar 21, 2024
94daa66
chore: remove unneeded session logic in Timeline.process
justin-fiedler Mar 22, 2024
3f64e00
chore: remove logging in test
justin-fiedler Mar 22, 2024
4c4d73b
chore: linty lint
justin-fiedler Mar 22, 2024
3bf0585
chore: move lastEventId logic back to timeline (remove from session)
justin-fiedler Mar 22, 2024
d4299cd
chore: fix broken test
justin-fiedler Mar 22, 2024
5dfba6c
chore: fix test
justin-fiedler Mar 22, 2024
2aa0afa
chore: re-add lastEventId to RemnantDataMigration
justin-fiedler Mar 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions android/src/main/java/com/amplitude/android/Amplitude.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,36 @@ import com.amplitude.android.plugins.AnalyticsConnectorPlugin
import com.amplitude.android.plugins.AndroidContextPlugin
import com.amplitude.android.plugins.AndroidLifecyclePlugin
import com.amplitude.android.plugins.AndroidNetworkConnectivityCheckerPlugin
import com.amplitude.android.utilities.Session
import com.amplitude.android.utilities.SystemTime
import com.amplitude.core.Amplitude
import com.amplitude.core.events.BaseEvent
import com.amplitude.core.platform.plugins.AmplitudeDestination
import com.amplitude.core.platform.plugins.GetAmpliExtrasPlugin
import com.amplitude.core.utilities.FileStorage
import com.amplitude.id.IdentityConfiguration
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

open class Amplitude(
configuration: Configuration
) : Amplitude(configuration) {

internal var inForeground = false
private lateinit var androidContextPlugin: AndroidContextPlugin
internal lateinit var session: Session

val sessionId: Long
get() {
return (timeline as Timeline).sessionId
return if (session == null) -1 else session.sessionId
}

init {
registerShutdownHook()
}

override fun createTimeline(): Timeline {
return Timeline(configuration.sessionId).also { it.amplitude = this }
return Timeline().also { it.amplitude = this }
}

override fun createIdentityConfiguration(): IdentityConfiguration {
Expand All @@ -50,11 +54,12 @@ open class Amplitude(
}

override suspend fun buildInternal(identityConfiguration: IdentityConfiguration) {
// Migrations
ApiKeyStorageMigration(this).execute()

if ((this.configuration as Configuration).migrateLegacyData) {
RemnantDataMigration(this).execute()
}

this.createIdentityContainer(identityConfiguration)

if (this.configuration.offline != AndroidNetworkConnectivityCheckerPlugin.Disabled) {
Expand All @@ -72,8 +77,27 @@ open class Amplitude(
add(AnalyticsConnectorIdentityPlugin())
add(AnalyticsConnectorPlugin())
add(AmplitudeDestination())
val plugins = configuration.plugins
yuhao900914 marked this conversation as resolved.
Show resolved Hide resolved
if (plugins != null) {
for (plugin in plugins) {
add(plugin)
}
}

(timeline as Timeline).start()
// WARNING: Session events need to run after migrations as not to modify `lastEventTime`
// Check if we need to start a new session
session = Session(configuration as Configuration, storage, store)
logger.debug("Configured session. Session=$session")
val sessionEvents = session.startNewSessionIfNeeded(SystemTime.getCurrentTimeMillis(), configuration.sessionId)

val androidTimeline = timeline as Timeline
androidTimeline.start(session)

runBlocking {
sessionEvents?.forEach {
androidTimeline.processImmediately(it)
}
}
}

/**
Expand Down
3 changes: 3 additions & 0 deletions android/src/main/java/com/amplitude/android/Configuration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.amplitude.core.ServerZone
import com.amplitude.core.StorageProvider
import com.amplitude.core.events.IngestionMetadata
import com.amplitude.core.events.Plan
import com.amplitude.core.platform.Plugin
import com.amplitude.id.FileIdentityStorageProvider
import com.amplitude.id.IdentityStorageProvider

Expand Down Expand Up @@ -49,6 +50,7 @@ open class Configuration @JvmOverloads constructor(
override var offline: Boolean? = false,
override var deviceId: String? = null,
override var sessionId: Long? = null,
override var plugins: List<Plugin>? = null,
) : Configuration(
apiKey,
flushQueueSize,
Expand All @@ -72,6 +74,7 @@ open class Configuration @JvmOverloads constructor(
offline,
deviceId,
sessionId,
plugins
) {
companion object {
const val MIN_TIME_BETWEEN_SESSIONS_MILLIS: Long = 300000
Expand Down
129 changes: 25 additions & 104 deletions android/src/main/java/com/amplitude/android/Timeline.kt
Original file line number Diff line number Diff line change
@@ -1,40 +1,22 @@
package com.amplitude.android

import com.amplitude.core.Storage
import com.amplitude.android.utilities.Session
import com.amplitude.android.utilities.SystemTime
import com.amplitude.core.events.BaseEvent
import com.amplitude.core.platform.Timeline
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import java.util.concurrent.atomic.AtomicLong

class Timeline(
private val initialSessionId: Long? = null,
) : Timeline() {
class Timeline : Timeline() {
private val eventMessageChannel: Channel<EventQueueMessage> = Channel(Channel.UNLIMITED)
private lateinit var session: Session

private val _sessionId = AtomicLong(initialSessionId ?: -1L)
val sessionId: Long
get() {
return _sessionId.get()
}

internal var lastEventId: Long = 0
var lastEventTime: Long = -1L

internal fun start() {
internal fun start(session: Session) {
this.session = session
amplitude.amplitudeScope.launch(amplitude.storageIODispatcher) {
// Wait until build (including possible legacy data migration) is finished.
amplitude.isBuilt.await()

if (initialSessionId == null) {
_sessionId.set(
amplitude.storage.read(Storage.Constants.PREVIOUS_SESSION_ID)?.toLongOrNull()
?: -1
)
}
lastEventId = amplitude.storage.read(Storage.Constants.LAST_EVENT_ID)?.toLongOrNull() ?: 0
lastEventTime = amplitude.storage.read(Storage.Constants.LAST_EVENT_TIME)?.toLongOrNull() ?: -1

for (message in eventMessageChannel) {
processEventMessage(message)
}
Expand All @@ -47,12 +29,20 @@ class Timeline(

override fun process(incomingEvent: BaseEvent) {
if (incomingEvent.timestamp == null) {
incomingEvent.timestamp = System.currentTimeMillis()
incomingEvent.timestamp = SystemTime.getCurrentTimeMillis()
}

eventMessageChannel.trySend(EventQueueMessage(incomingEvent, (amplitude as Amplitude).inForeground))
}

internal suspend fun processImmediately(incomingEvent: BaseEvent) {
if (incomingEvent.timestamp == null) {
incomingEvent.timestamp = SystemTime.getCurrentTimeMillis()
}

processEventMessage(EventQueueMessage(incomingEvent, (amplitude as Amplitude).inForeground))
}

private suspend fun processEventMessage(message: EventQueueMessage) {
val event = message.event
var sessionEvents: Iterable<BaseEvent>? = null
Expand All @@ -61,52 +51,43 @@ class Timeline(
var skipEvent = false

if (event.eventType == Amplitude.START_SESSION_EVENT) {
setSessionId(eventSessionId ?: eventTimestamp)
refreshSessionTime(eventTimestamp)
session.setSessionId(eventSessionId ?: eventTimestamp)
yuhao900914 marked this conversation as resolved.
Show resolved Hide resolved
session.refreshSessionTime(eventTimestamp)
} else if (event.eventType == Amplitude.END_SESSION_EVENT) {
// do nothing
} else if (event.eventType == Amplitude.DUMMY_ENTER_FOREGROUND_EVENT) {
skipEvent = true
sessionEvents = startNewSessionIfNeeded(eventTimestamp)
sessionEvents = session.startNewSessionIfNeeded(eventTimestamp)
} else if (event.eventType == Amplitude.DUMMY_EXIT_FOREGROUND_EVENT) {
skipEvent = true
refreshSessionTime(eventTimestamp)
session.refreshSessionTime(eventTimestamp)
} else {
if (!message.inForeground) {
sessionEvents = startNewSessionIfNeeded(eventTimestamp)
sessionEvents = session.startNewSessionIfNeeded(eventTimestamp)
} else {
refreshSessionTime(eventTimestamp)
session.refreshSessionTime(eventTimestamp)
}
}

if (!skipEvent && event.sessionId == null) {
event.sessionId = sessionId
event.sessionId = session.sessionId
}

val savedLastEventId = lastEventId

val savedLastEventId = session.lastEventId
sessionEvents?.let {
it.forEach { e ->
e.eventId ?: let {
val newEventId = lastEventId + 1
e.eventId = newEventId
lastEventId = newEventId
e.eventId = session.getAndSetNextEventId()
}
}
}

if (!skipEvent) {
event.eventId ?: let {
val newEventId = lastEventId + 1
event.eventId = newEventId
lastEventId = newEventId
event.eventId = session.getAndSetNextEventId()
}
}

if (lastEventId > savedLastEventId) {
amplitude.storage.write(Storage.Constants.LAST_EVENT_ID, lastEventId.toString())
}

sessionEvents?.let {
it.forEach { e ->
super.process(e)
Expand All @@ -117,66 +98,6 @@ class Timeline(
super.process(event)
}
}

private suspend fun startNewSessionIfNeeded(timestamp: Long): Iterable<BaseEvent>? {
if (inSession() && isWithinMinTimeBetweenSessions(timestamp)) {
refreshSessionTime(timestamp)
return null
}
return startNewSession(timestamp)
}

private suspend fun setSessionId(timestamp: Long) {
_sessionId.set(timestamp)
amplitude.storage.write(Storage.Constants.PREVIOUS_SESSION_ID, sessionId.toString())
}

private suspend fun startNewSession(timestamp: Long): Iterable<BaseEvent> {
val sessionEvents = mutableListOf<BaseEvent>()
val configuration = amplitude.configuration as Configuration
// If any trackingSessionEvents is false (default value is true), means it is manually set
@Suppress("DEPRECATION")
val trackingSessionEvents = configuration.trackingSessionEvents && configuration.defaultTracking.sessions

// end previous session
if (trackingSessionEvents && inSession()) {
val sessionEndEvent = BaseEvent()
sessionEndEvent.eventType = Amplitude.END_SESSION_EVENT
sessionEndEvent.timestamp = if (lastEventTime > 0) lastEventTime else null
sessionEndEvent.sessionId = sessionId
sessionEvents.add(sessionEndEvent)
}

// start new session
setSessionId(timestamp)
refreshSessionTime(timestamp)
if (trackingSessionEvents) {
val sessionStartEvent = BaseEvent()
sessionStartEvent.eventType = Amplitude.START_SESSION_EVENT
sessionStartEvent.timestamp = timestamp
sessionStartEvent.sessionId = sessionId
sessionEvents.add(sessionStartEvent)
}

return sessionEvents
}

private suspend fun refreshSessionTime(timestamp: Long) {
if (!inSession()) {
return
}
lastEventTime = timestamp
amplitude.storage.write(Storage.Constants.LAST_EVENT_TIME, lastEventTime.toString())
}

private fun isWithinMinTimeBetweenSessions(timestamp: Long): Boolean {
val sessionLimit: Long = (amplitude.configuration as Configuration).minTimeBetweenSessionsMillis
return timestamp - lastEventTime < sessionLimit
}

private fun inSession(): Boolean {
return sessionId >= 0
}
}

data class EventQueueMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ class RemnantDataMigration(
companion object {
const val DEVICE_ID_KEY = "device_id"
const val USER_ID_KEY = "user_id"
const val LAST_EVENT_TIME_KEY = "last_event_time"
const val LAST_EVENT_ID_KEY = "last_event_id"
const val PREVIOUS_SESSION_ID_KEY = "previous_session_id"
}

lateinit var databaseStorage: DatabaseStorage
Expand All @@ -33,8 +30,8 @@ class RemnantDataMigration(

val firstRunSinceUpgrade = amplitude.storage.read(Storage.Constants.LAST_EVENT_TIME)?.toLongOrNull() == null

// WARNING: We don't migrate session data as we want to reset on a new app install
moveDeviceAndUserId()
moveSessionData()

if (firstRunSinceUpgrade) {
moveInterceptedIdentifies()
Expand Down Expand Up @@ -67,37 +64,6 @@ class RemnantDataMigration(
}
}

private suspend fun moveSessionData() {
try {
val currentSessionId = amplitude.storage.read(Storage.Constants.PREVIOUS_SESSION_ID)?.toLongOrNull()
val currentLastEventTime = amplitude.storage.read(Storage.Constants.LAST_EVENT_TIME)?.toLongOrNull()
val currentLastEventId = amplitude.storage.read(Storage.Constants.LAST_EVENT_ID)?.toLongOrNull()

val previousSessionId = databaseStorage.getLongValue(PREVIOUS_SESSION_ID_KEY)
val lastEventTime = databaseStorage.getLongValue(LAST_EVENT_TIME_KEY)
val lastEventId = databaseStorage.getLongValue(LAST_EVENT_ID_KEY)

if (currentSessionId == null && previousSessionId != null) {
amplitude.storage.write(Storage.Constants.PREVIOUS_SESSION_ID, previousSessionId.toString())
databaseStorage.removeLongValue(PREVIOUS_SESSION_ID_KEY)
}

if (currentLastEventTime == null && lastEventTime != null) {
amplitude.storage.write(Storage.Constants.LAST_EVENT_TIME, lastEventTime.toString())
databaseStorage.removeLongValue(LAST_EVENT_TIME_KEY)
}

if (currentLastEventId == null && lastEventId != null) {
amplitude.storage.write(Storage.Constants.LAST_EVENT_ID, lastEventId.toString())
databaseStorage.removeLongValue(LAST_EVENT_ID_KEY)
}
} catch (e: Exception) {
LogcatLogger.logger.error(
"session data migration failed: ${e.message}"
)
}
}

private suspend fun moveEvents() {
try {
val remnantEvents = databaseStorage.readEventsContent()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.amplitude.android.plugins
import com.amplitude.android.BuildConfig
import com.amplitude.android.Configuration
import com.amplitude.android.TrackingOptions
import com.amplitude.android.utilities.SystemTime
import com.amplitude.common.android.AndroidContextProvider
import com.amplitude.core.Amplitude
import com.amplitude.core.events.BaseEvent
Expand Down Expand Up @@ -70,7 +71,7 @@ open class AndroidContextPlugin : Plugin {
private fun applyContextData(event: BaseEvent) {
val configuration = amplitude.configuration as Configuration
event.timestamp ?: let {
val eventTime = System.currentTimeMillis()
val eventTime = SystemTime.getCurrentTimeMillis()
event.timestamp = eventTime
}
event.insertId ?: let {
Expand Down
Loading
Loading