aboutsummaryrefslogtreecommitdiff
path: root/Tools/portbuild/scripts/pollmachine
blob: c3438041c881601ff36b771955b90edbab746d17 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
#!/usr/bin/env python
#
# pollmachine
#
# Monitors build machines and notifies qmgr of changes

#
# pollmachine [options] [arch] ...
#   - update every machine in the mlist file for [arch]
#
# pollmachine [options] [arch/mach] ...
#   - update individual machine(s) for specified architecture
#
# options are:
#   -daemon : poll repeatedly
#   -queue  : update queue entries (XXX racy)

#
# TODO:
# XXX qmgr notification of new/removed machines
# XXX log state changes in daemon mode
# XXX clean up inactive builds
# XXX test thread shutdown
# XXX need an explicit way to request setup?
# XXX signal handler

# * Deal with machines changing OS/kernel version
#     - ACL list might change!
#          - take machine offline, update ACL/arch/etc, reboot, bring online

import sys, threading, socket
from popen2 import *
from time import sleep

# sys.argv always holds the script name in slot 0, so fewer than two
# entries means nothing was named on the command line to poll.
if len(sys.argv) < 2:
    print("Usage: %s <arch> [<arch> ...]" % sys.argv[0])
    sys.exit(1)

# Command-line state:
#   arches    - every architecture named, either bare or as arch/mach
#   mlist     - arch -> set of machines explicitly named as arch/mach
#   polldelay - seconds between polls; 0 means poll once and exit
#   queue     - nonzero when queue entries should be updated as well
arches = set()
mlist = {}
polldelay = 0
queue = 0
for arg in sys.argv[1:]:
    if arg == "-daemon":
        polldelay = 30
    elif arg == "-queue":
        queue = 1
    elif "/" not in arg:
        arches.add(arg)
    else:
        # "arch/mach": split at the first slash only
        arch, _sep, mach = arg.partition("/")
        arches.add(arch)
        mlist.setdefault(arch, set()).add(mach)

# Root of the portbuild tree
pb = "/var/portbuild"

# Machines currently known for each arch (all start out empty)
machines = dict((known_arch, set()) for known_arch in arches)

# Monitor thread for each machine, keyed by machine name
pollthreads = {}

class MachinePoll(threading.Thread):
    """Poll one build machine and record what it reports.

    Each instance is a thread that connects to ``host``:``port``, reads
    ``name=value`` lines, keeps the latest values in ``self.vars`` and
    writes the reported job count and load to files below ``pb``.  A
    machine that answers with no data, or with malformed data, is passed
    to ``setup()``.
    """

    mach = None     # Which machine name to poll
    arch = None     # Which arch is this assigned to

    # Which host/port to poll for this machine status (might be SSH
    # tunnel endpoint)
    host = None
    port = 414

    # Should we update queue entry?
    queue = None

    timeout = None      # Seconds to sleep between polls; false => poll once
    shutdown = False    # Exit at next poll wakeup

    # State variables tracked
    online = False

    # Dictionary of variables reported by the client
    vars = None

    def __init__(self, mach, arch, timeout, host, port, queue):
        """Remember the polling parameters; nothing is contacted yet.

        mach    -- machine name, used in messages and output file names
        arch    -- architecture the machine is assigned to
        timeout -- seconds to sleep between polls; a false value makes
                   run() poll exactly once
        host    -- host name or address to connect to
        port    -- TCP port to connect to
        queue   -- true if the queue entry should be written as well
        """
        super(MachinePoll, self).__init__()
        self.mach = mach
        self.arch = arch
        self.timeout = timeout
        self.host = host
        self.port = port
        self.queue = queue

        self.vars = {}

    def run(self):
        """Thread body: poll until asked to shut down.

        The shutdown flag is only examined between polls, so a sleeping
        thread notices it after its current sleep has finished.
        """
        while not self.shutdown:
            self.poll()

            if not self.timeout:
                break
            sleep(self.timeout)

    def poll(self):
        """Poll the status of this machine once.

        Updates ``self.online`` and ``self.vars``.  When the machine is
        online but sent nothing, or sent a malformed line, ``setup()`` is
        run and the method returns.  Otherwise the loads file (and, if
        ``self.queue`` is true, the queue file) is rewritten.
        """
        nowonline, lines = self._fetch_report()

        if nowonline != self.online:
            print("State change: %s %s -> %s" % (self.mach, self.online, nowonline))
            self.online = nowonline
            # XXX inform qmgr of state change

        # Online but silent: reportload script is missing
        dosetup = bool(self.online and not lines)

        for line in lines:
            line = line.rstrip()
            name, sep, value = line.partition('=')
            if sep != '=' or not name:
                print("Bad input from %s: %s" % (self.mach, line))
                # Assume client needs setting up, and do not record the
                # malformed line as a variable
                dosetup = True
                continue

            # Variables not seen before compare as the empty string
            if self.vars.get(name, "") != value:
                self.vars[name] = value
                # XXX update qmgr

        if dosetup:
            print("Setting up %s" % (self.mach))
            (err, out) = self.setup()
            if err:
                print("Error from setup of %s:" % (self.mach))
                print(out)
            print("Setup of %s complete" % (self.mach))
            return

        # Validate that arch has not changed (e.g. i386 -> amd64)
        if 'arch' in self.vars and self.arch != self.vars['arch']:
            print("Machine %s reporting unexpected arch: %s -> %s" % (self.mach, self.arch, self.vars['arch']))

        # Record current system load; give up if that file is unwritable
        if not self._record("loads", "%s %s\n", ('jobs', 'load')):
            return

        if self.queue:
            self._record("queue", "%s\n", ('jobs',))

    def _fetch_report(self):
        """Connect to the machine and read everything it sends.

        Returns (True, lines) on success and (False, []) if connecting or
        reading fails for any reason.
        """
        s = None
        f = None
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.connect((self.host, self.port))
            f = s.makefile()
            return (True, f.readlines())
        except Exception:
            # Any failure simply means the machine counts as offline
            return (False, [])
        finally:
            for handle in (f, s):
                if handle is None:
                    continue
                try:
                    handle.close()
                except Exception:
                    pass

    def _record(self, subdir, text_format, keys):
        """Write reported variables to <pb>/<arch>/<subdir>/<mach>.

        The file is opened for writing (and so truncated) first; if one
        of ``keys`` has not been reported, or the write fails, the write
        is skipped.  Returns False when the file cannot be opened, True
        otherwise.
        """
        path = "%s/%s/%s/%s" % (pb, self.arch, subdir, self.mach)
        try:
            f = open(path, "w")
        except (IOError, OSError):
            return False

        try:
            f.write(text_format % tuple([self.vars[key] for key in keys]))
        except (KeyError, IOError, OSError):
            # Best effort only
            pass
        finally:
            f.close()
        return True

    def setup(self):
        """Run the dosetupnode script for this machine.

        Returns (err, out): the value returned by the child's wait() and
        everything the child wrote to its output pipe.
        """
        child = Popen4("su ports-%s -c \"/var/portbuild/scripts/dosetupnode %s - - %s\"" % (self.arch, self.arch, self.mach), 0)
        # Drain the pipe before waiting: a child blocked on a full pipe
        # never exits, so waiting first could hang forever.
        out = "".join(child.fromchild.readlines())
        err = child.wait()
        return (err, out)

# Main supervision loop: bring the set of poll threads in line with the
# wanted set of machines for every arch, then (in daemon mode) sleep and
# repeat.
while True:
    for arch in arches:
        if arch in mlist:
            # Machines were named explicitly on the command line
            now = mlist[arch]
        else:
            mlistfile = "%s/%s/mlist" % (pb, arch)
            # Failure to open the machine list is fatal: the exception
            # is deliberately left to propagate.
            f = open(mlistfile, "r")
            try:
                # One machine per line; blank lines are not machine names
                now = set(name for name in
                          (entry.rstrip() for entry in f.readlines())
                          if name)
            finally:
                f.close()

        gone = machines[arch].difference(now)
        new = now.difference(machines[arch])

        machines[arch] = now

        for mach in gone:
            print("Removing machine %s" % mach)
            # XXX disable from qmgr
            pollthreads[mach].shutdown = True
            del pollthreads[mach]

        for mach in new:
            print("Adding machine %s" % mach)
            # XXX set up qmgr

            # Let the shell source the arch-wide and per-machine config
            # files (when present) and print the polling endpoint.
            pc = "%s/%s/portbuild.conf" % (pb, arch)
            pch = "%s/%s/portbuild.%s" % (pb, arch, mach)
            config = Popen4("test -f %s && . %s; test -f %s && . %s; echo $infoseek_host; echo $infoseek_port" % (pc, pc, pch, pch))
            host = config.fromchild.readline().rstrip()
            port = config.fromchild.readline().rstrip()
            # Close our ends of the pipes before waiting so the child
            # cannot block on them, then reap it.
            config.fromchild.close()
            config.tochild.close()
            config.wait()

            if not host:
                host = mach
            try:
                port = int(port)
            except (TypeError, ValueError):
                port = 414

            pollthreads[mach] = MachinePoll(mach, arch, polldelay, host, port, queue)
            pollthreads[mach].start()

    # Without -daemon a single pass is all that was asked for
    if not polldelay:
        break

    sleep(polldelay)