def merge_events_if_necessary(events):
index_to_remove = []
for index in range(1, len(events)):
if events[index - 1][1] == events[index][0]:
events[index - 1] = (events[index-1][0], events[index][1])
index_to_remove.append(index)
index_to_remove.reverse()
for index in index_to_remove:
del events[index]
return events
def is_segment_in_interval(segment_start, segment_end, interval_start, interval_end):
if interval_start <= segment_start and segment_end <= interval_end:
return True
return False
def get_standard_category_for_segment(segment_start, segment_end, ground_truth, detected_events):
"""
Return standard category for a single segment
:param segment_start:
:param segment_end:
:param ground_truth:
:param detected_events:
:return: "TP", "FP", "FN", "TN"
"""
is_part_of_ground_truth = False
is_part_of_detection = False
for gt in ground_truth:
if is_segment_in_interval(segment_start, segment_end, gt[0], gt[1]):
is_part_of_ground_truth = True
break
for det in detected_events:
if is_segment_in_interval(segment_start, segment_end, det[0], det[1]):
is_part_of_detection = True
break
# decide which category
if is_part_of_ground_truth:
if is_part_of_detection:
category = "TP"
else:
category = "FN"
else:
if is_part_of_detection:
category = "FP"
else:
category = "TN"
return category, is_part_of_ground_truth, is_part_of_detection
def get_segments_with_standard_error_categories(ground_truth_events, detected_events, evaluation_start=None, evaluation_end=None):
segments = []
gt_index = 0
det_index = 0
last_segment = None
# TODO: handle events with zero length
while True:
if gt_index >= len(ground_truth_events):
if det_index >= len(detected_events):
if evaluation_end is not None:
category, is_gt, is_det = get_standard_category_for_segment(last_segment[1], evaluation_end, ground_truth_events, detected_events)
if is_gt:
gt_i = gt_index
else:
gt_i = -1
if is_det:
det_i = det_index
else:
det_i = -1
last_segment = (last_segment[1], evaluation_end, gt_i, det_i, category)
segments.append(last_segment)
break
gt_index = len(ground_truth_events) - 1
elif det_index >= len(detected_events):
det_index = len(detected_events) - 1
gt_start = ground_truth_events[gt_index][0]
gt_end = ground_truth_events[gt_index][1]
d_start = detected_events[det_index][0]
d_end = detected_events[det_index][1]
if last_segment is not None:
seg_start = last_segment[1]
values_to_consider = []
if gt_start > seg_start:
values_to_consider.append(gt_start)
if d_start > seg_start:
values_to_consider.append(d_start)
if d_end > seg_start:
values_to_consider.append(d_end)
if gt_end > seg_start:
values_to_consider.append(gt_end)
seg_end = min(values_to_consider)
category, is_gt, is_det = get_standard_category_for_segment(seg_start, seg_end, ground_truth_events, detected_events)
if is_gt:
gt_i = gt_index
else:
gt_i = -1
if is_det:
det_i = det_index
else:
det_i = -1
last_segment = (seg_start, seg_end, gt_i, det_i, category)
segments.append(last_segment)
else:
# Create first segment:
if evaluation_start is None:
seg_start = min(gt_start, d_start)
seg_end = min(max(gt_start, d_start), min(gt_end, d_end))
else:
seg_start = evaluation_start
seg_end = min(gt_start, d_start)
category, is_gt, is_det = get_standard_category_for_segment(seg_start, seg_end, ground_truth_events,
detected_events)
if is_gt:
gt_i = gt_index
else:
gt_i = -1
if is_det:
det_i = det_index
else:
det_i = -1
last_segment = (seg_start, seg_end, gt_i, det_i, category)
segments.append(last_segment)
if seg_end >= d_end:
det_index += 1
if seg_end >= gt_end:
gt_index += 1
index_to_remove = []
for index, segment in enumerate(segments):
if segment[1] - segment[0] <= 0:
index_to_remove.append(index)
index_to_remove.reverse()
for index in index_to_remove:
del segments[index]
return segments
def score_segment(previous_segment, current_segment, next_segment):
"""
Computing scores for current segment based on it's surroundings
:param previous_segment: segment tuple for previous segment defined as (start, end, gt_event_index, det_event_index, standard_score) or None
:param current_segment: segment tuple for current segment defined as (start, end, gt_event_index, det_event_index, standard_score)
:param next_segment: segment tuple for next segment defined as (start, end, gt_event_index, det_event_index, standard_score) or None
:return: return one of the following categories: 'TP' - true positive, 'TN' - true negative,
'I' - insertion, 'M' - merge, 'D' - deletion, 'F' - fragmenting,
'Os' - start overfill, 'Oe' - end overfill, 'Us' - start underfill, 'Ue' - end underfill,
or 'no score' for errors
"""
if current_segment is None:
raise ValueError("current_segment must not be None.")
index_of_standard_category = 4
current_segment_score = "no score"
if current_segment[index_of_standard_category] == "TP":
current_segment_score = "TP"
elif current_segment[index_of_standard_category] == "TN":
current_segment_score = "TN"
else:
# Handle error categories:
if previous_segment is not None and next_segment is not None:
# normal case (in the middle):
if current_segment[index_of_standard_category] == "FP":
if (previous_segment[index_of_standard_category] == "TN" or previous_segment[index_of_standard_category] == "FN") and \
(next_segment[index_of_standard_category] == "TN" or next_segment[index_of_standard_category] == "FN"):
current_segment_score = "I"
elif previous_segment[index_of_standard_category] == "TP" and next_segment[index_of_standard_category] == "TP":
current_segment_score = "M"
elif (previous_segment[index_of_standard_category] == "TN" or previous_segment[index_of_standard_category] == "FN") and \
next_segment[index_of_standard_category] == "TP":
current_segment_score = "Os"
elif previous_segment[index_of_standard_category] == "TP" and (
next_segment[index_of_standard_category] == "TN" or next_segment[index_of_standard_category] == "FN"):
current_segment_score = "Oe"
elif current_segment[index_of_standard_category] == "FN":
if (previous_segment[index_of_standard_category] == "TN" or previous_segment[index_of_standard_category] == "FP") and (
next_segment[index_of_standard_category] == "TN" or next_segment[index_of_standard_category] == "FP"):
current_segment_score = "D"
elif previous_segment[index_of_standard_category] == "TP" and next_segment[index_of_standard_category] == "TP":
current_segment_score = "F"
elif (previous_segment[index_of_standard_category] == "TN" or previous_segment[index_of_standard_category] == "FP") and \
next_segment[index_of_standard_category] == "TP":
current_segment_score = "Us"
elif previous_segment[index_of_standard_category] == "TP" and (
next_segment[index_of_standard_category] == "TN" or next_segment[index_of_standard_category] == "FP"):
current_segment_score = "Ue"
elif previous_segment is None and next_segment is not None:
# start case (for the first segment):
if current_segment[index_of_standard_category] == "FP":
if next_segment[index_of_standard_category] == "TN" or next_segment[index_of_standard_category] == "FN":
current_segment_score = "I"
elif next_segment[index_of_standard_category] == "TP":
current_segment_score = "Os"
elif current_segment[index_of_standard_category] == "FN":
if next_segment[index_of_standard_category] == "TN" or next_segment[index_of_standard_category] == "FP":
current_segment_score = "D"
elif next_segment[index_of_standard_category] == "TP":
current_segment_score = "Us"
elif previous_segment is not None and next_segment is None:
# end case (for the last segment):
if current_segment[index_of_standard_category] == "FP":
if previous_segment[index_of_standard_category] == "TN" or previous_segment[index_of_standard_category] == "FN":
current_segment_score = "I"
elif previous_segment[index_of_standard_category] == "TP":
current_segment_score = "Oe"
elif current_segment[index_of_standard_category] == "FN":
if previous_segment[index_of_standard_category] == "TN" or previous_segment[index_of_standard_category] == "FP":
current_segment_score = "D"
elif previous_segment[index_of_standard_category] == "TP":
current_segment_score = "Ue"
elif previous_segment is None and next_segment is None:
# if only one segment is given (exceptional case):
if current_segment[index_of_standard_category] == "FP":
current_segment_score = "I"
elif current_segment[index_of_standard_category] == "FN":
current_segment_score = "D"
return current_segment_score
def compute_detailed_segment_scores(segments):
new_segments = []
# Handle special case if only one segment exists:
if len(segments) == 1:
seg_score = score_segment(None, segments[0], None)
n_seg = segments[0] + (seg_score,) # Create new tuple (append to the end)
new_segments.append(n_seg)
return new_segments
# handle first segment:
seg_score = score_segment(None, segments[0], segments[1])
n_seg = segments[0] + (seg_score,) # Create new tuple (append to the end)
new_segments.append(n_seg)
# Handle segments in the middle:
for i in range(1, len(segments) - 1):
seg_score = score_segment(segments[i-1], segments[i], segments[i+1])
n_seg = segments[i] + (seg_score,) # Create new tuple (append to the end)
new_segments.append(n_seg)
# handle last segment:
seg_score = score_segment(segments[-2], segments[-1], None)
n_seg = segments[-1] + (seg_score,) # Create new tuple (append to the end)
new_segments.append(n_seg)
return new_segments
def count_segment_scores(segments):
categories = ["TP", "TN", "I", "D", "F", "M", "Os", "Oe", "Us", "Ue"]
results = {}
# Init values:
for c in categories:
results[c] = 0
# Calculate total segment length for each category
for s in segments:
results[s[5]] += s[1] - s[0]
# Calculate normed values:
eval_length = segments[-1][1] - segments[0][0]
results_normed = {}
for c in categories:
results_normed[c] = results[c]/eval_length
return results, results_normed
def twoset_metrics(segment_counts):
P = segment_counts["D"] + segment_counts["F"] + segment_counts["Us"] + segment_counts["Ue"] + segment_counts["TP"]
N = segment_counts["I"] + segment_counts["M"] + segment_counts["Os"] + segment_counts["Oe"] + segment_counts["TN"]
dr = segment_counts["D"]/P
fr = segment_counts["F"]/P
us = segment_counts["Us"] / P
ue = segment_counts["Ue"] / P
tpr = 1 - (dr + fr + us + ue)
ir = segment_counts["I"]/N
mr = segment_counts["M"] / N
o_s = segment_counts["Os"] / N
oe = segment_counts["Oe"] / N
fpr = ir + mr + o_s + oe
results = {"dr": dr, "fr": fr, "us": us, "ue": ue, "tpr": tpr,
"ir": ir, "mr": mr, "os": o_s, "oe": oe, "fpr": fpr }
return results
def _get_ground_truth_event_index_list(segments):
index_list = []
for s in segments:
if s[2] not in index_list and s[2] != -1:
index_list.append(s[2])
return index_list
def _get_detected_event_index_list(segments):
index_list = []
for s in segments:
if s[3] not in index_list and s[3] != -1:
index_list.append(s[3])
return index_list
def _get_segments_for_ground_truth_event(segments, event_index):
return [s for s in segments if s[2] == event_index]
def _get_segments_for_detected_event(segments, event_index):
return [s for s in segments if s[3] == event_index]
def _score_ground_truth_event(segments_for_event):
# get segment score values as a list:
segment_scores = []
for s in segments_for_event:
segment_scores.append(s[5])
# get event's score:
current_event_score = ""
if segment_scores.count("TP") == 1:
current_event_score += "C"
elif len(segment_scores) == 1 and segment_scores.count("D") == 1:
current_event_score += "D"
if segment_scores.count("F") > 0:
current_event_score += "F"
return current_event_score
def _score_detected_event(segments_for_event):
# get segment score values as a list:
segment_scores = []
for s in segments_for_event:
segment_scores.append(s[5])
# get event's score:
current_event_score = ""
if segment_scores.count("TP") == 1:
current_event_score += "C"
elif len(segment_scores) == 1 and segment_scores.count("I") == 1:
current_event_score += "I'"
if segment_scores.count("M") > 0:
current_event_score += "M'"
return current_event_score
def _have_overlapping_segments(segments_1, segments_2):
overlap = False
for s in segments_1:
if s in segments_2:
overlap = True
break
return overlap
def compute_event_scores(segments):
# Get list of events indexes:
detected_indexes = _get_detected_event_index_list(segments)
ground_truth_indexes = _get_ground_truth_event_index_list(segments)
# get score for each gt event:
gt_event_scores = []
for i in ground_truth_indexes:
current_segments = _get_segments_for_ground_truth_event(segments, i)
e_score = _score_ground_truth_event(current_segments)
gt_event_scores.append(e_score)
# get score for each detection event
det_event_scores = []
for i in detected_indexes:
current_segments = _get_segments_for_detected_event(segments, i)
e_score = _score_detected_event(current_segments)
det_event_scores.append(e_score)
# cross check event scores for merging and fragmented results:
for i in ground_truth_indexes:
for j in detected_indexes:
segments_gt = _get_segments_for_ground_truth_event(segments, i)
segments_det = _get_segments_for_detected_event(segments, j)
if _have_overlapping_segments(segments_gt, segments_det):
# change ground truth event score if needed:
if det_event_scores[j] == "M'" or det_event_scores[j] == "M'F'":
gt_event_scores[i] += "M"
# change detected event score if needed:
if gt_event_scores[i] == "F" or gt_event_scores[i] == "FM":
det_event_scores[j] += "F'"
# clean up event score labels:
for i in range(len(gt_event_scores)):
if "F" in gt_event_scores[i] and "M" in gt_event_scores[i]:
gt_event_scores[i] = "FM"
elif "C" in gt_event_scores[i] and "M" in gt_event_scores[i]:
gt_event_scores[i] = "M"
for i in range(len(det_event_scores)):
if "F" in det_event_scores[i] and "M" in det_event_scores[i]:
det_event_scores[i] = "FM'"
if "C" in det_event_scores[i] and "F" in det_event_scores[i]:
det_event_scores[i] = "F'"
return gt_event_scores, det_event_scores
def _count_event_scores(gt_event_scores, detection_scores):
results = {
"total_gt": len(gt_event_scores),
"total_det": len(detection_scores),
"D": gt_event_scores.count("D"),
"F": gt_event_scores.count("F"),
"FM": gt_event_scores.count("FM"),
"M": gt_event_scores.count("M"),
"C": gt_event_scores.count("C"),
"M'": detection_scores.count("M'"),
"FM'": detection_scores.count("FM'"),
"F'": detection_scores.count("F'"),
"I'": detection_scores.count("I'")
}
return results
def _get_detailed_event_metrics(segments):
gt_event_scores, det_event_scores = compute_event_scores(segments)
detailed_event_metrics = _count_event_scores(gt_event_scores, det_event_scores)
return gt_event_scores, det_event_scores, detailed_event_metrics
def _get_standard_event_metrics(ground_truth_events, detected_events, gt_event_scores, detected_event_scores):
# Compute recall:
tp_gt = 0
tp_gt_w = 0
fn_gt = 0
fn_gt_w = 0
for i in range(len(ground_truth_events)):
if gt_event_scores[i] == 'D':
fn_gt += 1
fn_gt_w += ground_truth_events[i][1] - ground_truth_events[i][0]
else:
tp_gt += 1
tp_gt_w += ground_truth_events[i][1] - ground_truth_events[i][0]
recall = tp_gt / (tp_gt + fn_gt)
recall_w = tp_gt_w / (tp_gt_w + fn_gt_w)
# Compute precision:
tp_det = 0
tp_det_w = 0
fp_det = 0
fp_det_w = 0
for i in range(len(detected_events)):
if detected_event_scores[i] == "I'":
fp_det += 1
fp_det_w += detected_events[i][1] - detected_events[i][0]
else:
tp_det += 1
tp_det_w += detected_events[i][1] - detected_events[i][0]
precision = tp_det / (tp_det + fp_det)
precision_w = tp_det_w / (tp_det_w + fp_det_w)
standard_metrics = {
"precision": precision,
"recall": recall,
"precision (weighted)": precision_w,
"recall (weighted)": recall_w
}
return standard_metrics
[docs]def eval_segments(ground_truth_events, detected_events, evaluation_start=None, evaluation_end=None):
"""
Segment-based evaluation (frame - length based)
Computes and scores segments and returns the occurrences of each error type in the overall dataset segments
Args
----
ground_truth_events: list of tuples (start, end) or lists [start, end]
numeric values (e.g. frame number or posix timestamp) for ground truth events' start and end times
detected_events: list of tuples (start, end) or lists [start, end]
numeric values (e.g. frame number or posix timestamp) for detected events' start and end times
evaluation_start: numeric value or None
This should be the first segment's start value. None indicates that start of the first event should be used.
evaluation_end: numeric value or None
This should be the first segment's start value. None indicates that start of the first event should be used.
Returns
-------
twoset_results: dictionary
result for the 2SET metrics as a dictonary
segments_with_detailed_categories: list of tuples
list of detected segments including standard and detailed score categories
segment_counts: dictionary
frame counts/length of segments for each category
normed_segment_counts: dictionary
same as before but normed
"""
if len(ground_truth_events) <= 0 or len(detected_events) <= 0:
raise AttributeError("Insufficient data. List of ground truth or detected events is empty - calculation not possible.")
ground_truth_events = merge_events_if_necessary(ground_truth_events)
detected_events = merge_events_if_necessary(detected_events)
segments_with_category = get_segments_with_standard_error_categories(ground_truth_events, detected_events,
evaluation_start, evaluation_end)
segments_with_detailed_categories = compute_detailed_segment_scores(segments_with_category)
segment_counts, normed_segment_counts = count_segment_scores(segments_with_detailed_categories)
twoset_results = twoset_metrics(segment_counts)
return twoset_results, segments_with_detailed_categories, segment_counts, normed_segment_counts
[docs]def eval_events(ground_truth_events, detected_events, evaluation_start=None, evaluation_end=None):
"""
Event-based evaluation
Assigns scores to each ground truth and detection event and calculates statistics
Args
----
ground_truth_events: list of tuples (start, end) or lists [start, end]
numeric values (e.g. frame number or posix timestamp) for ground truth events' start and end times
detected_events: list of tuples (start, end) or lists [start, end]
numeric values (e.g. frame number or posix timestamp) for detected events' start and end times
evaluation_start: numeric value or None
This should be the first segment's start value. None indicates that start of the first event should be used.
evaluation_end: numeric value or None
This should be the first segment's start value. None indicates that start of the first event should be used.
Returns
-------
gt_scores: list
score label for each ground truth event
detection_scores: list
score label for each detected event
detailed_score_statistics: dictionary
containing total number of events for each score category
standard_score_statistics: dictionary
precision and recall values (normal and weighted with event length) based on standard event scores (TP, FP, TN, FN)
"""
if len(ground_truth_events) <= 0 or len(detected_events) <= 0:
raise AttributeError("Insufficient data. List of ground truth or detected events is empty - calculation not possible.")
ground_truth_events = merge_events_if_necessary(ground_truth_events)
detected_events = merge_events_if_necessary(detected_events)
segments_with_category = get_segments_with_standard_error_categories(ground_truth_events, detected_events, evaluation_start, evaluation_end)
segments_with_detailed_categories = compute_detailed_segment_scores(segments_with_category)
gt_scores, detection_scores, detailed_score_statistics = _get_detailed_event_metrics(segments_with_detailed_categories)
standard_score_statistics = _get_standard_event_metrics(ground_truth_events, detected_events, gt_scores, detection_scores)
return gt_scores, detection_scores, detailed_score_statistics, standard_score_statistics