-
Notifications
You must be signed in to change notification settings - Fork 11
Examples
import csv
from fulcrum import Fulcrum
from fulcrum.exceptions import NotFoundException
api_key = 'your_api_key'
form_id = 'your_form_id'
fulcrum = Fulcrum(key=api_key)
records = csv.reader(open('new-records.csv'), delimiter=',')
#### CSV format ###
#latitude,longitude,status,name,technician
#27.770787,-82.638039,Excellent,Fulcrum HQ,Bryan
# skip header
next(records, None)
# loop through CSV projects
for index, row in enumerate(records):
# build record object
record = {
"record": {
"form_id": form_id,
"latitude": row[0],
"longitude": row[1],
"status": row[2],
"form_values": {
"925c": row[3],
"784b": {
"choice_values": [row[4]]
}
}
}
}
# create record
fulcrum.records.create(record)
print('Record ' + str(index) + ' successfully created!')
import csv
from fulcrum import Fulcrum
from fulcrum.exceptions import NotFoundException
api_key = 'your_api_key'
form_id = 'your_form_id'
fulcrum = Fulcrum(key=api_key)
updates = csv.reader(open('records-to-update.csv'), delimiter=',')
#### CSV format ###
#fulcrum_id,latitude,longitude,status,name,technician
#d59139d1-9551-4e59-a1eb-8fa29323850f,27.770787,-82.638039,Excellent,Fulcrum HQ,Bryan
# skip header
next(updates, None)
# loop through CSV records
for row in updates:
# fetch existing record (first column must contain fulcrum_id)
try:
data = fulcrum.records.find(row[0])
except NotFoundException:
print('No record found with id ' + row[0])
continue
# update fields with CSV values
data['record']['latitude'] = row[1]
data['record']['longitude'] = row[2]
data['record']['status'] = row[3]
data['record']['form_values']['925c'] = row[4]
data['record']['form_values']['784b'] = {"choice_values": [row[5]]}
# send updates back to Fulcrum
record = fulcrum.records.update(row[0], data)
print('Record ' + row[0] + ' successfully updated!')
import csv, argparse
from fulcrum import Fulcrum
from fulcrum.exceptions import NotFoundException
parser = argparse.ArgumentParser(description='This program deletes Fulcrum records specified by record ID in the first column of a CSV file. The CSV file should have a header row but the column name is not important.')
parser.add_argument('-t','--token', help='API token', required=True)
parser.add_argument('-f','--file', help='CSV file', required=True)
args = vars(parser.parse_args())
fulcrum = Fulcrum(key=args['token'])
f = open(args['file'], 'r')
deletes = csv.reader(f, delimiter=',')
# Skip the file header in csv
next(deletes, None)
# Get a total record count
total = sum(1 for row in deletes)
# Back to the top of the file and skip header
f.seek(0)
next(deletes, None)
def deleteRecords():
# Loop through rows in csv
for count, row in enumerate(deletes):
# First column must contain Fulcrum record ID
record_id = row[0]
# Try to delete record, but continue if not found
try:
fulcrum.records.delete(record_id)
except NotFoundException:
print('No record found with id ' + record_id)
continue
print('deleted ' + record_id + ' ('+ format(count+1, ',') + '/' + format(total, ',') + ')')
print('finished!')
confirm = input('Are you absolutely sure you want to permanently delete all ' + format(total, ',') + ' records? (y)es or (n)o: ')
if confirm == 'yes' or confirm == 'y':
deleteRecords()
elif confirm == 'no' or confirm == 'n':
print('No records have been deleted!')
import sys
from fulcrum import Fulcrum
api_key = 'your-api-key'
form_id = 'your-form-id'
if api_key == '' or api_key == 'your-api-key' or form_id == '' or form_id == 'your-form-id':
sys.exit('api_key and form_id are required!')
fulcrum = Fulcrum(key=api_key)
form = fulcrum.forms.find(form_id)
form_name = form['form']['name']
records = fulcrum.records.search(url_params={'form_id': form_id})
total_count = str(records['total_count'])
confirm = input('Are you absolutely sure you want to permanently delete all ' + total_count + ' records from the ' + form_name + ' app? (y)es or (n)o: ')
if confirm == 'yes' or confirm == 'y':
for record in records['records']:
#print(record['id'])
fulcrum.records.delete(record['id'])
print('Permanently deleted ' + total_count + ' records!')
elif confirm == 'no' or confirm == 'n':
print('No records have been deleted!')
import csv, argparse
from fulcrum import Fulcrum
from fulcrum.exceptions import NotFoundException
parser = argparse.ArgumentParser(description='This program creates Fulcrum projects from a CSV file.')
parser.add_argument('-t','--token', help='API token', required=True)
parser.add_argument('-f','--file', help='CSV file', required=True)
args = vars(parser.parse_args())
fulcrum = Fulcrum(key=args['token'])
f = open(args['file'], 'r')
projects = csv.reader(f, delimiter=',')
#### CSV format ###
#name,description
#Pinellas County,For records in Pinellas County
# Skip the file header in csv
next(projects, None)
# Loop through rows in csv
for index, row in enumerate(projects):
# build project object
project = {
"project": {
"name": row[0]
# "description": row[1]
}
}
# create project
fulcrum.projects.create(project)
print('Project ' + row[0] + ' successfully created!')
Below is an example of downloading the first 3 videos and tracks for a form. You could change the code below to download all videos and tracks for a form if needed.
from fulcrum import Fulcrum
api_key = 'your_api_key'
fulcrum = Fulcrum(key=api_key)
videos = fulcrum.videos.search({'page': 1, 'per_page': 3, 'form_id': '4a3f0a6d-c1d3-4805-9aab-7cdd39d58e5f'})
for video in videos['videos']:
id = video['access_key']
media = fulcrum.videos.media(id, 'small')
track = fulcrum.videos.track(id, 'geojson')
with open('{}_small.mp4'.format(id), 'wb') as f:
f.write(media)
with open('{}.geojson'.format(id), 'wb') as f:
f.write(track)
import sys, argparse
from fulcrum import Fulcrum
parser = argparse.ArgumentParser(description='This program downloads Fulcrum videos and tracks.')
parser.add_argument('-t','--token', help='API token', required=True)
parser.add_argument('-f','--form', help='Form ID', required=False)
args = vars(parser.parse_args())
fulcrum = Fulcrum(key=args['token'])
if args['form']:
videos = fulcrum.videos.search(url_params={'form_id': args['form']})
else:
videos = fulcrum.videos.search()
for (index, item) in enumerate(videos['videos']):
id = item['access_key']
if item['processed']:
video = fulcrum.videos.media(id, 'original')
with open('{}.mp4'.format(id), 'wb') as f:
f.write(video)
print('Downloaded {}.mp4'.format(id))
if item['track']:
track = fulcrum.videos.track(id, 'geojson')
with open('{}.geojson'.format(id), 'wb') as f:
f.write(track)
print('Downloaded {}.geojson'.format(id))
else:
print('Video ' + id + ' not processed.')
import sys, argparse
from fulcrum import Fulcrum
parser = argparse.ArgumentParser(description='This program downloads Fulcrum photos.')
parser.add_argument('-t','--token', help='API token', required=True)
parser.add_argument('-f','--form', help='Form ID', required=False)
args = vars(parser.parse_args())
fulcrum = Fulcrum(key=args['token'])
if args['form']:
photos = fulcrum.photos.search(url_params={'form_id': args['form']})
else:
photos = fulcrum.photos.search()
for (index, item) in enumerate(photos['photos']):
id = item['access_key']
photo = fulcrum.photos.media(id, 'original')
with open('{}.jpg'.format(id), 'wb') as f:
f.write(photo)
print('Downloaded {}.jpg'.format(id))
import sys, argparse
from fulcrum import Fulcrum
parser = argparse.ArgumentParser(description='This program takes a changeset and deletes all of the records. Use with extreme caution!')
parser.add_argument('-t','--token', help='API token', required=True)
parser.add_argument('-f','--form', help='Form ID', required=True)
parser.add_argument('-c','--changeset', help='Changeset ID', required=True)
args = vars(parser.parse_args())
fulcrum = Fulcrum(key=args['token'])
form = fulcrum.forms.find(args['form'])
form_name = form['form']['name']
changeset = fulcrum.changesets.find(args['changeset'])
closed_by = changeset['changeset']['closed_by']
closed_at = changeset['changeset']['closed_at']
records = fulcrum.records.search(url_params={'form_id': args['form'], 'changeset_id': args['changeset'], 'per_page': '10000'})
total_count = str(records['total_count'])
confirm = input('Are you absolutely sure you want to permanently delete all ' + total_count + ' records in the ' + form_name + ' app, which were changed by ' + closed_by + ' on ' + closed_at + ' by changeset ' + args['changeset'] + '? (y)es or (n)o: ')
if confirm == 'yes' or confirm == 'y':
index = 1
for record in records['records']:
print('Deleting record: ' + record['id'] + '(' + str(index) + '/' + total_count + ')')
fulcrum.records.delete(record['id'])
index += 1
print('Permanently deleted ' + total_count + ' records in the ' + form_name + ' app from changeset ' + args['changeset'] + '!')
elif confirm == 'no' or confirm == 'n':
print('No records have been deleted!')
Example: python delete-records.py -t abcdefghijklmnopqrstuvwxyz -q "SELECT _record_id FROM \"My App\" WHERE _status = 'Good'"
import argparse
from fulcrum import Fulcrum
from fulcrum.exceptions import NotFoundException
parser = argparse.ArgumentParser(description='This program deletes Fulcrum records from a Query API result.')
parser.add_argument('-t','--token', help='API token', required=True)
parser.add_argument('-q','--query', help='Query', required=True)
args = vars(parser.parse_args())
fulcrum = Fulcrum(key=args['token'])
records = fulcrum.query(args['query'], 'json')
count = str(len(records['rows']))
confirm = input('Are you absolutely sure you want to permanently delete all ' + count + ' records? (y)es or (n)o: ')
if confirm == 'yes' or confirm == 'y':
for row in records['rows']:
# print(row['_record_id'])
try:
fulcrum.records.delete(row['_record_id'])
except NotFoundException:
print('No record found with id ' + row['_record_id'])
continue
print('record ' + row['_record_id'] + ' successfully deleted!')
elif confirm == 'no' or confirm == 'n':
print('No records have been deleted!')
Child records are actually just form values and not true records - so this is more of an update than a delete, where we update the repeatable form value to be an empty array of repeatable objects.
Example: python delete-records.py -t abcdefghijklmnopqrstuvwxyz -q "SELECT _record_id FROM \"My App/my_repeatable\";"
import requests, argparse
from fulcrum import Fulcrum
from fulcrum.exceptions import NotFoundException
parser = argparse.ArgumentParser(description='This program deletes Fulcrum repeatable form values from a Query API result.')
parser.add_argument('-t', '--token', help='API token', required=True)
parser.add_argument('-q','--query', help='Query', required=True)
args = vars(parser.parse_args())
fulcrum = Fulcrum(key=args['token'])
url = 'https://api.fulcrumapp.com/api/v2/query/?format=json&token='+args['token']+'&q='+args['query']
req = requests.get(url)
records = req.json()
def deleteRepeatables():
for count, row in enumerate(records['rows']):
# print(row['_record_id'])
try:
record = fulcrum.records.find(row['_record_id'])
# Build an array of repeatable objects
repeatables = []
# Replace your repeatable key here, example ['c6c3']
record['record']['form_values']['c6c3'] = repeatables
# Update the record
updated_record = fulcrum.records.update(row['_record_id'], record)
print('Successfully updated ' + row['_record_id'])
except NotFoundException:
print('No record found with id ' + row['_record_id'])
continue
print('finished!')
confirm = input('Are you absolutely sure you want to permanently delete these child records from their parent records? (y)es or (n)o: ')
if confirm == 'yes' or confirm == 'y':
deleteRepeatables()
elif confirm == 'no' or confirm == 'n':
print('No records have been deleted!')
Below is an example of fetching an existing record, adding some "repeatable records" to the parent record, and updating the record.
from fulcrum import Fulcrum
from fulcrum.exceptions import NotFoundException
# Import uuid module for generating repeatable IDs
import uuid
api_key = 'your-api-key'
fulcrum = Fulcrum(key=api_key)
# Find the record you want to update
record = fulcrum.records.find('a190b99c-be00-4793-a4b0-669fa9773c0c')
# Build an array of repeatable objects
repeatables = [{
'id': str(uuid.uuid4()),
'geometry': {
'type': 'Point',
'coordinates': [-82.6388836, 27.7707606]
},
'form_values': {
'e8e3': '360 Central Ave'
}
}, {
'id': str(uuid.uuid4()),
'geometry': {
'type': 'Point',
'coordinates': [-82.637812, 27.772983]
},
'form_values': {
'e8e3': 'Williams Park'
}
}]
# Add array to repeatable field
record['record']['form_values']['4501'] = repeatables
# Update the record
updated_record = fulcrum.records.update('a190b99c-be00-4793-a4b0-669fa9773c0c', record)
print('record successfully updated!')
This example allows you to query a subset of records from an app, download the resulting records in CSV format, and download all of the photos associated with the returned records.
import argparse, csv
from fulcrum import Fulcrum
from fulcrum.exceptions import NotFoundException
parser = argparse.ArgumentParser(description='This program downloads records and photos from a Query API result.')
parser.add_argument('-t','--token', help='API token', required=True)
parser.add_argument('-q','--query', help='Query', required=True)
parser.add_argument('-s','--size', help='Photo size (original, thumbnail, large', choices=['original', 'thumbnail', 'large'], default='original')
parser.add_argument('-n','--name', help='Photo name field')
args = vars(parser.parse_args())
fulcrum = Fulcrum(key=args['token'])
response = fulcrum.query(args['query'], 'csv')
def downloadCSV(response):
open('records.csv', 'wb').write(response)
print('Downloaded records.csv')
def downloadPhotos(response):
csvtext = response.decode('utf8')
reader = csv.DictReader(csvtext.split('\n'), delimiter=',')
for row in reader:
if row['_record_id']:
try:
photos = fulcrum.photos.search(url_params={'record_id': row['_record_id']})
for item in photos['photos']:
id = item['access_key']
photo = fulcrum.photos.media(id, args['size'])
if args['name'] and row[args['name']]:
name = row[args['name']] + '-' + id
else:
name = id
with open('{}.jpg'.format(name), 'wb') as f:
f.write(photo)
print('Downloaded {}.jpg'.format(name))
except NotFoundException:
print('No photos found for record ' + row[0])
continue
downloadCSV(response)
downloadPhotos(response)
This example allows you to query a subset of records to update, loop through the results and fetch the record JSON, modify the record, and send updated record back to the server.
from fulcrum import Fulcrum
from datetime import datetime
# authenticate with Fulcrum API token
fulcrum = Fulcrum('your-token')
def fetchRecord(id):
record = fulcrum.records.find(id)
modifyRecord(record, id)
def modifyRecord(record, id):
# Set the value for the element with a key of '0cb8'
record['record']['form_values']['0cb8'] = '5.5'
# Set the value for the element with a key of '15ac'
record['record']['form_values']['15ac'] = 'Hello world!'
updateRecord(record, id)
def updateRecord(record, id):
update = fulcrum.records.update(id, record)
print(update['record']['id'] + ' has been updated!')
# query records where _server_updated_at timestamp date is equal to today's date
today = str(datetime.date(datetime.now()))
sql = 'SELECT _record_id FROM "My App" WHERE FCM_FormatTimestamp(_server_updated_at, \'America/New_York\')::date = \''+today+'\''
response = fulcrum.query(sql, 'json')
for record in response['rows']:
fetchRecord(record['_record_id'])