-
Notifications
You must be signed in to change notification settings - Fork 13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: move session-specific logic to Timeline #89
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
package com.amplitude.android | ||
|
||
import com.amplitude.core.Storage | ||
import com.amplitude.core.events.BaseEvent | ||
import com.amplitude.core.platform.Timeline | ||
import kotlinx.coroutines.channels.Channel | ||
import kotlinx.coroutines.launch | ||
|
||
class Timeline : Timeline() { | ||
private val eventMessageChannel: Channel<EventQueueMessage> = Channel(Channel.UNLIMITED) | ||
var sessionId: Long = -1 | ||
private set | ||
internal var lastEventId: Long = 0 | ||
var lastEventTime: Long = -1 | ||
|
||
internal fun start() { | ||
amplitude.amplitudeScope.launch(amplitude.storageIODispatcher) { | ||
amplitude.isBuilt.await() | ||
|
||
sessionId = amplitude.storage.read(Storage.Constants.PREVIOUS_SESSION_ID)?.toLong() ?: -1 | ||
lastEventId = amplitude.storage.read(Storage.Constants.LAST_EVENT_ID)?.toLong() ?: 0 | ||
lastEventTime = amplitude.storage.read(Storage.Constants.LAST_EVENT_TIME)?.toLong() ?: -1 | ||
|
||
for (message in eventMessageChannel) { | ||
processEventMessage(message) | ||
} | ||
} | ||
} | ||
|
||
internal fun stop() { | ||
this.eventMessageChannel.cancel() | ||
} | ||
|
||
override fun process(incomingEvent: BaseEvent) { | ||
if (incomingEvent.timestamp == null) { | ||
incomingEvent.timestamp = System.currentTimeMillis() | ||
} | ||
|
||
eventMessageChannel.trySend(EventQueueMessage(incomingEvent, (amplitude as Amplitude).inForeground)) | ||
} | ||
|
||
private suspend fun processEventMessage(message: EventQueueMessage) { | ||
val event = message.event | ||
var sessionEvents: Iterable<BaseEvent>? = null | ||
val eventTimestamp = event.timestamp!! | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe other event timestamp is bind later in process plugins There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure what you mean. I believe, timestamps should be assigned as soon as possible to have actual live values. Now it is here: https://github.com/amplitude/Amplitude-Kotlin/blob/DXOC-297-DiskReadViolation-in-strict-mode/core/src/main/java/com/amplitude/core/Amplitude.kt#L328.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see, so this is added to cover this. I was worried if this could raiser exceptions. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
var skipEvent = false | ||
|
||
if (event.eventType == Amplitude.START_SESSION_EVENT) { | ||
if (event.sessionId < 0) { // dummy start_session event | ||
skipEvent = true | ||
sessionEvents = startNewSessionIfNeeded(eventTimestamp) | ||
} else { | ||
setSessionId(event.sessionId) | ||
refreshSessionTime(eventTimestamp) | ||
} | ||
} else if (event.eventType == Amplitude.END_SESSION_EVENT) { | ||
// do nothing | ||
} else { | ||
if (!message.inForeground) { | ||
sessionEvents = startNewSessionIfNeeded(eventTimestamp) | ||
} else { | ||
refreshSessionTime(eventTimestamp) | ||
} | ||
} | ||
|
||
if (!skipEvent && event.sessionId < 0) { | ||
event.sessionId = sessionId | ||
} | ||
|
||
val savedLastEventId = lastEventId | ||
|
||
sessionEvents ?. let { | ||
it.forEach { e -> | ||
e.eventId ?: let { | ||
val newEventId = lastEventId + 1 | ||
e.eventId = newEventId | ||
lastEventId = newEventId | ||
} | ||
} | ||
} | ||
|
||
if (!skipEvent) { | ||
event.eventId ?: let { | ||
val newEventId = lastEventId + 1 | ||
event.eventId = newEventId | ||
lastEventId = newEventId | ||
} | ||
} | ||
|
||
if (lastEventId > savedLastEventId) { | ||
amplitude.storage.write(Storage.Constants.LAST_EVENT_ID, lastEventId.toString()) | ||
} | ||
|
||
sessionEvents ?. let { | ||
it.forEach { e -> | ||
super.process(e) | ||
} | ||
} | ||
|
||
if (!skipEvent) { | ||
super.process(event) | ||
} | ||
} | ||
|
||
private suspend fun startNewSessionIfNeeded(timestamp: Long): Iterable<BaseEvent>? { | ||
if (inSession() && isWithinMinTimeBetweenSessions(timestamp)) { | ||
refreshSessionTime(timestamp) | ||
return null | ||
} | ||
return startNewSession(timestamp) | ||
} | ||
|
||
private suspend fun setSessionId(timestamp: Long) { | ||
sessionId = timestamp | ||
amplitude.storage.write(Storage.Constants.PREVIOUS_SESSION_ID, sessionId.toString()) | ||
} | ||
|
||
private suspend fun startNewSession(timestamp: Long): Iterable<BaseEvent> { | ||
val sessionEvents = mutableListOf<BaseEvent>() | ||
val trackingSessionEvents = (amplitude.configuration as Configuration).trackingSessionEvents | ||
|
||
// end previous session | ||
if (trackingSessionEvents && inSession()) { | ||
val sessionEndEvent = BaseEvent() | ||
sessionEndEvent.eventType = Amplitude.END_SESSION_EVENT | ||
sessionEndEvent.timestamp = if (lastEventTime > 0) lastEventTime else null | ||
sessionEndEvent.sessionId = sessionId | ||
sessionEvents.add(sessionEndEvent) | ||
} | ||
|
||
// start new session | ||
setSessionId(timestamp) | ||
refreshSessionTime(timestamp) | ||
if (trackingSessionEvents) { | ||
val sessionStartEvent = BaseEvent() | ||
sessionStartEvent.eventType = Amplitude.START_SESSION_EVENT | ||
sessionStartEvent.timestamp = timestamp | ||
sessionStartEvent.sessionId = sessionId | ||
sessionEvents.add(sessionStartEvent) | ||
} | ||
|
||
return sessionEvents | ||
} | ||
|
||
private suspend fun refreshSessionTime(timestamp: Long) { | ||
if (!inSession()) { | ||
return | ||
} | ||
lastEventTime = timestamp | ||
amplitude.storage.write(Storage.Constants.LAST_EVENT_TIME, lastEventTime.toString()) | ||
} | ||
|
||
private fun isWithinMinTimeBetweenSessions(timestamp: Long): Boolean { | ||
val sessionLimit: Long = (amplitude.configuration as Configuration).minTimeBetweenSessionsMillis | ||
return timestamp - lastEventTime < sessionLimit | ||
} | ||
|
||
private fun inSession(): Boolean { | ||
return sessionId >= 0 | ||
} | ||
} | ||
|
||
data class EventQueueMessage( | ||
val event: BaseEvent, | ||
val inForeground: Boolean | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we always trigger this and decide later?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. To avoid any
data race
issues, all logic is inTimeout
now