Adding dropTake to RDD. Allows to drop first 'drop' elements and then take next 'take' elements #617
@@ -713,6 +713,35 @@ abstract class RDD[T: ClassManifest](
    return buf.toArray
  }

  /**
   * Drop the first drop elements and then take next num elements of the RDD. This currently scans the partitions *one by one*, so
   * it will be slow if a lot of partitions are required. In that case, use dropCollect(drop) to get the
   * whole RDD instead.
   */
  def dropTake(drop: Int, num: Int): Array[T] = {
    if (num == 0) {
      return new Array[T](0)
    }
    val buf = new ArrayBuffer[T]
    var p = 0
    var dropped = 0
    while (buf.size < num && p < partitions.size) {
      val left = num - buf.size
      val res = sc.runJob(this, (it: Iterator[T]) => {
        while ((drop - dropped) > 0 && it.hasNext) {
          it.next()
          dropped += 1
        }
I am not sure this will work the way you expect it to: "dropped" is updated in the slave and not 'brought back', so all slaves will see it as '0'.
Added another commit. Please take a look if that is okay. I added some comments which can aid in the discussion. Can remove them later if we want to merge.
        it.take(left).toArray
      }, Array(p), true)
      buf ++= res(0)
      if (buf.size == num)
        return buf.toArray
      p += 1
    }
    return buf.toArray
  }

  /**
   * Return the first element in this RDD.
   */
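One way to address the review comment about "dropped" is to keep the running count on the driver and have each partition job return how many elements it actually skipped. The sketch below is only an illustration of that idea, not the code from the follow-up commit: it uses plain Scala collections in place of an RDD, and `runOnPartition`, `DropTakeSketch` and the `Seq[Seq[T]]` partition layout are hypothetical stand-ins for `sc.runJob` and the RDD's partitions.

```scala
import scala.collection.mutable.ArrayBuffer

object DropTakeSketch {
  // Hypothetical stand-in for sc.runJob on a single partition: the closure
  // sees only the partition's iterator, and only its return value comes back.
  def runOnPartition[T, U](partition: Seq[T])(f: Iterator[T] => U): U =
    f(partition.iterator)

  def dropTake[T](partitions: Seq[Seq[T]], drop: Int, num: Int): Seq[T] = {
    val buf = new ArrayBuffer[T]
    var toDrop = math.max(drop, 0) // driver-side state, never mutated in the closure
    var p = 0
    while (buf.size < num && p < partitions.size) {
      val left = num - buf.size
      val dropNow = toDrop // immutable copy captured by the closure
      val (droppedHere, taken) = runOnPartition(partitions(p)) { it =>
        var dropped = 0 // local to this one partition job
        while (dropped < dropNow && it.hasNext) {
          it.next()
          dropped += 1
        }
        // Return the skipped count together with the elements taken.
        (dropped, it.take(left).toVector)
      }
      toDrop -= droppedHere // update the count on the driver from the result
      buf ++= taken
      p += 1
    }
    buf.toSeq
  }
}
```

With partitions `Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8))`, calling `DropTakeSketch.dropTake(parts, 4, 3)` skips all of the first partition plus one element of the second before it starts collecting. Because the closure only reads `dropNow` and reports its own count, nothing depends on a mutated variable being shipped back from the worker.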
We don't have a `dropCollect` method; maybe suggest `collect().drop(drop)` instead?
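For reference, the suggested fallback amounts to materializing everything on the driver and slicing the resulting array. A minimal sketch, assuming a plain `Array` as a hypothetical stand-in for what `collect()` would return (`CollectDropSketch` and `dropThenTake` are illustrative names, not part of the PR):

```scala
object CollectDropSketch {
  // Hypothetical stand-in for the array that rdd.collect() would return.
  val collected: Array[Int] = Array(10, 20, 30, 40, 50)

  // Same result as dropTake(drop, num), once the whole dataset is on the driver.
  def dropThenTake(drop: Int, num: Int): Array[Int] =
    collected.drop(drop).take(num)
}
```

This trades the partition-by-partition scan for a single job, at the cost of pulling the entire dataset into driver memory.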