diff --git a/tap_sftp/helper.py b/tap_sftp/helper.py new file mode 100644 index 0000000..935f2b0 --- /dev/null +++ b/tap_sftp/helper.py @@ -0,0 +1,21 @@ +import simplejson as json +import sys +from singer import RecordMessage + + +def format_message(message,ensure_ascii=True): + return json.dumps(message.asdict(), use_decimal=True, ensure_ascii=ensure_ascii) + + +def write_message(message, ensure_ascii=True): + sys.stdout.write(format_message(message, ensure_ascii=ensure_ascii) + '\n') + sys.stdout.flush() + + +def write_record(stream_name, record, stream_alias=None, time_extracted=None, ensure_ascii=True): + """Write a single record for the given stream. + + """ + write_message(RecordMessage(stream=(stream_alias or stream_name), + record=record, + time_extracted=time_extracted), ensure_ascii=ensure_ascii) diff --git a/tap_sftp/sync.py b/tap_sftp/sync.py index aef90d6..1a723f2 100644 --- a/tap_sftp/sync.py +++ b/tap_sftp/sync.py @@ -6,6 +6,7 @@ from singer import metadata, utils, Transformer from tap_sftp import client from tap_sftp import stats +from tap_sftp.helper import write_record from singer_encodings import csv LOGGER = singer.get_logger() @@ -86,7 +87,8 @@ def sync_file(conn, f, stream, table_spec, encoding_format): to_write = transformer.transform(rec, stream.schema.to_dict(), metadata.to_map(stream.metadata)) - singer.write_record(stream.tap_stream_id, to_write) + write_record(stream.tap_stream_id, to_write, ensure_ascii=False) + # singer.write_record(stream.tap_stream_id, to_write) records_synced += 1 stats.add_file_data(table_spec, f['filepath'], f['last_modified'], records_synced)