forked from kytos/flow_manager
-
Notifications
You must be signed in to change notification settings - Fork 7
/
__init__.py
146 lines (120 loc) · 4.38 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""DB models."""
# pylint: disable=unused-argument,invalid-name,unused-argument
# pylint: disable=no-self-argument,no-name-in-module
from datetime import datetime
from decimal import Decimal
from enum import Enum
from typing import List, Optional, Union
from bson.decimal128 import Decimal128
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
class FlowEntryState(Enum):
    """Lifecycle states of a stored flow entry."""

    # Initial state: the flow has been stored, but not confirmed yet.
    PENDING = "pending"
    # Final state: the installation has been confirmed.
    INSTALLED = "installed"
    # Final state: the flow has been soft deleted.
    DELETED = "deleted"
class DocumentBaseModel(BaseModel):
    """Base model shared by DB documents.

    Holds the document identifier (``id``, aliased as ``_id``) together with
    optional insertion and update timestamps.
    """

    # Annotated as ``str`` but defaults to None until an identifier is given.
    id: str = Field(None, alias="_id")
    inserted_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

    def model_dump(self, **kwargs) -> dict:
        """Dump the model to a dict, mirroring a truthy ``id`` under ``_id``.

        Args:
            **kwargs: Forwarded unchanged to ``BaseModel.model_dump``.

        Returns:
            The dumped values. When ``id`` is present and truthy its value is
            also stored under ``"_id"``. When ``"_id"`` is listed in the
            ``exclude`` keyword argument, the ``"_id"`` key is removed.
        """
        values = super().model_dump(**kwargs)
        if values.get("id"):
            values["_id"] = values["id"]
        # ``exclude`` may be passed explicitly as None; treat that as empty
        # instead of failing on the membership test.
        if "_id" in (kwargs.get("exclude") or ()):
            # "_id" is absent when ``id`` is unset, so removal must not raise.
            values.pop("_id", None)
        return values
class FlowCheckDoc(DocumentBaseModel):
    """DB document carrying a ``state`` string that defaults to "active"."""

    state: str = "active"
class MatchSubDoc(BaseModel):
    """Match DB SubDocument Model.

    Every match field is optional and defaults to None. Only ``dl_vlan`` has
    extra validation: it accepts an integer, a numeric string, or a string of
    integers separated by "/" (the vlan/mask format).
    """

    in_port: Optional[int] = None
    dl_src: Optional[str] = None
    dl_dst: Optional[str] = None
    dl_type: Optional[int] = None
    dl_vlan: Optional[Union[int, str]] = None
    dl_vlan_pcp: Optional[int] = None
    nw_src: Optional[str] = None
    nw_dst: Optional[str] = None
    nw_proto: Optional[int] = None
    tp_src: Optional[int] = None
    tp_dst: Optional[int] = None
    in_phy_port: Optional[int] = None
    ip_dscp: Optional[int] = None
    ip_ecn: Optional[int] = None
    udp_src: Optional[int] = None
    udp_dst: Optional[int] = None
    sctp_src: Optional[int] = None
    sctp_dst: Optional[int] = None
    icmpv4_type: Optional[int] = None
    icmpv4_code: Optional[int] = None
    arp_op: Optional[int] = None
    arp_spa: Optional[str] = None
    arp_tpa: Optional[str] = None
    arp_sha: Optional[str] = None
    arp_tha: Optional[str] = None
    ipv6_src: Optional[str] = None
    ipv6_dst: Optional[str] = None
    ipv6_flabel: Optional[int] = None
    icmpv6_type: Optional[int] = None
    icmpv6_code: Optional[int] = None
    nd_tar: Optional[int] = None
    nd_sll: Optional[int] = None
    nd_tll: Optional[int] = None
    mpls_lab: Optional[int] = None
    mpls_tc: Optional[int] = None
    mpls_bos: Optional[int] = None
    pbb_isid: Optional[int] = None
    v6_hdr: Optional[int] = None
    metadata: Optional[int] = None
    tun_id: Optional[int] = None

    @field_validator("dl_vlan")
    @classmethod
    def vlan_with_mask(cls, v):
        """Validate the vlan format.

        Args:
            v: The ``dl_vlan`` value: None, an int, or a string.

        Returns:
            None unchanged, ``int(v)`` when the value converts to an integer,
            otherwise the original string when every "/"-separated part
            converts to an integer.

        Raises:
            ValueError: If the value is neither an integer nor a
                "/"-separated sequence of integers.
        """
        # The field is optional; an explicit None must pass through instead
        # of reaching int(None), which raises TypeError.
        if v is None:
            return v
        try:
            return int(v)
        except ValueError:
            pass
        try:
            for part in v.split("/"):
                int(part)
        except ValueError:
            raise ValueError(
                "must be an integer or an integer with a mask in format vlan/mask"
            ) from None
        return v
class FlowSubDoc(BaseModel):
    """Flow DB SubDocument Model.

    Describes a single flow: table, owner, priority, cookie, timeouts, an
    optional match, and either ``actions`` or ``instructions`` (never both).
    """

    table_id: int = 0
    owner: Optional[str] = None
    table_group: str = "base"
    priority: int = 0x8000
    cookie: Decimal128 = Decimal128("0")
    idle_timeout: int = 0
    hard_timeout: int = 0
    match: Optional[MatchSubDoc] = None
    actions: Optional[List[dict]] = None
    instructions: Optional[List[dict]] = None
    # Decimal128 is not a pydantic-native type, hence arbitrary types.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    @field_validator("cookie", mode="before")
    @classmethod
    def preset_cookie(cls, v, values, **kwargs) -> Decimal128:
        """Convert an int or str cookie to ``Decimal128`` before validation.

        Args:
            v: The raw cookie value.
            values: Unused.
            **kwargs: Unused.

        Returns:
            ``Decimal128(Decimal(v))`` for int and str inputs; any other
            value is returned unchanged.

        Raises:
            ValueError: If an int or str input cannot be converted.
        """
        if isinstance(v, (int, str)):
            try:
                return Decimal128(Decimal(v))
            except ArithmeticError:
                # The decimal module's exceptions (e.g. InvalidOperation for
                # a non-numeric string) derive from ArithmeticError rather
                # than ValueError; re-raise as ValueError so the failure is
                # reported as a validation error.
                raise ValueError("cookie must be a valid decimal number") from None
        return v

    @model_validator(mode="after")
    def validate_actions_intructions(self) -> "FlowSubDoc":
        """Validate that actions and instructions are mutually exclusive.

        Returns:
            The validated model instance.

        Raises:
            ValueError: If both ``actions`` and ``instructions`` are set.
        """
        if self.actions is not None and self.instructions is not None:
            raise ValueError(
                'Cannot have both "actions" and "instructions" at the same time'
            )
        return self
class FlowDoc(DocumentBaseModel):
    """Top-level DB document for a flow.

    ``switch``, ``flow_id`` and the nested ``flow`` sub-document are
    required; ``state`` defaults to the pending flow entry state.
    """

    switch: str
    flow_id: str
    flow: FlowSubDoc
    # Stored as the plain string value of the enum member.
    state: str = FlowEntryState.PENDING.value