Source: lib/offline/download_manager.js

  1. /*! @license
  2. * Shaka Player
  3. * Copyright 2016 Google LLC
  4. * SPDX-License-Identifier: Apache-2.0
  5. */
  6. goog.provide('shaka.offline.DownloadManager');
  7. goog.require('shaka.net.NetworkingEngine');
  8. goog.require('shaka.offline.DownloadProgressEstimator');
  9. goog.require('shaka.util.ArrayUtils');
  10. goog.require('shaka.util.BufferUtils');
  11. goog.require('shaka.util.Destroyer');
  12. goog.require('shaka.util.Error');
  13. goog.require('shaka.util.IDestroyable');
  14. goog.require('shaka.util.Pssh');
  /**
   * This manages downloading segments.
   *
   * Work is organised into numbered groups. Within a group, queued items run
   * one after another on a single promise chain; separate groups have
   * independent chains and therefore run in parallel with each other. If an
   * item in a group fails, the group's chain is rejected, so work queued on
   * that group afterwards is rejected without being run.
   *
   * @implements {shaka.util.IDestroyable}
   * @final
   */
  shaka.offline.DownloadManager = class {
    /**
     * Create a new download manager. It will use (but not own)
     * |networkingEngine|. The progress and init-data callbacks are no-ops
     * until they are replaced through |setCallbacks|.
     *
     * @param {!shaka.net.NetworkingEngine} networkingEngine
     */
    constructor(networkingEngine) {
      /** @private {shaka.net.NetworkingEngine} */
      this.networkingEngine_ = networkingEngine;

      /**
       * We group downloads. Within each group, the requests are executed in
       * series. Between groups, the requests are executed in parallel. We
       * store the promise chain that is doing the work.
       *
       * @private {!Map<number, !Promise>}
       */
      this.groups_ = new Map();

      /** @private {!shaka.util.Destroyer} */
      this.destroyer_ = new shaka.util.Destroyer(() => {
        // Add a "catch" block to stop errors from being returned.
        return this.abortAll().catch(() => {});
      });

      /**
       * A list of callback functions to cancel any in-progress downloads.
       *
       * @private {!Array<function(): !Promise>}
       */
      this.abortCallbacks_ = [];

      /**
       * A callback for when a segment has been downloaded. The first
       * parameter is the progress of all segments, a number between 0.0 (0%
       * complete) and 1.0 (100% complete). The second parameter is the total
       * number of bytes that have been downloaded.
       *
       * @private {function(number, number)}
       */
      this.onProgress_ = (progress, size) => {};

      /**
       * A callback for when a segment has new PSSH data and we pass
       * on the initData to storage.
       *
       * @private {function(!Uint8Array, string)}
       */
      this.onInitData_ = (initData, systemId) => {};

      /** @private {shaka.offline.DownloadProgressEstimator} */
      this.estimator_ = new shaka.offline.DownloadProgressEstimator();

      /**
       * Set to true by |abortAll| and never reset.
       *
       * @private {boolean}
       */
      this.aborted_ = false;
    }

    /** @override */
    destroy() {
      return this.destroyer_.destroy();
    }

    /**
     * Replace the progress and init-data callbacks.
     *
     * @param {function(number, number)} onProgress
     *   Called after each item with the estimated progress and the total
     *   number of bytes downloaded so far.
     * @param {function(!Uint8Array, string)} onInitData
     *   Called once per PSSH entry found in an init segment, with the PSSH
     *   data and its system ID.
     */
    setCallbacks(onProgress, onInitData) {
      this.onProgress_ = onProgress;
      this.onInitData_ = onInitData;
    }

    /**
     * Aborts all in-progress downloads.
     * @return {!Promise} A promise that will resolve once the downloads are
     *   fully aborted.
     */
    abortAll() {
      this.aborted_ = true;
      const promises = this.abortCallbacks_.map((callback) => callback());
      this.abortCallbacks_ = [];
      return Promise.all(promises);
    }

    /**
     * @return {boolean} True once |abortAll| has been called.
     */
    isAborted() {
      return this.aborted_;
    }

    /**
     * Adds a byte length to the download estimate.
     *
     * @param {number} estimatedByteLength
     * @return {number} estimateId
     */
    addDownloadEstimate(estimatedByteLength) {
      return this.estimator_.open(estimatedByteLength);
    }

    /**
     * Add a request to be downloaded as part of a group.
     *
     * @param {number} groupId
     *   The group to add this segment to. If the group does not exist, a new
     *   group will be created.
     * @param {shaka.extern.Request} request
     * @param {number} estimateId
     * @param {boolean} isInitSegment
     * @param {function(BufferSource):!Promise} onDownloaded
     *   The callback for when this request has been downloaded. Downloading
     *   for |group| will pause until the promise returned by |onDownloaded|
     *   resolves.
     * @return {!Promise} Resolved when this request is complete.
     */
    queue(groupId, request, estimateId, isInitSegment, onDownloaded) {
      this.destroyer_.ensureNotDestroyed();

      return this.appendToGroup_(groupId, async () => {
        const response = await this.fetchSegment_(request);
        // The destroyed check inside the helper runs after the fetch, so a
        // download that finishes after destruction is dropped, not stored.
        return this.handleSegmentData_(
            response, estimateId, isInitSegment, onDownloaded);
      });
    }

    /**
     * Add already-downloaded data to a group.
     *
     * @param {number} groupId
     *   The group to add this segment to. If the group does not exist, a new
     *   group will be created.
     * @param {!BufferSource} queueData
     * @param {number} estimateId
     * @param {boolean} isInitSegment
     * @param {function(BufferSource):!Promise} onDownloaded
     *   The callback for when this data has been processed. Work for |group|
     *   will pause until the promise returned by |onDownloaded| resolves.
     * @return {!Promise} Resolved when this request is complete.
     */
    queueData(groupId, queueData, estimateId, isInitSegment, onDownloaded) {
      this.destroyer_.ensureNotDestroyed();

      return this.appendToGroup_(groupId, () => {
        return this.handleSegmentData_(
            queueData, estimateId, isInitSegment, onDownloaded);
      });
    }

    /**
     * Add additional async work to the group work queue.
     *
     * @param {number} groupId
     *   The group to add this work to. If the group does not exist, a new
     *   group will be created.
     * @param {function():!Promise} callback
     *   The callback for the async work. Downloading for this group will be
     *   blocked until the Promise returned by |callback| resolves.
     * @return {!Promise} Resolved when this work is complete.
     */
    queueWork(groupId, callback) {
      this.destroyer_.ensureNotDestroyed();

      // The wrapper deliberately discards the callback's result and calls it
      // without arguments.
      return this.appendToGroup_(groupId, async () => {
        await callback();
      });
    }

    /**
     * Get a promise that will resolve when all currently queued downloads
     * have finished.
     *
     * @return {!Promise<number>} The total number of bytes downloaded.
     */
    async waitToFinish() {
      await Promise.all(this.groups_.values());
      return this.estimator_.getTotalDownloaded();
    }

    /**
     * Chain |task| onto the end of a group's promise chain, creating the
     * group if needed, and remember the new tail of the chain.
     *
     * @param {number} groupId
     * @param {function():!Promise} task
     * @return {!Promise} Settles with the outcome of |task|.
     * @private
     */
    appendToGroup_(groupId, task) {
      const group = this.groups_.get(groupId) || Promise.resolve();
      const newPromise = group.then(() => task());
      this.groups_.set(groupId, newPromise);
      return newPromise;
    }

    /**
     * Shared handling for segment bytes, whether freshly fetched or supplied
     * by the caller: verify we are still alive, report PSSH init data,
     * update the progress estimate, and hand the bytes to |onDownloaded|.
     *
     * @param {BufferSource} data
     * @param {number} estimateId
     * @param {boolean} isInitSegment
     * @param {function(BufferSource):!Promise} onDownloaded
     * @return {!Promise} The promise returned by |onDownloaded|.
     * @throws {shaka.util.Error} OPERATION_ABORTED if this manager has been
     *   destroyed.
     * @private
     */
    handleSegmentData_(data, estimateId, isInitSegment, onDownloaded) {
      // Make sure we stop downloading if we have been destroyed.
      if (this.destroyer_.destroyed()) {
        throw new shaka.util.Error(
            shaka.util.Error.Severity.CRITICAL,
            shaka.util.Error.Category.STORAGE,
            shaka.util.Error.Code.OPERATION_ABORTED);
      }

      // Update initData.
      if (isInitSegment) {
        const segmentBytes = shaka.util.BufferUtils.toUint8(data);
        const pssh = new shaka.util.Pssh(segmentBytes);
        // |pssh.data| and |pssh.systemIds| are read as parallel lists: the
        // entry at a given index in one belongs with the same index in the
        // other.
        for (let index = 0; index < pssh.data.length; index++) {
          this.onInitData_(pssh.data[index], pssh.systemIds[index]);
        }
      }

      // Update all our internal stats.
      this.estimator_.close(estimateId, data.byteLength);
      this.onProgress_(
          this.estimator_.getEstimatedProgress(),
          this.estimator_.getTotalDownloaded());

      return onDownloaded(data);
    }

    /**
     * Download a segment and return the data in the response.
     *
     * While the request is in flight, a callback that aborts it is kept in
     * |abortCallbacks_| so that |abortAll| can cancel it.
     *
     * @param {shaka.extern.Request} request
     * @return {!Promise<BufferSource>}
     * @private
     */
    async fetchSegment_(request) {
      const type = shaka.net.NetworkingEngine.RequestType.SEGMENT;
      /** @type {!shaka.net.NetworkingEngine.PendingRequest} */
      const action = this.networkingEngine_.request(type, request);
      const abortCallback = () => {
        return action.abort();
      };
      this.abortCallbacks_.push(abortCallback);

      try {
        const response = await action.promise;
        return response.data;
      } finally {
        // Unregister whether the request succeeded or failed, so a finished
        // request never leaves a stale abort callback behind.
        shaka.util.ArrayUtils.remove(this.abortCallbacks_, abortCallback);
      }
    }
  };