Commit c1116a0f authored by Jérôme Charaoui
Browse files

Remove drupal2lektor script

Development for this script will be tracked in a separate project, at
https://gitlab.torproject.org/tpo/web/drupal2lektor
parent b1bc39d2
Pipeline #11077 passed with stages
in 3 minutes and 37 seconds
#!/usr/bin/python3
# Copyright (C) 2021 Jérôme Charaoui <lavamind@torproject.org>
# Antoine Beaupré <anarcat@torproject.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
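The purpose of this script is to migrate Drupal CMS content into the content
files of the Lektor static-site generator. Article bodies are not converted
from HTML into Markdown: apart from rewriting inline file URLs to point at
downloaded copies, the processed HTML is written as-is to the `_html_body`
field. Event descriptions are converted to Markdown with markdownify.
Requires a Views REST endpoint at /export/rest/<content-type> to provide the
list of node IDs to import. Articles with comments additionally use the
/export/comments/<nid> and /export/commentids/<nid> endpoints.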
"""
import argparse
import logging
import pathlib
import os
import re
import sys
from urllib.parse import urlparse, unquote
import requests
from slugify import slugify
from markdownify import markdownify as md
class BatchArgParser(argparse.ArgumentParser):
    """argument parser preloaded with every option of the migration script

    The options cover logging verbosity, the Drupal connection
    (url, credentials, content type) and the Lektor destination
    (base directory, model, image directory).
    """

    def __init__(self, *args, **kwargs):
        """create the parser and register all command-line options"""
        super().__init__(*args, **kwargs)
        self.add_argument(
            "-v",
            "--verbose",
            action=LoggingAction,
            const="INFO",
            help="enable verbose messages",
        )
        self.add_argument(
            "-d",
            "--debug",
            action=LoggingAction,
            const="DEBUG",
            help="enable debugging messages",
        )
        self.add_argument(
            "-n",
            "--dryrun",
            action="store_true",
            help="simulate",
        )
        self.add_argument(
            "-u",
            "--drupal-url",
            action="store",
            help="drupal site base url",
        )
        self.add_argument(
            "-l",
            "--drupal-user",
            action="store",
            help="drupal username",
        )
        self.add_argument(
            "-p",
            "--drupal-pass",
            action="store",
            help="drupal password",
        )
        self.add_argument(
            "-a",
            "--drupal-basic-auth",
            action="store",
            help="drupal basic auth (eg. 'username:password')",
        )
        self.add_argument(
            "-t",
            "--drupal-content-type",
            action="store",
            help="drupal content type (eg. 'article')",
        )
        self.add_argument(
            "-k",
            "--lektor-base-path",
            action="store",
            help="lektor base directory (eg. '~/src/blog')",
        )
        self.add_argument(
            "-m",
            "--lektor-model",
            action="store",
            help="lektor page model (eg. 'blog')",
        )
        self.add_argument(
            "-i",
            "--lektor-image-path",
            action="store",
            help="lektor image assets relative directory (eg. 'assets/images/blog')",
        )
        self.add_argument(
            "-o",
            "--overwrite",
            action="store_true",
            help="overwrite already imported content",
        )

    def parse_args(self, *args, **kwargs):
        """parse the command line and validate the Lektor base directory

        Accepts the same arguments as `ArgumentParser.parse_args` and
        returns the resulting namespace. Exits through `self.error()`
        when `--lektor-base-path` is missing or is not an existing path.
        """
        parsed = super().parse_args(*args, **kwargs)
        # without this guard os.path.exists(None) raises a TypeError
        if not parsed.lektor_base_path:
            self.error("--lektor-base-path is required")
        if not os.path.exists(parsed.lektor_base_path):
            # report the option that was actually checked
            self.error("%s does not exist" % parsed.lektor_base_path)
        return parsed
class LoggingAction(argparse.Action):
    """argparse action that sets the root logger level during parsing

    The logging system should be initialized before the arguments are
    parsed, using `basicConfig`.

    Example usage:

        parser.add_argument(
            "-v",
            "--verbose",
            action=LoggingAction,
            const="INFO",
            help="enable verbose messages",
        )
        parser.add_argument(
            "-d",
            "--debug",
            action=LoggingAction,
            const="DEBUG",
            help="enable debugging messages",
        )
    """

    def __init__(self, *args, **kwargs):
        """prepare the action parameters

        The accepted values are restricted to the level names known to
        the logging module. When `const` is given, the option is a
        plain flag such as `--verbose` or `--debug` and consumes no
        value from the command line.
        """
        settings = dict(kwargs, choices=logging._nameToLevel.keys())
        if "const" in settings:
            settings["nargs"] = 0
        super().__init__(*args, **settings)

    def __call__(self, parser, ns, values, option):
        """apply the level: `const` for flags, the parsed value otherwise"""
        level = self.const or values
        logging.getLogger("").setLevel(level)
class Drupal2Lektor():
    """migrate the nodes of one Drupal content type into a Lektor tree

    Instantiating the class runs the whole migration: the constructor
    stores the settings found in `args` and then calls `migrate()`.
    """

    def __init__(self, args):
        """store the settings from the parsed arguments and run the migration"""
        self.session = None
        # names already fetched from Drupal, keyed by term id / user id
        self.tags = {}
        self.authors = {}
        self.drupal_url = args.drupal_url
        self.drupal_user = args.drupal_user
        self.drupal_pass = args.drupal_pass
        self.drupal_content_type = args.drupal_content_type
        self.lektor_base_path = args.lektor_base_path
        self.lektor_image_path = args.lektor_image_path
        self.lektor_model = args.lektor_model
        self.overwrite = args.overwrite
        self.dryrun = args.dryrun
        if args.drupal_basic_auth:
            # split on the first colon only: the password may contain colons
            user, _, pw = args.drupal_basic_auth.partition(":")
            self.basic_auth = requests.auth.HTTPBasicAuth(user, pw)
        else:
            self.basic_auth = None
        self.migrate()

    def drupal_login(self):
        """open a session and post the Drupal login form

        Exits the program with status 1 when the response sets no cookie.
        """
        url = self.drupal_url + "/user/login"
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/json"
        }
        data = {
            "name": self.drupal_user,
            "pass": self.drupal_pass,
            "form_id": "user_login_form"
        }
        self.session = requests.Session()
        response = self.session.post(url,
                                     data=data,
                                     headers=headers,
                                     auth=self.basic_auth,
                                     allow_redirects=False)
        logging.debug(self.session.cookies.get_dict())
        if not self.session.cookies.get_dict():
            logging.debug(response.headers)
            logging.error("login failed")
            sys.exit(1)

    def drupal_get(self, path, json=True):
        """fetch `path` from the Drupal site, logging in first if needed

        With `json` true the request carries `_format=json` and the
        decoded JSON is returned, otherwise the raw response bytes are
        returned. Any status other than 200 is logged and yields None.
        """
        if self.session is None:
            self.drupal_login()
        logging.info("requesting data from %s", path)
        params = {"_format": "json"} if json else None
        response = self.session.get(self.drupal_url + path, params=params, auth=self.basic_auth)
        if response.status_code == 200:
            data = response.json() if json else response.content
            logging.debug("got http 200, result:\n%s", data)
            return data
        logging.error("error, endpoint returned status %s", response.status_code)
        logging.error("endpoint was: %s", self.drupal_url + path)
        return None

    def migrate(self):
        """fetch every node of the configured content type and import it

        Returns False when the list of node ids cannot be retrieved.
        Exits the program when a node cannot be fetched or when the
        content type is neither 'article' nor 'event'. A node whose
        author cannot be resolved is skipped.
        """
        # first get a list of nodes
        nids = self.drupal_get("/export/rest/" + self.drupal_content_type)
        if not nids:
            logging.error("unable to get the list of node ids for content type")
            return False
        for nid in nids:
            # get the node content
            node = self.drupal_get("/node/" + nid["nid"])
            if not node:
                logging.error("unable to retrieve node %s", nid["nid"])
                sys.exit(1)
            # get the author
            node_uid = node["uid"][0]["target_id"]
            if node_uid in self.authors:
                node_author = self.authors[node_uid]
            else:
                author_data = self.drupal_get(node["uid"][0]["url"])
                if not author_data:
                    logging.error("unable get author name for uid %s", node_uid)
                    continue
                node_author = author_data["name"][0]["value"]
                self.authors[node_uid] = node_author
            # get the tags; the field may be absent on some content types
            node_tags = []
            for tag in node.get("field_tags") or []:
                tag_id = tag["target_id"]
                if tag_id in self.tags:
                    node_tags.append(self.tags[tag_id])
                    continue
                tag_data = self.drupal_get("/taxonomy/term/" + str(tag_id))
                if not tag_data:
                    logging.error("unable get tag name for tid %s", tag_id)
                    continue
                tag_name = tag_data["name"][0]["value"]
                self.tags[tag_id] = tag_name
                node_tags.append(tag_name)
            # get image
            image_field = node.get("field_image")
            node_image = image_field[0]["url"] if image_field else None
            # convert create datetime
            node_created = node["created"][0]["value"].split("T")[0]
            # node alias (url slug)
            # create one if it's missing
            if node["path"][0]["alias"]:
                node_alias = node["path"][0]["alias"]
            else:
                replacements = [['.', '']]
                node_alias = slugify(node["title"][0]["value"], replacements=replacements)
                logging.info("creating a new alias for node %s: %s", nid["nid"], node_alias)
            # comments; the field may be absent on some content types
            node_comments = None
            comment_field = node.get("comment_node_article")
            if comment_field and comment_field[0]["comment_count"] > 0:
                comment_data = self.drupal_get("/export/comments/" + nid["nid"])
                if comment_data:
                    node_comments = comment_data[0]["comment_node_article"]
                else:
                    logging.error("unable to retrieve comments for node %s", nid["nid"])
            # import into lektor
            if self.drupal_content_type == 'article':
                self.import_article_node(
                    nid["nid"],
                    node_alias,
                    node["title"][0]["value"],
                    node_author,
                    node_created,
                    node_image,
                    node["body"][0]["processed"],
                    node["body"][0]["summary"],
                    node_tags,
                    node_comments,
                )
            elif self.drupal_content_type == 'event':
                self.import_event_node(
                    nid["nid"],
                    node_alias,
                    node["title"][0]["value"],
                    node_author,
                    node_created,
                    node["field_event_description_long"],
                    node["field_event_description_short"],
                    node["field_event_dates"],
                    node["field_website"],
                    node_tags
                )
            else:
                logging.error("unknown content type %s", self.drupal_content_type)
                sys.exit(1)

    def _write_redirect(self, section, ident, target):
        """write a redirect page at content/<section>/<ident> pointing to `target`"""
        abs_redirect_path = os.path.abspath("/".join([self.lektor_base_path, "content", section, str(ident)]))
        pathlib.Path(abs_redirect_path).mkdir(parents=True, exist_ok=True)
        with open(abs_redirect_path + '/contents.lr', 'w', encoding='utf-8') as redirect_file:
            redirect_file.write("_model: redirect\n---\ntarget: %s\n---\n_discoverable: no" % target)

    def _download_file(self, url_path, destination):
        """save the Drupal file at `url_path` to `destination`; return True on success"""
        data = self.drupal_get(url_path, False)
        if data is None:
            logging.error("unable to download %s", url_path)
            return False
        with open(destination, "wb") as downloaded_file:
            downloaded_file.write(data)
        return True

    def _clean_comments(self, comments):
        """strip Drupal-specific markup from the rendered comments HTML"""
        # remove Drupal metadata
        comments = re.sub(r'\s*(data-quickedit-entity-id|data-quickedit-field-id|data-contextual-id|data-contextual-token|data-comment-user-id|data-comment-timestamp|typeof|datatype|property|about)="\S*"', "", comments)
        comments = re.sub("<drupal-render-placeholder.*</drupal-render-placeholder>", "", comments)
        # adjust Permalink links
        comments = re.sub(r'href="/comment/[0-9]+', 'href="', comments)
        # remove author profile links
        comments = re.sub(r"<a.*title=\"View user profile.\".*>(\S+)</a>", r"\g<1>", comments)
        # remove extra whitespace
        return re.sub(r"^\s*$", "", comments, flags=re.MULTILINE)

    def import_event_node(self, nid, alias, title, author, created, body, summary, dates, website, tags):
        """write an event node as content/<model>/<alias>/contents.lr

        The summary and body are converted to Markdown and the website
        links are appended to the body. A redirect page is also written
        at content/node/<nid>. Nothing is written in dry-run mode, or
        when the content already exists and overwriting is disabled.
        `created` is accepted but not written.
        """
        node_args = locals()
        abs_path = os.path.abspath("/".join([self.lektor_base_path, "content", self.lektor_model, alias]))
        path_exists = os.path.exists(abs_path)
        if path_exists and not self.overwrite:
            logging.warning("content already exists, skipping: %s", alias)
            return
        logging.info("importing new content with title '%s'", title)
        logging.debug(node_args)
        if self.dryrun:
            return
        # also creates missing parents (model directory, nested aliases)
        pathlib.Path(abs_path).mkdir(parents=True, exist_ok=True)
        with open(abs_path + '/contents.lr', 'w', encoding='utf-8') as content_file:
            # title
            content_file.write("title: %s\n---\n" % title)
            # author
            content_file.write("author: %s\n---\n" % author)
            # dates
            if dates:
                content_file.write("start_date: %s\n---\n" % dates[0]["value"])
                if "end_value" in dates[0] and dates[0]["value"] != dates[0]["end_value"]:
                    content_file.write("end_date: %s\n---\n" % dates[0]["end_value"])
            # summary
            if summary:
                content_file.write("summary:\n%s\n---\n" % md(summary[0]["processed"]))
            # body, integrating website
            website_content = ""
            if website:
                links = [
                    ws["uri"] if ws["uri"] == ws["title"] else "[%s](%s)" % (ws["title"], ws["uri"])
                    for ws in website
                ]
                website_content = "\n\nWebsite:\n" + "\n".join(links)
            md_body = (md(body[0]["processed"]) if body else "") + website_content
            # a bare "---" line would be read as a field separator
            md_body = re.sub("^---$", "----", md_body, flags=re.MULTILINE)
            md_body = re.sub(r"^\s$", "", md_body, flags=re.MULTILINE)
            content_file.write("body:\n%s\n---\n" % md_body)
            # tags
            if tags:
                content_file.write("tags:\n%s\n---\n" % "\n".join(tags))
        # redirect
        self._write_redirect("node", nid, alias)

    def import_article_node(self, nid, alias, title, author, created, image_url, body, summary, tags, comments):
        """write an article node as content/<model>/<alias>/contents.lr

        The image and the files referenced inline in `body` are
        downloaded below the assets directory. Redirect pages are
        written at content/node/<nid> and, when there are comments, at
        content/comment/<cid>. Nothing is written in dry-run mode, or
        when the content already exists and overwriting is disabled.
        """
        node_args = locals()
        abs_path = os.path.abspath("/".join([self.lektor_base_path, "content", self.lektor_model, alias]))
        path_exists = os.path.exists(abs_path)
        if path_exists and not self.overwrite:
            logging.warning("content already exists, skipping: %s", alias)
            return
        logging.info("importing new content with title '%s'", title)
        logging.debug(node_args)
        if not tags:
            logging.warning("no tags found for '%s'", alias)
        if self.dryrun:
            return
        # also creates missing parents (model directory, nested aliases)
        pathlib.Path(abs_path).mkdir(parents=True, exist_ok=True)
        with open(abs_path + '/contents.lr', 'w', encoding='utf-8') as content_file:
            # title
            content_file.write("title: %s\n---\n" % title)
            # pub_date
            content_file.write("pub_date: %s\n---\n" % created)
            # author
            content_file.write("author: %s\n---\n" % author)
            # tags
            if tags:
                content_file.write("tags:\n%s\n---\n" % "\n".join(tags))
            # image
            if image_url:
                # create directory if needed
                abs_image_path = os.path.abspath("/".join([self.lektor_base_path, "assets", self.lektor_image_path]))
                pathlib.Path(abs_image_path).mkdir(parents=True, exist_ok=True)
                # parse image url
                parsed_url = urlparse(image_url)
                image_filename = unquote(os.path.basename(parsed_url.path))
                # download file
                logging.info("will download image %s", image_filename)
                if self._download_file(parsed_url.path, abs_image_path + "/" + image_filename):
                    # write image field in contents.lr file
                    image_asset_path = "/".join([self.lektor_image_path, image_filename])
                    content_file.write("image: /%s\n---\n" % image_asset_path)
            # summary; a missing summary must not be written as "None"
            content_file.write("summary:\n%s\n---\n" % (summary or ""))
            # body
            body = re.sub(r"src=\"(/sites/default/files/\S+)\"", self.inline_file, body)
            content_file.write("_html_body:\n%s\n---\n" % body)
            if comments:
                content_file.write("_comments:\n%s\n---\n" % self._clean_comments(comments))
        if comments:
            # process comment ids to create redirects to blog pages
            cids = self.drupal_get("/export/commentids/" + nid) or []
            for cid in cids:
                self._write_redirect("comment", cid["cid"], "%s#comment-%s" % (alias, cid["cid"]))
        # redirect
        self._write_redirect("node", nid, alias)

    def inline_file(self, match):
        """`re.sub` callback: download an inline file and rewrite its `src`

        Group 1 of `match` is the Drupal path of the file. The file is
        saved below the Lektor assets directory and the attribute is
        rewritten to point at that copy. The matched text is returned
        unchanged when the download fails.
        """
        source_path = match.group(1)
        if not source_path:
            return match.group(0)
        image_filename = os.path.basename(source_path)
        image_path = os.path.dirname(source_path).replace("/sites/default/files", "")
        # prepare destination path
        abs_image_path = os.path.abspath("/".join([self.lektor_base_path, "assets", self.lektor_image_path, image_path]))
        pathlib.Path(abs_image_path).mkdir(parents=True, exist_ok=True)
        # download file
        if not self._download_file(source_path, abs_image_path + "/" + image_filename):
            return match.group(0)
        image_asset_path = os.path.normpath("/".join([self.lektor_image_path, image_path, image_filename]))
        return 'src="/' + image_asset_path + '"'
def main():
    """configure logging, parse the command line and run the migration"""
    logging.basicConfig(format="%(message)s")
    parser = BatchArgParser()
    # the migration is started by the Drupal2Lektor constructor itself
    Drupal2Lektor(parser.parse_args())


if __name__ == '__main__':
    main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment