diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index efb5290b..5b122fb7 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -6,14 +6,15 @@ jobs: build-and-deploy: runs-on: ubuntu-latest steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Set up JDK 11 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: '11' distribution: 'zulu' - - - name: Checkout - uses: actions/checkout@v3 + cache: 'gradle' - name: Setup Docs run: ./gradlew dokkaHtmlMultiModule diff --git a/.github/workflows/pull-request-test.yml b/.github/workflows/pull-request-test.yml index fb8d3093..c74579fa 100644 --- a/.github/workflows/pull-request-test.yml +++ b/.github/workflows/pull-request-test.yml @@ -6,12 +6,13 @@ jobs: test: runs-on: ubuntu-20.04 steps: + - uses: actions/checkout@v4 - name: Set up JDK 11 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: '11' distribution: 'temurin' - - uses: actions/checkout@v3 + cache: 'gradle' - name: Build run: ./gradlew build - name: Unit Test @@ -20,7 +21,7 @@ jobs: run: ./gradlew ktlintCheck - name: Upload build results if: always() - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: # artifacts name name: build-results diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index d833fbb2..5f9c159b 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -26,13 +26,14 @@ jobs: needs: [authorize] steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Set up JDK 11 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: '11' distribution: 'zulu' + cache: 'gradle' - name: Build run: ./gradlew build diff --git a/android/src/main/java/com/amplitude/android/migration/ApiKeyStorageMigration.kt b/android/src/main/java/com/amplitude/android/migration/ApiKeyStorageMigration.kt index 28656ed8..9507a66e 100644 --- a/android/src/main/java/com/amplitude/android/migration/ApiKeyStorageMigration.kt +++ b/android/src/main/java/com/amplitude/android/migration/ApiKeyStorageMigration.kt @@ -5,7 +5,7 @@ import com.amplitude.android.Configuration import com.amplitude.android.utilities.AndroidStorage class ApiKeyStorageMigration( - private val amplitude: Amplitude + private val amplitude: Amplitude, ) { suspend fun execute() { val configuration = amplitude.configuration as Configuration @@ -13,13 +13,14 @@ class ApiKeyStorageMigration( val storage = amplitude.storage as? AndroidStorage if (storage != null) { - val apiKeyStorage = AndroidStorage(configuration.context, configuration.apiKey, logger, storage.prefix) + val apiKeyStorage = AndroidStorage(configuration.context, configuration.apiKey, logger, storage.prefix, amplitude.diagnostics) StorageKeyMigration(apiKeyStorage, storage, logger).execute() } val identifyInterceptStorage = amplitude.identifyInterceptStorage as? AndroidStorage if (identifyInterceptStorage != null) { - val apiKeyStorage = AndroidStorage(configuration.context, configuration.apiKey, logger, identifyInterceptStorage.prefix) + val apiKeyStorage = + AndroidStorage(configuration.context, configuration.apiKey, logger, identifyInterceptStorage.prefix, amplitude.diagnostics) StorageKeyMigration(apiKeyStorage, identifyInterceptStorage, logger).execute() } } diff --git a/android/src/main/java/com/amplitude/android/utilities/AndroidStorage.kt b/android/src/main/java/com/amplitude/android/utilities/AndroidStorage.kt index b3e24ede..10fa9ef3 100644 --- a/android/src/main/java/com/amplitude/android/utilities/AndroidStorage.kt +++ b/android/src/main/java/com/amplitude/android/utilities/AndroidStorage.kt @@ -10,6 +10,7 @@ import com.amplitude.core.Storage import com.amplitude.core.StorageProvider import com.amplitude.core.events.BaseEvent import com.amplitude.core.platform.EventPipeline +import com.amplitude.core.utilities.Diagnostics import com.amplitude.core.utilities.EventsFileManager import com.amplitude.core.utilities.EventsFileStorage import com.amplitude.core.utilities.FileResponseHandler @@ -24,9 +25,9 @@ class AndroidStorage( context: Context, val storageKey: String, private val logger: Logger, - internal val prefix: String? + internal val prefix: String?, + private val diagnostics: Diagnostics, ) : Storage, EventsFileStorage { - companion object { const val STORAGE_PREFIX = "amplitude-android" } @@ -35,7 +36,7 @@ class AndroidStorage( context.getSharedPreferences("${getPrefix()}-$storageKey", Context.MODE_PRIVATE) private val storageDirectory: File = context.getDir(getDir(), Context.MODE_PRIVATE) private val eventsFile = - EventsFileManager(storageDirectory, storageKey, AndroidKVS(sharedPreferences)) + EventsFileManager(storageDirectory, storageKey, AndroidKVS(sharedPreferences), logger, diagnostics) private val eventCallbacksMap = mutableMapOf() override suspend fun writeEvent(event: BaseEvent) { @@ -47,7 +48,10 @@ class AndroidStorage( } } - override suspend fun write(key: Storage.Constants, value: String) { + override suspend fun write( + key: Storage.Constants, + value: String, + ) { sharedPreferences.edit().putString(key.rawVal, value).apply() } @@ -87,7 +91,7 @@ class AndroidStorage( configuration, scope, dispatcher, - logger + logger, ) } @@ -103,7 +107,10 @@ class AndroidStorage( eventCallbacksMap.remove(insertId) } - override fun splitEventFile(filePath: String, events: JSONArray) { + override fun splitEventFile( + filePath: String, + events: JSONArray, + ) { eventsFile.splitFile(filePath, events) } @@ -120,13 +127,17 @@ class AndroidStorage( } class AndroidStorageProvider : StorageProvider { - override fun getStorage(amplitude: Amplitude, prefix: String?): Storage { + override fun getStorage( + amplitude: Amplitude, + prefix: String?, + ): Storage { val configuration = amplitude.configuration as com.amplitude.android.Configuration return AndroidStorage( configuration.context, configuration.instanceName, configuration.loggerProvider.getLogger(amplitude), - prefix + prefix, + amplitude.diagnostics, ) } } diff --git a/android/src/test/java/com/amplitude/android/migration/StorageKeyMigrationTest.kt b/android/src/test/java/com/amplitude/android/migration/StorageKeyMigrationTest.kt index 8b1e069f..2ca0a5ff 100644 --- a/android/src/test/java/com/amplitude/android/migration/StorageKeyMigrationTest.kt +++ b/android/src/test/java/com/amplitude/android/migration/StorageKeyMigrationTest.kt @@ -6,6 +6,7 @@ import com.amplitude.android.utilities.AndroidStorage import com.amplitude.common.jvm.ConsoleLogger import com.amplitude.core.Storage import com.amplitude.core.events.BaseEvent +import com.amplitude.core.utilities.Diagnostics import kotlinx.coroutines.runBlocking import org.junit.Test import org.junit.jupiter.api.Assertions @@ -16,13 +17,15 @@ import java.util.UUID @RunWith(RobolectricTestRunner::class) class StorageKeyMigrationTest { + private val testDiagnostics = Diagnostics() + @Test fun `simple values should be migrated`() { val context = ApplicationProvider.getApplicationContext() val logger = ConsoleLogger() - val source = AndroidStorage(context, UUID.randomUUID().toString(), logger, null) - val destination = AndroidStorage(context, UUID.randomUUID().toString(), logger, null) + val source = AndroidStorage(context, UUID.randomUUID().toString(), logger, null, testDiagnostics) + val destination = AndroidStorage(context, UUID.randomUUID().toString(), logger, null, testDiagnostics) val sourceFileIndexKey = "amplitude.events.file.index.${source.storageKey}" val destinationFileIndexKey = "amplitude.events.file.index.${destination.storageKey}" @@ -74,8 +77,8 @@ class StorageKeyMigrationTest { val context = ApplicationProvider.getApplicationContext() val logger = ConsoleLogger() - val source = AndroidStorage(context, UUID.randomUUID().toString(), logger, null) - val destination = AndroidStorage(context, UUID.randomUUID().toString(), logger, null) + val source = AndroidStorage(context, UUID.randomUUID().toString(), logger, null, testDiagnostics) + val destination = AndroidStorage(context, UUID.randomUUID().toString(), logger, null, testDiagnostics) runBlocking { source.writeEvent(createEvent(1)) @@ -117,8 +120,8 @@ class StorageKeyMigrationTest { val context = ApplicationProvider.getApplicationContext() val logger = ConsoleLogger() - val source = AndroidStorage(context, UUID.randomUUID().toString(), logger, null) - val destination = AndroidStorage(context, UUID.randomUUID().toString(), logger, null) + val source = AndroidStorage(context, UUID.randomUUID().toString(), logger, null, testDiagnostics) + val destination = AndroidStorage(context, UUID.randomUUID().toString(), logger, null, testDiagnostics) var destinationPreviousSessionId = destination.read(Storage.Constants.PREVIOUS_SESSION_ID) var destinationLastEventTime = destination.read(Storage.Constants.LAST_EVENT_TIME) @@ -150,8 +153,8 @@ class StorageKeyMigrationTest { val context = ApplicationProvider.getApplicationContext() val logger = ConsoleLogger() - val source = AndroidStorage(context, UUID.randomUUID().toString(), logger, null) - val destination = AndroidStorage(context, UUID.randomUUID().toString(), logger, null) + val source = AndroidStorage(context, UUID.randomUUID().toString(), logger, null, testDiagnostics) + val destination = AndroidStorage(context, UUID.randomUUID().toString(), logger, null, testDiagnostics) runBlocking { source.writeEvent(createEvent(1)) diff --git a/android/src/test/java/com/amplitude/android/utilities/AndroidStorageTest.kt b/android/src/test/java/com/amplitude/android/utilities/AndroidStorageTest.kt new file mode 100644 index 00000000..7d266bc5 --- /dev/null +++ b/android/src/test/java/com/amplitude/android/utilities/AndroidStorageTest.kt @@ -0,0 +1,333 @@ +package com.amplitude.android.utilities + +import android.content.Context +import androidx.test.core.app.ApplicationProvider +import com.amplitude.android.events.BaseEvent +import com.amplitude.common.jvm.ConsoleLogger +import com.amplitude.core.utilities.Diagnostics +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.json.JSONArray +import org.junit.Assert.assertEquals +import org.junit.Test +import org.junit.runner.RunWith +import org.robolectric.RobolectricTestRunner +import java.io.File + +@RunWith(RobolectricTestRunner::class) +class AndroidStorageTest { + private val context = ApplicationProvider.getApplicationContext() + private val testDiagnostics = Diagnostics() + + @Test + fun `test write event and read`() { + val logger = ConsoleLogger() + val storageKey = "storageKey" + val storage = AndroidStorage(context, storageKey, logger, "test", testDiagnostics) + + runBlocking { + storage.writeEvent(createEvent("test1")) + storage.writeEvent(createEvent("test2")) + storage.rollover() + storage.writeEvent(createEvent("test3")) + storage.writeEvent(createEvent("test4")) + storage.rollover() + } + + runBlocking { + val eventsData = storage.readEventsContent() + eventsData.withIndex().forEach { (index, filePath) -> + val eventsString = storage.getEventsString(filePath) + val events = JSONArray(eventsString) + assertEquals(2, events.length()) + assertEquals("test${index * 2 + 1}", events.getJSONObject(0).getString("event_type")) + assertEquals("test${index * 2 + 2}", events.getJSONObject(1).getString("event_type")) + } + } + } + + @Test + fun `could handle earlier version of event files`() { + val logger = ConsoleLogger() + val storageKey = "storageKey" + val prefix = "test" + createEarlierVersionEventFiles(prefix) + val storage = AndroidStorage(context, storageKey, logger, prefix, testDiagnostics) + + runBlocking { + val eventsData = storage.readEventsContent() + eventsData.withIndex().forEach { (index, filePath) -> + val eventsString = storage.getEventsString(filePath) + val events = JSONArray(eventsString) + if (index == 4) { + assertEquals(1, events.length()) + assertEquals("test9", events.getJSONObject(0).getString("event_type")) + } else { + assertEquals(2, events.length()) + assertEquals( + "test${index * 2 + 1}", + events.getJSONObject(0).getString("event_type"), + ) + assertEquals( + "test${index * 2 + 2}", + events.getJSONObject(1).getString("event_type"), + ) + } + } + } + } + + @Test + fun `handle earlier and new events`() { + val logger = ConsoleLogger() + val storageKey = "storageKey" + val prefix = "test" + createEarlierVersionEventFiles(prefix) + val storage = AndroidStorage(context, storageKey, logger, prefix, testDiagnostics) + + runBlocking { + storage.writeEvent(createEvent("test13")) + storage.writeEvent(createEvent("test14")) + storage.rollover() + } + + var eventsCount = 0 + runBlocking { + val eventsData = storage.readEventsContent() + eventsData.withIndex().forEach { (_, filePath) -> + val eventsString = storage.getEventsString(filePath) + val events = JSONArray(eventsString) + eventsCount += events.length() + } + } + assertEquals(13, eventsCount) + } + + @Test + fun `verify could handle line break in event name`() { + val logger = ConsoleLogger() + val storageKey = "storageKey" + val prefix = "test" + val storage = AndroidStorage(context, storageKey, logger, prefix, testDiagnostics) + + runBlocking { + storage.writeEvent(createEvent("test1\n")) + storage.rollover() + val eventsData = storage.readEventsContent() + eventsData.withIndex().forEach { (_, filePath) -> + val eventsString = storage.getEventsString(filePath) + val events = JSONArray(eventsString) + assertEquals(1, events.length()) + assertEquals("test1\n", events.getJSONObject(0).getString("event_type")) + } + } + } + + @Test + fun `verify malformed event show in diagonstics`() { + val logger = ConsoleLogger() + val storageKey = "storageKey" + val prefix = "test" + val storageDirectory = context.getDir("$prefix-disk-queue", Context.MODE_PRIVATE) + val file0 = File(storageDirectory, "storageKey-0") + file0.writeText("{\"eventType\":\"test1\"}\u0000{\"eventType\":\"test2\"}\u0000{\"eventType\":\"test3\"\u0000") + val diagnostics = Diagnostics() + val storage = AndroidStorage(context, storageKey, logger, prefix, diagnostics) + runBlocking { + var eventsCount = 0 + val eventsData = storage.readEventsContent() + eventsData.withIndex().forEach { (_, filePath) -> + val eventsString = storage.getEventsString(filePath) + val events = JSONArray(eventsString) + eventsCount += events.length() + } + assertEquals(2, eventsCount) + assertEquals("{\"malformed_events\":[\"{\\\"eventType\\\":\\\"test3\\\"\"]}", diagnostics.extractDiagnostics()) + } + } + + @Test + fun `concurrent writes to the same instance`() { + val logger = ConsoleLogger() + val storageKey = "storageKey" + val storage = AndroidStorage(context, storageKey, logger, "test", testDiagnostics) + + runBlocking { + val job1 = + kotlinx.coroutines.GlobalScope.launch { + storage.writeEvent(createEvent("test1")) + storage.writeEvent(createEvent("test2")) + storage.rollover() + } + val job2 = + kotlinx.coroutines.GlobalScope.launch { + storage.writeEvent(createEvent("test3")) + storage.writeEvent(createEvent("test4")) + storage.rollover() + } + val job3 = + kotlinx.coroutines.GlobalScope.launch { + storage.writeEvent(createEvent("test5")) + storage.writeEvent(createEvent("test6")) + storage.rollover() + } + kotlinx.coroutines.joinAll(job1, job2, job3) + } + + var eventsCount = 0 + runBlocking { + val eventsData = storage.readEventsContent() + eventsData.withIndex().forEach { (_index, filePath) -> + val eventsString = storage.getEventsString(filePath) + val events = JSONArray(eventsString) + eventsCount += events.length() + } + } + assertEquals(6, eventsCount) + } + + @Test + fun `concurrent writes from multiple threads`() { + val logger = ConsoleLogger() + val storageKey = "storageKey" + val storage = AndroidStorage(context, storageKey, logger, "test", testDiagnostics) + for (i in 0..100) { + val currentThread = + Thread { + runBlocking { + for (d in 0..10) { + storage.writeEvent(createEvent("test$i-$d")) + } + storage.rollover() + } + } + currentThread.start() + currentThread.join() + } + var eventsCount = 0 + runBlocking { + val eventsData = storage.readEventsContent() + eventsData.withIndex().forEach { (_index, filePath) -> + val eventsString = storage.getEventsString(filePath) + val events = JSONArray(eventsString) + eventsCount += events.length() + } + } + assertEquals(101 * 11, eventsCount) + } + + @Test + fun `concurrent write to two instances`() { + val logger = ConsoleLogger() + val storageKey = "storageKey" + val prefix = "test" + val storage1 = AndroidStorage(context, storageKey, logger, prefix, testDiagnostics) + val storage2 = AndroidStorage(context, storageKey, logger, prefix, testDiagnostics) + + runBlocking { + val job1 = + kotlinx.coroutines.GlobalScope.launch { + storage1.writeEvent(createEvent("test1")) + storage1.writeEvent(createEvent("test2")) + storage1.rollover() + } + val job2 = + kotlinx.coroutines.GlobalScope.launch { + storage2.writeEvent(createEvent("test3")) + storage2.writeEvent(createEvent("test4")) + storage2.rollover() + } + val job3 = + kotlinx.coroutines.GlobalScope.launch { + storage1.writeEvent(createEvent("test5")) + storage1.writeEvent(createEvent("test6")) + storage1.rollover() + } + val job4 = + kotlinx.coroutines.GlobalScope.launch { + storage2.writeEvent(createEvent("test7")) + storage2.writeEvent(createEvent("test8")) + storage2.rollover() + } + kotlinx.coroutines.joinAll(job1, job2, job3, job4) + } + + var eventsCount = 0 + runBlocking { + val eventsData1 = storage1.readEventsContent() + eventsData1.withIndex().forEach { (_, filePath) -> + val eventsString = storage1.getEventsString(filePath) + val events = JSONArray(eventsString) + eventsCount += events.length() + } + } + assertEquals(8, eventsCount) + } + + @Test + fun `concurrent write from mutiple threads on multiple instances`() { + val logger = ConsoleLogger() + val storageKey = "storageKey" + val prefix = "test" + for (i in 0..100) { + val storage = AndroidStorage(context, storageKey, logger, prefix, testDiagnostics) + val currentThread = + Thread { + runBlocking { + for (d in 0..10) { + storage.writeEvent(createEvent("test$i-$d")) + } + storage.rollover() + } + } + currentThread.start() + currentThread.join() + } + var eventsCount = 0 + val storageForRead = AndroidStorage(context, storageKey, logger, prefix, testDiagnostics) + runBlocking { + val eventsData = storageForRead.readEventsContent() + eventsData.withIndex().forEach { (_, filePath) -> + val eventsString = storageForRead.getEventsString(filePath) + val events = JSONArray(eventsString) + eventsCount += events.length() + } + } + assertEquals(101 * 11, eventsCount) + } + + private fun createEarlierVersionEventFiles(prefix: String) { + val storageDirectory = context.getDir("$prefix-disk-queue", Context.MODE_PRIVATE) + val file0 = File(storageDirectory, "storageKey-0") + file0.writeText( + "[{\"event_type\":\"test1\",\"user_id\":\"159995596214061\",\"device_id\":\"9b935bb3cd75\",\"time\":1708434679570,\"event_properties\":{},\"user_properties\":{},\"groups\":{},\"group_properties\":{},\"platform\":\"Android\",\"os_name\":\"android\",\"os_version\":\"13\",\"device_brand\":\"OP\",\"device_manufacturer\":\"OP\",\"device_model\":\"C71\",\"carrier\":\"WO\",\"language\":\"es\",\"ip\":\"\$remote\",\"version_name\":\"24.1.0\",\"adid\":\"9ea5\",\"event_id\":3681,\"session_id\":1708434677402,\"insert_id\":\"283b4eda-32d4-4919-9817-f97e53f5f288\",\"library\":\"amplitude-analytics-android\\/1.18\",\"android_app_set_id\":\"2a38\"},{\"event_type\":\"test2\",\"user_id\":\"159995596214061\",\"device_id\":\"9b935bb3cd75\",\"time\":1708434679570,\"event_properties\":{},\"user_properties\":{},\"groups\":{},\"group_properties\":{},\"platform\":\"Android\",\"os_name\":\"android\",\"os_version\":\"13\"}]", + ) + val file1 = File(storageDirectory, "storageKey-1") + file1.writeText( + ",{\"event_type\":\"test3\",\"user_id\":\"159995596214061\",\"device_id\":\"9b935bb3cd75\",\"time\":1708434679570,\"event_properties\":{},\"user_properties\":{},\"groups\":{},\"group_properties\":{},\"platform\":\"Android\",\"os_name\":\"android\",\"os_version\":\"13\",\"device_brand\":\"OP\",\"device_manufacturer\":\"OP\",\"device_model\":\"C71\",\"carrier\":\"WO\",\"language\":\"es\",\"ip\":\"\$remote\",\"version_name\":\"24.1.0\",\"adid\":\"9ea5\",\"event_id\":3681,\"session_id\":1708434677402,\"insert_id\":\"283b4eda-32d4-4919-9817-f97e53f5f288\",\"library\":\"amplitude-analytics-android\\/1.18\",\"android_app_set_id\":\"2a38\"},{\"event_type\":\"test4\",\"user_id\":\"159995596214061\",\"device_id\":\"9b935bb3cd75\",\"time\":1708434679570,\"event_properties\":{},\"user_properties\":{},\"groups\":{},\"group_properties\":{},\"platform\":\"Android\",\"os_name\":\"android\",\"os_version\":\"13\"}]", + ) + val file2 = File(storageDirectory, "storageKey-2") + file2.writeText( + "[[{\"event_type\":\"test5\",\"user_id\":\"159995596214061\",\"device_id\":\"9b935bb3cd75\",\"time\":1708434679570,\"event_properties\":{},\"user_properties\":{},\"groups\":{},\"group_properties\":{},\"platform\":\"Android\",\"os_name\":\"android\",\"os_version\":\"13\",\"device_brand\":\"OP\",\"device_manufacturer\":\"OP\",\"device_model\":\"C71\",\"carrier\":\"WO\",\"language\":\"es\",\"ip\":\"\$remote\",\"version_name\":\"24.1.0\",\"adid\":\"9ea5\",\"event_id\":3681,\"session_id\":1708434677402,\"insert_id\":\"283b4eda-32d4-4919-9817-f97e53f5f288\",\"library\":\"amplitude-analytics-android\\/1.18\",\"android_app_set_id\":\"2a38\"},{\"event_type\":\"test6\",\"user_id\":\"159995596214061\",\"device_id\":\"9b935bb3cd75\",\"time\":1708434679570,\"event_properties\":{},\"user_properties\":{},\"groups\":{},\"group_properties\":{},\"platform\":\"Android\",\"os_name\":\"android\",\"os_version\":\"13\"}]]", + ) + val file3 = File(storageDirectory, "storageKey-3") + file3.writeText( + "{\"event_type\":\"test7\",\"user_id\":\"159995596214061\",\"device_id\":\"9b935bb3cd75\",\"time\":1708434679570,\"event_properties\":{},\"user_properties\":{},\"groups\":{},\"group_properties\":{},\"platform\":\"Android\",\"os_name\":\"android\",\"os_version\":\"13\",\"device_brand\":\"OP\",\"device_manufacturer\":\"OP\",\"device_model\":\"C71\",\"carrier\":\"WO\",\"language\":\"es\",\"ip\":\"\$remote\",\"version_name\":\"24.1.0\",\"adid\":\"9ea5\",\"event_id\":3681,\"session_id\":1708434677402,\"insert_id\":\"283b4eda-32d4-4919-9817-f97e53f5f288\",\"library\":\"amplitude-analytics-android\\/1.18\",\"android_app_set_id\":\"2a38\"},{\"event_type\":\"test8\",\"user_id\":\"159995596214061\",\"device_id\":\"9b935bb3cd75\",\"time\":1708434679570,\"event_properties\":{},\"user_properties\":{},\"groups\":{},\"group_properties\":{},\"platform\":\"Android\",\"os_name\":\"android\",\"os_version\":\"13\"}]", + ) + val file4 = File(storageDirectory, "storageKey-4") + file4.writeText( + "[{\"event_type\":\"test9\",\"user_id\":\"159995596214061\",\"device_id\":\"9b935bb3cd75\",\"time\":1708434679570,\"event_properties\":{},\"user_properties\":{},\"groups\":{},\"group_properties\":{},\"platform\":\"Android\",\"os_name\":\"android\",\"os_version\":\"13\",\"device_brand\":\"OP\",\"device_manufacturer\":\"OP\",\"device_model\":\"C71\",\"carrier\":\"WO\",\"language\":\"es\",\"ip\":\"\$remote\",\"version_name\":\"24.1.0\",\"adid\":\"9ea5\",\"event_id\":3681,\"session_id\":1708434677402,\"insert_id\":\"283b4eda-32d4-4919-9817-f97e53f5f288\",\"library\":\"amplitude-analytics-android\\/1.18\",\"android_app_set_id\":\"2a38\"}],{\"event_type\":\"test10\",\"user_id\":\"159995596214061\",\"device_id\":\"9b935bb3cd75\",\"time\":1708434679570,\"event_properties\":{},\"user_properties\":{},\"groups\":{},\"group_properties\":{},\"platform\":\"Android\",\"os_name\":\"android\",\"os_version\":\"13\"}]", + ) + val file5 = File(storageDirectory, "storageKey-5.tmp") + file5.writeText( + "[{\"event_type\":\"test11\",\"user_id\":\"159995596214061\",\"device_id\":\"9b935bb3cd75\",\"time\":1708434679570,\"event_properties\":{},\"user_properties\":{},\"groups\":{},\"group_properties\":{},\"platform\":\"Android\",\"os_name\":\"android\",\"os_version\":\"13\",\"device_brand\":\"OP\",\"device_manufacturer\":\"OP\",\"device_model\":\"C71\",\"carrier\":\"WO\",\"language\":\"es\",\"ip\":\"\$remote\",\"version_name\":\"24.1.0\",\"adid\":\"9ea5\",\"event_id\":3681,\"session_id\":1708434677402,\"insert_id\":\"283b4eda-32d4-4919-9817-f97e53f5f288\",\"library\":\"amplitude-analytics-android\\/1.18\",\"android_app_set_id\":\"2a38\"},{\"event_type\":\"test12\",\"user_id\":\"159995596214061\",\"device_id\":\"9b935bb3cd75\",\"time\":1708434679570,\"event_properties\":{},\"user_properties\":{},\"groups\":{},\"group_properties\":{},\"platform\":\"Android\",\"os_name\":\"android\",\"os_version\":\"13\"}", + ) + } + + private fun createEvent(eventType: String): BaseEvent { + val event = BaseEvent() + event.eventType = eventType + event.deviceId = "test-device-id" + return event + } +} diff --git a/core/src/main/java/com/amplitude/core/Amplitude.kt b/core/src/main/java/com/amplitude/core/Amplitude.kt index f9c415cb..9b4edf86 100644 --- a/core/src/main/java/com/amplitude/core/Amplitude.kt +++ b/core/src/main/java/com/amplitude/core/Amplitude.kt @@ -17,6 +17,7 @@ import com.amplitude.core.platform.plugins.ContextPlugin import com.amplitude.core.platform.plugins.GetAmpliExtrasPlugin import com.amplitude.core.utilities.AnalyticsEventReceiver import com.amplitude.core.utilities.AnalyticsIdentityListener +import com.amplitude.core.utilities.Diagnostics import com.amplitude.eventbridge.EventBridgeContainer import com.amplitude.eventbridge.EventChannel import com.amplitude.id.IdentityConfiguration @@ -46,7 +47,7 @@ open class Amplitude internal constructor( val amplitudeDispatcher: CoroutineDispatcher = Executors.newCachedThreadPool().asCoroutineDispatcher(), val networkIODispatcher: CoroutineDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher(), val storageIODispatcher: CoroutineDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher(), - val retryDispatcher: CoroutineDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() + val retryDispatcher: CoroutineDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher(), ) { val timeline: Timeline lateinit var storage: Storage @@ -59,6 +60,7 @@ open class Amplitude internal constructor( lateinit var idContainer: IdentityContainer private set val isBuilt: Deferred + val diagnostics = Diagnostics() init { require(configuration.isValid()) { "invalid configuration" } @@ -82,7 +84,7 @@ open class Amplitude internal constructor( instanceName = configuration.instanceName, apiKey = configuration.apiKey, identityStorageProvider = configuration.identityStorageProvider, - logger = logger + logger = logger, ) } @@ -98,27 +100,36 @@ open class Amplitude internal constructor( protected open fun build(): Deferred { val amplitude = this - val built = amplitudeScope.async(amplitudeDispatcher, CoroutineStart.LAZY) { - storage = configuration.storageProvider.getStorage(amplitude) - identifyInterceptStorage = configuration.identifyInterceptStorageProvider.getStorage(amplitude, "amplitude-identify-intercept") - val identityConfiguration = createIdentityConfiguration() - identityStorage = configuration.identityStorageProvider.getIdentityStorage(identityConfiguration) - - amplitude.buildInternal(identityConfiguration) - true - } + val built = + amplitudeScope.async(amplitudeDispatcher, CoroutineStart.LAZY) { + storage = configuration.storageProvider.getStorage(amplitude) + identifyInterceptStorage = + configuration.identifyInterceptStorageProvider.getStorage( + amplitude, + "amplitude-identify-intercept", + ) + val identityConfiguration = createIdentityConfiguration() + identityStorage = configuration.identityStorageProvider.getIdentityStorage(identityConfiguration) + + amplitude.buildInternal(identityConfiguration) + true + } return built } protected open suspend fun buildInternal(identityConfiguration: IdentityConfiguration) { createIdentityContainer(identityConfiguration) - EventBridgeContainer.getInstance(configuration.instanceName).eventBridge.setEventReceiver(EventChannel.EVENT, AnalyticsEventReceiver(this)) - add(object : ContextPlugin() { - override fun setDeviceId(deviceId: String) { - // set device id immediately, don't wait for isBuilt - setDeviceIdInternal(deviceId) - } - }) + EventBridgeContainer.getInstance( + configuration.instanceName, + ).eventBridge.setEventReceiver(EventChannel.EVENT, AnalyticsEventReceiver(this)) + add( + object : ContextPlugin() { + override fun setDeviceId(deviceId: String) { + // set device id immediately, don't wait for isBuilt + setDeviceIdInternal(deviceId) + } + }, + ) add(GetAmpliExtrasPlugin()) add(AmplitudeDestination()) } @@ -137,7 +148,11 @@ open class Amplitude internal constructor( * @return the Amplitude instance */ @JvmOverloads - fun track(event: BaseEvent, options: EventOptions? = null, callback: EventCallBack? = null): Amplitude { + fun track( + event: BaseEvent, + options: EventOptions? = null, + callback: EventCallBack? = null, + ): Amplitude { options ?. let { event.mergeEventOptions(it) } @@ -157,7 +172,11 @@ open class Amplitude internal constructor( * @return the Amplitude instance */ @JvmOverloads - fun track(eventType: String, eventProperties: Map? = null, options: EventOptions? = null): Amplitude { + fun track( + eventType: String, + eventProperties: Map? = null, + options: EventOptions? = null, + ): Amplitude { val event = BaseEvent() event.eventType = eventType event.eventProperties = eventProperties?.toMutableMap() @@ -177,7 +196,10 @@ open class Amplitude internal constructor( * @return the Amplitude instance */ @JvmOverloads - fun identify(userProperties: Map?, options: EventOptions? = null): Amplitude { + fun identify( + userProperties: Map?, + options: EventOptions? = null, + ): Amplitude { return identify(convertPropertiesToIdentify(userProperties), options) } @@ -190,7 +212,10 @@ open class Amplitude internal constructor( * @return the Amplitude instance */ @JvmOverloads - fun identify(identify: Identify, options: EventOptions? = null): Amplitude { + fun identify( + identify: Identify, + options: EventOptions? = null, + ): Amplitude { val event = IdentifyEvent() event.userProperties = identify.properties @@ -284,7 +309,12 @@ open class Amplitude internal constructor( * @return the Amplitude instance */ @JvmOverloads - fun groupIdentify(groupType: String, groupName: String, groupProperties: Map?, options: EventOptions? = null): Amplitude { + fun groupIdentify( + groupType: String, + groupName: String, + groupProperties: Map?, + options: EventOptions? = null, + ): Amplitude { return groupIdentify(groupType, groupName, convertPropertiesToIdentify(groupProperties), options) } @@ -298,7 +328,12 @@ open class Amplitude internal constructor( * @return the Amplitude instance */ @JvmOverloads - fun groupIdentify(groupType: String, groupName: String, identify: Identify, options: EventOptions? = null): Amplitude { + fun groupIdentify( + groupType: String, + groupName: String, + identify: Identify, + options: EventOptions? = null, + ): Amplitude { val event = GroupIdentifyEvent() val group = mutableMapOf() group.put(groupType, groupName) @@ -320,12 +355,17 @@ open class Amplitude internal constructor( * @return the Amplitude instance */ @JvmOverloads - fun setGroup(groupType: String, groupName: String, options: EventOptions? = null): Amplitude { + fun setGroup( + groupType: String, + groupName: String, + options: EventOptions? = null, + ): Amplitude { val identify = Identify().set(groupType, groupName) - val event = IdentifyEvent().apply { - groups = mutableMapOf(groupType to groupName) - userProperties = identify.properties - } + val event = + IdentifyEvent().apply { + groups = mutableMapOf(groupType to groupName) + userProperties = identify.properties + } track(event, options) return this } @@ -339,12 +379,17 @@ open class Amplitude internal constructor( * @return the Amplitude instance */ @JvmOverloads - fun setGroup(groupType: String, groupName: Array, options: EventOptions? = null): Amplitude { + fun setGroup( + groupType: String, + groupName: Array, + options: EventOptions? = null, + ): Amplitude { val identify = Identify().set(groupType, groupName) - val event = IdentifyEvent().apply { - groups = mutableMapOf(groupType to groupName) - userProperties = identify.properties - } + val event = + IdentifyEvent().apply { + groups = mutableMapOf(groupType to groupName) + userProperties = identify.properties + } track(event, options) return this } @@ -364,7 +409,10 @@ open class Amplitude internal constructor( * @return the Amplitude instance */ @JvmOverloads - fun revenue(revenue: Revenue, options: EventOptions? = null): Amplitude { + fun revenue( + revenue: Revenue, + options: EventOptions? = null, + ): Amplitude { if (!revenue.isValid()) { logger.warn("Invalid revenue object, missing required fields") return this @@ -459,7 +507,10 @@ open class Amplitude internal constructor( * @param configs * @return */ -fun Amplitude(apiKey: String, configs: Configuration.() -> Unit): Amplitude { +fun Amplitude( + apiKey: String, + configs: Configuration.() -> Unit, +): Amplitude { val config = Configuration(apiKey) configs.invoke(config) return Amplitude(config) diff --git a/core/src/main/java/com/amplitude/core/Storage.kt b/core/src/main/java/com/amplitude/core/Storage.kt index a18e94be..f5beebca 100644 --- a/core/src/main/java/com/amplitude/core/Storage.kt +++ b/core/src/main/java/com/amplitude/core/Storage.kt @@ -7,7 +7,6 @@ import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope interface Storage { - enum class Constants(val rawVal: String) { LAST_EVENT_ID("last_event_id"), PREVIOUS_SESSION_ID("previous_session_id"), @@ -15,12 +14,15 @@ interface Storage { OPT_OUT("opt_out"), Events("events"), APP_VERSION("app_version"), - APP_BUILD("app_build") + APP_BUILD("app_build"), } suspend fun writeEvent(event: BaseEvent) - suspend fun write(key: Constants, value: String) + suspend fun write( + key: Constants, + value: String, + ) suspend fun remove(key: Constants) @@ -32,9 +34,17 @@ interface Storage { suspend fun getEventsString(content: Any): String - fun getResponseHandler(eventPipeline: EventPipeline, configuration: Configuration, scope: CoroutineScope, dispatcher: CoroutineDispatcher): ResponseHandler + fun getResponseHandler( + eventPipeline: EventPipeline, + configuration: Configuration, + scope: CoroutineScope, + dispatcher: CoroutineDispatcher, + ): ResponseHandler } interface StorageProvider { - fun getStorage(amplitude: Amplitude, prefix: String? = null): Storage + fun getStorage( + amplitude: Amplitude, + prefix: String? = null, + ): Storage } diff --git a/core/src/main/java/com/amplitude/core/platform/EventPipeline.kt b/core/src/main/java/com/amplitude/core/platform/EventPipeline.kt index 78886073..6a2bcbf7 100644 --- a/core/src/main/java/com/amplitude/core/platform/EventPipeline.kt +++ b/core/src/main/java/com/amplitude/core/platform/EventPipeline.kt @@ -16,9 +16,8 @@ import java.io.FileNotFoundException import java.util.concurrent.atomic.AtomicInteger class EventPipeline( - private val amplitude: Amplitude + private val amplitude: Amplitude, ) { - private val writeChannel: Channel private val uploadChannel: Channel @@ -59,12 +58,13 @@ class EventPipeline( registerShutdownHook() - responseHandler = storage.getResponseHandler( - this@EventPipeline, - amplitude.configuration, - scope, - amplitude.retryDispatcher, - ) + responseHandler = + storage.getResponseHandler( + this@EventPipeline, + amplitude.configuration, + scope, + amplitude.retryDispatcher, + ) } fun put(event: BaseEvent) { @@ -88,67 +88,71 @@ class EventPipeline( running = false } - private fun write() = scope.launch(amplitude.storageIODispatcher) { - for (message in writeChannel) { - // write to storage - val triggerFlush = (message.type == WriteQueueMessageType.FLUSH) - if (!triggerFlush && message.event != null) try { - storage.writeEvent(message.event) - } catch (e: Exception) { - e.logWithStackTrace(amplitude.logger, "Error when writing event to pipeline") - } + private fun write() = + scope.launch(amplitude.storageIODispatcher) { + for (message in writeChannel) { + // write to storage + val triggerFlush = (message.type == WriteQueueMessageType.FLUSH) + if (!triggerFlush && message.event != null) { + try { + storage.writeEvent(message.event) + } catch (e: Exception) { + e.logWithStackTrace(amplitude.logger, "Error when writing event to pipeline") + } + } - // Skip flush when offline - if (amplitude.configuration.offline == true) { - continue - } + // Skip flush when offline + if (amplitude.configuration.offline == true) { + continue + } - // if flush condition met, generate paths - if (eventCount.incrementAndGet() >= getFlushCount() || triggerFlush) { - eventCount.set(0) - uploadChannel.trySend(UPLOAD_SIG) - } else { - schedule() + // if flush condition met, generate paths + if (eventCount.incrementAndGet() >= getFlushCount() || triggerFlush) { + eventCount.set(0) + uploadChannel.trySend(UPLOAD_SIG) + } else { + schedule() + } } } - } - - private fun upload() = scope.launch(amplitude.networkIODispatcher) { - uploadChannel.consumeEach { - withContext(amplitude.storageIODispatcher) { - try { - storage.rollover() - } catch (e: FileNotFoundException) { - e.message?.let { - amplitude.logger.warn("Event storage file not found: $it") + private fun upload() = + scope.launch(amplitude.networkIODispatcher) { + uploadChannel.consumeEach { + withContext(amplitude.storageIODispatcher) { + try { + storage.rollover() + } catch (e: FileNotFoundException) { + e.message?.let { + amplitude.logger.warn("Event storage file not found: $it") + } } } - } - - val eventsData = storage.readEventsContent() - for (events in eventsData) { - try { - val eventsString = storage.getEventsString(events) - if (eventsString.isEmpty()) continue - val connection = httpClient.upload() - connection.outputStream?.let { - connection.setEvents(eventsString) - // Upload the payloads. - connection.close() - } - responseHandler.handle(connection.response, events, eventsString) - } catch (e: FileNotFoundException) { - e.message?.let { - amplitude.logger.warn("Event storage file not found: $it") + val eventsData = storage.readEventsContent() + for (events in eventsData) { + try { + val eventsString = storage.getEventsString(events) + if (eventsString.isEmpty()) continue + val connection = httpClient.upload() + connection.outputStream?.let { + connection.setEvents(eventsString) + connection.setDiagnostics(amplitude.diagnostics) + // Upload the payloads. + connection.close() + } + + responseHandler.handle(connection.response, events, eventsString) + } catch (e: FileNotFoundException) { + e.message?.let { + amplitude.logger.warn("Event storage file not found: $it") + } + } catch (e: Exception) { + e.logWithStackTrace(amplitude.logger, "Error when uploading event") } - } catch (e: Exception) { - e.logWithStackTrace(amplitude.logger, "Error when uploading event") } } } - } private fun getFlushCount(): Int { val count = flushQueueSize / flushSizeDivider.get() @@ -159,30 +163,34 @@ class EventPipeline( return flushInterval } - private fun schedule() = scope.launch(amplitude.storageIODispatcher) { - if (isActive && running && !scheduled && !exceededRetries) { - scheduled = true - delay(getFlushIntervalInMillis()) - flush() - scheduled = false + private fun schedule() = + scope.launch(amplitude.storageIODispatcher) { + if (isActive && running && !scheduled && !exceededRetries) { + scheduled = true + delay(getFlushIntervalInMillis()) + flush() + scheduled = false + } } - } private fun registerShutdownHook() { // close the stream if the app shuts down - Runtime.getRuntime().addShutdownHook(object : Thread() { - override fun run() { - this@EventPipeline.stop() - } - }) + Runtime.getRuntime().addShutdownHook( + object : Thread() { + override fun run() { + this@EventPipeline.stop() + } + }, + ) } } enum class WriteQueueMessageType { - EVENT, FLUSH + EVENT, + FLUSH, } data class WriteQueueMessage( val type: WriteQueueMessageType, - val event: BaseEvent? + val event: BaseEvent?, ) diff --git a/core/src/main/java/com/amplitude/core/utilities/Diagnostics.kt b/core/src/main/java/com/amplitude/core/utilities/Diagnostics.kt new file mode 100644 index 00000000..7d3579ca --- /dev/null +++ b/core/src/main/java/com/amplitude/core/utilities/Diagnostics.kt @@ -0,0 +1,47 @@ +package com.amplitude.core.utilities + +import java.util.Collections + +class Diagnostics() { + private var malformedEvents: MutableList? = null + private var errorLogs: MutableList? = null + + fun addMalformedEvent(event: String) { + if (malformedEvents == null) { + malformedEvents = Collections.synchronizedList(mutableListOf()) + } + malformedEvents?.add(event) + } + + fun addErrorLog(log: String) { + if (errorLogs == null) { + errorLogs = Collections.synchronizedList(mutableListOf()) + } + errorLogs?.add(log) + } + + fun hasDiagnostics(): Boolean { + return (malformedEvents != null && malformedEvents!!.isNotEmpty()) || (errorLogs != null && errorLogs!!.isNotEmpty()) + } + + /** + * Extracts the diagnostics as a JSON string. + * @return JSON string of diagnostics or empty if no diagnostics are present. + */ + fun extractDiagnostics(): String { + if (!hasDiagnostics()) { + return "" + } + val diagnostics = mutableMapOf>() + if (malformedEvents != null && malformedEvents!!.isNotEmpty()) { + diagnostics["malformed_events"] = malformedEvents!! + } + if (errorLogs != null && errorLogs!!.isNotEmpty()) { + diagnostics["error_logs"] = errorLogs!! + } + val result = diagnostics.toJSONObject().toString() + malformedEvents?.clear() + errorLogs?.clear() + return result + } +} diff --git a/core/src/main/java/com/amplitude/core/utilities/EventsFileManager.kt b/core/src/main/java/com/amplitude/core/utilities/EventsFileManager.kt index d0828e61..47e01167 100644 --- a/core/src/main/java/com/amplitude/core/utilities/EventsFileManager.kt +++ b/core/src/main/java/com/amplitude/core/utilities/EventsFileManager.kt @@ -1,68 +1,94 @@ package com.amplitude.core.utilities +import com.amplitude.common.Logger import com.amplitude.id.utilities.KeyValueStore import com.amplitude.id.utilities.createDirectory +import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import org.json.JSONArray +import org.json.JSONException +import org.json.JSONObject import java.io.BufferedReader import java.io.File +import java.io.FileNotFoundException import java.io.FileOutputStream +import java.io.IOException +import java.io.UnsupportedEncodingException import java.util.Collections +import java.util.Random import java.util.concurrent.ConcurrentHashMap class EventsFileManager( private val directory: File, private val storageKey: String, - private val kvs: KeyValueStore + private val kvs: KeyValueStore, + private val logger: Logger, + private val diagnostics: Diagnostics, ) { - init { - createDirectory(directory) - } - private val fileIndexKey = "amplitude.events.file.index.$storageKey" + private val storageVersionKey = "amplitude.events.file.version.$storageKey" + val filePathSet: MutableSet = Collections.newSetFromMap(ConcurrentHashMap()) + val curFile: MutableMap = ConcurrentHashMap() companion object { const val MAX_FILE_SIZE = 975_000 // 975KB + const val DELIMITER = "\u0000" + val writeMutexMap = ConcurrentHashMap() + val readMutexMap = ConcurrentHashMap() } - val writeMutex = Mutex() - val readMutex = Mutex() - val filePathSet: MutableSet = Collections.newSetFromMap(ConcurrentHashMap()) - val curFile: MutableMap = ConcurrentHashMap() + val writeMutex = writeMutexMap.getOrPut(storageKey) { Mutex() } + private val readMutex = readMutexMap.getOrPut(storageKey) { Mutex() } + + init { + guardDirectory() + runBlocking { + handleV1Files() + } + } /** * closes existing file, if at capacity * opens a new file, if current file is full or uncreated * stores the event */ - suspend fun storeEvent(event: String) = writeMutex.withLock { - var file = currentFile() - if (!file.exists()) { - // create it - file.createNewFile() - } - - // check if file is at capacity - while (file.length() > MAX_FILE_SIZE) { - finish(file) - // update index - file = currentFile() + suspend fun storeEvent(event: String) = + writeMutex.withLock { + if (!guardDirectory()) { + return@withLock + } + var file = currentFile() if (!file.exists()) { // create it - file.createNewFile() + try { + file.createNewFile() + } catch (e: IOException) { + diagnostics.addErrorLog("Failed to create new storage file: ${e.message}") + logger.error("Failed to create new storage file: ${file.path}") + return@withLock + } } - } - var contents = "" - if (file.length() == 0L) { - start(file) - } else if (file.length() > 1) { - contents += "," + // check if file is at capacity + while (file.length() > MAX_FILE_SIZE) { + finish(file) + // update index + file = currentFile() + if (!file.exists()) { + // create it + try { + file.createNewFile() + } catch (e: IOException) { + diagnostics.addErrorLog("Failed to create new storage file: ${e.message}") + logger.error("Failed to create new storage file: ${file.path}") + return@withLock + } + } + } + val contents = event.replace(DELIMITER, "") + DELIMITER + writeToFile(contents.toByteArray(), file, true) } - contents += event - writeToFile(contents.toByteArray(), file) - } private fun incrementFileIndex(): Boolean { val index = kvs.getLong(fileIndexKey, 0) @@ -74,9 +100,10 @@ class EventsFileManager( */ fun read(): List { // we need to filter out .temp file, since it's operating on the writing thread - val fileList = directory.listFiles { _, name -> - name.contains(storageKey) && !name.endsWith(".tmp") - } ?: emptyArray() + val fileList = + directory.listFiles { _, name -> + name.contains(storageKey) && !name.endsWith(".tmp") && !name.endsWith(".properties") + } ?: emptyArray() return fileList.sortedBy { it -> getSortKeyForFile(it) }.map { @@ -92,28 +119,26 @@ class EventsFileManager( return File(filePath).delete() } - private fun start(file: File) { - // start batch object and events array - val contents = """[""" - writeToFile(contents.toByteArray(), file) - } - /** * closes current file, and increase the index * so next write go to a new file */ - suspend fun rollover() = writeMutex.withLock { - val file = currentFile() - if (file.exists() && file.length() > 0) { - finish(file) + suspend fun rollover() = + writeMutex.withLock { + val file = currentFile() + if (file.exists() && file.length() > 0) { + finish(file) + } } - } /** * Split one file to two smaller file * This is used to handle payload too large error response */ - fun splitFile(filePath: String, events: JSONArray) { + fun splitFile( + filePath: String, + events: JSONArray, + ) { val originalFile = File(filePath) if (!originalFile.exists()) { return @@ -122,50 +147,96 @@ class EventsFileManager( val firstHalfFile = File(directory, "$fileName-1.tmp") val secondHalfFile = File(directory, "$fileName-2.tmp") val splitStrings = events.split() - writeToFile(splitStrings.first, firstHalfFile) - writeToFile(splitStrings.second, secondHalfFile) + writeEventsToSplitFile(splitStrings.first, firstHalfFile) + writeEventsToSplitFile(splitStrings.second, secondHalfFile) this.remove(filePath) } - suspend fun getEventString(filePath: String): String = readMutex.withLock { - // Block one time of file reads if another task has read the content of this file - if (filePathSet.contains(filePath)) { - filePathSet.remove(filePath) - return "" - } - filePathSet.add(filePath) - File(filePath).bufferedReader().use { - return it.readText() + suspend fun getEventString(filePath: String): String = + readMutex.withLock { + // Block one time of file reads if another task has read the content of this file + if (filePathSet.contains(filePath)) { + filePathSet.remove(filePath) + return@withLock "" + } + filePathSet.add(filePath) + File(filePath).bufferedReader().use { + val content = it.readText() + val isCurrentVersion = content.endsWith(DELIMITER) + if (isCurrentVersion) { + // handle current version + val events = JSONArray() + content.split(DELIMITER).forEach { + if (it.isNotEmpty()) { + try { + events.put(JSONObject(it)) + } catch (e: JSONException) { + diagnostics.addMalformedEvent(it) + logger.error("Failed to parse event: $it") + } + } + } + return@use if (events.length() > 0) { + events.toString() + } else { + "" + } + } else { + // handle earlier versions. This is for backward compatibility for safety and would be removed later. + val normalizedContent = "[${content.trimStart('[', ',').trimEnd(']', ',')}]" + try { + val jsonArray = JSONArray(normalizedContent) + return@use jsonArray.toString() + } catch (e: JSONException) { + diagnostics.addMalformedEvent(normalizedContent) + logger.error("Failed to parse events: $normalizedContent, dropping file: $filePath") + this.remove(filePath) + return@use normalizedContent + } + } + } } - } fun release(filePath: String) { filePathSet.remove(filePath) } private fun finish(file: File?) { - if (file == null || !file.exists() || file.length() == 0L) { + rename(file ?: return) + incrementFileIndex() + reset() + } + + private fun rename(file: File) { + if (!file.exists() || file.extension.isEmpty()) { // if tmp file doesn't exist or empty then we don't need to do anything return } - // close events array and batch object - val contents = """]""" - writeToFile(contents.toByteArray(), file) - file.renameTo(File(directory, file.nameWithoutExtension)) - incrementFileIndex() - reset() + val fileNameWithoutExtension = file.nameWithoutExtension + val finishedFile = File(directory, fileNameWithoutExtension) + if (finishedFile.exists()) { + logger.debug("File already exists: $finishedFile, handle gracefully.") + // if the file already exists, race condition detected and rename the current file to a new name to avoid collision + val newName = "$fileNameWithoutExtension-${System.currentTimeMillis()}-${Random().nextInt(1000)}" + file.renameTo(File(directory, newName)) + return + } else { + file.renameTo(File(directory, file.nameWithoutExtension)) + } } // return the current tmp file private fun currentFile(): File { - val file = curFile[storageKey] ?: run { - // check leftover tmp file - val fileList = directory.listFiles { _, name -> - name.contains(storageKey) && name.endsWith(".tmp") - } ?: emptyArray() + val file = + curFile[storageKey] ?: run { + // check leftover tmp file + val fileList = + directory.listFiles { _, name -> + name.contains(storageKey) && name.endsWith(".tmp") + } ?: emptyArray() - fileList.getOrNull(0) - } + fileList.getOrNull(0) + } val index = kvs.getLong(fileIndexKey, 0) curFile[storageKey] = file ?: File(directory, "$storageKey-$index.tmp") return curFile[storageKey]!! @@ -181,23 +252,104 @@ class EventsFileManager( } // write to underlying file - private fun writeToFile(content: ByteArray, file: File) { - FileOutputStream(file, true).use { - it.write(content) - it.flush() + private fun writeToFile( + content: ByteArray, + file: File, + append: Boolean = true, + ) { + try { + FileOutputStream(file, append).use { + it.write(content) + it.flush() + } + } catch (e: FileNotFoundException) { + diagnostics.addErrorLog(("Error writing to file: ${e.message}")) + logger.error("File not found: ${file.path}") + } catch (e: IOException) { + diagnostics.addErrorLog(("Error writing to file: ${e.message}")) + logger.error("Failed to write to file: ${file.path}") + } catch (e: SecurityException) { + diagnostics.addErrorLog(("Error writing to file: ${e.message}")) + logger.error("Security exception when saving event: ${e.message}") + } catch (e: Exception) { + diagnostics.addErrorLog(("Error writing to file: ${e.message}")) + logger.error("Failed to write to file: ${file.path}") } } - private fun writeToFile(content: String, file: File) { - file.createNewFile() - FileOutputStream(file).use { - it.write(content.toByteArray()) - it.flush() + private fun writeEventsToSplitFile( + events: List, + file: File, + append: Boolean = true, + ) { + try { + val contents = + events.joinToString(separator = DELIMITER, postfix = DELIMITER) { + it.toString().replace( + DELIMITER, + "", + ) + } + file.createNewFile() + writeToFile(contents.toByteArray(), file, append) + rename(file) + } catch (e: IOException) { + diagnostics.addErrorLog("Failed to create or write to split file: ${e.message}") + logger.error("Failed to create or write to split file: ${file.path}") + } catch (e: UnsupportedEncodingException) { + diagnostics.addErrorLog("Failed to encode event: ${e.message}") + logger.error("Failed to encode event: ${e.message}") + } catch (e: Exception) { + diagnostics.addErrorLog("Failed to write to split file: ${e.message}") + logger.error("Failed to write to split file: ${file.path} for error: ${e.message}") } - file.renameTo(File(directory, file.nameWithoutExtension)) } private fun reset() { curFile.remove(storageKey) } + + /** + * Migrate V1 files to V2 format + */ + private suspend fun handleV1Files() = + writeMutex.withLock { + if (kvs.getLong(storageVersionKey, 1L) > 1L) { + return@withLock + } + val unFinishedFiles = + directory.listFiles { _, name -> + name.contains(storageKey) && !name.endsWith(".properties") + } ?: emptyArray() + unFinishedFiles.forEach { + val content = it.readText() + if (!content.endsWith(DELIMITER)) { + // handle earlier versions + val normalizedContent = "[${content.trimStart('[', ',').trimEnd(']', ',')}]" + try { + val jsonArray = JSONArray(normalizedContent) + val list = jsonArray.toJSONObjectList() + writeEventsToSplitFile(list, it, false) + if (it.extension == "tmp") { + finish(it) + } + } catch (e: JSONException) { + logger.error("Failed to parse events: $normalizedContent, dropping file: ${it.path}") + this.remove(it.path) + } + } + } + kvs.putLong(storageVersionKey, 2) + } + + private fun guardDirectory(): Boolean { + try { + createDirectory(directory) + return true + } catch (e: IOException) { + diagnostics.addErrorLog("Failed to create directory: ${e.message}") + logger.error("Failed to create directory for events storage: ${directory.path}") + return false + } + } } diff --git a/core/src/main/java/com/amplitude/core/utilities/FileStorage.kt b/core/src/main/java/com/amplitude/core/utilities/FileStorage.kt index 3b978ad6..6878d62f 100644 --- a/core/src/main/java/com/amplitude/core/utilities/FileStorage.kt +++ b/core/src/main/java/com/amplitude/core/utilities/FileStorage.kt @@ -17,9 +17,9 @@ import java.io.File class FileStorage( storageKey: String, private val logger: Logger, - private val prefix: String? + private val prefix: String?, + private val diagnostics: Diagnostics, ) : Storage, EventsFileStorage { - companion object { const val STORAGE_PREFIX = "amplitude-kotlin" } @@ -28,7 +28,7 @@ class FileStorage( private val storageDirectoryEvents = File(storageDirectory, "events") private val propertiesFile = PropertiesFile(storageDirectory, storageKey, getPrefix(), null) - private val eventsFile = EventsFileManager(storageDirectoryEvents, storageKey, propertiesFile) + private val eventsFile = EventsFileManager(storageDirectoryEvents, storageKey, propertiesFile, logger, diagnostics) private val eventCallbacksMap = mutableMapOf() init { @@ -44,7 +44,10 @@ class FileStorage( } } - override suspend fun write(key: Storage.Constants, value: String) { + override suspend fun write( + key: Storage.Constants, + value: String, + ) { propertiesFile.putString(key.rawVal, value) } @@ -86,7 +89,7 @@ class FileStorage( configuration, scope, dispatcher, - logger + logger, ) } @@ -102,7 +105,10 @@ class FileStorage( eventCallbacksMap.remove(insertId) } - override fun splitEventFile(filePath: String, events: JSONArray) { + override fun splitEventFile( + filePath: String, + events: JSONArray, + ) { eventsFile.splitFile(filePath, events) } @@ -112,11 +118,15 @@ class FileStorage( } class FileStorageProvider : StorageProvider { - override fun getStorage(amplitude: Amplitude, prefix: String?): Storage { + override fun getStorage( + amplitude: Amplitude, + prefix: String?, + ): Storage { return FileStorage( amplitude.configuration.instanceName, amplitude.configuration.loggerProvider.getLogger(amplitude), - prefix + prefix, + amplitude.diagnostics, ) } } @@ -128,7 +138,10 @@ interface EventsFileStorage { fun removeEventCallback(insertId: String) - fun splitEventFile(filePath: String, events: JSONArray) + fun splitEventFile( + filePath: String, + events: JSONArray, + ) fun readEventsContent(): List diff --git a/core/src/main/java/com/amplitude/core/utilities/HttpClient.kt b/core/src/main/java/com/amplitude/core/utilities/HttpClient.kt index a7b977c4..62e3a527 100644 --- a/core/src/main/java/com/amplitude/core/utilities/HttpClient.kt +++ b/core/src/main/java/com/amplitude/core/utilities/HttpClient.kt @@ -17,9 +17,8 @@ import java.util.Date import java.util.TimeZone internal class HttpClient( - private val configuration: Configuration + private val configuration: Configuration, ) { - fun upload(): Connection { val connection: HttpURLConnection = getConnection(getApiHost()) val outputStream: OutputStream = connection.outputStream @@ -54,11 +53,12 @@ internal class HttpClient( } private fun getConnection(url: String): HttpURLConnection { - val requestedURL: URL = try { - URL(url) - } catch (e: MalformedURLException) { - throw IOException("Attempted to use malformed url: $url", e) - } + val requestedURL: URL = + try { + URL(url) + } catch (e: MalformedURLException) { + throw IOException("Attempted to use malformed url: $url", e) + } val connection = requestedURL.openConnection() as HttpURLConnection connection.requestMethod = "POST" connection.setRequestProperty("Content-Type", "application/json; charset=utf-8") @@ -111,13 +111,13 @@ internal class HttpClient( abstract class Connection( val connection: HttpURLConnection, val inputStream: InputStream?, - val outputStream: OutputStream? + val outputStream: OutputStream?, ) : Closeable { - private lateinit var apiKey: String private lateinit var clientUploadTime: String private lateinit var events: String private var minIdLength: Int? = null + private var diagnostics: Diagnostics? = null internal lateinit var response: Response @Throws(IOException::class) @@ -141,6 +141,10 @@ abstract class Connection( this.events = events } + internal fun setDiagnostics(diagnostics: Diagnostics) { + this.diagnostics = diagnostics + } + internal fun setBody() { this.outputStream?.let { val bodyString = getBodyStr() @@ -150,10 +154,16 @@ abstract class Connection( } private fun getBodyStr(): String { - if (minIdLength == null) { - return "{\"api_key\":\"$apiKey\",\"client_upload_time\":\"$clientUploadTime\",\"events\":$events}" + return buildString { + append("{\"api_key\":\"$apiKey\",\"client_upload_time\":\"$clientUploadTime\",\"events\":$events") + if (minIdLength != null) { + append(",\"options\":{\"min_id_length\":$minIdLength}") + } + if (diagnostics != null && diagnostics!!.hasDiagnostics()) { + append(",\"request_metadata\":{\"sdk\":${diagnostics!!.extractDiagnostics()}}") + } + append("}") } - return "{\"api_key\":\"$apiKey\",\"client_upload_time\":\"$clientUploadTime\",\"events\":$events,\"options\":{\"min_id_length\":$minIdLength}}" } } @@ -163,5 +173,5 @@ enum class HttpStatus(val code: Int) { TIMEOUT(408), PAYLOAD_TOO_LARGE(413), TOO_MANY_REQUESTS(429), - FAILED(500) + FAILED(500), } diff --git a/core/src/main/java/com/amplitude/core/utilities/JSONUtil.kt b/core/src/main/java/com/amplitude/core/utilities/JSONUtil.kt index 87c2791a..9462bb46 100644 --- a/core/src/main/java/com/amplitude/core/utilities/JSONUtil.kt +++ b/core/src/main/java/com/amplitude/core/utilities/JSONUtil.kt @@ -9,7 +9,6 @@ import org.json.JSONException import org.json.JSONObject object JSONUtil { - fun eventToJsonObject(event: BaseEvent): JSONObject { val eventJSON = JSONObject() eventJSON.put("event_type", event.eventType) @@ -97,10 +96,7 @@ object JSONUtil { } } catch (e: JSONException) { throw IllegalArgumentException( - ( - "JSON parsing error. Too long (>" + - Constants.MAX_STRING_LENGTH - ) + " chars) or invalid JSON" + "JSON parsing error. Too long (> ${Constants.MAX_STRING_LENGTH} chars) or invalid JSON", ) } } @@ -126,14 +122,21 @@ object JSONUtil { } private fun truncate(value: String): String { - return if (value.length <= Constants.MAX_STRING_LENGTH) value else value.substring( - 0, - Constants.MAX_STRING_LENGTH - ) + return if (value.length <= Constants.MAX_STRING_LENGTH) { + value + } else { + value.substring( + 0, + Constants.MAX_STRING_LENGTH, + ) + } } } -internal fun JSONObject.getStringWithDefault(key: String, defaultValue: String): String { +internal fun JSONObject.getStringWithDefault( + key: String, + defaultValue: String, +): String { if (this.has(key)) { return this.getString(key) } @@ -203,7 +206,15 @@ fun JSONObject.toBaseEvent(): BaseEvent { event.library = if (this.has("library")) this.getString("library") else null event.partnerId = this.optionalString("partner_id", null) event.plan = if (this.has("plan")) Plan.fromJSONObject(this.getJSONObject("plan")) else null - event.ingestionMetadata = if (this.has("ingestion_metadata")) IngestionMetadata.fromJSONObject(this.getJSONObject("ingestion_metadata")) else null + event.ingestionMetadata = + if (this.has( + "ingestion_metadata", + ) + ) { + IngestionMetadata.fromJSONObject(this.getJSONObject("ingestion_metadata")) + } else { + null + } return event } @@ -215,34 +226,51 @@ fun JSONArray.toEvents(): List { return events } -internal fun JSONArray.split(): Pair { +internal fun JSONArray.split(): Pair, List> { val mid = this.length() / 2 - val firstHalf = JSONArray() - val secondHalf = JSONArray() - (0 until this.length()).forEach { index, -> + val firstHalf = mutableListOf() + val secondHalf = mutableListOf() + (0 until this.length()).forEach { index -> if (index < mid) { - firstHalf.put(this.getJSONObject(index)) + firstHalf.add(this.getJSONObject(index)) } else { - secondHalf.put(this.getJSONObject(index)) + secondHalf.add(this.getJSONObject(index)) } } - return Pair(firstHalf.toString(), secondHalf.toString()) + return Pair(firstHalf, secondHalf) +} + +internal fun JSONArray.toJSONObjectList(): List { + val list = mutableListOf() + (0 until this.length()).forEach { + list.add(this.getJSONObject(it)) + } + return list } -internal fun JSONObject.addValue(key: String, value: Any?) { +internal fun JSONObject.addValue( + key: String, + value: Any?, +) { value?.let { this.put(key, value) } } -fun JSONObject.optionalJSONObject(key: String, defaultValue: JSONObject?): JSONObject? { +fun JSONObject.optionalJSONObject( + key: String, + defaultValue: JSONObject?, +): JSONObject? { if (this.has(key)) { return this.getJSONObject(key) } return defaultValue } -fun JSONObject.optionalString(key: String, defaultValue: String?): String? { +fun JSONObject.optionalString( + key: String, + defaultValue: String?, +): String? { if (this.has(key)) { return this.getString(key) } diff --git a/core/src/test/kotlin/com/amplitude/core/utilities/DiagnosticsTest.kt b/core/src/test/kotlin/com/amplitude/core/utilities/DiagnosticsTest.kt new file mode 100644 index 00000000..a51af57d --- /dev/null +++ b/core/src/test/kotlin/com/amplitude/core/utilities/DiagnosticsTest.kt @@ -0,0 +1,44 @@ +package com.amplitude.core.utilities + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test + +class DiagnosticsTest { + @Test + fun `test addMalformedEvent`() { + val diagnostics = Diagnostics() + diagnostics.addMalformedEvent("event") + assertTrue(diagnostics.hasDiagnostics()) + assertEquals("{\"malformed_events\":[\"event\"]}", diagnostics.extractDiagnostics()) + } + + @Test + fun `test addErrorLog`() { + val diagnostics = Diagnostics() + diagnostics.addErrorLog("log") + assertTrue(diagnostics.hasDiagnostics()) + assertEquals("{\"error_logs\":[\"log\"]}", diagnostics.extractDiagnostics()) + } + + @Test + fun `test hasDiagnostics`() { + val diagnostics = Diagnostics() + assertFalse(diagnostics.hasDiagnostics()) + diagnostics.addMalformedEvent("event") + assertTrue(diagnostics.hasDiagnostics()) + diagnostics.addErrorLog("log") + assertTrue(diagnostics.hasDiagnostics()) + } + + @Test + fun `test extractDiagnostics`() { + val diagnostics = Diagnostics() + assertEquals("", diagnostics.extractDiagnostics()) + diagnostics.addErrorLog("log") + diagnostics.addMalformedEvent("event") + assertEquals("{\"error_logs\":[\"log\"],\"malformed_events\":[\"event\"]}", diagnostics.extractDiagnostics()) + assertFalse(diagnostics.hasDiagnostics()) + } +} diff --git a/core/src/test/kotlin/com/amplitude/core/utilities/EventsFileManagerTest.kt b/core/src/test/kotlin/com/amplitude/core/utilities/EventsFileManagerTest.kt new file mode 100644 index 00000000..7f7ab1f6 --- /dev/null +++ b/core/src/test/kotlin/com/amplitude/core/utilities/EventsFileManagerTest.kt @@ -0,0 +1,499 @@ +package com.amplitude.core.utilities + +import com.amplitude.common.jvm.ConsoleLogger +import com.amplitude.id.utilities.PropertiesFile +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.json.JSONArray +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.io.TempDir +import java.io.File +import kotlin.concurrent.thread + +class EventsFileManagerTest { + @TempDir + lateinit var tempDir: File + + private val testDiagnostics = Diagnostics() + + @Test + fun `test store event and read`() { + val logger = ConsoleLogger() + val storageKey = "storageKey" + val propertiesFile = PropertiesFile(tempDir, storageKey, "test-prefix", logger) + val eventsFileManager = + EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) + runBlocking { + eventsFileManager.storeEvent(createEvent("test1")) + eventsFileManager.storeEvent(createEvent("test2")) + eventsFileManager.rollover() + eventsFileManager.storeEvent(createEvent("test3")) + eventsFileManager.storeEvent(createEvent("test4")) + eventsFileManager.rollover() + eventsFileManager.storeEvent(createEvent("test5")) + } + val filePaths = eventsFileManager.read() + assertEquals(2, filePaths.size) + filePaths.withIndex().forEach { (index, filePath) -> + // verify file name and raw content + val file = File(filePath) + assertEquals("$storageKey-$index", file.name) + val content = file.readText() + val lines = content.split(EventsFileManager.DELIMITER) + assertEquals(3, lines.size) + assertEquals(createEvent("test${index * 2 + 1}"), lines[0]) + assertEquals(createEvent("test${index * 2 + 2}"), lines[1]) + assertEquals("", lines[2]) + } + + runBlocking { + // verify the content read from the file + val eventsString0 = eventsFileManager.getEventString(filePaths[0]) + val eventsString1 = eventsFileManager.getEventString(filePaths[1]) + val events0 = JSONArray(eventsString0) + val events1 = JSONArray(eventsString1) + assertEquals(2, events0.length()) + assertEquals(2, events1.length()) + assertEquals("test1", events0.getJSONObject(0).getString("eventType")) + assertEquals("test2", events0.getJSONObject(1).getString("eventType")) + assertEquals("test3", events1.getJSONObject(0).getString("eventType")) + assertEquals("test4", events1.getJSONObject(1).getString("eventType")) + } + } + + @Test + fun `rollover should finish current non-empty temp file`() { + val logger = ConsoleLogger() + val storageKey = "storageKey" + val propertiesFile = PropertiesFile(tempDir, storageKey, "test-prefix", logger) + val eventsFileManager = + EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) + runBlocking { + eventsFileManager.storeEvent(createEvent("test1")) + } + val filePaths = eventsFileManager.read() + assertEquals(0, filePaths.size) + runBlocking { + eventsFileManager.rollover() + } + val filePathsAfterRollover = eventsFileManager.read() + assertEquals(1, filePathsAfterRollover.size) + val file = File(filePathsAfterRollover[0]) + val content = file.readText() + val lines = content.split(EventsFileManager.DELIMITER) + assertEquals(2, lines.size) + assertEquals(createEvent("test1"), lines[0]) + assertEquals("", lines[1]) + runBlocking { + val eventsString = eventsFileManager.getEventString(filePathsAfterRollover[0]) + val events = JSONArray(eventsString) + assertEquals(1, events.length()) + assertEquals("test1", events.getJSONObject(0).getString("eventType")) + } + } + + @Test + fun `rollover should ignore current empty temp file`() { + val logger = ConsoleLogger() + val storageKey = "storageKey" + val propertiesFile = PropertiesFile(tempDir, storageKey, "test-prefix", logger) + val eventsFileManager = + EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) + runBlocking { + eventsFileManager.rollover() + } + val filePathsAfterRollover = eventsFileManager.read() + assertEquals(0, filePathsAfterRollover.size) + } + + @Test + fun `remove should delete a file`() { + val logger = ConsoleLogger() + val storageKey = "storageKey" + val propertiesFile = PropertiesFile(tempDir, storageKey, "test-prefix", logger) + val eventsFileManager = + EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) + runBlocking { + eventsFileManager.storeEvent(createEvent("test1")) + eventsFileManager.rollover() + } + val filePaths = eventsFileManager.read() + assertEquals(1, filePaths.size) + eventsFileManager.remove(filePaths[0]) + val filePathsAfterRemove = eventsFileManager.read() + assertEquals(0, filePathsAfterRemove.size) + } + + @Test + fun `test split`() { + val logger = ConsoleLogger() + val storageKey = "storageKey" + val propertiesFile = PropertiesFile(tempDir, storageKey, "test-prefix", logger) + val eventsFileManager = + EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) + runBlocking { + eventsFileManager.storeEvent(createEvent("test1")) + eventsFileManager.storeEvent(createEvent("test2")) + eventsFileManager.rollover() + } + val filePaths = eventsFileManager.read() + assertEquals(1, filePaths.size) + runBlocking { + val eventsString = eventsFileManager.getEventString(filePaths[0]) + val events = JSONArray(eventsString) + assertEquals(2, events.length()) + eventsFileManager.splitFile(filePaths[0], events) + } + val filePathsAfterSplit = eventsFileManager.read() + assertEquals(2, filePathsAfterSplit.size) + val file0 = File(filePathsAfterSplit[0]) + val content0 = file0.readText() + val lines0 = content0.split(EventsFileManager.DELIMITER) + assertEquals(2, lines0.size) + assertEquals(createEvent("test1"), lines0[0]) + assertEquals("", lines0[1]) + val file1 = File(filePathsAfterSplit[1]) + val content1 = file1.readText() + val lines1 = content1.split(EventsFileManager.DELIMITER) + assertEquals(2, lines1.size) + assertEquals(createEvent("test2"), lines1[0]) + assertEquals("", lines1[1]) + } + + @Test + fun `verify delimiter handled gracefully`() { + val file0 = File(tempDir, "storageKey-0") + file0.writeText("{\"eventType\":\"test1\"}\u0000{\"eventType\":\"test2\"}\u0000") + val logger = ConsoleLogger() + val storageKey = "storageKey" + val propertiesFile = PropertiesFile(tempDir, storageKey, "test-prefix", logger) + val eventsFileManager = + EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) + runBlocking { + val filePaths = eventsFileManager.read() + assertEquals(1, filePaths.size) + val eventsString = eventsFileManager.getEventString(filePaths[0]) + val events = JSONArray(eventsString) + assertEquals(2, events.length()) + assertEquals("test1", events.getJSONObject(0).getString("eventType")) + assertEquals("test2", events.getJSONObject(1).getString("eventType")) + } + } + + @Test + fun `verify malformed event shows up in diagnostics`() { + val file0 = File(tempDir, "storageKey-0") + file0.writeText("{\"eventType\":\"test1\"}\u0000{\"eventType\":\"test2\"}\u0000{\"eventType\":\"test3\"\u0000") + val logger = ConsoleLogger() + val storageKey = "storageKey" + val propertiesFile = PropertiesFile(tempDir, storageKey, "test-prefix", logger) + val diagnostics = Diagnostics() + val eventsFileManager = + EventsFileManager(tempDir, storageKey, propertiesFile, logger, diagnostics) + runBlocking { + val filePaths = eventsFileManager.read() + assertEquals(1, filePaths.size) + val eventsString = eventsFileManager.getEventString(filePaths[0]) + val events = JSONArray(eventsString) + assertEquals(2, events.length()) + assertEquals("test1", events.getJSONObject(0).getString("eventType")) + assertEquals("test2", events.getJSONObject(1).getString("eventType")) + assertEquals("{\"malformed_events\":[\"{\\\"eventType\\\":\\\"test3\\\"\"]}", diagnostics.extractDiagnostics()) + } + } + + @Test + fun `verify delimiter in event names`() { + val logger = ConsoleLogger() + val storageKey = "storageKey" + val propertiesFile = PropertiesFile(tempDir, storageKey, "test-prefix", logger) + val eventsFileManager = + EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) + runBlocking { + eventsFileManager.storeEvent(createEvent("test1")) + eventsFileManager.storeEvent(createEvent("test2\u0000")) + eventsFileManager.rollover() + val filePaths = eventsFileManager.read() + assertEquals(1, filePaths.size) + val eventsString = eventsFileManager.getEventString(filePaths[0]) + val events = JSONArray(eventsString) + assertEquals(2, events.length()) + assertEquals("test1", events.getJSONObject(0).getString("eventType")) + assertEquals("test2", events.getJSONObject(1).getString("eventType")) + } + } + + @Test + fun `could handle earlier version of events file`() { + createEarlierVersionEventFiles() + val logger = ConsoleLogger() + val storageKey = "storageKey" + val propertiesFile = PropertiesFile(tempDir, storageKey, "test-prefix", logger) + val eventsFileManager = + EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) + val filePaths = eventsFileManager.read() + assertEquals(7, filePaths.size) + runBlocking { + filePaths.withIndex().forEach { (index, filePath) -> + val file = File(filePath) + assertTrue(file.extension.isEmpty(), "file extension should be empty for v1 event files") + // verify file format updated to v2 + val content = file.readText() + val lines = content.split(EventsFileManager.DELIMITER) + if (index == 5) { + assertEquals(2, lines.size) + assertEquals("{\"eventType\":\"test11\"}", lines[0]) + } else { + assertEquals(3, lines.size) + assertEquals("{\"eventType\":\"test${index * 2 + 1}\"}", lines[0]) + assertEquals("{\"eventType\":\"test${index * 2 + 2}\"}", lines[1]) + } + + val eventsString = eventsFileManager.getEventString(filePath) + if (index == 5) { + assertEquals("[{\"eventType\":\"test11\"}]", eventsString) + } else { + val events = JSONArray(eventsString) + assertEquals(2, events.length()) + assertEquals( + "test${index * 2 + 1}", + events.getJSONObject(0).getString("eventType"), + ) + assertEquals( + "test${index * 2 + 2}", + events.getJSONObject(1).getString("eventType"), + ) + } + } + } + } + + @Test + fun `could handle earlier versions with name conflict and new events`() { + createEarlierVersionEventFiles() + val file = File(tempDir, "storageKey-6") + file.writeText("{\"eventType\":\"test15\"},{\"eventType\":\"test16\"}]") + val logger = ConsoleLogger() + val storageKey = "storageKey" + val propertiesFile = PropertiesFile(tempDir, storageKey, "test-prefix", logger) + val eventsFileManager = + EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) + runBlocking { + eventsFileManager.storeEvent(createEvent("test17")) + eventsFileManager.storeEvent(createEvent("test18")) + eventsFileManager.rollover() + } + var eventsCount = 0 + val filePaths = eventsFileManager.read() + runBlocking { + filePaths.forEach { filePath -> + val eventsString = eventsFileManager.getEventString(filePath) + val events = JSONArray(eventsString) + eventsCount += events.length() + } + } + assertEquals(17, eventsCount) + } + + @Test + fun `could handle earlier versions with line break in event name`() { + val file = File(tempDir, "storageKey-6") + file.writeText("{\"eventType\":\"test15\"},{\"eventType\":\"test16\\nsuffix\"}]") + val logger = ConsoleLogger() + val storageKey = "storageKey" + val propertiesFile = PropertiesFile(tempDir, storageKey, "test-prefix", logger) + val eventsFileManager = + EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) + runBlocking { + val filePaths = eventsFileManager.read() + assertEquals(1, filePaths.size) + val eventsString = eventsFileManager.getEventString(filePaths[0]) + val events = JSONArray(eventsString) + assertEquals(2, events.length()) + assertEquals("test15", events.getJSONObject(0).getString("eventType")) + assertEquals("test16\nsuffix", events.getJSONObject(1).getString("eventType")) + } + } + + @Test + fun `concurrent writes to the same event file manager instance`() { + val logger = ConsoleLogger() + val storageKey = "storageKey" + val propertiesFile = PropertiesFile(tempDir, storageKey, "test-prefix", logger) + val eventsFileManager = + EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) + runBlocking { + val job1 = + kotlinx.coroutines.GlobalScope.launch { + eventsFileManager.storeEvent(createEvent("test1")) + eventsFileManager.storeEvent(createEvent("test2")) + eventsFileManager.rollover() + } + val job2 = + kotlinx.coroutines.GlobalScope.launch { + eventsFileManager.rollover() + eventsFileManager.storeEvent(createEvent("test3")) + eventsFileManager.storeEvent(createEvent("test4")) + eventsFileManager.rollover() + } + val job3 = + kotlinx.coroutines.GlobalScope.launch { + eventsFileManager.rollover() + eventsFileManager.storeEvent(createEvent("test5")) + eventsFileManager.storeEvent(createEvent("test6")) + eventsFileManager.rollover() + } + kotlinx.coroutines.joinAll(job1, job2, job3) + } + val filePaths = eventsFileManager.read() + var eventsCount = 0 + runBlocking { + filePaths.forEach { filePath -> + val eventsString = eventsFileManager.getEventString(filePath) + val events = JSONArray(eventsString) + eventsCount += events.length() + } + } + assertEquals(6, eventsCount) + } + + @Test + fun `concurrent write from multiple threads`() { + val logger = ConsoleLogger() + val storageKey = "storageKey" + val propertiesFile = PropertiesFile(tempDir, storageKey, "test-prefix", logger) + val eventsFileManager = + EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) + for (i in 0..100) { + val thread = + thread { + runBlocking { + for (d in 0..10) { + eventsFileManager.storeEvent(createEvent("test$i-$d")) + } + eventsFileManager.rollover() + } + } + thread.join() + } + val filePaths = eventsFileManager.read() + var eventsCount = 0 + runBlocking { + filePaths.forEach { filePath -> + val eventsString = eventsFileManager.getEventString(filePath) + val events = JSONArray(eventsString) + eventsCount += events.length() + } + } + assertEquals(101 * 11, eventsCount) + } + + @Test + fun `concurrent write to two instances with same configuration`() { + val logger = ConsoleLogger() + val storageKey = "storageKey" + val propertiesFile1 = PropertiesFile(tempDir, storageKey, "test-prefix", logger) + val propertiesFile2 = PropertiesFile(tempDir, storageKey, "test-prefix", logger) + val eventsFileManager1 = + EventsFileManager(tempDir, storageKey, propertiesFile1, logger, testDiagnostics) + val eventsFileManager2 = + EventsFileManager(tempDir, storageKey, propertiesFile2, logger, testDiagnostics) + runBlocking { + val job1 = + kotlinx.coroutines.GlobalScope.launch { + eventsFileManager1.storeEvent(createEvent("test1")) + eventsFileManager1.storeEvent(createEvent("test2")) + eventsFileManager1.rollover() + } + val job2 = + kotlinx.coroutines.GlobalScope.launch { + eventsFileManager2.rollover() + eventsFileManager2.storeEvent(createEvent("test3")) + eventsFileManager2.storeEvent(createEvent("test4")) + eventsFileManager2.rollover() + } + val job3 = + kotlinx.coroutines.GlobalScope.launch { + eventsFileManager1.rollover() + eventsFileManager1.storeEvent(createEvent("test5")) + eventsFileManager1.storeEvent(createEvent("test6")) + eventsFileManager1.rollover() + } + val job4 = + kotlinx.coroutines.GlobalScope.launch { + eventsFileManager2.rollover() + eventsFileManager2.storeEvent(createEvent("test7")) + eventsFileManager2.storeEvent(createEvent("test8")) + eventsFileManager2.rollover() + } + kotlinx.coroutines.joinAll(job1, job2, job3, job4) + } + val filePaths = eventsFileManager1.read() + var eventsCount = 0 + runBlocking { + filePaths.forEach { filePath -> + val eventsString = eventsFileManager1.getEventString(filePath) + val events = JSONArray(eventsString) + eventsCount += events.length() + } + } + assertEquals(8, eventsCount) + } + + @Test + fun `concurrent write from multiple threads on multiple instances`() { + val logger = ConsoleLogger() + val storageKey = "storageKey" + val propertiesFile = PropertiesFile(tempDir, storageKey, "test-prefix", logger) + for (i in 0..100) { + val eventsFileManager = + EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) + val thread = + thread { + runBlocking { + for (d in 0..10) { + eventsFileManager.storeEvent(createEvent("test$i-$d")) + } + eventsFileManager.rollover() + } + } + thread.join() + } + + val eventsFileManagerForRead = + EventsFileManager(tempDir, storageKey, propertiesFile, logger, testDiagnostics) + val filePaths = eventsFileManagerForRead.read() + var eventsCount = 0 + runBlocking { + filePaths.forEach { filePath -> + val eventsString = eventsFileManagerForRead.getEventString(filePath) + val events = JSONArray(eventsString) + eventsCount += events.length() + } + } + assertEquals(101 * 11, eventsCount) + } + + private fun createEarlierVersionEventFiles() { + val file0 = File(tempDir, "storageKey-0") + file0.writeText("[{\"eventType\":\"test1\"},{\"eventType\":\"test2\"}]") + val file1 = File(tempDir, "storageKey-1") + file1.writeText(",{\"eventType\":\"test3\"},{\"eventType\":\"test4\"}]") + val file2 = File(tempDir, "storageKey-2") + file2.writeText("[[{\"eventType\":\"test5\"},{\"eventType\":\"test6\"}]]") + val file3 = File(tempDir, "storageKey-3") + file3.writeText("[{\"eventType\":\"test7\"},{\"eventType\":\"test8\"}]]") + val file4 = File(tempDir, "storageKey-4") + file4.writeText("{\"eventType\":\"test9\"},{\"eventType\":\"test10\"}]") + val file5 = File(tempDir, "storageKey-5") + file5.writeText("[{\"eventType\":\"test11\"}],{\"eventType\":\"test12\"}") + val file6 = File(tempDir, "storageKey-6.tmp") + file6.writeText("[{\"eventType\":\"test13\"},{\"eventType\":\"test14\"}") + } + + private fun createEvent(eventType: String): String { + return "{\"eventType\":\"$eventType\"}" + } +} diff --git a/core/src/test/kotlin/com/amplitude/core/utilities/HttpClientTest.kt b/core/src/test/kotlin/com/amplitude/core/utilities/HttpClientTest.kt index 191bf1a5..8a4a4602 100644 --- a/core/src/test/kotlin/com/amplitude/core/utilities/HttpClientTest.kt +++ b/core/src/test/kotlin/com/amplitude/core/utilities/HttpClientTest.kt @@ -11,6 +11,7 @@ import okhttp3.mockwebserver.RecordedRequest import org.json.JSONObject import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNull import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.TestInstance @@ -39,10 +40,11 @@ class HttpClientTest { fun `test client_upload_time is set on the request`() { server.enqueue(MockResponse().setBody("{\"code\": \"success\"}")) - val config = Configuration( - apiKey = apiKey, - serverUrl = server.url("/").toString() - ) + val config = + Configuration( + apiKey = apiKey, + serverUrl = server.url("/").toString(), + ) val event = BaseEvent() event.eventType = "test" @@ -67,11 +69,12 @@ class HttpClientTest { fun `test client_upload_time is set correctly when minIdLength is set`() { server.enqueue(MockResponse().setBody("{\"code\": \"success\"}")) - val config = Configuration( - apiKey = apiKey, - serverUrl = server.url("/").toString(), - minIdLength = 3, - ) + val config = + Configuration( + apiKey = apiKey, + serverUrl = server.url("/").toString(), + minIdLength = 3, + ) val event = BaseEvent() event.eventType = "test" @@ -90,6 +93,42 @@ class HttpClientTest { assertEquals(apiKey, result.getString("api_key")) assertEquals(clientUploadTimeString, result.getString("client_upload_time")) + assertNull(result.optJSONObject("request_metadata")) + } + + @Test + fun `test payload is correct when diagnostics are set`() { + server.enqueue(MockResponse().setBody("{\"code\": \"success\"}")) + + val config = + Configuration( + apiKey = apiKey, + serverUrl = server.url("/").toString(), + ) + val event = BaseEvent() + event.eventType = "test" + + val httpClient = spyk(HttpClient(config)) + val diagnostics = Diagnostics() + diagnostics.addErrorLog("error") + diagnostics.addMalformedEvent("malformed-event") + + val connection = httpClient.upload() + connection.outputStream?.let { + connection.setEvents(JSONUtil.eventsToString(listOf(event))) + connection.setDiagnostics(diagnostics) + // Upload the payloads. + connection.close() + } + + val request = runRequest() + val result = JSONObject(request?.body?.readUtf8()) + + assertEquals(apiKey, result.getString("api_key")) + assertEquals( + "{\"error_logs\":[\"error\"],\"malformed_events\":[\"malformed-event\"]}", + result.getJSONObject("request_metadata").getJSONObject("sdk").toString(), + ) } private fun runRequest(): RecordedRequest? {