2929import java .util .ArrayList ;
3030import java .util .Collections ;
3131import java .util .List ;
32+ import java .util .concurrent .atomic .AtomicLong ;
3233
3334abstract class BlobDescriptorStreamRead implements AutoCloseable , Closeable {
3435
3536 protected final long readId ;
36- protected final ReadCursor readCursor ;
37+ protected final RangeSpec rangeSpec ;
3738 protected final List <ChildRef > childRefs ;
3839 protected final RetryContext retryContext ;
40+ protected final AtomicLong readOffset ;
3941 protected boolean closed ;
4042 protected boolean tombstoned ;
4143
42- BlobDescriptorStreamRead (long readId , ReadCursor readCursor , RetryContext retryContext ) {
43- this (readId , readCursor , Collections .synchronizedList (new ArrayList <>()), retryContext , false );
44+ BlobDescriptorStreamRead (long readId , RangeSpec rangeSpec , RetryContext retryContext ) {
45+ this (
46+ readId ,
47+ rangeSpec ,
48+ Collections .synchronizedList (new ArrayList <>()),
49+ new AtomicLong (rangeSpec .begin ()),
50+ retryContext ,
51+ false );
4452 }
4553
4654 BlobDescriptorStreamRead (
4755 long readId ,
48- ReadCursor readCursor ,
56+ RangeSpec rangeSpec ,
4957 List <ChildRef > childRefs ,
58+ AtomicLong readOffset ,
5059 RetryContext retryContext ,
5160 boolean closed ) {
5261 this .readId = readId ;
53- this .readCursor = readCursor ;
62+ this .rangeSpec = rangeSpec ;
5463 this .childRefs = childRefs ;
5564 this .retryContext = retryContext ;
65+ this .readOffset = readOffset ;
5666 this .closed = closed ;
5767 this .tombstoned = false ;
5868 }
5969
60- ReadCursor getReadCursor () {
61- return readCursor ;
70+ long readOffset () {
71+ return readOffset . get () ;
6272 }
6373
6474 abstract boolean acceptingBytes ();
@@ -76,11 +86,16 @@ final void preFail() {
7686 abstract BlobDescriptorStreamRead withNewReadId (long newReadId );
7787
7888 final ReadRange makeReadRange () {
79- return ReadRange .newBuilder ()
80- .setReadId (readId )
81- .setReadOffset (readCursor .position ())
82- .setReadLength (readCursor .remaining ())
83- .build ();
89+ long currentOffset = readOffset .get ();
90+ ReadRange .Builder b = ReadRange .newBuilder ().setReadId (readId ).setReadOffset (currentOffset );
91+ rangeSpec
92+ .limit ()
93+ .ifPresent (
94+ limit -> {
95+ long readSoFar = currentOffset - rangeSpec .begin ();
96+ b .setReadLength (limit - readSoFar );
97+ });
98+ return b .build ();
8499 }
85100
86101 @ Override
@@ -98,18 +113,18 @@ public void close() throws IOException {
98113
99114 static AccumulatingRead <byte []> createByteArrayAccumulatingRead (
100115 long readId ,
101- ReadCursor readCursor ,
116+ RangeSpec rangeSpec ,
102117 RetryContext retryContext ,
103118 SettableApiFuture <byte []> complete ) {
104- return new ByteArrayAccumulatingRead (readId , readCursor , retryContext , complete );
119+ return new ByteArrayAccumulatingRead (readId , rangeSpec , retryContext , complete );
105120 }
106121
107122 static ZeroCopyByteStringAccumulatingRead createZeroCopyByteStringAccumulatingRead (
108123 long readId ,
109- ReadCursor readCursor ,
124+ RangeSpec rangeSpec ,
110125 SettableApiFuture <DisposableByteString > complete ,
111126 RetryContext retryContext ) {
112- return new ZeroCopyByteStringAccumulatingRead (readId , readCursor , retryContext , complete );
127+ return new ZeroCopyByteStringAccumulatingRead (readId , rangeSpec , retryContext , complete );
113128 }
114129
115130 /** Base class of a read that will accumulate before completing by resolving a future */
@@ -118,27 +133,28 @@ abstract static class AccumulatingRead<Result> extends BlobDescriptorStreamRead
118133
119134 private AccumulatingRead (
120135 long readId ,
121- ReadCursor readCursor ,
136+ RangeSpec rangeSpec ,
122137 RetryContext retryContext ,
123138 SettableApiFuture <Result > complete ) {
124- super (readId , readCursor , retryContext );
139+ super (readId , rangeSpec , retryContext );
125140 this .complete = complete ;
126141 }
127142
128143 private AccumulatingRead (
129144 long readId ,
130- ReadCursor readCursor ,
145+ RangeSpec rangeSpec ,
131146 List <ChildRef > childRefs ,
147+ AtomicLong readOffset ,
132148 RetryContext retryContext ,
133149 boolean closed ,
134150 SettableApiFuture <Result > complete ) {
135- super (readId , readCursor , childRefs , retryContext , closed );
151+ super (readId , rangeSpec , childRefs , readOffset , retryContext , closed );
136152 this .complete = complete ;
137153 }
138154
139155 @ Override
140156 boolean acceptingBytes () {
141- return !complete .isDone () && !tombstoned && readCursor . hasRemaining () ;
157+ return !complete .isDone () && !tombstoned ;
142158 }
143159
144160 @ Override
@@ -170,46 +186,48 @@ public boolean readyToSend() {
170186 * java.nio.channels.ReadableByteChannel})
171187 */
172188 abstract static class StreamingRead extends BlobDescriptorStreamRead {
173- private StreamingRead (long readId , long readOffset , long readLimit , RetryContext retryContext ) {
174- super (readId , new ReadCursor ( readOffset , readOffset + readLimit ) , retryContext );
189+ private StreamingRead (long readId , RangeSpec range , RetryContext retryContext ) {
190+ super (readId , range , retryContext );
175191 }
176192
177193 private StreamingRead (
178194 long readId ,
179- ReadCursor readCursor ,
195+ RangeSpec rangeSpec ,
180196 List <ChildRef > childRefs ,
197+ AtomicLong readOffset ,
181198 RetryContext retryContext ,
182199 boolean closed ) {
183- super (readId , readCursor , childRefs , retryContext , closed );
200+ super (readId , rangeSpec , childRefs , readOffset , retryContext , closed );
184201 }
185202 }
186203
187204 static final class ByteArrayAccumulatingRead extends AccumulatingRead <byte []> {
188205
189206 private ByteArrayAccumulatingRead (
190207 long readId ,
191- ReadCursor readCursor ,
208+ RangeSpec rangeSpec ,
192209 RetryContext retryContext ,
193210 SettableApiFuture <byte []> complete ) {
194- super (readId , readCursor , retryContext , complete );
211+ super (readId , rangeSpec , retryContext , complete );
195212 }
196213
197214 private ByteArrayAccumulatingRead (
198215 long readId ,
199- ReadCursor readCursor ,
216+ RangeSpec rangeSpec ,
200217 List <ChildRef > childRefs ,
201218 RetryContext retryContext ,
219+ AtomicLong readOffset ,
202220 boolean closed ,
203221 SettableApiFuture <byte []> complete ) {
204- super (readId , readCursor , childRefs , retryContext , closed , complete );
222+ super (readId , rangeSpec , childRefs , readOffset , retryContext , closed , complete );
205223 }
206224
207225 @ Override
208226 void accept (ChildRef childRef ) throws IOException {
209227 retryContext .reset ();
210228 int size = childRef .byteString ().size ();
211229 childRefs .add (childRef );
212- readCursor . advance (size );
230+ readOffset . addAndGet (size );
213231 }
214232
215233 @ Override
@@ -230,7 +248,7 @@ void eof() throws IOException {
230248 ByteArrayAccumulatingRead withNewReadId (long newReadId ) {
231249 this .tombstoned = true ;
232250 return new ByteArrayAccumulatingRead (
233- newReadId , readCursor , childRefs , retryContext , closed , complete );
251+ newReadId , rangeSpec , childRefs , retryContext , readOffset , closed , complete );
234252 }
235253 }
236254
@@ -241,21 +259,22 @@ static final class ZeroCopyByteStringAccumulatingRead
241259
242260 private ZeroCopyByteStringAccumulatingRead (
243261 long readId ,
244- ReadCursor readCursor ,
262+ RangeSpec rangeSpec ,
245263 RetryContext retryContext ,
246264 SettableApiFuture <DisposableByteString > complete ) {
247- super (readId , readCursor , retryContext , complete );
265+ super (readId , rangeSpec , retryContext , complete );
248266 }
249267
250268 public ZeroCopyByteStringAccumulatingRead (
251269 long readId ,
252- ReadCursor readCursor ,
270+ RangeSpec rangeSpec ,
253271 List <ChildRef > childRefs ,
272+ AtomicLong readOffset ,
254273 RetryContext retryContext ,
255274 boolean closed ,
256275 SettableApiFuture <DisposableByteString > complete ,
257276 ByteString byteString ) {
258- super (readId , readCursor , childRefs , retryContext , closed , complete );
277+ super (readId , rangeSpec , childRefs , readOffset , retryContext , closed , complete );
259278 this .byteString = byteString ;
260279 }
261280
@@ -269,7 +288,7 @@ void accept(ChildRef childRef) throws IOException {
269288 retryContext .reset ();
270289 int size = childRef .byteString ().size ();
271290 childRefs .add (childRef );
272- readCursor . advance (size );
291+ readOffset . addAndGet (size );
273292 }
274293
275294 @ Override
@@ -287,7 +306,7 @@ void eof() throws IOException {
287306 ZeroCopyByteStringAccumulatingRead withNewReadId (long newReadId ) {
288307 this .tombstoned = true ;
289308 return new ZeroCopyByteStringAccumulatingRead (
290- newReadId , readCursor , childRefs , retryContext , closed , complete , byteString );
309+ newReadId , rangeSpec , childRefs , readOffset , retryContext , closed , complete , byteString );
291310 }
292311 }
293312}
0 commit comments