Skip to content
This repository has been archived by the owner on May 27, 2024. It is now read-only.

feat: add multiprocess event serialization #82

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 97 additions & 46 deletions intelmq_webinput_csv/bin/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
import logging
import os

import functools
import concurrent.futures

from typing import Union, Tuple

import dateutil.parser
from flask import Flask, jsonify, make_response, request

Expand All @@ -18,7 +23,7 @@
from intelmq.lib.message import Event, MessageFactory
from intelmq.lib.pipeline import PipelineFactory
from intelmq.lib.exceptions import InvalidValue, KeyExists
from intelmq.lib.utils import RewindableFileHandle
from intelmq.lib.utils import RewindableFileHandle, load_configuration

from intelmq_webinput_csv.version import __version__

Expand Down Expand Up @@ -195,6 +200,68 @@ def handle_extra(value: str) -> dict:
value = {'data': value}
return value

def serialize_event(data: Tuple[dict, str], harmonization: dict, parameters: dict,
                    raw_header: list, time_observation: str,
                    custom_fields: dict = None) -> Union[Tuple[str, str], Tuple[None, None]]:
    """
    Serialize a CSV line into an IntelMQ raw message.

    Parameters:
        data: tuple of CSV line (parsed, raw)
        harmonization: dict of Harmonization config
        parameters: dict of form parameters
        raw_header: list of raw header lines
        time_observation: str of time of observation
        custom_fields: optional dict of extra event fields, keyed by field
            name (i.e. with the ``custom_`` form prefix already stripped).
            Callers running this function in a ProcessPoolExecutor worker
            MUST pass this explicitly: the Flask ``request`` proxy used by
            the fallback below is only bound in the request-handling
            process, so reading it in a worker raises and silently fails
            every line via the except-branch.

    Returns:
        (queue, raw_message) on success, or (None, None) if an error occurred.
        ``queue`` is the empty string unless
        ``destination_pipeline_queue_formatted`` is enabled in CONFIG.
    """
    queue = ""
    (line, raw) = data
    event = Event(harmonization=harmonization)

    try:
        for column, value in zip(parameters['columns'], line):
            if not column or not value:
                continue
            if column.startswith('time.'):
                # Fuzzy-parse first to detect a timezone; only append the
                # configured timezone when the value carries none itself.
                parsed = dateutil.parser.parse(value, fuzzy=True)
                if not parsed.tzinfo:
                    value += parameters['timezone']
                    parsed = dateutil.parser.parse(value)
                value = parsed.isoformat()
            if column == 'extra':
                value = handle_extra(value)
            event.add(column, value)
        for key, value in parameters.get('constant_fields', {}).items():
            if key not in event:
                event.add(key, value)
        if custom_fields is None:
            # Backward-compatible fallback: extract custom_* fields from the
            # current Flask request. Only valid in the request-handling
            # process -- in a multiprocessing worker this raises and is
            # caught below, so pass custom_fields explicitly there.
            custom_fields = {key[7:]: value
                             for key, value in request.form.items()
                             if key.startswith('custom_')}
        for key, value in custom_fields.items():
            if key not in event:
                event.add(key, value)
        if CONFIG.get('destination_pipeline_queue_formatted', False):
            queue = CONFIG['destination_pipeline_queue'].format(ev=event)
    except Exception:
        app.logger.exception('Failure')
        return (None, None)

    # Fill defaults from the submitted form for fields the CSV did not set.
    if 'classification.type' not in event:
        event.add('classification.type', parameters['classification.type'])
    if 'classification.identifier' not in event:
        event.add('classification.identifier', parameters['classification.identifier'])
    if 'feed.code' not in event:
        event.add('feed.code', parameters['feed.code'])
    if 'time.observation' not in event:
        # Sanitized once by the caller; skip re-sanitizing per line.
        event.add('time.observation', time_observation, sanitize=False)

    if 'raw' not in event:
        event.add('raw', ''.join(raw_header + [raw]))

    return (queue, MessageFactory.serialize(event))


@app.route('/')
def form():
Expand Down Expand Up @@ -404,6 +471,10 @@ def submit():
successful_lines = 0

raw_header = []

# Ensure Harmonization config is only loaded once
harmonization = load_configuration(HARMONIZATION_CONF_FILE)

with open(tmp_file[0], encoding='utf8') as handle:
handle_rewindable = RewindableFileHandle(handle)
reader = csv.reader(handle_rewindable, delimiter=parameters['delimiter'],
Expand All @@ -416,51 +487,31 @@ def submit():
raw_header.append(handle_rewindable.current_line)
for _ in range(parameters['skipInitialLines']):
next(reader)
for lineindex, line in enumerate(reader):
event = Event()
try:
for columnindex, (column, value) in \
enumerate(zip(parameters['columns'], line)):
if not column or not value:
continue
if column.startswith('time.'):
parsed = dateutil.parser.parse(value, fuzzy=True)
if not parsed.tzinfo:
value += parameters['timezone']
parsed = dateutil.parser.parse(value)
value = parsed.isoformat()
if column == 'extra':
value = handle_extra(value)
event.add(column, value)
for key, value in parameters.get('constant_fields', {}).items():
if key not in event:
event.add(key, value)
for key, value in request.form.items():
if not key.startswith('custom_'):
continue
key = key[7:]
if key not in event:
event.add(key, value)
if CONFIG.get('destination_pipeline_queue_formatted', False):
queue_name = CONFIG['destination_pipeline_queue'].format(ev=event)
destination_pipeline.set_queues(queue_name, "destination")
destination_pipeline.connect()
except Exception:
app.logger.exception('Failure')
continue
if 'classification.type' not in event:
event.add('classification.type', parameters['classification.type'])
if 'classification.identifier' not in event:
event.add('classification.identifier', parameters['classification.identifier'])
if 'feed.code' not in event:
event.add('feed.code', parameters['feed.code'])
if 'time.observation' not in event:
event.add('time.observation', time_observation, sanitize=False)
if 'raw' not in event:
event.add('raw', ''.join(raw_header + [handle_rewindable.current_line]))
raw_message = MessageFactory.serialize(event)
destination_pipeline.send(raw_message)
successful_lines += 1

# Generator func for retrieving parsed & raw line in single tuple
generator = ((entry, handle_rewindable.current_line) for entry in reader)

# Parallelize serialization over all available cores
with concurrent.futures.ProcessPoolExecutor() as executor:
part_serialize_event = functools.partial(serialize_event, harmonization=harmonization,
parameters=parameters, raw_header=raw_header,
time_observation=time_observation)

future = executor.map(part_serialize_event, generator, chunksize=1_000)

# Loop through results
for (queue_name, raw_message) in future:

# If queue_name specified, alternate pipeline should be used
if queue_name:
destination_pipeline.set_queues(queue_name, "destination")
destination_pipeline.connect()

# If no error occured
if raw_message:
destination_pipeline.send(raw_message)
successful_lines += 1

return create_response('Successfully processed %s lines.' % successful_lines)


Expand Down