Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use complete port configs when plumbing mark rules #1432

Merged
merged 2 commits into from
Sep 22, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 141 additions & 43 deletions service_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

func init() {
reexec.Register("fwmarker", fwMarker)
reexec.Register("redirecter", redirecter)
}

func newService(name string, id string, ingressPorts []*PortConfig, aliases []string) *service {
Expand Down Expand Up @@ -275,6 +276,12 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
n := ep.getNetwork()
eIP := ep.Iface().Address()

if n.ingress {
if err := addRedirectRules(sb.Key(), eIP, ep.ingressPorts); err != nil {
logrus.Errorf("Failed to add redirect rules for ep %s: %v", ep.Name(), err)
}
}

if sb.ingress {
// For the ingress sandbox if this is not gateway
// endpoint do nothing.
Expand Down Expand Up @@ -380,17 +387,17 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*P
}

if addService {
var iPorts []*PortConfig
var filteredPorts []*PortConfig
if sb.ingress {
iPorts = filterPortConfigs(ingressPorts, false)
if err := programIngress(gwIP, iPorts, false); err != nil {
filteredPorts = filterPortConfigs(ingressPorts, false)
if err := programIngress(gwIP, filteredPorts, false); err != nil {
logrus.Errorf("Failed to add ingress: %v", err)
return
}
}

logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, iPorts)
if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, false); err != nil {
logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, ingressPorts)
if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, false); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
return
}
Expand Down Expand Up @@ -453,15 +460,15 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
logrus.Errorf("Failed to delete a new service for vip %s fwmark %d: %v", vip, fwMark, err)
}

var iPorts []*PortConfig
var filteredPorts []*PortConfig
if sb.ingress {
iPorts = filterPortConfigs(ingressPorts, true)
if err := programIngress(gwIP, iPorts, true); err != nil {
filteredPorts = filterPortConfigs(ingressPorts, true)
if err := programIngress(gwIP, filteredPorts, true); err != nil {
logrus.Errorf("Failed to delete ingress: %v", err)
}
}

if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, true); err != nil {
if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, true); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
}
}
Expand Down Expand Up @@ -715,33 +722,57 @@ func plumbProxy(iPort *PortConfig, isDelete bool) error {
return nil
}

func writePortsToFile(ports []*PortConfig) (string, error) {
f, err := ioutil.TempFile("", "port_configs")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we removing these files somewhere, or are they just accumulating in the tmp directory ?
Can the file creation frequency be that high that we should worry about removing them ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not removing them but that has been the case forever since this code was there. We can fix it here though.

if err != nil {
return "", err
}
defer f.Close()

buf, err := proto.Marshal(&EndpointRecord{
IngressPorts: ports,
})

n, err := f.Write(buf)
if err != nil {
return "", err
}

if n < len(buf) {
return "", io.ErrShortWrite
}

return f.Name(), nil
}

func readPortsFromFile(fileName string) ([]*PortConfig, error) {
buf, err := ioutil.ReadFile(fileName)
if err != nil {
return nil, err
}

var epRec EndpointRecord
err = proto.Unmarshal(buf, &epRec)
if err != nil {
return nil, err
}

return epRec.IngressPorts, nil
}

// Invoke fwmarker reexec routine to mark vip destined packets with
// the passed firewall mark.
func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error {
var ingressPortsFile string
if len(ingressPorts) != 0 {
f, err := ioutil.TempFile("", "port_configs")
if err != nil {
return err
}

buf, err := proto.Marshal(&EndpointRecord{
IngressPorts: ingressPorts,
})

n, err := f.Write(buf)
if len(ingressPorts) != 0 {
var err error
ingressPortsFile, err = writePortsToFile(ingressPorts)
if err != nil {
f.Close()
return err
}

if n < len(buf) {
f.Close()
return io.ErrShortWrite
}

ingressPortsFile = f.Name()
f.Close()
defer os.Remove(ingressPortsFile)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

addDelOpt := "-A"
Expand Down Expand Up @@ -775,20 +806,12 @@ func fwMarker() {

var ingressPorts []*PortConfig
if os.Args[5] != "" {
buf, err := ioutil.ReadFile(os.Args[5])
var err error
ingressPorts, err = readPortsFromFile(os.Args[5])
if err != nil {
logrus.Errorf("Failed to read ports config file: %v", err)
logrus.Errorf("Failed reading ingress ports file: %v", err)
os.Exit(6)
}

var epRec EndpointRecord
err = proto.Unmarshal(buf, &epRec)
if err != nil {
logrus.Errorf("Failed to unmarshal ports config data: %v", err)
os.Exit(7)
}

ingressPorts = epRec.IngressPorts
}

vip := os.Args[2]
Expand All @@ -801,11 +824,7 @@ func fwMarker() {

rules := [][]string{}
for _, iPort := range ingressPorts {
rule := strings.Fields(fmt.Sprintf("-t nat %s PREROUTING -p %s --dport %d -j REDIRECT --to-port %d",
addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort))
rules = append(rules, rule)

rule = strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d",
rule := strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d",
addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, fwMark))
rules = append(rules, rule)
}
Expand Down Expand Up @@ -852,3 +871,82 @@ func fwMarker() {
}
}
}

func addRedirectRules(path string, eIP *net.IPNet, ingressPorts []*PortConfig) error {
var ingressPortsFile string

if len(ingressPorts) != 0 {
var err error
ingressPortsFile, err = writePortsToFile(ingressPorts)
if err != nil {
return err
}
defer os.Remove(ingressPortsFile)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

cmd := &exec.Cmd{
Path: reexec.Self(),
Args: append([]string{"redirecter"}, path, eIP.String(), ingressPortsFile),
Stdout: os.Stdout,
Stderr: os.Stderr,
}

if err := cmd.Run(); err != nil {
return fmt.Errorf("reexec failed: %v", err)
}

return nil
}

// Redirecter reexec function.
func redirecter() {
runtime.LockOSThread()
defer runtime.UnlockOSThread()

if len(os.Args) < 4 {
logrus.Error("invalid number of arguments..")
os.Exit(1)
}

var ingressPorts []*PortConfig
if os.Args[3] != "" {
var err error
ingressPorts, err = readPortsFromFile(os.Args[3])
if err != nil {
logrus.Errorf("Failed reading ingress ports file: %v", err)
os.Exit(2)
}
}

eIP, _, err := net.ParseCIDR(os.Args[2])
if err != nil {
logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[2], err)
os.Exit(3)
}

rules := [][]string{}
for _, iPort := range ingressPorts {
rule := strings.Fields(fmt.Sprintf("-t nat -A PREROUTING -d %s -p %s --dport %d -j REDIRECT --to-port %d",
eIP.String(), strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort))
rules = append(rules, rule)
}

ns, err := netns.GetFromPath(os.Args[1])
if err != nil {
logrus.Errorf("failed get network namespace %q: %v", os.Args[1], err)
os.Exit(4)
}
defer ns.Close()

if err := netns.Set(ns); err != nil {
logrus.Errorf("setting into container net ns %v failed, %v", os.Args[1], err)
os.Exit(5)
}

for _, rule := range rules {
if err := iptables.RawCombinedOutputNative(rule...); err != nil {
logrus.Errorf("setting up rule failed, %v: %v", rule, err)
os.Exit(5)
}
}
}