Skip to content

Commit

Permalink
continue fleshing out PlayerCore backend
Browse files Browse the repository at this point in the history
  • Loading branch information
Lee Jun Kit committed Oct 8, 2021
1 parent 74c09f6 commit 7b14ada
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 20 deletions.
1 change: 1 addition & 0 deletions PlayerCore/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion PlayerCore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ librespot = "0.2.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
rand = "0.8"
futures = "0.3"
futures = "0.3"
bytes = "1"
64 changes: 61 additions & 3 deletions PlayerCore/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,24 @@ pub mod filters {
use serde::{Deserialize};
use crate::queue::Queue;
use std::convert::Infallible;
use warp::http::HeaderMap;
use warp::{
Filter,
Rejection
};
use bytes::{Bytes, Buf};
use std::io::Read;

#[derive(Clone, Debug, Deserialize)]
struct QueueRequestBody {
track_ids: Vec<String>
}

#[derive(Clone, Debug, Deserialize)]
struct SeekRequestBody {
position_ms: u32
}

pub fn get_token(spotify: Arc<Mutex<Spotify>>) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("token")
.and(with_spotify(spotify))
Expand All @@ -31,6 +40,9 @@ pub mod filters {
pub fn play(queue: Arc<Queue>) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::post()
.and(warp::path!("queue" / "play"))
// .and(log_headers())
// .and(log_body())
// .and_then(handlers::noop)
.and(json_body())
.map(|body: QueueRequestBody| {
body.track_ids
Expand All @@ -53,6 +65,18 @@ pub mod filters {
.and_then(handlers::previous)
}

pub fn seek(spotify: Arc<Mutex<Spotify>>) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::post()
.and(warp::path!("player" / "seek"))
.and(warp::body::json())
.and(warp::body::content_length_limit(1000))
.map(|body: SeekRequestBody| {
body.position_ms
})
.and(with_spotify(spotify))
.and_then(handlers::seek)
}

// pub fn play_next(queue: Arc<Queue>) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
// warp::post()
// .and(warp::path!("queue/play_next"))
Expand All @@ -64,7 +88,7 @@ pub mod filters {
// }

fn json_body() -> impl Filter<Extract = (QueueRequestBody,), Error = warp::Rejection> + Clone {
warp::body::content_length_limit(1024 * 16).and(warp::body::json())
warp::body::content_length_limit(1000000).and(warp::body::json())
}

fn with_queue(queue: Arc<Queue>) -> impl Filter<Extract = (Arc<Queue>,), Error = Infallible> + Clone {
Expand All @@ -74,6 +98,30 @@ pub mod filters {
fn with_spotify(spotify: Arc<Mutex<Spotify>>) -> impl Filter<Extract = (Arc<Mutex<Spotify>>,), Error = Infallible> + Clone {
warp::any().map(move || spotify.clone())
}

fn log_headers() -> impl Filter<Extract = (), Error = Infallible> + Copy {
warp::header::headers_cloned()
.map(|headers: HeaderMap| {
for (k, v) in headers.iter() {
// Error from `to_str` should be handled properly
println!("{}: {}", k, v.to_str().expect("Failed to print header value"))
}
})
.untuple_one()
}

fn log_body() -> impl Filter<Extract = (), Error = Rejection> + Copy {
warp::body::bytes()
.map(|b: Bytes| {
// std::str::from_utf8(b.bytes());
println!("bytes = {:?}", b);

let data: Result<Vec<_>, _> = b.bytes().collect();
let data = data.expect("Unable to read data");
// println!("Request body: {}", std::str::from_utf8(data).expect("error converting bytes to &str"));
})
.untuple_one()
}
}

mod handlers {
Expand All @@ -96,8 +144,9 @@ mod handlers {
pub async fn get_token(spotify: Arc<Mutex<Spotify>>) -> Result<impl Reply, Infallible> {
let session = spotify.lock().unwrap().get_session().clone();
let client_id = "d420a117a32841c2b3474932e49fb54b";
let scopes = "user-read-private,playlist-read-private,playlist-read-collaborative,playlist-modify-public,playlist-modify-private,user-follow-modify,user-follow-read,user-library-read,user-library-modify,user-top-read,user-read-recently-played";
let token = keymaster::get_token(&session, client_id, scopes).await.unwrap();
//let scopes = "user-read-private,playlist-read-private,playlist-read-collaborative,playlist-modify-public,playlist-modify-private,user-follow-modify,user-follow-read,user-library-read,user-library-modify,user-top-read,user-read-recently-played";
let scopes = ["ugc-image-upload", "playlist-read-collaborative", "playlist-modify-private", "playlist-modify-public", "playlist-read-private", "user-read-playback-position", "user-read-recently-played", "user-top-read", "user-modify-playback-state", "user-read-currently-playing", "user-read-playback-state", "user-read-private", "user-read-email", "user-library-modify", "user-library-read", "user-follow-modify", "user-follow-read", "streaming", "app-remote-control"].join(",");
let token = keymaster::get_token(&session, client_id, &scopes).await.unwrap();
let proxy = TokenProxy {
access_token: token.access_token,
expires_in: token.expires_in,
Expand Down Expand Up @@ -134,6 +183,11 @@ mod handlers {
Ok(warp::reply())
}

pub async fn seek(position_ms: u32, spotify: Arc<Mutex<Spotify>>) -> Result<impl Reply, Infallible> {
spotify.lock().unwrap().seek(position_ms);
Ok(warp::reply())
}

pub async fn next(queue: Arc<Queue>) -> Result<impl Reply, Infallible> {
let q = queue.clone();
q.next(true);
Expand All @@ -149,4 +203,8 @@ mod handlers {
pub fn queue_append_after_current() {

}

pub async fn noop() -> Result<impl warp::Reply, warp::Rejection> {
Ok(warp::reply::reply())
}
}
3 changes: 3 additions & 0 deletions PlayerCore/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub fn start_event_broadcast(spotify: Arc<Mutex<Spotify>>, queue: Arc<Queue>, b:
let mut spotify_event_rx = spotify.lock().unwrap().get_spotify_event_channel();
loop {
let spotify_event = spotify_event_rx.recv().await;

let queue_items = queue.queue.read().unwrap();
let base62 = queue_items.clone().into_iter().map(|item| item.to_base62()).collect();

Expand All @@ -34,6 +35,8 @@ pub fn start_event_broadcast(spotify: Arc<Mutex<Spotify>>, queue: Arc<Queue>, b:
let len = v.len();

let cb = b.callback;
println!("at start_event_broadcast cb: {:?}", json);

cb(b.ptr, ptr, len);
}
});
Expand Down
36 changes: 24 additions & 12 deletions PlayerCore/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::{env, fs};
use std::sync::mpsc;
use std::os::raw::c_char;
use std::os::raw::{c_char};
use tokio::runtime::Runtime;
use tokio::net::UnixListener;
use tokio_stream::wrappers::UnixListenerStream;
use warp::Filter;
use std::ffi::{c_void, CString};
use std::ffi::{c_void, CString, CStr};

mod api;
mod sink_wrapper;
Expand All @@ -15,7 +15,11 @@ mod queue;
mod event;

#[no_mangle]
pub extern "C" fn librespot_init(user_data: *const c_void, event_callback: extern fn(*const c_void, *mut u8, usize)) {
pub extern "C" fn librespot_init(socket_path: *const c_char, user_data: *const c_void, event_callback: extern fn(*const c_void, *mut u8, usize)) {
// validate socket_path
let p = unsafe { CStr::from_ptr(socket_path) };
let p = p.to_str().unwrap();

let rt = Runtime::new().unwrap();
rt.block_on(async move {
let (sink_reload_tx, sink_reload_rx) = mpsc::sync_channel(1);
Expand All @@ -27,24 +31,32 @@ pub extern "C" fn librespot_init(user_data: *const c_void, event_callback: exter
callback: event_callback,
});

let mut dir = env::temp_dir();
dir.push("warp.sock");
println!("Temporary directory: {}", &dir.display());
println!("Socket Path: {}", &p);

fs::remove_file(&dir).ok();
fs::remove_file(&p).ok();

let listener = UnixListener::bind(dir).unwrap();
let listener = UnixListener::bind(p).unwrap();
let incoming = UnixListenerStream::new(listener);

let routes = api::filters::get_token(spotify.clone())
.or(api::filters::play(queue.clone()))
.or(api::filters::toggle_playback(queue.clone()))
.or(api::filters::next(queue.clone()))
.or(api::filters::previous(queue.clone()));
.or(api::filters::previous(queue.clone()))
.or(api::filters::seek(spotify.clone()));

let fut = warp::serve(routes).run_incoming(incoming);

warp::serve(routes)
.run_incoming(incoming)
.await;
// trigger a PlayerCore state update back to the frontend
{
let s = spotify.clone();
let mut spotify = s.lock().unwrap();
let current_status = spotify.get_current_status();
println!("current status {:?}", current_status);
spotify.update_status(current_status);
}

fut.await;
});
}

Expand Down
12 changes: 8 additions & 4 deletions PlayerCore/src/spotify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ pub enum PlayerState {
pub struct SpotifyState {
pub state: PlayerState,
pub track_id: Option<String>,
pub elapsed: Option<Duration>,
pub since: Option<SystemTime>,
pub elapsed: Option<u64>,
pub since: Option<u64>,
}

#[derive(Clone, Copy, Debug, PartialEq)]
Expand Down Expand Up @@ -149,6 +149,10 @@ impl Spotify {
self.player.stop();
}

pub fn seek(&self, position_ms: u32) {
self.player.seek(position_ms);
}

pub fn toggleplayback(&self) {
match self.get_current_status() {
PlayerEvent::Playing(_, _) => self.pause(),
Expand Down Expand Up @@ -182,7 +186,7 @@ impl Spotify {
self.send_event(SpotifyState {
state: PlayerState::Paused,
track_id: Some(track_id.to_base62()),
elapsed: Some(position),
elapsed: Some(position.as_secs()),
since: None,
});
}
Expand All @@ -194,7 +198,7 @@ impl Spotify {
state: PlayerState::Playing,
track_id: Some(track_id.to_base62()),
elapsed: None,
since: Some(playback_start),
since: Some(playback_start.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs()),
});
}
PlayerEvent::Stopped | PlayerEvent::FinishedTrack => {
Expand Down

0 comments on commit 7b14ada

Please sign in to comment.