This repository has been archived by the owner on Sep 10, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlocustfile.py
59 lines (49 loc) · 1.79 KB
/
locustfile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
"""
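This is a (much needed) load test for this repo, written for Locust. The AUTHORIZATION_TOKEN
environment variable must be set before running: its value is sent as a "Token" Authorization
header with every request. On newer Locust versions the --hatch-rate option is named
--spawn-rate. These are some example usages: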
locust --host=http://127.0.0.1:8001 --headless --users 250 --hatch-rate 25 --run-time 1m PeopleUser
locust --host=http://127.0.0.1:8001 --headless --users 250 --hatch-rate 25 --run-time 30s
"""
import datetime
import os
import time
from uuid import uuid4

from locust import HttpUser, between, task
# Authorization header sent with every request. The token is read from the
# AUTHORIZATION_TOKEN environment variable when this module is imported, so a
# missing variable fails fast with a KeyError before any load is generated.
AUTHORIZATION_HEADER = {'Authorization': f"Token {os.environ['AUTHORIZATION_TOKEN']}"}

# Path that the measurement messages are POSTed to; the HTTP client resolves it
# against the --host passed on the locust command line.
PEOPLE_MEASUREMENT_ENDPOINT_URL = "/telcameras/v1/"
def get_dt_with_tz_info():
    """Return the current local time as an ISO 8601 string with its UTC offset.

    Returns:
        str: A timezone-aware timestamp such as
        ``2024-01-31T12:34:56.789012+01:00``.
    """
    # astimezone() without an argument attaches the system's local timezone,
    # including any daylight-saving offset in effect, so the wall-clock value
    # and its offset both derive from a single clock read.
    return datetime.datetime.now().astimezone().isoformat()
def create_message():
    """Build one measurement message to use as a JSON request body.

    Returns:
        dict: A mapping with a ``"data"`` entry (one sensor reading carrying a
        freshly generated UUID and the current timestamp) and a ``"details"``
        entry (two fixed per-direction counts).
    """
    # Only "id" and "timestamp" vary between calls; every other field is a
    # fixed test value.
    reading = {
        "id": str(uuid4()),
        "density": "0.0",
        "sensortype": "countingcamera",
        "latitude": "52.3709",
        "count": "6.0",
        "sensor": "TEST",
        "version": "1",
        "speed": "0.0",
        "timestamp": get_dt_with_tz_info(),
        "longitude": "4.89175",
    }

    # The detail rows are constant, including their ids.
    breakdown = [
        {
            "count": "2",
            "id": "315fc896-82dd-46ee-ac91-31e1a1b1807a",
            "direction": "down",
        },
        {
            "count": "4",
            "id": "a70b424f-4144-437b-a990-cbac421a4830",
            "direction": "down",
        },
    ]

    return {"data": reading, "details": breakdown}
class PeopleUser(HttpUser):
    """Simulated user that posts measurement messages to the people endpoint."""

    # Relative weight of this user class when Locust picks which users to spawn.
    weight = 1
    # Each simulated user pauses between 0 and 1 seconds after every task.
    wait_time = between(0, 1)

    @task(1)
    def post_people(self):
        """POST one freshly generated message with the authorization header."""
        payload = create_message()
        self.client.post(
            PEOPLE_MEASUREMENT_ENDPOINT_URL,
            headers=AUTHORIZATION_HEADER,
            json=payload,
        )