Skip to content

Commit

Permalink
add helper function to write record
Browse files Browse the repository at this point in the history
  • Loading branch information
sgandhi1311 committed Oct 27, 2023
1 parent a4174d5 commit dd82ec7
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
21 changes: 21 additions & 0 deletions tap_sftp/helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import simplejson as json
import sys
from singer import RecordMessage


def format_message(message,ensure_ascii=True):
return json.dumps(message.asdict(), use_decimal=True, ensure_ascii=ensure_ascii)


def write_message(message, ensure_ascii=True):
sys.stdout.write(format_message(message, ensure_ascii=ensure_ascii) + '\n')
sys.stdout.flush()


def write_record(stream_name, record, stream_alias=None, time_extracted=None, ensure_ascii=True):
"""Write a single record for the given stream.
"""
write_message(RecordMessage(stream=(stream_alias or stream_name),
record=record,
time_extracted=time_extracted), ensure_ascii=ensure_ascii)
4 changes: 3 additions & 1 deletion tap_sftp/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from singer import metadata, utils, Transformer
from tap_sftp import client
from tap_sftp import stats
from tap_sftp.helper import write_record
from singer_encodings import csv

LOGGER = singer.get_logger()
Expand Down Expand Up @@ -86,7 +87,8 @@ def sync_file(conn, f, stream, table_spec, encoding_format):

to_write = transformer.transform(rec, stream.schema.to_dict(), metadata.to_map(stream.metadata))

singer.write_record(stream.tap_stream_id, to_write)
write_record(stream.tap_stream_id, to_write, ensure_ascii=False)
# singer.write_record(stream.tap_stream_id, to_write)
records_synced += 1

stats.add_file_data(table_spec, f['filepath'], f['last_modified'], records_synced)
Expand Down

0 comments on commit dd82ec7

Please sign in to comment.