metrics: rework to store points as named tuple and write in batches.
Savings of around 400MB per 10,000 requests. Using a named tuple was the original approach for this reason, but the influxdb interface requires dict()s and it seemed silly to spend time converting them; additionally, the influxdb client already does batching. Unfortunately, with the amount of data processed for Factory, which will continue to grow, this approach is necessary. The final dict() structures are buffered up to ~1000 at a time before being written and released. Another benefit of writing in batches is that influxdb does not allocate memory for the entire incoming data set at once.
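For illustration, a minimal sketch of the pattern described above (not the committed code, which follows in the diff): points live as named tuples and are only converted to the dict() form the influxdb-python client requires at write time, in batches of ~1000. The client parameters, database name, flush() helper, and sample data are assumptions made for the sketch.

import time
from collections import namedtuple
from influxdb import InfluxDBClient

# A namedtuple instance is considerably smaller than an equivalent
# five-key dict, which is roughly the saving the message above cites.
Point = namedtuple('Point', ['measurement', 'tags', 'fields', 'time', 'delta'])

BATCH_SIZE = 1000  # dict()s buffered before each write

def flush(client, batch):
    # Convert to the dict() form the influxdb interface expects only at
    # write time; the dicts are released as soon as the batch is written.
    client.write_points([dict(p._asdict()) for p in batch], 's')

# Host, credentials, database, and data below are illustrative only.
client = InfluxDBClient('localhost', 8086, 'root', 'root', 'example')
now = int(time.time())
points = [Point('request', {}, {'total': 1}, now + i, False) for i in range(2500)]

batch = []
for p in points:
    batch.append(p)
    if len(batch) >= BATCH_SIZE:
        flush(client, batch)
        batch = []
if batch:
    flush(client, batch)  # write the remainder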
parent 8a87becd55
commit 26883b12d2
metrics.py (76 changed lines)
@@ -19,6 +19,7 @@ from osclib.conf import Config
 from osclib.stagingapi import StagingAPI
 
 SOURCE_DIR = os.path.dirname(os.path.realpath(__file__))
+Point = namedtuple('Point', ['measurement', 'tags', 'fields', 'time', 'delta'])
 
 # Duplicate Leap config to handle 13.2 without issue.
 osclib.conf.DEFAULT[
@@ -80,13 +81,7 @@ points = []
 
 def point(measurement, fields, datetime, tags={}, delta=False):
     global points
-    points.append({
-        'measurement': measurement,
-        'tags': tags,
-        'fields': fields,
-        'time': timestamp(datetime),
-        'delta': delta,
-    })
+    points.append(Point(measurement, tags, fields, timestamp(datetime), delta))
 
 def timestamp(datetime):
     return int(datetime.strftime('%s'))
@@ -237,9 +232,8 @@ def ingest_requests(api, project):
         else:
             print('unable to find priority history entry for {} to {}'.format(request.get('id'), priority.text))
 
-    walk_points(points, project)
-
-    return points
+    print('finalizing {:,} points'.format(len(points)))
+    return walk_points(points, project)
 
 def who_workaround(request, review, relax=False):
     # Super ugly workaround for incorrect and missing data:
@@ -266,37 +260,59 @@ def who_workaround(request, review, relax=False):
 
     return who
 
+# Walk data points in order by time, adding up deltas and merging points at
+# the same time. Data is converted to dict() and written to influx batches to
+# avoid extra memory usage required for all data in dict() and avoid influxdb
+# allocating memory for entire incoming data set at once.
 def walk_points(points, target):
+    global client
+    # Wait until just before writing to drop database.
+    client.drop_database(client._database)
+    client.create_database(client._database)
+
     counters = {}
     final = []
-    for point in sorted(points, key=lambda l: l['time']):
-        if not point['delta']:
-            final.append(point)
+    time_last = None
+    wrote = 0
+    for point in sorted(points, key=lambda l: l.time):
+        if point.time != time_last and len(final) >= 1000:
+            # Write final point in batches of ~1000, but guard against writing
+            # when in the middle of points at the same time as they may end up
+            # being merged. As such the previous time should not match current.
+            client.write_points(final, 's')
+            wrote += len(final)
+            final = []
+        time_last = point.time
+
+        if not point.delta:
+            final.append(dict(point._asdict()))
             continue
 
         # A more generic method like 'key' which ended up being needed is likely better.
-        measurement = counters_tag_key = point['measurement']
+        measurement = counters_tag_key = point.measurement
         if measurement == 'staging':
-            counters_tag_key += point['tags']['id']
+            counters_tag_key += point.tags['id']
         elif measurement == 'review_count':
-            counters_tag_key += '_'.join(point['tags']['key'])
+            counters_tag_key += '_'.join(point.tags['key'])
         elif measurement == 'priority':
-            counters_tag_key += point['tags']['level']
+            counters_tag_key += point.tags['level']
         counters_tag = counters.setdefault(counters_tag_key, {'last': None, 'values': {}})
 
         values = counters_tag['values']
-        for key, value in point['fields'].items():
+        for key, value in point.fields.items():
             values[key] = values.setdefault(key, 0) + value
 
-        if counters_tag['last'] and point['time'] == counters_tag['last']['time']:
+        if counters_tag['last'] and point.time == counters_tag['last']['time']:
             point = counters_tag['last']
         else:
+            point = dict(point._asdict())
             counters_tag['last'] = point
             final.append(point)
         point['fields'].update(counters_tag['values'])
 
-    # In order to allow for merging delta entries for the same time.
-    points = final
+    # Write any remaining final points.
+    client.write_points(final, 's')
+    return wrote + len(final)
 
 def ingest_release_schedule(project):
     points = []
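A note on the batch guard added above: delta points that share a timestamp (and counters key) are merged into a single dict, so flushing in the middle of a timestamp could write a partially merged row. A hypothetical pair, reusing the Point tuple from the diff:

# Hypothetical delta points for the same staging project and second:
a = Point('staging', {'id': 'A'}, {'requests': 1}, 1500000000, True)
b = Point('staging', {'id': 'A'}, {'requests': 2}, 1500000000, True)
# walk_points() folds these into one dict with fields {'requests': 3};
# writing the batch between a and b would emit a stale partial point,
# hence final is only flushed once point.time differs from time_last.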
@@ -321,9 +337,13 @@ def ingest_release_schedule(project):
             'time': timestamp(date),
         })
 
-    return points
+    client.write_points(points, 's')
+    return len(points)
 
 def main(args):
+    global client
+    client = InfluxDBClient(args.host, args.port, args.user, args.password, args.project)
+
     osc.conf.get_config(override_apiurl=args.apiurl)
     osc.conf.config['debug'] = args.debug
 
@@ -344,16 +364,8 @@ def main(args):
     print('who_workaround_swap', who_workaround_swap)
     print('who_workaround_miss', who_workaround_miss)
 
-    print('writing {:,} points and {:,} annotation points to db'.format(
-        len(points_requests), len(points_schedule)))
-
-    db = args.project
-    client = InfluxDBClient(args.host, args.port, args.user, args.password, db)
-    client.drop_database(db)
-    client.create_database(db)
-
-    client.write_points(points_requests, 's')
-    client.write_points(points_schedule, 's')
+    print('wrote {:,} points and {:,} annotation points to db'.format(
+        points_requests, points_schedule))
 
 
 if __name__ == '__main__':