-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathmain.go
168 lines (151 loc) · 4.89 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package main
import (
"bufio"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"log/syslog"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"net"
"strings"
"golang.org/x/build/kubernetes/api"
)
// Stream is one decoded record of the event feed read from the OpenShift API.
// Each line of the watch response is unmarshalled into a value of this type.
type Stream struct {
	// Type is filled from the record's "type" key.
	Type string `json:"type,omitempty"`
	// Event is filled from the record's "object" key.
	Event api.Event `json:"object"`
}
// main configures the forwarder from environment variables, optionally points
// the standard logger at a remote syslog server, and then watches the
// OpenShift events API forever, printing every new event to stdout.
//
// Environment variables:
//   - OPENSHIFT_API_URL: API base URL. When unset, the in-cluster service URL
//     is used and certificate verification is switched off.
//   - OPENSHIFT_TOKEN: bearer token. When unset, the service account token
//     file mounted into the pod is read instead.
//   - SYSLOG_SERVER: address passed to syslog.Dial. When unset, log output
//     stays on the console.
//   - SYSLOG_PROTO: blank, "tcp" or "udp" (blank means "udp").
//   - SYSLOG_TAG: syslog tag, defaults to "OSE".
//   - IGNORE_SSL: "TRUE" disables TLS certificate verification.
//   - DEBUG: "TRUE" mirrors log output to stdout as well as syslog.
func main() {
	// How long to wait before retrying after a failed connection attempt.
	const reconnectDelay = 5 * time.Second

	apiAddr := os.Getenv("OPENSHIFT_API_URL")
	apiToken := os.Getenv("OPENSHIFT_TOKEN")
	syslogServer := os.Getenv("SYSLOG_SERVER")
	syslogProto := strings.ToLower(os.Getenv("SYSLOG_PROTO"))
	syslogTag := strings.ToUpper(os.Getenv("SYSLOG_TAG"))
	ignoreSSL := strings.ToUpper(os.Getenv("IGNORE_SSL"))
	debugFlag := strings.ToUpper(os.Getenv("DEBUG"))

	// enable signal trapping
	trapSignals()

	// check and make sure we have the minimum config information before continuing
	if apiAddr == "" {
		// use the default internal cluster URL if not defined
		apiAddr = "https://openshift.default.svc.cluster.local"
		ignoreSSL = "TRUE"
		log.Print("Missing environment variable OPENSHIFT_API_URL. Using default API URL")
	}
	// A trailing slash would otherwise produce a "//api/..." request path.
	apiAddr = strings.TrimSuffix(apiAddr, "/")

	if apiToken == "" {
		// if we dont set it in the environment variable, read it out of
		// /var/run/secrets/kubernetes.io/serviceaccount/token
		log.Print("Missing environment variable OPENSHIFT_TOKEN. Leveraging serviceaccount token")
		fileData, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
		if err != nil {
			log.Fatalf("Unable to read Service Account token: %v", err)
		}
		apiToken = string(fileData)
	}
	// Stray whitespace (for example a trailing newline) is not valid inside an
	// HTTP header value, so strip it before building the Authorization header.
	apiToken = strings.TrimSpace(apiToken)

	// None of the following need to error out; they only need a default when
	// the variable isn't defined.
	if syslogTag == "" {
		syslogTag = "OSE"
	}
	if ignoreSSL == "" {
		ignoreSSL = "FALSE"
	}
	if debugFlag == "" {
		debugFlag = "FALSE"
	}

	switch syslogProto {
	case "":
		syslogProto = "udp"
	case "tcp", "udp":
		log.Printf("Will use %s for syslog protocol", syslogProto)
	default:
		log.Fatalf("SYSLOG_PROTO must be either blank, or tcp or udp not %s", syslogProto)
	}

	// Setup syslog connection only if syslogServer is defined
	if syslogServer != "" {
		sysLog, err := syslog.Dial(syslogProto, syslogServer,
			syslog.LOG_WARNING|syslog.LOG_DAEMON, syslogTag)
		if err != nil {
			log.Printf("Error connecting to %s", syslogServer)
			log.Fatal(err)
		}
		log.Printf("Event Forwarder configured to send all events to %s using tag %s", syslogServer, syslogTag)
		if debugFlag == "TRUE" {
			// dump the data to stdout AND syslog for testing.
			log.SetOutput(io.MultiWriter(sysLog, os.Stdout))

			// syslogServer is dialled as given, so it normally carries a
			// port; resolve only the host part.
			host := syslogServer
			if h, _, splitErr := net.SplitHostPort(syslogServer); splitErr == nil {
				host = h
			}
			ipAddr, lookupErr := net.LookupHost(host)
			if lookupErr != nil {
				log.Printf("Unable to resolve %s: %v", host, lookupErr)
			} else {
				log.Printf("Connecting to IP address: %v\n", ipAddr)
			}
		} else {
			log.SetOutput(sysLog)
		}
	} else {
		log.Print("SYSLOG_SERVER environment variable not set. Sending all output to console.")
	}

	// setup ose connection
	// No client timeout is set on purpose: the watch request is a long-lived
	// stream and a total-request timeout would cut it off.
	var client http.Client
	if ignoreSSL == "TRUE" {
		tr := &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}
		client = http.Client{Transport: tr}
	}

	req, err := http.NewRequest("GET", apiAddr+"/api/v1/events?watch=true", nil)
	if err != nil {
		log.Fatalf("## Error while opening connection to openshift api: %v", err)
	}
	req.Header.Add("Authorization", "Bearer "+apiToken)

	// Only events newer than cutoff are printed. It starts at process start and
	// is advanced by streamEvents, so events emitted while the connection was
	// down are still reported after a reconnect instead of being dropped.
	cutoff := time.Now()
	for {
		resp, err := client.Do(req)
		if err != nil {
			log.Println("## Error while connecting to:", apiAddr, err)
			time.Sleep(reconnectDelay)
			continue
		}
		if resp.StatusCode != http.StatusOK {
			// An error response (bad token, forbidden, server error) is not an
			// event stream; back off instead of reconnecting in a tight loop.
			log.Printf("## Unexpected response from %s: %s", apiAddr, resp.Status)
			resp.Body.Close()
			time.Sleep(reconnectDelay)
			continue
		}
		cutoff = streamEvents(resp.Body, cutoff)
	}
}

// trapSignals subscribes to the listed signals and terminates the process via
// log.Fatalf when the first one arrives. The subscription is made before the
// waiting goroutine starts, so it is in place by the time this function returns.
func trapSignals() {
	c := make(chan os.Signal, 1)
	signal.Notify(c,
		syscall.SIGINT,  // Ctrl+C
		syscall.SIGTERM, // Termination Request
		syscall.SIGSEGV, // FullDerp
		syscall.SIGABRT, // Abnormal termination
		syscall.SIGILL,  // illegal instruction
		syscall.SIGFPE)  // floating point
	go func() {
		sig := <-c
		log.Fatalf("Signal (%v) Detected, Shutting Down", sig)
	}()
}

// streamEvents reads newline-delimited JSON records from body until a read or
// decode error occurs, printing each event whose LastTimestamp is after cutoff.
// body is always closed before returning. The return value is the newest
// timestamp that was printed, or cutoff unchanged when nothing was printed;
// the caller passes it back in as the cutoff for the next connection.
func streamEvents(body io.ReadCloser, cutoff time.Time) time.Time {
	defer body.Close()

	latest := cutoff
	reader := bufio.NewReader(body)
	for {
		line, err := reader.ReadBytes('\n')
		if err != nil {
			log.Printf("## Error reading from response stream. %v %q", err, line)
			return latest
		}

		event := Stream{}
		if decErr := json.Unmarshal(line, &event); decErr != nil {
			log.Println("## Error decoding json.", decErr)
			return latest
		}

		// Kubernetes sends all data from ETCD, we only want the events that
		// are newer than the cutoff.
		stamp := event.Event.LastTimestamp.Time
		if !stamp.After(cutoff) {
			continue
		}

		// NOTE(review): this writes to stdout through fmt, not through the log
		// package, so log.SetOutput in main does not apply to these lines —
		// confirm that is intended.
		// NOTE(review): Event.Kind is presumably the kind of the event record
		// itself; InvolvedObject.Kind may have been intended — confirm.
		fmt.Printf("%v | Project: %v | Name: %v | Kind: %v | Reason: %v | Message: %v\n",
			event.Event.LastTimestamp.Format(time.RFC3339),
			event.Event.Namespace, event.Event.InvolvedObject.Name,
			event.Event.Kind, event.Event.Reason, event.Event.Message)

		if stamp.After(latest) {
			latest = stamp
		}
	}
}