Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: move session-specific logic to Timeline #89

Merged
merged 1 commit into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 21 additions & 119 deletions android/src/main/java/com/amplitude/android/Amplitude.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import android.content.Context
import com.amplitude.android.plugins.AndroidContextPlugin
import com.amplitude.android.plugins.AndroidLifecyclePlugin
import com.amplitude.core.Amplitude
import com.amplitude.core.Storage
import com.amplitude.core.events.BaseEvent
import com.amplitude.core.platform.plugins.AmplitudeDestination
import com.amplitude.core.platform.plugins.GetAmpliExtrasPlugin
Expand All @@ -23,22 +22,23 @@ open class Amplitude(
) : Amplitude(configuration) {

internal var inForeground = false
var sessionId: Long
private set
internal var lastEventId: Long
var lastEventTime: Long
private lateinit var androidContextPlugin: AndroidContextPlugin

init {
storage = configuration.storageProvider.getStorage(this)
(timeline as Timeline).start()
registerShutdownHook()
}

this.sessionId = storage.read(Storage.Constants.PREVIOUS_SESSION_ID)?.toLong() ?: -1
this.lastEventId = storage.read(Storage.Constants.LAST_EVENT_ID)?.toLong() ?: 0
this.lastEventTime = storage.read(Storage.Constants.LAST_EVENT_TIME)?.toLong() ?: -1
override fun createTimeline(): Timeline {
return Timeline().also { it.amplitude = this }
}

override fun build(): Deferred<Boolean> {
val client = this

val built = amplitudeScope.async(amplitudeDispatcher) {
storage = configuration.storageProvider.getStorage(client)

val storageDirectory = (configuration as Configuration).context.getDir("${FileStorage.STORAGE_PREFIX}-${configuration.instanceName}", Context.MODE_PRIVATE)
idContainer = IdentityContainer.getInstance(
IdentityConfiguration(
Expand Down Expand Up @@ -79,59 +79,19 @@ open class Amplitude(
return this
}

override fun processEvent(event: BaseEvent): Iterable<BaseEvent>? {
val eventTimestamp = event.timestamp ?: System.currentTimeMillis()
event.timestamp = eventTimestamp
var sessionEvents: Iterable<BaseEvent>? = null

if (!(event.eventType == START_SESSION_EVENT || event.eventType == END_SESSION_EVENT)) {
if (!inForeground) {
sessionEvents = startNewSessionIfNeeded(eventTimestamp)
} else {
refreshSessionTime(eventTimestamp)
}
}

if (event.sessionId < 0) {
event.sessionId = sessionId
}

val savedLastEventId = lastEventId

sessionEvents ?. let {
it.forEach { e ->
e.eventId ?: let {
val newEventId = lastEventId + 1
e.eventId = newEventId
lastEventId = newEventId
}
}
}

event.eventId ?: let {
val newEventId = lastEventId + 1
event.eventId = newEventId
lastEventId = newEventId
}

if (lastEventId > savedLastEventId) {
amplitudeScope.launch(amplitudeDispatcher) {
storage.write(Storage.Constants.LAST_EVENT_ID, lastEventId.toString())
}
}

return sessionEvents
}

fun onEnterForeground(timestamp: Long) {
startNewSessionIfNeeded(timestamp) ?. let {
it.forEach { event -> process(event) }
}
inForeground = true

val dummySessionStartEvent = BaseEvent()
dummySessionStartEvent.eventType = START_SESSION_EVENT
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we always trigger this and decide later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. To avoid any data race issues, all logic is in Timeout now

dummySessionStartEvent.timestamp = timestamp
dummySessionStartEvent.sessionId = -1
timeline.process(dummySessionStartEvent)
}

fun onExitForeground() {
inForeground = false

amplitudeScope.launch(amplitudeDispatcher) {
isBuilt.await()
if ((configuration as Configuration).flushEventsOnClose) {
Expand All @@ -140,70 +100,12 @@ open class Amplitude(
}
}

fun startNewSessionIfNeeded(timestamp: Long): Iterable<BaseEvent>? {
if (inSession()) {

if (isWithinMinTimeBetweenSessions(timestamp)) {
refreshSessionTime(timestamp)
return null
private fun registerShutdownHook() {
Runtime.getRuntime().addShutdownHook(object : Thread() {
override fun run() {
(this@Amplitude.timeline as Timeline).stop()
}

return startNewSession(timestamp)
}

return startNewSession(timestamp)
}

private fun setSessionId(timestamp: Long) {
sessionId = timestamp
amplitudeScope.launch(amplitudeDispatcher) {
storage.write(Storage.Constants.PREVIOUS_SESSION_ID, sessionId.toString())
}
}

private fun startNewSession(timestamp: Long): Iterable<BaseEvent> {
val sessionEvents = mutableListOf<BaseEvent>()

// end previous session
if ((configuration as Configuration).trackingSessionEvents && inSession()) {
val sessionEndEvent = BaseEvent()
sessionEndEvent.eventType = END_SESSION_EVENT
sessionEndEvent.timestamp = if (lastEventTime > 0) lastEventTime else null
sessionEndEvent.sessionId = sessionId
sessionEvents.add(sessionEndEvent)
}

// start new session
setSessionId(timestamp)
refreshSessionTime(timestamp)
if (configuration.trackingSessionEvents) {
val sessionStartEvent = BaseEvent()
sessionStartEvent.eventType = START_SESSION_EVENT
sessionStartEvent.timestamp = timestamp
sessionStartEvent.sessionId = sessionId
sessionEvents.add(sessionStartEvent)
}

return sessionEvents
}

fun refreshSessionTime(timestamp: Long) {
if (!inSession()) {
return
}
lastEventTime = timestamp
amplitudeScope.launch(amplitudeDispatcher) {
storage.write(Storage.Constants.LAST_EVENT_TIME, lastEventTime.toString())
}
}

private fun isWithinMinTimeBetweenSessions(timestamp: Long): Boolean {
val sessionLimit: Long = (configuration as Configuration).minTimeBetweenSessionsMillis
return timestamp - lastEventTime < sessionLimit
}

private fun inSession(): Boolean {
return sessionId >= 0
})
}

companion object {
Expand Down
166 changes: 166 additions & 0 deletions android/src/main/java/com/amplitude/android/Timeline.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package com.amplitude.android

import com.amplitude.core.Storage
import com.amplitude.core.events.BaseEvent
import com.amplitude.core.platform.Timeline
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch

class Timeline : Timeline() {
private val eventMessageChannel: Channel<EventQueueMessage> = Channel(Channel.UNLIMITED)
var sessionId: Long = -1
private set
internal var lastEventId: Long = 0
var lastEventTime: Long = -1

internal fun start() {
amplitude.amplitudeScope.launch(amplitude.storageIODispatcher) {
amplitude.isBuilt.await()

sessionId = amplitude.storage.read(Storage.Constants.PREVIOUS_SESSION_ID)?.toLong() ?: -1
lastEventId = amplitude.storage.read(Storage.Constants.LAST_EVENT_ID)?.toLong() ?: 0
lastEventTime = amplitude.storage.read(Storage.Constants.LAST_EVENT_TIME)?.toLong() ?: -1

for (message in eventMessageChannel) {
processEventMessage(message)
}
}
}

internal fun stop() {
this.eventMessageChannel.cancel()
}

override fun process(incomingEvent: BaseEvent) {
if (incomingEvent.timestamp == null) {
incomingEvent.timestamp = System.currentTimeMillis()
}

eventMessageChannel.trySend(EventQueueMessage(incomingEvent, (amplitude as Amplitude).inForeground))
}

private suspend fun processEventMessage(message: EventQueueMessage) {
val event = message.event
var sessionEvents: Iterable<BaseEvent>? = null
val eventTimestamp = event.timestamp!!
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe other event timestamp is bind later in process plugins

Copy link
Contributor Author

@falconandy falconandy Oct 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what you mean. I believe, timestamps should be assigned as soon as possible to have actual live values. Now it is here: https://github.com/amplitude/Amplitude-Kotlin/blob/DXOC-297-DiskReadViolation-in-strict-mode/core/src/main/java/com/amplitude/core/Amplitude.kt#L328.
Other code does fallback assignments like:

event.timestamp ?: let {
    event.timestamp = System.currentTimeMillis()
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, so this is added to cover this. I was worried if this could raiser exceptions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

processEventMessage is invoked only for items from eventMessageChannel
https://github.com/amplitude/Amplitude-Kotlin/blob/DXOC-297-DiskReadViolation-in-strict-mode/android/src/main/java/com/amplitude/android/Timeline.kt#L25
the only way to add items to eventMessageChannel is process function
https://github.com/amplitude/Amplitude-Kotlin/blob/DXOC-297-DiskReadViolation-in-strict-mode/android/src/main/java/com/amplitude/android/Timeline.kt#L34
here incomingEvent.timestamp = System.currentTimeMillis() exists to be ensure to have non-null timestamp

var skipEvent = false

if (event.eventType == Amplitude.START_SESSION_EVENT) {
if (event.sessionId < 0) { // dummy start_session event
skipEvent = true
sessionEvents = startNewSessionIfNeeded(eventTimestamp)
} else {
setSessionId(event.sessionId)
refreshSessionTime(eventTimestamp)
}
} else if (event.eventType == Amplitude.END_SESSION_EVENT) {
// do nothing
} else {
if (!message.inForeground) {
sessionEvents = startNewSessionIfNeeded(eventTimestamp)
} else {
refreshSessionTime(eventTimestamp)
}
}

if (!skipEvent && event.sessionId < 0) {
event.sessionId = sessionId
}

val savedLastEventId = lastEventId

sessionEvents ?. let {
it.forEach { e ->
e.eventId ?: let {
val newEventId = lastEventId + 1
e.eventId = newEventId
lastEventId = newEventId
}
}
}

if (!skipEvent) {
event.eventId ?: let {
val newEventId = lastEventId + 1
event.eventId = newEventId
lastEventId = newEventId
}
}

if (lastEventId > savedLastEventId) {
amplitude.storage.write(Storage.Constants.LAST_EVENT_ID, lastEventId.toString())
}

sessionEvents ?. let {
it.forEach { e ->
super.process(e)
}
}

if (!skipEvent) {
super.process(event)
}
}

private suspend fun startNewSessionIfNeeded(timestamp: Long): Iterable<BaseEvent>? {
if (inSession() && isWithinMinTimeBetweenSessions(timestamp)) {
refreshSessionTime(timestamp)
return null
}
return startNewSession(timestamp)
}

private suspend fun setSessionId(timestamp: Long) {
sessionId = timestamp
amplitude.storage.write(Storage.Constants.PREVIOUS_SESSION_ID, sessionId.toString())
}

private suspend fun startNewSession(timestamp: Long): Iterable<BaseEvent> {
val sessionEvents = mutableListOf<BaseEvent>()
val trackingSessionEvents = (amplitude.configuration as Configuration).trackingSessionEvents

// end previous session
if (trackingSessionEvents && inSession()) {
val sessionEndEvent = BaseEvent()
sessionEndEvent.eventType = Amplitude.END_SESSION_EVENT
sessionEndEvent.timestamp = if (lastEventTime > 0) lastEventTime else null
sessionEndEvent.sessionId = sessionId
sessionEvents.add(sessionEndEvent)
}

// start new session
setSessionId(timestamp)
refreshSessionTime(timestamp)
if (trackingSessionEvents) {
val sessionStartEvent = BaseEvent()
sessionStartEvent.eventType = Amplitude.START_SESSION_EVENT
sessionStartEvent.timestamp = timestamp
sessionStartEvent.sessionId = sessionId
sessionEvents.add(sessionStartEvent)
}

return sessionEvents
}

private suspend fun refreshSessionTime(timestamp: Long) {
if (!inSession()) {
return
}
lastEventTime = timestamp
amplitude.storage.write(Storage.Constants.LAST_EVENT_TIME, lastEventTime.toString())
}

private fun isWithinMinTimeBetweenSessions(timestamp: Long): Boolean {
val sessionLimit: Long = (amplitude.configuration as Configuration).minTimeBetweenSessionsMillis
return timestamp - lastEventTime < sessionLimit
}

private fun inSession(): Boolean {
return sessionId >= 0
}
}

data class EventQueueMessage(
val event: BaseEvent,
val inForeground: Boolean
)
Loading