Implement Array.stable_sort_segment.
This PR adds a new standard library function Array.stable_sort_segment.
Motivation: this is needed in order to (efficiently) implement sorting for dynamic arrays (e.g., Dynarray.stable_sort).
Only a few lines of code are changed. The core sorting algorithm is unchanged.
Note: one might wish to also implement Array.sort_segment (a variant of Array.sort). I have not done so here.
Note: I am surprised to see that the sorting algorithms use get and set instead of unsafe_get and unsafe_set. Some benchmarks suggest that this can make a 5% difference in performance.
I wonder if people want to suggest other names. I am personally fine with the
_segmentprefix.
Why not use sub which the stdlib generally uses for that ?
So stable_subsort?
You rather want to plan a convention here, so that given an operation it's easy to check if you have a corresponding function working on a sub-array without having to guess how the heck it could have been called. For example it could be prefix the operation with sub_ or suffix the operation with _sub.
Array.{sort,stable_sort,iter,map}_sub
Array.sub_{sort,stable_sort,iter,map}
Didn't think much about it but I think I would rather go with the prefix.
I wonder if people want to suggest other names. I am personally fine with the
_segmentprefix. (I could see suggestions for aSubarraysubmodule that could over time grow with other capabilities. This could work for iterators andmap*_inplace.)
I think I would prefer segment or seg over sub. Making it a prefix or a suffix is fine with me. Or, we might create a submodule Array.Segment, if we intend to add more functions in the long run. (There are certainly many functions that could be added.)
I wondered why
sortdoes not get the same treatment. A quick look suggests that its implementation is more obscure, so it looks less easy to expose a segment version of it.
Indeed, it is slightly more obscure. I think I could do it if desired. It is a matter of adding an offset in every call to get or set -- or, if we trust the compiler to inline, define local versions of get and set which add an offset. I did not do it because of the (probably very small) performance penalty of adding offsets, and because it was not clear to me that we need two sorting algorithms for array segments.
For the record, here is my commented version of the code. I have added several assertions to check that my comments are correct. I was surprised and delighted to discover that merge performs an in-place merge where one source segment and the destination segment overlap; nice trick!
I have modified the code to use unsafe_get and unsafe_set.
Furthermore, I have added several optimizations (see terminal_blit, optimistic_merge, magic_optimistic_merge) which, according to my micro-benchmarks, cost essentially nothing in the general case, and allow a performance increase of almost 3x when the input data is already sorted (or almost sorted).
If there is interest, I could propose a PR with this more invasive change.
(* [disjoint a1 ofs1 len1 a2 ofs2 len2] tests whether the array segments
described by [a1], [ofs1], [len1] and [a2], [ofs2], [len2] are disjoint.
It is used only in assertions. *)
let disjoint a1 ofs1 len1 a2 ofs2 len2 =
a1 != a2 ||
ofs1 + len1 <= ofs2 ||
ofs2 + len2 <= ofs1
(* [suffix a1 ofs1 len1 a2 ofs2 len2] tests whether the array segment
described by [a1], [ofs1], [len1] is a suffix of the array segment
described by [a2], [ofs2], [len2]. It is used only in assertions. *)
let suffix a1 ofs1 len1 a2 ofs2 len2 =
a1 == a2 &&
ofs1 + len1 = ofs2 + len2 &&
len1 <= len2
(* If the input data is already sorted, or close to sorted, it may be the
case that the calls from [merge] (below) to [blit] copy an array segment
to itself. We recognize this situation, where there is nothing to do. *)
let[@inline] terminal_blit src srcofs dst dstofs len =
if src == dst && srcofs = dstofs then
()
else
blit src srcofs dst dstofs len
(* [merge cmp src1 src1ofs src1len src2 src2ofs src2len dst dstofs] merges
the sorted array segments described by [src1], [src1ofs], [src1len] and
[src2], [src2ofs], [src2len]. The resulting data is written into the
array segment described by [dst], [dstofs], and [src1len + src2len].
One of the source segments (say, the first one) must be disjoint with
the destination segment. The other source segment (say, the second one)
can be either disjoint with the destination segment or a suffix of the
destination segment. (This works because the data in the second source
segment is then moved left: it is read before it is overwritten.) *)
(* [src1len] and [src2len] must be nonzero. *)
(* This is a stable merge: when [s1] and [s2] are equal, [s1] is favored. *)
let merge cmp src1 src1ofs src1len src2 src2ofs src2len dst dstofs =
assert (0 < src1len && 0 < src2len);
assert (
let dstlen = src1len + src2len in
(* Either both source segments are disjoint with the destination segment, *)
disjoint src1 src1ofs src1len dst dstofs dstlen &&
disjoint src2 src2ofs src2len dst dstofs dstlen ||
(* or the first source segment is a suffix of the destination segment, *)
suffix src1 src1ofs src1len dst dstofs dstlen &&
disjoint src2 src2ofs src2len dst dstofs dstlen ||
(* or the second source segment is a suffix of the destination segment. *)
disjoint src1 src1ofs src1len dst dstofs dstlen &&
suffix src2 src2ofs src2len dst dstofs dstlen
);
let src1r = src1ofs + src1len
and src2r = src2ofs + src2len in
let rec loop i1 s1 i2 s2 d =
if cmp s1 s2 <= 0 then begin
unsafe_set dst d s1;
let i1 = i1 + 1 in
if i1 < src1r then
loop i1 (unsafe_get src1 i1) i2 s2 (d + 1)
else
terminal_blit src2 i2 dst (d + 1) (src2r - i2)
end else begin
unsafe_set dst d s2;
let i2 = i2 + 1 in
if i2 < src2r then
loop i1 s1 i2 (unsafe_get src2 i2) (d + 1)
else
terminal_blit src1 i1 dst (d + 1) (src1r - i1)
end
in
loop src1ofs (unsafe_get src1 src1ofs) src2ofs (unsafe_get src2 src2ofs) dstofs
(* Although [merge] (above) works in all situations, we can make it much
faster in the special case where the data in the first source segment is
less than or equal to the data in the second source segment. Indeed, in
that case, two calls to [blit] suffice. The cost of recognizing this
special case is two reads, a comparison, and a conditional. *)
let[@inline] optimistic_merge
cmp src1 src1ofs src1len src2 src2ofs src2len dst dstofs =
assert (0 < src1len && 0 < src2len);
let last1 = unsafe_get src1 (src1ofs + src1len - 1)
and first2 = unsafe_get src2 src2ofs in
if cmp last1 first2 <= 0 then begin
blit src1 src1ofs dst dstofs src1len;
blit src2 src2ofs dst (dstofs + src1len) src2len
end
else
merge cmp src1 src1ofs src1len src2 src2ofs src2len dst dstofs
(* Even better: in the special case where the second source segment is a
suffix of the destination segment, [optimistic_merge] is simplified: the
second call to [blit] has no effect, as it copies the second source
segment to itself. Thus, it can be removed. *)
let[@inline] magic_optimistic_merge
cmp src1 src1ofs src1len src2 src2ofs src2len dst dstofs =
(* Check that the second source segment
is a suffix of the destination segment. *)
assert (src2 == dst && dstofs + src1len = src2ofs);
assert (0 < src1len && 0 < src2len);
let last1 = unsafe_get src1 (src1ofs + src1len - 1)
and first2 = unsafe_get src2 src2ofs in
if cmp last1 first2 <= 0 then
blit src1 src1ofs dst dstofs src1len
else
merge cmp src1 src1ofs src1len src2 src2ofs src2len dst dstofs
(* [isortto cmp src srcofs dst dstofs len] sorts the array segment described
by [src], [srcofs], [len]. The resulting data is written into the array
segment described by [dst], [dstofs], [len]. The source and destination
segments must either coincide or be disjoint. This is an insertion sort. *)
let isortto cmp src srcofs dst dstofs len =
assert (
src == dst && srcofs = dstofs ||
disjoint src srcofs len dst dstofs len
);
for i = 0 to len - 1 do
let e = unsafe_get src (srcofs + i) in
let j = ref (dstofs + i - 1) in
while !j >= dstofs && cmp (unsafe_get dst !j) e > 0 do
unsafe_set dst (!j + 1) (unsafe_get dst !j);
decr j
done;
unsafe_set dst (!j + 1) e
done
(* The cutoff determines where we switch from merge sort to insertion sort. *)
let cutoff = 5
(* [sortto cmp src srcofs dst dstofs len] sorts the array segment described
by [src], [srcofs], [len]. The resulting data is written into the array
segment described by [dst], [dstofs], [len]. The destination segment must
be disjoint from the source segment. This is a merge sort, with an
insertion sort at the leaves. It is a stable sort. *)
let rec sortto cmp src srcofs dst dstofs len =
assert (disjoint src srcofs len dst dstofs len);
if len <= cutoff then
isortto cmp src srcofs dst dstofs len
else begin
let len1 = len / 2 in
let len2 = len - len1 in
(* The second half of [src] can be larger by one slot. *)
assert (len1 <= len2 && len2 <= len1 + 1);
(* Sort the second half of [src] and move it to the second half of [dst]. *)
sortto cmp src (srcofs + len1) dst (dstofs + len1) len2;
(* Sort the first half of [src] and move it to the second half of [src]. *)
(* This requires [len1 <= len2]. *)
sortto cmp src srcofs src (srcofs + len2) len1;
(* Merge the two sorted halves, moving the data to [dst]. *)
(* This is an in-place merge: the second source segment is contained
within the destination segment! *)
(* This is a stable sort, because the first half of the original
data (now moved and sorted) is the first argument to [merge]. *)
magic_optimistic_merge cmp
src (srcofs + len2) len1 dst (dstofs + len1) len2 dst dstofs
end
(* [unsafe_stable_sort_segment cmp a ofs len] sorts the array segment
described by [a], [ofs], [len]. This array segment is sorted in place.
This function is named [unsafe] because it does not validate [ofs] and
[len]. *)
let unsafe_stable_sort cmp a ofs len =
let base = ofs in
if len <= cutoff then
isortto cmp a base a base len
else begin
let len1 = len / 2 in
let len2 = len - len1 in
(* The second half of [a] can be larger by one slot. *)
assert (len1 <= len2 && len2 <= len1 + 1);
(* Allocate a temporary array that fits the second half of [a]. *)
let t = make len2 (unsafe_get a base) in
(* Sort the second half of [a] and move it to [t]. *)
sortto cmp a (base + len1) t 0 len2;
(* Sort the first half of [a] and move it to the second half of [a]. *)
(* This requires [len1 <= len2]. *)
sortto cmp a base a (base + len2) len1;
(* Merge the two sorted halves, moving the data to [a]. *)
(* This is an in-place merge: the first source segment is contained
within the destination segment! *)
(* This is a stable sort, because the first half of the original
data (now moved and sorted) is the first argument to [merge]. *)
optimistic_merge cmp a (base + len2) len1 t 0 len2 a base
end
I think I would prefer
segmentorsegoversub.
I think that if you want to have every Stdlib user to learn a new word, it should be justified a bit more than by your preference.
If this PR had been called "Implement Array.sub_stable_sort" I wouldn't have scratched my head at what it could be about (and no I wouldn't have thought that this is a sub-stable sort…).
There are quite a few slicing operations in the Stdlib menagerie of sequences that sport the sub subsequence. I don't think anyone has a problem with talking about substrings, subsequences and subarrays. So personally I see little point in adding a new word here.
Could you perhaps clarify why you think it's necessary to introduce new terminology in the Stdlib ?
Could you perhaps clarify why you think it's necessary to introduce new terminology in the
Stdlib?
I don't have a strong case, and am happy with keeping sub.
I think I would prefer to say sort_sub rather than sub_sort because this function sorts a subarray.
I think I would prefer to say
sort_subrather thansub_sortbecause this function sorts a subarray.
I'm a little bit agnostic to this choice, so as far as I'm concerned you get to choose.
I don't think there's a strong case justifying one over the other. The way I read it, Array.sub_op is that you talk about the subarray and then apply an operation, and Array.op_sub is that you talk about the array and apply an operation on a subarray of it. For reference here is either how these choices could read for other operations.
Also about collecting them in a submodule I think I'd rather not. A submodule would make more sense if a new kind of value holding indices on the subarray was being defined. I'm not sure it's worth adding hierarchy for a bunch of operations.
Also about collecting them in a submodule I think I'd rather not. A submodule would make more sense if a new kind of value holding indices on the subarray was being defined. I'm not sure it's worth adding hierarchy for a bunch of operations.
(We could have added a type type 'a t = { arr : 'a array; ofs : int; len : int } for subarrays.)
This is good news, because it is easy to incrementally add sub_ functions at the toplevel, and harder to incrementally add a whole submodule; going with prefixed names makes the API evolution a bit easier I think.