@@ -417,59 +417,105 @@ static void fill_flow_event(struct flow_event *fe, struct packet_info *p_info,
 }
 
 /*
- * Attempt to create a new flow-state and push flow-opening message
+ * Attempt to create a new flow-state.
  * Returns a pointer to the flow_state if successful, NULL otherwise
  */
-static struct flow_state *create_flow(struct network_tuple *flow, void *ctx,
-				      struct packet_info *p_info,
-				      struct flow_event *fe)
+static struct flow_state *create_flow(struct packet_info *p_info)
 {
 	struct flow_state new_state = { 0 };
-
 	new_state.last_timestamp = p_info->time;
+	new_state.opening_reason = p_info->event_type == FLOW_EVENT_OPENING ?
+					   p_info->event_reason :
+					   EVENT_REASON_FIRST_OBS_PCKT;
+
 	if (bpf_map_update_elem(&flow_state, &p_info->pid.flow, &new_state,
 				BPF_NOEXIST) != 0)
 		return NULL;
 
-	if (p_info->event_type != FLOW_EVENT_OPENING) {
-		p_info->event_type = FLOW_EVENT_OPENING;
-		p_info->event_reason = EVENT_REASON_FIRST_OBS_PCKT;
-	}
-	fill_flow_event(fe, p_info, true);
-	bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, fe, sizeof(*fe));
-
 	return bpf_map_lookup_elem(&flow_state, &p_info->pid.flow);
 }
 
 static struct flow_state *update_flow(void *ctx, struct packet_info *p_info,
 				      struct flow_event *fe, bool *new_flow)
 {
 	struct flow_state *f_state;
+	bool has_opened;
 	*new_flow = false;
 
 	f_state = bpf_map_lookup_elem(&flow_state, &p_info->pid.flow);
+
+	// Flow is closing - attempt to delete state if it exists
+	if (p_info->event_type == FLOW_EVENT_CLOSING ||
+	    p_info->event_type == FLOW_EVENT_CLOSING_BOTH) {
+		if (!f_state)
+			return NULL;
+
+		has_opened = f_state->has_opened;
+		if (bpf_map_delete_elem(&flow_state, &p_info->pid.flow) == 0 &&
+		    has_opened) {
+			fill_flow_event(fe, p_info, true);
+			bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU,
+					      fe, sizeof(*fe));
+		}
+		return NULL;
+	}
+
+	// Attempt to create flow if it does not exist
 	if (!f_state && p_info->pid_valid) {
 		*new_flow = true;
-		f_state = create_flow(&p_info->pid.flow, ctx, p_info, fe);
+		f_state = create_flow(p_info);
 	}
 
 	if (!f_state)
 		return NULL;
 
+	// Update flow state
 	f_state->sent_pkts++;
 	f_state->sent_bytes += p_info->payload;
 
 	return f_state;
 }
 
-static struct flow_state *update_rev_flow(struct packet_info *p_info)
+static struct flow_state *update_rev_flow(void *ctx, struct packet_info *p_info,
+					  struct flow_event *fe)
 {
 	struct flow_state *f_state;
+	bool has_opened;
 
 	f_state = bpf_map_lookup_elem(&flow_state, &p_info->reply_pid.flow);
+
 	if (!f_state)
 		return NULL;
 
+	// Close reverse flow
+	if (p_info->event_type == FLOW_EVENT_CLOSING_BOTH) {
+
+		has_opened = f_state->has_opened;
+		if (bpf_map_delete_elem(&flow_state, &p_info->reply_pid.flow) ==
+		    0 && has_opened) {
+			fill_flow_event(fe, p_info, false);
+			bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU,
+					      fe, sizeof(*fe));
+		}
+		return NULL;
+	}
+
+	// Is a new flow, push opening flow message
+	if (!f_state->has_opened) {
+		f_state->has_opened = true;
+
+		fe->event_type = EVENT_TYPE_FLOW;
+		fe->flow = p_info->pid.flow;
+		fe->timestamp = f_state->last_timestamp;
+		fe->flow_event_type = FLOW_EVENT_OPENING;
+		fe->reason = f_state->opening_reason;
+		fe->source = EVENT_SOURCE_PKT_DEST;
+		fe->reserved = 0;
+		bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, fe,
+				      sizeof(*fe));
+	}
+
+	// Update flow state
 	f_state->rec_pkts++;
 	f_state->rec_bytes += p_info->payload;
 
@@ -607,34 +653,11 @@ static void pping(void *ctx, struct parsing_context *pctx)
 	if (parse_packet_identifier(pctx, &p_info) < 0)
 		return;
 
-	if (p_info.event_type != FLOW_EVENT_CLOSING &&
-	    p_info.event_type != FLOW_EVENT_CLOSING_BOTH) {
-		f_state = update_flow(ctx, &p_info, &fe, &new_flow);
-		pping_timestamp_packet(f_state, ctx, pctx, &p_info, new_flow);
-	}
+	f_state = update_flow(ctx, &p_info, &fe, &new_flow);
+	pping_timestamp_packet(f_state, ctx, pctx, &p_info, new_flow);
 
-	f_state = update_rev_flow(&p_info);
+	f_state = update_rev_flow(ctx, &p_info, &fe);
 	pping_match_packet(f_state, ctx, pctx, &p_info);
-
-	// Flow closing - try to delete flow state and push closing-event
-	if (p_info.event_type == FLOW_EVENT_CLOSING ||
-	    p_info.event_type == FLOW_EVENT_CLOSING_BOTH) {
-		if (bpf_map_delete_elem(&flow_state, &p_info.pid.flow) == 0) {
-			fill_flow_event(&fe, &p_info, true);
-			bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU,
-					      &fe, sizeof(fe));
-		}
-	}
-
-	// Also close reverse flow
-	if (p_info.event_type == FLOW_EVENT_CLOSING_BOTH) {
-		if (bpf_map_delete_elem(&flow_state, &p_info.reply_pid.flow) ==
-		    0) {
-			fill_flow_event(&fe, &p_info, false);
-			bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU,
-					      &fe, sizeof(fe));
-		}
-	}
 }
 
 // Programs
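For reference, a minimal sketch of the struct flow_state fields this diff reads and writes. The field names are taken from the code above; the types, ordering and comments are assumptions, since the actual definition lives in the shared pping header rather than in this diff:

/* Sketch only - names from the diff above, types assumed */
struct flow_state {
	__u64 last_timestamp;  /* set to p_info->time in create_flow(); reused as the
				  timestamp of the opening event pushed from update_rev_flow() */
	__u64 sent_pkts;       /* counters for the flow's own direction, updated in update_flow() */
	__u64 sent_bytes;
	__u64 rec_pkts;        /* counters for the reverse direction, updated in update_rev_flow() */
	__u64 rec_bytes;
	__u32 opening_reason;  /* reason recorded at creation (e.g. EVENT_REASON_FIRST_OBS_PCKT),
				  reported as fe->reason when the opening event is emitted */
	bool has_opened;       /* set once the reverse direction is seen and the
				  flow-opening event has been pushed */
};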