SeqAn3 3.2.0-rc.1
The Modern C++ library for sequence analysis.
async_input_buffer.hpp
Go to the documentation of this file.
1// -----------------------------------------------------------------------------------------------------
2// Copyright (c) 2006-2021, Knut Reinert & Freie Universität Berlin
3// Copyright (c) 2016-2021, Knut Reinert & MPI für molekulare Genetik
4// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5// shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6// -----------------------------------------------------------------------------------------------------
7
13#pragma once
14
15#include <concepts>
16#include <iterator>
17#include <seqan3/std/ranges>
18#include <thread>
19
22
23//-----------------------------------------------------------------------------
24// This is the path a value takes when using this views::
25// urange
26// → async_input_buffer_view.buffer [size n]
27// → iterator.cached_value [size 1]
28// → user
29//-----------------------------------------------------------------------------
30
31namespace seqan3::detail
32{
33
39template <std::ranges::range urng_t>
40class async_input_buffer_view : public std::ranges::view_interface<async_input_buffer_view<urng_t>>
41{
42private:
43 static_assert(std::ranges::input_range<urng_t>,
44 "The range parameter to async_input_buffer_view must be at least a std::ranges::input_range.");
45 static_assert(std::ranges::view<urng_t>,
46 "The range parameter to async_input_buffer_view must model std::ranges::view.");
47 static_assert(std::movable<std::ranges::range_value_t<urng_t>>,
48 "The range parameter to async_input_buffer_view must have a value_type that is std::movable.");
49 static_assert(std::constructible_from<std::ranges::range_value_t<urng_t>,
51 "The range parameter to async_input_buffer_view must have a value_type that is constructible by a moved "
52 "value of its reference type.");
53
55 using urng_iterator_type = std::ranges::iterator_t<urng_t>;
56
58 struct state
59 {
61 urng_t urange;
62
64 contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>> buffer;
65
68 };
69
72
74 class iterator;
75
76public:
86
88 async_input_buffer_view(urng_t _urng, size_t const buffer_size)
89 {
90 auto deleter = [] (state * p)
91 {
92 if (p != nullptr)
93 {
94 p->buffer.close();
95 p->producer.join();
96 delete p;
97 }
98 };
99
100 state_ptr = std::shared_ptr<state>(new state{std::move(_urng),
101 contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>>{buffer_size},
102 std::thread{}}, // thread is set/started below, needs rest of state
103 deleter);
104
105 auto runner = [&state = *state_ptr] ()
106 {
107 for (auto && val : state.urange)
108 if (state.buffer.wait_push(std::move(val)) == contrib::queue_op_status::closed)
109 break;
110
111 state.buffer.close();
112 };
113
114 state_ptr->producer = std::thread{runner};
115 }
116
118 template <typename other_urng_t>
120 requires (!std::same_as<std::remove_cvref_t<other_urng_t>, async_input_buffer_view>) && // prevent recursive instantiation
121 std::ranges::viewable_range<other_urng_t> &&
122 std::constructible_from<urng_t, std::ranges::ref_view<std::remove_reference_t<other_urng_t>>>
124 async_input_buffer_view(other_urng_t && _urng, size_t const buffer_size) :
125 async_input_buffer_view{std::views::all(_urng), buffer_size}
126 {}
128
144 {
145 assert(state_ptr != nullptr);
146 return {state_ptr->buffer};
147 }
148
150 iterator begin() const = delete;
151
153 std::default_sentinel_t end()
154 {
155 return std::default_sentinel;
156 }
157
159 std::default_sentinel_t end() const = delete;
161};
162
164template <std::ranges::range urng_t>
166{
168 using sentinel_type = std::default_sentinel_t;
169
171 contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>> * buffer_ptr = nullptr;
172
174 mutable std::ranges::range_value_t<urng_t> cached_value;
175
177 bool at_end = false;
178
179public:
180
197
202 iterator() = default;
203 //TODO: delete:
204 iterator(iterator const & rhs) = default;
205 iterator(iterator && rhs) = default;
206 //TODO: delete:
207 iterator & operator=(iterator const & rhs) = default;
208 iterator & operator=(iterator && rhs) = default;
209 ~iterator() noexcept = default;
210
212 iterator(contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>> & buffer) noexcept : buffer_ptr{&buffer}
213 {
214 ++(*this); // cache first value
215 }
217
222 reference operator*() const noexcept
223 {
224 return cached_value;
225 }
226
228 pointer operator->() const noexcept
229 {
230 return std::addressof(cached_value);
231 }
233
238 iterator & operator++() noexcept
239 {
240 if (at_end) // TODO unlikely
241 return *this;
242
243 assert(buffer_ptr != nullptr);
244
245 if (buffer_ptr->wait_pop(cached_value) == contrib::queue_op_status::closed)
246 at_end = true;
247
248 return *this;
249 }
250
252 void operator++(int) noexcept
253 {
254 ++(*this);
255 }
257
262 friend constexpr bool operator==(iterator const & lhs, std::default_sentinel_t const &) noexcept
263 {
264 return lhs.at_end;
265 }
266
268 friend constexpr bool operator==(std::default_sentinel_t const &, iterator const & rhs) noexcept
269 {
270 return rhs == std::default_sentinel_t{};
271 }
272
274 friend constexpr bool operator!=(iterator const & lhs, std::default_sentinel_t const &) noexcept
275 {
276 return !(lhs == std::default_sentinel_t{});
277 }
278
280 friend constexpr bool operator!=(std::default_sentinel_t const &, iterator const & rhs) noexcept
281 {
282 return rhs != std::default_sentinel_t{};
283 }
285};
286
293template <std::ranges::viewable_range urng_t>
296
297// ============================================================================
298// async_input_buffer_fn (adaptor definition
299// ============================================================================
300
303{
305 constexpr auto operator()(size_t const buffer_size) const
306 {
307 return detail::adaptor_from_functor{*this, buffer_size};
308 }
309
315 template <std::ranges::range urng_t>
316 constexpr auto operator()(urng_t && urange, size_t const buffer_size) const
317 {
318 static_assert(std::ranges::input_range<urng_t>,
319 "The range parameter to views::async_input_buffer must be at least a std::ranges::input_range.");
320 static_assert(std::ranges::viewable_range<urng_t>,
321 "The range parameter to views::async_input_buffer cannot be a temporary of a non-view range.");
322 static_assert(std::movable<std::ranges::range_value_t<urng_t>>,
323 "The range parameter to views::async_input_buffer must have a value_type that is std::movable.");
324 static_assert(std::constructible_from<std::ranges::range_value_t<urng_t>,
326 "The range parameter to views::async_input_buffer must have a value_type that is constructible by a moved "
327 "value of its reference type.");
328
329 if (buffer_size == 0)
330 throw std::invalid_argument{"The buffer_size parameter to views::async_input_buffer must be > 0."};
331
332 return detail::async_input_buffer_view{std::forward<urng_t>(urange), buffer_size};
333 }
334};
335
336} // seqan3::detail
337
338//-----------------------------------------------------------------------------
339// View shortcut for functor.
340//-----------------------------------------------------------------------------
341
342namespace seqan3::views
343{
480} // namespace seqan3::views
Provides seqan3::detail::adaptor_from_functor.
T addressof(T... args)
Provides seqan3::buffer_queue.
Template for range adaptor closure objects that store arguments and wrap a proto-adaptor.
Definition: adaptor_from_functor.hpp:57
The iterator of the seqan3::detail::async_input_buffer_view.
Definition: async_input_buffer.hpp:166
iterator(iterator &&rhs)=default
Defaulted.
void operator++(int) noexcept
Post-increment.
Definition: async_input_buffer.hpp:252
pointer operator->() const noexcept
Returns pointer to the pointed-to object.
Definition: async_input_buffer.hpp:228
iterator & operator=(iterator &&rhs)=default
Defaulted.
friend constexpr bool operator!=(std::default_sentinel_t const &, iterator const &rhs) noexcept
Compares for inequality with sentinel.
Definition: async_input_buffer.hpp:280
iterator & operator++() noexcept
Pre-increment.
Definition: async_input_buffer.hpp:238
std::default_sentinel_t sentinel_type
The sentinel type to compare to.
Definition: async_input_buffer.hpp:168
reference operator*() const noexcept
Return the cached value.
Definition: async_input_buffer.hpp:222
std::ranges::range_value_t< urng_t > cached_value
The cached value this iterator holds.
Definition: async_input_buffer.hpp:174
iterator & operator=(iterator const &rhs)=default
Defaulted.
detail::iter_pointer_t< urng_iterator_type > pointer
Pointer type.
Definition: async_input_buffer.hpp:189
friend constexpr bool operator==(iterator const &lhs, std::default_sentinel_t const &) noexcept
Compares for equality with sentinel.
Definition: async_input_buffer.hpp:262
friend constexpr bool operator!=(iterator const &lhs, std::default_sentinel_t const &) noexcept
Compares for inequality with sentinel.
Definition: async_input_buffer.hpp:274
iterator(iterator const &rhs)=default
Defaulted.
friend constexpr bool operator==(std::default_sentinel_t const &, iterator const &rhs) noexcept
Compares for equality with sentinel.
Definition: async_input_buffer.hpp:268
The type returned by seqan3::views::async_input_buffer.
Definition: async_input_buffer.hpp:41
async_input_buffer_view & operator=(async_input_buffer_view &&)=default
Defaulted.
std::ranges::iterator_t< urng_t > urng_iterator_type
The iterator type for the underlying range.
Definition: async_input_buffer.hpp:55
std::shared_ptr< state > state_ptr
Shared holder of the state.
Definition: async_input_buffer.hpp:71
async_input_buffer_view(urng_t _urng, size_t const buffer_size)
Construction from the underlying view.
Definition: async_input_buffer.hpp:88
std::default_sentinel_t end()
Returns a sentinel.
Definition: async_input_buffer.hpp:153
async_input_buffer_view & operator=(async_input_buffer_view const &)=default
Defaulted.
std::default_sentinel_t end() const =delete
Const-qualified async_input_buffer_view::end() is deleted, because iterating changes the view.
iterator begin()
Returns an iterator to the current begin of the underlying range.
Definition: async_input_buffer.hpp:143
async_input_buffer_view(urng_t &&, size_t const buffer_size) -> async_input_buffer_view< std::views::all_t< urng_t > >
Deduces the async_input_buffer_view from the underlying range if it is a std::ranges::viewable_range.
~async_input_buffer_view()=default
Defaulted.
iterator begin() const =delete
Const-qualified async_input_buffer_view::begin() is deleted, because iterating changes the view.
async_input_buffer_view()=default
Defaulted.
async_input_buffer_view(async_input_buffer_view &&)=default
Defaulted.
async_input_buffer_view(async_input_buffer_view const &)=default
Defaulted.
async_input_buffer_view(other_urng_t &&_urng, size_t const buffer_size)
Construction from std::ranges::viewable_range.
Definition: async_input_buffer.hpp:124
typename iter_pointer< it_t >::type iter_pointer_t
Return the pointer type of the input type (transformation_trait shortcut).
Definition: iterator_traits.hpp:176
constexpr auto async_input_buffer
A view adapter that returns a concurrent-queue-like view over the underlying range.
Definition: async_input_buffer.hpp:479
The internal SeqAn3 namespace.
Definition: aligned_sequence_concept.hpp:29
The SeqAn namespace for views.
Definition: char_strictly_to.hpp:22
SeqAn specific customisations in the standard namespace.
The <ranges> header from C++20's standard library.
Definition of the range adaptor object type for seqan3::views::async_input_buffer.
Definition: async_input_buffer.hpp:303
constexpr auto operator()(urng_t &&urange, size_t const buffer_size) const
Directly return an instance of the view, initialised with the given parameters.
Definition: async_input_buffer.hpp:316
constexpr auto operator()(size_t const buffer_size) const
Store the argument and return a range adaptor closure object.
Definition: async_input_buffer.hpp:305
Buffer and thread and shared between copies of this type.
Definition: async_input_buffer.hpp:59
contrib::fixed_buffer_queue< std::ranges::range_value_t< urng_t > > buffer
The buffer queue.
Definition: async_input_buffer.hpp:64
std::thread producer
Thread that rebuffers in the background.
Definition: async_input_buffer.hpp:67
urng_t urange
The underlying range.
Definition: async_input_buffer.hpp:61