analysis.py 15.7 KB
Newer Older
Rob Jansen's avatar
Rob Jansen committed
1
'''
  OnionPerf
  Authored by Rob Jansen, 2015
  See LICENSE for licensing information
'''

Karsten Loesing's avatar
Karsten Loesing committed
7
import os, re, json, datetime, logging
8
9

from abc import ABCMeta, abstractmethod
Rob Jansen's avatar
Rob Jansen committed
10

11
12
# stem imports
from stem import CircEvent, CircStatus, CircPurpose, StreamStatus
13
from stem.response.events import CircuitEvent, CircMinorEvent, StreamEvent, BuildTimeoutSetEvent
14
from stem.response import ControlMessage, convert
15

16
17
18
# tgentools imports
from tgentools.analysis import Analysis, TGenParser

19
# onionperf imports
Karsten Loesing's avatar
Karsten Loesing committed
20
from . import util
Rob Jansen's avatar
Rob Jansen committed
21

22
class OPAnalysis(Analysis):
    """OnionPerf analysis combining the base (tgentools) analysis with torctl logs.

    Results are accumulated in ``self.json_db`` and can be written with
    ``save()`` and read back with ``load()``.
    """

    def __init__(self, nickname=None, ip_address=None):
        super().__init__(nickname, ip_address)
        self.json_db = {'type': 'onionperf', 'version': '3.0', 'data': {}}
        self.torctl_filepaths = []

    def add_torctl_file(self, filepath):
        """Queue a tor control-port log file to be parsed by analyze()."""
        self.torctl_filepaths.append(filepath)

    def analyze(self, date_filter=None):
        """Run the base analysis, then parse all queued torctl log files.

        Does nothing if the analysis was already performed.

        :param date_filter: optional UTC date; only torctl events from that
            date are kept (see TorCtlParser).
        """
        if self.did_analysis:
            return

        self.date_filter = date_filter
        # NOTE(review): date_filter is not forwarded to the base analysis, so
        # only the tor data is date-filtered here - confirm this is intended.
        super().analyze(do_complete=True)

        torctl_parser = TorCtlParser(date_filter=self.date_filter)

        if len(self.torctl_filepaths) > 0:
            for filepath in self.torctl_filepaths:
                logging.info("parsing log file at {0}".format(filepath))
                torctl_parser.parse(util.DataSource(filepath))

            if self.nickname is None:
                parsed_name = torctl_parser.get_name()
                if parsed_name is not None:
                    self.nickname = parsed_name
                elif self.hostname is not None:
                    self.nickname = self.hostname
                else:
                    self.nickname = "unknown"

            if self.measurement_ip is None:
                self.measurement_ip = "unknown"

            node = self.json_db['data'].setdefault(self.nickname, {'measurement_ip': self.measurement_ip})
            node.setdefault('tor', torctl_parser.get_data())

        # Strip tgen summary fields that are not part of the onionperf output.
        # Tolerate a missing tgen section (e.g. no tgen logs were analyzed)
        # instead of failing with KeyError.
        tgen_data = self.json_db['data'].get(self.nickname, {}).get('tgen')
        if tgen_data is not None:
            for key in ("heartbeats", "init_ts", "stream_summary"):
                tgen_data.pop(key, None)

        self.did_analysis = True

    def save(self, filename=None, output_prefix=None, do_compress=True, date_prefix=None):
        """Write ``self.json_db`` as JSON to ``<output_prefix>/<filename>``.

        :param filename: output file name; when None it is derived from
            date_prefix, else from the analysis date filter, else the default
            "onionperf.analysis.json.xz".
        :param output_prefix: output directory (``~`` is expanded); created if
            missing. Defaults to the current working directory at call time.
        :param do_compress: passed through to util.FileWritable.
        :param date_prefix: date used to prefix the default file name.
        """
        # Resolved per call, not once at import time.
        if output_prefix is None:
            output_prefix = os.getcwd()

        if filename is None:
            base_filename = "onionperf.analysis.json.xz"
            # date_filter is only set by analyze(); instances from load() lack it.
            date_filter = getattr(self, 'date_filter', None)
            if date_prefix is not None:
                filename = "{0}.{1}".format(util.date_to_string(date_prefix), base_filename)
            elif date_filter is not None:
                filename = "{0}.{1}".format(util.date_to_string(date_filter), base_filename)
            else:
                filename = base_filename

        # Expand "~" before creating the directory; otherwise makedirs would
        # create a literal "~" directory while the file path points elsewhere.
        output_dir = os.path.expanduser(output_prefix)
        filepath = os.path.abspath("{0}/{1}".format(output_dir, filename))
        os.makedirs(output_dir, exist_ok=True)

        logging.info("saving analysis results to {0}".format(filepath))

        outf = util.FileWritable(filepath, do_compress=do_compress)
        try:
            json.dump(self.json_db, outf, sort_keys=True, separators=(',', ': '), indent=2)
        finally:
            outf.close()

        logging.info("done!")

    def _get_node_section_value(self, node, section, key):
        """Return json_db['data'][node][section][key], or None if any level is missing."""
        try:
            return self.json_db['data'][node][section][key]
        except (KeyError, TypeError):
            return None

    def get_tgen_streams(self, node):
        """Return the tgen 'streams' data of ``node``, or None if absent."""
        return self._get_node_section_value(node, 'tgen', 'streams')

    def get_tgen_transfers(self, node):
        """Return the tgen 'transfers' data of ``node``, or None if absent."""
        return self._get_node_section_value(node, 'tgen', 'transfers')

    def get_tor_streams(self, node):
        """Return the tor 'streams' data of ``node``, or None if absent."""
        return self._get_node_section_value(node, 'tor', 'streams')

    @staticmethod
    def _is_supported_version(version):
        """Return True when ``version`` has a numeric major version below 4."""
        try:
            major = int(str(version).split('.')[0])
        except ValueError:
            return False
        return major < 4

    @classmethod
    def load(cls, filename="onionperf.analysis.json.xz", input_prefix=None):
        """Load a previously saved analysis file.

        ``filename`` is tried as given first, then relative to
        ``input_prefix`` (default: current working directory at call time).

        :returns: a new instance holding the loaded data, or None if the file
            is missing or its type/version is absent or unsupported.
        """
        if input_prefix is None:
            input_prefix = os.getcwd()

        filepath = os.path.abspath(os.path.expanduser("{0}".format(filename)))
        if not os.path.exists(filepath):
            filepath = os.path.abspath(os.path.expanduser("{0}/{1}".format(input_prefix, filename)))
            if not os.path.exists(filepath):
                logging.warning("file does not exist at '{0}'".format(filepath))
                return None

        logging.info("loading analysis results from {0}".format(filepath))

        inf = util.DataSource(filepath)
        inf.open()
        try:
            db = json.load(inf.get_file_handle())
        finally:
            inf.close()

        logging.info("done!")

        if 'type' not in db or 'version' not in db:
            logging.warning("'type' or 'version' not present in database")
            return None
        # Compare the numeric major version; a plain string comparison would
        # accept e.g. "10.0" or "4".
        if db['type'] != 'onionperf' or not cls._is_supported_version(db['version']):
            logging.warning("type or version not supported (type={0}, version={1})".format(db['type'], db['version']))
            return None

        analysis_instance = cls()
        analysis_instance.json_db = db
        return analysis_instance

Karsten Loesing's avatar
Karsten Loesing committed
135
class Parser(metaclass=ABCMeta):
    """Abstract interface implemented by the log parsers in this module."""

    @abstractmethod
    def parse(self, source):
        """Consume the log lines provided by ``source``."""

    @abstractmethod
    def get_data(self):
        """Return the data collected by parse()."""

    @abstractmethod
    def get_name(self):
        """Return the name discovered while parsing, or None if none was found."""
Rob Jansen's avatar
Rob Jansen committed
145
146


147
class TorStream(object):
    """Accumulates the control-port events observed for one tor stream."""

    def __init__(self, sid):
        self.stream_id = sid
        self.circuit_id = None
        self.unix_ts_start = None
        self.unix_ts_end = None
        self.failure_reason_local = None
        self.failure_reason_remote = None
        self.source = None
        self.target = None
        # [ "<purpose>:<status>", arrival timestamp ] pairs, in arrival order
        self.elapsed_seconds = []
        # most recent non-None purpose, used to label events that carry none
        self.last_purpose = None

    def add_event(self, purpose, status, arrived_at):
        """Record a status change, labelled with the latest known purpose."""
        if purpose is not None:
            self.last_purpose = purpose
        key = "{0}:{1}".format(self.last_purpose, status)
        self.elapsed_seconds.append([key, arrived_at])

    def set_circ_id(self, circ_id):
        """Remember the circuit id; None values are ignored."""
        if circ_id is not None:
            self.circuit_id = circ_id

    def set_start_time(self, unix_ts):
        """Set the start time; only the first call has an effect."""
        if self.unix_ts_start is None:
            self.unix_ts_start = unix_ts

    def set_end_time(self, unix_ts):
        self.unix_ts_end = unix_ts

    def set_local_failure(self, reason):
        self.failure_reason_local = reason

    def set_remote_failure(self, reason):
        self.failure_reason_remote = reason

    def set_target(self, target):
        self.target = target

    def set_source(self, source):
        self.source = source

    def get_data(self):
        """Return a JSON-ready dict for this stream, or None if it lacks a start or end time.

        Event timestamps are made relative to the stream start, and optional
        fields that were never set are omitted. A new dict is built each time,
        so the stream itself is left unchanged and the call is repeatable.
        """
        if self.unix_ts_start is None or self.unix_ts_end is None:
            return None

        start = self.unix_ts_start
        data = {
            'stream_id': self.stream_id,
            'circuit_id': self.circuit_id,
            'unix_ts_start': start,
            'unix_ts_end': self.unix_ts_end,
            'elapsed_seconds': [[key, ts - start] for key, ts in self.elapsed_seconds],
        }
        for name in ('failure_reason_local', 'failure_reason_remote', 'source', 'target'):
            value = getattr(self, name)
            if value is not None:
                data[name] = value
        return data

    def __str__(self):
        events = ' '.join(['%s=%s' % (event, arrived_at)
                           for (event, arrived_at) in sorted(self.elapsed_seconds, key=lambda item: item[1])])
        return 'stream id=%d circ_id=%s %s' % (self.stream_id, self.circuit_id, events)
206

207
class TorCircuit(object):
    """Accumulates the control-port events observed for one tor circuit."""

    def __init__(self, cid):
        self.circuit_id = cid
        self.unix_ts_start = None
        self.unix_ts_end = None
        self.failure_reason_local = None
        self.failure_reason_remote = None
        # absolute time the circuit was built; made relative in get_data()
        self.buildtime_seconds = None
        self.build_timeout = None
        self.build_quantile = None
        # [ event label, arrival timestamp ] pairs, in arrival order
        self.elapsed_seconds = []
        # [ "$<hop[0]>~<hop[1]>", arrival timestamp ] pairs, one per extension
        self.path = []

    def add_event(self, event, arrived_at):
        """Record an event label (converted with str()) and its arrival time."""
        self.elapsed_seconds.append([str(event), arrived_at])

    def add_hop(self, hop, arrived_at):
        """Append a hop given as a 2-item sequence, formatted as "$<hop[0]>~<hop[1]>"."""
        self.path.append(["${0}~{1}".format(hop[0], hop[1]), arrived_at])

    def set_launched(self, unix_ts, build_timeout, build_quantile):
        """Set the start time (first call only) and the latest build-timeout values."""
        if self.unix_ts_start is None:
            self.unix_ts_start = unix_ts
        self.build_timeout = build_timeout
        self.build_quantile = build_quantile

    def set_end_time(self, unix_ts):
        self.unix_ts_end = unix_ts

    def set_local_failure(self, reason):
        self.failure_reason_local = reason

    def set_remote_failure(self, reason):
        self.failure_reason_remote = reason

    def set_build_time(self, unix_ts):
        """Set the build time; only the first call has an effect."""
        if self.buildtime_seconds is None:
            self.buildtime_seconds = unix_ts

    def get_data(self):
        """Return a JSON-ready dict for this circuit, or None if it lacks a start or end time.

        Event, hop and build timestamps are made relative to the circuit
        start. An empty path and unset optional fields are omitted. A new
        dict is built each time, so the circuit itself is left unchanged.
        """
        if self.unix_ts_start is None or self.unix_ts_end is None:
            return None

        start = self.unix_ts_start
        data = {
            'circuit_id': self.circuit_id,
            'unix_ts_start': start,
            'unix_ts_end': self.unix_ts_end,
            'elapsed_seconds': [[event, ts - start] for event, ts in self.elapsed_seconds],
        }
        if len(self.path) > 0:
            data['path'] = [[hop, ts - start] for hop, ts in self.path]
        if self.buildtime_seconds is not None:
            data['buildtime_seconds'] = self.buildtime_seconds - start
        for name in ('failure_reason_local', 'failure_reason_remote', 'build_timeout', 'build_quantile'):
            value = getattr(self, name)
            if value is not None:
                data[name] = value
        return data

    def __str__(self):
        events = ' '.join(['%s=%s' % (event, arrived_at)
                           for (event, arrived_at) in sorted(self.elapsed_seconds, key=lambda item: item[1])])
        return 'circuit id=%d %s' % (self.circuit_id, events)
Rob Jansen's avatar
Rob Jansen committed
268

269
class TorCtlParser(Parser):
    """Parses OnionPerf torctl logs into completed circuit and stream records.

    Handles stem CircuitEvent, CircMinorEvent, StreamEvent and
    BuildTimeoutSetEvent messages. Circuits and streams are tracked in
    ``*_state`` until they close or fail, then their data is stored in
    ``self.circuits`` / ``self.streams`` keyed by integer id.
    """

    # Compiled once; raw strings avoid invalid "\s" escape warnings.
    _STARTING_RE = re.compile(r"Starting\storctl\sprogram\son\shost")
    _BOOTSTRAPPED_RE = re.compile(r"Bootstrapped\s100")

    def __init__(self, date_filter=None):
        ''' date_filter should be given in UTC '''
        self.circuits_state = {}
        self.circuits = {}
        self.streams_state = {}
        self.streams = {}
        self.name = None
        self.boot_succeeded = False
        self.build_timeout_last = None
        self.build_quantile_last = None
        self.date_filter = date_filter

    @staticmethod
    def _has_value(value):
        """Return True unless ``value`` is None or the empty string."""
        return value is not None and value != ''

    @classmethod
    def _hs_state_key(cls, event):
        """Build the "<hs_state>[:<rend_query>]" event label for onion-service circuits."""
        key = event.hs_state
        if cls._has_value(event.rend_query):
            key = "{0}:{1}".format(key, event.rend_query)
        return key

    def __handle_circuit(self, event, arrival_dt):
        # get or create the circuit without building a throwaway object per event
        cid = int(event.id)
        circ = self.circuits_state.get(cid)
        if circ is None:
            circ = self.circuits_state[cid] = TorCircuit(cid)

        is_hs_circ = event.purpose in (CircPurpose.HS_CLIENT_INTRO, CircPurpose.HS_CLIENT_REND,
                                       CircPurpose.HS_SERVICE_INTRO, CircPurpose.HS_SERVICE_REND)

        if isinstance(event, CircuitEvent):
            if event.status == CircStatus.LAUNCHED:
                circ.set_launched(arrival_dt, self.build_timeout_last, self.build_quantile_last)

            circ.add_event("{0}:{1}".format(event.purpose, event.status), arrival_dt)

            if event.status == CircStatus.EXTENDED:
                circ.add_hop(event.path[-1], arrival_dt)
            elif event.status == CircStatus.FAILED:
                circ.set_local_failure(event.reason)
                if self._has_value(event.remote_reason):
                    circ.set_remote_failure(event.remote_reason)
            elif event.status == CircStatus.BUILT:
                circ.set_build_time(arrival_dt)
                if is_hs_circ:
                    circ.add_event(self._hs_state_key(event), arrival_dt)

            # a closed or failed circuit is finished: store its data and stop tracking it
            if event.status in (CircStatus.CLOSED, CircStatus.FAILED):
                circ.set_end_time(arrival_dt)
                data = circ.get_data()
                if data is not None:
                    self.circuits[cid] = data
                self.circuits_state.pop(cid)

        elif isinstance(event, CircMinorEvent):
            # skip purpose-change events whose purpose did not actually change
            if event.purpose != event.old_purpose or event.event != CircEvent.PURPOSE_CHANGED:
                circ.add_event("{0}:{1}".format(event.event, event.purpose), arrival_dt)

            if is_hs_circ:
                circ.add_event(self._hs_state_key(event), arrival_dt)

    def __handle_stream(self, event, arrival_dt):
        sid = int(event.id)
        strm = self.streams_state.get(sid)
        if strm is None:
            strm = self.streams_state[sid] = TorStream(sid)

        # set_circ_id ignores None itself
        strm.set_circ_id(event.circ_id)
        strm.add_event(event.purpose, event.status, arrival_dt)
        strm.set_target(event.target)

        if event.status in (StreamStatus.NEW, StreamStatus.NEWRESOLVE):
            strm.set_start_time(arrival_dt)
            strm.set_source(event.source_addr)
        elif event.status == StreamStatus.FAILED:
            strm.set_local_failure(event.reason)
            if self._has_value(event.remote_reason):
                strm.set_remote_failure(event.remote_reason)

        # a closed or failed stream is finished: store its data and stop tracking it
        if event.status in (StreamStatus.CLOSED, StreamStatus.FAILED):
            strm.set_end_time(arrival_dt)
            data = strm.get_data()
            if data is not None:
                self.streams[sid] = data
            self.streams_state.pop(sid)

    def __handle_buildtimeout(self, event, arrival_dt):
        # remembered and attached to circuits launched afterwards
        self.build_timeout_last = event.timeout
        self.build_quantile_last = event.quantile

    def __handle_event(self, event, arrival_dt):
        if isinstance(event, (CircuitEvent, CircMinorEvent)):
            self.__handle_circuit(event, arrival_dt)
        elif isinstance(event, StreamEvent):
            self.__handle_stream(event, arrival_dt)
        elif isinstance(event, BuildTimeoutSetEvent):
            self.__handle_buildtimeout(event, arrival_dt)

    def __is_date_valid(self, date_to_check):
        if self.date_filter is None:
            # we are not asked to filter, so every date is valid
            return True
        # we are asked to filter, so the line is only valid if the date matches the filter
        # both the filter and the unix timestamp should be in UTC at this point
        return util.do_dates_match(self.date_filter, date_to_check)

    def __parse_line(self, line):
        """Process one log line; always returns True so that parse() continues."""
        if not self.boot_succeeded:
            if self._STARTING_RE.search(line) is not None:
                parts = line.strip().split()
                if len(parts) < 11:
                    return True
                # the 11th whitespace-separated field is taken as the name
                self.name = parts[10]
            if self._BOOTSTRAPPED_RE.search(line) is not None:
                self.boot_succeeded = True
            elif "BOOTSTRAP" in line and "PROGRESS=100" in line:
                self.boot_succeeded = True

        # only lines carrying a " 650 " (asynchronous event) reply are parsed with stem
        timestamps, sep, raw_event_str = line.partition(" 650 ")
        if sep == '':
            return True

        # the third field of the line prefix is the unix timestamp;
        # event.arrived_at is also available but at worse granularity
        unix_ts = float(timestamps.strip().split()[2])

        # check if we should ignore the line
        line_date = datetime.datetime.fromtimestamp(unix_ts, tz=datetime.timezone.utc).date()
        if not self.__is_date_valid(line_date):
            return True

        event = ControlMessage.from_str("{0} {1}".format(sep.strip(), raw_event_str))
        convert('EVENT', event)
        self.__handle_event(event, unix_ts)

        return True

    def parse(self, source):
        """Parse every line of ``source``; lines that fail to parse are skipped."""
        source.open(newline='\r\n')
        try:
            for line in source:
                try:
                    if not self.__parse_line(line):
                        break
                except Exception:
                    # best effort: one malformed line must not abort the whole log
                    continue
        finally:
            source.close()

    def get_data(self):
        """Return the completed circuits and streams keyed by integer id."""
        return {'circuits': self.circuits, 'streams': self.streams}

    def get_name(self):
        """Return the name parsed from the log, or None if none was found."""
        return self.name