forked from rupa/sprunge
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
126 lines (106 loc) · 3.5 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import datetime
import random
import pygments
import pygments.formatters
import pygments.lexers
from flask import abort, Flask, request, Response
from google.cloud import exceptions, ndb, storage
# Google Cloud project that hosts the service.
PROJECT_ID = "pastesha-re"
# Name of the Cloud Storage bucket holding paste bodies, derived from the
# project id.
BUCKET = f"{PROJECT_ID}.appspot.com"
# Public base URL; echoed back to clients and embedded in the help page.
URL = "https://pastesha.re"
# Name of the form field that carries the paste body.
POST = "sprunge"
# Alphabet from which paste names are drawn.
SYMBOLS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"

# Landing page returned for GET /. The placeholders <command> and <lang> and
# the "<-" in the curl examples are written as HTML entities so the browser
# displays them as text instead of parsing them as (unknown) tags and
# dropping them from the page. Doubled braces are literal braces in the
# f-string.
HELP = f"""
<html>
<body>
<style> a {{ text-decoration: none }} </style>
<pre>
sprunge(1) SPRUNGE sprunge(1)
NAME
sprunge: command line pastebin.
SYNOPSIS
&lt;command&gt; | curl -F '{POST}=&lt;-' {URL}
DESCRIPTION
add <a href="https://pygments.org/docs/lexers/">?&lt;lang&gt;</a> to resulting url for line numbers and syntax highlighting
use <a href="/submit">this form</a> to paste from a browser
EXAMPLES
~$ cat bin/ching | curl -F '{POST}=&lt;-' {URL}
{URL}/aXZI
~$ firefox {URL}/aXZI?py#n-7
SEE ALSO
https://github.com/beledouxdenis/sprunge
</pre>
</body>
</html>
"""
# Flask application object; the route decorators in this file register on it.
app = Flask(__name__)
# Datastore (NDB) client; handlers wrap model queries and writes in
# `ds_client.context()`.
ds_client = ndb.Client()
# Cloud Storage client used to upload, download and delete paste blobs.
gcs_client = storage.Client()
class Sprunge(ndb.Model):
    """Datastore record for one paste.

    Only the name and creation time live here; the handlers in this file
    keep the paste body in the Cloud Storage bucket under the same name.
    """

    # Short identifier of the paste; also used as the blob name in the bucket.
    name = ndb.StringProperty()
    # Creation timestamp, set automatically when the entity is first stored.
    date = ndb.DateTimeProperty(auto_now_add=True)
@app.route("/", methods=["GET", "POST"])
def main():
    """Serve the help page on GET and store a new paste on POST.

    For a POST, the paste body is read from the form field named by ``POST``,
    an unused four-character name is drawn from ``SYMBOLS``, the body is
    uploaded to the bucket under that name and a ``Sprunge`` record is
    written.

    Returns:
        ``HELP`` for a GET request; for a POST, the URL of the new paste
        followed by a newline.
    """
    if request.method != "POST":
        return HELP

    # Use the shared constant so this handler, the help text and the /submit
    # form always agree on the field name. Reading it first means a request
    # without the field is rejected by Flask before any Datastore query runs.
    content = request.form[POST]

    with ds_client.context():
        # Keep drawing candidates until one has no existing record.
        while True:
            name = "".join(random.choice(SYMBOLS) for _ in range(4))
            if not Sprunge.gql("WHERE name = :1", name).get():
                break

    # Upload the body first, then record the name, as the original did.
    gcs_client.bucket(BUCKET).blob(name).upload_from_string(content)
    with ds_client.context():
        Sprunge(name=name).put()
    return f"{URL}/{name}\n"
@app.route("/<name>", methods=["GET"])
def get_sprunge(name):
    """Return the paste called ``name``, optionally syntax-highlighted.

    The raw query string (everything after ``?``) is taken as a Pygments
    lexer alias. Without it the paste is returned as plain text; with it the
    paste is rendered as a full HTML page with inline line numbers and
    ``n-<line>`` anchors. An unknown alias falls back to the plain-text lexer.

    Args:
        name: Paste identifier taken from the URL path.

    Returns:
        A ``text/plain`` ``Response`` or an HTML string.

    Aborts with 404 when there is no record for ``name``, the blob is
    missing, or the stored content is empty.
    """
    # Function-scope import keeps this block self-contained.
    from pygments.util import ClassNotFound

    with ds_client.context():
        record = Sprunge.gql("WHERE name = :1", name).get()
    if not record:
        abort(404)

    try:
        content = gcs_client.bucket(BUCKET).blob(record.name).download_as_text()
    except exceptions.NotFound:
        # A record without its blob is treated as missing.
        content = None
    if not content:
        abort(404)

    # The query string arrives as raw bytes; undecodable bytes are replaced
    # rather than raising UnicodeDecodeError (which would surface as a 500).
    # The resulting alias then simply fails the lexer lookup below.
    syntax = request.query_string.decode(errors="replace")
    if not syntax:
        return Response(content, mimetype="text/plain")

    try:
        lexer = pygments.lexers.get_lexer_by_name(syntax)
    except ClassNotFound:
        # Unknown alias: still render the page, just without highlighting.
        lexer = pygments.lexers.TextLexer()

    formatter = pygments.formatters.HtmlFormatter(
        full=True,
        style="borland",
        lineanchors="n",
        linenos="inline",
    )
    return pygments.highlight(content, lexer, formatter)
@app.route("/submit", methods=["GET"])
def submit():
    """Return a minimal HTML form for pasting from a browser.

    The form posts to ``URL`` and sends the textarea under the field name
    held in ``POST``; the submit button is labelled with the same value.
    """
    form_markup = f"""
<form action="{URL}" method="POST">
<textarea name="{POST}" cols="80" rows="24"></textarea>
<br/>
<button type="submit">{POST}</button>
</form>
"""
    return form_markup
@app.route("/purge", methods=["GET"])
def purge():
    """Delete every paste whose record is more than 30 days old.

    For each matching ``Sprunge`` record the blob of the same name is removed
    from the bucket (a blob that is already gone is ignored) and then the
    record itself is deleted.

    Returns:
        The string ``"OK"``.
    """
    # NOTE(review): this destructive endpoint is a plain GET with no visible
    # access check; confirm it is restricted elsewhere (e.g. to a scheduler).

    # Compute the cutoff in UTC so it does not depend on the server's local
    # timezone. NOTE(review): NDB is understood to store `auto_now_add`
    # timestamps as naive UTC - confirm against the library documentation.
    cutoff = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(
        days=30
    )
    a_month_ago = cutoff.strftime("%Y-%m-%d %H:%M:%S")

    # The bucket handle is the same for every record, so build it once.
    bucket = gcs_client.bucket(BUCKET)
    with ds_client.context():
        for record in Sprunge.gql("WHERE date < DATETIME(:1)", a_month_ago).fetch():
            try:
                bucket.blob(record.name).delete()
            except exceptions.NotFound:
                # Blob already removed; the stale record is still deleted.
                pass
            record.key.delete()
    return "OK"